Skip to content

Commit ea17d08

Browse files
authored
optimize top operator's batch processing 3.0 (#22403)
- Replace UnionOne with UnionBatch to improve performance. - Allocate off-heap memory for a large limit value (over 10240). Approved by: @aunjgr, @XuPeng-SH
1 parent 8feb70f commit ea17d08

File tree

2 files changed

+60
-41
lines changed

2 files changed

+60
-41
lines changed

pkg/sql/colexec/mergetop/top.go

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,13 @@ func (ctr *container) build(ap *MergeTop, proc *process.Process, analyzer proces
160160
}
161161

162162
if ctr.bat == nil {
163-
ctr.bat = batch.NewWithSize(len(bat.Vecs))
163+
batNew, vecNew := batch.NewWithSize, vector.NewVec
164+
if ap.ctr.limit > 10240 {
165+
batNew, vecNew = batch.NewOffHeapWithSize, vector.NewOffHeapVecWithType
166+
}
167+
ctr.bat = batNew(len(bat.Vecs))
164168
for i, vec := range bat.Vecs {
165-
ctr.bat.Vecs[i] = vector.NewVec(*vec.GetType())
169+
ctr.bat.Vecs[i] = vecNew(*vec.GetType())
166170
}
167171
}
168172

@@ -193,37 +197,43 @@ func (ctr *container) build(ap *MergeTop, proc *process.Process, analyzer proces
193197
}
194198

195199
func (ctr *container) processBatch(limit uint64, bat *batch.Batch, proc *process.Process) error {
196-
var start int64
200+
rowCount := int64(bat.RowCount())
201+
toFillCount := int64(limit) - int64(len(ctr.sels))
197202

198-
length := int64(bat.RowCount())
199-
if n := uint64(len(ctr.sels)); n < limit {
200-
start = int64(limit - n)
201-
if start > length {
202-
start = length
203-
}
204-
for i := int64(0); i < start; i++ {
205-
for j, vec := range ctr.bat.Vecs {
206-
if err := vec.UnionOne(bat.Vecs[j], i, proc.Mp()); err != nil {
207-
return err
208-
}
203+
processCount := min(int64(toFillCount), rowCount)
204+
205+
if processCount > 0 {
206+
for j, vec := range ctr.bat.Vecs {
207+
if err := vec.UnionBatch(
208+
bat.Vecs[j],
209+
0,
210+
int(processCount),
211+
nil,
212+
proc.Mp(),
213+
); err != nil {
214+
return err
209215
}
210-
ctr.sels = append(ctr.sels, int64(n))
211-
n++
212216
}
213-
ctr.bat.AddRowCount(bat.RowCount())
214-
if n == limit {
217+
baseSel := int64(len(ctr.sels))
218+
for i := range processCount {
219+
ctr.sels = append(ctr.sels, baseSel+i)
220+
}
221+
ctr.bat.AddRowCount(int(processCount))
222+
223+
if uint64(len(ctr.sels)) == limit {
215224
ctr.sort()
216225
}
217226
}
218-
if start == length {
227+
228+
if processCount == rowCount {
219229
return nil
220230
}
221231

222232
// bat still has items left to process
223233
for i, cmp := range ctr.cmps {
224234
cmp.Set(1, bat.Vecs[i])
225235
}
226-
for i, j := start, length; i < j; i++ {
236+
for i, j := processCount, rowCount; i < j; i++ {
227237
if ctr.compare(1, 0, i, ctr.sels[0]) < 0 {
228238
for _, cmp := range ctr.cmps {
229239
if err := cmp.Copy(1, 0, i, ctr.sels[0], proc); err != nil {

pkg/sql/colexec/top/top.go

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -192,9 +192,13 @@ func (ctr *container) build(ap *Top, bat *batch.Batch, proc *process.Process) er
192192
}
193193

194194
if ctr.bat == nil {
195-
ctr.bat = batch.NewWithSize(len(bat.Vecs))
195+
batNew, vecNew := batch.NewWithSize, vector.NewVec
196+
if ap.ctr.limit > 10240 {
197+
batNew, vecNew = batch.NewOffHeapWithSize, vector.NewOffHeapVecWithType
198+
}
199+
ctr.bat = batNew(len(bat.Vecs))
196200
for i, vec := range bat.Vecs {
197-
ctr.bat.Vecs[i] = vector.NewVec(*vec.GetType())
201+
ctr.bat.Vecs[i] = vecNew(*vec.GetType())
198202
}
199203
}
200204

@@ -223,38 +227,43 @@ func (ctr *container) build(ap *Top, bat *batch.Batch, proc *process.Process) er
223227
}
224228

225229
func (ctr *container) processBatch(limit uint64, bat *batch.Batch, proc *process.Process) error {
226-
var start int64
227-
228-
length := int64(bat.RowCount())
229-
if n := uint64(len(ctr.sels)); n < limit {
230-
start = int64(limit - n)
231-
if start > length {
232-
start = length
233-
}
234-
for i := int64(0); i < start; i++ {
235-
for j, vec := range ctr.bat.Vecs {
236-
if err := vec.UnionOne(bat.Vecs[j], i, proc.Mp()); err != nil {
237-
return err
238-
}
230+
rowCount := int64(bat.RowCount())
231+
toFillCount := int64(limit) - int64(len(ctr.sels))
232+
233+
processCount := min(int64(toFillCount), rowCount)
234+
235+
if processCount > 0 {
236+
for j, vec := range ctr.bat.Vecs {
237+
if err := vec.UnionBatch(
238+
bat.Vecs[j],
239+
0,
240+
int(processCount),
241+
nil,
242+
proc.Mp(),
243+
); err != nil {
244+
return err
239245
}
240-
ctr.sels = append(ctr.sels, int64(n))
241-
n++
242246
}
243-
ctr.bat.AddRowCount(int(start))
247+
baseSel := int64(len(ctr.sels))
248+
for i := range processCount {
249+
ctr.sels = append(ctr.sels, baseSel+i)
250+
}
251+
ctr.bat.AddRowCount(int(processCount))
244252

245-
if n == limit {
253+
if uint64(len(ctr.sels)) == limit {
246254
ctr.sort()
247255
}
248256
}
249-
if start == length {
257+
258+
if processCount == rowCount {
250259
return nil
251260
}
252261

253262
// bat still has items left to process
254263
for i, cmp := range ctr.cmps {
255264
cmp.Set(1, bat.Vecs[i])
256265
}
257-
for i, j := start, length; i < j; i++ {
266+
for i, j := processCount, rowCount; i < j; i++ {
258267
if ctr.compare(1, 0, i, ctr.sels[0]) < 0 {
259268
for _, cmp := range ctr.cmps {
260269
if err := cmp.Copy(1, 0, i, ctr.sels[0], proc); err != nil {

0 commit comments

Comments
 (0)