Skip to content

Commit 96de70f

Browse files
[FLINK-38211][table-planner] Comma between inputs and add noUniqueKey
1 parent b32fc83 commit 96de70f

File tree

7 files changed

+42
-37
lines changed

7 files changed

+42
-37
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,13 @@ public RelWriter explainTerms(final RelWriter pw) {
164164
}
165165

166166
return pw.item("commonJoinKey", getCommonJoinKeyFieldNames())
167-
.item("joinTypes", joinTypes)
168-
.item("joinConditions", formatJoinConditionsWithFieldNames(pw))
167+
.item(
168+
"joinTypes",
169+
joinTypes.stream()
170+
.map(JoinRelType::toString)
171+
.collect(Collectors.joining(", ")))
169172
.item("inputUniqueKeys", formatInputUniqueKeysWithFieldNames())
173+
.item("joinConditions", formatJoinConditionsWithFieldNames(pw))
170174
.itemIf(
171175
"joinFilter",
172176
formatExpressionWithFieldNames(joinFilter, pw),
@@ -343,7 +347,7 @@ private String formatJoinConditionsWithFieldNames(final RelWriter pw) {
343347
}
344348
}
345349

346-
return String.join(" AND ", formattedConditions);
350+
return String.join(", ", formattedConditions);
347351
}
348352

349353
private String formatInputUniqueKeysWithFieldNames() {
@@ -367,13 +371,14 @@ private String formatInputUniqueKeysWithFieldNames() {
367371
}
368372
}
369373
if (!uniqueKeyStrings.isEmpty()) {
370-
inputUniqueKeyStrings.add(
371-
"input#" + i + ": " + String.join(", ", uniqueKeyStrings));
374+
inputUniqueKeyStrings.add(String.join(", ", uniqueKeyStrings));
372375
}
376+
} else {
377+
inputUniqueKeyStrings.add("noUniqueKey");
373378
}
374379
}
375380

376-
return String.join(" ", inputUniqueKeyStrings);
381+
return String.join(", ", inputUniqueKeyStrings);
377382
}
378383

