Skip to content

Commit 00660b8

Browse files
committed
Fetch configuration from DB
No longer passing the entire configuration object to the Temporal job as we can hit the 2MB hard limit imposed by Temporal. Instead, pass flowJobName and fetch the config from the DB in all the places required. There are some additional changes that were required to be made, as some of the jobs are used from multiple contexts - and I had to adapt some of the information passed to ensure that they continue to work as expected.
1 parent d07a9de commit 00660b8

File tree

14 files changed

+326
-263
lines changed

14 files changed

+326
-263
lines changed

docker-compose.yml

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,12 @@ services:
116116

117117
flow-api:
118118
container_name: flow_api
119-
image: ghcr.io/peerdb-io/flow-api:stable-v0.32.1
119+
build:
120+
context: .
121+
dockerfile: stacks/flow.Dockerfile
122+
target: flow-api
123+
args:
124+
PEERDB_VERSION_SHA_SHORT: ${PEERDB_VERSION_SHA_SHORT:-}
120125
restart: unless-stopped
121126
ports:
122127
- 8112:8112
@@ -132,7 +137,10 @@ services:
132137

133138
flow-snapshot-worker:
134139
container_name: flow-snapshot-worker
135-
image: ghcr.io/peerdb-io/flow-snapshot-worker:stable-v0.32.1
140+
build:
141+
context: .
142+
dockerfile: stacks/flow.Dockerfile
143+
target: flow-snapshot-worker
136144
restart: unless-stopped
137145
environment:
138146
<<: [*catalog-config, *flow-worker-env, *minio-config]
@@ -142,7 +150,10 @@ services:
142150

143151
flow-worker:
144152
container_name: flow-worker
145-
image: ghcr.io/peerdb-io/flow-worker:stable-v0.32.1
153+
build:
154+
context: .
155+
dockerfile: stacks/flow.Dockerfile
156+
target: flow-worker
146157
restart: unless-stopped
147158
environment:
148159
<<: [*catalog-config, *flow-worker-env, *minio-config]

flow/activities/flowable.go

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import (
55
"errors"
66
"fmt"
77
"log/slog"
8+
"maps"
89
"os"
10+
"slices"
911
"strconv"
1012
"sync/atomic"
1113
"time"
@@ -124,6 +126,12 @@ func (a *FlowableActivity) EnsurePullability(
124126
}
125127
defer connectors.CloseConnector(ctx, srcConn)
126128

129+
cfg, err := internal.FetchConfigFromDB(config.FlowJobName)
130+
if err != nil {
131+
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to fetch config from DB: %w", err))
132+
}
133+
134+
config.SourceTableIdentifiers = slices.Sorted(maps.Keys(internal.TableNameMapping(cfg.TableMappings)))
127135
output, err := srcConn.EnsurePullability(ctx, config)
128136
if err != nil {
129137
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to ensure pullability: %w", err))
@@ -165,6 +173,18 @@ func (a *FlowableActivity) SetupTableSchema(
165173
})
166174
defer shutdown()
167175

176+
// have to fetch config from the DB
177+
cfg, err := internal.FetchConfigFromDB(config.FlowName)
178+
if err != nil {
179+
return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to fetch config from DB: %w", err))
180+
}
181+
tableMappings := cfg.TableMappings
182+
if len(config.FilteredTableMappings) > 0 {
183+
// we use the filtered table mappings if provided. they are provided from
184+
// the sync flow which includes changes to the schema.
185+
tableMappings = config.FilteredTableMappings
186+
}
187+
168188
logger := internal.LoggerFromCtx(ctx)
169189
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
170190
srcConn, err := connectors.GetByNameAs[connectors.GetTableSchemaConnector](ctx, config.Env, a.CatalogPool, config.PeerName)
@@ -173,11 +193,11 @@ func (a *FlowableActivity) SetupTableSchema(
173193
}
174194
defer connectors.CloseConnector(ctx, srcConn)
175195

176-
tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, config.TableMappings)
196+
tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, tableMappings)
177197
if err != nil {
178198
return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get GetTableSchemaConnector: %w", err))
179199
}
180-
processed := internal.BuildProcessedSchemaMapping(config.TableMappings, tableNameSchemaMapping, logger)
200+
processed := internal.BuildProcessedSchemaMapping(tableMappings, tableNameSchemaMapping, logger)
181201

