diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java index 6bbc470958dc3..4d7aa3a519fc5 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java @@ -27,6 +27,7 @@ import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution; import org.apache.flink.table.planner.plan.trait.FlinkRelDistributionTraitDef; import org.apache.flink.table.planner.plan.utils.RelExplainUtil; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.runtime.operators.join.FlinkJoinType; import org.apache.flink.table.runtime.operators.join.stream.keyselector.AttributeBasedJoinKeyExtractor.ConditionAttributeRef; import org.apache.flink.table.runtime.operators.join.stream.keyselector.JoinKeyExtractor; @@ -52,6 +53,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -161,14 +164,18 @@ public RelWriter explainTerms(final RelWriter pw) { for (final Ord ord : Ord.zip(inputs)) { pw.input("input#" + ord.i, ord.e); } - return pw.item("joinFilter", joinFilter) - .item("joinTypes", joinTypes) - .item("joinConditions", joinConditions) - .item("joinAttributeMap", joinAttributeMap) - .itemIf("postJoinFilter", postJoinFilter, postJoinFilter != null) + + return pw.item("commonJoinKey", getCommonJoinKeyFieldNames()) + .item("joinTypes", formatJoinTypes()) + .item("inputUniqueKeys", formatInputUniqueKeysWithFieldNames()) + .itemIf("stateTtlHints", RelExplainUtil.hintsToString(hints), !hints.isEmpty()) + .item("joinConditions", formatJoinConditionsWithFieldNames(pw)) + .itemIf( + "postJoinFilter", + formatExpressionWithFieldNames(postJoinFilter, pw), + postJoinFilter != null) .item("select", String.join(",", getRowType().getFieldNames())) - .item("rowType", getRowType()) - .itemIf("stateTtlHints", RelExplainUtil.hintsToString(hints), !hints.isEmpty()); + .item("rowType", getRowType()); } @Override @@ -216,7 +223,7 @@ private RexNode createMultiJoinCondition() { public List> getUniqueKeysForInputs() { if (inputUniqueKeys == null) { - final List> computed = + inputUniqueKeys = inputs.stream() .map( input -> { @@ -231,8 +238,7 @@ public List> getUniqueKeysForInputs() { .map(ImmutableBitSet::toArray) .collect(Collectors.toList()); }) - .collect(Collectors.toList()); - inputUniqueKeys = Collections.unmodifiableList(computed); + .collect(Collectors.toUnmodifiableList()); } return inputUniqueKeys; } @@ -274,6 +280,102 @@ public List getJoinTypes() { return joinTypes; } + /** + * Returns the common join key field names as a comma-separated string. Uses the field names + * from the first input to map the common join key indices. + * + * @return comma-separated string of common join key field names, or empty string if no common + * join key + */ + private String getCommonJoinKeyFieldNames() { + final int[] commonJoinKeyIndices = keyExtractor.getCommonJoinKeyIndices(0); + final RelNode firstInput = inputs.get(0); + final List fieldNames = firstInput.getRowType().getFieldNames(); + final List commonJoinKey = new ArrayList<>(); + + for (final int index : commonJoinKeyIndices) { + if (index < fieldNames.size()) { + commonJoinKey.add(fieldNames.get(index)); + } + } + + if (commonJoinKey.isEmpty()) { + return "noCommonJoinKey"; + } + + return String.join(", ", commonJoinKey); + } + + /** + * Formats a RexNode expression with field names for better readability in explain output. + * + * @param expression the expression to format + * @param pw the RelWriter for determining format preferences + * @return formatted expression string with field names + */ + private String formatExpressionWithFieldNames(final RexNode expression, final RelWriter pw) { + if (expression == null) { + return ""; + } + + return getExpressionString( + expression, + JavaScalaConversionUtil.toScala(getRowType().getFieldNames()).toList(), + JavaScalaConversionUtil.toScala(Optional.empty()), + RelExplainUtil.preferExpressionFormat(pw), + RelExplainUtil.preferExpressionDetail(pw)); + } + + /** + * Formats join conditions with field names for better readability in explain output. + * + * @param pw the RelWriter for determining format preferences + * @return formatted join conditions string with field names + */ + private String formatJoinConditionsWithFieldNames(final RelWriter pw) { + return joinConditions.stream() + .skip(1) + .filter(Objects::nonNull) + .map(condition -> formatExpressionWithFieldNames(condition, pw)) + .collect(Collectors.joining(", ")); + } + + private String formatJoinTypes() { + return joinTypes.stream() + .skip(1) + .map(JoinRelType::toString) + .collect(Collectors.joining(", ")); + } + + private String formatInputUniqueKeysWithFieldNames() { + final List inputUniqueKeyStrings = new ArrayList<>(); + for (final RelNode input : inputs) { + final Set uniqueKeys = getUniqueKeys(input); + + if (uniqueKeys != null && !uniqueKeys.isEmpty()) { + final List fieldNames = input.getRowType().getFieldNames(); + final List uniqueKeyStrings = new ArrayList<>(); + for (final ImmutableBitSet uniqueKey : uniqueKeys) { + final List keyFieldNames = new ArrayList<>(); + for (final int index : uniqueKey.toArray()) { + if (index < fieldNames.size()) { + keyFieldNames.add(fieldNames.get(index)); + } + } + if (!keyFieldNames.isEmpty()) { + uniqueKeyStrings.add("(" + String.join(", ", keyFieldNames) + ")"); + } + } + + inputUniqueKeyStrings.add(String.join(", ", uniqueKeyStrings)); + } else { + inputUniqueKeyStrings.add("noUniqueKey"); + } + } + + return String.join(", ", inputUniqueKeyStrings); + } + /** * This is mainly used in `FlinkChangelogModeInferenceProgram.SatisfyUpdateKindTraitVisitor`. If * the unique key of input is a superset of the common join key, then we can ignore diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java index 99cc20b60da64..3b085ddbc1dfc 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java @@ -35,7 +35,6 @@ import scala.Enumeration; -import static org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static scala.runtime.BoxedUnit.UNIT; @@ -346,87 +345,6 @@ void testMultiSinkOnJoinedView() { false); } - @Test - void testMultiSinkOnMultiJoinedView() { - tEnv.getConfig().set(TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true); - tEnv.executeSql( - "create temporary table src1 (\n" - + " a int,\n" - + " b bigint,\n" - + " c string,\n" - + " d int,\n" - + " primary key(a, c) not enforced\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'changelog-mode' = 'I,UA,UB,D'\n" - + ")"); - tEnv.executeSql( - "create temporary table src2 (\n" - + " a int,\n" - + " b bigint,\n" - + " c string,\n" - + " d int,\n" - + " primary key(a, c) not enforced\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'changelog-mode' = 'I,UA,UB,D'\n" - + ")"); - tEnv.executeSql( - "create temporary table sink1 (\n" - + " a int,\n" - + " b string,\n" - + " c bigint,\n" - + " d bigint\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false'\n" - + ")"); - tEnv.executeSql( - "create temporary table sink2 (\n" - + " a int,\n" - + " b string,\n" - + " c bigint,\n" - + " d string\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false'\n" - + ")"); - tEnv.executeSql( - "create temporary view v1 as\n" - + "select\n" - + " t1.a as a, t1.`day` as `day`, t2.b as b, t2.c as c\n" - + "from (\n" - + " select a, b, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') as `day`\n" - + " from src1\n" - + " ) t1\n" - + "join (\n" - + " select b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `day`, c, d\n" - + " from src2\n" - + ") t2\n" - + " on t1.a = t2.d"); - - StatementSet stmtSet = tEnv.createStatementSet(); - stmtSet.addInsertSql( - "insert into sink1\n" - + " select a, `day`, sum(b), count(distinct c)\n" - + " from v1\n" - + " group by a, `day`"); - stmtSet.addInsertSql( - "insert into sink2\n" - + " select a, `day`, b, c\n" - + " from v1\n" - + " where b > 100"); - - util.doVerifyPlan( - stmtSet, - new ExplainDetail[] {ExplainDetail.PLAN_ADVICE}, - false, - new Enumeration.Value[] {PlanKind.OPT_REL_WITH_ADVICE()}, - () -> UNIT, - false, - false); - } - @Test void testCdcJoinDimWithPkOutputNoPkSinkWithoutPk() { // from NonDeterministicDagTest#testCdcJoinDimWithPkOutputNoPkSinkWithoutPk diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeMultiJoinJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeMultiJoinJsonSerdeTest.java deleted file mode 100644 index 14e2489a06d5a..0000000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeMultiJoinJsonSerdeTest.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.serde; - -import org.apache.flink.FlinkVersion; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.planner.calcite.FlinkTypeFactory; -import org.apache.flink.table.planner.calcite.FlinkTypeSystem; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph; -import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; -import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMultiJoin; -import org.apache.flink.table.runtime.operators.join.FlinkJoinType; -import org.apache.flink.table.runtime.operators.join.stream.keyselector.AttributeBasedJoinKeyExtractor.ConditionAttributeRef; -import org.apache.flink.table.types.logical.IntType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.logical.VarCharType; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode; - -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.assertj.core.api.Assertions.assertThat; - -class ExecNodeMultiJoinJsonSerdeTest { - private final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - private final SerdeContext serdeContext = JsonSerdeTestUtil.configuredSerdeContext(); - - @Test - void testSerializingStreamExecMultiJoin() throws IOException { - // Create test data - final StreamExecMultiJoin execNode = createTestMultiJoinNode(); - final ExecNodeGraph graph = - new ExecNodeGraph(FlinkVersion.v2_1, Collections.singletonList(execNode)); - - // Test if we can serialize - final String serializedGraph = JsonSerdeTestUtil.toJson(serdeContext, graph); - assertThat(serializedGraph).isNotEmpty(); - - // Test if we can deserialize - final ExecNodeGraph deserializedGraph = - JsonSerdeTestUtil.toObject(serdeContext, serializedGraph, ExecNodeGraph.class); - - // Verify some general value checks on the deserialized node - assertThat(deserializedGraph.getRootNodes()).hasSize(1); - final StreamExecMultiJoin deserializedMultiJoinNode = - (StreamExecMultiJoin) deserializedGraph.getRootNodes().get(0); - assertThat(deserializedMultiJoinNode.getDescription()).isEqualTo("test-multi-join"); - assertThat(deserializedMultiJoinNode.getOutputType()) - .isEqualTo(RowType.of(VarCharType.STRING_TYPE, new IntType())); - } - - @Test - void testSerializedJsonStructure() throws IOException { - // Create test data - final StreamExecMultiJoin execNode = createTestMultiJoinNode(); - final ExecNodeGraph graph = - new ExecNodeGraph(FlinkVersion.v2_1, Collections.singletonList(execNode)); - - // Serialize to JSON - final String json = JsonSerdeTestUtil.toJson(serdeContext, graph); - final JsonNode jsonNode = new ObjectMapper().readTree(json); - - // Verify JSON structure using JsonSerdeTestUtil assertions - // Basic node structure - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "type"); - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "id"); - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "description"); - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "inputProperties"); - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "outputType"); - - // MultiJoin specific fields - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "joinTypes"); - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "joinAttributeMap"); - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "inputUniqueKeys"); - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "joinConditions"); - - // Verify specific field values - JsonNode node = jsonNode.get("nodes").get(0); - assertThat(node.get("type").asText()).isEqualTo("stream-exec-multi-join_1"); - assertThat(node.get("description").asText()).isEqualTo("test-multi-join"); - assertThat(node.get("joinTypes").isArray()).isTrue(); - assertThat(node.get("joinTypes")) - .containsExactly(new TextNode("INNER"), new TextNode("INNER")); - assertThat(node.get("joinAttributeMap").isObject()).isTrue(); - assertThat(node.get("inputUniqueKeys").isArray()).isTrue(); - assertThat(node.get("inputUniqueKeys")).hasSize(2); - assertThat(node.get("joinConditions").isArray()).isTrue(); - assertThat(node.get("joinConditions")).hasSize(2); - assertThat(node.get("inputProperties").isArray()).isTrue(); - assertThat(node.get("inputProperties")).hasSize(2); - } - - private StreamExecMultiJoin createTestMultiJoinNode() { - final FlinkTypeFactory typeFactory = - new FlinkTypeFactory(classLoader, FlinkTypeSystem.INSTANCE); - final RexBuilder builder = new RexBuilder(typeFactory); - final RelDataType varCharType = - typeFactory.createFieldTypeFromLogicalType(new VarCharType()); - - final RexNode condition = - builder.makeCall( - SqlStdOperatorTable.EQUALS, - new RexInputRef(0, varCharType), - new RexInputRef(1, varCharType)); - - final Map> joinAttributeMap = createJoinAttributeMap(); - - final var execNode = - new StreamExecMultiJoin( - new Configuration(), - Arrays.asList(FlinkJoinType.INNER, FlinkJoinType.INNER), - Arrays.asList(null, condition), - null, - joinAttributeMap, - List.of( - List.of(new int[] {0}), - List.of(new int[] {0})), // left keys for each join - Collections.emptyMap(), - Arrays.asList(InputProperty.DEFAULT, InputProperty.DEFAULT), - RowType.of(VarCharType.STRING_TYPE, new IntType()), - "test-multi-join"); - - execNode.setInputEdges(Collections.emptyList()); - return execNode; - } - - private static Map> createJoinAttributeMap() { - final Map> joinAttributeMap = new HashMap<>(); - - // Corresponds to a join between input 0 and 1 on their first fields. - final List attributesForJoinWithInput1 = - List.of(new ConditionAttributeRef(0, 0, 1, 0)); - joinAttributeMap.put(1, attributesForJoinWithInput1); // Key is the right-side input index. - - // Corresponds to a join between input 0 and 2 on their first fields. - final List attributesForJoinWithInput2 = - List.of(new ConditionAttributeRef(0, 0, 2, 0)); - joinAttributeMap.put(2, attributesForJoinWithInput2); // Key is the right-side input index. - return joinAttributeMap; - } -} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java index ff826fab581ad..ece24f9d95cfd 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java @@ -18,18 +18,25 @@ package org.apache.flink.table.planner.plan.stream.sql; +import org.apache.flink.table.api.ExplainDetail; +import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.utils.PlanKind; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; import org.apache.flink.table.planner.utils.TableTestBase; -import org.apache.flink.table.planner.utils.TableTestUtil; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import scala.Enumeration; + +import static scala.runtime.BoxedUnit.UNIT; + /** Tests for multi-join plans. */ public class MultiJoinTest extends TableTestBase { - private TableTestUtil util; + private StreamTableTestUtil util; @BeforeEach void setup() { @@ -553,4 +560,89 @@ void testPreservesUpsertKeyFourWayComplex() { + "JOIN AddressPK a" + " ON u.user_id = a.user_id AND a.location IS NOT NULL"); } + + @Test + void testMultiSinkOnMultiJoinedView() { + util.tableEnv() + .executeSql( + "create temporary table src1 (\n" + + " a int,\n" + + " b bigint,\n" + + " c string,\n" + + " d int,\n" + + " primary key(a, c) not enforced\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'changelog-mode' = 'I,UA,UB,D'\n" + + ")"); + util.tableEnv() + .executeSql( + "create temporary table src2 (\n" + + " a int,\n" + + " b bigint,\n" + + " c string,\n" + + " d int,\n" + + " primary key(a, c) not enforced\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'changelog-mode' = 'I,UA,UB,D'\n" + + ")"); + util.tableEnv() + .executeSql( + "create temporary table sink1 (\n" + + " a int,\n" + + " b string,\n" + + " c bigint,\n" + + " d bigint\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'sink-insert-only' = 'false'\n" + + ")"); + util.tableEnv() + .executeSql( + "create temporary table sink2 (\n" + + " a int,\n" + + " b string,\n" + + " c bigint,\n" + + " d string\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'sink-insert-only' = 'false'\n" + + ")"); + util.tableEnv() + .executeSql( + "create temporary view v1 as\n" + + "select\n" + + " t1.a as a, t1.`day` as `day`, t2.b as b, t2.c as c\n" + + "from (\n" + + " select a, b, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') as `day`\n" + + " from src1\n" + + " ) t1\n" + + "join (\n" + + " select b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `day`, c, d\n" + + " from src2\n" + + ") t2\n" + + " on t1.a = t2.d"); + + StatementSet stmtSet = util.tableEnv().createStatementSet(); + stmtSet.addInsertSql( + "insert into sink1\n" + + " select a, `day`, sum(b), count(distinct c)\n" + + " from v1\n" + + " group by a, `day`"); + stmtSet.addInsertSql( + "insert into sink2\n" + + " select a, `day`, b, c\n" + + " from v1\n" + + " where b > 100"); + + util.doVerifyPlan( + stmtSet, + new ExplainDetail[] {ExplainDetail.PLAN_ADVICE}, + false, + new Enumeration.Value[] {PlanKind.OPT_REL_WITH_ADVICE()}, + () -> UNIT, + false, + false); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml index ff9aba197d08b..dc45a0b5c9dc9 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml @@ -115,41 +115,6 @@ source node: TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta_rename, project=[a, b, c], metadata=[metadata_3]]], fields=[a, b, c, metadata_3], changelogMode=[I,UB,UA,D], upsertKeys=[[a]]) -]]> - - - - - (b, 100)]) - +- TableSourceScan(table=[[default_catalog, default_database, src2, project=[b, c, d], metadata=[]]], fields=[b, c, d]) - -advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.exec.mini-batch.enabled' to 'true', 'table.exec.mini-batch.allow-latency' to a positive long value, 'table.exec.mini-batch.size' to a positive long value). -advice[2]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions. - -related rel plan: -Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day], changelogMode=[I,UB,UA,D]) -+- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a], changelogMode=[I,UB,UA,D]) - - ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml index 8196aa986d271..f9568b3b6e553 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml @@ -35,7 +35,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6], locati =($2, $6), <($6, 0)))], joinTypes=[[INNER, LEFT, INNER, LEFT]], joinConditions=[[true, =($0, $4), AND(=($0, $7), OR(>=($2, $6), <($6, 0))), =($7, $9)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:2;], 3=[LeftInputId:2;LeftFieldIndex:2;RightInputId:3;RightFieldIndex:1;]}], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)]) ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[(user_id_0 = user_id_1), ((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)]) :- Exchange(distribution=[hash[user_id_0]]) : +- ChangelogNormalize(key=[user_id_0]) : +- Exchange(distribution=[hash[user_id_0]]) @@ -63,7 +63,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6], locati == Optimized Physical Plan == Calc(select=[user_id_0, name, order_id, payment_id, location]) -+- MultiJoin(joinFilter=[AND(=($0, $7), OR(>=($2, $6), <($6, 0)))], joinTypes=[[INNER, LEFT, INNER, LEFT]], joinConditions=[[true, =($0, $4), AND(=($0, $7), OR(>=($2, $6), <($6, 0))), =($7, $9)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:2;], 3=[LeftInputId:2;LeftFieldIndex:2;RightInputId:3;RightFieldIndex:1;]}], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)]) ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[=(user_id_0, user_id_1), AND(=(user_id_0, user_id_2), OR(>=(cash, price), <(price, 0))), =(user_id_2, user_id_3)], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)]) :- Exchange(distribution=[hash[user_id_0]]) : +- ChangelogNormalize(key=[user_id_0]) : +- Exchange(distribution=[hash[user_id_0]]) @@ -77,7 +77,7 @@ Calc(select=[user_id_0, name, order_id, payment_id, location]) == Optimized Execution Plan == Calc(select=[user_id_0, name, order_id, payment_id, location]) -+- MultiJoin(joinFilter=[AND(=($0, $7), OR(>=($2, $6), <($6, 0)))], joinTypes=[[INNER, LEFT, INNER, LEFT]], joinConditions=[[true, =($0, $4), AND(=($0, $7), OR(>=($2, $6), <($6, 0))), =($7, $9)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:2;], 3=[LeftInputId:2;LeftFieldIndex:2;RightInputId:3;RightFieldIndex:1;]}], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)]) ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[(user_id_0 = user_id_1), ((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)]) :- Exchange(distribution=[hash[user_id_0]]) : +- ChangelogNormalize(key=[user_id_0]) : +- Exchange(distribution=[hash[user_id_0]]) @@ -110,7 +110,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6], locati =($2, $6), <($6, 0)))], joinTypes=[[INNER, LEFT, INNER, LEFT]], joinConditions=[[true, =($0, $4), AND(=($0, $7), OR(>=($2, $6), <($6, 0))), =($7, $9)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:2;], 3=[LeftInputId:2;LeftFieldIndex:2;RightInputId:3;RightFieldIndex:1;]}], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)]) ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[=(user_id_0, user_id_1), AND(=(user_id_0, user_id_2), OR(>=(cash, price), <(price, 0))), =(user_id_2, user_id_3)], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)]) :- Exchange(distribution=[hash[user_id_0]]) : +- ChangelogNormalize(key=[user_id_0]) : +- Exchange(distribution=[hash[user_id_0]]) @@ -143,10 +143,10 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6], locati + + + + + (b, 100)]) + +- TableSourceScan(table=[[default_catalog, default_database, src2, project=[b, c, d], metadata=[]]], fields=[b, c, d]) + +advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.exec.mini-batch.enabled' to 'true', 'table.exec.mini-batch.allow-latency' to a positive long value, 'table.exec.mini-batch.size' to a positive long value). +advice[2]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions. + +related rel plan: +Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day], changelogMode=[I,UB,UA,D]) ++- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a], changelogMode=[I,UB,UA,D]) + + ]]> @@ -272,7 +307,7 @@ LogicalSink(table=[default_catalog.default_database.sink_four_way], fields=[user ", "description" : "TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id_0, name])" }, { - "id" : 2, + "id" : 69, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -40,7 +40,7 @@ "outputType" : "ROW<`user_id_0` VARCHAR(2147483647) NOT NULL, `name` VARCHAR(2147483647)>", "description" : "Exchange(distribution=[hash[user_id_0]])" }, { - "id" : 3, + "id" : 70, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { @@ -73,7 +73,7 @@ "outputType" : "ROW<`order_id` VARCHAR(2147483647) NOT NULL, `user_id_1` VARCHAR(2147483647) NOT NULL, `product` VARCHAR(2147483647)>", "description" : "TableSourceScan(table=[[default_catalog, default_database, Orders, filter=[]]], fields=[order_id, user_id_1, product])" }, { - "id" : 4, + "id" : 71, "type" : "stream-exec-calc_1", "projection" : [ { "kind" : "INPUT_REF", @@ -105,7 +105,7 @@ "outputType" : "ROW<`order_id` VARCHAR(2147483647) NOT NULL, `user_id_1` VARCHAR(2147483647) NOT NULL>", "description" : "Calc(select=[order_id, user_id_1], where=[product IS NOT NULL])" }, { - "id" : 5, + "id" : 72, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -118,7 +118,7 @@ "outputType" : "ROW<`order_id` VARCHAR(2147483647) NOT NULL, `user_id_1` VARCHAR(2147483647) NOT NULL>", "description" : "Exchange(distribution=[hash[user_id_1]])" }, { - "id" : 6, + "id" : 73, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { @@ -151,7 +151,7 @@ "outputType" : "ROW<`payment_id` VARCHAR(2147483647) NOT NULL, `user_id_2` VARCHAR(2147483647) NOT NULL, `price` INT>", "description" : "TableSourceScan(table=[[default_catalog, default_database, Payments, filter=[]]], fields=[payment_id, user_id_2, price])" }, { - "id" : 7, + "id" : 74, "type" : "stream-exec-calc_1", "projection" : [ { "kind" : "INPUT_REF", @@ -187,7 +187,7 @@ "outputType" : "ROW<`payment_id` VARCHAR(2147483647) NOT NULL, `user_id_2` VARCHAR(2147483647) NOT NULL>", "description" : "Calc(select=[payment_id, user_id_2], where=[(price >= 0)])" }, { - "id" : 8, + "id" : 75, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -200,7 +200,7 @@ "outputType" : "ROW<`payment_id` VARCHAR(2147483647) NOT NULL, `user_id_2` VARCHAR(2147483647) NOT NULL>", "description" : "Exchange(distribution=[hash[user_id_2]])" }, { - "id" : 9, + "id" : 76, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { @@ -230,7 +230,7 @@ "outputType" : "ROW<`user_id_3` VARCHAR(2147483647) NOT NULL, `location` VARCHAR(2147483647)>", "description" : "TableSourceScan(table=[[default_catalog, default_database, Address, filter=[]]], fields=[user_id_3, location])" }, { - "id" : 10, + "id" : 77, "type" : "stream-exec-calc_1", "projection" : [ { "kind" : "INPUT_REF", @@ -262,7 +262,7 @@ "outputType" : "ROW<`user_id_3` VARCHAR(2147483647) NOT NULL, `location` VARCHAR(2147483647)>", "description" : "Calc(select=[user_id_3, location], where=[location IS NOT NULL])" }, { - "id" : 11, + "id" : 78, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -275,7 +275,7 @@ "outputType" : "ROW<`user_id_3` VARCHAR(2147483647) NOT NULL, `location` VARCHAR(2147483647)>", "description" : "Exchange(distribution=[hash[user_id_3]])" }, { - "id" : 12, + "id" : 79, "type" : "stream-exec-multi-join_1", "joinTypes" : [ "INNER", "INNER", "INNER", "INNER" ], "joinConditions" : [ { @@ -458,9 +458,9 @@ "priority" : 3 } ], "outputType" : "ROW<`user_id_0` VARCHAR(2147483647) NOT NULL, `name` VARCHAR(2147483647), `order_id` VARCHAR(2147483647) NOT NULL, `user_id_1` VARCHAR(2147483647) NOT NULL, `payment_id` VARCHAR(2147483647) NOT NULL, `user_id_2` VARCHAR(2147483647) NOT NULL, `user_id_3` VARCHAR(2147483647) NOT NULL, `location` VARCHAR(2147483647)>", - "description" : "MultiJoin(joinFilter=[AND(=($0, $6), =($0, $5), =($0, $3))], joinTypes=[[INNER, INNER, INNER, INNER]], joinConditions=[[true, =($0, $3), =($0, $5), =($0, $6)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;], 3=[LeftInputId:0;LeftFieldIndex:0;RightInputId:3;RightFieldIndex:0;]}], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2,user_id_3,location], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) user_id_3, VARCHAR(2147483647) location)])" + "description" : "MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id, user_id_1), (payment_id, user_id_2), (user_id_3)], joinConditions=[(user_id_0 = user_id_1), (user_id_0 = user_id_2), (user_id_0 = user_id_3)], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2,user_id_3,location], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) user_id_3, VARCHAR(2147483647) location)])" }, { - "id" : 13, + "id" : 80, "type" : "stream-exec-calc_1", "projection" : [ { "kind" : "INPUT_REF", @@ -502,7 +502,7 @@ "outputType" : "ROW<`user_id_0` VARCHAR(2147483647) NOT NULL, `order_id` VARCHAR(2147483647) NOT NULL, `user_id_1` VARCHAR(2147483647) NOT NULL, `payment_id` VARCHAR(2147483647) NOT NULL, `user_id_2` VARCHAR(2147483647) NOT NULL, `name` VARCHAR(2147483647), `location` VARCHAR(2147483647)>", "description" : "Calc(select=[user_id_0, order_id, user_id_1, payment_id, user_id_2, name, location])" }, { - "id" : 14, + "id" : 81, "type" : "stream-exec-sink_1", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", @@ -560,92 +560,92 @@ "description" : "Sink(table=[default_catalog.default_database.sink], fields=[user_id_0, order_id, user_id_1, payment_id, user_id_2, name, location])" } ], "edges" : [ { - "source" : 1, - "target" : 2, + "source" : 68, + "target" : 69, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 3, - "target" : 4, + "source" : 70, + "target" : 71, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 4, - "target" : 5, + "source" : 71, + "target" : 72, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 6, - "target" : 7, + "source" : 73, + "target" : 74, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 7, - "target" : 8, + "source" : 74, + "target" : 75, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 9, - "target" : 10, + "source" : 76, + "target" : 77, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 10, - "target" : 11, + "source" : 77, + "target" : 78, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 2, - "target" : 12, + "source" : 69, + "target" : 79, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 5, - "target" : 12, + "source" : 72, + "target" : 79, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 8, - "target" : 12, + "source" : 75, + "target" : 79, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 11, - "target" : 12, + "source" : 78, + "target" : 79, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 12, - "target" : 13, + "source" : 79, + "target" : 80, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 13, - "target" : 14, + "source" : 80, + "target" : 81, "shuffle" : { "type" : "FORWARD" }, diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-preserves-upsert-key-with-restore/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-preserves-upsert-key-with-restore/savepoint/_metadata index 5005f767c1d4b..742b3e9fc4bbb 100644 Binary files a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-preserves-upsert-key-with-restore/savepoint/_metadata and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-preserves-upsert-key-with-restore/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json index 1976c12bf927d..7da2dc1cdf64c 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json @@ -380,6 +380,17 @@ "type" : "BOOLEAN" }, "joinAttributeMap" : { + "0" : [ { + "leftInputId" : -1, + "leftFieldIndex" : -1, + "rightInputId" : 0, + "rightFieldIndex" : 1 + }, { + "leftInputId" : -1, + "leftFieldIndex" : -1, + "rightInputId" : 0, + "rightFieldIndex" : 1 + } ], "1" : [ { "leftInputId" : 0, "leftFieldIndex" : 1, @@ -447,7 +458,7 @@ "priority" : 3 } ], "outputType" : "ROW<`name` VARCHAR(2147483647), `user_id_0` VARCHAR(2147483647) NOT NULL, `cash` INT, `order_id` VARCHAR(2147483647), `user_id_1` VARCHAR(2147483647), `user_id_2` VARCHAR(2147483647), `payment_id` VARCHAR(2147483647) NOT NULL, `price` INT, `location` VARCHAR(2147483647), `user_id_3` VARCHAR(2147483647)>", - "description" : "MultiJoin(joinFilter=[AND(=($1, $5), OR(>=($2, $7), <($7, 0)))], joinTypes=[[INNER, LEFT, INNER, LEFT]], joinConditions=[[true, =($1, $4), AND(=($1, $5), OR(>=($2, $7), <($7, 0))), =($5, $9)]], joinAttributeMap=[{1=[LeftInputId:0;LeftFieldIndex:1;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:1;RightInputId:2;RightFieldIndex:0;], 3=[LeftInputId:2;LeftFieldIndex:0;RightInputId:3;RightFieldIndex:1;]}], select=[name,user_id_0,cash,order_id,user_id_1,user_id_2,payment_id,price,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) user_id_0, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)])" + "description" : "MultiJoin(commonJoinKey=[user_id_0], joinTypes=[LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[(user_id_0 = user_id_1), ((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], select=[name,user_id_0,cash,order_id,user_id_1,user_id_2,payment_id,price,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) user_id_0, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)])" }, { "id" : 30, "type" : "stream-exec-calc_1", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/savepoint/_metadata index d7a9f9d013732..726814add3e0e 100644 Binary files a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/savepoint/_metadata and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json index 596942d5eea74..2b2930725b8b0 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json @@ -262,6 +262,17 @@ "type" : "BOOLEAN" }, "joinAttributeMap" : { + "0" : [ { + "leftInputId" : -1, + "leftFieldIndex" : -1, + "rightInputId" : 0, + "rightFieldIndex" : 1 + }, { + "leftInputId" : -1, + "leftFieldIndex" : -1, + "rightInputId" : 0, + "rightFieldIndex" : 1 + } ], "1" : [ { "leftInputId" : 0, "leftFieldIndex" : 1, @@ -312,7 +323,7 @@ "priority" : 2 } ], "outputType" : "ROW<`name` VARCHAR(2147483647), `user_id_0` VARCHAR(2147483647) NOT NULL, `order_id` VARCHAR(2147483647), `user_id_1` VARCHAR(2147483647), `user_id_2` VARCHAR(2147483647), `payment_id` VARCHAR(2147483647) NOT NULL>", - "description" : "MultiJoin(joinFilter=[=($1, $4)], joinTypes=[[INNER, LEFT, INNER]], joinConditions=[[true, =($1, $3), =($1, $4)]], joinAttributeMap=[{1=[LeftInputId:0;LeftFieldIndex:1;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:1;RightInputId:2;RightFieldIndex:0;]}], select=[name,user_id_0,order_id,user_id_1,user_id_2,payment_id], rowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) payment_id)])" + "description" : "MultiJoin(commonJoinKey=[user_id_0], joinTypes=[LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[(user_id_0 = user_id_1), (user_id_0 = user_id_2)], select=[name,user_id_0,order_id,user_id_1,user_id_2,payment_id], rowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) payment_id)])" }, { "id" : 41, "type" : "stream-exec-calc_1", @@ -429,6 +440,12 @@ "type" : "BOOLEAN" }, "joinAttributeMap" : { + "0" : [ { + "leftInputId" : -1, + "leftFieldIndex" : -1, + "rightInputId" : 0, + "rightFieldIndex" : 3 + } ], "1" : [ { "leftInputId" : 0, "leftFieldIndex" : 3, @@ -462,7 +479,7 @@ "priority" : 1 } ], "outputType" : "ROW<`name` VARCHAR(2147483647), `user_id_0` VARCHAR(2147483647) NOT NULL, `order_id` VARCHAR(2147483647), `payment_id` VARCHAR(2147483647) NOT NULL, `location` VARCHAR(2147483647), `user_id_3` VARCHAR(2147483647)>", - "description" : "MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT]], joinConditions=[[true, =($3, $5)]], joinAttributeMap=[{1=[LeftInputId:0;LeftFieldIndex:3;RightInputId:1;RightFieldIndex:1;]}], select=[name,user_id_0,order_id,payment_id,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)])" + "description" : "MultiJoin(commonJoinKey=[payment_id], joinTypes=[LEFT], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[(payment_id = user_id_3)], select=[name,user_id_0,order_id,payment_id,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)])" }, { "id" : 46, "type" : "stream-exec-calc_1", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/savepoint/_metadata index 60cec49936607..e9f406aa17531 100644 Binary files a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/savepoint/_metadata and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json index 8c6259fcd1e19..4b00a7d9410d2 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json @@ -175,6 +175,17 @@ "type" : "BOOLEAN" }, "joinAttributeMap" : { + "0" : [ { + "leftInputId" : -1, + "leftFieldIndex" : -1, + "rightInputId" : 0, + "rightFieldIndex" : 0 + }, { + "leftInputId" : -1, + "leftFieldIndex" : -1, + "rightInputId" : 0, + "rightFieldIndex" : 0 + } ], "1" : [ { "leftInputId" : 0, "leftFieldIndex" : 0, @@ -225,7 +236,7 @@ "priority" : 2 } ], "outputType" : "ROW<`user_id` VARCHAR(2147483647), `name` VARCHAR(2147483647), `user_id0` VARCHAR(2147483647), `order_id` VARCHAR(2147483647), `user_id1` VARCHAR(2147483647), `payment_id` VARCHAR(2147483647)>", - "description" : "MultiJoin(joinFilter=[AND(=($0, $4), =($0, $2))], joinTypes=[[INNER, INNER, INNER]], joinConditions=[[true, =($0, $2), =($0, $4)]], joinAttributeMap=[{1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:0;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:0;]}], select=[user_id,name,user_id0,order_id,user_id1,payment_id], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id1, VARCHAR(2147483647) payment_id)])" + "description" : "MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey, noUniqueKey], joinConditions=[(user_id = user_id0), (user_id = user_id1)], select=[user_id,name,user_id0,order_id,user_id1,payment_id], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id1, VARCHAR(2147483647) payment_id)])" }, { "id" : 8, "type" : "stream-exec-calc_1", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/savepoint/_metadata index 7004ae3c1cb2f..e2234d5cadeef 100644 Binary files a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/savepoint/_metadata and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-ttl-hints-with-restore/plan/three-way-inner-join-with-ttl-hints-with-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-ttl-hints-with-restore/plan/three-way-inner-join-with-ttl-hints-with-restore.json index 4c85b95a98ee9..f44731007c4b5 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-ttl-hints-with-restore/plan/three-way-inner-join-with-ttl-hints-with-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-ttl-hints-with-restore/plan/three-way-inner-join-with-ttl-hints-with-restore.json @@ -236,7 +236,7 @@ "priority" : 2 } ], "outputType" : "ROW<`user_id` VARCHAR(2147483647), `name` VARCHAR(2147483647), `user_id0` VARCHAR(2147483647), `order_id` VARCHAR(2147483647), `user_id1` VARCHAR(2147483647), `payment_id` VARCHAR(2147483647)>", - "description" : "MultiJoin(joinFilter=[AND(=($0, $4), =($0, $2))], joinTypes=[[INNER, INNER, INNER]], joinConditions=[[true, =($0, $2), =($0, $4)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:0;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:0;]}], select=[user_id,name,user_id0,order_id,user_id1,payment_id], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id1, VARCHAR(2147483647) payment_id)], stateTtlHints=[[[STATE_TTL options:[1d, 0s, 2d]]]])" + "description" : "MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey, noUniqueKey], stateTtlHints=[[[STATE_TTL options:[1d, 0s, 2d]]]], joinConditions=[(user_id = user_id0), (user_id = user_id1)], select=[user_id,name,user_id0,order_id,user_id1,payment_id], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id1, VARCHAR(2147483647) payment_id)])" }, { "id" : 66, "type" : "stream-exec-calc_1", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-ttl-hints-with-restore/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-ttl-hints-with-restore/savepoint/_metadata index 390abae01b99b..a9143ca6973fc 100644 Binary files a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-ttl-hints-with-restore/savepoint/_metadata and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-ttl-hints-with-restore/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json index fd59b7964255b..dd64efa2ed9a9 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json @@ -307,6 +307,17 @@ "type" : "BOOLEAN" }, "joinAttributeMap" : { + "0" : [ { + "leftInputId" : -1, + "leftFieldIndex" : -1, + "rightInputId" : 0, + "rightFieldIndex" : 0 + }, { + "leftInputId" : -1, + "leftFieldIndex" : -1, + "rightInputId" : 0, + "rightFieldIndex" : 0 + } ], "1" : [ { "leftInputId" : 0, "leftFieldIndex" : 0, @@ -357,7 +368,7 @@ "priority" : 2 } ], "outputType" : "ROW<`user_id_0` VARCHAR(2147483647) NOT NULL, `name` VARCHAR(2147483647), `order_id` VARCHAR(2147483647) NOT NULL, `user_id_1` VARCHAR(2147483647), `rowtime` TIMESTAMP(3), `price` INT, `user_id_2` VARCHAR(2147483647)>", - "description" : "MultiJoin(joinFilter=[AND(=($0, $6), =($0, $3))], joinTypes=[[INNER, INNER, INNER]], joinConditions=[[true, =($0, $3), =($0, $6)]], joinAttributeMap=[{1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], select=[user_id_0,name,order_id,user_id_1,rowtime,price,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, TIMESTAMP(3) rowtime, INTEGER price, VARCHAR(2147483647) user_id_2)])" + "description" : "MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), noUniqueKey], joinConditions=[(user_id_0 = user_id_1), (user_id_0 = user_id_2)], select=[user_id_0,name,order_id,user_id_1,rowtime,price,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, TIMESTAMP(3) rowtime, INTEGER price, VARCHAR(2147483647) user_id_2)])" }, { "id" : 57, "type" : "stream-exec-calc_1", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/savepoint/_metadata index 5faf4cbb4998b..6f757f0178ce4 100644 Binary files a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/savepoint/_metadata and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json index 4c43ee089d560..4684d76de0499 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json @@ -175,6 +175,17 @@ "type" : "BOOLEAN" }, "joinAttributeMap" : { + "0" : [ { + "leftInputId" : -1, + "leftFieldIndex" : -1, + "rightInputId" : 0, + "rightFieldIndex" : 0 + }, { + "leftInputId" : -1, + "leftFieldIndex" : -1, + "rightInputId" : 0, + "rightFieldIndex" : 0 + } ], "1" : [ { "leftInputId" : 0, "leftFieldIndex" : 0, @@ -225,7 +236,7 @@ "priority" : 2 } ], "outputType" : "ROW<`user_id` VARCHAR(2147483647), `name` VARCHAR(2147483647), `user_id0` VARCHAR(2147483647), `order_id` VARCHAR(2147483647), `user_id1` VARCHAR(2147483647), `payment_id` VARCHAR(2147483647)>", - "description" : "MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT, LEFT]], joinConditions=[[true, =($0, $2), =($0, $4)]], joinAttributeMap=[{1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:0;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:0;]}], select=[user_id,name,user_id0,order_id,user_id1,payment_id], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id1, VARCHAR(2147483647) payment_id)])" + "description" : "MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT, LEFT], inputUniqueKeys=[noUniqueKey, noUniqueKey, noUniqueKey], joinConditions=[(user_id = user_id0), (user_id = user_id1)], select=[user_id,name,user_id0,order_id,user_id1,payment_id], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id1, VARCHAR(2147483647) payment_id)])" }, { "id" : 17, "type" : "stream-exec-calc_1", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/savepoint/_metadata index 585903d812857..ce1481c26cb40 100644 Binary files a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/savepoint/_metadata and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-upsert-preserves-key-with-restore/plan/three-way-upsert-preserves-key-with-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-upsert-preserves-key-with-restore/plan/three-way-upsert-preserves-key-with-restore.json index 7d7e110abe624..45bbdbcb0ec0f 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-upsert-preserves-key-with-restore/plan/three-way-upsert-preserves-key-with-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-upsert-preserves-key-with-restore/plan/three-way-upsert-preserves-key-with-restore.json @@ -1,7 +1,7 @@ { "flinkVersion" : "2.2", "nodes" : [ { - "id" : 8, + "id" : 89, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { @@ -39,7 +39,7 @@ "outputType" : "ROW<`user_id` INT NOT NULL, `description` VARCHAR(2147483647)>", "description" : "TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id, description], metadata=[]]], fields=[user_id, description])" }, { - "id" : 9, + "id" : 90, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -52,7 +52,7 @@ "outputType" : "ROW<`user_id` INT NOT NULL, `description` VARCHAR(2147483647)>", "description" : "Exchange(distribution=[hash[user_id]])" }, { - "id" : 10, + "id" : 91, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { @@ -90,7 +90,7 @@ "outputType" : "ROW<`user_id` INT NOT NULL, `order_id` BIGINT NOT NULL>", "description" : "TableSourceScan(table=[[default_catalog, default_database, Orders, project=[user_id, order_id], metadata=[]]], fields=[user_id, order_id])" }, { - "id" : 11, + "id" : 92, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -103,7 +103,7 @@ "outputType" : "ROW<`user_id` INT NOT NULL, `order_id` BIGINT NOT NULL>", "description" : "Exchange(distribution=[hash[user_id]])" }, { - "id" : 12, + "id" : 93, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { @@ -141,7 +141,7 @@ "outputType" : "ROW<`user_id` INT NOT NULL, `payment_id` BIGINT NOT NULL>", "description" : "TableSourceScan(table=[[default_catalog, default_database, Payments, project=[user_id, payment_id], metadata=[]]], fields=[user_id, payment_id])" }, { - "id" : 13, + "id" : 94, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -154,7 +154,7 @@ "outputType" : "ROW<`user_id` INT NOT NULL, `payment_id` BIGINT NOT NULL>", "description" : "Exchange(distribution=[hash[user_id]])" }, { - "id" : 14, + "id" : 95, "type" : "stream-exec-multi-join_1", "joinTypes" : [ "INNER", "INNER", "INNER" ], "joinConditions" : [ { @@ -282,9 +282,9 @@ "priority" : 2 } ], "outputType" : "ROW<`user_id` INT NOT NULL, `description` VARCHAR(2147483647), `user_id0` INT NOT NULL, `order_id` BIGINT NOT NULL, `user_id1` INT NOT NULL, `payment_id` BIGINT NOT NULL>", - "description" : "MultiJoin(joinFilter=[AND(=($2, $4), =($2, $0))], joinTypes=[[INNER, INNER, INNER]], joinConditions=[[true, =($2, $0), =($2, $4)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:0;], 2=[LeftInputId:1;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:0;]}], select=[user_id,description,user_id0,order_id,user_id1,payment_id], rowType=[RecordType(INTEGER user_id, VARCHAR(2147483647) description, INTEGER user_id0, BIGINT order_id, INTEGER user_id1, BIGINT payment_id)])" + "description" : "MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], inputUniqueKeys=[(user_id), (user_id, order_id), (user_id, payment_id)], joinConditions=[(user_id0 = user_id), (user_id0 = user_id1)], select=[user_id,description,user_id0,order_id,user_id1,payment_id], rowType=[RecordType(INTEGER user_id, VARCHAR(2147483647) description, INTEGER user_id0, BIGINT order_id, INTEGER user_id1, BIGINT payment_id)])" }, { - "id" : 15, + "id" : 96, "type" : "stream-exec-calc_1", "projection" : [ { "kind" : "INPUT_REF", @@ -322,7 +322,7 @@ "outputType" : "ROW<`user_id` INT NOT NULL, `order_id` BIGINT NOT NULL, `user_id0` INT NOT NULL, `payment_id` BIGINT NOT NULL, `user_id1` INT NOT NULL, `description` VARCHAR(2147483647)>", "description" : "Calc(select=[user_id0 AS user_id, order_id, user_id1 AS user_id0, payment_id, user_id AS user_id1, description])" }, { - "id" : 16, + "id" : 97, "type" : "stream-exec-sink_1", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", @@ -364,7 +364,7 @@ } } }, - "inputChangelogMode" : [ "INSERT", "UPDATE_AFTER", "DELETE" ], + "inputChangelogMode" : [ "INSERT", "UPDATE_AFTER", "~DELETE" ], "inputUpsertKey" : [ 0, 1, 2, 3 ], "inputProperties" : [ { "requiredDistribution" : { @@ -377,57 +377,57 @@ "description" : "Sink(table=[default_catalog.default_database.sink], fields=[user_id, order_id, user_id0, payment_id, user_id1, description])" } ], "edges" : [ { - "source" : 8, - "target" : 9, + "source" : 89, + "target" : 90, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 10, - "target" : 11, + "source" : 91, + "target" : 92, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 12, - "target" : 13, + "source" : 93, + "target" : 94, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 9, - "target" : 14, + "source" : 90, + "target" : 95, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 11, - "target" : 14, + "source" : 92, + "target" : 95, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 13, - "target" : 14, + "source" : 94, + "target" : 95, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 14, - "target" : 15, + "source" : 95, + "target" : 96, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 15, - "target" : 16, + "source" : 96, + "target" : 97, "shuffle" : { "type" : "FORWARD" }, diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-upsert-preserves-key-with-restore/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-upsert-preserves-key-with-restore/savepoint/_metadata index 43c8c803b596e..42eb977bb8470 100644 Binary files a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-upsert-preserves-key-with-restore/savepoint/_metadata and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-upsert-preserves-key-with-restore/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/two-way-left-join-preserves-upsert-key-with-restore/plan/two-way-left-join-preserves-upsert-key-with-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/two-way-left-join-preserves-upsert-key-with-restore/plan/two-way-left-join-preserves-upsert-key-with-restore.json index 1b8b16172e123..ff7446857f8ee 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/two-way-left-join-preserves-upsert-key-with-restore/plan/two-way-left-join-preserves-upsert-key-with-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/two-way-left-join-preserves-upsert-key-with-restore/plan/two-way-left-join-preserves-upsert-key-with-restore.json @@ -1,7 +1,7 @@ { "flinkVersion" : "2.2", "nodes" : [ { - "id" : 1, + "id" : 82, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { @@ -30,7 +30,7 @@ "outputType" : "ROW<`user_id` INT NOT NULL, `order_id` BIGINT NOT NULL, `product` VARCHAR(2147483647)>", "description" : "TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[user_id, order_id, product])" }, { - "id" : 2, + "id" : 83, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -43,7 +43,7 @@ "outputType" : "ROW<`user_id` INT NOT NULL, `order_id` BIGINT NOT NULL, `product` VARCHAR(2147483647)>", "description" : "Exchange(distribution=[hash[user_id]])" }, { - "id" : 3, + "id" : 84, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { @@ -81,7 +81,7 @@ "outputType" : "ROW<`user_id` INT NOT NULL, `shard_id` INT NOT NULL>", "description" : "TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id, shard_id], metadata=[]]], fields=[user_id, shard_id])" }, { - "id" : 4, + "id" : 85, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -94,7 +94,7 @@ "outputType" : "ROW<`user_id` INT NOT NULL, `shard_id` INT NOT NULL>", "description" : "Exchange(distribution=[hash[user_id]])" }, { - "id" : 5, + "id" : 86, "type" : "stream-exec-multi-join_1", "joinTypes" : [ "INNER", "LEFT" ], "joinConditions" : [ { @@ -171,9 +171,9 @@ "priority" : 1 } ], "outputType" : "ROW<`user_id` INT NOT NULL, `order_id` BIGINT NOT NULL, `product` VARCHAR(2147483647), `user_id0` INT, `shard_id` INT>", - "description" : "MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT]], joinConditions=[[true, =($3, $0)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:0;]}], select=[user_id,order_id,product,user_id0,shard_id], rowType=[RecordType(INTEGER user_id, BIGINT order_id, VARCHAR(2147483647) product, INTEGER user_id0, INTEGER shard_id)])" + "description" : "MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT], inputUniqueKeys=[(user_id, order_id), (user_id)], joinConditions=[(user_id0 = user_id)], select=[user_id,order_id,product,user_id0,shard_id], rowType=[RecordType(INTEGER user_id, BIGINT order_id, VARCHAR(2147483647) product, INTEGER user_id0, INTEGER shard_id)])" }, { - "id" : 6, + "id" : 87, "type" : "stream-exec-calc_1", "projection" : [ { "kind" : "INPUT_REF", @@ -203,7 +203,7 @@ "outputType" : "ROW<`user_id` INT NOT NULL, `order_id` BIGINT NOT NULL, `product` VARCHAR(2147483647), `shard_id` INT>", "description" : "Calc(select=[user_id, order_id, product, shard_id])" }, { - "id" : 7, + "id" : 88, "type" : "stream-exec-sink_1", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", @@ -239,7 +239,7 @@ } } }, - "inputChangelogMode" : [ "INSERT", "UPDATE_AFTER", "DELETE" ], + "inputChangelogMode" : [ "INSERT", "UPDATE_AFTER", "~DELETE" ], "inputUpsertKey" : [ 0, 1 ], "inputProperties" : [ { "requiredDistribution" : { @@ -252,43 +252,43 @@ "description" : "Sink(table=[default_catalog.default_database.sink], fields=[user_id, order_id, product, shard_id])" } ], "edges" : [ { - "source" : 1, - "target" : 2, + "source" : 82, + "target" : 83, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 3, - "target" : 4, + "source" : 84, + "target" : 85, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 2, - "target" : 5, + "source" : 83, + "target" : 86, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 4, - "target" : 5, + "source" : 85, + "target" : 86, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 5, - "target" : 6, + "source" : 86, + "target" : 87, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 6, - "target" : 7, + "source" : 87, + "target" : 88, "shuffle" : { "type" : "FORWARD" }, diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/two-way-left-join-preserves-upsert-key-with-restore/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/two-way-left-join-preserves-upsert-key-with-restore/savepoint/_metadata index e601e1672c6c0..0bda1243b2c3a 100644 Binary files a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/two-way-left-join-preserves-upsert-key-with-restore/savepoint/_metadata and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/two-way-left-join-preserves-upsert-key-with-restore/savepoint/_metadata differ