Skip to content

Commit 01234eb

Browse files
bert-beyondloopsBert Vermeirenalambxudong963
authored
Fix: Preserve sorting for the COPY TO plan (#16785)
* Fix: Preserve sorting for the COPY TO plan * Update datafusion/core/tests/dataframe/mod.rs Co-authored-by: Andrew Lamb <[email protected]> * Update mod.rs --------- Co-authored-by: Bert Vermeiren <[email protected]> Co-authored-by: Andrew Lamb <[email protected]> Co-authored-by: xudong.w <[email protected]>
1 parent 2a33c87 commit 01234eb

File tree

2 files changed

+68
-3
lines changed

2 files changed

+68
-3
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,8 +556,15 @@ impl DefaultPhysicalPlanner {
556556
file_extension,
557557
};
558558

559+
let ordering = input_exec.properties().output_ordering().cloned();
560+
559561
sink_format
560-
.create_writer_physical_plan(input_exec, session_state, config, None)
562+
.create_writer_physical_plan(
563+
input_exec,
564+
session_state,
565+
config,
566+
ordering.map(Into::into),
567+
)
561568
.await?
562569
}
563570
LogicalPlan::Dml(DmlStatement {

datafusion/core/tests/dataframe/mod.rs

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,16 @@ use datafusion_common::{
6868
TableReference, UnnestOptions,
6969
};
7070
use datafusion_common_runtime::SpawnedTask;
71+
use datafusion_datasource::file_format::format_as_file_type;
7172
use datafusion_execution::config::SessionConfig;
7273
use datafusion_execution::runtime_env::RuntimeEnv;
7374
use datafusion_expr::expr::{FieldMetadata, GroupingSet, Sort, WindowFunction};
7475
use datafusion_expr::var_provider::{VarProvider, VarType};
7576
use datafusion_expr::{
7677
cast, col, create_udf, exists, in_subquery, lit, out_ref_col, placeholder,
7778
scalar_subquery, when, wildcard, Expr, ExprFunctionExt, ExprSchemable, LogicalPlan,
78-
ScalarFunctionImplementation, WindowFrame, WindowFrameBound, WindowFrameUnits,
79-
WindowFunctionDefinition,
79+
LogicalPlanBuilder, ScalarFunctionImplementation, SortExpr, WindowFrame,
80+
WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
8081
};
8182
use datafusion_physical_expr::expressions::Column;
8283
use datafusion_physical_expr::Partitioning;
@@ -6193,3 +6194,60 @@ async fn test_copy_schema() -> Result<()> {
61936194
assert_logical_expr_schema_eq_physical_expr_schema(result).await?;
61946195
Ok(())
61956196
}
6197+
6198+
#[tokio::test]
6199+
async fn test_copy_to_preserves_order() -> Result<()> {
6200+
let tmp_dir = TempDir::new()?;
6201+
6202+
let session_state = SessionStateBuilder::new_with_default_features().build();
6203+
let session_ctx = SessionContext::new_with_state(session_state);
6204+
6205+
let target_path = tmp_dir.path().join("target_ordered.csv");
6206+
let csv_file_format = session_ctx
6207+
.state()
6208+
.get_file_format_factory("csv")
6209+
.map(format_as_file_type)
6210+
.unwrap();
6211+
6212+
let ordered_select_plan = LogicalPlanBuilder::values(vec![
6213+
vec![lit(1u64)],
6214+
vec![lit(10u64)],
6215+
vec![lit(20u64)],
6216+
vec![lit(100u64)],
6217+
])?
6218+
.sort(vec![SortExpr::new(col("column1"), false, true)])?
6219+
.build()?;
6220+
6221+
let copy_to_plan = LogicalPlanBuilder::copy_to(
6222+
ordered_select_plan,
6223+
target_path.to_str().unwrap().to_string(),
6224+
csv_file_format,
6225+
HashMap::new(),
6226+
vec![],
6227+
)?
6228+
.build()?;
6229+
6230+
let union_side_branch = LogicalPlanBuilder::values(vec![vec![lit(1u64)]])?.build()?;
6231+
let union_plan = LogicalPlanBuilder::from(copy_to_plan)
6232+
.union(union_side_branch)?
6233+
.build()?;
6234+
6235+
let frame = session_ctx.execute_logical_plan(union_plan).await?;
6236+
let physical_plan = frame.create_physical_plan().await?;
6237+
6238+
let physical_plan_format =
6239+
displayable(physical_plan.as_ref()).indent(true).to_string();
6240+
6241+
// Expect that input to the DataSinkExec is sorted correctly
6242+
assert_snapshot!(
6243+
physical_plan_format,
6244+
@r###"
6245+
UnionExec
6246+
DataSinkExec: sink=CsvSink(file_groups=[])
6247+
SortExec: expr=[column1@0 DESC], preserve_partitioning=[false]
6248+
DataSourceExec: partitions=1, partition_sizes=[1]
6249+
DataSourceExec: partitions=1, partition_sizes=[1]
6250+
"###
6251+
);
6252+
Ok(())
6253+
}

0 commit comments

Comments
 (0)