Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
a.Alerter.LogFlowInfo(
ctx,
config.FlowJobName,
"replicated all rows to destination for table "+config.DestinationTableIdentifier,
fmt.Sprintf("replicated %d partitions to destination for table %s", numPartitions, config.DestinationTableIdentifier),
)
return nil
}
Expand Down Expand Up @@ -1424,7 +1424,7 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
option.CurrentName,
)
if err != nil {
return nil, fmt.Errorf("failed to load schema to rename tables: %w", err)
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to load schema to rename tables: %w", err))
}
tableNameSchemaMapping[option.CurrentName] = schema
}
Expand All @@ -1451,7 +1451,7 @@ func (a *FlowableActivity) updateTableSchemaMappingForResync(
) error {
tx, err := a.CatalogPool.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to begin updating table_schema_mapping: %w", err)
return a.Alerter.LogFlowError(ctx, flowJobName, fmt.Errorf("failed to begin updating table_schema_mapping: %w", err))
}
logger := log.With(internal.LoggerFromCtx(ctx), slog.String(string(shared.FlowNameKey), flowJobName))
defer shared.RollbackTx(tx, logger)
Expand All @@ -1464,7 +1464,7 @@ func (a *FlowableActivity) updateTableSchemaMappingForResync(
flowJobName,
option.NewName,
); err != nil {
return fmt.Errorf("failed to delete _resync entries in table_schema_mapping: %w", err)
return a.Alerter.LogFlowError(ctx, flowJobName, fmt.Errorf("failed to update table_schema_mapping: %w", err))
}
}
if _, err := tx.Exec(
Expand All @@ -1474,10 +1474,16 @@ func (a *FlowableActivity) updateTableSchemaMappingForResync(
option.CurrentName,
option.NewName,
); err != nil {
return fmt.Errorf("failed to update table_schema_mapping: %w", err)
return a.Alerter.LogFlowError(ctx, flowJobName, fmt.Errorf("failed to update table_schema_mapping: %w", err))
}
}
return tx.Commit(ctx)

a.Alerter.LogFlowInfo(ctx, flowJobName, "Resync completed for all tables")

if commitErr := tx.Commit(ctx); commitErr != nil {
return a.Alerter.LogFlowError(ctx, flowJobName, fmt.Errorf("failed to commit updating table_schema_mapping: %w", commitErr))
}
return nil
}

func (a *FlowableActivity) DeleteMirrorStats(ctx context.Context, flowName string) error {
Expand Down Expand Up @@ -1602,13 +1608,13 @@ func (a *FlowableActivity) RemoveTablesFromRawTable(
normBatchID, err := pgMetadata.GetLastNormalizeBatchID(ctx, cfg.FlowJobName)
if err != nil {
logger.Error("[RemoveTablesFromRawTable] failed to get last normalize batch id", slog.Any("error", err))
return err
return a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err)
}

syncBatchID, err := pgMetadata.GetLastSyncBatchID(ctx, cfg.FlowJobName)
if err != nil {
logger.Error("[RemoveTablesFromRawTable] failed to get last sync batch id", slog.Any("error", err))
return err
return a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err)
}

dstConn, err := connectors.GetByNameAs[connectors.RawTableConnector](ctx, cfg.Env, a.CatalogPool, cfg.DestinationName)
Expand Down Expand Up @@ -1649,14 +1655,16 @@ func (a *FlowableActivity) RemoveTablesFromCatalog(
removedTables = append(removedTables, tm.DestinationTableIdentifier)
}

_, err := a.CatalogPool.Exec(
if _, err := a.CatalogPool.Exec(
ctx,
"delete from table_schema_mapping where flow_name = $1 and table_name = ANY($2)",
cfg.FlowJobName,
removedTables,
)
); err != nil {
return a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err)
}

return err
return nil
}

func (a *FlowableActivity) RemoveFlowDetailsFromCatalog(
Expand Down
Loading