diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml
index ed856ec317..e92fadfeff 100644
--- a/.github/workflows/flow.yml
+++ b/.github/workflows/flow.yml
@@ -61,6 +61,7 @@ jobs:
         image: ghcr.io/shopify/toxiproxy:2.12.0@sha256:9378ed52a28bc50edc1350f936f518f31fa95f0d15917d6eb40b8e376d1a214e
         ports:
           - 18474:8474
+          - 9902:9902
           - 42001:42001
           - 42002:42002
           - 42003:42003
diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go
index bd93d76333..a84fff909a 100644
--- a/flow/cmd/handler.go
+++ b/flow/cmd/handler.go
@@ -9,9 +9,11 @@ import (
 	"time"
 
 	"github.com/google/uuid"
+	"github.com/jackc/pgerrcode"
 	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgtype"
 	tEnums "go.temporal.io/api/enums/v1"
+	"go.temporal.io/api/serviceerror"
 	"go.temporal.io/api/workflowservice/v1"
 	"go.temporal.io/sdk/client"
 	"google.golang.org/protobuf/proto"
@@ -57,8 +59,14 @@ func (h *FlowRequestHandler) getPeerID(ctx context.Context, peerName string) (in
 	return id.Int32, nil
 }
 
+func (h *FlowRequestHandler) cdcJobEntryExists(ctx context.Context, flowJobName string) (bool, error) {
+	var exists bool
+	err := h.pool.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM flows WHERE name = $1)`, flowJobName).Scan(&exists)
+	return exists, err
+}
+
 func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
-	req *protos.CreateCDCFlowRequest, workflowID string,
+	req *protos.CreateCDCFlowRequest, workflowID string, idempotent bool,
 ) error {
 	sourcePeerID, srcErr := h.getPeerID(ctx, req.ConnectionConfigs.SourceName)
 	if srcErr != nil {
@@ -77,11 +85,11 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
 		return fmt.Errorf("unable to marshal flow config: %w", err)
 	}
 
-	if _, err := h.pool.Exec(ctx,
-		`INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status,
-		description, source_table_identifier, destination_table_identifier) VALUES ($1,$2,$3,$4,$5,$6,'gRPC','','')`,
+	if _, err = h.pool.Exec(ctx,
+		`INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, description)
+		VALUES ($1,$2,$3,$4,$5,$6,'gRPC')`,
 		workflowID, req.ConnectionConfigs.FlowJobName, sourcePeerID, destinationPeerID, cfgBytes, protos.FlowStatus_STATUS_SETUP,
-	); err != nil {
+	); err != nil && !(idempotent && shared.IsSQLStateError(err, pgerrcode.UniqueViolation)) {
 		return fmt.Errorf("unable to insert into flows table for flow %s: %w", req.ConnectionConfigs.FlowJobName, err)
 	}
 
@@ -113,9 +121,8 @@ func (h *FlowRequestHandler) createQRepJobEntry(ctx context.Context,
 	flowName := req.QrepConfig.FlowJobName
 	if _, err := h.pool.Exec(ctx, `INSERT INTO flows(workflow_id,name,source_peer,destination_peer,config_proto,status,
-	description, destination_table_identifier, query_string) VALUES ($1,$2,$3,$4,$5,$6,'gRPC',$7,$8)
+	description, query_string) VALUES ($1,$2,$3,$4,$5,$6,'gRPC',$7)
 	`, workflowID, flowName, sourcePeerID, destinationPeerID, cfgBytes, protos.FlowStatus_STATUS_RUNNING,
-		req.QrepConfig.DestinationTableIdentifier,
 		req.QrepConfig.Query,
 	); err != nil {
 		return fmt.Errorf("unable to insert into flows table for flow %s with source table %s: %w",
@@ -135,30 +142,74 @@ func (h *FlowRequestHandler) CreateCDCFlow(
 	}
 	cfg.Version = internalVersion
 
-	// For resync, we validate the mirror before dropping it and getting to this step.
-	// There is no point validating again here if it's a resync - the mirror is dropped already
-	if !cfg.Resync {
-		if _, err := h.ValidateCDCMirror(ctx, req); err != nil {
-			slog.ErrorContext(ctx, "validate mirror error", slog.Any("error", err))
-			return nil, err
+	if !req.AttachToExisting {
+		if exists, err := h.cdcJobEntryExists(ctx, cfg.FlowJobName); err != nil {
+			return nil, NewInternalApiError(fmt.Errorf("unable to check flow job entry: %w", err))
+		} else if exists {
+			return nil, NewAlreadyExistsApiError(fmt.Errorf("flow already exists: %s", cfg.FlowJobName))
 		}
 	}
 
-	workflowID := fmt.Sprintf("%s-peerflow-%s", cfg.FlowJobName, uuid.New())
+	workflowID := getWorkflowID(cfg.FlowJobName)
+	var errNotFound *serviceerror.NotFound
+	desc, err := h.temporalClient.DescribeWorkflow(ctx, workflowID, "")
+	if err != nil && !errors.As(err, &errNotFound) {
+		return nil, NewInternalApiError(fmt.Errorf("failed to query the workflow execution: %w", err))
+	} else if err == nil {
+		// If workflow is actively running, handle based on AttachToExisting
+		// Workflows in terminal states are fine
+		if desc.WorkflowExecutionMetadata.Status == tEnums.WORKFLOW_EXECUTION_STATUS_RUNNING ||
+			desc.WorkflowExecutionMetadata.Status == tEnums.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW {
+			if req.AttachToExisting {
+				// Idempotent attach to running workflow
+				return &protos.CreateCDCFlowResponse{
+					WorkflowId: workflowID,
+				}, nil
+			} else {
+				// Can't create duplicate of running workflow
+				return nil, NewAlreadyExistsApiError(fmt.Errorf("workflow already exists for flow: %s", cfg.FlowJobName))
+			}
+		}
+	}
+	// No running workflow, do the validations and start a new one
+
+	// Use idempotent validation that skips mirror existence check
+	if _, err := h.validateCDCMirrorImpl(ctx, req, true); err != nil {
+		slog.ErrorContext(ctx, "validate mirror error", slog.Any("error", err))
+		return nil, NewInternalApiError(fmt.Errorf("invalid mirror: %w", err))
+	}
+
+	if resp, err := h.createCDCFlow(ctx, req, workflowID); err != nil {
+		return nil, NewInternalApiError(err)
+	} else {
+		return resp, nil
+	}
+}
+
+func getWorkflowID(flowName string) string {
+	return flowName + "-peerflow"
+}
+
+func (h *FlowRequestHandler) createCDCFlow(
+	ctx context.Context, req *protos.CreateCDCFlowRequest, workflowID string,
+) (*protos.CreateCDCFlowResponse, error) {
+	cfg := req.ConnectionConfigs
 	workflowOptions := client.StartWorkflowOptions{
-		ID:                    workflowID,
-		TaskQueue:             h.peerflowTaskQueueID,
-		TypedSearchAttributes: shared.NewSearchAttributes(cfg.FlowJobName),
+		ID:                       workflowID,
+		TaskQueue:                h.peerflowTaskQueueID,
+		TypedSearchAttributes:    shared.NewSearchAttributes(cfg.FlowJobName),
+		WorkflowIDConflictPolicy: tEnums.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING, // two racing requests end up with the same workflow
+		WorkflowIDReusePolicy:    tEnums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, // but creating the same id as a completed one is allowed
 	}
 
-	if err := h.createCdcJobEntry(ctx, req, workflowID); err != nil {
+	if err := h.createCdcJobEntry(ctx, req, workflowID, true); err != nil {
 		slog.ErrorContext(ctx, "unable to create flow job entry", slog.Any("error", err))
-		return nil, NewInternalApiError(fmt.Errorf("unable to create flow job entry: %w", err))
+		return nil, fmt.Errorf("unable to create flow job entry: %w", err)
 	}
 
 	if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, cfg, nil); err != nil {
 		slog.ErrorContext(ctx, "unable to start PeerFlow workflow", slog.Any("error", err))
-		return nil, NewInternalApiError(fmt.Errorf("unable to start PeerFlow workflow: %w", err))
+		return nil, fmt.Errorf("unable to start PeerFlow workflow: %w", err)
 	}
 
 	return &protos.CreateCDCFlowResponse{
@@ -257,6 +308,7 @@ func (h *FlowRequestHandler) shutdownFlow(
 		DropFlowStats:         deleteStats,
 		FlowConnectionConfigs: cdcConfig,
 		SkipDestinationDrop:   skipDestinationDrop,
+		// NOTE: Resync is false here during snapshot-only resync
 	})
 	if err != nil {
 		slog.ErrorContext(ctx, "unable to start DropFlow workflow", logs, slog.Any("error", err))
@@ -344,7 +396,7 @@ func (h *FlowRequestHandler) FlowStateChange(
 		}
 	case protos.FlowStatus_STATUS_RESYNC:
 		if currState == protos.FlowStatus_STATUS_COMPLETED {
-			changeErr = h.resyncMirror(ctx, req.FlowJobName, req.DropMirrorStats)
+			changeErr = h.resyncCompletedSnapshot(ctx, req.FlowJobName, req.DropMirrorStats)
 		} else if isCDC, err := h.isCDCFlow(ctx, req.FlowJobName); err != nil {
 			return nil, NewInternalApiError(fmt.Errorf("unable to determine if mirror is cdc: %w", err))
 		} else if !isCDC {
@@ -489,8 +541,7 @@ func (h *FlowRequestHandler) getWorkflowID(ctx context.Context, flowJobName stri
 	return workflowID, nil
 }
 
-// only supports CDC resync for now
-func (h *FlowRequestHandler) resyncMirror(
+func (h *FlowRequestHandler) resyncCompletedSnapshot(
 	ctx context.Context,
 	flowName string,
 	dropStats bool,
@@ -527,9 +578,11 @@ func (h *FlowRequestHandler) resyncMirror(
 		return err
 	}
 
-	if _, err := h.CreateCDCFlow(ctx, &protos.CreateCDCFlowRequest{
-		ConnectionConfigs: config,
-	}); err != nil {
+	workflowID := getWorkflowID(config.FlowJobName)
+	if _, err := h.createCDCFlow(ctx,
+		&protos.CreateCDCFlowRequest{ConnectionConfigs: config},
+		workflowID,
+	); err != nil {
 		return err
 	}
 	return nil
diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go
index 8bfcc46050..58f01871f4 100644
--- a/flow/cmd/validate_mirror.go
+++ b/flow/cmd/validate_mirror.go
@@ -17,6 +17,12 @@ var CustomColumnTypeRegex = regexp.MustCompile(`^$|^[a-zA-Z][a-zA-Z0-9(),]*$`)
 
 func (h *FlowRequestHandler) ValidateCDCMirror(
 	ctx context.Context, req *protos.CreateCDCFlowRequest,
+) (*protos.ValidateCDCMirrorResponse, APIError) {
+	return h.validateCDCMirrorImpl(ctx, req, false)
+}
+
+func (h *FlowRequestHandler) validateCDCMirrorImpl(
+	ctx context.Context, req *protos.CreateCDCFlowRequest, idempotent bool,
 ) (*protos.ValidateCDCMirrorResponse, APIError) {
 	ctx = context.WithValue(ctx, shared.FlowNameKey, req.ConnectionConfigs.FlowJobName)
 	underMaintenance, err := internal.PeerDBMaintenanceModeEnabled(ctx, nil)
@@ -30,7 +36,8 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
 		return nil, NewUnavailableApiError(ErrUnderMaintenance)
 	}
 
-	if !req.ConnectionConfigs.Resync {
+	// Skip mirror existence check when idempotent (for managed creates)
+	if !idempotent && !req.ConnectionConfigs.Resync {
 		mirrorExists, existCheckErr := h.checkIfMirrorNameExists(ctx, req.ConnectionConfigs.FlowJobName)
 		if existCheckErr != nil {
 			slog.ErrorContext(ctx, "/validatecdc failed to check if mirror name exists", slog.Any("error", existCheckErr))
diff --git a/flow/e2e/api_test.go b/flow/e2e/api_test.go
index 7f4fdbdc4d..56073021e4 100644
--- a/flow/e2e/api_test.go
+++ b/flow/e2e/api_test.go
@@ -6,14 +6,18 @@ import (
 	"fmt"
 	"slices"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
+	tp "github.com/Shopify/toxiproxy/v2/client"
 	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgtype"
 	"github.com/stretchr/testify/require"
 	"go.mongodb.org/mongo-driver/v2/bson"
 	"go.mongodb.org/mongo-driver/v2/mongo/options"
+	"go.temporal.io/api/enums/v1"
+
"go.temporal.io/api/workflowservice/v1" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" @@ -1113,18 +1117,19 @@ func (s APITestSuite) TestQRep() { PeerName: s.source.GeneratePeer(s.t).Name, }) require.NoError(s.t, err) - tblName := "qrepapi" + s.suffix - schemaQualified := AttachSchema(s, tblName) + tableName := AddSuffix(s, "qrepapi") + schemaQualified := AttachSchema(s, tableName) require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", schemaQualified))) require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf("INSERT INTO %s(id, val) values (1,'first')", schemaQualified))) + flowName := fmt.Sprintf("qrepapiflow_%s_%s", peerType.PeerType, s.suffix) qrepConfig := CreateQRepWorkflowConfig( s.t, - "qrepapiflow"+"_"+peerType.PeerType, + flowName, schemaQualified, - tblName, + tableName, fmt.Sprintf("SELECT * FROM %s WHERE id BETWEEN {{.start}} AND {{.end}}", schemaQualified), s.ch.Peer().Name, "", @@ -1146,12 +1151,12 @@ func (s APITestSuite) TestQRep() { env, err := GetPeerflow(s.t.Context(), s.pg.PostgresConnector.Conn(), tc, qrepConfig.FlowJobName) require.NoError(s.t, err) - EnvWaitForEqualTables(env, s.ch, "qrep initial load", tblName, "id,val") + EnvWaitForEqualTables(env, s.ch, "qrep initial load", tableName, "id,val") require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf("INSERT INTO %s(id, val) values (2,'second')", schemaQualified))) - EnvWaitForEqualTables(env, s.ch, "insert post qrep initial load", tblName, "id,val") + EnvWaitForEqualTables(env, s.ch, "insert post qrep initial load", tableName, "id,val") statusResponse, err := s.MirrorStatus(s.t.Context(), &protos.MirrorStatusRequest{ FlowJobName: qrepConfig.FlowJobName, IncludeFlowInfo: true, @@ -1293,3 +1298,434 @@ func (s APITestSuite) TestDropMissing() { }) require.NoError(s.t, err) } + +func (s APITestSuite) TestCreateCDCFlowAttachConcurrentRequests() { + // Test: two concurrent requests succeed + if _, ok := s.source.(*PostgresSource); !ok { + s.t.Skip("only testing with PostgreSQL") + } + + tableName := "concurrent_test" + require.NoError(s.t, s.source.Exec(s.t.Context(), + fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", AttachSchema(s, tableName)))) + require.NoError(s.t, s.source.Exec(s.t.Context(), + fmt.Sprintf("INSERT INTO %s(id, val) values (1,'first')", AttachSchema(s, tableName)))) + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: "create_concurrent_" + s.suffix, + TableNameMapping: map[string]string{AttachSchema(s, tableName): tableName}, + Destination: s.ch.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.DoInitialSnapshot = true + + // Two concurrent requests should succeed and return the same workflow ID + var wg sync.WaitGroup + var response1, response2 *protos.CreateCDCFlowResponse + var err1, err2 error + + wg.Add(2) + go func() { + defer wg.Done() + response1, err1 = s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) + }() + go func() { + defer wg.Done() + response2, err2 = s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) + }() + wg.Wait() + + require.NoError(s.t, err1) + require.NoError(s.t, err2) + require.NotNil(s.t, response1) + require.NotNil(s.t, response2) + require.Equal(s.t, response1.WorkflowId, response2.WorkflowId) + + // Verify workflow is 
actually running + tc := NewTemporalClient(s.t) + env, err := GetPeerflow(s.t.Context(), s.pg.PostgresConnector.Conn(), tc, flowConnConfig.FlowJobName) + require.NoError(s.t, err) + SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + EnvWaitFor(s.t, env, 3*time.Minute, "wait for flow to be running", func() bool { + return env.GetFlowStatus(s.t) == protos.FlowStatus_STATUS_RUNNING + }) + + // Clean up + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} + +func (s APITestSuite) TestCreateCDCFlowAttachConcurrentRequestsToxi() { + // Test: use Toxiproxy to ensure concurrent requests are truly concurrent + + // To run locally, requires toxiproxy running: + // docker run -d \ + // --name peerdb-toxiproxy \ + // -p 18474:8474 \ + // -p 9902:9902 \ + // ghcr.io/shopify/toxiproxy:2.11.0 + + if _, ok := s.source.(*PostgresSource); !ok { + s.t.Skip("only testing with PostgreSQL") + } + + // Setup PostgreSQL with Toxiproxy + suffix := "race_" + s.suffix + pgWithProxy, proxy, err := SetupPostgresWithToxiproxy(s.t, suffix) + require.NoError(s.t, err) + defer pgWithProxy.Teardown(s.t, s.t.Context(), suffix) + + // Create table + tableName := "toxiproxy_race_test" + require.NoError(s.t, pgWithProxy.Exec(s.t.Context(), + fmt.Sprintf("CREATE TABLE e2e_test_%s.%s(id int primary key, val text)", suffix, tableName))) + + // Create peer for the proxy connection + proxyConfig := internal.GetCatalogPostgresConfigFromEnv(s.t.Context()) + proxyConfig.Port = uint32(9902) + proxyPeer := &protos.Peer{ + Name: "proxy_postgres_" + suffix, + Type: protos.DBType_POSTGRES, + Config: &protos.Peer_PostgresConfig{ + PostgresConfig: proxyConfig, + }, + } + CreatePeer(s.t, proxyPeer) + defer func() { + _, _ = s.DropPeer(s.t.Context(), &protos.DropPeerRequest{PeerName: proxyPeer.Name}) + }() + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: "create_concurrent_toxi_" + suffix, + TableNameMapping: map[string]string{ + fmt.Sprintf("e2e_test_%s.%s", suffix, tableName): tableName, + }, + Destination: s.ch.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.DoInitialSnapshot = true + flowConnConfig.SourceName = proxyPeer.Name + + // Add latency toxic to ensure concurrent execution + const toxicDelay = 2 * time.Second + toxic, err := proxy.AddToxic("latency", "latency", "downstream", 1.0, tp.Attributes{ + "latency": int(toxicDelay.Milliseconds()), + }) + require.NoError(s.t, err) + + // Start concurrent requests + var wg sync.WaitGroup + var response1, response2 *protos.CreateCDCFlowResponse + var err1, err2 error + var duration1, duration2 time.Duration + + wg.Add(2) + go func() { + defer wg.Done() + start := time.Now() + response1, err1 = s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) + duration1 = time.Since(start) + }() + go func() { + defer wg.Done() + start := time.Now() + response2, err2 = s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) + duration2 = time.Since(start) + }() + + // Let goroutines start with toxic active + time.Sleep(toxicDelay) + + // Remove toxic so requests can complete + err = proxy.RemoveToxic(toxic.Name) + require.NoError(s.t, err) + + wg.Wait() + + // Verify both requests were delayed by toxic + require.Greater(s.t, duration1, toxicDelay) + require.Greater(s.t, duration2, toxicDelay) + + // Verify both succeeded with same workflow ID + require.NoError(s.t, err1) + 
require.NoError(s.t, err2) + require.NotNil(s.t, response1) + require.NotNil(s.t, response2) + require.Equal(s.t, response1.WorkflowId, response2.WorkflowId) + + // Verify workflow is actually running + tc := NewTemporalClient(s.t) + env, err := GetPeerflow(s.t.Context(), s.pg.PostgresConnector.Conn(), tc, flowConnConfig.FlowJobName) + require.NoError(s.t, err) + SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + EnvWaitFor(s.t, env, 3*time.Minute, "wait for flow to be running", func() bool { + return env.GetFlowStatus(s.t) == protos.FlowStatus_STATUS_RUNNING + }) + + // Clean up + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} + +func (s APITestSuite) TestCreateCDCFlowAttachSequentialRequests() { + // Test: two sequential requests succeed, same workflow is returned + if _, ok := s.source.(*PostgresSource); !ok { + s.t.Skip("only testing with PostgreSQL") + } + + tableName := "sequential_test" + require.NoError(s.t, s.source.Exec(s.t.Context(), + fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", AttachSchema(s, tableName)))) + require.NoError(s.t, s.source.Exec(s.t.Context(), + fmt.Sprintf("INSERT INTO %s(id, val) values (1,'first')", AttachSchema(s, tableName)))) + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: "create_sequential_" + s.suffix, + TableNameMapping: map[string]string{AttachSchema(s, tableName): tableName}, + Destination: s.ch.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.DoInitialSnapshot = true + + // First request + response1, err1 := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) + require.NoError(s.t, err1) + require.NotNil(s.t, response1) + + // Verify workflow is actually running + tc := NewTemporalClient(s.t) + env, err := GetPeerflow(s.t.Context(), s.pg.PostgresConnector.Conn(), tc, flowConnConfig.FlowJobName) + require.NoError(s.t, err) + SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + EnvWaitFor(s.t, env, 3*time.Minute, "wait for flow to be running", func() bool { + return env.GetFlowStatus(s.t) == protos.FlowStatus_STATUS_RUNNING + }) + + // Second sequential request should return the same workflow ID + response2, err2 := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) + require.NoError(s.t, err2) + require.NotNil(s.t, response2) + require.Equal(s.t, response1.WorkflowId, response2.WorkflowId) + + // Clean up + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} + +func (s APITestSuite) TestCreateCDCFlowAttachExternalFlowEntry() { + // Test: create a flows entry from the outside (simulate a crash before the workflow is created), do a request, should succeed + if _, ok := s.source.(*PostgresSource); !ok { + s.t.Skip("only testing with PostgreSQL") + } + + tableName := "external_entry_test" + require.NoError(s.t, s.source.Exec(s.t.Context(), + fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", AttachSchema(s, tableName)))) + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: "create_external_" + s.suffix, + TableNameMapping: map[string]string{AttachSchema(s, tableName): tableName}, + Destination: s.ch.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.DoInitialSnapshot = true + + // Simulate a crash: create flows entry without creating workflow + conn := s.pg.PostgresConnector.Conn() + sourcePeer := 
s.source.GeneratePeer(s.t) + destPeer, err := s.GetPeerInfo(s.t.Context(), &protos.PeerInfoRequest{PeerName: s.ch.Peer().Name}) + require.NoError(s.t, err) + + var sourcePeerID, destPeerID int32 + require.NoError(s.t, conn.QueryRow(s.t.Context(), + "SELECT id FROM peers WHERE name = $1", sourcePeer.Name).Scan(&sourcePeerID)) + require.NoError(s.t, conn.QueryRow(s.t.Context(), + "SELECT id FROM peers WHERE name = $1", destPeer.Peer.Name).Scan(&destPeerID)) + + cfgBytes, err := proto.Marshal(flowConnConfig) + require.NoError(s.t, err) + + workflowID := flowConnConfig.FlowJobName + "-peerflow" + _, err = conn.Exec(s.t.Context(), + `INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, description) + VALUES ($1,$2,$3,$4,$5,$6,'gRPC')`, + workflowID, flowConnConfig.FlowJobName, sourcePeerID, destPeerID, cfgBytes, protos.FlowStatus_STATUS_SETUP, + ) + require.NoError(s.t, err) + + // Now call CreateCDCFlow - should start the workflow successfully + response, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) + require.NoError(s.t, err) + require.NotNil(s.t, response) + require.Equal(s.t, workflowID, response.WorkflowId) + + // Verify workflow is created and running + tc := NewTemporalClient(s.t) + env, err := GetPeerflow(s.t.Context(), conn, tc, flowConnConfig.FlowJobName) + require.NoError(s.t, err) + SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + EnvWaitFor(s.t, env, 3*time.Minute, "wait for flow to be running", func() bool { + return env.GetFlowStatus(s.t) == protos.FlowStatus_STATUS_RUNNING + }) + + // Clean up + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} + +func (s APITestSuite) TestCreateCDCFlowAttachCanceledWorkflow() { + // Test: when cdc flow workflow is failed/canceled, a new run can be created with the same workflow ID + if _, ok := s.source.(*PostgresSource); !ok { + s.t.Skip("only testing with PostgreSQL") + } + + tableName := "canceled_workflow_test" + require.NoError(s.t, s.source.Exec(s.t.Context(), + fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", AttachSchema(s, tableName)))) + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: "create_canceled_" + s.suffix, + TableNameMapping: map[string]string{AttachSchema(s, tableName): tableName}, + Destination: s.ch.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.DoInitialSnapshot = true + + // First create a normal flow + response1, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) + require.NoError(s.t, err) + require.NotNil(s.t, response1) + + tc := NewTemporalClient(s.t) + env, err := GetPeerflow(s.t.Context(), s.pg.PostgresConnector.Conn(), tc, flowConnConfig.FlowJobName) + require.NoError(s.t, err) + + // Cancel the workflow to simulate failure + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) + + // Wait for workflow to be canceled + var firstRunID string + EnvWaitFor(s.t, env, 30*time.Second, "wait for workflow to be canceled", func() bool { + desc, err := tc.DescribeWorkflowExecution(s.t.Context(), response1.WorkflowId, "") + if err != nil { + return false + } + status := desc.GetWorkflowExecutionInfo().GetStatus() + if status == enums.WORKFLOW_EXECUTION_STATUS_CANCELED { + firstRunID = desc.GetWorkflowExecutionInfo().GetExecution().GetRunId() + return true + } + return false + }) + + // Attempt to create again - 
should create a new workflow run with the same workflow ID + response2, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) + require.NoError(s.t, err) + require.NotNil(s.t, response2) + require.Equal(s.t, response1.WorkflowId, response2.WorkflowId) + + // Verify a new workflow run was created (different run ID, status is RUNNING) + desc, err := tc.DescribeWorkflowExecution(s.t.Context(), response2.WorkflowId, "") + require.NoError(s.t, err) + require.Equal(s.t, enums.WORKFLOW_EXECUTION_STATUS_RUNNING, desc.GetWorkflowExecutionInfo().GetStatus()) + require.NotEqual(s.t, firstRunID, desc.GetWorkflowExecutionInfo().GetExecution().GetRunId(), + "should have created a new workflow run") + + // Clean up + env, err = GetPeerflow(s.t.Context(), s.pg.PostgresConnector.Conn(), tc, flowConnConfig.FlowJobName) + require.NoError(s.t, err) + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} + +func (s APITestSuite) TestCreateCDCFlowAttachIdempotentAfterContinueAsNew() { + // Test: cdc flow workflow can continue-as-new and use the same id + if _, ok := s.source.(*PostgresSource); !ok { + s.t.Skip("only testing with PostgreSQL") + } + + tableName := "continue_as_new_test" + require.NoError(s.t, s.source.Exec(s.t.Context(), + fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", AttachSchema(s, tableName)))) + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: "create_continue_" + s.suffix, + TableNameMapping: map[string]string{AttachSchema(s, tableName): tableName}, + Destination: s.ch.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.DoInitialSnapshot = true + + // First create a normal flow + response1, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) + require.NoError(s.t, err) + require.NotNil(s.t, response1) + + tc := NewTemporalClient(s.t) + env, err := GetPeerflow(s.t.Context(), s.pg.PostgresConnector.Conn(), tc, flowConnConfig.FlowJobName) + require.NoError(s.t, err) + SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + + // Wait for flow to be running + EnvWaitFor(s.t, env, 30*time.Second, "wait for flow to be running", func() bool { + return env.GetFlowStatus(s.t) == protos.FlowStatus_STATUS_RUNNING + }) + + // Check workflow executions - should have multiple due to continue-as-new during setup->running transition + listReq := &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: "default", + Query: fmt.Sprintf("WorkflowId = '%s'", response1.WorkflowId), + } + listResp, err := tc.ListWorkflow(s.t.Context(), listReq) + require.NoError(s.t, err) + require.Greater(s.t, len(listResp.Executions), 1, "Should have multiple executions (continue-as-new happened)") + + // Call CreateCDCFlow again after continue-as-new - should return the same workflow ID + response2, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) + require.NoError(s.t, err) + require.NotNil(s.t, response2) + require.Equal(s.t, response1.WorkflowId, response2.WorkflowId, "Should return same workflow ID after continue-as-new") + + // Verify workflow is still running + desc, err := tc.DescribeWorkflowExecution(s.t.Context(), response2.WorkflowId, "") + require.NoError(s.t, err) + require.Equal(s.t, enums.WORKFLOW_EXECUTION_STATUS_RUNNING, desc.GetWorkflowExecutionInfo().GetStatus()) + + // Clean 
up + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} diff --git a/flow/e2e/bigquery_qrep_test.go b/flow/e2e/bigquery_qrep_test.go index f9797ed976..c37382d563 100644 --- a/flow/e2e/bigquery_qrep_test.go +++ b/flow/e2e/bigquery_qrep_test.go @@ -58,7 +58,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.bqSuffix, tblName) - qrepConfig := CreateQRepWorkflowConfig(s.t, "test_qrep_flow_avro", + jobName := AddSuffix(s, tblName) + qrepConfig := CreateQRepWorkflowConfig(s.t, + jobName, fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName), tblName, query, @@ -82,7 +84,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_And_Date_QRep() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE watermark_ts BETWEEN {{.start}} AND {{.end}}", s.bqSuffix, tblName) - qrepConfig := CreateQRepWorkflowConfig(s.t, "test_invalid_time_bq", + jobName := AddSuffix(s, tblName) + qrepConfig := CreateQRepWorkflowConfig(s.t, + jobName, fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName), tblName, query, @@ -123,7 +127,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.bqSuffix, tblName) - qrepConfig := CreateQRepWorkflowConfig(s.t, "test_qrep_flow_avro", + jobName := AddSuffix(s, tblName) + qrepConfig := CreateQRepWorkflowConfig(s.t, + jobName, fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName), tblName, query, diff --git a/flow/e2e/elasticsearch_qrep_test.go b/flow/e2e/elasticsearch_qrep_test.go index 229a5edbb6..0a7804c03e 100644 --- a/flow/e2e/elasticsearch_qrep_test.go +++ b/flow/e2e/elasticsearch_qrep_test.go @@ -16,7 +16,8 @@ func Test_Elasticsearch(t *testing.T) { } func (s elasticsearchSuite) Test_Simple_QRep_Append() { - srcTableName := AttachSchema(s, "es_simple_append") + jobName := AddSuffix(s, "test_es_simple_append") + srcTableName := AttachSchema(s, "test_es_simple_append") _, err := s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( @@ -41,7 +42,8 @@ func (s elasticsearchSuite) Test_Simple_QRep_Append() { query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", srcTableName) - qrepConfig := CreateQRepWorkflowConfig(s.t, "test_es_simple_qrep", + qrepConfig := CreateQRepWorkflowConfig(s.t, + jobName, srcTableName, srcTableName, query, @@ -69,7 +71,8 @@ func (s elasticsearchSuite) Test_Simple_QRep_Append() { } func (s elasticsearchSuite) Test_Simple_QRep_Upsert() { - srcTableName := AttachSchema(s, "es_simple_upsert") + jobName := AddSuffix(s, "test_es_simple_upsert") + srcTableName := AttachSchema(s, "test_es_simple_upsert") _, err := s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( @@ -94,7 +97,8 @@ func (s elasticsearchSuite) Test_Simple_QRep_Upsert() { query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", srcTableName) - qrepConfig := CreateQRepWorkflowConfig(s.t, "test_es_simple_qrep", + qrepConfig := CreateQRepWorkflowConfig(s.t, + jobName, srcTableName, srcTableName, query, diff --git a/flow/e2e/pg.go b/flow/e2e/pg.go index aeebe18ce9..49c86fb27c 100644 --- a/flow/e2e/pg.go +++ b/flow/e2e/pg.go @@ -4,10 +4,13 @@ import ( "context" "errors" "fmt" + "os" "strings" + "sync" "testing" "time" + tp "github.com/Shopify/toxiproxy/v2/client" "github.com/jackc/pgx/v5" "github.com/stretchr/testify/require" @@ -264,3 
+267,113 @@ func (s *PostgresSource) GetLogCount(ctx context.Context, flowJobName, errorType return int(rows.Records[0][0].Value().(int64)), nil } + +// Toxiproxy support for testing concurrent scenarios + +var ( + toxiClient *tp.Client + toxiPostgresProxy *tp.Proxy + toxiOnce sync.Once + toxiProxyPort = 9902 + toxiAdminPort = 18474 +) + +// InitToxiproxy initializes the Toxiproxy client (singleton pattern) +func InitToxiproxy() error { + var err error + toxiOnce.Do(func() { + adminAddr := fmt.Sprintf("localhost:%d", toxiAdminPort) + toxiClient = tp.NewClient(adminAddr) + // Test connection + _, err = toxiClient.Proxies() + }) + return err +} + +// SetupPostgresWithToxiproxy creates a PostgreSQL source that connects through Toxiproxy +func SetupPostgresWithToxiproxy(t *testing.T, suffix string) (*PostgresSource, *tp.Proxy, error) { + t.Helper() + + // Initialize Toxiproxy client + if err := InitToxiproxy(); err != nil { + return nil, nil, fmt.Errorf("failed to init toxiproxy: %w", err) + } + + // Get or create proxy + proxy, err := GetPostgresToxicProxy(t) + if err != nil { + return nil, nil, err + } + + // Create config pointing to proxy + config := internal.GetCatalogPostgresConfigFromEnv(t.Context()) + config.Host = "localhost" + config.Port = uint32(toxiProxyPort) + // Don't set RequireTls - let it use the default from env + + // Rest is same as SetupPostgres + connector, err := connpostgres.NewPostgresConnector(t.Context(), nil, config) + if err != nil { + return nil, nil, fmt.Errorf("failed to create postgres connection: %w", err) + } + conn := connector.Conn() + + if err := cleanPostgres(t.Context(), conn, suffix); err != nil { + connector.Close() + return nil, nil, err + } + + if err := setupPostgresSchema(t, conn, suffix); err != nil { + connector.Close() + return nil, nil, err + } + + return &PostgresSource{PostgresConnector: connector}, proxy, nil +} + +// GetPostgresToxicProxy gets or creates the PostgreSQL proxy +func GetPostgresToxicProxy(t *testing.T) (*tp.Proxy, error) { + t.Helper() + + if toxiPostgresProxy == nil { + // Get upstream from environment configuration + config := internal.GetCatalogPostgresConfigFromEnv(context.Background()) + + // Allow override of upstream host for Toxiproxy + // In CI, Toxiproxy runs as a service container and needs to use service names + // while test code connects via localhost + upstreamHost := os.Getenv("TOXIPROXY_POSTGRES_HOST") + if upstreamHost == "" { + upstreamHost = config.Host + } + upstream := fmt.Sprintf("%s:%d", upstreamHost, config.Port) + + // Try to create proxy + proxy, err := toxiClient.CreateProxy("postgres", + fmt.Sprintf("0.0.0.0:%d", toxiProxyPort), + upstream) + if err != nil { + // Proxy might already exist, try to get it + proxy, err = toxiClient.Proxy("postgres") + if err != nil { + return nil, fmt.Errorf("failed to create/get postgres proxy: %w", err) + } + } + toxiPostgresProxy = proxy + } + + // Ensure enabled and clean + if err := toxiPostgresProxy.Enable(); err != nil { + return nil, fmt.Errorf("failed to enable proxy: %w", err) + } + + // Remove all existing toxics to start fresh + toxics, err := toxiPostgresProxy.Toxics() + if err == nil { + for _, toxic := range toxics { + _ = toxiPostgresProxy.RemoveToxic(toxic.Name) + } + } + + return toxiPostgresProxy, nil +} diff --git a/flow/e2e/postgres_qrep_test.go b/flow/e2e/postgres_qrep_test.go index 293af9d362..92a1c9b6a6 100644 --- a/flow/e2e/postgres_qrep_test.go +++ b/flow/e2e/postgres_qrep_test.go @@ -166,10 +166,11 @@ func (s PeerFlowE2ETestSuitePG) 
TestSimpleSlotCreation() { func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { numRows := 10 - srcTable := "test_qrep_flow_avro_pg_1" + baseName := "test_qrep_flow_avro_pg" + srcTable := baseName + "_1" s.setupSourceTable(srcTable, numRows) - dstTable := "test_qrep_flow_avro_pg_2" + dstTable := baseName + "_2" err := CreateTableForQRep(s.t.Context(), s.Conn(), s.suffix, dstTable) require.NoError(s.t, err) @@ -180,9 +181,10 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.suffix, srcTable) + jobName := AddSuffix(s, baseName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_qrep_flow_avro_pg", + jobName, srcSchemaQualified, dstSchemaQualified, query, @@ -204,10 +206,11 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { func (s PeerFlowE2ETestSuitePG) Test_PG_TypeSystemQRep() { numRows := 10 - srcTable := "test_qrep_flow_pgpg_1" + baseName := "test_qrep_flow_pgpg" + srcTable := baseName + "_1" s.setupSourceTable(srcTable, numRows) - dstTable := "test_qrep_flow_pgpg_2" + dstTable := baseName + "_2" err := CreateTableForQRep(s.t.Context(), s.Conn(), s.suffix, dstTable) require.NoError(s.t, err) @@ -218,9 +221,10 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_TypeSystemQRep() { query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", srcSchemaQualified) + jobName := AddSuffix(s, baseName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_qrep_flow_pgpg", + jobName, srcSchemaQualified, dstSchemaQualified, query, @@ -243,10 +247,11 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_TypeSystemQRep() { func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns_QRep_PG() { numRows := 10 - srcTable := "test_qrep_columns_pg_1" + baseName := "test_qrep_columns_pg" + srcTable := baseName + "_1" s.setupSourceTable(srcTable, numRows) - dstTable := "test_qrep_columns_pg_2" + dstTable := baseName + "_2" srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) @@ -254,9 +259,10 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns_QRep_PG() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.suffix, srcTable) + jobName := AddSuffix(s, baseName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_qrep_columns_pg", + jobName, srcSchemaQualified, dstSchemaQualified, query, @@ -278,10 +284,11 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns_QRep_PG() { func (s PeerFlowE2ETestSuitePG) Test_Overwrite_PG() { numRows := 10 - srcTable := "test_overwrite_pg_1" + baseName := "test_overwrite_pg" + srcTable := baseName + "_1" s.setupSourceTable(srcTable, numRows) - dstTable := "test_overwrite_pg_2" + dstTable := baseName + "_2" srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) @@ -289,9 +296,10 @@ func (s PeerFlowE2ETestSuitePG) Test_Overwrite_PG() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.suffix, srcTable) + jobName := AddSuffix(s, baseName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_overwrite_pg", + jobName, srcSchemaQualified, dstSchemaQualified, query, @@ -327,10 +335,11 @@ func (s PeerFlowE2ETestSuitePG) Test_Overwrite_PG() { func (s PeerFlowE2ETestSuitePG) Test_No_Rows_QRep_PG() 
{ numRows := 0 - srcTable := "test_no_rows_qrep_pg_1" + baseName := "test_no_rows_qrep_pg" + srcTable := baseName + "_1" s.setupSourceTable(srcTable, numRows) - dstTable := "test_no_rows_qrep_pg_2" + dstTable := baseName + "_2" srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) @@ -338,9 +347,10 @@ func (s PeerFlowE2ETestSuitePG) Test_No_Rows_QRep_PG() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.suffix, srcTable) + jobName := AddSuffix(s, baseName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_no_rows_qrep_pg", + jobName, srcSchemaQualified, dstSchemaQualified, query, @@ -360,10 +370,10 @@ func (s PeerFlowE2ETestSuitePG) Test_No_Rows_QRep_PG() { func (s PeerFlowE2ETestSuitePG) TestQRepPause() { numRows := 10 - srcTable := "qrep_pause" + srcTable := "test_qrep_pause" s.setupSourceTable(srcTable, numRows) - dstTable := "qrep_pause_dst" + dstTable := "test_qrep_pause_dst" srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) @@ -371,9 +381,10 @@ func (s PeerFlowE2ETestSuitePG) TestQRepPause() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.suffix, srcTable) + jobName := AddSuffix(s, srcTable) config := CreateQRepWorkflowConfig( s.t, - "test_qrep_pause_pg", + jobName, srcSchemaQualified, dstSchemaQualified, query, @@ -422,6 +433,7 @@ func (s PeerFlowE2ETestSuitePG) TestQRepPause() { func (s PeerFlowE2ETestSuitePG) TestXminPause() { numRows := 10 + baseName := "test_xmin_pause_pg" srcTable := "xmin_pause" s.setupSourceTable(srcTable, numRows) @@ -432,9 +444,10 @@ func (s PeerFlowE2ETestSuitePG) TestXminPause() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s", s.suffix, srcTable) + jobName := AddSuffix(s, baseName) config := CreateQRepWorkflowConfig( s.t, - "test_xmin_pause_pg", + jobName, srcSchemaQualified, dstSchemaQualified, query, @@ -498,9 +511,10 @@ func (s PeerFlowE2ETestSuitePG) TestTransform() { ('pgtransform', 'lua', 'function transformRow(row) row.myreal = 1729 end') on conflict do nothing`) require.NoError(s.t, err) + jobName := AddSuffix(s, srcTable) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_transform", + jobName, srcSchemaQualified, dstSchemaQualified, query, diff --git a/flow/e2e/s3_qrep_test.go b/flow/e2e/s3_qrep_test.go index 88b4558c75..47147504dc 100644 --- a/flow/e2e/s3_qrep_test.go +++ b/flow/e2e/s3_qrep_test.go @@ -127,10 +127,11 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { tc := NewTemporalClient(s.t) - jobName := "test_complete_flow_s3" - schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s.suffix, jobName) + tableName := "test_complete_flow_s3" + jobName := AddSuffix(s, tableName) + schemaQualifiedName := AttachSchema(s, tableName) - s.setupSourceTable(jobName, 10) + s.setupSourceTable(tableName, 10) query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}", schemaQualifiedName) qrepConfig := CreateQRepWorkflowConfig( @@ -170,10 +171,11 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { tc := NewTemporalClient(s.t) - jobName := "test_complete_flow_s3_ctid" - schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s.suffix, jobName) + tableName := "test_complete_flow_s3_ctid" + jobName := AddSuffix(s, tableName) + schemaQualifiedName := 
AttachSchema(s, tableName) - s.setupSourceTable(jobName, 20000) + s.setupSourceTable(tableName, 20000) query := fmt.Sprintf("SELECT * FROM %s WHERE ctid BETWEEN {{.start}} AND {{.end}}", schemaQualifiedName) qrepConfig := CreateQRepWorkflowConfig( s.t, diff --git a/flow/e2e/snowflake_qrep_test.go b/flow/e2e/snowflake_qrep_test.go index 2c76af517b..0d929326c5 100644 --- a/flow/e2e/snowflake_qrep_test.go +++ b/flow/e2e/snowflake_qrep_test.go @@ -58,9 +58,10 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.pgSuffix, tblName) + jobName := AddSuffix(s, tblName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_qrep_flow_avro_sf", + jobName, fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, @@ -95,9 +96,10 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.pgSuffix, tblName) + jobName := AddSuffix(s, tblName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_qrep_flow_avro_sf", + jobName, fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, @@ -135,9 +137,10 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.pgSuffix, tblName) + jobName := AddSuffix(s, tblName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_qrep_flow_avro_sf", + jobName, s.attachSchemaSuffix(tblName), dstSchemaQualified, query, @@ -170,9 +173,10 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s", s.pgSuffix, tblName) + jobName := AddSuffix(s, tblName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_qrep_flow_avro_sf_xmin", + jobName, fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, @@ -213,9 +217,10 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() s.sfHelper.Config.S3Integration = "peerdb_s3_integration" + jobName := AddSuffix(s, tblName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_qrep_flow_avro_sf_int", + jobName, s.attachSchemaSuffix(tblName), dstSchemaQualified, query, @@ -241,7 +246,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() { numRows := 10 - tblName := "test_qrep_columns_sf" + tblName := "test_columns_qrep_sf" s.setupSourceTable(tblName, numRows) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -249,9 +254,10 @@ func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.pgSuffix, tblName) + jobName := AddSuffix(s, tblName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_columns_qrep_sf", + jobName, fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, @@ -287,9 +293,10 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Default_False_SF() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.pgSuffix, tblName) + jobName := AddSuffix(s, tblName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_deleted_false_qrep_sf", + jobName, fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, diff --git a/nexus/catalog/migrations/V48__unique_flows.sql 
b/nexus/catalog/migrations/V48__unique_flows.sql
new file mode 100644
index 0000000000..281bfb1a94
--- /dev/null
+++ b/nexus/catalog/migrations/V48__unique_flows.sql
@@ -0,0 +1,14 @@
+-- these are never read
+UPDATE flows
+SET source_table_identifier = NULL, destination_table_identifier = NULL;
+
+-- deduplicate flows by name
+DELETE FROM flows
+WHERE id NOT IN (
+    SELECT MAX(id)
+    FROM flows
+    GROUP BY name
+);
+
+-- make them unique going forward
+ALTER TABLE flows ADD CONSTRAINT flows_name_unique UNIQUE (name);
diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs
index bc2f43285b..e819ce8154 100644
--- a/nexus/catalog/src/lib.rs
+++ b/nexus/catalog/src/lib.rs
@@ -388,16 +388,12 @@ impl Catalog {
             .pg
             .prepare_typed(
                 "INSERT INTO flows (name, source_peer, destination_peer, description,
-                destination_table_identifier, query_string, flow_metadata) VALUES ($1, $2, $3, $4, $5, $6, $7)",
+                query_string, flow_metadata) VALUES ($1, $2, $3, $4, $5, $6)",
                 &[types::Type::TEXT, types::Type::INT4, types::Type::INT4, types::Type::TEXT,
-                types::Type::TEXT, types::Type::TEXT, types::Type::JSONB],
+                types::Type::TEXT, types::Type::JSONB],
             )
             .await?;
 
-        let Some(destination_table_name) = job.flow_options.get("destination_table_name") else {
-            return Err(anyhow!("destination_table_name not found in flow options"));
-        };
-
         let _rows = self
             .pg
             .execute(
@@ -407,7 +403,6 @@
             &source_peer_id,
             &destination_peer_id,
             &job.description,
-            &destination_table_name.as_str().unwrap(),
             &job.query_string,
             &serde_json::to_value(job.flow_options.clone())
                 .context("unable to serialize flow options")?,
diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs
index 7bd1489b28..ac611c55ba 100644
--- a/nexus/flow-rs/src/grpc.rs
+++ b/nexus/flow-rs/src/grpc.rs
@@ -59,6 +59,7 @@ impl FlowGrpcClient {
     ) -> anyhow::Result {
         let create_peer_flow_req = pt::peerdb_route::CreateCdcFlowRequest {
             connection_configs: Some(peer_flow_config),
+            attach_to_existing: false,
         };
         let response = self.client.create_cdc_flow(create_peer_flow_req).await?;
         let workflow_id = response.into_inner().workflow_id;
diff --git a/protos/route.proto b/protos/route.proto
index dec928ce3b..e051e0c94d 100644
--- a/protos/route.proto
+++ b/protos/route.proto
@@ -11,6 +11,7 @@ package peerdb_route;
 
 message CreateCDCFlowRequest {
   peerdb_flow.FlowConnectionConfigs connection_configs = 1;
+  bool attach_to_existing = 2;
 }
 
 message CreateCDCFlowResponse { string workflow_id = 1; }
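
A note on the two StartWorkflowOptions fields added in createCDCFlow above: WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING makes a second start against an already-running workflow ID attach to that execution instead of failing, while WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE still permits starting a fresh run under the same ID once the previous run has reached a terminal state (the canceled-workflow test above relies on this). Below is a minimal standalone sketch of that behaviour, not part of this patch, assuming a locally reachable Temporal server, a worker with a workflow named "ExampleWorkflow" registered, and a task queue "example-tq" (all placeholder names).

// workflow_id_policies_example.go — illustrative only, not part of the patch.
package main

import (
	"context"
	"fmt"
	"log"

	"go.temporal.io/api/enums/v1"
	"go.temporal.io/sdk/client"
)

func main() {
	c, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	opts := client.StartWorkflowOptions{
		ID:        "my-flow-peerflow", // deterministic ID, like getWorkflowID() produces
		TaskQueue: "example-tq",
		// A racing second start with the same ID attaches to the running execution.
		WorkflowIDConflictPolicy: enums.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING,
		// Once a run is closed, the same ID may be started again.
		WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
	}

	// Both calls succeed; while the first run is open, the second returns a
	// handle to that same run instead of erroring with "already started".
	run1, err := c.ExecuteWorkflow(context.Background(), opts, "ExampleWorkflow")
	if err != nil {
		log.Fatal(err)
	}
	run2, err := c.ExecuteWorkflow(context.Background(), opts, "ExampleWorkflow")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(run1.GetRunID() == run2.GetRunID()) // true while the first run is still open
}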
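Relatedly, the reason createCdcJobEntry can swallow a duplicate insert when idempotent is the flows_name_unique constraint from V48: a concurrent INSERT of the same flow name fails with SQLSTATE 23505 (unique_violation), which the handler treats as "row already present" via shared.IsSQLStateError. The following is a rough sketch of that kind of check with pgx, offered as an assumption about how such a helper can look, not as the actual shared.IsSQLStateError implementation.

// sqlstate_sketch.go — illustrative sketch only; PeerDB's shared.IsSQLStateError may differ.
package sketch

import (
	"errors"

	"github.com/jackc/pgerrcode"
	"github.com/jackc/pgx/v5/pgconn"
)

// isSQLState reports whether err carries one of the given Postgres SQLSTATE codes.
func isSQLState(err error, codes ...string) bool {
	var pgErr *pgconn.PgError
	if !errors.As(err, &pgErr) {
		return false
	}
	for _, code := range codes {
		if pgErr.Code == code {
			return true
		}
	}
	return false
}

// isUniqueViolation is the specific case used by the idempotent insert path:
// a second INSERT into flows with the same name trips flows_name_unique (23505).
func isUniqueViolation(err error) bool {
	return isSQLState(err, pgerrcode.UniqueViolation)
}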