diff --git a/downstreamadapter/dispatcher/basic_dispatcher.go b/downstreamadapter/dispatcher/basic_dispatcher.go index db7c51f23b..9fac6fe94a 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher.go +++ b/downstreamadapter/dispatcher/basic_dispatcher.go @@ -458,10 +458,14 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC } } + tableID := int64(0) + if ddl.TableInfo != nil { + tableID = ddl.TableInfo.TableName.TableID + } log.Info("dispatcher receive ddl event", zap.Stringer("dispatcher", d.id), zap.String("query", ddl.Query), - zap.Int64("table", ddl.TableID), + zap.Int64("table", tableID), zap.Uint64("commitTs", event.GetCommitTs()), zap.Uint64("seq", event.GetSeq())) ddl.AddPostFlushFunc(func() { diff --git a/downstreamadapter/dispatcher/event_dispatcher_test.go b/downstreamadapter/dispatcher/event_dispatcher_test.go index 85f02f2951..c5a198162c 100644 --- a/downstreamadapter/dispatcher/event_dispatcher_test.go +++ b/downstreamadapter/dispatcher/event_dispatcher_test.go @@ -826,7 +826,6 @@ func TestDispatcherSplittableCheck(t *testing.T) { }, TableInfo: commonTableInfo, Query: "ALTER TABLE t ADD COLUMN new_col INT", - TableID: 1, } // Create dispatcher event diff --git a/downstreamadapter/sink/kafka/sink_test.go b/downstreamadapter/sink/kafka/sink_test.go index 4209450e5e..02d5369a92 100644 --- a/downstreamadapter/sink/kafka/sink_test.go +++ b/downstreamadapter/sink/kafka/sink_test.go @@ -205,6 +205,7 @@ func TestKafkaSinkBasicFunctionality(t *testing.T) { Query: job.Query, SchemaName: job.SchemaName, TableName: job.TableName, + TableInfo: common.WrapTableInfo(job.SchemaName, job.BinlogInfo.TableInfo), FinishedTs: 1, BlockedTables: &commonEvent.InfluencedTables{ InfluenceType: commonEvent.InfluenceTypeNormal, @@ -220,6 +221,7 @@ func TestKafkaSinkBasicFunctionality(t *testing.T) { Query: job.Query, SchemaName: job.SchemaName, TableName: job.TableName, + TableInfo: common.WrapTableInfo(job.SchemaName, job.BinlogInfo.TableInfo), FinishedTs: 4, BlockedTables: &commonEvent.InfluencedTables{ InfluenceType: commonEvent.InfluenceTypeNormal, diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 48cbea8b02..0db0b5c2df 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -1450,7 +1450,9 @@ func extractTableInfoFuncForAddPartition(event *PersistedDDLEvent, tableID int64 newCreatedIDs := getCreatedIDs(event.PrevPartitions, getAllPartitionIDs(event.TableInfo)) for _, partition := range newCreatedIDs { if tableID == partition { - return common.WrapTableInfo(event.SchemaName, event.TableInfo), false + // Use WrapTableInfoWithTableID to ensure the returned TableInfo carries + // the physical partition ID (tableID) instead of the logical table ID. + return common.WrapTableInfoWithTableID(event.SchemaName, event.TableInfo, tableID), false } } return nil, false @@ -1477,7 +1479,9 @@ func extractTableInfoFuncForTruncateAndReorganizePartition(event *PersistedDDLEv newCreatedIDs := getCreatedIDs(event.PrevPartitions, physicalIDs) for _, partition := range newCreatedIDs { if tableID == partition { - return common.WrapTableInfo(event.SchemaName, event.TableInfo), false + // Use WrapTableInfoWithTableID to ensure the returned TableInfo carries + // the physical partition ID (tableID) instead of the logical table ID. + return common.WrapTableInfoWithTableID(event.SchemaName, event.TableInfo, tableID), false } } return nil, false @@ -1536,7 +1540,9 @@ func extractTableInfoFuncForAlterTablePartitioning(event *PersistedDDLEvent, tab } for _, partitionID := range getAllPartitionIDs(event.TableInfo) { if tableID == partitionID { - return common.WrapTableInfo(event.SchemaName, event.TableInfo), false + // Use WrapTableInfoWithTableID to ensure the returned TableInfo carries + // the physical partition ID (tableID) instead of the logical table ID. + return common.WrapTableInfoWithTableID(event.SchemaName, event.TableInfo, tableID), false } } log.Panic("should not reach here", zap.Int64("tableID", tableID)) @@ -1545,7 +1551,9 @@ func extractTableInfoFuncForAlterTablePartitioning(event *PersistedDDLEvent, tab func extractTableInfoFuncForRemovePartitioning(event *PersistedDDLEvent, tableID int64) (*common.TableInfo, bool) { if event.TableID == tableID { - return common.WrapTableInfo(event.SchemaName, event.TableInfo), false + // Use WrapTableInfoWithTableID to ensure the returned TableInfo carries + // the physical partition ID (tableID) instead of the logical table ID. + return common.WrapTableInfoWithTableID(event.SchemaName, event.TableInfo, tableID), false } for _, partitionID := range event.PrevPartitions { if tableID == partitionID { @@ -1560,7 +1568,26 @@ func extractTableInfoFuncForRemovePartitioning(event *PersistedDDLEvent, tableID // buildDDLEvent begin // ======= +// buildDDLEventCommon wraps buildDDLEventCommonWithTableID, defaulting to use +// the logical table ID (rawEvent.TableInfo.ID). +// This maintains compatibility for DDL handlers that do not operate on +// specific physical partitions. func buildDDLEventCommon(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tiDBOnly bool) (commonEvent.DDLEvent, bool, error) { + tableID := int64(0) + if rawEvent.TableInfo != nil { + tableID = rawEvent.TableInfo.ID + } + return buildDDLEventCommonWithTableID(rawEvent, tableID, tableFilter, tiDBOnly) +} + +// buildDDLEventCommonWithTableID constructs a commonEvent.DDLEvent using a +// *specific* tableID. +// This function is the core builder, ensuring that both the DDLEvent.TableID +// field and the associated wrapTableInfo (if it exists) are populated +// with the provided tableID. +// For partition-related DDLs, this tableID should be the physical partition ID. +// For other DDLs, it defaults to the logical table ID. +func buildDDLEventCommonWithTableID(rawEvent *PersistedDDLEvent, tableID int64, tableFilter filter.Filter, tiDBOnly bool) (commonEvent.DDLEvent, bool, error) { var wrapTableInfo *common.TableInfo // Note: not all ddl types will respect the `filtered` result: create tables, rename tables, rename table, exchange table partition. filtered, notSync := false, false @@ -1623,14 +1650,15 @@ func buildDDLEventCommon(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, } if rawEvent.TableInfo != nil { - wrapTableInfo = common.WrapTableInfo(rawEvent.SchemaName, rawEvent.TableInfo) + // Use the provided tableID (which could be a physical partition ID) + // to create the TableInfo wrapper. + wrapTableInfo = common.WrapTableInfoWithTableID(rawEvent.SchemaName, rawEvent.TableInfo, tableID) } return commonEvent.DDLEvent{ Type: rawEvent.Type, // TODO: whether the following four fields are needed SchemaID: rawEvent.SchemaID, - TableID: rawEvent.TableID, SchemaName: rawEvent.SchemaName, TableName: rawEvent.TableName, @@ -2084,7 +2112,7 @@ func buildDDLEventForRenameTable(rawEvent *PersistedDDLEvent, tableFilter filter } func buildDDLEventForAddPartition(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) { - ddlEvent, ok, err := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly) + ddlEvent, ok, err := buildDDLEventCommonWithTableID(rawEvent, tableID, tableFilter, WithoutTiDBOnly) if err != nil { return commonEvent.DDLEvent{}, false, err } @@ -2113,7 +2141,7 @@ func buildDDLEventForAddPartition(rawEvent *PersistedDDLEvent, tableFilter filte } func buildDDLEventForDropPartition(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) { - ddlEvent, ok, err := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly) + ddlEvent, ok, err := buildDDLEventCommonWithTableID(rawEvent, tableID, tableFilter, WithoutTiDBOnly) if err != nil { return commonEvent.DDLEvent{}, false, err } @@ -2166,7 +2194,7 @@ func buildDDLEventForDropView(rawEvent *PersistedDDLEvent, tableFilter filter.Fi } func buildDDLEventForTruncateAndReorganizePartition(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) { - ddlEvent, ok, err := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly) + ddlEvent, ok, err := buildDDLEventCommonWithTableID(rawEvent, tableID, tableFilter, WithoutTiDBOnly) if err != nil { return commonEvent.DDLEvent{}, false, err } @@ -2199,7 +2227,7 @@ func buildDDLEventForTruncateAndReorganizePartition(rawEvent *PersistedDDLEvent, } func buildDDLEventForExchangeTablePartition(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) { - ddlEvent, ok, err := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly) + ddlEvent, ok, err := buildDDLEventCommonWithTableID(rawEvent, tableID, tableFilter, WithoutTiDBOnly) if err != nil { return commonEvent.DDLEvent{}, false, err } @@ -2622,7 +2650,7 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte } func buildDDLEventForAlterTablePartitioning(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) { - ddlEvent, ok, err := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly) + ddlEvent, ok, err := buildDDLEventCommonWithTableID(rawEvent, tableID, tableFilter, WithoutTiDBOnly) if err != nil { return commonEvent.DDLEvent{}, false, err } @@ -2658,7 +2686,7 @@ func buildDDLEventForAlterTablePartitioning(rawEvent *PersistedDDLEvent, tableFi } func buildDDLEventForRemovePartitioning(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) { - ddlEvent, ok, err := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly) + ddlEvent, ok, err := buildDDLEventCommonWithTableID(rawEvent, tableID, tableFilter, WithoutTiDBOnly) if err != nil { return commonEvent.DDLEvent{}, false, err } diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index 0c88175426..6a95160880 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -37,14 +37,7 @@ type DDLEvent struct { DispatcherID common.DispatcherID `json:"-"` Type byte `json:"type"` // SchemaID is from upstream job.SchemaID - SchemaID int64 `json:"schema_id"` - // TableID is from upstream job.TableID - // TableID means different for different job types: - // - for most ddl types which just involve a single table id, it is the table id of the table - // - for ExchangeTablePartition, it is the table id of the normal table before exchange - // and it is one of of the partition ids after exchange - // - for TruncateTable, it the table ID of the old table - TableID int64 `json:"table_id"` + SchemaID int64 `json:"schema_id"` SchemaName string `json:"schema_name"` TableName string `json:"table_name"` // the following two fields are just used for RenameTable, @@ -105,8 +98,8 @@ type DDLEvent struct { } func (d *DDLEvent) String() string { - return fmt.Sprintf("DDLEvent{Version: %d, DispatcherID: %s, Type: %d, SchemaID: %d, TableID: %d, SchemaName: %s, TableName: %s, ExtraSchemaName: %s, ExtraTableName: %s, Query: %s, TableInfo: %v, FinishedTs: %d, Seq: %d, BlockedTables: %v, NeedDroppedTables: %v, NeedAddedTables: %v, UpdatedSchemas: %v, TableNameChange: %v, TiDBOnly: %t, BDRMode: %s, Err: %s, eventSize: %d}", - d.Version, d.DispatcherID.String(), d.Type, d.SchemaID, d.TableID, d.SchemaName, d.TableName, d.ExtraSchemaName, d.ExtraTableName, d.Query, d.TableInfo, d.FinishedTs, d.Seq, d.BlockedTables, d.NeedDroppedTables, d.NeedAddedTables, d.UpdatedSchemas, d.TableNameChange, d.TiDBOnly, d.BDRMode, d.Err, d.eventSize) + return fmt.Sprintf("DDLEvent{Version: %d, DispatcherID: %s, Type: %d, SchemaID: %d, SchemaName: %s, TableName: %s, ExtraSchemaName: %s, ExtraTableName: %s, Query: %s, TableInfo: %v, FinishedTs: %d, Seq: %d, BlockedTables: %v, NeedDroppedTables: %v, NeedAddedTables: %v, UpdatedSchemas: %v, TableNameChange: %v, TiDBOnly: %t, BDRMode: %s, Err: %s, eventSize: %d}", + d.Version, d.DispatcherID.String(), d.Type, d.SchemaID, d.SchemaName, d.TableName, d.ExtraSchemaName, d.ExtraTableName, d.Query, d.TableInfo, d.FinishedTs, d.Seq, d.BlockedTables, d.NeedDroppedTables, d.NeedAddedTables, d.UpdatedSchemas, d.TableNameChange, d.TiDBOnly, d.BDRMode, d.Err, d.eventSize) } func (d *DDLEvent) GetType() int { diff --git a/pkg/common/event/ddl_event_test.go b/pkg/common/event/ddl_event_test.go index 137cdd1970..7e4bdc2ba8 100644 --- a/pkg/common/event/ddl_event_test.go +++ b/pkg/common/event/ddl_event_test.go @@ -35,7 +35,6 @@ func TestDDLEvent(t *testing.T) { DispatcherID: common.NewDispatcherID(), Type: byte(ddlJob.Type), SchemaID: ddlJob.SchemaID, - TableID: ddlJob.TableID, SchemaName: ddlJob.SchemaName, TableName: ddlJob.TableName, Query: ddlJob.Query, @@ -58,7 +57,6 @@ func TestDDLEvent(t *testing.T) { require.Equal(t, ddlEvent.DispatcherID, reverseEvent.DispatcherID) require.Equal(t, ddlEvent.Type, reverseEvent.Type) require.Equal(t, ddlEvent.SchemaID, reverseEvent.SchemaID) - require.Equal(t, ddlEvent.TableID, reverseEvent.TableID) require.Equal(t, ddlEvent.SchemaName, reverseEvent.SchemaName) require.Equal(t, ddlEvent.TableName, reverseEvent.TableName) require.Equal(t, ddlEvent.Query, reverseEvent.Query) diff --git a/pkg/common/event/util.go b/pkg/common/event/util.go index 3d86e1dc2a..8c5f6e579c 100644 --- a/pkg/common/event/util.go +++ b/pkg/common/event/util.go @@ -206,7 +206,6 @@ func (s *EventTestHelper) DDL2Event(ddl string) *DDLEvent { info := s.GetTableInfo(job) return &DDLEvent{ SchemaID: job.SchemaID, - TableID: info.TableName.TableID, SchemaName: job.SchemaName, TableName: job.TableName, Query: job.Query, diff --git a/pkg/common/table_info.go b/pkg/common/table_info.go index 35a8380304..8c93a5fe24 100644 --- a/pkg/common/table_info.go +++ b/pkg/common/table_info.go @@ -558,12 +558,22 @@ func NewTableInfo(schemaName string, tableName string, tableID int64, isPartitio return ti } -// WrapTableInfo creates a TableInfo from a model.TableInfo -func WrapTableInfo(schemaName string, info *model.TableInfo) *TableInfo { +// WrapTableInfoWithTableID creates a TableInfo from a model.TableInfo. +// It explicitly uses the provided tableID instead of the one from info.ID. +// This is used to create TableInfo wrappers for physical partitions, +// ensuring they carry the physical partition ID, not the logical table ID. +func WrapTableInfoWithTableID(schemaName string, info *model.TableInfo, tableID int64) *TableInfo { // search column schema object sharedColumnSchemaStorage := GetSharedColumnSchemaStorage() columnSchema := sharedColumnSchemaStorage.GetOrSetColumnSchema(info) - return NewTableInfo(schemaName, info.Name.O, info.ID, info.GetPartitionInfo() != nil, columnSchema, info) + return NewTableInfo(schemaName, info.Name.O, tableID, info.GetPartitionInfo() != nil, columnSchema, info) +} + +// WrapTableInfo creates a TableInfo from a model.TableInfo. +// This function is a convenience wrapper around WrapTableInfoWithTableID, +// defaulting to use the logical table ID (info.ID). +func WrapTableInfo(schemaName string, info *model.TableInfo) *TableInfo { + return WrapTableInfoWithTableID(schemaName, info, info.ID) } // NewTableInfo4Decoder is only used by the codec decoder for the test purpose, diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 6944c4476e..2e67b71895 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -243,10 +243,15 @@ func (c *eventBroker) sendDDL(ctx context.Context, remoteID node.ID, e *event.DD case c.getMessageCh(d.messageWorkerIndex, common.IsRedoMode(d.info.GetMode())) <- ddlEvent: updateMetricEventServiceSendDDLCount(d.info.GetMode()) } + + tableID := int64(0) + if e.TableInfo != nil { + tableID = e.TableInfo.TableName.TableID + } log.Info("send ddl event to dispatcher", zap.Stringer("dispatcherID", d.id), zap.Int64("DDLSpanTableID", d.info.GetTableSpan().TableID), - zap.Int64("EventTableID", e.TableID), + zap.Int64("EventTableID", tableID), zap.String("query", e.Query), zap.Uint64("commitTs", e.FinishedTs), zap.Uint64("seq", e.Seq), zap.Int64("mode", d.info.GetMode())) } diff --git a/pkg/eventservice/event_scanner_test.go b/pkg/eventservice/event_scanner_test.go index ec92617366..6b81c3bdeb 100644 --- a/pkg/eventservice/event_scanner_test.go +++ b/pkg/eventservice/event_scanner_test.go @@ -319,7 +319,6 @@ func TestEventScanner(t *testing.T) { fakeDDL := event.DDLEvent{ FinishedTs: kvEvents[0].CRTs, TableInfo: ddlEvent.TableInfo, - TableID: ddlEvent.TableID, } broker.schemaStore.(*mockSchemaStore).AppendDDLEvent(tableID, fakeDDL) @@ -489,7 +488,6 @@ func TestEventScannerWithDDL(t *testing.T) { fakeDDL := event.DDLEvent{ FinishedTs: dml2.CRTs, TableInfo: ddlEvent.TableInfo, - TableID: ddlEvent.TableID, } mockSchemaStore.AppendDDLEvent(tableID, fakeDDL) @@ -637,12 +635,10 @@ func TestEventScannerWithDDL(t *testing.T) { fakeDDL2 := event.DDLEvent{ FinishedTs: resolvedTs + 1, TableInfo: ddlEvent.TableInfo, - TableID: ddlEvent.TableID, } fakeDDL3 := event.DDLEvent{ FinishedTs: resolvedTs + 2, TableInfo: ddlEvent.TableInfo, - TableID: ddlEvent.TableID, } mockSchemaStore.AppendDDLEvent(tableID, fakeDDL2) mockSchemaStore.AppendDDLEvent(tableID, fakeDDL3) @@ -690,7 +686,7 @@ func TestDMLProcessor(t *testing.T) { }...) tableInfo := ddlEvent.TableInfo - tableID := ddlEvent.TableID + tableID := ddlEvent.TableInfo.TableName.TableID dispatcherID := common.NewDispatcherID() // Create a mock mounter and schema getter @@ -873,7 +869,7 @@ func TestDMLProcessor(t *testing.T) { helper.Tk().MustExec("use test") ddlEvent := helper.DDL2Event("create table t2 (id int primary key, a int(50), b char(50), unique key uk_a(a))") tableInfo := ddlEvent.TableInfo - tableID := ddlEvent.TableID + tableID := ddlEvent.TableInfo.TableName.TableID _, updateEvent := helper.DML2UpdateEvent("test", "t2", "insert into test.t2(id, a, b) values (0, 1, 'b0')", @@ -913,7 +909,7 @@ func TestDMLProcessorAppendRow(t *testing.T) { }...) tableInfo := ddlEvent.TableInfo - tableID := ddlEvent.TableID + tableID := ddlEvent.TableInfo.TableName.TableID dispatcherID := common.NewDispatcherID() // Create a mock mounter and schema getter @@ -1235,7 +1231,7 @@ func TestEventMerger(t *testing.T) { `insert into test.t(id,c) values (0, "c0")`, }...) - tableID := ddlEvent.TableID + tableID := ddlEvent.TableInfo.TableName.TableID mockSchemaGetter := NewMockSchemaStore() mockSchemaGetter.AppendDDLEvent(tableID, ddlEvent) @@ -1268,7 +1264,7 @@ func TestEventMerger(t *testing.T) { merger := newEventMerger([]event.Event{&ddlEvent}) - tableID := ddlEvent.TableID + tableID := ddlEvent.TableInfo.TableName.TableID mockSchemaGetter := NewMockSchemaStore() mockSchemaGetter.AppendDDLEvent(tableID, ddlEvent) processor := newDMLProcessor(&mockMounter{}, mockSchemaGetter, nil, false) @@ -1304,26 +1300,24 @@ func TestEventMerger(t *testing.T) { []string{ `insert into test.t1(id,c) values (1, "c1")`, }...) - mockSchemaGetter.AppendDDLEvent(ddlEvent1.TableID, ddlEvent1) + mockSchemaGetter.AppendDDLEvent(ddlEvent1.TableInfo.TableName.TableID, ddlEvent1) ddlEvent2 := event.DDLEvent{ FinishedTs: kvEvents1[0].CRTs + 10, // DDL2 after DML1 TableInfo: ddlEvent1.TableInfo, - TableID: ddlEvent1.TableID, } - mockSchemaGetter.AppendDDLEvent(ddlEvent2.TableID, ddlEvent2) + mockSchemaGetter.AppendDDLEvent(ddlEvent2.TableInfo.TableName.TableID, ddlEvent2) ddlEvent3 := event.DDLEvent{ FinishedTs: kvEvents1[0].CRTs + 30, // DDL3 after DML1 TableInfo: ddlEvent1.TableInfo, - TableID: ddlEvent1.TableID, } - mockSchemaGetter.AppendDDLEvent(ddlEvent3.TableID, ddlEvent3) + mockSchemaGetter.AppendDDLEvent(ddlEvent3.TableInfo.TableName.TableID, ddlEvent3) ddlEvents := []event.Event{&ddlEvent1, &ddlEvent2, &ddlEvent3} merger := newEventMerger(ddlEvents) - tableID := ddlEvent1.TableID + tableID := ddlEvent1.TableInfo.TableName.TableID tableInfo := ddlEvent1.TableInfo processor := newDMLProcessor(mounter, mockSchemaGetter, nil, false) @@ -1384,17 +1378,14 @@ func TestEventMerger(t *testing.T) { ddlEvent2 := event.DDLEvent{ FinishedTs: 100, TableInfo: ddlEvent1.TableInfo, - TableID: ddlEvent1.TableID, } ddlEvent3 := event.DDLEvent{ FinishedTs: 200, TableInfo: ddlEvent1.TableInfo, - TableID: ddlEvent1.TableID, } ddlEvent4 := event.DDLEvent{ FinishedTs: 300, TableInfo: ddlEvent1.TableInfo, - TableID: ddlEvent1.TableID, } ddlEvents := []event.Event{&ddlEvent2, &ddlEvent3, &ddlEvent4} @@ -1424,7 +1415,7 @@ func TestScanAndMergeEventsSingleUKUpdate(t *testing.T) { // Create table with unique key on column 'a' helper.Tk().MustExec("use test") ddlEvent := helper.DDL2Event("create table t_uk (id int primary key, a int, b char(50), unique key uk_a(a))") - tableID := ddlEvent.TableID + tableID := ddlEvent.TableInfo.TableName.TableID // Generate update event that changes UK _, updateEvent := helper.DML2UpdateEvent("test", "t_uk", diff --git a/pkg/eventservice/event_service_test.go b/pkg/eventservice/event_service_test.go index 4bc1ca8d26..7af7f556ed 100644 --- a/pkg/eventservice/event_service_test.go +++ b/pkg/eventservice/event_service_test.go @@ -506,7 +506,6 @@ func genEvents(helper *commonEvent.EventTestHelper, ddl string, dmls ...string) return commonEvent.DDLEvent{ Version: commonEvent.DDLEventVersion, FinishedTs: job.BinlogInfo.FinishedTS, - TableID: job.BinlogInfo.TableInfo.ID, SchemaName: job.SchemaName, TableName: job.TableName, Query: ddl, diff --git a/pkg/redo/codec/codec_test.go b/pkg/redo/codec/codec_test.go index d554c06437..7418ebc2c0 100644 --- a/pkg/redo/codec/codec_test.go +++ b/pkg/redo/codec/codec_test.go @@ -31,7 +31,6 @@ func TestDDLRedoConvert(t *testing.T) { Query: "ALTER TABLE test.t1 ADD COLUMN a int", SchemaName: "Hello", TableName: "World", - TableID: 1, TableInfo: &common.TableInfo{}, } diff --git a/pkg/sink/codec/avro/decoder.go b/pkg/sink/codec/avro/decoder.go index 6faff1ff92..974f861043 100644 --- a/pkg/sink/codec/avro/decoder.go +++ b/pkg/sink/codec/avro/decoder.go @@ -461,10 +461,9 @@ func (d *decoder) NextDDLEvent() *commonEvent.DDLEvent { result.FinishedTs = baseDDLEvent.CommitTs actionType := common.GetDDLActionType(result.Query) result.Type = byte(actionType) - result.TableID = tableIDAllocator.Allocate(result.SchemaName, result.TableName) if d.idx == 0 { - tableIDAllocator.AddBlockTableID(result.SchemaName, result.TableName, result.TableID) + tableIDAllocator.AddBlockTableID(result.SchemaName, result.TableName, tableIDAllocator.Allocate(result.SchemaName, result.TableName)) result.BlockedTables = common.GetBlockedTables(tableIDAllocator, result) } return result diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index 0da8bba018..22e0f62ba5 100644 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -356,8 +356,7 @@ func (d *decoder) NextDDLEvent() *commonEvent.DDLEvent { result.Query = d.msg.getQuery() actionType := common.GetDDLActionType(result.Query) result.Type = byte(actionType) - result.TableID = tableIDAllocator.Allocate(result.SchemaName, result.TableName) - tableIDAllocator.AddBlockTableID(result.SchemaName, result.TableName, result.TableID) + tableIDAllocator.AddBlockTableID(result.SchemaName, result.TableName, tableIDAllocator.Allocate(result.SchemaName, result.TableName)) result.BlockedTables = common.GetBlockedTables(tableIDAllocator, result) cacheKey := tableKey{ diff --git a/pkg/sink/codec/debezium/decoder.go b/pkg/sink/codec/debezium/decoder.go index 0e7c8d6ef9..ac8ec8855d 100644 --- a/pkg/sink/codec/debezium/decoder.go +++ b/pkg/sink/codec/debezium/decoder.go @@ -136,10 +136,9 @@ func (d *decoder) NextDDLEvent() *commonEvent.DDLEvent { event.Query = d.valuePayload["ddl"].(string) actionType := common.GetDDLActionType(event.Query) event.Type = byte(actionType) - event.TableID = tableIDAllocator.Allocate(event.SchemaName, event.TableName) if d.idx == 0 { - tableIDAllocator.AddBlockTableID(event.SchemaName, event.TableName, event.TableID) + tableIDAllocator.AddBlockTableID(event.SchemaName, event.TableName, tableIDAllocator.Allocate(event.SchemaName, event.TableName)) event.BlockedTables = common.GetBlockedTables(tableIDAllocator, event) if event.Type == byte(timodel.ActionRenameTable) { schemaName = event.ExtraSchemaName diff --git a/pkg/sink/codec/open/decoder.go b/pkg/sink/codec/open/decoder.go index 7cfecdf8b0..7459d20932 100644 --- a/pkg/sink/codec/open/decoder.go +++ b/pkg/sink/codec/open/decoder.go @@ -176,11 +176,10 @@ func (b *decoder) NextDDLEvent() *commonEvent.DDLEvent { result.FinishedTs = b.nextKey.Ts result.SchemaName = b.nextKey.Schema result.TableName = b.nextKey.Table - result.TableID = tableIDAllocator.Allocate(result.SchemaName, result.TableName) // only the DDL comes from the first partition will be processed. if b.idx == 0 { - tableIDAllocator.AddBlockTableID(result.SchemaName, result.TableName, result.TableID) + tableIDAllocator.AddBlockTableID(result.SchemaName, result.TableName, tableIDAllocator.Allocate(result.SchemaName, result.TableName)) result.BlockedTables = common.GetBlockedTables(tableIDAllocator, result) } diff --git a/pkg/sink/codec/open/encoder.go b/pkg/sink/codec/open/encoder.go index 70db0ac9f8..81d2cfe6fe 100644 --- a/pkg/sink/codec/open/encoder.go +++ b/pkg/sink/codec/open/encoder.go @@ -235,7 +235,11 @@ func enhancedKeyValue(key, value []byte) ([]byte, []byte) { func (d *batchEncoder) EncodeDDLEvent(e *commonEvent.DDLEvent) (*common.Message, error) { lock.Lock() - delete(columnFlagsCache, e.TableID) + tableID := int64(0) + if e.TableInfo != nil { + tableID = e.TableInfo.TableName.TableID + } + delete(columnFlagsCache, tableID) defer lock.Unlock() key, value, err := encodeDDLEvent(e, d.config) diff --git a/pkg/sink/codec/open/encoder_test.go b/pkg/sink/codec/open/encoder_test.go index 916ed2c3d7..463fa69b14 100644 --- a/pkg/sink/codec/open/encoder_test.go +++ b/pkg/sink/codec/open/encoder_test.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/downstreamadapter/sink/columnselector" + commonTable "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" @@ -621,6 +622,7 @@ func TestCreateTableDDL(t *testing.T) { Type: byte(job.Type), SchemaName: job.SchemaName, TableName: job.TableName, + TableInfo: commonTable.WrapTableInfo(job.SchemaName, job.BinlogInfo.TableInfo), FinishedTs: 1, } diff --git a/pkg/sink/codec/simple/decoder.go b/pkg/sink/codec/simple/decoder.go index bb25b1b6c9..1b5d7034f0 100644 --- a/pkg/sink/codec/simple/decoder.go +++ b/pkg/sink/codec/simple/decoder.go @@ -507,8 +507,7 @@ func (d *Decoder) buildDDLEvent(msg *message) *commonEvent.DDLEvent { result.FinishedTs = msg.CommitTs result.SchemaName = tableInfo.TableName.Schema result.TableName = tableInfo.TableName.Table - result.TableID = tableInfo.TableName.TableID - tableIDAllocator.AddBlockTableID(result.SchemaName, result.TableName, result.TableID) + tableIDAllocator.AddBlockTableID(result.SchemaName, result.TableName, tableInfo.TableName.TableID) if preTableInfo != nil { result.ExtraSchemaName = preTableInfo.GetSchemaName() diff --git a/pkg/sink/codec/simple/encoder_test.go b/pkg/sink/codec/simple/encoder_test.go index b453c32bdb..46ef418a18 100644 --- a/pkg/sink/codec/simple/encoder_test.go +++ b/pkg/sink/codec/simple/encoder_test.go @@ -245,7 +245,7 @@ func TestE2EPartitionTable(t *testing.T) { decodedDDL := dec.NextDDLEvent() require.NoError(t, err) require.NotNil(t, decodedDDL) - require.Equal(t, decodedDDL.TableID, createPartitionTableDDL.TableID, format) + require.Equal(t, decodedDDL.TableInfo.TableName.TableID, createPartitionTableDDL.TableInfo.TableName.TableID, format) physicalTableID := make([]int64, 0, len(events)) for _, e := range events { @@ -1004,7 +1004,6 @@ func TestEncodeIntegerTypes(t *testing.T) { job := helper.DDL2Job(createTableDDL) ddlEvent := &commonEvent.DDLEvent{ SchemaID: job.SchemaID, - TableID: job.TableID, Query: job.Query, TableInfo: helper.GetTableInfo(job), FinishedTs: job.BinlogInfo.FinishedTS,