diff --git a/apps/workspace-engine/pkg/db/workspaces.go b/apps/workspace-engine/pkg/db/workspaces.go index 002308395..0f7e77bb7 100644 --- a/apps/workspace-engine/pkg/db/workspaces.go +++ b/apps/workspace-engine/pkg/db/workspaces.go @@ -89,14 +89,26 @@ func GetAllWorkspaceIDs(ctx context.Context) ([]string, error) { } type WorkspaceSnapshot struct { + WorkspaceID string Path string Timestamp time.Time Partition int32 NumPartitions int32 + Offset int64 } const WORKSPACE_SNAPSHOT_SELECT_QUERY = ` - SELECT path, timestamp, partition, num_partitions FROM workspace_snapshot WHERE workspace_id = $1 ORDER BY timestamp DESC LIMIT 1 + SELECT + workspace_id, + path, + timestamp, + partition, + num_partitions, + "offset" + FROM workspace_snapshot + WHERE workspace_id = $1 + ORDER BY "offset" DESC, timestamp DESC + LIMIT 1 ` func GetWorkspaceSnapshot(ctx context.Context, workspaceID string) (*WorkspaceSnapshot, error) { @@ -108,10 +120,12 @@ func GetWorkspaceSnapshot(ctx context.Context, workspaceID string) (*WorkspaceSn workspaceSnapshot := &WorkspaceSnapshot{} err = db.QueryRow(ctx, WORKSPACE_SNAPSHOT_SELECT_QUERY, workspaceID).Scan( + &workspaceSnapshot.WorkspaceID, &workspaceSnapshot.Path, &workspaceSnapshot.Timestamp, &workspaceSnapshot.Partition, &workspaceSnapshot.NumPartitions, + &workspaceSnapshot.Offset, ) if err != nil { if err == pgx.ErrNoRows { @@ -122,19 +136,62 @@ func GetWorkspaceSnapshot(ctx context.Context, workspaceID string) (*WorkspaceSn return workspaceSnapshot, nil } +func GetLatestWorkspaceSnapshots(ctx context.Context, workspaceIDs []string) (map[string]*WorkspaceSnapshot, error) { + if len(workspaceIDs) == 0 { + return nil, nil + } + + db, err := GetDB(ctx) + if err != nil { + return nil, err + } + defer db.Release() + + const query = ` + SELECT DISTINCT ON (workspace_id) workspace_id, path, timestamp, partition, num_partitions, "offset" + FROM workspace_snapshot + WHERE workspace_id = ANY($1) + ORDER BY workspace_id, "offset" DESC, timestamp DESC + ` + rows, err := db.Query(ctx, query, workspaceIDs) + if err != nil { + return nil, err + } + defer rows.Close() + + var snapshots []*WorkspaceSnapshot + for rows.Next() { + var snapshot WorkspaceSnapshot + err := rows.Scan(&snapshot.WorkspaceID, &snapshot.Path, &snapshot.Timestamp, &snapshot.Partition, &snapshot.NumPartitions, &snapshot.Offset) + if err != nil { + return nil, err + } + snapshots = append(snapshots, &snapshot) + } + if err := rows.Err(); err != nil { + return nil, err + } + + snapshotMap := make(map[string]*WorkspaceSnapshot) + for _, snapshot := range snapshots { + snapshotMap[snapshot.WorkspaceID] = snapshot + } + return snapshotMap, nil +} + const WORKSPACE_SNAPSHOT_INSERT_QUERY = ` - INSERT INTO workspace_snapshot (workspace_id, path, timestamp, partition, num_partitions) - VALUES ($1, $2, $3, $4, $5) + INSERT INTO workspace_snapshot (workspace_id, path, timestamp, partition, num_partitions, "offset") + VALUES ($1, $2, $3, $4, $5, $6) ` -func WriteWorkspaceSnapshot(ctx context.Context, workspaceID string, snapshot *WorkspaceSnapshot) error { +func WriteWorkspaceSnapshot(ctx context.Context, snapshot *WorkspaceSnapshot) error { db, err := GetDB(ctx) if err != nil { return err } defer db.Release() - _, err = db.Exec(ctx, WORKSPACE_SNAPSHOT_INSERT_QUERY, workspaceID, snapshot.Path, snapshot.Timestamp, snapshot.Partition, snapshot.NumPartitions) + _, err = db.Exec(ctx, WORKSPACE_SNAPSHOT_INSERT_QUERY, snapshot.WorkspaceID, snapshot.Path, snapshot.Timestamp, snapshot.Partition, snapshot.NumPartitions, 
snapshot.Offset) if err != nil { return err } diff --git a/apps/workspace-engine/pkg/db/workspaces_test.go b/apps/workspace-engine/pkg/db/workspaces_test.go index fc579f51c..bf4f8df78 100644 --- a/apps/workspace-engine/pkg/db/workspaces_test.go +++ b/apps/workspace-engine/pkg/db/workspaces_test.go @@ -2,6 +2,7 @@ package db import ( "testing" + "time" "github.com/google/uuid" ) @@ -69,3 +70,123 @@ func TestDBWorkspaces_GetWorkspaceIDs(t *testing.T) { _ = conn2 _ = conn3 } + +func TestDBWorkspaces_GetLatestWorkspaceSnapshots(t *testing.T) { + ctx := t.Context() + + // Create 3 workspaces + ws1ID, _ := setupTestWithWorkspace(t) + ws2ID, _ := setupTestWithWorkspace(t) + ws3ID, _ := setupTestWithWorkspace(t) + + // Create multiple snapshots for workspace 1 (verify it returns latest by offset) + ws1Snapshots := []*WorkspaceSnapshot{ + { + WorkspaceID: ws1ID, + Path: "ws1-v1.gob", + Timestamp: time.Now().Add(-10 * time.Hour), // Oldest time + Partition: 0, + Offset: 100, // Highest offset - should be returned + NumPartitions: 3, + }, + { + WorkspaceID: ws1ID, + Path: "ws1-v2.gob", + Timestamp: time.Now(), // Newest time + Partition: 0, + Offset: 50, // Lower offset + NumPartitions: 3, + }, + { + WorkspaceID: ws1ID, + Path: "ws1-v3.gob", + Timestamp: time.Now().Add(-5 * time.Hour), + Partition: 0, + Offset: 75, + NumPartitions: 3, + }, + } + + for _, snapshot := range ws1Snapshots { + if err := WriteWorkspaceSnapshot(ctx, snapshot); err != nil { + t.Fatalf("Failed to write snapshot for ws1: %v", err) + } + } + + // Create snapshots for workspace 2 + ws2Snapshot := &WorkspaceSnapshot{ + WorkspaceID: ws2ID, + Path: "ws2.gob", + Timestamp: time.Now(), + Partition: 1, + Offset: 200, + NumPartitions: 3, + } + + if err := WriteWorkspaceSnapshot(ctx, ws2Snapshot); err != nil { + t.Fatalf("Failed to write snapshot for ws2: %v", err) + } + + // Workspace 3 has no snapshots + + // Test 1: Get snapshots for workspaces with snapshots + snapshots, err := GetLatestWorkspaceSnapshots(ctx, []string{ws1ID, ws2ID}) + if err != nil { + t.Fatalf("Failed to get latest snapshots: %v", err) + } + + if len(snapshots) != 2 { + t.Fatalf("Expected 2 snapshots, got %d", len(snapshots)) + } + + // Verify workspace 1 returns snapshot with HIGHEST offset (not newest timestamp) + ws1Snapshot := snapshots[ws1ID] + if ws1Snapshot == nil { + t.Fatal("No snapshot returned for ws1") + } + if ws1Snapshot.Offset != 100 { + t.Fatalf("Expected ws1 snapshot with offset 100 (highest), got %d", ws1Snapshot.Offset) + } + if ws1Snapshot.Path != "ws1-v1.gob" { + t.Fatalf("Expected ws1-v1.gob (highest offset), got %s", ws1Snapshot.Path) + } + + // Verify workspace 2 returns its snapshot + ws2SnapshotResult := snapshots[ws2ID] + if ws2SnapshotResult == nil { + t.Fatal("No snapshot returned for ws2") + } + if ws2SnapshotResult.Offset != 200 { + t.Fatalf("Expected ws2 snapshot with offset 200, got %d", ws2SnapshotResult.Offset) + } + + // Test 2: Get snapshots including workspace with no snapshots + snapshots, err = GetLatestWorkspaceSnapshots(ctx, []string{ws1ID, ws2ID, ws3ID}) + if err != nil { + t.Fatalf("Failed to get snapshots with ws3: %v", err) + } + + // Should still return 2 (ws3 has no snapshots) + if len(snapshots) != 2 { + t.Fatalf("Expected 2 snapshots (ws3 has none), got %d", len(snapshots)) + } + + // Test 3: Empty workspace ID array + snapshots, err = GetLatestWorkspaceSnapshots(ctx, []string{}) + if err != nil { + t.Fatalf("Expected no error for empty array, got %v", err) + } + if snapshots != nil { + t.Fatalf("Expected nil for 
empty array, got %d snapshots", len(snapshots)) + } + + // Test 4: Non-existent workspace IDs + fakeID := uuid.New().String() + snapshots, err = GetLatestWorkspaceSnapshots(ctx, []string{fakeID}) + if err != nil { + t.Fatalf("Expected no error for non-existent workspace, got %v", err) + } + if len(snapshots) != 0 { + t.Fatalf("Expected 0 snapshots for non-existent workspace, got %d", len(snapshots)) + } +} diff --git a/apps/workspace-engine/pkg/events/events.go b/apps/workspace-engine/pkg/events/events.go index de6e07e11..371273fc8 100644 --- a/apps/workspace-engine/pkg/events/events.go +++ b/apps/workspace-engine/pkg/events/events.go @@ -17,6 +17,7 @@ import ( "workspace-engine/pkg/events/handler/system" "workspace-engine/pkg/events/handler/tick" "workspace-engine/pkg/events/handler/userapprovalrecords" + "workspace-engine/pkg/events/handler/workspacesave" ) var handlers = handler.HandlerRegistry{ @@ -75,6 +76,7 @@ var handlers = handler.HandlerRegistry{ handler.GithubEntityDelete: githubentities.HandleGithubEntityDeleted, handler.WorkspaceTick: tick.HandleWorkspaceTick, + handler.WorkspaceSave: workspacesave.HandleWorkspaceSave, handler.ReleaseTargetDeploy: redeploy.HandleReleaseTargetDeploy, } diff --git a/apps/workspace-engine/pkg/events/handler/handler.go b/apps/workspace-engine/pkg/events/handler/handler.go index 023bed7ce..a2023734c 100644 --- a/apps/workspace-engine/pkg/events/handler/handler.go +++ b/apps/workspace-engine/pkg/events/handler/handler.go @@ -76,6 +76,7 @@ const ( GithubEntityDelete EventType = "github-entity.deleted" WorkspaceTick EventType = "workspace.tick" + WorkspaceSave EventType = "workspace.save" ReleaseTargetDeploy EventType = "release-target.deploy" ) @@ -104,8 +105,14 @@ func NewEventListener(handlers HandlerRegistry) *EventListener { return &EventListener{handlers: handlers} } +type OffsetTracker struct { + LastCommittedOffset int64 + LastWorkspaceOffset int64 + MessageOffset int64 +} + // ListenAndRoute processes incoming Kafka messages and routes them to the appropriate handler -func (el *EventListener) ListenAndRoute(ctx context.Context, msg *kafka.Message) (*workspace.Workspace, error) { +func (el *EventListener) ListenAndRoute(ctx context.Context, msg *kafka.Message, offsetTracker OffsetTracker) (*workspace.Workspace, error) { ctx, span := tracer.Start(ctx, "ListenAndRoute", trace.WithAttributes( attribute.String("kafka.topic", *msg.TopicPartition.Topic), @@ -151,6 +158,13 @@ func (el *EventListener) ListenAndRoute(ctx context.Context, msg *kafka.Message) return nil, fmt.Errorf("workspace not found: %s", rawEvent.WorkspaceID) } + isReplay := offsetTracker.MessageOffset <= offsetTracker.LastCommittedOffset + ws.Store().SetIsReplay(isReplay) + + if offsetTracker.MessageOffset <= offsetTracker.LastWorkspaceOffset { + return ws, nil + } + ctx = changeset.WithChangeSet(ctx, changeSet) if err := handler(ctx, ws, rawEvent); err != nil { diff --git a/apps/workspace-engine/pkg/events/handler/workspacesave/workspacesave.go b/apps/workspace-engine/pkg/events/handler/workspacesave/workspacesave.go new file mode 100644 index 000000000..38d469dd2 --- /dev/null +++ b/apps/workspace-engine/pkg/events/handler/workspacesave/workspacesave.go @@ -0,0 +1,23 @@ +package workspacesave + +import ( + "context" + "encoding/json" + "workspace-engine/pkg/events/handler" + "workspace-engine/pkg/workspace" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +func HandleWorkspaceSave(_ctx context.Context, _ws *workspace.Workspace, _event handler.RawEvent) error { + 
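+ // Intentionally a no-op: the event only needs to flow through the normal
+ // pipeline. RunConsumer checks IsWorkspaceSaveEvent on the raw message and
+ // persists the workspace snapshot itself once the message has been handled.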
return nil +} + +func IsWorkspaceSaveEvent(msg *kafka.Message) bool { + var rawEvent handler.RawEvent + if err := json.Unmarshal(msg.Value, &rawEvent); err != nil { + return false + } + + return rawEvent.EventType == handler.WorkspaceSave +} diff --git a/apps/workspace-engine/pkg/kafka/consumer.go b/apps/workspace-engine/pkg/kafka/consumer.go index 56917fcb7..cfedb3eff 100644 --- a/apps/workspace-engine/pkg/kafka/consumer.go +++ b/apps/workspace-engine/pkg/kafka/consumer.go @@ -1,6 +1,9 @@ package kafka import ( + "context" + "workspace-engine/pkg/db" + "github.com/charmbracelet/log" "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) @@ -27,3 +30,50 @@ func createConsumer() (*kafka.Consumer, error) { return c, nil } + +func getEarliestOffset(snapshots map[string]*db.WorkspaceSnapshot) int64 { + beginning := int64(kafka.OffsetBeginning) + if len(snapshots) == 0 { + return beginning + } + + var earliestOffset int64 + has := false + for _, snapshot := range snapshots { + if snapshot == nil { + continue + } + if !has || snapshot.Offset < earliestOffset { + earliestOffset = snapshot.Offset + has = true + } + } + if !has { + return beginning + } + return earliestOffset +} + +func setOffsets(ctx context.Context, consumer *kafka.Consumer, partitionWorkspaceMap map[int32][]string) { + for partition, workspaceIDs := range partitionWorkspaceMap { + snapshots, err := db.GetLatestWorkspaceSnapshots(ctx, workspaceIDs) + if err != nil { + log.Error("Failed to get latest workspace snapshots", "error", err) + continue + } + + earliestOffset := getEarliestOffset(snapshots) + effectiveOffset := earliestOffset + if effectiveOffset > 0 { + effectiveOffset = effectiveOffset + 1 + } + if err := consumer.Seek(kafka.TopicPartition{ + Topic: &Topic, + Partition: partition, + Offset: kafka.Offset(effectiveOffset), + }, 0); err != nil { + log.Error("Failed to seek to earliest offset", "error", err) + continue + } + } +} diff --git a/apps/workspace-engine/pkg/kafka/kafka.go b/apps/workspace-engine/pkg/kafka/kafka.go index a5b05501a..415f62adb 100644 --- a/apps/workspace-engine/pkg/kafka/kafka.go +++ b/apps/workspace-engine/pkg/kafka/kafka.go @@ -2,18 +2,22 @@ package kafka import ( "context" + "encoding/json" "fmt" "os" "time" "workspace-engine/pkg/db" "workspace-engine/pkg/events" + eventHanlder "workspace-engine/pkg/events/handler" + "workspace-engine/pkg/events/handler/workspacesave" "workspace-engine/pkg/oapi" "workspace-engine/pkg/workspace" wskafka "workspace-engine/pkg/workspace/kafka" "github.com/aws/smithy-go/ptr" "github.com/charmbracelet/log" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) // Configuration variables loaded from environment @@ -32,6 +36,26 @@ func getEnv(varName string, defaultValue string) string { return v } +func getLastSnapshot(ctx context.Context, msg *kafka.Message) (*db.WorkspaceSnapshot, error) { + var rawEvent eventHanlder.RawEvent + if err := json.Unmarshal(msg.Value, &rawEvent); err != nil { + log.Error("Failed to unmarshal event", "error", err, "message", string(msg.Value)) + return nil, fmt.Errorf("failed to unmarshal event: %w", err) + } + + return db.GetWorkspaceSnapshot(ctx, rawEvent.WorkspaceID) +} + +func getLastWorkspaceOffset(snapshot *db.WorkspaceSnapshot) int64 { + beginning := int64(kafka.OffsetBeginning) + + if snapshot == nil { + return beginning + } + + return snapshot.Offset +} + // RunConsumerWithWorkspaceLoader starts the Kafka consumer with workspace-based offset resume // // Flow: @@ -83,11 +107,17 @@ func RunConsumer(ctx context.Context) error { 
} log.Info("Partition assignment complete", "assigned", assignedPartitions) - allWorkspaceIDs, err := wskafka.GetAssignedWorkspaceIDs(ctx, assignedPartitions, numPartitions) + partitionWorkspaceMap, err := wskafka.GetAssignedWorkspaceIDs(ctx, assignedPartitions, numPartitions) if err != nil { return fmt.Errorf("failed to get assigned workspace IDs: %w", err) } + // Flatten the map to get all workspace IDs + var allWorkspaceIDs []string + for _, workspaceIDs := range partitionWorkspaceMap { + allWorkspaceIDs = append(allWorkspaceIDs, workspaceIDs...) + } + storage := workspace.NewFileStorage("./state") if workspace.IsGCSStorageEnabled() { storage, err = workspace.NewGCSStorageClient(ctx) @@ -118,6 +148,8 @@ func RunConsumer(ctx context.Context) error { // Start consuming messages handler := events.NewEventHandler() + setOffsets(ctx, consumer, partitionWorkspaceMap) + for { // Check for cancellation select { @@ -134,7 +166,27 @@ func RunConsumer(ctx context.Context) error { continue } - ws, err := handler.ListenAndRoute(ctx, msg) + lastSnapshot, err := getLastSnapshot(ctx, msg) + if err != nil { + log.Error("Failed to get last snapshot", "error", err) + continue + } + + messageOffset := int64(msg.TopicPartition.Offset) + lastCommittedOffset, err := getCommittedOffset(consumer, msg.TopicPartition.Partition) + if err != nil { + log.Error("Failed to get committed offset", "error", err) + continue + } + lastWorkspaceOffset := getLastWorkspaceOffset(lastSnapshot) + + offsetTracker := eventHanlder.OffsetTracker{ + LastCommittedOffset: lastCommittedOffset, + LastWorkspaceOffset: lastWorkspaceOffset, + MessageOffset: messageOffset, + } + + ws, err := handler.ListenAndRoute(ctx, msg, offsetTracker) if err != nil { log.Error("Failed to route message", "error", err) } @@ -150,15 +202,19 @@ func RunConsumer(ctx context.Context) error { continue } - snapshot := &db.WorkspaceSnapshot{ - Path: fmt.Sprintf("%s.gob", ws.ID), - Timestamp: msg.Timestamp, - Partition: int32(msg.TopicPartition.Partition), - NumPartitions: numPartitions, - } - - if err := workspace.Save(ctx, storage, ws, snapshot); err != nil { - log.Error("Failed to save workspace", "workspaceID", ws.ID, "snapshotPath", snapshot.Path, "error", err) + if workspacesave.IsWorkspaceSaveEvent(msg) { + snapshot := &db.WorkspaceSnapshot{ + WorkspaceID: ws.ID, + Path: fmt.Sprintf("%s.gob", ws.ID), + Timestamp: msg.Timestamp, + Partition: int32(msg.TopicPartition.Partition), + Offset: int64(msg.TopicPartition.Offset), + NumPartitions: numPartitions, + } + + if err := workspace.Save(ctx, storage, ws, snapshot); err != nil { + log.Error("Failed to save workspace", "workspaceID", ws.ID, "snapshotPath", snapshot.Path, "error", err) + } } } } diff --git a/apps/workspace-engine/pkg/kafka/offset.go b/apps/workspace-engine/pkg/kafka/offset.go index 6ad7a182e..58ff5319a 100644 --- a/apps/workspace-engine/pkg/kafka/offset.go +++ b/apps/workspace-engine/pkg/kafka/offset.go @@ -139,3 +139,26 @@ func getTopicPartitionCount(c *kafka.Consumer) (int32, error) { return numPartitions, nil } + +// getCommittedOffset retrieves the last committed offset for a partition +func getCommittedOffset(consumer *kafka.Consumer, partition int32) (int64, error) { + partitions := []kafka.TopicPartition{ + { + Topic: &Topic, + Partition: partition, + Offset: kafka.OffsetStored, // This fetches the committed offset + }, + } + + committed, err := consumer.Committed(partitions, 5000) + if err != nil { + return int64(kafka.OffsetInvalid), err + } + + if len(committed) == 0 || 
committed[0].Offset == kafka.OffsetInvalid { + // No committed offset yet, this is the beginning + return int64(kafka.OffsetBeginning), nil + } + + return int64(committed[0].Offset), nil +} diff --git a/apps/workspace-engine/pkg/workspace/kafka/state.go b/apps/workspace-engine/pkg/workspace/kafka/state.go index a25e542f8..273d16c42 100644 --- a/apps/workspace-engine/pkg/workspace/kafka/state.go +++ b/apps/workspace-engine/pkg/workspace/kafka/state.go @@ -29,7 +29,7 @@ func FilterWorkspaceIDsForPartition(workspaceIDs []string, targetPartition int32 type WorkspaceIDDiscoverer func(ctx context.Context, targetPartition int32, numPartitions int32) ([]string, error) -func GetAssignedWorkspaceIDs(ctx context.Context, assignedPartitions []int32, numPartitions int32) ([]string, error) { +func GetAssignedWorkspaceIDs(ctx context.Context, assignedPartitions []int32, numPartitions int32) (map[int32][]string, error) { workspaceIDs, err := db.GetAllWorkspaceIDs(ctx) if err != nil { return nil, err @@ -40,10 +40,11 @@ func GetAssignedWorkspaceIDs(ctx context.Context, assignedPartitions []int32, nu assignedSet[p] = true } - var result []string + result := make(map[int32][]string) for _, workspaceID := range workspaceIDs { - if assignedSet[PartitionForWorkspace(workspaceID, numPartitions)] { - result = append(result, workspaceID) + partition := PartitionForWorkspace(workspaceID, numPartitions) + if assignedSet[partition] { + result[partition] = append(result[partition], workspaceID) } } diff --git a/apps/workspace-engine/pkg/workspace/loader.go b/apps/workspace-engine/pkg/workspace/loader.go index 925c0337f..9fac3f363 100644 --- a/apps/workspace-engine/pkg/workspace/loader.go +++ b/apps/workspace-engine/pkg/workspace/loader.go @@ -36,7 +36,7 @@ func Save(ctx context.Context, storage StorageClient, workspace *Workspace, snap return fmt.Errorf("failed to write workspace to disk: %w", err) } - if err := db.WriteWorkspaceSnapshot(ctx, workspace.ID, snapshot); err != nil { + if err := db.WriteWorkspaceSnapshot(ctx, snapshot); err != nil { span.RecordError(err) span.SetStatus(codes.Error, "Failed to write workspace snapshot") return fmt.Errorf("failed to write workspace snapshot: %w", err) diff --git a/apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go b/apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go index 2823adafb..444a98547 100644 --- a/apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go +++ b/apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go @@ -61,6 +61,11 @@ func (e *Executor) ExecuteRelease(ctx context.Context, releaseToDeploy *oapi.Rel attribute.String("job.status", string(newJob.Status)), ) + if e.store.IsReplay() { + log.Info("Skipping job dispatch in replay mode", "job.id", newJob.Id) + return nil + } + // Step 4: Dispatch job to integration (ASYNC) // Skip dispatch if job already has InvalidJobAgent status if newJob.Status != oapi.InvalidJobAgent { diff --git a/apps/workspace-engine/pkg/workspace/store/store.go b/apps/workspace-engine/pkg/workspace/store/store.go index 0ea35f299..489c7c2eb 100644 --- a/apps/workspace-engine/pkg/workspace/store/store.go +++ b/apps/workspace-engine/pkg/workspace/store/store.go @@ -3,6 +3,7 @@ package store import ( "bytes" "encoding/gob" + "sync/atomic" "workspace-engine/pkg/workspace/store/repository" ) @@ -11,9 +12,8 @@ var _ gob.GobDecoder = (*Store)(nil) func New() *Store { repo := repository.New() - store := &Store{ - repo: repo, - } + store := &Store{repo: repo} + 
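+ // A freshly constructed store starts in normal (non-replay) mode;
+ // ListenAndRoute flips the flag per message from OffsetTracker.LastCommittedOffset.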
store.isReplay.Store(false) store.Deployments = NewDeployments(store) store.Environments = NewEnvironments(store) @@ -56,6 +56,16 @@ type Store struct { Relationships *RelationshipRules Variables *Variables GithubEntities *GithubEntities + + isReplay atomic.Bool +} + +func (s *Store) IsReplay() bool { + return s.isReplay.Load() +} + +func (s *Store) SetIsReplay(isReplay bool) { + s.isReplay.Store(isReplay) } func (s *Store) Repo() *repository.Repository { diff --git a/apps/workspace-engine/test/e2e/engine_kafka_replay_test.go b/apps/workspace-engine/test/e2e/engine_kafka_replay_test.go new file mode 100644 index 000000000..583d19690 --- /dev/null +++ b/apps/workspace-engine/test/e2e/engine_kafka_replay_test.go @@ -0,0 +1,1217 @@ +package e2e + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "os" + "testing" + "time" + "workspace-engine/pkg/db" + eventHandler "workspace-engine/pkg/events/handler" + kafkapkg "workspace-engine/pkg/kafka" + "workspace-engine/pkg/workspace" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/google/uuid" +) + +// TestEngine_Kafka_Replay_BasicFlow tests the full e2e replay logic with actual consumer: +// - Creates resources via Kafka messages +// - Sends workspace save event to trigger snapshot +// - Runs actual consumer to process messages +// - Verifies workspace state, snapshots, and file persistence +// - Tests full restore cycle +func TestEngine_Kafka_Replay_BasicFlow(t *testing.T) { + env := setupTestEnvironment(t) + defer env.cleanup() + + resourceIDs := []string{ + uuid.New().String(), + uuid.New().String(), + uuid.New().String(), + } + + // Produce resource creation messages to Kafka + for _, resourceID := range resourceIDs { + env.produceResourceCreateEvent(resourceID) + } + + // Produce workspace save event to trigger snapshot creation + env.produceWorkspaceSaveEvent() + + // Run actual consumer to process messages + env.runConsumer(5 * time.Second) + + // Verify workspace state was updated + ws := workspace.GetWorkspace(env.workspaceID) + if ws == nil { + t.Fatal("Workspace not found in memory") + } + + resources := ws.Resources().Items() + if len(resources) != len(resourceIDs) { + t.Fatalf("Expected %d resources, got %d", len(resourceIDs), len(resources)) + } + + for _, resourceID := range resourceIDs { + resource, exists := ws.Resources().Get(resourceID) + if !exists { + t.Fatalf("Resource %s not found in workspace", resourceID) + } + if resource.Id != resourceID { + t.Fatalf("Resource ID mismatch: expected %s, got %s", resourceID, resource.Id) + } + } + + // Verify snapshots were created + latestSnapshot, err := db.GetWorkspaceSnapshot(env.ctx, env.workspaceID) + if err != nil { + t.Fatalf("Failed to get latest snapshot: %v", err) + } + if latestSnapshot == nil { + t.Fatal("No snapshot found") + } + + // Verify snapshot file was written + storage := workspace.NewFileStorage("./state") + snapshotData, err := storage.Get(env.ctx, latestSnapshot.Path) + if err != nil { + t.Fatalf("Failed to read snapshot file: %v", err) + } + if len(snapshotData) == 0 { + t.Fatal("Snapshot file is empty") + } + + // Test snapshot restore + restoredWs := workspace.New(env.workspaceID) + if err := restoredWs.GobDecode(snapshotData); err != nil { + t.Fatalf("Failed to decode snapshot: %v", err) + } + + restoredResources := restoredWs.Resources().Items() + if len(restoredResources) != len(resourceIDs) { + t.Fatalf("Restored workspace: expected %d resources, got %d", len(resourceIDs), len(restoredResources)) + } + + for _, resourceID := 
range resourceIDs { + resource, exists := restoredWs.Resources().Get(resourceID) + if !exists { + t.Fatalf("Resource %s not found in restored workspace", resourceID) + } + if resource.Id != resourceID { + t.Fatalf("Restored resource ID mismatch: expected %s, got %s", resourceID, resource.Id) + } + } +} + +// TestEngine_Kafka_Replay_WorkspaceSaveEvent tests that snapshots are only created +// when workspace.save events are sent +func TestEngine_Kafka_Replay_WorkspaceSaveEvent(t *testing.T) { + env := setupTestEnvironment(t) + defer env.cleanup() + + // Produce resource events without workspace save + env.produceResourceCreateEvent(uuid.New().String()) + env.produceResourceCreateEvent(uuid.New().String()) + + // Run consumer + env.runConsumer(3 * time.Second) + + // Verify NO snapshot was created (no workspace.save event sent) + snapshot1, err := db.GetWorkspaceSnapshot(env.ctx, env.workspaceID) + if err != nil { + t.Fatalf("Failed to get snapshot: %v", err) + } + if snapshot1 != nil { + t.Fatalf("Expected no snapshot without workspace.save event, but found one at offset %d", snapshot1.Offset) + } + + // Now produce workspace save event + env.produceWorkspaceSaveEvent() + + // Run consumer again + env.runConsumer(3 * time.Second) + + // Verify snapshot WAS created after workspace.save event + snapshot2, err := db.GetWorkspaceSnapshot(env.ctx, env.workspaceID) + if err != nil { + t.Fatalf("Failed to get snapshot after save event: %v", err) + } + if snapshot2 == nil { + t.Fatal("Expected snapshot to be created after workspace.save event") + } + + // Verify snapshot contains the workspace state + storage := workspace.NewFileStorage("./state") + snapshotData, err := storage.Get(env.ctx, snapshot2.Path) + if err != nil { + t.Fatalf("Failed to read snapshot file: %v", err) + } + if len(snapshotData) == 0 { + t.Fatal("Snapshot file is empty") + } +} + +// TestEngine_Kafka_Replay_MultipleWorkspaces tests replay logic with multiple workspaces +// on the same partition, each with different snapshot offsets (CRITICAL FOR PARTITION SHARING) +func TestEngine_Kafka_Replay_MultipleWorkspaces(t *testing.T) { + if !isKafkaAvailable(t) { + t.Skip("Kafka broker not available") + } + + ctx := context.Background() + cleanupAllTestWorkspaces(ctx) + + topicName := fmt.Sprintf("test-multi-ws-%s", uuid.New().String()[:8]) + + // Set environment + os.Setenv("KAFKA_TOPIC", topicName) + kafkapkg.Topic = topicName + kafkapkg.GroupID = fmt.Sprintf("test-group-%s", uuid.New().String()[:8]) + + defer func() { + os.Unsetenv("KAFKA_TOPIC") + kafkapkg.Topic = "workspace-events" + kafkapkg.GroupID = "workspace-engine" + }() + + // Create 3 workspaces + wsIDs := []string{ + uuid.New().String(), + uuid.New().String(), + uuid.New().String(), + } + + // Create workspaces in database + for _, wsID := range wsIDs { + if err := createTestWorkspace(ctx, wsID); err != nil { + t.Fatalf("Failed to create workspace %s: %v", wsID, err) + } + } + + defer func() { + for _, wsID := range wsIDs { + cleanupTestWorkspace(ctx, wsID) + } + cleanupKafkaTopic(t, topicName) + }() + + // Create snapshots at different offsets + // WS1: offset 5, WS2: offset 10, WS3: offset 15 + snapshots := []*db.WorkspaceSnapshot{ + { + WorkspaceID: wsIDs[0], + Path: fmt.Sprintf("%s.gob", wsIDs[0]), + Timestamp: time.Now().Add(-1 * time.Hour), + Partition: 0, + Offset: 5, + NumPartitions: 1, + }, + { + WorkspaceID: wsIDs[1], + Path: fmt.Sprintf("%s.gob", wsIDs[1]), + Timestamp: time.Now().Add(-1 * time.Hour), + Partition: 0, + Offset: 10, + NumPartitions: 1, + }, + { 
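+ // ws3 gets the highest snapshot offset (15) on the shared partition, so during
+ // replay it should skip the most already-applied messages.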
+ WorkspaceID: wsIDs[2], + Path: fmt.Sprintf("%s.gob", wsIDs[2]), + Timestamp: time.Now().Add(-1 * time.Hour), + Partition: 0, + Offset: 15, + NumPartitions: 1, + }, + } + + for _, snapshot := range snapshots { + if err := db.WriteWorkspaceSnapshot(ctx, snapshot); err != nil { + t.Fatalf("Failed to write snapshot: %v", err) + } + } + + // Create producer + producer := createTestProducer(t) + defer producer.Close() + + factory1 := newResourceFactory(wsIDs[0]) + factory2 := newResourceFactory(wsIDs[1]) + factory3 := newResourceFactory(wsIDs[2]) + + // Produce 30 messages (10 per workspace) for clearer distribution + // WS1 messages at offsets: 0, 3, 6, 9, 12, 15, 18, 21, 24, 27 + // WS2 messages at offsets: 1, 4, 7, 10, 13, 16, 19, 22, 25, 28 + // WS3 messages at offsets: 2, 5, 8, 11, 14, 17, 20, 23, 26, 29 + for i := 0; i < 30; i++ { + var wsID string + var factory *resourceFactory + + switch i % 3 { + case 0: + wsID = wsIDs[0] + factory = factory1 + case 1: + wsID = wsIDs[1] + factory = factory2 + case 2: + wsID = wsIDs[2] + factory = factory3 + } + + resourceID := uuid.New().String() + payload := factory.create(resourceID) + + event := createTestEvent(wsID, eventHandler.ResourceCreate, payload) + produceKafkaMessage(t, producer, topicName, event, int32(0), 0) + } + + // Produce workspace save events for all workspaces + for _, wsID := range wsIDs { + event := createTestEvent(wsID, eventHandler.WorkspaceSave, map[string]interface{}{}) + produceKafkaMessage(t, producer, topicName, event, int32(0), 0) + } + + // Run consumer + consumerCtx, cancelConsumer := context.WithCancel(ctx) + consumerDone := make(chan error, 1) + + go func() { + consumerDone <- kafkapkg.RunConsumer(consumerCtx) + }() + + // Wait longer for multiple workspaces to process + time.Sleep(10 * time.Second) + cancelConsumer() + + select { + case err := <-consumerDone: + if err != nil && err != context.Canceled { + t.Fatalf("Consumer error: %v", err) + } + case <-time.After(5 * time.Second): + t.Fatal("Consumer shutdown timeout") + } + + // Give async operations time to complete + time.Sleep(1 * time.Second) + + // Verify each workspace processed correct messages + ws1 := workspace.GetWorkspace(wsIDs[0]) + ws2 := workspace.GetWorkspace(wsIDs[1]) + ws3 := workspace.GetWorkspace(wsIDs[2]) + + if ws1 == nil || ws2 == nil || ws3 == nil { + t.Fatal("Not all workspaces found") + } + + // Count resources processed by each workspace + ws1Resources := len(ws1.Resources().Items()) + ws2Resources := len(ws2.Resources().Items()) + ws3Resources := len(ws3.Resources().Items()) + + // Expected distribution with 30 messages: + // WS1 messages at offsets: 0, 3, 6, 9, 12, 15, 18, 21, 24, 27 + // BC boundary offset 5: skips 0,3 → processes 6,9,12,15,18,21,24,27 → 8 resources + // WS2 messages at offsets: 1, 4, 7, 10, 13, 16, 19, 22, 25, 28 + // BC boundary offset 10: skips 1,4,7,10 → processes 13,16,19,22,25,28 → 6 resources + // WS3 messages at offsets: 2, 5, 8, 11, 14, 17, 20, 23, 26, 29 + // BC boundary offset 15: skips 2,5,8,11,14 → processes 17,20,23,26,29 → 5 resources + + if ws1Resources < 5 { + t.Fatalf("WS1 (BC: offset 5) expected ~8 resources, got %d", ws1Resources) + } + if ws2Resources < 4 { + t.Fatalf("WS2 (BC: offset 10) expected ~6 resources, got %d", ws2Resources) + } + if ws3Resources < 3 { + t.Fatalf("WS3 (BC: offset 15) expected ~5 resources, got %d", ws3Resources) + } + + // Critical: Verify gradient (independent BC boundaries per workspace) + if ws1Resources <= ws2Resources { + t.Fatalf("WS1 (BC: offset 5) should have MORE 
resources than WS2 (BC: offset 10): ws1=%d, ws2=%d", + ws1Resources, ws2Resources) + } + if ws2Resources <= ws3Resources { + t.Fatalf("WS2 (BC: offset 10) should have MORE resources than WS3 (BC: offset 15): ws2=%d, ws3=%d", + ws2Resources, ws3Resources) + } + + // Verify snapshots updated independently + finalSnapshots, err := db.GetLatestWorkspaceSnapshots(ctx, wsIDs) + if err != nil { + t.Fatalf("Failed to get final snapshots: %v", err) + } + + if len(finalSnapshots) != 3 { + t.Fatalf("Expected 3 final snapshots, got %d", len(finalSnapshots)) + } + + // Verify each workspace's snapshot offset advanced + for i, wsID := range wsIDs { + finalSnapshot := finalSnapshots[wsID] + if finalSnapshot == nil { + t.Fatalf("No final snapshot for WS%d", i+1) + } + + if finalSnapshot.Offset <= snapshots[i].Offset { + t.Fatalf("WS%d snapshot should advance beyond %d, got %d", + i+1, snapshots[i].Offset, finalSnapshot.Offset) + } + } + + // Critical: Verify consumer seeked to EARLIEST offset (offset 6) + if ws1Resources == 0 { + t.Fatal("WS1 has no resources - consumer seeked to wrong offset (should seek to earliest: 6)") + } +} + +// TestEngine_Kafka_Replay_NoSnapshot tests behavior when no snapshot exists +// (new workspace or first time processing) - should load from database +func TestEngine_Kafka_Replay_NoSnapshot(t *testing.T) { + env := setupTestEnvironment(t) + defer env.cleanup() + + systemID := uuid.New().String() + resource1ID := uuid.New().String() + resource2ID := uuid.New().String() + + // Insert entities directly into database (simulating existing workspace data) + if err := insertSystemIntoDB(env.ctx, env.workspaceID, systemID, "db-system"); err != nil { + t.Fatalf("Failed to insert system: %v", err) + } + if err := insertResourceIntoDB(env.ctx, env.workspaceID, resource1ID, "db-resource-1"); err != nil { + t.Fatalf("Failed to insert resource 1: %v", err) + } + if err := insertResourceIntoDB(env.ctx, env.workspaceID, resource2ID, "db-resource-2"); err != nil { + t.Fatalf("Failed to insert resource 2: %v", err) + } + + // Verify NO snapshot exists + snapshot, err := db.GetWorkspaceSnapshot(env.ctx, env.workspaceID) + if err != nil { + t.Fatalf("Failed to check snapshot: %v", err) + } + if snapshot != nil { + t.Fatal("Expected no snapshot, but one exists") + } + + // Produce new resource via Kafka + newResourceID := uuid.New().String() + env.produceResourceCreateEvent(newResourceID) + env.produceWorkspaceSaveEvent() + + // Run consumer - should load initial state from DB, then process Kafka message + env.runConsumer(5 * time.Second) + + // Verify workspace loaded from database + ws := workspace.GetWorkspace(env.workspaceID) + if ws == nil { + t.Fatal("Workspace not found") + } + + // Verify entities from DB were loaded + if _, exists := ws.Systems().Get(systemID); !exists { + t.Fatal("System from DB not loaded (PopulateWorkspaceWithInitialState failed)") + } + if _, exists := ws.Resources().Get(resource1ID); !exists { + t.Fatal("Resource 1 from DB not loaded") + } + if _, exists := ws.Resources().Get(resource2ID); !exists { + t.Fatal("Resource 2 from DB not loaded") + } + + // Verify new resource from Kafka was also processed + if _, exists := ws.Resources().Get(newResourceID); !exists { + t.Fatal("New resource from Kafka not processed") + } + + // Total should be 3 (2 from DB + 1 from Kafka) + resources := ws.Resources().Items() + if len(resources) != 3 { + t.Fatalf("Expected 3 resources (2 from DB + 1 from Kafka), got %d", len(resources)) + } + + // Verify snapshot was created after 
processing Kafka messages + finalSnapshot, err := db.GetWorkspaceSnapshot(env.ctx, env.workspaceID) + if err != nil { + t.Fatalf("Failed to get final snapshot: %v", err) + } + if finalSnapshot == nil { + t.Fatal("Snapshot should be created after workspace.save event") + } + + // Verify all messages processed in normal mode (not replay) + // Since there was no previous snapshot, all messages are "new" (AD mode) + if finalSnapshot.Offset < 0 { + t.Fatalf("Invalid snapshot offset: %d", finalSnapshot.Offset) + } +} + +// TestEngine_Kafka_Replay_ReplayMode tests that replay mode is correctly detected +// and workspace state is rebuilt without triggering side effects (CRITICAL BUSINESS LOGIC) +func TestEngine_Kafka_Replay_ReplayMode(t *testing.T) { + env := setupTestEnvironment(t) + defer env.cleanup() + + resourceIDs := []string{ + uuid.New().String(), + uuid.New().String(), + uuid.New().String(), + } + + // Phase 1: Process initial messages and create snapshot + for _, resourceID := range resourceIDs { + env.produceResourceCreateEvent(resourceID) + } + env.produceWorkspaceSaveEvent() + env.runConsumer(5 * time.Second) + + // Get snapshot + snapshot, err := db.GetWorkspaceSnapshot(env.ctx, env.workspaceID) + if err != nil || snapshot == nil { + t.Fatal("Snapshot not created") + } + + // Verify resources exist + ws := workspace.GetWorkspace(env.workspaceID) + if len(ws.Resources().Items()) != len(resourceIDs) { + t.Fatalf("Expected %d resources, got %d", len(resourceIDs), len(ws.Resources().Items())) + } + + snapshotOffset := snapshot.Offset + + // Phase 2: Create old snapshot to simulate replay scenario + // Set snapshot offset back to simulate being behind committed offset + oldSnapshot := &db.WorkspaceSnapshot{ + WorkspaceID: env.workspaceID, + Path: snapshot.Path, + Timestamp: snapshot.Timestamp, + Partition: 0, + Offset: 0, // Set to beginning to force replay + NumPartitions: 1, + } + + if err := db.WriteWorkspaceSnapshot(env.ctx, oldSnapshot); err != nil { + t.Fatalf("Failed to write old snapshot: %v", err) + } + + // Phase 3: Produce new messages and verify they're processed in replay mode + // Since consumer has already committed up to snapshotOffset, but workspace snapshot is at 0, + // messages between 0 and snapshotOffset will be in replay mode + newResourceID := uuid.New().String() + env.produceResourceCreateEvent(newResourceID) + env.produceWorkspaceSaveEvent() + + // Run consumer - should process in replay mode for already-committed messages + env.runConsumer(5 * time.Second) + + // Verify workspace state was rebuilt + ws = workspace.GetWorkspace(env.workspaceID) + if ws == nil { + t.Fatal("Workspace not found after replay") + } + + // All original resources should still exist (state rebuilt from replay) + for _, resourceID := range resourceIDs { + if _, exists := ws.Resources().Get(resourceID); !exists { + t.Fatalf("Resource %s not found after replay (state should be rebuilt)", resourceID) + } + } + + // New resource should also exist + if _, exists := ws.Resources().Get(newResourceID); !exists { + t.Fatal("New resource not found after replay processing") + } + + // Verify final snapshot reflects the new offset + finalSnapshot, err := db.GetWorkspaceSnapshot(env.ctx, env.workspaceID) + if err != nil { + t.Fatalf("Failed to get final snapshot: %v", err) + } + + if finalSnapshot.Offset <= snapshotOffset { + t.Fatalf("Expected snapshot offset to advance beyond %d, got %d", + snapshotOffset, finalSnapshot.Offset) + } +} + +// TestEngine_Kafka_Replay_JobDispatchPrevention tests that 
workspace state is rebuilt +// correctly during replay mode (verifies replay flag behavior) +func TestEngine_Kafka_Replay_JobDispatchPrevention(t *testing.T) { + env := setupTestEnvironment(t) + defer env.cleanup() + + systemID := uuid.New().String() + environmentID := uuid.New().String() + deploymentID := uuid.New().String() + versionID := uuid.New().String() + jobAgentID := uuid.New().String() + resourceID := uuid.New().String() + + // Create complete deployment setup + env.produceSystemCreateEvent(systemID, "test-system") + env.produceEnvironmentCreateEvent(systemID, environmentID, "production") + env.produceGithubEntityCreateEvent("test-owner", 12345) + env.produceJobAgentCreateEvent(jobAgentID, "github-agent", 12345, "test-owner", "test-repo", 789) + env.produceDeploymentCreateEvent(systemID, deploymentID, "api-service", jobAgentID) + env.produceResourceCreateEvent(resourceID) + env.produceDeploymentVersionCreateEvent(deploymentID, versionID, "v1.0.0") + env.produceWorkspaceSaveEvent() + + // Run consumer + env.runConsumer(5 * time.Second) + + ws := workspace.GetWorkspace(env.workspaceID) + if ws == nil { + t.Fatal("Workspace not found") + } + + // Verify deployment setup + if _, exists := ws.Deployments().Get(deploymentID); !exists { + t.Fatal("Deployment not created") + } + if _, exists := ws.Environments().Get(environmentID); !exists { + t.Fatal("Environment not created") + } + if _, exists := ws.Resources().Get(resourceID); !exists { + t.Fatal("Resource not created") + } + + // Verify release targets created + releaseTargets, err := ws.ReleaseTargets().Items(env.ctx) + if err != nil { + t.Fatalf("Failed to get release targets: %v", err) + } + if len(releaseTargets) == 0 { + t.Fatal("No release targets created") + } + + initialJobCount := len(ws.Jobs().Items()) + + // Get snapshot + snapshot, err := db.GetWorkspaceSnapshot(env.ctx, env.workspaceID) + if err != nil || snapshot == nil { + t.Fatal("Snapshot not created") + } + + // Test replay mode: reset snapshot to force replay + oldSnapshot := &db.WorkspaceSnapshot{ + WorkspaceID: env.workspaceID, + Path: snapshot.Path, + Timestamp: snapshot.Timestamp, + Partition: 0, + Offset: 0, + NumPartitions: 1, + } + + if err := db.WriteWorkspaceSnapshot(env.ctx, oldSnapshot); err != nil { + t.Fatalf("Failed to write old snapshot: %v", err) + } + + // Create new version in replay mode + version2ID := uuid.New().String() + env.produceDeploymentVersionCreateEvent(deploymentID, version2ID, "v2.0.0") + env.produceWorkspaceSaveEvent() + + env.runConsumer(5 * time.Second) + + // Verify version created during replay + ws = workspace.GetWorkspace(env.workspaceID) + versions := ws.DeploymentVersions().Items() + + versionFound := false + for _, v := range versions { + if v.Id == version2ID { + versionFound = true + break + } + } + + if !versionFound { + t.Fatal("Deployment version not created during replay") + } + + // Verify workspace state maintained during replay + replayJobCount := len(ws.Jobs().Items()) + if replayJobCount < initialJobCount { + t.Fatal("Jobs lost during replay (state rebuild failed)") + } + + // Verify release targets still exist after replay + replayReleaseTargets, err := ws.ReleaseTargets().Items(env.ctx) + if err != nil { + t.Fatalf("Failed to get release targets after replay: %v", err) + } + if len(replayReleaseTargets) == 0 { + t.Fatal("Release targets lost during replay") + } + + // Verify snapshot was created and contains release targets + finalSnapshot, err := db.GetWorkspaceSnapshot(env.ctx, env.workspaceID) + if err != 
nil { + t.Fatalf("Failed to get final snapshot: %v", err) + } + if finalSnapshot == nil { + t.Fatal("No final snapshot created") + } + + // Load snapshot and verify release targets are persisted + storage := workspace.NewFileStorage("./state") + snapshotData, err := storage.Get(env.ctx, finalSnapshot.Path) + if err != nil { + t.Fatalf("Failed to read snapshot file: %v", err) + } + + restoredWs := workspace.New(uuid.New().String()) + if err := restoredWs.GobDecode(snapshotData); err != nil { + t.Fatalf("Failed to decode snapshot: %v", err) + } + + // Verify release targets restored from snapshot + restoredReleaseTargets, err := restoredWs.ReleaseTargets().Items(env.ctx) + if err != nil { + t.Fatalf("Failed to get release targets from restored workspace: %v", err) + } + if len(restoredReleaseTargets) == 0 { + t.Fatal("Release targets not persisted in snapshot") + } + if len(restoredReleaseTargets) != len(releaseTargets) { + t.Fatalf("Release target count mismatch: expected %d, got %d in restored snapshot", + len(releaseTargets), len(restoredReleaseTargets)) + } +} + +// TestEngine_Kafka_Replay_OffsetCommit tests that offsets are committed correctly +// after message processing +func TestEngine_Kafka_Replay_OffsetCommit(t *testing.T) { + t.Skip("TODO: Implement offset commit test") + // This test should: + // 1. Process messages and commit offsets + // 2. Restart consumer + // 3. Verify consumer resumes from committed offset + // 4. Verify messages before committed offset are treated as replay +} + +// TestEngine_Kafka_Replay_PartitionRebalance tests replay logic during partition rebalance +func TestEngine_Kafka_Replay_PartitionRebalance(t *testing.T) { + t.Skip("TODO: Implement partition rebalance test") + // This test should: + // 1. Start consumer with multiple partitions + // 2. Create snapshots for workspaces on different partitions + // 3. Simulate rebalance + // 4. 
Verify correct seek behavior for newly assigned partitions +} + +// Helper functions + +// testEnvironment encapsulates all test resources that need cleanup +type testEnvironment struct { + ctx context.Context + t *testing.T + workspaceID string + topicName string + producer *kafka.Producer + consumer *kafka.Consumer +} + +// setupTestEnvironment creates and initializes all test resources +func setupTestEnvironment(t *testing.T) *testEnvironment { + t.Helper() + + // Skip if Kafka not available + if !isKafkaAvailable(t) { + t.Skip("Kafka broker not available, skipping e2e test") + } + + ctx := context.Background() + + // Clean up any leftover test workspaces from previous runs + cleanupAllTestWorkspaces(ctx) + + workspaceID := uuid.New().String() + topicName := fmt.Sprintf("test-replay-%s-%s", t.Name(), uuid.New().String()[:8]) + + // Set environment variable for consumer to use this topic + // Must set BEFORE accessing kafka package variables + os.Setenv("KAFKA_TOPIC", topicName) + + // Force reload of kafka package configuration + kafkapkg.Topic = topicName + kafkapkg.GroupID = fmt.Sprintf("test-group-%s", uuid.New().String()[:8]) + + // Create workspace in database + if err := createTestWorkspace(ctx, workspaceID); err != nil { + t.Fatalf("Failed to create test workspace: %v", err) + } + + // Create Kafka producer + producer := createTestProducer(t) + + env := &testEnvironment{ + ctx: ctx, + t: t, + workspaceID: workspaceID, + topicName: topicName, + producer: producer, + } + + return env +} + +// cleanup cleans up all test resources +func (env *testEnvironment) cleanup() { + env.t.Helper() + + if env.consumer != nil { + env.consumer.Close() + } + if env.producer != nil { + env.producer.Close() + } + cleanupKafkaTopic(env.t, env.topicName) + cleanupTestWorkspace(env.ctx, env.workspaceID) + + // Reset kafka package variables to defaults + os.Unsetenv("KAFKA_TOPIC") + kafkapkg.Topic = "workspace-events" + kafkapkg.GroupID = "workspace-engine" +} + +// generateAlphanumeric generates a random alphanumeric string of specified length +func generateAlphanumeric(length int) string { + const charset = "abcdefghijklmnopqrstuvwxyz0123456789" + b := make([]byte, length) + for i := range b { + b[i] = charset[rand.Intn(len(charset))] + } + return string(b) +} + +// resourceFactory creates a resource payload with random alphanumeric values +type resourceFactory struct { + workspaceID string +} + +func newResourceFactory(workspaceID string) *resourceFactory { + return &resourceFactory{workspaceID: workspaceID} +} + +func (rf *resourceFactory) create(resourceID string) map[string]interface{} { + return map[string]interface{}{ + "id": resourceID, + "workspaceId": rf.workspaceID, + "name": fmt.Sprintf("resource-%s", generateAlphanumeric(8)), + "kind": fmt.Sprintf("kind-%s", generateAlphanumeric(6)), + "version": fmt.Sprintf("v%s", generateAlphanumeric(4)), + "identifier": generateAlphanumeric(12), + "config": map[string]interface{}{ + "region": generateAlphanumeric(10), + "environment": generateAlphanumeric(8), + "tier": generateAlphanumeric(6), + }, + "metadata": map[string]interface{}{ + "label-" + generateAlphanumeric(4): generateAlphanumeric(10), + "label-" + generateAlphanumeric(4): generateAlphanumeric(10), + "tag-" + generateAlphanumeric(4): generateAlphanumeric(8), + }, + } +} + +// produceResourceCreateEvent produces a resource creation event with auto-generated payload +func (env *testEnvironment) produceResourceCreateEvent(resourceID string) { + env.t.Helper() + + factory := 
newResourceFactory(env.workspaceID) + payload := factory.create(resourceID) + + event := createTestEvent(env.workspaceID, eventHandler.ResourceCreate, payload) + produceKafkaMessage(env.t, env.producer, env.topicName, event, int32(0), 0) +} + +// produceWorkspaceSaveEvent produces a workspace save event to trigger snapshot creation +func (env *testEnvironment) produceWorkspaceSaveEvent() { + env.t.Helper() + + event := createTestEvent(env.workspaceID, eventHandler.WorkspaceSave, map[string]interface{}{}) + produceKafkaMessage(env.t, env.producer, env.topicName, event, int32(0), 0) +} + +// produceSystemCreateEvent produces a system creation event +func (env *testEnvironment) produceSystemCreateEvent(systemID, name string) { + env.t.Helper() + + payload := map[string]interface{}{ + "id": systemID, + "workspaceId": env.workspaceID, + "name": name, + "description": fmt.Sprintf("System %s", name), + } + + event := createTestEvent(env.workspaceID, eventHandler.SystemCreate, payload) + produceKafkaMessage(env.t, env.producer, env.topicName, event, int32(0), 0) +} + +// produceEnvironmentCreateEvent produces an environment creation event with resource selector +func (env *testEnvironment) produceEnvironmentCreateEvent(systemID, environmentID, name string) { + env.t.Helper() + + // Match-all resource selector + selector := map[string]interface{}{ + "json": map[string]interface{}{ + "type": "name", + "operator": "contains", + "value": "", + }, + } + + payload := map[string]interface{}{ + "id": environmentID, + "workspaceId": env.workspaceID, + "systemId": systemID, + "name": name, + "resourceSelector": selector, + } + + event := createTestEvent(env.workspaceID, eventHandler.EnvironmentCreate, payload) + produceKafkaMessage(env.t, env.producer, env.topicName, event, int32(0), 0) +} + +// produceGithubEntityCreateEvent produces a GitHub entity creation event +func (env *testEnvironment) produceGithubEntityCreateEvent(slug string, installationID int) { + env.t.Helper() + + payload := map[string]interface{}{ + "workspaceId": env.workspaceID, + "slug": slug, + "installationId": installationID, + } + + event := createTestEvent(env.workspaceID, eventHandler.GithubEntityCreate, payload) + produceKafkaMessage(env.t, env.producer, env.topicName, event, int32(0), 0) +} + +// produceJobAgentCreateEvent produces a job agent creation event +func (env *testEnvironment) produceJobAgentCreateEvent(jobAgentID, name string, installationID int, owner, repo string, workflowID int) { + env.t.Helper() + + payload := map[string]interface{}{ + "id": jobAgentID, + "workspaceId": env.workspaceID, + "name": name, + "type": "github", + "config": map[string]interface{}{ + "installationId": installationID, + "owner": owner, + "repo": repo, + "workflowId": workflowID, + }, + } + + event := createTestEvent(env.workspaceID, eventHandler.JobAgentCreate, payload) + produceKafkaMessage(env.t, env.producer, env.topicName, event, int32(0), 0) +} + +// produceDeploymentCreateEvent produces a deployment creation event +func (env *testEnvironment) produceDeploymentCreateEvent(systemID, deploymentID, name, jobAgentID string) { + env.t.Helper() + + // Match-all resource selector + selector := map[string]interface{}{ + "json": map[string]interface{}{ + "type": "name", + "operator": "contains", + "value": "", + }, + } + + payload := map[string]interface{}{ + "id": deploymentID, + "workspaceId": env.workspaceID, + "systemId": systemID, + "name": name, + "slug": name, + "jobAgentId": jobAgentID, + "jobAgentConfig": map[string]interface{}{}, + 
"resourceSelector": selector, + } + + event := createTestEvent(env.workspaceID, eventHandler.DeploymentCreate, payload) + produceKafkaMessage(env.t, env.producer, env.topicName, event, int32(0), 0) +} + +// produceDeploymentVersionCreateEvent produces a deployment version creation event +func (env *testEnvironment) produceDeploymentVersionCreateEvent(deploymentID, versionID, version string) { + env.t.Helper() + + payload := map[string]interface{}{ + "id": versionID, + "workspaceId": env.workspaceID, + "deploymentId": deploymentID, + "version": version, + } + + event := createTestEvent(env.workspaceID, eventHandler.DeploymentVersionCreate, payload) + produceKafkaMessage(env.t, env.producer, env.topicName, event, int32(0), 0) +} + +// runConsumer runs the consumer for the specified duration and handles shutdown +func (env *testEnvironment) runConsumer(duration time.Duration) { + env.t.Helper() + + consumerCtx, cancelConsumer := context.WithCancel(env.ctx) + consumerDone := make(chan error, 1) + + go func() { + consumerDone <- kafkapkg.RunConsumer(consumerCtx) + }() + + time.Sleep(duration) + cancelConsumer() + + select { + case err := <-consumerDone: + if err != nil && err != context.Canceled { + env.t.Fatalf("Consumer stopped with unexpected error: %v", err) + } + case <-time.After(5 * time.Second): + env.t.Fatal("Consumer did not shutdown in time") + } +} + +func isKafkaAvailable(t *testing.T) bool { + t.Helper() + + brokers := os.Getenv("KAFKA_BROKERS") + if brokers == "" { + brokers = "localhost:9092" + } + + producer, err := kafka.NewProducer(&kafka.ConfigMap{ + "bootstrap.servers": brokers, + }) + if err != nil { + return false + } + defer producer.Close() + + // Try to get metadata + metadata, err := producer.GetMetadata(nil, false, 1000) + if err != nil { + return false + } + + return len(metadata.Brokers) > 0 +} + +func createTestProducer(t *testing.T) *kafka.Producer { + t.Helper() + + brokers := os.Getenv("KAFKA_BROKERS") + if brokers == "" { + brokers = "localhost:9092" + } + + producer, err := kafka.NewProducer(&kafka.ConfigMap{ + "bootstrap.servers": brokers, + }) + if err != nil { + t.Fatalf("Failed to create producer: %v", err) + } + + return producer +} + +func createTestConsumer(t *testing.T, topic string, groupID string) *kafka.Consumer { + t.Helper() + + brokers := os.Getenv("KAFKA_BROKERS") + if brokers == "" { + brokers = "localhost:9092" + } + + consumer, err := kafka.NewConsumer(&kafka.ConfigMap{ + "bootstrap.servers": brokers, + "group.id": groupID, + "auto.offset.reset": "earliest", + "enable.auto.commit": false, + }) + if err != nil { + t.Fatalf("Failed to create consumer: %v", err) + } + + return consumer +} + +func createTestEvent(workspaceID string, eventType eventHandler.EventType, payload map[string]interface{}) []byte { + payloadBytes, _ := json.Marshal(payload) + + event := eventHandler.RawEvent{ + EventType: eventType, + WorkspaceID: workspaceID, + Data: payloadBytes, + Timestamp: time.Now().UnixNano(), + } + + data, _ := json.Marshal(event) + return data +} + +func produceKafkaMessage(t *testing.T, producer *kafka.Producer, topic string, message []byte, partition int32, offset int64) { + t.Helper() + + // Create topic if it doesn't exist + adminClient, err := kafka.NewAdminClientFromProducer(producer) + if err != nil { + t.Fatalf("Failed to create admin client: %v", err) + } + defer adminClient.Close() + + // Check if topic exists + metadata, err := producer.GetMetadata(&topic, false, 5000) + if err != nil || len(metadata.Topics) == 0 || 
metadata.Topics[topic].Error.Code() != kafka.ErrNoError { + // Create topic + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + results, err := adminClient.CreateTopics(ctx, []kafka.TopicSpecification{ + { + Topic: topic, + NumPartitions: 1, + ReplicationFactor: 1, + }, + }) + if err != nil { + t.Fatalf("Failed to create topic: %v", err) + } + + for _, result := range results { + if result.Error.Code() != kafka.ErrNoError && result.Error.Code() != kafka.ErrTopicAlreadyExists { + t.Fatalf("Failed to create topic %s: %v", result.Topic, result.Error) + } + } + + // Wait for topic to be ready + time.Sleep(1 * time.Second) + } + + // Produce message + deliveryChan := make(chan kafka.Event, 1) + err = producer.Produce(&kafka.Message{ + TopicPartition: kafka.TopicPartition{ + Topic: &topic, + Partition: partition, + }, + Value: message, + Timestamp: time.Now(), + }, deliveryChan) + + if err != nil { + t.Fatalf("Failed to produce message: %v", err) + } + + // Wait for delivery confirmation + e := <-deliveryChan + m := e.(*kafka.Message) + + if m.TopicPartition.Error != nil { + t.Fatalf("Failed to deliver message: %v", m.TopicPartition.Error) + } +} + +func cleanupKafkaTopic(t *testing.T, topic string) { + t.Helper() + + brokers := os.Getenv("KAFKA_BROKERS") + if brokers == "" { + brokers = "localhost:9092" + } + + adminClient, err := kafka.NewAdminClient(&kafka.ConfigMap{ + "bootstrap.servers": brokers, + }) + if err != nil { + return + } + defer adminClient.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + _, _ = adminClient.DeleteTopics(ctx, []string{topic}) +} + +func createTestWorkspace(ctx context.Context, workspaceID string) error { + conn, err := db.GetDB(ctx) + if err != nil { + return err + } + defer conn.Release() + + _, err = conn.Exec(ctx, ` + INSERT INTO workspace (id, name, slug, created_at) + VALUES ($1, $2, $3, NOW()) + `, workspaceID, "test-workspace-"+workspaceID[:8], "test-"+workspaceID[:8]) + + return err +} + +func cleanupTestWorkspace(ctx context.Context, workspaceID string) { + conn, err := db.GetDB(ctx) + if err != nil { + return + } + defer conn.Release() + + // Delete workspace (cascade will delete snapshots) + _, _ = conn.Exec(ctx, `DELETE FROM workspace WHERE id = $1`, workspaceID) +} + +func cleanupAllTestWorkspaces(ctx context.Context) { + conn, err := db.GetDB(ctx) + if err != nil { + return + } + defer conn.Release() + + // Delete all workspaces with test prefix (from this or previous test runs) + // This also deletes their snapshots via CASCADE + _, _ = conn.Exec(ctx, `DELETE FROM workspace WHERE slug LIKE 'test-%'`) + + // Also clean up orphaned snapshot files from ./state directory + _ = os.RemoveAll("./state") + _ = os.Mkdir("./state", 0755) +} + +func insertSystemIntoDB(ctx context.Context, workspaceID, systemID, name string) error { + conn, err := db.GetDB(ctx) + if err != nil { + return err + } + defer conn.Release() + + _, err = conn.Exec(ctx, ` + INSERT INTO system (id, workspace_id, name, slug, description) + VALUES ($1, $2, $3, $4, $5) + `, systemID, workspaceID, name, name, "") + + return err +} + +func insertResourceIntoDB(ctx context.Context, workspaceID, resourceID, name string) error { + conn, err := db.GetDB(ctx) + if err != nil { + return err + } + defer conn.Release() + + _, err = conn.Exec(ctx, ` + INSERT INTO resource (id, workspace_id, name, version, kind, identifier, config) + VALUES ($1, $2, $3, $4, $5, $6, $7) + `, resourceID, workspaceID, 
name, "v1", "test-kind", name, "{}") + + return err +} diff --git a/apps/workspace-engine/test/integration/workspace.go b/apps/workspace-engine/test/integration/workspace.go index 5b77ff354..5e90b34bb 100644 --- a/apps/workspace-engine/test/integration/workspace.go +++ b/apps/workspace-engine/test/integration/workspace.go @@ -161,7 +161,7 @@ func (tw *TestWorkspace) PushEvent(ctx context.Context, eventType handler.EventT // Create a mock Kafka message topic := "test-topic" partition := int32(0) - offset := kafka.Offset(0) + offset := kafka.Offset(1) msg := &kafka.Message{ TopicPartition: kafka.TopicPartition{ @@ -172,7 +172,13 @@ func (tw *TestWorkspace) PushEvent(ctx context.Context, eventType handler.EventT Value: eventBytes, } - if _, err := tw.eventListener.ListenAndRoute(ctx, msg); err != nil { + offsetTracker := handler.OffsetTracker{ + LastCommittedOffset: 0, + LastWorkspaceOffset: 0, + MessageOffset: int64(offset), + } + + if _, err := tw.eventListener.ListenAndRoute(ctx, msg, offsetTracker); err != nil { tw.t.Fatalf("failed to listen and route event: %v", err) }
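For reference, a minimal sketch of the decision ListenAndRoute now derives from the OffsetTracker fields. The struct mirrors the one added in pkg/events/handler; decide and the sample offsets are illustrative only, not part of the change:

package main

import "fmt"

// OffsetTracker mirrors the struct added in pkg/events/handler.
type OffsetTracker struct {
	LastCommittedOffset int64 // highest offset the consumer group has committed
	LastWorkspaceOffset int64 // offset covered by the workspace's latest snapshot
	MessageOffset       int64 // offset of the message being processed
}

// decide restates the two checks ListenAndRoute performs: anything at or below
// the committed offset is a replay (state is rebuilt, side effects such as job
// dispatch are suppressed), and anything at or below the snapshot offset is
// skipped entirely because the snapshot already contains it.
func decide(t OffsetTracker) (isReplay, skip bool) {
	isReplay = t.MessageOffset <= t.LastCommittedOffset
	skip = t.MessageOffset <= t.LastWorkspaceOffset
	return isReplay, skip
}

func main() {
	// Snapshot taken at offset 10, consumer group committed through offset 20.
	tracker := OffsetTracker{LastCommittedOffset: 20, LastWorkspaceOffset: 10}
	for _, off := range []int64{5, 15, 25} {
		tracker.MessageOffset = off
		isReplay, skip := decide(tracker)
		fmt.Printf("offset %d: replay=%v skip=%v\n", off, isReplay, skip)
	}
	// offset 5:  replay=true  skip=true   -- already covered by the snapshot
	// offset 15: replay=true  skip=false  -- rebuild state, no job dispatch
	// offset 25: replay=false skip=false  -- normal processing
}

This is also why the integration harness now passes MessageOffset 1 against zeroed tracker fields: pushed events are neither skipped nor marked as replay.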
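Likewise, a worked sketch of the seek position setOffsets picks for a partition shared by several workspaces, using the snapshot offsets from the multi-workspace test (5, 10, 15). The loop below restates getEarliestOffset and the +1 adjustment from consumer.go for illustration only:

package main

import "fmt"

func main() {
	// Latest snapshot offset per workspace on one shared partition
	// (fixture from TestEngine_Kafka_Replay_MultipleWorkspaces).
	snapshotOffsets := map[string]int64{"ws1": 5, "ws2": 10, "ws3": 15}

	// The partition must be replayed from the EARLIEST snapshot so that no
	// workspace misses events it has not yet applied.
	earliest, found := int64(0), false
	for _, off := range snapshotOffsets {
		if !found || off < earliest {
			earliest, found = off, true
		}
	}

	// Seek to the first message after that snapshot (the real code only adds 1
	// when the offset is positive, leaving OffsetBeginning untouched).
	seek := earliest
	if seek > 0 {
		seek++
	}
	fmt.Println("seek shared partition to offset", seek) // 6

	// ws1 (snapshot 5) then replays everything from offset 6 onwards, while ws2
	// and ws3 skip messages at or below 10 and 15 via LastWorkspaceOffset.
}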