Skip to content

Commit e77dbc0

Browse files
authored
Merge branch 'main' into or_expr
2 parents 289b528 + 262958c commit e77dbc0

File tree

72 files changed

+3057
-1298
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+3057
-1298
lines changed

CODEOWNERS

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,19 @@
2222

2323
# Top-level folders
2424
/cgo @aunjgr
25-
/docs @fengttt
25+
/docs @fengttt @XuPeng-SH
2626
/etc @zhangxu19830126
27-
/optools @fengttt
28-
/pkg @fengttt
27+
/optools @fengttt @XuPeng-SH
28+
/pkg @fengttt @XuPeng-SH
2929
/proto @zhangxu19830126 @XuPeng-SH
30-
/cmd @daviszhen @zhangxu19830126 @XuPeng-SH
30+
/cmd @zhangxu19830126 @XuPeng-SH
3131
/test @aressu1985 @heni02
3232
/clients @XuPeng-SH
3333

3434
# cmd
3535
/cmd/mo-service @zhangxu19830126
3636

37-
/pkg/backup @daviszhen @LeftHandCold
37+
/pkg/backup @LeftHandCold
3838

3939
/pkg/datasync @volgariver6 @LeftHandCold
4040

@@ -49,21 +49,21 @@
4949
/pkg/iscp @XuPeng-SH
5050

5151
# pkg/cnservice
52-
/pkg/cnservice @reusee @daviszhen @XuPeng-SH
52+
/pkg/cnservice @XuPeng-SH
5353

5454
/pkg/cacheservice @volgariver6 @aptend
5555

5656
/pkg/clusterservice @volgariver6 @aptend
5757

5858
# pkg/common @fengttt
5959
/pkg/common @zhangxu19830126 @XuPeng-SH
60-
/pkg/common/mpool @reusee @XuPeng-SH
61-
/pkg/common/malloc @reusee
62-
/pkg/common/spool @reusee @ouyuanning @aunjgr
63-
/pkg/common/async @zhangxu19830126
60+
/pkg/common/mpool @XuPeng-SH
61+
/pkg/common/malloc @XuPeng-SH
62+
/pkg/common/spool @ouyuanning @aunjgr
63+
/pkg/common/async @zhangxu19830126 @XuPeng-SH
6464
/pkg/common/bitmap @aunjgr @XuPeng-SH
6565
/pkg/common/hashmap @aunjgr
66-
/pkg/common/moerr @xzxiong
66+
/pkg/common/moerr @xzxiong @XuPeng-SH
6767
/pkg/common/morpc @zhangxu19830126
6868
/pkg/common/moprobe @fengttt
6969
/pkg/common/stopper @zhangxu19830126
@@ -78,15 +78,15 @@
7878
/pkg/compress @aunjgr
7979

8080
# pkg/config
81-
/pkg/config @daviszhen @fengttt
81+
/pkg/config @fengttt @XuPeng-SH
8282

8383
# pkg/container
8484
/pkg/container @XuPeng-SH
8585
/pkg/container/vector @aunjgr @XuPeng-SH
86-
/pkg/container/pSpool @reusee @aunjgr @ouyuanning
86+
/pkg/container/pSpool @aunjgr @ouyuanning
8787

8888
# pkg/defines
89-
/pkg/defines @daviszhen @XuPeng-SH
89+
/pkg/defines @XuPeng-SH
9090

9191
# pkg/embed
9292
/pkg/embed @zhangxu19830126
@@ -95,22 +95,22 @@
9595
/pkg/tnservice @zhangxu19830126 @XuPeng-SH
9696

9797
# pkg/fileservice
98-
/pkg/fileservice @reusee @fengttt
98+
/pkg/fileservice @fengttt @XuPeng-SH
9999

100-
# pkg/frontend @daviszhen
101-
/pkg/frontend @daviszhen @XuPeng-SH
100+
# pkg/frontend
101+
/pkg/frontend @XuPeng-SH
102102

103103
# pkg/hakeeper
104104
/pkg/hakeeper @zhangxu19830126 @volgariver6
105105

106106
# pkg/incrservice
107-
/pkg/incrservice @zhangxu19830126
107+
/pkg/incrservice @zhangxu19830126 @XuPeng-SH
108108

109109
# pkg/lockservice
110-
/pkg/lockservice @zhangxu19830126
110+
/pkg/lockservice @zhangxu19830126 @iamlinjunhong
111111

