Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into alamb/doc_extended
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jan 29, 2025
2 parents dfab8d2 + a93b4de commit 93f1658
Show file tree
Hide file tree
Showing 21 changed files with 397 additions and 247 deletions.
144 changes: 88 additions & 56 deletions datafusion-cli/Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ aws-config = "=1.5.10"
## 1.53.0 requires a higher MSRV 1.81 so lock until DataFusion MSRV catches up
aws-sdk-sso = "=1.50.0"
## 1.54.0 requires a higher MSRV 1.81 so lock until DataFusion MSRV catches up
aws-sdk-ssooidc = "=1.51.0"
aws-sdk-ssooidc = "=1.57.1"
## 1.54.1 requires a higher MSRV 1.81 so lock until DataFusion MSRV catches up
aws-sdk-sts = "=1.51.0"
# end pin aws-sdk crates
Expand All @@ -58,7 +58,7 @@ dirs = "6.0.0"
env_logger = "0.11"
futures = "0.3"
# pin as home 0.5.11 has MSRV 1.81. Can remove this once we bump MSRV to 1.81
home = "=0.5.9"
home = "=0.5.11"
mimalloc = { version = "0.1", default-features = false }
object_store = { version = "0.11.0", features = ["aws", "gcp", "http"] }
parking_lot = { version = "0.12" }
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

FROM rust:1.80-bookworm AS builder
FROM rust:bookworm AS builder

COPY . /usr/src/datafusion
COPY ./datafusion /usr/src/datafusion/datafusion
Expand Down
2 changes: 2 additions & 0 deletions datafusion/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ version.workspace = true
[dependencies]
arrow-schema = { workspace = true }
async-trait = { workspace = true }
dashmap = { workspace = true }
datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
itertools = { workspace = true }
parking_lot = { workspace = true }

[dev-dependencies]
Expand Down
10 changes: 10 additions & 0 deletions datafusion/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@
// specific language governing permissions and limitations
// under the License.

//! Interfaces and default implementations of catalogs and schemas.
//!
//! Implementations
//! * Simple memory based catalog: [`MemoryCatalogProviderList`], [`MemoryCatalogProvider`], [`MemorySchemaProvider`]
pub mod memory;
pub use memory::{
MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
};

mod r#async;
mod catalog;
mod dynamic_file;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
//! [`MemoryCatalogProviderList`], [`MemoryCatalogProvider`], [`MemorySchemaProvider`]: In-memory
//! implementations of [`CatalogProviderList`], [`CatalogProvider`] and [`SchemaProvider`].
use crate::catalog::{
CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider,
};
use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider};
use async_trait::async_trait;
use dashmap::DashMap;
use datafusion_common::{exec_err, DataFusionError};
Expand Down Expand Up @@ -200,156 +198,3 @@ impl SchemaProvider for MemorySchemaProvider {
self.tables.contains_key(name)
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::catalog::CatalogProvider;
use crate::catalog_common::memory::MemorySchemaProvider;
use crate::datasource::empty::EmptyTable;
use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
use crate::prelude::SessionContext;
use arrow_schema::Schema;
use datafusion_common::assert_batches_eq;
use std::any::Any;
use std::sync::Arc;

#[test]
fn memory_catalog_dereg_nonempty_schema() {
    // A schema that still contains a table may only be deregistered when
    // `cascade` is requested; without it the catalog must return an error.
    let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>;

    let schema = Arc::new(MemorySchemaProvider::new()) as Arc<dyn SchemaProvider>;
    let test_table = Arc::new(EmptyTable::new(Arc::new(Schema::empty())))
        as Arc<dyn TableProvider>;
    // Registering a table is what makes the schema non-empty.
    schema.register_table("t".into(), test_table).unwrap();

    cat.register_schema("foo", schema).unwrap();

    assert!(
        cat.deregister_schema("foo", false).is_err(),
        "dropping non-empty schema without cascade should error"
    );
    // With cascade the schema is removed and handed back to the caller.
    assert!(cat.deregister_schema("foo", true).unwrap().is_some());
}

#[test]
fn memory_catalog_dereg_empty_schema() {
    // A schema with no tables can be deregistered without cascading, and the
    // removed schema is returned.
    let catalog: Arc<dyn CatalogProvider> = Arc::new(MemoryCatalogProvider::new());
    let empty_schema: Arc<dyn SchemaProvider> = Arc::new(MemorySchemaProvider::new());

    catalog.register_schema("foo", empty_schema).unwrap();

    let removed = catalog.deregister_schema("foo", false).unwrap();
    assert!(removed.is_some());
}

#[test]
fn memory_catalog_dereg_missing() {
    // Deregistering a name that was never registered is not an error: the
    // call succeeds and yields `None`.
    let catalog: Arc<dyn CatalogProvider> = Arc::new(MemoryCatalogProvider::new());

    let outcome = catalog.deregister_schema("foo", false).unwrap();
    assert!(outcome.is_none());
}

#[test]
fn default_register_schema_not_supported() {
    // A bare-bones CatalogProvider that supplies only the methods implemented
    // below, so `register_schema` falls through to the trait's default.
    #[derive(Debug)]
    struct TestProvider {}

    impl CatalogProvider for TestProvider {
        fn as_any(&self) -> &dyn Any {
            self
        }

        // Not exercised by this test.
        fn schema_names(&self) -> Vec<String> {
            unimplemented!()
        }

        // Not exercised by this test.
        fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
            unimplemented!()
        }
    }

    let provider = Arc::new(TestProvider {});
    let new_schema: Arc<dyn SchemaProvider> = Arc::new(MemorySchemaProvider::new());

    // The default implementation must reject the registration.
    let Err(err) = provider.register_schema("foo", new_schema) else {
        panic!("unexpected OK");
    };
    assert_eq!(
        err.strip_backtrace(),
        "This feature is not implemented: Registering new schemas is not supported"
    );
}

