Skip to content

Commit ad11bf9

Browse files
committed
Improve performance of Merge for MarkDistinct
1 parent be991cc commit ad11bf9

File tree

1 file changed

+17
-6
lines changed

1 file changed

+17
-6
lines changed

core/trino-main/src/main/java/io/trino/sql/planner/QueryPlanner.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -751,9 +751,9 @@ public MergeWriterNode plan(Merge merge)
751751
.process(merge.getTarget());
752752

753753
// Assign a unique id to every target table row
754-
Symbol uniqueIdSymbol = symbolAllocator.newSymbol("unique_id", BIGINT);
754+
Symbol targetUniqueIdSymbol = symbolAllocator.newSymbol("target_unique_id", BIGINT);
755755
RelationPlan planWithUniqueId = new RelationPlan(
756-
new AssignUniqueId(idAllocator.getNextId(), targetTablePlan.getRoot(), uniqueIdSymbol),
756+
new AssignUniqueId(idAllocator.getNextId(), targetTablePlan.getRoot(), targetUniqueIdSymbol),
757757
mergeAnalysis.getTargetTableScope(),
758758
targetTablePlan.getFieldMappings(),
759759
outerContext);
@@ -774,8 +774,16 @@ public MergeWriterNode plan(Merge merge)
774774
RelationPlan source = new RelationPlanner(analysis, symbolAllocator, idAllocator, lambdaDeclarationToSymbolMap, plannerContext, outerContext, session, recursiveSubqueries)
775775
.process(merge.getSource());
776776

777+
// Assign a unique id to every source table row
778+
Symbol sourceUniqueIdSymbol = symbolAllocator.newSymbol("source_unique_id", BIGINT);
779+
RelationPlan sourcePlanWithUniqueId = new RelationPlan(
780+
new AssignUniqueId(idAllocator.getNextId(), source.getRoot(), sourceUniqueIdSymbol),
781+
source.getScope(),
782+
source.getFieldMappings(),
783+
outerContext);
784+
777785
RelationPlan joinPlan = new RelationPlanner(analysis, symbolAllocator, idAllocator, lambdaDeclarationToSymbolMap, plannerContext, outerContext, session, recursiveSubqueries)
778-
.planJoin(merge.getPredicate(), Join.Type.RIGHT, mergeAnalysis.getJoinScope(), planWithPresentColumn, source, analysis.getSubqueries(merge)); // TODO: ir
786+
.planJoin(merge.getPredicate(), Join.Type.RIGHT, mergeAnalysis.getJoinScope(), planWithPresentColumn, sourcePlanWithUniqueId, analysis.getSubqueries(merge)); // TODO: ir
779787

780788
PlanBuilder subPlan = newPlanBuilder(joinPlan, analysis, lambdaDeclarationToSymbolMap, session, plannerContext);
781789

@@ -864,10 +872,10 @@ public MergeWriterNode plan(Merge merge)
864872

865873
List<io.trino.sql.tree.Expression> constraints = analysis.getCheckConstraints(mergeAnalysis.getTargetTable());
866874
if (!constraints.isEmpty()) {
867-
assignments.putIdentity(uniqueIdSymbol);
875+
assignments.putIdentity(targetUniqueIdSymbol);
868876
assignments.putIdentity(presentColumn);
869877
assignments.putIdentity(rowIdSymbol);
870-
assignments.putIdentities(source.getFieldMappings());
878+
assignments.putIdentities(sourcePlanWithUniqueId.getFieldMappings());
871879
subPlan = subPlan.withNewRoot(new ProjectNode(
872880
idAllocator.getNextId(),
873881
subPlan.getRoot(),
@@ -887,6 +895,7 @@ public MergeWriterNode plan(Merge merge)
887895
.build()),
888896
null));
889897

898+
Symbol uniqueIdSymbol = symbolAllocator.newSymbol("unique_id", BIGINT);
890899
Symbol mergeRowSymbol = symbolAllocator.newSymbol("merge_row", mergeAnalysis.getMergeRowType());
891900
Symbol caseNumberSymbol = symbolAllocator.newSymbol("case_number", INTEGER);
892901

@@ -897,7 +906,9 @@ public MergeWriterNode plan(Merge merge)
897906
Symbol symbol = planWithPresentColumn.getFieldMappings().get(fieldIndex);
898907
projectionAssignmentsBuilder.putIdentity(symbol);
899908
}
900-
projectionAssignmentsBuilder.putIdentity(uniqueIdSymbol);
909+
910+
Expression uniqueIdExpression = new Coalesce(targetUniqueIdSymbol.toSymbolReference(), sourceUniqueIdSymbol.toSymbolReference());
911+
projectionAssignmentsBuilder.put(uniqueIdSymbol, uniqueIdExpression);
901912
projectionAssignmentsBuilder.putIdentity(rowIdSymbol);
902913
projectionAssignmentsBuilder.put(mergeRowSymbol, caseExpression);
903914

0 commit comments

Comments
 (0)