112112
# pkg/shardservice
113-
/pkg/shardservice @zhangxu19830126
113+
/pkg/shardservice @zhangxu19830126 @iamlinjunhong
114114

115115
# pkg/partitionservice
116116
/pkg/partitionservice @zhangxu19830126 @iamlinjunhong
@@ -119,7 +119,7 @@
119119
/pkg/logservice @zhangxu19830126 @volgariver6
120120

121121
# pkg/logutil
122-
/pkg/logutil @daviszhen @XuPeng-SH
122+
/pkg/logutil @XuPeng-SH
123123

124124
# pkg/objectio
125125
/pkg/objectio @LeftHandCold @XuPeng-SH
@@ -131,21 +131,21 @@
131131
# for compatibility issues required for rolling upgrade.
132132
/pkg/pb @zhangxu19830126 @XuPeng-SH
133133
/pkg/pb/logservice @zhangxu19830126
134-
/pkg/pb/metadata @zhangxu19830126
134+
/pkg/pb/metadata @zhangxu19830126 @XuPeng-SH
135135
/pkg/pb/metric @aptend
136136
/pkg/pb/pipeline @ouyuanning @aunjgr
137137
/pkg/pb/plan @ouyuanning @aunjgr
138138
/pkg/pb/timestamp @zhangxu19830126
139139
/pkg/pb/txn @zhangxu19830126
140140

141141
# pkg/perfcounter
142-
/pkg/perfcounter @reusee
142+
/pkg/perfcounter @XuPeng-SH
143143

144144
# pkg/sort
145145
/pkg/sort @aunjgr
146146

147147
# pkg/bootstrap
148-
/pkg/bootstrap @daviszhen @zhangxu19830126 @LeftHandCold
148+
/pkg/bootstrap @zhangxu19830126 @LeftHandCold
149149

150150
# pkg/sql
151151
/pkg/sql @aunjgr
@@ -156,7 +156,7 @@
156156
/pkg/sql/plan @ouyuanning @aunjgr
157157
/pkg/sql/plan/function @ouyuanning @aunjgr
158158
/pkg/sql/plan/explain @ouyuanning @aunjgr
159-
/pkg/sql/plan/tools @daviszhen @gouhongshen
159+
/pkg/sql/plan/tools @gouhongshen
160160
/pkg/sql/colexec/indexjoin @aunjgr
161161
/pkg/sql/colexec/indexbuild @aunjgr
162162
/pkg/sql/colexec/intersect @aunjgr
@@ -182,19 +182,19 @@
182182
/pkg/sql/colexec/multi_update @ouyuanning
183183

184184
# pkg/taskservice
185-
/pkg/taskservice @zhangxu19830126
185+
/pkg/taskservice @zhangxu19830126 @XuPeng-SH
186186

187187
# pkg/tests
188188
/pkg/tests @zhangxu19830126
189189
/pkg/tests/service @zhangxu19830126
190190
/pkg/tests/txn @zhangxu19830126
191191

192192
# pkg/testutil
193-
/pkg/testutil @ouyuanning @daviszhen @XuPeng-SH
193+
/pkg/testutil @ouyuanning @XuPeng-SH
194194

195195
# pkg/txn
196196
/pkg/txn @zhangxu19830126
197-
/pkg/txn/storage/memorystorage @reusee
197+
/pkg/txn/storage/memorystorage @XuPeng-SH
198198

199199
# pkg/util
200200
/pkg/util @zhangxu19830126 @XuPeng-SH
@@ -213,12 +213,12 @@
213213
/pkg/vm @XuPeng-SH
214214
/pkg/vm/engine @XuPeng-SH
215215
/pkg/vm/pipeline @ouyuanning @aunjgr
216-
/pkg/vm/process @reusee @aunjgr @XuPeng-SH
216+
/pkg/vm/process @aunjgr @XuPeng-SH
217217
/pkg/vm/message @aunjgr
218218
/pkg/vm/engine/disttae @XuPeng-SH @gouhongshen
219219
/pkg/vm/engine/disttae/logtailreplay @XuPeng-SH @gouhongshen
220220
/pkg/vm/engine/tae @XuPeng-SH
221-
/pkg/vm/engine/memoryengine @reusee
221+
/pkg/vm/engine/memoryengine @XuPeng-SH
222222
/pkg/vm/engine/tae/logtail/service @XuPeng-SH @volgariver6
223223
/pkg/vm/engine/tae/logstore @XuPeng-SH @jiangxinmeng1
224224
/pkg/vm/engine/tae/wal @XuPeng-SH @jiangxinmeng1

