Skip to content

Commit dc1258e

Browse files
chore: smarter loading scheme (#696)
1 parent b230dd8 commit dc1258e

File tree

14 files changed

+1612
-27
lines changed

14 files changed

+1612
-27
lines changed

apps/workspace-engine/pkg/db/workspaces.go

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,26 @@ func GetAllWorkspaceIDs(ctx context.Context) ([]string, error) {
8989
}
9090

9191
type WorkspaceSnapshot struct {
92+
WorkspaceID string
9293
Path string
9394
Timestamp time.Time
9495
Partition int32
9596
NumPartitions int32
97+
Offset int64
9698
}
9799

98100
const WORKSPACE_SNAPSHOT_SELECT_QUERY = `
99-
SELECT path, timestamp, partition, num_partitions FROM workspace_snapshot WHERE workspace_id = $1 ORDER BY timestamp DESC LIMIT 1
101+
SELECT
102+
workspace_id,
103+
path,
104+
timestamp,
105+
partition,
106+
num_partitions,
107+
"offset"
108+
FROM workspace_snapshot
109+
WHERE workspace_id = $1
110+
ORDER BY "offset" DESC, timestamp DESC
111+
LIMIT 1
100112
`
101113

102114
func GetWorkspaceSnapshot(ctx context.Context, workspaceID string) (*WorkspaceSnapshot, error) {
@@ -108,10 +120,12 @@ func GetWorkspaceSnapshot(ctx context.Context, workspaceID string) (*WorkspaceSn
108120

109121
workspaceSnapshot := &WorkspaceSnapshot{}
110122
err = db.QueryRow(ctx, WORKSPACE_SNAPSHOT_SELECT_QUERY, workspaceID).Scan(
123+
&workspaceSnapshot.WorkspaceID,
111124
&workspaceSnapshot.Path,
112125
&workspaceSnapshot.Timestamp,
113126
&workspaceSnapshot.Partition,
114127
&workspaceSnapshot.NumPartitions,
128+
&workspaceSnapshot.Offset,
115129
)
116130
if err != nil {
117131
if err == pgx.ErrNoRows {
@@ -122,19 +136,62 @@ func GetWorkspaceSnapshot(ctx context.Context, workspaceID string) (*WorkspaceSn
122136
return workspaceSnapshot, nil
123137
}
124138

139+
func GetLatestWorkspaceSnapshots(ctx context.Context, workspaceIDs []string) (map[string]*WorkspaceSnapshot, error) {
140+
if len(workspaceIDs) == 0 {
141+
return nil, nil
142+
}
143+
144+
db, err := GetDB(ctx)
145+
if err != nil {
146+
return nil, err
147+
}
148+
defer db.Release()
149+
150+
const query = `
151+
SELECT DISTINCT ON (workspace_id) workspace_id, path, timestamp, partition, num_partitions, "offset"
152+
FROM workspace_snapshot
153+
WHERE workspace_id = ANY($1)
154+
ORDER BY workspace_id, "offset" DESC, timestamp DESC
155+
`
156+
rows, err := db.Query(ctx, query, workspaceIDs)
157+
if err != nil {
158+
return nil, err
159+
}
160+
defer rows.Close()
161+
162+
var snapshots []*WorkspaceSnapshot
163+
for rows.Next() {
164+
var snapshot WorkspaceSnapshot
165+
err := rows.Scan(&snapshot.WorkspaceID, &snapshot.Path, &snapshot.Timestamp, &snapshot.Partition, &snapshot.NumPartitions, &snapshot.Offset)
166+
if err != nil {
167+
return nil, err
168+
}
169+
snapshots = append(snapshots, &snapshot)
170+
}
171+
if err := rows.Err(); err != nil {
172+
return nil, err
173+
}
174+
175+
snapshotMap := make(map[string]*WorkspaceSnapshot)
176+
for _, snapshot := range snapshots {
177+
snapshotMap[snapshot.WorkspaceID] = snapshot
178+
}
179+
return snapshotMap, nil
180+
}
181+
125182
const WORKSPACE_SNAPSHOT_INSERT_QUERY = `
126-
INSERT INTO workspace_snapshot (workspace_id, path, timestamp, partition, num_partitions)
127-
VALUES ($1, $2, $3, $4, $5)
183+
INSERT INTO workspace_snapshot (workspace_id, path, timestamp, partition, num_partitions, "offset")
184+
VALUES ($1, $2, $3, $4, $5, $6)
128185
`
129186

130-
func WriteWorkspaceSnapshot(ctx context.Context, workspaceID string, snapshot *WorkspaceSnapshot) error {
187+
func WriteWorkspaceSnapshot(ctx context.Context, snapshot *WorkspaceSnapshot) error {
131188
db, err := GetDB(ctx)
132189
if err != nil {
133190
return err
134191
}
135192
defer db.Release()
136193

137-
_, err = db.Exec(ctx, WORKSPACE_SNAPSHOT_INSERT_QUERY, workspaceID, snapshot.Path, snapshot.Timestamp, snapshot.Partition, snapshot.NumPartitions)
194+
_, err = db.Exec(ctx, WORKSPACE_SNAPSHOT_INSERT_QUERY, snapshot.WorkspaceID, snapshot.Path, snapshot.Timestamp, snapshot.Partition, snapshot.NumPartitions, snapshot.Offset)
138195
if err != nil {
139196
return err
140197
}

apps/workspace-engine/pkg/db/workspaces_test.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package db
22

33
import (
44
"testing"
5+
"time"
56

67
"github.com/google/uuid"
78
)
@@ -69,3 +70,123 @@ func TestDBWorkspaces_GetWorkspaceIDs(t *testing.T) {
6970
_ = conn2
7071
_ = conn3
7172
}
73+
74+
func TestDBWorkspaces_GetLatestWorkspaceSnapshots(t *testing.T) {
75+
ctx := t.Context()
76+
77+
// Create 3 workspaces
78+
ws1ID, _ := setupTestWithWorkspace(t)
79+
ws2ID, _ := setupTestWithWorkspace(t)
80+
ws3ID, _ := setupTestWithWorkspace(t)
81+
82+
// Create multiple snapshots for workspace 1 (verify it returns latest by offset)
83+
ws1Snapshots := []*WorkspaceSnapshot{
84+
{
85+
WorkspaceID: ws1ID,
86+
Path: "ws1-v1.gob",
87+
Timestamp: time.Now().Add(-10 * time.Hour), // Oldest time
88+
Partition: 0,
89+
Offset: 100, // Highest offset - should be returned
90+
NumPartitions: 3,
91+
},
92+
{
93+
WorkspaceID: ws1ID,
94+
Path: "ws1-v2.gob",
95+
Timestamp: time.Now(), // Newest time
96+
Partition: 0,
97+
Offset: 50, // Lower offset
98+
NumPartitions: 3,
99+
},
100+
{
101+
WorkspaceID: ws1ID,
102+
Path: "ws1-v3.gob",
103+
Timestamp: time.Now().Add(-5 * time.Hour),
104+
Partition: 0,
105+
Offset: 75,
106+
NumPartitions: 3,
107+
},
108+
}
109+
110+
for _, snapshot := range ws1Snapshots {
111+
if err := WriteWorkspaceSnapshot(ctx, snapshot); err != nil {
112+
t.Fatalf("Failed to write snapshot for ws1: %v", err)
113+
}
114+
}
115+
116+
// Create snapshots for workspace 2
117+
ws2Snapshot := &WorkspaceSnapshot{
118+
WorkspaceID: ws2ID,
119+
Path: "ws2.gob",
120+
Timestamp: time.Now(),
121+
Partition: 1,
122+
Offset: 200,
123+
NumPartitions: 3,
124+
}
125+
126+
if err := WriteWorkspaceSnapshot(ctx, ws2Snapshot); err != nil {
127+
t.Fatalf("Failed to write snapshot for ws2: %v", err)
128+
}
129+
130+
// Workspace 3 has no snapshots
131+
132+
// Test 1: Get snapshots for workspaces with snapshots
133+
snapshots, err := GetLatestWorkspaceSnapshots(ctx, []string{ws1ID, ws2ID})
134+
if err != nil {
135+
t.Fatalf("Failed to get latest snapshots: %v", err)
136+
}
137+
138+
if len(snapshots) != 2 {
139+
t.Fatalf("Expected 2 snapshots, got %d", len(snapshots))
140+
}
141+
142+
// Verify workspace 1 returns snapshot with HIGHEST offset (not newest timestamp)
143+
ws1Snapshot := snapshots[ws1ID]
144+
if ws1Snapshot == nil {
145+
t.Fatal("No snapshot returned for ws1")
146+
}
147+
if ws1Snapshot.Offset != 100 {
148+
t.Fatalf("Expected ws1 snapshot with offset 100 (highest), got %d", ws1Snapshot.Offset)
149+
}
150+
if ws1Snapshot.Path != "ws1-v1.gob" {
151+
t.Fatalf("Expected ws1-v1.gob (highest offset), got %s", ws1Snapshot.Path)
152+
}
153+
154+
// Verify workspace 2 returns its snapshot
155+
ws2SnapshotResult := snapshots[ws2ID]
156+
if ws2SnapshotResult == nil {
157+
t.Fatal("No snapshot returned for ws2")
158+
}
159+
if ws2SnapshotResult.Offset != 200 {
160+
t.Fatalf("Expected ws2 snapshot with offset 200, got %d", ws2SnapshotResult.Offset)
161+
}
162+
163+
// Test 2: Get snapshots including workspace with no snapshots
164+
snapshots, err = GetLatestWorkspaceSnapshots(ctx, []string{ws1ID, ws2ID, ws3ID})
165+
if err != nil {
166+
t.Fatalf("Failed to get snapshots with ws3: %v", err)
167+
}
168+
169+
// Should still return 2 (ws3 has no snapshots)
170+
if len(snapshots) != 2 {
171+
t.Fatalf("Expected 2 snapshots (ws3 has none), got %d", len(snapshots))
172+
}
173+
174+
// Test 3: Empty workspace ID array
175+
snapshots, err = GetLatestWorkspaceSnapshots(ctx, []string{})
176+
if err != nil {
177+
t.Fatalf("Expected no error for empty array, got %v", err)
178+
}
179+
if snapshots != nil {
180+
t.Fatalf("Expected nil for empty array, got %d snapshots", len(snapshots))
181+
}
182+
183+
// Test 4: Non-existent workspace IDs
184+
fakeID := uuid.New().String()
185+
snapshots, err = GetLatestWorkspaceSnapshots(ctx, []string{fakeID})
186+
if err != nil {
187+
t.Fatalf("Expected no error for non-existent workspace, got %v", err)
188+
}
189+
if len(snapshots) != 0 {
190+
t.Fatalf("Expected 0 snapshots for non-existent workspace, got %d", len(snapshots))
191+
}
192+
}

apps/workspace-engine/pkg/events/events.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"workspace-engine/pkg/events/handler/system"
1818
"workspace-engine/pkg/events/handler/tick"
1919
"workspace-engine/pkg/events/handler/userapprovalrecords"
20+
"workspace-engine/pkg/events/handler/workspacesave"
2021
)
2122

2223
var handlers = handler.HandlerRegistry{
@@ -75,6 +76,7 @@ var handlers = handler.HandlerRegistry{
7576
handler.GithubEntityDelete: githubentities.HandleGithubEntityDeleted,
7677

7778
handler.WorkspaceTick: tick.HandleWorkspaceTick,
79+
handler.WorkspaceSave: workspacesave.HandleWorkspaceSave,
7880

7981
handler.ReleaseTargetDeploy: redeploy.HandleReleaseTargetDeploy,
8082
}

apps/workspace-engine/pkg/events/handler/handler.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ const (
7676
GithubEntityDelete EventType = "github-entity.deleted"
7777

7878
WorkspaceTick EventType = "workspace.tick"
79+
WorkspaceSave EventType = "workspace.save"
7980

8081
ReleaseTargetDeploy EventType = "release-target.deploy"
8182
)
@@ -104,8 +105,14 @@ func NewEventListener(handlers HandlerRegistry) *EventListener {
104105
return &EventListener{handlers: handlers}
105106
}
106107

108+
type OffsetTracker struct {
109+
LastCommittedOffset int64
110+
LastWorkspaceOffset int64
111+
MessageOffset int64
112+
}
113+
107114
// ListenAndRoute processes incoming Kafka messages and routes them to the appropriate handler
108-
func (el *EventListener) ListenAndRoute(ctx context.Context, msg *kafka.Message) (*workspace.Workspace, error) {
115+
func (el *EventListener) ListenAndRoute(ctx context.Context, msg *kafka.Message, offsetTracker OffsetTracker) (*workspace.Workspace, error) {
109116
ctx, span := tracer.Start(ctx, "ListenAndRoute",
110117
trace.WithAttributes(
111118
attribute.String("kafka.topic", *msg.TopicPartition.Topic),
@@ -151,6 +158,13 @@ func (el *EventListener) ListenAndRoute(ctx context.Context, msg *kafka.Message)
151158
return nil, fmt.Errorf("workspace not found: %s", rawEvent.WorkspaceID)
152159
}
153160

161+
isReplay := offsetTracker.MessageOffset <= offsetTracker.LastCommittedOffset
162+
ws.Store().SetIsReplay(isReplay)
163+
164+
if offsetTracker.MessageOffset <= offsetTracker.LastWorkspaceOffset {
165+
return ws, nil
166+
}
167+
154168
ctx = changeset.WithChangeSet(ctx, changeSet)
155169

156170
if err := handler(ctx, ws, rawEvent); err != nil {
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package workspacesave
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"workspace-engine/pkg/events/handler"
7+
"workspace-engine/pkg/workspace"
8+
9+
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
10+
)
11+
12+
func HandleWorkspaceSave(_ctx context.Context, _ws *workspace.Workspace, _event handler.RawEvent) error {
13+
return nil
14+
}
15+
16+
func IsWorkspaceSaveEvent(msg *kafka.Message) bool {
17+
var rawEvent handler.RawEvent
18+
if err := json.Unmarshal(msg.Value, &rawEvent); err != nil {
19+
return false
20+
}
21+
22+
return rawEvent.EventType == handler.WorkspaceSave
23+
}

apps/workspace-engine/pkg/kafka/consumer.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package kafka
22

33
import (
4+
"context"
5+
"workspace-engine/pkg/db"
6+
47
"github.com/charmbracelet/log"
58
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
69
)
@@ -27,3 +30,50 @@ func createConsumer() (*kafka.Consumer, error) {
2730

2831
return c, nil
2932
}
33+
34+
func getEarliestOffset(snapshots map[string]*db.WorkspaceSnapshot) int64 {
35+
beginning := int64(kafka.OffsetBeginning)
36+
if len(snapshots) == 0 {
37+
return beginning
38+
}
39+
40+
var earliestOffset int64
41+
has := false
42+
for _, snapshot := range snapshots {
43+
if snapshot == nil {
44+
continue
45+
}
46+
if !has || snapshot.Offset < earliestOffset {
47+
earliestOffset = snapshot.Offset
48+
has = true
49+
}
50+
}
51+
if !has {
52+
return beginning
53+
}
54+
return earliestOffset
55+
}
56+
57+
func setOffsets(ctx context.Context, consumer *kafka.Consumer, partitionWorkspaceMap map[int32][]string) {
58+
for partition, workspaceIDs := range partitionWorkspaceMap {
59+
snapshots, err := db.GetLatestWorkspaceSnapshots(ctx, workspaceIDs)
60+
if err != nil {
61+
log.Error("Failed to get latest workspace snapshots", "error", err)
62+
continue
63+
}
64+
65+
earliestOffset := getEarliestOffset(snapshots)
66+
effectiveOffset := earliestOffset
67+
if effectiveOffset > 0 {
68+
effectiveOffset = effectiveOffset + 1
69+
}
70+
if err := consumer.Seek(kafka.TopicPartition{
71+
Topic: &Topic,
72+
Partition: partition,
73+
Offset: kafka.Offset(effectiveOffset),
74+
}, 0); err != nil {
75+
log.Error("Failed to seek to earliest offset", "error", err)
76+
continue
77+
}
78+
}
79+
}

0 commit comments

Comments
 (0)