2 changes: 1 addition & 1 deletion datafusion/core/src/physical_planner.rs
@@ -1245,7 +1245,7 @@ impl DefaultPhysicalPlanner {
}

// N Children
LogicalPlan::Union(_) => Arc::new(UnionExec::new(children.vec())),
LogicalPlan::Union(_) => UnionExec::try_new(children.vec())?,
LogicalPlan::Extension(Extension { node }) => {
let mut maybe_plan = None;
let children = children.vec();
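The call site in the planner changes shape here: `try_new` is fallible and already returns an `Arc<dyn ExecutionPlan>` (possibly the sole child itself), so the `Arc::new` wrapper disappears. A minimal sketch of the difference, assuming a hypothetical helper `plan_union` and a `children: Vec<Arc<dyn ExecutionPlan>>` argument; this is illustrative, not the planner's actual code:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::union::UnionExec;
use datafusion::physical_plan::ExecutionPlan;

// Hypothetical helper mirroring the planner arm above.
fn plan_union(children: Vec<Arc<dyn ExecutionPlan>>) -> Result<Arc<dyn ExecutionPlan>> {
    // Before: Arc::new(UnionExec::new(children)) -- infallible at the call
    // site, but it panics during schema computation if `children` is empty.
    // After: the empty-input case surfaces as a planning error, and a single
    // child is returned unchanged instead of being wrapped in a UnionExec.
    UnionExec::try_new(children)
}
```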
@@ -1783,6 +1783,7 @@ fn union_to_interleave() -> Result<()> {
);

// Union
#[allow(deprecated)]
[Member review comment] Let's have an issue to clean these up, and add a `// TODO (issue link) resolve deprecation` comment.

let plan = Arc::new(UnionExec::new(vec![left, right]));

// final agg
@@ -1827,6 +1828,7 @@ fn union_not_to_interleave() -> Result<()> {
);

// Union
#[allow(deprecated)]
let plan = Arc::new(UnionExec::new(vec![left, right]));

// final agg
@@ -356,6 +356,7 @@ mod test {
#[tokio::test]
async fn test_statistic_by_partition_of_union() -> Result<()> {
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
#[allow(deprecated)]
let union_exec: Arc<dyn ExecutionPlan> =
Arc::new(UnionExec::new(vec![scan.clone(), scan]));
let statistics = (0..union_exec.output_partitioning().partition_count())
@@ -1535,6 +1535,7 @@ fn test_sort_preserving_after_projection() -> Result<()> {
#[test]
fn test_union_after_projection() -> Result<()> {
let csv = create_simple_csv_exec();
#[allow(deprecated)]
let union = Arc::new(UnionExec::new(vec![csv.clone(), csv.clone(), csv]));
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
vec![
1 change: 1 addition & 0 deletions datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -304,6 +304,7 @@ pub fn sort_preserving_merge_exec_with_fetch(
}

pub fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
#[allow(deprecated)]
Arc::new(UnionExec::new(input))
}
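If the deprecation gets cleaned up as the review comment above suggests, this helper could switch to the fallible constructor. A possible follow-up sketch (not part of this PR), assuming test callers always pass at least one input:

```rust
// Sketch only: panics on empty input, which is acceptable for a test helper.
pub fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
    UnionExec::try_new(input).expect("union_exec requires at least one input")
}
```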

11 changes: 11 additions & 0 deletions datafusion/datasource-parquet/src/opener.rs
@@ -98,6 +98,7 @@ pub(super) struct ParquetOpener {
/// Coerce INT96 timestamps to specific TimeUnit
pub coerce_int96: Option<TimeUnit>,
/// Optional parquet FileDecryptionProperties
#[cfg(feature = "parquet_encryption")]
pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
/// Rewrite expressions in the context of the file schema
pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
@@ -151,9 +152,11 @@ impl FileOpener for ParquetOpener {
let mut predicate_file_schema = Arc::clone(&self.logical_file_schema);

let enable_page_index = self.enable_page_index;
#[cfg(feature = "parquet_encryption")]
let encryption_context = self.get_encryption_context();

Ok(Box::pin(async move {
#[cfg(feature = "parquet_encryption")]
let file_decryption_properties = encryption_context
.get_file_decryption_properties(&file_location)
.await?;
@@ -502,6 +505,7 @@
}

#[derive(Default)]
#[cfg_attr(not(feature = "parquet_encryption"), allow(dead_code))]
struct EncryptionContext {
#[cfg(feature = "parquet_encryption")]
file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
@@ -544,6 +548,7 @@ impl EncryptionContext {
}

#[cfg(not(feature = "parquet_encryption"))]
#[allow(dead_code)]
impl EncryptionContext {
async fn get_file_decryption_properties(
&self,
@@ -563,6 +568,7 @@ impl ParquetOpener {
}

#[cfg(not(feature = "parquet_encryption"))]
#[allow(dead_code)]
fn get_encryption_context(&self) -> EncryptionContext {
EncryptionContext::default()
}
@@ -819,6 +825,7 @@ mod test {
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
#[cfg(feature = "parquet_encryption")]
file_decryption_properties: None,
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
#[cfg(feature = "parquet_encryption")]
@@ -907,6 +914,7 @@ mod test {
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
#[cfg(feature = "parquet_encryption")]
file_decryption_properties: None,
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
#[cfg(feature = "parquet_encryption")]
@@ -1011,6 +1019,7 @@ mod test {
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
#[cfg(feature = "parquet_encryption")]
file_decryption_properties: None,
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
#[cfg(feature = "parquet_encryption")]
@@ -1125,6 +1134,7 @@ mod test {
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: false, // note that this is false!
coerce_int96: None,
#[cfg(feature = "parquet_encryption")]
file_decryption_properties: None,
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
#[cfg(feature = "parquet_encryption")]
@@ -1240,6 +1250,7 @@ mod test {
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
#[cfg(feature = "parquet_encryption")]
file_decryption_properties: None,
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
#[cfg(feature = "parquet_encryption")]
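The gating in opener.rs follows one consistent pattern: state that only exists with the `parquet_encryption` feature is `#[cfg]`-gated at the field level, while the no-op fallbacks compiled without the feature are marked `#[allow(dead_code)]`. A stripped-down illustration of that pattern, using hypothetical names (`Ctx`, `keys`) rather than DataFusion's actual types:

```rust
// Minimal sketch of the feature-gating pattern; names are illustrative.
#[derive(Default)]
#[cfg_attr(not(feature = "parquet_encryption"), allow(dead_code))]
struct Ctx {
    #[cfg(feature = "parquet_encryption")]
    keys: Option<Vec<u8>>,
}

#[cfg(feature = "parquet_encryption")]
impl Ctx {
    fn keys(&self) -> Option<&[u8]> {
        self.keys.as_deref()
    }
}

#[cfg(not(feature = "parquet_encryption"))]
#[allow(dead_code)]
impl Ctx {
    fn keys(&self) -> Option<&[u8]> {
        // Encryption support compiled out: always behaves as "no keys".
        None
    }
}
```

The payoff is a single struct definition and call surface that compile under both feature configurations without dead-code warnings.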
3 changes: 3 additions & 0 deletions datafusion/datasource-parquet/src/source.rs
@@ -52,6 +52,7 @@ use datafusion_physical_plan::metrics::Count;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;

#[cfg(feature = "parquet_encryption")]
use datafusion_common::encryption::map_config_decryption_to_decryption;
#[cfg(feature = "parquet_encryption")]
use datafusion_execution::parquet_encryption::EncryptionFactory;
@@ -541,6 +542,7 @@ impl FileSource for ParquetSource {
Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _
});

#[cfg(feature = "parquet_encryption")]
let file_decryption_properties = self
.table_parquet_options()
.crypto
@@ -576,6 +578,7 @@ impl FileSource for ParquetSource {
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
schema_adapter_factory,
coerce_int96,
#[cfg(feature = "parquet_encryption")]
file_decryption_properties,
expr_adapter_factory,
#[cfg(feature = "parquet_encryption")]
2 changes: 2 additions & 0 deletions datafusion/physical-plan/src/repartition/mod.rs
@@ -1783,6 +1783,7 @@ mod test {
let source1 = sorted_memory_exec(&schema, sort_exprs.clone());
let source2 = sorted_memory_exec(&schema, sort_exprs);
// output has multiple partitions, and is sorted
#[allow(deprecated)]
let union = UnionExec::new(vec![source1, source2]);
let exec =
RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10))
@@ -1825,6 +1826,7 @@ mod test {
let source1 = memory_exec(&schema);
let source2 = memory_exec(&schema);
// output has multiple partitions, but is not sorted
#[allow(deprecated)]
let union = UnionExec::new(vec![source1, source2]);
let exec =
RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10))
123 changes: 114 additions & 9 deletions datafusion/physical-plan/src/union.rs
@@ -101,8 +101,10 @@ pub struct UnionExec {

impl UnionExec {
/// Create a new UnionExec
#[deprecated(since = "44.0.0", note = "Use UnionExec::try_new instead")]
pub fn new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Self {
let schema = union_schema(&inputs);
let schema =
union_schema(&inputs).expect("UnionExec::new called with empty inputs");
// The schema of the inputs and the union schema is consistent when:
// - They have the same number of fields, and
// - Their fields have same types at the same indices.
@@ -116,6 +118,37 @@ impl UnionExec {
}
}

/// Try to create a new UnionExec.
///
/// # Errors
/// Returns an error if:
/// - `inputs` is empty
///
/// # Optimization
/// If there is only one input, returns that input directly rather than wrapping it in a UnionExec
pub fn try_new(
inputs: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
match inputs.len() {
0 => exec_err!("UnionExec requires at least one input"),
1 => Ok(inputs.into_iter().next().unwrap()),
_ => {
let schema = union_schema(&inputs)?;
// The schema of the inputs and the union schema is consistent when:
// - They have the same number of fields, and
// - Their fields have same types at the same indices.
// Here, we know that schemas are consistent and the call below can
// not return an error.
let cache = Self::compute_properties(&inputs, schema).unwrap();
Ok(Arc::new(UnionExec {
inputs,
metrics: ExecutionPlanMetricsSet::new(),
cache,
}))
}
}
}

/// Get inputs of the execution plan
pub fn inputs(&self) -> &Vec<Arc<dyn ExecutionPlan>> {
&self.inputs
@@ -220,7 +253,7 @@ impl ExecutionPlan for UnionExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(UnionExec::new(children)))
UnionExec::try_new(children)
}

fn execute(
@@ -319,6 +352,7 @@ impl ExecutionPlan for UnionExec {
.map(|child| make_with_child(projection, child))
.collect::<Result<Vec<_>>>()?;

#[allow(deprecated)]
Ok(Some(Arc::new(UnionExec::new(new_children))))
}
}
@@ -373,7 +407,7 @@ impl InterleaveExec {
"Not all InterleaveExec children have a consistent hash partitioning"
);
}
let cache = Self::compute_properties(&inputs);
let cache = Self::compute_properties(&inputs)?;
Ok(InterleaveExec {
inputs,
metrics: ExecutionPlanMetricsSet::new(),
@@ -387,17 +421,17 @@
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(inputs: &[Arc<dyn ExecutionPlan>]) -> PlanProperties {
let schema = union_schema(inputs);
fn compute_properties(inputs: &[Arc<dyn ExecutionPlan>]) -> Result<PlanProperties> {
let schema = union_schema(inputs)?;
let eq_properties = EquivalenceProperties::new(schema);
// Get output partitioning:
let output_partitioning = inputs[0].output_partitioning().clone();
PlanProperties::new(
Ok(PlanProperties::new(
eq_properties,
output_partitioning,
emission_type_from_children(inputs),
boundedness_from_children(inputs),
)
))
}
}

@@ -538,7 +572,11 @@ pub fn can_interleave<T: Borrow<Arc<dyn ExecutionPlan>>>(
.all(|partition| partition == *reference)
}

fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> Result<SchemaRef> {
if inputs.is_empty() {
return exec_err!("Cannot create union schema from empty inputs");
}

let first_schema = inputs[0].schema();

let fields = (0..first_schema.fields().len())
@@ -581,7 +619,10 @@ fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
.flat_map(|i| i.schema().metadata().clone().into_iter())
.collect();

Arc::new(Schema::new_with_metadata(fields, all_metadata_merged))
Ok(Arc::new(Schema::new_with_metadata(
fields,
all_metadata_merged,
)))
}

/// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one
@@ -710,6 +751,7 @@ mod tests {
let csv = test::scan_partitioned(4);
let csv2 = test::scan_partitioned(5);

#[allow(deprecated)]
let union_exec = Arc::new(UnionExec::new(vec![csv, csv2]));

// Should have 9 partitions and 9 output batches
@@ -892,6 +934,7 @@ mod tests {
let mut union_expected_eq = EquivalenceProperties::new(Arc::clone(&schema));
union_expected_eq.add_orderings(union_expected_orderings);

#[allow(deprecated)]
let union = UnionExec::new(vec![child1, child2]);
let union_eq_properties = union.properties().equivalence_properties();
let err_msg = format!(
@@ -916,4 +959,66 @@
assert!(lhs_orderings.contains(rhs_ordering), "{}", err_msg);
}
}

#[test]
fn test_union_empty_inputs() {
// Test that UnionExec::try_new fails with empty inputs
let result = UnionExec::try_new(vec![]);
assert!(result
.unwrap_err()
.to_string()
.contains("UnionExec requires at least one input"));
}

#[test]
fn test_union_schema_empty_inputs() {
// Test that union_schema fails with empty inputs
let result = union_schema(&[]);
assert!(result
.unwrap_err()
.to_string()
.contains("Cannot create union schema from empty inputs"));
}

#[test]
fn test_union_single_input() -> Result<()> {
// Test that UnionExec::try_new returns the single input directly
let schema = create_test_schema()?;
let memory_exec: Arc<dyn ExecutionPlan> =
Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
let memory_exec_clone = Arc::clone(&memory_exec);
let result = UnionExec::try_new(vec![memory_exec])?;

// Check that the result is the same as the input (no UnionExec wrapper)
assert_eq!(result.schema(), schema);
// Verify it's the same execution plan
assert!(Arc::ptr_eq(&result, &memory_exec_clone));

Ok(())
}

#[test]
fn test_union_schema_multiple_inputs() -> Result<()> {
// Test that existing functionality with multiple inputs still works
let schema = create_test_schema()?;
let memory_exec1 =
Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
let memory_exec2 =
Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);

let union_plan = UnionExec::try_new(vec![memory_exec1, memory_exec2])?;

// Downcast to verify it's a UnionExec
let union = union_plan
.as_any()
.downcast_ref::<UnionExec>()
.expect("Expected UnionExec");

// Check that schema is correct
assert_eq!(union.schema(), schema);
// Check that we have 2 inputs
assert_eq!(union.inputs().len(), 2);

Ok(())
}
}
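Since `union_schema` and `InterleaveExec::compute_properties` now return `Result`, the empty-input failure surfaces as an error rather than a panic. The tests above pin down the three call-site behaviors of `try_new`; a condensed usage sketch, assuming `a` and `b` are any two schema-compatible plans:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::union::UnionExec;
use datafusion::physical_plan::ExecutionPlan;

// `a` and `b` are assumed to be schema-compatible plans.
fn demo(a: Arc<dyn ExecutionPlan>, b: Arc<dyn ExecutionPlan>) -> Result<()> {
    // Empty input is now a planning error instead of a later panic.
    assert!(UnionExec::try_new(vec![]).is_err());

    // A sole child is returned unchanged, with no UnionExec wrapper.
    let single = UnionExec::try_new(vec![Arc::clone(&a)])?;
    assert!(Arc::ptr_eq(&single, &a));

    // Two or more children produce a real UnionExec node.
    let multi = UnionExec::try_new(vec![a, b])?;
    assert!(multi.as_any().downcast_ref::<UnionExec>().is_some());
    Ok(())
}
```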
1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/mod.rs
@@ -1405,6 +1405,7 @@ impl protobuf::PhysicalPlanNode {
for input in &union.inputs {
inputs.push(input.try_into_physical_plan(ctx, runtime, extension_codec)?);
}
#[allow(deprecated)]
Ok(Arc::new(UnionExec::new(inputs)))
}

1 change: 1 addition & 0 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -1649,6 +1649,7 @@ fn roundtrip_union() -> Result<()> {
let left = EmptyExec::new(Arc::new(schema_left));
let right = EmptyExec::new(Arc::new(schema_right));
let inputs: Vec<Arc<dyn ExecutionPlan>> = vec![Arc::new(left), Arc::new(right)];
#[allow(deprecated)]
let union = UnionExec::new(inputs);
roundtrip_test(Arc::new(union))
}