@@ -2,7 +2,9 @@ package db
22
33import (
44 "context"
5+ "encoding/json"
56 "fmt"
7+ "strings"
68 "time"
79 "workspace-engine/pkg/oapi"
810 "workspace-engine/pkg/workspace/store"
@@ -21,14 +23,23 @@ const JOB_SELECT_QUERY = `
2123 rj.release_id,
2224 j.started_at,
2325 j.status,
24- j.updated_at
26+ j.updated_at,
27+ COALESCE(
28+ json_object_agg(
29+ COALESCE(jm.key, ''),
30+ COALESCE(jm.value, '')
31+ ) FILTER (WHERE jm.key IS NOT NULL),
32+ '{}'::json
33+ ) as metadata
2534 FROM job j
2635 INNER JOIN release_job rj ON rj.job_id = j.id
2736 INNER JOIN release r ON r.id = rj.release_id
2837 INNER JOIN version_release vr ON vr.id = r.version_release_id
2938 INNER JOIN release_target rt ON rt.id = vr.release_target_id
3039 INNER JOIN resource res ON res.id = rt.resource_id
40+ LEFT JOIN job_metadata jm ON jm.job_id = j.id
3141 WHERE res.workspace_id = $1
42+ GROUP BY j.id, j.completed_at, j.created_at, j.external_id, j.job_agent_id, rj.release_id, j.started_at, j.status, j.updated_at
3243`
3344
3445func getJobs (ctx context.Context , workspaceID string ) ([]* oapi.Job , error ) {
@@ -91,6 +102,7 @@ func scanJobRow(rows pgx.Rows) (*oapi.Job, error) {
91102 var startedAt , completedAt * time.Time
92103 var createdAt , updatedAt time.Time
93104 var statusStr string
105+ var metadataJSON []byte
94106 err := rows .Scan (
95107 & completedAt ,
96108 & createdAt ,
@@ -102,6 +114,7 @@ func scanJobRow(rows pgx.Rows) (*oapi.Job, error) {
102114 & startedAt ,
103115 & statusStr ,
104116 & updatedAt ,
117+ & metadataJSON ,
105118 )
106119 if err != nil {
107120 return nil , err
@@ -114,9 +127,30 @@ func scanJobRow(rows pgx.Rows) (*oapi.Job, error) {
114127
115128 job .Status = convertJobStatusToEnum (statusStr )
116129
130+ if err := setJobMetadata (job , metadataJSON ); err != nil {
131+ return nil , err
132+ }
133+
117134 return job , nil
118135}
119136
137+ func setJobMetadata (job * oapi.Job , metadataJSON []byte ) error {
138+ if len (metadataJSON ) == 0 {
139+ return nil
140+ }
141+
142+ var metadataMap map [string ]string
143+ if err := json .Unmarshal (metadataJSON , & metadataMap ); err != nil {
144+ return err
145+ }
146+
147+ // Only set metadata if it's not empty
148+ if len (metadataMap ) > 0 {
149+ job .Metadata = & metadataMap
150+ }
151+ return nil
152+ }
153+
120154const JOB_UPSERT_QUERY = `
121155 INSERT INTO job (id, job_agent_id, job_agent_config, external_id, status, created_at, started_at, completed_at, updated_at)
122156 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
@@ -207,6 +241,45 @@ func writeJob(ctx context.Context, job *oapi.Job, store *store.Store, tx pgx.Tx)
207241 return err
208242 }
209243 }
244+
245+ // Handle metadata
246+ if _ , err := tx .Exec (ctx , "DELETE FROM job_metadata WHERE job_id = $1" , job .Id ); err != nil {
247+ return fmt .Errorf ("failed to delete existing job metadata: %w" , err )
248+ }
249+
250+ if job .Metadata != nil && len (* job .Metadata ) > 0 {
251+ if err := writeJobMetadata (ctx , job .Id , * job .Metadata , tx ); err != nil {
252+ return err
253+ }
254+ }
255+
256+ return nil
257+ }
258+
259+ func writeJobMetadata (ctx context.Context , jobId string , metadata map [string ]string , tx pgx.Tx ) error {
260+ if len (metadata ) == 0 {
261+ return nil
262+ }
263+
264+ valueStrings := make ([]string , 0 , len (metadata ))
265+ valueArgs := make ([]interface {}, 0 , len (metadata )* 3 )
266+ i := 1
267+ for k , v := range metadata {
268+ valueStrings = append (valueStrings ,
269+ "($" + fmt .Sprintf ("%d" , i )+ ", $" + fmt .Sprintf ("%d" , i + 1 )+ ", $" + fmt .Sprintf ("%d" , i + 2 )+ ")" ,
270+ )
271+ valueArgs = append (valueArgs , jobId , k , v )
272+ i += 3
273+ }
274+
275+ query := "INSERT INTO job_metadata (job_id, key, value) VALUES " +
276+ strings .Join (valueStrings , ", " ) +
277+ " ON CONFLICT (job_id, key) DO UPDATE SET value = EXCLUDED.value"
278+
279+ _ , err := tx .Exec (ctx , query , valueArgs ... )
280+ if err != nil {
281+ return err
282+ }
210283 return nil
211284}
212285
0 commit comments