Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into alamb/specialized_gro…
Browse files Browse the repository at this point in the history
…up_keys
  • Loading branch information
alamb committed Feb 6, 2024
2 parents 18adc9f + d1aca48 commit 1009b85
Show file tree
Hide file tree
Showing 48 changed files with 1,269 additions and 725 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ arrow-flight = { version = "50.0.0", features = ["flight-sql-experimental"] }
arrow-ipc = { version = "50.0.0", default-features = false, features = ["lz4"] }
arrow-ord = { version = "50.0.0", default-features = false }
arrow-schema = { version = "50.0.0", default-features = false }
arrow-string = { version = "50.0.0", default-features = false }
async-trait = "0.1.73"
bigdecimal = "0.4.1"
bytes = "1.4"
Expand Down
21 changes: 11 additions & 10 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ dirs = "4.0.0"
env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", default-features = false }
object_store = { version = "0.9.0", features = ["aws", "gcp"] }
object_store = { version = "0.9.0", features = ["aws", "gcp", "http"] }
parking_lot = { version = "0.12" }
parquet = { version = "50.0.0", default-features = false }
regex = "1.8"
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

FROM rust:1.70 as builder
FROM rust:1.72 as builder

COPY . /usr/src/arrow-datafusion
COPY ./datafusion /usr/src/arrow-datafusion/datafusion
Expand Down
124 changes: 108 additions & 16 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Execution functions
use std::collections::HashMap;
use std::io::prelude::*;
use std::io::BufReader;
use std::time::Instant;
Expand All @@ -42,6 +43,9 @@ use datafusion::physical_plan::{collect, execute_stream};
use datafusion::prelude::SessionContext;
use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};

use datafusion::logical_expr::dml::CopyTo;
use datafusion::sql::parser::Statement;
use object_store::http::HttpBuilder;
use object_store::ObjectStore;
use rustyline::error::ReadlineError;
use rustyline::Editor;
Expand Down Expand Up @@ -220,7 +224,7 @@ async fn exec_and_print(

let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
let mut plan = ctx.state().statement_to_plan(statement).await?;
let plan = create_plan(ctx, statement).await?;

// For plans like `Explain` ignore `MaxRows` option and always display all rows
let should_ignore_maxrows = matches!(
Expand All @@ -230,13 +234,6 @@ async fn exec_and_print(
| LogicalPlan::Analyze(_)
);

// Note that cmd is a mutable reference so that create_external_table function can remove all
// datafusion-cli specific options before passing through to datafusion. Otherwise, datafusion
// will raise Configuration errors.
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
create_external_table(ctx, cmd).await?;
}

let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

Expand All @@ -259,6 +256,36 @@ async fn exec_and_print(
Ok(())
}

async fn create_plan(
ctx: &mut SessionContext,
statement: Statement,
) -> Result<LogicalPlan, DataFusionError> {
let mut plan = ctx.state().statement_to_plan(statement).await?;

// Note that cmd is a mutable reference so that create_external_table function can remove all
// datafusion-cli specific options before passing through to datafusion. Otherwise, datafusion
// will raise Configuration errors.
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
create_external_table(ctx, cmd).await?;
}

if let LogicalPlan::Copy(copy_to) = &mut plan {
register_object_store(ctx, copy_to).await?;
}
Ok(plan)
}

async fn register_object_store(
ctx: &SessionContext,
copy_to: &mut CopyTo,
) -> Result<(), DataFusionError> {
let url = ListingTableUrl::parse(copy_to.output_url.as_str())?;
let store =
get_object_store(ctx, &mut HashMap::new(), url.scheme(), url.as_ref()).await?;
ctx.runtime_env().register_object_store(url.as_ref(), store);
Ok(())
}

