Skip to content

Commit 5861425

Browse files
committed
#17411 Relax constraint that file sort order must only reference individual columns
1 parent f70ded5 commit 5861425

File tree

11 files changed: +171 additions, -108 deletions

datafusion/catalog/src/stream.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,21 @@ use std::sync::Arc;
2828
use crate::{Session, TableProvider, TableProviderFactory};
2929
use arrow::array::{RecordBatch, RecordBatchReader, RecordBatchWriter};
3030
use arrow::datatypes::SchemaRef;
31-
use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
31+
use datafusion_common::{
32+
config_err, plan_err, Constraints, DFSchema, DataFusionError, Result,
33+
};
3234
use datafusion_common_runtime::SpawnedTask;
3335
use datafusion_datasource::sink::{DataSink, DataSinkExec};
3436
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
3537
use datafusion_expr::dml::InsertOp;
3638
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
37-
use datafusion_physical_expr::create_ordering;
39+
use datafusion_physical_expr::create_lex_orderings;
3840
use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
3941
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
4042
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
4143

4244
use async_trait::async_trait;
45+
use datafusion_physical_expr::equivalence::project_orderings;
4346
use futures::StreamExt;
4447

4548
/// A [`TableProviderFactory`] for [`StreamTable`]
@@ -321,24 +324,26 @@ impl TableProvider for StreamTable {
321324

322325
async fn scan(
323326
&self,
324-
_state: &dyn Session,
327+
state: &dyn Session,
325328
projection: Option<&Vec<usize>>,
326329
_filters: &[Expr],
327330
limit: Option<usize>,
328331
) -> Result<Arc<dyn ExecutionPlan>> {
329-
let projected_schema = match projection {
330-
Some(p) => {
331-
let projected = self.0.source.schema().project(p)?;
332-
create_ordering(&projected, &self.0.order)?
333-
}
334-
None => create_ordering(self.0.source.schema(), &self.0.order)?,
332+
let schema = self.0.source.schema();
333+
let df_schema = DFSchema::try_from(Arc::clone(schema))?;
334+
let mut sort_information =
335+
create_lex_orderings(&self.0.order, &df_schema, state.execution_props())?;
336+
337+
// If there is a projection on the source, we also need to project orderings
338+
if let Some(projection) = projection {
339+
sort_information = project_orderings(sort_information, schema, projection)?;
335340
};
336341

337342
Ok(Arc::new(StreamingTableExec::try_new(
338343
Arc::clone(self.0.source.schema()),
339344
vec![Arc::new(StreamRead(Arc::clone(&self.0))) as _],
340345
projection,
341-
projected_schema,
346+
sort_information,
342347
true,
343348
limit,
344349
)?))
@@ -351,7 +356,9 @@ impl TableProvider for StreamTable {
351356
_insert_op: InsertOp,
352357
) -> Result<Arc<dyn ExecutionPlan>> {
353358
let schema = self.0.source.schema();
354-
let orders = create_ordering(schema, &self.0.order)?;
359+
let df_schema = DFSchema::try_from(Arc::clone(schema))?;
360+
let orders =
361+
create_lex_orderings(&self.0.order, &df_schema, _state.execution_props())?;
355362
// It is sufficient to pass only one of the equivalent orderings:
356363
let ordering = orders.into_iter().next().map(Into::into);
357364

datafusion/core/src/datasource/listing/table.rs

Lines changed: 55 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use super::{
2323
};
2424
use crate::{
2525
datasource::file_format::{file_compression_type::FileCompressionType, FileFormat},
26-
datasource::{create_ordering, physical_plan::FileSinkConfig},
26+
datasource::physical_plan::FileSinkConfig,
2727
execution::context::SessionState,
2828
};
2929
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
@@ -32,7 +32,7 @@ use async_trait::async_trait;
3232
use datafusion_catalog::{Session, TableProvider};
3333
use datafusion_common::{
3434
config_datafusion_err, config_err, internal_err, plan_err, project_schema,
35-
stats::Precision, Constraints, DataFusionError, Result, SchemaExt,
35+
stats::Precision, Constraints, DFSchema, DataFusionError, Result, SchemaExt,
3636
};
3737
use datafusion_datasource::{
3838
compute_all_files_statistics,
@@ -45,16 +45,19 @@ use datafusion_execution::{
4545
cache::{cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache},
4646
config::SessionConfig,
4747
};
48+
use datafusion_expr::execution_props::ExecutionProps;
4849
use datafusion_expr::{
4950
dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType,
5051
};
52+
use datafusion_physical_expr::create_lex_orderings;
5153
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
5254
use datafusion_physical_expr_common::sort_expr::LexOrdering;
5355
use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
5456
use futures::{future, stream, Stream, StreamExt, TryStreamExt};
5557
use itertools::Itertools;
5658
use object_store::ObjectStore;
5759
use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc};
60+
5861
/// Indicates the source of the schema for a [`ListingTable`]
5962
// PartialEq required for assert_eq! in tests
6063
#[derive(Debug, Clone, Copy, PartialEq, Default)]
@@ -1126,8 +1129,12 @@ impl ListingTable {
11261129
}
11271130

11281131
/// If file_sort_order is specified, creates the appropriate physical expressions
1129-
fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
1130-
create_ordering(&self.table_schema, &self.options.file_sort_order)
1132+
fn try_create_output_ordering(
1133+
&self,
1134+
execution_props: &ExecutionProps,
1135+
) -> Result<Vec<LexOrdering>> {
1136+
let df_schema = DFSchema::try_from(Arc::clone(&self.table_schema))?;
1137+
create_lex_orderings(&self.options.file_sort_order, &df_schema, execution_props)
11311138
}
11321139
}
11331140

@@ -1199,7 +1206,7 @@ impl TableProvider for ListingTable {
11991206
return Ok(Arc::new(EmptyExec::new(projected_schema)));
12001207
}
12011208

1202-
let output_ordering = self.try_create_output_ordering()?;
1209+
let output_ordering = self.try_create_output_ordering(state.execution_props())?;
12031210
match state
12041211
.config_options()
12051212
.execution
@@ -1334,7 +1341,7 @@ impl TableProvider for ListingTable {
13341341
file_extension: self.options().format.get_ext(),
13351342
};
13361343

1337-
let orderings = self.try_create_output_ordering()?;
1344+
let orderings = self.try_create_output_ordering(state.execution_props())?;
13381345
// It is sufficient to pass only one of the equivalent orderings:
13391346
let order_requirements = orderings.into_iter().next().map(Into::into);
13401347

@@ -1562,6 +1569,7 @@ mod tests {
15621569
SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
15631570
};
15641571
use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
1572+
use datafusion_physical_expr::expressions::binary;
15651573
use datafusion_physical_expr::PhysicalSortExpr;
15661574
use datafusion_physical_plan::{collect, ExecutionPlanProperties};
15671575
use rstest::rstest;
@@ -1694,29 +1702,44 @@ mod tests {
16941702

16951703
use crate::datasource::file_format::parquet::ParquetFormat;
16961704
use datafusion_physical_plan::expressions::col as physical_col;
1705+
use datafusion_physical_plan::expressions::lit as physical_lit;
16971706
use std::ops::Add;
16981707

16991708
// (file_sort_order, expected_result)
17001709
let cases = vec![
1701-
(vec![], Ok(Vec::<LexOrdering>::new())),
1710+
(
1711+
vec![],
1712+
Ok::<Vec<LexOrdering>, DataFusionError>(Vec::<LexOrdering>::new()),
1713+
),
17021714
// sort expr, but non column
17031715
(
1704-
vec![vec![
1705-
col("int_col").add(lit(1)).sort(true, true),
1706-
]],
1707-
Err("Expected single column reference in sort_order[0][0], got int_col + Int32(1)"),
1716+
vec![vec![col("int_col").add(lit(1)).sort(true, true)]],
1717+
Ok(vec![[PhysicalSortExpr {
1718+
expr: binary(
1719+
physical_col("int_col", &schema).unwrap(),
1720+
Operator::Plus,
1721+
physical_lit(1),
1722+
&schema,
1723+
)
1724+
.unwrap(),
1725+
options: SortOptions {
1726+
descending: false,
1727+
nulls_first: true,
1728+
},
1729+
}]
1730+
.into()]),
17081731
),
17091732
// ok with one column
17101733
(
17111734
vec![vec![col("string_col").sort(true, false)]],
17121735
Ok(vec![[PhysicalSortExpr {
1713-
expr: physical_col("string_col", &schema).unwrap(),
1714-
options: SortOptions {
1715-
descending: false,
1716-
nulls_first: false,
1717-
},
1718-
}].into(),
1719-
])
1736+
expr: physical_col("string_col", &schema).unwrap(),
1737+
options: SortOptions {
1738+
descending: false,
1739+
nulls_first: false,
1740+
},
1741+
}]
1742+
.into()]),
17201743
),
17211744
// ok with two columns, different options
17221745
(
@@ -1725,14 +1748,18 @@ mod tests {
17251748
col("int_col").sort(false, true),
17261749
]],
17271750
Ok(vec![[
1728-
PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
1729-
.asc()
1730-
.nulls_last(),
1731-
PhysicalSortExpr::new_default(physical_col("int_col", &schema).unwrap())
1732-
.desc()
1733-
.nulls_first()
1734-
].into(),
1735-
])
1751+
PhysicalSortExpr::new_default(
1752+
physical_col("string_col", &schema).unwrap(),
1753+
)
1754+
.asc()
1755+
.nulls_last(),
1756+
PhysicalSortExpr::new_default(
1757+
physical_col("int_col", &schema).unwrap(),
1758+
)
1759+
.desc()
1760+
.nulls_first(),
1761+
]
1762+
.into()]),
17361763
),
17371764
];
17381765

