diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index d5d04fe740..e0052d27fa 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -705,7 +705,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, a.Alerter.LogFlowInfo( ctx, config.FlowJobName, - "replicated all rows to destination for table "+config.DestinationTableIdentifier, + fmt.Sprintf("replicated %d partitions to destination for table %s", numPartitions, config.DestinationTableIdentifier), ) return nil } @@ -1424,7 +1424,7 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena option.CurrentName, ) if err != nil { - return nil, fmt.Errorf("failed to load schema to rename tables: %w", err) + return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to load schema to rename tables: %w", err)) } tableNameSchemaMapping[option.CurrentName] = schema } @@ -1451,7 +1451,7 @@ func (a *FlowableActivity) updateTableSchemaMappingForResync( ) error { tx, err := a.CatalogPool.Begin(ctx) if err != nil { - return fmt.Errorf("failed to begin updating table_schema_mapping: %w", err) + return a.Alerter.LogFlowError(ctx, flowJobName, fmt.Errorf("failed to begin updating table_schema_mapping: %w", err)) } logger := log.With(internal.LoggerFromCtx(ctx), slog.String(string(shared.FlowNameKey), flowJobName)) defer shared.RollbackTx(tx, logger) @@ -1464,7 +1464,7 @@ func (a *FlowableActivity) updateTableSchemaMappingForResync( flowJobName, option.NewName, ); err != nil { - return fmt.Errorf("failed to delete _resync entries in table_schema_mapping: %w", err) + return a.Alerter.LogFlowError(ctx, flowJobName, fmt.Errorf("failed to update table_schema_mapping: %w", err)) } } if _, err := tx.Exec( @@ -1474,10 +1474,16 @@ func (a *FlowableActivity) updateTableSchemaMappingForResync( option.CurrentName, option.NewName, ); err != nil { - return fmt.Errorf("failed to update table_schema_mapping: %w", err) + return a.Alerter.LogFlowError(ctx, flowJobName, fmt.Errorf("failed to update table_schema_mapping: %w", err)) } } - return tx.Commit(ctx) + + a.Alerter.LogFlowInfo(ctx, flowJobName, "Resync completed for all tables") + + if commitErr := tx.Commit(ctx); commitErr != nil { + return a.Alerter.LogFlowError(ctx, flowJobName, fmt.Errorf("failed to commit updating table_schema_mapping: %w", commitErr)) + } + return nil } func (a *FlowableActivity) DeleteMirrorStats(ctx context.Context, flowName string) error { @@ -1602,13 +1608,13 @@ func (a *FlowableActivity) RemoveTablesFromRawTable( normBatchID, err := pgMetadata.GetLastNormalizeBatchID(ctx, cfg.FlowJobName) if err != nil { logger.Error("[RemoveTablesFromRawTable] failed to get last normalize batch id", slog.Any("error", err)) - return err + return a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err) } syncBatchID, err := pgMetadata.GetLastSyncBatchID(ctx, cfg.FlowJobName) if err != nil { logger.Error("[RemoveTablesFromRawTable] failed to get last sync batch id", slog.Any("error", err)) - return err + return a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err) } dstConn, err := connectors.GetByNameAs[connectors.RawTableConnector](ctx, cfg.Env, a.CatalogPool, cfg.DestinationName) @@ -1649,14 +1655,16 @@ func (a *FlowableActivity) RemoveTablesFromCatalog( removedTables = append(removedTables, tm.DestinationTableIdentifier) } - _, err := a.CatalogPool.Exec( + if _, err := a.CatalogPool.Exec( ctx, "delete from table_schema_mapping where flow_name = $1 and table_name = ANY($2)", cfg.FlowJobName, removedTables, - ) + ); err != nil { + return a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err) + } - return err + return nil } func (a *FlowableActivity) RemoveFlowDetailsFromCatalog(