#[test]
fn test_mem_provider() {
    // Nothing in this test awaits, so it runs as a plain synchronous test
    // rather than on an async runtime.
    let provider = MemorySchemaProvider::new();
    let table_name = "test_table_exist";

    // A fresh provider knows no tables; deregistering an unknown name yields `None`.
    assert!(!provider.table_exist(table_name));
    assert!(provider.deregister_table(table_name).unwrap().is_none());

    // First registration succeeds and reports no previously registered table.
    let test_table = EmptyTable::new(Arc::new(Schema::empty()));
    assert!(provider
        .register_table(table_name.to_string(), Arc::new(test_table))
        .unwrap()
        .is_none());
    assert!(provider.table_exist(table_name));

    // Registering a second table under the same name is rejected.
    let other_table = EmptyTable::new(Arc::new(Schema::empty()));
    let result = provider.register_table(table_name.to_string(), Arc::new(other_table));
    assert!(result.is_err());
}

#[tokio::test]
async fn test_schema_register_listing_table() {
    // End-to-end check: a ListingTable registered through an in-memory schema
    // and catalog can be queried by its fully qualified name.

    // Build a `file://` URL for the parquet fixture. Paths that do not start
    // with '/' get an extra slash so the URL still has an empty authority
    // (presumably for Windows-style paths; confirm against `parquet_test_data`).
    let testdata = crate::test_util::parquet_test_data();
    let testdir = if testdata.starts_with('/') {
        format!("file://{testdata}")
    } else {
        format!("file:///{testdata}")
    };
    let filename = if testdir.ends_with('/') {
        format!("{testdir}alltypes_plain.parquet")
    } else {
        format!("{testdir}/alltypes_plain.parquet")
    };

    let table_path = ListingTableUrl::parse(filename).unwrap();

    let catalog = MemoryCatalogProvider::new();
    let schema = MemorySchemaProvider::new();

    let ctx = SessionContext::new();

    // Infer the listing configuration from the file using the session state.
    let config = ListingTableConfig::new(table_path)
        .infer(&ctx.state())
        .await
        .unwrap();
    let table = ListingTable::try_new(config).unwrap();

    schema
        .register_table("alltypes_plain".to_string(), Arc::new(table))
        .unwrap();

    catalog.register_schema("active", Arc::new(schema)).unwrap();
    ctx.register_catalog("cat", Arc::new(catalog));

    let df = ctx
        .sql("SELECT id, bool_col FROM cat.active.alltypes_plain")
        .await
        .unwrap();

    let actual = df.collect().await.unwrap();

    // Data rows are padded to the column widths set by the border line.
    let expected = [
        "+----+----------+",
        "| id | bool_col |",
        "+----+----------+",
        "| 4  | true     |",
        "| 5  | false    |",
        "| 6  | true     |",
        "| 7  | false    |",
        "| 2  | true     |",
        "| 3  | false    |",
        "| 0  | true     |",
        "| 1  | false    |",
        "+----+----------+",
    ];
    assert_batches_eq!(expected, &actual);
}
}
1 change: 0 additions & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ async-trait = { workspace = true }
bytes = { workspace = true }
bzip2 = { version = "0.5.0", optional = true }
chrono = { workspace = true }
dashmap = { workspace = true }
datafusion-catalog = { workspace = true }
datafusion-common = { workspace = true, features = ["object_store"] }
datafusion-common-runtime = { workspace = true }
Expand Down
6 changes: 0 additions & 6 deletions datafusion/core/src/catalog_common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,12 @@
//! Interfaces and default implementations of catalogs and schemas.
//!
//! Implementations
//! * Simple memory based catalog: [`MemoryCatalogProviderList`], [`MemoryCatalogProvider`], [`MemorySchemaProvider`]
//! * Information schema: [`information_schema`]
//! * Listing schema: [`listing_schema`]
pub mod information_schema;
pub mod listing_schema;
pub mod memory;

