diff --git a/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java b/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java index b14dfeab2828..2d42542976a2 100644 --- a/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java +++ b/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java @@ -569,7 +569,7 @@ private Optional createQueryFailureInfo(ExecutionFailureInfo f private static Optional findFailedTask(StagesInfo stages) { - for (StageInfo stageInfo : stages.getSubStagesDeepPostOrder(stages.getOutputStageId(), true)) { + for (StageInfo stageInfo : stages.getSubStagesDeep(stages.getOutputStageId(), true)) { Optional failedTaskInfo = stageInfo.getTasks().stream() .filter(taskInfo -> taskInfo.taskStatus().getState() == TaskState.FAILED) .findFirst(); diff --git a/core/trino-main/src/main/java/io/trino/execution/StagesInfo.java b/core/trino-main/src/main/java/io/trino/execution/StagesInfo.java index 457bf0798e60..1c7775b33885 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StagesInfo.java +++ b/core/trino-main/src/main/java/io/trino/execution/StagesInfo.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -92,13 +93,13 @@ public List getSubStages(StageId stageId) } @JsonIgnore - public List getSubStagesDeepPreOrder(StageId stageId) + public List getSubStagesDeep(StageId stageId) { - return getSubStagesDeepPreOrder(stageId, false); + return getSubStagesDeep(stageId, false); } @JsonIgnore - public List getSubStagesDeepPreOrder(StageId root, boolean includeRoot) + public List getSubStagesDeep(StageId root, boolean includeRoot) { StageInfo stageInfo = stagesById.get(root); checkArgument(stageInfo != null, "stage %s not found", root); @@ -112,7 +113,7 @@ public List getSubStagesDeepPreOrder(StageId root, boolean includeRoo return subStagesIds.build().stream().map(stagesById::get).collect(toImmutableList()); } - private void collectSubStageIdsPreOrder(StageInfo stageInfo, ImmutableSet.Builder collector) + private void collectSubStageIdsPreOrder(StageInfo stageInfo, ImmutableSet.Builder collector) { stageInfo.getSubStages().stream().forEach(subStageId -> { collector.add(subStageId); @@ -122,33 +123,29 @@ private void collectSubStageIdsPreOrder(StageInfo stageInfo, ImmutableSet.Builde } @JsonIgnore - public List getSubStagesDeepPostOrder(StageId stageId) + public List getSubStagesDeepTopological(StageId root, boolean includeRoot) { - return getSubStagesDeepPostOrder(stageId, false); + ImmutableList.Builder builder = ImmutableList.builder(); + getSubStagesDeepTopologicalInner(root, builder, new HashSet<>(), includeRoot); + + return builder.build().reverse(); } - @JsonIgnore - public List getSubStagesDeepPostOrder(StageId root, boolean includeRoot) + private void getSubStagesDeepTopologicalInner(StageId stageId, ImmutableList.Builder builder, Set visitedFragments, boolean includeRoot) { - StageInfo stageInfo = stagesById.get(root); - checkArgument(stageInfo != null, "stage %s not found", root); - - ImmutableSet.Builder subStagesIds = ImmutableSet.builder(); - collectSubStageIdsPostOrder(stageInfo, subStagesIds); - if (includeRoot) { - subStagesIds.add(root); + if (visitedFragments.contains(stageId)) { + return; } - return subStagesIds.build().stream().map(stagesById::get).collect(toImmutableList()); - } + StageInfo stageInfo = stagesById.get(stageId); - private void collectSubStageIdsPostOrder(StageInfo stageInfo, ImmutableSet.Builder collector) - { - stageInfo.getSubStages().stream().forEach(subStageId -> { - StageInfo subStage = stagesById.get(subStageId); - collectSubStageIdsPostOrder(subStage, collector); - collector.add(subStageId); - }); + for (StageId childId : stageInfo.getSubStages().reversed()) { + getSubStagesDeepTopologicalInner(childId, builder, visitedFragments, true); + } + if (includeRoot) { + builder.add(stageInfo); + } + visitedFragments.add(stageId); } public static List getAllStages(Optional stages) diff --git a/core/trino-main/src/main/java/io/trino/operator/ExplainAnalyzeOperator.java b/core/trino-main/src/main/java/io/trino/operator/ExplainAnalyzeOperator.java index 1ef690b4ef38..8455c785430e 100644 --- a/core/trino-main/src/main/java/io/trino/operator/ExplainAnalyzeOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/ExplainAnalyzeOperator.java @@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.sql.planner.planprinter.PlanPrinter.textDistributedPlan; import static java.util.Objects.requireNonNull; @@ -153,15 +152,14 @@ public Page getOutput() QueryInfo queryInfo = queryPerformanceFetcher.getQueryInfo(operatorContext.getDriverContext().getTaskId().queryId()); checkState(queryInfo.getStages().isPresent(), "Stages informations is missing"); - checkState(queryInfo.getStages().get().getOutputStage().getSubStages().size() == 1, "Expected one sub stage of explain node"); + StagesInfo stagesInfo = queryInfo.getStages().get(); + checkState(stagesInfo.getOutputStage().getSubStages().size() == 1, "Expected one sub stage of explain node"); - if (!hasFinalStageInfo(queryInfo.getStages().get())) { + if (!hasFinalStageInfo(stagesInfo)) { return null; } - List stagesWithoutOutputStage = queryInfo.getStages().orElseThrow().getStages().stream() - .filter(stage -> !stage.getStageId().equals(queryInfo.getStages().orElseThrow().getOutputStageId())) - .collect(toImmutableList()); + List stagesWithoutOutputStage = stagesInfo.getSubStagesDeepTopological(stagesInfo.getOutputStageId(), false); String plan = textDistributedPlan( stagesWithoutOutputStage, @@ -194,7 +192,7 @@ private boolean hasFinalStageInfo(StagesInfo stages) private boolean isFinalStageInfo(StagesInfo stages) { - List subStages = stages.getSubStagesDeepPreOrder(operatorContext.getDriverContext().getTaskId().stageId()); + List subStages = stages.getSubStagesDeep(operatorContext.getDriverContext().getTaskId().stageId()); return subStages.stream().allMatch(StageInfo::isFinalStageInfo); } } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java index d9c692d73b48..b87e0662f76f 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java @@ -428,7 +428,7 @@ public static String textDistributedPlan( Anonymizer anonymizer, NodeVersion version) { - return textDistributedPlan(stages.getStages(), queryStats, valuePrinter, verbose, anonymizer, version); + return textDistributedPlan(stages.getSubStagesDeepTopological(stages.getOutputStageId(), true), queryStats, valuePrinter, verbose, anonymizer, version); } public static String textDistributedPlan(