Skip to content
Merged
6 changes: 5 additions & 1 deletion downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,14 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC
}
}

tableID := int64(0)
if ddl.TableInfo != nil {
tableID = ddl.TableInfo.TableName.TableID
}
log.Info("dispatcher receive ddl event",
zap.Stringer("dispatcher", d.id),
zap.String("query", ddl.Query),
zap.Int64("table", ddl.TableID),
zap.Int64("table", tableID),
zap.Uint64("commitTs", event.GetCommitTs()),
zap.Uint64("seq", event.GetSeq()))
ddl.AddPostFlushFunc(func() {
Expand Down
1 change: 0 additions & 1 deletion downstreamadapter/dispatcher/event_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,6 @@ func TestDispatcherSplittableCheck(t *testing.T) {
},
TableInfo: commonTableInfo,
Query: "ALTER TABLE t ADD COLUMN new_col INT",
TableID: 1,
}

// Create dispatcher event
Expand Down
2 changes: 2 additions & 0 deletions downstreamadapter/sink/kafka/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ func TestKafkaSinkBasicFunctionality(t *testing.T) {
Query: job.Query,
SchemaName: job.SchemaName,
TableName: job.TableName,
TableInfo: common.WrapTableInfo(job.SchemaName, job.BinlogInfo.TableInfo),
FinishedTs: 1,
BlockedTables: &commonEvent.InfluencedTables{
InfluenceType: commonEvent.InfluenceTypeNormal,
Expand All @@ -220,6 +221,7 @@ func TestKafkaSinkBasicFunctionality(t *testing.T) {
Query: job.Query,
SchemaName: job.SchemaName,
TableName: job.TableName,
TableInfo: common.WrapTableInfo(job.SchemaName, job.BinlogInfo.TableInfo),
FinishedTs: 4,
BlockedTables: &commonEvent.InfluencedTables{
InfluenceType: commonEvent.InfluenceTypeNormal,
Expand Down
52 changes: 40 additions & 12 deletions logservice/schemastore/persist_storage_ddl_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1450,7 +1450,9 @@ func extractTableInfoFuncForAddPartition(event *PersistedDDLEvent, tableID int64
newCreatedIDs := getCreatedIDs(event.PrevPartitions, getAllPartitionIDs(event.TableInfo))
for _, partition := range newCreatedIDs {
if tableID == partition {
return common.WrapTableInfo(event.SchemaName, event.TableInfo), false
// Use WrapTableInfoWithTableID to ensure the returned TableInfo carries
// the physical partition ID (tableID) instead of the logical table ID.
return common.WrapTableInfoWithTableID(event.SchemaName, event.TableInfo, tableID), false
}
}
return nil, false
Expand All @@ -1477,7 +1479,9 @@ func extractTableInfoFuncForTruncateAndReorganizePartition(event *PersistedDDLEv
newCreatedIDs := getCreatedIDs(event.PrevPartitions, physicalIDs)
for _, partition := range newCreatedIDs {
if tableID == partition {
return common.WrapTableInfo(event.SchemaName, event.TableInfo), false
// Use WrapTableInfoWithTableID to ensure the returned TableInfo carries
// the physical partition ID (tableID) instead of the logical table ID.
return common.WrapTableInfoWithTableID(event.SchemaName, event.TableInfo, tableID), false
}
}
return nil, false
Expand Down Expand Up @@ -1536,7 +1540,9 @@ func extractTableInfoFuncForAlterTablePartitioning(event *PersistedDDLEvent, tab
}
for _, partitionID := range getAllPartitionIDs(event.TableInfo) {
if tableID == partitionID {
return common.WrapTableInfo(event.SchemaName, event.TableInfo), false
// Use WrapTableInfoWithTableID to ensure the returned TableInfo carries
// the physical partition ID (tableID) instead of the logical table ID.
return common.WrapTableInfoWithTableID(event.SchemaName, event.TableInfo, tableID), false
}
}
log.Panic("should not reach here", zap.Int64("tableID", tableID))
Expand All @@ -1545,7 +1551,9 @@ func extractTableInfoFuncForAlterTablePartitioning(event *PersistedDDLEvent, tab

func extractTableInfoFuncForRemovePartitioning(event *PersistedDDLEvent, tableID int64) (*common.TableInfo, bool) {
if event.TableID == tableID {
return common.WrapTableInfo(event.SchemaName, event.TableInfo), false
// Use WrapTableInfoWithTableID to ensure the returned TableInfo carries
// the physical partition ID (tableID) instead of the logical table ID.
return common.WrapTableInfoWithTableID(event.SchemaName, event.TableInfo, tableID), false
}
for _, partitionID := range event.PrevPartitions {
if tableID == partitionID {
Expand All @@ -1560,7 +1568,26 @@ func extractTableInfoFuncForRemovePartitioning(event *PersistedDDLEvent, tableID
// buildDDLEvent begin
// =======

// buildDDLEventCommon wraps buildDDLEventCommonWithTableID, defaulting to use
// the logical table ID (rawEvent.TableInfo.ID).
// This maintains compatibility for DDL handlers that do not operate on
// specific physical partitions.
func buildDDLEventCommon(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tiDBOnly bool) (commonEvent.DDLEvent, bool, error) {
tableID := int64(0)
if rawEvent.TableInfo != nil {
tableID = rawEvent.TableInfo.ID
}
return buildDDLEventCommonWithTableID(rawEvent, tableID, tableFilter, tiDBOnly)
}

// buildDDLEventCommonWithTableID constructs a commonEvent.DDLEvent using a
// *specific* tableID.
// This function is the core builder, ensuring that both the DDLEvent.TableID
// field and the associated wrapTableInfo (if it exists) are populated
// with the provided tableID.
// For partition-related DDLs, this tableID should be the physical partition ID.
// For other DDLs, it defaults to the logical table ID.
func buildDDLEventCommonWithTableID(rawEvent *PersistedDDLEvent, tableID int64, tableFilter filter.Filter, tiDBOnly bool) (commonEvent.DDLEvent, bool, error) {
var wrapTableInfo *common.TableInfo
// Note: not all ddl types will respect the `filtered` result: create tables, rename tables, rename table, exchange table partition.
filtered, notSync := false, false
Expand Down Expand Up @@ -1623,14 +1650,15 @@ func buildDDLEventCommon(rawEvent *PersistedDDLEvent, tableFilter filter.Filter,
}

if rawEvent.TableInfo != nil {
wrapTableInfo = common.WrapTableInfo(rawEvent.SchemaName, rawEvent.TableInfo)
// Use the provided tableID (which could be a physical partition ID)
// to create the TableInfo wrapper.
wrapTableInfo = common.WrapTableInfoWithTableID(rawEvent.SchemaName, rawEvent.TableInfo, tableID)
}

return commonEvent.DDLEvent{
Type: rawEvent.Type,
// TODO: whether the following four fields are needed
SchemaID: rawEvent.SchemaID,
TableID: rawEvent.TableID,
SchemaName: rawEvent.SchemaName,
TableName: rawEvent.TableName,

Expand Down Expand Up @@ -2084,7 +2112,7 @@ func buildDDLEventForRenameTable(rawEvent *PersistedDDLEvent, tableFilter filter
}

func buildDDLEventForAddPartition(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) {
ddlEvent, ok, err := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly)
ddlEvent, ok, err := buildDDLEventCommonWithTableID(rawEvent, tableID, tableFilter, WithoutTiDBOnly)
if err != nil {
return commonEvent.DDLEvent{}, false, err
}
Expand Down Expand Up @@ -2113,7 +2141,7 @@ func buildDDLEventForAddPartition(rawEvent *PersistedDDLEvent, tableFilter filte
}

func buildDDLEventForDropPartition(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) {
ddlEvent, ok, err := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly)
ddlEvent, ok, err := buildDDLEventCommonWithTableID(rawEvent, tableID, tableFilter, WithoutTiDBOnly)
if err != nil {
return commonEvent.DDLEvent{}, false, err
}
Expand Down Expand Up @@ -2166,7 +2194,7 @@ func buildDDLEventForDropView(rawEvent *PersistedDDLEvent, tableFilter filter.Fi
}

func buildDDLEventForTruncateAndReorganizePartition(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) {
ddlEvent, ok, err := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly)
ddlEvent, ok, err := buildDDLEventCommonWithTableID(rawEvent, tableID, tableFilter, WithoutTiDBOnly)
if err != nil {
return commonEvent.DDLEvent{}, false, err
}
Expand Down Expand Up @@ -2199,7 +2227,7 @@ func buildDDLEventForTruncateAndReorganizePartition(rawEvent *PersistedDDLEvent,
}

func buildDDLEventForExchangeTablePartition(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) {
ddlEvent, ok, err := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly)
ddlEvent, ok, err := buildDDLEventCommonWithTableID(rawEvent, tableID, tableFilter, WithoutTiDBOnly)
if err != nil {
return commonEvent.DDLEvent{}, false, err
}
Expand Down Expand Up @@ -2622,7 +2650,7 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte
}

func buildDDLEventForAlterTablePartitioning(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) {
ddlEvent, ok, err := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly)
ddlEvent, ok, err := buildDDLEventCommonWithTableID(rawEvent, tableID, tableFilter, WithoutTiDBOnly)
if err != nil {
return commonEvent.DDLEvent{}, false, err
}
Expand Down Expand Up @@ -2658,7 +2686,7 @@ func buildDDLEventForAlterTablePartitioning(rawEvent *PersistedDDLEvent, tableFi
}

func buildDDLEventForRemovePartitioning(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, tableID int64) (commonEvent.DDLEvent, bool, error) {
ddlEvent, ok, err := buildDDLEventCommon(rawEvent, tableFilter, WithoutTiDBOnly)
ddlEvent, ok, err := buildDDLEventCommonWithTableID(rawEvent, tableID, tableFilter, WithoutTiDBOnly)
if err != nil {
return commonEvent.DDLEvent{}, false, err
}
Expand Down
13 changes: 3 additions & 10 deletions pkg/common/event/ddl_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,7 @@ type DDLEvent struct {
DispatcherID common.DispatcherID `json:"-"`
Type byte `json:"type"`
// SchemaID is from upstream job.SchemaID
SchemaID int64 `json:"schema_id"`
// TableID is from upstream job.TableID
// TableID means different for different job types:
// - for most ddl types which just involve a single table id, it is the table id of the table
// - for ExchangeTablePartition, it is the table id of the normal table before exchange
// and it is one of of the partition ids after exchange
// - for TruncateTable, it the table ID of the old table
TableID int64 `json:"table_id"`
SchemaID int64 `json:"schema_id"`
SchemaName string `json:"schema_name"`
TableName string `json:"table_name"`
// the following two fields are just used for RenameTable,
Expand Down Expand Up @@ -105,8 +98,8 @@ type DDLEvent struct {
}

func (d *DDLEvent) String() string {
return fmt.Sprintf("DDLEvent{Version: %d, DispatcherID: %s, Type: %d, SchemaID: %d, TableID: %d, SchemaName: %s, TableName: %s, ExtraSchemaName: %s, ExtraTableName: %s, Query: %s, TableInfo: %v, FinishedTs: %d, Seq: %d, BlockedTables: %v, NeedDroppedTables: %v, NeedAddedTables: %v, UpdatedSchemas: %v, TableNameChange: %v, TiDBOnly: %t, BDRMode: %s, Err: %s, eventSize: %d}",
d.Version, d.DispatcherID.String(), d.Type, d.SchemaID, d.TableID, d.SchemaName, d.TableName, d.ExtraSchemaName, d.ExtraTableName, d.Query, d.TableInfo, d.FinishedTs, d.Seq, d.BlockedTables, d.NeedDroppedTables, d.NeedAddedTables, d.UpdatedSchemas, d.TableNameChange, d.TiDBOnly, d.BDRMode, d.Err, d.eventSize)
return fmt.Sprintf("DDLEvent{Version: %d, DispatcherID: %s, Type: %d, SchemaID: %d, SchemaName: %s, TableName: %s, ExtraSchemaName: %s, ExtraTableName: %s, Query: %s, TableInfo: %v, FinishedTs: %d, Seq: %d, BlockedTables: %v, NeedDroppedTables: %v, NeedAddedTables: %v, UpdatedSchemas: %v, TableNameChange: %v, TiDBOnly: %t, BDRMode: %s, Err: %s, eventSize: %d}",
d.Version, d.DispatcherID.String(), d.Type, d.SchemaID, d.SchemaName, d.TableName, d.ExtraSchemaName, d.ExtraTableName, d.Query, d.TableInfo, d.FinishedTs, d.Seq, d.BlockedTables, d.NeedDroppedTables, d.NeedAddedTables, d.UpdatedSchemas, d.TableNameChange, d.TiDBOnly, d.BDRMode, d.Err, d.eventSize)
}

func (d *DDLEvent) GetType() int {
Expand Down
2 changes: 0 additions & 2 deletions pkg/common/event/ddl_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func TestDDLEvent(t *testing.T) {
DispatcherID: common.NewDispatcherID(),
Type: byte(ddlJob.Type),
SchemaID: ddlJob.SchemaID,
TableID: ddlJob.TableID,
SchemaName: ddlJob.SchemaName,
TableName: ddlJob.TableName,
Query: ddlJob.Query,
Expand All @@ -58,7 +57,6 @@ func TestDDLEvent(t *testing.T) {
require.Equal(t, ddlEvent.DispatcherID, reverseEvent.DispatcherID)
require.Equal(t, ddlEvent.Type, reverseEvent.Type)
require.Equal(t, ddlEvent.SchemaID, reverseEvent.SchemaID)
require.Equal(t, ddlEvent.TableID, reverseEvent.TableID)
require.Equal(t, ddlEvent.SchemaName, reverseEvent.SchemaName)
require.Equal(t, ddlEvent.TableName, reverseEvent.TableName)
require.Equal(t, ddlEvent.Query, reverseEvent.Query)
Expand Down
1 change: 0 additions & 1 deletion pkg/common/event/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ func (s *EventTestHelper) DDL2Event(ddl string) *DDLEvent {
info := s.GetTableInfo(job)
return &DDLEvent{
SchemaID: job.SchemaID,
TableID: info.TableName.TableID,
SchemaName: job.SchemaName,
TableName: job.TableName,
Query: job.Query,
Expand Down
16 changes: 13 additions & 3 deletions pkg/common/table_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,12 +558,22 @@ func NewTableInfo(schemaName string, tableName string, tableID int64, isPartitio
return ti
}

// WrapTableInfo creates a TableInfo from a model.TableInfo
func WrapTableInfo(schemaName string, info *model.TableInfo) *TableInfo {
// WrapTableInfoWithTableID creates a TableInfo from a model.TableInfo.
// It explicitly uses the provided tableID instead of the one from info.ID.
// This is used to create TableInfo wrappers for physical partitions,
// ensuring they carry the physical partition ID, not the logical table ID.
func WrapTableInfoWithTableID(schemaName string, info *model.TableInfo, tableID int64) *TableInfo {
// search column schema object
sharedColumnSchemaStorage := GetSharedColumnSchemaStorage()
columnSchema := sharedColumnSchemaStorage.GetOrSetColumnSchema(info)
return NewTableInfo(schemaName, info.Name.O, info.ID, info.GetPartitionInfo() != nil, columnSchema, info)
return NewTableInfo(schemaName, info.Name.O, tableID, info.GetPartitionInfo() != nil, columnSchema, info)
}

// WrapTableInfo creates a TableInfo from a model.TableInfo.
// This function is a convenience wrapper around WrapTableInfoWithTableID,
// defaulting to use the logical table ID (info.ID).
func WrapTableInfo(schemaName string, info *model.TableInfo) *TableInfo {
return WrapTableInfoWithTableID(schemaName, info, info.ID)
}

// NewTableInfo4Decoder is only used by the codec decoder for the test purpose,
Expand Down
7 changes: 6 additions & 1 deletion pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,15 @@ func (c *eventBroker) sendDDL(ctx context.Context, remoteID node.ID, e *event.DD
case c.getMessageCh(d.messageWorkerIndex, common.IsRedoMode(d.info.GetMode())) <- ddlEvent:
updateMetricEventServiceSendDDLCount(d.info.GetMode())
}

tableID := int64(0)
if e.TableInfo != nil {
tableID = e.TableInfo.TableName.TableID
}
log.Info("send ddl event to dispatcher",
zap.Stringer("dispatcherID", d.id),
zap.Int64("DDLSpanTableID", d.info.GetTableSpan().TableID),
zap.Int64("EventTableID", e.TableID),
zap.Int64("EventTableID", tableID),
zap.String("query", e.Query), zap.Uint64("commitTs", e.FinishedTs),
zap.Uint64("seq", e.Seq), zap.Int64("mode", d.info.GetMode()))
}
Expand Down
Loading