@@ -17,6 +17,7 @@ package join
17
17
import (
18
18
"bytes"
19
19
"context"
20
+ "slices"
20
21
"testing"
21
22
22
23
"github.com/matrixorigin/matrixone/pkg/common/mpool"
@@ -84,6 +85,225 @@ func TestString(t *testing.T) {
84
85
}
85
86
}
86
87
88
+ func TestJoinSplit (t * testing.T ) {
89
+ cs := [][]* plan.Expr {
90
+ {newExpr (0 , types .T_int32 .ToType ())},
91
+ {newExpr (0 , types .T_int32 .ToType ())},
92
+ }
93
+ tag := int32 (42 )
94
+ join := & InnerJoin {
95
+ Result : []colexec.ResultPos {
96
+ colexec .NewResultPos (0 , 0 ),
97
+ colexec .NewResultPos (1 , 0 ),
98
+ },
99
+ Conditions : cs ,
100
+ Cond : nil ,
101
+ OperatorBase : vm.OperatorBase {
102
+ OperatorInfo : vm.OperatorInfo {
103
+ Idx : 1 ,
104
+ IsFirst : false ,
105
+ IsLast : false ,
106
+ },
107
+ },
108
+ JoinMapTag : tag ,
109
+ }
110
+ hashbuild := & hashbuild.HashBuild {
111
+ NeedHashMap : true ,
112
+ Conditions : cs [1 ],
113
+ NeedBatches : true ,
114
+ OperatorBase : vm.OperatorBase {
115
+ OperatorInfo : vm.OperatorInfo {
116
+ Idx : 0 ,
117
+ IsFirst : false ,
118
+ IsLast : false ,
119
+ },
120
+ },
121
+ NeedAllocateSels : true ,
122
+ JoinMapTag : tag ,
123
+ JoinMapRefCnt : 1 ,
124
+ }
125
+ bat := batch .New ([]string {"a" , "b" })
126
+ bat .Vecs [0 ] = testutil .MakeInt32Vector ([]int32 {1 , 2 , 4 , 5 , 6 }, nil )
127
+ bat .Vecs [1 ] = testutil .MakeInt32Vector ([]int32 {1 , 2 , 3 , 4 , 5 }, nil )
128
+ bat .SetRowCount (bat .Vecs [0 ].Length ())
129
+ join .AppendChild (colexec .NewMockOperator ().WithBatchs ([]* batch.Batch {bat }))
130
+
131
+ bat2 := batch .New ([]string {"a" , "b" })
132
+ vals := []int32 {0 , 2 , 3 }
133
+ vals = append (vals , slices .Repeat ([]int32 {1 }, 5000 )... )
134
+ vals = append (vals , slices .Repeat ([]int32 {4 }, 5000 )... )
135
+ vals = append (vals , 5 , 5 , 7 , 7 )
136
+ bat2 .Vecs [0 ] = testutil .MakeInt32Vector (vals , nil )
137
+ bat2 .Vecs [1 ] = testutil .MakeInt32Vector (vals , nil )
138
+ bat2 .SetRowCount (bat2 .Vecs [0 ].Length ())
139
+ hashbuild .AppendChild (colexec .NewMockOperator ().WithBatchs ([]* batch.Batch {bat2 }))
140
+
141
+ proc := testutil .NewProcessWithMPool (t , "" , mpool .MustNewZero ())
142
+ proc .SetMessageBoard (message .NewMessageBoard ())
143
+
144
+ err := join .Prepare (proc )
145
+ require .NoError (t , err )
146
+ err = hashbuild .Prepare (proc )
147
+ require .NoError (t , err )
148
+ res , err := vm .Exec (hashbuild , proc )
149
+ require .NoError (t , err )
150
+ require .Equal (t , true , res .Batch == nil )
151
+
152
+ batCnt := 0
153
+ rowCount := 0
154
+ for end := false ; ! end ; {
155
+ result , er := vm .Exec (join , proc )
156
+ if er != nil {
157
+ t .Fatal (er )
158
+ }
159
+ end = result .Status == vm .ExecStop || result .Batch == nil
160
+ if result .Batch != nil {
161
+ rowCount += result .Batch .RowCount ()
162
+ batCnt ++
163
+ }
164
+ }
165
+
166
+ require .Equal (t , 2 , batCnt )
167
+ require .Equal (t , 10000 + 3 , rowCount )
168
+
169
+ t .Log (batCnt , rowCount )
170
+
171
+ join .Free (proc , false , nil )
172
+ hashbuild .Free (proc , false , nil )
173
+ proc .Free ()
174
+ }
175
+
176
+ func TestJoinEvalCondFalse (t * testing.T ) {
177
+ cs := [][]* plan.Expr {
178
+ {newExpr (0 , types .T_int32 .ToType ())},
179
+ {newExpr (0 , types .T_int32 .ToType ())},
180
+ }
181
+ tag := int32 (42 )
182
+ args := make ([]* plan.Expr , 0 , 2 )
183
+ args = append (args , & plan.Expr {
184
+ Typ : plan.Type {
185
+ Id : int32 (types .T_int32 ),
186
+ },
187
+ Expr : & plan.Expr_Col {
188
+ Col : & plan.ColRef {
189
+ RelPos : 0 ,
190
+ ColPos : 0 ,
191
+ },
192
+ },
193
+ })
194
+ args = append (args , & plan.Expr {
195
+ Typ : plan.Type {
196
+ Id : int32 (types .T_int32 ),
197
+ },
198
+ Expr : & plan.Expr_Col {
199
+ Col : & plan.ColRef {
200
+ RelPos : 1 ,
201
+ ColPos : 1 ,
202
+ },
203
+ },
204
+ })
205
+
206
+ fr , _ := function .GetFunctionByName (
207
+ context .Background (),
208
+ ">" ,
209
+ []types.Type {types .T_int32 .ToType (), types .T_int32 .ToType ()},
210
+ )
211
+ fid := fr .GetEncodedOverloadID ()
212
+ cond := & plan.Expr {
213
+ Typ : plan.Type {
214
+ Id : int32 (types .T_bool ),
215
+ },
216
+ Expr : & plan.Expr_F {
217
+ F : & plan.Function {
218
+ Args : args ,
219
+ Func : & plan.ObjectRef {Obj : fid , ObjName : ">" },
220
+ },
221
+ },
222
+ }
223
+ join := & InnerJoin {
224
+ Result : []colexec.ResultPos {
225
+ colexec .NewResultPos (0 , 0 ),
226
+ colexec .NewResultPos (1 , 0 ),
227
+ },
228
+ Conditions : cs ,
229
+ Cond : cond ,
230
+ OperatorBase : vm.OperatorBase {
231
+ OperatorInfo : vm.OperatorInfo {
232
+ Idx : 1 ,
233
+ IsFirst : false ,
234
+ IsLast : false ,
235
+ },
236
+ },
237
+ JoinMapTag : tag ,
238
+ }
239
+ hashbuild := & hashbuild.HashBuild {
240
+ NeedHashMap : true ,
241
+ Conditions : cs [1 ],
242
+ NeedBatches : true ,
243
+ OperatorBase : vm.OperatorBase {
244
+ OperatorInfo : vm.OperatorInfo {
245
+ Idx : 0 ,
246
+ IsFirst : false ,
247
+ IsLast : false ,
248
+ },
249
+ },
250
+ NeedAllocateSels : true ,
251
+ JoinMapTag : tag ,
252
+ JoinMapRefCnt : 1 ,
253
+ }
254
+ bat := batch .New ([]string {"a" , "b" })
255
+ bat .Vecs [0 ] = testutil .MakeInt32Vector ([]int32 {1 , 2 , 4 , 5 , 6 }, nil )
256
+ bat .Vecs [1 ] = testutil .MakeInt32Vector ([]int32 {1 , 2 , 3 , 4 , 5 }, nil )
257
+ bat .SetRowCount (bat .Vecs [0 ].Length ())
258
+ join .AppendChild (colexec .NewMockOperator ().WithBatchs ([]* batch.Batch {bat }))
259
+
260
+ bat2 := batch .New ([]string {"a" , "b" })
261
+ vals := []int32 {0 , 2 , 3 } // !(2 > 2), 2 will be filtered out
262
+ vals = append (vals , slices .Repeat ([]int32 {1 }, 5000 )... ) // !(1 > 2), 1 will be filtered out
263
+ vals = append (vals , slices .Repeat ([]int32 {4 }, 5050 )... ) // 4 > 2, 4 will be selected
264
+ vals = append (vals , 5 , 5 , 7 , 7 ) // 5 > 2, 5 will be selected
265
+ col2 := slices .Repeat ([]int32 {2 }, len (vals ))
266
+ bat2 .Vecs [0 ] = testutil .MakeInt32Vector (vals , nil )
267
+ bat2 .Vecs [1 ] = testutil .MakeInt32Vector (col2 , nil )
268
+ bat2 .SetRowCount (bat2 .Vecs [0 ].Length ())
269
+ hashbuild .AppendChild (colexec .NewMockOperator ().WithBatchs ([]* batch.Batch {bat2 }))
270
+
271
+ proc := testutil .NewProcessWithMPool (t , "" , mpool .MustNewZero ())
272
+ proc .SetMessageBoard (message .NewMessageBoard ())
273
+
274
+ err := join .Prepare (proc )
275
+ require .NoError (t , err )
276
+ err = hashbuild .Prepare (proc )
277
+ require .NoError (t , err )
278
+ res , err := vm .Exec (hashbuild , proc )
279
+ require .NoError (t , err )
280
+ require .Equal (t , true , res .Batch == nil )
281
+
282
+ batCnt := 0
283
+ rowCount := 0
284
+ for end := false ; ! end ; {
285
+ result , er := vm .Exec (join , proc )
286
+ if er != nil {
287
+ t .Fatal (er )
288
+ }
289
+ end = result .Status == vm .ExecStop || result .Batch == nil
290
+ if result .Batch != nil {
291
+ rowCount += result .Batch .RowCount ()
292
+ batCnt ++
293
+ }
294
+ }
295
+
296
+ require .Equal (t , 1 , batCnt )
297
+ require .Equal (t , 5052 , rowCount )
298
+
299
+ t .Log (batCnt , rowCount )
300
+
301
+ join .Free (proc , false , nil )
302
+ hashbuild .Free (proc , false , nil )
303
+ proc .Free ()
304
+
305
+ }
306
+
87
307
func TestJoin (t * testing.T ) {
88
308
for _ , tc := range makeTestCases (t ) {
89
309
0 commit comments