Skip to content

Commit 20f7fd4

Browse files
committed
rebase & addr feedback
1 parent 190bc92 commit 20f7fd4

File tree

7 files changed

+51
-62
lines changed

7 files changed

+51
-62
lines changed

flow/activities/flowable.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,13 +1077,12 @@ func (a *FlowableActivity) emitLogRetentionHours(
10771077
}
10781078

10791079
var activeFlowStatuses = map[protos.FlowStatus]struct{}{
1080-
protos.FlowStatus_STATUS_RUNNING: {},
1081-
protos.FlowStatus_STATUS_PAUSED: {},
1082-
protos.FlowStatus_STATUS_PAUSING: {},
1083-
protos.FlowStatus_STATUS_SETUP: {},
1084-
protos.FlowStatus_STATUS_SNAPSHOT: {},
1085-
protos.FlowStatus_STATUS_RESYNC: {},
1086-
protos.FlowStatus_STATUS_MODIFYING: {},
1080+
protos.FlowStatus_STATUS_RUNNING: {},
1081+
protos.FlowStatus_STATUS_PAUSED: {},
1082+
protos.FlowStatus_STATUS_PAUSING: {},
1083+
protos.FlowStatus_STATUS_SETUP: {},
1084+
protos.FlowStatus_STATUS_SNAPSHOT: {},
1085+
protos.FlowStatus_STATUS_RESYNC: {},
10871086
}
10881087