pub use crate::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider};
pub use memory::{
MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
};

pub use datafusion_sql::{ResolvedTableReference, TableReference};

Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

//! [`SessionContext`] API for registering data sources and executing queries
use datafusion_catalog::memory::MemorySchemaProvider;
use datafusion_catalog::MemoryCatalogProvider;
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::{Arc, Weak};
Expand All @@ -27,8 +29,6 @@ use crate::{
CatalogProvider, CatalogProviderList, TableProvider, TableProviderFactory,
},
catalog_common::listing_schema::ListingSchemaProvider,
catalog_common::memory::MemorySchemaProvider,
catalog_common::MemoryCatalogProvider,
dataframe::DataFrame,
datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ use crate::catalog::{CatalogProviderList, SchemaProvider, TableProviderFactory};
use crate::catalog_common::information_schema::{
InformationSchemaProvider, INFORMATION_SCHEMA,
};
use crate::catalog_common::MemoryCatalogProviderList;
use crate::datasource::cte_worktable::CteWorkTable;
use crate::datasource::file_format::{format_as_file_type, FileFormatFactory};
use crate::datasource::provider_as_source;
use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
use crate::execution::SessionStateDefaults;
use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use datafusion_catalog::MemoryCatalogProviderList;

use arrow_schema::{DataType, SchemaRef};
use datafusion_catalog::{Session, TableFunction, TableFunctionImpl};
Expand Down Expand Up @@ -1987,11 +1987,11 @@ pub(crate) struct PreparedPlan {
#[cfg(test)]
mod tests {
use super::{SessionContextProvider, SessionStateBuilder};
use crate::catalog_common::MemoryCatalogProviderList;
use crate::datasource::MemTable;
use crate::execution::context::SessionState;
use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use datafusion_catalog::MemoryCatalogProviderList;
use datafusion_common::DFSchema;
use datafusion_common::Result;
use datafusion_execution::config::SessionConfig;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/session_state_defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

use crate::catalog::{CatalogProvider, TableProviderFactory};
use crate::catalog_common::listing_schema::ListingSchemaProvider;
use crate::catalog_common::{MemoryCatalogProvider, MemorySchemaProvider};
use crate::datasource::file_format::arrow::ArrowFormatFactory;
use crate::datasource::file_format::avro::AvroFormatFactory;
use crate::datasource::file_format::csv::CsvFormatFactory;
Expand All @@ -31,6 +30,7 @@ use crate::execution::context::SessionState;
use crate::functions_nested;
use crate::{functions, functions_aggregate, functions_table, functions_window};
use datafusion_catalog::TableFunction;
use datafusion_catalog::{MemoryCatalogProvider, MemorySchemaProvider};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
Expand Down
Loading

0 comments on commit 93f1658

Please sign in to comment.