Skip to content

Commit 8feb70f

Browse files
fix cdc sinker and handle new table (#22389)
fix cdc sinker and handle new table Approved by: @ck89119, @daviszhen, @XuPeng-SH
1 parent e251b3a commit 8feb70f

File tree

5 files changed

+107
-8
lines changed

5 files changed

+107
-8
lines changed

pkg/cdc/sinker.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -855,6 +855,10 @@ var NewMysqlSink = func(
855855
return ret, err
856856
}
857857

858+
func (s *mysqlSink) Reset() {
859+
s.tx = nil
860+
}
861+
858862
func (s *mysqlSink) recordTxnSQL(sqlBuf []byte) {
859863
if !s.debugTxnRecorder.doRecord {
860864
return
@@ -938,17 +942,11 @@ func (s *mysqlSink) SendBegin(ctx context.Context) (err error) {
938942

939943
func (s *mysqlSink) SendCommit(_ context.Context) error {
940944
s.resetRecordedTxn()
941-
defer func() {
942-
s.tx = nil
943-
}()
944945
return s.tx.Commit()
945946
}
946947

947948
func (s *mysqlSink) SendRollback(_ context.Context) error {
948949
s.resetRecordedTxn()
949-
defer func() {
950-
s.tx = nil
951-
}()
952950
return s.tx.Rollback()
953951
}
954952

pkg/cdc/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ type Sink interface {
153153
SendBegin(ctx context.Context) error
154154
SendCommit(ctx context.Context) error
155155
SendRollback(ctx context.Context) error
156+
Reset()
156157
Close()
157158
}
158159

pkg/cdc/util.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,8 @@ var GetTableDef = func(
639639
cnEngine engine.Engine,
640640
tblId uint64,
641641
) (*plan.TableDef, error) {
642+
ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
643+
defer cancel()
642644
_, _, rel, err := cnEngine.GetRelationById(ctx, txnOp, tblId)
643645
if err != nil {
644646
return nil, err

pkg/frontend/cdc_exector.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -411,8 +411,6 @@ func (exec *CDCTaskExecutor) handleNewTables(allAccountTbls map[uint32]cdc.TblMa
411411

412412
accountId := uint32(exec.spec.Accounts[0].GetId())
413413
ctx := defines.AttachAccountId(context.Background(), accountId)
414-
ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
415-
defer cancel()
416414

417415
txnOp, err := cdc.GetTxnOp(ctx, exec.cnEngine, exec.cnTxnClient, "cdc-handleNewTables")
418416
if err != nil {
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Copyright 2024 Matrix Origin
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package test
16+
17+
import (
18+
"context"
19+
"database/sql"
20+
"testing"
21+
"time"
22+
23+
"github.com/DATA-DOG/go-sqlmock"
24+
"github.com/matrixorigin/matrixone/pkg/catalog"
25+
"github.com/matrixorigin/matrixone/pkg/cdc"
26+
"github.com/matrixorigin/matrixone/pkg/defines"
27+
catalog2 "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
28+
"github.com/matrixorigin/matrixone/pkg/vm/engine/test/testutil"
29+
"github.com/prashantv/gostub"
30+
"github.com/stretchr/testify/require"
31+
)
32+
33+
func TestCDC_Sinker1(t *testing.T) {
34+
var mock sqlmock.Sqlmock
35+
mockFn := func(_, _, _ string, _ int, _ string) (db *sql.DB, err error) {
36+
db, mock, err = sqlmock.New()
37+
return
38+
}
39+
stub := gostub.Stub(&cdc.OpenDbConn, mockFn)
40+
defer stub.Reset()
41+
42+
sink, err := cdc.NewMysqlSink(
43+
"root",
44+
"123456",
45+
"127.0.0.1",
46+
3306,
47+
3,
48+
3*time.Second,
49+
cdc.CDCDefaultSendSqlTimeout,
50+
false,
51+
)
52+
require.NoError(t, err)
53+
defer sink.Close()
54+
55+
ctx := context.Background()
56+
57+
mock.ExpectBegin()
58+
err = sink.SendBegin(ctx)
59+
require.NoError(t, err)
60+
mock.ExpectCommit()
61+
err = sink.SendCommit(ctx)
62+
require.NoError(t, err)
63+
mock.ExpectRollback()
64+
err = sink.SendRollback(ctx)
65+
require.Error(t, err)
66+
sink.Reset()
67+
}
68+
69+
func TestCDCUtil1(t *testing.T) {
70+
catalog.SetupDefines("")
71+
72+
var (
73+
accountId = catalog.System_Account
74+
tableName = "test1"
75+
databaseName = "db1"
76+
)
77+
78+
ctx, cancel := context.WithCancel(context.Background())
79+
defer cancel()
80+
ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId)
81+
82+
disttaeEngine, taeHandler, rpcAgent, _ := testutil.CreateEngines(ctx, testutil.TestOptions{}, t)
83+
defer func() {
84+
disttaeEngine.Close(ctx)
85+
taeHandler.Close(true)
86+
rpcAgent.Close()
87+
}()
88+
schema := catalog2.MockSchemaAll(10, 0)
89+
schema.Name = tableName
90+
ctx, cancel = context.WithTimeout(ctx, time.Minute*5)
91+
defer cancel()
92+
_, rel, err := disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema)
93+
id := rel.GetTableID(context.Background())
94+
require.NoError(t, err)
95+
96+
txn, err := disttaeEngine.NewTxnOperator(ctx, disttaeEngine.Now())
97+
require.NoError(t, err)
98+
_, err = cdc.GetTableDef(ctx, txn, disttaeEngine.Engine, id)
99+
require.NoError(t, err)
100+
}

0 commit comments

Comments
 (0)