@@ -1745,7 +1772,8 @@ mod tests {
17451772

17461773
let table =
17471774
ListingTable::try_new(config.clone()).expect("Creating the table");
1748-
let ordering_result = table.try_create_output_ordering();
1775+
let ordering_result =
1776+
table.try_create_output_ordering(state.execution_props());
17491777

17501778
match (expected_result, ordering_result) {
17511779
(Ok(expected), Ok(result)) => {

datafusion/datasource/src/file_scan_config.rs

Lines changed: 34 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,6 @@
1818
//! [`FileScanConfig`] to configure scanning of possibly partitioned
1919
//! file sources.
2020
21-
use std::{
22-
any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
23-
fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
24-
};
25-
2621
use crate::file_groups::FileGroup;
2722
#[allow(unused_imports)]
2823
use crate::schema_adapter::SchemaAdapterFactory;
@@ -52,7 +47,7 @@ use datafusion_physical_expr::{expressions::Column, utils::reassign_predicate_co
5247
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
5348
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
5449
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
55-
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
50+
use datafusion_physical_expr_common::sort_expr::LexOrdering;
5651
use datafusion_physical_plan::projection::ProjectionExpr;
5752
use datafusion_physical_plan::{
5853
display::{display_orderings, ProjectSchemaDisplay},
@@ -63,7 +58,13 @@ use datafusion_physical_plan::{
6358
use datafusion_physical_plan::{
6459
filter::collect_columns_from_predicate, filter_pushdown::FilterPushdownPropagation,
6560
};
61+
use std::ops::Range;
62+
use std::{
63+
any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
64+
fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
65+
};
6666

67+
use datafusion_physical_expr::equivalence::project_orderings;
6768
use datafusion_physical_plan::coop::cooperative;
6869
use datafusion_physical_plan::execution_plan::SchedulingType;
6970
use log::{debug, warn};
@@ -703,12 +704,14 @@ impl FileScanConfig {
703704
fn projection_indices(&self) -> Vec<usize> {
704705
match &self.projection {
705706
Some(proj) => proj.clone(),
706-
None => (0..self.file_schema.fields().len()
707-
+ self.table_partition_cols.len())
708-
.collect(),
707+
None => self.column_indices().collect(),
709708
}
710709
}
711710

711+
fn column_indices(&self) -> Range<usize> {
712+
0..self.file_schema.fields().len() + self.table_partition_cols.len()
713+
}
714+
712715
pub fn projected_stats(&self) -> Statistics {
713716
let statistics = self.file_source.statistics().unwrap();
714717

@@ -734,9 +737,17 @@ impl FileScanConfig {
734737
}
735738

736739
pub fn projected_schema(&self) -> Arc<Schema> {
740+
let full_schema = self.full_schema();
741+
if let Some(projection) = &self.projection {
742+
Arc::new(full_schema.project(projection).unwrap())
743+
} else {
744+
full_schema
745+
}
746+
}
747+
748+
fn full_schema(&self) -> Arc<Schema> {
737749
let table_fields: Vec<_> = self
738-
.projection_indices()
739-
.into_iter()
750+
.column_indices()
740751
.map(|idx| {
741752
if idx < self.file_schema.fields().len() {
742753
self.file_schema.field(idx).clone()
@@ -1365,30 +1376,16 @@ fn get_projected_output_ordering(
13651376
base_config: &FileScanConfig,
13661377
projected_schema: &SchemaRef,
13671378
) -> Vec<LexOrdering> {
1368-
let mut all_orderings = vec![];
1369-
for output_ordering in &base_config.output_ordering {
1370-
let mut new_ordering = vec![];
1371-
for PhysicalSortExpr { expr, options } in output_ordering.iter() {
1372-
if let Some(col) = expr.as_any().downcast_ref::<Column>() {
1373-
let name = col.name();
1374-
if let Some((idx, _)) = projected_schema.column_with_name(name) {
1375-
// Compute the new sort expression (with correct index) after projection:
1376-
new_ordering.push(PhysicalSortExpr::new(
1377-
Arc::new(Column::new(name, idx)),
1378-
*options,
1379-
));
1380-
continue;
1381-
}
1382-
}
1383-
// Cannot find expression in the projected_schema, stop iterating
1384-
// since rest of the orderings are violated
1385-
break;
1386-
}
1387-
1388-
let Some(new_ordering) = LexOrdering::new(new_ordering) else {
1389-
continue;
1390-
};
1379+
let Ok(projected_orderings) = project_orderings(
1380+
base_config.output_ordering.clone(),
1381+
&base_config.full_schema(),
1382+
&base_config.projection_indices(),
1383+
) else {
1384+
return vec![];
1385+
};
13911386

1387+
let mut all_orderings = vec![];
1388+
for output_ordering in projected_orderings {
13921389
// Check if any file groups are not sorted
13931390
if base_config.file_groups.iter().any(|group| {
13941391
if group.len() <= 1 {
@@ -1397,7 +1394,7 @@ fn get_projected_output_ordering(
13971394
}
13981395

13991396
let statistics = match MinMaxStatistics::new_from_files(
1400-
&new_ordering,
1397+
&output_ordering,
14011398
projected_schema,
14021399
base_config.projection.as_deref(),
14031400
group.iter(),
@@ -1420,7 +1417,7 @@ fn get_projected_output_ordering(
14201417
continue;
14211418
}
14221419

1423-
all_orderings.push(new_ordering);
1420+
all_orderings.push(output_ordering);
14241421
}
14251422
all_orderings
14261423
}
@@ -1459,6 +1456,7 @@ mod tests {
14591456
use datafusion_common::{assert_batches_eq, internal_err};
14601457
use datafusion_expr::SortExpr;
14611458
use datafusion_physical_expr::create_physical_sort_expr;
1459+
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
14621460

14631461
/// Returns the column names on the schema
14641462
pub fn columns(schema: &Schema) -> Vec<String> {

0 commit comments

Comments
 (0)