Skip to content

Commit d19bf52

Browse files
authored
fix EquivalenceProperties calculation in DataSourceExec (#17323)
1 parent 5021b39 commit d19bf52

File tree

8 files changed

+83
-57
lines changed

8 files changed

+83
-57
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ impl FileSource for TestSource {
142142
})
143143
}
144144

145+
fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
146+
self.predicate.clone()
147+
}
148+
145149
fn as_any(&self) -> &dyn Any {
146150
todo!("should not be called")
147151
}

datafusion/datasource-parquet/src/source.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,10 @@ impl FileSource for ParquetSource {
587587
self
588588
}
589589

590+
fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
591+
self.predicate.clone()
592+
}
593+
590594
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
591595
let mut conf = self.clone();
592596
conf.batch_size = Some(batch_size);

datafusion/datasource/src/file.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ pub trait FileSource: Send + Sync {
6969
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;
7070
/// Initialize new instance with projected statistics
7171
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>;
72+
/// Returns the filter expression that will be applied during the file scan.
73+
fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
74+
None
75+
}
7276
/// Return execution plan metrics
7377
fn metrics(&self) -> &ExecutionPlanMetricsSet;
7478
/// Return projected statistics

datafusion/datasource/src/file_scan_config.rs

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,20 @@ use datafusion_common::{
5252
use datafusion_execution::{
5353
object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
5454
};
55-
use datafusion_physical_expr::expressions::Column;
55+
use datafusion_physical_expr::{expressions::Column, utils::reassign_predicate_columns};
5656
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
5757
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
5858
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
5959
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
60-
use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
6160
use datafusion_physical_plan::{
6261
display::{display_orderings, ProjectSchemaDisplay},
6362
metrics::ExecutionPlanMetricsSet,
6463
projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec},
6564
DisplayAs, DisplayFormatType, ExecutionPlan,
6665
};
66+
use datafusion_physical_plan::{
67+
filter::collect_columns_from_predicate, filter_pushdown::FilterPushdownPropagation,
68+
};
6769

6870
use datafusion_physical_plan::coop::cooperative;
6971
use datafusion_physical_plan::execution_plan::SchedulingType;
@@ -577,8 +579,31 @@ impl DataSource for FileScanConfig {
577579

578580
fn eq_properties(&self) -> EquivalenceProperties {
579581
let (schema, constraints, _, orderings) = self.project();
580-
EquivalenceProperties::new_with_orderings(schema, orderings)
581-
.with_constraints(constraints)
582+
let mut eq_properties =
583+
EquivalenceProperties::new_with_orderings(Arc::clone(&schema), orderings)
584+
.with_constraints(constraints);
585+
if let Some(filter) = self.file_source.filter() {
586+
// We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with.
587+
// Note that this will *ignore* any non-projected columns: these don't factor into ordering / equivalence.
588+
match reassign_predicate_columns(filter, &schema, true) {
589+
Ok(filter) => {
590+
match Self::add_filter_equivalence_info(filter, &mut eq_properties) {
591+
Ok(()) => {}
592+
Err(e) => {
593+
warn!("Failed to add filter equivalence info: {e}");
594+
#[cfg(debug_assertions)]
595+
panic!("Failed to add filter equivalence info: {e}");
596+
}
597+
}
598+
}
599+
Err(e) => {
600+
warn!("Failed to reassign predicate columns: {e}");
601+
#[cfg(debug_assertions)]
602+
panic!("Failed to reassign predicate columns: {e}");
603+
}
604+
};
605+
}
606+
eq_properties
582607
}
583608

584609
fn scheduling_type(&self) -> SchedulingType {
@@ -724,6 +749,17 @@ impl FileScanConfig {
724749
))
725750
}
726751

