
Commit 6f53c7f

HIVE-29166: Fix the partition column update logic in ConvertJoinMapJoin#convertJoinBucketMapJoin. (#6048)
1 parent c62fa28 commit 6f53c7f

File tree: 5 files changed (+222, -20 lines)
Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+set hive.auto.convert.join=true;
+set hive.convert.join.bucket.mapjoin.tez=true;
+
+CREATE TABLE tbl (foid string, part string, id string) PARTITIONED BY SPEC(bucket(10, id), bucket(10, part)) STORED BY ICEBERG;
+INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2');
+
+EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
+PREHOOK: query: CREATE TABLE tbl (foid string, part string, id string) PARTITIONED BY SPEC(bucket(10, id), bucket(10, part)) STORED BY ICEBERG
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl
+POSTHOOK: query: CREATE TABLE tbl (foid string, part string, id string) PARTITIONED BY SPEC(bucket(10, id), bucket(10, part)) STORED BY ICEBERG
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl
+PREHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl
+POSTHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl
+PREHOOK: query: EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+Plan optimized by CBO.
+
+Vertex dependency in root stage
+Map 1 <- Map 2 (CUSTOM_EDGE)
+
+Stage-0
+  Fetch Operator
+    limit:-1
+    Stage-1
+      Map 1 vectorized
+      File Output Operator [FS_53]
+        Map Join Operator [MAPJOIN_52] (rows=2 width=530)
+          BucketMapJoin:true,Conds:SEL_51._col1, _col2=RS_49._col1, _col2(Inner),Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
+        <-Map 2 [CUSTOM_EDGE] vectorized
+          MULTICAST [RS_49]
+            PartitionCols:_col2, _col1
+            Select Operator [SEL_48] (rows=2 width=265)
+              Output:["_col0","_col1","_col2"]
+              Filter Operator [FIL_47] (rows=2 width=265)
+                predicate:(id is not null and part is not null)
+                TableScan [TS_3] (rows=2 width=265)
+                  default@tbl,tbl2,Tbl:COMPLETE,Col:COMPLETE,Output:["foid","part","id"]
+        <-Select Operator [SEL_51] (rows=2 width=265)
+            Output:["_col0","_col1","_col2"]
+            Filter Operator [FIL_50] (rows=2 width=265)
+              predicate:(id is not null and part is not null)
+              TableScan [TS_0] (rows=2 width=265)
+                default@tbl,tbl,Tbl:COMPLETE,Col:COMPLETE,Grouping Num Buckets:100,Grouping Partition Columns:["id","part"],Output:["foid","part","id"]
+
+PREHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1234 PART_123 1 1234 PART_123 1
+1235 PART_124 2 1235 PART_124 2
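
Reading the plan: the Select operators emit foid, part, id as _col0, _col1, _col2, so the join keys surface as (_col1, _col2), i.e. (part, id), while the MULTICAST edge from Map 2 partitions on (_col2, _col1), i.e. (id, part), matching the order of the big table's Grouping Partition Columns ["id","part"]. Computing that reordered partition-column list for the small table is exactly what the fixed logic produces; see the sketch after the ConvertJoinMapJoin.java diff below.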

ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java

Lines changed: 27 additions & 20 deletions
@@ -656,32 +656,39 @@ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcCon
     // on small table(s).
     ReduceSinkOperator bigTableRS = (ReduceSinkOperator)joinOp.getParentOperators().get(bigTablePosition);
     OpTraits opTraits = bigTableRS.getOpTraits();
-    List<List<String>> listBucketCols = opTraits.getBucketColNames();
+    // It is guaranteed there is only 1 list within bigTableRS.getOpTraits().getBucketColNames().
+    List<String> listBucketCols = opTraits.getBucketColNames().get(0);
     List<ExprNodeDesc> bigTablePartitionCols = bigTableRS.getConf().getPartitionCols();
-    boolean updatePartitionCols = false;
+    boolean updatePartitionCols = listBucketCols.size() != bigTablePartitionCols.size();
     List<Integer> positions = new ArrayList<>();
 
-    CustomBucketFunction bucketFunction = opTraits.getCustomBucketFunctions().get(0);
-    if (listBucketCols.get(0).size() != bigTablePartitionCols.size()) {
-      updatePartitionCols = true;
-      // Prepare updated partition columns for small table(s).
-      // Get the positions of bucketed columns
-
-      int bigTableExprPos = 0;
-      Map<String, ExprNodeDesc> colExprMap = bigTableRS.getColumnExprMap();
-      final boolean[] retainedColumns = new boolean[listBucketCols.get(0).size()];
-      for (ExprNodeDesc bigTableExpr : bigTablePartitionCols) {
-        // It is guaranteed there is only 1 list within listBucketCols.
-        for (int i = 0; i < listBucketCols.get(0).size(); i++) {
-          final String colName = listBucketCols.get(0).get(i);
-          if (colExprMap.get(colName).isSame(bigTableExpr)) {
-            positions.add(bigTableExprPos);
-            retainedColumns[i] = true;
-          }
+    // Compare the partition columns and the bucket columns of bigTableRS.
+    Map<String, ExprNodeDesc> colExprMap = bigTableRS.getColumnExprMap();
+    final boolean[] retainedColumns = new boolean[listBucketCols.size()];
+    for (int bucketColIdx = 0; bucketColIdx < listBucketCols.size(); bucketColIdx++) {
+      for (int bigTablePartIdx = 0; bigTablePartIdx < bigTablePartitionCols.size(); bigTablePartIdx++) {
+        ExprNodeDesc bigTablePartExpr = bigTablePartitionCols.get(bigTablePartIdx);
+        ExprNodeDesc bucketColExpr = colExprMap.get(listBucketCols.get(bucketColIdx));
+        if (bigTablePartExpr.isSame(bucketColExpr)) {
+          positions.add(bigTablePartIdx);
+          retainedColumns[bucketColIdx] = true;
+          // If the positions of the partition column and the bucket column are not the same,
+          // then we need to update the position of the partition column in small tables.
+          updatePartitionCols = updatePartitionCols || bucketColIdx != bigTablePartIdx;
+          break;
         }
-        bigTableExprPos = bigTableExprPos + 1;
       }
+    }
 
+    // If the number of partition columns is less than the number of bucket columns,
+    // then we cannot properly distribute small tables onto bucketized map tasks.
+    // Bail out.
+    if (positions.size() < listBucketCols.size()) {
+      return false;
+    }
+
+    CustomBucketFunction bucketFunction = opTraits.getCustomBucketFunctions().get(0);
+    if (updatePartitionCols) {
       Preconditions.checkState(opTraits.getCustomBucketFunctions().size() == 1);
       if (opTraits.getCustomBucketFunctions().get(0) != null) {
         final Optional<CustomBucketFunction> selected =
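
In sum, the fixed pass walks the big table's bucket columns in bucket order and records each one's index among the ReduceSink's partition columns, so positions comes out in bucket-column order. The old loop iterated the partition columns on the outside, so positions was collected in join-key order, and because updatePartitionCols was only set when the column counts differed, a pure reordering (same count, different order) was silently left alone. Below is a minimal standalone sketch of the matching pass, using plain strings in place of ExprNodeDesc/isSame and the column orders from the Iceberg test above; the class and variable names are illustrative, not Hive API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BucketColumnMatchSketch {
  public static void main(String[] args) {
    // Big-table ReduceSink partition columns, in join-key order: (part, id).
    List<String> bigTablePartitionCols = Arrays.asList("part", "id");
    // Big-table bucket columns, in spec order: bucket(10, id), bucket(10, part).
    List<String> listBucketCols = Arrays.asList("id", "part");

    List<Integer> positions = new ArrayList<>();
    boolean updatePartitionCols = listBucketCols.size() != bigTablePartitionCols.size();
    // In the real code, retainedColumns feeds the custom bucket function selection.
    boolean[] retainedColumns = new boolean[listBucketCols.size()];

    // Outer loop over bucket columns, so positions is built in bucket-column order.
    for (int bucketColIdx = 0; bucketColIdx < listBucketCols.size(); bucketColIdx++) {
      for (int bigTablePartIdx = 0; bigTablePartIdx < bigTablePartitionCols.size(); bigTablePartIdx++) {
        if (listBucketCols.get(bucketColIdx).equals(bigTablePartitionCols.get(bigTablePartIdx))) {
          positions.add(bigTablePartIdx);
          retainedColumns[bucketColIdx] = true;
          // A match at a different index means the small tables' partition
          // columns must be reordered even when the counts are equal.
          updatePartitionCols |= bucketColIdx != bigTablePartIdx;
          break;
        }
      }
    }

    // Bail out unless every bucket column was found among the partition columns.
    if (positions.size() < listBucketCols.size()) {
      System.out.println("cannot distribute small tables onto bucketized map tasks");
      return;
    }
    // Prints: positions=[1, 0], updatePartitionCols=true
    System.out.println("positions=" + positions + ", updatePartitionCols=" + updatePartitionCols);
  }
}

With these inputs the sketch prints positions=[1, 0] and updatePartitionCols=true, which is the (_col2, _col1) reordering visible in both test plans.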
Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+set hive.auto.convert.join=true;
+set hive.convert.join.bucket.mapjoin.tez=true;
+
+CREATE TABLE tbl (foid string, part string, id string) CLUSTERED BY (id, part) INTO 64 BUCKETS;
+INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2');
+
+EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
Lines changed: 112 additions & 0 deletions
@@ -0,0 +1,112 @@
+PREHOOK: query: CREATE TABLE tbl (foid string, part string, id string) CLUSTERED BY (id, part) INTO 64 BUCKETS
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl
+POSTHOOK: query: CREATE TABLE tbl (foid string, part string, id string) CLUSTERED BY (id, part) INTO 64 BUCKETS
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl
+PREHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl
+POSTHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl
+POSTHOOK: Lineage: tbl.foid SCRIPT []
+POSTHOOK: Lineage: tbl.id SCRIPT []
+POSTHOOK: Lineage: tbl.part SCRIPT []
+PREHOOK: query: EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Map 2 (CUSTOM_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: tbl
+                  filterExpr: (id is not null and part is not null) (type: boolean)
+                  Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (id is not null and part is not null) (type: boolean)
+                    Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: foid (type: string), part (type: string), id (type: string)
+                      outputColumnNames: _col0, _col1, _col2
+                      Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col1 (type: string), _col2 (type: string)
+                          1 _col1 (type: string), _col2 (type: string)
+                        outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
+                        input vertices:
+                          1 Map 2
+                        Statistics: Num rows: 2 Data size: 1060 Basic stats: COMPLETE Column stats: COMPLETE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 2 Data size: 1060 Basic stats: COMPLETE Column stats: COMPLETE
+                          table:
+                              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+        Map 2
+            Map Operator Tree:
+                TableScan
+                  alias: tbl2
+                  filterExpr: (id is not null and part is not null) (type: boolean)
+                  Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (id is not null and part is not null) (type: boolean)
+                    Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: foid (type: string), part (type: string), id (type: string)
+                      outputColumnNames: _col0, _col1, _col2
+                      Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col1 (type: string), _col2 (type: string)
+                        null sort order: zz
+                        sort order: ++
+                        Map-reduce partition columns: _col2 (type: string), _col1 (type: string)
+                        Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col0 (type: string)
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl
+#### A masked pattern was here ####
+1234 PART_123 1 1234 PART_123 1
+1235 PART_124 2 1235 PART_124 2
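
This plan shows the same reordering for a plain CLUSTERED BY (id, part) table: Map 2's key expressions are (_col1, _col2) while its Map-reduce partition columns are (_col2, _col1). The hunk above ends before the code that actually rewrites the small-table ReduceSinks, so as a rough sketch only: applying the positions list from the matching pass to a small table's partition columns presumably amounts to a permutation like the hypothetical helper below (reorder is not Hive code).

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReorderSketch {
  // Hypothetical helper: permute partition columns into bucket-column order
  // using the positions computed against the big table's ReduceSink.
  static <T> List<T> reorder(List<T> partitionCols, List<Integer> positions) {
    List<T> result = new ArrayList<>(positions.size());
    for (int pos : positions) {
      result.add(partitionCols.get(pos));
    }
    return result;
  }

  public static void main(String[] args) {
    // Small-table partition columns in join-key order: part=_col1, id=_col2.
    List<String> smallTablePartitionCols = Arrays.asList("_col1", "_col2");
    // positions = [1, 0] as computed by the matching pass.
    System.out.println(reorder(smallTablePartitionCols, Arrays.asList(1, 0)));
    // Prints: [_col2, _col1], matching "Map-reduce partition columns" above.
  }
}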
