Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ jobs:
image: ghcr.io/shopify/toxiproxy:2.12.0@sha256:9378ed52a28bc50edc1350f936f518f31fa95f0d15917d6eb40b8e376d1a214e
ports:
- 18474:8474
- 9902:9902
- 42001:42001
- 42002:42002
- 42003:42003
Expand Down
105 changes: 79 additions & 26 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"time"

"github.com/google/uuid"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
tEnums "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -57,8 +59,14 @@ func (h *FlowRequestHandler) getPeerID(ctx context.Context, peerName string) (in
return id.Int32, nil
}

func (h *FlowRequestHandler) cdcJobEntryExists(ctx context.Context, flowJobName string) (bool, error) {
var exists bool
err := h.pool.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM flows WHERE name = $1)`, flowJobName).Scan(&exists)
return exists, err
}

func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
req *protos.CreateCDCFlowRequest, workflowID string,
req *protos.CreateCDCFlowRequest, workflowID string, idempotent bool,
) error {
sourcePeerID, srcErr := h.getPeerID(ctx, req.ConnectionConfigs.SourceName)
if srcErr != nil {
Expand All @@ -77,11 +85,11 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
return fmt.Errorf("unable to marshal flow config: %w", err)
}

if _, err := h.pool.Exec(ctx,
`INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status,
description, source_table_identifier, destination_table_identifier) VALUES ($1,$2,$3,$4,$5,$6,'gRPC','','')`,
if _, err = h.pool.Exec(ctx,
`INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, description)
VALUES ($1,$2,$3,$4,$5,$6,'gRPC')`,
workflowID, req.ConnectionConfigs.FlowJobName, sourcePeerID, destinationPeerID, cfgBytes, protos.FlowStatus_STATUS_SETUP,
); err != nil {
); err != nil && !(idempotent && shared.IsSQLStateError(err, pgerrcode.UniqueViolation)) {
return fmt.Errorf("unable to insert into flows table for flow %s: %w",
req.ConnectionConfigs.FlowJobName, err)
}
Expand Down Expand Up @@ -113,9 +121,8 @@ func (h *FlowRequestHandler) createQRepJobEntry(ctx context.Context,

flowName := req.QrepConfig.FlowJobName
if _, err := h.pool.Exec(ctx, `INSERT INTO flows(workflow_id,name,source_peer,destination_peer,config_proto,status,
description, destination_table_identifier, query_string) VALUES ($1,$2,$3,$4,$5,$6,'gRPC',$7,$8)
description, query_string) VALUES ($1,$2,$3,$4,$5,$6,'gRPC',$7)
`, workflowID, flowName, sourcePeerID, destinationPeerID, cfgBytes, protos.FlowStatus_STATUS_RUNNING,
req.QrepConfig.DestinationTableIdentifier,
req.QrepConfig.Query,
); err != nil {
return fmt.Errorf("unable to insert into flows table for flow %s with source table %s: %w",
Expand All @@ -135,30 +142,74 @@ func (h *FlowRequestHandler) CreateCDCFlow(
}
cfg.Version = internalVersion

// For resync, we validate the mirror before dropping it and getting to this step.
// There is no point validating again here if it's a resync - the mirror is dropped already
if !cfg.Resync {
if _, err := h.ValidateCDCMirror(ctx, req); err != nil {
slog.ErrorContext(ctx, "validate mirror error", slog.Any("error", err))
return nil, err
if !req.AttachToExisting {
if exists, err := h.cdcJobEntryExists(ctx, cfg.FlowJobName); err != nil {
return nil, NewInternalApiError(fmt.Errorf("unable to check flow job entry: %w", err))
} else if exists {
return nil, NewAlreadyExistsApiError(fmt.Errorf("flow already exists: %s", cfg.FlowJobName))
}
}

workflowID := fmt.Sprintf("%s-peerflow-%s", cfg.FlowJobName, uuid.New())
workflowID := getWorkflowID(cfg.FlowJobName)
var errNotFound *serviceerror.NotFound
desc, err := h.temporalClient.DescribeWorkflow(ctx, workflowID, "")
if err != nil && !errors.As(err, &errNotFound) {
return nil, NewInternalApiError(fmt.Errorf("failed to query the workflow execution: %w", err))
} else if err == nil {
// If workflow is actively running, handle based on AttachToExisting
// Workflows in terminal states are fine
if desc.WorkflowExecutionMetadata.Status == tEnums.WORKFLOW_EXECUTION_STATUS_RUNNING ||
desc.WorkflowExecutionMetadata.Status == tEnums.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW {
if req.AttachToExisting {
// Idempotent attach to running workflow
return &protos.CreateCDCFlowResponse{
WorkflowId: workflowID,
}, nil
} else {
// Can't create duplicate of running workflow
return nil, NewAlreadyExistsApiError(fmt.Errorf("workflow already exists for flow: %s", cfg.FlowJobName))
}
}
}
// No running workflow, do the validations and start a new one

// Use idempotent validation that skips mirror existence check
if _, err := h.validateCDCMirrorImpl(ctx, req, true); err != nil {
slog.ErrorContext(ctx, "validate mirror error", slog.Any("error", err))
return nil, NewInternalApiError(fmt.Errorf("invalid mirror: %w", err))
}

if resp, err := h.createCDCFlow(ctx, req, workflowID); err != nil {
return nil, NewInternalApiError(err)
} else {
return resp, nil
}
}

func getWorkflowID(flowName string) string {
return flowName + "-peerflow"
}

func (h *FlowRequestHandler) createCDCFlow(
ctx context.Context, req *protos.CreateCDCFlowRequest, workflowID string,
) (*protos.CreateCDCFlowResponse, error) {
cfg := req.ConnectionConfigs
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: h.peerflowTaskQueueID,
TypedSearchAttributes: shared.NewSearchAttributes(cfg.FlowJobName),
ID: workflowID,
TaskQueue: h.peerflowTaskQueueID,
TypedSearchAttributes: shared.NewSearchAttributes(cfg.FlowJobName),
WorkflowIDConflictPolicy: tEnums.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING, // two racing requests end up with the same workflow
WorkflowIDReusePolicy: tEnums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, // but creating the same id as a completed one is allowed
}

if err := h.createCdcJobEntry(ctx, req, workflowID); err != nil {
if err := h.createCdcJobEntry(ctx, req, workflowID, true); err != nil {
slog.ErrorContext(ctx, "unable to create flow job entry", slog.Any("error", err))
return nil, NewInternalApiError(fmt.Errorf("unable to create flow job entry: %w", err))
return nil, fmt.Errorf("unable to create flow job entry: %w", err)
}

if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, cfg, nil); err != nil {
slog.ErrorContext(ctx, "unable to start PeerFlow workflow", slog.Any("error", err))
return nil, NewInternalApiError(fmt.Errorf("unable to start PeerFlow workflow: %w", err))
return nil, fmt.Errorf("unable to start PeerFlow workflow: %w", err)
}

return &protos.CreateCDCFlowResponse{
Expand Down Expand Up @@ -257,6 +308,7 @@ func (h *FlowRequestHandler) shutdownFlow(
DropFlowStats: deleteStats,
FlowConnectionConfigs: cdcConfig,
SkipDestinationDrop: skipDestinationDrop,
// NOTE: Resync is false here during snapshot-only resync
})
if err != nil {
slog.ErrorContext(ctx, "unable to start DropFlow workflow", logs, slog.Any("error", err))
Expand Down Expand Up @@ -344,7 +396,7 @@ func (h *FlowRequestHandler) FlowStateChange(
}
case protos.FlowStatus_STATUS_RESYNC:
if currState == protos.FlowStatus_STATUS_COMPLETED {
changeErr = h.resyncMirror(ctx, req.FlowJobName, req.DropMirrorStats)
changeErr = h.resyncCompletedSnapshot(ctx, req.FlowJobName, req.DropMirrorStats)
} else if isCDC, err := h.isCDCFlow(ctx, req.FlowJobName); err != nil {
return nil, NewInternalApiError(fmt.Errorf("unable to determine if mirror is cdc: %w", err))
} else if !isCDC {
Expand Down Expand Up @@ -489,8 +541,7 @@ func (h *FlowRequestHandler) getWorkflowID(ctx context.Context, flowJobName stri
return workflowID, nil
}

// only supports CDC resync for now
func (h *FlowRequestHandler) resyncMirror(
func (h *FlowRequestHandler) resyncCompletedSnapshot(
ctx context.Context,
flowName string,
dropStats bool,
Expand Down Expand Up @@ -527,9 +578,11 @@ func (h *FlowRequestHandler) resyncMirror(
return err
}

if _, err := h.CreateCDCFlow(ctx, &protos.CreateCDCFlowRequest{
ConnectionConfigs: config,
}); err != nil {
workflowID := getWorkflowID(config.FlowJobName)
if _, err := h.createCDCFlow(ctx,
&protos.CreateCDCFlowRequest{ConnectionConfigs: config},
workflowID,
); err != nil {
return err
}
return nil
Expand Down
9 changes: 8 additions & 1 deletion flow/cmd/validate_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ var CustomColumnTypeRegex = regexp.MustCompile(`^$|^[a-zA-Z][a-zA-Z0-9(),]*$`)

func (h *FlowRequestHandler) ValidateCDCMirror(
ctx context.Context, req *protos.CreateCDCFlowRequest,
) (*protos.ValidateCDCMirrorResponse, APIError) {
return h.validateCDCMirrorImpl(ctx, req, false)
}

func (h *FlowRequestHandler) validateCDCMirrorImpl(
ctx context.Context, req *protos.CreateCDCFlowRequest, idempotent bool,
) (*protos.ValidateCDCMirrorResponse, APIError) {
ctx = context.WithValue(ctx, shared.FlowNameKey, req.ConnectionConfigs.FlowJobName)
underMaintenance, err := internal.PeerDBMaintenanceModeEnabled(ctx, nil)
Expand All @@ -30,7 +36,8 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
return nil, NewUnavailableApiError(ErrUnderMaintenance)
}

if !req.ConnectionConfigs.Resync {
// Skip mirror existence check when idempotent (for managed creates)
if !idempotent && !req.ConnectionConfigs.Resync {
mirrorExists, existCheckErr := h.checkIfMirrorNameExists(ctx, req.ConnectionConfigs.FlowJobName)
if existCheckErr != nil {
slog.ErrorContext(ctx, "/validatecdc failed to check if mirror name exists", slog.Any("error", existCheckErr))
Expand Down
Loading
Loading