752+
fn add_filter_equivalence_info(
753+
filter: Arc<dyn PhysicalExpr>,
754+
eq_properties: &mut EquivalenceProperties,
755+
) -> Result<()> {
756+
let (equal_pairs, _) = collect_columns_from_predicate(&filter);
757+
for (lhs, rhs) in equal_pairs {
758+
eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
759+
}
760+
Ok(())
761+
}
762+
727763
pub fn projected_constraints(&self) -> Constraints {
728764
let indexes = self.projection_indices();
729765
self.constraints.project(&indexes).unwrap_or_default()

datafusion/datasource/src/source.rs

Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,8 @@ use crate::file_scan_config::FileScanConfig;
3939
use datafusion_common::config::ConfigOptions;
4040
use datafusion_common::{Constraints, Result, Statistics};
4141
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
42-
use datafusion_physical_expr::{
43-
conjunction, EquivalenceProperties, Partitioning, PhysicalExpr,
44-
};
42+
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
4543
use datafusion_physical_expr_common::sort_expr::LexOrdering;
46-
use datafusion_physical_plan::filter::collect_columns_from_predicate;
4744
use datafusion_physical_plan::filter_pushdown::{
4845
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
4946
};
@@ -375,21 +372,10 @@ impl ExecutionPlan for DataSourceExec {
375372
Some(data_source) => {
376373
let mut new_node = self.clone();
377374
new_node.data_source = data_source;
375+
// Re-compute properties since we have new filters which will impact equivalence info
378376
new_node.cache =
379377
Self::compute_properties(Arc::clone(&new_node.data_source));
380378

381-
// Recompute equivalence info using new filters
382-
let filter = conjunction(
383-
res.filters
384-
.iter()
385-
.zip(parent_filters)
386-
.filter_map(|(s, f)| match s {
387-
PushedDown::Yes => Some(f),
388-
PushedDown::No => None,
389-
})
390-
.collect_vec(),
391-
);
392-
new_node = new_node.add_filter_equivalence_info(filter)?;
393379
Ok(FilterPushdownPropagation {
394380
filters: res.filters,
395381
updated_node: Some(Arc::new(new_node)),
@@ -437,20 +423,6 @@ impl DataSourceExec {
437423
self
438424
}
439425

440-
/// Add filters' equivalence info
441-
fn add_filter_equivalence_info(
442-
mut self,
443-
filter: Arc<dyn PhysicalExpr>,
444-
) -> Result<Self> {
445-
let (equal_pairs, _) = collect_columns_from_predicate(&filter);
446-
for (lhs, rhs) in equal_pairs {
447-
self.cache
448-
.eq_properties
449-
.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
450-
}
451-
Ok(self)
452-
}
453-
454426
fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
455427
PlanProperties::new(
456428
data_source.eq_properties(),

datafusion/physical-expr/src/equivalence/properties/mod.rs

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,22 @@ impl EquivalenceProperties {
326326
self.add_orderings(std::iter::once(ordering));
327327
}
328328

329+
fn update_oeq_cache(&mut self) -> Result<()> {
330+
// Renormalize orderings if the equivalence group changes:
331+
let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
332+
let normal_orderings = normal_cls
333+
.into_iter()
334+
.map(|o| self.eq_group.normalize_sort_exprs(o));
335+
self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
336+
self.oeq_cache.update_map();
337+
// Discover any new orderings based on the new equivalence classes:
338+
let leading_exprs: Vec<_> = self.oeq_cache.leading_map.keys().cloned().collect();
339+
for expr in leading_exprs {
340+
self.discover_new_orderings(expr)?;
341+
}
342+
Ok(())
343+
}
344+
329345
/// Incorporates the given equivalence group to into the existing
330346
/// equivalence group within.
331347
pub fn add_equivalence_group(
@@ -334,19 +350,7 @@ impl EquivalenceProperties {
334350
) -> Result<()> {
335351
if !other_eq_group.is_empty() {
336352
self.eq_group.extend(other_eq_group);
337-
// Renormalize orderings if the equivalence group changes:
338-
let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
339-
let normal_orderings = normal_cls
340-
.into_iter()
341-
.map(|o| self.eq_group.normalize_sort_exprs(o));
342-
self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
343-
self.oeq_cache.update_map();
344-
// Discover any new orderings based on the new equivalence classes:
345-
let leading_exprs: Vec<_> =
346-
self.oeq_cache.leading_map.keys().cloned().collect();
347-
for expr in leading_exprs {
348-
self.discover_new_orderings(expr)?;
349-
}
353+
self.update_oeq_cache()?;
350354
}
351355
Ok(())
352356
}
@@ -373,16 +377,9 @@ impl EquivalenceProperties {
373377
) -> Result<()> {
374378
// Add equal expressions to the state:
375379
if self.eq_group.add_equal_conditions(Arc::clone(&left), right) {
376-
// Renormalize orderings if the equivalence group changes:
377-
let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
378-
let normal_orderings = normal_cls
379-
.into_iter()
380-
.map(|o| self.eq_group.normalize_sort_exprs(o));
381-
self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
382-
self.oeq_cache.update_map();
383-
// Discover any new orderings:
384-
self.discover_new_orderings(left)?;
380+
self.update_oeq_cache()?;
385381
}
382+
self.update_oeq_cache()?;
386383
Ok(())
387384
}
388385

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1000,7 +1000,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> {
10001000
self: Arc<Self>,
10011001
_children: Vec<Arc<dyn PhysicalExpr>>,
10021002
) -> Result<Arc<dyn PhysicalExpr>> {
1003-
todo!()
1003+
Ok(self)
10041004
}
10051005

10061006
fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {

datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,3 +575,12 @@ WHERE trace_id = '00000000000000000000000000000002'
575575
ORDER BY start_timestamp, trace_id;
576576
----
577577
staging
578+
579+
query P
580+
SELECT start_timestamp
581+
FROM t1
582+
WHERE trace_id = '00000000000000000000000000000002' AND deployment_environment = 'staging'
583+
ORDER BY start_timestamp, trace_id
584+
LIMIT 1;
585+
----
586+
2024-10-01T00:00:00Z

0 commit comments

Comments
 (0)