Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/workspace-engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"workspace-engine/pkg/events/handler"
"workspace-engine/pkg/kafka"
"workspace-engine/pkg/kafka/producer"
"workspace-engine/pkg/server"
"workspace-engine/pkg/ticker"

Expand Down Expand Up @@ -139,7 +140,7 @@ func main() {
defer cancel()

// Initialize Kafka producer for ticker
producer, err := kafka.NewProducer()
producer, err := producer.NewProducer()
if err != nil {
log.Fatal("Failed to create Kafka producer", "error", err)
}
Expand Down
83 changes: 83 additions & 0 deletions apps/workspace-engine/pkg/events/handler/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,64 @@ package jobs
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"workspace-engine/pkg/events/handler"
"workspace-engine/pkg/kafka/producer"
"workspace-engine/pkg/oapi"
"workspace-engine/pkg/workspace"
"workspace-engine/pkg/workspace/releasemanager/deployment/jobs"

"github.com/charmbracelet/log"
"github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

var tracer = otel.Tracer("events/handler/jobs")

func isStringUUID(s string) bool {
_, err := uuid.Parse(s)
return err == nil
}

func dispatchAndNotifyJob(ctx context.Context, ws *workspace.Workspace, job *oapi.Job) {
err := ws.ReleaseManager().JobDispatcher().DispatchJob(ctx, job)
if err == nil || errors.Is(err, jobs.ErrUnsupportedJobAgent) {
return
}

log.Error("error dispatching job to integration", "error", err.Error())
job.Status = oapi.InvalidIntegration
job.UpdatedAt = time.Now()

kafkaProducer, err := producer.NewProducer()
if err != nil {
log.Error("error creating kafka producer", "error", err.Error())
return
}
defer kafkaProducer.Close()

jobUpdateEvent := &oapi.JobUpdateEvent{
AgentId: &job.JobAgentId,
ExternalId: job.ExternalId,
FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{
oapi.JobUpdateEventFieldsToUpdate("status"),
},
Id: &job.Id,
Job: *job,
}

err = kafkaProducer.ProduceEvent("job.updated", ws.ID, jobUpdateEvent)
if err != nil {
log.Error("error producing job updated event", "error", err.Error())
return
}
}

func getInternalReleaseID(ws *workspace.Workspace, jobUpdateEvent *oapi.JobUpdateEvent) string {
eventReleaseID := jobUpdateEvent.Job.ReleaseId
if eventReleaseID == "" {
Expand All @@ -36,6 +80,45 @@ func getInternalReleaseID(ws *workspace.Workspace, jobUpdateEvent *oapi.JobUpdat
return ""
}

func HandleJobCreated(
ctx context.Context,
ws *workspace.Workspace,
event handler.RawEvent,
) error {
ctx, span := tracer.Start(ctx, "HandleJobCreated")
defer span.End()

var job *oapi.Job
if err := json.Unmarshal(event.Data, &job); err != nil {
return err
}

span.SetAttributes(
attribute.Bool("job.created", true),
attribute.String("job.id", job.Id),
attribute.String("job.status", string(job.Status)),
attribute.String("workspace.id", ws.ID),
)

ws.Jobs().Upsert(ctx, job)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Check for Upsert errors

The Upsert call at line 103 can fail, but the error is neither checked nor logged. This could result in silent failures when persisting the job to the store.

Apply this diff to handle errors:

-	ws.Jobs().Upsert(ctx, job)
+	if err := ws.Jobs().Upsert(ctx, job); err != nil {
+		span.RecordError(err)
+		return fmt.Errorf("failed to upsert job: %w", err)
+	}
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/events/handler/jobs/jobs.go around line 103, the
call to ws.Jobs().Upsert(ctx, job) ignores the returned error; capture its error
result, check it, and handle it: assign the call to a variable (e.g., err :=
ws.Jobs().Upsert(ctx, job)), and if err != nil log the error with contextual
info (job ID/type) using the handler's existing logger and then return or
propagate the error to the caller (or convert to the appropriate error response)
so failures to persist jobs are not silent.


if ws.Store().IsReplay() {
span.SetAttributes(attribute.Bool("job.replay", true))
span.AddEvent("Skipping job creation in replay mode", trace.WithAttributes(
attribute.String("job.id", job.Id),
attribute.String("job.status", string(job.Status)),
attribute.String("workspace.id", ws.ID),
))
return nil
}

if job.Status != oapi.InvalidJobAgent {
go dispatchAndNotifyJob(ctx, ws, job)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Context passed to goroutine may be cancelled prematurely

The parent context ctx is passed to the async dispatchAndNotifyJob goroutine. If the parent request completes and cancels this context, the dispatch operation may fail unexpectedly. For detached async operations, use context.Background() or (in Go 1.21+) context.WithoutCancel(ctx) to preserve tracing metadata while preventing premature cancellation.

Consider this approach:

 	if job.Status != oapi.InvalidJobAgent {
-		go dispatchAndNotifyJob(ctx, ws, job)
+		go dispatchAndNotifyJob(context.Background(), ws, job)
 	}

Or if you need to preserve tracing context (Go 1.21+):

 	if job.Status != oapi.InvalidJobAgent {
-		go dispatchAndNotifyJob(ctx, ws, job)
+		go dispatchAndNotifyJob(context.WithoutCancel(ctx), ws, job)
 	}


---

🏁 Script executed:

```shell
#!/bin/bash
# Inspect the file around line 116 to understand context
cat apps/workspace-engine/pkg/events/handler/jobs/jobs.go | head -n 150 | tail -n 50

Length of output: 1311


🏁 Script executed:

#!/bin/bash
# Find the dispatchAndNotifyJob function definition
rg -n "func.*dispatchAndNotifyJob" apps/workspace-engine/

Length of output: 145


🏁 Script executed:

#!/bin/bash
# Check Go version in the project
if [ -f "go.mod" ]; then
  head -3 go.mod
fi

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Search for other goroutine patterns passing ctx to understand codebase conventions
rg -n "go.*\(ctx" apps/workspace-engine/ -A 2 -B 2 | head -n 100

Length of output: 760



---

🏁 Script executed:

```shell
#!/bin/bash
# Get the dispatchAndNotifyJob function implementation
sed -n '30,80p' apps/workspace-engine/pkg/events/handler/jobs/jobs.go

Length of output: 1354


🏁 Script executed:

#!/bin/bash
# Get full go.mod for version info
cat go.mod | head -20

Length of output: 38


🏁 Script executed:

#!/bin/bash
# Check the parent handler function to understand context lifecycle
sed -n '95,130p' apps/workspace-engine/pkg/events/handler/jobs/jobs.go

Length of output: 883



---

🏁 Script executed:

```shell
#!/bin/bash
# Find go.mod file in the project
find . -name "go.mod" -type f 2>/dev/null | head -5

Length of output: 30


🏁 Script executed:

#!/bin/bash
# Check if there's a go version file or workspace setting
ls -la apps/workspace-engine/ | grep -E "go\.|versions"

Length of output: 115


🏁 Script executed:

#!/bin/bash
# Look for go directives in the jobs handler file or nearby
rg "go 1\." apps/workspace-engine/ --max-count 3

Length of output: 38


Use context.WithoutCancel(ctx) or context.Background() for detached async task

The dispatchAndNotifyJob goroutine calls ws.ReleaseManager().JobDispatcher().DispatchJob(ctx, job) at line 31, which actively uses the passed context. When the parent event handler returns, its context is cancelled, causing the async dispatch operation to fail prematurely. Since this is a fire-and-forget async operation, pass a detached context using context.WithoutCancel(ctx) (Go 1.21+) to preserve any tracing metadata while preventing premature cancellation, or use context.Background() if tracing isn't needed. Your project uses Go 1.24.0, so both approaches are fully supported.

 	if job.Status != oapi.InvalidJobAgent {
-		go dispatchAndNotifyJob(ctx, ws, job)
+		go dispatchAndNotifyJob(context.WithoutCancel(ctx), ws, job)
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
go dispatchAndNotifyJob(ctx, ws, job)
if job.Status != oapi.InvalidJobAgent {
go dispatchAndNotifyJob(context.WithoutCancel(ctx), ws, job)
}
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/events/handler/jobs/jobs.go around line 116, the
goroutine is started with the parent ctx which will be cancelled when the
handler returns causing the fire-and-forget dispatch to fail; change the
goroutine to use a detached context (preserving trace metadata) by creating ctx
:= context.WithoutCancel(ctx) (Go 1.21+) and passing that into
dispatchAndNotifyJob, or if tracing isn’t required use context.Background();
ensure the new detached ctx is created immediately before launching the
goroutine so the async DispatchJob call won’t be cancelled prematurely.

}

return nil
}

func HandleJobUpdated(
ctx context.Context,
ws *workspace.Workspace,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,30 @@
package kafka
package producer

import (
"encoding/json"
"fmt"
"os"

"github.com/charmbracelet/log"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// Configuration variables loaded from environment
var (
Topic = getEnv("KAFKA_TOPIC", "workspace-events")
GroupID = getEnv("KAFKA_GROUP_ID", "workspace-engine")
Brokers = getEnv("KAFKA_BROKERS", "localhost:9092")
)

// getEnv retrieves an environment variable or returns a default value
func getEnv(varName string, defaultValue string) string {
v := os.Getenv(varName)
if v == "" {
return defaultValue
}
return v
}

// EventProducer defines the interface for producing events to Kafka
type EventProducer interface {
ProduceEvent(eventType string, workspaceID string, data any) error
Expand Down
8 changes: 4 additions & 4 deletions apps/workspace-engine/pkg/ticker/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strconv"
"time"

"workspace-engine/pkg/kafka"
"workspace-engine/pkg/kafka/producer"
"workspace-engine/pkg/workspace"

"github.com/charmbracelet/log"
Expand All @@ -27,13 +27,13 @@ var tracer = otel.Tracer("ticker")

// Ticker periodically emits tick events for active workspaces
type Ticker struct {
producer kafka.EventProducer
producer producer.EventProducer
interval time.Duration
eventType string
}

// NewDefault creates a new ticker with the configured interval
func NewDefault(producer kafka.EventProducer) *Ticker {
func NewDefault(producer producer.EventProducer) *Ticker {
interval := getTickInterval()
log.Info("Ticker initialized", "interval", interval)

Expand All @@ -45,7 +45,7 @@ func NewDefault(producer kafka.EventProducer) *Ticker {
}

// New creates a new ticker with the configured interval and event type
func New(producer kafka.EventProducer, interval time.Duration, eventType string) *Ticker {
func New(producer producer.EventProducer, interval time.Duration, eventType string) *Ticker {
return &Ticker{
producer: producer,
interval: interval,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ package deployment

import (
"context"
"errors"
"fmt"
"time"
"workspace-engine/pkg/kafka/producer"
"workspace-engine/pkg/oapi"
"workspace-engine/pkg/workspace/releasemanager/deployment/jobs"
"workspace-engine/pkg/workspace/store"

"github.com/charmbracelet/log"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
Expand All @@ -30,6 +29,15 @@ func NewExecutor(store *store.Store) *Executor {
}
}

func (e *Executor) getWorkspaceID(releaseToDeploy *oapi.Release) (string, error) {
resourceID := releaseToDeploy.ReleaseTarget.ResourceId
resource, ok := e.store.Resources.Get(resourceID)
if !ok {
return "", fmt.Errorf("resource not found: %s", resourceID)
}
return resource.WorkspaceId, nil
}

// ExecuteRelease performs all write operations to deploy a release (WRITES TO STORE).
// Precondition: Planner has already determined this release NEEDS to be deployed.
// No additional "should we deploy" checks here - trust the planning phase.
Expand All @@ -54,29 +62,23 @@ func (e *Executor) ExecuteRelease(ctx context.Context, releaseToDeploy *oapi.Rel
return err
}

e.store.Jobs.Upsert(ctx, newJob)
span.SetAttributes(
attribute.Bool("job.created", true),
attribute.String("job.id", newJob.Id),
attribute.String("job.status", string(newJob.Status)),
)
workspaceID, err := e.getWorkspaceID(releaseToDeploy)
if err != nil {
span.RecordError(err)
return err
}

if e.store.IsReplay() {
log.Info("Skipping job dispatch in replay mode", "job.id", newJob.Id)
return nil
kafkaProducer, err := producer.NewProducer()
if err != nil {
span.RecordError(err)
return err
}
defer kafkaProducer.Close()

// Step 4: Dispatch job to integration (ASYNC)
// Skip dispatch if job already has InvalidJobAgent status
if newJob.Status != oapi.InvalidJobAgent {
go func() {
if err := e.jobDispatcher.DispatchJob(ctx, newJob); err != nil && !errors.Is(err, jobs.ErrUnsupportedJobAgent) {
log.Error("error dispatching job to integration", "error", err.Error())
newJob.Status = oapi.InvalidIntegration
newJob.UpdatedAt = time.Now()
e.store.Jobs.Upsert(ctx, newJob)
}
}()
err = kafkaProducer.ProduceEvent("job.created", workspaceID, newJob)
if err != nil {
span.RecordError(err)
return err
}

return nil
Expand Down Expand Up @@ -132,3 +134,7 @@ func BuildRelease(
CreatedAt: time.Now().Format(time.RFC3339),
}
}

func (e *Executor) JobDispatcher() *jobs.Dispatcher {
return e.jobDispatcher
}
5 changes: 5 additions & 0 deletions apps/workspace-engine/pkg/workspace/releasemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"workspace-engine/pkg/cmap"
"workspace-engine/pkg/oapi"
"workspace-engine/pkg/workspace/releasemanager/deployment"
"workspace-engine/pkg/workspace/releasemanager/deployment/jobs"
"workspace-engine/pkg/workspace/releasemanager/policy"
"workspace-engine/pkg/workspace/releasemanager/targets"
"workspace-engine/pkg/workspace/releasemanager/variables"
Expand Down Expand Up @@ -269,3 +270,7 @@ func (m *Manager) GetReleaseTargetState(ctx context.Context, releaseTarget *oapi

return rts, nil
}

func (m *Manager) JobDispatcher() *jobs.Dispatcher {
return m.executor.JobDispatcher()
}
Loading