From 9b7ac5557db517090aeb415cb5147a94eff784d8 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sun, 7 Sep 2025 11:39:52 +0800 Subject: [PATCH 01/15] Make join selection configurable through `enable_*` and `prefer_*` options --- datafusion/common/src/config.rs | 29 ++- datafusion/core/src/physical_planner.rs | 80 ++----- .../core/src/physical_planner/join_planner.rs | 196 ++++++++++++++++ datafusion/core/tests/memory_limit/mod.rs | 2 +- .../tests/cases/roundtrip_physical_plan.rs | 2 +- .../sqllogictest/test_files/aggregate.slt | 1 - .../sqllogictest/test_files/explain_tree.slt | 2 +- .../test_files/information_schema.slt | 14 +- .../test_files/join_planner_preferences.slt | 220 ++++++++++++++++++ datafusion/sqllogictest/test_files/joins.slt | 6 + .../test_files/sort_merge_join.slt | 2 +- .../sqllogictest/test_files/tpch/tpch.slt | 2 +- docs/source/user-guide/configs.md | 7 +- 13 files changed, 488 insertions(+), 75 deletions(-) create mode 100644 datafusion/core/src/physical_planner/join_planner.rs create mode 100644 datafusion/sqllogictest/test_files/join_planner_preferences.slt diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index cdd8e72a06cc..3e74d406e3c8 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -822,9 +822,34 @@ config_namespace! { /// process to reorder the join keys pub top_down_join_key_reordering: bool, default = true - /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. + /// When set to true, the physical plan optimizer will prefer HashJoin when applicable. + /// If there are multiple preferred and applicable join types, the optimizer + /// will choose one based on heuristics. /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory - pub prefer_hash_join: bool, default = true + pub prefer_hash_join: bool, default = false + + /// When set to true, the physical plan optimizer will prefer SortMergeJoin when applicable. + /// If there are multiple preferred and applicable join types, the optimizer + /// will choose one according to heuristics. + pub prefer_sort_merge_join: bool, default = false + + /// When set to true, the physical plan optimizer will prefer NestedLoopJoin + /// when applicable. + /// If there are multiple preferred and applicable join types, the optimizer + /// will choose one according to heuristics. + pub prefer_nested_loop_join: bool, default = false + + /// Enables planning HashJoin operators. If set to false, the optimizer will avoid + /// producing HashJoin plans and consider other join strategies instead. + pub enable_hash_join: bool, default = true + + /// Enables planning SortMergeJoin operators. If set to false, the optimizer will avoid + /// producing SortMergeJoin plans and consider other join strategies instead. + pub enable_sort_merge_join: bool, default = true + + /// Enables planning NestedLoopJoin operators. If set to false, the optimizer will avoid + /// producing NestedLoopJoin plans and consider other join strategies instead. + pub enable_nested_loop_join: bool, default = true /// The maximum estimated size in bytes for one input side of a HashJoin /// will be collected into a single partition diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 6618d9495d78..0bd0a0014e7b 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -41,9 +41,7 @@ use crate::physical_plan::analyze::AnalyzeExec; use crate::physical_plan::explain::ExplainExec; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::joins::utils as join_utils; -use crate::physical_plan::joins::{ - CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec, -}; +// Join exec types used by join_planner module use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::projection::{ProjectionExec, ProjectionExpr}; use crate::physical_plan::repartition::RepartitionExec; @@ -58,7 +56,6 @@ use crate::physical_plan::{ use crate::schema_equivalence::schema_satisfied_by; use arrow::array::{builder::StringBuilder, RecordBatch}; -use arrow::compute::SortOptions; use arrow::datatypes::{Schema, SchemaRef}; use datafusion_common::display::ToStringifiedPlan; use datafusion_common::tree_node::{ @@ -79,8 +76,8 @@ use datafusion_expr::expr_rewriter::unnormalize_cols; use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary; use datafusion_expr::{ Analyze, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension, FetchType, - Filter, JoinType, RecursiveQuery, SkipType, StringifiedPlan, WindowFrame, - WindowFrameBound, WriteOp, + Filter, RecursiveQuery, SkipType, StringifiedPlan, WindowFrame, WindowFrameBound, + WriteOp, }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::expressions::{Column, Literal}; @@ -103,6 +100,10 @@ use itertools::{multiunzip, Itertools}; use log::debug; use tokio::sync::Mutex; +// Submodules +mod join_planner; +use self::join_planner::plan_join_exec; + /// Physical query planner that converts a `LogicalPlan` to an /// `ExecutionPlan` suitable for execution. #[async_trait] @@ -1165,64 +1166,15 @@ impl DefaultPhysicalPlanner { _ => None, }; - let prefer_hash_join = - session_state.config_options().optimizer.prefer_hash_join; - - let join: Arc = if join_on.is_empty() { - if join_filter.is_none() && matches!(join_type, JoinType::Inner) { - // cross join if there is no join conditions and no join filter set - Arc::new(CrossJoinExec::new(physical_left, physical_right)) - } else { - // there is no equal join condition, use the nested loop join - Arc::new(NestedLoopJoinExec::try_new( - physical_left, - physical_right, - join_filter, - join_type, - None, - )?) - } - } else if session_state.config().target_partitions() > 1 - && session_state.config().repartition_joins() - && !prefer_hash_join - { - // Use SortMergeJoin if hash join is not preferred - let join_on_len = join_on.len(); - Arc::new(SortMergeJoinExec::try_new( - physical_left, - physical_right, - join_on, - join_filter, - *join_type, - vec![SortOptions::default(); join_on_len], - *null_equality, - )?) - } else if session_state.config().target_partitions() > 1 - && session_state.config().repartition_joins() - && prefer_hash_join - { - Arc::new(HashJoinExec::try_new( - physical_left, - physical_right, - join_on, - join_filter, - join_type, - None, - PartitionMode::Auto, - *null_equality, - )?) - } else { - Arc::new(HashJoinExec::try_new( - physical_left, - physical_right, - join_on, - join_filter, - join_type, - None, - PartitionMode::CollectLeft, - *null_equality, - )?) - }; + let join: Arc = plan_join_exec( + session_state, + physical_left, + physical_right, + join_on, + join_filter, + join_type, + null_equality, + )?; // If plan was mutated previously then need to create the ExecutionPlan // for the new Projection that was applied on top. diff --git a/datafusion/core/src/physical_planner/join_planner.rs b/datafusion/core/src/physical_planner/join_planner.rs new file mode 100644 index 000000000000..4923bf83e844 --- /dev/null +++ b/datafusion/core/src/physical_planner/join_planner.rs @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use crate::error::Result; +use crate::execution::context::SessionState; +use crate::physical_plan::joins::utils as join_utils; +use crate::physical_plan::joins::{ + CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec, +}; +use crate::physical_plan::ExecutionPlan; +use arrow::compute::SortOptions; +use datafusion_common::plan_err; +use datafusion_expr::JoinType; + +/// Build the appropriate join `ExecutionPlan` for the given join type, filter, and +/// configurations. +/// +/// For example, given an equi-join, the planner may execute it as a Nested Loop +/// Join, Hash Join, or another strategy. Configuration settings determine which +/// ExecutionPlan is used. +/// +/// # Strategy +/// - Step 1: Find all possible physical join types for the given join logical plan +/// - No join on keys and no filter => CrossJoin +/// - With equality? => HJ and SMJ(if with multiple partition) +/// TODO: The constraint on SMJ is added previously for optimization. Should +/// we remove it for configurability? +/// TODO: Allow NLJ for equal join for better configurability. +/// - Without equality? => NLJ +/// - Step 2: Filter the possible join types from step 1 according to the configuration +/// , by checking if they're enabled by options like `datafusion.optimizer.enable_hash_join` +/// - Step 3: Choose one according to the built-in heuristics and also the preference +/// in the configuration, e.g. `datafusion.optimizer.prefer_hash_join` +pub(super) fn plan_join_exec( + session_state: &SessionState, + physical_left: Arc, + physical_right: Arc, + join_on: join_utils::JoinOn, + join_filter: Option, + join_type: &JoinType, + null_equality: &datafusion_common::NullEquality, +) -> Result> { + // Short-circuit: handle pure cross join (existing behavior) + if join_on.is_empty() { + if join_filter.is_none() && matches!(join_type, JoinType::Inner) { + return Ok(Arc::new(CrossJoinExec::new(physical_left, physical_right))); + } + } + + // Step 1: Find possible join types for the given Logical Plan + // ---------------------------------------------------------------------- + + // Build the list of possible algorithms for this join + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + enum Algo { + NLJ, + HJ, + SMJ, + } + + let cfg = &session_state.config_options().optimizer; + let can_smj = session_state.config().target_partitions() > 1 + && session_state.config().repartition_joins(); + + let mut possible: Vec = Vec::new(); + if join_on.is_empty() { + possible.push(Algo::NLJ); + } else { + possible.push(Algo::HJ); + if can_smj { + possible.push(Algo::SMJ); + } + } + + // Step 2: Filter the possible list according to enable flags from config + // ---------------------------------------------------------------------- + + // Filter by enable flags + let enabled_and_possible: Vec = possible + .iter() + .copied() + .filter(|a| match a { + Algo::NLJ => cfg.enable_nested_loop_join, + Algo::HJ => cfg.enable_hash_join, + Algo::SMJ => cfg.enable_sort_merge_join, + }) + .collect(); + + if enabled_and_possible.is_empty() { + return plan_err!( + "No enabled join algorithm is applicable for this join. Possible join types are {:?}. Try to enable them through configurations like `datafusion.optimizer.enable_hash_join`", &possible + ); + } + + // Step 2: Choose and plan the physical join type according to preference + // from the configuration, and also the built-in heuristics + // ---------------------------------------------------------------------- + + // Collect preferred algorithms + let mut preferred: Vec = Vec::new(); + if cfg.prefer_hash_join { + preferred.push(Algo::HJ); + } + if cfg.prefer_sort_merge_join { + preferred.push(Algo::SMJ); + } + if cfg.prefer_nested_loop_join { + preferred.push(Algo::NLJ); + } + + // Helper to pick by priority HJ > SMJ > NLJ + let pick_by_priority = |candidates: &[Algo]| -> Algo { + if candidates.iter().any(|a| *a == Algo::HJ) { + Algo::HJ + } else if candidates.iter().any(|a| *a == Algo::SMJ) { + Algo::SMJ + } else { + Algo::NLJ + } + }; + + // If there is overlap with preferred, use that; otherwise use enabled list by priority + let chosen = if !preferred.is_empty() { + let overlaps: Vec = enabled_and_possible + .iter() + .copied() + .filter(|a| preferred.contains(a)) + .collect(); + if !overlaps.is_empty() { + pick_by_priority(&overlaps) + } else { + pick_by_priority(&enabled_and_possible) + } + } else { + pick_by_priority(&enabled_and_possible) + }; + + match chosen { + Algo::NLJ => Ok(Arc::new(NestedLoopJoinExec::try_new( + physical_left, + physical_right, + join_filter, + join_type, + None, + )?)), + Algo::HJ => { + // Determine partition mode based solely on partitioning configuration + let partition_mode = if session_state.config().target_partitions() > 1 + && session_state.config().repartition_joins() + { + PartitionMode::Auto + } else { + PartitionMode::CollectLeft + }; + + Ok(Arc::new(HashJoinExec::try_new( + physical_left, + physical_right, + join_on, + join_filter, + join_type, + None, + partition_mode, + *null_equality, + )?)) + } + Algo::SMJ => { + let join_on_len = join_on.len(); + Ok(Arc::new(SortMergeJoinExec::try_new( + physical_left, + physical_right, + join_on, + join_filter, + *join_type, + vec![SortOptions::default(); join_on_len], + *null_equality, + )?)) + } + } +} diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 554c30eb872e..a5d88e080338 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -174,7 +174,7 @@ async fn sort_merge_join_no_spill() { // Planner chooses MergeJoin only if number of partitions > 1 let config = SessionConfig::new() .with_target_partitions(2) - .set_bool("datafusion.optimizer.prefer_hash_join", false); + .set_bool("datafusion.optimizer.prefer_sort_merge_join", true); TestCase::new() .with_query( diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index a5357a132eef..264b333a8df0 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -2218,7 +2218,7 @@ async fn roundtrip_logical_plan_sort_merge_join() -> Result<()> { ) .await?; - ctx.sql("SET datafusion.optimizer.prefer_hash_join = false") + ctx.sql("SET datafusion.optimizer.prefer_sort_merge_join = true") .await? .show() .await?; diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 35b2a6c03b39..1be2549ace71 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -7444,4 +7444,3 @@ NULL NULL statement ok drop table distinct_avg; - diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 0df361a75bae..a21414a14dfc 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -1295,7 +1295,7 @@ physical_plan # Query with sort merge join. statement ok -set datafusion.optimizer.prefer_hash_join = false; +set datafusion.optimizer.prefer_sort_merge_join = true; query TT explain select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t2 t2 on t1.c1 = t2.c1 diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index fb2c89020112..773a154f2544 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -287,7 +287,10 @@ datafusion.optimizer.allow_symmetric_joins_without_pruning true datafusion.optimizer.default_filter_selectivity 20 datafusion.optimizer.enable_distinct_aggregation_soft_limit true datafusion.optimizer.enable_dynamic_filter_pushdown true +datafusion.optimizer.enable_hash_join true +datafusion.optimizer.enable_nested_loop_join true datafusion.optimizer.enable_round_robin_repartition true +datafusion.optimizer.enable_sort_merge_join true datafusion.optimizer.enable_topk_aggregation true datafusion.optimizer.enable_window_limits true datafusion.optimizer.expand_views_at_output false @@ -297,7 +300,9 @@ datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 datafusion.optimizer.max_passes 3 datafusion.optimizer.prefer_existing_sort false datafusion.optimizer.prefer_existing_union false -datafusion.optimizer.prefer_hash_join true +datafusion.optimizer.prefer_hash_join false +datafusion.optimizer.prefer_nested_loop_join false +datafusion.optimizer.prefer_sort_merge_join false datafusion.optimizer.repartition_aggregations true datafusion.optimizer.repartition_file_min_size 10485760 datafusion.optimizer.repartition_file_scans true @@ -401,7 +406,10 @@ datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusio datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. +datafusion.optimizer.enable_hash_join true Enables planning HashJoin operators. If set to false, the optimizer will avoid producing HashJoin plans and consider other join strategies instead. +datafusion.optimizer.enable_nested_loop_join true Enables planning NestedLoopJoin operators. If set to false, the optimizer will avoid producing NestedLoopJoin plans and consider other join strategies instead. datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores +datafusion.optimizer.enable_sort_merge_join true Enables planning SortMergeJoin operators. If set to false, the optimizer will avoid producing SortMergeJoin plans and consider other join strategies instead. datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible datafusion.optimizer.enable_window_limits true When set to true, the optimizer will attempt to push limit operations past window functions, if possible datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. @@ -411,7 +419,9 @@ datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximu datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave -datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory +datafusion.optimizer.prefer_hash_join false When set to true, the physical plan optimizer will prefer HashJoin when applicable. If there are multiple preferred and applicable join types, the optimizer will choose one based on heuristics. HashJoin can work more efficiently than SortMergeJoin but consumes more memory +datafusion.optimizer.prefer_nested_loop_join false When set to true, the physical plan optimizer will prefer NestedLoopJoin when applicable. If there are multiple preferred and applicable join types, the optimizer will choose one according to heuristics. +datafusion.optimizer.prefer_sort_merge_join false When set to true, the physical plan optimizer will prefer SortMergeJoin when applicable. If there are multiple preferred and applicable join types, the optimizer will choose one according to heuristics. datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level datafusion.optimizer.repartition_file_min_size 10485760 Minimum total files size in bytes to perform file scan repartitioning. datafusion.optimizer.repartition_file_scans true When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism. This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition). For FileSources, only Parquet and CSV formats are currently supported. If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't happen within a single file. If set to `true` for an in-memory source, all memtable's partitions will have their batches repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change the total number of partitions and batches per partition, but does not slice the initial record tables provided to the MemTable on creation. diff --git a/datafusion/sqllogictest/test_files/join_planner_preferences.slt b/datafusion/sqllogictest/test_files/join_planner_preferences.slt new file mode 100644 index 000000000000..6228e736eb5d --- /dev/null +++ b/datafusion/sqllogictest/test_files/join_planner_preferences.slt @@ -0,0 +1,220 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +########## +## Join Planner Preferences & Enablement Tests +## +## These tests verify the interaction of the following configuration options: +## - datafusion.optimizer.enable_hash_join +## - datafusion.optimizer.enable_sort_merge_join +## - datafusion.optimizer.enable_nested_loop_join +## - datafusion.optimizer.prefer_hash_join +## - datafusion.optimizer.prefer_sort_merge_join +## - datafusion.optimizer.prefer_nested_loop_join +## +## Strategy: set configs via SQL, then EXPLAIN a simple equi-join and +## assert which physical join operator is chosen. +########## + +statement ok +set datafusion.explain.logical_plan_only = false; + +statement ok +set datafusion.explain.physical_plan_only = true; + +# Ensure SortMergeJoin is eligible when enabled and preferred +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +set datafusion.optimizer.repartition_joins = true; + +# Avoid plan noise from batch coalescing so the join node is top-level +statement ok +set datafusion.execution.coalesce_batches = false; + +statement ok +CREATE TABLE jt1(a int, b text) AS VALUES (1, 'x'), (2, 'y'), (3, 'z'); + +statement ok +CREATE TABLE jt2(a int, b text) AS VALUES (1, 'p'), (2, 'q'), (4, 'r'); + +########## +## Config 1 -- Enable all, no preference -> expect HashJoin +########## +statement ok +set datafusion.optimizer.enable_nested_loop_join = true; + +statement ok +set datafusion.optimizer.enable_sort_merge_join = true; + +statement ok +set datafusion.optimizer.enable_hash_join = true; + +statement ok +set datafusion.optimizer.prefer_nested_loop_join = false; + +statement ok +set datafusion.optimizer.prefer_sort_merge_join = false; + +statement ok +set datafusion.optimizer.prefer_hash_join = false; + +query TT +EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a +---- +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)] +02)--DataSourceExec: partitions=1, partition_sizes=[1] +03)--DataSourceExec: partitions=1, partition_sizes=[1] + +########## +## Config 2 -- Force SMJ (only SMJ enabled) -> expect SortMergeJoin +########## +statement ok +set datafusion.optimizer.enable_nested_loop_join = false; + +statement ok +set datafusion.optimizer.enable_sort_merge_join = true; + +statement ok +set datafusion.optimizer.enable_hash_join = false; + +statement ok +set datafusion.optimizer.prefer_nested_loop_join = false; + +statement ok +set datafusion.optimizer.prefer_sort_merge_join = true; + +statement ok +set datafusion.optimizer.prefer_hash_join = false; + +query TT +EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a +---- +physical_plan +01)SortMergeJoin: join_type=Inner, on=[(a@0, a@0)] +02)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false] +03)----DataSourceExec: partitions=1, partition_sizes=[1] +04)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false] +05)----DataSourceExec: partitions=1, partition_sizes=[1] + +########## +## Config 3 -- Prefer SMJ, others allowed -> expect SortMergeJoin +########## +statement ok +set datafusion.optimizer.enable_nested_loop_join = true; + +statement ok +set datafusion.optimizer.enable_sort_merge_join = true; + +statement ok +set datafusion.optimizer.enable_hash_join = true; + +statement ok +set datafusion.optimizer.prefer_nested_loop_join = false; + +statement ok +set datafusion.optimizer.prefer_sort_merge_join = true; + +statement ok +set datafusion.optimizer.prefer_hash_join = false; + +query TT +EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a +---- +physical_plan +01)SortMergeJoin: join_type=Inner, on=[(a@0, a@0)] +02)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false] +03)----DataSourceExec: partitions=1, partition_sizes=[1] +04)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false] +05)----DataSourceExec: partitions=1, partition_sizes=[1] + +########## +## Config 4 -- Prefer SMJ and HJ equally -> expect HashJoin (HJ > SMJ) +########## +statement ok +set datafusion.optimizer.enable_nested_loop_join = true; + +statement ok +set datafusion.optimizer.enable_sort_merge_join = true; + +statement ok +set datafusion.optimizer.enable_hash_join = true; + +statement ok +set datafusion.optimizer.prefer_nested_loop_join = false; + +statement ok +set datafusion.optimizer.prefer_sort_merge_join = true; + +statement ok +set datafusion.optimizer.prefer_hash_join = true; + +query TT +EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a +---- +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)] +02)--DataSourceExec: partitions=1, partition_sizes=[1] +03)--DataSourceExec: partitions=1, partition_sizes=[1] + +########## +## Config 5 -- Prefer disabled type (NLJ preferred but disabled) -> expect HashJoin +########## +statement ok +set datafusion.optimizer.enable_nested_loop_join = false; + +statement ok +set datafusion.optimizer.enable_sort_merge_join = true; + +statement ok +set datafusion.optimizer.enable_hash_join = true; + +statement ok +set datafusion.optimizer.prefer_nested_loop_join = true; + +statement ok +set datafusion.optimizer.prefer_sort_merge_join = false; + +statement ok +set datafusion.optimizer.prefer_hash_join = false; + +query TT +EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a +---- +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)] +02)--DataSourceExec: partitions=1, partition_sizes=[1] +03)--DataSourceExec: partitions=1, partition_sizes=[1] + +########## +## Config 6 -- No join type enabled -> expect planning error +########## +statement ok +set datafusion.optimizer.enable_nested_loop_join = false; + +statement ok +set datafusion.optimizer.enable_sort_merge_join = false; + +statement ok +set datafusion.optimizer.enable_hash_join = false; + +query TT +EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a +---- +physical_plan_error Error during planning: No enabled join algorithm is applicable for this join. Possible join types are [HJ, SMJ]. Try to enable them through configurations like `datafusion.optimizer.enable_hash_join` diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index ad21bdac6d2d..009c9eb469b2 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -2726,6 +2726,9 @@ set datafusion.explain.logical_plan_only = false; statement ok set datafusion.optimizer.prefer_hash_join = false; +statement ok +set datafusion.optimizer.prefer_sort_merge_join = true; + statement ok set datafusion.execution.target_partitions = 2; @@ -3153,6 +3156,9 @@ set datafusion.explain.logical_plan_only = false; statement ok set datafusion.optimizer.prefer_hash_join = false; +statement ok +set datafusion.optimizer.prefer_sort_merge_join = true; + statement ok set datafusion.execution.target_partitions = 2; diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt index ed463333217a..235da41af8fc 100644 --- a/datafusion/sqllogictest/test_files/sort_merge_join.slt +++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt @@ -20,7 +20,7 @@ ########## statement ok -set datafusion.optimizer.prefer_hash_join = false; +set datafusion.optimizer.prefer_sort_merge_join = true; statement ok CREATE TABLE t1(a text, b int) AS VALUES ('Alice', 50), ('Alice', 100), ('Bob', 1); diff --git a/datafusion/sqllogictest/test_files/tpch/tpch.slt b/datafusion/sqllogictest/test_files/tpch/tpch.slt index c6d630997e29..a2eb899588ef 100644 --- a/datafusion/sqllogictest/test_files/tpch/tpch.slt +++ b/datafusion/sqllogictest/test_files/tpch/tpch.slt @@ -23,7 +23,7 @@ include ./answers/q*.slt.part # test answers with sort merge join statement ok -set datafusion.optimizer.prefer_hash_join = false; +set datafusion.optimizer.prefer_sort_merge_join = true; include ./answers/q*.slt.part diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 5060bc3805fd..b2d6d3caf929 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -143,7 +143,12 @@ The following configuration settings are available: | datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail | | datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | | datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | -| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | +| datafusion.optimizer.prefer_hash_join | false | When set to true, the physical plan optimizer will prefer HashJoin when applicable. If there are multiple preferred and applicable join types, the optimizer will choose one based on heuristics. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | +| datafusion.optimizer.prefer_sort_merge_join | false | When set to true, the physical plan optimizer will prefer SortMergeJoin when applicable. If there are multiple preferred and applicable join types, the optimizer will choose one according to heuristics. | +| datafusion.optimizer.prefer_nested_loop_join | false | When set to true, the physical plan optimizer will prefer NestedLoopJoin when applicable. If there are multiple preferred and applicable join types, the optimizer will choose one according to heuristics. | +| datafusion.optimizer.enable_hash_join | true | Enables planning HashJoin operators. If set to false, the optimizer will avoid producing HashJoin plans and consider other join strategies instead. | +| datafusion.optimizer.enable_sort_merge_join | true | Enables planning SortMergeJoin operators. If set to false, the optimizer will avoid producing SortMergeJoin plans and consider other join strategies instead. | +| datafusion.optimizer.enable_nested_loop_join | true | Enables planning NestedLoopJoin operators. If set to false, the optimizer will avoid producing NestedLoopJoin plans and consider other join strategies instead. | | datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | | datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition | | datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). | From bde1a65f5367ec950363f02b475d0432f31850b1 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Mon, 8 Sep 2025 13:57:03 +0800 Subject: [PATCH 02/15] clean up --- datafusion/core/src/physical_planner.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 0bd0a0014e7b..aa2f0c47a597 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -41,7 +41,6 @@ use crate::physical_plan::analyze::AnalyzeExec; use crate::physical_plan::explain::ExplainExec; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::joins::utils as join_utils; -// Join exec types used by join_planner module use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::projection::{ProjectionExec, ProjectionExpr}; use crate::physical_plan::repartition::RepartitionExec; From 521e51b4ef5d916a50baaa2fa7c6f823fc2d1749 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Mon, 8 Sep 2025 14:05:23 +0800 Subject: [PATCH 03/15] Update datafusion/core/src/physical_planner/join_planner.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- datafusion/core/src/physical_planner/join_planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_planner/join_planner.rs b/datafusion/core/src/physical_planner/join_planner.rs index 4923bf83e844..b52b506624cb 100644 --- a/datafusion/core/src/physical_planner/join_planner.rs +++ b/datafusion/core/src/physical_planner/join_planner.rs @@ -108,7 +108,7 @@ pub(super) fn plan_join_exec( ); } - // Step 2: Choose and plan the physical join type according to preference + // Step 3: Choose and plan the physical join type according to preference // from the configuration, and also the built-in heuristics // ---------------------------------------------------------------------- From abe8e2ae42920998329399696e1642461ffbf4d2 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Mon, 8 Sep 2025 16:57:49 +0800 Subject: [PATCH 04/15] fix clippy --- .../core/src/physical_planner/join_planner.rs | 61 ++++++++++--------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/datafusion/core/src/physical_planner/join_planner.rs b/datafusion/core/src/physical_planner/join_planner.rs index b52b506624cb..04ac9f249cb1 100644 --- a/datafusion/core/src/physical_planner/join_planner.rs +++ b/datafusion/core/src/physical_planner/join_planner.rs @@ -38,15 +38,15 @@ use datafusion_expr::JoinType; /// # Strategy /// - Step 1: Find all possible physical join types for the given join logical plan /// - No join on keys and no filter => CrossJoin -/// - With equality? => HJ and SMJ(if with multiple partition) -/// TODO: The constraint on SMJ is added previously for optimization. Should -/// we remove it for configurability? -/// TODO: Allow NLJ for equal join for better configurability. +/// - With equality? => HJ and SMJ (if with multiple partition) +/// TODO: The constraint on SMJ is added previously for optimization. Should +/// we remove it for configurability? +/// TODO: Allow NLJ for equal join for better configurability. /// - Without equality? => NLJ /// - Step 2: Filter the possible join types from step 1 according to the configuration -/// , by checking if they're enabled by options like `datafusion.optimizer.enable_hash_join` +/// by checking if they're enabled by options like `datafusion.optimizer.enable_hash_join` /// - Step 3: Choose one according to the built-in heuristics and also the preference -/// in the configuration, e.g. `datafusion.optimizer.prefer_hash_join` +/// in the configuration, e.g. `datafusion.optimizer.prefer_hash_join` pub(super) fn plan_join_exec( session_state: &SessionState, physical_left: Arc, @@ -57,10 +57,11 @@ pub(super) fn plan_join_exec( null_equality: &datafusion_common::NullEquality, ) -> Result> { // Short-circuit: handle pure cross join (existing behavior) - if join_on.is_empty() { - if join_filter.is_none() && matches!(join_type, JoinType::Inner) { - return Ok(Arc::new(CrossJoinExec::new(physical_left, physical_right))); - } + if join_on.is_empty() + && join_filter.is_none() + && matches!(join_type, JoinType::Inner) + { + return Ok(Arc::new(CrossJoinExec::new(physical_left, physical_right))); } // Step 1: Find possible join types for the given Logical Plan @@ -69,9 +70,9 @@ pub(super) fn plan_join_exec( // Build the list of possible algorithms for this join #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum Algo { - NLJ, - HJ, - SMJ, + Nlj, + Hj, + Smj, } let cfg = &session_state.config_options().optimizer; @@ -80,11 +81,11 @@ pub(super) fn plan_join_exec( let mut possible: Vec = Vec::new(); if join_on.is_empty() { - possible.push(Algo::NLJ); + possible.push(Algo::Nlj); } else { - possible.push(Algo::HJ); + possible.push(Algo::Hj); if can_smj { - possible.push(Algo::SMJ); + possible.push(Algo::Smj); } } @@ -96,9 +97,9 @@ pub(super) fn plan_join_exec( .iter() .copied() .filter(|a| match a { - Algo::NLJ => cfg.enable_nested_loop_join, - Algo::HJ => cfg.enable_hash_join, - Algo::SMJ => cfg.enable_sort_merge_join, + Algo::Nlj => cfg.enable_nested_loop_join, + Algo::Hj => cfg.enable_hash_join, + Algo::Smj => cfg.enable_sort_merge_join, }) .collect(); @@ -115,23 +116,23 @@ pub(super) fn plan_join_exec( // Collect preferred algorithms let mut preferred: Vec = Vec::new(); if cfg.prefer_hash_join { - preferred.push(Algo::HJ); + preferred.push(Algo::Hj); } if cfg.prefer_sort_merge_join { - preferred.push(Algo::SMJ); + preferred.push(Algo::Smj); } if cfg.prefer_nested_loop_join { - preferred.push(Algo::NLJ); + preferred.push(Algo::Nlj); } // Helper to pick by priority HJ > SMJ > NLJ let pick_by_priority = |candidates: &[Algo]| -> Algo { - if candidates.iter().any(|a| *a == Algo::HJ) { - Algo::HJ - } else if candidates.iter().any(|a| *a == Algo::SMJ) { - Algo::SMJ + if candidates.contains(&Algo::Hj) { + Algo::Hj + } else if candidates.contains(&Algo::Smj) { + Algo::Smj } else { - Algo::NLJ + Algo::Nlj } }; @@ -152,14 +153,14 @@ pub(super) fn plan_join_exec( }; match chosen { - Algo::NLJ => Ok(Arc::new(NestedLoopJoinExec::try_new( + Algo::Nlj => Ok(Arc::new(NestedLoopJoinExec::try_new( physical_left, physical_right, join_filter, join_type, None, )?)), - Algo::HJ => { + Algo::Hj => { // Determine partition mode based solely on partitioning configuration let partition_mode = if session_state.config().target_partitions() > 1 && session_state.config().repartition_joins() @@ -180,7 +181,7 @@ pub(super) fn plan_join_exec( *null_equality, )?)) } - Algo::SMJ => { + Algo::Smj => { let join_on_len = join_on.len(); Ok(Arc::new(SortMergeJoinExec::try_new( physical_left, From 941be551861e87c6add1d50085863f3f98e255dd Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Mon, 8 Sep 2025 18:13:17 +0800 Subject: [PATCH 05/15] fix test --- datafusion/sqllogictest/test_files/join_planner_preferences.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/join_planner_preferences.slt b/datafusion/sqllogictest/test_files/join_planner_preferences.slt index 6228e736eb5d..9dbba8a27d4f 100644 --- a/datafusion/sqllogictest/test_files/join_planner_preferences.slt +++ b/datafusion/sqllogictest/test_files/join_planner_preferences.slt @@ -217,4 +217,4 @@ set datafusion.optimizer.enable_hash_join = false; query TT EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a ---- -physical_plan_error Error during planning: No enabled join algorithm is applicable for this join. Possible join types are [HJ, SMJ]. Try to enable them through configurations like `datafusion.optimizer.enable_hash_join` +physical_plan_error Error during planning: No enabled join algorithm is applicable for this join. Possible join types are [Hj, Smj]. Try to enable them through configurations like `datafusion.optimizer.enable_hash_join` From 60437162d118ede2dd3c120b20426895d2b207a9 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Mon, 8 Sep 2025 19:47:17 +0800 Subject: [PATCH 06/15] fix fmt --- datafusion/core/src/physical_planner/join_planner.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/core/src/physical_planner/join_planner.rs b/datafusion/core/src/physical_planner/join_planner.rs index 04ac9f249cb1..23d530908bae 100644 --- a/datafusion/core/src/physical_planner/join_planner.rs +++ b/datafusion/core/src/physical_planner/join_planner.rs @@ -57,9 +57,7 @@ pub(super) fn plan_join_exec( null_equality: &datafusion_common::NullEquality, ) -> Result> { // Short-circuit: handle pure cross join (existing behavior) - if join_on.is_empty() - && join_filter.is_none() - && matches!(join_type, JoinType::Inner) + if join_on.is_empty() && join_filter.is_none() && matches!(join_type, JoinType::Inner) { return Ok(Arc::new(CrossJoinExec::new(physical_left, physical_right))); } From 3bf325b7411c088a004c9d26f6c55f99ff2aac08 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sat, 13 Sep 2025 19:55:45 +0800 Subject: [PATCH 07/15] use `join_method_priority` instead --- benchmarks/src/imdb/run.rs | 1 + benchmarks/src/tpch/run.rs | 1 + datafusion/common/src/config.rs | 40 +++-- .../core/src/physical_planner/join_planner.rs | 87 ++++++---- datafusion/core/tests/dataframe/mod.rs | 8 +- datafusion/core/tests/memory_limit/mod.rs | 4 +- .../sql/.explain_analyze.rs.pending-snap | 33 ++++ .../tests/cases/roundtrip_physical_plan.rs | 2 +- .../sqllogictest/test_files/explain_tree.slt | 4 +- .../test_files/information_schema.slt | 10 +- .../test_files/join_planner_preferences.slt | 161 ++++++++++++++---- datafusion/sqllogictest/test_files/joins.slt | 18 +- .../test_files/sort_merge_join.slt | 4 +- .../sqllogictest/test_files/tpch/tpch.slt | 2 +- 14 files changed, 265 insertions(+), 110 deletions(-) create mode 100644 datafusion/core/tests/sql/.explain_analyze.rs.pending-snap diff --git a/benchmarks/src/imdb/run.rs b/benchmarks/src/imdb/run.rs index 90e0947f64f6..2d3d399a3764 100644 --- a/benchmarks/src/imdb/run.rs +++ b/benchmarks/src/imdb/run.rs @@ -300,6 +300,7 @@ impl RunOpt { Ok(()) } + #[allow(deprecated)] async fn benchmark_query(&self, query_id: usize) -> Result> { let mut config = self .common diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index 30ecb4d33baa..e026219b2ac3 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -99,6 +99,7 @@ pub struct RunOpt { } impl RunOpt { + #[allow(deprecated)] pub async fn run(self) -> Result<()> { println!("Running benchmarks with the following options: {self:?}"); let query_range = match self.query { diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 3e74d406e3c8..5d1eef83c437 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -826,18 +826,36 @@ config_namespace! { /// If there are multiple preferred and applicable join types, the optimizer /// will choose one based on heuristics. /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory - pub prefer_hash_join: bool, default = false - - /// When set to true, the physical plan optimizer will prefer SortMergeJoin when applicable. - /// If there are multiple preferred and applicable join types, the optimizer - /// will choose one according to heuristics. - pub prefer_sort_merge_join: bool, default = false + /// + /// Note: if `join_method_priority` is set, this configuration will be overriden + #[deprecated(since = "51.0.0", note = "Please use configuration option `join_method_priority` instead")] + pub prefer_hash_join: bool, default = true - /// When set to true, the physical plan optimizer will prefer NestedLoopJoin - /// when applicable. - /// If there are multiple preferred and applicable join types, the optimizer - /// will choose one according to heuristics. - pub prefer_nested_loop_join: bool, default = false + /// Comma-separated priority list of join algorithms to try, in order. + /// + /// Supported join methods (case-insensitive): + /// - "hj", "hash_join" + /// - "smj", "sort_merge_join" + /// - "nlj", "nested_loop_join" + /// + /// The planner picks the first algorithm in this list that is both + /// applicable to the current join and enabled via the corresponding + /// `enable_*_join` flags. If none match, a default heuristic order + /// is used as a fallback. + /// + /// Examples: + /// - `hj, nlj`: prefer Hash Join; if not applicable, try Nested Loop Join next. + /// - `smj`: prefer Sort-Merge Join; if disabled or not applicable, fall back to + /// the default heuristic order. + /// - `hj, nlj`: on an non-equi-join, Hash Join is not applicable, so Nested + /// Loop Join is chosen if enabled. + /// + /// Notes: + /// - Tokens are case-insensitive; unknown tokens cause a validation error + /// - When set, this option supersedes legacy configuration `prefer_hash_join` + /// , that is deprecated and ignored when `join_method_priority` is + /// provided(not empty string ""). + pub join_method_priority: String, transform = str::to_lowercase, default = "".to_string() /// Enables planning HashJoin operators. If set to false, the optimizer will avoid /// producing HashJoin plans and consider other join strategies instead. diff --git a/datafusion/core/src/physical_planner/join_planner.rs b/datafusion/core/src/physical_planner/join_planner.rs index 23d530908bae..aaa44e20c41a 100644 --- a/datafusion/core/src/physical_planner/join_planner.rs +++ b/datafusion/core/src/physical_planner/join_planner.rs @@ -25,7 +25,7 @@ use crate::physical_plan::joins::{ }; use crate::physical_plan::ExecutionPlan; use arrow::compute::SortOptions; -use datafusion_common::plan_err; +use datafusion_common::{config_err, plan_err}; use datafusion_expr::JoinType; /// Build the appropriate join `ExecutionPlan` for the given join type, filter, and @@ -46,7 +46,7 @@ use datafusion_expr::JoinType; /// - Step 2: Filter the possible join types from step 1 according to the configuration /// by checking if they're enabled by options like `datafusion.optimizer.enable_hash_join` /// - Step 3: Choose one according to the built-in heuristics and also the preference -/// in the configuration, e.g. `datafusion.optimizer.prefer_hash_join` +/// in the configuration `datafusion.optimizer.join_method_priority` pub(super) fn plan_join_exec( session_state: &SessionState, physical_left: Arc, @@ -107,49 +107,66 @@ pub(super) fn plan_join_exec( ); } - // Step 3: Choose and plan the physical join type according to preference - // from the configuration, and also the built-in heuristics + // Step 3: Choose and plan the physical join type according to + // `join_method_priority` and built-in heuristics // ---------------------------------------------------------------------- - // Collect preferred algorithms - let mut preferred: Vec = Vec::new(); - if cfg.prefer_hash_join { - preferred.push(Algo::Hj); - } - if cfg.prefer_sort_merge_join { - preferred.push(Algo::Smj); - } - if cfg.prefer_nested_loop_join { - preferred.push(Algo::Nlj); - } - - // Helper to pick by priority HJ > SMJ > NLJ - let pick_by_priority = |candidates: &[Algo]| -> Algo { - if candidates.contains(&Algo::Hj) { - Algo::Hj - } else if candidates.contains(&Algo::Smj) { - Algo::Smj - } else { - Algo::Nlj + // Parse join method priority string into an ordered list of algorithms + let parse_priority = |s: &str| -> Result> { + let mut out = Vec::new(); + let mut unknown = Vec::new(); + for token in s.split(',').map(|t| t.trim()).filter(|t| !t.is_empty()) { + match token { + "hj" | "hash_join" => out.push(Algo::Hj), + "smj" | "sort_merge_join" => out.push(Algo::Smj), + "nlj" | "nested_loop_join" => out.push(Algo::Nlj), + other => unknown.push(other.to_string()), + } + } + if !unknown.is_empty() { + let valid = "hj/hash_join, smj/sort_merge_join, nlj/nested_loop_join"; + return config_err!( + "Invalid join method(s) in datafusion.optimizer.join_method_priority: {}. Valid values: {}", + unknown.join(", "), + valid + ); } + Ok(out) }; - // If there is overlap with preferred, use that; otherwise use enabled list by priority - let chosen = if !preferred.is_empty() { - let overlaps: Vec = enabled_and_possible - .iter() - .copied() - .filter(|a| preferred.contains(a)) - .collect(); - if !overlaps.is_empty() { - pick_by_priority(&overlaps) + // Backward compatibility: + // If `join_method_priority` is empty, honor legacy `prefer_hash_join` by + // setting the priority to a single entry accordingly. Otherwise, parse the + // provided priority string. + let priority = if cfg.join_method_priority.trim().is_empty() { + #[allow(deprecated)] + if cfg.prefer_hash_join { + vec![Algo::Hj] } else { - pick_by_priority(&enabled_and_possible) + vec![Algo::Smj] } } else { - pick_by_priority(&enabled_and_possible) + parse_priority(&cfg.join_method_priority)? }; + // Default heuristic order if priority is empty or does not match any candidate + let default_order = [Algo::Hj, Algo::Smj, Algo::Nlj]; + + // Helper: pick the first algorithm in `order` that is in `candidates` + let pick_in_order = |candidates: &[Algo], order: &[Algo]| -> Option { + for algo in order { + if candidates.contains(algo) { + return Some(*algo); + } + } + None + }; + + // Intersect enabled+possible with priority order first; otherwise fallback to default order + let chosen = pick_in_order(&enabled_and_possible, &priority) + .or_else(|| pick_in_order(&enabled_and_possible, &default_order)) + .expect("enabled_and_possible is non-empty"); + match chosen { Algo::Nlj => Ok(Arc::new(NestedLoopJoinExec::try_new( physical_left, diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index a563459f42a1..95e5e2f44013 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -128,7 +128,9 @@ async fn assert_logical_expr_schema_eq_physical_expr_schema(df: DataFrame) -> Re #[tokio::test] async fn test_array_agg_ord_schema() -> Result<()> { - let ctx = SessionContext::new(); + let cfg = + SessionConfig::new().set_str("datafusion.optimizer.join_method_priority", "hj"); + let ctx = SessionContext::new_with_config(cfg); let create_table_query = r#" CREATE TABLE test_table ( @@ -4740,7 +4742,9 @@ fn create_join_context() -> Result { ], )?; - let config = SessionConfig::new().with_target_partitions(4); + let config = SessionConfig::new() + .with_target_partitions(4) + .set_str("datafusion.optimizer.join_method_priority", "hj"); let ctx = SessionContext::new_with_config(config); // let ctx = SessionContext::new(); diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index a5d88e080338..1bf5959eb33e 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -174,7 +174,7 @@ async fn sort_merge_join_no_spill() { // Planner chooses MergeJoin only if number of partitions > 1 let config = SessionConfig::new() .with_target_partitions(2) - .set_bool("datafusion.optimizer.prefer_sort_merge_join", true); + .set_str("datafusion.optimizer.join_method_priority", "smj"); TestCase::new() .with_query( @@ -197,7 +197,7 @@ async fn sort_merge_join_spill() { // Planner chooses MergeJoin only if number of partitions > 1 let config = SessionConfig::new() .with_target_partitions(2) - .set_bool("datafusion.optimizer.prefer_hash_join", false); + .set_str("datafusion.optimizer.join_method_priority", "smj"); TestCase::new() .with_query( diff --git a/datafusion/core/tests/sql/.explain_analyze.rs.pending-snap b/datafusion/core/tests/sql/.explain_analyze.rs.pending-snap new file mode 100644 index 000000000000..7ed67241fbfe --- /dev/null +++ b/datafusion/core/tests/sql/.explain_analyze.rs.pending-snap @@ -0,0 +1,33 @@ +{"run_id":"481ba350-a37e-4b16-86e9-e45d01d1b0ef","line":652,"new":{"module_name":"core_integration__sql__explain_analyze","snapshot_name":"physical_plan_display_indent_multi_children","metadata":{"source":"datafusion/core/tests/sql/explain_analyze.rs","assertion_line":652,"expression":"actual"},"snapshot":"ProjectionExec: expr=[c1@0 as c1]\n SortMergeJoin: join_type=Inner, on=[(c1@0, c2@0)]\n SortExec: expr=[c1@0 ASC], preserve_partitioning=[true]\n CoalesceBatchesExec: target_batch_size=4096\n RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=9000\n RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1\n DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true\n SortExec: expr=[c2@0 ASC], preserve_partitioning=[true]\n CoalesceBatchesExec: target_batch_size=4096\n RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=9000\n RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1\n ProjectionExec: expr=[c1@0 as c2]\n DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true"},"old":{"module_name":"core_integration__sql__explain_analyze","metadata":{},"snapshot":"CoalesceBatchesExec: target_batch_size=4096\n HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c2@0)], projection=[c1@0]\n CoalesceBatchesExec: target_batch_size=4096\n RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=9000\n RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1\n DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true\n CoalesceBatchesExec: target_batch_size=4096\n RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=9000\n RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1\n ProjectionExec: expr=[c1@0 as c2]\n DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true"}} +{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":181,"new":null,"old":null} +{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":194,"new":null,"old":null} +{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":207,"new":null,"old":null} +{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":252,"new":null,"old":null} +{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":265,"new":null,"old":null} +{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":279,"new":null,"old":null} +{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":398,"new":null,"old":null} +{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":411,"new":null,"old":null} +{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":424,"new":null,"old":null} +{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":469,"new":null,"old":null} +{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":482,"new":null,"old":null} +{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":495,"new":null,"old":null} +{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":776,"new":null,"old":null} +{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":799,"new":null,"old":null} +{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":652,"new":null,"old":null} +{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":606,"new":null,"old":null} +{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":181,"new":null,"old":null} +{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":194,"new":null,"old":null} +{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":207,"new":null,"old":null} +{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":252,"new":null,"old":null} +{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":265,"new":null,"old":null} +{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":279,"new":null,"old":null} +{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":398,"new":null,"old":null} +{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":411,"new":null,"old":null} +{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":424,"new":null,"old":null} +{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":469,"new":null,"old":null} +{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":482,"new":null,"old":null} +{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":495,"new":null,"old":null} +{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":776,"new":null,"old":null} +{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":799,"new":null,"old":null} +{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":606,"new":null,"old":null} +{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":652,"new":null,"old":null} diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 264b333a8df0..6acb522d3a28 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -2218,7 +2218,7 @@ async fn roundtrip_logical_plan_sort_merge_join() -> Result<()> { ) .await?; - ctx.sql("SET datafusion.optimizer.prefer_sort_merge_join = true") + ctx.sql("SET datafusion.optimizer.join_method_priority = 'smj'") .await? .show() .await?; diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index a21414a14dfc..31f9c3fbe13b 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -1295,7 +1295,7 @@ physical_plan # Query with sort merge join. statement ok -set datafusion.optimizer.prefer_sort_merge_join = true; +set datafusion.optimizer.join_method_priority = 'smj'; query TT explain select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t2 t2 on t1.c1 = t2.c1 @@ -1320,7 +1320,7 @@ physical_plan 17)└───────────────────────────┘└───────────────────────────┘ statement ok -set datafusion.optimizer.prefer_hash_join = true; +set datafusion.optimizer.join_method_priority = 'hj'; # cleanup statement ok diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 773a154f2544..355a16292040 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -297,12 +297,11 @@ datafusion.optimizer.expand_views_at_output false datafusion.optimizer.filter_null_join_keys false datafusion.optimizer.hash_join_single_partition_threshold 1048576 datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 +datafusion.optimizer.join_method_priority (empty) datafusion.optimizer.max_passes 3 datafusion.optimizer.prefer_existing_sort false datafusion.optimizer.prefer_existing_union false -datafusion.optimizer.prefer_hash_join false -datafusion.optimizer.prefer_nested_loop_join false -datafusion.optimizer.prefer_sort_merge_join false +datafusion.optimizer.prefer_hash_join true datafusion.optimizer.repartition_aggregations true datafusion.optimizer.repartition_file_min_size 10485760 datafusion.optimizer.repartition_file_scans true @@ -416,12 +415,11 @@ datafusion.optimizer.expand_views_at_output false When set to true, if the retur datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition +datafusion.optimizer.join_method_priority (empty) Comma-separated priority list of join algorithms to try, in order. Supported join methods (case-insensitive): - "hj", "hash_join" - "smj", "sort_merge_join" - "nlj", "nested_loop_join" The planner picks the first algorithm in this list that is both applicable to the current join and enabled via the corresponding `enable_*_join` flags. If none match, a default heuristic order is used as a fallback. Examples: - `hj, nlj`: prefer Hash Join; if not applicable, try Nested Loop Join next. - `smj`: prefer Sort-Merge Join; if disabled or not applicable, fall back to the default heuristic order. - `hj, nlj`: on an non-equi-join, Hash Join is not applicable, so Nested Loop Join is chosen if enabled. Notes: - Tokens are case-insensitive; unknown tokens cause a validation error - When set, this option supersedes legacy configuration `prefer_hash_join` , that is deprecated and ignored when `join_method_priority` is provided(not empty string ""). datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave -datafusion.optimizer.prefer_hash_join false When set to true, the physical plan optimizer will prefer HashJoin when applicable. If there are multiple preferred and applicable join types, the optimizer will choose one based on heuristics. HashJoin can work more efficiently than SortMergeJoin but consumes more memory -datafusion.optimizer.prefer_nested_loop_join false When set to true, the physical plan optimizer will prefer NestedLoopJoin when applicable. If there are multiple preferred and applicable join types, the optimizer will choose one according to heuristics. -datafusion.optimizer.prefer_sort_merge_join false When set to true, the physical plan optimizer will prefer SortMergeJoin when applicable. If there are multiple preferred and applicable join types, the optimizer will choose one according to heuristics. +datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin when applicable. If there are multiple preferred and applicable join types, the optimizer will choose one based on heuristics. HashJoin can work more efficiently than SortMergeJoin but consumes more memory Note: if `join_method_priority` is set, this configuration will be overriden datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level datafusion.optimizer.repartition_file_min_size 10485760 Minimum total files size in bytes to perform file scan repartitioning. datafusion.optimizer.repartition_file_scans true When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism. This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition). For FileSources, only Parquet and CSV formats are currently supported. If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't happen within a single file. If set to `true` for an in-memory source, all memtable's partitions will have their batches repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change the total number of partitions and batches per partition, but does not slice the initial record tables provided to the MemTable on creation. diff --git a/datafusion/sqllogictest/test_files/join_planner_preferences.slt b/datafusion/sqllogictest/test_files/join_planner_preferences.slt index 9dbba8a27d4f..f62a14e8a0d9 100644 --- a/datafusion/sqllogictest/test_files/join_planner_preferences.slt +++ b/datafusion/sqllogictest/test_files/join_planner_preferences.slt @@ -22,9 +22,7 @@ ## - datafusion.optimizer.enable_hash_join ## - datafusion.optimizer.enable_sort_merge_join ## - datafusion.optimizer.enable_nested_loop_join -## - datafusion.optimizer.prefer_hash_join -## - datafusion.optimizer.prefer_sort_merge_join -## - datafusion.optimizer.prefer_nested_loop_join +## - datafusion.optimizer.join_method_priority ## ## Strategy: set configs via SQL, then EXPLAIN a simple equi-join and ## assert which physical join operator is chosen. @@ -66,13 +64,7 @@ statement ok set datafusion.optimizer.enable_hash_join = true; statement ok -set datafusion.optimizer.prefer_nested_loop_join = false; - -statement ok -set datafusion.optimizer.prefer_sort_merge_join = false; - -statement ok -set datafusion.optimizer.prefer_hash_join = false; +set datafusion.optimizer.join_method_priority = 'hj'; query TT EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a @@ -95,13 +87,7 @@ statement ok set datafusion.optimizer.enable_hash_join = false; statement ok -set datafusion.optimizer.prefer_nested_loop_join = false; - -statement ok -set datafusion.optimizer.prefer_sort_merge_join = true; - -statement ok -set datafusion.optimizer.prefer_hash_join = false; +set datafusion.optimizer.join_method_priority = 'smj'; query TT EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a @@ -126,13 +112,7 @@ statement ok set datafusion.optimizer.enable_hash_join = true; statement ok -set datafusion.optimizer.prefer_nested_loop_join = false; - -statement ok -set datafusion.optimizer.prefer_sort_merge_join = true; - -statement ok -set datafusion.optimizer.prefer_hash_join = false; +set datafusion.optimizer.join_method_priority = 'smj'; query TT EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a @@ -157,13 +137,30 @@ statement ok set datafusion.optimizer.enable_hash_join = true; statement ok -set datafusion.optimizer.prefer_nested_loop_join = false; +set datafusion.optimizer.join_method_priority = 'hj'; +query TT +EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a +---- +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)] +02)--DataSourceExec: partitions=1, partition_sizes=[1] +03)--DataSourceExec: partitions=1, partition_sizes=[1] + +########## +## Config 5 -- Prefer disabled type (NLJ preferred but disabled) -> expect HashJoin +########## statement ok -set datafusion.optimizer.prefer_sort_merge_join = true; +set datafusion.optimizer.enable_nested_loop_join = false; statement ok -set datafusion.optimizer.prefer_hash_join = true; +set datafusion.optimizer.enable_sort_merge_join = true; + +statement ok +set datafusion.optimizer.enable_hash_join = true; + +statement ok +set datafusion.optimizer.join_method_priority = 'nlj, hj'; query TT EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a @@ -174,11 +171,28 @@ physical_plan 03)--DataSourceExec: partitions=1, partition_sizes=[1] ########## -## Config 5 -- Prefer disabled type (NLJ preferred but disabled) -> expect HashJoin +## Config 6 -- No join type enabled -> expect planning error ########## statement ok set datafusion.optimizer.enable_nested_loop_join = false; +statement ok +set datafusion.optimizer.enable_sort_merge_join = false; + +statement ok +set datafusion.optimizer.enable_hash_join = false; + +query TT +EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a +---- +physical_plan_error Error during planning: No enabled join algorithm is applicable for this join. Possible join types are [Hj, Smj]. Try to enable them through configurations like `datafusion.optimizer.enable_hash_join` + +########## +## Config 7 -- Priority 'smj, hj' with both enabled -> expect SortMergeJoin +########## +statement ok +set datafusion.optimizer.enable_nested_loop_join = true; + statement ok set datafusion.optimizer.enable_sort_merge_join = true; @@ -186,13 +200,23 @@ statement ok set datafusion.optimizer.enable_hash_join = true; statement ok -set datafusion.optimizer.prefer_nested_loop_join = true; +set datafusion.optimizer.join_method_priority = 'smj, hj'; -statement ok -set datafusion.optimizer.prefer_sort_merge_join = false; +query TT +EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a +---- +physical_plan +01)SortMergeJoin: join_type=Inner, on=[(a@0, a@0)] +02)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false] +03)----DataSourceExec: partitions=1, partition_sizes=[1] +04)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false] +05)----DataSourceExec: partitions=1, partition_sizes=[1] +########## +## Config 8 -- Priority 'hj, smj' with both enabled -> expect HashJoin +########## statement ok -set datafusion.optimizer.prefer_hash_join = false; +set datafusion.optimizer.join_method_priority = 'hj, smj'; query TT EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a @@ -203,18 +227,83 @@ physical_plan 03)--DataSourceExec: partitions=1, partition_sizes=[1] ########## -## Config 6 -- No join type enabled -> expect planning error +## Config 9 -- Priority includes NLJ first, but NLJ is not applicable for equi-join -> expect SortMergeJoin ########## statement ok -set datafusion.optimizer.enable_nested_loop_join = false; +set datafusion.optimizer.join_method_priority = 'nlj, smj'; + +query TT +EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a +---- +physical_plan +01)SortMergeJoin: join_type=Inner, on=[(a@0, a@0)] +02)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false] +03)----DataSourceExec: partitions=1, partition_sizes=[1] +04)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false] +05)----DataSourceExec: partitions=1, partition_sizes=[1] +########## +## Config 10 -- Priority 'smj' but SMJ disabled -> expect HashJoin via fallback +########## statement ok set datafusion.optimizer.enable_sort_merge_join = false; statement ok -set datafusion.optimizer.enable_hash_join = false; +set datafusion.optimizer.enable_hash_join = true; + +statement ok +set datafusion.optimizer.join_method_priority = 'smj'; query TT EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a ---- -physical_plan_error Error during planning: No enabled join algorithm is applicable for this join. Possible join types are [Hj, Smj]. Try to enable them through configurations like `datafusion.optimizer.enable_hash_join` +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)] +02)--DataSourceExec: partitions=1, partition_sizes=[1] +03)--DataSourceExec: partitions=1, partition_sizes=[1] + +########## +## Config 11 -- Long names 'sort_merge_join, hash_join' -> expect SortMergeJoin +########## +statement ok +set datafusion.optimizer.enable_sort_merge_join = true; + +statement ok +set datafusion.optimizer.join_method_priority = 'sort_merge_join, hash_join'; + +query TT +EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a +---- +physical_plan +01)SortMergeJoin: join_type=Inner, on=[(a@0, a@0)] +02)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false] +03)----DataSourceExec: partitions=1, partition_sizes=[1] +04)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false] +05)----DataSourceExec: partitions=1, partition_sizes=[1] + +########## +## Config 12 -- Case-insensitive priority tokens 'SMJ, HJ' -> expect SortMergeJoin +########## +statement ok +set datafusion.optimizer.join_method_priority = 'SMJ, HJ'; + +query TT +EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a +---- +physical_plan +01)SortMergeJoin: join_type=Inner, on=[(a@0, a@0)] +02)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false] +03)----DataSourceExec: partitions=1, partition_sizes=[1] +04)--SortExec: expr=[a@0 ASC], preserve_partitioning=[false] +05)----DataSourceExec: partitions=1, partition_sizes=[1] + +########## +## Config 13 -- Typo in priority 'smj, hsj' -> expect configuration error listing valid values +########## +statement ok +set datafusion.optimizer.join_method_priority = 'smj, hsj'; + +query TT +EXPLAIN SELECT * FROM jt1 JOIN jt2 ON jt1.a = jt2.a +---- +physical_plan_error Invalid or Unsupported Configuration: Invalid join method(s) in datafusion.optimizer.join_method_priority: hsj. Valid values: hj/hash_join, smj/sort_merge_join, nlj/nested_loop_join diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 009c9eb469b2..b5441d5bd459 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -2558,7 +2558,7 @@ statement ok set datafusion.explain.logical_plan_only = false; statement ok -set datafusion.optimizer.prefer_hash_join = true; +set datafusion.optimizer.join_method_priority = 'hj'; # explain hash join on timestamp with timezone type query TT @@ -2724,10 +2724,7 @@ statement ok set datafusion.explain.logical_plan_only = false; statement ok -set datafusion.optimizer.prefer_hash_join = false; - -statement ok -set datafusion.optimizer.prefer_sort_merge_join = true; +set datafusion.optimizer.join_method_priority = 'smj'; statement ok set datafusion.execution.target_partitions = 2; @@ -2806,7 +2803,7 @@ statement ok set datafusion.explain.logical_plan_only = true; statement ok -set datafusion.optimizer.prefer_hash_join = true; +set datafusion.optimizer.join_method_priority = 'hj'; statement ok set datafusion.execution.target_partitions = 2; @@ -3154,10 +3151,7 @@ statement ok set datafusion.explain.logical_plan_only = false; statement ok -set datafusion.optimizer.prefer_hash_join = false; - -statement ok -set datafusion.optimizer.prefer_sort_merge_join = true; +set datafusion.optimizer.join_method_priority = 'smj'; statement ok set datafusion.execution.target_partitions = 2; @@ -3293,7 +3287,7 @@ physical_plan 17)------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true statement ok -set datafusion.optimizer.prefer_hash_join = true; +set datafusion.optimizer.join_method_priority = 'hj'; # to preserve ordering until Hash join set target partition to 1. # Otherwise RepartitionExec s inserted may broke ordering. @@ -3538,7 +3532,7 @@ statement ok set datafusion.explain.logical_plan_only = true; statement ok -set datafusion.optimizer.prefer_hash_join = true; +set datafusion.optimizer.join_method_priority = 'hj'; statement ok set datafusion.execution.target_partitions = 2; diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt index 235da41af8fc..31cd234ae841 100644 --- a/datafusion/sqllogictest/test_files/sort_merge_join.slt +++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt @@ -20,7 +20,7 @@ ########## statement ok -set datafusion.optimizer.prefer_sort_merge_join = true; +set datafusion.optimizer.join_method_priority = 'smj'; statement ok CREATE TABLE t1(a text, b int) AS VALUES ('Alice', 50), ('Alice', 100), ('Bob', 1); @@ -890,4 +890,4 @@ drop table t2; # return sql params back to default values statement ok -set datafusion.optimizer.prefer_hash_join = true; +set datafusion.optimizer.join_method_priority = 'hj'; diff --git a/datafusion/sqllogictest/test_files/tpch/tpch.slt b/datafusion/sqllogictest/test_files/tpch/tpch.slt index a2eb899588ef..a9154317059a 100644 --- a/datafusion/sqllogictest/test_files/tpch/tpch.slt +++ b/datafusion/sqllogictest/test_files/tpch/tpch.slt @@ -23,7 +23,7 @@ include ./answers/q*.slt.part # test answers with sort merge join statement ok -set datafusion.optimizer.prefer_sort_merge_join = true; +set datafusion.optimizer.join_method_priority = 'smj'; include ./answers/q*.slt.part From 498077f9f853b1ff88e002ef3dc219dad4575034 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sat, 13 Sep 2025 20:04:35 +0800 Subject: [PATCH 08/15] fix typo --- datafusion/common/src/config.rs | 4 ++- .../sql/.explain_analyze.rs.pending-snap | 33 ------------------- .../test_files/information_schema.slt | 2 +- 3 files changed, 4 insertions(+), 35 deletions(-) delete mode 100644 datafusion/core/tests/sql/.explain_analyze.rs.pending-snap diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 5d1eef83c437..7cc1d73dd802 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -827,7 +827,7 @@ config_namespace! { /// will choose one based on heuristics. /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory /// - /// Note: if `join_method_priority` is set, this configuration will be overriden + /// Note: if `join_method_priority` is set, this configuration will be overridden #[deprecated(since = "51.0.0", note = "Please use configuration option `join_method_priority` instead")] pub prefer_hash_join: bool, default = true @@ -842,6 +842,8 @@ config_namespace! { /// applicable to the current join and enabled via the corresponding /// `enable_*_join` flags. If none match, a default heuristic order /// is used as a fallback. + /// The default join method selection policy is, for equi-joins, Hash Join is + /// used; for non equi-joins, Nested Loop Join is used. /// /// Examples: /// - `hj, nlj`: prefer Hash Join; if not applicable, try Nested Loop Join next. diff --git a/datafusion/core/tests/sql/.explain_analyze.rs.pending-snap b/datafusion/core/tests/sql/.explain_analyze.rs.pending-snap deleted file mode 100644 index 7ed67241fbfe..000000000000 --- a/datafusion/core/tests/sql/.explain_analyze.rs.pending-snap +++ /dev/null @@ -1,33 +0,0 @@ -{"run_id":"481ba350-a37e-4b16-86e9-e45d01d1b0ef","line":652,"new":{"module_name":"core_integration__sql__explain_analyze","snapshot_name":"physical_plan_display_indent_multi_children","metadata":{"source":"datafusion/core/tests/sql/explain_analyze.rs","assertion_line":652,"expression":"actual"},"snapshot":"ProjectionExec: expr=[c1@0 as c1]\n SortMergeJoin: join_type=Inner, on=[(c1@0, c2@0)]\n SortExec: expr=[c1@0 ASC], preserve_partitioning=[true]\n CoalesceBatchesExec: target_batch_size=4096\n RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=9000\n RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1\n DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true\n SortExec: expr=[c2@0 ASC], preserve_partitioning=[true]\n CoalesceBatchesExec: target_batch_size=4096\n RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=9000\n RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1\n ProjectionExec: expr=[c1@0 as c2]\n DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true"},"old":{"module_name":"core_integration__sql__explain_analyze","metadata":{},"snapshot":"CoalesceBatchesExec: target_batch_size=4096\n HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c2@0)], projection=[c1@0]\n CoalesceBatchesExec: target_batch_size=4096\n RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=9000\n RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1\n DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true\n CoalesceBatchesExec: target_batch_size=4096\n RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=9000\n RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1\n ProjectionExec: expr=[c1@0 as c2]\n DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true"}} -{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":181,"new":null,"old":null} -{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":194,"new":null,"old":null} -{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":207,"new":null,"old":null} -{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":252,"new":null,"old":null} -{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":265,"new":null,"old":null} -{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":279,"new":null,"old":null} -{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":398,"new":null,"old":null} -{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":411,"new":null,"old":null} -{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":424,"new":null,"old":null} -{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":469,"new":null,"old":null} -{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":482,"new":null,"old":null} -{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":495,"new":null,"old":null} -{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":776,"new":null,"old":null} -{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":799,"new":null,"old":null} -{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":652,"new":null,"old":null} -{"run_id":"1b3b3d20-8bc9-43b0-8c52-e33dbec51ba7","line":606,"new":null,"old":null} -{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":181,"new":null,"old":null} -{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":194,"new":null,"old":null} -{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":207,"new":null,"old":null} -{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":252,"new":null,"old":null} -{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":265,"new":null,"old":null} -{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":279,"new":null,"old":null} -{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":398,"new":null,"old":null} -{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":411,"new":null,"old":null} -{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":424,"new":null,"old":null} -{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":469,"new":null,"old":null} -{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":482,"new":null,"old":null} -{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":495,"new":null,"old":null} -{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":776,"new":null,"old":null} -{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":799,"new":null,"old":null} -{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":606,"new":null,"old":null} -{"run_id":"f71f4510-f259-4eaa-ac65-527f62ca6bfd","line":652,"new":null,"old":null} diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 355a16292040..02e408ec1005 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -419,7 +419,7 @@ datafusion.optimizer.join_method_priority (empty) Comma-separated priority list datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave -datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin when applicable. If there are multiple preferred and applicable join types, the optimizer will choose one based on heuristics. HashJoin can work more efficiently than SortMergeJoin but consumes more memory Note: if `join_method_priority` is set, this configuration will be overriden +datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin when applicable. If there are multiple preferred and applicable join types, the optimizer will choose one based on heuristics. HashJoin can work more efficiently than SortMergeJoin but consumes more memory Note: if `join_method_priority` is set, this configuration will be overridden datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level datafusion.optimizer.repartition_file_min_size 10485760 Minimum total files size in bytes to perform file scan repartitioning. datafusion.optimizer.repartition_file_scans true When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism. This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition). For FileSources, only Parquet and CSV formats are currently supported. If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't happen within a single file. If set to `true` for an in-memory source, all memtable's partitions will have their batches repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change the total number of partitions and batches per partition, but does not slice the initial record tables provided to the MemTable on creation. From 73a9cd5763c8735222b0bbebd73950aad8c25c7b Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sat, 13 Sep 2025 20:11:37 +0800 Subject: [PATCH 09/15] clean up --- datafusion/common/src/config.rs | 4 +--- datafusion/core/tests/dataframe/mod.rs | 7 ++----- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 7cc1d73dd802..1bf5d1ec4ff2 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -822,9 +822,7 @@ config_namespace! { /// process to reorder the join keys pub top_down_join_key_reordering: bool, default = true - /// When set to true, the physical plan optimizer will prefer HashJoin when applicable. - /// If there are multiple preferred and applicable join types, the optimizer - /// will choose one based on heuristics. + /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory /// /// Note: if `join_method_priority` is set, this configuration will be overridden diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 95e5e2f44013..ceae0cd0e952 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -128,8 +128,7 @@ async fn assert_logical_expr_schema_eq_physical_expr_schema(df: DataFrame) -> Re #[tokio::test] async fn test_array_agg_ord_schema() -> Result<()> { - let cfg = - SessionConfig::new().set_str("datafusion.optimizer.join_method_priority", "hj"); + let ctx = SessionContext::new(); let ctx = SessionContext::new_with_config(cfg); let create_table_query = r#" @@ -4742,9 +4741,7 @@ fn create_join_context() -> Result { ], )?; - let config = SessionConfig::new() - .with_target_partitions(4) - .set_str("datafusion.optimizer.join_method_priority", "hj"); + let config = SessionConfig::new().with_target_partitions(4); let ctx = SessionContext::new_with_config(config); // let ctx = SessionContext::new(); From 71e0f90e91e4ad12f1e10602abe1b2640666e981 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sat, 13 Sep 2025 20:35:35 +0800 Subject: [PATCH 10/15] fix test --- datafusion/common/src/config.rs | 28 ++++--------------- .../test_files/information_schema.slt | 4 +-- docs/source/user-guide/configs.md | 5 ++-- 3 files changed, 10 insertions(+), 27 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 1bf5d1ec4ff2..c81bcf9dd729 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -829,32 +829,16 @@ config_namespace! { #[deprecated(since = "51.0.0", note = "Please use configuration option `join_method_priority` instead")] pub prefer_hash_join: bool, default = true - /// Comma-separated priority list of join algorithms to try, in order. + /// Comma-separated join priority (case-insensitive) selecting the first applicable and enabled (through configurations like `enable_hash_join`) join method. /// - /// Supported join methods (case-insensitive): - /// - "hj", "hash_join" - /// - "smj", "sort_merge_join" - /// - "nlj", "nested_loop_join" + /// Valid join methods are: hj/hash_join, smj/sort_merge_join, nlj/nested_loop_join. /// - /// The planner picks the first algorithm in this list that is both - /// applicable to the current join and enabled via the corresponding - /// `enable_*_join` flags. If none match, a default heuristic order - /// is used as a fallback. - /// The default join method selection policy is, for equi-joins, Hash Join is - /// used; for non equi-joins, Nested Loop Join is used. + /// It is allowed to specify any number of join methods in the configuration. If none of them is applicable, the default join planning heuristic will be used. + /// The default join planning policy is: for equi-joins Hash Join will be selected; for non equi-joins, Nested Loop Join will be chosen. /// - /// Examples: - /// - `hj, nlj`: prefer Hash Join; if not applicable, try Nested Loop Join next. - /// - `smj`: prefer Sort-Merge Join; if disabled or not applicable, fall back to - /// the default heuristic order. - /// - `hj, nlj`: on an non-equi-join, Hash Join is not applicable, so Nested - /// Loop Join is chosen if enabled. + /// Example usage: SET datafusion.optimizer.join_method_priority = 'hj, nlj' /// - /// Notes: - /// - Tokens are case-insensitive; unknown tokens cause a validation error - /// - When set, this option supersedes legacy configuration `prefer_hash_join` - /// , that is deprecated and ignored when `join_method_priority` is - /// provided(not empty string ""). + /// Note: If this option is not set (default empty string), the deprecated legacy option `prefer_hash_join` will be used. pub join_method_priority: String, transform = str::to_lowercase, default = "".to_string() /// Enables planning HashJoin operators. If set to false, the optimizer will avoid diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 02e408ec1005..c5dc3641bc88 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -415,11 +415,11 @@ datafusion.optimizer.expand_views_at_output false When set to true, if the retur datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition -datafusion.optimizer.join_method_priority (empty) Comma-separated priority list of join algorithms to try, in order. Supported join methods (case-insensitive): - "hj", "hash_join" - "smj", "sort_merge_join" - "nlj", "nested_loop_join" The planner picks the first algorithm in this list that is both applicable to the current join and enabled via the corresponding `enable_*_join` flags. If none match, a default heuristic order is used as a fallback. Examples: - `hj, nlj`: prefer Hash Join; if not applicable, try Nested Loop Join next. - `smj`: prefer Sort-Merge Join; if disabled or not applicable, fall back to the default heuristic order. - `hj, nlj`: on an non-equi-join, Hash Join is not applicable, so Nested Loop Join is chosen if enabled. Notes: - Tokens are case-insensitive; unknown tokens cause a validation error - When set, this option supersedes legacy configuration `prefer_hash_join` , that is deprecated and ignored when `join_method_priority` is provided(not empty string ""). +datafusion.optimizer.join_method_priority (empty) Comma-separated join priority (case-insensitive) selecting the first applicable and enabled (through configurations like `enable_hash_join`) join method. Valid join methods are: hj/hash_join, smj/sort_merge_join, nlj/nested_loop_join. It is allowed to specify any number of join methods in the configuration. If none of them is applicable, the default join planning heuristic will be used. The default join planning policy is: for equi-joins Hash Join will be selected; for non equi-joins, Nested Loop Join will be chosen. Example usage: SET datafusion.optimizer.join_method_priority = 'hj, nlj' Note: If this option is not set (default empty string), the deprecated legacy option `prefer_hash_join` will be used. datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave -datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin when applicable. If there are multiple preferred and applicable join types, the optimizer will choose one based on heuristics. HashJoin can work more efficiently than SortMergeJoin but consumes more memory Note: if `join_method_priority` is set, this configuration will be overridden +datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory Note: if `join_method_priority` is set, this configuration will be overridden datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level datafusion.optimizer.repartition_file_min_size 10485760 Minimum total files size in bytes to perform file scan repartitioning. datafusion.optimizer.repartition_file_scans true When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism. This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition). For FileSources, only Parquet and CSV formats are currently supported. If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't happen within a single file. If set to `true` for an in-memory source, all memtable's partitions will have their batches repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change the total number of partitions and batches per partition, but does not slice the initial record tables provided to the MemTable on creation. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index b2d6d3caf929..d21950e909f2 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -143,9 +143,8 @@ The following configuration settings are available: | datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail | | datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | | datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | -| datafusion.optimizer.prefer_hash_join | false | When set to true, the physical plan optimizer will prefer HashJoin when applicable. If there are multiple preferred and applicable join types, the optimizer will choose one based on heuristics. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | -| datafusion.optimizer.prefer_sort_merge_join | false | When set to true, the physical plan optimizer will prefer SortMergeJoin when applicable. If there are multiple preferred and applicable join types, the optimizer will choose one according to heuristics. | -| datafusion.optimizer.prefer_nested_loop_join | false | When set to true, the physical plan optimizer will prefer NestedLoopJoin when applicable. If there are multiple preferred and applicable join types, the optimizer will choose one according to heuristics. | +| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory Note: if `join_method_priority` is set, this configuration will be overridden | +| datafusion.optimizer.join_method_priority | | Comma-separated join priority (case-insensitive) selecting the first applicable and enabled (through configurations like `enable_hash_join`) join method. Valid join methods are: hj/hash_join, smj/sort_merge_join, nlj/nested_loop_join. It is allowed to specify any number of join methods in the configuration. If none of them is applicable, the default join planning heuristic will be used. The default join planning policy is: for equi-joins Hash Join will be selected; for non equi-joins, Nested Loop Join will be chosen. Example usage: SET datafusion.optimizer.join_method_priority = 'hj, nlj' Note: If this option is not set (default empty string), the deprecated legacy option `prefer_hash_join` will be used. | | datafusion.optimizer.enable_hash_join | true | Enables planning HashJoin operators. If set to false, the optimizer will avoid producing HashJoin plans and consider other join strategies instead. | | datafusion.optimizer.enable_sort_merge_join | true | Enables planning SortMergeJoin operators. If set to false, the optimizer will avoid producing SortMergeJoin plans and consider other join strategies instead. | | datafusion.optimizer.enable_nested_loop_join | true | Enables planning NestedLoopJoin operators. If set to false, the optimizer will avoid producing NestedLoopJoin plans and consider other join strategies instead. | From 3a1471fcdaf32507a729b90766717e8d01d90b98 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sat, 13 Sep 2025 20:48:52 +0800 Subject: [PATCH 11/15] fix test --- datafusion/core/tests/dataframe/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index ceae0cd0e952..a563459f42a1 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -129,7 +129,6 @@ async fn assert_logical_expr_schema_eq_physical_expr_schema(df: DataFrame) -> Re #[tokio::test] async fn test_array_agg_ord_schema() -> Result<()> { let ctx = SessionContext::new(); - let ctx = SessionContext::new_with_config(cfg); let create_table_query = r#" CREATE TABLE test_table ( From 00f6dbb41e5db4c90a3e0af8e1abebbca97b3a64 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sat, 13 Sep 2025 23:12:39 +0800 Subject: [PATCH 12/15] review --- datafusion/core/src/physical_planner.rs | 4 ++-- datafusion/core/src/physical_planner/join_planner.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index aa2f0c47a597..3d03f38f64d0 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -101,7 +101,7 @@ use tokio::sync::Mutex; // Submodules mod join_planner; -use self::join_planner::plan_join_exec; +use self::join_planner::plan_initial_join_exec; /// Physical query planner that converts a `LogicalPlan` to an /// `ExecutionPlan` suitable for execution. @@ -1165,7 +1165,7 @@ impl DefaultPhysicalPlanner { _ => None, }; - let join: Arc = plan_join_exec( + let join: Arc = plan_initial_join_exec( session_state, physical_left, physical_right, diff --git a/datafusion/core/src/physical_planner/join_planner.rs b/datafusion/core/src/physical_planner/join_planner.rs index aaa44e20c41a..eba47533b3f8 100644 --- a/datafusion/core/src/physical_planner/join_planner.rs +++ b/datafusion/core/src/physical_planner/join_planner.rs @@ -47,7 +47,7 @@ use datafusion_expr::JoinType; /// by checking if they're enabled by options like `datafusion.optimizer.enable_hash_join` /// - Step 3: Choose one according to the built-in heuristics and also the preference /// in the configuration `datafusion.optimizer.join_method_priority` -pub(super) fn plan_join_exec( +pub(super) fn plan_initial_join_exec( session_state: &SessionState, physical_left: Arc, physical_right: Arc, From b659c6158ac00ef5febdd0d8417a7c171bb9ea9a Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sun, 14 Sep 2025 16:39:53 +0800 Subject: [PATCH 13/15] bot review --- datafusion/common/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index c81bcf9dd729..8f45588f8f59 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -839,7 +839,7 @@ config_namespace! { /// Example usage: SET datafusion.optimizer.join_method_priority = 'hj, nlj' /// /// Note: If this option is not set (default empty string), the deprecated legacy option `prefer_hash_join` will be used. - pub join_method_priority: String, transform = str::to_lowercase, default = "".to_string() + pub join_method_priority: String, default = "".to_string() /// Enables planning HashJoin operators. If set to false, the optimizer will avoid /// producing HashJoin plans and consider other join strategies instead. From 2323ef741c4c6db8a4e8145e6ea0f5e06dd8e1eb Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sun, 14 Sep 2025 17:03:25 +0800 Subject: [PATCH 14/15] revert suggestion from AI -- it's not true --- datafusion/common/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 8f45588f8f59..f7b65faad63c 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -827,7 +827,7 @@ config_namespace! { /// /// Note: if `join_method_priority` is set, this configuration will be overridden #[deprecated(since = "51.0.0", note = "Please use configuration option `join_method_priority` instead")] - pub prefer_hash_join: bool, default = true + pub prefer_hash_join: bool, transform = str::to_lowercase, default = true /// Comma-separated join priority (case-insensitive) selecting the first applicable and enabled (through configurations like `enable_hash_join`) join method. /// From 415dac5a6e551a9841901981269e418c04a55b4c Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sun, 14 Sep 2025 17:24:21 +0800 Subject: [PATCH 15/15] fix --- datafusion/common/src/config.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index f7b65faad63c..c81bcf9dd729 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -827,7 +827,7 @@ config_namespace! { /// /// Note: if `join_method_priority` is set, this configuration will be overridden #[deprecated(since = "51.0.0", note = "Please use configuration option `join_method_priority` instead")] - pub prefer_hash_join: bool, transform = str::to_lowercase, default = true + pub prefer_hash_join: bool, default = true /// Comma-separated join priority (case-insensitive) selecting the first applicable and enabled (through configurations like `enable_hash_join`) join method. /// @@ -839,7 +839,7 @@ config_namespace! { /// Example usage: SET datafusion.optimizer.join_method_priority = 'hj, nlj' /// /// Note: If this option is not set (default empty string), the deprecated legacy option `prefer_hash_join` will be used. - pub join_method_priority: String, default = "".to_string() + pub join_method_priority: String, transform = str::to_lowercase, default = "".to_string() /// Enables planning HashJoin operators. If set to false, the optimizer will avoid /// producing HashJoin plans and consider other join strategies instead.