Skip to content

Commit c6ebcb0

Browse files
authored
Merge branch '3.0-dev' into copy_index_to_3.0
2 parents cca5f49 + 6ba4f5e commit c6ebcb0

17 files changed

+918
-143
lines changed

pkg/cdc/reader.go

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,18 @@ import (
3535
var _ Reader = new(tableReader)
3636
var _ TableReader = new(tableReader)
3737

38+
const (
39+
DefaultFrequency = 200 * time.Millisecond
40+
)
41+
42+
type WatermarkUpdater interface {
43+
RemoveCachedWM(ctx context.Context, key *WatermarkKey) (err error)
44+
UpdateWatermarkErrMsg(ctx context.Context, key *WatermarkKey, errMsg string) (err error)
45+
GetFromCache(ctx context.Context, key *WatermarkKey) (watermark types.TS, err error)
46+
GetOrAddCommitted(ctx context.Context, key *WatermarkKey, watermark *types.TS) (ret types.TS, err error)
47+
UpdateWatermarkOnly(ctx context.Context, key *WatermarkKey, watermark *types.TS) (err error)
48+
}
49+
3850
type tableReader struct {
3951
cnTxnClient client.TxnClient
4052
cnEngine engine.Engine
@@ -44,8 +56,9 @@ type tableReader struct {
4456
taskId string
4557
info *DbTableInfo
4658
sinker Sinker
47-
wMarkUpdater *CDCWatermarkUpdater
59+
wMarkUpdater WatermarkUpdater
4860
tick *time.Ticker
61+
force bool
4962
initSnapshotSplitTxn bool
5063
runningReaders *sync.Map
5164
startTs, endTs types.TS
@@ -128,6 +141,16 @@ func (reader *tableReader) Close() {
128141
reader.sinker.Close()
129142
}
130143

144+
func (reader *tableReader) forceNextInterval(wait time.Duration) {
145+
logutil.Info(
146+
"CDC-TableReader-ResetNextInterval",
147+
zap.String("info", reader.info.String()),
148+
zap.Duration("wait", wait),
149+
)
150+
reader.force = true
151+
reader.tick.Reset(wait)
152+
}
153+
131154
func (reader *tableReader) Run(
132155
ctx context.Context,
133156
ar *ActiveRoutine,
@@ -208,8 +231,7 @@ func (reader *tableReader) Run(
208231
wait = 200 * time.Millisecond
209232
}
210233

211-
firstSync := true
212-
reader.tick.Reset(wait)
234+
reader.forceNextInterval(wait)
213235

214236
for {
215237
select {
@@ -222,16 +244,21 @@ func (reader *tableReader) Run(
222244
case <-reader.tick.C:
223245
}
224246

247+
if reader.force {
248+
reader.force = false
249+
reader.tick.Reset(reader.frequency)
250+
logutil.Info(
251+
"CDC-TableReader-ResetNextInterval",
252+
zap.String("info", reader.info.String()),
253+
zap.Duration("frequency", reader.frequency),
254+
)
255+
}
256+
logutil.Infof("lalala read")
225257
if err = reader.readTable(ctx, ar); err != nil {
226258
logutil.Errorf("cdc tableReader(%v) failed, err: %v", reader.info, err)
227259
return
228260
}
229261

230-
if firstSync {
231-
firstSync = false
232-
reader.tick.Reset(reader.frequency)
233-
}
234-
235262
}
236263
}
237264

@@ -326,6 +353,7 @@ func (reader *tableReader) readTable(ctx context.Context, ar *ActiveRoutine) (er
326353
zap.String("watermark", watermark.ToString()),
327354
)
328355
err = nil
356+
reader.forceNextInterval(DefaultFrequency)
329357
}
330358
return
331359
}

pkg/cdc/reader_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"strings"
2020
"sync"
21+
"sync/atomic"
2122
"testing"
2223
"time"
2324

@@ -31,6 +32,7 @@ import (
3132
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
3233
"github.com/matrixorigin/matrixone/pkg/txn/client"
3334
"github.com/matrixorigin/matrixone/pkg/vm/engine"
35+
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils"
3436
"github.com/prashantv/gostub"
3537
"github.com/stretchr/testify/assert"
3638
)
@@ -140,6 +142,44 @@ func TestNewTableReader(t *testing.T) {
140142
}
141143
}
142144

145+
type testWatermarkUpdater struct {
146+
wmMap map[WatermarkKey]types.TS
147+
errorMp map[WatermarkKey]error
148+
}
149+
150+
func (u *testWatermarkUpdater) GetFromCache(ctx context.Context, key *WatermarkKey) (watermark types.TS, err error) {
151+
watermark, ok := u.wmMap[*key]
152+
if !ok {
153+
return types.TS{}, moerr.NewInternalErrorNoCtx("key not found")
154+
}
155+
return watermark, nil
156+
}
157+
158+
func (u *testWatermarkUpdater) GetOrAddCommitted(ctx context.Context, key *WatermarkKey, watermark *types.TS) (ret types.TS, err error) {
159+
ret, ok := u.wmMap[*key]
160+
if ok {
161+
return ret, nil
162+
}
163+
u.wmMap[*key] = *watermark
164+
u.errorMp[*key] = nil
165+
return *watermark, nil
166+
}
167+
168+
func (u *testWatermarkUpdater) RemoveCachedWM(ctx context.Context, key *WatermarkKey) (err error) {
169+
delete(u.wmMap, *key)
170+
delete(u.errorMp, *key)
171+
return nil
172+
}
173+
174+
func (u *testWatermarkUpdater) UpdateWatermarkErrMsg(ctx context.Context, key *WatermarkKey, errMsg string) (err error) {
175+
u.errorMp[*key] = moerr.NewInternalErrorNoCtx(errMsg)
176+
return nil
177+
}
178+
179+
func (u *testWatermarkUpdater) UpdateWatermarkOnly(ctx context.Context, key *WatermarkKey, watermark *types.TS) (err error) {
180+
u.wmMap[*key] = *watermark
181+
return nil
182+
}
143183
func Test_tableReader_Run(t *testing.T) {
144184
type fields struct {
145185
cnTxnClient client.TxnClient
@@ -610,6 +650,7 @@ func Test_tableReader_readTable(t *testing.T) {
610650
runningReaders: &sync.Map{},
611651
sinker: NewConsoleSinker(nil, nil),
612652
wMarkUpdater: u,
653+
tick: time.NewTicker(DefaultFrequency),
613654
info: &DbTableInfo{
614655
SourceDbName: "db1",
615656
SourceTblName: "t1",
@@ -866,3 +907,89 @@ func allocTestBatch(
866907
func Test_changesHandle(t *testing.T) {
867908
newTestChangesHandle("db", "t1", 20, 23, types.TS{}, nil, nil)
868909
}
910+
911+
func TestStaleRead(t *testing.T) {
912+
readDone := make(chan struct{})
913+
defer close(readDone)
914+
var readCount atomic.Int32
915+
stub := gostub.Stub(&readTableWithTxn, func(*tableReader, context.Context, client.TxnOperator, *types.Packer, *ActiveRoutine) error {
916+
t.Logf("read %d", readCount.Load())
917+
readDone <- struct{}{}
918+
var err error
919+
if readCount.Load() == 0 {
920+
err = moerr.NewErrStaleReadNoCtx("", "")
921+
}
922+
readCount.Add(1)
923+
return err
924+
})
925+
defer stub.Reset()
926+
stub1 := gostub.Stub(&GetTxnOp,
927+
func(_ context.Context, _ engine.Engine, _ client.TxnClient, _ string) (client.TxnOperator, error) {
928+
return nil, nil
929+
})
930+
defer stub1.Reset()
931+
932+
stub2 := gostub.Stub(&FinishTxnOp,
933+
func(ctx context.Context, inputErr error, txnOp client.TxnOperator, cnEngine engine.Engine) {
934+
935+
})
936+
defer stub2.Reset()
937+
stub3 := gostub.Stub(&GetTxn,
938+
func(ctx context.Context, cnEngine engine.Engine, txnOp client.TxnOperator) error {
939+
return nil
940+
})
941+
defer stub3.Reset()
942+
943+
stub7 := gostub.Stub(&EnterRunSql, func(client.TxnOperator) {})
944+
defer stub7.Reset()
945+
946+
stub8 := gostub.Stub(&ExitRunSql, func(client.TxnOperator) {})
947+
defer stub8.Reset()
948+
949+
runnerReaders := &sync.Map{}
950+
reader := &tableReader{
951+
info: &DbTableInfo{
952+
SourceDbName: "db1",
953+
SourceTblName: "t1",
954+
},
955+
runningReaders: runnerReaders,
956+
accountId: 1,
957+
taskId: "task1",
958+
wMarkUpdater: &testWatermarkUpdater{
959+
wmMap: map[WatermarkKey]types.TS{},
960+
errorMp: map[WatermarkKey]error{},
961+
},
962+
sinker: NewConsoleSinker(nil, nil),
963+
frequency: time.Hour,
964+
tick: time.NewTicker(time.Hour),
965+
packerPool: fileservice.NewPool(
966+
128,
967+
func() *types.Packer {
968+
return types.NewPacker()
969+
},
970+
func(packer *types.Packer) {
971+
packer.Reset()
972+
},
973+
func(packer *types.Packer) {
974+
packer.Close()
975+
},
976+
),
977+
}
978+
key := GenDbTblKey(reader.info.SourceDbName, reader.info.SourceTblName)
979+
980+
ar := NewCdcActiveRoutine()
981+
ctx, cancel := context.WithCancel(context.Background())
982+
go reader.Run(ctx, ar)
983+
for i := 0; i < 2; i++ {
984+
<-readDone
985+
}
986+
time.Sleep(2 * DefaultFrequency)
987+
assert.Equal(t, int32(2), readCount.Load())
988+
cancel()
989+
testutils.WaitExpect(1000, func() bool {
990+
_, ok := runnerReaders.Load(key)
991+
return !ok
992+
})
993+
_, ok := runnerReaders.Load(key)
994+
assert.False(t, ok)
995+
}

pkg/cdc/table_scanner.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"sync"
2323
"time"
2424

25-
"github.com/matrixorigin/matrixone/pkg/common/moerr"
2625
"github.com/matrixorigin/matrixone/pkg/objectio"
2726

2827
"go.uber.org/zap"
@@ -58,6 +57,7 @@ var GetTableDetector = func(cnUUID string) *TableDetector {
5857
CallBackTableName: make(map[string][]string),
5958
SubscribedTableNames: make(map[string][]string),
6059
}
60+
detector.scanTableFn = detector.scanTable
6161
})
6262
return detector
6363
}
@@ -88,6 +88,8 @@ type TableDetector struct {
8888
// tablename -> [taska, taskb ...]
8989
SubscribedTableNames map[string][]string
9090

91+
scanTableFn func() error
92+
9193
// to make sure there is at most only one handleNewTables running, so the truncate info will not be lost
9294
handling bool
9395
lastMp map[uint32]TblMap
@@ -174,6 +176,10 @@ func (s *TableDetector) UnRegister(id string) {
174176
}
175177

176178
delete(s.Callbacks, id)
179+
if len(s.Callbacks) == 0 {
180+
s.cancel()
181+
s.cancel = nil
182+
}
177183

178184
logutil.Info(
179185
"CDC-TableDetector-UnRegister",
@@ -216,28 +222,29 @@ func (s *TableDetector) scanTableLoop(ctx context.Context) {
216222
s.scanAndProcess(ctx)
217223
case <-retryTicker.C:
218224
s.mu.Lock()
219-
if s.handling || s.lastMp == nil {
220-
s.mu.Unlock()
225+
handling, lastMp := s.handling, s.lastMp
226+
s.mu.Unlock()
227+
if handling || lastMp == nil {
221228
continue
222229
}
223-
s.mu.Unlock()
224230

225-
go s.processCallback(ctx, s.lastMp)
231+
go s.processCallback(ctx, lastMp)
226232
}
227233
}
228234
}
229235

230236
func (s *TableDetector) scanAndProcess(ctx context.Context) {
231-
if err := s.scanTable(); err != nil {
237+
if err := s.scanTableFn(); err != nil {
232238
logutil.Error("CDC-TableDetector-Scan-Error", zap.Error(err))
233239
return
234240
}
235241

236242
s.mu.Lock()
237243
s.lastMp = s.Mp
244+
mp := s.lastMp
238245
s.mu.Unlock()
239246

240-
go s.processCallback(ctx, s.lastMp)
247+
go s.processCallback(ctx, mp)
241248
}
242249

243250
func (s *TableDetector) processCallback(ctx context.Context, tables map[uint32]TblMap) {
@@ -263,11 +270,13 @@ func (s *TableDetector) processCallback(ctx context.Context, tables map[uint32]T
263270
s.handling = false
264271
}
265272

266-
func (s *TableDetector) scanTable() error {
267-
if objectio.CDCScanTableErrInjected() {
268-
return moerr.NewInternalError(context.Background(), "CDC_SCANTABLE_ERR")
273+
func (s *TableDetector) Close() {
274+
if s.cancel != nil {
275+
s.cancel()
269276
}
277+
}
270278

279+
func (s *TableDetector) scanTable() error {
271280
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
272281
defer cancel()
273282
var (

0 commit comments

Comments
 (0)