Skip to content

Commit 190bc92

Browse files
committed
Fetch configuration from DB
No longer passing the entire configuration object to the Temporal job as we can hit the 2MB hard limit imposed by Temporal. Instead, pass flowJobName and fetch the config from the DB in all the places required. There are some additional changes that were required to be made, as some of the jobs are used from multiple contexts - and I had to adapt some of the information passed to ensure that they continue to work as expected.
1 parent c8c8164 commit 190bc92

File tree

13 files changed

+309
-226
lines changed

13 files changed

+309
-226
lines changed

flow/activities/flowable.go

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import (
55
"errors"
66
"fmt"
77
"log/slog"
8+
"maps"
89
"os"
10+
"slices"
911
"strconv"
1012
"sync/atomic"
1113
"time"
@@ -124,6 +126,12 @@ func (a *FlowableActivity) EnsurePullability(
124126
}
125127
defer connectors.CloseConnector(ctx, srcConn)
126128

129+
cfg, err := internal.FetchConfigFromDB(config.FlowJobName)
130+
if err != nil {
131+
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to fetch config from DB: %w", err))
132+
}
133+
134+
config.SourceTableIdentifiers = slices.Sorted(maps.Keys(internal.TableNameMapping(cfg.TableMappings)))
127135
output, err := srcConn.EnsurePullability(ctx, config)
128136
if err != nil {
129137
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to ensure pullability: %w", err))
@@ -165,6 +173,18 @@ func (a *FlowableActivity) SetupTableSchema(
165173
})
166174
defer shutdown()
167175

176+
// have to fetch config from the DB
177+
cfg, err := internal.FetchConfigFromDB(config.FlowName)
178+
if err != nil {
179+
return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to fetch config from DB: %w", err))
180+
}
181+
tableMappings := cfg.TableMappings
182+
if len(config.FilteredTableMappings) > 0 {
183+
// we use the filtered table mappings if provided. they are provided from
184+
// the sync flow which includes changes to the schema.
185+
tableMappings = config.FilteredTableMappings
186+
}
187+
168188
logger := internal.LoggerFromCtx(ctx)
169189
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
170190
srcConn, err := connectors.GetByNameAs[connectors.GetTableSchemaConnector](ctx, config.Env, a.CatalogPool, config.PeerName)
@@ -173,11 +193,11 @@ func (a *FlowableActivity) SetupTableSchema(
173193
}
174194
defer connectors.CloseConnector(ctx, srcConn)
175195

176-
tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, config.TableMappings)
196+
tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, tableMappings)
177197
if err != nil {
178198
return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get GetTableSchemaConnector: %w", err))
179199
}
180-
processed := internal.BuildProcessedSchemaMapping(config.TableMappings, tableNameSchemaMapping, logger)
200+
processed := internal.BuildProcessedSchemaMapping(tableMappings, tableNameSchemaMapping, logger)
181201

