diff --git a/Cargo.toml b/Cargo.toml index 9eb4d5b39af0..85b26f802f05 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ homepage = "https://datafusion.apache.org" license = "Apache-2.0" readme = "README.md" repository = "https://github.com/apache/datafusion" -rust-version = "1.80.1" +rust-version = "1.81.0" version = "44.0.0" [workspace.dependencies] diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 30060665934c..c418d17ce29b 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -25,7 +25,7 @@ keywords = ["arrow", "datafusion", "query", "sql"] license = "Apache-2.0" homepage = "https://datafusion.apache.org" repository = "https://github.com/apache/datafusion" -rust-version = "1.80.1" +rust-version = "1.81.0" readme = "README.md" [dependencies] diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs index 6bfde2ebbf52..2908edbb754d 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/expr_api.rs @@ -270,7 +270,7 @@ fn range_analysis_demo() -> Result<()> { // In this case, we can see that, as expected, `analyze` has figured out // that in this case, `date` must be in the range `['2020-09-01', '2020-10-01']` let expected_range = Interval::try_new(september_1, october_1)?; - assert_eq!(analysis_result.boundaries[0].interval, expected_range); + assert_eq!(analysis_result.boundaries[0].interval, Some(expected_range)); Ok(()) } diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 14243d95a666..9d3429b67796 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -3963,7 +3963,7 @@ mod tests { use arrow::error::ArrowError; use arrow::util::pretty::pretty_format_columns; use arrow_array::types::Float64Type; - use arrow_buffer::{Buffer, NullBuffer}; + use arrow_buffer::{Buffer, NullBufferBuilder}; use arrow_schema::Fields; use chrono::NaiveDate; use rand::Rng; @@ -6912,12 
+6912,11 @@ mod tests { let array_b = Arc::new(Int32Array::from_iter_values([2])); let arrays: Vec = vec![array_a, array_b]; - let mut not_nulls = BooleanBufferBuilder::new(1); - not_nulls.append(true); - let not_nulls = not_nulls.finish(); - let not_nulls = Some(NullBuffer::new(not_nulls)); + let mut not_nulls = NullBufferBuilder::new(1); - let ar = StructArray::new(fields, arrays, not_nulls); + not_nulls.append_non_null(); + + let ar = StructArray::new(fields, arrays, not_nulls.finish()); let s = ScalarValue::Struct(Arc::new(ar)); assert_eq!(s.to_string(), "{a:1,b:2}"); diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index d2ce965c5c49..dd8848d24923 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -21,7 +21,7 @@ use std::fmt::{self, Debug, Display}; use crate::{Result, ScalarValue}; -use arrow_schema::{Schema, SchemaRef}; +use arrow_schema::{DataType, Schema, SchemaRef}; /// Represents a value with a degree of certainty. `Precision` is used to /// propagate information the precision of statistical values. @@ -170,24 +170,63 @@ impl Precision { pub fn add(&self, other: &Precision) -> Precision { match (self, other) { (Precision::Exact(a), Precision::Exact(b)) => { - if let Ok(result) = a.add(b) { - Precision::Exact(result) - } else { - Precision::Absent - } + a.add(b).map(Precision::Exact).unwrap_or(Precision::Absent) } (Precision::Inexact(a), Precision::Exact(b)) | (Precision::Exact(a), Precision::Inexact(b)) - | (Precision::Inexact(a), Precision::Inexact(b)) => { - if let Ok(result) = a.add(b) { - Precision::Inexact(result) - } else { - Precision::Absent - } + | (Precision::Inexact(a), Precision::Inexact(b)) => a + .add(b) + .map(Precision::Inexact) + .unwrap_or(Precision::Absent), + (_, _) => Precision::Absent, + } + } + + /// Calculates the difference of two (possibly inexact) [`ScalarValue`] values, + /// conservatively propagating exactness information. 
If one of the input + /// values is [`Precision::Absent`], the result is `Absent` too. + pub fn sub(&self, other: &Precision) -> Precision { + match (self, other) { + (Precision::Exact(a), Precision::Exact(b)) => { + a.sub(b).map(Precision::Exact).unwrap_or(Precision::Absent) } + (Precision::Inexact(a), Precision::Exact(b)) + | (Precision::Exact(a), Precision::Inexact(b)) + | (Precision::Inexact(a), Precision::Inexact(b)) => a + .sub(b) + .map(Precision::Inexact) + .unwrap_or(Precision::Absent), + (_, _) => Precision::Absent, + } + } + + /// Calculates the multiplication of two (possibly inexact) [`ScalarValue`] values, + /// conservatively propagating exactness information. If one of the input + /// values is [`Precision::Absent`], the result is `Absent` too. + pub fn multiply(&self, other: &Precision) -> Precision { + match (self, other) { + (Precision::Exact(a), Precision::Exact(b)) => a + .mul_checked(b) + .map(Precision::Exact) + .unwrap_or(Precision::Absent), + (Precision::Inexact(a), Precision::Exact(b)) + | (Precision::Exact(a), Precision::Inexact(b)) + | (Precision::Inexact(a), Precision::Inexact(b)) => a + .mul_checked(b) + .map(Precision::Inexact) + .unwrap_or(Precision::Absent), (_, _) => Precision::Absent, } } + + /// Casts the value to the given data type, propagating exactness information. 
+ pub fn cast_to(&self, data_type: &DataType) -> Result> { + match self { + Precision::Exact(value) => value.cast_to(data_type).map(Precision::Exact), + Precision::Inexact(value) => value.cast_to(data_type).map(Precision::Inexact), + Precision::Absent => Ok(Precision::Absent), + } + } } impl Debug for Precision { @@ -210,6 +249,18 @@ impl Display for Precision { } } +impl From> for Precision { + fn from(value: Precision) -> Self { + match value { + Precision::Exact(v) => Precision::Exact(ScalarValue::UInt64(Some(v as u64))), + Precision::Inexact(v) => { + Precision::Inexact(ScalarValue::UInt64(Some(v as u64))) + } + Precision::Absent => Precision::Absent, + } + } +} + /// Statistics for a relation /// Fields are optional and can be inexact because the sources /// sometimes provide approximate estimates for performance reasons @@ -401,6 +452,11 @@ impl Display for Statistics { } else { s }; + let s = if cs.sum_value != Precision::Absent { + format!("{} Sum={}", s, cs.sum_value) + } else { + s + }; let s = if cs.null_count != Precision::Absent { format!("{} Null={}", s, cs.null_count) } else { @@ -436,6 +492,8 @@ pub struct ColumnStatistics { pub max_value: Precision, /// Minimum value of column pub min_value: Precision, + /// Sum value of a column + pub sum_value: Precision, /// Number of distinct values pub distinct_count: Precision, } @@ -458,6 +516,7 @@ impl ColumnStatistics { null_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, + sum_value: Precision::Absent, distinct_count: Precision::Absent, } } @@ -469,6 +528,7 @@ impl ColumnStatistics { self.null_count = self.null_count.to_inexact(); self.max_value = self.max_value.to_inexact(); self.min_value = self.min_value.to_inexact(); + self.sum_value = self.sum_value.to_inexact(); self.distinct_count = self.distinct_count.to_inexact(); self } @@ -563,6 +623,26 @@ mod tests { assert_eq!(precision1.add(&absent_precision), Precision::Absent); } + #[test] + fn test_add_scalar() { + 
let precision = Precision::Exact(ScalarValue::Int32(Some(42))); + + assert_eq!( + precision.add(&Precision::Exact(ScalarValue::Int32(Some(23)))), + Precision::Exact(ScalarValue::Int32(Some(65))), + ); + assert_eq!( + precision.add(&Precision::Inexact(ScalarValue::Int32(Some(23)))), + Precision::Inexact(ScalarValue::Int32(Some(65))), + ); + assert_eq!( + precision.add(&Precision::Exact(ScalarValue::Int32(None))), + // As per behavior of ScalarValue::add + Precision::Exact(ScalarValue::Int32(None)), + ); + assert_eq!(precision.add(&Precision::Absent), Precision::Absent); + } + #[test] fn test_sub() { let precision1 = Precision::Exact(42); @@ -575,6 +655,26 @@ mod tests { assert_eq!(precision1.sub(&absent_precision), Precision::Absent); } + #[test] + fn test_sub_scalar() { + let precision = Precision::Exact(ScalarValue::Int32(Some(42))); + + assert_eq!( + precision.sub(&Precision::Exact(ScalarValue::Int32(Some(23)))), + Precision::Exact(ScalarValue::Int32(Some(19))), + ); + assert_eq!( + precision.sub(&Precision::Inexact(ScalarValue::Int32(Some(23)))), + Precision::Inexact(ScalarValue::Int32(Some(19))), + ); + assert_eq!( + precision.sub(&Precision::Exact(ScalarValue::Int32(None))), + // As per behavior of ScalarValue::sub + Precision::Exact(ScalarValue::Int32(None)), + ); + assert_eq!(precision.sub(&Precision::Absent), Precision::Absent); + } + #[test] fn test_multiply() { let precision1 = Precision::Exact(6); @@ -588,6 +688,54 @@ mod tests { assert_eq!(precision1.multiply(&absent_precision), Precision::Absent); } + #[test] + fn test_multiply_scalar() { + let precision = Precision::Exact(ScalarValue::Int32(Some(6))); + + assert_eq!( + precision.multiply(&Precision::Exact(ScalarValue::Int32(Some(5)))), + Precision::Exact(ScalarValue::Int32(Some(30))), + ); + assert_eq!( + precision.multiply(&Precision::Inexact(ScalarValue::Int32(Some(5)))), + Precision::Inexact(ScalarValue::Int32(Some(30))), + ); + assert_eq!( + 
precision.multiply(&Precision::Exact(ScalarValue::Int32(None))), + // As per behavior of ScalarValue::mul_checked + Precision::Exact(ScalarValue::Int32(None)), + ); + assert_eq!(precision.multiply(&Precision::Absent), Precision::Absent); + } + + #[test] + fn test_cast_to() { + // Valid + assert_eq!( + Precision::Exact(ScalarValue::Int32(Some(42))) + .cast_to(&DataType::Int64) + .unwrap(), + Precision::Exact(ScalarValue::Int64(Some(42))), + ); + assert_eq!( + Precision::Inexact(ScalarValue::Int32(Some(42))) + .cast_to(&DataType::Int64) + .unwrap(), + Precision::Inexact(ScalarValue::Int64(Some(42))), + ); + // Null + assert_eq!( + Precision::Exact(ScalarValue::Int32(None)) + .cast_to(&DataType::Int64) + .unwrap(), + Precision::Exact(ScalarValue::Int64(None)), + ); + // Overflow returns error + assert!(Precision::Exact(ScalarValue::Int32(Some(256))) + .cast_to(&DataType::Int8) + .is_err()); + } + #[test] fn test_precision_cloning() { // Precision is copy @@ -646,6 +794,7 @@ mod tests { null_count: Precision::Exact(null_count), max_value: Precision::Exact(ScalarValue::Int64(Some(42))), min_value: Precision::Exact(ScalarValue::Int64(Some(64))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(4600))), distinct_count: Precision::Exact(100), } } diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs index 201bbfd5c007..f02927619a7d 100644 --- a/datafusion/core/src/datasource/statistics.rs +++ b/datafusion/core/src/datasource/statistics.rs @@ -76,6 +76,7 @@ pub async fn get_statistics_with_limit( col_stats_set[index].null_count = file_column.null_count; col_stats_set[index].max_value = file_column.max_value; col_stats_set[index].min_value = file_column.min_value; + col_stats_set[index].sum_value = file_column.sum_value; } // If the number of rows exceeds the limit, we can stop processing @@ -113,12 +114,14 @@ pub async fn get_statistics_with_limit( null_count: file_nc, max_value: file_max, min_value: file_min, + 
sum_value: file_sum, distinct_count: _, } = file_col_stats; col_stats.null_count = add_row_stats(*file_nc, col_stats.null_count); set_max_if_greater(file_max, &mut col_stats.max_value); - set_min_if_lesser(file_min, &mut col_stats.min_value) + set_min_if_lesser(file_min, &mut col_stats.min_value); + col_stats.sum_value = file_sum.add(&col_stats.sum_value); } // If the number of rows exceeds the limit, we can stop processing @@ -204,6 +207,7 @@ pub(crate) fn get_col_stats( null_count: null_counts[i], max_value: max_value.map(Precision::Exact).unwrap_or(Precision::Absent), min_value: min_value.map(Precision::Exact).unwrap_or(Precision::Absent), + sum_value: Precision::Absent, distinct_count: Precision::Absent, } }) diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index 9d3bd594a929..1fd6dfec79fb 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -200,12 +200,14 @@ fn fully_defined() -> (Statistics, Schema) { distinct_count: Precision::Exact(2), max_value: Precision::Exact(ScalarValue::Int32(Some(1023))), min_value: Precision::Exact(ScalarValue::Int32(Some(-24))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(10))), null_count: Precision::Exact(0), }, ColumnStatistics { distinct_count: Precision::Exact(13), max_value: Precision::Exact(ScalarValue::Int64(Some(5486))), min_value: Precision::Exact(ScalarValue::Int64(Some(-6783))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(10))), null_count: Precision::Exact(5), }, ], diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index b8e495ee7ae9..7070761f6383 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -2281,6 +2281,11 @@ impl Display for SchemaDisplay<'_> { Ok(()) } // Expr is not shown since it is aliased + Expr::Alias(Alias { + name, + relation: Some(relation), + .. 
+ }) => write!(f, "{relation}.{name}"), Expr::Alias(Alias { name, .. }) => write!(f, "{name}"), Expr::Between(Between { expr, @@ -2769,10 +2774,10 @@ fn fmt_function( /// The name of the column (field) that this `Expr` will produce in the physical plan. /// The difference from [Expr::schema_name] is that top-level columns are unqualified. pub fn physical_name(expr: &Expr) -> Result { - if let Expr::Column(col) = expr { - Ok(col.name.clone()) - } else { - Ok(expr.schema_name().to_string()) + match expr { + Expr::Column(col) => Ok(col.name.clone()), + Expr::Alias(alias) => Ok(alias.name.clone()), + _ => Ok(expr.schema_name().to_string()), } } @@ -3023,6 +3028,30 @@ mod test { ) } + #[test] + fn test_schema_display_alias_with_relation() { + assert_eq!( + format!( + "{}", + SchemaDisplay( + &lit(1).alias_qualified("table_name".into(), "column_name") + ) + ), + "table_name.column_name" + ); + } + + #[test] + fn test_schema_display_alias_without_relation() { + assert_eq!( + format!( + "{}", + SchemaDisplay(&lit(1).alias_qualified(None::<&str>, "column_name")) + ), + "column_name" + ); + } + fn wildcard_options( opt_ilike: Option, opt_exclude: Option, diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 24fb0609b0fe..7e9c0cb75ec8 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1497,7 +1497,9 @@ impl LogicalPlan { (_, Some(dt)) => { param_types.insert(id.clone(), Some(dt.clone())); } - _ => {} + _ => { + param_types.insert(id.clone(), None); + } } } Ok(TreeNodeRecursion::Continue) @@ -4347,4 +4349,25 @@ digraph { plan.rewrite_with_subqueries(&mut rewriter).unwrap(); assert!(!rewriter.filter_found); } + + #[test] + fn test_with_unresolved_placeholders() { + let field_name = "id"; + let placeholder_value = "$1"; + let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]); + + let plan = table_scan(TableReference::none(), &schema, None) + .unwrap() + 
.filter(col(field_name).eq(placeholder(placeholder_value))) + .unwrap() + .build() + .unwrap(); + + // Check that the placeholder parameters have not received a DataType. + let params = plan.get_parameter_types().unwrap(); + assert_eq!(params.len(), 1); + + let parameter_type = params.clone().get(placeholder_value).unwrap().clone(); + assert_eq!(parameter_type, None); + } } diff --git a/datafusion/functions/src/math/monotonicity.rs b/datafusion/functions/src/math/monotonicity.rs index 46c670b8e651..7c87d025e929 100644 --- a/datafusion/functions/src/math/monotonicity.rs +++ b/datafusion/functions/src/math/monotonicity.rs @@ -558,3 +558,405 @@ pub fn get_tanh_doc() -> &'static Documentation { .build() }) } + +#[cfg(test)] +mod tests { + use arrow::compute::SortOptions; + use datafusion_common::Result; + + use super::*; + + #[derive(Debug)] + struct MonotonicityTestCase { + name: &'static str, + func: fn(&[ExprProperties]) -> Result, + lower: f64, + upper: f64, + input_sort: SortProperties, + expected: Result, + } + + #[test] + fn test_monotonicity_table() { + fn create_ep(lower: f64, upper: f64, sp: SortProperties) -> ExprProperties { + ExprProperties { + range: Interval::try_new( + ScalarValue::from(lower), + ScalarValue::from(upper), + ) + .unwrap(), + sort_properties: sp, + preserves_lex_ordering: false, + } + } + + let test_cases = vec![ + MonotonicityTestCase { + name: "acos_order within domain", + func: acos_order, + lower: -0.5, + upper: 0.5, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: Ok(SortProperties::Ordered(SortOptions { + descending: true, + nulls_first: false, + })), + }, + MonotonicityTestCase { + name: "acos_order out of domain", + func: acos_order, + lower: -2.0, + upper: 1.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: exec_err!("Input range of ACOS contains out-of-domain values"), + }, + MonotonicityTestCase { + 
name: "acosh_order within domain", + func: acosh_order, + lower: 2.0, + upper: 100.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: true, + }), + expected: Ok(SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: true, + })), + }, + MonotonicityTestCase { + name: "acosh_order out of domain", + func: acosh_order, + lower: 0.5, + upper: 1.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: true, + nulls_first: false, + }), + expected: exec_err!("Input range of ACOSH contains out-of-domain values"), + }, + MonotonicityTestCase { + name: "asin_order within domain", + func: asin_order, + lower: -0.5, + upper: 0.5, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: Ok(SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + })), + }, + MonotonicityTestCase { + name: "asin_order out of domain", + func: asin_order, + lower: -2.0, + upper: 1.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: exec_err!("Input range of ASIN contains out-of-domain values"), + }, + MonotonicityTestCase { + name: "asinh_order within domain", + func: asinh_order, + lower: -1.0, + upper: 1.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: Ok(SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + })), + }, + MonotonicityTestCase { + name: "asinh_order out of domain", + func: asinh_order, + lower: -2.0, + upper: 1.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: Ok(SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + })), + }, + MonotonicityTestCase { + name: "atan_order within domain", + func: atan_order, + lower: -1.0, + upper: 1.0, + input_sort: SortProperties::Ordered(SortOptions 
{ + descending: false, + nulls_first: false, + }), + expected: Ok(SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + })), + }, + MonotonicityTestCase { + name: "atan_order out of domain", + func: atan_order, + lower: -2.0, + upper: 1.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: Ok(SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + })), + }, + MonotonicityTestCase { + name: "atanh_order within domain", + func: atanh_order, + lower: -0.6, + upper: 0.6, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: Ok(SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + })), + }, + MonotonicityTestCase { + name: "atanh_order out of domain", + func: atanh_order, + lower: -2.0, + upper: 1.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: exec_err!("Input range of ATANH contains out-of-domain values"), + }, + MonotonicityTestCase { + name: "cbrt_order within domain", + func: cbrt_order, + lower: -1.0, + upper: 1.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: Ok(SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + })), + }, + MonotonicityTestCase { + name: "cbrt_order out of domain", + func: cbrt_order, + lower: -2.0, + upper: 1.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: Ok(SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + })), + }, + MonotonicityTestCase { + name: "ceil_order within domain", + func: ceil_order, + lower: -1.0, + upper: 1.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: Ok(SortProperties::Ordered(SortOptions { + 
descending: false, + nulls_first: false, + })), + }, + MonotonicityTestCase { + name: "ceil_order out of domain", + func: ceil_order, + lower: -2.0, + upper: 1.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: Ok(SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + })), + }, + MonotonicityTestCase { + name: "cos_order within domain", + func: cos_order, + lower: 0.0, + upper: 2.0 * std::f64::consts::PI, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: Ok(SortProperties::Unordered), + }, + MonotonicityTestCase { + name: "cos_order out of domain", + func: cos_order, + lower: -2.0, + upper: 1.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: Ok(SortProperties::Unordered), + }, + MonotonicityTestCase { + name: "cosh_order within domain positive", + func: cosh_order, + lower: 5.0, + upper: 100.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: Ok(SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + })), + }, + MonotonicityTestCase { + name: "cosh_order within domain negative", + func: cosh_order, + lower: -100.0, + upper: -5.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: Ok(SortProperties::Ordered(SortOptions { + descending: true, + nulls_first: false, + })), + }, + MonotonicityTestCase { + name: "cosh_order out of domain so unordered", + func: cosh_order, + lower: -1.0, + upper: 1.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: Ok(SortProperties::Unordered), + }, + MonotonicityTestCase { + name: "degrees_order", + func: degrees_order, + lower: -1.0, + upper: 1.0, + input_sort: SortProperties::Ordered(SortOptions { + 
descending: true, + nulls_first: true, + }), + expected: Ok(SortProperties::Ordered(SortOptions { + descending: true, + nulls_first: true, + })), + }, + MonotonicityTestCase { + name: "exp_order", + func: exp_order, + lower: -1000.0, + upper: 1000.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: Ok(SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + })), + }, + MonotonicityTestCase { + name: "floor_order", + func: floor_order, + lower: -1.0, + upper: 1.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: true, + nulls_first: true, + }), + expected: Ok(SortProperties::Ordered(SortOptions { + descending: true, + nulls_first: true, + })), + }, + MonotonicityTestCase { + name: "ln_order within domain", + func: ln_order, + lower: 1.0, + upper: 2.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: Ok(SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + })), + }, + MonotonicityTestCase { + name: "ln_order out of domain", + func: ln_order, + lower: -5.0, + upper: -4.0, + input_sort: SortProperties::Ordered(SortOptions { + descending: false, + nulls_first: false, + }), + expected: exec_err!("Input range of LN contains out-of-domain values"), + }, + ]; + + for tcase in test_cases { + let input = vec![create_ep(tcase.lower, tcase.upper, tcase.input_sort)]; + let actual = (tcase.func)(&input); + match (&actual, &tcase.expected) { + (Ok(a), Ok(e)) => assert_eq!( + a, e, + "Test '{}' failed: got {:?}, expected {:?}", + tcase.name, a, e + ), + (Err(e1), Err(e2)) => { + assert_eq!( + e1.strip_backtrace().to_string(), + e2.strip_backtrace().to_string(), + "Test '{}' failed: got {:?}, expected {:?}", + tcase.name, + e1, + e2 + ) + } // Both are errors, so it's fine + _ => panic!( + "Test '{}' failed: got {:?}, expected {:?}", + tcase.name, actual, tcase.expected + ), + } + } + } 
+} diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index 8a3aa4bb8459..4da112d5153a 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -255,7 +255,6 @@ fn push_down_join(mut join: Join, limit: usize) -> Transformed { match join.join_type { Left => (Some(limit), None), Right => (None, Some(limit)), - Full => (Some(limit), Some(limit)), _ => (None, None), } }; diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs index b602a9cba4f4..ceec21c71171 100644 --- a/datafusion/physical-expr/src/analysis.rs +++ b/datafusion/physical-expr/src/analysis.rs @@ -81,8 +81,12 @@ impl AnalysisContext { #[derive(Clone, Debug, PartialEq)] pub struct ExprBoundaries { pub column: Column, - /// Minimum and maximum values this expression can have. - pub interval: Interval, + /// Minimum and maximum values this expression can have. A `None` value + /// indicates that evaluating the given column results in an empty set. + /// For example, if the column `a` has values in the range [10, 20], + /// and there is a filter asserting that `a > 50`, then the resulting interval + /// range of `a` will be `None`. + pub interval: Option, /// Maximum number of distinct values this expression can produce, if known. 
pub distinct_count: Precision, } @@ -118,7 +122,7 @@ impl ExprBoundaries { let column = Column::new(field.name(), col_index); Ok(ExprBoundaries { column, - interval, + interval: Some(interval), distinct_count: col_stats.distinct_count, }) } @@ -133,7 +137,7 @@ impl ExprBoundaries { .map(|(i, field)| { Ok(Self { column: Column::new(field.name(), i), - interval: Interval::make_unbounded(field.data_type())?, + interval: Some(Interval::make_unbounded(field.data_type())?), distinct_count: Precision::Absent, }) }) @@ -161,40 +165,71 @@ pub fn analyze( context: AnalysisContext, schema: &Schema, ) -> Result { - let target_boundaries = context.boundaries; - - let mut graph = ExprIntervalGraph::try_new(Arc::clone(expr), schema)?; - - let columns = collect_columns(expr) - .into_iter() - .map(|c| Arc::new(c) as _) - .collect::>(); - - let target_expr_and_indices = graph.gather_node_indices(columns.as_slice()); - - let mut target_indices_and_boundaries = target_expr_and_indices + let initial_boundaries = &context.boundaries; + if initial_boundaries .iter() - .filter_map(|(expr, i)| { - target_boundaries.iter().find_map(|bound| { - expr.as_any() - .downcast_ref::() - .filter(|expr_column| bound.column.eq(*expr_column)) - .map(|_| (*i, bound.interval.clone())) - }) - }) - .collect::>(); - - match graph - .update_ranges(&mut target_indices_and_boundaries, Interval::CERTAINLY_TRUE)? 
+ .all(|bound| bound.interval.is_none()) { - PropagationResult::Success => { - shrink_boundaries(graph, target_boundaries, target_expr_and_indices) + if initial_boundaries + .iter() + .any(|bound| bound.distinct_count != Precision::Exact(0)) + { + return internal_err!( + "ExprBoundaries has a non-zero distinct count although it represents an empty table" + ); + } + if context.selectivity != Some(0.0) { + return internal_err!( + "AnalysisContext has a non-zero selectivity although it represents an empty table" + ); } - PropagationResult::Infeasible => { - Ok(AnalysisContext::new(target_boundaries).with_selectivity(0.0)) + Ok(context) + } else if initial_boundaries + .iter() + .any(|bound| bound.interval.is_none()) + { + internal_err!( + "AnalysisContext is an inconsistent state. Some columns represent empty table while others don't" + ) + } else { + let mut target_boundaries = context.boundaries; + let mut graph = ExprIntervalGraph::try_new(Arc::clone(expr), schema)?; + let columns = collect_columns(expr) + .into_iter() + .map(|c| Arc::new(c) as _) + .collect::>(); + + let mut target_indices_and_boundaries = vec![]; + let target_expr_and_indices = graph.gather_node_indices(columns.as_slice()); + + for (expr, index) in &target_expr_and_indices { + if let Some(column) = expr.as_any().downcast_ref::() { + if let Some(bound) = + target_boundaries.iter().find(|b| b.column == *column) + { + // Now, it's safe to unwrap + target_indices_and_boundaries + .push((*index, bound.interval.as_ref().unwrap().clone())); + } + } } - PropagationResult::CannotPropagate => { - Ok(AnalysisContext::new(target_boundaries).with_selectivity(1.0)) + + match graph + .update_ranges(&mut target_indices_and_boundaries, Interval::CERTAINLY_TRUE)? 
+ { + PropagationResult::Success => { + shrink_boundaries(graph, target_boundaries, target_expr_and_indices) + } + PropagationResult::Infeasible => { + // If the propagation result is infeasible, set intervals to None + target_boundaries + .iter_mut() + .for_each(|bound| bound.interval = None); + Ok(AnalysisContext::new(target_boundaries).with_selectivity(0.0)) + } + PropagationResult::CannotPropagate => { + Ok(AnalysisContext::new(target_boundaries).with_selectivity(1.0)) + } } } } @@ -215,12 +250,12 @@ fn shrink_boundaries( .iter_mut() .find(|bound| bound.column.eq(column)) { - bound.interval = graph.get_interval(*i); + bound.interval = Some(graph.get_interval(*i)); }; } }); - let selectivity = calculate_selectivity(&target_boundaries, &initial_boundaries); + let selectivity = calculate_selectivity(&target_boundaries, &initial_boundaries)?; if !(0.0..=1.0).contains(&selectivity) { return internal_err!("Selectivity is out of limit: {}", selectivity); @@ -235,16 +270,31 @@ fn shrink_boundaries( fn calculate_selectivity( target_boundaries: &[ExprBoundaries], initial_boundaries: &[ExprBoundaries], -) -> f64 { +) -> Result { // Since the intervals are assumed uniform and the values // are not correlated, we need to multiply the selectivities // of multiple columns to get the overall selectivity. 
- initial_boundaries - .iter() - .zip(target_boundaries.iter()) - .fold(1.0, |acc, (initial, target)| { - acc * cardinality_ratio(&initial.interval, &target.interval) - }) + if target_boundaries.len() != initial_boundaries.len() { + return Err(internal_datafusion_err!( + "The number of columns in the initial and target boundaries should be the same" + )); + } + let mut acc: f64 = 1.0; + for (initial, target) in initial_boundaries.iter().zip(target_boundaries) { + match (initial.interval.as_ref(), target.interval.as_ref()) { + (Some(initial), Some(target)) => { + acc *= cardinality_ratio(initial, target); + } + (None, Some(_)) => { + return internal_err!( + "Initial boundary cannot be None while having a Some() target boundary" + ); + } + _ => return Ok(0.0), + } + } + + Ok(acc) } #[cfg(test)] @@ -313,16 +363,6 @@ mod tests { Some(16), Some(19), ), - // (a > 10 AND a < 20) AND (a > 20 AND a < 30) - ( - col("a") - .gt(lit(10)) - .and(col("a").lt(lit(20))) - .and(col("a").gt(lit(20))) - .and(col("a").lt(lit(30))), - None, - None, - ), ]; for (expr, lower, upper) in test_cases { let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap(); @@ -335,7 +375,9 @@ mod tests { df_schema.as_ref(), ) .unwrap(); - let actual = &analysis_result.boundaries[0].interval; + let Some(actual) = &analysis_result.boundaries[0].interval else { + panic!("The analysis result should contain non-empty intervals for all columns"); + }; let expected = Interval::make(lower, upper).unwrap(); assert_eq!( &expected, actual, @@ -344,6 +386,41 @@ mod tests { } } + #[test] + fn test_analyze_empty_set_boundary_exprs() { + let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)])); + + let test_cases: Vec = vec![ + // a > 10 AND a < 10 + col("a").gt(lit(10)).and(col("a").lt(lit(10))), + // a > 5 AND (a < 20 OR a > 20) + // a > 10 AND a < 20 + // (a > 10 AND a < 20) AND (a > 20 AND a < 30) + col("a") + .gt(lit(10)) + .and(col("a").lt(lit(20))) + .and(col("a").gt(lit(20))) + 
.and(col("a").lt(lit(30))), + ]; + + for expr in test_cases { + let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap(); + let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap(); + let physical_expr = + create_physical_expr(&expr, &df_schema, &ExecutionProps::new()).unwrap(); + let analysis_result = analyze( + &physical_expr, + AnalysisContext::new(boundaries), + df_schema.as_ref(), + ) + .unwrap(); + + for boundary in analysis_result.boundaries { + assert!(boundary.interval.is_none()); + } + } + } + #[test] fn test_analyze_invalid_boundary_exprs() { let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)])); diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs index 8e975e10180f..e75c75a235b7 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs @@ -405,7 +405,7 @@ mod tests { use crate::aggregates::group_values::multi_group_by::bytes::ByteGroupValueBuilder; use arrow_array::{ArrayRef, StringArray}; - use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; + use arrow_buffer::NullBufferBuilder; use datafusion_physical_expr::binary_map::OutputType; use super::GroupColumn; @@ -602,16 +602,15 @@ mod tests { .into_parts(); // explicitly build a boolean buffer where one of the null values also happens to match - let mut boolean_buffer_builder = BooleanBufferBuilder::new(6); - boolean_buffer_builder.append(true); - boolean_buffer_builder.append(false); // this sets Some("bar") to null above - boolean_buffer_builder.append(false); - boolean_buffer_builder.append(false); - boolean_buffer_builder.append(true); - boolean_buffer_builder.append(true); - let nulls = NullBuffer::new(boolean_buffer_builder.finish()); + let mut nulls = NullBufferBuilder::new(6); + nulls.append_non_null(); + nulls.append_null(); 
// this sets Some("bar") to null above + nulls.append_null(); + nulls.append_null(); + nulls.append_non_null(); + nulls.append_non_null(); let input_array = - Arc::new(StringArray::new(offsets, buffer, Some(nulls))) as ArrayRef; + Arc::new(StringArray::new(offsets, buffer, nulls.finish())) as ArrayRef; // Check let mut equal_to_results = vec![true; builder.len()]; diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs index 811790f4e588..c3d88b894999 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs @@ -548,7 +548,7 @@ mod tests { use arrow::array::AsArray; use arrow::datatypes::StringViewType; use arrow_array::{ArrayRef, StringViewArray}; - use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; + use arrow_buffer::NullBufferBuilder; use super::GroupColumn; @@ -751,22 +751,21 @@ mod tests { .into_parts(); // explicitly build a boolean buffer where one of the null values also happens to match - let mut boolean_buffer_builder = BooleanBufferBuilder::new(9); - boolean_buffer_builder.append(true); - boolean_buffer_builder.append(false); // this sets Some("bar") to null above - boolean_buffer_builder.append(false); - boolean_buffer_builder.append(false); - boolean_buffer_builder.append(true); - boolean_buffer_builder.append(true); - boolean_buffer_builder.append(true); - boolean_buffer_builder.append(true); - boolean_buffer_builder.append(true); - boolean_buffer_builder.append(true); - boolean_buffer_builder.append(true); - boolean_buffer_builder.append(true); - let nulls = NullBuffer::new(boolean_buffer_builder.finish()); + let mut nulls = NullBufferBuilder::new(9); + nulls.append_non_null(); + nulls.append_null(); // this sets Some("bar") to null above + nulls.append_null(); + nulls.append_null(); + 
nulls.append_non_null(); + nulls.append_non_null(); + nulls.append_non_null(); + nulls.append_non_null(); + nulls.append_non_null(); + nulls.append_non_null(); + nulls.append_non_null(); + nulls.append_non_null(); let input_array = - Arc::new(StringViewArray::new(views, buffer, Some(nulls))) as ArrayRef; + Arc::new(StringViewArray::new(views, buffer, nulls.finish())) as ArrayRef; // Check let mut equal_to_results = vec![true; input_array.len()]; diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs index 4ceeb634bad2..cd5dfae86ee9 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs @@ -214,7 +214,7 @@ mod tests { use crate::aggregates::group_values::multi_group_by::primitive::PrimitiveGroupValueBuilder; use arrow::datatypes::Int64Type; use arrow_array::{ArrayRef, Int64Array}; - use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; + use arrow_buffer::NullBufferBuilder; use arrow_schema::DataType; use super::GroupColumn; @@ -304,16 +304,15 @@ mod tests { Int64Array::from(vec![Some(1), Some(2), None, None, Some(1), Some(3)]) .into_parts(); - // explicitly build a boolean buffer where one of the null values also happens to match - let mut boolean_buffer_builder = BooleanBufferBuilder::new(6); - boolean_buffer_builder.append(true); - boolean_buffer_builder.append(false); // this sets Some(2) to null above - boolean_buffer_builder.append(false); - boolean_buffer_builder.append(false); - boolean_buffer_builder.append(true); - boolean_buffer_builder.append(true); - let nulls = NullBuffer::new(boolean_buffer_builder.finish()); - let input_array = Arc::new(Int64Array::new(values, Some(nulls))) as ArrayRef; + // explicitly build a null buffer where one of the null values also happens to match + let mut nulls = 
NullBufferBuilder::new(6); + nulls.append_non_null(); + nulls.append_null(); // this sets Some(2) to null above + nulls.append_null(); + nulls.append_null(); + nulls.append_non_null(); + nulls.append_non_null(); + let input_array = Arc::new(Int64Array::new(values, nulls.finish())) as ArrayRef; // Check let mut equal_to_results = vec![true; builder.len()]; diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index aefb90d1d1b7..20a4e89dba94 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -333,12 +333,14 @@ mod tests { distinct_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, + sum_value: Precision::Absent, null_count: Precision::Exact(0), }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, + sum_value: Precision::Absent, null_count: Precision::Exact(0), }, ], @@ -371,6 +373,7 @@ mod tests { distinct_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, + sum_value: Precision::Absent, null_count: Precision::Exact(3), }], }; diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index ae4a15ba5249..39f022b58cf3 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -41,7 +41,7 @@ use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; use datafusion_common::stats::Precision; use datafusion_common::{ - internal_err, plan_err, project_schema, DataFusionError, Result, + internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue, }; use datafusion_execution::TaskContext; use datafusion_expr::Operator; @@ -457,6 +457,15 @@ fn collect_new_statistics( .. 
}, )| { + let Some(interval) = interval else { + // If the interval is `None`, we can say that there are no rows: + return ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Null), + min_value: Precision::Exact(ScalarValue::Null), + distinct_count: Precision::Exact(0), + }; + }; let (lower, upper) = interval.into_bounds(); let (min_value, max_value) = if lower.eq(&upper) { (Precision::Exact(lower), Precision::Exact(upper)) @@ -467,6 +476,7 @@ fn collect_new_statistics( null_count: input_column_stats[idx].null_count.to_inexact(), max_value, min_value, + sum_value: Precision::Absent, distinct_count: distinct_count.to_inexact(), } }, @@ -1078,14 +1088,16 @@ mod tests { statistics.column_statistics, vec![ ColumnStatistics { - min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), - max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), - ..Default::default() + min_value: Precision::Exact(ScalarValue::Null), + max_value: Precision::Exact(ScalarValue::Null), + distinct_count: Precision::Exact(0), + null_count: Precision::Exact(0), }, ColumnStatistics { - min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), - max_value: Precision::Inexact(ScalarValue::Int32(Some(3))), - ..Default::default() + min_value: Precision::Exact(ScalarValue::Null), + max_value: Precision::Exact(ScalarValue::Null), + distinct_count: Precision::Exact(0), + null_count: Precision::Exact(0), }, ] ); @@ -1185,6 +1197,7 @@ mod tests { null_count: Precision::Absent, min_value: Precision::Inexact(ScalarValue::Int32(Some(5))), max_value: Precision::Inexact(ScalarValue::Int32(Some(10))), + sum_value: Precision::Absent, distinct_count: Precision::Absent, }], }; diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 87fd0f96586a..ab94c132a209 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -411,12 +411,36 @@ fn 
stats_cartesian_product( distinct_count: s.distinct_count, min_value: s.min_value, max_value: s.max_value, + sum_value: s + .sum_value + .get_value() + // Cast the row count into the same type as any existing sum value + .and_then(|v| { + Precision::<ScalarValue>::from(right_row_count) + .cast_to(&v.data_type()) + .ok() + }) + .map(|row_count| s.sum_value.multiply(&row_count)) + .unwrap_or(Precision::Absent), }) - .chain(right_col_stats.into_iter().map(|s| ColumnStatistics { - null_count: s.null_count.multiply(&left_row_count), - distinct_count: s.distinct_count, - min_value: s.min_value, - max_value: s.max_value, + .chain(right_col_stats.into_iter().map(|s| { + ColumnStatistics { + null_count: s.null_count.multiply(&left_row_count), + distinct_count: s.distinct_count, + min_value: s.min_value, + max_value: s.max_value, + sum_value: s + .sum_value + .get_value() + // Cast the row count into the same type as any existing sum value + .and_then(|v| { + Precision::<ScalarValue>::from(left_row_count) + .cast_to(&v.data_type()) + .ok() + }) + .map(|row_count| s.sum_value.multiply(&row_count)) + .unwrap_or(Precision::Absent), + } })) .collect(); @@ -650,12 +674,14 @@ mod tests { distinct_count: Precision::Exact(5), max_value: Precision::Exact(ScalarValue::Int64(Some(21))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), null_count: Precision::Exact(0), }, ColumnStatistics { distinct_count: Precision::Exact(1), max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), + sum_value: Precision::Absent, null_count: Precision::Exact(3), }, ], @@ -668,6 +694,7 @@ mod tests { distinct_count: Precision::Exact(3), max_value: Precision::Exact(ScalarValue::Int64(Some(12))), min_value: Precision::Exact(ScalarValue::Int64(Some(0))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(20))), null_count: Precision::Exact(2), }], }; @@ -682,18 +709,25 @@ mod tests { distinct_count:
Precision::Exact(5), max_value: Precision::Exact(ScalarValue::Int64(Some(21))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Exact(ScalarValue::Int64(Some( + 42 * right_row_count as i64, + ))), null_count: Precision::Exact(0), }, ColumnStatistics { distinct_count: Precision::Exact(1), max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), + sum_value: Precision::Absent, null_count: Precision::Exact(3 * right_row_count), }, ColumnStatistics { distinct_count: Precision::Exact(3), max_value: Precision::Exact(ScalarValue::Int64(Some(12))), min_value: Precision::Exact(ScalarValue::Int64(Some(0))), + sum_value: Precision::Exact(ScalarValue::Int64(Some( + 20 * left_row_count as i64, + ))), null_count: Precision::Exact(2 * left_row_count), }, ], @@ -714,12 +748,14 @@ mod tests { distinct_count: Precision::Exact(5), max_value: Precision::Exact(ScalarValue::Int64(Some(21))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), null_count: Precision::Exact(0), }, ColumnStatistics { distinct_count: Precision::Exact(1), max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), + sum_value: Precision::Absent, null_count: Precision::Exact(3), }, ], @@ -732,6 +768,7 @@ mod tests { distinct_count: Precision::Exact(3), max_value: Precision::Exact(ScalarValue::Int64(Some(12))), min_value: Precision::Exact(ScalarValue::Int64(Some(0))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(20))), null_count: Precision::Exact(2), }], }; @@ -746,18 +783,23 @@ mod tests { distinct_count: Precision::Exact(5), max_value: Precision::Exact(ScalarValue::Int64(Some(21))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Absent, // we don't know the row count on the right null_count: Precision::Absent, // we don't know the row count on the right }, 
ColumnStatistics { distinct_count: Precision::Exact(1), max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), + sum_value: Precision::Absent, null_count: Precision::Absent, // we don't know the row count on the right }, ColumnStatistics { distinct_count: Precision::Exact(3), max_value: Precision::Exact(ScalarValue::Int64(Some(12))), min_value: Precision::Exact(ScalarValue::Int64(Some(0))), + sum_value: Precision::Exact(ScalarValue::Int64(Some( + 20 * left_row_count as i64, + ))), null_count: Precision::Exact(2 * left_row_count), }, ], diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index dea4305fa6a1..5327793d01e2 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -2026,6 +2026,7 @@ mod tests { distinct_count, min_value: min.map(ScalarValue::from), max_value: max.map(ScalarValue::from), + sum_value: Absent, null_count, } } diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 5ad3c4881b39..198b8ccd6992 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -937,6 +937,7 @@ mod tests { distinct_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, + sum_value: Precision::Absent, },], } ); diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index b364d4a870e3..3ebfd8f8ca80 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -1106,18 +1106,21 @@ mod tests { distinct_count: Precision::Exact(5), max_value: Precision::Exact(ScalarValue::Int64(Some(21))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), null_count: Precision::Exact(0), }, ColumnStatistics { distinct_count: Precision::Exact(1), max_value: 
Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), + sum_value: Precision::Absent, null_count: Precision::Exact(3), }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))), min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))), + sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))), null_count: Precision::Absent, }, ], @@ -1150,12 +1153,14 @@ mod tests { distinct_count: Precision::Exact(1), max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), + sum_value: Precision::Absent, null_count: Precision::Exact(3), }, ColumnStatistics { distinct_count: Precision::Exact(5), max_value: Precision::Exact(ScalarValue::Int64(Some(21))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), null_count: Precision::Exact(0), }, ], @@ -1184,12 +1189,14 @@ mod tests { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))), min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))), + sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))), null_count: Precision::Absent, }, ColumnStatistics { distinct_count: Precision::Exact(5), max_value: Precision::Exact(ScalarValue::Int64(Some(21))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), null_count: Precision::Exact(0), }, ], diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index bacd02398ec0..a41336ea6eb7 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -610,6 +610,7 @@ fn col_stats_union( left.distinct_count = Precision::Absent; left.min_value = left.min_value.min(&right.min_value); left.max_value = left.max_value.max(&right.max_value); + left.sum_value = left.sum_value.add(&right.sum_value); 
left.null_count = left.null_count.add(&right.null_count); left @@ -702,18 +703,21 @@ mod tests { distinct_count: Precision::Exact(5), max_value: Precision::Exact(ScalarValue::Int64(Some(21))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), null_count: Precision::Exact(0), }, ColumnStatistics { distinct_count: Precision::Exact(1), max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), + sum_value: Precision::Absent, null_count: Precision::Exact(3), }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))), min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))), + sum_value: Precision::Exact(ScalarValue::Float32(Some(42.0))), null_count: Precision::Absent, }, ], @@ -727,18 +731,21 @@ mod tests { distinct_count: Precision::Exact(3), max_value: Precision::Exact(ScalarValue::Int64(Some(34))), min_value: Precision::Exact(ScalarValue::Int64(Some(1))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(42))), null_count: Precision::Exact(1), }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::from("c")), min_value: Precision::Exact(ScalarValue::from("b")), + sum_value: Precision::Absent, null_count: Precision::Absent, }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, + sum_value: Precision::Absent, null_count: Precision::Absent, }, ], @@ -753,18 +760,21 @@ mod tests { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::Int64(Some(34))), min_value: Precision::Exact(ScalarValue::Int64(Some(-4))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(84))), null_count: Precision::Exact(1), }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Exact(ScalarValue::from("x")), min_value: Precision::Exact(ScalarValue::from("a")), + 
sum_value: Precision::Absent, null_count: Precision::Absent, }, ColumnStatistics { distinct_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, + sum_value: Precision::Absent, null_count: Precision::Absent, }, ], diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 3e2b3fb26d45..ef6797c9b10d 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -953,7 +953,7 @@ mod tests { use super::*; use arrow::datatypes::{Field, Int32Type}; use arrow_array::{GenericListArray, OffsetSizeTrait, StringArray}; - use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer}; + use arrow_buffer::{NullBuffer, NullBufferBuilder, OffsetBuffer}; use datafusion_common::assert_batches_eq; // Create a GenericListArray with the following list values: @@ -964,43 +964,43 @@ mod tests { { let mut values = vec![]; let mut offsets: Vec = vec![OffsetSize::zero()]; - let mut valid = BooleanBufferBuilder::new(6); + let mut valid = NullBufferBuilder::new(6); // [A, B, C] values.extend_from_slice(&[Some("A"), Some("B"), Some("C")]); offsets.push(OffsetSize::from_usize(values.len()).unwrap()); - valid.append(true); + valid.append_non_null(); // [] offsets.push(OffsetSize::from_usize(values.len()).unwrap()); - valid.append(true); + valid.append_non_null(); // NULL with non-zero value length // Issue https://github.com/apache/datafusion/issues/9932 values.push(Some("?")); offsets.push(OffsetSize::from_usize(values.len()).unwrap()); - valid.append(false); + valid.append_null(); // [D] values.push(Some("D")); offsets.push(OffsetSize::from_usize(values.len()).unwrap()); - valid.append(true); + valid.append_non_null(); // Another NULL with zero value length offsets.push(OffsetSize::from_usize(values.len()).unwrap()); - valid.append(false); + valid.append_null(); // [NULL, F] values.extend_from_slice(&[None, Some("F")]); offsets.push(OffsetSize::from_usize(values.len()).unwrap()); 
- valid.append(true); + valid.append_non_null(); let field = Arc::new(Field::new_list_field(DataType::Utf8, true)); GenericListArray::::new( field, OffsetBuffer::new(offsets.into()), Arc::new(StringArray::from(values)), - Some(NullBuffer::new(valid.finish())), + valid.finish(), ) } @@ -1055,10 +1055,10 @@ mod tests { let list_arr1_ref = Arc::new(list_arr1) as ArrayRef; let offsets = OffsetBuffer::from_lengths([3, 3, 0]); - let mut nulls = BooleanBufferBuilder::new(3); - nulls.append(true); - nulls.append(true); - nulls.append(false); + let mut nulls = NullBufferBuilder::new(3); + nulls.append_non_null(); + nulls.append_non_null(); + nulls.append_null(); // list> let col1_field = Field::new_list_field( DataType::List(Arc::new(Field::new_list_field( @@ -1074,7 +1074,7 @@ mod tests { )), offsets, list_arr1_ref, - Some(NullBuffer::new(nulls.finish())), + nulls.finish(), ); let list_arr2 = StringArray::from(vec![ @@ -1086,8 +1086,8 @@ mod tests { ]); let offsets = OffsetBuffer::from_lengths([2, 2, 1]); - let mut nulls = BooleanBufferBuilder::new(3); - nulls.append_n(3, true); + let mut nulls = NullBufferBuilder::new(3); + nulls.append_n_non_nulls(3); let col2_field = Field::new( "col2", DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))), @@ -1097,7 +1097,7 @@ mod tests { Arc::new(Field::new_list_field(DataType::Utf8, true)), OffsetBuffer::new(offsets.into()), Arc::new(list_arr2), - Some(NullBuffer::new(nulls.finish())), + nulls.finish(), ); // convert col1 and col2 to a record batch let schema = Arc::new(Schema::new(vec![col1_field, col2_field])); diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index a30b8981fdd8..960e3f544ee0 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -34,7 +34,16 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; /// Execution plan for values list based relation (produces constant rows) 
-#[deprecated(since = "45.0.0", note = "Use `MemoryExec::try_new_as_values` instead")] +/// +/// Note this structure is the same as [`MemoryExec`] and is deprecated. +/// Please see the following for alternatives +/// * [`MemoryExec::try_new`] +/// * [`MemoryExec::try_new_from_batches`] +/// +/// [`MemoryExec`]: crate::memory::MemoryExec +/// [`MemoryExec::try_new`]: crate::memory::MemoryExec::try_new +/// [`MemoryExec::try_new_from_batches`]: crate::memory::MemoryExec::try_new_from_batches +#[deprecated(since = "45.0.0", note = "Use `MemoryExec` instead")] #[derive(Debug, Clone)] pub struct ValuesExec { /// The schema @@ -48,6 +57,7 @@ pub struct ValuesExec { #[allow(deprecated)] impl ValuesExec { /// Create a new values exec from data as expr + #[deprecated(since = "45.0.0", note = "Use `MemoryExec::try_new` instead")] pub fn try_new( schema: SchemaRef, data: Vec>>, @@ -101,6 +111,10 @@ impl ValuesExec { /// /// Errors if any of the batches don't match the provided schema, or if no /// batches are provided. 
+ #[deprecated( + since = "45.0.0", + note = "Use `MemoryExec::try_new_from_batches` instead" + )] pub fn try_new_from_batches( schema: SchemaRef, batches: Vec, @@ -307,6 +321,7 @@ mod tests { distinct_count: Precision::Absent, max_value: Precision::Absent, min_value: Precision::Absent, + sum_value: Precision::Absent, },], } ); diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 6a7dc1604b0a..1c2807f390bf 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -570,6 +570,7 @@ message Statistics { message ColumnStats { Precision min_value = 1; Precision max_value = 2; + Precision sum_value = 5; Precision null_count = 3; Precision distinct_count = 4; } diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index d88186fbf366..b022e52b6a6f 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -701,6 +701,11 @@ impl From<&protobuf::ColumnStats> for ColumnStatistics { } else { Precision::Absent }, + sum_value: if let Some(sum) = &cs.sum_value { + sum.clone().into() + } else { + Precision::Absent + }, distinct_count: if let Some(dc) = &cs.distinct_count { dc.clone().into() } else { diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index e9f9de09d4d1..40687de098c1 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -985,6 +985,9 @@ impl serde::Serialize for ColumnStats { if self.max_value.is_some() { len += 1; } + if self.sum_value.is_some() { + len += 1; + } if self.null_count.is_some() { len += 1; } @@ -998,6 +1001,9 @@ impl serde::Serialize for ColumnStats { if let Some(v) = self.max_value.as_ref() { struct_ser.serialize_field("maxValue", v)?; } + if let Some(v) = 
self.sum_value.as_ref() { + struct_ser.serialize_field("sumValue", v)?; + } if let Some(v) = self.null_count.as_ref() { struct_ser.serialize_field("nullCount", v)?; } @@ -1018,6 +1024,8 @@ impl<'de> serde::Deserialize<'de> for ColumnStats { "minValue", "max_value", "maxValue", + "sum_value", + "sumValue", "null_count", "nullCount", "distinct_count", @@ -1028,6 +1036,7 @@ impl<'de> serde::Deserialize<'de> for ColumnStats { enum GeneratedField { MinValue, MaxValue, + SumValue, NullCount, DistinctCount, } @@ -1053,6 +1062,7 @@ impl<'de> serde::Deserialize<'de> for ColumnStats { match value { "minValue" | "min_value" => Ok(GeneratedField::MinValue), "maxValue" | "max_value" => Ok(GeneratedField::MaxValue), + "sumValue" | "sum_value" => Ok(GeneratedField::SumValue), "nullCount" | "null_count" => Ok(GeneratedField::NullCount), "distinctCount" | "distinct_count" => Ok(GeneratedField::DistinctCount), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), @@ -1076,6 +1086,7 @@ impl<'de> serde::Deserialize<'de> for ColumnStats { { let mut min_value__ = None; let mut max_value__ = None; + let mut sum_value__ = None; let mut null_count__ = None; let mut distinct_count__ = None; while let Some(k) = map_.next_key()? 
{ @@ -1092,6 +1103,12 @@ impl<'de> serde::Deserialize<'de> for ColumnStats { } max_value__ = map_.next_value()?; } + GeneratedField::SumValue => { + if sum_value__.is_some() { + return Err(serde::de::Error::duplicate_field("sumValue")); + } + sum_value__ = map_.next_value()?; + } GeneratedField::NullCount => { if null_count__.is_some() { return Err(serde::de::Error::duplicate_field("nullCount")); @@ -1109,6 +1126,7 @@ impl<'de> serde::Deserialize<'de> for ColumnStats { Ok(ColumnStats { min_value: min_value__, max_value: max_value__, + sum_value: sum_value__, null_count: null_count__, distinct_count: distinct_count__, }) diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 3263c1c755af..9e4a1ecb6b09 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -873,6 +873,8 @@ pub struct ColumnStats { pub min_value: ::core::option::Option<Precision>, #[prost(message, optional, tag = "2")] pub max_value: ::core::option::Option<Precision>, + #[prost(message, optional, tag = "5")] + pub sum_value: ::core::option::Option<Precision>, #[prost(message, optional, tag = "3")] pub null_count: ::core::option::Option<Precision>, #[prost(message, optional, tag = "4")] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 88bbbfd60426..ced1865795aa 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -750,6 +750,7 @@ impl From<&ColumnStatistics> for protobuf::ColumnStats { protobuf::ColumnStats { min_value: Some(protobuf::Precision::from(&s.min_value)), max_value: Some(protobuf::Precision::from(&s.max_value)), + sum_value: Some(protobuf::Precision::from(&s.sum_value)), null_count: Some(protobuf::Precision::from(&s.null_count)), distinct_count: Some(protobuf::Precision::from(&s.distinct_count)), } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs
b/datafusion/proto/src/generated/datafusion_proto_common.rs index 3263c1c755af..9e4a1ecb6b09 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -873,6 +873,8 @@ pub struct ColumnStats { pub min_value: ::core::option::Option<Precision>, #[prost(message, optional, tag = "2")] pub max_value: ::core::option::Option<Precision>, + #[prost(message, optional, tag = "5")] + pub sum_value: ::core::option::Option<Precision>, #[prost(message, optional, tag = "3")] pub null_count: ::core::option::Option<Precision>, #[prost(message, optional, tag = "4")] diff --git a/datafusion/sql/src/unparser/dialect.rs b/datafusion/sql/src/unparser/dialect.rs index 5c318a96ef6c..830435fd013c 100644 --- a/datafusion/sql/src/unparser/dialect.rs +++ b/datafusion/sql/src/unparser/dialect.rs @@ -23,7 +23,9 @@ use datafusion_expr::Expr; use regex::Regex; use sqlparser::tokenizer::Span; use sqlparser::{ - ast::{self, BinaryOperator, Function, Ident, ObjectName, TimezoneInfo}, + ast::{ + self, BinaryOperator, Function, Ident, ObjectName, TimezoneInfo, WindowFrameBound, + }, keywords::ALL_KEYWORDS, }; @@ -153,6 +155,18 @@ pub trait Dialect: Send + Sync { Ok(None) } + /// Allows the dialect to choose to omit window frame in unparsing + /// based on function name and window frame bound + /// Returns false if specific function name / window frame bound indicates no window frame is needed in unparsing + fn window_func_support_window_frame( + &self, + _func_name: &str, + _start_bound: &WindowFrameBound, + _end_bound: &WindowFrameBound, + ) -> bool { + true + } + /// Extends the dialect's default rules for unparsing scalar functions. + /// This is useful for supporting application-specific UDFs or custom engine extensions.
fn with_custom_scalar_overrides( @@ -500,6 +514,7 @@ pub struct CustomDialect { supports_column_alias_in_table_alias: bool, requires_derived_table_alias: bool, division_operator: BinaryOperator, + window_func_support_window_frame: bool, full_qualified_col: bool, unnest_as_table_factor: bool, } @@ -527,6 +542,7 @@ impl Default for CustomDialect { supports_column_alias_in_table_alias: true, requires_derived_table_alias: false, division_operator: BinaryOperator::Divide, + window_func_support_window_frame: true, full_qualified_col: false, unnest_as_table_factor: false, } @@ -634,6 +650,15 @@ impl Dialect for CustomDialect { self.division_operator.clone() } + fn window_func_support_window_frame( + &self, + _func_name: &str, + _start_bound: &WindowFrameBound, + _end_bound: &WindowFrameBound, + ) -> bool { + self.window_func_support_window_frame + } + fn full_qualified_col(&self) -> bool { self.full_qualified_col } @@ -675,6 +700,7 @@ pub struct CustomDialectBuilder { supports_column_alias_in_table_alias: bool, requires_derived_table_alias: bool, division_operator: BinaryOperator, + window_func_support_window_frame: bool, full_qualified_col: bool, unnest_as_table_factor: bool, } @@ -708,6 +734,7 @@ impl CustomDialectBuilder { supports_column_alias_in_table_alias: true, requires_derived_table_alias: false, division_operator: BinaryOperator::Divide, + window_func_support_window_frame: true, full_qualified_col: false, unnest_as_table_factor: false, } @@ -733,6 +760,7 @@ impl CustomDialectBuilder { .supports_column_alias_in_table_alias, requires_derived_table_alias: self.requires_derived_table_alias, division_operator: self.division_operator, + window_func_support_window_frame: self.window_func_support_window_frame, full_qualified_col: self.full_qualified_col, unnest_as_table_factor: self.unnest_as_table_factor, } @@ -857,6 +885,14 @@ impl CustomDialectBuilder { self } + pub fn with_window_func_support_window_frame( + mut self, + window_func_support_window_frame: bool, + ) -> 
Self { + self.window_func_support_window_frame = window_func_support_window_frame; + self + } + /// Customize the dialect to allow full qualified column names pub fn with_full_qualified_col(mut self, full_qualified_col: bool) -> Self { self.full_qualified_col = full_qualified_col; diff --git a/datafusion/sql/src/unparser/expr.rs b/datafusion/sql/src/unparser/expr.rs index 7a110fd0785c..96e1cec001cb 100644 --- a/datafusion/sql/src/unparser/expr.rs +++ b/datafusion/sql/src/unparser/expr.rs @@ -217,6 +217,21 @@ impl Unparser<'_> { let start_bound = self.convert_bound(&window_frame.start_bound)?; let end_bound = self.convert_bound(&window_frame.end_bound)?; + + let window_frame = if self.dialect.window_func_support_window_frame( + func_name, + &start_bound, + &end_bound, + ) { + Some(ast::WindowFrame { + units, + start_bound, + end_bound: Some(end_bound), + }) + } else { + None + }; + let over = Some(ast::WindowType::WindowSpec(ast::WindowSpec { window_name: None, partition_by: partition_by @@ -224,11 +239,7 @@ impl Unparser<'_> { .map(|e| self.expr_to_sql_inner(e)) .collect::>>()?, order_by, - window_frame: Some(ast::WindowFrame { - units, - start_bound, - end_bound: Option::from(end_bound), - }), + window_frame, })); Ok(ast::Expr::Function(Function { @@ -1632,6 +1643,7 @@ mod tests { use datafusion_functions_aggregate::expr_fn::sum; use datafusion_functions_nested::expr_fn::{array_element, make_array}; use datafusion_functions_nested::map::map; + use datafusion_functions_window::rank::rank_udwf; use datafusion_functions_window::row_number::row_number_udwf; use crate::unparser::dialect::{ @@ -2677,6 +2689,39 @@ mod tests { Ok(()) } + #[test] + fn test_window_func_support_window_frame() -> Result<()> { + let default_dialect: Arc = + Arc::new(CustomDialectBuilder::new().build()); + + let test_dialect: Arc = Arc::new( + CustomDialectBuilder::new() + .with_window_func_support_window_frame(false) + .build(), + ); + + for (dialect, expected) in [ + ( + default_dialect, + 
"rank() OVER (ORDER BY a ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)", + ), + (test_dialect, "rank() OVER (ORDER BY a ASC NULLS FIRST)"), + ] { + let unparser = Unparser::new(dialect.as_ref()); + let func = WindowFunctionDefinition::WindowUDF(rank_udwf()); + let mut window_func = WindowFunction::new(func, vec![]); + window_func.order_by = vec![Sort::new(col("a"), true, true)]; + let expr = Expr::WindowFunction(window_func); + let ast = unparser.expr_to_sql(&expr)?; + + let actual = ast.to_string(); + let expected = expected.to_string(); + + assert_eq!(actual, expected); + } + Ok(()) + } + #[test] fn test_utf8_view_to_sql() -> Result<()> { let dialect = CustomDialectBuilder::new() diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index c1dc41196b36..38fe14ab90b7 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -53,7 +53,7 @@ object_store = { workspace = true } postgres-protocol = { version = "0.6.7", optional = true } postgres-types = { version = "0.2.8", features = ["derive", "with-chrono-0_4"], optional = true } rust_decimal = { version = "1.36.0", features = ["tokio-pg"] } -sqllogictest = "0.26.0" +sqllogictest = "0.26.4" sqlparser = { workspace = true } tempfile = { workspace = true } testcontainers = { version = "0.23", features = ["default"], optional = true } diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs index 7421edb87b11..09cf70280e7c 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs @@ -250,7 +250,7 @@ pub fn cell_to_string(col: &ArrayRef, row: usize) -> Result { } /// Converts columns to a result as expected by sqllogicteset. 
-pub(crate) fn convert_schema_to_types(columns: &Fields) -> Vec { +pub fn convert_schema_to_types(columns: &Fields) -> Vec { columns .iter() .map(|f| f.data_type()) diff --git a/datafusion/sqllogictest/src/engines/mod.rs b/datafusion/sqllogictest/src/engines/mod.rs index 7b65c0aa77cb..3569dea70176 100644 --- a/datafusion/sqllogictest/src/engines/mod.rs +++ b/datafusion/sqllogictest/src/engines/mod.rs @@ -21,6 +21,8 @@ mod datafusion_engine; mod output; pub use datafusion_engine::convert_batches; +pub use datafusion_engine::convert_schema_to_types; +pub use datafusion_engine::DFSqlLogicTestError; pub use datafusion_engine::DataFusion; pub use output::DFColumnType; pub use output::DFOutput; diff --git a/datafusion/sqllogictest/src/lib.rs b/datafusion/sqllogictest/src/lib.rs index 82f194321a8e..0ea55782d34e 100644 --- a/datafusion/sqllogictest/src/lib.rs +++ b/datafusion/sqllogictest/src/lib.rs @@ -20,8 +20,10 @@ mod engines; pub use engines::convert_batches; +pub use engines::convert_schema_to_types; pub use engines::DFColumnType; pub use engines::DFOutput; +pub use engines::DFSqlLogicTestError; pub use engines::DataFusion; #[cfg(feature = "postgres")] diff --git a/datafusion/sqllogictest/test_files/join.slt b/datafusion/sqllogictest/test_files/join.slt.part similarity index 100% rename from datafusion/sqllogictest/test_files/join.slt rename to datafusion/sqllogictest/test_files/join.slt.part diff --git a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt index cf897d628da5..0bdf223a11b7 100644 --- a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt +++ b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt @@ -23,7 +23,7 @@ statement ok set datafusion.optimizer.repartition_joins = false; -include ./join.slt +include ./join.slt.part statement ok CREATE EXTERNAL TABLE annotated_data ( diff --git 
a/datafusion/sqllogictest/test_files/join_only.slt b/datafusion/sqllogictest/test_files/join_only.slt new file mode 100644 index 000000000000..b2b6a1fa9b9d --- /dev/null +++ b/datafusion/sqllogictest/test_files/join_only.slt @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +include ./join.slt.part diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 496c6c609e45..ac02aeb6fea4 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4240,10 +4240,8 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 LIMIT 2; logical_plan 01)Limit: skip=0, fetch=2 02)--Full Join: t0.c1 = t1.c1 -03)----Limit: skip=0, fetch=2 -04)------TableScan: t0 projection=[c1, c2], fetch=2 -05)----Limit: skip=0, fetch=2 -06)------TableScan: t1 projection=[c1, c2, c3], fetch=2 +03)----TableScan: t0 projection=[c1, c2] +04)----TableScan: t1 projection=[c1, c2, c3] physical_plan 01)CoalesceBatchesExec: target_batch_size=3, fetch=2 02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)] @@ -4257,10 +4255,8 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c2 >= t1.c2 LIMIT 2; logical_plan 01)Limit: skip=0, fetch=2 02)--Full Join: Filter: t0.c2 >= t1.c2 -03)----Limit: skip=0, fetch=2 -04)------TableScan: t0 projection=[c1, c2], fetch=2 -05)----Limit: skip=0, fetch=2 -06)------TableScan: t1 projection=[c1, c2, c3], fetch=2 +03)----TableScan: t0 projection=[c1, c2] +04)----TableScan: t1 projection=[c1, c2, c3] physical_plan 01)GlobalLimitExec: skip=0, fetch=2 02)--NestedLoopJoinExec: join_type=Full, filter=c2@0 >= c2@1 @@ -4274,16 +4270,155 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 AND t0.c2 >= t1.c2 LIMIT logical_plan 01)Limit: skip=0, fetch=2 02)--Full Join: t0.c1 = t1.c1 Filter: t0.c2 >= t1.c2 -03)----Limit: skip=0, fetch=2 -04)------TableScan: t0 projection=[c1, c2], fetch=2 -05)----Limit: skip=0, fetch=2 -06)------TableScan: t1 projection=[c1, c2, c3], fetch=2 +03)----TableScan: t0 projection=[c1, c2] +04)----TableScan: t1 projection=[c1, c2, c3] physical_plan 01)CoalesceBatchesExec: target_batch_size=3, fetch=2 02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)], filter=c2@0 >= c2@1 03)----MemoryExec: 
partitions=1, partition_sizes=[1] 04)----MemoryExec: partitions=1, partition_sizes=[1] +## Add more test cases for join limit pushdown +statement ok +drop table t1 + +## Test limit pushdown through OUTER JOIN including left/right and full outer join cases +statement ok +set datafusion.execution.target_partitions = 1; + +### Limit pushdown through join + +# Note we use csv as MemoryExec does not support limit push down (so doesn't manifest +# bugs if limits are improperly pushed down) +query I +COPY (values (1), (2), (3), (4), (5)) TO 'test_files/scratch/limit/t1.csv' +STORED AS CSV +---- +5 + +# store t2 in different order so the top N rows are not the same as the top N rows of t1 +query I +COPY (values (5), (4), (3), (2), (1)) TO 'test_files/scratch/limit/t2.csv' +STORED AS CSV +---- +5 + +statement ok +create external table t1(a int) stored as CSV location 'test_files/scratch/limit/t1.csv'; + +statement ok +create external table t2(b int) stored as CSV location 'test_files/scratch/limit/t2.csv'; + +###### +## LEFT JOIN w/ LIMIT +###### +query II +select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2; +---- +2 2 +1 1 + +# the output of this query should be two rows from the previous query +# there should be no nulls +query II +select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2; +---- +2 2 +1 1 + +# can only push down to t1 (preserved side) +query TT +explain select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2; +---- +logical_plan +01)Limit: skip=0, fetch=2 +02)--Left Join: t1.a = t2.b +03)----Limit: skip=0, fetch=2 +04)------TableScan: t1 projection=[a], fetch=2 +05)----TableScan: t2 projection=[b] +physical_plan +01)CoalesceBatchesExec: target_batch_size=3, fetch=2 +02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, b@0)] +03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], limit=2, has_header=true +04)----CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], has_header=true + +###### +## RIGHT JOIN w/ LIMIT +###### + +query II +select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2; +---- +5 5 +4 4 + +# the output of this query should be two rows from the previous query +# there should be no nulls +query II +select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2; +---- +5 5 +4 4 + +# can only push down to t2 (preserved side) +query TT +explain select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2; +---- +logical_plan +01)Limit: skip=0, fetch=2 +02)--Right Join: t1.a = t2.b +03)----TableScan: t1 projection=[a] +04)----Limit: skip=0, fetch=2 +05)------TableScan: t2 projection=[b], fetch=2 +physical_plan +01)CoalesceBatchesExec: target_batch_size=3, fetch=2 +02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@0, b@0)] +03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], has_header=true +04)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], limit=2, has_header=true + +###### +## FULL JOIN w/ LIMIT +###### +query II rowsort +select * from t1 FULL JOIN t2 ON t1.a = t2.b; +---- +1 1 +2 2 +3 3 +4 4 +5 5 + +# the output of this query should be two rows from the previous query +# there should be no nulls +# Reproducer for https://github.com/apache/datafusion/issues/14335 +query II +select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2; +---- +5 5 +4 4 + + +# can't push limit for full outer join +query TT +explain select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2; +---- +logical_plan +01)Limit: skip=0, fetch=2 +02)--Full Join: t1.a = t2.b +03)----TableScan: t1 projection=[a] +04)----TableScan: t2 projection=[b] +physical_plan +01)CoalesceBatchesExec: target_batch_size=3, fetch=2 +02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(a@0, b@0)] +03)----CsvExec: 
file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], has_header=true +04)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], has_header=true + +statement ok +drop table t1; + +statement ok +drop table t2; + # Test Utf8View as Join Key # Issue: https://github.com/apache/datafusion/issues/12468 statement ok