From 57d739aae08e8392a5669c3125816d355cb39313 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 24 Jun 2025 10:06:58 +0200 Subject: [PATCH 1/9] ci: test CI --- ci.test | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 ci.test diff --git a/ci.test b/ci.test new file mode 100644 index 000000000000..e69de29bb2d1 From ee563770c13add03f541eaaef57744033ab9f17c Mon Sep 17 00:00:00 2001 From: kosiew Date: Sat, 7 Jun 2025 02:03:09 +0800 Subject: [PATCH 2/9] Fix intermittent SQL logic test failure in limit.slt by adding ORDER BY clause (#16257) * Add order by clause to limit query for consistent results * test: update explain plan --- datafusion/sqllogictest/test_files/limit.slt | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 1af14a52e2bc..8a5fdd74dd53 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -835,6 +835,7 @@ explain with selection as ( select * from test_limit_with_partitions + order by part_key limit 1 ) select 1 as foo @@ -847,19 +848,19 @@ logical_plan 02)--Sort: selection.part_key ASC NULLS LAST, fetch=1000 03)----Projection: Int64(1) AS foo, selection.part_key 04)------SubqueryAlias: selection -05)--------Limit: skip=0, fetch=1 -06)----------TableScan: test_limit_with_partitions projection=[part_key], fetch=1 +05)--------Sort: test_limit_with_partitions.part_key ASC NULLS LAST, fetch=1 +06)----------TableScan: test_limit_with_partitions projection=[part_key] physical_plan -01)ProjectionExec: expr=[foo@0 as foo] -02)--SortExec: TopK(fetch=1000), expr=[part_key@1 ASC NULLS LAST], preserve_partitioning=[false] -03)----ProjectionExec: expr=[1 as foo, part_key@0 as part_key] -04)------CoalescePartitionsExec: fetch=1 -05)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..265], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:265..530], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:530..794]]}, projection=[part_key], limit=1, file_type=parquet +01)ProjectionExec: expr=[1 as foo] +02)--SortPreservingMergeExec: [part_key@0 ASC NULLS LAST], fetch=1 +03)----SortExec: TopK(fetch=1), expr=[part_key@0 ASC NULLS LAST], preserve_partitioning=[true] +04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet]]}, projection=[part_key], file_type=parquet query I with selection as ( select * from test_limit_with_partitions + order by part_key limit 1 ) select 1 as foo From acc8ea43271ded61dc741c2ec152d3c023621ecb Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 15 Oct 2024 12:51:48 -0700 Subject: [PATCH 3/9] chore: default=true for skip_physical_aggregate_schema_check, and add warn logging --- datafusion/common/src/config.rs | 2 +- datafusion/core/src/physical_planner.rs | 3 +++ datafusion/sqllogictest/test_files/information_schema.slt | 4 ++-- docs/source/user-guide/configs.md | 2 +- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 883d2b60a897..7152bc67cf99 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -328,7 +328,7 @@ config_namespace! { /// /// This is used to workaround bugs in the planner that are now caught by /// the new schema verification step. - pub skip_physical_aggregate_schema_check: bool, default = false + pub skip_physical_aggregate_schema_check: bool, default = true /// Specifies the reserved memory for each spillable sort operation to /// facilitate an in-memory merge. diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index c65fcb4c4c93..7a65fa080648 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -692,6 +692,9 @@ impl DefaultPhysicalPlanner { differences.push(format!("field nullability at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.is_nullable(), logical_field.is_nullable())); } } + + log::warn!("Physical input schema should be the same as the one converted from logical input schema, but did not match for logical plan:\n{}", input.display_indent()); + return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences .iter() .map(|s| format!("\n\t- {s}")) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 2ce64ffc6836..ccf0e73e6e0e 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -259,7 +259,7 @@ datafusion.execution.parquet.writer_version 1.0 datafusion.execution.planning_concurrency 13 datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 -datafusion.execution.skip_physical_aggregate_schema_check false +datafusion.execution.skip_physical_aggregate_schema_check true datafusion.execution.soft_max_rows_per_output_file 50000000 datafusion.execution.sort_in_place_threshold_bytes 1048576 datafusion.execution.sort_spill_reservation_bytes 10485760 @@ -369,7 +369,7 @@ datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer ve datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode -datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. +datafusion.execution.skip_physical_aggregate_schema_check true When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 05cc36651a1a..5bcc0b14df82 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -82,7 +82,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | -| datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | +| datafusion.execution.skip_physical_aggregate_schema_check | true | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | | datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | | datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics | From 341f9e5a891eaed97ba1e7fea14d41c85d74004b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 16 Jul 2024 12:14:19 -0400 Subject: [PATCH 4/9] (New) Test + workaround for SanityCheck plan --- datafusion/physical-optimizer/src/sanity_checker.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs b/datafusion/physical-optimizer/src/sanity_checker.rs index 8edbb0f09114..ad01a0047eaa 100644 --- a/datafusion/physical-optimizer/src/sanity_checker.rs +++ b/datafusion/physical-optimizer/src/sanity_checker.rs @@ -32,6 +32,8 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported}; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion_physical_plan::joins::SymmetricHashJoinExec; +use datafusion_physical_plan::sorts::sort::SortExec; +use datafusion_physical_plan::union::UnionExec; use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties}; use crate::PhysicalOptimizerRule; @@ -135,6 +137,14 @@ pub fn check_plan_sanity( plan.required_input_ordering(), plan.required_input_distribution(), ) { + // TEMP HACK WORKAROUND https://github.com/apache/datafusion/issues/11492 + if child.as_any().downcast_ref::().is_some() { + continue; + } + if child.as_any().downcast_ref::().is_some() { + continue; + } + let child_eq_props = child.equivalence_properties(); if let Some(sort_req) = sort_req { if !child_eq_props.ordering_satisfy_requirement(&sort_req) { From 9dc8df32b90cd9db0fbac799502b3bbce51c1027 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 24 Jun 2025 14:54:00 +0200 Subject: [PATCH 5/9] chore: skip order calculation / exponential planning --- .../src/equivalence/properties/union.rs | 42 +++++++++++++++---- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties/union.rs b/datafusion/physical-expr/src/equivalence/properties/union.rs index 64ef9278e248..c23e4670164b 100644 --- a/datafusion/physical-expr/src/equivalence/properties/union.rs +++ b/datafusion/physical-expr/src/equivalence/properties/union.rs @@ -76,17 +76,38 @@ fn calculate_union_binary( }) .collect::>(); + // TEMP HACK WORKAROUND + // Revert code from https://github.com/apache/datafusion/pull/12562 + // Context: https://github.com/apache/datafusion/issues/13748 + // Context: https://github.com/influxdata/influxdb_iox/issues/13038 + // Next, calculate valid orderings for the union by searching for prefixes // in both sides. - let mut orderings = UnionEquivalentOrderingBuilder::new(); - orderings.add_satisfied_orderings(lhs.normalized_oeq_class(), lhs.constants(), &rhs); - orderings.add_satisfied_orderings(rhs.normalized_oeq_class(), rhs.constants(), &lhs); - let orderings = orderings.build(); - - let mut eq_properties = - EquivalenceProperties::new(lhs.schema).with_constants(constants); - + let mut orderings = vec![]; + for mut ordering in lhs.normalized_oeq_class().into_iter() { + // Progressively shorten the ordering to search for a satisfied prefix: + while !rhs.ordering_satisfy(&ordering) { + ordering.pop(); + } + // There is a non-trivial satisfied prefix, add it as a valid ordering: + if !ordering.is_empty() { + orderings.push(ordering); + } + } + for mut ordering in rhs.normalized_oeq_class().into_iter() { + // Progressively shorten the ordering to search for a satisfied prefix: + while !lhs.ordering_satisfy(&ordering) { + ordering.pop(); + } + // There is a non-trivial satisfied prefix, add it as a valid ordering: + if !ordering.is_empty() { + orderings.push(ordering); + } + } + let mut eq_properties = EquivalenceProperties::new(lhs.schema); + eq_properties.constants = constants; eq_properties.add_new_orderings(orderings); + Ok(eq_properties) } @@ -132,6 +153,7 @@ struct UnionEquivalentOrderingBuilder { orderings: Vec, } +#[expect(unused)] impl UnionEquivalentOrderingBuilder { fn new() -> Self { Self { orderings: vec![] } @@ -509,6 +531,7 @@ mod tests { } #[test] + #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"] fn test_union_equivalence_properties_constants_fill_gaps() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) @@ -584,6 +607,7 @@ mod tests { } #[test] + #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"] fn test_union_equivalence_properties_constants_fill_gaps_non_symmetric() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) @@ -611,6 +635,7 @@ mod tests { } #[test] + #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"] fn test_union_equivalence_properties_constants_gap_fill_symmetric() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) @@ -662,6 +687,7 @@ mod tests { } #[test] + #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"] fn test_union_equivalence_properties_constants_middle_desc() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) From 58f69dfeffbadb2d50abe498746aa4a3bd786565 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 1 Jul 2025 16:43:37 +0200 Subject: [PATCH 6/9] fix: temporary fix to handle incorrect coalesce (inserted during EnforceDistribution) which later causes an error during EnforceSort (without our patch). The next DataFusion version 46 upgrade does the proper fix, which is to not insert the coalesce in the first place. test: recreating the iox plan: * demonstrate the insertion of coalesce after the use of column estimates, and the removal of the test scenario's forcing of rr repartitioning test: reproducer of SanityCheck failure after EnforceSorting removes the coalesce added in the EnforceDistribution fix: special case to not remove the needed coalesce --- .../enforce_distribution.rs | 346 +++++++++++++++++- .../physical_optimizer/enforce_sorting.rs | 98 ++++- .../src/enforce_sorting/mod.rs | 6 +- datafusion/physical-optimizer/src/utils.rs | 6 + 4 files changed, 438 insertions(+), 18 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 4034800c30cb..e616a379254f 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use crate::physical_optimizer::test_utils::{ check_integrity, coalesce_partitions_exec, repartition_exec, schema, - sort_merge_join_exec, sort_preserving_merge_exec, + sort_merge_join_exec, sort_preserving_merge_exec, trim_plan_display, }; use crate::physical_optimizer::test_utils::{ parquet_exec_with_sort, parquet_exec_with_stats, @@ -40,10 +40,12 @@ use datafusion::datasource::MemTable; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_common::error::Result; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::ScalarValue; +use datafusion_common::{assert_contains, ScalarValue}; use datafusion_datasource::file_groups::FileGroup; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; -use datafusion_expr::{JoinType, Operator}; +use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_expr::{AggregateUDF, JoinType, Operator}; +use datafusion_physical_expr::aggregate::AggregateExprBuilder; use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal}; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::{ @@ -53,6 +55,7 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement; use datafusion_physical_optimizer::enforce_distribution::*; use datafusion_physical_optimizer::enforce_sorting::EnforceSorting; use datafusion_physical_optimizer::output_requirements::OutputRequirements; +use datafusion_physical_optimizer::sanity_checker::check_plan_sanity; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, @@ -68,8 +71,8 @@ use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::union::UnionExec; -use datafusion_physical_plan::ExecutionPlanProperties; use datafusion_physical_plan::PlanProperties; +use datafusion_physical_plan::{displayable, ExecutionPlanProperties}; use datafusion_physical_plan::{ get_plan_string, DisplayAs, DisplayFormatType, Statistics, }; @@ -170,8 +173,8 @@ impl ExecutionPlan for SortRequiredExec { fn execute( &self, _partition: usize, - _context: Arc, - ) -> Result { + _context: Arc, + ) -> Result { unreachable!(); } @@ -245,7 +248,7 @@ fn csv_exec_multiple_sorted(output_ordering: Vec) -> Arc, alias_pairs: Vec<(String, String)>, ) -> Arc { @@ -259,6 +262,15 @@ fn projection_exec_with_alias( fn aggregate_exec_with_alias( input: Arc, alias_pairs: Vec<(String, String)>, +) -> Arc { + aggregate_exec_with_aggr_expr_and_alias(input, vec![], alias_pairs) +} + +#[expect(clippy::type_complexity)] +fn aggregate_exec_with_aggr_expr_and_alias( + input: Arc, + aggr_expr: Vec<(Arc, Vec>)>, + alias_pairs: Vec<(String, String)>, ) -> Arc { let schema = schema(); let mut group_by_expr: Vec<(Arc, String)> = vec![]; @@ -279,18 +291,31 @@ fn aggregate_exec_with_alias( .collect::>(); let final_grouping = PhysicalGroupBy::new_single(final_group_by_expr); + let aggr_expr = aggr_expr + .into_iter() + .map(|(udaf, exprs)| { + AggregateExprBuilder::new(udaf.clone(), exprs) + .alias(udaf.name()) + .schema(Arc::clone(&schema)) + .build() + .map(Arc::new) + .unwrap() + }) + .collect::>(); + let filter_exprs = std::iter::repeat_n(None, aggr_expr.len()).collect::>(); + Arc::new( AggregateExec::try_new( AggregateMode::FinalPartitioned, final_grouping, - vec![], - vec![], + aggr_expr.clone(), + filter_exprs.clone(), Arc::new( AggregateExec::try_new( AggregateMode::Partial, group_by, - vec![], - vec![], + aggr_expr, + filter_exprs, input, schema.clone(), ) @@ -461,6 +486,12 @@ impl TestConfig { self } + /// Set batch size. + fn with_batch_size(mut self, batch_size: usize) -> Self { + self.config.execution.batch_size = batch_size; + self + } + /// Perform a series of runs using the current [`TestConfig`], /// assert the expected plan result, /// and return the result plan (for potentional subsequent runs). @@ -538,6 +569,22 @@ impl TestConfig { } } +// macro_rules! assert_optimized_without_forced_roundrobin { +// ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => { +// assert_optimized!( +// $EXPECTED_LINES, +// $PLAN, +// $FIRST_ENFORCE_DIST, +// false, // prefer existing sort = default +// 10, // target partitions = default +// false, // repartition file scans = default +// 1024, // REPARTITION_FILE_MIN_SIZE = default +// false, // PREFER_EXISTING_UNION = default +// 100 // batch size +// ); +// }; +// } + macro_rules! assert_plan_txt { ($EXPECTED_LINES: expr, $PLAN: expr) => { let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect(); @@ -2045,6 +2092,283 @@ fn repartition_ignores_union() -> Result<()> { Ok(()) } +fn aggregate_over_union(input: Vec>) -> Arc { + let union = union_exec(input); + let plan = + aggregate_exec_with_alias(union, vec![("a".to_string(), "a1".to_string())]); + + // Demonstrate starting plan. + let before = displayable(plan.as_ref()).indent(true).to_string(); + let before = trim_plan_display(&before); + assert_eq!( + before, + vec![ + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", + "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", + "UnionExec", + "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + ], + ); + + plan +} + +// Aggregate over a union, +// with current testing setup. +// +// It will repartiton twice for an aggregate over a union. +// * repartitions before the partial aggregate. +// * repartitions before the final aggregation. +#[test] +fn repartitions_twice_for_aggregate_after_union() -> Result<()> { + let plan = aggregate_over_union(vec![parquet_exec(); 2]); + + // We get a distribution error without repartitioning. + let err = check_plan_sanity(plan.clone(), &Default::default()).unwrap_err(); + assert_contains!( + err.message(), + "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet\"] does not satisfy distribution requirements: HashPartitioned[[a1@0]]). Child-0 output partitioning: UnknownPartitioning(2)" + ); + + // Updated plan (post optimization) will have added RepartitionExecs (btwn union and aggregation). + let expected = &[ + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", + " RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + " UnionExec", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + ]; + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; + + Ok(()) +} + +// Aggregate over a union, +// but make the test setup more realistic. +// +// It will repartiton once for an aggregate over a union. +// * repartitions btwn partial & final aggregations. +#[test] +fn repartitions_once_for_aggregate_after_union() -> Result<()> { + // use parquet exec with stats + let plan: Arc = + aggregate_over_union(vec![parquet_exec_with_stats(10000); 2]); + + // We get a distribution error without repartitioning. + let err = check_plan_sanity(plan.clone(), &Default::default()).unwrap_err(); + assert_contains!( + err.message(), + "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet\"] does not satisfy distribution requirements: HashPartitioned[[a1@0]]). Child-0 output partitioning: UnknownPartitioning(2)" + ); + + // This removes the forced round-robin repartitioning, + // by no longer hard-coding batch_size=1. + // + // Updated plan (post optimization) will have added only 1 RepartitionExec. + let expected = &[ + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", + " RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + " UnionExec", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + ]; + let test_config = TestConfig::default().with_batch_size(100); + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; + + Ok(()) +} + +/// Same as [`aggregate_over_union`], but with a sort btwn the union and aggregation. +fn aggregate_over_sorted_union( + input: Vec>, +) -> Arc { + let union = union_exec(input); + let schema = schema(); + let sort_key = LexOrdering::new(vec![PhysicalSortExpr { + expr: col("a", &schema).unwrap(), + options: SortOptions::default(), + }]); + let sort = sort_exec(sort_key, union, false); + let plan = aggregate_exec_with_alias(sort, vec![("a".to_string(), "a1".to_string())]); + + // Demonstrate starting plan. + // Notice the `ordering_mode=Sorted` on the aggregations. + let before = displayable(plan.as_ref()).indent(true).to_string(); + let before = trim_plan_display(&before); + assert_eq!( + before, + vec![ + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted", + "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted", + "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + "UnionExec", + "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + ], + ); + + plan +} + +/// Same as [`repartitions_once_for_aggregate_after_union`], but adds a sort btwn +/// the union and the aggregate. This changes the outcome: +/// +/// * we no longer get a distribution error. +/// * but we still get repartitioning? +#[test] +fn repartitions_for_aggregate_after_sorted_union() -> Result<()> { + let plan = aggregate_over_sorted_union(vec![parquet_exec_with_stats(10000); 2]); + + // With the sort, there is no distribution error. + let checker = check_plan_sanity(plan.clone(), &Default::default()); + assert!(checker.is_ok()); + + // It does not repartition on the first run + let expected_after_first_run = &[ + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted", + " SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", + " RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortPreservingMergeExec: [a@0 ASC]", + " UnionExec", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + ]; + let test_config = TestConfig::default().with_batch_size(100); + test_config.run( + expected_after_first_run, + plan.clone(), + &DISTRIB_DISTRIB_SORT, + )?; + + // But does repartition on the second run. + let expected_after_second_run = &[ + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted", + " SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", + " RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + " SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", + " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted", + " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", + " UnionExec", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + ]; + test_config.run(expected_after_second_run, plan, &SORT_DISTRIB_DISTRIB)?; + + Ok(()) +} + +/// Same as [`aggregate_over_sorted_union`], but with a sort btwn the union and aggregation. +fn aggregate_over_sorted_union_projection( + input: Vec>, +) -> Arc { + let union = union_exec(input); + let union_projection = projection_exec_with_alias( + union, + vec![ + ("a".to_string(), "a".to_string()), + ("b".to_string(), "value".to_string()), + ], + ); + let schema = schema(); + let sort_key = LexOrdering::new(vec![PhysicalSortExpr { + expr: col("a", &schema).unwrap(), + options: SortOptions::default(), + }]); + let sort = sort_exec(sort_key, union_projection, false); + let plan = aggregate_exec_with_alias(sort, vec![("a".to_string(), "a1".to_string())]); + + // Demonstrate starting plan. + // Notice the `ordering_mode=Sorted` on the aggregations. + let before = displayable(plan.as_ref()).indent(true).to_string(); + let before = trim_plan_display(&before); + assert_eq!( + before, + vec![ + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted", + "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted", + "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + "ProjectionExec: expr=[a@0 as a, b@1 as value]", + "UnionExec", + "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + ], + ); + + plan +} + +/// Same as [`repartitions_for_aggregate_after_sorted_union`], but adds a projection +/// as well between the union and aggregate. This change the outcome: +/// +/// * we no longer get repartitioning, and instead get coalescing. +#[test] +fn coalesces_for_aggregate_after_sorted_union_projection() -> Result<()> { + let plan = + aggregate_over_sorted_union_projection(vec![parquet_exec_with_stats(10000); 2]); + + // Same as `repartitions_for_aggregate_after_sorted_union`. No error. + let checker = check_plan_sanity(plan.clone(), &Default::default()); + assert!(checker.is_ok()); + + // It no longer does a repartition on the first run. + // Instead adds a SPM. + let expected_after_first_run = &[ + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted", + " SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", + " RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortPreservingMergeExec: [a@0 ASC]", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", + " ProjectionExec: expr=[a@0 as a, b@1 as value]", + " UnionExec", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + ]; + let test_config = TestConfig::default().with_batch_size(100); + test_config.run( + expected_after_first_run, + plan.clone(), + &DISTRIB_DISTRIB_SORT, + )?; + + // Then it removes the SPM, and inserts a coalesace on the second run. + let expected_after_second_run = &[ + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted", + " SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", + " RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + " SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", + " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted", + " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", + " ProjectionExec: expr=[a@0 as a, b@1 as value]", + " UnionExec", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + ]; + test_config.run(expected_after_second_run, plan, &SORT_DISTRIB_DISTRIB)?; + + Ok(()) +} + #[test] fn repartition_through_sort_preserving_merge() -> Result<()> { // sort preserving merge with non-sorted input diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index f7668c8aab11..296a70200b31 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -17,14 +17,16 @@ use std::sync::Arc; +use crate::physical_optimizer::enforce_distribution::projection_exec_with_alias; use crate::physical_optimizer::test_utils::{ aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec, coalesce_partitions_exec, create_test_schema, create_test_schema2, create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, limit_exec, - local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec, - sort_exec_with_fetch, sort_expr, sort_expr_options, sort_merge_join_exec, - sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch, - spr_repartition_exec, stream_exec_ordered, union_exec, RequirementsTestExec, + local_limit_exec, memory_exec, parquet_exec, parquet_exec_with_stats, + repartition_exec, schema, sort_exec, sort_exec_with_fetch, sort_expr, + sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec, + sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered, + union_exec, RequirementsTestExec, }; use arrow::compute::SortOptions; @@ -38,6 +40,8 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use datafusion_physical_expr::expressions::{col, Column, NotExpr}; use datafusion_physical_expr::Partitioning; +use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan; +use datafusion_physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::repartition::RepartitionExec; @@ -1917,6 +1921,92 @@ async fn test_commutativity() -> Result<()> { Ok(()) } +fn single_partition_aggregate( + input: Arc, + alias_pairs: Vec<(String, String)>, +) -> Arc { + let schema = schema(); + let group_by = alias_pairs + .iter() + .map(|(column, alias)| (col(column, &input.schema()).unwrap(), alias.to_string())) + .collect::>(); + let group_by = PhysicalGroupBy::new_single(group_by); + + Arc::new( + AggregateExec::try_new( + AggregateMode::SinglePartitioned, + group_by, + vec![], + vec![], + input, + schema, + ) + .unwrap(), + ) +} + +#[tokio::test] +async fn test_preserve_needed_coalesce() -> Result<()> { + // Input to EnforceSorting, from our test case. + let plan = projection_exec_with_alias( + union_exec(vec![parquet_exec_with_stats(10000); 2]), + vec![ + ("a".to_string(), "a".to_string()), + ("b".to_string(), "value".to_string()), + ], + ); + let plan = Arc::new(CoalescePartitionsExec::new(plan)); + let schema = schema(); + let sort_key = LexOrdering::new(vec![PhysicalSortExpr { + expr: col("a", &schema).unwrap(), + options: SortOptions::default(), + }]); + let plan: Arc = + single_partition_aggregate(plan, vec![("a".to_string(), "a1".to_string())]); + let plan = sort_exec(sort_key, plan); + + // Starting plan: as in our test case. + assert_eq!( + get_plan_string(&plan), + vec![ + "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]", + " CoalescePartitionsExec", + " ProjectionExec: expr=[a@0 as a, b@1 as value]", + " UnionExec", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + ], + ); + + let checker = SanityCheckPlan::new().optimize(plan.clone(), &Default::default()); + assert!(checker.is_ok()); + + // EnforceSorting will remove the coalesce, and add an SPM further up (above the aggregate). + let optimizer = EnforceSorting::new(); + let optimized = optimizer.optimize(plan, &Default::default())?; + assert_eq!( + get_plan_string(&optimized), + vec![ + "SortPreservingMergeExec: [a@0 ASC]", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]", + " CoalescePartitionsExec", + " ProjectionExec: expr=[a@0 as a, b@1 as value]", + " UnionExec", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + ], + ); + + // Plan is valid. + let checker = SanityCheckPlan::new(); + let checker = checker.optimize(optimized, &Default::default()); + assert!(checker.is_ok()); + + Ok(()) +} + #[tokio::test] async fn test_coalesce_propagate() -> Result<()> { let schema = create_test_schema()?; diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 37fec2eab3f9..f32c84c7fd31 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -47,8 +47,8 @@ use crate::enforce_sorting::sort_pushdown::{ assign_initial_requirements, pushdown_sorts, SortPushDown, }; use crate::utils::{ - add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit, - is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window, + add_sort_above, add_sort_above_with_check, is_aggregation, is_coalesce_partitions, + is_limit, is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window, }; use crate::PhysicalOptimizerRule; @@ -628,7 +628,7 @@ fn remove_bottleneck_in_subplan( ) -> Result { let plan = &requirements.plan; let children = &mut requirements.children; - if is_coalesce_partitions(&children[0].plan) { + if is_coalesce_partitions(&children[0].plan) && !is_aggregation(plan) { // We can safely use the 0th index since we have a `CoalescePartitionsExec`. let mut new_child_node = children[0].children.swap_remove(0); while new_child_node.plan.output_partitioning() == plan.output_partitioning() diff --git a/datafusion/physical-optimizer/src/utils.rs b/datafusion/physical-optimizer/src/utils.rs index 57a193315a5c..46a90a713ff0 100644 --- a/datafusion/physical-optimizer/src/utils.rs +++ b/datafusion/physical-optimizer/src/utils.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use datafusion_physical_expr::LexRequirement; use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_physical_plan::aggregates::AggregateExec; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::repartition::RepartitionExec; @@ -109,3 +110,8 @@ pub fn is_repartition(plan: &Arc) -> bool { pub fn is_limit(plan: &Arc) -> bool { plan.as_any().is::() || plan.as_any().is::() } + +// Checks whether the given operator is a [`AggregateExec`]. +pub fn is_aggregation(plan: &Arc) -> bool { + plan.as_any().is::() +} From d25423bbd9ba526bc49a1c847b2a6da989189d92 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Fri, 27 Jun 2025 11:24:56 +0200 Subject: [PATCH 7/9] fix: reserved keywords in qualified column names See: - #14141 - https://github.com/apache/datafusion-sqlparser-rs/issues/1909 --- datafusion/sql/src/expr/mod.rs | 21 ++++++++++++++++--- datafusion/sqllogictest/test_files/select.slt | 17 ++++++++++++--- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index e92869873731..1254a1997dbe 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -21,8 +21,9 @@ use datafusion_expr::planner::{ }; use sqlparser::ast::{ AccessExpr, BinaryOperator, CastFormat, CastKind, DataType as SQLDataType, - DictionaryField, Expr as SQLExpr, ExprWithAlias as SQLExprWithAlias, MapEntry, - StructField, Subscript, TrimWhereField, Value, ValueWithSpan, + DictionaryField, Expr as SQLExpr, ExprWithAlias as SQLExprWithAlias, + FunctionArguments, MapEntry, StructField, Subscript, TrimWhereField, Value, + ValueWithSpan, }; use datafusion_common::{ @@ -476,7 +477,21 @@ impl SqlToRel<'_, S> { ), SQLExpr::Function(function) => { - self.sql_function_to_expr(function, schema, planner_context) + // workaround for https://github.com/apache/datafusion-sqlparser-rs/issues/1909 + if matches!(function.args, FunctionArguments::None) + && function.name.0.len() > 1 + && function.name.0.iter().all(|part| part.as_ident().is_some()) + { + let ids = function + .name + .0 + .iter() + .map(|part| part.as_ident().expect("just checked").clone()) + .collect(); + self.sql_compound_identifier_to_expr(ids, schema, planner_context) + } else { + self.sql_function_to_expr(function, schema, planner_context) + } } SQLExpr::Rollup(exprs) => { diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index aa14faf984e4..9febf06b2510 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ b/datafusion/sqllogictest/test_files/select.slt @@ -408,7 +408,7 @@ VALUES (1,2,3,4,5,6,7,8,9,10,11,12,13,NULL,'F',3.5) # Test non-literal expressions in VALUES query II -VALUES (1, CASE WHEN RANDOM() > 0.5 THEN 1 ELSE 1 END), +VALUES (1, CASE WHEN RANDOM() > 0.5 THEN 1 ELSE 1 END), (2, CASE WHEN RANDOM() > 0.5 THEN 2 ELSE 2 END); ---- 1 1 @@ -558,7 +558,7 @@ EXPLAIN SELECT * FROM ((SELECT column1 FROM foo) "T1" CROSS JOIN (SELECT column2 ---- logical_plan 01)SubqueryAlias: F -02)--Cross Join: +02)--Cross Join: 03)----SubqueryAlias: T1 04)------TableScan: foo projection=[column1] 05)----SubqueryAlias: T2 @@ -1641,7 +1641,7 @@ query II SELECT CASE WHEN B.x > 0 THEN A.x / B.x ELSE 0 END AS value1, CASE WHEN B.x > 0 AND B.y > 0 THEN A.x / B.x ELSE 0 END AS value3 -FROM t AS A, (SELECT * FROM t WHERE x = 0) AS B; +FROM t AS A, (SELECT * FROM t WHERE x = 0) AS B; ---- 0 0 0 0 @@ -1871,3 +1871,14 @@ select *, count(*) over() as ta from t; statement count 0 drop table t; + +# test "user" column +# See https://github.com/apache/datafusion/issues/14141 +statement count 0 +create table t_with_user(a int, user text) as values (1,'test'), (2,null); + +query T +select t_with_user.user from t_with_user; +---- +test +NULL From 9a98f71d3fe8ab1a4ca105d90dc48659da96b930 Mon Sep 17 00:00:00 2001 From: epgif Date: Wed, 18 Jun 2025 16:09:48 -0500 Subject: [PATCH 8/9] feat: add SchemaProvider::table_type(table_name: &str) (#16401) * feat: add SchemaProvider::table_type(table_name: &str) InformationSchemaConfig::make_tables only needs the TableType not the whole TableProvider, and the former may require an expensive catalog operation to construct and the latter may not. This allows avoiding `SELECT * FROM information_schema.tables` having to make 1 of those potentially expensive operations per table. * test: new InformationSchemaConfig::make_tables behavior * Move tests to same file to fix CI --------- Co-authored-by: Andrew Lamb --- datafusion/catalog/src/information_schema.rs | 95 +++++++++++++++++++- datafusion/catalog/src/schema.rs | 9 ++ 2 files changed, 102 insertions(+), 2 deletions(-) diff --git a/datafusion/catalog/src/information_schema.rs b/datafusion/catalog/src/information_schema.rs index 057d1a819882..83b6d64ef47b 100644 --- a/datafusion/catalog/src/information_schema.rs +++ b/datafusion/catalog/src/information_schema.rs @@ -103,12 +103,14 @@ impl InformationSchemaConfig { // schema name may not exist in the catalog, so we need to check if let Some(schema) = catalog.schema(&schema_name) { for table_name in schema.table_names() { - if let Some(table) = schema.table(&table_name).await? { + if let Some(table_type) = + schema.table_type(&table_name).await? + { builder.add_table( &catalog_name, &schema_name, &table_name, - table.table_type(), + table_type, ); } } @@ -1359,3 +1361,92 @@ impl PartitionStream for InformationSchemaParameters { )) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::CatalogProvider; + + #[tokio::test] + async fn make_tables_uses_table_type() { + let config = InformationSchemaConfig { + catalog_list: Arc::new(Fixture), + }; + let mut builder = InformationSchemaTablesBuilder { + catalog_names: StringBuilder::new(), + schema_names: StringBuilder::new(), + table_names: StringBuilder::new(), + table_types: StringBuilder::new(), + schema: Arc::new(Schema::empty()), + }; + + assert!(config.make_tables(&mut builder).await.is_ok()); + + assert_eq!("BASE TABLE", builder.table_types.finish().value(0)); + } + + #[derive(Debug)] + struct Fixture; + + #[async_trait] + impl SchemaProvider for Fixture { + // InformationSchemaConfig::make_tables should use this. + async fn table_type(&self, _: &str) -> Result> { + Ok(Some(TableType::Base)) + } + + // InformationSchemaConfig::make_tables used this before `table_type` + // existed but should not, as it may be expensive. + async fn table(&self, _: &str) -> Result>> { + panic!("InformationSchemaConfig::make_tables called SchemaProvider::table instead of table_type") + } + + fn as_any(&self) -> &dyn Any { + unimplemented!("not required for these tests") + } + + fn table_names(&self) -> Vec { + vec!["atable".to_string()] + } + + fn table_exist(&self, _: &str) -> bool { + unimplemented!("not required for these tests") + } + } + + impl CatalogProviderList for Fixture { + fn as_any(&self) -> &dyn Any { + unimplemented!("not required for these tests") + } + + fn register_catalog( + &self, + _: String, + _: Arc, + ) -> Option> { + unimplemented!("not required for these tests") + } + + fn catalog_names(&self) -> Vec { + vec!["acatalog".to_string()] + } + + fn catalog(&self, _: &str) -> Option> { + Some(Arc::new(Self)) + } + } + + impl CatalogProvider for Fixture { + fn as_any(&self) -> &dyn Any { + unimplemented!("not required for these tests") + } + + fn schema_names(&self) -> Vec { + vec!["aschema".to_string()] + } + + fn schema(&self, _: &str) -> Option> { + Some(Arc::new(Self)) + } + } +} diff --git a/datafusion/catalog/src/schema.rs b/datafusion/catalog/src/schema.rs index 5b37348fd742..9ba55256f182 100644 --- a/datafusion/catalog/src/schema.rs +++ b/datafusion/catalog/src/schema.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use crate::table::TableProvider; use datafusion_common::Result; +use datafusion_expr::TableType; /// Represents a schema, comprising a number of named tables. /// @@ -54,6 +55,14 @@ pub trait SchemaProvider: Debug + Sync + Send { name: &str, ) -> Result>, DataFusionError>; + /// Retrieves the type of a specific table from the schema by name, if it exists, otherwise + /// returns `None`. Implementations for which this operation is cheap but [Self::table] is + /// expensive can override this to improve operations that only need the type, e.g. + /// `SELECT * FROM information_schema.tables`. + async fn table_type(&self, name: &str) -> Result> { + self.table(name).await.map(|o| o.map(|t| t.table_type())) + } + /// If supported by the implementation, adds a new table named `name` to /// this schema. /// From 3670b679394d4224b99f16c361eb82a8e65a82ec Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 17 Jul 2025 13:48:09 +0200 Subject: [PATCH 9/9] fix: support nullable columns in pre-sorted data sources (#16783) --- datafusion/datasource/src/file_scan_config.rs | 57 ++++++++++++++----- datafusion/datasource/src/statistics.rs | 9 +-- .../sqllogictest/test_files/parquet.slt | 6 +- .../test_files/parquet_sorted_statistics.slt | 57 +++++++++++-------- 4 files changed, 79 insertions(+), 50 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index b6cdd05e5230..39e1e280b260 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1908,13 +1908,28 @@ mod tests { struct File { name: &'static str, date: &'static str, - statistics: Vec>, + statistics: Vec, Option)>>, } impl File { fn new( name: &'static str, date: &'static str, statistics: Vec>, + ) -> Self { + Self::new_nullable( + name, + date, + statistics + .into_iter() + .map(|opt| opt.map(|(min, max)| (Some(min), Some(max)))) + .collect(), + ) + } + + fn new_nullable( + name: &'static str, + date: &'static str, + statistics: Vec, Option)>>, ) -> Self { Self { name, @@ -1981,21 +1996,35 @@ mod tests { sort: vec![col("value").sort(false, true)], expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]), }, - // reject nullable sort columns TestCase { - name: "no nullable sort columns", + name: "nullable sort columns, nulls last", file_schema: Schema::new(vec![Field::new( "value".to_string(), DataType::Float64, - true, // should fail because nullable + true, )]), files: vec![ - File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]), - File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]), - File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]), + File::new_nullable("0", "2023-01-01", vec![Some((Some(0.00), Some(0.49)))]), + File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]), + File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]), ], sort: vec![col("value").sort(true, false)], - expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\nbuild min rows\ncaused by\ncreate sorting columns\ncaused by\nError during planning: cannot sort by nullable column") + expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]) + }, + TestCase { + name: "nullable sort columns, nulls first", + file_schema: Schema::new(vec![Field::new( + "value".to_string(), + DataType::Float64, + true, + )]), + files: vec![ + File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]), + File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), Some(1.00)))]), + File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]), + ], + sort: vec![col("value").sort(true, true)], + expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]) }, TestCase { name: "all three non-overlapping", @@ -2153,12 +2182,12 @@ mod tests { .map(|stats| { stats .map(|(min, max)| ColumnStatistics { - min_value: Precision::Exact(ScalarValue::from( - min, - )), - max_value: Precision::Exact(ScalarValue::from( - max, - )), + min_value: Precision::Exact( + ScalarValue::Float64(min), + ), + max_value: Precision::Exact( + ScalarValue::Float64(max), + ), ..Default::default() }) .unwrap_or_default() diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index b42d3bb361b7..33dae03d5142 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -33,7 +33,7 @@ use arrow::{ row::{Row, Rows}, }; use datafusion_common::stats::Precision; -use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result}; +use datafusion_common::{plan_datafusion_err, DataFusionError, Result}; use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::{ColumnStatistics, Statistics}; @@ -228,14 +228,7 @@ impl MinMaxStatistics { .zip(sort_columns.iter().copied()) .map(|(sort_expr, column)| { let schema = values.schema(); - let idx = schema.index_of(column.name())?; - let field = schema.field(idx); - - // check that sort columns are non-nullable - if field.is_nullable() { - return plan_err!("cannot sort by nullable column"); - } Ok(SortColumn { values: Arc::clone(values.column(idx)), diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index abc6fdab3c8a..39c1a5b459cc 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -130,8 +130,7 @@ STORED AS PARQUET; ---- 3 -# Check output plan again, expect no "output_ordering" clause in the physical_plan -> ParquetExec, -# due to there being more files than partitions: +# Check output plan again query TT EXPLAIN SELECT int_col, string_col FROM test_table @@ -142,8 +141,7 @@ logical_plan 02)--TableScan: test_table projection=[int_col, string_col] physical_plan 01)SortPreservingMergeExec: [string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST] -02)--SortExec: expr=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]}, projection=[int_col, string_col], file_type=parquet +02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]}, projection=[int_col, string_col], output_ordering=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], file_type=parquet # Perform queries using MIN and MAX diff --git a/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt index a10243f62720..fe909e70ffb0 100644 --- a/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt +++ b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt @@ -38,20 +38,22 @@ CREATE TABLE src_table ( bigint_col BIGINT, date_col DATE, overlapping_col INT, - constant_col INT + constant_col INT, + nulls_first_col INT, + nulls_last_col INT ) AS VALUES -- first file -(1, 3, 'aaa', 100, 1, 0, 0), -(2, 2, 'bbb', 200, 2, 1, 0), -(3, 1, 'ccc', 300, 3, 2, 0), +(1, 3, 'aaa', 100, 1, 0, 0, NULL, 1), +(2, 2, 'bbb', 200, 2, 1, 0, NULL, 2), +(3, 1, 'ccc', 300, 3, 2, 0, 1, 3), -- second file -(4, 6, 'ddd', 400, 4, 0, 0), -(5, 5, 'eee', 500, 5, 1, 0), -(6, 4, 'fff', 600, 6, 2, 0), +(4, 6, 'ddd', 400, 4, 0, 0, 2, 4), +(5, 5, 'eee', 500, 5, 1, 0, 3, 5), +(6, 4, 'fff', 600, 6, 2, 0, 4, 6), -- third file -(7, 9, 'ggg', 700, 7, 3, 0), -(8, 8, 'hhh', 800, 8, 4, 0), -(9, 7, 'iii', 900, 9, 5, 0); +(7, 9, 'ggg', 700, 7, 3, 0, 5, 7), +(8, 8, 'hhh', 800, 8, 4, 0, 6, NULL), +(9, 7, 'iii', 900, 9, 5, 0, 7, NULL); # Setup 3 files, in particular more files than there are partitions @@ -90,11 +92,18 @@ CREATE EXTERNAL TABLE test_table ( bigint_col BIGINT NOT NULL, date_col DATE NOT NULL, overlapping_col INT NOT NULL, - constant_col INT NOT NULL + constant_col INT NOT NULL, + nulls_first_col INT, + nulls_last_col INT ) STORED AS PARQUET PARTITIONED BY (partition_col) -WITH ORDER (int_col ASC NULLS LAST, bigint_col ASC NULLS LAST) +WITH ORDER ( + int_col ASC NULLS LAST, + bigint_col ASC NULLS LAST, + nulls_first_col ASC NULLS FIRST, + nulls_last_col ASC NULLS LAST +) LOCATION 'test_files/scratch/parquet_sorted_statistics/test_table'; # Order by numeric columns @@ -102,33 +111,33 @@ LOCATION 'test_files/scratch/parquet_sorted_statistics/test_table'; # DataFusion doesn't currently support string column statistics # This should not require a sort. query TT -EXPLAIN SELECT int_col, bigint_col +EXPLAIN SELECT int_col, bigint_col, nulls_first_col, nulls_last_col FROM test_table -ORDER BY int_col, bigint_col; +ORDER BY int_col, bigint_col, nulls_first_col NULLS FIRST, nulls_last_col NULLS LAST; ---- logical_plan -01)Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST -02)--TableScan: test_table projection=[int_col, bigint_col] +01)Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST, test_table.nulls_first_col ASC NULLS FIRST, test_table.nulls_last_col ASC NULLS LAST +02)--TableScan: test_table projection=[int_col, bigint_col, nulls_first_col, nulls_last_col] physical_plan -01)SortPreservingMergeExec: [int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST] -02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col], output_ordering=[int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST], file_type=parquet +01)SortPreservingMergeExec: [int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST, nulls_first_col@2 ASC, nulls_last_col@3 ASC NULLS LAST] +02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col, nulls_first_col, nulls_last_col], output_ordering=[int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST, nulls_first_col@2 ASC, nulls_last_col@3 ASC NULLS LAST], file_type=parquet # Another planning test, but project on a column with unsupported statistics # We should be able to ignore this and look at only the relevant statistics query TT EXPLAIN SELECT string_col FROM test_table -ORDER BY int_col, bigint_col; +ORDER BY int_col, bigint_col, nulls_first_col NULLS FIRST, nulls_last_col NULLS LAST; ---- logical_plan 01)Projection: test_table.string_col -02)--Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST -03)----Projection: test_table.string_col, test_table.int_col, test_table.bigint_col -04)------TableScan: test_table projection=[int_col, string_col, bigint_col] +02)--Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST, test_table.nulls_first_col ASC NULLS FIRST, test_table.nulls_last_col ASC NULLS LAST +03)----Projection: test_table.string_col, test_table.int_col, test_table.bigint_col, test_table.nulls_first_col, test_table.nulls_last_col +04)------TableScan: test_table projection=[int_col, string_col, bigint_col, nulls_first_col, nulls_last_col] physical_plan 01)ProjectionExec: expr=[string_col@0 as string_col] -02)--SortPreservingMergeExec: [int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[string_col, int_col, bigint_col], output_ordering=[int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST], file_type=parquet +02)--SortPreservingMergeExec: [int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST, nulls_first_col@3 ASC, nulls_last_col@4 ASC NULLS LAST] +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[string_col, int_col, bigint_col, nulls_first_col, nulls_last_col], output_ordering=[int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST, nulls_first_col@3 ASC, nulls_last_col@4 ASC NULLS LAST], file_type=parquet # Clean up & recreate but sort on descending column statement ok