Skip to content

Commit 2641f7f

Browse files
committed
init seed handler
1 parent 6828ed6 commit 2641f7f

File tree

9 files changed

+194
-50
lines changed

9 files changed

+194
-50
lines changed

apps/workspace-engine/pkg/events/handler/handler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,9 @@ func (el *EventListener) ListenAndRoute(ctx context.Context, msg *kafka.Message,
153153
var ws *workspace.Workspace
154154
changeSet := changeset.NewChangeSet[any]()
155155

156-
ws = workspace.GetWorkspace(rawEvent.WorkspaceID)
156+
ws, err := workspace.GetWorkspaceAndLoad(rawEvent.WorkspaceID)
157157
if ws == nil {
158-
return nil, fmt.Errorf("workspace not found: %s", rawEvent.WorkspaceID)
158+
return nil, fmt.Errorf("workspace not found: %s: %w", rawEvent.WorkspaceID, err)
159159
}
160160

161161
isReplay := offsetTracker.MessageOffset <= offsetTracker.LastCommittedOffset

apps/workspace-engine/pkg/kafka/consumer.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,12 @@ func getEarliestOffset(snapshots map[string]*db.WorkspaceSnapshot) int64 {
5454
return earliestOffset
5555
}
5656

57-
func setOffsets(ctx context.Context, consumer *kafka.Consumer, partitionWorkspaceMap map[int32][]string) {
57+
func setOffsets(ctx context.Context, consumer *kafka.Consumer, partitionWorkspaceMap map[int32][]string) error {
5858
for partition, workspaceIDs := range partitionWorkspaceMap {
5959
snapshots, err := db.GetLatestWorkspaceSnapshots(ctx, workspaceIDs)
6060
if err != nil {
6161
log.Error("Failed to get latest workspace snapshots", "error", err)
62-
continue
62+
return err
6363
}
6464

6565
earliestOffset := getEarliestOffset(snapshots)
@@ -73,7 +73,8 @@ func setOffsets(ctx context.Context, consumer *kafka.Consumer, partitionWorkspac
7373
Offset: kafka.Offset(effectiveOffset),
7474
}, 0); err != nil {
7575
log.Error("Failed to seek to earliest offset", "error", err)
76-
continue
76+
return err
7777
}
7878
}
79+
return nil
7980
}

apps/workspace-engine/pkg/kafka/kafka.go

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,9 @@ import (
1111
"workspace-engine/pkg/events"
1212
eventHanlder "workspace-engine/pkg/events/handler"
1313
"workspace-engine/pkg/events/handler/workspacesave"
14-
"workspace-engine/pkg/oapi"
1514
"workspace-engine/pkg/workspace"
1615
wskafka "workspace-engine/pkg/workspace/kafka"
1716

18-
"github.com/aws/smithy-go/ptr"
1917
"github.com/charmbracelet/log"
2018
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
2119
)
@@ -118,38 +116,22 @@ func RunConsumer(ctx context.Context) error {
118116
allWorkspaceIDs = append(allWorkspaceIDs, workspaceIDs...)
119117
}
120118

121-
storage := workspace.NewFileStorage("./state")
122-
if workspace.IsGCSStorageEnabled() {
123-
storage, err = workspace.NewGCSStorageClient(ctx)
124-
if err != nil {
125-
return fmt.Errorf("failed to create GCS storage: %w", err)
126-
}
127-
}
128-
129119
log.Info("All workspace IDs", "workspaceIDs", allWorkspaceIDs)
130120
for _, workspaceID := range allWorkspaceIDs {
131-
ws := workspace.GetWorkspace(workspaceID)
121+
ws, err := workspace.GetWorkspaceAndLoad(workspaceID)
132122
if ws == nil {
133-
log.Error("Workspace not found", "workspaceID", workspaceID)
134-
continue
135-
}
136-
if err := workspace.Load(ctx, storage, ws); err != nil {
137-
log.Error("Failed to load workspace", "workspaceID", workspaceID, "error", err)
123+
log.Error("Workspace not found", "workspaceID", workspaceID, "error", err)
138124
continue
139125
}
126+
}
140127

141-
ws.Systems().Upsert(ctx, &oapi.System{
142-
Id: "00000000-0000-0000-0000-000000000000",
143-
Name: "Default",
144-
Description: ptr.String("Default system"),
145-
})
128+
if err := setOffsets(ctx, consumer, partitionWorkspaceMap); err != nil {
129+
return fmt.Errorf("failed to set offsets: %w", err)
146130
}
147131

148132
// Start consuming messages
149133
handler := events.NewEventHandler()
150134

151-
setOffsets(ctx, consumer, partitionWorkspaceMap)
152-
153135
for {
154136
// Check for cancellation
155137
select {
@@ -212,7 +194,7 @@ func RunConsumer(ctx context.Context) error {
212194
NumPartitions: numPartitions,
213195
}
214196

215-
if err := workspace.Save(ctx, storage, ws, snapshot); err != nil {
197+
if err := workspace.Save(ctx, ws, snapshot); err != nil {
216198
log.Error("Failed to save workspace", "workspaceID", ws.ID, "snapshotPath", snapshot.Path, "error", err)
217199
}
218200
}

apps/workspace-engine/pkg/server/openapi/utils/utils.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,8 @@ func GetWorkspace(c *gin.Context, workspaceId string) (*workspace.Workspace, err
1818
}
1919

2020
if workspace.Exists(workspaceId) {
21-
return workspace.GetWorkspace(workspaceId), nil
21+
return workspace.GetWorkspaceAndLoad(workspaceId)
2222
}
2323

24-
ws := workspace.New(workspaceId)
25-
if err := workspace.PopulateWorkspaceWithInitialState(c.Request.Context(), ws); err != nil {
26-
return nil, fmt.Errorf("failed to populate workspace with initial state: %w", err)
27-
}
28-
workspace.Set(workspaceId, ws)
29-
return ws, nil
24+
return nil, fmt.Errorf("workspace %s not found", workspaceId)
3025
}

apps/workspace-engine/pkg/workspace/loader.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313

1414
var tracer = otel.Tracer("workspace/loader")
1515

16-
func Save(ctx context.Context, storage StorageClient, workspace *Workspace, snapshot *db.WorkspaceSnapshot) error {
16+
func Save(ctx context.Context, workspace *Workspace, snapshot *db.WorkspaceSnapshot) error {
1717
ctx, span := tracer.Start(ctx, "SaveWorkspace")
1818
defer span.End()
1919

@@ -30,7 +30,7 @@ func Save(ctx context.Context, storage StorageClient, workspace *Workspace, snap
3030
}
3131

3232
// Write to file with appropriate permissions
33-
if err := storage.Put(ctx, snapshot.Path, data); err != nil {
33+
if err := Storage.Put(ctx, snapshot.Path, data); err != nil {
3434
span.RecordError(err)
3535
span.SetStatus(codes.Error, "Failed to write workspace to disk")
3636
return fmt.Errorf("failed to write workspace to disk: %w", err)

apps/workspace-engine/pkg/workspace/storage.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package workspace
22

33
import (
4+
"context"
45
"errors"
6+
7+
"github.com/charmbracelet/log"
58
)
69

710
var ErrWorkspaceSnapshotNotFound = errors.New("workspace snapshot not found")
@@ -10,3 +13,24 @@ type WorkspaceStorageObject struct {
1013
ID string
1114
StoreData []byte
1215
}
16+
17+
var Storage StorageClient = nil
18+
19+
func init() {
20+
log.Info("Initializing workspace storage")
21+
ctx := context.Background()
22+
if IsGCSStorageEnabled() {
23+
log.Info("Using GCS storage")
24+
storage, err := NewGCSStorageClient(ctx)
25+
if err != nil {
26+
log.Error("Failed to create GCS storage", "error", err)
27+
panic(err)
28+
}
29+
Storage = storage
30+
return
31+
}
32+
33+
log.Info("Using file storage")
34+
storage := NewFileStorage("./state")
35+
Storage = storage
36+
}

apps/workspace-engine/pkg/workspace/workspace.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,16 @@ package workspace
22

33
import (
44
"bytes"
5+
"context"
56
"encoding/gob"
67
"workspace-engine/pkg/changeset"
78
"workspace-engine/pkg/cmap"
89
"workspace-engine/pkg/db"
10+
"workspace-engine/pkg/oapi"
911
"workspace-engine/pkg/workspace/releasemanager"
1012
"workspace-engine/pkg/workspace/store"
13+
14+
"github.com/aws/smithy-go/ptr"
1115
)
1216

1317
var _ gob.GobEncoder = (*Workspace)(nil)
@@ -27,6 +31,21 @@ func New(id string) *Workspace {
2731
return ws
2832
}
2933

34+
func NewAndLoad(ctx context.Context, id string) (*Workspace, error) {
35+
ws := New(id)
36+
if err := Load(ctx, Storage, ws); err != nil {
37+
return nil, err
38+
}
39+
40+
ws.Systems().Upsert(ctx, &oapi.System{
41+
Id: "00000000-0000-0000-0000-000000000000",
42+
Name: "Default",
43+
Description: ptr.String("Default system"),
44+
})
45+
46+
return ws, nil
47+
}
48+
3049
func NewNoFlush(id string) *Workspace {
3150
s := store.New()
3251
rm := releasemanager.New(s)
@@ -195,13 +214,17 @@ func HasWorkspace(id string) bool {
195214
return workspaces.Has(id)
196215
}
197216

198-
func GetWorkspace(id string) *Workspace {
217+
func GetWorkspaceAndLoad(id string) (*Workspace, error) {
199218
workspace, _ := workspaces.Get(id)
200219
if workspace == nil {
201-
workspace = New(id)
220+
workspace, err := NewAndLoad(context.Background(), id)
221+
if workspace == nil {
222+
return nil, err
223+
}
202224
workspaces.Set(id, workspace)
225+
return workspace, err
203226
}
204-
return workspace
227+
return workspace, nil
205228
}
206229

207230
func GetNoFlushWorkspace(id string) *Workspace {

apps/workspace-engine/test/e2e/engine_kafka_replay_test.go

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ func TestEngine_Kafka_Replay_BasicFlow(t *testing.T) {
4545
env.runConsumer(5 * time.Second)
4646

4747
// Verify workspace state was updated
48-
ws := workspace.GetWorkspace(env.workspaceID)
48+
ws, err := workspace.GetWorkspaceAndLoad(env.workspaceID)
49+
if err != nil {
50+
t.Fatalf("Failed to get workspace: %v", err)
51+
}
4952
if ws == nil {
5053
t.Fatal("Workspace not found in memory")
5154
}
@@ -299,9 +302,18 @@ func TestEngine_Kafka_Replay_MultipleWorkspaces(t *testing.T) {
299302
time.Sleep(1 * time.Second)
300303

301304
// Verify each workspace processed correct messages
302-
ws1 := workspace.GetWorkspace(wsIDs[0])
303-
ws2 := workspace.GetWorkspace(wsIDs[1])
304-
ws3 := workspace.GetWorkspace(wsIDs[2])
305+
ws1, err := workspace.GetWorkspaceAndLoad(wsIDs[0])
306+
if err != nil {
307+
t.Fatalf("Failed to get workspace: %v", err)
308+
}
309+
ws2, err := workspace.GetWorkspaceAndLoad(wsIDs[1])
310+
if err != nil {
311+
t.Fatalf("Failed to get workspace: %v", err)
312+
}
313+
ws3, err := workspace.GetWorkspaceAndLoad(wsIDs[2])
314+
if err != nil {
315+
t.Fatalf("Failed to get workspace: %v", err)
316+
}
305317

306318
if ws1 == nil || ws2 == nil || ws3 == nil {
307319
t.Fatal("Not all workspaces found")
@@ -408,7 +420,10 @@ func TestEngine_Kafka_Replay_NoSnapshot(t *testing.T) {
408420
env.runConsumer(5 * time.Second)
409421

410422
// Verify workspace loaded from database
411-
ws := workspace.GetWorkspace(env.workspaceID)
423+
ws, err := workspace.GetWorkspaceAndLoad(env.workspaceID)
424+
if err != nil {
425+
t.Fatalf("Failed to get workspace: %v", err)
426+
}
412427
if ws == nil {
413428
t.Fatal("Workspace not found")
414429
}
@@ -477,7 +492,10 @@ func TestEngine_Kafka_Replay_ReplayMode(t *testing.T) {
477492
}
478493

479494
// Verify resources exist
480-
ws := workspace.GetWorkspace(env.workspaceID)
495+
ws, err := workspace.GetWorkspaceAndLoad(env.workspaceID)
496+
if err != nil {
497+
t.Fatalf("Failed to get workspace: %v", err)
498+
}
481499
if len(ws.Resources().Items()) != len(resourceIDs) {
482500
t.Fatalf("Expected %d resources, got %d", len(resourceIDs), len(ws.Resources().Items()))
483501
}
@@ -510,7 +528,10 @@ func TestEngine_Kafka_Replay_ReplayMode(t *testing.T) {
510528
env.runConsumer(5 * time.Second)
511529

512530
// Verify workspace state was rebuilt
513-
ws = workspace.GetWorkspace(env.workspaceID)
531+
ws, err = workspace.GetWorkspaceAndLoad(env.workspaceID)
532+
if err != nil {
533+
t.Fatalf("Failed to get workspace: %v", err)
534+
}
514535
if ws == nil {
515536
t.Fatal("Workspace not found after replay")
516537
}
@@ -565,7 +586,10 @@ func TestEngine_Kafka_Replay_JobDispatchPrevention(t *testing.T) {
565586
// Run consumer
566587
env.runConsumer(5 * time.Second)
567588

568-
ws := workspace.GetWorkspace(env.workspaceID)
589+
ws, err := workspace.GetWorkspaceAndLoad(env.workspaceID)
590+
if err != nil {
591+
t.Fatalf("Failed to get workspace: %v", err)
592+
}
569593
if ws == nil {
570594
t.Fatal("Workspace not found")
571595
}
@@ -620,7 +644,10 @@ func TestEngine_Kafka_Replay_JobDispatchPrevention(t *testing.T) {
620644
env.runConsumer(5 * time.Second)
621645

622646
// Verify version created during replay
623-
ws = workspace.GetWorkspace(env.workspaceID)
647+
ws, err = workspace.GetWorkspaceAndLoad(env.workspaceID)
648+
if err != nil {
649+
t.Fatalf("Failed to get workspace: %v", err)
650+
}
624651
versions := ws.DeploymentVersions().Items()
625652

626653
versionFound := false

0 commit comments

Comments
 (0)