68 changes: 51 additions & 17 deletions flow/activities/flowable.go
@@ -5,7 +5,9 @@ import (
"errors"
"fmt"
"log/slog"
"maps"
"os"
"slices"
"strconv"
"sync/atomic"
"time"
@@ -124,6 +126,12 @@ func (a *FlowableActivity) EnsurePullability(
}
defer connectors.CloseConnector(ctx, srcConn)

cfg, err := internal.FetchConfigFromDB(config.FlowJobName)
if err != nil {
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to fetch config from DB: %w", err))
}

config.SourceTableIdentifiers = slices.Sorted(maps.Keys(internal.TableNameMapping(cfg.TableMappings)))
output, err := srcConn.EnsurePullability(ctx, config)
if err != nil {
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to ensure pullability: %w", err))
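`internal.TableNameMapping` is not shown in this diff. For `slices.Sorted(maps.Keys(...))` to produce `config.SourceTableIdentifiers`, the helper must key its result by source table identifier; a minimal sketch along the lines of the `tblNameMapping` loop in `syncCore` below (the `model.NameAndExclude` value type is an assumption carried over from that loop):

```go
// Hypothetical sketch: key the table mappings by source identifier so the
// sorted map keys can serve as config.SourceTableIdentifiers.
func TableNameMapping(mappings []*protos.TableMapping) map[string]model.NameAndExclude {
	out := make(map[string]model.NameAndExclude, len(mappings))
	for _, m := range mappings {
		out[m.SourceTableIdentifier] = model.NewNameAndExclude(m.DestinationTableIdentifier, m.Exclude)
	}
	return out
}
```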
@@ -165,6 +173,18 @@ func (a *FlowableActivity) SetupTableSchema(
})
defer shutdown()

// fetch the authoritative flow config from the catalog DB
cfg, err := internal.FetchConfigFromDB(config.FlowName)
if err != nil {
return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to fetch config from DB: %w", err))
}
tableMappings := cfg.TableMappings
if len(config.FilteredTableMappings) > 0 {
// use the filtered table mappings when provided: the sync flow passes them
// to carry schema changes that are not yet reflected in the catalog config.
tableMappings = config.FilteredTableMappings
}

logger := internal.LoggerFromCtx(ctx)
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
srcConn, err := connectors.GetByNameAs[connectors.GetTableSchemaConnector](ctx, config.Env, a.CatalogPool, config.PeerName)
@@ -173,11 +193,11 @@
}
defer connectors.CloseConnector(ctx, srcConn)

tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, config.TableMappings)
tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, tableMappings)
if err != nil {
return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get table schema: %w", err))
}
processed := internal.BuildProcessedSchemaMapping(config.TableMappings, tableNameSchemaMapping, logger)
processed := internal.BuildProcessedSchemaMapping(tableMappings, tableNameSchemaMapping, logger)

tx, err := a.CatalogPool.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
@@ -213,6 +233,13 @@ func (a *FlowableActivity) CreateNormalizedTable(
numTablesSetup := atomic.Uint32{}
numTablesToSetup := atomic.Int32{}

cfg, err := internal.FetchConfigFromDB(config.FlowName)
if err != nil {
return nil, a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to fetch config from DB: %w", err))
}

tableMappings := cfg.TableMappings

shutdown := heartbeatRoutine(ctx, func() string {
return fmt.Sprintf("setting up normalized tables - %d of %d done", numTablesSetup.Load(), numTablesToSetup.Load())
})
@@ -244,7 +271,7 @@

numTablesToSetup.Store(int32(len(tableNameSchemaMapping)))
tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping))
for _, tableMapping := range config.TableMappings {
for _, tableMapping := range tableMappings {
tableIdentifier := tableMapping.DestinationTableIdentifier
tableSchema := tableNameSchemaMapping[tableIdentifier]
existing, err := conn.SetupNormalizedTable(
@@ -292,6 +319,12 @@ func (a *FlowableActivity) SyncFlow(
var syncingBatchID atomic.Int64
var syncState atomic.Pointer[string]
syncState.Store(shared.Ptr("setup"))

config, err := internal.FetchConfigFromDB(config.FlowJobName)
if err != nil {
return fmt.Errorf("unable to query flow config from catalog: %w", err)
}

shutdown := heartbeatRoutine(ctx, func() string {
// Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch
sBatchID := syncingBatchID.Load()
@@ -363,10 +396,10 @@ func (a *FlowableActivity) SyncFlow(
var syncResponse *model.SyncResponse
var syncErr error
if config.System == protos.TypeSystem_Q {
syncResponse, syncErr = a.syncRecords(groupCtx, config, options, srcConn.(connectors.CDCPullConnector),
syncResponse, syncErr = a.syncRecords(groupCtx, config, srcConn.(connectors.CDCPullConnector),
normRequests, normResponses, normBufferSize, &syncingBatchID, &syncState)
} else {
syncResponse, syncErr = a.syncPg(groupCtx, config, options, srcConn.(connectors.CDCPullPgConnector),
syncResponse, syncErr = a.syncPg(groupCtx, config, srcConn.(connectors.CDCPullPgConnector),
normRequests, normResponses, normBufferSize, &syncingBatchID, &syncState)
}

@@ -414,7 +447,6 @@ func (a *FlowableActivity) SyncFlow(
func (a *FlowableActivity) syncRecords(
ctx context.Context,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
srcConn connectors.CDCPullConnector,
normRequests *concurrency.LastChan,
normResponses *concurrency.LastChan,
@@ -451,7 +483,7 @@ func (a *FlowableActivity) syncRecords(
return stream, nil
}
}
return syncCore(ctx, a, config, options, srcConn,
return syncCore(ctx, a, config, srcConn,
normRequests, normResponses, normBufferSize,
syncingBatchID, syncWaiting, adaptStream,
connectors.CDCPullConnector.PullRecords,
@@ -461,15 +493,14 @@
func (a *FlowableActivity) syncPg(
ctx context.Context,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
srcConn connectors.CDCPullPgConnector,
normRequests *concurrency.LastChan,
normResponses *concurrency.LastChan,
normBufferSize int64,
syncingBatchID *atomic.Int64,
syncWaiting *atomic.Pointer[string],
) (*model.SyncResponse, error) {
return syncCore(ctx, a, config, options, srcConn,
return syncCore(ctx, a, config, srcConn,
normRequests, normResponses, normBufferSize,
syncingBatchID, syncWaiting, nil,
connectors.CDCPullPgConnector.PullPg,
@@ -1046,13 +1077,12 @@ func (a *FlowableActivity) emitLogRetentionHours(
}

var activeFlowStatuses = map[protos.FlowStatus]struct{}{
protos.FlowStatus_STATUS_RUNNING: {},
protos.FlowStatus_STATUS_PAUSED: {},
protos.FlowStatus_STATUS_PAUSING: {},
protos.FlowStatus_STATUS_SETUP: {},
protos.FlowStatus_STATUS_SNAPSHOT: {},
protos.FlowStatus_STATUS_RESYNC: {},
protos.FlowStatus_STATUS_MODIFYING: {},
protos.FlowStatus_STATUS_RUNNING: {},
protos.FlowStatus_STATUS_PAUSED: {},
protos.FlowStatus_STATUS_PAUSING: {},
protos.FlowStatus_STATUS_SETUP: {},
protos.FlowStatus_STATUS_SNAPSHOT: {},
protos.FlowStatus_STATUS_RESYNC: {},
}

func (a *FlowableActivity) QRepHasNewRows(ctx context.Context,
@@ -1230,9 +1260,13 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
}
}

func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *protos.FlowConnectionConfigs,
func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, flowJobName string,
additionalTableMappings []*protos.TableMapping,
) error {
cfg, err := internal.FetchConfigFromDB(flowJobName)
if err != nil {
return fmt.Errorf("unable to query flow config from catalog: %w", err)
}
ctx = context.WithValue(ctx, shared.FlowNameKey, cfg.FlowJobName)
srcConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, cfg.Env, a.CatalogPool, cfg.SourceName)
if err != nil {
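`internal.FetchConfigFromDB`, the helper this PR leans on throughout, is not part of the diff. Call sites pass only the flow job name, so the real helper presumably resolves the catalog pool internally; a minimal sketch with the dependencies made explicit, assuming the serialized proto lives in `flows.config_proto` (the column `getFlowConfigFromCatalog` queries in `mirror_status.go` below):

```go
// Minimal sketch, not the actual implementation: load the serialized
// FlowConnectionConfigs for a flow from the catalog and unmarshal it.
// Requires context, fmt, pgxpool (github.com/jackc/pgx/v5/pgxpool),
// and proto (google.golang.org/protobuf/proto).
func FetchConfigFromDB(ctx context.Context, pool *pgxpool.Pool, flowJobName string) (*protos.FlowConnectionConfigs, error) {
	var configBytes []byte
	if err := pool.QueryRow(ctx,
		"SELECT config_proto FROM flows WHERE name = $1 LIMIT 1", flowJobName,
	).Scan(&configBytes); err != nil {
		return nil, fmt.Errorf("unable to query flow config from catalog: %w", err)
	}
	cfg := &protos.FlowConnectionConfigs{}
	if err := proto.Unmarshal(configBytes, cfg); err != nil {
		return nil, fmt.Errorf("unable to unmarshal flow config: %w", err)
	}
	return cfg, nil
}
```

The trade-off is deliberate: every activity invocation now costs a catalog round trip, but the config has a single source of truth instead of drifting copies in workflow state.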
35 changes: 17 additions & 18 deletions flow/activities/flowable_core.go
@@ -80,11 +80,10 @@ func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowNa
func (a *FlowableActivity) applySchemaDeltas(
ctx context.Context,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
schemaDeltas []*protos.TableSchemaDelta,
) error {
filteredTableMappings := make([]*protos.TableMapping, 0, len(schemaDeltas))
for _, tableMapping := range options.TableMappings {
for _, tableMapping := range config.TableMappings {
if slices.ContainsFunc(schemaDeltas, func(schemaDelta *protos.TableSchemaDelta) bool {
return schemaDelta.SrcTableName == tableMapping.SourceTableIdentifier &&
schemaDelta.DstTableName == tableMapping.DestinationTableIdentifier
@@ -95,12 +94,12 @@

if len(schemaDeltas) > 0 {
if err := a.SetupTableSchema(ctx, &protos.SetupTableSchemaBatchInput{
PeerName: config.SourceName,
TableMappings: filteredTableMappings,
FlowName: config.FlowJobName,
System: config.System,
Env: config.Env,
Version: config.Version,
PeerName: config.SourceName,
FilteredTableMappings: filteredTableMappings,
FlowName: config.FlowJobName,
System: config.System,
Env: config.Env,
Version: config.Version,
}); err != nil {
return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to execute schema update at source: %w", err))
}
@@ -112,7 +111,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
ctx context.Context,
a *FlowableActivity,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
srcConn TPull,
normRequests *concurrency.LastChan,
normResponses *concurrency.LastChan,
@@ -127,16 +125,17 @@
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
logger := internal.LoggerFromCtx(ctx)

tblNameMapping := make(map[string]model.NameAndExclude, len(options.TableMappings))
for _, v := range options.TableMappings {
// config is loaded from the catalog, so its TableMappings are authoritative here.
tblNameMapping := make(map[string]model.NameAndExclude, len(config.TableMappings))
for _, v := range config.TableMappings {
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

if err := srcConn.ConnectionActive(ctx); err != nil {
return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
}

batchSize := options.BatchSize
batchSize := config.MaxBatchSize
if batchSize == 0 {
batchSize = 250_000
}
@@ -190,13 +189,13 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
errGroup.Go(func() error {
return pull(srcConn, errCtx, a.CatalogPool, a.OtelManager, &model.PullRecordsRequest[Items]{
FlowJobName: flowName,
SrcTableIDNameMapping: options.SrcTableIdNameMapping,
SrcTableIDNameMapping: config.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastOffset: lastOffset,
ConsumedOffset: &consumedOffset,
MaxBatchSize: batchSize,
IdleTimeout: internal.PeerDBCDCIdleTimeoutSeconds(
int(options.IdleTimeoutSeconds),
int(config.IdleTimeoutSeconds),
),
TableNameSchemaMapping: tableNameSchemaMapping,
OverridePublicationName: config.PublicationName,
@@ -234,11 +233,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
defer connectors.CloseConnector(ctx, dstConn)

syncState.Store(shared.Ptr("updating schema"))
if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas); err != nil {
if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, config.TableMappings, recordBatchSync.SchemaDeltas); err != nil {
return nil, fmt.Errorf("failed to sync schema: %w", err)
}

return nil, a.applySchemaDeltas(ctx, config, options, recordBatchSync.SchemaDeltas)
return nil, a.applySchemaDeltas(ctx, config, recordBatchSync.SchemaDeltas)
}

var res *model.SyncResponse
@@ -271,7 +270,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
Records: recordBatchSync,
ConsumedOffset: &consumedOffset,
FlowJobName: flowName,
TableMappings: options.TableMappings,
TableMappings: config.TableMappings,
StagingPath: config.CdcStagingPath,
Script: config.Script,
TableNameSchemaMapping: tableNameSchemaMapping,
@@ -334,7 +333,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
a.OtelManager.Metrics.CurrentBatchIdGauge.Record(ctx, res.CurrentSyncBatchID)

syncState.Store(shared.Ptr("updating schema"))
if err := a.applySchemaDeltas(ctx, config, options, res.TableSchemaDeltas); err != nil {
if err := a.applySchemaDeltas(ctx, config, res.TableSchemaDeltas); err != nil {
return nil, err
}

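The `FilteredTableMappings` override above is the one place table mappings still travel by argument rather than through the catalog: when a sync detects schema deltas, the catalog copy is momentarily stale, so `applySchemaDeltas` forwards only the affected mappings to `SetupTableSchema`. A self-contained toy version of that filter (simplified structs stand in for the protos):

```go
package main

import (
	"fmt"
	"slices"
)

type tableMapping struct{ Src, Dst string }
type schemaDelta struct{ Src, Dst string }

// filterMappings mirrors applySchemaDeltas: keep only the mappings whose
// (source, destination) pair has a pending schema delta.
func filterMappings(mappings []tableMapping, deltas []schemaDelta) []tableMapping {
	out := make([]tableMapping, 0, len(deltas))
	for _, m := range mappings {
		if slices.ContainsFunc(deltas, func(d schemaDelta) bool {
			return d.Src == m.Src && d.Dst == m.Dst
		}) {
			out = append(out, m)
		}
	}
	return out
}

func main() {
	mappings := []tableMapping{{"public.users", "users"}, {"public.orders", "orders"}}
	deltas := []schemaDelta{{Src: "public.users", Dst: "users"}}
	fmt.Println(filterMappings(mappings, deltas)) // [{public.users users}]
}
```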
11 changes: 5 additions & 6 deletions flow/activities/maintenance_activity.go
@@ -94,12 +94,11 @@ func (a *MaintenanceActivity) WaitForRunningSnapshotsAndIntermediateStates(
}

var waitStatuses map[protos.FlowStatus]struct{} = map[protos.FlowStatus]struct{}{
protos.FlowStatus_STATUS_SNAPSHOT: {},
protos.FlowStatus_STATUS_SETUP: {},
protos.FlowStatus_STATUS_RESYNC: {},
protos.FlowStatus_STATUS_UNKNOWN: {},
protos.FlowStatus_STATUS_PAUSING: {},
protos.FlowStatus_STATUS_MODIFYING: {},
protos.FlowStatus_STATUS_SNAPSHOT: {},
protos.FlowStatus_STATUS_SETUP: {},
protos.FlowStatus_STATUS_RESYNC: {},
protos.FlowStatus_STATUS_UNKNOWN: {},
protos.FlowStatus_STATUS_PAUSING: {},
}

func (a *MaintenanceActivity) checkAndWaitIfNeeded(
3 changes: 1 addition & 2 deletions flow/cmd/handler.go
@@ -77,7 +77,6 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
if err != nil {
return fmt.Errorf("unable to marshal flow config: %w", err)
}

if _, err := h.pool.Exec(ctx,
`INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status,
description, source_table_identifier, destination_table_identifier) VALUES ($1,$2,$3,$4,$5,$6,'gRPC','','')`,
@@ -159,7 +158,7 @@
return nil, exceptions.NewInternalApiError(fmt.Errorf("unable to create flow job entry: %w", err))
}

if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, cfg, nil); err != nil {
if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, internal.CreateMinimalConfigFromFlowJobName(cfg.FlowJobName), nil); err != nil {
slog.ErrorContext(ctx, "unable to start PeerFlow workflow", slog.Any("error", err))
return nil, exceptions.NewInternalApiError(fmt.Errorf("unable to start PeerFlow workflow: %w", err))
}
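`internal.CreateMinimalConfigFromFlowJobName` is also absent from the diff. Since the workflow and its activities now fetch the authoritative config from the catalog, the Temporal workflow input only needs to carry the flow name; a plausible one-liner (hypothetical — the real helper may seed more fields):

```go
// Hypothetical sketch: shrink the workflow input to the flow name alone;
// activities re-fetch everything else from the catalog.
func CreateMinimalConfigFromFlowJobName(flowJobName string) *protos.FlowConnectionConfigs {
	return &protos.FlowConnectionConfigs{FlowJobName: flowJobName}
}
```

Starting the workflow this way also means a mid-flight config edit in the catalog is picked up on the next activity run instead of being pinned to the payload recorded in workflow history.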
20 changes: 2 additions & 18 deletions flow/cmd/mirror_status.go
@@ -148,23 +148,7 @@ func (h *FlowRequestHandler) cdcFlowStatus(
slog.ErrorContext(ctx, "unable to query flow config from catalog", slog.Any("error", err))
return nil, err
}
workflowID, err := h.getWorkflowID(ctx, req.FlowJobName)
if err != nil {
slog.ErrorContext(ctx, "unable to get the workflow ID of mirror", slog.Any("error", err))
return nil, err
}
state, err := h.getCDCWorkflowState(ctx, workflowID)
if err != nil {
slog.ErrorContext(ctx, "unable to get the state of mirror", slog.Any("error", err))
return nil, err
}

// patching config to show latest values from state
if state.SyncFlowOptions != nil {
config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
config.MaxBatchSize = state.SyncFlowOptions.BatchSize
config.TableMappings = state.SyncFlowOptions.TableMappings
}
// The config is now always sourced from the DB, so no state patching is needed.

srcType, err := connectors.LoadPeerType(ctx, h.pool, config.SourceName)
if err != nil {
@@ -473,7 +457,7 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog(
) (*protos.FlowConnectionConfigs, error) {
var configBytes sql.RawBytes
if err := h.pool.QueryRow(ctx,
"SELECT config_proto FROM flows WHERE name = $1", flowJobName,
"SELECT config_proto FROM flows WHERE name = $1 LIMIT 1", flowJobName,
).Scan(&configBytes); err != nil {
slog.ErrorContext(ctx, "unable to query flow config from catalog", slog.Any("error", err))
return nil, fmt.Errorf("unable to query flow config from catalog: %w", err)
7 changes: 6 additions & 1 deletion flow/connectors/clickhouse/normalize.go
@@ -90,7 +90,12 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
tmEngine := protos.TableEngine_CH_ENGINE_REPLACING_MERGE_TREE

var tableMapping *protos.TableMapping
for _, tm := range config.TableMappings {

cfg, err := internal.FetchConfigFromDB(config.FlowName)
if err != nil {
return nil, fmt.Errorf("failed to fetch config from DB: %w", err)
}
for _, tm := range cfg.TableMappings {
if tm.DestinationTableIdentifier == tableIdentifier {
tmEngine = tm.Engine
tableMapping = tm
7 changes: 6 additions & 1 deletion flow/connectors/postgres/postgres.go
@@ -448,10 +448,15 @@ func pullCore[Items model.Items](
return fmt.Errorf("failed to get get setting for originMetaAsDestinationColumn: %w", err)
}

cfgFromDB, err := internal.FetchConfigFromDB(req.FlowJobName)
if err != nil {
return fmt.Errorf("unable to query flow config from catalog: %w", err)
}

cdc, err := c.NewPostgresCDCSource(ctx, &PostgresCDCConfig{
CatalogPool: catalogPool,
OtelManager: otelManager,
SrcTableIDNameMapping: req.SrcTableIDNameMapping,
SrcTableIDNameMapping: cfgFromDB.SrcTableIdNameMapping,
TableNameMapping: req.TableNameMapping,
TableNameSchemaMapping: req.TableNameSchemaMapping,
RelationMessageMapping: c.relationMessageMapping,