From 190bc9259f190b95d3b4f64aabf6e75a433eebf6 Mon Sep 17 00:00:00 2001
From: Stoica Alexandru
Date: Mon, 4 Aug 2025 16:42:48 +0100
Subject: [PATCH 1/2] Fetch configuration from DB

No longer pass the entire configuration object to Temporal jobs, since
doing so can hit the 2MB payload hard limit imposed by Temporal.
Instead, pass only flowJobName and fetch the config from the DB in all
the places that require it.

Some additional changes were needed because several of the jobs are
used from multiple contexts; the information passed to them had to be
adapted so that they continue to work as expected.
---
 flow/activities/flowable.go                 |  57 +++++-
 flow/activities/flowable_core.go            |  35 ++--
 flow/cmd/handler.go                         |   1 -
 flow/cmd/mirror_status.go                   |   2 +-
 flow/connectors/clickhouse/normalize.go     |   7 +-
 flow/connectors/postgres/postgres.go        |   7 +-
 flow/e2e/postgres/peer_flow_pg_test.go      |  23 ++-
 flow/internal/flow_configuration_helpers.go |  38 ++++
 flow/workflows/cdc_flow.go                  | 193 ++++++++++----------
 flow/workflows/qrep_flow.go                 |  16 +-
 flow/workflows/setup_flow.go                |  51 +++--
 flow/workflows/snapshot_flow.go             |  96 ++++++----
 protos/flow.proto                           |   9 +-
 13 files changed, 309 insertions(+), 226 deletions(-)
 create mode 100644 flow/internal/flow_configuration_helpers.go

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 26681d19dc..2229dc0619 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -5,7 +5,9 @@ import (
 	"errors"
 	"fmt"
 	"log/slog"
+	"maps"
 	"os"
+	"slices"
 	"strconv"
 	"sync/atomic"
 	"time"
@@ -124,6 +126,12 @@ func (a *FlowableActivity) EnsurePullability(
 	}
 	defer connectors.CloseConnector(ctx, srcConn)
 
+	cfg, err := internal.FetchConfigFromDB(config.FlowJobName)
+	if err != nil {
+		return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to fetch config from DB: %w", err))
+	}
+
+	config.SourceTableIdentifiers = slices.Sorted(maps.Keys(internal.TableNameMapping(cfg.TableMappings)))
 	output, err := srcConn.EnsurePullability(ctx, config)
 	if err != nil {
 		return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to ensure pullability: %w", err))
@@ -165,6 +173,20 @@ func (a *FlowableActivity) SetupTableSchema(
 	})
 	defer shutdown()
 
+	// the full config is no longer passed in, so fetch it from the DB
+	cfg, err := internal.FetchConfigFromDB(config.FlowName)
+	if err != nil {
+		return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to fetch config from DB: %w", err))
+	}
+	tableMappings := cfg.TableMappings
+	if len(config.FilteredTableMappings) > 0 {
+		// Use the filtered table mappings when provided; they come from the
+		// sync flow and reflect schema changes.
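+		// Only this filtered subset travels through Temporal; the full mapping
+		// set is always available from the catalog.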
+ tableMappings = config.FilteredTableMappings + } + logger := internal.LoggerFromCtx(ctx) ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName) srcConn, err := connectors.GetByNameAs[connectors.GetTableSchemaConnector](ctx, config.Env, a.CatalogPool, config.PeerName) @@ -173,11 +193,11 @@ func (a *FlowableActivity) SetupTableSchema( } defer connectors.CloseConnector(ctx, srcConn) - tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, config.TableMappings) + tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, tableMappings) if err != nil { return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get GetTableSchemaConnector: %w", err)) } - processed := internal.BuildProcessedSchemaMapping(config.TableMappings, tableNameSchemaMapping, logger) + processed := internal.BuildProcessedSchemaMapping(tableMappings, tableNameSchemaMapping, logger) tx, err := a.CatalogPool.BeginTx(ctx, pgx.TxOptions{}) if err != nil { @@ -213,6 +233,13 @@ func (a *FlowableActivity) CreateNormalizedTable( numTablesSetup := atomic.Uint32{} numTablesToSetup := atomic.Int32{} + cfg, err := internal.FetchConfigFromDB(config.FlowName) + if err != nil { + return nil, a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to fetch config from DB: %w", err)) + } + + tableMappings := cfg.TableMappings + shutdown := heartbeatRoutine(ctx, func() string { return fmt.Sprintf("setting up normalized tables - %d of %d done", numTablesSetup.Load(), numTablesToSetup.Load()) }) @@ -244,7 +271,7 @@ func (a *FlowableActivity) CreateNormalizedTable( numTablesToSetup.Store(int32(len(tableNameSchemaMapping))) tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping)) - for _, tableMapping := range config.TableMappings { + for _, tableMapping := range tableMappings { tableIdentifier := tableMapping.DestinationTableIdentifier tableSchema := tableNameSchemaMapping[tableIdentifier] existing, err := conn.SetupNormalizedTable( @@ -292,6 +319,12 @@ func (a *FlowableActivity) SyncFlow( var syncingBatchID atomic.Int64 var syncState atomic.Pointer[string] syncState.Store(shared.Ptr("setup")) + + config, err := internal.FetchConfigFromDB(config.FlowJobName) + if err != nil { + return fmt.Errorf("unable to query flow config from catalog: %w", err) + } + shutdown := heartbeatRoutine(ctx, func() string { // Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch sBatchID := syncingBatchID.Load() @@ -363,10 +396,10 @@ func (a *FlowableActivity) SyncFlow( var syncResponse *model.SyncResponse var syncErr error if config.System == protos.TypeSystem_Q { - syncResponse, syncErr = a.syncRecords(groupCtx, config, options, srcConn.(connectors.CDCPullConnector), + syncResponse, syncErr = a.syncRecords(groupCtx, config, srcConn.(connectors.CDCPullConnector), normRequests, normResponses, normBufferSize, &syncingBatchID, &syncState) } else { - syncResponse, syncErr = a.syncPg(groupCtx, config, options, srcConn.(connectors.CDCPullPgConnector), + syncResponse, syncErr = a.syncPg(groupCtx, config, srcConn.(connectors.CDCPullPgConnector), normRequests, normResponses, normBufferSize, &syncingBatchID, &syncState) } @@ -414,7 +447,6 @@ func (a *FlowableActivity) SyncFlow( func (a *FlowableActivity) syncRecords( ctx context.Context, config *protos.FlowConnectionConfigs, - options *protos.SyncFlowOptions, srcConn connectors.CDCPullConnector, normRequests *concurrency.LastChan, 
normResponses *concurrency.LastChan, @@ -451,7 +483,7 @@ func (a *FlowableActivity) syncRecords( return stream, nil } } - return syncCore(ctx, a, config, options, srcConn, + return syncCore(ctx, a, config, srcConn, normRequests, normResponses, normBufferSize, syncingBatchID, syncWaiting, adaptStream, connectors.CDCPullConnector.PullRecords, @@ -461,7 +493,6 @@ func (a *FlowableActivity) syncRecords( func (a *FlowableActivity) syncPg( ctx context.Context, config *protos.FlowConnectionConfigs, - options *protos.SyncFlowOptions, srcConn connectors.CDCPullPgConnector, normRequests *concurrency.LastChan, normResponses *concurrency.LastChan, @@ -469,7 +500,7 @@ func (a *FlowableActivity) syncPg( syncingBatchID *atomic.Int64, syncWaiting *atomic.Pointer[string], ) (*model.SyncResponse, error) { - return syncCore(ctx, a, config, options, srcConn, + return syncCore(ctx, a, config, srcConn, normRequests, normResponses, normBufferSize, syncingBatchID, syncWaiting, nil, connectors.CDCPullPgConnector.PullPg, @@ -1230,9 +1261,13 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, } } -func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *protos.FlowConnectionConfigs, +func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, flowJobName string, additionalTableMappings []*protos.TableMapping, ) error { + cfg, err := internal.FetchConfigFromDB(flowJobName) + if err != nil { + return fmt.Errorf("unable to query flow config from catalog: %w", err) + } ctx = context.WithValue(ctx, shared.FlowNameKey, cfg.FlowJobName) srcConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, cfg.Env, a.CatalogPool, cfg.SourceName) if err != nil { diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index b6381fa48f..52a076030b 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -80,11 +80,10 @@ func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowNa func (a *FlowableActivity) applySchemaDeltas( ctx context.Context, config *protos.FlowConnectionConfigs, - options *protos.SyncFlowOptions, schemaDeltas []*protos.TableSchemaDelta, ) error { filteredTableMappings := make([]*protos.TableMapping, 0, len(schemaDeltas)) - for _, tableMapping := range options.TableMappings { + for _, tableMapping := range config.TableMappings { if slices.ContainsFunc(schemaDeltas, func(schemaDelta *protos.TableSchemaDelta) bool { return schemaDelta.SrcTableName == tableMapping.SourceTableIdentifier && schemaDelta.DstTableName == tableMapping.DestinationTableIdentifier @@ -95,12 +94,12 @@ func (a *FlowableActivity) applySchemaDeltas( if len(schemaDeltas) > 0 { if err := a.SetupTableSchema(ctx, &protos.SetupTableSchemaBatchInput{ - PeerName: config.SourceName, - TableMappings: filteredTableMappings, - FlowName: config.FlowJobName, - System: config.System, - Env: config.Env, - Version: config.Version, + PeerName: config.SourceName, + FilteredTableMappings: filteredTableMappings, + FlowName: config.FlowJobName, + System: config.System, + Env: config.Env, + Version: config.Version, }); err != nil { return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to execute schema update at source: %w", err)) } @@ -112,7 +111,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon ctx context.Context, a *FlowableActivity, config *protos.FlowConnectionConfigs, - options *protos.SyncFlowOptions, srcConn TPull, normRequests *concurrency.LastChan, normResponses 
*concurrency.LastChan, @@ -127,8 +125,9 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon ctx = context.WithValue(ctx, shared.FlowNameKey, flowName) logger := internal.LoggerFromCtx(ctx) - tblNameMapping := make(map[string]model.NameAndExclude, len(options.TableMappings)) - for _, v := range options.TableMappings { + // we should be able to rely on `config` here. + tblNameMapping := make(map[string]model.NameAndExclude, len(config.TableMappings)) + for _, v := range config.TableMappings { tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude) } @@ -136,7 +135,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil) } - batchSize := options.BatchSize + batchSize := config.MaxBatchSize if batchSize == 0 { batchSize = 250_000 } @@ -190,13 +189,13 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon errGroup.Go(func() error { return pull(srcConn, errCtx, a.CatalogPool, a.OtelManager, &model.PullRecordsRequest[Items]{ FlowJobName: flowName, - SrcTableIDNameMapping: options.SrcTableIdNameMapping, + SrcTableIDNameMapping: config.SrcTableIdNameMapping, TableNameMapping: tblNameMapping, LastOffset: lastOffset, ConsumedOffset: &consumedOffset, MaxBatchSize: batchSize, IdleTimeout: internal.PeerDBCDCIdleTimeoutSeconds( - int(options.IdleTimeoutSeconds), + int(config.IdleTimeoutSeconds), ), TableNameSchemaMapping: tableNameSchemaMapping, OverridePublicationName: config.PublicationName, @@ -234,11 +233,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon defer connectors.CloseConnector(ctx, dstConn) syncState.Store(shared.Ptr("updating schema")) - if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas); err != nil { + if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, config.TableMappings, recordBatchSync.SchemaDeltas); err != nil { return nil, fmt.Errorf("failed to sync schema: %w", err) } - return nil, a.applySchemaDeltas(ctx, config, options, recordBatchSync.SchemaDeltas) + return nil, a.applySchemaDeltas(ctx, config, recordBatchSync.SchemaDeltas) } var res *model.SyncResponse @@ -271,7 +270,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon Records: recordBatchSync, ConsumedOffset: &consumedOffset, FlowJobName: flowName, - TableMappings: options.TableMappings, + TableMappings: config.TableMappings, StagingPath: config.CdcStagingPath, Script: config.Script, TableNameSchemaMapping: tableNameSchemaMapping, @@ -334,7 +333,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon a.OtelManager.Metrics.CurrentBatchIdGauge.Record(ctx, res.CurrentSyncBatchID) syncState.Store(shared.Ptr("updating schema")) - if err := a.applySchemaDeltas(ctx, config, options, res.TableSchemaDeltas); err != nil { + if err := a.applySchemaDeltas(ctx, config, res.TableSchemaDeltas); err != nil { return nil, err } diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 9ae5c6a855..8058d8c92e 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -77,7 +77,6 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context, if err != nil { return fmt.Errorf("unable to marshal flow config: %w", err) } - if _, err := h.pool.Exec(ctx, `INSERT INTO flows (workflow_id, name, source_peer, 
destination_peer, config_proto, status, description, source_table_identifier, destination_table_identifier) VALUES ($1,$2,$3,$4,$5,$6,'gRPC','','')`, diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index bdb54760a3..e721ace2d2 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -473,7 +473,7 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog( ) (*protos.FlowConnectionConfigs, error) { var configBytes sql.RawBytes if err := h.pool.QueryRow(ctx, - "SELECT config_proto FROM flows WHERE name = $1", flowJobName, + "SELECT config_proto FROM flows WHERE name = $1 LIMIT 1", flowJobName, ).Scan(&configBytes); err != nil { slog.ErrorContext(ctx, "unable to query flow config from catalog", slog.Any("error", err)) return nil, fmt.Errorf("unable to query flow config from catalog: %w", err) diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 58b9c216f1..b8e2d6b46d 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -90,7 +90,12 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable( tmEngine := protos.TableEngine_CH_ENGINE_REPLACING_MERGE_TREE var tableMapping *protos.TableMapping - for _, tm := range config.TableMappings { + + cfg, err := internal.FetchConfigFromDB(config.FlowName) + if err != nil { + return nil, fmt.Errorf("failed to fetch config from DB: %w", err) + } + for _, tm := range cfg.TableMappings { if tm.DestinationTableIdentifier == tableIdentifier { tmEngine = tm.Engine tableMapping = tm diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 78c7025b2d..bbe84cc26d 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -448,10 +448,15 @@ func pullCore[Items model.Items]( return fmt.Errorf("failed to get get setting for originMetaAsDestinationColumn: %w", err) } + cfgFromDB, err := internal.FetchConfigFromDB(req.FlowJobName) + if err != nil { + return fmt.Errorf("unable to query flow config from catalog: %w", err) + } + cdc, err := c.NewPostgresCDCSource(ctx, &PostgresCDCConfig{ CatalogPool: catalogPool, OtelManager: otelManager, - SrcTableIDNameMapping: req.SrcTableIDNameMapping, + SrcTableIDNameMapping: cfgFromDB.SrcTableIdNameMapping, TableNameMapping: req.TableNameMapping, TableNameSchemaMapping: req.TableNameSchemaMapping, RelationMessageMapping: c.relationMessageMapping, diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 451d5a7437..b81fb0072d 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -13,6 +13,7 @@ import ( "github.com/PeerDB-io/peerdb/flow/e2e" "github.com/PeerDB-io/peerdb/flow/generated/protos" + "github.com/PeerDB-io/peerdb/flow/internal" "github.com/PeerDB-io/peerdb/flow/model" "github.com/PeerDB-io/peerdb/flow/shared" peerflow "github.com/PeerDB-io/peerdb/flow/workflows" @@ -934,11 +935,12 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil }) - workflowState := e2e.EnvGetWorkflowState(s.t, env) - assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds) - assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize) - assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 1) - assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 1) + updatedConfig, err := 
internal.FetchConfigFromDB(config.FlowJobName)
+	require.NoError(s.t, err)
+	assert.EqualValues(s.t, 7, updatedConfig.IdleTimeoutSeconds)
+	assert.EqualValues(s.t, 6, updatedConfig.MaxBatchSize)
+	assert.Len(s.t, updatedConfig.TableMappings, 1)
+	assert.Len(s.t, updatedConfig.SrcTableIdNameMapping, 1)
 
 	if !s.t.Failed() {
 		e2e.SignalWorkflow(s.t.Context(), env, model.FlowSignal, model.PauseSignal)
@@ -976,11 +978,12 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
 			return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil
 		})
 
-		workflowState = e2e.EnvGetWorkflowState(s.t, env)
-		assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
-		assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
-		assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 2)
-		assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 2)
+		updatedConfig, err = internal.FetchConfigFromDB(config.FlowJobName)
+		require.NoError(s.t, err)
+		assert.EqualValues(s.t, 14, updatedConfig.IdleTimeoutSeconds)
+		assert.EqualValues(s.t, 12, updatedConfig.MaxBatchSize)
+		assert.Len(s.t, updatedConfig.TableMappings, 2)
+		assert.Len(s.t, updatedConfig.SrcTableIdNameMapping, 2)
 	}
 
 	env.Cancel(s.t.Context())
diff --git a/flow/internal/flow_configuration_helpers.go b/flow/internal/flow_configuration_helpers.go
new file mode 100644
index 0000000000..beb2f275ea
--- /dev/null
+++ b/flow/internal/flow_configuration_helpers.go
@@ -0,0 +1,38 @@
+package internal
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+
+	"google.golang.org/protobuf/proto"
+
+	"github.com/PeerDB-io/peerdb/flow/generated/protos"
+)
+
+func TableNameMapping(tableMappings []*protos.TableMapping) map[string]string {
+	tblNameMapping := make(map[string]string, len(tableMappings))
+	for _, v := range tableMappings {
+		tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier
+	}
+	return tblNameMapping
+}
+
+func FetchConfigFromDB(flowName string) (*protos.FlowConnectionConfigs, error) {
+	var configBytes sql.RawBytes
+	pool, err := GetCatalogConnectionPoolFromEnv(context.Background())
+	if err != nil {
+		return nil, fmt.Errorf("unable to get catalog connection pool: %w", err)
+	}
+	if err := pool.QueryRow(context.Background(),
+		"SELECT config_proto FROM flows WHERE name = $1 LIMIT 1", flowName,
+	).Scan(&configBytes); err != nil {
+		return nil, fmt.Errorf("unable to query flow config from catalog: %w", err)
+	}
+	var cfgFromDB protos.FlowConnectionConfigs
+	if err := proto.Unmarshal(configBytes, &cfgFromDB); err != nil {
+		return nil, fmt.Errorf("unable to unmarshal flow config: %w", err)
+	}
+
+	return &cfgFromDB, nil
+}
diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go
index 2b9f66fe53..2172936a3f 100644
--- a/flow/workflows/cdc_flow.go
+++ b/flow/workflows/cdc_flow.go
@@ -25,8 +25,6 @@ import (
 type CDCFlowWorkflowState struct {
 	// flow config update request, set to nil after processed
 	FlowConfigUpdate *protos.CDCFlowConfigUpdate
-	// options passed to all SyncFlows
-	SyncFlowOptions *protos.SyncFlowOptions
 	// for becoming DropFlow
 	DropFlowInput *protos.DropFlowInput
 	// used for computing backoff timeout
@@ -35,33 +33,14 @@ type CDCFlowWorkflowState struct {
 	// Current signalled state of the peer flow.
ActiveSignal model.CDCFlowSignal CurrentFlowStatus protos.FlowStatus - - // Initial load settings - SnapshotNumRowsPerPartition uint32 - SnapshotNumPartitionsOverride uint32 - SnapshotMaxParallelWorkers uint32 - SnapshotNumTablesInParallel uint32 } // returns a new empty PeerFlowState func NewCDCFlowWorkflowState(ctx workflow.Context, logger log.Logger, cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflowState { - tableMappings := make([]*protos.TableMapping, 0, len(cfg.TableMappings)) - for _, tableMapping := range cfg.TableMappings { - tableMappings = append(tableMappings, proto.CloneOf(tableMapping)) - } state := CDCFlowWorkflowState{ ActiveSignal: model.NoopSignal, CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP, FlowConfigUpdate: nil, - SyncFlowOptions: &protos.SyncFlowOptions{ - BatchSize: cfg.MaxBatchSize, - IdleTimeoutSeconds: cfg.IdleTimeoutSeconds, - TableMappings: tableMappings, - }, - SnapshotNumRowsPerPartition: cfg.SnapshotNumRowsPerPartition, - SnapshotNumPartitionsOverride: cfg.SnapshotNumPartitionsOverride, - SnapshotMaxParallelWorkers: cfg.SnapshotMaxParallelWorkers, - SnapshotNumTablesInParallel: cfg.SnapshotNumTablesInParallel, } syncStatusToCatalog(ctx, logger, state.CurrentFlowStatus) return &state @@ -100,23 +79,16 @@ func GetChildWorkflowID( func updateFlowConfigWithLatestSettings( cfg *protos.FlowConnectionConfigs, - state *CDCFlowWorkflowState, + flowConfigUpdate *protos.CDCFlowConfigUpdate, ) *protos.FlowConnectionConfigs { cloneCfg := proto.CloneOf(cfg) - cloneCfg.MaxBatchSize = state.SyncFlowOptions.BatchSize - cloneCfg.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds - cloneCfg.TableMappings = state.SyncFlowOptions.TableMappings - if state.SnapshotNumRowsPerPartition > 0 { - cloneCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition - } - if state.SnapshotNumPartitionsOverride > 0 { - cloneCfg.SnapshotNumPartitionsOverride = state.SnapshotNumPartitionsOverride - } - if state.SnapshotMaxParallelWorkers > 0 { - cloneCfg.SnapshotMaxParallelWorkers = state.SnapshotMaxParallelWorkers - } - if state.SnapshotNumTablesInParallel > 0 { - cloneCfg.SnapshotNumTablesInParallel = state.SnapshotNumTablesInParallel + if flowConfigUpdate != nil { + cloneCfg.MaxBatchSize = flowConfigUpdate.BatchSize + cloneCfg.IdleTimeoutSeconds = flowConfigUpdate.IdleTimeout + cloneCfg.SnapshotNumRowsPerPartition = flowConfigUpdate.SnapshotNumRowsPerPartition + cloneCfg.SnapshotNumPartitionsOverride = flowConfigUpdate.SnapshotNumPartitionsOverride + cloneCfg.SnapshotMaxParallelWorkers = flowConfigUpdate.SnapshotMaxParallelWorkers + cloneCfg.SnapshotNumTablesInParallel = flowConfigUpdate.SnapshotNumTablesInParallel } return cloneCfg } @@ -127,9 +99,9 @@ type CDCFlowWorkflowResult = CDCFlowWorkflowState func syncStateToConfigProtoInCatalog( ctx workflow.Context, cfg *protos.FlowConnectionConfigs, - state *CDCFlowWorkflowState, + flowConfigUpdate *protos.CDCFlowConfigUpdate, ) *protos.FlowConnectionConfigs { - cloneCfg := updateFlowConfigWithLatestSettings(cfg, state) + cloneCfg := updateFlowConfigWithLatestSettings(cfg, flowConfigUpdate) uploadConfigToCatalog(ctx, cloneCfg) return cloneCfg } @@ -157,34 +129,19 @@ func processCDCFlowConfigUpdate( ) error { flowConfigUpdate := state.FlowConfigUpdate - // only modify for options since SyncFlow uses it - if flowConfigUpdate.BatchSize > 0 { - state.SyncFlowOptions.BatchSize = flowConfigUpdate.BatchSize - } - if flowConfigUpdate.IdleTimeout > 0 { - state.SyncFlowOptions.IdleTimeoutSeconds = flowConfigUpdate.IdleTimeout - } 
if flowConfigUpdate.UpdatedEnv != nil { if cfg.Env == nil { cfg.Env = make(map[string]string, len(flowConfigUpdate.UpdatedEnv)) } maps.Copy(cfg.Env, flowConfigUpdate.UpdatedEnv) } - if flowConfigUpdate.SnapshotNumRowsPerPartition > 0 { - state.SnapshotNumRowsPerPartition = flowConfigUpdate.SnapshotNumRowsPerPartition - } - if flowConfigUpdate.SnapshotNumPartitionsOverride > 0 { - state.SnapshotNumPartitionsOverride = flowConfigUpdate.SnapshotNumPartitionsOverride - } - if flowConfigUpdate.SnapshotMaxParallelWorkers > 0 { - state.SnapshotMaxParallelWorkers = flowConfigUpdate.SnapshotMaxParallelWorkers - } - if flowConfigUpdate.SnapshotNumTablesInParallel > 0 { - state.SnapshotNumTablesInParallel = flowConfigUpdate.SnapshotNumTablesInParallel - } + cfg = syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) tablesAreAdded := len(flowConfigUpdate.AdditionalTables) > 0 tablesAreRemoved := len(flowConfigUpdate.RemovedTables) > 0 + if !tablesAreAdded && !tablesAreRemoved { + return nil + } if tablesAreAdded || tablesAreRemoved { logger.Info("processing CDCFlowConfigUpdate", slog.Any("updatedState", flowConfigUpdate)) @@ -203,7 +160,6 @@ func processCDCFlowConfigUpdate( } } - syncStateToConfigProtoInCatalog(ctx, cfg, state) return nil } @@ -219,7 +175,7 @@ func handleFlowSignalStateChange( case protos.FlowStatus_STATUS_TERMINATING: logger.Info("terminating CDCFlow", slog.String("operation", op)) state.ActiveSignal = model.TerminateSignal - dropCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state) + dropCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) state.DropFlowInput = &protos.DropFlowInput{ FlowJobName: dropCfg.FlowJobName, FlowConnectionConfigs: dropCfg, @@ -256,12 +212,7 @@ func processTableAdditions( ) error { flowConfigUpdate := state.FlowConfigUpdate if len(flowConfigUpdate.AdditionalTables) == 0 { - syncStateToConfigProtoInCatalog(ctx, cfg, state) - return nil - } - if internal.AdditionalTablesHasOverlap(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables) { - logger.Warn("duplicate source/destination tables found in additionalTables") - syncStateToConfigProtoInCatalog(ctx, cfg, state) + syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) return nil } state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_SNAPSHOT) @@ -278,7 +229,7 @@ func processTableAdditions( alterPublicationAddAdditionalTablesFuture := workflow.ExecuteActivity( alterPublicationAddAdditionalTablesCtx, flowable.AddTablesToPublication, - cfg, flowConfigUpdate.AdditionalTables) + cfg.FlowJobName, flowConfigUpdate.AdditionalTables) var res *CDCFlowWorkflowResult var addTablesFlowErr error @@ -291,7 +242,7 @@ func processTableAdditions( additionalTablesCfg := proto.CloneOf(cfg) additionalTablesCfg.DoInitialSnapshot = !flowConfigUpdate.SkipInitialSnapshotForTableAdditions additionalTablesCfg.InitialSnapshotOnly = true - additionalTablesCfg.TableMappings = flowConfigUpdate.AdditionalTables + additionalTablesCfg.TableMappings = append(additionalTablesCfg.TableMappings, flowConfigUpdate.AdditionalTables...) 
additionalTablesCfg.Resync = false if state.SnapshotNumRowsPerPartition > 0 { additionalTablesCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition @@ -306,6 +257,8 @@ func processTableAdditions( additionalTablesCfg.SnapshotNumTablesInParallel = state.SnapshotNumTablesInParallel } + uploadConfigToCatalog(ctx, additionalTablesCfg) + // execute the sync flow as a child workflow childAddTablesCDCFlowOpts := workflow.ChildWorkflowOptions{ WorkflowID: childAdditionalTablesCDCFlowID, @@ -317,11 +270,12 @@ func processTableAdditions( WaitForCancellation: true, } childAddTablesCDCFlowCtx := workflow.WithChildOptions(ctx, childAddTablesCDCFlowOpts) + childAddTablesCDCFlowFuture := workflow.ExecuteChildWorkflow( childAddTablesCDCFlowCtx, CDCFlowWorkflow, - additionalTablesCfg, - nil, + additionalTablesCfg.FlowJobName, + nil, // nil is passed to trigger `setup` flow. ) addTablesSelector.AddFuture(childAddTablesCDCFlowFuture, func(f workflow.Future) { addTablesFlowErr = f.Get(childAddTablesCDCFlowCtx, &res) @@ -330,13 +284,16 @@ func processTableAdditions( }) // additional tables should also be resynced, we don't know how much was done so far - state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...) + // state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...) for res == nil { addTablesSelector.Select(ctx) if state.ActiveSignal == model.TerminateSignal || state.ActiveSignal == model.ResyncSignal { if state.ActiveSignal == model.ResyncSignal { - resyncCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state) + // additional tables should also be resynced, we don't know how much was done so far + // state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...) 
+ resyncCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) + state.DropFlowInput.FlowJobName = resyncCfg.FlowJobName state.DropFlowInput.FlowConnectionConfigs = resyncCfg } return workflow.NewContinueAsNewError(ctx, DropFlowWorkflow, state.DropFlowInput) @@ -351,8 +308,6 @@ func processTableAdditions( } } - maps.Copy(state.SyncFlowOptions.SrcTableIdNameMapping, res.SyncFlowOptions.SrcTableIdNameMapping) - logger.Info("additional tables added to sync flow") return nil } @@ -425,11 +380,11 @@ func processTableRemovals( for _, removedTable := range state.FlowConfigUpdate.RemovedTables { removedTables[removedTable.SourceTableIdentifier] = struct{}{} } - maps.DeleteFunc(state.SyncFlowOptions.SrcTableIdNameMapping, func(k uint32, v string) bool { + maps.DeleteFunc(cfg.SrcTableIdNameMapping, func(k uint32, v string) bool { _, removed := removedTables[v] return removed }) - state.SyncFlowOptions.TableMappings = slices.DeleteFunc(state.SyncFlowOptions.TableMappings, func(tm *protos.TableMapping) bool { + cfg.TableMappings = slices.DeleteFunc(cfg.TableMappings, func(tm *protos.TableMapping) bool { _, removed := removedTables[tm.SourceTableIdentifier] return removed }) @@ -438,7 +393,7 @@ func processTableRemovals( removeTablesSelector.Select(ctx) if state.ActiveSignal == model.TerminateSignal || state.ActiveSignal == model.ResyncSignal { if state.ActiveSignal == model.ResyncSignal { - resyncCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state) + resyncCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) state.DropFlowInput.FlowConnectionConfigs = resyncCfg } return workflow.NewContinueAsNewError(ctx, DropFlowWorkflow, state.DropFlowInput) @@ -452,7 +407,6 @@ func processTableRemovals( return fmt.Errorf("failed to execute child CDCFlow for additional tables: %w", removeTablesFlowErr) } } - return nil } @@ -467,8 +421,6 @@ func addCdcPropertiesSignalListener( // do this irrespective of additional tables being present, for auto unpausing state.FlowConfigUpdate = cdcConfigUpdate logger.Info("CDC Signal received", - slog.Uint64("BatchSize", uint64(state.SyncFlowOptions.BatchSize)), - slog.Uint64("IdleTimeout", state.SyncFlowOptions.IdleTimeoutSeconds), slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables), slog.Any("RemovedTables", cdcConfigUpdate.RemovedTables), slog.Any("UpdatedEnv", cdcConfigUpdate.UpdatedEnv), @@ -483,12 +435,17 @@ func addCdcPropertiesSignalListener( func CDCFlowWorkflow( ctx workflow.Context, - cfg *protos.FlowConnectionConfigs, + flowJobName string, + // cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState, ) (*CDCFlowWorkflowResult, error) { + cfg, err := internal.FetchConfigFromDB(flowJobName) if cfg == nil { return nil, errors.New("invalid connection configs") } + if err != nil { + return nil, fmt.Errorf("unable to unmarshal flow config: %w", err) + } logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cfg.FlowJobName)) if state == nil { @@ -523,7 +480,7 @@ func CDCFlowWorkflow( switch val.RequestedFlowState { case protos.FlowStatus_STATUS_TERMINATING: state.ActiveSignal = model.TerminateSignal - dropCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state) + dropCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) state.DropFlowInput = &protos.DropFlowInput{ FlowJobName: dropCfg.FlowJobName, FlowConnectionConfigs: dropCfg, @@ -534,7 +491,7 @@ func CDCFlowWorkflow( state.ActiveSignal = model.ResyncSignal cfg.Resync = true cfg.DoInitialSnapshot = true - resyncCfg := 
syncStateToConfigProtoInCatalog(ctx, cfg, state)
+			resyncCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate)
 			state.DropFlowInput = &protos.DropFlowInput{
 				FlowJobName:           resyncCfg.FlowJobName,
 				FlowConnectionConfigs: resyncCfg,
@@ -576,11 +533,10 @@
 		logger.Info("mirror resumed", slog.Duration("after", time.Since(startTime)))
 		state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING)
-		return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
+		return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg.FlowJobName, state)
 	}
 
 	originalRunID := workflow.GetInfo(ctx).OriginalRunID
-	state.SyncFlowOptions.NumberOfSyncs = 0 // removed feature
 
 	for {
 		if err := ctx.Err(); err != nil {
@@ -609,6 +565,7 @@
 	// for safety, rely on the idempotency of SetupFlow instead
 	// also, no signals are being handled until the loop starts, so no PAUSE/DROP will take here.
 	if state.CurrentFlowStatus != protos.FlowStatus_STATUS_RUNNING {
+		// have to get cfg from DB.
 		originalTableMappings := make([]*protos.TableMapping, 0, len(cfg.TableMappings))
 		for _, tableMapping := range cfg.TableMappings {
 			originalTableMappings = append(originalTableMappings, proto.CloneOf(tableMapping))
 		}
 		// if resync is true, alter the table name schema mapping to temporarily add
 		// a suffix to the table names.
 		if cfg.Resync {
-			for _, mapping := range state.SyncFlowOptions.TableMappings {
-				if mapping.Engine != protos.TableEngine_CH_ENGINE_NULL {
-					mapping.DestinationTableIdentifier += "_resync"
-				}
-			}
-			// because we have renamed the tables.
-			cfg.TableMappings = state.SyncFlowOptions.TableMappings
+			// TODOAS: this will need to be resolved somehow, as we cannot pass all of
+			// the table mappings.
+			return nil, errors.New("cannot start CDCFlow with Resync enabled, please drop the flow and start again")
 		}
 
 		// start the SetupFlow workflow as a child workflow, and wait for it to complete
@@ -637,7 +590,7 @@
 			logger.Warn("pause requested during setup, ignoring")
 		case protos.FlowStatus_STATUS_TERMINATING:
 			state.ActiveSignal = model.TerminateSignal
-			dropCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state)
+			dropCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate)
 			state.DropFlowInput = &protos.DropFlowInput{
 				FlowJobName:           dropCfg.FlowJobName,
 				FlowConnectionConfigs: dropCfg,
@@ -672,7 +625,7 @@
 			WaitForCancellation: true,
 		}
 		setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts)
-		setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg)
+		setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg.FlowJobName)
 
 		var setupFlowOutput *protos.SetupFlowOutput
 		var setupFlowError error
@@ -695,9 +648,36 @@
 			}
 		}
 
-		state.SyncFlowOptions.SrcTableIdNameMapping = setupFlowOutput.SrcTableIdNameMapping
 		state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_SNAPSHOT)
+		if cfg.SrcTableIdNameMapping == nil {
+			cfg.SrcTableIdNameMapping = make(map[uint32]string, len(setupFlowOutput.SrcTableIdNameMapping))
+		}
+		// collect the table names that appear in setupFlowOutput but not yet in
+		// cfg; these are the tables newly added to the flow.
+
+		var newTables []string
+
+		for k, v := range setupFlowOutput.SrcTableIdNameMapping {
+			if _, exists := cfg.SrcTableIdNameMapping[k]; !exists {
+				newTables = append(newTables, v)
+				cfg.SrcTableIdNameMapping[k] = v
+			}
+		}
+
+		// compute the additional tables by selecting the mappings whose source tables are new
+
+		var additionalTables []*protos.TableMapping
+		for _, tableMapping := range cfg.TableMappings {
+			if slices.Contains(newTables, tableMapping.SourceTableIdentifier) {
+				additionalTables = append(additionalTables, tableMapping)
+			}
+		}
+
+		// TODOAS: here we will also store the table mappings in the state.
+		maps.Copy(cfg.SrcTableIdNameMapping, setupFlowOutput.SrcTableIdNameMapping)
+		uploadConfigToCatalog(ctx, cfg)
+
 		// next part of the setup is to snapshot-initial-copy and setup replication slots.
 		snapshotFlowID := GetChildWorkflowID("snapshot-flow", cfg.FlowJobName, originalRunID)
 
@@ -716,7 +700,17 @@
 		// so we can use the same cfg for snapshot flow, and then rely on being state being saved to catalog
 		// during any operation that triggers another snapshot (INCLUDING add tables).
 		// this could fail for very weird Temporal resets
-		snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg)
+
+		// TODOAS : this will send the additionalTables to `temporal`, meaning
+		// that we cannot add too many tables at once, or we risk the blob is too
+		// large (2MB limit).
+		snapshotFlowFuture := workflow.ExecuteChildWorkflow(
+			snapshotFlowCtx,
+			SnapshotFlowWorkflow,
+			cfg.FlowJobName,
+			additionalTables,
+		)
+
 		var snapshotDone bool
 		var snapshotError error
 		setupSnapshotSelector.AddFuture(snapshotFlowFuture, func(f workflow.Future) {
@@ -748,7 +742,7 @@
 			SoftDeleteColName: cfg.SoftDeleteColName,
 		}
 
-		for _, mapping := range state.SyncFlowOptions.TableMappings {
+		for _, mapping := range cfg.TableMappings {
 			if mapping.Engine != protos.TableEngine_CH_ENGINE_NULL {
 				oldName := mapping.DestinationTableIdentifier
 				newName := strings.TrimSuffix(oldName, "_resync")
@@ -772,6 +766,7 @@
 				InitialInterval: 1 * time.Minute,
 			},
 		})
+		// renameOpts will need to be computed again, as it holds the list of tables.
renameTablesFuture := workflow.ExecuteActivity(renameTablesCtx, flowable.RenameTables, renameOpts) var renameTablesDone bool var renameTablesError error @@ -808,7 +803,7 @@ func CDCFlowWorkflow( logger.Info("executed setup flow and snapshot flow, start running") state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING) } - return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) + return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg.FlowJobName, state) } var finished bool @@ -819,7 +814,7 @@ func CDCFlowWorkflow( WaitForCancellation: true, RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1}, })) - syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions) + syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, cfg, nil) mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop") mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { @@ -874,7 +869,7 @@ func CDCFlowWorkflow( switch val.RequestedFlowState { case protos.FlowStatus_STATUS_TERMINATING: state.ActiveSignal = model.TerminateSignal - dropCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state) + dropCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) state.DropFlowInput = &protos.DropFlowInput{ FlowJobName: dropCfg.FlowJobName, FlowConnectionConfigs: dropCfg, @@ -885,7 +880,7 @@ func CDCFlowWorkflow( state.ActiveSignal = model.ResyncSignal cfg.Resync = true cfg.DoInitialSnapshot = true - resyncCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state) + resyncCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate) state.DropFlowInput = &protos.DropFlowInput{ FlowJobName: resyncCfg.FlowJobName, FlowConnectionConfigs: resyncCfg, @@ -938,7 +933,7 @@ func CDCFlowWorkflow( if state.ActiveSignal == model.TerminateSignal || state.ActiveSignal == model.ResyncSignal { return state, workflow.NewContinueAsNewError(ctx, DropFlowWorkflow, state.DropFlowInput) } - return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state) + return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg.FlowJobName, state) } } } diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 9dd42ed6f3..633c287a4c 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -107,12 +107,6 @@ func (q *QRepFlowExecution) setupTableSchema(ctx workflow.Context, tableName str tableSchemaInput := &protos.SetupTableSchemaBatchInput{ PeerName: q.config.SourceName, - TableMappings: []*protos.TableMapping{ - { - SourceTableIdentifier: tableName, - DestinationTableIdentifier: q.config.DestinationTableIdentifier, - }, - }, FlowName: q.config.FlowJobName, System: q.config.System, Env: q.config.Env, @@ -145,15 +139,7 @@ func (q *QRepFlowExecution) setupWatermarkTableOnDestination(ctx workflow.Contex // now setup the normalized tables on the destination peer setupConfig := &protos.SetupNormalizedTableBatchInput{ - PeerName: q.config.DestinationName, - TableMappings: []*protos.TableMapping{ - { - SourceTableIdentifier: q.config.WatermarkTable, - DestinationTableIdentifier: q.config.DestinationTableIdentifier, - Exclude: q.config.Exclude, - Columns: q.config.Columns, - }, - }, + PeerName: q.config.DestinationName, SyncedAtColName: q.config.SyncedAtColName, SoftDeleteColName: q.config.SoftDeleteColName, FlowName: q.config.FlowJobName, diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index f78371b346..fb85f64806 100644 --- 
a/flow/workflows/setup_flow.go
+++ b/flow/workflows/setup_flow.go
@@ -13,6 +13,7 @@ import (
 
 	"github.com/PeerDB-io/peerdb/flow/activities"
 	"github.com/PeerDB-io/peerdb/flow/generated/protos"
+	"github.com/PeerDB-io/peerdb/flow/internal"
 	"github.com/PeerDB-io/peerdb/flow/shared"
 )
 
@@ -33,18 +34,16 @@ import (
 // - creating the normalized table on the destination peer
 type SetupFlowExecution struct {
 	log.Logger
-	tableNameMapping map[string]string
-	cdcFlowName      string
-	executionID      string
+	cdcFlowName string
+	executionID string
 }
 
 // NewSetupFlowExecution creates a new instance of SetupFlowExecution.
-func NewSetupFlowExecution(ctx workflow.Context, tableNameMapping map[string]string, cdcFlowName string) *SetupFlowExecution {
+func NewSetupFlowExecution(ctx workflow.Context, cdcFlowName string) *SetupFlowExecution {
 	return &SetupFlowExecution{
-		Logger:           log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cdcFlowName)),
-		tableNameMapping: tableNameMapping,
-		cdcFlowName:      cdcFlowName,
-		executionID:      workflow.GetInfo(ctx).WorkflowExecution.ID,
+		Logger:      log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cdcFlowName)),
+		cdcFlowName: cdcFlowName,
+		executionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
 	}
 }
 
@@ -121,7 +120,7 @@ func (s *SetupFlowExecution) ensurePullability(
 	ensurePullabilityInput := &protos.EnsurePullabilityBatchInput{
 		PeerName:               config.SourceName,
 		FlowJobName:            s.cdcFlowName,
-		SourceTableIdentifiers: slices.Sorted(maps.Keys(s.tableNameMapping)),
+		SourceTableIdentifiers: []string{},
 		CheckConstraints:       checkConstraints,
 	}
 
@@ -164,7 +163,7 @@ func (s *SetupFlowExecution) createRawTable(
 	createRawTblInput := &protos.CreateRawTableInput{
 		PeerName:         config.DestinationName,
 		FlowJobName:      s.cdcFlowName,
-		TableNameMapping: s.tableNameMapping,
+		TableNameMapping: internal.TableNameMapping(config.TableMappings),
 	}
 
 	rawTblFuture := workflow.ExecuteActivity(ctx, flowable.CreateRawTable, createRawTblInput)
@@ -191,12 +190,11 @@ func (s *SetupFlowExecution) setupNormalizedTables(
 	})
 
 	tableSchemaInput := &protos.SetupTableSchemaBatchInput{
-		PeerName:      flowConnectionConfigs.SourceName,
-		TableMappings: flowConnectionConfigs.TableMappings,
-		FlowName:      s.cdcFlowName,
-		System:        flowConnectionConfigs.System,
-		Env:           flowConnectionConfigs.Env,
-		Version:       flowConnectionConfigs.Version,
+		PeerName: flowConnectionConfigs.SourceName,
+		FlowName: s.cdcFlowName,
+		System:   flowConnectionConfigs.System,
+		Env:      flowConnectionConfigs.Env,
+		Version:  flowConnectionConfigs.Version,
 	}
 
 	if err := workflow.ExecuteActivity(ctx, flowable.SetupTableSchema, tableSchemaInput).Get(ctx, nil); err != nil {
@@ -207,7 +205,6 @@ func (s *SetupFlowExecution) setupNormalizedTables(
 	s.Info("setting up normalized tables on destination peer", slog.String("destination", flowConnectionConfigs.DestinationName))
 	setupConfig := &protos.SetupNormalizedTableBatchInput{
 		PeerName:          flowConnectionConfigs.DestinationName,
-		TableMappings:     flowConnectionConfigs.TableMappings,
 		SoftDeleteColName: flowConnectionConfigs.SoftDeleteColName,
 		SyncedAtColName:   flowConnectionConfigs.SyncedAtColName,
 		FlowName:          flowConnectionConfigs.FlowJobName,
@@ -227,9 +224,16 @@ func (s *SetupFlowExecution) setupNormalizedTables(
 // executeSetupFlow executes the setup flow.
 func (s *SetupFlowExecution) executeSetupFlow(
 	ctx workflow.Context,
-	config *protos.FlowConnectionConfigs,
+	flowJobName string,
 ) (*protos.SetupFlowOutput, error) {
 	s.Info("executing setup flow")
+	// fetch the config from the catalog.
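+	// (SetupFlowWorkflow now receives only the flow job name, so the full
+	// config can no longer arrive as a workflow argument.)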
+	config, err := internal.FetchConfigFromDB(flowJobName)
+	if err != nil {
+		return nil, fmt.Errorf("unable to fetch config from DB: %w", err)
+	}
 
 	// first check the connectionsAndSetupMetadataTables
 	if err := s.checkConnectionsAndSetupMetadataTables(ctx, config); err != nil {
@@ -260,17 +262,12 @@ func (s *SetupFlowExecution) executeSetupFlow(
 }
 
 // SetupFlowWorkflow is the workflow that sets up the flow.
-func SetupFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs) (*protos.SetupFlowOutput, error) {
-	tblNameMapping := make(map[string]string, len(config.TableMappings))
-	for _, v := range config.TableMappings {
-		tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier
-	}
-
+func SetupFlowWorkflow(ctx workflow.Context, flowJobName string) (*protos.SetupFlowOutput, error) {
 	// create the setup flow execution
-	setupFlowExecution := NewSetupFlowExecution(ctx, tblNameMapping, config.FlowJobName)
+	setupFlowExecution := NewSetupFlowExecution(ctx, flowJobName)
 
 	// execute the setup flow
-	setupFlowOutput, err := setupFlowExecution.executeSetupFlow(ctx, config)
+	setupFlowOutput, err := setupFlowExecution.executeSetupFlow(ctx, flowJobName)
 	if err != nil {
 		return nil, fmt.Errorf("failed to execute setup flow: %w", err)
 	}
diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go
index ca62fe5c8c..1d873983c3 100644
--- a/flow/workflows/snapshot_flow.go
+++ b/flow/workflows/snapshot_flow.go
@@ -27,8 +27,9 @@ const (
 )
 
 type SnapshotFlowExecution struct {
-	config *protos.FlowConnectionConfigs
-	logger log.Logger
+	FlowJobName      string
+	logger           log.Logger
+	AdditionalTables []*protos.TableMapping
 }
 
 func getPeerType(wCtx workflow.Context, name string) (protos.DBType, error) {
@@ -44,7 +45,7 @@ func getPeerType(wCtx workflow.Context, name string) (protos.DBType, error) {
 func (s *SnapshotFlowExecution) setupReplication(
 	ctx workflow.Context,
 ) (*protos.SetupReplicationOutput, error) {
-	flowName := s.config.FlowJobName
+	flowName := s.FlowJobName
 	s.logger.Info("setting up replication on source for peer flow")
 
 	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
@@ -55,19 +56,24 @@ func (s *SnapshotFlowExecution) setupReplication(
 		},
 	})
 
-	tblNameMapping := make(map[string]string, len(s.config.TableMappings))
-	for _, v := range s.config.TableMappings {
+	config, err := internal.FetchConfigFromDB(s.FlowJobName)
+	if err != nil {
+		return nil, fmt.Errorf("unable to fetch config from DB for flow job %s: %w", s.FlowJobName, err)
+	}
+
+	tblNameMapping := make(map[string]string, len(s.AdditionalTables))
+	for _, v := range s.AdditionalTables {
 		tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier
 	}
 
 	setupReplicationInput := &protos.SetupReplicationInput{
-		PeerName:                    s.config.SourceName,
+		PeerName:                    config.SourceName,
 		FlowJobName:                 flowName,
 		TableNameMapping:            tblNameMapping,
-		DoInitialSnapshot:           s.config.DoInitialSnapshot,
-		ExistingPublicationName:     s.config.PublicationName,
-		ExistingReplicationSlotName: s.config.ReplicationSlotName,
-		Env:                         s.config.Env,
+		DoInitialSnapshot:           config.DoInitialSnapshot,
+		ExistingPublicationName:     config.PublicationName,
+		ExistingReplicationSlotName: config.ReplicationSlotName,
+		Env:                         config.Env,
 	}
 
 	var res *protos.SetupReplicationOutput
@@ -92,7 +98,7 @@ func (s *SnapshotFlowExecution) closeSlotKeepAlive(
 		},
 	})
 
-	if err := workflow.ExecuteActivity(ctx, snapshot.CloseSlotKeepAlive, s.config.FlowJobName).Get(ctx, nil); err != nil {
+	if err := workflow.ExecuteActivity(ctx, 
snapshot.CloseSlotKeepAlive, s.FlowJobName).Get(ctx, nil); err != nil { return fmt.Errorf("failed to close slot keep alive for peer flow: %w", err) } @@ -107,7 +113,7 @@ func (s *SnapshotFlowExecution) cloneTable( snapshotName string, mapping *protos.TableMapping, ) error { - flowName := s.config.FlowJobName + flowName := s.FlowJobName cloneLog := slog.Group("clone-log", slog.String(string(shared.FlowNameKey), flowName), slog.String("snapshotName", snapshotName)) @@ -146,7 +152,7 @@ func (s *SnapshotFlowExecution) cloneTable( return workflow.ExecuteActivity( schemaCtx, snapshot.LoadTableSchema, - s.config.FlowJobName, + s.FlowJobName, dstName, ).Get(ctx, &tableSchema) } @@ -173,25 +179,30 @@ func (s *SnapshotFlowExecution) cloneTable( // usually MySQL supports double quotes with ANSI_QUOTES, but Vitess doesn't // Vitess currently only supports initial load so change here is enough srcTableEscaped := parsedSrcTable.String() - if dbtype, err := getPeerType(ctx, s.config.SourceName); err != nil { + config, err := internal.FetchConfigFromDB(s.FlowJobName) + if err != nil { + return fmt.Errorf("unable to fetch config from DB: %w", err) + } + + if dbtype, err := getPeerType(ctx, config.SourceName); err != nil { return err } else if dbtype == protos.DBType_MYSQL { srcTableEscaped = parsedSrcTable.MySQL() } numWorkers := uint32(8) - if s.config.SnapshotMaxParallelWorkers > 0 { - numWorkers = s.config.SnapshotMaxParallelWorkers + if config.SnapshotMaxParallelWorkers > 0 { + numWorkers = config.SnapshotMaxParallelWorkers } numRowsPerPartition := uint32(250000) - if s.config.SnapshotNumRowsPerPartition > 0 { - numRowsPerPartition = s.config.SnapshotNumRowsPerPartition + if config.SnapshotNumRowsPerPartition > 0 { + numRowsPerPartition = config.SnapshotNumRowsPerPartition } numPartitionsOverride := uint32(0) - if s.config.SnapshotNumPartitionsOverride > 0 { - numPartitionsOverride = s.config.SnapshotNumPartitionsOverride + if config.SnapshotNumPartitionsOverride > 0 { + numPartitionsOverride = config.SnapshotNumPartitionsOverride } snapshotWriteMode := &protos.QRepWriteMode{ @@ -208,7 +219,7 @@ func (s *SnapshotFlowExecution) cloneTable( // ensure document IDs are synchronized across initial load and CDC // for the same document - if dbtype, err := getPeerType(ctx, s.config.DestinationName); err != nil { + if dbtype, err := getPeerType(ctx, config.DestinationName); err != nil { return err } else if dbtype == protos.DBType_ELASTICSEARCH { if err := initTableSchema(); err != nil { @@ -220,10 +231,10 @@ func (s *SnapshotFlowExecution) cloneTable( } } - config := &protos.QRepConfig{ + qRepConfig := &protos.QRepConfig{ FlowJobName: childWorkflowID, - SourceName: s.config.SourceName, - DestinationName: s.config.DestinationName, + SourceName: config.SourceName, + DestinationName: config.DestinationName, Query: query, WatermarkColumn: mapping.PartitionKey, WatermarkTable: srcName, @@ -233,20 +244,20 @@ func (s *SnapshotFlowExecution) cloneTable( NumRowsPerPartition: numRowsPerPartition, NumPartitionsOverride: numPartitionsOverride, MaxParallelWorkers: numWorkers, - StagingPath: s.config.SnapshotStagingPath, - SyncedAtColName: s.config.SyncedAtColName, - SoftDeleteColName: s.config.SoftDeleteColName, + StagingPath: config.SnapshotStagingPath, + SyncedAtColName: config.SyncedAtColName, + SoftDeleteColName: config.SoftDeleteColName, WriteMode: snapshotWriteMode, - System: s.config.System, - Script: s.config.Script, - Env: s.config.Env, + System: config.System, + Script: config.Script, + Env: config.Env, 
 		ParentMirrorName:    flowName,
 		Exclude:             mapping.Exclude,
 		Columns:             mapping.Columns,
-		Version:             s.config.Version,
+		Version:             config.Version,
 	}
 
-	return boundSelector.SpawnChild(childCtx, QRepFlowWorkflow, nil, config, nil)
+	return boundSelector.SpawnChild(childCtx, QRepFlowWorkflow, nil, qRepConfig, nil)
 }
 
 func (s *SnapshotFlowExecution) cloneTables(
@@ -269,15 +280,19 @@ func (s *SnapshotFlowExecution) cloneTables(
 		},
 	})
 
+	config, err := internal.FetchConfigFromDB(s.FlowJobName)
+	if err != nil {
+		return fmt.Errorf("unable to query flow config from catalog: %w", err)
+	}
+
 	var res *protos.GetDefaultPartitionKeyForTablesOutput
 	if err := workflow.ExecuteActivity(getParallelLoadKeyForTablesCtx,
-		snapshot.GetDefaultPartitionKeyForTables, s.config).Get(ctx, &res); err != nil {
+		snapshot.GetDefaultPartitionKeyForTables, config).Get(ctx, &res); err != nil {
 		return fmt.Errorf("failed to get default partition keys for tables: %w", err)
 	}
 
 	boundSelector := shared.NewBoundSelector(ctx, "CloneTablesSelector", maxParallelClones)
-
-	for _, v := range s.config.TableMappings {
+	for _, v := range s.AdditionalTables {
 		source := v.SourceTableIdentifier
 		destination := v.DestinationTableIdentifier
 		s.logger.Info(
@@ -341,10 +356,17 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot(
 
 func SnapshotFlowWorkflow(
 	ctx workflow.Context,
-	config *protos.FlowConnectionConfigs,
+	flowJobName string,
+	additionalTables []*protos.TableMapping,
 ) error {
+	config, err := internal.FetchConfigFromDB(flowJobName)
+	if err != nil {
+		return fmt.Errorf("unable to fetch config from DB: %w", err)
+	}
+
 	se := &SnapshotFlowExecution{
-		config: config,
+		FlowJobName:      flowJobName,
+		AdditionalTables: additionalTables,
 		logger: log.With(workflow.GetLogger(ctx),
 			slog.String(string(shared.FlowNameKey), config.FlowJobName),
 			slog.String("sourcePeer", config.SourceName)),
diff --git a/protos/flow.proto b/protos/flow.proto
index f6c4796416..a39d04a85a 100644
--- a/protos/flow.proto
+++ b/protos/flow.proto
@@ -82,6 +82,7 @@ message FlowConnectionConfigs {
 
   map<string, string> env = 24;
   uint32 version = 25;
+  map<uint32, string> src_table_id_name_mapping = 27;
 }
 
 message RenameTableOption {
@@ -120,11 +121,9 @@ message CreateTablesFromExistingOutput {
 }
 
 message SyncFlowOptions {
-  reserved 5;
+  reserved 4, 5, 6;
   uint32 batch_size = 1;
   uint64 idle_timeout_seconds = 3;
-  map<uint32, string> src_table_id_name_mapping = 4;
-  repeated TableMapping table_mappings = 6;
   int32 number_of_syncs = 7;
 }
 
@@ -188,13 +187,13 @@ message FieldDescription {
 }
 
 message SetupTableSchemaBatchInput {
-  reserved 2;
+  reserved 2, 6;
   map<string, string> env = 1;
   string flow_name = 3;
   TypeSystem system = 4;
  string peer_name = 5;
-  repeated TableMapping table_mappings = 6;
   uint32 version = 7;
+  repeated TableMapping filtered_table_mappings = 8;
 }
 
 message SetupNormalizedTableBatchInput {

From 0b42cca63fdf02ee40a004c2624d96418bf3f3a1 Mon Sep 17 00:00:00 2001
From: Stoica Alexandru
Date: Mon, 15 Sep 2025 18:54:18 +0100
Subject: [PATCH 2/2] Rebase and address review feedback

---
 flow/activities/flowable.go                 | 13 +++---
 flow/activities/maintenance_activity.go     | 11 ++---
 flow/cmd/handler.go                         |  2 +-
 flow/cmd/mirror_status.go                   | 18 +------
 flow/internal/flow_configuration_helpers.go |  8 ++++
 flow/workflows/cdc_flow.go                  | 52 ++++++++++-----------
 flow/workflows/drop_flow.go                 |  3 +-
 7 files changed, 48 insertions(+), 59 deletions(-)

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 2229dc0619..e2abdd3400 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -1077,13 +1077,12 @@ func (a *FlowableActivity) 
emitLogRetentionHours( } var activeFlowStatuses = map[protos.FlowStatus]struct{}{ - protos.FlowStatus_STATUS_RUNNING: {}, - protos.FlowStatus_STATUS_PAUSED: {}, - protos.FlowStatus_STATUS_PAUSING: {}, - protos.FlowStatus_STATUS_SETUP: {}, - protos.FlowStatus_STATUS_SNAPSHOT: {}, - protos.FlowStatus_STATUS_RESYNC: {}, - protos.FlowStatus_STATUS_MODIFYING: {}, + protos.FlowStatus_STATUS_RUNNING: {}, + protos.FlowStatus_STATUS_PAUSED: {}, + protos.FlowStatus_STATUS_PAUSING: {}, + protos.FlowStatus_STATUS_SETUP: {}, + protos.FlowStatus_STATUS_SNAPSHOT: {}, + protos.FlowStatus_STATUS_RESYNC: {}, } func (a *FlowableActivity) QRepHasNewRows(ctx context.Context, diff --git a/flow/activities/maintenance_activity.go b/flow/activities/maintenance_activity.go index a93b351ec0..0a745d6ef9 100644 --- a/flow/activities/maintenance_activity.go +++ b/flow/activities/maintenance_activity.go @@ -94,12 +94,11 @@ func (a *MaintenanceActivity) WaitForRunningSnapshotsAndIntermediateStates( } var waitStatuses map[protos.FlowStatus]struct{} = map[protos.FlowStatus]struct{}{ - protos.FlowStatus_STATUS_SNAPSHOT: {}, - protos.FlowStatus_STATUS_SETUP: {}, - protos.FlowStatus_STATUS_RESYNC: {}, - protos.FlowStatus_STATUS_UNKNOWN: {}, - protos.FlowStatus_STATUS_PAUSING: {}, - protos.FlowStatus_STATUS_MODIFYING: {}, + protos.FlowStatus_STATUS_SNAPSHOT: {}, + protos.FlowStatus_STATUS_SETUP: {}, + protos.FlowStatus_STATUS_RESYNC: {}, + protos.FlowStatus_STATUS_UNKNOWN: {}, + protos.FlowStatus_STATUS_PAUSING: {}, } func (a *MaintenanceActivity) checkAndWaitIfNeeded( diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 8058d8c92e..62bfa626b8 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -158,7 +158,7 @@ func (h *FlowRequestHandler) CreateCDCFlow( return nil, exceptions.NewInternalApiError(fmt.Errorf("unable to create flow job entry: %w", err)) } - if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, cfg, nil); err != nil { + if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, internal.CreateMinimalConfigFromFlowJobName(cfg.FlowJobName), nil); err != nil { slog.ErrorContext(ctx, "unable to start PeerFlow workflow", slog.Any("error", err)) return nil, exceptions.NewInternalApiError(fmt.Errorf("unable to start PeerFlow workflow: %w", err)) } diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index e721ace2d2..8b3837bd76 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -148,23 +148,7 @@ func (h *FlowRequestHandler) cdcFlowStatus( slog.ErrorContext(ctx, "unable to query flow config from catalog", slog.Any("error", err)) return nil, err } - workflowID, err := h.getWorkflowID(ctx, req.FlowJobName) - if err != nil { - slog.ErrorContext(ctx, "unable to get the workflow ID of mirror", slog.Any("error", err)) - return nil, err - } - state, err := h.getCDCWorkflowState(ctx, workflowID) - if err != nil { - slog.ErrorContext(ctx, "unable to get the state of mirror", slog.Any("error", err)) - return nil, err - } - - // patching config to show latest values from state - if state.SyncFlowOptions != nil { - config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds - config.MaxBatchSize = state.SyncFlowOptions.BatchSize - config.TableMappings = state.SyncFlowOptions.TableMappings - } + // The config is now always sourced from DB, so no state patching needed srcType, err := connectors.LoadPeerType(ctx, h.pool, config.SourceName) if err != nil { diff --git 
index beb2f275ea..992d86825d 100644
--- a/flow/internal/flow_configuration_helpers.go
+++ b/flow/internal/flow_configuration_helpers.go
@@ -36,3 +36,11 @@ func FetchConfigFromDB(flowName string) (*protos.FlowConnectionConfigs, error) {
 
 	return &cfgFromDB, nil
 }
+
+// CreateMinimalConfigFromFlowJobName creates a minimal FlowConnectionConfigs with just the FlowJobName set.
+// This is used when calling CDCFlowWorkflow which will fetch the full config from DB internally.
+func CreateMinimalConfigFromFlowJobName(flowJobName string) *protos.FlowConnectionConfigs {
+	return &protos.FlowConnectionConfigs{
+		FlowJobName: flowJobName,
+	}
+}
diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go
index 2172936a3f..abc7b236ff 100644
--- a/flow/workflows/cdc_flow.go
+++ b/flow/workflows/cdc_flow.go
@@ -240,21 +240,22 @@ func processTableAdditions(
 	additionalTablesUUID := GetUUID(ctx)
 	childAdditionalTablesCDCFlowID := GetChildWorkflowID("additional-cdc-flow", cfg.FlowJobName, additionalTablesUUID)
 	additionalTablesCfg := proto.CloneOf(cfg)
-	additionalTablesCfg.DoInitialSnapshot = !flowConfigUpdate.SkipInitialSnapshotForTableAdditions
+	// Default to doing initial snapshot for additional tables
+	additionalTablesCfg.DoInitialSnapshot = true
 	additionalTablesCfg.InitialSnapshotOnly = true
 	additionalTablesCfg.TableMappings = append(additionalTablesCfg.TableMappings, flowConfigUpdate.AdditionalTables...)
 	additionalTablesCfg.Resync = false
-	if state.SnapshotNumRowsPerPartition > 0 {
-		additionalTablesCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition
+	if flowConfigUpdate.SnapshotNumRowsPerPartition > 0 {
+		additionalTablesCfg.SnapshotNumRowsPerPartition = flowConfigUpdate.SnapshotNumRowsPerPartition
 	}
-	if state.SnapshotNumPartitionsOverride > 0 {
-		additionalTablesCfg.SnapshotNumPartitionsOverride = state.SnapshotNumPartitionsOverride
+	if flowConfigUpdate.SnapshotNumPartitionsOverride > 0 {
+		additionalTablesCfg.SnapshotNumPartitionsOverride = flowConfigUpdate.SnapshotNumPartitionsOverride
 	}
-	if state.SnapshotMaxParallelWorkers > 0 {
-		additionalTablesCfg.SnapshotMaxParallelWorkers = state.SnapshotMaxParallelWorkers
+	if flowConfigUpdate.SnapshotMaxParallelWorkers > 0 {
+		additionalTablesCfg.SnapshotMaxParallelWorkers = flowConfigUpdate.SnapshotMaxParallelWorkers
 	}
-	if state.SnapshotNumTablesInParallel > 0 {
-		additionalTablesCfg.SnapshotNumTablesInParallel = state.SnapshotNumTablesInParallel
+	if flowConfigUpdate.SnapshotNumTablesInParallel > 0 {
+		additionalTablesCfg.SnapshotNumTablesInParallel = flowConfigUpdate.SnapshotNumTablesInParallel
 	}
 
 	uploadConfigToCatalog(ctx, additionalTablesCfg)
@@ -274,7 +275,7 @@ func processTableAdditions(
 	childAddTablesCDCFlowFuture := workflow.ExecuteChildWorkflow(
 		childAddTablesCDCFlowCtx,
 		CDCFlowWorkflow,
-		additionalTablesCfg.FlowJobName,
+		internal.CreateMinimalConfigFromFlowJobName(additionalTablesCfg.FlowJobName),
 		nil, // nil is passed to trigger `setup` flow.
 	)
 	addTablesSelector.AddFuture(childAddTablesCDCFlowFuture, func(f workflow.Future) {
@@ -283,15 +284,10 @@ func processTableAdditions(
 		}
 	})
 
-	// additional tables should also be resynced, we don't know how much was done so far
-	// state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...)
-
 	for res == nil {
 		addTablesSelector.Select(ctx)
 		if state.ActiveSignal == model.TerminateSignal || state.ActiveSignal == model.ResyncSignal {
 			if state.ActiveSignal == model.ResyncSignal {
-				// additional tables should also be resynced, we don't know how much was done so far
-				// state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...)
 				resyncCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate)
 				state.DropFlowInput.FlowJobName = resyncCfg.FlowJobName
 				state.DropFlowInput.FlowConnectionConfigs = resyncCfg
@@ -435,17 +431,19 @@ func addCdcPropertiesSignalListener(
 
 func CDCFlowWorkflow(
 	ctx workflow.Context,
-	flowJobName string,
-	// cfg *protos.FlowConnectionConfigs,
+	cfg *protos.FlowConnectionConfigs,
 	state *CDCFlowWorkflowState,
 ) (*CDCFlowWorkflowResult, error) {
-	cfg, err := internal.FetchConfigFromDB(flowJobName)
-	if cfg == nil {
-		return nil, errors.New("invalid connection configs")
-	}
+	// Fetch full config from DB using the flowJobName from the minimal config
+	cfgFromDB, err := internal.FetchConfigFromDB(cfg.FlowJobName)
 	if err != nil {
-		return nil, fmt.Errorf("unable to unmarshal flow config: %w", err)
+		return nil, fmt.Errorf("unable to fetch flow config from DB: %w", err)
+	}
+	if cfgFromDB == nil {
+		return nil, errors.New("invalid connection configs")
 	}
+	// Use the config from DB for all operations
+	cfg = cfgFromDB
 
 	logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cfg.FlowJobName))
 	if state == nil {
@@ -533,7 +531,7 @@ func CDCFlowWorkflow(
 
 			logger.Info("mirror resumed", slog.Duration("after", time.Since(startTime)))
 			state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING)
-			return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg.FlowJobName, state)
+			return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.CreateMinimalConfigFromFlowJobName(cfg.FlowJobName), state)
 		}
 
 	originalRunID := workflow.GetInfo(ctx).OriginalRunID
@@ -678,7 +676,7 @@ func CDCFlowWorkflow(
 			}
 		}
 
-		// TODOAS: here we will also store the table mappings in the state.
+		// here we will also store the table mappings in the state.
 		maps.Copy(cfg.SrcTableIdNameMapping, setupFlowOutput.SrcTableIdNameMapping)
 
 		uploadConfigToCatalog(ctx, cfg)
@@ -701,7 +699,7 @@ func CDCFlowWorkflow(
 
 		// during any operation that triggers another snapshot (INCLUDING add tables).
 		// this could fail for very weird Temporal resets
-		// TODOAS : this will send the additionalTables to `temporal`, meaning
+		// This will send the additionalTables to `temporal`, meaning
 		// that we cannot add too many tables at once, or we risk the blob is too
 		// large (2MB limit).
 		snapshotFlowFuture := workflow.ExecuteChildWorkflow(
@@ -803,7 +801,7 @@ func CDCFlowWorkflow(
 			logger.Info("executed setup flow and snapshot flow, start running")
 			state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING)
 		}
-		return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg.FlowJobName, state)
+		return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.CreateMinimalConfigFromFlowJobName(cfg.FlowJobName), state)
 	}
 
 	var finished bool
@@ -933,7 +931,7 @@ func CDCFlowWorkflow(
 			if state.ActiveSignal == model.TerminateSignal || state.ActiveSignal == model.ResyncSignal {
 				return state, workflow.NewContinueAsNewError(ctx, DropFlowWorkflow, state.DropFlowInput)
 			}
-			return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg.FlowJobName, state)
+			return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.CreateMinimalConfigFromFlowJobName(cfg.FlowJobName), state)
 		}
 	}
 }
diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go
index f586b2a40d..8893df8da7 100644
--- a/flow/workflows/drop_flow.go
+++ b/flow/workflows/drop_flow.go
@@ -11,6 +11,7 @@ import (
 	"go.temporal.io/sdk/workflow"
 
 	"github.com/PeerDB-io/peerdb/flow/generated/protos"
+	"github.com/PeerDB-io/peerdb/flow/internal"
 	"github.com/PeerDB-io/peerdb/flow/model"
 	"github.com/PeerDB-io/peerdb/flow/shared"
 )
@@ -211,7 +212,7 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error {
 	}
 
 	if input.Resync {
-		return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, input.FlowConnectionConfigs, nil)
+		return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.CreateMinimalConfigFromFlowJobName(input.FlowConnectionConfigs.FlowJobName), nil)
 	}
 	return nil
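
A closing note for reviewers: many call sites above lean on
internal.FetchConfigFromDB without its body appearing in this series.
A minimal sketch of what it is assumed to do, under the assumption that
the mirror's serialized proto lives in a `flows` catalog table with a
`config_proto` column; the real helper in
flow/internal/flow_configuration_helpers.go may differ:

    // Hypothetical sketch; the table and column names are assumptions,
    // not the verified catalog schema.
    package sketch

    import (
        "context"

        "github.com/jackc/pgx/v5/pgxpool"
        "google.golang.org/protobuf/proto"

        "github.com/PeerDB-io/peerdb/flow/generated/protos"
    )

    func fetchConfig(ctx context.Context, pool *pgxpool.Pool, flowJobName string) (*protos.FlowConnectionConfigs, error) {
        var configBytes []byte
        // Assumed schema: one row per mirror, keyed by flow job name.
        if err := pool.QueryRow(ctx,
            "SELECT config_proto FROM flows WHERE name = $1", flowJobName,
        ).Scan(&configBytes); err != nil {
            return nil, err
        }
        var cfg protos.FlowConnectionConfigs
        if err := proto.Unmarshal(configBytes, &cfg); err != nil {
            return nil, err
        }
        return &cfg, nil
    }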