From 1b9f3c85d23b4d2baeba6f843535bbddc6ca863d Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Wed, 17 Sep 2025 03:09:10 -0700 Subject: [PATCH 01/13] Idempotent CDC flow creation --- .github/workflows/flow.yml | 5 + flow/cmd/handler.go | 85 +++++++- flow/cmd/validate_mirror.go | 9 +- flow/e2e/api_test.go | 392 ++++++++++++++++++++++++++++++++++++ flow/e2e/pg.go | 104 ++++++++++ flow/go.mod | 7 + flow/go.sum | 19 ++ protos/route.proto | 6 + 8 files changed, 619 insertions(+), 8 deletions(-) diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 65f943fb13..53c7966605 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -57,6 +57,11 @@ jobs: image: otel/opentelemetry-collector-contrib:0.135.0@sha256:89107a3a8f4636a396927edf7025bb9614b8da2d92f4cc3f43109e8d115736e2 ports: - 4317:4317 + toxiproxy: + image: ghcr.io/shopify/toxiproxy:2.11.0 + ports: + - 18474:8474 + - 9902:9902 steps: - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5 diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 9ae5c6a855..e4f5a2e53d 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -12,6 +12,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" tEnums "go.temporal.io/api/enums/v1" + "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" "google.golang.org/protobuf/proto" @@ -59,7 +60,7 @@ func (h *FlowRequestHandler) getPeerID(ctx context.Context, peerName string) (in } func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context, - req *protos.CreateCDCFlowRequest, workflowID string, + req *protos.CreateCDCFlowRequest, workflowID string, idempotent bool, ) error { sourcePeerID, srcErr := h.getPeerID(ctx, req.ConnectionConfigs.SourceName) if srcErr != nil { @@ -78,11 +79,23 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context, return fmt.Errorf("unable to marshal flow config: %w", err) } - if _, err := h.pool.Exec(ctx, - `INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, - description, source_table_identifier, destination_table_identifier) VALUES ($1,$2,$3,$4,$5,$6,'gRPC','','')`, - workflowID, req.ConnectionConfigs.FlowJobName, sourcePeerID, destinationPeerID, cfgBytes, protos.FlowStatus_STATUS_SETUP, - ); err != nil { + if idempotent { + _, err = h.pool.Exec(ctx, + `INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, + description, source_table_identifier, destination_table_identifier) + SELECT $1,$2,$3,$4,$5,$6,'gRPC','','' + WHERE NOT EXISTS (SELECT 1 FROM flows WHERE name = $7)`, + workflowID, req.ConnectionConfigs.FlowJobName, sourcePeerID, destinationPeerID, cfgBytes, protos.FlowStatus_STATUS_SETUP, req.ConnectionConfigs.FlowJobName, + ) + } else { + _, err = h.pool.Exec(ctx, + `INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, + description, source_table_identifier, destination_table_identifier) + VALUES ($1,$2,$3,$4,$5,$6,'gRPC','','')`, + workflowID, req.ConnectionConfigs.FlowJobName, sourcePeerID, destinationPeerID, cfgBytes, protos.FlowStatus_STATUS_SETUP, + ) + } + if err != nil { return fmt.Errorf("unable to insert into flows table for flow %s: %w", req.ConnectionConfigs.FlowJobName, err) } @@ -154,7 +167,65 @@ func (h *FlowRequestHandler) CreateCDCFlow( TypedSearchAttributes: shared.NewSearchAttributes(cfg.FlowJobName), } - if err := h.createCdcJobEntry(ctx, req, workflowID); err != nil { + if err := h.createCdcJobEntry(ctx, req, workflowID, false); err != nil { + slog.ErrorContext(ctx, "unable to create flow job entry", slog.Any("error", err)) + return nil, exceptions.NewInternalApiError(fmt.Errorf("unable to create flow job entry: %w", err)) + } + + if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, cfg, nil); err != nil { + slog.ErrorContext(ctx, "unable to start PeerFlow workflow", slog.Any("error", err)) + return nil, exceptions.NewInternalApiError(fmt.Errorf("unable to start PeerFlow workflow: %w", err)) + } + + return &protos.CreateCDCFlowResponse{ + WorkflowId: workflowID, + }, nil +} + +func (h *FlowRequestHandler) CreateCDCFlowManaged( + ctx context.Context, req *protos.CreateCDCFlowRequest, +) (*protos.CreateCDCFlowResponse, error) { + cfg := req.ConnectionConfigs + internalVersion, err := internal.PeerDBForceInternalVersion(ctx, req.ConnectionConfigs.Env) + if err != nil { + return nil, exceptions.NewInternalApiError(fmt.Errorf("failed to get internal version: %w", err)) + } + cfg.Version = internalVersion + + if cfg.Resync { + return nil, exceptions.NewInvalidArgumentApiError(errors.New("resync is not supported in the managed API")) + } + + workflowID := fmt.Sprintf("%s-peerflow", cfg.FlowJobName) + var errNotFound *serviceerror.NotFound + _, err = h.temporalClient.DescribeWorkflow(ctx, workflowID, "") + if err != nil && !errors.As(err, &errNotFound) { + return nil, exceptions.NewInternalApiError(fmt.Errorf("failed to query the workflow execution: %w", err)) + } else if err == nil { + // Previous CreateCDCFlowManaged already succeeded + return &protos.CreateCDCFlowResponse{ + WorkflowId: workflowID, + }, nil + } + // Workflow not found, do the validations and start a new one + + // Use idempotent validation that skips mirror existence check + if _, err := h.validateCDCMirrorImpl(ctx, req, true); err != nil { + slog.ErrorContext(ctx, "validate mirror error", slog.Any("error", err)) + // validateCDCMirrorImpl already returns a grpc error + //nopeertest:grpcReturn + return nil, fmt.Errorf("invalid mirror: %w", err) + } + + workflowOptions := client.StartWorkflowOptions{ + ID: workflowID, + TaskQueue: h.peerflowTaskQueueID, + TypedSearchAttributes: shared.NewSearchAttributes(cfg.FlowJobName), + WorkflowIDConflictPolicy: tEnums.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING, + WorkflowIDReusePolicy: tEnums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, + } + + if err := h.createCdcJobEntry(ctx, req, workflowID, true); err != nil { slog.ErrorContext(ctx, "unable to create flow job entry", slog.Any("error", err)) return nil, exceptions.NewInternalApiError(fmt.Errorf("unable to create flow job entry: %w", err)) } diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go index de26a1ea59..70c27420a2 100644 --- a/flow/cmd/validate_mirror.go +++ b/flow/cmd/validate_mirror.go @@ -18,6 +18,12 @@ var CustomColumnTypeRegex = regexp.MustCompile(`^$|^[a-zA-Z][a-zA-Z0-9(),]*$`) func (h *FlowRequestHandler) ValidateCDCMirror( ctx context.Context, req *protos.CreateCDCFlowRequest, +) (*protos.ValidateCDCMirrorResponse, error) { + return h.validateCDCMirrorImpl(ctx, req, false) +} + +func (h *FlowRequestHandler) validateCDCMirrorImpl( + ctx context.Context, req *protos.CreateCDCFlowRequest, idempotent bool, ) (*protos.ValidateCDCMirrorResponse, error) { ctx = context.WithValue(ctx, shared.FlowNameKey, req.ConnectionConfigs.FlowJobName) underMaintenance, err := internal.PeerDBMaintenanceModeEnabled(ctx, nil) @@ -31,7 +37,8 @@ func (h *FlowRequestHandler) ValidateCDCMirror( return nil, exceptions.ErrUnderMaintenance } - if !req.ConnectionConfigs.Resync { + // Skip mirror existence check when idempotent (for managed API) + if !idempotent && !req.ConnectionConfigs.Resync { mirrorExists, existCheckErr := h.checkIfMirrorNameExists(ctx, req.ConnectionConfigs.FlowJobName) if existCheckErr != nil { slog.ErrorContext(ctx, "/validatecdc failed to check if mirror name exists", slog.Any("error", existCheckErr)) diff --git a/flow/e2e/api_test.go b/flow/e2e/api_test.go index 6a894dd23c..c53d2b58e4 100644 --- a/flow/e2e/api_test.go +++ b/flow/e2e/api_test.go @@ -6,14 +6,18 @@ import ( "fmt" "slices" "strings" + "sync" "testing" "time" + tp "github.com/Shopify/toxiproxy/v2/client" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/workflowservice/v1" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" @@ -1271,3 +1275,391 @@ func (s APITestSuite) TestDropMissing() { }) require.NoError(s.t, err) } + +func (s APITestSuite) TestCreateCDCFlowManagedConcurrentRequests() { + // Test: two concurrent requests succeed + if _, ok := s.source.(*PostgresSource); !ok { + s.t.Skip("only testing with PostgreSQL") + } + + tableName := "concurrent_test" + require.NoError(s.t, s.source.Exec(s.t.Context(), + fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", AttachSchema(s, tableName)))) + require.NoError(s.t, s.source.Exec(s.t.Context(), + fmt.Sprintf("INSERT INTO %s(id, val) values (1,'first')", AttachSchema(s, tableName)))) + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: "managed_concurrent_" + s.suffix, + TableNameMapping: map[string]string{AttachSchema(s, tableName): tableName}, + Destination: s.ch.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.DoInitialSnapshot = true + + // Two concurrent requests should succeed and return the same workflow ID + var wg sync.WaitGroup + var response1, response2 *protos.CreateCDCFlowResponse + var err1, err2 error + + wg.Add(2) + go func() { + defer wg.Done() + response1, err1 = s.CreateCDCFlowManaged(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + }() + go func() { + defer wg.Done() + response2, err2 = s.CreateCDCFlowManaged(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + }() + wg.Wait() + + require.NoError(s.t, err1) + require.NoError(s.t, err2) + require.NotNil(s.t, response1) + require.NotNil(s.t, response2) + require.Equal(s.t, response1.WorkflowId, response2.WorkflowId) + + // Verify workflow is actually running + tc := NewTemporalClient(s.t) + env, err := GetPeerflow(s.t.Context(), s.pg.PostgresConnector.Conn(), tc, flowConnConfig.FlowJobName) + require.NoError(s.t, err) + SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + EnvWaitFor(s.t, env, 3*time.Minute, "wait for flow to be running", func() bool { + return env.GetFlowStatus(s.t) == protos.FlowStatus_STATUS_RUNNING + }) + + // Clean up + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} + +func (s APITestSuite) TestCreateCDCFlowManagedConcurrentRequestsToxi() { + // Test: use Toxiproxy to ensure concurrent requests are truly concurrent + + // To run locally, requires toxiproxy running: + // docker run -d \ + // --name peerdb-toxiproxy \ + // -p 18474:8474 \ + // -p 9902:9902 \ + // ghcr.io/shopify/toxiproxy:2.11.0 + + if _, ok := s.source.(*PostgresSource); !ok { + s.t.Skip("only testing with PostgreSQL") + } + + // Setup PostgreSQL with Toxiproxy + suffix := "race_" + s.suffix + pgWithProxy, proxy, err := SetupPostgresWithToxiproxy(s.t, suffix) + require.NoError(s.t, err) + defer pgWithProxy.Teardown(s.t, s.t.Context(), suffix) + + // Create table + tableName := "toxiproxy_race_test" + require.NoError(s.t, pgWithProxy.Exec(s.t.Context(), + fmt.Sprintf("CREATE TABLE e2e_test_%s.%s(id int primary key, val text)", suffix, tableName))) + + // Create peer for the proxy connection + proxyConfig := internal.GetCatalogPostgresConfigFromEnv(s.t.Context()) + proxyConfig.Port = uint32(9902) + proxyPeer := &protos.Peer{ + Name: "proxy_postgres_" + suffix, + Type: protos.DBType_POSTGRES, + Config: &protos.Peer_PostgresConfig{ + PostgresConfig: proxyConfig, + }, + } + CreatePeer(s.t, proxyPeer) + defer func() { + _, _ = s.DropPeer(s.t.Context(), &protos.DropPeerRequest{PeerName: proxyPeer.Name}) + }() + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: "managed_" + suffix, + TableNameMapping: map[string]string{ + fmt.Sprintf("e2e_test_%s.%s", suffix, tableName): tableName, + }, + Destination: s.ch.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.DoInitialSnapshot = true + flowConnConfig.SourceName = proxyPeer.Name + + // Add latency toxic to ensure concurrent execution + const toxicDelay = 2 * time.Second + toxic, err := proxy.AddToxic("latency", "latency", "downstream", 1.0, tp.Attributes{ + "latency": int(toxicDelay.Milliseconds()), + }) + require.NoError(s.t, err) + + // Start concurrent requests + var wg sync.WaitGroup + var response1, response2 *protos.CreateCDCFlowResponse + var err1, err2 error + var duration1, duration2 time.Duration + + wg.Add(2) + go func() { + defer wg.Done() + start := time.Now() + response1, err1 = s.CreateCDCFlowManaged(s.t.Context(), + &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + duration1 = time.Since(start) + }() + go func() { + defer wg.Done() + start := time.Now() + response2, err2 = s.CreateCDCFlowManaged(s.t.Context(), + &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + duration2 = time.Since(start) + }() + + // Let goroutines start with toxic active + time.Sleep(toxicDelay) + + // Remove toxic so requests can complete + err = proxy.RemoveToxic(toxic.Name) + require.NoError(s.t, err) + + wg.Wait() + + // Verify both requests were delayed by toxic + require.Greater(s.t, duration1, toxicDelay) + require.Greater(s.t, duration2, toxicDelay) + + // Verify both succeeded with same workflow ID + require.NoError(s.t, err1) + require.NoError(s.t, err2) + require.NotNil(s.t, response1) + require.NotNil(s.t, response2) + require.Equal(s.t, response1.WorkflowId, response2.WorkflowId) + + // Verify workflow is actually running + tc := NewTemporalClient(s.t) + env, err := GetPeerflow(s.t.Context(), s.pg.PostgresConnector.Conn(), tc, flowConnConfig.FlowJobName) + require.NoError(s.t, err) + SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + EnvWaitFor(s.t, env, 3*time.Minute, "wait for flow to be running", func() bool { + return env.GetFlowStatus(s.t) == protos.FlowStatus_STATUS_RUNNING + }) + + // Clean up + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} + +func (s APITestSuite) TestCreateCDCFlowManagedSequentialRequests() { + // Test: two sequential requests succeed, same workflow is returned + if _, ok := s.source.(*PostgresSource); !ok { + s.t.Skip("only testing with PostgreSQL") + } + + tableName := "sequential_test" + require.NoError(s.t, s.source.Exec(s.t.Context(), + fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", AttachSchema(s, tableName)))) + require.NoError(s.t, s.source.Exec(s.t.Context(), + fmt.Sprintf("INSERT INTO %s(id, val) values (1,'first')", AttachSchema(s, tableName)))) + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: "managed_sequential_" + s.suffix, + TableNameMapping: map[string]string{AttachSchema(s, tableName): tableName}, + Destination: s.ch.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.DoInitialSnapshot = true + + // First request + response1, err1 := s.CreateCDCFlowManaged(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + require.NoError(s.t, err1) + require.NotNil(s.t, response1) + + // Verify workflow is actually running + tc := NewTemporalClient(s.t) + env, err := GetPeerflow(s.t.Context(), s.pg.PostgresConnector.Conn(), tc, flowConnConfig.FlowJobName) + require.NoError(s.t, err) + SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + EnvWaitFor(s.t, env, 3*time.Minute, "wait for flow to be running", func() bool { + return env.GetFlowStatus(s.t) == protos.FlowStatus_STATUS_RUNNING + }) + + // Second sequential request should return the same workflow ID + response2, err2 := s.CreateCDCFlowManaged(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + require.NoError(s.t, err2) + require.NotNil(s.t, response2) + require.Equal(s.t, response1.WorkflowId, response2.WorkflowId) + + // Clean up + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} + +func (s APITestSuite) TestCreateCDCFlowManagedExternalFlowEntry() { + // Test: create a flows entry from the outside (simulate a crash before the workflow is created), do a request, should succeed + if _, ok := s.source.(*PostgresSource); !ok { + s.t.Skip("only testing with PostgreSQL") + } + + tableName := "external_entry_test" + require.NoError(s.t, s.source.Exec(s.t.Context(), + fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", AttachSchema(s, tableName)))) + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: "managed_external_" + s.suffix, + TableNameMapping: map[string]string{AttachSchema(s, tableName): tableName}, + Destination: s.ch.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.DoInitialSnapshot = true + + // Simulate a crash: create flows entry without creating workflow + conn := s.pg.PostgresConnector.Conn() + sourcePeer := s.source.GeneratePeer(s.t) + destPeer, err := s.GetPeerInfo(s.t.Context(), &protos.PeerInfoRequest{PeerName: s.ch.Peer().Name}) + require.NoError(s.t, err) + + var sourcePeerID, destPeerID int32 + require.NoError(s.t, conn.QueryRow(s.t.Context(), + "SELECT id FROM peers WHERE name = $1", sourcePeer.Name).Scan(&sourcePeerID)) + require.NoError(s.t, conn.QueryRow(s.t.Context(), + "SELECT id FROM peers WHERE name = $1", destPeer.Peer.Name).Scan(&destPeerID)) + + cfgBytes, err := proto.Marshal(flowConnConfig) + require.NoError(s.t, err) + + workflowID := fmt.Sprintf("%s-peerflow", flowConnConfig.FlowJobName) + _, err = conn.Exec(s.t.Context(), + `INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, + description, source_table_identifier, destination_table_identifier) + VALUES ($1,$2,$3,$4,$5,$6,'gRPC','','')`, + workflowID, flowConnConfig.FlowJobName, sourcePeerID, destPeerID, cfgBytes, protos.FlowStatus_STATUS_SETUP, + ) + require.NoError(s.t, err) + + // Now call CreateCDCFlowManaged - should start the workflow successfully + response, err := s.CreateCDCFlowManaged(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + require.NoError(s.t, err) + require.NotNil(s.t, response) + require.Equal(s.t, workflowID, response.WorkflowId) + + // Verify workflow is created and running + tc := NewTemporalClient(s.t) + env, err := GetPeerflow(s.t.Context(), conn, tc, flowConnConfig.FlowJobName) + require.NoError(s.t, err) + SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + EnvWaitFor(s.t, env, 3*time.Minute, "wait for flow to be running", func() bool { + return env.GetFlowStatus(s.t) == protos.FlowStatus_STATUS_RUNNING + }) + + // Clean up + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} + +func (s APITestSuite) TestCreateCDCFlowManagedCanceledWorkflow() { + // Test: when cdc flow workflow is failed/canceled, a new one is still not created + if _, ok := s.source.(*PostgresSource); !ok { + s.t.Skip("only testing with PostgreSQL") + } + + tableName := "canceled_workflow_test" + require.NoError(s.t, s.source.Exec(s.t.Context(), + fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", AttachSchema(s, tableName)))) + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: "managed_canceled_" + s.suffix, + TableNameMapping: map[string]string{AttachSchema(s, tableName): tableName}, + Destination: s.ch.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.DoInitialSnapshot = true + + // First create a normal flow + response1, err := s.CreateCDCFlowManaged(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + require.NoError(s.t, err) + require.NotNil(s.t, response1) + + tc := NewTemporalClient(s.t) + env, err := GetPeerflow(s.t.Context(), s.pg.PostgresConnector.Conn(), tc, flowConnConfig.FlowJobName) + require.NoError(s.t, err) + + // Cancel the workflow to simulate failure + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) + + // Wait for workflow to be canceled + EnvWaitFor(s.t, env, 30*time.Second, "wait for workflow to be canceled", func() bool { + desc, err := tc.DescribeWorkflowExecution(s.t.Context(), response1.WorkflowId, "") + if err != nil { + return false + } + status := desc.GetWorkflowExecutionInfo().GetStatus() + return status == enums.WORKFLOW_EXECUTION_STATUS_CANCELED + }) + + // Attempt to create again - should return the same workflow ID even though it's canceled + response2, err := s.CreateCDCFlowManaged(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + require.NoError(s.t, err) + require.NotNil(s.t, response2) + require.Equal(s.t, response1.WorkflowId, response2.WorkflowId) + + // Verify workflow is still in canceled state (not restarted) + desc, err := tc.DescribeWorkflowExecution(s.t.Context(), response2.WorkflowId, "") + require.NoError(s.t, err) + require.Equal(s.t, enums.WORKFLOW_EXECUTION_STATUS_CANCELED, desc.GetWorkflowExecutionInfo().GetStatus()) +} + +func (s APITestSuite) TestCreateCDCFlowManagedIdempotentAfterContinueAsNew() { + // Test: cdc flow workflow can continue-as-new and use the same id + if _, ok := s.source.(*PostgresSource); !ok { + s.t.Skip("only testing with PostgreSQL") + } + + tableName := "continue_as_new_test" + require.NoError(s.t, s.source.Exec(s.t.Context(), + fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", AttachSchema(s, tableName)))) + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: "managed_continue_" + s.suffix, + TableNameMapping: map[string]string{AttachSchema(s, tableName): tableName}, + Destination: s.ch.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.DoInitialSnapshot = true + + // First create a normal flow + response1, err := s.CreateCDCFlowManaged(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + require.NoError(s.t, err) + require.NotNil(s.t, response1) + + tc := NewTemporalClient(s.t) + env, err := GetPeerflow(s.t.Context(), s.pg.PostgresConnector.Conn(), tc, flowConnConfig.FlowJobName) + require.NoError(s.t, err) + SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + + // Wait for flow to be running + EnvWaitFor(s.t, env, 30*time.Second, "wait for flow to be running", func() bool { + return env.GetFlowStatus(s.t) == protos.FlowStatus_STATUS_RUNNING + }) + + // Check workflow executions - should have multiple due to continue-as-new during setup->running transition + listReq := &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: "default", + Query: fmt.Sprintf("WorkflowId = '%s'", response1.WorkflowId), + } + listResp, err := tc.ListWorkflow(s.t.Context(), listReq) + require.NoError(s.t, err) + require.Greater(s.t, len(listResp.Executions), 1, "Should have multiple executions (continue-as-new happened)") + + // Call CreateCDCFlowManaged again after continue-as-new - should return the same workflow ID + response2, err := s.CreateCDCFlowManaged(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + require.NoError(s.t, err) + require.NotNil(s.t, response2) + require.Equal(s.t, response1.WorkflowId, response2.WorkflowId, "Should return same workflow ID after continue-as-new") + + // Verify workflow is still running + desc, err := tc.DescribeWorkflowExecution(s.t.Context(), response2.WorkflowId, "") + require.NoError(s.t, err) + require.Equal(s.t, enums.WORKFLOW_EXECUTION_STATUS_RUNNING, desc.GetWorkflowExecutionInfo().GetStatus()) + + // Clean up + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} diff --git a/flow/e2e/pg.go b/flow/e2e/pg.go index aeebe18ce9..7943ad6024 100644 --- a/flow/e2e/pg.go +++ b/flow/e2e/pg.go @@ -5,9 +5,11 @@ import ( "errors" "fmt" "strings" + "sync" "testing" "time" + tp "github.com/Shopify/toxiproxy/v2/client" "github.com/jackc/pgx/v5" "github.com/stretchr/testify/require" @@ -264,3 +266,105 @@ func (s *PostgresSource) GetLogCount(ctx context.Context, flowJobName, errorType return int(rows.Records[0][0].Value().(int64)), nil } + +// Toxiproxy support for testing concurrent scenarios + +var ( + toxiClient *tp.Client + toxiPostgresProxy *tp.Proxy + toxiOnce sync.Once + toxiProxyPort = 9902 + toxiAdminPort = 18474 +) + +// InitToxiproxy initializes the Toxiproxy client (singleton pattern) +func InitToxiproxy() error { + var err error + toxiOnce.Do(func() { + adminAddr := fmt.Sprintf("localhost:%d", toxiAdminPort) + toxiClient = tp.NewClient(adminAddr) + // Test connection + _, err = toxiClient.Proxies() + }) + return err +} + +// SetupPostgresWithToxiproxy creates a PostgreSQL source that connects through Toxiproxy +func SetupPostgresWithToxiproxy(t *testing.T, suffix string) (*PostgresSource, *tp.Proxy, error) { + t.Helper() + + // Initialize Toxiproxy client + if err := InitToxiproxy(); err != nil { + return nil, nil, fmt.Errorf("failed to init toxiproxy: %w", err) + } + + // Get or create proxy + proxy, err := GetPostgresToxicProxy(t) + if err != nil { + return nil, nil, err + } + + // Create config pointing to proxy + config := internal.GetCatalogPostgresConfigFromEnv(t.Context()) + config.Host = "localhost" + config.Port = uint32(toxiProxyPort) + // Don't set RequireTls - let it use the default from env + + // Rest is same as SetupPostgres + connector, err := connpostgres.NewPostgresConnector(t.Context(), nil, config) + if err != nil { + return nil, nil, fmt.Errorf("failed to create postgres connection: %w", err) + } + conn := connector.Conn() + + if err := cleanPostgres(t.Context(), conn, suffix); err != nil { + connector.Close() + return nil, nil, err + } + + if err := setupPostgresSchema(t, conn, suffix); err != nil { + connector.Close() + return nil, nil, err + } + + return &PostgresSource{PostgresConnector: connector}, proxy, nil +} + +// GetPostgresToxicProxy gets or creates the PostgreSQL proxy +func GetPostgresToxicProxy(t *testing.T) (*tp.Proxy, error) { + t.Helper() + + if toxiPostgresProxy == nil { + // Get upstream from environment configuration + config := internal.GetCatalogPostgresConfigFromEnv(context.Background()) + upstream := fmt.Sprintf("%s:%d", config.Host, config.Port) + + // Try to create proxy + proxy, err := toxiClient.CreateProxy("postgres", + fmt.Sprintf("0.0.0.0:%d", toxiProxyPort), + upstream) + if err != nil { + // Proxy might already exist, try to get it + proxy, err = toxiClient.Proxy("postgres") + if err != nil { + return nil, fmt.Errorf("failed to create/get postgres proxy: %w", err) + } + } + toxiPostgresProxy = proxy + } + + // Ensure enabled and clean + if err := toxiPostgresProxy.Enable(); err != nil { + return nil, fmt.Errorf("failed to enable proxy: %w", err) + } + + // Remove all existing toxics to start fresh + toxics, err := toxiPostgresProxy.Toxics() + if err == nil { + for _, toxic := range toxics { + _ = toxiPostgresProxy.RemoveToxic(toxic.Name) + } + } + + return toxiPostgresProxy, nil +} diff --git a/flow/go.mod b/flow/go.mod index c403f8ee7f..2c413bc300 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -106,6 +106,7 @@ require ( github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.53.0 // indirect github.com/RaduBerinde/axisds v0.0.0-20250419182453-5135a0650657 // indirect github.com/RaduBerinde/btreemap v0.0.0-20250419232817-bf0d809ae648 // indirect + github.com/Shopify/toxiproxy/v2 v2.11.0 // indirect github.com/VividCortex/ewma v1.2.0 // indirect github.com/andybalholm/brotli v1.2.0 // indirect github.com/apache/arrow-go/v18 v18.4.1 // indirect @@ -186,6 +187,7 @@ require ( github.com/google/s2a-go v0.1.9 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect github.com/googleapis/gax-go/v2 v2.15.0 // indirect + github.com/gorilla/mux v1.8.1 // indirect github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // indirect @@ -209,6 +211,8 @@ require ( github.com/lestrrat-go/option/v2 v2.0.0 // indirect github.com/lufia/plan9stats v0.0.0-20250827001030-24949be3fa54 // indirect github.com/mailru/easyjson v0.9.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect github.com/minio/minlz v1.0.1 // indirect @@ -237,6 +241,8 @@ require ( github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/robfig/cron v1.2.0 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect + github.com/rs/xid v1.5.0 // indirect + github.com/rs/zerolog v1.33.0 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/shirou/gopsutil/v3 v3.24.5 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect @@ -285,6 +291,7 @@ require ( gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect + gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/api v0.34.1 // indirect k8s.io/klog/v2 v2.130.1 // indirect diff --git a/flow/go.sum b/flow/go.sum index 5b0b23be09..94613de41b 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -99,6 +99,8 @@ github.com/RaduBerinde/axisds v0.0.0-20250419182453-5135a0650657 h1:8XBWWQD+vFF+ github.com/RaduBerinde/axisds v0.0.0-20250419182453-5135a0650657/go.mod h1:UHGJonU9z4YYGKJxSaC6/TNcLOBptpmM5m2Cksbnw0Y= github.com/RaduBerinde/btreemap v0.0.0-20250419232817-bf0d809ae648 h1:0s1dtMVp3XcQ1tHazU9OCLCKoqj4TRD8GFU5SscItMM= github.com/RaduBerinde/btreemap v0.0.0-20250419232817-bf0d809ae648/go.mod h1:0tr7FllbE9gJkHq7CVeeDDFAFKQVy5RnCSSNBOvdqbc= +github.com/Shopify/toxiproxy/v2 v2.11.0 h1:iXm78nBN50T2BTs1Z8w1fdC0Y1kltkJZQEyMcYyCgGQ= +github.com/Shopify/toxiproxy/v2 v2.11.0/go.mod h1:EPnGLFvhpcwVKCsbFZwyOq4PxnGg9cFbhMrVT3ROBEo= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f h1:JjxwchlOepwsUWcQwD2mLUAGE9aCp0/ehy6yCHFBOvo= @@ -207,6 +209,7 @@ github.com/coocood/rtutil v0.0.0-20190304133409-c84515f646f2 h1:NnLfQ77q0G4k2Of2 github.com/coocood/rtutil v0.0.0-20190304133409-c84515f646f2/go.mod h1:7qG7YFnOALvsx6tKTNmQot8d7cGFXM9TidzvRFLWYwM= github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4= github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/go-systemd/v22 v22.6.0 h1:aGVa/v8B7hpb0TKl0MWoAavPDmHvobFe5R5zn0bCJWo= github.com/coreos/go-systemd/v22 v22.6.0/go.mod h1:iG+pp635Fo7ZmV/j14KUcmEyWF+0X7Lua8rrTWzYgWU= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -337,6 +340,7 @@ github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0= github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/googleapis v1.4.1 h1:1Yx4Myt7BxzvUr5ldGSbwYiZG6t9wGBZ+8/fX3Wvtq0= github.com/gogo/googleapis v1.4.1/go.mod h1:2lpHqI5OcWCtVElxXnPt+s8oJvMpySlOyM6xDCrzib4= github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -494,6 +498,12 @@ github.com/lufia/plan9stats v0.0.0-20250827001030-24949be3fa54 h1:mFWunSatvkQQDh github.com/lufia/plan9stats v0.0.0-20250827001030-24949be3fa54/go.mod h1:autxFIvghDt3jPTLoqZ9OZ7s9qTGNAWmYCjVFWPX/zg= github.com/mailru/easyjson v0.9.0 h1:PrnmzHw7262yW8sTBwxi1PdJA3Iw/EKBa8psRf7d9a4= github.com/mailru/easyjson v0.9.0/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= @@ -604,6 +614,10 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= +github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/sasha-s/go-deadlock v0.3.5 h1:tNCOEEDG6tBqrNDOX35j/7hL5FcFViG6awUGROb2NsU= github.com/sasha-s/go-deadlock v0.3.5/go.mod h1:bugP6EGbdGYObIlx7pUZtWqlvo8k9H6vCBBsiChJQ5U= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= @@ -870,7 +884,10 @@ golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/telemetry v0.0.0-20250908211612-aef8a434d053 h1:dHQOQddU4YHS5gY33/6klKjq7Gp3WwMyOXGNp5nzRj8= @@ -966,6 +983,8 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/protos/route.proto b/protos/route.proto index dec928ce3b..ee8cfe7efa 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -527,6 +527,12 @@ service FlowService { body : "*" }; } + rpc CreateCDCFlowManaged(CreateCDCFlowRequest) returns (CreateCDCFlowResponse) { + option (google.api.http) = { + post : "/v1/flows/cdc/create_managed", + body : "*" + }; + } rpc CreateQRepFlow(CreateQRepFlowRequest) returns (CreateQRepFlowResponse) { option (google.api.http) = { post : "/v1/flows/qrep/create", From bc7b61bbbf5b6ee5c8086dce290ce554ae5972b4 Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Wed, 17 Sep 2025 03:30:05 -0700 Subject: [PATCH 02/13] lint --- flow/cmd/handler.go | 5 +++-- flow/e2e/api_test.go | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index e4f5a2e53d..6bb3a46ccd 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -85,7 +85,8 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context, description, source_table_identifier, destination_table_identifier) SELECT $1,$2,$3,$4,$5,$6,'gRPC','','' WHERE NOT EXISTS (SELECT 1 FROM flows WHERE name = $7)`, - workflowID, req.ConnectionConfigs.FlowJobName, sourcePeerID, destinationPeerID, cfgBytes, protos.FlowStatus_STATUS_SETUP, req.ConnectionConfigs.FlowJobName, + workflowID, req.ConnectionConfigs.FlowJobName, sourcePeerID, destinationPeerID, cfgBytes, + protos.FlowStatus_STATUS_SETUP, req.ConnectionConfigs.FlowJobName, ) } else { _, err = h.pool.Exec(ctx, @@ -196,7 +197,7 @@ func (h *FlowRequestHandler) CreateCDCFlowManaged( return nil, exceptions.NewInvalidArgumentApiError(errors.New("resync is not supported in the managed API")) } - workflowID := fmt.Sprintf("%s-peerflow", cfg.FlowJobName) + workflowID := cfg.FlowJobName + "-peerflow" var errNotFound *serviceerror.NotFound _, err = h.temporalClient.DescribeWorkflow(ctx, workflowID, "") if err != nil && !errors.As(err, &errNotFound) { diff --git a/flow/e2e/api_test.go b/flow/e2e/api_test.go index c53d2b58e4..bac06568f9 100644 --- a/flow/e2e/api_test.go +++ b/flow/e2e/api_test.go @@ -1524,7 +1524,7 @@ func (s APITestSuite) TestCreateCDCFlowManagedExternalFlowEntry() { cfgBytes, err := proto.Marshal(flowConnConfig) require.NoError(s.t, err) - workflowID := fmt.Sprintf("%s-peerflow", flowConnConfig.FlowJobName) + workflowID := flowConnConfig.FlowJobName + "-peerflow" _, err = conn.Exec(s.t.Context(), `INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, description, source_table_identifier, destination_table_identifier) From 346347b1d231611b516fe8d5089e2aaf69d38a56 Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Wed, 17 Sep 2025 03:47:44 -0700 Subject: [PATCH 03/13] Fix tests (maybe) --- .github/workflows/flow.yml | 1 + flow/cmd/validate_mirror.go | 2 ++ flow/e2e/pg.go | 11 ++++++++++- 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 53c7966605..54f559017d 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -432,6 +432,7 @@ jobs: CI_MONGO_URI: mongodb://localhost:27017 CI_MONGO_USERNAME: "csuser" CI_MONGO_PASSWORD: "cspass" + TOXIPROXY_POSTGRES_HOST: postgres ENABLE_OTEL_METRICS: ${{ (matrix.db-version.pg == '16' || matrix.db-version.mysql == 'mysql-pos') && 'true' || 'false' }} OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: http://localhost:4317 OTEL_EXPORTER_OTLP_METRICS_PROTOCOL: grpc diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go index 70c27420a2..97872de1e3 100644 --- a/flow/cmd/validate_mirror.go +++ b/flow/cmd/validate_mirror.go @@ -19,6 +19,8 @@ var CustomColumnTypeRegex = regexp.MustCompile(`^$|^[a-zA-Z][a-zA-Z0-9(),]*$`) func (h *FlowRequestHandler) ValidateCDCMirror( ctx context.Context, req *protos.CreateCDCFlowRequest, ) (*protos.ValidateCDCMirrorResponse, error) { + // validateCDCMirrorImpl already returns a grpc error + //nopeertest:grpcReturn return h.validateCDCMirrorImpl(ctx, req, false) } diff --git a/flow/e2e/pg.go b/flow/e2e/pg.go index 7943ad6024..49c86fb27c 100644 --- a/flow/e2e/pg.go +++ b/flow/e2e/pg.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "os" "strings" "sync" "testing" @@ -337,7 +338,15 @@ func GetPostgresToxicProxy(t *testing.T) (*tp.Proxy, error) { if toxiPostgresProxy == nil { // Get upstream from environment configuration config := internal.GetCatalogPostgresConfigFromEnv(context.Background()) - upstream := fmt.Sprintf("%s:%d", config.Host, config.Port) + + // Allow override of upstream host for Toxiproxy + // In CI, Toxiproxy runs as a service container and needs to use service names + // while test code connects via localhost + upstreamHost := os.Getenv("TOXIPROXY_POSTGRES_HOST") + if upstreamHost == "" { + upstreamHost = config.Host + } + upstream := fmt.Sprintf("%s:%d", upstreamHost, config.Port) // Try to create proxy proxy, err := toxiClient.CreateProxy("postgres", From 0261dad01a813d2c13e53dca4ec9dae99f85ce73 Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Wed, 17 Sep 2025 04:12:57 -0700 Subject: [PATCH 04/13] or maybe not --- .github/workflows/flow.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 54f559017d..fff47d5784 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -432,7 +432,7 @@ jobs: CI_MONGO_URI: mongodb://localhost:27017 CI_MONGO_USERNAME: "csuser" CI_MONGO_PASSWORD: "cspass" - TOXIPROXY_POSTGRES_HOST: postgres + TOXIPROXY_POSTGRES_HOST: catalog ENABLE_OTEL_METRICS: ${{ (matrix.db-version.pg == '16' || matrix.db-version.mysql == 'mysql-pos') && 'true' || 'false' }} OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: http://localhost:4317 OTEL_EXPORTER_OTLP_METRICS_PROTOCOL: grpc From ada331d0d5ab96ac424579a119bb6831d2209f2f Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Thu, 2 Oct 2025 00:23:03 -0700 Subject: [PATCH 05/13] Make sure tests still run --- flow/cmd/handler.go | 71 +++++++++++-------- flow/e2e/api_test.go | 5 +- .../catalog/migrations/V48__unique_flows.sql | 14 ++++ nexus/catalog/src/lib.rs | 5 +- nexus/flow-rs/src/grpc.rs | 1 + protos/route.proto | 1 + 6 files changed, 61 insertions(+), 36 deletions(-) create mode 100644 nexus/catalog/migrations/V48__unique_flows.sql diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 6bb3a46ccd..9065d8acf6 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -9,6 +9,7 @@ import ( "time" "github.com/google/uuid" + "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" tEnums "go.temporal.io/api/enums/v1" @@ -59,6 +60,12 @@ func (h *FlowRequestHandler) getPeerID(ctx context.Context, peerName string) (in return id.Int32, nil } +func (h *FlowRequestHandler) cdcJobEntryExists(ctx context.Context, flowJobName string) (bool, error) { + var exists bool + err := h.pool.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM flows WHERE name = $1)`, flowJobName).Scan(&exists) + return exists, err +} + func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context, req *protos.CreateCDCFlowRequest, workflowID string, idempotent bool, ) error { @@ -79,24 +86,11 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context, return fmt.Errorf("unable to marshal flow config: %w", err) } - if idempotent { - _, err = h.pool.Exec(ctx, - `INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, - description, source_table_identifier, destination_table_identifier) - SELECT $1,$2,$3,$4,$5,$6,'gRPC','','' - WHERE NOT EXISTS (SELECT 1 FROM flows WHERE name = $7)`, - workflowID, req.ConnectionConfigs.FlowJobName, sourcePeerID, destinationPeerID, cfgBytes, - protos.FlowStatus_STATUS_SETUP, req.ConnectionConfigs.FlowJobName, - ) - } else { - _, err = h.pool.Exec(ctx, - `INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, - description, source_table_identifier, destination_table_identifier) - VALUES ($1,$2,$3,$4,$5,$6,'gRPC','','')`, - workflowID, req.ConnectionConfigs.FlowJobName, sourcePeerID, destinationPeerID, cfgBytes, protos.FlowStatus_STATUS_SETUP, - ) - } - if err != nil { + if _, err = h.pool.Exec(ctx, + `INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, description) + VALUES ($1,$2,$3,$4,$5,$6,'gRPC')`, + workflowID, req.ConnectionConfigs.FlowJobName, sourcePeerID, destinationPeerID, cfgBytes, protos.FlowStatus_STATUS_SETUP, + ); err != nil && !(idempotent && shared.IsSQLStateError(err, pgerrcode.UniqueViolation)) { return fmt.Errorf("unable to insert into flows table for flow %s: %w", req.ConnectionConfigs.FlowJobName, err) } @@ -128,9 +122,8 @@ func (h *FlowRequestHandler) createQRepJobEntry(ctx context.Context, flowName := req.QrepConfig.FlowJobName if _, err := h.pool.Exec(ctx, `INSERT INTO flows(workflow_id,name,source_peer,destination_peer,config_proto,status, - description, destination_table_identifier, query_string) VALUES ($1,$2,$3,$4,$5,$6,'gRPC',$7,$8) + description, query_string) VALUES ($1,$2,$3,$4,$5,$6,'gRPC',$7) `, workflowID, flowName, sourcePeerID, destinationPeerID, cfgBytes, protos.FlowStatus_STATUS_RUNNING, - req.QrepConfig.DestinationTableIdentifier, req.QrepConfig.Query, ); err != nil { return fmt.Errorf("unable to insert into flows table for flow %s with source table %s: %w", @@ -193,11 +186,15 @@ func (h *FlowRequestHandler) CreateCDCFlowManaged( } cfg.Version = internalVersion - if cfg.Resync { - return nil, exceptions.NewInvalidArgumentApiError(errors.New("resync is not supported in the managed API")) + if !req.AttachToExisting { + if exists, err := h.cdcJobEntryExists(ctx, cfg.FlowJobName); err != nil { + return nil, exceptions.NewInternalApiError(fmt.Errorf("unable to check flow job entry: %w", err)) + } else if exists { + return nil, exceptions.NewAlreadyExistsApiError(fmt.Errorf("flow already exists: %s", cfg.FlowJobName)) + } } - workflowID := cfg.FlowJobName + "-peerflow" + workflowID := getWorkflowID(cfg.FlowJobName) var errNotFound *serviceerror.NotFound _, err = h.temporalClient.DescribeWorkflow(ctx, workflowID, "") if err != nil && !errors.As(err, &errNotFound) { @@ -218,12 +215,23 @@ func (h *FlowRequestHandler) CreateCDCFlowManaged( return nil, fmt.Errorf("invalid mirror: %w", err) } + return h.createCDCFlow(ctx, req, workflowID, tEnums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) +} + +func getWorkflowID(flowName string) string { + return flowName + "-peerflow" +} + +func (h *FlowRequestHandler) createCDCFlow( + ctx context.Context, req *protos.CreateCDCFlowRequest, workflowID string, workflowIDReusePolicy tEnums.WorkflowIdReusePolicy, +) (*protos.CreateCDCFlowResponse, error) { + cfg := req.ConnectionConfigs workflowOptions := client.StartWorkflowOptions{ ID: workflowID, TaskQueue: h.peerflowTaskQueueID, TypedSearchAttributes: shared.NewSearchAttributes(cfg.FlowJobName), WorkflowIDConflictPolicy: tEnums.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING, - WorkflowIDReusePolicy: tEnums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, + WorkflowIDReusePolicy: workflowIDReusePolicy, } if err := h.createCdcJobEntry(ctx, req, workflowID, true); err != nil { @@ -332,6 +340,7 @@ func (h *FlowRequestHandler) shutdownFlow( DropFlowStats: deleteStats, FlowConnectionConfigs: cdcConfig, SkipDestinationDrop: skipDestinationDrop, + // NOTE: Resync is false here during snapshot-only resync }) if err != nil { slog.ErrorContext(ctx, "unable to start DropFlow workflow", logs, slog.Any("error", err)) @@ -419,7 +428,7 @@ func (h *FlowRequestHandler) FlowStateChange( } case protos.FlowStatus_STATUS_RESYNC: if currState == protos.FlowStatus_STATUS_COMPLETED { - changeErr = h.resyncMirror(ctx, req.FlowJobName, req.DropMirrorStats) + changeErr = h.resyncCompletedSnapshot(ctx, req.FlowJobName, req.DropMirrorStats) } else if isCDC, err := h.isCDCFlow(ctx, req.FlowJobName); err != nil { return nil, exceptions.NewInternalApiError(fmt.Errorf("unable to determine if mirror is cdc: %w", err)) } else if !isCDC { @@ -568,8 +577,7 @@ func (h *FlowRequestHandler) getWorkflowID(ctx context.Context, flowJobName stri return workflowID, nil } -// only supports CDC resync for now -func (h *FlowRequestHandler) resyncMirror( +func (h *FlowRequestHandler) resyncCompletedSnapshot( ctx context.Context, flowName string, dropStats bool, @@ -606,9 +614,12 @@ func (h *FlowRequestHandler) resyncMirror( return err } - if _, err := h.CreateCDCFlow(ctx, &protos.CreateCDCFlowRequest{ - ConnectionConfigs: config, - }); err != nil { + workflowID := getWorkflowID(config.FlowJobName) + if _, err := h.createCDCFlow(ctx, + &protos.CreateCDCFlowRequest{ConnectionConfigs: config}, + workflowID, + tEnums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, + ); err != nil { return err } return nil diff --git a/flow/e2e/api_test.go b/flow/e2e/api_test.go index bac06568f9..ebc80de04e 100644 --- a/flow/e2e/api_test.go +++ b/flow/e2e/api_test.go @@ -1526,9 +1526,8 @@ func (s APITestSuite) TestCreateCDCFlowManagedExternalFlowEntry() { workflowID := flowConnConfig.FlowJobName + "-peerflow" _, err = conn.Exec(s.t.Context(), - `INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, - description, source_table_identifier, destination_table_identifier) - VALUES ($1,$2,$3,$4,$5,$6,'gRPC','','')`, + `INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, description) + VALUES ($1,$2,$3,$4,$5,$6,'gRPC')`, workflowID, flowConnConfig.FlowJobName, sourcePeerID, destPeerID, cfgBytes, protos.FlowStatus_STATUS_SETUP, ) require.NoError(s.t, err) diff --git a/nexus/catalog/migrations/V48__unique_flows.sql b/nexus/catalog/migrations/V48__unique_flows.sql new file mode 100644 index 0000000000..281bfb1a94 --- /dev/null +++ b/nexus/catalog/migrations/V48__unique_flows.sql @@ -0,0 +1,14 @@ +-- these are never read +UPDATE flows +SET source_table_identifier = NULL, destination_table_identifier = NULL; + +-- deduplicate flows by name +DELETE FROM flows +WHERE id NOT IN ( + SELECT MAX(id) + FROM flows + GROUP BY name +); + +-- make them unique going forward +ALTER TABLE flows ADD CONSTRAINT flows_name_unique UNIQUE (name); diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index bc2f43285b..fc5e01728e 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -388,9 +388,9 @@ impl Catalog { .pg .prepare_typed( "INSERT INTO flows (name, source_peer, destination_peer, description, - destination_table_identifier, query_string, flow_metadata) VALUES ($1, $2, $3, $4, $5, $6, $7)", + query_string, flow_metadata) VALUES ($1, $2, $3, $4, $5, $6, $7)", &[types::Type::TEXT, types::Type::INT4, types::Type::INT4, types::Type::TEXT, - types::Type::TEXT, types::Type::TEXT, types::Type::JSONB], + types::Type::TEXT, types::Type::JSONB], ) .await?; @@ -407,7 +407,6 @@ impl Catalog { &source_peer_id, &destination_peer_id, &job.description, - &destination_table_name.as_str().unwrap(), &job.query_string, &serde_json::to_value(job.flow_options.clone()) .context("unable to serialize flow options")?, diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index 7bd1489b28..ac611c55ba 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -59,6 +59,7 @@ impl FlowGrpcClient { ) -> anyhow::Result { let create_peer_flow_req = pt::peerdb_route::CreateCdcFlowRequest { connection_configs: Some(peer_flow_config), + attach_to_existing: false, }; let response = self.client.create_cdc_flow(create_peer_flow_req).await?; let workflow_id = response.into_inner().workflow_id; diff --git a/protos/route.proto b/protos/route.proto index ee8cfe7efa..8aa56dc383 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -11,6 +11,7 @@ package peerdb_route; message CreateCDCFlowRequest { peerdb_flow.FlowConnectionConfigs connection_configs = 1; + bool attach_to_existing = 2; } message CreateCDCFlowResponse { string workflow_id = 1; } From 0c0fd454fa7545bdc3a5459f4bde578ecd330f0c Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Thu, 2 Oct 2025 00:31:29 -0700 Subject: [PATCH 06/13] Replace CreateCDCFlow --- flow/cmd/handler.go | 45 +---------------------- flow/e2e/api_test.go | 85 ++++++++++++++++++++++++++++++-------------- protos/route.proto | 6 ---- 3 files changed, 59 insertions(+), 77 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 9065d8acf6..450f801ac1 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -143,49 +143,6 @@ func (h *FlowRequestHandler) CreateCDCFlow( } cfg.Version = internalVersion - // For resync, we validate the mirror before dropping it and getting to this step. - // There is no point validating again here if it's a resync - the mirror is dropped already - if !cfg.Resync { - if _, err := h.ValidateCDCMirror(ctx, req); err != nil { - slog.ErrorContext(ctx, "validate mirror error", slog.Any("error", err)) - // ValidateCDCMirror already returns a grpc error - //nopeertest:grpcReturn - return nil, fmt.Errorf("invalid mirror: %w", err) - } - } - - workflowID := fmt.Sprintf("%s-peerflow-%s", cfg.FlowJobName, uuid.New()) - workflowOptions := client.StartWorkflowOptions{ - ID: workflowID, - TaskQueue: h.peerflowTaskQueueID, - TypedSearchAttributes: shared.NewSearchAttributes(cfg.FlowJobName), - } - - if err := h.createCdcJobEntry(ctx, req, workflowID, false); err != nil { - slog.ErrorContext(ctx, "unable to create flow job entry", slog.Any("error", err)) - return nil, exceptions.NewInternalApiError(fmt.Errorf("unable to create flow job entry: %w", err)) - } - - if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, cfg, nil); err != nil { - slog.ErrorContext(ctx, "unable to start PeerFlow workflow", slog.Any("error", err)) - return nil, exceptions.NewInternalApiError(fmt.Errorf("unable to start PeerFlow workflow: %w", err)) - } - - return &protos.CreateCDCFlowResponse{ - WorkflowId: workflowID, - }, nil -} - -func (h *FlowRequestHandler) CreateCDCFlowManaged( - ctx context.Context, req *protos.CreateCDCFlowRequest, -) (*protos.CreateCDCFlowResponse, error) { - cfg := req.ConnectionConfigs - internalVersion, err := internal.PeerDBForceInternalVersion(ctx, req.ConnectionConfigs.Env) - if err != nil { - return nil, exceptions.NewInternalApiError(fmt.Errorf("failed to get internal version: %w", err)) - } - cfg.Version = internalVersion - if !req.AttachToExisting { if exists, err := h.cdcJobEntryExists(ctx, cfg.FlowJobName); err != nil { return nil, exceptions.NewInternalApiError(fmt.Errorf("unable to check flow job entry: %w", err)) @@ -200,7 +157,7 @@ func (h *FlowRequestHandler) CreateCDCFlowManaged( if err != nil && !errors.As(err, &errNotFound) { return nil, exceptions.NewInternalApiError(fmt.Errorf("failed to query the workflow execution: %w", err)) } else if err == nil { - // Previous CreateCDCFlowManaged already succeeded + // Previous CreateCDCFlow already succeeded return &protos.CreateCDCFlowResponse{ WorkflowId: workflowID, }, nil diff --git a/flow/e2e/api_test.go b/flow/e2e/api_test.go index ebc80de04e..bbbbd0dd8e 100644 --- a/flow/e2e/api_test.go +++ b/flow/e2e/api_test.go @@ -1276,7 +1276,7 @@ func (s APITestSuite) TestDropMissing() { require.NoError(s.t, err) } -func (s APITestSuite) TestCreateCDCFlowManagedConcurrentRequests() { +func (s APITestSuite) TestCreateCDCFlowAttachConcurrentRequests() { // Test: two concurrent requests succeed if _, ok := s.source.(*PostgresSource); !ok { s.t.Skip("only testing with PostgreSQL") @@ -1289,7 +1289,7 @@ func (s APITestSuite) TestCreateCDCFlowManagedConcurrentRequests() { fmt.Sprintf("INSERT INTO %s(id, val) values (1,'first')", AttachSchema(s, tableName)))) connectionGen := FlowConnectionGenerationConfig{ - FlowJobName: "managed_concurrent_" + s.suffix, + FlowJobName: "create_concurrent_" + s.suffix, TableNameMapping: map[string]string{AttachSchema(s, tableName): tableName}, Destination: s.ch.Peer().Name, } @@ -1304,11 +1304,17 @@ func (s APITestSuite) TestCreateCDCFlowManagedConcurrentRequests() { wg.Add(2) go func() { defer wg.Done() - response1, err1 = s.CreateCDCFlowManaged(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + response1, err1 = s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) }() go func() { defer wg.Done() - response2, err2 = s.CreateCDCFlowManaged(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + response2, err2 = s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) }() wg.Wait() @@ -1332,7 +1338,7 @@ func (s APITestSuite) TestCreateCDCFlowManagedConcurrentRequests() { RequireEnvCanceled(s.t, env) } -func (s APITestSuite) TestCreateCDCFlowManagedConcurrentRequestsToxi() { +func (s APITestSuite) TestCreateCDCFlowAttachConcurrentRequestsToxi() { // Test: use Toxiproxy to ensure concurrent requests are truly concurrent // To run locally, requires toxiproxy running: @@ -1373,7 +1379,7 @@ func (s APITestSuite) TestCreateCDCFlowManagedConcurrentRequestsToxi() { }() connectionGen := FlowConnectionGenerationConfig{ - FlowJobName: "managed_" + suffix, + FlowJobName: "create_concurrent_toxi_" + suffix, TableNameMapping: map[string]string{ fmt.Sprintf("e2e_test_%s.%s", suffix, tableName): tableName, }, @@ -1400,15 +1406,19 @@ func (s APITestSuite) TestCreateCDCFlowManagedConcurrentRequestsToxi() { go func() { defer wg.Done() start := time.Now() - response1, err1 = s.CreateCDCFlowManaged(s.t.Context(), - &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + response1, err1 = s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) duration1 = time.Since(start) }() go func() { defer wg.Done() start := time.Now() - response2, err2 = s.CreateCDCFlowManaged(s.t.Context(), - &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + response2, err2 = s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) duration2 = time.Since(start) }() @@ -1446,7 +1456,7 @@ func (s APITestSuite) TestCreateCDCFlowManagedConcurrentRequestsToxi() { RequireEnvCanceled(s.t, env) } -func (s APITestSuite) TestCreateCDCFlowManagedSequentialRequests() { +func (s APITestSuite) TestCreateCDCFlowAttachSequentialRequests() { // Test: two sequential requests succeed, same workflow is returned if _, ok := s.source.(*PostgresSource); !ok { s.t.Skip("only testing with PostgreSQL") @@ -1459,7 +1469,7 @@ func (s APITestSuite) TestCreateCDCFlowManagedSequentialRequests() { fmt.Sprintf("INSERT INTO %s(id, val) values (1,'first')", AttachSchema(s, tableName)))) connectionGen := FlowConnectionGenerationConfig{ - FlowJobName: "managed_sequential_" + s.suffix, + FlowJobName: "create_sequential_" + s.suffix, TableNameMapping: map[string]string{AttachSchema(s, tableName): tableName}, Destination: s.ch.Peer().Name, } @@ -1467,7 +1477,10 @@ func (s APITestSuite) TestCreateCDCFlowManagedSequentialRequests() { flowConnConfig.DoInitialSnapshot = true // First request - response1, err1 := s.CreateCDCFlowManaged(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + response1, err1 := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) require.NoError(s.t, err1) require.NotNil(s.t, response1) @@ -1481,7 +1494,10 @@ func (s APITestSuite) TestCreateCDCFlowManagedSequentialRequests() { }) // Second sequential request should return the same workflow ID - response2, err2 := s.CreateCDCFlowManaged(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + response2, err2 := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) require.NoError(s.t, err2) require.NotNil(s.t, response2) require.Equal(s.t, response1.WorkflowId, response2.WorkflowId) @@ -1491,7 +1507,7 @@ func (s APITestSuite) TestCreateCDCFlowManagedSequentialRequests() { RequireEnvCanceled(s.t, env) } -func (s APITestSuite) TestCreateCDCFlowManagedExternalFlowEntry() { +func (s APITestSuite) TestCreateCDCFlowAttachExternalFlowEntry() { // Test: create a flows entry from the outside (simulate a crash before the workflow is created), do a request, should succeed if _, ok := s.source.(*PostgresSource); !ok { s.t.Skip("only testing with PostgreSQL") @@ -1502,7 +1518,7 @@ func (s APITestSuite) TestCreateCDCFlowManagedExternalFlowEntry() { fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", AttachSchema(s, tableName)))) connectionGen := FlowConnectionGenerationConfig{ - FlowJobName: "managed_external_" + s.suffix, + FlowJobName: "create_external_" + s.suffix, TableNameMapping: map[string]string{AttachSchema(s, tableName): tableName}, Destination: s.ch.Peer().Name, } @@ -1532,8 +1548,11 @@ func (s APITestSuite) TestCreateCDCFlowManagedExternalFlowEntry() { ) require.NoError(s.t, err) - // Now call CreateCDCFlowManaged - should start the workflow successfully - response, err := s.CreateCDCFlowManaged(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + // Now call CreateCDCFlow - should start the workflow successfully + response, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) require.NoError(s.t, err) require.NotNil(s.t, response) require.Equal(s.t, workflowID, response.WorkflowId) @@ -1552,7 +1571,7 @@ func (s APITestSuite) TestCreateCDCFlowManagedExternalFlowEntry() { RequireEnvCanceled(s.t, env) } -func (s APITestSuite) TestCreateCDCFlowManagedCanceledWorkflow() { +func (s APITestSuite) TestCreateCDCFlowAttachCanceledWorkflow() { // Test: when cdc flow workflow is failed/canceled, a new one is still not created if _, ok := s.source.(*PostgresSource); !ok { s.t.Skip("only testing with PostgreSQL") @@ -1563,7 +1582,7 @@ func (s APITestSuite) TestCreateCDCFlowManagedCanceledWorkflow() { fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", AttachSchema(s, tableName)))) connectionGen := FlowConnectionGenerationConfig{ - FlowJobName: "managed_canceled_" + s.suffix, + FlowJobName: "create_canceled_" + s.suffix, TableNameMapping: map[string]string{AttachSchema(s, tableName): tableName}, Destination: s.ch.Peer().Name, } @@ -1571,7 +1590,10 @@ func (s APITestSuite) TestCreateCDCFlowManagedCanceledWorkflow() { flowConnConfig.DoInitialSnapshot = true // First create a normal flow - response1, err := s.CreateCDCFlowManaged(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + response1, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) require.NoError(s.t, err) require.NotNil(s.t, response1) @@ -1594,7 +1616,10 @@ func (s APITestSuite) TestCreateCDCFlowManagedCanceledWorkflow() { }) // Attempt to create again - should return the same workflow ID even though it's canceled - response2, err := s.CreateCDCFlowManaged(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + response2, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) require.NoError(s.t, err) require.NotNil(s.t, response2) require.Equal(s.t, response1.WorkflowId, response2.WorkflowId) @@ -1605,7 +1630,7 @@ func (s APITestSuite) TestCreateCDCFlowManagedCanceledWorkflow() { require.Equal(s.t, enums.WORKFLOW_EXECUTION_STATUS_CANCELED, desc.GetWorkflowExecutionInfo().GetStatus()) } -func (s APITestSuite) TestCreateCDCFlowManagedIdempotentAfterContinueAsNew() { +func (s APITestSuite) TestCreateCDCFlowAttachIdempotentAfterContinueAsNew() { // Test: cdc flow workflow can continue-as-new and use the same id if _, ok := s.source.(*PostgresSource); !ok { s.t.Skip("only testing with PostgreSQL") @@ -1616,7 +1641,7 @@ func (s APITestSuite) TestCreateCDCFlowManagedIdempotentAfterContinueAsNew() { fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", AttachSchema(s, tableName)))) connectionGen := FlowConnectionGenerationConfig{ - FlowJobName: "managed_continue_" + s.suffix, + FlowJobName: "create_continue_" + s.suffix, TableNameMapping: map[string]string{AttachSchema(s, tableName): tableName}, Destination: s.ch.Peer().Name, } @@ -1624,7 +1649,10 @@ func (s APITestSuite) TestCreateCDCFlowManagedIdempotentAfterContinueAsNew() { flowConnConfig.DoInitialSnapshot = true // First create a normal flow - response1, err := s.CreateCDCFlowManaged(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + response1, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) require.NoError(s.t, err) require.NotNil(s.t, response1) @@ -1647,8 +1675,11 @@ func (s APITestSuite) TestCreateCDCFlowManagedIdempotentAfterContinueAsNew() { require.NoError(s.t, err) require.Greater(s.t, len(listResp.Executions), 1, "Should have multiple executions (continue-as-new happened)") - // Call CreateCDCFlowManaged again after continue-as-new - should return the same workflow ID - response2, err := s.CreateCDCFlowManaged(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig}) + // Call CreateCDCFlow again after continue-as-new - should return the same workflow ID + response2, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ + ConnectionConfigs: flowConnConfig, + AttachToExisting: true, + }) require.NoError(s.t, err) require.NotNil(s.t, response2) require.Equal(s.t, response1.WorkflowId, response2.WorkflowId, "Should return same workflow ID after continue-as-new") diff --git a/protos/route.proto b/protos/route.proto index 8aa56dc383..e051e0c94d 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -528,12 +528,6 @@ service FlowService { body : "*" }; } - rpc CreateCDCFlowManaged(CreateCDCFlowRequest) returns (CreateCDCFlowResponse) { - option (google.api.http) = { - post : "/v1/flows/cdc/create_managed", - body : "*" - }; - } rpc CreateQRepFlow(CreateQRepFlowRequest) returns (CreateQRepFlowResponse) { option (google.api.http) = { post : "/v1/flows/qrep/create", From ca1d26a5d4bc81eefad3083bcf810e1de6cfb041 Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Thu, 2 Oct 2025 00:40:04 -0700 Subject: [PATCH 07/13] clippy --- nexus/catalog/src/lib.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index fc5e01728e..e819ce8154 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -394,10 +394,6 @@ impl Catalog { ) .await?; - let Some(destination_table_name) = job.flow_options.get("destination_table_name") else { - return Err(anyhow!("destination_table_name not found in flow options")); - }; - let _rows = self .pg .execute( From 82af0ceda2ef64305c29e19edf7e7dec30fddd71 Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Fri, 3 Oct 2025 21:44:27 -0700 Subject: [PATCH 08/13] Allow reusing flow names when the previous one was deleted from catalog --- flow/cmd/handler.go | 31 ++++++++++++++++++++----------- flow/cmd/validate_mirror.go | 2 +- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index db9fccfcc0..a84fff909a 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -152,16 +152,26 @@ func (h *FlowRequestHandler) CreateCDCFlow( workflowID := getWorkflowID(cfg.FlowJobName) var errNotFound *serviceerror.NotFound - _, err = h.temporalClient.DescribeWorkflow(ctx, workflowID, "") + desc, err := h.temporalClient.DescribeWorkflow(ctx, workflowID, "") if err != nil && !errors.As(err, &errNotFound) { return nil, NewInternalApiError(fmt.Errorf("failed to query the workflow execution: %w", err)) } else if err == nil { - // Previous CreateCDCFlow already succeeded - return &protos.CreateCDCFlowResponse{ - WorkflowId: workflowID, - }, nil + // If workflow is actively running, handle based on AttachToExisting + // Workflows in terminal states are fine + if desc.WorkflowExecutionMetadata.Status == tEnums.WORKFLOW_EXECUTION_STATUS_RUNNING || + desc.WorkflowExecutionMetadata.Status == tEnums.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW { + if req.AttachToExisting { + // Idempotent attach to running workflow + return &protos.CreateCDCFlowResponse{ + WorkflowId: workflowID, + }, nil + } else { + // Can't create duplicate of running workflow + return nil, NewAlreadyExistsApiError(fmt.Errorf("workflow already exists for flow: %s", cfg.FlowJobName)) + } + } } - // Workflow not found, do the validations and start a new one + // No running workflow, do the validations and start a new one // Use idempotent validation that skips mirror existence check if _, err := h.validateCDCMirrorImpl(ctx, req, true); err != nil { @@ -169,7 +179,7 @@ func (h *FlowRequestHandler) CreateCDCFlow( return nil, NewInternalApiError(fmt.Errorf("invalid mirror: %w", err)) } - if resp, err := h.createCDCFlow(ctx, req, workflowID, tEnums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE); err != nil { + if resp, err := h.createCDCFlow(ctx, req, workflowID); err != nil { return nil, NewInternalApiError(err) } else { return resp, nil @@ -181,15 +191,15 @@ func getWorkflowID(flowName string) string { } func (h *FlowRequestHandler) createCDCFlow( - ctx context.Context, req *protos.CreateCDCFlowRequest, workflowID string, workflowIDReusePolicy tEnums.WorkflowIdReusePolicy, + ctx context.Context, req *protos.CreateCDCFlowRequest, workflowID string, ) (*protos.CreateCDCFlowResponse, error) { cfg := req.ConnectionConfigs workflowOptions := client.StartWorkflowOptions{ ID: workflowID, TaskQueue: h.peerflowTaskQueueID, TypedSearchAttributes: shared.NewSearchAttributes(cfg.FlowJobName), - WorkflowIDConflictPolicy: tEnums.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING, - WorkflowIDReusePolicy: workflowIDReusePolicy, + WorkflowIDConflictPolicy: tEnums.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING, // two racing requests end up with the same workflow + WorkflowIDReusePolicy: tEnums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, // but creating the same id as a completed one is allowed } if err := h.createCdcJobEntry(ctx, req, workflowID, true); err != nil { @@ -572,7 +582,6 @@ func (h *FlowRequestHandler) resyncCompletedSnapshot( if _, err := h.createCDCFlow(ctx, &protos.CreateCDCFlowRequest{ConnectionConfigs: config}, workflowID, - tEnums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, ); err != nil { return err } diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go index d585f0c335..58f01871f4 100644 --- a/flow/cmd/validate_mirror.go +++ b/flow/cmd/validate_mirror.go @@ -36,7 +36,7 @@ func (h *FlowRequestHandler) validateCDCMirrorImpl( return nil, NewUnavailableApiError(ErrUnderMaintenance) } - // Skip mirror existence check when idempotent (for managed API) + // Skip mirror existence check when idempotent (for managed creates) if !idempotent && !req.ConnectionConfigs.Resync { mirrorExists, existCheckErr := h.checkIfMirrorNameExists(ctx, req.ConnectionConfigs.FlowJobName) if existCheckErr != nil { From 81466dbe7cfc4bbac9509953cd9314f36893e8b0 Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Fri, 3 Oct 2025 22:23:56 -0700 Subject: [PATCH 09/13] Fix tests --- flow/e2e/api_test.go | 23 ++++++++++++++++++----- flow/e2e/elasticsearch_qrep_test.go | 12 ++++++++---- flow/e2e/s3_qrep_test.go | 8 ++++---- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/flow/e2e/api_test.go b/flow/e2e/api_test.go index 9ba3e1b3ed..35f4460652 100644 --- a/flow/e2e/api_test.go +++ b/flow/e2e/api_test.go @@ -1594,7 +1594,7 @@ func (s APITestSuite) TestCreateCDCFlowAttachExternalFlowEntry() { } func (s APITestSuite) TestCreateCDCFlowAttachCanceledWorkflow() { - // Test: when cdc flow workflow is failed/canceled, a new one is still not created + // Test: when cdc flow workflow is failed/canceled, a new run can be created with the same workflow ID if _, ok := s.source.(*PostgresSource); !ok { s.t.Skip("only testing with PostgreSQL") } @@ -1628,16 +1628,21 @@ func (s APITestSuite) TestCreateCDCFlowAttachCanceledWorkflow() { RequireEnvCanceled(s.t, env) // Wait for workflow to be canceled + var firstRunID string EnvWaitFor(s.t, env, 30*time.Second, "wait for workflow to be canceled", func() bool { desc, err := tc.DescribeWorkflowExecution(s.t.Context(), response1.WorkflowId, "") if err != nil { return false } status := desc.GetWorkflowExecutionInfo().GetStatus() - return status == enums.WORKFLOW_EXECUTION_STATUS_CANCELED + if status == enums.WORKFLOW_EXECUTION_STATUS_CANCELED { + firstRunID = desc.GetWorkflowExecutionInfo().GetExecution().GetRunId() + return true + } + return false }) - // Attempt to create again - should return the same workflow ID even though it's canceled + // Attempt to create again - should create a new workflow run with the same workflow ID response2, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ ConnectionConfigs: flowConnConfig, AttachToExisting: true, @@ -1646,10 +1651,18 @@ func (s APITestSuite) TestCreateCDCFlowAttachCanceledWorkflow() { require.NotNil(s.t, response2) require.Equal(s.t, response1.WorkflowId, response2.WorkflowId) - // Verify workflow is still in canceled state (not restarted) + // Verify a new workflow run was created (different run ID, status is RUNNING) desc, err := tc.DescribeWorkflowExecution(s.t.Context(), response2.WorkflowId, "") require.NoError(s.t, err) - require.Equal(s.t, enums.WORKFLOW_EXECUTION_STATUS_CANCELED, desc.GetWorkflowExecutionInfo().GetStatus()) + require.Equal(s.t, enums.WORKFLOW_EXECUTION_STATUS_RUNNING, desc.GetWorkflowExecutionInfo().GetStatus()) + require.NotEqual(s.t, firstRunID, desc.GetWorkflowExecutionInfo().GetExecution().GetRunId(), + "should have created a new workflow run") + + // Clean up + env, err = GetPeerflow(s.t.Context(), s.pg.PostgresConnector.Conn(), tc, flowConnConfig.FlowJobName) + require.NoError(s.t, err) + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) } func (s APITestSuite) TestCreateCDCFlowAttachIdempotentAfterContinueAsNew() { diff --git a/flow/e2e/elasticsearch_qrep_test.go b/flow/e2e/elasticsearch_qrep_test.go index 229a5edbb6..0a7804c03e 100644 --- a/flow/e2e/elasticsearch_qrep_test.go +++ b/flow/e2e/elasticsearch_qrep_test.go @@ -16,7 +16,8 @@ func Test_Elasticsearch(t *testing.T) { } func (s elasticsearchSuite) Test_Simple_QRep_Append() { - srcTableName := AttachSchema(s, "es_simple_append") + jobName := AddSuffix(s, "test_es_simple_append") + srcTableName := AttachSchema(s, "test_es_simple_append") _, err := s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( @@ -41,7 +42,8 @@ func (s elasticsearchSuite) Test_Simple_QRep_Append() { query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", srcTableName) - qrepConfig := CreateQRepWorkflowConfig(s.t, "test_es_simple_qrep", + qrepConfig := CreateQRepWorkflowConfig(s.t, + jobName, srcTableName, srcTableName, query, @@ -69,7 +71,8 @@ func (s elasticsearchSuite) Test_Simple_QRep_Append() { } func (s elasticsearchSuite) Test_Simple_QRep_Upsert() { - srcTableName := AttachSchema(s, "es_simple_upsert") + jobName := AddSuffix(s, "test_es_simple_upsert") + srcTableName := AttachSchema(s, "test_es_simple_upsert") _, err := s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( @@ -94,7 +97,8 @@ func (s elasticsearchSuite) Test_Simple_QRep_Upsert() { query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", srcTableName) - qrepConfig := CreateQRepWorkflowConfig(s.t, "test_es_simple_qrep", + qrepConfig := CreateQRepWorkflowConfig(s.t, + jobName, srcTableName, srcTableName, query, diff --git a/flow/e2e/s3_qrep_test.go b/flow/e2e/s3_qrep_test.go index 88b4558c75..0f4a8acb97 100644 --- a/flow/e2e/s3_qrep_test.go +++ b/flow/e2e/s3_qrep_test.go @@ -127,8 +127,8 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { tc := NewTemporalClient(s.t) - jobName := "test_complete_flow_s3" - schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s.suffix, jobName) + jobName := AddSuffix(s, "test_complete_flow_s3") + schemaQualifiedName := AttachSchema(s, "test_complete_flow_s3") s.setupSourceTable(jobName, 10) query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}", @@ -170,8 +170,8 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { tc := NewTemporalClient(s.t) - jobName := "test_complete_flow_s3_ctid" - schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s.suffix, jobName) + jobName := AddSuffix(s, "test_complete_flow_s3_ctid") + schemaQualifiedName := AttachSchema(s, "test_complete_flow_s3_ctid") s.setupSourceTable(jobName, 20000) query := fmt.Sprintf("SELECT * FROM %s WHERE ctid BETWEEN {{.start}} AND {{.end}}", schemaQualifiedName) From a5294d32c6c26bdac31f2bcb237ebf7cc2731d38 Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Fri, 3 Oct 2025 22:45:07 -0700 Subject: [PATCH 10/13] makes sense --- flow/e2e/s3_qrep_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/flow/e2e/s3_qrep_test.go b/flow/e2e/s3_qrep_test.go index 0f4a8acb97..1a74aefafe 100644 --- a/flow/e2e/s3_qrep_test.go +++ b/flow/e2e/s3_qrep_test.go @@ -127,8 +127,9 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { tc := NewTemporalClient(s.t) - jobName := AddSuffix(s, "test_complete_flow_s3") - schemaQualifiedName := AttachSchema(s, "test_complete_flow_s3") + tableName := "test_complete_flow_s3" + jobName := AddSuffix(s, tableName) + schemaQualifiedName := AttachSchema(s, tableName) s.setupSourceTable(jobName, 10) query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}", @@ -170,8 +171,9 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { tc := NewTemporalClient(s.t) - jobName := AddSuffix(s, "test_complete_flow_s3_ctid") - schemaQualifiedName := AttachSchema(s, "test_complete_flow_s3_ctid") + tableName := "test_complete_flow_s3_ctid" + jobName := AddSuffix(s, tableName) + schemaQualifiedName := AttachSchema(s, tableName) s.setupSourceTable(jobName, 20000) query := fmt.Sprintf("SELECT * FROM %s WHERE ctid BETWEEN {{.start}} AND {{.end}}", schemaQualifiedName) From 9e619e7555ec06a228e7c6ca7627e65a383fc4e9 Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Fri, 3 Oct 2025 23:22:56 -0700 Subject: [PATCH 11/13] test your tests --- flow/e2e/api_test.go | 14 ++++++++------ flow/e2e/s3_qrep_test.go | 4 ++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/flow/e2e/api_test.go b/flow/e2e/api_test.go index 35f4460652..4328b57fa7 100644 --- a/flow/e2e/api_test.go +++ b/flow/e2e/api_test.go @@ -1117,18 +1117,20 @@ func (s APITestSuite) TestQRep() { PeerName: s.source.GeneratePeer(s.t).Name, }) require.NoError(s.t, err) - tblName := "qrepapi" + s.suffix - schemaQualified := AttachSchema(s, tblName) + tableName := "qrepapi" + schemaQualified := AttachSchema(s, tableName) require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", schemaQualified))) require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf("INSERT INTO %s(id, val) values (1,'first')", schemaQualified))) + flowName := fmt.Sprintf("qrepapiflow_%s_%s", peerType.PeerType, s.suffix) + destTableName := AddSuffix(s, tableName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "qrepapiflow"+"_"+peerType.PeerType, + flowName, schemaQualified, - tblName, + destTableName, fmt.Sprintf("SELECT * FROM %s WHERE id BETWEEN {{.start}} AND {{.end}}", schemaQualified), s.ch.Peer().Name, "", @@ -1150,12 +1152,12 @@ func (s APITestSuite) TestQRep() { env, err := GetPeerflow(s.t.Context(), s.pg.PostgresConnector.Conn(), tc, qrepConfig.FlowJobName) require.NoError(s.t, err) - EnvWaitForEqualTables(env, s.ch, "qrep initial load", tblName, "id,val") + EnvWaitForEqualTables(env, s.ch, "qrep initial load", destTableName, "id,val") require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf("INSERT INTO %s(id, val) values (2,'second')", schemaQualified))) - EnvWaitForEqualTables(env, s.ch, "insert post qrep initial load", tblName, "id,val") + EnvWaitForEqualTables(env, s.ch, "insert post qrep initial load", destTableName, "id,val") statusResponse, err := s.MirrorStatus(s.t.Context(), &protos.MirrorStatusRequest{ FlowJobName: qrepConfig.FlowJobName, IncludeFlowInfo: true, diff --git a/flow/e2e/s3_qrep_test.go b/flow/e2e/s3_qrep_test.go index 1a74aefafe..47147504dc 100644 --- a/flow/e2e/s3_qrep_test.go +++ b/flow/e2e/s3_qrep_test.go @@ -131,7 +131,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { jobName := AddSuffix(s, tableName) schemaQualifiedName := AttachSchema(s, tableName) - s.setupSourceTable(jobName, 10) + s.setupSourceTable(tableName, 10) query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}", schemaQualifiedName) qrepConfig := CreateQRepWorkflowConfig( @@ -175,7 +175,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { jobName := AddSuffix(s, tableName) schemaQualifiedName := AttachSchema(s, tableName) - s.setupSourceTable(jobName, 20000) + s.setupSourceTable(tableName, 20000) query := fmt.Sprintf("SELECT * FROM %s WHERE ctid BETWEEN {{.start}} AND {{.end}}", schemaQualifiedName) qrepConfig := CreateQRepWorkflowConfig( s.t, From 164469f8fc7f987f46a9ccfd79c35d872854b2cb Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Fri, 3 Oct 2025 23:49:23 -0700 Subject: [PATCH 12/13] more tests --- flow/e2e/bigquery_qrep_test.go | 12 ++++++-- flow/e2e/postgres_qrep_test.go | 54 +++++++++++++++++++++------------ flow/e2e/snowflake_qrep_test.go | 23 +++++++++----- 3 files changed, 58 insertions(+), 31 deletions(-) diff --git a/flow/e2e/bigquery_qrep_test.go b/flow/e2e/bigquery_qrep_test.go index f9797ed976..c37382d563 100644 --- a/flow/e2e/bigquery_qrep_test.go +++ b/flow/e2e/bigquery_qrep_test.go @@ -58,7 +58,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.bqSuffix, tblName) - qrepConfig := CreateQRepWorkflowConfig(s.t, "test_qrep_flow_avro", + jobName := AddSuffix(s, tblName) + qrepConfig := CreateQRepWorkflowConfig(s.t, + jobName, fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName), tblName, query, @@ -82,7 +84,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_And_Date_QRep() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE watermark_ts BETWEEN {{.start}} AND {{.end}}", s.bqSuffix, tblName) - qrepConfig := CreateQRepWorkflowConfig(s.t, "test_invalid_time_bq", + jobName := AddSuffix(s, tblName) + qrepConfig := CreateQRepWorkflowConfig(s.t, + jobName, fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName), tblName, query, @@ -123,7 +127,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.bqSuffix, tblName) - qrepConfig := CreateQRepWorkflowConfig(s.t, "test_qrep_flow_avro", + jobName := AddSuffix(s, tblName) + qrepConfig := CreateQRepWorkflowConfig(s.t, + jobName, fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName), tblName, query, diff --git a/flow/e2e/postgres_qrep_test.go b/flow/e2e/postgres_qrep_test.go index 293af9d362..92a1c9b6a6 100644 --- a/flow/e2e/postgres_qrep_test.go +++ b/flow/e2e/postgres_qrep_test.go @@ -166,10 +166,11 @@ func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() { func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { numRows := 10 - srcTable := "test_qrep_flow_avro_pg_1" + baseName := "test_qrep_flow_avro_pg" + srcTable := baseName + "_1" s.setupSourceTable(srcTable, numRows) - dstTable := "test_qrep_flow_avro_pg_2" + dstTable := baseName + "_2" err := CreateTableForQRep(s.t.Context(), s.Conn(), s.suffix, dstTable) require.NoError(s.t, err) @@ -180,9 +181,10 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.suffix, srcTable) + jobName := AddSuffix(s, baseName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_qrep_flow_avro_pg", + jobName, srcSchemaQualified, dstSchemaQualified, query, @@ -204,10 +206,11 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { func (s PeerFlowE2ETestSuitePG) Test_PG_TypeSystemQRep() { numRows := 10 - srcTable := "test_qrep_flow_pgpg_1" + baseName := "test_qrep_flow_pgpg" + srcTable := baseName + "_1" s.setupSourceTable(srcTable, numRows) - dstTable := "test_qrep_flow_pgpg_2" + dstTable := baseName + "_2" err := CreateTableForQRep(s.t.Context(), s.Conn(), s.suffix, dstTable) require.NoError(s.t, err) @@ -218,9 +221,10 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_TypeSystemQRep() { query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", srcSchemaQualified) + jobName := AddSuffix(s, baseName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_qrep_flow_pgpg", + jobName, srcSchemaQualified, dstSchemaQualified, query, @@ -243,10 +247,11 @@ func (s PeerFlowE2ETestSuitePG) Test_PG_TypeSystemQRep() { func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns_QRep_PG() { numRows := 10 - srcTable := "test_qrep_columns_pg_1" + baseName := "test_qrep_columns_pg" + srcTable := baseName + "_1" s.setupSourceTable(srcTable, numRows) - dstTable := "test_qrep_columns_pg_2" + dstTable := baseName + "_2" srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) @@ -254,9 +259,10 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns_QRep_PG() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.suffix, srcTable) + jobName := AddSuffix(s, baseName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_qrep_columns_pg", + jobName, srcSchemaQualified, dstSchemaQualified, query, @@ -278,10 +284,11 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns_QRep_PG() { func (s PeerFlowE2ETestSuitePG) Test_Overwrite_PG() { numRows := 10 - srcTable := "test_overwrite_pg_1" + baseName := "test_overwrite_pg" + srcTable := baseName + "_1" s.setupSourceTable(srcTable, numRows) - dstTable := "test_overwrite_pg_2" + dstTable := baseName + "_2" srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) @@ -289,9 +296,10 @@ func (s PeerFlowE2ETestSuitePG) Test_Overwrite_PG() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.suffix, srcTable) + jobName := AddSuffix(s, baseName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_overwrite_pg", + jobName, srcSchemaQualified, dstSchemaQualified, query, @@ -327,10 +335,11 @@ func (s PeerFlowE2ETestSuitePG) Test_Overwrite_PG() { func (s PeerFlowE2ETestSuitePG) Test_No_Rows_QRep_PG() { numRows := 0 - srcTable := "test_no_rows_qrep_pg_1" + baseName := "test_no_rows_qrep_pg" + srcTable := baseName + "_1" s.setupSourceTable(srcTable, numRows) - dstTable := "test_no_rows_qrep_pg_2" + dstTable := baseName + "_2" srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) @@ -338,9 +347,10 @@ func (s PeerFlowE2ETestSuitePG) Test_No_Rows_QRep_PG() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.suffix, srcTable) + jobName := AddSuffix(s, baseName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_no_rows_qrep_pg", + jobName, srcSchemaQualified, dstSchemaQualified, query, @@ -360,10 +370,10 @@ func (s PeerFlowE2ETestSuitePG) Test_No_Rows_QRep_PG() { func (s PeerFlowE2ETestSuitePG) TestQRepPause() { numRows := 10 - srcTable := "qrep_pause" + srcTable := "test_qrep_pause" s.setupSourceTable(srcTable, numRows) - dstTable := "qrep_pause_dst" + dstTable := "test_qrep_pause_dst" srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) @@ -371,9 +381,10 @@ func (s PeerFlowE2ETestSuitePG) TestQRepPause() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.suffix, srcTable) + jobName := AddSuffix(s, srcTable) config := CreateQRepWorkflowConfig( s.t, - "test_qrep_pause_pg", + jobName, srcSchemaQualified, dstSchemaQualified, query, @@ -422,6 +433,7 @@ func (s PeerFlowE2ETestSuitePG) TestQRepPause() { func (s PeerFlowE2ETestSuitePG) TestXminPause() { numRows := 10 + baseName := "test_xmin_pause_pg" srcTable := "xmin_pause" s.setupSourceTable(srcTable, numRows) @@ -432,9 +444,10 @@ func (s PeerFlowE2ETestSuitePG) TestXminPause() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s", s.suffix, srcTable) + jobName := AddSuffix(s, baseName) config := CreateQRepWorkflowConfig( s.t, - "test_xmin_pause_pg", + jobName, srcSchemaQualified, dstSchemaQualified, query, @@ -498,9 +511,10 @@ func (s PeerFlowE2ETestSuitePG) TestTransform() { ('pgtransform', 'lua', 'function transformRow(row) row.myreal = 1729 end') on conflict do nothing`) require.NoError(s.t, err) + jobName := AddSuffix(s, srcTable) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_transform", + jobName, srcSchemaQualified, dstSchemaQualified, query, diff --git a/flow/e2e/snowflake_qrep_test.go b/flow/e2e/snowflake_qrep_test.go index 2c76af517b..0d929326c5 100644 --- a/flow/e2e/snowflake_qrep_test.go +++ b/flow/e2e/snowflake_qrep_test.go @@ -58,9 +58,10 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.pgSuffix, tblName) + jobName := AddSuffix(s, tblName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_qrep_flow_avro_sf", + jobName, fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, @@ -95,9 +96,10 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.pgSuffix, tblName) + jobName := AddSuffix(s, tblName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_qrep_flow_avro_sf", + jobName, fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, @@ -135,9 +137,10 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.pgSuffix, tblName) + jobName := AddSuffix(s, tblName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_qrep_flow_avro_sf", + jobName, s.attachSchemaSuffix(tblName), dstSchemaQualified, query, @@ -170,9 +173,10 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s", s.pgSuffix, tblName) + jobName := AddSuffix(s, tblName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_qrep_flow_avro_sf_xmin", + jobName, fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, @@ -213,9 +217,10 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() s.sfHelper.Config.S3Integration = "peerdb_s3_integration" + jobName := AddSuffix(s, tblName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_qrep_flow_avro_sf_int", + jobName, s.attachSchemaSuffix(tblName), dstSchemaQualified, query, @@ -241,7 +246,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() { numRows := 10 - tblName := "test_qrep_columns_sf" + tblName := "test_columns_qrep_sf" s.setupSourceTable(tblName, numRows) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -249,9 +254,10 @@ func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.pgSuffix, tblName) + jobName := AddSuffix(s, tblName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_columns_qrep_sf", + jobName, fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, @@ -287,9 +293,10 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Default_False_SF() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.pgSuffix, tblName) + jobName := AddSuffix(s, tblName) qrepConfig := CreateQRepWorkflowConfig( s.t, - "test_deleted_false_qrep_sf", + jobName, fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), dstSchemaQualified, query, From a6a8ffade49051ebca9f951b6e50649ba805eb87 Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Sat, 4 Oct 2025 00:28:20 -0700 Subject: [PATCH 13/13] sigh --- flow/e2e/api_test.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/flow/e2e/api_test.go b/flow/e2e/api_test.go index 4328b57fa7..56073021e4 100644 --- a/flow/e2e/api_test.go +++ b/flow/e2e/api_test.go @@ -1117,7 +1117,7 @@ func (s APITestSuite) TestQRep() { PeerName: s.source.GeneratePeer(s.t).Name, }) require.NoError(s.t, err) - tableName := "qrepapi" + tableName := AddSuffix(s, "qrepapi") schemaQualified := AttachSchema(s, tableName) require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", schemaQualified))) @@ -1125,12 +1125,11 @@ func (s APITestSuite) TestQRep() { fmt.Sprintf("INSERT INTO %s(id, val) values (1,'first')", schemaQualified))) flowName := fmt.Sprintf("qrepapiflow_%s_%s", peerType.PeerType, s.suffix) - destTableName := AddSuffix(s, tableName) qrepConfig := CreateQRepWorkflowConfig( s.t, flowName, schemaQualified, - destTableName, + tableName, fmt.Sprintf("SELECT * FROM %s WHERE id BETWEEN {{.start}} AND {{.end}}", schemaQualified), s.ch.Peer().Name, "", @@ -1152,12 +1151,12 @@ func (s APITestSuite) TestQRep() { env, err := GetPeerflow(s.t.Context(), s.pg.PostgresConnector.Conn(), tc, qrepConfig.FlowJobName) require.NoError(s.t, err) - EnvWaitForEqualTables(env, s.ch, "qrep initial load", destTableName, "id,val") + EnvWaitForEqualTables(env, s.ch, "qrep initial load", tableName, "id,val") require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf("INSERT INTO %s(id, val) values (2,'second')", schemaQualified))) - EnvWaitForEqualTables(env, s.ch, "insert post qrep initial load", destTableName, "id,val") + EnvWaitForEqualTables(env, s.ch, "insert post qrep initial load", tableName, "id,val") statusResponse, err := s.MirrorStatus(s.t.Context(), &protos.MirrorStatusRequest{ FlowJobName: qrepConfig.FlowJobName, IncludeFlowInfo: true,