refactor(query): migrate physical plan from enum to trait-based architecture #18268
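At a high level, this PR replaces the single `PhysicalPlan` enum (one variant per operator, matched exhaustively wherever plans are built, formatted, or executed) with per-operator structs implementing a common trait, each carrying a shared `PhysicalPlanMeta` for its name and plan id. The sketch below only illustrates that shape as inferred from the call sites in the diffs; the trait name, its methods, and the `PhysicalPlan` alias are assumptions, not the PR's verbatim definitions.

```rust
// Before (enum-based): one variant per operator; any generic behaviour needs a
// match over every variant.
//
// pub enum PhysicalPlan {
//     TableScan(TableScan),
//     Exchange(Exchange),
//     CopyIntoLocation(Box<CopyIntoLocation>),
//     // ... many more variants
// }

// After (trait-based), as suggested by call sites such as
// `let plan: PhysicalPlan = Box::new(CopyIntoLocation { .., meta: PhysicalPlanMeta::new("CopyIntoLocation") })`:
pub struct PhysicalPlanMeta {
    pub name: String,
    pub plan_id: u32,
}

impl PhysicalPlanMeta {
    pub fn new(name: &str) -> Self {
        Self { name: name.to_string(), plan_id: 0 }
    }
}

pub trait IPhysicalPlan: Send + Sync {
    fn meta(&self) -> &PhysicalPlanMeta;
    fn children(&self) -> Vec<&PhysicalPlan>;
}

// The coercion `Box::new(node)` -> `PhysicalPlan` in the interpreters below is
// consistent with an alias along these lines.
pub type PhysicalPlan = Box<dyn IPhysicalPlan>;
```

With this shape, adding an operator means implementing the trait rather than adding a variant and touching every `match`, which is the usual motivation for this kind of migration.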

Open
wants to merge 52 commits into main from refactor/physical_plan_trait

52 commits
8359dd8
refactor(query): use trait to refactor physical plan
zhang2014 Jun 28, 2025
e23c2d7
refactor(query): use trait to refactor physical plan
zhang2014 Jun 28, 2025
ed320f8
refactor(query): use trait to refactor physical plan
zhang2014 Jun 30, 2025
5b53eb0
refactor(query): use trait to refactor physical plan
zhang2014 Jun 30, 2025
b269460
refactor(query): use trait to refactor physical plan
zhang2014 Jun 30, 2025
adbf021
refactor(query): use trait to refactor physical plan
zhang2014 Jul 3, 2025
397bca7
refactor(query): use trait to refactor physical plan
zhang2014 Jul 3, 2025
8eb1139
refactor(query): use trait to refactor physical plan
zhang2014 Jul 3, 2025
d4cf0c7
refactor(query): refactor physical plan
zhang2014 Jul 3, 2025
d2dcd20
refactor(query): use trait to refactor physical plan
zhang2014 Jul 13, 2025
416ebfe
refactor(query): use trait to refactor physical plan
zhang2014 Jul 14, 2025
c5a0a94
refactor(query): use trait to refactor physical plan
zhang2014 Jul 14, 2025
c4f22fd
refactor(query): use trait to refactor physical plan
zhang2014 Jul 14, 2025
0dd0362
refactor(query): use trait to refactor physical plan
zhang2014 Jul 15, 2025
81e1b8a
refactor(query): use trait to refactor physical plan
zhang2014 Jul 15, 2025
373c80d
Merge branch 'main' of https://github.com/datafuselabs/databend into …
zhang2014 Jul 15, 2025
e8a174f
refactor(query): use trait to refactor physical plan
zhang2014 Jul 15, 2025
eb867b1
refactor(query): use trait to refactor physical plan
zhang2014 Jul 15, 2025
66045a3
refactor(query): use trait to refactor physical plan
zhang2014 Jul 15, 2025
9b19b1d
refactor(query): use trait to refactor physical plan
zhang2014 Jul 15, 2025
58186fe
refactor(query): use trait to refactor physical plan
zhang2014 Jul 15, 2025
5e6bedf
refactor(query): use trait to refactor physical plan
zhang2014 Jul 15, 2025
5bd2815
refactor(query): use trait to refactor physical plan
zhang2014 Jul 15, 2025
9f44159
refactor(query): use trait to refactor physical plan
zhang2014 Jul 16, 2025
f0b1997
refactor(query): use trait to refactor physical plan
zhang2014 Jul 24, 2025
ba7863c
Merge branch 'main' of https://github.com/datafuselabs/databend into …
zhang2014 Aug 10, 2025
d63e231
refactor(query): use trait to refactor physical plan
zhang2014 Aug 10, 2025
994d159
refactor(query): use trait to refactor physical plan
zhang2014 Aug 11, 2025
b62ee0a
refactor(query): use trait to refactor physical plan
zhang2014 Aug 12, 2025
3c9d370
refactor(query): use trait to refactor physical plan
zhang2014 Aug 12, 2025
16ecc6a
refactor(query): use trait to refactor physical plan
zhang2014 Aug 13, 2025
2ad606c
Merge branch 'main' of https://github.com/datafuselabs/databend into …
zhang2014 Aug 13, 2025
e9ad156
refactor(query): use trait to refactor physical plan
zhang2014 Aug 13, 2025
85e99d8
refactor(query): use trait to refactor physical plan
zhang2014 Aug 14, 2025
2e94ad8
refactor(query): use trait to refactor physical plan
zhang2014 Aug 14, 2025
36b9652
refactor(query): use trait to refactor physical plan
zhang2014 Aug 14, 2025
2d41447
refactor(query): use trait to refactor physical plan
zhang2014 Aug 14, 2025
84242dd
refactor(query): use trait to refactor physical plan
zhang2014 Aug 14, 2025
e10897b
refactor(query): use trait to refactor physical plan
zhang2014 Aug 14, 2025
34876c7
refactor(query): use trait to refactor physical plan
zhang2014 Aug 14, 2025
4260a37
refactor(query): use trait to refactor physical plan
zhang2014 Aug 14, 2025
bf834f8
refactor(query): use trait to refactor physical plan
zhang2014 Aug 14, 2025
d9fe2e2
refactor(query): use trait to refactor physical plan
zhang2014 Aug 14, 2025
d2b615e
refactor(query): use trait to refactor physical plan
zhang2014 Aug 14, 2025
d54900c
refactor(query): use trait to refactor physical plan
zhang2014 Aug 14, 2025
73b056b
refactor(query): use trait to refactor physical plan
zhang2014 Aug 15, 2025
dfa3898
refactor(query): use trait to refactor physical plan
zhang2014 Aug 15, 2025
85bcb0d
refactor(query): use trait to refactor physical plan
zhang2014 Aug 15, 2025
3339a95
Merge branch 'main' into refactor/physical_plan_trait
zhang2014 Aug 15, 2025
fbdce99
refactor(query): use trait to refactor physical plan
zhang2014 Aug 15, 2025
9754542
refactor(query): use trait to refactor physical plan
zhang2014 Aug 17, 2025
eea6de3
Merge branch 'main' into refactor/physical_plan_trait
zhang2014 Aug 17, 2025
2 changes: 1 addition & 1 deletion Cargo.lock

1 change: 1 addition & 0 deletions src/query/service/Cargo.toml
@@ -119,6 +119,7 @@ databend-storages-common-session = { workspace = true }
databend-storages-common-stage = { workspace = true }
databend-storages-common-table-meta = { workspace = true }
derive-visitor = { workspace = true }
dyn-clone = { workspace = true }
enum-as-inner = { workspace = true }
ethnum = { workspace = true }
fastrace = { workspace = true }
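The one addition here, `dyn-clone`, is what keeps plans cloneable once `PhysicalPlan` is a boxed trait object rather than an enum (which derived `Clone` for free). A minimal sketch of that mechanism; the trait and node below are placeholders for illustration, not the PR's actual definitions:

```rust
use dyn_clone::DynClone;

// Requiring DynClone lets the macro below generate `impl Clone for Box<dyn IPhysicalPlan>`.
pub trait IPhysicalPlan: DynClone {
    fn name(&self) -> &str;
}

dyn_clone::clone_trait_object!(IPhysicalPlan);

#[derive(Clone)]
struct TableScan;

impl IPhysicalPlan for TableScan {
    fn name(&self) -> &str {
        "TableScan"
    }
}

fn main() {
    let plan: Box<dyn IPhysicalPlan> = Box::new(TableScan);
    let copied = plan.clone(); // only possible because of clone_trait_object!
    assert_eq!(copied.name(), "TableScan");
}
```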
src/query/service/src/interpreters/interpreter_copy_into_location.rs
@@ -19,8 +19,6 @@ use databend_common_exception::Result;
use databend_common_expression::infer_table_schema;
use databend_common_meta_app::schema::UpdateStreamMetaReq;
use databend_common_pipeline_core::ExecutionInfo;
use databend_common_sql::executor::physical_plans::CopyIntoLocation;
use databend_common_sql::executor::PhysicalPlan;
use databend_storages_common_stage::CopyIntoLocationInfo;
use log::debug;
use log::info;
@@ -29,6 +27,9 @@ use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::common::dml_build_update_stream_req;
use crate::interpreters::Interpreter;
use crate::interpreters::SelectInterpreter;
use crate::physical_plans::CopyIntoLocation;
use crate::physical_plans::PhysicalPlan;
use crate::physical_plans::PhysicalPlanMeta;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
@@ -89,14 +90,14 @@ impl CopyIntoLocationInterpreter {
let query_result_schema = query_interpreter.get_result_schema();
let table_schema = infer_table_schema(&query_result_schema)?;

let mut physical_plan = PhysicalPlan::CopyIntoLocation(Box::new(CopyIntoLocation {
plan_id: 0,
input: Box::new(query_physical_plan),
let mut physical_plan: PhysicalPlan = Box::new(CopyIntoLocation {
input: query_physical_plan,
project_columns: query_interpreter.get_result_columns(),
input_data_schema: query_result_schema,
input_table_schema: table_schema,
info: info.clone(),
}));
meta: PhysicalPlanMeta::new("CopyIntoLocation"),
});

let mut next_plan_id = 0;
physical_plan.adjust_plan_id(&mut next_plan_id);
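Note the new tail of this function: instead of each node being constructed with a literal `plan_id: 0`, ids now live in `PhysicalPlanMeta` and are assigned after the tree is built via `adjust_plan_id` with a running counter. A sketch of how such a walk can look over trait objects; the trait methods and the `children_mut` helper are assumptions for illustration, not the PR's exact code:

```rust
pub struct PhysicalPlanMeta {
    pub name: String,
    pub plan_id: u32,
}

impl PhysicalPlanMeta {
    pub fn new(name: &str) -> Self {
        Self { name: name.to_string(), plan_id: 0 }
    }
}

pub trait IPhysicalPlan {
    fn meta_mut(&mut self) -> &mut PhysicalPlanMeta;
    fn children_mut(&mut self) -> Vec<&mut Box<dyn IPhysicalPlan>>;

    // Pre-order numbering: assign this node's id, then recurse into its inputs.
    fn adjust_plan_id(&mut self, next_id: &mut u32) {
        self.meta_mut().plan_id = *next_id;
        *next_id += 1;
        for child in self.children_mut() {
            child.adjust_plan_id(next_id);
        }
    }
}

struct CopyIntoLocation {
    meta: PhysicalPlanMeta,
    input: Box<dyn IPhysicalPlan>,
}

impl IPhysicalPlan for CopyIntoLocation {
    fn meta_mut(&mut self) -> &mut PhysicalPlanMeta {
        &mut self.meta
    }
    fn children_mut(&mut self) -> Vec<&mut Box<dyn IPhysicalPlan>> {
        vec![&mut self.input]
    }
}

struct TableScan {
    meta: PhysicalPlanMeta,
}

impl IPhysicalPlan for TableScan {
    fn meta_mut(&mut self) -> &mut PhysicalPlanMeta {
        &mut self.meta
    }
    fn children_mut(&mut self) -> Vec<&mut Box<dyn IPhysicalPlan>> {
        Vec::new()
    }
}

fn main() {
    let mut plan: Box<dyn IPhysicalPlan> = Box::new(CopyIntoLocation {
        meta: PhysicalPlanMeta::new("CopyIntoLocation"),
        input: Box::new(TableScan { meta: PhysicalPlanMeta::new("TableScan") }),
    });
    let mut next_plan_id = 0;
    plan.adjust_plan_id(&mut next_plan_id);

    // CopyIntoLocation gets id 0, TableScan gets id 1.
    let meta = plan.meta_mut();
    println!("{} -> {}", meta.name, meta.plan_id);
}
```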
31 changes: 16 additions & 15 deletions src/query/service/src/interpreters/interpreter_copy_into_table.rs
@@ -26,14 +26,9 @@ use databend_common_expression::SendableDataBlockStream;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::UpdateStreamMetaReq;
use databend_common_pipeline_core::Pipeline;
use databend_common_sql::executor::physical_plans::CopyIntoTable;
use databend_common_sql::executor::physical_plans::CopyIntoTableSource;
use databend_common_sql::executor::physical_plans::Exchange;
use databend_common_sql::executor::physical_plans::FragmentKind;
use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_sql::executor::physical_plans::TableScan;
use databend_common_sql::executor::table_read_plan::ToReadDataSourcePlan;
use databend_common_sql::executor::PhysicalPlan;
use databend_common_storage::StageFileInfo;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_stage::StageTable;
@@ -46,6 +41,12 @@ use crate::interpreters::common::dml_build_update_stream_req;
use crate::interpreters::HookOperator;
use crate::interpreters::Interpreter;
use crate::interpreters::SelectInterpreter;
use crate::physical_plans::CopyIntoTable;
use crate::physical_plans::CopyIntoTableSource;
use crate::physical_plans::Exchange;
use crate::physical_plans::PhysicalPlan;
use crate::physical_plans::PhysicalPlanMeta;
use crate::physical_plans::TableScan;
use crate::pipelines::PipelineBuildResult;
use crate::pipelines::PipelineBuilder;
use crate::schedulers::build_query_pipeline_without_render_result_set;
@@ -113,7 +114,7 @@ impl CopyIntoTableInterpreter {

let (query_interpreter, update_stream_meta) = self.build_query(&query).await?;
update_stream_meta_reqs = update_stream_meta;
let query_physical_plan = Box::new(query_interpreter.build_physical_plan().await?);
let query_physical_plan = query_interpreter.build_physical_plan().await?;

let result_columns = query_interpreter.get_result_columns();
(
@@ -133,21 +134,20 @@ impl CopyIntoTableInterpreter {
}

(
CopyIntoTableSource::Stage(Box::new(PhysicalPlan::TableScan(TableScan {
plan_id: 0,
CopyIntoTableSource::Stage(Box::new(TableScan {
scan_id: 0,
name_mapping,
stat_info: None,
table_index: None,
internal_column: None,
source: Box::new(data_source_plan),
}))),
meta: PhysicalPlanMeta::new("TableScan"),
})),
None,
)
};

let mut root = PhysicalPlan::CopyIntoTable(Box::new(CopyIntoTable {
plan_id: 0,
let mut root: PhysicalPlan = Box::new(CopyIntoTable {
required_values_schema: plan.required_values_schema.clone(),
values_consts: plan.values_consts.clone(),
required_source_schema: plan.required_source_schema.clone(),
@@ -159,16 +159,17 @@
source,
is_transform: plan.is_transform,
table_meta_timestamps,
}));
meta: PhysicalPlanMeta::new("CopyIntoTable"),
});

if plan.enable_distributed {
root = PhysicalPlan::Exchange(Exchange {
plan_id: 0,
input: Box::new(root),
root = Box::new(Exchange {
input: root,
kind: FragmentKind::Merge,
keys: Vec::new(),
allow_adjust_parallelism: true,
ignore_exchange: false,
meta: PhysicalPlanMeta::new("Exchange"),
});
}

77 changes: 51 additions & 26 deletions src/query/service/src/interpreters/interpreter_explain.rs
@@ -34,8 +34,6 @@ use databend_common_pipeline_core::processors::PlanProfile;
use databend_common_pipeline_core::ExecutionInfo;
use databend_common_pipeline_core::Pipeline;
use databend_common_sql::binder::ExplainConfig;
use databend_common_sql::executor::format_partial_tree;
use databend_common_sql::executor::MutationBuildInfo;
use databend_common_sql::plans::Mutation;
use databend_common_sql::BindContext;
use databend_common_sql::ColumnSet;
@@ -55,6 +53,11 @@ use crate::interpreters::interpreter::on_execution_finished;
use crate::interpreters::interpreter_mutation::build_mutation_info;
use crate::interpreters::interpreter_mutation::MutationInterpreter;
use crate::interpreters::Interpreter;
use crate::physical_plans::FormatContext;
use crate::physical_plans::MutationBuildInfo;
use crate::physical_plans::PhysicalPlan;
use crate::physical_plans::PhysicalPlanBuilder;
use crate::physical_plans::PhysicalPlanDynExt;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::executor::PipelinePullingExecutor;
@@ -64,8 +67,6 @@ use crate::schedulers::build_query_pipeline;
use crate::schedulers::Fragmenter;
use crate::schedulers::QueryFragmentsActions;
use crate::sessions::QueryContext;
use crate::sql::executor::PhysicalPlan;
use crate::sql::executor::PhysicalPlanBuilder;
use crate::sql::optimizer::ir::SExpr;
use crate::sql::plans::Plan;

@@ -174,7 +175,20 @@ impl Interpreter for ExplainInterpreter {
let ctx = self.ctx.clone();
let mut builder = PhysicalPlanBuilder::new(metadata.clone(), ctx, true);
let plan = builder.build(s_expr, bind_context.column_set()).await?;
self.explain_join_order(&plan, metadata)?

let metadata = metadata.read();
let mut context = FormatContext {
profs: HashMap::new(),
metadata: &metadata,
scan_id_to_runtime_filters: HashMap::new(),
};

let formatter = plan.formatter()?;
let format_node = formatter.format_join(&mut context)?;
let result = format_node.format_pretty()?;
let line_split_result: Vec<&str> = result.lines().collect();
let formatted_plan = StringType::from_data(line_split_result);
vec![DataBlock::new_from_columns(vec![formatted_plan])]
}
_ => Err(ErrorCode::Unimplemented(
"Unsupported EXPLAIN JOIN statement",
@@ -360,25 +374,15 @@ impl ExplainInterpreter {
}
}

let metadata = metadata.read();
let result = plan
.format(metadata.clone(), Default::default())?
.format(&metadata, Default::default())?
.format_pretty()?;
let line_split_result: Vec<&str> = result.lines().collect();
let formatted_plan = StringType::from_data(line_split_result);
Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
}

pub fn explain_join_order(
&self,
plan: &PhysicalPlan,
metadata: &MetadataRef,
) -> Result<Vec<DataBlock>> {
let result = plan.format_join(metadata)?.format_pretty()?;
let line_split_result: Vec<&str> = result.lines().collect();
let formatted_plan = StringType::from_data(line_split_result);
Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
}

fn format_pipeline(build_res: &PipelineBuildResult) -> Vec<DataBlock> {
let mut blocks = Vec::with_capacity(1 + build_res.sources_pipelines.len());
// Format root pipeline
@@ -412,10 +416,13 @@ impl ExplainInterpreter {
.build(&s_expr, required)
.await?;

let root_fragment = Fragmenter::try_create(ctx.clone())?.build_fragment(&plan)?;
let fragments = Fragmenter::try_create(ctx.clone())?.build_fragment(&plan)?;

let mut fragments_actions = QueryFragmentsActions::create(ctx.clone());
root_fragment.get_actions(ctx, &mut fragments_actions)?;

for fragment in fragments {
fragment.get_actions(ctx.clone(), &mut fragments_actions)?;
}

let display_string = fragments_actions.display_indent(&metadata).to_string();
let line_split_result = display_string.lines().collect::<Vec<_>>();
@@ -479,12 +486,27 @@ impl ExplainInterpreter {
if !pruned_partitions_stats.is_empty() {
plan.set_pruning_stats(&mut pruned_partitions_stats);
}
let result = if self.partial {
format_partial_tree(&plan, metadata, &query_profiles)?.format_pretty()?
} else {
plan.format(metadata.clone(), query_profiles.clone())?
.format_pretty()?

let result = match self.partial {
true => {
let metadata = metadata.read();
let mut context = FormatContext {
profs: query_profiles.clone(),
metadata: &metadata,
scan_id_to_runtime_filters: HashMap::new(),
};

let formatter = plan.formatter()?;
let format_node = formatter.partial_format(&mut context)?;
format_node.format_pretty()?
}
false => {
let metadata = metadata.read();
plan.format(&metadata, query_profiles.clone())?
.format_pretty()?
}
};

let line_split_result: Vec<&str> = result.lines().collect();
let formatted_plan = StringType::from_data(line_split_result);
if self.graphical {
@@ -570,10 +592,13 @@ impl ExplainInterpreter {
mutation.metadata.clone(),
)?;
let plan = interpreter.build_physical_plan(&mutation, true).await?;
let root_fragment = Fragmenter::try_create(self.ctx.clone())?.build_fragment(&plan)?;
let fragments = Fragmenter::try_create(self.ctx.clone())?.build_fragment(&plan)?;

let mut fragments_actions = QueryFragmentsActions::create(self.ctx.clone());
root_fragment.get_actions(self.ctx.clone(), &mut fragments_actions)?;

for fragment in fragments {
fragment.get_actions(self.ctx.clone(), &mut fragments_actions)?;
}

let display_string = fragments_actions
.display_indent(&mutation.metadata)
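The EXPLAIN changes above replace the old free functions (`format_partial_tree`, `plan.format_join(metadata)`) with a per-plan formatter driven by a `FormatContext` that bundles profiles, metadata, and runtime-filter info. A rough sketch of that shape, inferred only from the call sites (`plan.formatter()?`, `format_join(&mut context)`, `partial_format(&mut context)`); every type and trait definition here is an assumption, not databend's actual API:

```rust
use std::collections::HashMap;

// Stand-ins for databend's Metadata, PlanProfile and runtime-filter descriptors.
type Metadata = ();
type PlanProfile = ();
type RuntimeFilterDesc = String;

pub struct FormatContext<'a> {
    pub profs: HashMap<u32, PlanProfile>,
    pub metadata: &'a Metadata,
    pub scan_id_to_runtime_filters: HashMap<usize, Vec<RuntimeFilterDesc>>,
}

// A simple tree that can be pretty-printed, mirroring the `format_pretty()` calls above.
pub struct FormatTreeNode {
    pub payload: String,
    pub children: Vec<FormatTreeNode>,
}

impl FormatTreeNode {
    pub fn format_pretty(&self) -> Result<String, String> {
        fn walk(node: &FormatTreeNode, depth: usize, out: &mut String) {
            out.push_str(&"  ".repeat(depth));
            out.push_str(&node.payload);
            out.push('\n');
            for child in &node.children {
                walk(child, depth + 1, out);
            }
        }
        let mut out = String::new();
        walk(self, 0, &mut out);
        Ok(out)
    }
}

// Each plan node hands back something implementing this; EXPLAIN JOIN calls
// `format_join`, EXPLAIN ANALYZE PARTIAL calls `partial_format`.
pub trait PhysicalFormat {
    fn format(&self, ctx: &mut FormatContext<'_>) -> Result<FormatTreeNode, String>;
    fn format_join(&self, ctx: &mut FormatContext<'_>) -> Result<FormatTreeNode, String>;
    fn partial_format(&self, ctx: &mut FormatContext<'_>) -> Result<FormatTreeNode, String>;
}
```

Related but independent: `Fragmenter::build_fragment` now appears to return a collection of fragments rather than a single root, so both EXPLAIN FRAGMENTS paths above iterate over the fragments and call `get_actions` on each.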
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/interpreter_factory.rs
@@ -250,8 +250,8 @@ impl InterpreterFactory {
)?)),
Plan::ExplainAnalyze {
graphical,
partial,
plan,
partial,
} => Ok(Arc::new(ExplainInterpreter::try_create(
ctx,
*plan.clone(),