Skip to content

Commit c7b5d1d

Browse files
chore: version jobs go endpoint (#689)
1 parent e7e054c commit c7b5d1d

File tree

15 files changed

+1977
-178
lines changed

15 files changed

+1977
-178
lines changed

apps/workspace-engine/oapi/openapi.json

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1183,10 +1183,6 @@
11831183
},
11841184
"resourceId": {
11851185
"type": "string"
1186-
},
1187-
"rolloutTime": {
1188-
"format": "date-time",
1189-
"type": "string"
11901186
}
11911187
},
11921188
"type": "object"

apps/workspace-engine/oapi/spec/paths/deployment-version.jsonnet

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,6 @@ local openapi = import '../lib/openapi.libsonnet';
5757
},
5858
},
5959
},
60-
// Optional rollout time
61-
rolloutTime: { type: 'string', format: 'date-time' },
6260
},
6361
},
6462
},

apps/workspace-engine/pkg/db/changeset.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ import (
55
"fmt"
66
"workspace-engine/pkg/changeset"
77
"workspace-engine/pkg/oapi"
8+
"workspace-engine/pkg/workspace/store"
89

910
"github.com/jackc/pgx/v5"
1011
"go.opentelemetry.io/otel/attribute"
1112
)
1213

13-
func FlushChangeset(ctx context.Context, cs *changeset.ChangeSet[any], workspaceID string) error {
14+
func FlushChangeset(ctx context.Context, cs *changeset.ChangeSet[any], workspaceID string, store *store.Store) error {
1415
ctx, span := tracer.Start(ctx, "DBFlushChangeset")
1516
defer span.End()
1617

@@ -37,7 +38,7 @@ func FlushChangeset(ctx context.Context, cs *changeset.ChangeSet[any], workspace
3738
defer func() { _ = tx.Rollback(ctx) }()
3839

3940
for _, change := range cs.Changes {
40-
if err := applyChange(ctx, tx, change, workspaceID); err != nil {
41+
if err := applyChange(ctx, tx, change, workspaceID, store); err != nil {
4142
return err
4243
}
4344
}
@@ -51,7 +52,7 @@ func FlushChangeset(ctx context.Context, cs *changeset.ChangeSet[any], workspace
5152
return nil
5253
}
5354

54-
func applyChange(ctx context.Context, conn pgx.Tx, change changeset.Change[any], workspaceID string) error {
55+
func applyChange(ctx context.Context, conn pgx.Tx, change changeset.Change[any], workspaceID string, store *store.Store) error {
5556
if e, ok := change.Entity.(*oapi.Resource); ok && e != nil {
5657
if change.Type == changeset.ChangeTypeDelete {
5758
return deleteResource(ctx, e.Id, conn)
@@ -133,7 +134,7 @@ func applyChange(ctx context.Context, conn pgx.Tx, change changeset.Change[any],
133134
if change.Type == changeset.ChangeTypeDelete {
134135
return deleteJob(ctx, e.Id, conn)
135136
}
136-
return writeJob(ctx, e, conn)
137+
return writeJob(ctx, e, store, conn)
137138
}
138139

139140
if e, ok := change.Entity.(*oapi.ReleaseTarget); ok && e != nil {
@@ -148,16 +149,18 @@ func applyChange(ctx context.Context, conn pgx.Tx, change changeset.Change[any],
148149

149150
type DbChangesetConsumer struct {
150151
workspaceID string
152+
store *store.Store
151153
}
152154

153155
var _ changeset.ChangesetConsumer[any] = (*DbChangesetConsumer)(nil)
154156

155-
func NewChangesetConsumer(workspaceID string) *DbChangesetConsumer {
157+
func NewChangesetConsumer(workspaceID string, store *store.Store) *DbChangesetConsumer {
156158
return &DbChangesetConsumer{
157159
workspaceID: workspaceID,
160+
store: store,
158161
}
159162
}
160163

161164
func (c *DbChangesetConsumer) FlushChangeset(ctx context.Context, changeset *changeset.ChangeSet[any]) error {
162-
return FlushChangeset(ctx, changeset, c.workspaceID)
165+
return FlushChangeset(ctx, changeset, c.workspaceID, c.store)
163166
}

apps/workspace-engine/pkg/db/jobs.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package db
22

33
import (
44
"context"
5+
"fmt"
56
"time"
67
"workspace-engine/pkg/oapi"
8+
"workspace-engine/pkg/workspace/store"
79

810
"github.com/jackc/pgx/v5"
911
)
@@ -129,6 +131,20 @@ const JOB_UPSERT_QUERY = `
129131
updated_at = EXCLUDED.updated_at
130132
`
131133

134+
const RELEASE_JOB_CHECK_QUERY = `
135+
SELECT EXISTS(SELECT 1 FROM release_job WHERE release_id = $1 AND job_id = $2)
136+
`
137+
138+
const RELEASE_JOB_INSERT_QUERY = `
139+
INSERT INTO release_job (release_id, job_id)
140+
VALUES ($1, $2)
141+
`
142+
143+
func writeReleaseJob(ctx context.Context, releaseId string, jobId string, tx pgx.Tx) error {
144+
_, err := tx.Exec(ctx, RELEASE_JOB_INSERT_QUERY, releaseId, jobId)
145+
return err
146+
}
147+
132148
func convertOapiJobStatusToStr(status oapi.JobStatus) string {
133149
switch status {
134150
case oapi.Pending:
@@ -156,7 +172,11 @@ func convertOapiJobStatusToStr(status oapi.JobStatus) string {
156172
}
157173
}
158174

159-
func writeJob(ctx context.Context, job *oapi.Job, tx pgx.Tx) error {
175+
func writeJob(ctx context.Context, job *oapi.Job, store *store.Store, tx pgx.Tx) error {
176+
release, ok := store.Releases.Get(job.ReleaseId)
177+
if !ok {
178+
return fmt.Errorf("release not found for job %s", job.Id)
179+
}
160180
statusStr := convertOapiJobStatusToStr(job.Status)
161181
_, err := tx.Exec(
162182
ctx,
@@ -170,7 +190,16 @@ func writeJob(ctx context.Context, job *oapi.Job, tx pgx.Tx) error {
170190
job.StartedAt,
171191
job.CompletedAt,
172192
job.UpdatedAt)
173-
return err
193+
if err != nil {
194+
return err
195+
}
196+
197+
if job.ReleaseId != "" {
198+
if err := writeReleaseJob(ctx, release.UUID().String(), job.Id, tx); err != nil {
199+
return err
200+
}
201+
}
202+
return nil
174203
}
175204

176205
const DELETE_JOB_QUERY = `

0 commit comments

Comments
 (0)