async fn create_external_table(
ctx: &SessionContext,
cmd: &mut CreateExternalTable,
Expand All @@ -268,19 +295,37 @@ async fn create_external_table(
let url: &Url = table_path.as_ref();

// registering the cloud object store dynamically using cmd.options
let store = get_object_store(ctx, &mut cmd.options, scheme, url).await?;

ctx.runtime_env().register_object_store(url, store);

Ok(())
}

async fn get_object_store(
ctx: &SessionContext,
options: &mut HashMap<String, String>,
scheme: &str,
url: &Url,
) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
let store = match scheme {
"s3" => {
let builder = get_s3_object_store_builder(url, cmd).await?;
let builder = get_s3_object_store_builder(url, options).await?;
Arc::new(builder.build()?) as Arc<dyn ObjectStore>
}
"oss" => {
let builder = get_oss_object_store_builder(url, cmd)?;
let builder = get_oss_object_store_builder(url, options)?;
Arc::new(builder.build()?) as Arc<dyn ObjectStore>
}
"gs" | "gcs" => {
let builder = get_gcs_object_store_builder(url, cmd)?;
let builder = get_gcs_object_store_builder(url, options)?;
Arc::new(builder.build()?) as Arc<dyn ObjectStore>
}
"http" | "https" => Arc::new(
HttpBuilder::new()
.with_url(url.origin().ascii_serialization())
.build()?,
) as Arc<dyn ObjectStore>,
_ => {
// for other types, try to get from the object_store_registry
ctx.runtime_env()
Expand All @@ -291,10 +336,7 @@ async fn create_external_table(
})?
}
};

ctx.runtime_env().register_object_store(url, store);

Ok(())
Ok(store)
}

#[cfg(test)]
Expand All @@ -303,7 +345,9 @@ mod tests {

use super::*;
use datafusion::common::plan_err;
use datafusion_common::{file_options::StatementOptions, FileTypeWriterOptions};
use datafusion_common::{
file_options::StatementOptions, FileType, FileTypeWriterOptions,
};

async fn create_external_table_test(location: &str, sql: &str) -> Result<()> {
let ctx = SessionContext::new();
Expand All @@ -329,12 +373,60 @@ mod tests {
return plan_err!("LogicalPlan is not a CreateExternalTable");
}

// Ensure the URL is supported by the object store
ctx.runtime_env()
.object_store(ListingTableUrl::parse(location)?)?;

Ok(())
}

#[tokio::test]
async fn create_object_store_table_http() -> Result<()> {
// Should be OK
let location = "http://example.com/file.parquet";
let sql =
format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");
create_external_table_test(location, &sql).await?;

Ok(())
}
#[tokio::test]
async fn copy_to_external_object_store_test() -> Result<()> {
let locations = vec![
"s3://bucket/path/file.parquet",
"oss://bucket/path/file.parquet",
"gcs://bucket/path/file.parquet",
];
let mut ctx = SessionContext::new();
let task_ctx = ctx.task_ctx();
let dialect = &task_ctx.session_config().options().sql_parser.dialect;
let dialect = dialect_from_str(dialect).ok_or_else(|| {
plan_datafusion_err!(
"Unsupported SQL dialect: {dialect}. Available dialects: \
Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
MsSQL, ClickHouse, BigQuery, Ansi."
)
})?;
for location in locations {
let sql = format!("copy (values (1,2)) to '{}';", location);
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
//Should not fail
let mut plan = create_plan(&mut ctx, statement).await?;
if let LogicalPlan::Copy(copy_to) = &mut plan {
assert_eq!(copy_to.output_url, location);
assert_eq!(copy_to.file_format, FileType::PARQUET);
ctx.runtime_env()
.object_store_registry
.get_store(&Url::parse(&copy_to.output_url).unwrap())?;
} else {
return plan_err!("LogicalPlan is not a CopyTo");
}
}
}
Ok(())
}

#[tokio::test]
async fn create_object_store_table_s3() -> Result<()> {
let access_key_id = "fake_access_key_id";
Expand Down
Loading

0 comments on commit 1009b85

Please sign in to comment.