chore: smarter loading scheme #696
Conversation
Walkthrough

Adds per-message offset tracking and replay detection across Kafka consumer, event routing, and store; extends WorkspaceSnapshot with WorkspaceID and Offset; updates snapshot persistence and seeking logic; changes workspace assignment mapping and loader/store replay flags; introduces workspace.save handler and comprehensive replay E2E tests.
Sequence Diagram(s)
sequenceDiagram
participant C as Kafka Consumer
participant K as Consumer Loop
participant H as Event Listener
participant S as Workspace Store
participant DB as Snapshot DB
rect #f0f7ff
Note over K,DB: Startup — load snapshots & seek partitions
K->>DB: GetLatestWorkspaceSnapshots(workspaceIDs)
DB-->>K: map[workspaceID] -> WorkspaceSnapshot{WorkspaceID,Offset,...}
K->>C: setOffsets(partition -> earliestOffset)
end
loop each message
C->>K: deliver message (MessageOffset, value)
K->>K: extract lastSnapshot, compute OffsetTracker
K->>H: ListenAndRoute(ctx, msg, OffsetTracker)
alt MessageOffset ≤ LastCommittedOffset
H->>S: SetIsReplay(true)
alt MessageOffset ≤ LastWorkspaceOffset
H-->>H: return early (skip processing)
else
H->>H: apply replay-aware processing
end
else
H->>H: normal processing
end
opt workspace.save event
H->>DB: WriteWorkspaceSnapshot(snapshot with WorkspaceID & Offset)
end
end
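The replay branching above is easiest to read as code. Below is a minimal, self-contained Go sketch of that decision, using the field names from the diagram (MessageOffset, LastCommittedOffset, LastWorkspaceOffset); the real `OffsetTracker` in pkg/events/handler may differ in detail.

```go
package main

import "fmt"

// OffsetTracker mirrors the fields named in the sequence diagram; it is a
// stand-in, not the actual handler type.
type OffsetTracker struct {
	MessageOffset       int64
	LastCommittedOffset int64
	LastWorkspaceOffset int64
}

// decide returns whether the message is a replay and whether it can be
// skipped entirely, following the branching shown in the diagram.
func decide(t OffsetTracker) (isReplay, skip bool) {
	if t.MessageOffset <= t.LastCommittedOffset {
		isReplay = true
		if t.MessageOffset <= t.LastWorkspaceOffset {
			skip = true
		}
	}
	return isReplay, skip
}

func main() {
	// Replayed message (5 <= 10) that is newer than the workspace snapshot (5 > 3):
	// replay-aware processing, not skipped.
	fmt.Println(decide(OffsetTracker{MessageOffset: 5, LastCommittedOffset: 10, LastWorkspaceOffset: 3}))
}
```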
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches: ❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
apps/workspace-engine/pkg/workspace/kafka/state.go (1)
32-51: Remove the stale WorkspaceIDDiscoverer type definition and preallocate the result map.

The type `WorkspaceIDDiscoverer` at state.go:30 is defined but unused anywhere in the codebase—it references an outdated signature and creates confusion. Remove it entirely since the new `GetAssignedWorkspaceIDs` function is already called directly with the updated signature. Additionally, preallocate the result map with `len(assignedPartitions)` for the minor performance benefit.

```diff
-type WorkspaceIDDiscoverer func(ctx context.Context, targetPartition int32, numPartitions int32) ([]string, error)
-
 func GetAssignedWorkspaceIDs(ctx context.Context, assignedPartitions []int32, numPartitions int32) (map[int32][]string, error) {
 	workspaceIDs, err := db.GetAllWorkspaceIDs(ctx)
 	if err != nil {
 		return nil, err
 	}
 	assignedSet := make(map[int32]bool)
 	for _, p := range assignedPartitions {
 		assignedSet[p] = true
 	}
-	result := make(map[int32][]string)
+	result := make(map[int32][]string, len(assignedPartitions))
```

apps/workspace-engine/pkg/kafka/kafka.go (1)
189-206: Do not commit Kafka offsets when persisting the snapshot fails.

Committing after a failed `workspace.Save` loses the ability to resume from this message, breaking recovery guarantees.
Apply this diff:
```diff
 		if err := workspace.Save(ctx, storage, ws, snapshot); err != nil {
 			log.Error("Failed to save workspace", "workspaceID", ws.ID, "snapshotPath", snapshot.Path, "error", err)
-		}
-
-		// Commit offset to Kafka
-		if _, err := consumer.CommitMessage(msg); err != nil {
-			log.Error("Failed to commit message", "error", err)
-			continue
-		}
+			// Don't advance the consumer; allow redelivery on restart.
+			continue
+		}
+
+		// Commit offset to Kafka only after successful persistence.
+		if _, err := consumer.CommitMessage(msg); err != nil {
+			log.Error("Failed to commit message", "error", err)
+			continue
+		}
```

apps/workspace-engine/pkg/events/handler/handler.go (1)
111-120: Avoid recording full event payloads in tracing attributes.
attribute.String("event.data", string(msg.Value))can leak PII/secrets and inflate spans.Apply this diff:
```diff
-	ctx, span := tracer.Start(ctx, "ListenAndRoute",
-		trace.WithAttributes(
-			attribute.String("kafka.topic", *msg.TopicPartition.Topic),
-			attribute.Int("kafka.partition", int(msg.TopicPartition.Partition)),
-			attribute.Int64("kafka.offset", int64(msg.TopicPartition.Offset)),
-			attribute.String("event.data", string(msg.Value)),
-		))
+	ctx, span := tracer.Start(ctx, "ListenAndRoute",
+		trace.WithAttributes(
+			attribute.String("kafka.topic", *msg.TopicPartition.Topic),
+			attribute.Int("kafka.partition", int(msg.TopicPartition.Partition)),
+			attribute.Int64("kafka.offset", int64(msg.TopicPartition.Offset)),
+		))
```

If payload visibility is needed, emit a truncated preview at DEBUG logs behind a guard.
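A minimal sketch of that guard using the standard library's `log/slog`; the project uses its own logger, and the payload and size limit here are made-up values.

```go
package main

import (
	"context"
	"log/slog"
)

// preview returns at most n bytes of the payload for diagnostics — a sketch
// of the "truncated preview at DEBUG behind a guard" idea, not the handler's
// actual logging setup.
func preview(value []byte, n int) string {
	if len(value) > n {
		value = value[:n]
	}
	return string(value)
}

func main() {
	payload := []byte(`{"workspaceId":"ws-123","eventType":"workspace.save"}`) // hypothetical payload
	logger := slog.Default()
	// Only build and emit the preview when DEBUG is actually enabled.
	if logger.Enabled(context.Background(), slog.LevelDebug) {
		logger.Debug("event payload preview", "preview", preview(payload, 32))
	}
}
```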
🧹 Nitpick comments (9)
apps/workspace-engine/pkg/kafka/consumer.go (2)
54-60: Avoid scanning from beginning when no workspaces mapped to a partition.

When workspaceIDs is empty, this will seek to beginning (via getEarliestOffset) and scan unrelated workspaces. Early-continue:
```diff
 	for partition, workspaceIDs := range partitionWorkspaceMap {
+		if len(workspaceIDs) == 0 {
+			continue
+		}
 		snapshots, err := db.GetLatestWorkspaceSnapshots(ctx, workspaceIDs)
```
41-49: Redundant fallback check.

You already return beginning when `len(snapshots) == 0`. The equality check against MaxInt64 is unreachable; simplify:
```diff
-	if earliestOffset == math.MaxInt64 {
-		return beginning
-	}
```
64-70: Exported methods lack doc comments (lint).

Add brief doc comments to IsReplay and SetIsReplay to satisfy linters and clarify intent (ephemeral runtime flag, not persisted).
As per coding guidelines.
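A sketch of what such doc comments could look like, assuming the flag ends up as an `atomic.Bool` (as it does later in this PR); the real Store has many more fields.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Store is a trimmed stand-in for the workspace store.
type Store struct {
	isReplay atomic.Bool
}

// IsReplay reports whether the store is currently processing replayed Kafka
// messages. The flag is ephemeral runtime state and is never persisted.
func (s *Store) IsReplay() bool { return s.isReplay.Load() }

// SetIsReplay toggles replay mode; the event handler flips it when a message's
// offset is at or below the last committed offset.
func (s *Store) SetIsReplay(v bool) { s.isReplay.Store(v) }

func main() {
	var s Store
	s.SetIsReplay(true)
	fmt.Println(s.IsReplay()) // true
}
```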
apps/workspace-engine/pkg/kafka/offset.go (1)
143-164: Helper looks good; consider returning kafka.Offset and documenting sentinel behavior.

Returning `kafka.Offset` avoids repeated casting and clarifies use of OffsetBeginning/Invalid. Optional:
```diff
-func getCommittedOffset(consumer *kafka.Consumer, partition int32) (int64, error) {
+func getCommittedOffset(consumer *kafka.Consumer, partition int32) (kafka.Offset, error) {
@@
-		return int64(kafka.OffsetInvalid), err
+		return kafka.OffsetInvalid, err
@@
-		return int64(kafka.OffsetBeginning), nil
+		return kafka.OffsetBeginning, nil
@@
-	return int64(committed[0].Offset), nil
+	return committed[0].Offset, nil
```

Is this helper wired into the initial seek logic? If not, I can propose where to integrate it to reconcile committed vs snapshot offsets.
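For illustration, a hedged sketch of that reconciliation: start from whichever position is further ahead, assuming both values are already expressed as seek targets and using plain `int64` in place of `kafka.Offset`.

```go
package main

import "fmt"

const offsetBeginning = -2 // assumed value of kafka.OffsetBeginning

// reconcileStartOffset picks the further-ahead of the consumer group's
// committed position and the snapshot-derived position. Negative sentinels
// sort below any real offset, so "no data" cases fall out naturally.
func reconcileStartOffset(committed, fromSnapshot int64) int64 {
	if committed > fromSnapshot {
		return committed
	}
	return fromSnapshot
}

func main() {
	fmt.Println(reconcileStartOffset(42, 17))              // 42: committed position is ahead
	fmt.Println(reconcileStartOffset(offsetBeginning, 17)) // 17: no commit yet, use the snapshot
}
```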
apps/workspace-engine/pkg/kafka/kafka.go (3)
12-12: Rename alias to match package for readability.

Use a consistent alias (e.g., handler) to avoid confusion.
```diff
-	eventHanlder "workspace-engine/pkg/events/handler"
+	handler "workspace-engine/pkg/events/handler"
```
eventHanlder.RawEvent→handler.RawEvent,eventHanlder.OffsetTracker→handler.OffsetTracker.
151-152: Confirm and handle setOffsets errors if any.

If setOffsets can fail, handle the error to avoid starting consumption with wrong positions.
Please confirm its signature; if it returns error, do:
```diff
-	setOffsets(ctx, consumer, partitionWorkspaceMap)
+	if err := setOffsets(ctx, consumer, partitionWorkspaceMap); err != nil {
+		return fmt.Errorf("failed to set initial offsets: %w", err)
+	}
```
169-181: Harden replay detection when no committed offset exists and avoid double JSON decode cost.
- When `getCommittedOffset` errors, it returns `kafka.OffsetInvalid` (offset.go:155). Although the caller skips the message (kafka.go:174), the code lacks defensive normalization. Explicitly handle invalid offsets (map to -1 or -2) in `getCommittedOffset` or at lines 169–181 to prevent misclassification if control flow changes.
- Double JSON decode confirmed: `getLastWorkspaceOffset` (kafka.go:38–44) unmarshals `msg.Value` to extract the offset, then `ListenAndRoute` (handler.go:124) unmarshals the same data again. Decode once and pass the `RawEvent` or workspaceID to eliminate redundant work.
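A sketch of the decode-once idea; the `RawEvent` fields and JSON tags below are assumptions for illustration, not the handler package's actual definition.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RawEvent is a hypothetical, trimmed stand-in for the handler's event type.
type RawEvent struct {
	WorkspaceID string          `json:"workspaceId"`
	EventType   string          `json:"eventType"`
	Data        json.RawMessage `json:"data"`
}

// parseOnce unmarshals the Kafka payload a single time so both the offset
// lookup and the router can share the result.
func parseOnce(value []byte) (*RawEvent, error) {
	var ev RawEvent
	if err := json.Unmarshal(value, &ev); err != nil {
		return nil, fmt.Errorf("unmarshal event: %w", err)
	}
	return &ev, nil
}

func main() {
	ev, err := parseOnce([]byte(`{"workspaceId":"ws-1","eventType":"workspace.save"}`))
	if err != nil {
		panic(err)
	}
	// The snapshot lookup and ListenAndRoute would both receive ev instead of
	// re-unmarshalling msg.Value.
	fmt.Println(ev.WorkspaceID, ev.EventType)
}
```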
apps/workspace-engine/pkg/events/handler/handler.go (1)

132-137: Use Int64 for event timestamp.

Avoid float conversion of Unix seconds; use the native Int64 attribute.
```diff
-			attribute.Float64("event.timestamp", float64(msg.Timestamp.Unix())),
+			attribute.Int64("event.timestamp_unix", msg.Timestamp.Unix()),
```
100-102: Add an index to support “latest by offset” lookups.Operational follow-up: add
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_workspace_snapshot_latest ON workspace_snapshot (workspace_id, offset DESC);to keep queries fast at scale.Also applies to: 129-166
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (10)
- apps/workspace-engine/pkg/db/workspaces.go (3 hunks)
- apps/workspace-engine/pkg/events/handler/handler.go (2 hunks)
- apps/workspace-engine/pkg/kafka/consumer.go (2 hunks)
- apps/workspace-engine/pkg/kafka/kafka.go (5 hunks)
- apps/workspace-engine/pkg/kafka/offset.go (1 hunks)
- apps/workspace-engine/pkg/workspace/kafka/state.go (2 hunks)
- apps/workspace-engine/pkg/workspace/loader.go (1 hunks)
- apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go (1 hunks)
- apps/workspace-engine/pkg/workspace/store/store.go (2 hunks)
- apps/workspace-engine/test/integration/workspace.go (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
- apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go
- apps/workspace-engine/pkg/workspace/loader.go
- apps/workspace-engine/pkg/events/handler/handler.go
- apps/workspace-engine/pkg/kafka/offset.go
- apps/workspace-engine/test/integration/workspace.go
- apps/workspace-engine/pkg/workspace/kafka/state.go
- apps/workspace-engine/pkg/kafka/consumer.go
- apps/workspace-engine/pkg/db/workspaces.go
- apps/workspace-engine/pkg/workspace/store/store.go
- apps/workspace-engine/pkg/kafka/kafka.go
🧬 Code graph analysis (7)
apps/workspace-engine/pkg/workspace/loader.go (1)
apps/workspace-engine/pkg/db/workspaces.go (1)
WriteWorkspaceSnapshot(173-185)
apps/workspace-engine/pkg/events/handler/handler.go (2)
apps/workspace-engine/pkg/workspace/workspace.go (1)
Workspace (43-49)
apps/workspace-engine/pkg/workspace/store/store.go (1)
Store(40-62)
apps/workspace-engine/pkg/kafka/offset.go (1)
apps/workspace-engine/pkg/kafka/kafka.go (1)
Topic(24-24)
apps/workspace-engine/test/integration/workspace.go (1)
apps/workspace-engine/pkg/events/handler/handler.go (1)
OffsetTracker(105-109)
apps/workspace-engine/pkg/kafka/consumer.go (2)
apps/workspace-engine/pkg/db/workspaces.go (2)
WorkspaceSnapshot (91-98), GetLatestWorkspaceSnapshots (129-166)
apps/workspace-engine/pkg/kafka/kafka.go (1)
Topic(24-24)
apps/workspace-engine/pkg/db/workspaces.go (1)
apps/workspace-engine/pkg/db/client.go (1)
GetDB(60-70)
apps/workspace-engine/pkg/kafka/kafka.go (3)
apps/workspace-engine/pkg/events/handler/handler.go (2)
RawEvent (82-87), OffsetTracker (105-109)
apps/workspace-engine/pkg/db/workspaces.go (2)
GetWorkspaceSnapshot (104-127), WorkspaceSnapshot (91-98)
apps/workspace-engine/pkg/workspace/kafka/state.go (1)
GetAssignedWorkspaceIDs(32-52)
🔇 Additional comments (7)
apps/workspace-engine/pkg/workspace/loader.go (1)
39-43: Defensive check not required for current usage—only caller properly sets WorkspaceID.

The only caller found (kafka.go, line 190) explicitly sets `snapshot.WorkspaceID = ws.ID` before calling `Save`. Current code is correct and requires no changes. If additional callers are added in the future that omit WorkspaceID, defensive code could be added then, but there is no evidence of this risk in the current codebase.

apps/workspace-engine/pkg/events/handler/handler.go (2)
105-109: OffsetTracker addition looks good.

Fields are sufficient for replay and early-return checks.

158-164: Replay and early-return logic LGTM; ensure upstream offset guards are in place.

With the partition/topology check in kafka.getLastWorkspaceOffset, this logic is safe and efficient.

Please confirm `ws.Store().SetIsReplay` only toggles in-memory behavior and doesn't inadvertently skip persistence steps downstream.

apps/workspace-engine/pkg/db/workspaces.go (4)
91-98: Struct changes look correct.

WorkspaceID and Offset additions align with snapshot usage elsewhere.

112-119: Scan order matches query; good.

No issues found.

168-171: INSERT query update LGTM.

Includes offset; matches struct and readers.

173-185: WriteWorkspaceSnapshot change LGTM.

Signature and argument order align with the new INSERT.
```go
func GetLatestWorkspaceSnapshots(ctx context.Context, workspaceIDs []string) (map[string]*WorkspaceSnapshot, error) {
	db, err := GetDB(ctx)
	if err != nil {
		return nil, err
	}
	defer db.Release()

	const query = `
		SELECT DISTINCT ON (workspace_id) workspace_id, path, timestamp, partition, num_partitions, offset
		FROM workspace_snapshot
		WHERE workspace_id = ANY($1)
		ORDER BY workspace_id, timestamp DESC
	`
	rows, err := db.Query(ctx, query, workspaceIDs)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var snapshots []*WorkspaceSnapshot
	for rows.Next() {
		var snapshot WorkspaceSnapshot
		err := rows.Scan(&snapshot.WorkspaceID, &snapshot.Path, &snapshot.Timestamp, &snapshot.Partition, &snapshot.NumPartitions, &snapshot.Offset)
		if err != nil {
			return nil, err
		}
		snapshots = append(snapshots, &snapshot)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}

	snapshotMap := make(map[string]*WorkspaceSnapshot)
	for _, snapshot := range snapshots {
		snapshotMap[snapshot.WorkspaceID] = snapshot
	}
	return snapshotMap, nil
}
```
Fix DISTINCT ON ordering to prefer highest offset; add fast-path for empty input.
Ordering by timestamp can return stale offsets; prefer offset DESC then timestamp.
Apply this diff:
-func GetLatestWorkspaceSnapshots(ctx context.Context, workspaceIDs []string) (map[string]*WorkspaceSnapshot, error) {
- db, err := GetDB(ctx)
+func GetLatestWorkspaceSnapshots(ctx context.Context, workspaceIDs []string) (map[string]*WorkspaceSnapshot, error) {
+ if len(workspaceIDs) == 0 {
+ return map[string]*WorkspaceSnapshot{}, nil
+ }
+ db, err := GetDB(ctx)
@@
- const query = `
- SELECT DISTINCT ON (workspace_id) workspace_id, path, timestamp, partition, num_partitions, offset
- FROM workspace_snapshot
- WHERE workspace_id = ANY($1)
- ORDER BY workspace_id, timestamp DESC
- `
+ const query = `
+ SELECT DISTINCT ON (workspace_id)
+ workspace_id, path, timestamp, partition, num_partitions, offset
+ FROM workspace_snapshot
+ WHERE workspace_id = ANY($1)
+ ORDER BY workspace_id, offset DESC, timestamp DESC
+ `

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
func GetLatestWorkspaceSnapshots(ctx context.Context, workspaceIDs []string) (map[string]*WorkspaceSnapshot, error) {
	if len(workspaceIDs) == 0 {
		return map[string]*WorkspaceSnapshot{}, nil
	}
	db, err := GetDB(ctx)
	if err != nil {
		return nil, err
	}
	defer db.Release()

	const query = `
		SELECT DISTINCT ON (workspace_id)
			workspace_id, path, timestamp, partition, num_partitions, offset
		FROM workspace_snapshot
		WHERE workspace_id = ANY($1)
		ORDER BY workspace_id, offset DESC, timestamp DESC
	`
	rows, err := db.Query(ctx, query, workspaceIDs)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var snapshots []*WorkspaceSnapshot
	for rows.Next() {
		var snapshot WorkspaceSnapshot
		err := rows.Scan(&snapshot.WorkspaceID, &snapshot.Path, &snapshot.Timestamp, &snapshot.Partition, &snapshot.NumPartitions, &snapshot.Offset)
		if err != nil {
			return nil, err
		}
		snapshots = append(snapshots, &snapshot)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}

	snapshotMap := make(map[string]*WorkspaceSnapshot)
	for _, snapshot := range snapshots {
		snapshotMap[snapshot.WorkspaceID] = snapshot
	}
	return snapshotMap, nil
}
```
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/db/workspaces.go around lines 129 to 166, add a
fast-path that returns an empty map and nil error when workspaceIDs is empty,
and change the SQL DISTINCT ON ordering to prefer highest offset first (then
newest timestamp) by ordering with workspace_id, offset DESC, timestamp DESC so
DISTINCT ON picks the row with the largest offset; keep the rest of the function
logic the same (query, scanning rows, and building snapshotMap).
```go
earliestOffset := getEarliestOffset(snapshots)
if err := consumer.Seek(kafka.TopicPartition{
	Topic:     &Topic,
	Partition: partition,
	Offset:    kafka.Offset(earliestOffset),
}, 0); err != nil {
```
🧩 Analysis chain
Potential off-by-one: seek to next offset after the latest processed snapshot.
If snapshot.Offset represents the last processed message, you should seek to snapshot.Offset+1 (unless using a sentinel like OffsetBeginning). Suggested change:
- earliestOffset := getEarliestOffset(snapshots)
- if err := consumer.Seek(kafka.TopicPartition{
+ earliestOffset := getEarliestOffset(snapshots)
+ effective := earliestOffset
+ if effective >= 0 {
+ effective = earliestOffset + 1
+ }
+ if err := consumer.Seek(kafka.TopicPartition{
Topic: &Topic,
Partition: partition,
- Offset: kafka.Offset(earliestOffset),
+ Offset: kafka.Offset(effective),
 	}, 0); err != nil {

Please confirm the stored snapshot offset semantics (last applied vs next to consume). I can adjust the diff accordingly.
I'll need to examine the codebase to verify the snapshot offset semantics. Let me gather information about how offsets are handled.
---
🏁 Script executed:
#!/bin/bash
# View the consumer.go file around lines 61-66 to see context
cat apps/workspace-engine/pkg/kafka/consumer.go | head -80 | tail -30
Length of output: 846
🏁 Script executed:
#!/bin/bash
# Search for getEarliestOffset function definition
rg -n "getEarliestOffset" apps/workspace-engine/pkg/kafka/Length of output: 217
🏁 Script executed:
#!/bin/bash
# Search for snapshot struct/type definitions to understand offset semantics
rg -n -A5 "type.*[Ss]napshot" apps/workspace-engine/pkg/Length of output: 418
🏁 Script executed:
#!/bin/bash
# Search for how snapshots are populated or created
rg -n "snapshot.*Offset\s*=" apps/workspace-engine/pkg/ -C3Length of output: 0
🏁 Script executed:
#!/bin/bash
# Search for other Seek() calls to understand the pattern
rg -n "Seek\(" apps/workspace-engine/pkg/kafka/ -C2Length of output: 379
I need to examine the complete WorkspaceSnapshot struct definition and trace where the Offset field comes from.
---
🏁 Script executed:
#!/bin/bash
# View complete WorkspaceSnapshot struct definition
cat apps/workspace-engine/pkg/db/workspaces.go | grep -A20 "type WorkspaceSnapshot struct"
Length of output: 586
🏁 Script executed:
#!/bin/bash
# Search for Offset usage on snapshot objects
rg -n "snapshot\.Offset" apps/workspace-engine/pkg/Length of output: 580
🏁 Script executed:
#!/bin/bash
# View GetLatestWorkspaceSnapshots function implementation
rg -n -A30 "func.*GetLatestWorkspaceSnapshots" apps/workspace-engine/pkg/db/Length of output: 2359
🏁 Script executed:
#!/bin/bash
# Check if Offset is defined as a method on WorkspaceSnapshot
rg -n "func.*\(.*\*?WorkspaceSnapshot\).*Offset" apps/workspace-engine/pkg/Length of output: 0
I need to understand what offset value is stored when a snapshot is created. Let me examine where snapshots are inserted and how the offset is set.
---
🏁 Script executed:
#!/bin/bash
# Search for where snapshots are inserted into the database
rg -n "workspace_snapshot.*INSERT" apps/workspace-engine/pkg/ -C3
Length of output: 0
🏁 Script executed:
#!/bin/bash
# Find the constant for insert query
rg -n "WORKSPACE_SNAPSHOT_INSERT" apps/workspace-engine/pkg/db/workspaces.go -A2Length of output: 404
🏁 Script executed:
#!/bin/bash
# Search for where snapshot offset is set before insertion
rg -n -B10 "snapshot\.Offset\s*=" apps/workspace-engine/pkg/ | head -50Length of output: 0
🏁 Script executed:
#!/bin/bash
# Search for Kafka message processing to understand when snapshots are created
rg -n "WorkspaceSnapshot\{" apps/workspace-engine/pkg/ -C5Length of output: 1656
🏁 Script executed:
#!/bin/bash
# Look for any comments or documentation about offset semantics
rg -n -i "offset.*process|offset.*commi|last.*message" apps/workspace-engine/pkg/kafka/ -C2Length of output: 2024
I need to verify the beginning constant and confirm the off-by-one concern is valid.
---
🏁 Script executed:
#!/bin/bash
# View full getEarliestOffset function with beginning constant
rg -n -B5 "func getEarliestOffset" apps/workspace-engine/pkg/kafka/consumer.go -A15
Length of output: 495
🏁 Script executed:
#!/bin/bash
# Find where beginning constant is defined
rg -n "beginning\s*:=" apps/workspace-engine/pkg/kafka/consumer.go -B3 -A1Length of output: 161
🏁 Script executed:
#!/bin/bash
# View the entire consumer.go file to understand context of setOffsets function
cat apps/workspace-engine/pkg/kafka/consumer.go | head -100
Length of output: 1965
Fix off-by-one error: seek to next offset after last processed snapshot.
Snapshot.Offset stores the offset of the last processed message (set from msg.TopicPartition.Offset in kafka.go). Seeking to that offset directly re-processes the same message. The suggested diff correctly applies offset + 1 for real offsets while preserving OffsetBeginning (-2) when no snapshots exist.
- earliestOffset := getEarliestOffset(snapshots)
- if err := consumer.Seek(kafka.TopicPartition{
+ earliestOffset := getEarliestOffset(snapshots)
+ effective := earliestOffset
+ if effective >= 0 {
+ effective = earliestOffset + 1
+ }
+ if err := consumer.Seek(kafka.TopicPartition{
Topic: &Topic,
Partition: partition,
- Offset: kafka.Offset(earliestOffset),
+ Offset: kafka.Offset(effective),
 	}, 0); err != nil {

🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/kafka/consumer.go around lines 61-66, there's an
off-by-one error: Snapshot.Offset is the last processed message offset so
seeking to it re-processes that message. Change the seek offset to
earliestOffset+1 for real offsets while preserving OffsetBeginning when
snapshots don't exist (i.e. if earliestOffset is kafka.OffsetBeginning/-2, use
it as-is); ensure proper type conversion to kafka.Offset when passing to
consumer.Seek.
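For reference, the proposed normalization as a standalone sketch; the sentinel value is assumed from librdkafka (`OffsetBeginning` = -2) and plain `int64` stands in for `kafka.Offset`.

```go
package main

import "fmt"

const offsetBeginning = -2 // assumed value of kafka.OffsetBeginning

// nextSeekTarget maps a stored "last processed" offset to the position the
// consumer should seek to: one past any real offset, sentinels unchanged.
func nextSeekTarget(lastProcessed int64) int64 {
	if lastProcessed >= 0 {
		return lastProcessed + 1
	}
	return lastProcessed
}

func main() {
	fmt.Println(nextSeekTarget(0))               // 1: offset 0 is a real offset, don't re-process it
	fmt.Println(nextSeekTarget(41))              // 42
	fmt.Println(nextSeekTarget(offsetBeginning)) // -2: no snapshot, start from beginning
}
```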
```go
func getLastWorkspaceOffset(ctx context.Context, msg *kafka.Message) int64 {
	beginning := int64(kafka.OffsetBeginning)
	var rawEvent eventHanlder.RawEvent
	if err := json.Unmarshal(msg.Value, &rawEvent); err != nil {
		log.Error("Failed to unmarshal event", "error", err, "message", string(msg.Value))
		return beginning
	}

	lastSnapshot, err := db.GetWorkspaceSnapshot(ctx, rawEvent.WorkspaceID)
	if err != nil {
		log.Error("Failed to get workspace snapshot", "error", err)
		return beginning
	}

	if lastSnapshot == nil {
		return beginning
	}

	return lastSnapshot.Offset
}
```
Guard against cross-partition/cross-topology bugs when deriving last workspace offset.
Current logic ignores snapshot.Partition and snapshot.NumPartitions. If partitions changed or the workspace moved, you can incorrectly skip messages. Also avoid logging full payloads on JSON errors.
Apply this diff:
-func getLastWorkspaceOffset(ctx context.Context, msg *kafka.Message) int64 {
+func getLastWorkspaceOffset(ctx context.Context, msg *kafka.Message, currentPartition int32, currentNumPartitions int32) int64 {
beginning := int64(kafka.OffsetBeginning)
- var rawEvent eventHanlder.RawEvent
+ var rawEvent handler.RawEvent
if err := json.Unmarshal(msg.Value, &rawEvent); err != nil {
- log.Error("Failed to unmarshal event", "error", err, "message", string(msg.Value))
+ log.Error("Failed to unmarshal event", "error", err)
return beginning
}
lastSnapshot, err := db.GetWorkspaceSnapshot(ctx, rawEvent.WorkspaceID)
if err != nil {
log.Error("Failed to get workspace snapshot", "error", err)
return beginning
}
if lastSnapshot == nil {
return beginning
}
-
- return lastSnapshot.Offset
+ // Only trust offsets taken from the same partition/topology.
+ if lastSnapshot.Partition != currentPartition || lastSnapshot.NumPartitions != currentNumPartitions {
+ return beginning
+ }
+ return lastSnapshot.Offset
 }

And update the call site accordingly (see below).
Committable suggestion skipped: line range outside the PR's diff.
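Since the committable suggestion could not be posted, here is a standalone sketch of the topology guard under the same assumptions, with a trimmed stand-in for `WorkspaceSnapshot` and an assumed `OffsetBeginning` value.

```go
package main

import "fmt"

const offsetBeginning = -2 // assumed value of kafka.OffsetBeginning

// WorkspaceSnapshot is a trimmed stand-in for db.WorkspaceSnapshot.
type WorkspaceSnapshot struct {
	Partition     int32
	NumPartitions int32
	Offset        int64
}

// trustedOffset only returns the snapshot's offset when it was taken on the
// same partition with the same partition count; otherwise it falls back to
// the beginning sentinel so no messages are skipped.
func trustedOffset(s *WorkspaceSnapshot, partition, numPartitions int32) int64 {
	if s == nil || s.Partition != partition || s.NumPartitions != numPartitions {
		return offsetBeginning
	}
	return s.Offset
}

func main() {
	snap := &WorkspaceSnapshot{Partition: 2, NumPartitions: 4, Offset: 120}
	fmt.Println(trustedOffset(snap, 2, 4)) // 120
	fmt.Println(trustedOffset(snap, 2, 8)) // -2: partition count changed, replay from beginning
}
```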
```go
if e.store.IsReplay() {
	log.Info("Skipping job dispatch in replay mode", "job.id", newJob.Id)
	return nil
}
```
🧩 Analysis chain
Replay mode leaves a zombie Pending job; either avoid creating it or mark it non-runnable.
Right now we upsert a new Pending job then return without dispatch, which can linger indefinitely and confuse consumers/UIs. Either:
- Gate job creation on replay and return before creating it, or
- Mark the job as Cancelled/Skipped with updated timestamp before returning.
Example (option B):
if e.store.IsReplay() {
- log.Info("Skipping job dispatch in replay mode", "job.id", newJob.Id)
- return nil
+ newJob.Status = oapi.Cancelled // or a dedicated Skipped/Replay status if available
+ newJob.UpdatedAt = time.Now()
+ e.store.Jobs.Upsert(ctx, newJob)
+ log.Info("Skipping job dispatch in replay mode", "job.id", newJob.Id)
+ return nil
 }

Also consider adding a span event/attr to aid traces.
Would you prefer skipping job creation entirely in replay? If so, I can provide a diff moving the replay check before Step 3.
I'll verify this review comment by examining the relevant code and checking the concerns raised.
🏁 Script executed:
#!/bin/bash
# Examine the executor.go file with context around lines 63-67
cat apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go | head -150
Length of output: 3979
🏁 Script executed:
#!/bin/bash
# Find Job struct definition and status types
fd -t f '\.go$' apps/workspace-engine/pkg/workspace/releasemanager | xargs rg -A5 'type Job struct|Status.*=' | head -100
Length of output: 4384
🏁 Script executed:
#!/bin/bash
# Search for oapi package and Cancelled/Skipped status definitions
fd -t f -e go | xargs rg 'Cancelled|type.*Status' | grep -i 'oapi\|status' | head -50
Length of output: 1998
🏁 Script executed:
#!/bin/bash
# Look for other replay mode checks and patterns
rg -n 'IsReplay\(\)' apps/workspace-engine --type go -B3 -A3
Length of output: 1151
🏁 Script executed:
#!/bin/bash
# Find where Jobs.Upsert is called to understand creation flow
rg -n 'Jobs\.Upsert\|NewJob\|newJob' apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go -B2 -A2
Length of output: 0
🏁 Script executed:
#!/bin/bash
# Look for Step definitions and understand job creation flow
rg -n 'Step.*=' apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go | head -20
Length of output: 0
🏁 Script executed:
#!/bin/bash
# Find full function containing the replay check
rg -n 'func.*\(' apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go | head -10
Length of output: 280
🏁 Script executed:
#!/bin/bash
# Look for job status constants in oapi
fd -t f -e go apps/workspace-engine | xargs rg -l 'type.*Status|Cancelled|Skipped' | head -5
Length of output: 666
🏁 Script executed:
#!/bin/bash
# Get broader view of the file structure
wc -l apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go && head -200 apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go
Length of output: 4057
🏁 Script executed:
#!/bin/bash
# Search for Job type in oapi or workflow packages
fd -t f -e go apps/workspace-engine | xargs rg -B5 -A10 'type Job struct' | grep -A10 'Status'
Length of output: 487
🏁 Script executed:
#!/bin/bash
# Look for any dispatch-related functions to understand the flow
rg -n 'dispatch|Dispatch' apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go -B1 -A1
Length of output: 609
🏁 Script executed:
#!/bin/bash
# Check imports to understand Job and oapi references
head -50 apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go | grep 'import\|^import'
Length of output: 8
🏁 Script executed:
#!/bin/bash
# Look for where newJob is created/instantiated
rg -n 'newJob\s*:=|newJob\s*=|&.*Job{' apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go -B2 -A2
Length of output: 123
🏁 Script executed:
#!/bin/bash
# Search for available Job statuses
rg -rn 'Status\s*=' apps/workspace-engine/pkg/workflow --type go | grep -i job | head -20
Length of output: 74
🏁 Script executed:
#!/bin/bash
# Look at what happens after the replay check and before return
sed -n '50,120p' apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go
Length of output: 2224
🏁 Script executed:
#!/bin/bash
# Find all functions in executor.go to understand structure
rg -n '^func' apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go
Length of output: 264
🏁 Script executed:
#!/bin/bash
# Check for Job status type definition
fd -t f -e go apps/workspace-engine/pkg | xargs rg -B2 -A15 'type Job struct' | head -80
Length of output: 1960
🏁 Script executed:
#!/bin/bash
# Look at complete context around lines 40-80 in executor
sed -n '40,80p' apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go
Length of output: 1162
🏁 Script executed:
#!/bin/bash
# Search for existing "Cancelled" or "Skipped" job status usage
rg -n 'Cancelled|Skipped' apps/workspace-engine/pkg --type go -B1 -A1 | head -40
Length of output: 3879
🏁 Script executed:
#!/bin/bash
# Look for the function signature and early returns
sed -n '1,100p' apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go | tail -60
Length of output: 1748
🏁 Script executed:
#!/bin/bash
# Check time package imports for time.Now() availability
rg -n 'import.*time' apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go
Length of output: 0
🏁 Script executed:
#!/bin/bash
# Find existing UpdatedAt assignments to understand the pattern
rg -n 'UpdatedAt' apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go -B1 -A1
Length of output: 226
🏁 Script executed:
#!/bin/bash
# Search for how other statuses are set on Job objects
rg -n 'Job.*Status\s*=' apps/workspace-engine/pkg/workspace/releasemanager/deployment --type go | head -15
Length of output: 119
🏁 Script executed:
#!/bin/bash
# Look at the entire function containing replay check
rg -B50 -A15 'IsReplay\(\)' apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go | head -100
Length of output: 2070
🏁 Script executed:
#!/bin/bash
# Check what Step 3 refers to - look for numbered steps or comments
rg -n 'Step' apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go | head -20
Length of output: 212
🏁 Script executed:
#!/bin/bash
# Look for job creation before replay check
rg -B30 'IsReplay\(\)' apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go | tail -40
Length of output: 1094
🏁 Script executed:
#!/bin/bash
# Verify oapi package location and what it exports
find apps/workspace-engine -name "oapi*" -o -name "*oapi*" | head -10
Length of output: 139
🏁 Script executed:
#!/bin/bash
# Check Job struct and available status values
fd -t f -e go apps/workspace-engine/pkg | xargs grep -l 'type Job' | head -5
Length of output: 405
🏁 Script executed:
#!/bin/bash
# Look at a wider range to understand where job is created
sed -n '1,120p' apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go
Length of output: 3713
🏁 Script executed:
#!/bin/bash
# Search for all occurrences of job creation in the function
rg -n 'Job{|NewJob' apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go -B3 -A1
Length of output: 97
🏁 Script executed:
#!/bin/bash
# Get line count and examine entire relevant section
lines=$(wc -l < apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go) && echo "Total lines: $lines" && sed -n '50,150p' apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go
Length of output: 2507
Mark Pending job as non-runnable in replay mode or skip creation entirely.
The issue is confirmed: the job is created and persisted at line 57 before the replay check at line 63, leaving a zombie Pending job in the store. Either approach works:
- Option A: Move replay check before Step 3 (before job creation)
- Option B: Mark the job with `oapi.Cancelled` or `oapi.Skipped` and update the timestamp before returning (pattern already used in `CancelOutdatedJobs` at lines 90–92)
Both oapi.Cancelled and oapi.Skipped are available in your JobStatus enum. Choose whichever semantically fits replay semantics better. Option B requires less refactoring but adds a spurious upsert; Option A is cleaner architecturally but requires moving logic before the factory call.
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go
around lines 57 and 63–67, a job is being created and persisted at line 57
before the replay-mode check at line 63, leaving a zombie Pending job; fix by
either (A) moving the e.store.IsReplay() check to before the job
factory/creation step so no job is created when replaying, or (B) after
detecting replay (current location) immediately mark the just-created job as
non-runnable by setting its status to oapi.Skipped (or oapi.Cancelled if that
fits semantics) and updating its timestamp, then upsert/update the job (follow
the same pattern used in CancelOutdatedJobs at lines 90–92) before returning.
this is actually okay. in replay mode we still accept job updates, so if a job is still pending after replay mode is over, it genuinely is still pending as it means no updates happened in replay.
@adityachoudhari26, thank you for clarifying! That makes sense—since job updates are processed during replay, the Pending status accurately reflects the job's actual state (no updates received yet). The design is sound.
✏️ Learnings added
Learnt from: adityachoudhari26
PR: ctrlplanedev/ctrlplane#696
File: apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go:63-67
Timestamp: 2025-10-24T00:02:29.695Z
Learning: In workspace-engine replay mode (apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go), jobs are created with Pending status even during replay because job updates are still accepted and processed. If a job remains Pending after replay completes, it genuinely is still pending because no updates occurred during replay.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
Actionable comments posted: 0
♻️ Duplicate comments (3)
apps/workspace-engine/pkg/kafka/kafka.go (3)
64-72: Blocker: derive last workspace offset only when snapshot topology matches current partition/topology.

Without checking snapshot.Partition and snapshot.NumPartitions, you can incorrectly skip or replay messages after partition changes or workspace rebalancing. Add guards and extend the helper signature.
Apply this diff:
```diff
-func getLastWorkspaceOffset(snapshot *db.WorkspaceSnapshot) int64 {
+func getLastWorkspaceOffset(snapshot *db.WorkspaceSnapshot, currentPartition int32, currentNumPartitions int32) int64 {
 	beginning := int64(kafka.OffsetBeginning)
 	if snapshot == nil {
 		return beginning
 	}
-
-	return snapshot.Offset
+	// Only trust offsets from the same partition/topology.
+	if snapshot.Partition != currentPartition || snapshot.NumPartitions != currentNumPartitions {
+		return beginning
+	}
+	return snapshot.Offset
 }
```
190-197: Wire partition/topology into last-offset helper.

Update the call to pass the message’s partition and current topology.
```diff
-	lastWorkspaceOffset := getLastWorkspaceOffset(lastSnapshot)
+	lastWorkspaceOffset := getLastWorkspaceOffset(
+		lastSnapshot,
+		msg.TopicPartition.Partition,
+		numPartitions,
+	)
```
55-59: Do not log raw payloads on JSON errors; log metadata instead.

Current log includes the entire message body, risking PII leakage and noisy logs. Log partition/offset/size instead.
```diff
-	if err := json.Unmarshal(msg.Value, &rawEvent); err != nil {
-		log.Error("Failed to unmarshal event", "error", err, "message", string(msg.Value))
-		return nil, fmt.Errorf("failed to unmarshal event: %w", err)
-	}
+	if err := json.Unmarshal(msg.Value, &rawEvent); err != nil {
+		log.Error("Failed to unmarshal event",
+			"error", err,
+			"partition", msg.TopicPartition.Partition,
+			"offset", msg.TopicPartition.Offset,
+			"value_bytes", len(msg.Value),
+		)
+		return nil, fmt.Errorf("failed to unmarshal event: %w", err)
+	}
```
🧹 Nitpick comments (4)
apps/workspace-engine/pkg/kafka/kafka.go (4)
184-197: Avoid per-message DB lookups for last snapshot (add small cache).

getLastSnapshot hits the DB for every message. Cache last snapshot metadata per workspace (seed from setOffsets; update on Save) to cut DB load and latency. Keep a short TTL or invalidate on partition revocation.
Would you like a follow-up patch with a simple LRU map keyed by workspaceID?
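One possible shape for that cache — a mutex-guarded map rather than a true LRU, with hypothetical type names; eviction/TTL is left out for brevity.

```go
package main

import "sync"

// SnapshotMeta holds just the fields the consumer loop needs; a stand-in for
// whatever subset of db.WorkspaceSnapshot would actually be cached.
type SnapshotMeta struct {
	Offset    int64
	Partition int32
}

type snapshotCache struct {
	mu sync.RWMutex
	m  map[string]SnapshotMeta
}

func newSnapshotCache() *snapshotCache {
	return &snapshotCache{m: make(map[string]SnapshotMeta)}
}

func (c *snapshotCache) Get(workspaceID string) (SnapshotMeta, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	meta, ok := c.m[workspaceID]
	return meta, ok
}

func (c *snapshotCache) Put(workspaceID string, meta SnapshotMeta) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[workspaceID] = meta
}

func main() {
	c := newSnapshotCache()
	c.Put("ws-1", SnapshotMeta{Offset: 120, Partition: 2}) // seed at startup or after Save
	if meta, ok := c.Get("ws-1"); ok {
		_ = meta // consult the cache before falling back to the DB
	}
}
```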
210-219: Snapshot gating by message time can starve snapshots during replay.

Using msg.Timestamp to gate snapshots may delay or prevent saves when consuming old/backfilled events. Consider wall-clock gating.
Proposed change:
```diff
-	shouldSaveSnapshot := lastSnapshot == nil || lastSnapshot.Timestamp.Before(msg.Timestamp.Add(-time.Duration(MinSnapshotDistance)*time.Minute))
+	window := time.Duration(MinSnapshotDistance) * time.Minute
+	shouldSaveSnapshot := lastSnapshot == nil || time.Since(lastSnapshot.Timestamp) >= window
```

If event-time gating is intentional, please confirm and consider using a delta comparison:
- Compute msg.Timestamp.Sub(lastSnapshot.Timestamp) >= window to avoid boundary ambiguity.
40-52: Environment parsing ergonomics.

Optional: parse durations directly and store MinSnapshotDistance as time.Duration to avoid unit conversions and overflow traps.
Example:
```diff
-	MinSnapshotDistance = getEnvInt("SNAPSHOT_DISTANCE_MINUTES", 60)
+	MinSnapshotDistance = getEnvDuration("SNAPSHOT_DISTANCE", time.Hour)
```

Helper:
```go
func getEnvDuration(varName string, def time.Duration) time.Duration {
	v := os.Getenv(varName)
	if v == "" {
		return def
	}
	d, err := time.ParseDuration(v) // e.g., "60m", "1h"
	if err != nil {
		log.Warn("Invalid duration, using default", "var", varName, "value", v, "default", def)
		return def
	}
	return d
}
```
13-13: Alias typo: prefer a clear, conventional name.

Rename the eventHanlder alias to handler for readability; update references.
```diff
-	eventHanlder "workspace-engine/pkg/events/handler"
+	handler "workspace-engine/pkg/events/handler"
```

Then update usages, e.g.:
```go
var rawEvent handler.RawEvent
offsetTracker := handler.OffsetTracker{ ... }
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
- apps/workspace-engine/pkg/kafka/kafka.go (5 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/kafka/kafka.go
🧬 Code graph analysis (1)
apps/workspace-engine/pkg/kafka/kafka.go (4)
apps/workspace-engine/pkg/db/workspaces.go (2)
WorkspaceSnapshot (91-98), GetWorkspaceSnapshot (104-127)
apps/workspace-engine/pkg/events/handler/handler.go (2)
RawEvent (82-87), OffsetTracker (105-109)
apps/workspace-engine/pkg/workspace/kafka/state.go (1)
GetAssignedWorkspaceIDs (32-52)
apps/workspace-engine/pkg/workspace/loader.go (1)
Save(16-46)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: workspace-engine-tests
- GitHub Check: build (linux/amd64)
- GitHub Check: Typecheck
- GitHub Check: Lint
🔇 Additional comments (1)
apps/workspace-engine/pkg/kafka/kafka.go (1)
191-195: getCommittedOffset is correctly topic-aware and per-partition.

The function at offset.go:144 creates a TopicPartition with `Topic: &Topic` (where Topic is the KAFKA_TOPIC environment variable set in kafka.go:25) and the provided `partition` parameter, then calls `consumer.Committed()` to retrieve the committed offset for that specific topic and partition within the consumer group. The implementation is correct.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apps/workspace-engine/pkg/workspace/store/store.go (1)
84-127: Critical: GobDecode has unprotected concurrent access to Store fields.

The pipeline failures confirm data races during deserialization. GobDecode mutates multiple Store fields (lines 100–124) without synchronization. If other goroutines access the Store concurrently—either reading fields or calling methods—races occur.
Consider one of these approaches:
- Mutex protection: Add a `sync.RWMutex` to Store; lock exclusively in GobDecode and lock for reading in all accessor methods (higher contention, simpler reasoning).
- Lifecycle constraint: Ensure the workspace is not used by any goroutine until `Load()` completes. Document this invariant and verify test setup enforces it (no processing of events until post-load).
- Copy-on-write: Deserialize into a new Store instance, then atomically swap a pointer once deserialization completes (minimal contention, more complex).
Given the "ConcurrentWrites" test name in the pipeline failures, option 1 (mutex) or option 3 (atomic pointer swap) are most appropriate for allowing concurrent operations post-load.
Apply this diff if choosing option 1:
```diff
+import (
+	"bytes"
+	"encoding/gob"
+	"sync"
+	"sync/atomic"
+	"workspace-engine/pkg/workspace/store/repository"
+)
+
 type Store struct {
 	repo *repository.Repository
+	mu   sync.RWMutex
 	Policies *Policies
 	...
 	isReplay atomic.Bool
 }

 func (s *Store) GobDecode(data []byte) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
 	// Create reader from provided data
 	buf := bytes.NewReader(data)
 	...
```

Then add `s.mu.RLock(); defer s.mu.RUnlock()` to all public accessor methods that read Store fields.
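For comparison, a minimal sketch of option 3 (copy-on-write with an atomic pointer swap), using simplified stand-in types rather than the real Store.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"sync/atomic"
)

// Store is a trimmed stand-in; the real one holds many repositories.
type Store struct {
	Resources map[string]string
}

// Holder publishes the current Store via an atomic pointer so readers never
// observe a half-decoded value.
type Holder struct {
	current atomic.Pointer[Store]
}

func (h *Holder) Load() *Store { return h.current.Load() }

// Decode builds a brand-new Store from snapshot bytes and swaps it in
// atomically; concurrent readers keep using the old Store until the swap.
func (h *Holder) Decode(data []byte) error {
	next := &Store{}
	if err := gob.NewDecoder(bytes.NewReader(data)).Decode(next); err != nil {
		return err
	}
	h.current.Store(next)
	return nil
}

func main() {
	var buf bytes.Buffer
	_ = gob.NewEncoder(&buf).Encode(&Store{Resources: map[string]string{"r1": "web"}})

	var h Holder
	h.current.Store(&Store{}) // start with an empty store
	if err := h.Decode(buf.Bytes()); err != nil {
		panic(err)
	}
	fmt.Println(h.Load().Resources["r1"]) // web
}
```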
♻️ Duplicate comments (2)
apps/workspace-engine/pkg/db/workspaces.go (1)
149-154: Add timestamp tie-breaker for deterministic DISTINCT ON.

Use offset DESC, then timestamp DESC:
```diff
-		ORDER BY workspace_id, offset DESC
+		ORDER BY workspace_id, offset DESC, timestamp DESC
```

Operational note: consider an index on (workspace_id, offset DESC, timestamp DESC) to keep this query fast.
apps/workspace-engine/pkg/kafka/consumer.go (1)
61-65: Off-by-one for offset 0; use >= 0 (preserve OffsetBeginning).

Use +1 when the stored snapshot offset is any real offset (>= 0):
```diff
-	effectiveOffset := earliestOffset
-	if effectiveOffset > 0 {
-		effectiveOffset = effectiveOffset + 1
-	}
+	effectiveOffset := earliestOffset
+	if effectiveOffset >= 0 {
+		effectiveOffset++
+	}
```
🧹 Nitpick comments (1)
apps/workspace-engine/pkg/db/workspaces.go (1)
138-141: Prefer empty map over nil for empty input.

Reduces nil checks downstream and allows safe writes to the returned map.
Apply:
```diff
-	if len(workspaceIDs) == 0 {
-		return nil, nil
-	}
+	if len(workspaceIDs) == 0 {
+		return map[string]*WorkspaceSnapshot{}, nil
+	}
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
- apps/workspace-engine/pkg/db/workspaces.go (3 hunks)
- apps/workspace-engine/pkg/kafka/consumer.go (2 hunks)
- apps/workspace-engine/pkg/workspace/store/store.go (3 hunks)
- apps/workspace-engine/test/integration/workspace.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- apps/workspace-engine/test/integration/workspace.go
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
- apps/workspace-engine/pkg/kafka/consumer.go
- apps/workspace-engine/pkg/workspace/store/store.go
- apps/workspace-engine/pkg/db/workspaces.go
🧬 Code graph analysis (3)
apps/workspace-engine/pkg/kafka/consumer.go (2)
apps/workspace-engine/pkg/db/workspaces.go (2)
WorkspaceSnapshot (91-98), GetLatestWorkspaceSnapshots (138-179)
apps/workspace-engine/pkg/kafka/kafka.go (1)
Topic(25-25)
apps/workspace-engine/pkg/workspace/store/store.go (1)
apps/workspace-engine/pkg/workspace/loader.go (1)
Load(48-81)
apps/workspace-engine/pkg/db/workspaces.go (1)
apps/workspace-engine/pkg/db/client.go (1)
GetDB(60-70)
🪛 GitHub Actions: Go Tests
apps/workspace-engine/pkg/workspace/store/store.go
[error] 100-100: DATA RACE: GobDecode() concurrently accessed during TestEngine_Persistence_EmptyWorkspace
[error] 100-100: DATA RACE: GobDecode() concurrently accessed during TestEngine_Persistence_MultipleResources
[error] 100-100: DATA RACE: GobDecode() concurrently accessed during TestEngine_Persistence_ConcurrentWrites
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Typecheck
- GitHub Check: Lint
- GitHub Check: build (linux/amd64)
- GitHub Check: workspace-engine-tests
🔇 Additional comments (8)
apps/workspace-engine/pkg/workspace/store/store.go (2)
6-6: Atomic import and initialization correctly implemented.

The addition of `sync/atomic` and the initialization of `isReplay` using `.Store(false)` properly addresses the data race concern flagged in the previous review.

Also applies to: 15-16
59-69: Atomic accessors correctly eliminate the data race.

The `atomic.Bool` field and accessor methods properly synchronize concurrent reads and writes to `isReplay`, resolving the data race between handler.go (writes via `SetIsReplay`) and executor.go (reads via `IsReplay` before spawning goroutines).

apps/workspace-engine/pkg/db/workspaces.go (6)
92-92: Struct extensions look good.

WorkspaceID and Offset additions align with snapshot/offset tracking.

Also applies to: 97-97

101-111: Selecting latest by offset is correct.

Ordering by offset DESC ensures newest snapshot by Kafka position.

121-128: Scan mapping matches SELECT columns.

161-179: Row scan and map materialization look correct.

182-184: INSERT adds offset column correctly.

186-197: WriteWorkspaceSnapshot signature and Exec args align.

Also applies to: 193-193
```go
func setOffsets(ctx context.Context, consumer *kafka.Consumer, partitionWorkspaceMap map[int32][]string) {
	for partition, workspaceIDs := range partitionWorkspaceMap {
		snapshots, err := db.GetLatestWorkspaceSnapshots(ctx, workspaceIDs)
		if err != nil {
			log.Error("Failed to get latest workspace snapshots", "error", err)
			continue
		}

		earliestOffset := getEarliestOffset(snapshots)
		effectiveOffset := earliestOffset
		if effectiveOffset > 0 {
			effectiveOffset = effectiveOffset + 1
		}
		if err := consumer.Seek(kafka.TopicPartition{
			Topic:     &Topic,
			Partition: partition,
			Offset:    kafka.Offset(effectiveOffset),
		}, 0); err != nil {
			log.Error("Failed to seek to earliest offset", "error", err)
			continue
		}
	}
}
```
Partition mismatch risk: filter snapshots by current partition before computing earliest offset.
A workspace’s latest snapshot might be from a different partition (e.g., after partition-count changes). Using that offset to seek this partition is incorrect.
Apply this diff:
- snapshots, err := db.GetLatestWorkspaceSnapshots(ctx, workspaceIDs)
+ snapshots, err := db.GetLatestWorkspaceSnapshots(ctx, workspaceIDs)
if err != nil {
log.Error("Failed to get latest workspace snapshots", "error", err)
continue
}
- earliestOffset := getEarliestOffset(snapshots)
+ // Only consider snapshots that belong to this partition.
+ filtered := make(map[string]*db.WorkspaceSnapshot, len(snapshots))
+ for wid, s := range snapshots {
+ if s != nil && s.Partition == partition {
+ filtered[wid] = s
+ }
+ }
+ earliestOffset := getEarliestOffset(filtered)
effectiveOffset := earliestOffset
- if effectiveOffset > 0 {
- effectiveOffset = effectiveOffset + 1
+ if effectiveOffset >= 0 {
+ effectiveOffset++
}
if err := consumer.Seek(kafka.TopicPartition{
Topic: &Topic,
Partition: partition,
Offset: kafka.Offset(effectiveOffset),
}, 0); err != nil {
- log.Error("Failed to seek to earliest offset", "error", err)
+ log.Error("Failed to seek", "partition", partition, "targetOffset", effectiveOffset, "error", err)
continue
}🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/kafka/consumer.go around lines 53 to 75, the code
calls db.GetLatestWorkspaceSnapshots for a set of workspace IDs then computes an
earliestOffset from all returned snapshots but doesn't restrict them to the
current Kafka partition; this can produce an incorrect offset when snapshots
belong to other partitions. Fix by filtering the snapshots returned by
GetLatestWorkspaceSnapshots to only those whose Partition equals the current
partition variable before calling getEarliestOffset; if the filtered slice is
empty, log/continue and skip seeking for that partition, otherwise compute
earliestOffset from the filtered snapshots and proceed with the existing
effectiveOffset/Seek logic.
Actionable comments posted: 0
♻️ Duplicate comments (2)
apps/workspace-engine/pkg/kafka/consumer.go (2)
67-69: Fix condition: offset 0 is valid and should be incremented.

The condition `> 0` incorrectly treats offset 0 (the first message in a partition) as a sentinel. This causes re-processing of message 0.

Apply this diff:
```diff
-	if effectiveOffset > 0 {
+	if effectiveOffset >= 0 {
 		effectiveOffset = effectiveOffset + 1
 	}
```
59-65: Filter snapshots by partition to avoid incorrect offsets.

`GetLatestWorkspaceSnapshots` returns the latest snapshot per workspace globally, but a workspace's latest snapshot might belong to a different partition (e.g., after partition reassignment or rebalancing). Using that offset to seek the current partition is incorrect and can cause data corruption or missed messages.

Apply this diff:
```diff
 		snapshots, err := db.GetLatestWorkspaceSnapshots(ctx, workspaceIDs)
 		if err != nil {
 			log.Error("Failed to get latest workspace snapshots", "error", err)
 			continue
 		}

+		// Filter to only snapshots from this partition
+		filtered := make(map[string]*db.WorkspaceSnapshot)
+		for wid, s := range snapshots {
+			if s != nil && s.Partition == partition {
+				filtered[wid] = s
+			}
+		}
+
-		earliestOffset := getEarliestOffset(snapshots)
+		earliestOffset := getEarliestOffset(filtered)
```
🧹 Nitpick comments (1)
apps/workspace-engine/pkg/db/workspaces.go (1)
140-142: Return empty map instead of nil for consistency.

Returning `nil` for empty input is inconsistent with Go conventions. Return an empty map to simplify caller logic.

Apply this diff:
if len(workspaceIDs) == 0 { - return nil, nil + return map[string]*WorkspaceSnapshot{}, nil }
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
- apps/workspace-engine/pkg/db/workspaces.go (3 hunks)
- apps/workspace-engine/pkg/kafka/consumer.go (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
- apps/workspace-engine/pkg/kafka/consumer.go
- apps/workspace-engine/pkg/db/workspaces.go
🧬 Code graph analysis (2)
- apps/workspace-engine/pkg/kafka/consumer.go (2)
  - apps/workspace-engine/pkg/db/workspaces.go (2): WorkspaceSnapshot (91-98), GetLatestWorkspaceSnapshots (139-180)
  - apps/workspace-engine/pkg/kafka/kafka.go (1): Topic (25-25)
- apps/workspace-engine/pkg/db/workspaces.go (1)
  - apps/workspace-engine/pkg/db/client.go (1): GetDB (60-70)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: workspace-engine-tests
- GitHub Check: build (linux/amd64)
- GitHub Check: Typecheck
- GitHub Check: workspace-engine-tests
- GitHub Check: Lint
🔇 Additional comments (6)
apps/workspace-engine/pkg/db/workspaces.go (4)
92-92: LGTM: WorkspaceSnapshot extended with offset tracking.

The new fields support offset-aware snapshot management across the Kafka consumer flow.

Also applies to: 97-97

100-137: LGTM: Query and scan logic correctly updated for offset tracking.

The ordering by offset DESC ensures the latest snapshot is selected, and the scan matches the query columns.
182-185: LGTM: INSERT query correctly includes offset column.
187-199: LGTM: Cleaner signature and correct implementation.

The refactored signature is cleaner since WorkspaceID is now part of the snapshot struct.
apps/workspace-engine/pkg/kafka/consumer.go (2)
3-9: LGTM: Imports added for new offset management functionality.
34-55: LGTM: Correct implementation of minimum offset computation.

The "has" flag pattern correctly handles empty maps and nil snapshots, returning the Kafka sentinel when no valid offsets exist.
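For readers without the file open, a self-contained sketch of that pattern with a simplified snapshot type; the real code operates on `*db.WorkspaceSnapshot` and the confluent-kafka `kafka.OffsetBeginning` constant rather than the local types used here:

```go
package sketch

// offsetBeginning mirrors the Kafka "beginning of partition" sentinel
// (librdkafka's OffsetBeginning is -2).
const offsetBeginning int64 = -2

type snapshot struct {
	Offset int64
}

// earliestOffset returns the minimum offset across snapshots, skipping nil
// entries; the "has" flag distinguishes "no valid offsets" from a real minimum.
func earliestOffset(snapshots map[string]*snapshot) int64 {
	var (
		min int64
		has bool
	)
	for _, s := range snapshots {
		if s == nil {
			continue
		}
		if !has || s.Offset < min {
			min = s.Offset
			has = true
		}
	}
	if !has {
		return offsetBeginning
	}
	return min
}
```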
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apps/workspace-engine/pkg/events/handler/handler.go (1)
114-130: Remove full payload from traces/logs to avoid PII and reduce cardinality.

Storing event.data and logging the full message on error can leak sensitive data.

```diff
 	ctx, span := tracer.Start(ctx, "ListenAndRoute", trace.WithAttributes(
 		attribute.String("kafka.topic", *msg.TopicPartition.Topic),
 		attribute.Int("kafka.partition", int(msg.TopicPartition.Partition)),
 		attribute.Int64("kafka.offset", int64(msg.TopicPartition.Offset)),
-		attribute.String("event.data", string(msg.Value)),
 	))
@@
 	if err := json.Unmarshal(msg.Value, &rawEvent); err != nil {
 		span.RecordError(err)
 		span.SetStatus(codes.Error, "failed to unmarshal event")
-		log.Error("Failed to unmarshal event", "error", err, "message", string(msg.Value))
+		log.Error("Failed to unmarshal event", "error", err,
+			"topic", *msg.TopicPartition.Topic,
+			"partition", msg.TopicPartition.Partition,
+			"offset", msg.TopicPartition.Offset)
 		return nil, fmt.Errorf("failed to unmarshal event: %w", err)
 	}
```

Optionally add span attributes for event.type and workspaceId only (already present).
♻️ Duplicate comments (1)
apps/workspace-engine/pkg/db/workspaces.go (1)
139-143: Return empty map on empty input to avoid nil-map surprises.

Returning nil can force callers to add nil checks; an empty map is safer and conventional.

```diff
 func GetLatestWorkspaceSnapshots(ctx context.Context, workspaceIDs []string) (map[string]*WorkspaceSnapshot, error) {
-	if len(workspaceIDs) == 0 {
-		return nil, nil
-	}
+	if len(workspaceIDs) == 0 {
+		return map[string]*WorkspaceSnapshot{}, nil
+	}
```
🧹 Nitpick comments (7)
apps/workspace-engine/pkg/db/workspaces.go (2)
100-112: Selecting latest snapshot by offset is correct.

Ordering by "offset" DESC (then timestamp) fixes the "latest by timestamp" pitfall and ensures monotonic resume. Good choice to quote "offset".

For performance, consider an index like:

```sql
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_ws_snapshot_ws_offset
  ON workspace_snapshot(workspace_id, "offset" DESC, timestamp DESC);
```
162-180: Avoid intermediate slice; scan directly into the map.

Removes an extra allocation and simplifies control flow.

```diff
-	var snapshots []*WorkspaceSnapshot
-	for rows.Next() {
-		var snapshot WorkspaceSnapshot
-		err := rows.Scan(&snapshot.WorkspaceID, &snapshot.Path, &snapshot.Timestamp, &snapshot.Partition, &snapshot.NumPartitions, &snapshot.Offset)
-		if err != nil {
-			return nil, err
-		}
-		snapshots = append(snapshots, &snapshot)
-	}
-	if err := rows.Err(); err != nil {
-		return nil, err
-	}
-
-	snapshotMap := make(map[string]*WorkspaceSnapshot)
-	for _, snapshot := range snapshots {
-		snapshotMap[snapshot.WorkspaceID] = snapshot
-	}
-	return snapshotMap, nil
+	snapshotMap := make(map[string]*WorkspaceSnapshot)
+	for rows.Next() {
+		var s WorkspaceSnapshot
+		if err := rows.Scan(&s.WorkspaceID, &s.Path, &s.Timestamp, &s.Partition, &s.NumPartitions, &s.Offset); err != nil {
+			return nil, err
+		}
+		snapshotMap[s.WorkspaceID] = &s
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return snapshotMap, nil
```

apps/workspace-engine/pkg/events/handler/workspacesave/workspacesave.go (1)
16-23: Avoid double JSON decode of the same message.

IsWorkspaceSaveEvent unmarshals msg.Value, and ListenAndRoute also unmarshals. Prefer parsing once and reusing the result to reduce CPU/allocs; a sketch of Option A follows the list.
- Option A: Expose a small helper that returns (eventType, workspaceId, err) and reuse everywhere.
- Option B: Make ListenAndRoute return the parsed RawEvent (or at least EventType) so the consumer can decide whether to snapshot without re-decoding.
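A rough sketch of what Option A could look like — the envelope field names and JSON tags here are assumptions, not the handler package's actual definitions:

```go
package sketch

import "encoding/json"

// rawEventHeader mirrors only the envelope fields needed for routing
// decisions; the JSON tags are assumed and must match the producer's schema.
type rawEventHeader struct {
	EventType   string `json:"eventType"`
	WorkspaceID string `json:"workspaceId"`
}

// peekEvent decodes the envelope once so callers can reuse the result for
// both the snapshot decision and routing instead of unmarshalling again.
func peekEvent(value []byte) (string, string, error) {
	var h rawEventHeader
	if err := json.Unmarshal(value, &h); err != nil {
		return "", "", err
	}
	return h.EventType, h.WorkspaceID, nil
}
```

With a helper like this, both the workspace.save check and the snapshot lookup could take the already-parsed values rather than the raw bytes.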
apps/workspace-engine/pkg/kafka/kafka.go (2)
12-13: Fix import alias typo for clarity.

Minor readability nit: eventHanlder → eventhandler.

```diff
-	eventHanlder "workspace-engine/pkg/events/handler"
+	eventhandler "workspace-engine/pkg/events/handler"
```

And update references: eventHanlder.OffsetTracker → eventhandler.OffsetTracker, eventHanlder.RawEvent → eventhandler.RawEvent.
169-176: Parse message once; reuse parsed event to avoid repeated JSON unmarshalling.

The loop decodes JSON in getLastSnapshot, again in ListenAndRoute, and again in IsWorkspaceSaveEvent. This adds avoidable CPU and GC pressure.
- Parse once at the top of the loop into a RawEvent and pass it to getLastSnapshot and ListenAndRoute; decide snapshot based on rawEvent.EventType instead of calling IsWorkspaceSaveEvent.
- Alternatively, have ListenAndRoute return the parsed RawEvent (or eventType) for the snapshot branch.
Also applies to: 195-205
apps/workspace-engine/test/e2e/engine_kafka_replay_test.go (2)
340-357: Clean up on-disk state to prevent test cross-talk.

Remove ./state after tests to keep runs isolated.

```diff
 func (env *testEnvironment) cleanup() {
@@
 	cleanupKafkaTopic(env.t, env.topicName)
 	cleanupTestWorkspace(env.ctx, env.workspaceID)

 	// Reset kafka package variables to defaults
 	os.Unsetenv("KAFKA_TOPIC")
 	kafkapkg.Topic = "workspace-events"
 	kafkapkg.GroupID = "workspace-engine"
+
+	// Remove persisted state
+	_ = os.RemoveAll("./state")
 }
```
443-444: Remove unused offset parameter from producer helper and call sites.

The offset argument isn't used by Kafka; simplify the signature and invocations.

```diff
-	produceKafkaMessage(env.t, env.producer, env.topicName, event, int32(0), 0)
+	produceKafkaMessage(env.t, env.producer, env.topicName, event, int32(0))
```

```diff
-func produceKafkaMessage(t *testing.T, producer *kafka.Producer, topic string, message []byte, partition int32, offset int64) {
+func produceKafkaMessage(t *testing.T, producer *kafka.Producer, topic string, message []byte, partition int32) {
```

Also applies to: 451-452, 584-586
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
- apps/workspace-engine/pkg/db/workspaces.go (3 hunks)
- apps/workspace-engine/pkg/events/events.go (2 hunks)
- apps/workspace-engine/pkg/events/handler/handler.go (3 hunks)
- apps/workspace-engine/pkg/events/handler/workspacesave/workspacesave.go (1 hunk)
- apps/workspace-engine/pkg/kafka/kafka.go (5 hunks)
- apps/workspace-engine/test/e2e/engine_kafka_replay_test.go (1 hunk)
🧰 Additional context used
📓 Path-based instructions (2)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
- apps/workspace-engine/pkg/events/handler/workspacesave/workspacesave.go
- apps/workspace-engine/pkg/events/events.go
- apps/workspace-engine/pkg/db/workspaces.go
- apps/workspace-engine/pkg/events/handler/handler.go
- apps/workspace-engine/pkg/kafka/kafka.go
- apps/workspace-engine/test/e2e/engine_kafka_replay_test.go
apps/workspace-engine/**/*_test.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
Follow the existing test structure used in *_test.go files
Files:
apps/workspace-engine/test/e2e/engine_kafka_replay_test.go
🧬 Code graph analysis (6)
- apps/workspace-engine/pkg/events/handler/workspacesave/workspacesave.go (1)
  - apps/workspace-engine/pkg/events/handler/handler.go (3): RawEvent (83-88), EventType (21-21), WorkspaceSave (79-79)
- apps/workspace-engine/pkg/events/events.go (2)
  - apps/workspace-engine/pkg/events/handler/handler.go (1): WorkspaceSave (79-79)
  - apps/workspace-engine/pkg/events/handler/workspacesave/workspacesave.go (1): HandleWorkspaceSave (12-14)
- apps/workspace-engine/pkg/db/workspaces.go (1)
  - apps/workspace-engine/pkg/db/client.go (1): GetDB (60-70)
- apps/workspace-engine/pkg/events/handler/handler.go (3)
  - packages/events/src/kafka/events.ts (1): Message (196-203)
  - apps/workspace-engine/pkg/workspace/workspace.go (1): Workspace (43-49)
  - apps/workspace-engine/pkg/workspace/store/store.go (1): Store (39-61)
- apps/workspace-engine/pkg/kafka/kafka.go (5)
  - apps/workspace-engine/pkg/db/workspaces.go (2): WorkspaceSnapshot (91-98), GetWorkspaceSnapshot (114-137)
  - apps/workspace-engine/pkg/events/handler/handler.go (2): RawEvent (83-88), OffsetTracker (106-110)
  - apps/workspace-engine/pkg/workspace/kafka/state.go (1): GetAssignedWorkspaceIDs (32-52)
  - apps/workspace-engine/pkg/events/handler/workspacesave/workspacesave.go (1): IsWorkspaceSaveEvent (16-23)
  - apps/workspace-engine/pkg/workspace/loader.go (1): Save (16-46)
- apps/workspace-engine/test/e2e/engine_kafka_replay_test.go (5)
  - apps/workspace-engine/pkg/db/workspaces.go (3): GetWorkspaceSnapshot (114-137), WorkspaceSnapshot (91-98), WriteWorkspaceSnapshot (187-199)
  - apps/workspace-engine/pkg/workspace/storage_file.go (1): NewFileStorage (20-22)
  - apps/workspace-engine/pkg/kafka/kafka.go (4): Topic (25-25), GroupID (26-26), RunConsumer (67-216), Brokers (27-27)
  - apps/workspace-engine/pkg/db/client.go (2): Close (73-77), GetDB (60-70)
  - apps/workspace-engine/pkg/events/handler/handler.go (4): ResourceCreate (24-24), WorkspaceSave (79-79), EventType (21-21), RawEvent (83-88)
🔇 Additional comments (5)
apps/workspace-engine/pkg/events/events.go (1)
78-79: Wiring WorkspaceSave handler looks good.

Registration is consistent with other handlers; the import path resolves.
Confirm that snapshot creation lives exclusively in the Kafka consumer (as coded) and the workspacesave handler is intentionally a no-op to avoid duplicate side effects.
apps/workspace-engine/pkg/events/handler/workspacesave/workspacesave.go (1)
12-14: Stub handler is acceptable for now.

The no-op keeps side effects centralized in the consumer's snapshot block.
If future logic is added here, ensure it remains idempotent for replay.
apps/workspace-engine/pkg/events/handler/handler.go (2)
79-80: New event type constant is fine.
159-164: Offset-based replay logic is sensible.

The isReplay detection against the last committed offset, and the early return when MessageOffset <= LastWorkspaceOffset, align with the snapshot semantics.
Please confirm tests cover the equality boundary cases (exactly equal to committed/snapshot offsets).
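If those cases are not already covered, a table-driven sketch along these lines would pin the boundaries down; shouldSkip here merely restates the comparison described above rather than calling the real handler:

```go
package sketch

import "testing"

// shouldSkip mirrors the replay early-return rule: a message is skipped when
// its offset is at or below the offset recorded in the workspace snapshot.
func shouldSkip(messageOffset, lastWorkspaceOffset int64) bool {
	return messageOffset <= lastWorkspaceOffset
}

func TestReplayBoundaries(t *testing.T) {
	cases := []struct {
		name                string
		messageOffset       int64
		lastWorkspaceOffset int64
		wantSkip            bool
	}{
		{"below snapshot offset", 4, 5, true},
		{"exactly at snapshot offset", 5, 5, true},
		{"one past snapshot offset", 6, 5, false},
	}
	for _, tc := range cases {
		if got := shouldSkip(tc.messageOffset, tc.lastWorkspaceOffset); got != tc.wantSkip {
			t.Errorf("%s: shouldSkip(%d, %d) = %v, want %v",
				tc.name, tc.messageOffset, tc.lastWorkspaceOffset, got, tc.wantSkip)
		}
	}
}
```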
apps/workspace-engine/test/e2e/engine_kafka_replay_test.go (1)
210-221: Replay-mode setup: verify topology fields.

You set Partition: 0 and NumPartitions: 1 in the synthetic "oldSnapshot". Ensure the topic in tests indeed has 1 partition (it does in create topics), otherwise guard expectations accordingly.
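One way to make that expectation explicit, sketched against the confluent-kafka-go metadata API; the v2 import path and the producer/topic variable names are assumptions about the test harness:

```go
package sketch

import (
	"testing"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// requireSinglePartition fails the test unless the topic has exactly one
// partition, guarding the Partition: 0 / NumPartitions: 1 assumptions.
func requireSinglePartition(t *testing.T, producer *kafka.Producer, topic string) {
	t.Helper()
	md, err := producer.GetMetadata(&topic, false, 5000)
	if err != nil {
		t.Fatalf("Failed to fetch topic metadata: %v", err)
	}
	if got := len(md.Topics[topic].Partitions); got != 1 {
		t.Fatalf("Expected 1 partition for %s, got %d", topic, got)
	}
}
```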
```go
func getLastSnapshot(ctx context.Context, msg *kafka.Message) (*db.WorkspaceSnapshot, error) {
	var rawEvent eventHanlder.RawEvent
	if err := json.Unmarshal(msg.Value, &rawEvent); err != nil {
		log.Error("Failed to unmarshal event", "error", err, "message", string(msg.Value))
		return nil, fmt.Errorf("failed to unmarshal event: %w", err)
	}

	return db.GetWorkspaceSnapshot(ctx, rawEvent.WorkspaceID)
}
```
Don’t log full payloads on JSON errors.
Logging string(msg.Value) risks PII leakage and noisy logs.
```diff
 func getLastSnapshot(ctx context.Context, msg *kafka.Message) (*db.WorkspaceSnapshot, error) {
-	var rawEvent eventHanlder.RawEvent
+	var rawEvent eventHanlder.RawEvent
 	if err := json.Unmarshal(msg.Value, &rawEvent); err != nil {
-		log.Error("Failed to unmarshal event", "error", err, "message", string(msg.Value))
+		log.Error("Failed to unmarshal event", "error", err,
+			"topic", *msg.TopicPartition.Topic,
+			"partition", msg.TopicPartition.Partition,
+			"offset", msg.TopicPartition.Offset)
 		return nil, fmt.Errorf("failed to unmarshal event: %w", err)
 	}
```
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/kafka/kafka.go around lines 39 to 47, the current
error log prints the full message payload (string(msg.Value)) which can leak PII
and produce noisy logs; change the logging to avoid printing the full payload —
instead log only safe metadata (e.g., message key, topic/partition/offset if
available, workspace ID parsed from the partial/unmarshaled data if present) and
a truncated or hashed representation of the payload (or its length), then return
the error; remove direct string(msg.Value) from logs and replace it with a
sanitized indicator (e.g., payload length or first N bytes or a SHA256 hex) so
debugging info remains useful without exposing full data.
```go
func getLastWorkspaceOffset(snapshot *db.WorkspaceSnapshot) int64 {
	beginning := int64(kafka.OffsetBeginning)

	if snapshot == nil {
		return beginning
	}

	return snapshot.Offset
}
```
Guard against cross-partition/topology mismatches when using stored offsets.
Using a stored offset from a different partition or topology can skip/duplicate processing.
```diff
-func getLastWorkspaceOffset(snapshot *db.WorkspaceSnapshot) int64 {
+func getLastWorkspaceOffset(snapshot *db.WorkspaceSnapshot, currentPartition int32, currentNumPartitions int32) int64 {
 	beginning := int64(kafka.OffsetBeginning)
 	if snapshot == nil {
 		return beginning
 	}
-	return snapshot.Offset
+	if snapshot.Partition != currentPartition || snapshot.NumPartitions != currentNumPartitions {
+		return beginning
+	}
+	return snapshot.Offset
 }
@@
-	lastWorkspaceOffset := getLastWorkspaceOffset(lastSnapshot)
+	lastWorkspaceOffset := getLastWorkspaceOffset(
+		lastSnapshot,
+		msg.TopicPartition.Partition,
+		numPartitions,
+	)
```

Also applies to: 175-182
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/kafka/kafka.go around lines 49-57 (and similarly at
175-182), the function currently returns a stored offset without validating it
against the current partition/topology; update the logic to verify that the
snapshot is non-nil and that its Partition (and any topology identifier used by
this service) matches the current consumer partition/topology before returning
snapshot.Offset; if the partition/topology differs or the snapshot lacks
matching identifiers, ignore the stored offset and return kafka.OffsetBeginning
(or the earliest safe offset) to avoid skipping/duplicating messages. Ensure you
add the necessary fields/checks (partition and topology id) and unit tests for
both matching and mismatching cases.
Actionable comments posted: 4
🧹 Nitpick comments (5)
apps/workspace-engine/test/e2e/engine_kafka_replay_test.go (5)
506-513: Consider documenting random generation behavior.

The `generateAlphanumeric` function uses `math/rand` without explicit seeding. On Go 1.20+ the default source is auto-seeded, but on older toolchains the unseeded source is deterministic (same sequence across test runs), which may or may not be intentional for reproducibility. Consider documenting this behavior or explicitly using `rand.New(rand.NewSource(time.Now().UnixNano()))` if per-run randomness is desired.
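A minimal sketch of the explicitly seeded variant suggested above; the helper name and character set are assumptions about the test helper's shape:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const alphanumeric = "abcdefghijklmnopqrstuvwxyz0123456789"

// generateAlphanumeric returns a random alphanumeric string of length n.
// A locally seeded *rand.Rand avoids relying on the global source, whose
// seeding behavior differs between Go versions.
func generateAlphanumeric(r *rand.Rand, n int) string {
	b := make([]byte, n)
	for i := range b {
		b[i] = alphanumeric[r.Intn(len(alphanumeric))]
	}
	return string(b)
}

func main() {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	fmt.Println(generateAlphanumeric(r, 8))
}
```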
683-704: Refactor: Replace sleep-based timing with polling.

Using `time.Sleep(duration)` at line 693 to wait for message processing is brittle and can cause flaky tests. The fixed duration may be too short (causing failures) or too long (slowing tests). Consider refactoring to poll for expected state instead:

```go
func (env *testEnvironment) runConsumerUntilCondition(condition func() bool, timeout time.Duration) {
	env.t.Helper()

	consumerCtx, cancelConsumer := context.WithCancel(env.ctx)
	defer cancelConsumer()

	consumerDone := make(chan error, 1)
	go func() {
		consumerDone <- kafkapkg.RunConsumer(consumerCtx)
	}()

	// Poll for condition
	deadline := time.Now().Add(timeout)
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if condition() {
				cancelConsumer()
				return
			}
			if time.Now().After(deadline) {
				env.t.Fatal("Condition not met within timeout")
			}
		case err := <-consumerDone:
			if err != nil && err != context.Canceled {
				env.t.Fatalf("Consumer error: %v", err)
			}
			return
		}
	}
}
```
770-782: Add error handling for JSON marshaling.

Silently ignoring JSON marshaling errors at lines 771 and 780 could hide bugs in test data construction. While these operations should typically succeed, adding error checks improves test reliability. Apply this diff:

```diff
 func createTestEvent(workspaceID string, eventType eventHandler.EventType, payload map[string]interface{}) []byte {
-	payloadBytes, _ := json.Marshal(payload)
+	payloadBytes, err := json.Marshal(payload)
+	if err != nil {
+		panic(fmt.Sprintf("Failed to marshal test payload: %v", err))
+	}

 	event := eventHandler.RawEvent{
 		EventType:   eventType,
 		WorkspaceID: workspaceID,
 		Data:        payloadBytes,
 		Timestamp:   time.Now().UnixNano(),
 	}

-	data, _ := json.Marshal(event)
+	data, err := json.Marshal(event)
+	if err != nil {
+		panic(fmt.Sprintf("Failed to marshal test event: %v", err))
+	}
 	return data
 }
```
818-820: Replace hard-coded sleep with topic readiness polling.

The hard-coded 1-second sleep at line 819 assumes topics will be ready, which may be insufficient under load or excessive when topics are ready sooner. Consider polling for topic readiness:

```go
// Wait for topic to be ready
for i := 0; i < 10; i++ {
	metadata, err := producer.GetMetadata(&topic, false, 1000)
	if err == nil && len(metadata.Topics) > 0 && metadata.Topics[topic].Error.Code() == kafka.ErrNoError {
		break
	}
	time.Sleep(200 * time.Millisecond)
}
```
159-177: Skipped tests with clear TODOs.

Three test cases are intentionally skipped with descriptive TODO comments. This is acceptable for incremental development, though implementing these would improve coverage of edge cases (multiple workspaces, no-snapshot scenarios, offset commit behavior, partition rebalancing).
Would you like me to help generate implementations for any of these skipped test cases?
Also applies to: 415-432
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
apps/workspace-engine/test/e2e/engine_kafka_replay_test.go(1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/test/e2e/engine_kafka_replay_test.go
apps/workspace-engine/**/*_test.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
Follow the existing test structure used in *_test.go files
Files:
apps/workspace-engine/test/e2e/engine_kafka_replay_test.go
🧬 Code graph analysis (1)
- apps/workspace-engine/test/e2e/engine_kafka_replay_test.go (5)
  - apps/workspace-engine/pkg/db/workspaces.go (3): GetWorkspaceSnapshot (114-137), WorkspaceSnapshot (91-98), WriteWorkspaceSnapshot (187-199)
  - apps/workspace-engine/pkg/workspace/storage_file.go (1): NewFileStorage (20-22)
  - apps/workspace-engine/pkg/kafka/kafka.go (4): Topic (25-25), GroupID (26-26), RunConsumer (67-216), Brokers (27-27)
  - apps/workspace-engine/pkg/db/client.go (2): Close (73-77), GetDB (60-70)
  - apps/workspace-engine/pkg/events/handler/handler.go (10): ResourceCreate (24-24), WorkspaceSave (79-79), SystemCreate (52-52), EnvironmentCreate (48-48), GithubEntityCreate (74-74), JobAgentCreate (56-56), DeploymentCreate (36-36), DeploymentVersionCreate (40-40), EventType (21-21), RawEvent (83-88)
🔇 Additional comments (5)
apps/workspace-engine/test/e2e/engine_kafka_replay_test.go (5)
26-107: LGTM - Comprehensive e2e replay test.

The test correctly exercises the full replay lifecycle: producing events, creating snapshots, verifying workspace state, and testing restoration. The assertions are thorough and check both in-memory state and persisted snapshots.

111-155: LGTM - Validates snapshot trigger behavior.

Correctly tests that snapshots are created only when workspace.save events are sent, not on regular resource events.

181-265: LGTM - Critical replay mode validation.

The test correctly simulates replay by resetting the snapshot offset to 0, then verifies that workspace state is properly rebuilt from replay and new messages are processed. This validates core replay functionality.

269-393: LGTM - Thorough replay state validation.

The test comprehensively validates that complex workspace state (jobs, release targets, deployments) is correctly maintained during replay mode. Good coverage of the critical business logic around state rebuild.

Also applies to: 395-411

868-892: LGTM - Proper SQL parameterization.

Both workspace helper functions correctly use parameterized queries to prevent SQL injection and have appropriate error handling for test code.
```go
		t.Fatalf("Failed to read snapshot file: %v", err)
	}

	restoredWs := workspace.New(uuid.New().String())
```
Fix: Use correct workspace ID for restoration.
Creating a restored workspace with a new random UUID is incorrect. The restored workspace should use the same ID as the original workspace to properly validate snapshot restoration.
Apply this diff:
```diff
-	restoredWs := workspace.New(uuid.New().String())
+	restoredWs := workspace.New(env.workspaceID)
```
🤖 Prompt for AI Agents
In apps/workspace-engine/test/e2e/engine_kafka_replay_test.go around line 394,
the restored workspace is being created with a new random UUID; change
restoredWs := workspace.New(uuid.New().String()) to use the original workspace's
ID (the same ID used to create the snapshot in this test), e.g. restoredWs :=
workspace.New(ws.ID) or restoredWs := workspace.New(originalID), ensuring the
exact variable name matches the original workspace ID in the test and the type
is a string.
```go
	// Must set BEFORE accessing kafka package variables
	os.Setenv("KAFKA_TOPIC", topicName)

	// Force reload of kafka package configuration
	kafkapkg.Topic = topicName
	kafkapkg.GroupID = fmt.Sprintf("test-group-%s", uuid.New().String()[:8])
```
Critical: Race condition from mutating package-level variables.
Mutating kafkapkg.Topic and kafkapkg.GroupID at lines 464-465 is not thread-safe and will cause race conditions if tests run in parallel. Similarly, setting environment variables (line 461) affects the global process state.
Consider one of these approaches:
- Add `t.Setenv("KAFKA_TOPIC", topicName)` instead of `os.Setenv` (automatically cleaned up, but still affects parallel tests)
- Mark these tests with `t.Parallel()` carefully or add explicit serialization
- Refactor the `kafkapkg` package to accept configuration as parameters rather than using package-level variables
- Use build tags or test-specific consumer initialization that doesn't rely on global state
The most robust solution is #3: refactor to avoid package-level mutable state.
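A rough sketch of option #3, assuming a hypothetical ConsumerConfig type and constructor — these names are illustrative, not the package's current API, and the confluent-kafka-go import path may differ in this repo:

```go
package kafkapkg

import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

// ConsumerConfig carries per-instance settings so tests can build isolated
// consumers instead of mutating package-level variables.
type ConsumerConfig struct {
	Brokers string
	Topic   string
	GroupID string
}

// NewConsumerWithConfig is a hypothetical constructor that takes configuration
// explicitly; the existing RunConsumer could delegate to it using the current
// package-level defaults.
func NewConsumerWithConfig(cfg ConsumerConfig) (*kafka.Consumer, error) {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":  cfg.Brokers,
		"group.id":           cfg.GroupID,
		"auto.offset.reset":  "earliest",
		"enable.auto.commit": false,
	})
	if err != nil {
		return nil, err
	}
	if err := c.Subscribe(cfg.Topic, nil); err != nil {
		_ = c.Close()
		return nil, err
	}
	return c, nil
}
```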
🤖 Prompt for AI Agents
In apps/workspace-engine/test/e2e/engine_kafka_replay_test.go around lines 460
to 465, the test mutates global process and package-level state (os.Setenv and
kafkapkg.Topic / kafkapkg.GroupID), creating race conditions; change the test to
avoid mutating globals by using t.Setenv("KAFKA_TOPIC", topicName) and, instead
of assigning kafkapkg.Topic/GroupID, construct and use a locally scoped kafka
client/consumer with explicit configuration (e.g., call a kafkapkg factory like
NewClientWithConfig(topicName, fmt.Sprintf("test-group-%s", uuid...)) or a
helper that returns a configured instance) so no package-level variables are
written; if such a factory does not exist, add one in the kafkapkg package that
accepts topic and groupID parameters and use that from the test, or as a
temporary measure serialize the test (avoid parallel execution) until the
refactor is in place.
```go
func createTestConsumer(t *testing.T, topic string, groupID string) *kafka.Consumer {
	t.Helper()

	brokers := os.Getenv("KAFKA_BROKERS")
	if brokers == "" {
		brokers = "localhost:9092"
	}

	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":  brokers,
		"group.id":           groupID,
		"auto.offset.reset":  "earliest",
		"enable.auto.commit": false,
	})
	if err != nil {
		t.Fatalf("Failed to create consumer: %v", err)
	}

	return consumer
}
```
🛠️ Refactor suggestion | 🟠 Major
Remove unused function.
The createTestConsumer function is defined but never used in this test file.
Apply this diff to remove the dead code:
```diff
-func createTestConsumer(t *testing.T, topic string, groupID string) *kafka.Consumer {
-	t.Helper()
-
-	brokers := os.Getenv("KAFKA_BROKERS")
-	if brokers == "" {
-		brokers = "localhost:9092"
-	}
-
-	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
-		"bootstrap.servers":  brokers,
-		"group.id":           groupID,
-		"auto.offset.reset":  "earliest",
-		"enable.auto.commit": false,
-	})
-	if err != nil {
-		t.Fatalf("Failed to create consumer: %v", err)
-	}
-
-	return consumer
-}
```
🤖 Prompt for AI Agents
In apps/workspace-engine/test/e2e/engine_kafka_replay_test.go around lines 749
to 768, the createTestConsumer function is dead code and should be removed;
delete the entire function definition (from the func createTestConsumer(...)
line through its closing brace) and then run `go test`/`gofmt` to ensure no
unused-import or formatting issues remain and that there are no remaining
references to this helper.
```go
	return data
}

func produceKafkaMessage(t *testing.T, producer *kafka.Producer, topic string, message []byte, partition int32, offset int64) {
```
🛠️ Refactor suggestion | 🟠 Major
Remove unused parameter.
The offset parameter is declared but never used in the function body.
Apply this diff:
```diff
-func produceKafkaMessage(t *testing.T, producer *kafka.Producer, topic string, message []byte, partition int32, offset int64) {
+func produceKafkaMessage(t *testing.T, producer *kafka.Producer, topic string, message []byte, partition int32) {
```

Then update all call sites (lines 553, 561, 576, 601, 615, 636, 664, 679) to remove the trailing 0 offset argument.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In apps/workspace-engine/test/e2e/engine_kafka_replay_test.go around line 784,
the function produceKafkaMessage has an unused parameter `offset`; remove the
`offset` parameter from the function signature and update its definition
accordingly, then update all call sites at lines 553, 561, 576, 601, 615, 636,
664, and 679 to remove the trailing `0` offset argument so calls match the new
signature.
Actionable comments posted: 0
🧹 Nitpick comments (1)
apps/workspace-engine/pkg/db/workspaces_test.go (1)
142-161: Consider verifying additional snapshot fields.

The test correctly validates the offset and path fields. For more comprehensive coverage, consider also asserting the WorkspaceID, Partition, and NumPartitions fields in the returned snapshots.
Example addition for ws1:
```diff
 if ws1Snapshot.Path != "ws1-v1.gob" {
 	t.Fatalf("Expected ws1-v1.gob (highest offset), got %s", ws1Snapshot.Path)
 }
+if ws1Snapshot.WorkspaceID != ws1ID {
+	t.Fatalf("Expected WorkspaceID %s, got %s", ws1ID, ws1Snapshot.WorkspaceID)
+}
+if ws1Snapshot.Partition != 0 {
+	t.Fatalf("Expected Partition 0, got %d", ws1Snapshot.Partition)
+}
+if ws1Snapshot.NumPartitions != 3 {
+	t.Fatalf("Expected NumPartitions 3, got %d", ws1Snapshot.NumPartitions)
+}
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
apps/workspace-engine/pkg/db/workspaces_test.go(2 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
apps/workspace-engine/**/*.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods
Files:
apps/workspace-engine/pkg/db/workspaces_test.go
apps/workspace-engine/**/*_test.go
📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)
Follow the existing test structure used in *_test.go files
Files:
apps/workspace-engine/pkg/db/workspaces_test.go
🧬 Code graph analysis (1)
- apps/workspace-engine/pkg/db/workspaces_test.go (1)
  - apps/workspace-engine/pkg/db/workspaces.go (3): WorkspaceSnapshot (91-98), WriteWorkspaceSnapshot (187-199), GetLatestWorkspaceSnapshots (139-180)
🔇 Additional comments (3)
apps/workspace-engine/pkg/db/workspaces_test.go (3)
5-5: LGTM: Time import added for timestamp manipulation.

The time package import is correctly added to support timestamp creation in the test.

74-114: LGTM: Test setup correctly validates offset-based selection.

The test setup properly creates snapshots with varying offsets and timestamps to verify that the highest offset (100) is selected despite having the oldest timestamp. This is a critical test case that ensures the correct ordering behavior.
163-192: LGTM: Comprehensive edge case coverage.

The test thoroughly covers important edge cases:
- Workspaces without snapshots
- Empty input arrays
- Non-existent workspace IDs
All assertions correctly validate the expected behavior for these scenarios.
📊 DB Package Test Coverage
pkg/db coverage:
Summary by CodeRabbit
New Features
Bug Fixes
Infrastructure
Tests