@@ -27,6 +27,7 @@
import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
import org.apache.flink.table.planner.plan.trait.FlinkRelDistributionTraitDef;
import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.stream.keyselector.AttributeBasedJoinKeyExtractor.ConditionAttributeRef;
import org.apache.flink.table.runtime.operators.join.stream.keyselector.JoinKeyExtractor;
@@ -52,6 +53,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

@@ -161,14 +164,18 @@ public RelWriter explainTerms(final RelWriter pw) {
for (final Ord<RelNode> ord : Ord.zip(inputs)) {
pw.input("input#" + ord.i, ord.e);
}
return pw.item("joinFilter", joinFilter)
.item("joinTypes", joinTypes)
.item("joinConditions", joinConditions)
.item("joinAttributeMap", joinAttributeMap)
.itemIf("postJoinFilter", postJoinFilter, postJoinFilter != null)

return pw.item("commonJoinKey", getCommonJoinKeyFieldNames())
.item("joinTypes", formatJoinTypes())
.item("inputUniqueKeys", formatInputUniqueKeysWithFieldNames())
.itemIf("stateTtlHints", RelExplainUtil.hintsToString(hints), !hints.isEmpty())
.item("joinConditions", formatJoinConditionsWithFieldNames(pw))
.itemIf(
"postJoinFilter",
formatExpressionWithFieldNames(postJoinFilter, pw),
postJoinFilter != null)
.item("select", String.join(",", getRowType().getFieldNames()))
.item("rowType", getRowType())
.itemIf("stateTtlHints", RelExplainUtil.hintsToString(hints), !hints.isEmpty());
.item("rowType", getRowType());
}

@Override
@@ -216,7 +223,7 @@ private RexNode createMultiJoinCondition() {

public List<List<int[]>> getUniqueKeysForInputs() {
if (inputUniqueKeys == null) {
final List<List<int[]>> computed =
inputUniqueKeys =
inputs.stream()
.map(
input -> {
@@ -231,8 +238,7 @@ public List<List<int[]>> getUniqueKeysForInputs() {
.map(ImmutableBitSet::toArray)
.collect(Collectors.toList());
})
.collect(Collectors.toList());
inputUniqueKeys = Collections.unmodifiableList(computed);
.collect(Collectors.toUnmodifiableList());
}
return inputUniqueKeys;
}
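
The refactor above keeps the lazy compute-once caching but collects directly into an unmodifiable list instead of wrapping afterwards. A minimal standalone sketch of that pattern, with invented names and data rather than Flink code:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyUniqueKeysSketch {
    private List<String> cachedKeys; // stays null until first access

    List<String> getKeys() {
        if (cachedKeys == null) {
            // Compute once and cache; toUnmodifiableList() means callers
            // cannot mutate the cached result.
            cachedKeys =
                    Stream.of("key_a", "key_b")
                            .collect(Collectors.toUnmodifiableList());
        }
        return cachedKeys;
    }
}
```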
@@ -274,6 +280,102 @@ public List<JoinRelType> getJoinTypes() {
return joinTypes;
}

/**
* Returns the common join key field names as a comma-separated string. Uses the field names
* from the first input to map the common join key indices.
*
* @return comma-separated string of common join key field names, or {@code noCommonJoinKey} if
* there is no common join key
*/
private String getCommonJoinKeyFieldNames() {
final int[] commonJoinKeyIndices = keyExtractor.getCommonJoinKeyIndices(0);
final RelNode firstInput = inputs.get(0);
final List<String> fieldNames = firstInput.getRowType().getFieldNames();
final List<String> commonJoinKey = new ArrayList<>();

for (final int index : commonJoinKeyIndices) {
if (index < fieldNames.size()) {
commonJoinKey.add(fieldNames.get(index));
}
}

if (commonJoinKey.isEmpty()) {
return "noCommonJoinKey";
}

return String.join(", ", commonJoinKey);
}
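
As an illustration of the lookup above, here is a self-contained sketch with invented indices and field names (not taken from this PR):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CommonJoinKeyNamesSketch {
    public static void main(String[] args) {
        int[] commonJoinKeyIndices = {0, 2};
        List<String> fieldNames = Arrays.asList("user_id", "amount", "currency");

        List<String> commonJoinKey = new ArrayList<>();
        for (int index : commonJoinKeyIndices) {
            if (index < fieldNames.size()) { // ignore indices outside the first input's row type
                commonJoinKey.add(fieldNames.get(index));
            }
        }

        // Prints "user_id, currency"; an empty result maps to "noCommonJoinKey".
        System.out.println(
                commonJoinKey.isEmpty() ? "noCommonJoinKey" : String.join(", ", commonJoinKey));
    }
}
```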

/**
* Formats a RexNode expression with field names for better readability in explain output.
*
* @param expression the expression to format
* @param pw the RelWriter for determining format preferences
* @return formatted expression string with field names
*/
private String formatExpressionWithFieldNames(final RexNode expression, final RelWriter pw) {
if (expression == null) {
return "";
}

return getExpressionString(
expression,
JavaScalaConversionUtil.toScala(getRowType().getFieldNames()).toList(),
JavaScalaConversionUtil.toScala(Optional.empty()),
RelExplainUtil.preferExpressionFormat(pw),
RelExplainUtil.preferExpressionDetail(pw));
}
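
The method above delegates to the planner's expression printing via the Scala conversion helpers. As a rough, simplified stand-in (not the Calcite or Flink API), the effect of "formatting with field names" is to replace index references with names; everything in this sketch is invented for illustration:

```java
import java.util.Arrays;
import java.util.List;

public class ExpressionFormattingSketch {
    public static void main(String[] args) {
        List<String> fieldNames = Arrays.asList("user_id", "amount", "currency");
        String expression = "AND(=($0, $2), >($1, 100))";

        String formatted = expression;
        // Replace higher indices first so that "$10" is not clobbered by "$1".
        for (int i = fieldNames.size() - 1; i >= 0; i--) {
            formatted = formatted.replace("$" + i, fieldNames.get(i));
        }

        System.out.println(formatted); // AND(=(user_id, currency), >(amount, 100))
    }
}
```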

/**
* Formats join conditions with field names for better readability in explain output.
*
* @param pw the RelWriter for determining format preferences
* @return formatted join conditions string with field names
*/
private String formatJoinConditionsWithFieldNames(final RelWriter pw) {
return joinConditions.stream()
.skip(1)
.filter(Objects::nonNull)
.map(condition -> formatExpressionWithFieldNames(condition, pw))
.collect(Collectors.joining(", "));
}
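
The stream above drops the leading entry and any null conditions, presumably because the first input carries no join condition of its own. A tiny standalone illustration with plain strings standing in for RexNode conditions (values invented):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class JoinConditionFormattingSketch {
    public static void main(String[] args) {
        // Index 0 belongs to the first input and has no condition.
        List<String> joinConditions = Arrays.asList(null, "=(a, a0)", "=(b, b1)");

        String formatted =
                joinConditions.stream()
                        .skip(1) // skip the entry for the first input
                        .filter(Objects::nonNull) // drop missing conditions
                        .collect(Collectors.joining(", "));

        System.out.println(formatted); // =(a, a0), =(b, b1)
    }
}
```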

/** Formats the join types as a comma-separated string, skipping the entry for the first input. */
private String formatJoinTypes() {
return joinTypes.stream()
.skip(1)
.map(JoinRelType::toString)
.collect(Collectors.joining(", "));
}

/**
* Formats each input's unique keys using that input's field names for readability in explain
* output. Inputs without any unique key are rendered as {@code noUniqueKey}.
*/
private String formatInputUniqueKeysWithFieldNames() {
final List<String> inputUniqueKeyStrings = new ArrayList<>();
for (final RelNode input : inputs) {
final Set<ImmutableBitSet> uniqueKeys = getUniqueKeys(input);

if (uniqueKeys != null && !uniqueKeys.isEmpty()) {
final List<String> fieldNames = input.getRowType().getFieldNames();
final List<String> uniqueKeyStrings = new ArrayList<>();
for (final ImmutableBitSet uniqueKey : uniqueKeys) {
final List<String> keyFieldNames = new ArrayList<>();
for (final int index : uniqueKey.toArray()) {
if (index < fieldNames.size()) {
keyFieldNames.add(fieldNames.get(index));
}
}
if (!keyFieldNames.isEmpty()) {
uniqueKeyStrings.add("(" + String.join(", ", keyFieldNames) + ")");
}
}

inputUniqueKeyStrings.add(String.join(", ", uniqueKeyStrings));
} else {
inputUniqueKeyStrings.add("noUniqueKey");
}
}

return String.join(", ", inputUniqueKeyStrings);
}
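
To make the rendering above concrete, here is a self-contained sketch that mirrors the nested loops with plain int arrays instead of ImmutableBitSet; inputs and field names are invented for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class InputUniqueKeyRenderingSketch {
    public static void main(String[] args) {
        List<String> fieldNames = Arrays.asList("a", "b", "c");
        List<int[]> uniqueKeys = Arrays.asList(new int[] {0, 2}, new int[] {1});

        List<String> uniqueKeyStrings = new ArrayList<>();
        for (int[] uniqueKey : uniqueKeys) {
            List<String> keyFieldNames = new ArrayList<>();
            for (int index : uniqueKey) {
                if (index < fieldNames.size()) {
                    keyFieldNames.add(fieldNames.get(index));
                }
            }
            if (!keyFieldNames.isEmpty()) {
                uniqueKeyStrings.add("(" + String.join(", ", keyFieldNames) + ")");
            }
        }

        // Prints "(a, c), (b)"; an input without unique keys would render as "noUniqueKey".
        System.out.println(String.join(", ", uniqueKeyStrings));
    }
}
```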

/**
* This is mainly used in `FlinkChangelogModeInferenceProgram.SatisfyUpdateKindTraitVisitor`. If
* the unique key of input is a superset of the common join key, then we can ignore
@@ -35,7 +35,6 @@

import scala.Enumeration;

import static org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static scala.runtime.BoxedUnit.UNIT;
@@ -346,87 +345,6 @@ void testMultiSinkOnJoinedView() {
false);
}

@Test
void testMultiSinkOnMultiJoinedView() {
tEnv.getConfig().set(TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true);
tEnv.executeSql(
"create temporary table src1 (\n"
+ " a int,\n"
+ " b bigint,\n"
+ " c string,\n"
+ " d int,\n"
+ " primary key(a, c) not enforced\n"
+ ") with (\n"
+ " 'connector' = 'values',\n"
+ " 'changelog-mode' = 'I,UA,UB,D'\n"
+ ")");
tEnv.executeSql(
"create temporary table src2 (\n"
+ " a int,\n"
+ " b bigint,\n"
+ " c string,\n"
+ " d int,\n"
+ " primary key(a, c) not enforced\n"
+ ") with (\n"
+ " 'connector' = 'values',\n"
+ " 'changelog-mode' = 'I,UA,UB,D'\n"
+ ")");
tEnv.executeSql(
"create temporary table sink1 (\n"
+ " a int,\n"
+ " b string,\n"
+ " c bigint,\n"
+ " d bigint\n"
+ ") with (\n"
+ " 'connector' = 'values',\n"
+ " 'sink-insert-only' = 'false'\n"
+ ")");
tEnv.executeSql(
"create temporary table sink2 (\n"
+ " a int,\n"
+ " b string,\n"
+ " c bigint,\n"
+ " d string\n"
+ ") with (\n"
+ " 'connector' = 'values',\n"
+ " 'sink-insert-only' = 'false'\n"
+ ")");
tEnv.executeSql(
"create temporary view v1 as\n"
+ "select\n"
+ " t1.a as a, t1.`day` as `day`, t2.b as b, t2.c as c\n"
+ "from (\n"
+ " select a, b, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') as `day`\n"
+ " from src1\n"
+ " ) t1\n"
+ "join (\n"
+ " select b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `day`, c, d\n"
+ " from src2\n"
+ ") t2\n"
+ " on t1.a = t2.d");

StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql(
"insert into sink1\n"
+ " select a, `day`, sum(b), count(distinct c)\n"
+ " from v1\n"
+ " group by a, `day`");
stmtSet.addInsertSql(
"insert into sink2\n"
+ " select a, `day`, b, c\n"
+ " from v1\n"
+ " where b > 100");

util.doVerifyPlan(
stmtSet,
new ExplainDetail[] {ExplainDetail.PLAN_ADVICE},
false,
new Enumeration.Value[] {PlanKind.OPT_REL_WITH_ADVICE()},
() -> UNIT,
false,
false);
}

@Test
void testCdcJoinDimWithPkOutputNoPkSinkWithoutPk() {
// from NonDeterministicDagTest#testCdcJoinDimWithPkOutputNoPkSinkWithoutPk