10891088
func (a *FlowableActivity) QRepHasNewRows(ctx context.Context,

flow/activities/maintenance_activity.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,11 @@ func (a *MaintenanceActivity) WaitForRunningSnapshotsAndIntermediateStates(
9494
}
9595

9696
var waitStatuses map[protos.FlowStatus]struct{} = map[protos.FlowStatus]struct{}{
97-
protos.FlowStatus_STATUS_SNAPSHOT: {},
98-
protos.FlowStatus_STATUS_SETUP: {},
99-
protos.FlowStatus_STATUS_RESYNC: {},
100-
protos.FlowStatus_STATUS_UNKNOWN: {},
101-
protos.FlowStatus_STATUS_PAUSING: {},
102-
protos.FlowStatus_STATUS_MODIFYING: {},
97+
protos.FlowStatus_STATUS_SNAPSHOT: {},
98+
protos.FlowStatus_STATUS_SETUP: {},
99+
protos.FlowStatus_STATUS_RESYNC: {},
100+
protos.FlowStatus_STATUS_UNKNOWN: {},
101+
protos.FlowStatus_STATUS_PAUSING: {},
103102
}
104103

105104
func (a *MaintenanceActivity) checkAndWaitIfNeeded(

flow/cmd/handler.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
158158
return nil, exceptions.NewInternalApiError(fmt.Errorf("unable to create flow job entry: %w", err))
159159
}
160160

161-
if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, cfg, nil); err != nil {
161+
if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, internal.CreateMinimalConfigFromFlowJobName(cfg.FlowJobName), nil); err != nil {
162162
slog.ErrorContext(ctx, "unable to start PeerFlow workflow", slog.Any("error", err))
163163
return nil, exceptions.NewInternalApiError(fmt.Errorf("unable to start PeerFlow workflow: %w", err))
164164
}
@@ -428,7 +428,8 @@ func (h *FlowRequestHandler) CreatePeer(
428428
ctx context.Context,
429429
req *protos.CreatePeerRequest,
430430
) (*protos.CreatePeerResponse, error) {
431-
if !req.DisableValidation {
431+
// Always validate peers
432+
if true {
432433
status, validateErr := h.ValidatePeer(ctx, &protos.ValidatePeerRequest{Peer: req.Peer})
433434
if validateErr != nil {
434435
// ValidatePeer returns proper grpc errors

flow/cmd/mirror_status.go

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -148,23 +148,7 @@ func (h *FlowRequestHandler) cdcFlowStatus(
148148
slog.ErrorContext(ctx, "unable to query flow config from catalog", slog.Any("error", err))
149149
return nil, err
150150
}
151-
workflowID, err := h.getWorkflowID(ctx, req.FlowJobName)
152-
if err != nil {
153-
slog.ErrorContext(ctx, "unable to get the workflow ID of mirror", slog.Any("error", err))
154-
return nil, err
155-
}
156-
state, err := h.getCDCWorkflowState(ctx, workflowID)
157-
if err != nil {
158-
slog.ErrorContext(ctx, "unable to get the state of mirror", slog.Any("error", err))
159-
return nil, err
160-
}
161-
162-
// patching config to show latest values from state
163-
if state.SyncFlowOptions != nil {
164-
config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
165-
config.MaxBatchSize = state.SyncFlowOptions.BatchSize
166-
config.TableMappings = state.SyncFlowOptions.TableMappings
167-
}
151+
// The config is now always sourced from the DB, so no state patching is needed.
168152

169153
srcType, err := connectors.LoadPeerType(ctx, h.pool, config.SourceName)
170154
if err != nil {

flow/internal/flow_configuration_helpers.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,11 @@ func FetchConfigFromDB(flowName string) (*protos.FlowConnectionConfigs, error) {
3636

3737
return &cfgFromDB, nil
3838
}
39+
40+
// CreateMinimalConfigFromFlowJobName creates a minimal FlowConnectionConfigs with just the FlowJobName set.
41+
// This is used when calling CDCFlowWorkflow which will fetch the full config from DB internally.
42+
func CreateMinimalConfigFromFlowJobName(flowJobName string) *protos.FlowConnectionConfigs {
43+
return &protos.FlowConnectionConfigs{
44+
FlowJobName: flowJobName,
45+
}
46+
}

flow/workflows/cdc_flow.go

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -240,21 +240,22 @@ func processTableAdditions(
240240
additionalTablesUUID := GetUUID(ctx)
241241
childAdditionalTablesCDCFlowID := GetChildWorkflowID("additional-cdc-flow", cfg.FlowJobName, additionalTablesUUID)
242242
additionalTablesCfg := proto.CloneOf(cfg)
243-
additionalTablesCfg.DoInitialSnapshot = !flowConfigUpdate.SkipInitialSnapshotForTableAdditions
243+
// Always do an initial snapshot for additional tables
244+
additionalTablesCfg.DoInitialSnapshot = true
244245
additionalTablesCfg.InitialSnapshotOnly = true
245246
additionalTablesCfg.TableMappings = append(additionalTablesCfg.TableMappings, flowConfigUpdate.AdditionalTables...)
246247
additionalTablesCfg.Resync = false
247-
if state.SnapshotNumRowsPerPartition > 0 {
248-
additionalTablesCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition
248+
if flowConfigUpdate.SnapshotNumRowsPerPartition > 0 {
249+
additionalTablesCfg.SnapshotNumRowsPerPartition = flowConfigUpdate.SnapshotNumRowsPerPartition
249250
}
250-
if state.SnapshotNumPartitionsOverride > 0 {
251-
additionalTablesCfg.SnapshotNumPartitionsOverride = state.SnapshotNumPartitionsOverride
251+
if flowConfigUpdate.SnapshotNumPartitionsOverride > 0 {
252+
additionalTablesCfg.SnapshotNumPartitionsOverride = flowConfigUpdate.SnapshotNumPartitionsOverride
252253
}
253-
if state.SnapshotMaxParallelWorkers > 0 {
254-
additionalTablesCfg.SnapshotMaxParallelWorkers = state.SnapshotMaxParallelWorkers
254+
if flowConfigUpdate.SnapshotMaxParallelWorkers > 0 {
255+
additionalTablesCfg.SnapshotMaxParallelWorkers = flowConfigUpdate.SnapshotMaxParallelWorkers
255256
}
256-
if state.SnapshotNumTablesInParallel > 0 {
257-
additionalTablesCfg.SnapshotNumTablesInParallel = state.SnapshotNumTablesInParallel
257+
if flowConfigUpdate.SnapshotNumTablesInParallel > 0 {
258+
additionalTablesCfg.SnapshotNumTablesInParallel = flowConfigUpdate.SnapshotNumTablesInParallel
258259
}
259260

260261
uploadConfigToCatalog(ctx, additionalTablesCfg)
@@ -274,7 +275,7 @@ func processTableAdditions(
274275
childAddTablesCDCFlowFuture := workflow.ExecuteChildWorkflow(
275276
childAddTablesCDCFlowCtx,
276277
CDCFlowWorkflow,
277-
additionalTablesCfg.FlowJobName,
278+
internal.CreateMinimalConfigFromFlowJobName(additionalTablesCfg.FlowJobName),
278279
nil, // nil is passed to trigger `setup` flow.
279280
)
280281
addTablesSelector.AddFuture(childAddTablesCDCFlowFuture, func(f workflow.Future) {
@@ -283,15 +284,10 @@ func processTableAdditions(
283284
}
284285
})
285286

286-
// additional tables should also be resynced, we don't know how much was done so far
287-
// state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...)
288-
289287
for res == nil {
290288
addTablesSelector.Select(ctx)
291289
if state.ActiveSignal == model.TerminateSignal || state.ActiveSignal == model.ResyncSignal {
292290
if state.ActiveSignal == model.ResyncSignal {
293-
// additional tables should also be resynced, we don't know how much was done so far
294-
// state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...)
295291
resyncCfg := syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate)
296292
state.DropFlowInput.FlowJobName = resyncCfg.FlowJobName
297293
state.DropFlowInput.FlowConnectionConfigs = resyncCfg
@@ -318,7 +314,7 @@ func processTableRemovals(
318314
cfg *protos.FlowConnectionConfigs,
319315
state *CDCFlowWorkflowState,
320316
) error {
321-
state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_MODIFYING)
317+
state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING)
322318
removeTablesSelector := workflow.NewNamedSelector(ctx, "RemoveTables")
323319
removeTablesSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
324320
flowSignalStateChangeChan := model.FlowSignalStateChange.GetSignalChannel(ctx)
@@ -428,24 +424,25 @@ func addCdcPropertiesSignalListener(
428424
slog.Uint64("SnapshotNumPartitionsOverride", uint64(cdcConfigUpdate.SnapshotNumPartitionsOverride)),
429425
slog.Uint64("SnapshotMaxParallelWorkers", uint64(cdcConfigUpdate.SnapshotMaxParallelWorkers)),
430426
slog.Uint64("SnapshotNumTablesInParallel", uint64(cdcConfigUpdate.SnapshotNumTablesInParallel)),
431-
slog.Bool("SkipInitialSnapshotForTableAdditions", cdcConfigUpdate.SkipInitialSnapshotForTableAdditions),
432427
)
433428
})
434429
}
435430

436431
func CDCFlowWorkflow(
437432
ctx workflow.Context,
438-
flowJobName string,
439-
// cfg *protos.FlowConnectionConfigs,
433+
cfg *protos.FlowConnectionConfigs,
440434
state *CDCFlowWorkflowState,
441435
) (*CDCFlowWorkflowResult, error) {
442-
cfg, err := internal.FetchConfigFromDB(flowJobName)
443-
if cfg == nil {
444-
return nil, errors.New("invalid connection configs")
445-
}
436+
// Fetch full config from DB using the flowJobName from the minimal config
437+
cfgFromDB, err := internal.FetchConfigFromDB(cfg.FlowJobName)
446438
if err != nil {
447-
return nil, fmt.Errorf("unable to unmarshal flow config: %w", err)
439+
return nil, fmt.Errorf("unable to fetch flow config from DB: %w", err)
440+
}
441+
if cfgFromDB == nil {
442+
return nil, errors.New("invalid connection configs")
448443
}
444+
// Use the config from DB for all operations
445+
cfg = cfgFromDB
449446

450447
logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cfg.FlowJobName))
451448
if state == nil {
@@ -533,7 +530,7 @@ func CDCFlowWorkflow(
533530

534531
logger.Info("mirror resumed", slog.Duration("after", time.Since(startTime)))
535532
state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING)
536-
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg.FlowJobName, state)
533+
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.CreateMinimalConfigFromFlowJobName(cfg.FlowJobName), state)
537534
}
538535

539536
originalRunID := workflow.GetInfo(ctx).OriginalRunID
@@ -678,7 +675,7 @@ func CDCFlowWorkflow(
678675
}
679676
}
680677

681-
// TODOAS: here we will also store the table mappings in the state.
678+
// Copy the source table ID-to-name mappings into the config and persist it to the catalog.
682679
maps.Copy(cfg.SrcTableIdNameMapping, setupFlowOutput.SrcTableIdNameMapping)
683680
uploadConfigToCatalog(ctx, cfg)
684681

@@ -701,7 +698,7 @@ func CDCFlowWorkflow(
701698
// during any operation that triggers another snapshot (INCLUDING add tables).
702699
// this could fail for very weird Temporal resets
703700

704-
// TODOAS : this will send the additionalTables to `temporal`, meaning
701+
// This will send the additionalTables to `temporal`, meaning
705702
// that we cannot add too many tables at once, or we risk the blob being too
706703
// large (2MB limit).
707704
snapshotFlowFuture := workflow.ExecuteChildWorkflow(
@@ -803,7 +800,7 @@ func CDCFlowWorkflow(
803800
logger.Info("executed setup flow and snapshot flow, start running")
804801
state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING)
805802
}
806-
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg.FlowJobName, state)
803+
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.CreateMinimalConfigFromFlowJobName(cfg.FlowJobName), state)
807804
}
808805

809806
var finished bool
@@ -933,7 +930,7 @@ func CDCFlowWorkflow(
933930
if state.ActiveSignal == model.TerminateSignal || state.ActiveSignal == model.ResyncSignal {
934931
return state, workflow.NewContinueAsNewError(ctx, DropFlowWorkflow, state.DropFlowInput)
935932
}
936-
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg.FlowJobName, state)
933+
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.CreateMinimalConfigFromFlowJobName(cfg.FlowJobName), state)
937934
}
938935
}
939936
}

flow/workflows/drop_flow.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"go.temporal.io/sdk/workflow"
1212

1313
"github.com/PeerDB-io/peerdb/flow/generated/protos"
14+
"github.com/PeerDB-io/peerdb/flow/internal"
1415
"github.com/PeerDB-io/peerdb/flow/model"
1516
"github.com/PeerDB-io/peerdb/flow/shared"
1617
)
@@ -211,7 +212,7 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error {
211212
}
212213

213214
if input.Resync {
214-
return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, input.FlowConnectionConfigs, nil)
215+
return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.CreateMinimalConfigFromFlowJobName(input.FlowConnectionConfigs.FlowJobName), nil)
215216
}
216217

217218
return nil

0 commit comments

Comments
 (0)