Skip to content

Commit e251b3a

Browse files
fix cdc scanner (#22377)
fix cdc scanner Approved by: @daviszhen, @XuPeng-SH, @ck89119
1 parent 5dbd9ee commit e251b3a

File tree

4 files changed

+46
-4
lines changed

4 files changed

+46
-4
lines changed

pkg/cdc/table_scanner.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,10 +185,18 @@ func (s *TableDetector) scanTableLoop(ctx context.Context) {
185185
logutil.Info("CDC-TableDetector-Scan-Start")
186186
defer logutil.Info("CDC-TableDetector-Scan-End")
187187

188-
ticker := time.NewTicker(15 * time.Second)
188+
var tickerDuration, retryTickerDuration time.Duration
189+
if msg, injected := objectio.CDCScanTableInjected(); injected || msg == "fast scan" {
190+
tickerDuration = 1 * time.Millisecond
191+
retryTickerDuration = 1 * time.Millisecond
192+
} else {
193+
tickerDuration = 15 * time.Second
194+
retryTickerDuration = 5 * time.Second
195+
}
196+
ticker := time.NewTicker(tickerDuration)
189197
defer ticker.Stop()
190198

191-
retryTicker := time.NewTicker(5 * time.Second)
199+
retryTicker := time.NewTicker(retryTickerDuration)
192200
defer retryTicker.Stop()
193201

194202
for {
@@ -205,7 +213,7 @@ func (s *TableDetector) scanTableLoop(ctx context.Context) {
205213

206214
s.mu.Unlock()
207215

208-
go s.scanAndProcess(ctx)
216+
s.scanAndProcess(ctx)
209217
case <-retryTicker.C:
210218
s.mu.Lock()
211219
if s.handling || s.lastMp == nil {
@@ -229,7 +237,7 @@ func (s *TableDetector) scanAndProcess(ctx context.Context) {
229237
s.lastMp = s.Mp
230238
s.mu.Unlock()
231239

232-
s.processCallback(ctx, s.lastMp)
240+
go s.processCallback(ctx, s.lastMp)
233241
}
234242

235243
func (s *TableDetector) processCallback(ctx context.Context, tables map[uint32]TblMap) {

pkg/cdc/table_scanner_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"strings"
2020
"sync"
2121
"testing"
22+
"time"
2223

2324
"github.com/matrixorigin/matrixone/pkg/objectio"
2425
"github.com/matrixorigin/matrixone/pkg/util/fault"
@@ -163,6 +164,12 @@ func TestScanAndProcess(t *testing.T) {
163164

164165
fault.Enable()
165166
objectio.SimpleInject(objectio.FJ_CDCScanTableErr)
167+
rm, _ := objectio.InjectCDCScanTable("fast scan")
168+
ctx, cancel := context.WithCancel(context.Background())
169+
defer cancel()
170+
go td.scanTableLoop(ctx)
171+
time.Sleep(2 * time.Millisecond)
172+
defer rm()
166173
td.scanAndProcess(context.Background())
167174
fault.Disable()
168175

pkg/frontend/cdc_exector.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,8 @@ func (exec *CDCTaskExecutor) handleNewTables(allAccountTbls map[uint32]cdc.TblMa
451451
readerInfo := reader.Info()
452452
// wait the old reader to stop
453453
if info.OnlyDiffinTblId(readerInfo) {
454+
logutil.Infof("cdc task wait old reader to stop %s %d->%d",
455+
key, readerInfo.SourceTblId, info.SourceTblId)
454456
waitChan := make(chan struct{})
455457
go func() {
456458
defer close(waitChan)

pkg/objectio/injects.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ const (
5454
FJ_CronJobsOpen = "fj/cronjobs/open"
5555
FJ_CDCRecordTxn = "fj/cdc/recordtxn"
5656

57+
FJ_CDCScanTable = "fj/cdc/scantable"
58+
5759
FJ_CDCHandleSlow = "fj/cdc/handleslow"
5860
FJ_CDCHandleErr = "fj/cdc/handleerr"
5961
FJ_CDCScanTableErr = "fj/cdc/scantableerr"
@@ -343,6 +345,11 @@ func NotifyInjected(key string) {
343345
fault.TriggerFault(key)
344346
}
345347

348+
func CDCScanTableInjected() (string, bool) {
349+
_, sarg, injected := fault.TriggerFault(FJ_CDCScanTable)
350+
return sarg, injected
351+
}
352+
346353
func InjectWait(key string) (rmFault func(), err error) {
347354
if err = fault.AddFaultPoint(
348355
context.Background(),
@@ -452,6 +459,24 @@ func InjectGCDumpTable(msg string) (rmFault func() (bool, error), err error) {
452459
return
453460
}
454461

462+
func InjectCDCScanTable(msg string) (rmFault func() (bool, error), err error) {
463+
if err = fault.AddFaultPoint(
464+
context.Background(),
465+
FJ_CDCScanTable,
466+
":::",
467+
"echo",
468+
0,
469+
msg,
470+
false,
471+
); err != nil {
472+
return
473+
}
474+
rmFault = func() (ok bool, err error) {
475+
return fault.RemoveFaultPoint(context.Background(), FJ_CDCScanTable)
476+
}
477+
return
478+
}
479+
455480
func Debug19524Injected() bool {
456481
_, _, injected := fault.TriggerFault(FJ_Debug19524)
457482
return injected

0 commit comments

Comments
 (0)