Skip to content

Commit f48ad1c

Browse files
authored
to 3.0: Try snapshot read if the subscription does not exist. (#22412)
Try snapshot read if the subscription does not exist. Approved by: @XuPeng-SH
1 parent 062ebd3 commit f48ad1c

File tree

2 files changed

+31
-19
lines changed

2 files changed

+31
-19
lines changed

pkg/vm/engine/disttae/logtail_consumer.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -388,18 +388,24 @@ func (c *PushClient) toSubscribeTable(
388388
dbName string,
389389
) (ps *logtailreplay.PartitionState, err error) {
390390

391-
if faultInjected, _ := objectio.LogCNSubscribeTableFailInjected(
391+
var (
392+
skip bool
393+
state SubscribeState
394+
injected bool
395+
)
396+
397+
if injected, _ = objectio.LogCNSubscribeTableFailInjected(
392398
dbName, tableName,
393-
); faultInjected {
394-
return nil, moerr.NewInternalErrorNoCtx("injected subscribe table err")
399+
); injected {
400+
return nil,
401+
moerr.NewInternalErrorNoCtx("injected subscribe table err")
395402
}
396403

397-
var skip bool
398404
if skip, ps = c.skipSubIfSubscribed(ctx, tableID, dbID); skip {
399405
return ps, nil
400406
}
401407

402-
state, err := c.toSubIfUnsubscribed(ctx, dbID, tableID)
408+
state, err = c.toSubIfUnsubscribed(ctx, dbID, tableID)
403409
if err != nil {
404410
return nil, err
405411
}
@@ -423,12 +429,7 @@ func (c *PushClient) toSubscribeTable(
423429
}
424430
case SubRspTableNotExist:
425431
c.subscribed.clearTable(dbID, tableID)
426-
return nil, moerr.NewInternalErrorf(
427-
ctx,
428-
"%s to subcribe tbl[%d-%s] failed since table is not exist",
429-
logTag,
430-
tableID,
431-
tableName)
432+
return nil, moerr.NewNoSuchTable(ctx, fmt.Sprintf("%s(%d)", dbName, dbID), fmt.Sprintf("%s(%d)", tableName, tableID))
432433
case Unsubscribing:
433434
//need to wait for unsubscribe succeed for making the subscribe and unsubscribe execute in order,
434435
// otherwise the partition state will leak log tails.

pkg/vm/engine/disttae/txn_table.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2025,7 +2025,12 @@ func (tbl *txnTable) getPartitionState(
20252025
}
20262026
}()
20272027

2028-
createdInTxn, err := tbl.isCreatedInTxn(ctx)
2028+
var (
2029+
eng = tbl.eng.(*Engine)
2030+
createdInTxn bool
2031+
)
2032+
2033+
createdInTxn, err = tbl.isCreatedInTxn(ctx)
20292034
if err != nil {
20302035
return nil, err
20312036
}
@@ -2041,7 +2046,6 @@ func (tbl *txnTable) getPartitionState(
20412046
}
20422047

20432048
// Subscribe a latest partition state
2044-
eng := tbl.eng.(*Engine)
20452049
if ps, err = eng.PushClient().toSubscribeTable(
20462050
ctx,
20472051
tbl.tableId,
@@ -2050,14 +2054,21 @@ func (tbl *txnTable) getPartitionState(
20502054
tbl.db.databaseName,
20512055
); err != nil {
20522056
logutil.Error(
2053-
"Txn-Table-Subscribe-Failed",
2054-
zap.String("table-name", tbl.tableName),
2055-
zap.Uint64("table-id", tbl.tableId),
2056-
zap.String("txn", tbl.db.op.Txn().DebugString()),
2057-
zap.Bool("is-snapshot", tbl.db.op.IsSnapOp()),
2057+
"Txn-Table-ToSubscribeTable-Failed",
2058+
zap.String("db-name", tbl.db.databaseName),
2059+
zap.Uint64("db-id", tbl.db.databaseId),
2060+
zap.String("tbl-name", tbl.tableName),
2061+
zap.Uint64("tbl-id", tbl.tableId),
2062+
zap.String("txn-info", tbl.db.op.Txn().DebugString()),
2063+
zap.Bool("is-snapshot-op", tbl.db.op.IsSnapOp()),
20582064
zap.Error(err),
20592065
)
2060-
return nil, err
2066+
2067+
// if the table not exists, try snapshot read
2068+
if !moerr.IsMoErrCode(err, moerr.ErrNoSuchTable) {
2069+
return nil, err
2070+
}
2071+
20612072
} else if ps != nil && ps.CanServe(types.TimestampToTS(tbl.db.op.SnapshotTS())) {
20622073
return
20632074
}

0 commit comments

Comments
 (0)