CopyTo plan loses ordering requirement during physical plan optimization #16784

@bert-beyondloops

Describe the bug

The following logical plan:

    Union
      CopyTo: format=csv output_url=/var/folders/jj/jnb45q9s6h9cv9mxxw2bbr800000gn/T/.tmpxeMDM4/target_ordered.csv options: ()
        Sort: column1 DESC NULLS FIRST
          Values: (UInt64(1)), (UInt64(10)), (UInt64(20)), (UInt64(100))
      Values: (UInt64(1))

results in this optimized physical plan:

    UnionExec
      DataSinkExec: sink=CsvSink(file_groups=[])
        DataSourceExec: partitions=1, partition_sizes=[1]
      DataSourceExec: partitions=1, partition_sizes=[1]

The sort requirement has been incorrectly eliminated.

For reference, the initial physical plan does include the SortExec before the physical optimizer rules run.

Input physical plan before optimizations:

    UnionExec
      DataSinkExec: sink=CsvSink(file_groups=[])
        SortExec: expr=[column1@0 DESC], preserve_partitioning=[false]
          DataSourceExec: partitions=1, partition_sizes=[1]
      DataSourceExec: partitions=1, partition_sizes=[1]

To Reproduce

Below is Rust test code that reproduces the problem, since the logical plan above cannot be constructed from a SQL statement.
SQL does not allow COPY TO (DML) to be used as a subquery of another query, but the DataFrame API does allow it.

#[tokio::test]
async fn test_copy_to_preserves_order() -> Result<()> {
    let tmp_dir = TempDir::new()?;

    let session_state = SessionStateBuilder::new_with_default_features().build();
    let session_ctx = SessionContext::new_with_state(session_state);

    let target_path = tmp_dir.path().join("target_ordered.csv");
    let csv_file_format = session_ctx
        .state()
        .get_file_format_factory("csv")
        .map(format_as_file_type)
        .unwrap();

    let ordered_select_plan = LogicalPlanBuilder::values(vec![
        vec![lit(1u64)],
        vec![lit(10u64)],
        vec![lit(20u64)],
        vec![lit(100u64)],
    ])?
    .sort(vec![SortExpr::new(col("column1"), false, true)])?
    .build()?;

    let copy_to_plan = LogicalPlanBuilder::copy_to(
        ordered_select_plan,
        target_path.to_str().unwrap().to_string(),
        csv_file_format,
        HashMap::new(),
        vec![],
    )?
    .build()?;

    let union_side_branch = LogicalPlanBuilder::values(vec![vec![lit(1u64)]])?.build()?;
    let union_plan = LogicalPlanBuilder::from(copy_to_plan)
        .union(union_side_branch)?
        .build()?;

    let frame = session_ctx.execute_logical_plan(union_plan).await?;
    let physical_plan = frame.create_physical_plan().await?;

    let physical_plan_format =
        displayable(physical_plan.as_ref()).indent(true).to_string();

    assert_snapshot!(
        physical_plan_format,
        @r###"
    UnionExec
      DataSinkExec: sink=CsvSink(file_groups=[])
        SortExec: expr=[column1@0 DESC], preserve_partitioning=[false]
          DataSourceExec: partitions=1, partition_sizes=[1]
      DataSourceExec: partitions=1, partition_sizes=[1]
    "###
    );
    Ok(())
}

This test fails: the optimized physical plan no longer contains the SortExec.

Expected behavior

The data sink should write the data in the specified order.
The SortExec must not be eliminated.

The expected physical plan should look like this:

    UnionExec
      DataSinkExec: sink=CsvSink(file_groups=[])
        SortExec: expr=[column1@0 DESC], preserve_partitioning=[false]
          DataSourceExec: partitions=1, partition_sizes=[1]
      DataSourceExec: partitions=1, partition_sizes=[1]
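
The same expectation could also be checked without pinning the whole plan in a snapshot. The following is only a minimal sketch: it assumes the indented text rendering shown above (one operator per line, children indented deeper than their parent), and the helper name `sink_has_sort_below` is hypothetical, not part of DataFusion.

```rust
/// Returns true if some `DataSinkExec` line in an indented plan rendering
/// has a `SortExec` line among its descendants.
/// Hypothetical helper: it only inspects the text, not the plan objects.
fn sink_has_sort_below(plan: &str) -> bool {
    // Pair every non-empty line with its indentation width.
    let lines: Vec<(usize, &str)> = plan
        .lines()
        .filter(|l| !l.trim().is_empty())
        .map(|l| (l.len() - l.trim_start().len(), l.trim()))
        .collect();

    for (i, (indent, text)) in lines.iter().enumerate() {
        if !text.starts_with("DataSinkExec") {
            continue;
        }
        // Descendants are the following lines indented deeper than the sink;
        // the first line at the same or a shallower level ends the subtree.
        let found = lines[i + 1..]
            .iter()
            .take_while(|(depth, _)| depth > indent)
            .any(|(_, t)| t.starts_with("SortExec"));
        if found {
            return true;
        }
    }
    false
}
```

In the test above, this could be called on `physical_plan_format`; it would return false for the optimized plan reported in this issue and true for the expected plan.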

Additional context

I had to use a UNION to trigger this bug. With a standard COPY TO SQL statement, the ordering appears to be preserved.

In this case the OutputRequirementExec does not appear to be pushed down below the DataSinkExec. Perhaps that logic only runs on plans whose nodes have a single child, and stops at multi-child nodes such as UnionExec?
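
To make this hypothesis concrete, here is a toy model of it. It is a sketch under stated assumptions, not DataFusion's code: the `Plan` enum and both functions are hypothetical, and they only mimic the suspected behavior (an ordering marker that is pushed down through single-child nodes only, followed by removal of every sort that is not protected by a marker).

```rust
// Toy plan tree; hypothetical types that only mirror the plan shapes in this issue.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Union(Vec<Plan>),
    Sink(Box<Plan>),
    Sort(Box<Plan>),
    Source,
    // Marker meaning "the ordering of the wrapped plan must be kept".
    Required(Box<Plan>),
}

/// Walks down from the root and protects the first Sort it reaches.
/// As suspected in the issue, it gives up at any multi-child node.
fn mark_top_ordering(plan: Plan) -> Plan {
    match plan {
        Plan::Sort(child) => Plan::Required(Box::new(Plan::Sort(child))),
        Plan::Sink(child) => Plan::Sink(Box::new(mark_top_ordering(*child))),
        // Union (multiple children), leaves, and existing markers: stop here.
        other => other,
    }
}

/// Removes every Sort that is not wrapped in a `Required` marker,
/// and unwraps the markers it encounters.
fn drop_unprotected_sorts(plan: Plan) -> Plan {
    match plan {
        Plan::Required(inner) => *inner,
        Plan::Sort(child) => drop_unprotected_sorts(*child),
        Plan::Sink(child) => Plan::Sink(Box::new(drop_unprotected_sorts(*child))),
        Plan::Union(children) => {
            Plan::Union(children.into_iter().map(drop_unprotected_sorts).collect())
        }
        Plan::Source => Plan::Source,
    }
}
```

In this model, a plain `Sink(Sort(Source))` keeps its sort, while the same subtree placed under a `Union` loses it, which matches the symptom described above. Whether the real optimizer rules behave this way is exactly the open question of this issue.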

Labels: bug