From 261c04e9c682446a5b3d37998fd126dafa144c7d Mon Sep 17 00:00:00 2001 From: Eeshan Date: Sat, 6 Sep 2025 01:55:28 +0530 Subject: [PATCH 1/4] fix: prevent UnionExec panic with empty inputs This commit fixes a panic in UnionExec when constructed with empty inputs. Previously, UnionExec::new(vec![]) would cause an index out of bounds panic at union.rs:542 when trying to access inputs[0]. Changes: - Made UnionExec::new() return Result with proper validation - Made union_schema() return Result with empty input checks - Added descriptive error messages for empty input cases - Updated all call sites to handle the new Result return type - Added comprehensive tests for edge cases Error messages: - "UnionExec requires at least one input" - "Cannot create union schema from empty inputs" The fix maintains backward compatibility for valid inputs while preventing crashes and providing clear error messages for invalid usage. Fixes #17052 --- datafusion/core/src/physical_planner.rs | 2 +- .../enforce_distribution.rs | 4 +- .../partition_statistics.rs | 2 +- .../physical_optimizer/projection_pushdown.rs | 2 +- .../tests/physical_optimizer/test_utils.rs | 2 +- .../physical-plan/src/repartition/mod.rs | 4 +- datafusion/physical-plan/src/union.rs | 90 +++++++++++++++---- 7 files changed, 83 insertions(+), 23 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 6618d9495d78..ce46840d477b 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1245,7 +1245,7 @@ impl DefaultPhysicalPlanner { } // N Children - LogicalPlan::Union(_) => Arc::new(UnionExec::new(children.vec())), + LogicalPlan::Union(_) => Arc::new(UnionExec::new(children.vec())?), LogicalPlan::Extension(Extension { node }) => { let mut maybe_plan = None; let children = children.vec(); diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index e0826c90dd8d..69616842c943 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -1783,7 +1783,7 @@ fn union_to_interleave() -> Result<()> { ); // Union - let plan = Arc::new(UnionExec::new(vec![left, right])); + let plan = Arc::new(UnionExec::new(vec![left, right])?); // final agg let plan = @@ -1827,7 +1827,7 @@ fn union_not_to_interleave() -> Result<()> { ); // Union - let plan = Arc::new(UnionExec::new(vec![left, right])); + let plan = Arc::new(UnionExec::new(vec![left, right])?); // final agg let plan = diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 7e9d5bf1b901..ce36a5cb71a9 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -357,7 +357,7 @@ mod test { async fn test_statistic_by_partition_of_union() -> Result<()> { let scan = create_scan_exec_with_statistics(None, Some(2)).await; let union_exec: Arc = - Arc::new(UnionExec::new(vec![scan.clone(), scan])); + Arc::new(UnionExec::new(vec![scan.clone(), scan])?); let statistics = (0..union_exec.output_partitioning().partition_count()) .map(|idx| union_exec.partition_statistics(Some(idx))) .collect::>>()?; diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs index 7160ed4184b0..fa4848ea516f 100644 --- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs @@ -1535,7 +1535,7 @@ fn test_sort_preserving_after_projection() -> Result<()> { #[test] fn test_union_after_projection() -> Result<()> { let csv = create_simple_csv_exec(); - let union = Arc::new(UnionExec::new(vec![csv.clone(), csv.clone(), csv])); + let union = Arc::new(UnionExec::new(vec![csv.clone(), csv.clone(), csv])?); let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c".to_string()), diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 69dbe04927b2..623b7321b18e 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -304,7 +304,7 @@ pub fn sort_preserving_merge_exec_with_fetch( } pub fn union_exec(input: Vec>) -> Arc { - Arc::new(UnionExec::new(input)) + Arc::new(UnionExec::new(input).expect("Failed to create UnionExec")) } pub fn local_limit_exec( diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 3cd6ee6c1af3..a2c86ade26a3 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1783,7 +1783,7 @@ mod test { let source1 = sorted_memory_exec(&schema, sort_exprs.clone()); let source2 = sorted_memory_exec(&schema, sort_exprs); // output has multiple partitions, and is sorted - let union = UnionExec::new(vec![source1, source2]); + let union = UnionExec::new(vec![source1, source2])?; let exec = RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10)) .unwrap() @@ -1825,7 +1825,7 @@ mod test { let source1 = memory_exec(&schema); let source2 = memory_exec(&schema); // output has multiple partitions, but is not sorted - let union = UnionExec::new(vec![source1, source2]); + let union = UnionExec::new(vec![source1, source2])?; let exec = RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10)) .unwrap() diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index aca03c57b1b4..2fe384f1f793 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -101,19 +101,23 @@ pub struct UnionExec { impl UnionExec { /// Create a new UnionExec - pub fn new(inputs: Vec>) -> Self { - let schema = union_schema(&inputs); + pub fn new(inputs: Vec>) -> Result { + if inputs.is_empty() { + return exec_err!("UnionExec requires at least one input"); + } + + let schema = union_schema(&inputs)?; // The schema of the inputs and the union schema is consistent when: // - They have the same number of fields, and // - Their fields have same types at the same indices. // Here, we know that schemas are consistent and the call below can // not return an error. let cache = Self::compute_properties(&inputs, schema).unwrap(); - UnionExec { + Ok(UnionExec { inputs, metrics: ExecutionPlanMetricsSet::new(), cache, - } + }) } /// Get inputs of the execution plan @@ -220,7 +224,7 @@ impl ExecutionPlan for UnionExec { self: Arc, children: Vec>, ) -> Result> { - Ok(Arc::new(UnionExec::new(children))) + Ok(Arc::new(UnionExec::new(children)?)) } fn execute( @@ -319,7 +323,7 @@ impl ExecutionPlan for UnionExec { .map(|child| make_with_child(projection, child)) .collect::>>()?; - Ok(Some(Arc::new(UnionExec::new(new_children)))) + Ok(Some(Arc::new(UnionExec::new(new_children)?))) } } @@ -373,7 +377,7 @@ impl InterleaveExec { "Not all InterleaveExec children have a consistent hash partitioning" ); } - let cache = Self::compute_properties(&inputs); + let cache = Self::compute_properties(&inputs)?; Ok(InterleaveExec { inputs, metrics: ExecutionPlanMetricsSet::new(), @@ -387,17 +391,17 @@ impl InterleaveExec { } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. - fn compute_properties(inputs: &[Arc]) -> PlanProperties { - let schema = union_schema(inputs); + fn compute_properties(inputs: &[Arc]) -> Result { + let schema = union_schema(inputs)?; let eq_properties = EquivalenceProperties::new(schema); // Get output partitioning: let output_partitioning = inputs[0].output_partitioning().clone(); - PlanProperties::new( + Ok(PlanProperties::new( eq_properties, output_partitioning, emission_type_from_children(inputs), boundedness_from_children(inputs), - ) + )) } } @@ -538,7 +542,11 @@ pub fn can_interleave>>( .all(|partition| partition == *reference) } -fn union_schema(inputs: &[Arc]) -> SchemaRef { +fn union_schema(inputs: &[Arc]) -> Result { + if inputs.is_empty() { + return exec_err!("Cannot create union schema from empty inputs"); + } + let first_schema = inputs[0].schema(); let fields = (0..first_schema.fields().len()) @@ -581,7 +589,7 @@ fn union_schema(inputs: &[Arc]) -> SchemaRef { .flat_map(|i| i.schema().metadata().clone().into_iter()) .collect(); - Arc::new(Schema::new_with_metadata(fields, all_metadata_merged)) + Ok(Arc::new(Schema::new_with_metadata(fields, all_metadata_merged))) } /// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one @@ -710,7 +718,7 @@ mod tests { let csv = test::scan_partitioned(4); let csv2 = test::scan_partitioned(5); - let union_exec = Arc::new(UnionExec::new(vec![csv, csv2])); + let union_exec = Arc::new(UnionExec::new(vec![csv, csv2])?); // Should have 9 partitions and 9 output batches assert_eq!( @@ -892,7 +900,7 @@ mod tests { let mut union_expected_eq = EquivalenceProperties::new(Arc::clone(&schema)); union_expected_eq.add_orderings(union_expected_orderings); - let union = UnionExec::new(vec![child1, child2]); + let union = UnionExec::new(vec![child1, child2])?; let union_eq_properties = union.properties().equivalence_properties(); let err_msg = format!( "Error in test id: {:?}, test case: {:?}", @@ -916,4 +924,56 @@ mod tests { assert!(lhs_orderings.contains(rhs_ordering), "{}", err_msg); } } + + #[test] + fn test_union_empty_inputs() { + // Test that UnionExec::new fails with empty inputs + let result = UnionExec::new(vec![]); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("UnionExec requires at least one input")); + } + + #[test] + fn test_union_schema_empty_inputs() { + // Test that union_schema fails with empty inputs + let result = union_schema(&[]); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Cannot create union schema from empty inputs")); + } + + #[test] + fn test_union_single_input() -> Result<()> { + // Test that UnionExec works with a single input + let schema = create_test_schema()?; + let memory_exec = Arc::new(TestMemoryExec::try_new(&[], schema.clone(), None)?); + let union = UnionExec::new(vec![memory_exec])?; + + // Check that schema is correct + assert_eq!(union.schema(), schema); + + Ok(()) + } + + #[test] + fn test_union_multiple_inputs_still_works() -> Result<()> { + // Test that existing functionality with multiple inputs still works + let schema = create_test_schema()?; + let memory_exec1 = Arc::new(TestMemoryExec::try_new(&[], schema.clone(), None)?); + let memory_exec2 = Arc::new(TestMemoryExec::try_new(&[], schema.clone(), None)?); + + let union = UnionExec::new(vec![memory_exec1, memory_exec2])?; + + // Check that schema is correct + assert_eq!(union.schema(), schema); + // Check that we have 2 inputs + assert_eq!(union.inputs().len(), 2); + + Ok(()) + } } From 0d9f5f1a2082befe59c1626deffa05a18fb5b64c Mon Sep 17 00:00:00 2001 From: ebembi-crdb Date: Sat, 13 Sep 2025 21:03:19 +0530 Subject: [PATCH 2/4] refactor: address PR review comments for UnionExec empty inputs fix - Add new try_new method that returns Result> - Deprecate existing new method in favor of try_new - Optimize single-input case: try_new returns the input directly - Remove redundant assert!(result.is_err()) from tests - Rename test_union_multiple_inputs_still_works to test_union_schema_multiple_inputs - Update all call sites to use appropriate API (try_new for new code, deprecated new for tests) This maintains backward compatibility while providing better error handling and optimization for single-input cases. --- datafusion/core/src/physical_planner.rs | 2 +- .../enforce_distribution.rs | 6 +- .../partition_statistics.rs | 3 +- .../physical_optimizer/projection_pushdown.rs | 3 +- .../tests/physical_optimizer/test_utils.rs | 3 +- .../physical-plan/src/repartition/mod.rs | 6 +- datafusion/physical-plan/src/union.rs | 82 +++++++++++++------ datafusion/proto/src/physical_plan/mod.rs | 1 + .../tests/cases/roundtrip_physical_plan.rs | 1 + 9 files changed, 76 insertions(+), 31 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index ce46840d477b..87f5dda70c4d 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1245,7 +1245,7 @@ impl DefaultPhysicalPlanner { } // N Children - LogicalPlan::Union(_) => Arc::new(UnionExec::new(children.vec())?), + LogicalPlan::Union(_) => UnionExec::try_new(children.vec())?, LogicalPlan::Extension(Extension { node }) => { let mut maybe_plan = None; let children = children.vec(); diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 69616842c943..1ddeb3c61148 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -1783,7 +1783,8 @@ fn union_to_interleave() -> Result<()> { ); // Union - let plan = Arc::new(UnionExec::new(vec![left, right])?); + #[allow(deprecated)] + let plan = Arc::new(UnionExec::new(vec![left, right])); // final agg let plan = @@ -1827,7 +1828,8 @@ fn union_not_to_interleave() -> Result<()> { ); // Union - let plan = Arc::new(UnionExec::new(vec![left, right])?); + #[allow(deprecated)] + let plan = Arc::new(UnionExec::new(vec![left, right])); // final agg let plan = diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index ce36a5cb71a9..eb174fba79f5 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -356,8 +356,9 @@ mod test { #[tokio::test] async fn test_statistic_by_partition_of_union() -> Result<()> { let scan = create_scan_exec_with_statistics(None, Some(2)).await; + #[allow(deprecated)] let union_exec: Arc = - Arc::new(UnionExec::new(vec![scan.clone(), scan])?); + Arc::new(UnionExec::new(vec![scan.clone(), scan])); let statistics = (0..union_exec.output_partitioning().partition_count()) .map(|idx| union_exec.partition_statistics(Some(idx))) .collect::>>()?; diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs index fa4848ea516f..ab753d00b4a9 100644 --- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs @@ -1535,7 +1535,8 @@ fn test_sort_preserving_after_projection() -> Result<()> { #[test] fn test_union_after_projection() -> Result<()> { let csv = create_simple_csv_exec(); - let union = Arc::new(UnionExec::new(vec![csv.clone(), csv.clone(), csv])?); + #[allow(deprecated)] + let union = Arc::new(UnionExec::new(vec![csv.clone(), csv.clone(), csv])); let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ ProjectionExpr::new(Arc::new(Column::new("c", 2)), "c".to_string()), diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 623b7321b18e..49efe24fb8f9 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -304,7 +304,8 @@ pub fn sort_preserving_merge_exec_with_fetch( } pub fn union_exec(input: Vec>) -> Arc { - Arc::new(UnionExec::new(input).expect("Failed to create UnionExec")) + #[allow(deprecated)] + Arc::new(UnionExec::new(input)) } pub fn local_limit_exec( diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index a2c86ade26a3..cd188a648f94 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1783,7 +1783,8 @@ mod test { let source1 = sorted_memory_exec(&schema, sort_exprs.clone()); let source2 = sorted_memory_exec(&schema, sort_exprs); // output has multiple partitions, and is sorted - let union = UnionExec::new(vec![source1, source2])?; + #[allow(deprecated)] + let union = UnionExec::new(vec![source1, source2]); let exec = RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10)) .unwrap() @@ -1825,7 +1826,8 @@ mod test { let source1 = memory_exec(&schema); let source2 = memory_exec(&schema); // output has multiple partitions, but is not sorted - let union = UnionExec::new(vec![source1, source2])?; + #[allow(deprecated)] + let union = UnionExec::new(vec![source1, source2]); let exec = RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10)) .unwrap() diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 2fe384f1f793..c707c197e25d 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -101,23 +101,49 @@ pub struct UnionExec { impl UnionExec { /// Create a new UnionExec - pub fn new(inputs: Vec>) -> Result { - if inputs.is_empty() { - return exec_err!("UnionExec requires at least one input"); - } - - let schema = union_schema(&inputs)?; + #[deprecated(since = "44.0.0", note = "Use UnionExec::try_new instead")] + pub fn new(inputs: Vec>) -> Self { + let schema = union_schema(&inputs).expect("UnionExec::new called with empty inputs"); // The schema of the inputs and the union schema is consistent when: // - They have the same number of fields, and // - Their fields have same types at the same indices. // Here, we know that schemas are consistent and the call below can // not return an error. let cache = Self::compute_properties(&inputs, schema).unwrap(); - Ok(UnionExec { + UnionExec { inputs, metrics: ExecutionPlanMetricsSet::new(), cache, - }) + } + } + + /// Try to create a new UnionExec. + /// + /// # Errors + /// Returns an error if: + /// - `inputs` is empty + /// + /// # Optimization + /// If there is only one input, returns that input directly rather than wrapping it in a UnionExec + pub fn try_new(inputs: Vec>) -> Result> { + match inputs.len() { + 0 => exec_err!("UnionExec requires at least one input"), + 1 => Ok(inputs.into_iter().next().unwrap()), + _ => { + let schema = union_schema(&inputs)?; + // The schema of the inputs and the union schema is consistent when: + // - They have the same number of fields, and + // - Their fields have same types at the same indices. + // Here, we know that schemas are consistent and the call below can + // not return an error. + let cache = Self::compute_properties(&inputs, schema).unwrap(); + Ok(Arc::new(UnionExec { + inputs, + metrics: ExecutionPlanMetricsSet::new(), + cache, + })) + } + } } /// Get inputs of the execution plan @@ -224,7 +250,7 @@ impl ExecutionPlan for UnionExec { self: Arc, children: Vec>, ) -> Result> { - Ok(Arc::new(UnionExec::new(children)?)) + UnionExec::try_new(children) } fn execute( @@ -323,7 +349,8 @@ impl ExecutionPlan for UnionExec { .map(|child| make_with_child(projection, child)) .collect::>>()?; - Ok(Some(Arc::new(UnionExec::new(new_children)?))) + #[allow(deprecated)] + Ok(Some(Arc::new(UnionExec::new(new_children)))) } } @@ -718,7 +745,8 @@ mod tests { let csv = test::scan_partitioned(4); let csv2 = test::scan_partitioned(5); - let union_exec = Arc::new(UnionExec::new(vec![csv, csv2])?); + #[allow(deprecated)] + let union_exec = Arc::new(UnionExec::new(vec![csv, csv2])); // Should have 9 partitions and 9 output batches assert_eq!( @@ -900,7 +928,8 @@ mod tests { let mut union_expected_eq = EquivalenceProperties::new(Arc::clone(&schema)); union_expected_eq.add_orderings(union_expected_orderings); - let union = UnionExec::new(vec![child1, child2])?; + #[allow(deprecated)] + let union = UnionExec::new(vec![child1, child2]); let union_eq_properties = union.properties().equivalence_properties(); let err_msg = format!( "Error in test id: {:?}, test case: {:?}", @@ -927,9 +956,8 @@ mod tests { #[test] fn test_union_empty_inputs() { - // Test that UnionExec::new fails with empty inputs - let result = UnionExec::new(vec![]); - assert!(result.is_err()); + // Test that UnionExec::try_new fails with empty inputs + let result = UnionExec::try_new(vec![]); assert!(result .unwrap_err() .to_string() @@ -940,7 +968,6 @@ mod tests { fn test_union_schema_empty_inputs() { // Test that union_schema fails with empty inputs let result = union_schema(&[]); - assert!(result.is_err()); assert!(result .unwrap_err() .to_string() @@ -949,25 +976,34 @@ mod tests { #[test] fn test_union_single_input() -> Result<()> { - // Test that UnionExec works with a single input + // Test that UnionExec::try_new returns the single input directly let schema = create_test_schema()?; - let memory_exec = Arc::new(TestMemoryExec::try_new(&[], schema.clone(), None)?); - let union = UnionExec::new(vec![memory_exec])?; + let memory_exec: Arc = Arc::new(TestMemoryExec::try_new(&[], schema.clone(), None)?); + let memory_exec_clone = Arc::clone(&memory_exec); + let result = UnionExec::try_new(vec![memory_exec])?; - // Check that schema is correct - assert_eq!(union.schema(), schema); + // Check that the result is the same as the input (no UnionExec wrapper) + assert_eq!(result.schema(), schema); + // Verify it's the same execution plan + assert!(Arc::ptr_eq(&result, &memory_exec_clone)); Ok(()) } #[test] - fn test_union_multiple_inputs_still_works() -> Result<()> { + fn test_union_schema_multiple_inputs() -> Result<()> { // Test that existing functionality with multiple inputs still works let schema = create_test_schema()?; let memory_exec1 = Arc::new(TestMemoryExec::try_new(&[], schema.clone(), None)?); let memory_exec2 = Arc::new(TestMemoryExec::try_new(&[], schema.clone(), None)?); - let union = UnionExec::new(vec![memory_exec1, memory_exec2])?; + let union_plan = UnionExec::try_new(vec![memory_exec1, memory_exec2])?; + + // Downcast to verify it's a UnionExec + let union = union_plan + .as_any() + .downcast_ref::() + .expect("Expected UnionExec"); // Check that schema is correct assert_eq!(union.schema(), schema); diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 8e38b0d1bf5b..ad5c33e0e74e 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -1405,6 +1405,7 @@ impl protobuf::PhysicalPlanNode { for input in &union.inputs { inputs.push(input.try_into_physical_plan(ctx, runtime, extension_codec)?); } + #[allow(deprecated)] Ok(Arc::new(UnionExec::new(inputs))) } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index a5357a132eef..f408ec1a9124 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -1649,6 +1649,7 @@ fn roundtrip_union() -> Result<()> { let left = EmptyExec::new(Arc::new(schema_left)); let right = EmptyExec::new(Arc::new(schema_right)); let inputs: Vec> = vec![Arc::new(left), Arc::new(right)]; + #[allow(deprecated)] let union = UnionExec::new(inputs); roundtrip_test(Arc::new(union)) } From 179beffae023d6a27d61a6902acfa231aafbd2d8 Mon Sep 17 00:00:00 2001 From: ebembi-crdb Date: Sun, 14 Sep 2025 17:51:15 +0530 Subject: [PATCH 3/4] Fix cargo fmt and clippy warnings - Add proper feature gates for parquet_encryption in datasource-parquet - Format code to pass cargo fmt checks - All tests passing --- datafusion/datasource-parquet/src/opener.rs | 11 +++++++ datafusion/datasource-parquet/src/source.rs | 3 ++ datafusion/physical-plan/src/union.rs | 35 ++++++++++++--------- 3 files changed, 35 insertions(+), 14 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 93a3d4af5432..44fad2d2dd9d 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -98,6 +98,7 @@ pub(super) struct ParquetOpener { /// Coerce INT96 timestamps to specific TimeUnit pub coerce_int96: Option, /// Optional parquet FileDecryptionProperties + #[cfg(feature = "parquet_encryption")] pub file_decryption_properties: Option>, /// Rewrite expressions in the context of the file schema pub(crate) expr_adapter_factory: Option>, @@ -151,9 +152,11 @@ impl FileOpener for ParquetOpener { let mut predicate_file_schema = Arc::clone(&self.logical_file_schema); let enable_page_index = self.enable_page_index; + #[cfg(feature = "parquet_encryption")] let encryption_context = self.get_encryption_context(); Ok(Box::pin(async move { + #[cfg(feature = "parquet_encryption")] let file_decryption_properties = encryption_context .get_file_decryption_properties(&file_location) .await?; @@ -502,6 +505,7 @@ where } #[derive(Default)] +#[cfg_attr(not(feature = "parquet_encryption"), allow(dead_code))] struct EncryptionContext { #[cfg(feature = "parquet_encryption")] file_decryption_properties: Option>, @@ -544,6 +548,7 @@ impl EncryptionContext { } #[cfg(not(feature = "parquet_encryption"))] +#[allow(dead_code)] impl EncryptionContext { async fn get_file_decryption_properties( &self, @@ -563,6 +568,7 @@ impl ParquetOpener { } #[cfg(not(feature = "parquet_encryption"))] + #[allow(dead_code)] fn get_encryption_context(&self) -> EncryptionContext { EncryptionContext::default() } @@ -819,6 +825,7 @@ mod test { schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, + #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), #[cfg(feature = "parquet_encryption")] @@ -907,6 +914,7 @@ mod test { schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, + #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), #[cfg(feature = "parquet_encryption")] @@ -1011,6 +1019,7 @@ mod test { schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, + #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), #[cfg(feature = "parquet_encryption")] @@ -1125,6 +1134,7 @@ mod test { schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: false, // note that this is false! coerce_int96: None, + #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), #[cfg(feature = "parquet_encryption")] @@ -1240,6 +1250,7 @@ mod test { schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), enable_row_group_stats_pruning: true, coerce_int96: None, + #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), #[cfg(feature = "parquet_encryption")] diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 007c239ef492..644cea85ca0a 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -52,6 +52,7 @@ use datafusion_physical_plan::metrics::Count; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; +#[cfg(feature = "parquet_encryption")] use datafusion_common::encryption::map_config_decryption_to_decryption; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; @@ -541,6 +542,7 @@ impl FileSource for ParquetSource { Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _ }); + #[cfg(feature = "parquet_encryption")] let file_decryption_properties = self .table_parquet_options() .crypto @@ -576,6 +578,7 @@ impl FileSource for ParquetSource { enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, schema_adapter_factory, coerce_int96, + #[cfg(feature = "parquet_encryption")] file_decryption_properties, expr_adapter_factory, #[cfg(feature = "parquet_encryption")] diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index c707c197e25d..376361841481 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -103,7 +103,8 @@ impl UnionExec { /// Create a new UnionExec #[deprecated(since = "44.0.0", note = "Use UnionExec::try_new instead")] pub fn new(inputs: Vec>) -> Self { - let schema = union_schema(&inputs).expect("UnionExec::new called with empty inputs"); + let schema = + union_schema(&inputs).expect("UnionExec::new called with empty inputs"); // The schema of the inputs and the union schema is consistent when: // - They have the same number of fields, and // - Their fields have same types at the same indices. @@ -117,15 +118,17 @@ impl UnionExec { } } - /// Try to create a new UnionExec. - /// + /// Try to create a new UnionExec. + /// /// # Errors /// Returns an error if: /// - `inputs` is empty - /// + /// /// # Optimization /// If there is only one input, returns that input directly rather than wrapping it in a UnionExec - pub fn try_new(inputs: Vec>) -> Result> { + pub fn try_new( + inputs: Vec>, + ) -> Result> { match inputs.len() { 0 => exec_err!("UnionExec requires at least one input"), 1 => Ok(inputs.into_iter().next().unwrap()), @@ -573,7 +576,7 @@ fn union_schema(inputs: &[Arc]) -> Result { if inputs.is_empty() { return exec_err!("Cannot create union schema from empty inputs"); } - + let first_schema = inputs[0].schema(); let fields = (0..first_schema.fields().len()) @@ -616,7 +619,10 @@ fn union_schema(inputs: &[Arc]) -> Result { .flat_map(|i| i.schema().metadata().clone().into_iter()) .collect(); - Ok(Arc::new(Schema::new_with_metadata(fields, all_metadata_merged))) + Ok(Arc::new(Schema::new_with_metadata( + fields, + all_metadata_merged, + ))) } /// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one @@ -978,15 +984,16 @@ mod tests { fn test_union_single_input() -> Result<()> { // Test that UnionExec::try_new returns the single input directly let schema = create_test_schema()?; - let memory_exec: Arc = Arc::new(TestMemoryExec::try_new(&[], schema.clone(), None)?); + let memory_exec: Arc = + Arc::new(TestMemoryExec::try_new(&[], schema.clone(), None)?); let memory_exec_clone = Arc::clone(&memory_exec); let result = UnionExec::try_new(vec![memory_exec])?; - + // Check that the result is the same as the input (no UnionExec wrapper) assert_eq!(result.schema(), schema); // Verify it's the same execution plan assert!(Arc::ptr_eq(&result, &memory_exec_clone)); - + Ok(()) } @@ -996,20 +1003,20 @@ mod tests { let schema = create_test_schema()?; let memory_exec1 = Arc::new(TestMemoryExec::try_new(&[], schema.clone(), None)?); let memory_exec2 = Arc::new(TestMemoryExec::try_new(&[], schema.clone(), None)?); - + let union_plan = UnionExec::try_new(vec![memory_exec1, memory_exec2])?; - + // Downcast to verify it's a UnionExec let union = union_plan .as_any() .downcast_ref::() .expect("Expected UnionExec"); - + // Check that schema is correct assert_eq!(union.schema(), schema); // Check that we have 2 inputs assert_eq!(union.inputs().len(), 2); - + Ok(()) } } From 0400ed5eaf79283b3b4deafbf8ae29cb099dfed4 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 15 Sep 2025 06:39:09 -0400 Subject: [PATCH 4/4] Fix clippy --- datafusion/physical-plan/src/union.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 376361841481..f1e9ee53ac86 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -985,7 +985,7 @@ mod tests { // Test that UnionExec::try_new returns the single input directly let schema = create_test_schema()?; let memory_exec: Arc = - Arc::new(TestMemoryExec::try_new(&[], schema.clone(), None)?); + Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?); let memory_exec_clone = Arc::clone(&memory_exec); let result = UnionExec::try_new(vec![memory_exec])?; @@ -1001,8 +1001,10 @@ mod tests { fn test_union_schema_multiple_inputs() -> Result<()> { // Test that existing functionality with multiple inputs still works let schema = create_test_schema()?; - let memory_exec1 = Arc::new(TestMemoryExec::try_new(&[], schema.clone(), None)?); - let memory_exec2 = Arc::new(TestMemoryExec::try_new(&[], schema.clone(), None)?); + let memory_exec1 = + Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?); + let memory_exec2 = + Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?); let union_plan = UnionExec::try_new(vec![memory_exec1, memory_exec2])?;