182202
tx, err := a.CatalogPool.BeginTx(ctx, pgx.TxOptions{})
183203
if err != nil {
@@ -213,6 +233,13 @@ func (a *FlowableActivity) CreateNormalizedTable(
213233
numTablesSetup := atomic.Uint32{}
214234
numTablesToSetup := atomic.Int32{}
215235

236+
cfg, err := internal.FetchConfigFromDB(config.FlowName)
237+
if err != nil {
238+
return nil, a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to fetch config from DB: %w", err))
239+
}
240+
241+
tableMappings := cfg.TableMappings
242+
216243
shutdown := heartbeatRoutine(ctx, func() string {
217244
return fmt.Sprintf("setting up normalized tables - %d of %d done", numTablesSetup.Load(), numTablesToSetup.Load())
218245
})
@@ -244,7 +271,7 @@ func (a *FlowableActivity) CreateNormalizedTable(
244271

245272
numTablesToSetup.Store(int32(len(tableNameSchemaMapping)))
246273
tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping))
247-
for _, tableMapping := range config.TableMappings {
274+
for _, tableMapping := range tableMappings {
248275
tableIdentifier := tableMapping.DestinationTableIdentifier
249276
tableSchema := tableNameSchemaMapping[tableIdentifier]
250277
existing, err := conn.SetupNormalizedTable(
@@ -292,6 +319,12 @@ func (a *FlowableActivity) SyncFlow(
292319
var syncingBatchID atomic.Int64
293320
var syncState atomic.Pointer[string]
294321
syncState.Store(shared.Ptr("setup"))
322+
323+
config, err := internal.FetchConfigFromDB(config.FlowJobName)
324+
if err != nil {
325+
return fmt.Errorf("unable to query flow config from catalog: %w", err)
326+
}
327+
295328
shutdown := heartbeatRoutine(ctx, func() string {
296329
// Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch
297330
sBatchID := syncingBatchID.Load()
@@ -363,10 +396,10 @@ func (a *FlowableActivity) SyncFlow(
363396
var syncResponse *model.SyncResponse
364397
var syncErr error
365398
if config.System == protos.TypeSystem_Q {
366-
syncResponse, syncErr = a.syncRecords(groupCtx, config, options, srcConn.(connectors.CDCPullConnector),
399+
syncResponse, syncErr = a.syncRecords(groupCtx, config, srcConn.(connectors.CDCPullConnector),
367400
normRequests, normResponses, normBufferSize, &syncingBatchID, &syncState)
368401
} else {
369-
syncResponse, syncErr = a.syncPg(groupCtx, config, options, srcConn.(connectors.CDCPullPgConnector),
402+
syncResponse, syncErr = a.syncPg(groupCtx, config, srcConn.(connectors.CDCPullPgConnector),
370403
normRequests, normResponses, normBufferSize, &syncingBatchID, &syncState)
371404
}
372405

@@ -414,7 +447,6 @@ func (a *FlowableActivity) SyncFlow(
414447
func (a *FlowableActivity) syncRecords(
415448
ctx context.Context,
416449
config *protos.FlowConnectionConfigs,
417-
options *protos.SyncFlowOptions,
418450
srcConn connectors.CDCPullConnector,
419451
normRequests *concurrency.LastChan,
420452
normResponses *concurrency.LastChan,
@@ -451,7 +483,7 @@ func (a *FlowableActivity) syncRecords(
451483
return stream, nil
452484
}
453485
}
454-
return syncCore(ctx, a, config, options, srcConn,
486+
return syncCore(ctx, a, config, srcConn,
455487
normRequests, normResponses, normBufferSize,
456488
syncingBatchID, syncWaiting, adaptStream,
457489
connectors.CDCPullConnector.PullRecords,
@@ -461,15 +493,14 @@ func (a *FlowableActivity) syncRecords(
461493
func (a *FlowableActivity) syncPg(
462494
ctx context.Context,
463495
config *protos.FlowConnectionConfigs,
464-
options *protos.SyncFlowOptions,
465496
srcConn connectors.CDCPullPgConnector,
466497
normRequests *concurrency.LastChan,
467498
normResponses *concurrency.LastChan,
468499
normBufferSize int64,
469500
syncingBatchID *atomic.Int64,
470501
syncWaiting *atomic.Pointer[string],
471502
) (*model.SyncResponse, error) {
472-
return syncCore(ctx, a, config, options, srcConn,
503+
return syncCore(ctx, a, config, srcConn,
473504
normRequests, normResponses, normBufferSize,
474505
syncingBatchID, syncWaiting, nil,
475506
connectors.CDCPullPgConnector.PullPg,
@@ -1230,9 +1261,13 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
12301261
}
12311262
}
12321263

1233-
func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *protos.FlowConnectionConfigs,
1264+
func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, flowJobName string,
12341265
additionalTableMappings []*protos.TableMapping,
12351266
) error {
1267+
cfg, err := internal.FetchConfigFromDB(flowJobName)
1268+
if err != nil {
1269+
return fmt.Errorf("unable to query flow config from catalog: %w", err)
1270+
}
12361271
ctx = context.WithValue(ctx, shared.FlowNameKey, cfg.FlowJobName)
12371272
srcConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, cfg.Env, a.CatalogPool, cfg.SourceName)
12381273
if err != nil {

flow/activities/flowable_core.go

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,10 @@ func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowNa
8080
func (a *FlowableActivity) applySchemaDeltas(
8181
ctx context.Context,
8282
config *protos.FlowConnectionConfigs,
83-
options *protos.SyncFlowOptions,
8483
schemaDeltas []*protos.TableSchemaDelta,
8584
) error {
8685
filteredTableMappings := make([]*protos.TableMapping, 0, len(schemaDeltas))
87-
for _, tableMapping := range options.TableMappings {
86+
for _, tableMapping := range config.TableMappings {
8887
if slices.ContainsFunc(schemaDeltas, func(schemaDelta *protos.TableSchemaDelta) bool {
8988
return schemaDelta.SrcTableName == tableMapping.SourceTableIdentifier &&
9089
schemaDelta.DstTableName == tableMapping.DestinationTableIdentifier
@@ -95,12 +94,12 @@ func (a *FlowableActivity) applySchemaDeltas(
9594

9695
if len(schemaDeltas) > 0 {
9796
if err := a.SetupTableSchema(ctx, &protos.SetupTableSchemaBatchInput{
98-
PeerName: config.SourceName,
99-
TableMappings: filteredTableMappings,
100-
FlowName: config.FlowJobName,
101-
System: config.System,
102-
Env: config.Env,
103-
Version: config.Version,
97+
PeerName: config.SourceName,
98+
FilteredTableMappings: filteredTableMappings,
99+
FlowName: config.FlowJobName,
100+
System: config.System,
101+
Env: config.Env,
102+
Version: config.Version,
104103
}); err != nil {
105104
return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to execute schema update at source: %w", err))
106105
}
@@ -112,7 +111,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
112111
ctx context.Context,
113112
a *FlowableActivity,
114113
config *protos.FlowConnectionConfigs,
115-
options *protos.SyncFlowOptions,
116114
srcConn TPull,
117115
normRequests *concurrency.LastChan,
118116
normResponses *concurrency.LastChan,
@@ -127,16 +125,17 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
127125
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
128126
logger := internal.LoggerFromCtx(ctx)
129127

130-
tblNameMapping := make(map[string]model.NameAndExclude, len(options.TableMappings))
131-
for _, v := range options.TableMappings {
128+
// we should be able to rely on `config` here.
129+
tblNameMapping := make(map[string]model.NameAndExclude, len(config.TableMappings))
130+
for _, v := range config.TableMappings {
132131
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
133132
}
134133

135134
if err := srcConn.ConnectionActive(ctx); err != nil {
136135
return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
137136
}
138137

139-
batchSize := options.BatchSize
138+
batchSize := config.MaxBatchSize
140139
if batchSize == 0 {
141140
batchSize = 250_000
142141
}
@@ -190,13 +189,13 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
190189
errGroup.Go(func() error {
191190
return pull(srcConn, errCtx, a.CatalogPool, a.OtelManager, &model.PullRecordsRequest[Items]{
192191
FlowJobName: flowName,
193-
SrcTableIDNameMapping: options.SrcTableIdNameMapping,
192+
SrcTableIDNameMapping: config.SrcTableIdNameMapping,
194193
TableNameMapping: tblNameMapping,
195194
LastOffset: lastOffset,
196195
ConsumedOffset: &consumedOffset,
197196
MaxBatchSize: batchSize,
198197
IdleTimeout: internal.PeerDBCDCIdleTimeoutSeconds(
199-
int(options.IdleTimeoutSeconds),
198+
int(config.IdleTimeoutSeconds),
200199
),
201200
TableNameSchemaMapping: tableNameSchemaMapping,
202201
OverridePublicationName: config.PublicationName,
@@ -234,11 +233,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
234233
defer connectors.CloseConnector(ctx, dstConn)
235234

236235
syncState.Store(shared.Ptr("updating schema"))
237-
if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas); err != nil {
236+
if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, config.TableMappings, recordBatchSync.SchemaDeltas); err != nil {
238237
return nil, fmt.Errorf("failed to sync schema: %w", err)
239238
}
240239

241-
return nil, a.applySchemaDeltas(ctx, config, options, recordBatchSync.SchemaDeltas)
240+
return nil, a.applySchemaDeltas(ctx, config, recordBatchSync.SchemaDeltas)
242241
}
243242

244243
var res *model.SyncResponse
@@ -271,7 +270,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
271270
Records: recordBatchSync,
272271
ConsumedOffset: &consumedOffset,
273272
FlowJobName: flowName,
274-
TableMappings: options.TableMappings,
273+
TableMappings: config.TableMappings,
275274
StagingPath: config.CdcStagingPath,
276275
Script: config.Script,
277276
TableNameSchemaMapping: tableNameSchemaMapping,
@@ -334,7 +333,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
334333
a.OtelManager.Metrics.CurrentBatchIdGauge.Record(ctx, res.CurrentSyncBatchID)
335334

336335
syncState.Store(shared.Ptr("updating schema"))
337-
if err := a.applySchemaDeltas(ctx, config, options, res.TableSchemaDeltas); err != nil {
336+
if err := a.applySchemaDeltas(ctx, config, res.TableSchemaDeltas); err != nil {
338337
return nil, err
339338
}
340339

flow/cmd/handler.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
7777
if err != nil {
7878
return fmt.Errorf("unable to marshal flow config: %w", err)
7979
}
80-
8180
if _, err := h.pool.Exec(ctx,
8281
`INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status,
8382
description, source_table_identifier, destination_table_identifier) VALUES ($1,$2,$3,$4,$5,$6,'gRPC','','')`,

flow/cmd/mirror_status.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog(
473473
) (*protos.FlowConnectionConfigs, error) {
474474
var configBytes sql.RawBytes
475475
if err := h.pool.QueryRow(ctx,
476-
"SELECT config_proto FROM flows WHERE name = $1", flowJobName,
476+
"SELECT config_proto FROM flows WHERE name = $1 LIMIT 1", flowJobName,
477477
).Scan(&configBytes); err != nil {
478478
slog.ErrorContext(ctx, "unable to query flow config from catalog", slog.Any("error", err))
479479
return nil, fmt.Errorf("unable to query flow config from catalog: %w", err)

flow/connectors/clickhouse/normalize.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,12 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
9090
tmEngine := protos.TableEngine_CH_ENGINE_REPLACING_MERGE_TREE
9191

9292
var tableMapping *protos.TableMapping
93-
for _, tm := range config.TableMappings {
93+
94+
cfg, err := internal.FetchConfigFromDB(config.FlowName)
95+
if err != nil {
96+
return nil, fmt.Errorf("failed to fetch config from DB: %w", err)
97+
}
98+
for _, tm := range cfg.TableMappings {
9499
if tm.DestinationTableIdentifier == tableIdentifier {
95100
tmEngine = tm.Engine
96101
tableMapping = tm

flow/connectors/postgres/postgres.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,10 +448,15 @@ func pullCore[Items model.Items](
448448
return fmt.Errorf("failed to get get setting for originMetaAsDestinationColumn: %w", err)
449449
}
450450

451+
cfgFromDB, err := internal.FetchConfigFromDB(req.FlowJobName)
452+
if err != nil {
453+
return fmt.Errorf("unable to query flow config from catalog: %w", err)
454+
}
455+
451456
cdc, err := c.NewPostgresCDCSource(ctx, &PostgresCDCConfig{
452457
CatalogPool: catalogPool,
453458
OtelManager: otelManager,
454-
SrcTableIDNameMapping: req.SrcTableIDNameMapping,
459+
SrcTableIDNameMapping: cfgFromDB.SrcTableIdNameMapping,
455460
TableNameMapping: req.TableNameMapping,
456461
TableNameSchemaMapping: req.TableNameSchemaMapping,
457462
RelationMessageMapping: c.relationMessageMapping,

flow/e2e/postgres/peer_flow_pg_test.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/PeerDB-io/peerdb/flow/e2e"
1515
"github.com/PeerDB-io/peerdb/flow/generated/protos"
16+
"github.com/PeerDB-io/peerdb/flow/internal"
1617
"github.com/PeerDB-io/peerdb/flow/model"
1718
"github.com/PeerDB-io/peerdb/flow/shared"
1819
peerflow "github.com/PeerDB-io/peerdb/flow/workflows"
@@ -934,11 +935,12 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
934935
return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
935936
})
936937

937-
workflowState := e2e.EnvGetWorkflowState(s.t, env)
938-
assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
939-
assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
940-
assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 1)
941-
assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 1)
938+
updatedConfig, err := internal.FetchConfigFromDB(config.FlowJobName)
939+
require.NoError(s.t, err)
940+
assert.EqualValues(s.t, 7, updatedConfig.IdleTimeoutSeconds)
941+
assert.EqualValues(s.t, 6, updatedConfig.MaxBatchSize)
942+
assert.Len(s.t, updatedConfig.TableMappings, 1)
943+
assert.Len(s.t, updatedConfig.SrcTableIdNameMapping, 1)
942944

943945
if !s.t.Failed() {
944946
e2e.SignalWorkflow(s.t.Context(), env, model.FlowSignal, model.PauseSignal)
@@ -976,11 +978,12 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
976978
return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil
977979
})
978980

979-
workflowState = e2e.EnvGetWorkflowState(s.t, env)
980-
assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
981-
assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
982-
assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 2)
983-
assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 2)
981+
updatedConfig, err = internal.FetchConfigFromDB(config.FlowJobName)
982+
require.NoError(s.t, err)
983+
assert.EqualValues(s.t, 14, updatedConfig.IdleTimeoutSeconds)
984+
assert.EqualValues(s.t, 12, updatedConfig.MaxBatchSize)
985+
assert.Len(s.t, updatedConfig.TableMappings, 2)
986+
assert.Len(s.t, updatedConfig.SrcTableIdNameMapping, 2)
984987
}
985988

986989
env.Cancel(s.t.Context())
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
8+
"google.golang.org/protobuf/proto"
9+
10+
"github.com/PeerDB-io/peerdb/flow/generated/protos"
11+
)
12+
13+
func TableNameMapping(tableMappings []*protos.TableMapping) map[string]string {
14+
tblNameMapping := make(map[string]string, len(tableMappings))
15+
for _, v := range tableMappings {
16+
tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier
17+
}
18+
return tblNameMapping
19+
}
20+
21+
func FetchConfigFromDB(flowName string) (*protos.FlowConnectionConfigs, error) {
22+
var configBytes sql.RawBytes
23+
dbCtx := context.Background()
24+
pool, _ := GetCatalogConnectionPoolFromEnv(dbCtx)
25+
defer dbCtx.Done()
26+
if err := pool.QueryRow(dbCtx,
27+
"SELECT config_proto FROM flows WHERE name = $1 LIMIT 1", flowName,
28+
).Scan(&configBytes); err != nil {
29+
return nil, fmt.Errorf("unable to query flow config from catalog: %w", err)
30+
}
31+
32+
var cfgFromDB protos.FlowConnectionConfigs
33+
if err := proto.Unmarshal(configBytes, &cfgFromDB); err != nil {
34+
return nil, fmt.Errorf("unable to unmarshal flow config: %w", err)
35+
}
36+
37+
return &cfgFromDB, nil
38+
}

0 commit comments

Comments
 (0)