379384
/**

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml

Lines changed: 25 additions & 25 deletions
Large diffs are not rendered by default.

flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@
447447
"priority" : 3
448448
} ],
449449
"outputType" : "ROW<`name` VARCHAR(2147483647), `user_id_0` VARCHAR(2147483647) NOT NULL, `cash` INT, `order_id` VARCHAR(2147483647), `user_id_1` VARCHAR(2147483647), `user_id_2` VARCHAR(2147483647), `payment_id` VARCHAR(2147483647) NOT NULL, `price` INT, `location` VARCHAR(2147483647), `user_id_3` VARCHAR(2147483647)>",
450-
"description" : "MultiJoin(commonJoinKey=[user_id_0], joinTypes=[[INNER, LEFT, INNER, LEFT]], joinConditions=[true AND (user_id_0 = user_id_1) AND ((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0))) AND (user_id_2 = user_id_3)], inputUniqueKeys=[input#0: (user_id_0) input#1: (order_id) input#2: (payment_id)], joinFilter=[((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0)))], select=[name,user_id_0,cash,order_id,user_id_1,user_id_2,payment_id,price,location,user_id_3], outputRowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) user_id_0, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)])"
450+
"description" : "MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[true, (user_id_0 = user_id_1), ((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], joinFilter=[((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0)))], select=[name,user_id_0,cash,order_id,user_id_1,user_id_2,payment_id,price,location,user_id_3], outputRowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) user_id_0, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)])"
451451
}, {
452452
"id" : 30,
453453
"type" : "stream-exec-calc_1",

flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@
312312
"priority" : 2
313313
} ],
314314
"outputType" : "ROW<`name` VARCHAR(2147483647), `user_id_0` VARCHAR(2147483647) NOT NULL, `order_id` VARCHAR(2147483647), `user_id_1` VARCHAR(2147483647), `user_id_2` VARCHAR(2147483647), `payment_id` VARCHAR(2147483647) NOT NULL>",
315-
"description" : "MultiJoin(commonJoinKey=[user_id_0], joinTypes=[[INNER, LEFT, INNER]], joinConditions=[true AND (user_id_0 = user_id_1) AND (user_id_0 = user_id_2)], inputUniqueKeys=[input#0: (user_id_0) input#1: (order_id) input#2: (payment_id)], joinFilter=[(user_id_0 = user_id_2)], select=[name,user_id_0,order_id,user_id_1,user_id_2,payment_id], outputRowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) payment_id)])"
315+
"description" : "MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[(user_id_0 = user_id_2)], select=[name,user_id_0,order_id,user_id_1,user_id_2,payment_id], outputRowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) payment_id)])"
316316
}, {
317317
"id" : 41,
318318
"type" : "stream-exec-calc_1",
@@ -462,7 +462,7 @@
462462
"priority" : 1
463463
} ],
464464
"outputType" : "ROW<`name` VARCHAR(2147483647), `user_id_0` VARCHAR(2147483647) NOT NULL, `order_id` VARCHAR(2147483647), `payment_id` VARCHAR(2147483647) NOT NULL, `location` VARCHAR(2147483647), `user_id_3` VARCHAR(2147483647)>",
465-
"description" : "MultiJoin(commonJoinKey=[payment_id], joinTypes=[[INNER, LEFT]], joinConditions=[true AND (payment_id = user_id_3)], inputUniqueKeys=[], joinFilter=[true], select=[name,user_id_0,order_id,payment_id,location,user_id_3], outputRowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)])"
465+
"description" : "MultiJoin(commonJoinKey=[payment_id], joinTypes=[INNER, LEFT], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, (payment_id = user_id_3)], joinFilter=[true], select=[name,user_id_0,order_id,payment_id,location,user_id_3], outputRowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)])"
466466
}, {
467467
"id" : 46,
468468
"type" : "stream-exec-calc_1",

flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@
225225
"priority" : 2
226226
} ],
227227
"outputType" : "ROW<`user_id` VARCHAR(2147483647), `name` VARCHAR(2147483647), `user_id0` VARCHAR(2147483647), `order_id` VARCHAR(2147483647), `user_id1` VARCHAR(2147483647), `payment_id` VARCHAR(2147483647)>",
228-
"description" : "MultiJoin(commonJoinKey=[user_id], joinTypes=[[INNER, INNER, INNER]], joinConditions=[true AND (user_id = user_id0) AND (user_id = user_id1)], inputUniqueKeys=[], joinFilter=[((user_id = user_id1) AND (user_id = user_id0))], select=[user_id,name,user_id0,order_id,user_id1,payment_id], outputRowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id1, VARCHAR(2147483647) payment_id)])"
228+
"description" : "MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey, noUniqueKey], joinConditions=[true, (user_id = user_id0), (user_id = user_id1)], joinFilter=[((user_id = user_id1) AND (user_id = user_id0))], select=[user_id,name,user_id0,order_id,user_id1,payment_id], outputRowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id1, VARCHAR(2147483647) payment_id)])"
229229
}, {
230230
"id" : 8,
231231
"type" : "stream-exec-calc_1",

flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@
357357
"priority" : 2
358358
} ],
359359
"outputType" : "ROW<`user_id_0` VARCHAR(2147483647) NOT NULL, `name` VARCHAR(2147483647), `order_id` VARCHAR(2147483647) NOT NULL, `user_id_1` VARCHAR(2147483647), `rowtime` TIMESTAMP(3), `price` INT, `user_id_2` VARCHAR(2147483647)>",
360-
"description" : "MultiJoin(commonJoinKey=[user_id_0], joinTypes=[[INNER, INNER, INNER]], joinConditions=[true AND (user_id_0 = user_id_1) AND (user_id_0 = user_id_2)], inputUniqueKeys=[input#0: (user_id_0) input#1: (order_id)], joinFilter=[((user_id_0 = user_id_2) AND (user_id_0 = user_id_1))], select=[user_id_0,name,order_id,user_id_1,rowtime,price,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, TIMESTAMP(3) rowtime, INTEGER price, VARCHAR(2147483647) user_id_2)])"
360+
"description" : "MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), noUniqueKey], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[((user_id_0 = user_id_2) AND (user_id_0 = user_id_1))], select=[user_id_0,name,order_id,user_id_1,rowtime,price,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, TIMESTAMP(3) rowtime, INTEGER price, VARCHAR(2147483647) user_id_2)])"
361361
}, {
362362
"id" : 57,
363363
"type" : "stream-exec-calc_1",

flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@
225225
"priority" : 2
226226
} ],
227227
"outputType" : "ROW<`user_id` VARCHAR(2147483647), `name` VARCHAR(2147483647), `user_id0` VARCHAR(2147483647), `order_id` VARCHAR(2147483647), `user_id1` VARCHAR(2147483647), `payment_id` VARCHAR(2147483647)>",
228-
"description" : "MultiJoin(commonJoinKey=[user_id], joinTypes=[[INNER, LEFT, LEFT]], joinConditions=[true AND (user_id = user_id0) AND (user_id = user_id1)], inputUniqueKeys=[], joinFilter=[true], select=[user_id,name,user_id0,order_id,user_id1,payment_id], outputRowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id1, VARCHAR(2147483647) payment_id)])"
228+
"description" : "MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, LEFT, LEFT], inputUniqueKeys=[noUniqueKey, noUniqueKey, noUniqueKey], joinConditions=[true, (user_id = user_id0), (user_id = user_id1)], joinFilter=[true], select=[user_id,name,user_id0,order_id,user_id1,payment_id], outputRowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id1, VARCHAR(2147483647) payment_id)])"
229229
}, {
230230
"id" : 17,
231231
"type" : "stream-exec-calc_1",

0 commit comments

Comments
 (0)