etc/launch/cn.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,15 @@ level = "info"
88
uuid = "dd1dccb4-4d3c-41f8-b482-5251dc7a41bf"
99
port-base = 18000
1010

11+
[cn.Engine]
12+
# only prefetch the matched db.table
13+
prefetch-on-subscribed = [
14+
'^mo_catalog\.mo_tables$',
15+
'^mysql\..*$',
16+
'^test1\..*$',
17+
'^test2\.t1$',
18+
]
19+
1120
[malloc]
1221
check-fraction = 65536
1322
enable-metrics = true

pkg/cnservice/distributed_tae.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,11 @@ func (s *service) initDistributedTAE(
7979
s.cfg.LogtailUpdateWorkerFactor,
8080

8181
disttae.WithCNTransferTxnLifespanThreshold(
82-
s.cfg.Engine.CNTransferTxnLifespanThreshold),
82+
s.cfg.Engine.CNTransferTxnLifespanThreshold,
83+
),
84+
disttae.WithPrefetchOnSubscribed(
85+
s.cfg.Engine.PrefetchOnSubscribed,
86+
),
8387
disttae.WithMoTableStatsConf(s.cfg.Engine.Stats),
8488
disttae.WithSQLExecFunc(internalExecutorFactory),
8589
disttae.WithMoServerStateChecker(func() bool {

pkg/cnservice/server_query.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ func (s *service) initQueryCommandHandler() {
100100
s.queryService.AddHandleFunc(query.CmdMethod_CtlMoTableStats, s.handleMoTableStats, false)
101101
s.queryService.AddHandleFunc(query.CmdMethod_WorkspaceThreshold, s.handleWorkspaceThresholdRequest, false)
102102
s.queryService.AddHandleFunc(query.CmdMethod_MinTimestamp, s.handleGetMinTimestamp, false)
103+
s.queryService.AddHandleFunc(query.CmdMethod_CtlPrefetchOnSubscribed, s.handleCtlPrefetchOnSubscribed, false)
103104
}
104105

105106
func (s *service) handleKillConn(ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer) error {
@@ -179,6 +180,17 @@ func (s *service) handleCtlReader(ctx context.Context, req *query.Request, resp
179180
return nil
180181
}
181182

183+
func (s *service) handleCtlPrefetchOnSubscribed(ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer) error {
184+
if req == nil || req.CtlPrefetchOnSubscribedRequest == nil {
185+
return moerr.NewInternalError(ctx, "bad request")
186+
}
187+
resp.CtlPrefetchOnSubscribedResponse = new(query.CtlPrefetchOnSubscribedResponse)
188+
resp.CtlPrefetchOnSubscribedResponse.Resp = ctl.UpdateCurrentCNPrefetchOnSubscribed(
189+
req.CtlPrefetchOnSubscribedRequest.Patterns,
190+
)
191+
return nil
192+
}
193+
182194
// handleGetLockInfo sends the lock info on current cn to another cn that needs.
183195
func (s *service) handleGetLockInfo(ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer) error {
184196
resp.GetLockInfoResponse = new(query.GetLockInfoResponse)
@@ -408,7 +420,7 @@ func (s *service) handleUnsubscribeTable(ctx context.Context, req *query.Request
408420
if req.UnsubscribeTable == nil {
409421
return moerr.NewInternalError(ctx, "bad request")
410422
}
411-
err := s.storeEngine.UnsubscribeTable(ctx, req.UnsubscribeTable.DatabaseID, req.UnsubscribeTable.TableID)
423+
err := s.storeEngine.UnsubscribeTable(ctx, 0, req.UnsubscribeTable.DatabaseID, req.UnsubscribeTable.TableID)
412424
if err != nil {
413425
resp.WrapError(err)
414426
return nil

pkg/cnservice/server_query_test.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -518,10 +518,10 @@ func Test_service_handleUnsubscribeTable(t *testing.T) {
518518
err := dummyErr
519519
ctl := gomock.NewController(t)
520520
mockEng := mock_frontend.NewMockEngine(ctl)
521-
mockEng.EXPECT().UnsubscribeTable(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
521+
mockEng.EXPECT().UnsubscribeTable(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
522522

523523
mockEngErr := mock_frontend.NewMockEngine(ctl)
524-
mockEngErr.EXPECT().UnsubscribeTable(gomock.Any(), gomock.Any(), gomock.Any()).Return(err).AnyTimes()
524+
mockEngErr.EXPECT().UnsubscribeTable(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(err).AnyTimes()
525525

526526
respWithErr := &query.Response{}
527527
respWithErr.WrapError(err)
@@ -759,6 +759,24 @@ func Test_service_handleCtlReader(t *testing.T) {
759759
}
760760
}
761761

762+
func Test_service_handleCtlPrefetchOnSubscribed(t *testing.T) {
763+
ctx := context.Background()
764+
765+
req := &query.Request{CtlPrefetchOnSubscribedRequest: &query.CtlPrefetchOnSubscribedRequest{
766+
Patterns: []string{"^foo$"},
767+
}}
768+
resp := &query.Response{}
769+
t.Cleanup(func() {
770+
require.NoError(t, engine.SetPrefetchOnSubscribed(nil))
771+
})
772+
s := &service{}
773+
err := s.handleCtlPrefetchOnSubscribed(ctx, req, resp, nil)
774+
require.NoError(t, err)
775+
require.Equal(t, &query.Response{CtlPrefetchOnSubscribedResponse: &query.CtlPrefetchOnSubscribedResponse{
776+
Resp: "prefetch_on_subscribed updated, patterns: 1",
777+
}}, resp)
778+
}
779+
762780
func Test_service_handleRunTask(t *testing.T) {
763781

764782
ctx := context.Background()

pkg/cnservice/types.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,13 @@ type Config struct {
130130
Engine struct {
131131
Type EngineType `toml:"type"`
132132

133+
// only prefetch the matched dbname.tablename
134+
// '^mo_catalog\.mo_tables$',
135+
// '^mysql\..*$',
136+
// '^test1\..*$',
137+
// '^test2\.t1$'
138+
PrefetchOnSubscribed []string `toml:"prefetch-on-subscribed"`
139+
133140
MoTableStatsUseOldImpl bool `toml:"mo-table-stats-use-old-impl"`
134141
CNTransferTxnLifespanThreshold time.Duration `toml:"cn-transfer-txn-lifespan-threshold"`
135142

pkg/fileservice/local_fs.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,11 +389,31 @@ read_disk_cache:
389389
if l.diskCache != nil {
390390

391391
t0 := time.Now()
392+
LogEvent(ctx, str_read_disk_cache_Caches_begin)
393+
// Record which entries are not done before reading from disk cache
394+
undoneBefore := make(map[int]bool)
395+
for i, entry := range vector.Entries {
396+
undoneBefore[i] = !entry.done
397+
}
392398
err := readCache(ctx, l.diskCache, vector)
399+
LogEvent(ctx, str_read_disk_cache_Caches_end)
393400
metric.FSReadDurationReadDiskCache.Observe(time.Since(t0).Seconds())
394401
if err != nil {
395402
return err
396403
}
404+
// Count bytes actually read from disk cache (entries that became done and from disk cache)
405+
var actualDiskReadBytes int64
406+
for i, entry := range vector.Entries {
407+
if undoneBefore[i] && entry.done && entry.fromCache == l.diskCache {
408+
actualDiskReadBytes += entry.Size
409+
}
410+
}
411+
// Record disk read size
412+
if actualDiskReadBytes > 0 {
413+
perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) {
414+
counter.FileService.DiskReadSize.Add(actualDiskReadBytes)
415+
})
416+
}
397417
if vector.allDone() {
398418
return nil
399419
}
@@ -440,10 +460,23 @@ read_disk_cache:
440460
}
441461
}
442462

463+
// Count bytes that will be read from local disk (entries that are not done yet)
464+
var localDiskReadBytes int64
465+
for _, entry := range vector.Entries {
466+
if !entry.done {
467+
localDiskReadBytes += entry.Size
468+
}
469+
}
443470
err = l.read(ctx, vector, bytesCounter)
444471
if err != nil {
445472
return err
446473
}
474+
// Record disk read size (all bytes read from local disk)
475+
if localDiskReadBytes > 0 {
476+
perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) {
477+
counter.FileService.DiskReadSize.Add(localDiskReadBytes)
478+
})
479+
}
447480

448481
return nil
449482
}

0 commit comments

Comments
 (0)