Skip to content

Commit 062ebd3

Browse files
authored
to 3.0: report the error when the subscription table fails. (#22406)
Report an error when the subscription table fails. Approved by: @XuPeng-SH, @heni02, @aunjgr, @fengttt
1 parent ea17d08 commit 062ebd3

File tree

7 files changed

+55
-10
lines changed

7 files changed

+55
-10
lines changed

pkg/fulltext/fixedbytepool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func (part *Partition) Id() uint64 {
123123

124124
// memory allocation with mpool.MPool
125125
func (part *Partition) alloc(capacity uint64) (err error) {
126-
part.data, err = part.proc.Mp().Alloc(int(capacity), false)
126+
part.data, err = part.proc.Mp().Alloc(int(capacity), true)
127127
if err != nil {
128128
return err
129129
}

pkg/objectio/injects.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ const (
6262
FJ_CDCAddExecErr = "fj/cdc/addexecerr"
6363
FJ_CDCAddExecConsumeTruncate = "fj/cdc/addexecconsumetruncate"
6464

65-
FJ_CNFlushSmallObjs = "fj/cn/flush_small_objs"
65+
FJ_CNFlushSmallObjs = "fj/cn/flush_small_objs"
66+
FJ_CNSubscribeTableFail = "fj/cn/subscribe_table_fail"
6667
)
6768

6869
const (
@@ -186,6 +187,16 @@ func LogCNFlushSmallObjsInjected(args ...string) (bool, int) {
186187
return ok, level
187188
}
188189

190+
func LogCNSubscribeTableFailInjected(args ...string) (bool, int) {
191+
iarg, sarg, injected := fault.TriggerFault(FJ_CNSubscribeTableFail)
192+
if !injected {
193+
return false, 0
194+
}
195+
196+
ok, level := checkLoggingArgs(int(iarg), sarg, args...)
197+
return ok, level
198+
}
199+
189200
func InjectLogPartitionState(
190201
databaseName string,
191202
tableName string,

pkg/sql/colexec/anti/join.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -155,13 +155,13 @@ func (ctr *container) emptyProbe(ap *AntiJoin, inbat *batch.Batch, proc *process
155155
if n > hashmap.UnitLimit {
156156
n = hashmap.UnitLimit
157157
}
158-
for k := 0; k < n; k++ {
159-
for j, pos := range ap.Result {
160-
if err := ctr.rbat.Vecs[j].UnionOne(inbat.Vecs[pos], int64(i+k), proc.Mp()); err != nil {
161-
return err
162-
}
158+
159+
for j, pos := range ap.Result {
160+
if err := ctr.rbat.Vecs[j].UnionBatch(inbat.Vecs[pos], int64(i), n, nil, proc.Mp()); err != nil {
161+
return err
163162
}
164163
}
164+
165165
ctr.rbat.AddRowCount(n)
166166
}
167167

pkg/vm/engine/disttae/logtail_consumer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,12 @@ func (c *PushClient) toSubscribeTable(
388388
dbName string,
389389
) (ps *logtailreplay.PartitionState, err error) {
390390

391+
if faultInjected, _ := objectio.LogCNSubscribeTableFailInjected(
392+
dbName, tableName,
393+
); faultInjected {
394+
return nil, moerr.NewInternalErrorNoCtx("injected subscribe table err")
395+
}
396+
391397
var skip bool
392398
if skip, ps = c.skipSubIfSubscribed(ctx, tableID, dbID); skip {
393399
return ps, nil

pkg/vm/engine/disttae/txn_table.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2042,13 +2042,23 @@ func (tbl *txnTable) getPartitionState(
20422042

20432043
// Subscribe a latest partition state
20442044
eng := tbl.eng.(*Engine)
2045-
ps, err = eng.PushClient().toSubscribeTable(
2045+
if ps, err = eng.PushClient().toSubscribeTable(
20462046
ctx,
20472047
tbl.tableId,
20482048
tbl.tableName,
20492049
tbl.db.databaseId,
2050-
tbl.db.databaseName)
2051-
if ps != nil && ps.CanServe(types.TimestampToTS(tbl.db.op.SnapshotTS())) {
2050+
tbl.db.databaseName,
2051+
); err != nil {
2052+
logutil.Error(
2053+
"Txn-Table-Subscribe-Failed",
2054+
zap.String("table-name", tbl.tableName),
2055+
zap.Uint64("table-id", tbl.tableId),
2056+
zap.String("txn", tbl.db.op.Txn().DebugString()),
2057+
zap.Bool("is-snapshot", tbl.db.op.IsSnapOp()),
2058+
zap.Error(err),
2059+
)
2060+
return nil, err
2061+
} else if ps != nil && ps.CanServe(types.TimestampToTS(tbl.db.op.SnapshotTS())) {
20522062
return
20532063
}
20542064

test/distributed/cases/snapshot/clone/table_clone.result

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,17 @@ insert into t1(a,b) values(20,20),(21,21);
77
insert into t1(b) values(20000),(20001);
88
insert into t1(a,b) values(40000, 40000);
99
create table t2 clone t1;
10+
select enable_fault_injection();
11+
enable_fault_injection()
12+
true
13+
select add_fault_point('fj/cn/subscribe_table_fail',':::','echo',40,'db1.t2');
14+
add_fault_point(fj/cn/subscribe_table_fail, :::, echo, 40, db1.t2)
15+
true
16+
select * from t2 order by a asc;
17+
internal error: injected subscribe table err
18+
select disable_fault_injection();
19+
disable_fault_injection()
20+
true
1021
select * from t2 order by a asc;
1122
a b
1223
1 1

test/distributed/cases/snapshot/clone/table_clone.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ insert into t1(b) values(20000),(20001);
99
insert into t1(a,b) values(40000, 40000);
1010

1111
create table t2 clone t1;
12+
13+
select enable_fault_injection();
14+
-- @ignore:0
15+
select add_fault_point('fj/cn/subscribe_table_fail',':::','echo',40,'db1.t2');
16+
select * from t2 order by a asc;
17+
select disable_fault_injection();
18+
1219
select * from t2 order by a asc;
1320

1421
insert into t2(b) values(3),(4);

0 commit comments

Comments
 (0)