@@ -18,6 +18,7 @@ import (
18
18
"context"
19
19
"strings"
20
20
"sync"
21
+ "sync/atomic"
21
22
"testing"
22
23
"time"
23
24
@@ -31,6 +32,7 @@ import (
31
32
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
32
33
"github.com/matrixorigin/matrixone/pkg/txn/client"
33
34
"github.com/matrixorigin/matrixone/pkg/vm/engine"
35
+ "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils"
34
36
"github.com/prashantv/gostub"
35
37
"github.com/stretchr/testify/assert"
36
38
)
@@ -140,6 +142,44 @@ func TestNewTableReader(t *testing.T) {
140
142
}
141
143
}
142
144
145
+ type testWatermarkUpdater struct {
146
+ wmMap map [WatermarkKey ]types.TS
147
+ errorMp map [WatermarkKey ]error
148
+ }
149
+
150
+ func (u * testWatermarkUpdater ) GetFromCache (ctx context.Context , key * WatermarkKey ) (watermark types.TS , err error ) {
151
+ watermark , ok := u .wmMap [* key ]
152
+ if ! ok {
153
+ return types.TS {}, moerr .NewInternalErrorNoCtx ("key not found" )
154
+ }
155
+ return watermark , nil
156
+ }
157
+
158
+ func (u * testWatermarkUpdater ) GetOrAddCommitted (ctx context.Context , key * WatermarkKey , watermark * types.TS ) (ret types.TS , err error ) {
159
+ ret , ok := u .wmMap [* key ]
160
+ if ok {
161
+ return ret , nil
162
+ }
163
+ u .wmMap [* key ] = * watermark
164
+ u .errorMp [* key ] = nil
165
+ return * watermark , nil
166
+ }
167
+
168
+ func (u * testWatermarkUpdater ) RemoveCachedWM (ctx context.Context , key * WatermarkKey ) (err error ) {
169
+ delete (u .wmMap , * key )
170
+ delete (u .errorMp , * key )
171
+ return nil
172
+ }
173
+
174
+ func (u * testWatermarkUpdater ) UpdateWatermarkErrMsg (ctx context.Context , key * WatermarkKey , errMsg string ) (err error ) {
175
+ u .errorMp [* key ] = moerr .NewInternalErrorNoCtx (errMsg )
176
+ return nil
177
+ }
178
+
179
+ func (u * testWatermarkUpdater ) UpdateWatermarkOnly (ctx context.Context , key * WatermarkKey , watermark * types.TS ) (err error ) {
180
+ u .wmMap [* key ] = * watermark
181
+ return nil
182
+ }
143
183
func Test_tableReader_Run (t * testing.T ) {
144
184
type fields struct {
145
185
cnTxnClient client.TxnClient
@@ -610,6 +650,7 @@ func Test_tableReader_readTable(t *testing.T) {
610
650
runningReaders : & sync.Map {},
611
651
sinker : NewConsoleSinker (nil , nil ),
612
652
wMarkUpdater : u ,
653
+ tick : time .NewTicker (DefaultFrequency ),
613
654
info : & DbTableInfo {
614
655
SourceDbName : "db1" ,
615
656
SourceTblName : "t1" ,
@@ -866,3 +907,89 @@ func allocTestBatch(
866
907
func Test_changesHandle (t * testing.T ) {
867
908
newTestChangesHandle ("db" , "t1" , 20 , 23 , types.TS {}, nil , nil )
868
909
}
910
+
911
+ func TestStaleRead (t * testing.T ) {
912
+ readDone := make (chan struct {})
913
+ defer close (readDone )
914
+ var readCount atomic.Int32
915
+ stub := gostub .Stub (& readTableWithTxn , func (* tableReader , context.Context , client.TxnOperator , * types.Packer , * ActiveRoutine ) error {
916
+ t .Logf ("read %d" , readCount .Load ())
917
+ readDone <- struct {}{}
918
+ var err error
919
+ if readCount .Load () == 0 {
920
+ err = moerr .NewErrStaleReadNoCtx ("" , "" )
921
+ }
922
+ readCount .Add (1 )
923
+ return err
924
+ })
925
+ defer stub .Reset ()
926
+ stub1 := gostub .Stub (& GetTxnOp ,
927
+ func (_ context.Context , _ engine.Engine , _ client.TxnClient , _ string ) (client.TxnOperator , error ) {
928
+ return nil , nil
929
+ })
930
+ defer stub1 .Reset ()
931
+
932
+ stub2 := gostub .Stub (& FinishTxnOp ,
933
+ func (ctx context.Context , inputErr error , txnOp client.TxnOperator , cnEngine engine.Engine ) {
934
+
935
+ })
936
+ defer stub2 .Reset ()
937
+ stub3 := gostub .Stub (& GetTxn ,
938
+ func (ctx context.Context , cnEngine engine.Engine , txnOp client.TxnOperator ) error {
939
+ return nil
940
+ })
941
+ defer stub3 .Reset ()
942
+
943
+ stub7 := gostub .Stub (& EnterRunSql , func (client.TxnOperator ) {})
944
+ defer stub7 .Reset ()
945
+
946
+ stub8 := gostub .Stub (& ExitRunSql , func (client.TxnOperator ) {})
947
+ defer stub8 .Reset ()
948
+
949
+ runnerReaders := & sync.Map {}
950
+ reader := & tableReader {
951
+ info : & DbTableInfo {
952
+ SourceDbName : "db1" ,
953
+ SourceTblName : "t1" ,
954
+ },
955
+ runningReaders : runnerReaders ,
956
+ accountId : 1 ,
957
+ taskId : "task1" ,
958
+ wMarkUpdater : & testWatermarkUpdater {
959
+ wmMap : map [WatermarkKey ]types.TS {},
960
+ errorMp : map [WatermarkKey ]error {},
961
+ },
962
+ sinker : NewConsoleSinker (nil , nil ),
963
+ frequency : time .Hour ,
964
+ tick : time .NewTicker (time .Hour ),
965
+ packerPool : fileservice .NewPool (
966
+ 128 ,
967
+ func () * types.Packer {
968
+ return types .NewPacker ()
969
+ },
970
+ func (packer * types.Packer ) {
971
+ packer .Reset ()
972
+ },
973
+ func (packer * types.Packer ) {
974
+ packer .Close ()
975
+ },
976
+ ),
977
+ }
978
+ key := GenDbTblKey (reader .info .SourceDbName , reader .info .SourceTblName )
979
+
980
+ ar := NewCdcActiveRoutine ()
981
+ ctx , cancel := context .WithCancel (context .Background ())
982
+ go reader .Run (ctx , ar )
983
+ for i := 0 ; i < 2 ; i ++ {
984
+ <- readDone
985
+ }
986
+ time .Sleep (2 * DefaultFrequency )
987
+ assert .Equal (t , int32 (2 ), readCount .Load ())
988
+ cancel ()
989
+ testutils .WaitExpect (1000 , func () bool {
990
+ _ , ok := runnerReaders .Load (key )
991
+ return ! ok
992
+ })
993
+ _ , ok := runnerReaders .Load (key )
994
+ assert .False (t , ok )
995
+ }
0 commit comments