182202
tx, err := a.CatalogPool.BeginTx(ctx, pgx.TxOptions{})
183203
if err != nil {
@@ -213,6 +233,13 @@ func (a *FlowableActivity) CreateNormalizedTable(
213233
numTablesSetup := atomic.Uint32{}
214234
numTablesToSetup := atomic.Int32{}
215235

236+
cfg, err := internal.FetchConfigFromDB(config.FlowName)
237+
if err != nil {
238+
return nil, a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to fetch config from DB: %w", err))
239+
}
240+
241+
tableMappings := cfg.TableMappings
242+
216243
shutdown := heartbeatRoutine(ctx, func() string {
217244
return fmt.Sprintf("setting up normalized tables - %d of %d done", numTablesSetup.Load(), numTablesToSetup.Load())
218245
})
@@ -244,7 +271,7 @@ func (a *FlowableActivity) CreateNormalizedTable(
244271

245272
numTablesToSetup.Store(int32(len(tableNameSchemaMapping)))
246273
tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping))
247-
for _, tableMapping := range config.TableMappings {
274+
for _, tableMapping := range tableMappings {
248275
tableIdentifier := tableMapping.DestinationTableIdentifier
249276
tableSchema := tableNameSchemaMapping[tableIdentifier]
250277
existing, err := conn.SetupNormalizedTable(
@@ -292,6 +319,12 @@ func (a *FlowableActivity) SyncFlow(
292319
var syncingBatchID atomic.Int64
293320
var syncState atomic.Pointer[string]
294321
syncState.Store(shared.Ptr("setup"))
322+
323+
config, err := internal.FetchConfigFromDB(config.FlowJobName)
324+
if err != nil {
325+
return fmt.Errorf("unable to query flow config from catalog: %w", err)
326+
}
327+
295328
shutdown := heartbeatRoutine(ctx, func() string {
296329
// Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch
297330
sBatchID := syncingBatchID.Load()
@@ -363,10 +396,10 @@ func (a *FlowableActivity) SyncFlow(
363396
var syncResponse *model.SyncResponse
364397
var syncErr error
365398
if config.System == protos.TypeSystem_Q {
366-
syncResponse, syncErr = a.syncRecords(groupCtx, config, options, srcConn.(connectors.CDCPullConnector),
399+
syncResponse, syncErr = a.syncRecords(groupCtx, config, srcConn.(connectors.CDCPullConnector),
367400
normRequests, normResponses, normBufferSize, &syncingBatchID, &syncState)
368401
} else {
369-
syncResponse, syncErr = a.syncPg(groupCtx, config, options, srcConn.(connectors.CDCPullPgConnector),
402+
syncResponse, syncErr = a.syncPg(groupCtx, config, srcConn.(connectors.CDCPullPgConnector),
370403
normRequests, normResponses, normBufferSize, &syncingBatchID, &syncState)
371404
}
372405

@@ -414,7 +447,6 @@ func (a *FlowableActivity) SyncFlow(
414447
func (a *FlowableActivity) syncRecords(
415448
ctx context.Context,
416449
config *protos.FlowConnectionConfigs,
417-
options *protos.SyncFlowOptions,
418450
srcConn connectors.CDCPullConnector,
419451
normRequests *concurrency.LastChan,
420452
normResponses *concurrency.LastChan,
@@ -451,7 +483,7 @@ func (a *FlowableActivity) syncRecords(
451483
return stream, nil
452484
}
453485
}
454-
return syncCore(ctx, a, config, options, srcConn,
486+
return syncCore(ctx, a, config, srcConn,
455487
normRequests, normResponses, normBufferSize,
456488
syncingBatchID, syncWaiting, adaptStream,
457489
connectors.CDCPullConnector.PullRecords,
@@ -461,15 +493,14 @@ func (a *FlowableActivity) syncRecords(
461493
func (a *FlowableActivity) syncPg(
462494
ctx context.Context,
463495
config *protos.FlowConnectionConfigs,
464-
options *protos.SyncFlowOptions,
465496
srcConn connectors.CDCPullPgConnector,
466497
normRequests *concurrency.LastChan,
467498
normResponses *concurrency.LastChan,
468499
normBufferSize int64,
469500
syncingBatchID *atomic.Int64,
470501
syncWaiting *atomic.Pointer[string],
471502
) (*model.SyncResponse, error) {
472-
return syncCore(ctx, a, config, options, srcConn,
503+
return syncCore(ctx, a, config, srcConn,
473504
normRequests, normResponses, normBufferSize,
474505
syncingBatchID, syncWaiting, nil,
475506
connectors.CDCPullPgConnector.PullPg,
@@ -1214,9 +1245,13 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
12141245
}
12151246
}
12161247

1217-
func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *protos.FlowConnectionConfigs,
1248+
func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, flowJobName string,
12181249
additionalTableMappings []*protos.TableMapping,
12191250
) error {
1251+
cfg, err := internal.FetchConfigFromDB(flowJobName)
1252+
if err != nil {
1253+
return fmt.Errorf("unable to query flow config from catalog: %w", err)
1254+
}
12201255
ctx = context.WithValue(ctx, shared.FlowNameKey, cfg.FlowJobName)
12211256
srcConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, cfg.Env, a.CatalogPool, cfg.SourceName)
12221257
if err != nil {

flow/activities/flowable_core.go

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,10 @@ func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowNa
8080
func (a *FlowableActivity) applySchemaDeltas(
8181
ctx context.Context,
8282
config *protos.FlowConnectionConfigs,
83-
options *protos.SyncFlowOptions,
8483
schemaDeltas []*protos.TableSchemaDelta,
8584
) error {
8685
filteredTableMappings := make([]*protos.TableMapping, 0, len(schemaDeltas))
87-
for _, tableMapping := range options.TableMappings {
86+
for _, tableMapping := range config.TableMappings {
8887
if slices.ContainsFunc(schemaDeltas, func(schemaDelta *protos.TableSchemaDelta) bool {
8988
return schemaDelta.SrcTableName == tableMapping.SourceTableIdentifier &&
9089
schemaDelta.DstTableName == tableMapping.DestinationTableIdentifier
@@ -95,12 +94,12 @@ func (a *FlowableActivity) applySchemaDeltas(
9594

9695
if len(schemaDeltas) > 0 {
9796
if err := a.SetupTableSchema(ctx, &protos.SetupTableSchemaBatchInput{
98-
PeerName: config.SourceName,
99-
TableMappings: filteredTableMappings,
100-
FlowName: config.FlowJobName,
101-
System: config.System,
102-
Env: config.Env,
103-
Version: config.Version,
97+
PeerName: config.SourceName,
98+
FilteredTableMappings: filteredTableMappings,
99+
FlowName: config.FlowJobName,
100+
System: config.System,
101+
Env: config.Env,
102+
Version: config.Version,
104103
}); err != nil {
105104
return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to execute schema update at source: %w", err))
106105
}
@@ -112,7 +111,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
112111
ctx context.Context,
113112
a *FlowableActivity,
114113
config *protos.FlowConnectionConfigs,
115-
options *protos.SyncFlowOptions,
116114
srcConn TPull,
117115
normRequests *concurrency.LastChan,
118116
normResponses *concurrency.LastChan,
@@ -127,16 +125,17 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
127125
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
128126
logger := internal.LoggerFromCtx(ctx)
129127

130-
tblNameMapping := make(map[string]model.NameAndExclude, len(options.TableMappings))
131-
for _, v := range options.TableMappings {
128+
// we should be able to rely on `config` here.
129+
tblNameMapping := make(map[string]model.NameAndExclude, len(config.TableMappings))
130+
for _, v := range config.TableMappings {
132131
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
133132
}
134133

135134
if err := srcConn.ConnectionActive(ctx); err != nil {
136135
return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
137136
}
138137

139-
batchSize := options.BatchSize
138+
batchSize := config.MaxBatchSize
140139
if batchSize == 0 {
141140
batchSize = 250_000
142141
}
@@ -189,13 +188,13 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
189188
errGroup.Go(func() error {
190189
return pull(srcConn, errCtx, a.CatalogPool, a.OtelManager, &model.PullRecordsRequest[Items]{
191190
FlowJobName: flowName,
192-
SrcTableIDNameMapping: options.SrcTableIdNameMapping,
191+
SrcTableIDNameMapping: config.SrcTableIdNameMapping,
193192
TableNameMapping: tblNameMapping,
194193
LastOffset: lastOffset,
195194
ConsumedOffset: &consumedOffset,
196195
MaxBatchSize: batchSize,
197196
IdleTimeout: internal.PeerDBCDCIdleTimeoutSeconds(
198-
int(options.IdleTimeoutSeconds),
197+
int(config.IdleTimeoutSeconds),
199198
),
200199
TableNameSchemaMapping: tableNameSchemaMapping,
201200
OverridePublicationName: config.PublicationName,
@@ -231,11 +230,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
231230
defer connectors.CloseConnector(ctx, dstConn)
232231

233232
syncState.Store(shared.Ptr("updating schema"))
234-
if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas); err != nil {
233+
if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, config.TableMappings, recordBatchSync.SchemaDeltas); err != nil {
235234
return nil, fmt.Errorf("failed to sync schema: %w", err)
236235
}
237236

238-
return nil, a.applySchemaDeltas(ctx, config, options, recordBatchSync.SchemaDeltas)
237+
return nil, a.applySchemaDeltas(ctx, config, recordBatchSync.SchemaDeltas)
239238
}
240239

241240
var res *model.SyncResponse
@@ -268,7 +267,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
268267
Records: recordBatchSync,
269268
ConsumedOffset: &consumedOffset,
270269
FlowJobName: flowName,
271-
TableMappings: options.TableMappings,
270+
TableMappings: config.TableMappings,
272271
StagingPath: config.CdcStagingPath,
273272
Script: config.Script,
274273
TableNameSchemaMapping: tableNameSchemaMapping,
@@ -331,7 +330,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
331330
a.OtelManager.Metrics.CurrentBatchIdGauge.Record(ctx, res.CurrentSyncBatchID)
332331

333332
syncState.Store(shared.Ptr("updating schema"))
334-
if err := a.applySchemaDeltas(ctx, config, options, res.TableSchemaDeltas); err != nil {
333+
if err := a.applySchemaDeltas(ctx, config, res.TableSchemaDeltas); err != nil {
335334
return nil, err
336335
}
337336

flow/cmd/handler.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
7575
if err != nil {
7676
return fmt.Errorf("unable to marshal flow config: %w", err)
7777
}
78-
7978
if _, err := h.pool.Exec(ctx,
8079
`INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status,
8180
description, source_table_identifier, destination_table_identifier) VALUES ($1,$2,$3,$4,$5,$6,'gRPC','','')`,
@@ -155,7 +154,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
155154
return nil, fmt.Errorf("unable to create flow job entry: %w", err)
156155
}
157156

158-
if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, cfg, nil); err != nil {
157+
if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, cfg.FlowJobName, nil); err != nil {
159158
slog.Error("unable to start PeerFlow workflow", slog.Any("error", err))
160159
return nil, fmt.Errorf("unable to start PeerFlow workflow: %w", err)
161160
}

flow/cmd/mirror_status.go

Lines changed: 1 addition & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"github.com/PeerDB-io/peerdb/flow/generated/protos"
1919
"github.com/PeerDB-io/peerdb/flow/internal"
2020
"github.com/PeerDB-io/peerdb/flow/shared"
21-
peerflow "github.com/PeerDB-io/peerdb/flow/workflows"
2221
)
2322

2423
func (h *FlowRequestHandler) ListMirrors(
@@ -137,23 +136,6 @@ func (h *FlowRequestHandler) cdcFlowStatus(
137136
slog.Error("unable to query flow config from catalog", slog.Any("error", err))
138137
return nil, err
139138
}
140-
workflowID, err := h.getWorkflowID(ctx, req.FlowJobName)
141-
if err != nil {
142-
slog.Error("unable to get the workflow ID of mirror", slog.Any("error", err))
143-
return nil, err
144-
}
145-
state, err := h.getCDCWorkflowState(ctx, workflowID)
146-
if err != nil {
147-
slog.Error("unable to get the state of mirror", slog.Any("error", err))
148-
return nil, err
149-
}
150-
151-
// patching config to show latest values from state
152-
if state.SyncFlowOptions != nil {
153-
config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
154-
config.MaxBatchSize = state.SyncFlowOptions.BatchSize
155-
config.TableMappings = state.SyncFlowOptions.TableMappings
156-
}
157139

158140
srcType, err := connectors.LoadPeerType(ctx, h.pool, config.SourceName)
159141
if err != nil {
@@ -462,7 +444,7 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog(
462444
) (*protos.FlowConnectionConfigs, error) {
463445
var configBytes sql.RawBytes
464446
if err := h.pool.QueryRow(ctx,
465-
"SELECT config_proto FROM flows WHERE name = $1", flowJobName,
447+
"SELECT config_proto FROM flows WHERE name = $1 LIMIT 1", flowJobName,
466448
).Scan(&configBytes); err != nil {
467449
slog.Error("unable to query flow config from catalog", slog.Any("error", err))
468450
return nil, fmt.Errorf("unable to query flow config from catalog: %w", err)
@@ -492,24 +474,6 @@ func (h *FlowRequestHandler) getWorkflowStatus(ctx context.Context, workflowID s
492474
return internal.GetWorkflowStatus(ctx, h.pool, workflowID)
493475
}
494476

495-
func (h *FlowRequestHandler) getCDCWorkflowState(ctx context.Context,
496-
workflowID string,
497-
) (*peerflow.CDCFlowWorkflowState, error) {
498-
res, err := h.temporalClient.QueryWorkflow(ctx, workflowID, "", shared.CDCFlowStateQuery)
499-
if err != nil {
500-
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
501-
return nil,
502-
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
503-
}
504-
var state peerflow.CDCFlowWorkflowState
505-
if err := res.Get(&state); err != nil {
506-
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
507-
return nil,
508-
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
509-
}
510-
return &state, nil
511-
}
512-
513477
func (h *FlowRequestHandler) getMirrorCreatedAt(ctx context.Context, flowJobName string) (*time.Time, error) {
514478
var createdAt pgtype.Timestamp
515479
err := h.pool.QueryRow(ctx, "SELECT created_at FROM flows WHERE name=$1", flowJobName).Scan(&createdAt)

flow/connectors/clickhouse/normalize.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,12 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
8989
tmEngine := protos.TableEngine_CH_ENGINE_REPLACING_MERGE_TREE
9090

9191
var tableMapping *protos.TableMapping
92-
for _, tm := range config.TableMappings {
92+
93+
cfg, err := internal.FetchConfigFromDB(config.FlowName)
94+
if err != nil {
95+
return nil, fmt.Errorf("failed to fetch config from DB: %w", err)
96+
}
97+
for _, tm := range cfg.TableMappings {
9398
if tm.DestinationTableIdentifier == tableIdentifier {
9499
tmEngine = tm.Engine
95100
tableMapping = tm

0 commit comments

Comments
 (0)