Skip to content

Commit c879c22

Browse files
Emit errors in RenameTables and a few places in table removal flow (#3470)
Adds LogFlowError calls for a bunch of error returns in renaming (in resync) and table removal activities.
1 parent 9c91264 commit c879c22

File tree

1 file changed

+19
-11
lines changed

1 file changed

+19
-11
lines changed

flow/activities/flowable.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -705,7 +705,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
705705
a.Alerter.LogFlowInfo(
706706
ctx,
707707
config.FlowJobName,
708-
"replicated all rows to destination for table "+config.DestinationTableIdentifier,
708+
fmt.Sprintf("replicated %d partitions to destination for table %s", numPartitions, config.DestinationTableIdentifier),
709709
)
710710
return nil
711711
}
@@ -1424,7 +1424,7 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
14241424
option.CurrentName,
14251425
)
14261426
if err != nil {
1427-
return nil, fmt.Errorf("failed to load schema to rename tables: %w", err)
1427+
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to load schema to rename tables: %w", err))
14281428
}
14291429
tableNameSchemaMapping[option.CurrentName] = schema
14301430
}
@@ -1451,7 +1451,7 @@ func (a *FlowableActivity) updateTableSchemaMappingForResync(
14511451
) error {
14521452
tx, err := a.CatalogPool.Begin(ctx)
14531453
if err != nil {
1454-
return fmt.Errorf("failed to begin updating table_schema_mapping: %w", err)
1454+
return a.Alerter.LogFlowError(ctx, flowJobName, fmt.Errorf("failed to begin updating table_schema_mapping: %w", err))
14551455
}
14561456
logger := log.With(internal.LoggerFromCtx(ctx), slog.String(string(shared.FlowNameKey), flowJobName))
14571457
defer shared.RollbackTx(tx, logger)
@@ -1464,7 +1464,7 @@ func (a *FlowableActivity) updateTableSchemaMappingForResync(
14641464
flowJobName,
14651465
option.NewName,
14661466
); err != nil {
1467-
return fmt.Errorf("failed to delete _resync entries in table_schema_mapping: %w", err)
1467+
return a.Alerter.LogFlowError(ctx, flowJobName, fmt.Errorf("failed to update table_schema_mapping: %w", err))
14681468
}
14691469
}
14701470
if _, err := tx.Exec(
@@ -1474,10 +1474,16 @@ func (a *FlowableActivity) updateTableSchemaMappingForResync(
14741474
option.CurrentName,
14751475
option.NewName,
14761476
); err != nil {
1477-
return fmt.Errorf("failed to update table_schema_mapping: %w", err)
1477+
return a.Alerter.LogFlowError(ctx, flowJobName, fmt.Errorf("failed to update table_schema_mapping: %w", err))
14781478
}
14791479
}
1480-
return tx.Commit(ctx)
1480+
1481+
a.Alerter.LogFlowInfo(ctx, flowJobName, "Resync completed for all tables")
1482+
1483+
if commitErr := tx.Commit(ctx); commitErr != nil {
1484+
return a.Alerter.LogFlowError(ctx, flowJobName, fmt.Errorf("failed to commit updating table_schema_mapping: %w", commitErr))
1485+
}
1486+
return nil
14811487
}
14821488

14831489
func (a *FlowableActivity) DeleteMirrorStats(ctx context.Context, flowName string) error {
@@ -1602,13 +1608,13 @@ func (a *FlowableActivity) RemoveTablesFromRawTable(
16021608
normBatchID, err := pgMetadata.GetLastNormalizeBatchID(ctx, cfg.FlowJobName)
16031609
if err != nil {
16041610
logger.Error("[RemoveTablesFromRawTable] failed to get last normalize batch id", slog.Any("error", err))
1605-
return err
1611+
return a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err)
16061612
}
16071613

16081614
syncBatchID, err := pgMetadata.GetLastSyncBatchID(ctx, cfg.FlowJobName)
16091615
if err != nil {
16101616
logger.Error("[RemoveTablesFromRawTable] failed to get last sync batch id", slog.Any("error", err))
1611-
return err
1617+
return a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err)
16121618
}
16131619

16141620
dstConn, err := connectors.GetByNameAs[connectors.RawTableConnector](ctx, cfg.Env, a.CatalogPool, cfg.DestinationName)
@@ -1649,14 +1655,16 @@ func (a *FlowableActivity) RemoveTablesFromCatalog(
16491655
removedTables = append(removedTables, tm.DestinationTableIdentifier)
16501656
}
16511657

1652-
_, err := a.CatalogPool.Exec(
1658+
if _, err := a.CatalogPool.Exec(
16531659
ctx,
16541660
"delete from table_schema_mapping where flow_name = $1 and table_name = ANY($2)",
16551661
cfg.FlowJobName,
16561662
removedTables,
1657-
)
1663+
); err != nil {
1664+
return a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err)
1665+
}
16581666

1659-
return err
1667+
return nil
16601668
}
16611669

16621670
func (a *FlowableActivity) RemoveFlowDetailsFromCatalog(

0 commit comments

Comments
 (0)