diff --git a/bun.lock b/bun.lock
index 279b27e..b357886 100644
--- a/bun.lock
+++ b/bun.lock
@@ -39,6 +39,7 @@
         "@lowerdeck/error": "^1.0.8",
         "@lowerdeck/hash": "^1.0.4",
         "@lowerdeck/id": "^1.0.5",
+        "@lowerdeck/lock": "^1.0.3",
         "@lowerdeck/once": "^1.0.4",
         "@lowerdeck/pagination": "^1.0.4",
         "@lowerdeck/queue": "^1.0.4",
@@ -439,6 +440,8 @@
     "@lowerdeck/id": ["@lowerdeck/id@1.0.5", "", { "dependencies": { "@lowerdeck/error": "^1.0.8", "@lowerdeck/hash": "^1.0.4", "nanoid": "^5.0.7", "short-uuid": "^5.2.0", "snowflake-uuid": "^1.0.0" } }, "sha512-8Xik3NbqE7mkfYTliMG+zxWjn+my1wK09BIHiKdxylvkdtAKaxBzls+XoALFO2JZ+VSZIdLG2DvtndXjFSwPQg=="],

+    "@lowerdeck/lock": ["@lowerdeck/lock@1.0.3", "", { "dependencies": { "@lowerdeck/delay": "^1.0.4", "@lowerdeck/redis": "^1.0.3", "@types/bun": "^1.2.11", "ioredis": "^5.4.1", "redlock": "^5.0.0-beta.2", "superjson": "^2.2.5" } }, "sha512-1f/oC7E//s0ylFIWwKUtvFbyeZYn00Pt2KTtbB6s0fMrxI4Wm5BNFTbRvVSGGrafgl025QPYeOVLRh/X214idw=="],
+
     "@lowerdeck/memo": ["@lowerdeck/memo@1.0.4", "", {}, "sha512-Fyhi14j1EocQeAjZ6sqeYZaydPlCt23P+CEv99QVPb46Y6n77RGrslbHE7HDPgV0f9CJi4UWxSbfhk+Spmmj5g=="],

     "@lowerdeck/once": ["@lowerdeck/once@1.0.4", "", {}, "sha512-na46nHHIVxO0Pc5Umm0T9CffI3rD7Z7+KSjODW543c1pLX/dmC9MG3X0QRp9wHQwcAF8c4Y3IOpTUMMdEN0EzA=="],

@@ -1483,6 +1486,8 @@
     "redis-parser": ["redis-parser@3.0.0", "", { "dependencies": { "redis-errors": "^1.0.0" } }, "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A=="],

+    "redlock": ["redlock@5.0.0-beta.2", "", { "dependencies": { "node-abort-controller": "^3.0.1" } }, "sha512-2RDWXg5jgRptDrB1w9O/JgSZC0j7y4SlaXnor93H/UJm/QyDiFgBKNtrh0TI6oCXqYSaSoXxFh6Sd3VtYfhRXw=="],
+
     "reflect.getprototypeof": ["reflect.getprototypeof@1.0.10", "", { "dependencies": { "call-bind": "^1.0.8", "define-properties": "^1.2.1", "es-abstract": "^1.23.9", "es-errors": "^1.3.0", "es-object-atoms": "^1.0.0", "get-intrinsic": "^1.2.7", "get-proto": "^1.0.1", "which-builtin-type": "^1.2.1" } }, "sha512-00o4I+DVrefhv+nX0ulyi3biSHCPDe+yLv5o/p6d/UVlirijB8E16FtfwSAi4g3tcqrQ4lRAqQSoFEZJehYEcw=="],

     "regenerate": ["regenerate@1.4.2", "", {}, "sha512-zrceR/XhGYU/d/opr2EKO7aRHUeiBI8qjtfHqADTwZd6Szfy16la6kqD0MIUs5z5hx6AaKa+PixpPrR289+I0A=="],

diff --git a/service/package.json b/service/package.json
index 8d2090f..8487936 100644
--- a/service/package.json
+++ b/service/package.json
@@ -25,6 +25,7 @@
     "@lowerdeck/error": "^1.0.8",
     "@lowerdeck/hash": "^1.0.4",
     "@lowerdeck/id": "^1.0.5",
+    "@lowerdeck/lock": "^1.0.3",
     "@lowerdeck/once": "^1.0.4",
     "@lowerdeck/pagination": "^1.0.4",
     "@lowerdeck/queue": "^1.0.4",
diff --git a/service/src/providers/_lib/buildContext.ts b/service/src/providers/_lib/buildContext.ts
index c93bb17..fa715ab 100644
--- a/service/src/providers/_lib/buildContext.ts
+++ b/service/src/providers/_lib/buildContext.ts
@@ -1,9 +1,17 @@
 import { notFoundError, ServiceError } from '@lowerdeck/error';
-import type { Workflow, WorkflowRun } from '../../../prisma/generated/client';
+import type { WorkflowVersionStep } from '../../../prisma/generated/browser';
+import type { Workflow, WorkflowRun, WorkflowRunStep } from '../../../prisma/generated/client';
 import { db } from '../../db';
+import { encryption } from '../../encryption';
+import { snowflake } from '../../id';
+import { workflowArtifactService } from '../../services';
+import { buildEndedQueue } from './queues';

 export class BuildContext {
-  private constructor(public readonly workflow: Workflow, public readonly run: WorkflowRun) {}
+  private constructor(
+    public readonly workflow: Workflow,
+    public readonly run: WorkflowRun
+  ) {}

   static async of(runId: string): Promise<BuildContext> {
     let run = await db.workflowRun.findUnique({
@@ -15,6 +23,17 @@
     return new BuildContext(run.workflow, run);
   }

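+  // Returns the run's environment variables decrypted, in plaintext. The
+  // prefix is a deliberate speed bump: the result is secret material and
+  // should never be logged or persisted.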
+  async DANGEROUSLY_getDecryptedEnvVars() {
+    let envVars: Record<string, string> = JSON.parse(
+      await encryption.decrypt({
+        entityId: this.run.id,
+        encrypted: this.run.encryptedEnvironmentVariables
+      })
+    );
+
+    return envVars;
+  }
+
   async listArtifacts() {
     return await db.workflowArtifact.findMany({
       where: { runOid: this.run.oid }
@@ -24,7 +43,7 @@
   async listSteps() {
     return await db.workflowRunStep.findMany({
       where: { runOid: this.run.oid },
-      include: { step: true }
+      include: { step: { include: { artifactToDownload: true } } }
     });
   }

@@ -35,4 +54,90 @@
     if (!version) throw new ServiceError(notFoundError('workflow.version'));
     return version;
   }
+
+  async getArtifactUploadInfo() {
+    return await workflowArtifactService.putArtifactFromBuilderStart({
+      run: this.run,
+      expirationSecs: 60 * 60 * 6
+    });
+  }
+
+  async completeArtifactUpload(d: {
+    step: WorkflowRunStep & { step: WorkflowVersionStep | null };
+    artifactData: { bucket: string; storageKey: string };
+  }) {
+    if (d.step.step?.type != 'upload_artifact') return;
+
+    await workflowArtifactService.putArtifactFromBuilderFinish({
+      run: this.run,
+      name: d.step.step.artifactToUploadName!,
+      type: 'output',
+      artifactData: d.artifactData
+    });
+  }
+
+  async startRun(d: { startedAt?: Date }) {
+    await db.workflowRun.updateMany({
+      where: { oid: this.run.oid, status: 'pending' },
+      data: {
+        status: 'running',
+        startedAt: d.startedAt ?? new Date()
+      }
+    });
+  }
+
+  async startStep(d: { stepId: string; startedAt?: Date }) {
+    return await db.workflowRunStep.update({
+      where: {
+        id: d.stepId
+      },
+      data: {
+        status: 'running',
+        startedAt: d.startedAt ?? new Date()
+      }
+    });
+  }
+
+  async completeStep(d: { stepId: string; status: 'succeeded' | 'failed'; endedAt?: Date }) {
+    return await db.workflowRunStep.update({
+      where: {
+        id: d.stepId
+      },
+      data: {
+        status: d.status,
+        endedAt: d.endedAt ?? new Date()
+      }
+    });
+  }
+
+  async getStepById(stepId: string) {
+    return await db.workflowRunStep.findFirst({
+      where: { id: stepId, runOid: this.run.oid },
+      include: { step: true }
+    });
+  }
+
+  async storeTempOutput(d: { stepOid: bigint; message: string }) {
+    await db.workflowRunOutputTemp.create({
+      data: {
+        oid: snowflake.nextId(),
+        runOid: this.run.oid,
+        stepOid: d.stepOid,
+        output: d.message.trim()
+      }
+    });
+  }
+
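+  // Finalization is enqueued rather than applied inline so that every
+  // provider funnels through the same idempotent build-ended processor in
+  // queues.ts.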
[] + }); + } } diff --git a/service/src/providers/_lib/queues.ts b/service/src/providers/_lib/queues.ts new file mode 100644 index 0000000..645ef98 --- /dev/null +++ b/service/src/providers/_lib/queues.ts @@ -0,0 +1,172 @@ +import { createLock } from '@lowerdeck/lock'; +import { combineQueueProcessors, createQueue } from '@lowerdeck/queue'; +import { db } from '../../db'; +import { env } from '../../env'; +import { workflowArtifactService } from '../../services'; +import { storage } from '../../storage'; + +let runLock = createLock({ + name: 'frg/bctx/runlock', + redisUrl: env.service.REDIS_URL +}); + +export let buildEndedQueue = createQueue<{ + runId: string; + status: 'failed' | 'succeeded'; + endedAt: Date; + stepArtifacts: { stepId: string; bucket: string; storageKey: string }[]; +}>({ + redisUrl: env.service.REDIS_URL, + name: 'frg/bctx/end' +}); + +let buildEndedQueueProcessor = buildEndedQueue.process(async data => { + await runLock.usingLock(data.runId, async () => { + let run = await db.workflowRun.findFirst({ where: { id: data.runId } }); + if (!run || run.status == 'succeeded' || run.status == 'failed') return; + + await db.workflowRun.updateMany({ + where: { id: data.runId }, + data: { status: data.status, endedAt: data.endedAt } + }); + await db.workflowRunStep.updateMany({ + where: { runOid: run.oid, status: 'running' }, + data: { status: data.status, endedAt: data.endedAt } + }); + await db.workflowRunStep.updateMany({ + where: { runOid: run.oid, status: 'pending' }, + data: { status: 'canceled' } + }); + + if (data.status == 'succeeded') { + await createArtifactsQueue.add({ + runId: data.runId, + stepArtifacts: data.stepArtifacts + }); + } + + await storeOutputQueue.add({ runId: data.runId }); + }); +}); + +let createArtifactsQueue = createQueue<{ + runId: string; + stepArtifacts: { stepId: string; bucket: string; storageKey: string }[]; +}>({ + redisUrl: env.service.REDIS_URL, + name: 'frg/bctx/patar/many' +}); + +let createArtifactsQueueProcessor = createArtifactsQueue.process(async data => { + await createArtifactQueue.addMany( + data.stepArtifacts.map(sa => ({ + runId: data.runId, + stepId: sa.stepId, + bucket: sa.bucket, + storageKey: sa.storageKey + })) + ); +}); + +let createArtifactQueue = createQueue<{ + runId: string; + stepId: string; + bucket: string; + storageKey: string; +}>({ + redisUrl: env.service.REDIS_URL, + name: 'frg/bctx/patar/single' +}); + +let createArtifactQueueProcessor = createArtifactQueue.process(async data => { + let artifact = await db.workflowArtifact.findFirst({ + where: { + run: { id: data.runId }, + storageKey: data.storageKey + } + }); + if (artifact) return; + + try { + await storage.headObject(data.bucket, data.storageKey); + } catch { + // Artifact does not exist in storage + return; + } + + let run = await db.workflowRun.findFirst({ where: { id: data.runId } }); + let step = await db.workflowRunStep.findFirst({ + where: { id: data.stepId }, + include: { step: true } + }); + if (!run || !step) return; + + await workflowArtifactService.putArtifactFromBuilderFinish({ + run, + name: step.step?.artifactToUploadName || 'artifact', + type: 'output', + artifactData: { bucket: data.bucket, storageKey: data.storageKey } + }); +}); + +let storeOutputQueue = createQueue<{ runId: string }>({ + redisUrl: env.service.REDIS_URL, + name: 'frg/bctx/storou' +}); + +let storeOutputQueueProcessor = storeOutputQueue.process(async data => { + let run = await db.workflowRun.findFirst({ where: { id: data.runId } }); + if (!run) return; + + let steps = await 
+let buildEndedQueueProcessor = buildEndedQueue.process(async data => {
+  await runLock.usingLock(data.runId, async () => {
+    let run = await db.workflowRun.findFirst({ where: { id: data.runId } });
+    if (!run || run.status == 'succeeded' || run.status == 'failed') return;
+
+    await db.workflowRun.updateMany({
+      where: { id: data.runId },
+      data: { status: data.status, endedAt: data.endedAt }
+    });
+    await db.workflowRunStep.updateMany({
+      where: { runOid: run.oid, status: 'running' },
+      data: { status: data.status, endedAt: data.endedAt }
+    });
+    await db.workflowRunStep.updateMany({
+      where: { runOid: run.oid, status: 'pending' },
+      data: { status: 'canceled' }
+    });
+
+    if (data.status == 'succeeded') {
+      await createArtifactsQueue.add({
+        runId: data.runId,
+        stepArtifacts: data.stepArtifacts
+      });
+    }
+
+    await storeOutputQueue.add({ runId: data.runId });
+  });
+});
+
+let createArtifactsQueue = createQueue<{
+  runId: string;
+  stepArtifacts: { stepId: string; bucket: string; storageKey: string }[];
+}>({
+  redisUrl: env.service.REDIS_URL,
+  name: 'frg/bctx/patar/many'
+});
+
+let createArtifactsQueueProcessor = createArtifactsQueue.process(async data => {
+  await createArtifactQueue.addMany(
+    data.stepArtifacts.map(sa => ({
+      runId: data.runId,
+      stepId: sa.stepId,
+      bucket: sa.bucket,
+      storageKey: sa.storageKey
+    }))
+  );
+});
+
+let createArtifactQueue = createQueue<{
+  runId: string;
+  stepId: string;
+  bucket: string;
+  storageKey: string;
+}>({
+  redisUrl: env.service.REDIS_URL,
+  name: 'frg/bctx/patar/single'
+});
+
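+// Registers a single uploaded artifact. Safe to retry: it bails out if the
+// artifact row already exists or the object was never written to storage.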
+let createArtifactQueueProcessor = createArtifactQueue.process(async data => {
+  let artifact = await db.workflowArtifact.findFirst({
+    where: {
+      run: { id: data.runId },
+      storageKey: data.storageKey
+    }
+  });
+  if (artifact) return;
+
+  try {
+    await storage.headObject(data.bucket, data.storageKey);
+  } catch {
+    // Artifact does not exist in storage
+    return;
+  }
+
+  let run = await db.workflowRun.findFirst({ where: { id: data.runId } });
+  let step = await db.workflowRunStep.findFirst({
+    where: { id: data.stepId },
+    include: { step: true }
+  });
+  if (!run || !step) return;
+
+  await workflowArtifactService.putArtifactFromBuilderFinish({
+    run,
+    name: step.step?.artifactToUploadName || 'artifact',
+    type: 'output',
+    artifactData: { bucket: data.bucket, storageKey: data.storageKey }
+  });
+});
+
+let storeOutputQueue = createQueue<{ runId: string }>({
+  redisUrl: env.service.REDIS_URL,
+  name: 'frg/bctx/storou'
+});
+
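+// Concatenates each step's buffered output rows into one object per step in
+// the log bucket, then schedules cleanup of the temporary rows.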
+let storeOutputQueueProcessor = storeOutputQueue.process(async data => {
+  let run = await db.workflowRun.findFirst({ where: { id: data.runId } });
+  if (!run) return;
+
+  let steps = await db.workflowRunStep.findMany({
+    where: { runOid: run.oid }
+  });
+
+  for (let step of steps) {
+    let outputs = await db.workflowRunOutputTemp.findMany({
+      where: { stepOid: step.oid },
+      orderBy: { createdAt: 'asc' }
+    });
+
+    let fullOutput = outputs.map(o => o.output).join('\n');
+
+    let outputBucket = env.storage.LOG_BUCKET_NAME;
+    let outputStorageKey = `runs/${run.id}/log/${step.id}`;
+
+    await storage.putObject(outputBucket, outputStorageKey, fullOutput);
+
+    await db.workflowRunStep.updateMany({
+      where: { oid: step.oid },
+      data: {
+        outputBucket,
+        outputStorageKey
+      }
+    });
+  }
+
+  await storeOutputCleanupQueue.add({ runOid: run.oid }, { delay: 10000 });
+});
+
+let storeOutputCleanupQueue = createQueue<{ runOid: bigint }>({
+  redisUrl: env.service.REDIS_URL,
+  name: 'frg/bctx/stoclu'
+});
+
+let storeOutputCleanupQueueProcessor = storeOutputCleanupQueue.process(async data => {
+  await db.workflowRunOutputTemp.deleteMany({
+    where: { runOid: data.runOid }
+  });
+
+  await db.workflowRun.updateMany({
+    where: { oid: data.runOid },
+    data: { encryptedEnvironmentVariables: '' }
+  });
+});
+
+export let buildQueueProcessors = combineQueueProcessors([
+  buildEndedQueueProcessor,
+  storeOutputQueueProcessor,
+  storeOutputCleanupQueueProcessor,
+  createArtifactsQueueProcessor,
+  createArtifactQueueProcessor
+]);
diff --git a/service/src/providers/aws-codebuild/index.ts b/service/src/providers/aws-codebuild/index.ts
index 9d6a6bf..eb63fc6 100644
--- a/service/src/providers/aws-codebuild/index.ts
+++ b/service/src/providers/aws-codebuild/index.ts
@@ -2,11 +2,7 @@ import { GetLogEventsCommand } from '@aws-sdk/client-cloudwatch-logs';
 import { BatchGetBuildsCommand, StartBuildCommand } from '@aws-sdk/client-codebuild';
 import { combineQueueProcessors, createQueue } from '@lowerdeck/queue';
 import { stringify } from 'yaml';
-import { db } from '../../db';
-import { encryption } from '../../encryption';
 import { env } from '../../env';
-import { snowflake } from '../../id';
-import { workflowArtifactService } from '../../services';
 import { storage } from '../../storage';
 import { BuildContext } from '../_lib/buildContext';
 import { codebuild, logsClient } from './codeBuild';
@@ -30,7 +26,7 @@ let parseSystemLog = (line: string) => {
 export let startAwsCodeBuildQueue = createQueue<{ runId: string }>({
   redisUrl: env.service.REDIS_URL,
-  name: 'forge/aws-cbld/start',
+  name: 'frg/aws.cb/bld/start',
   workerOpts: {
     concurrency: 5,
     limiter: {
@@ -58,15 +54,10 @@ let startBuildQueueProcessor = startAwsCodeBuildQueue.process(async data => {
   let actionSteps = steps.filter(s => s.type === 'action');
   let cleanupSteps = steps.filter(s => s.type === 'cleanup');

-  let envVars: Record<string, string> = JSON.parse(
-    await encryption.decrypt({
-      entityId: ctx.run.id,
-      encrypted: ctx.run.encryptedEnvironmentVariables
-    })
-  );
-
   let artifactData: Record<string, { bucket: string; storageKey: string }> = {};

+  let envVars = await ctx.DANGEROUSLY_getDecryptedEnvVars();
+
   let startBuildResp = await codebuild.send(
     new StartBuildCommand({
       projectName: project.projectName,
@@ -143,12 +134,9 @@
           if (step.step?.type == 'script') {
             inner = step.step?.actionScript ?? ['echo "No action"'];
           } else if (step.step?.type == 'download_artifact') {
-            let artifact = await db.workflowArtifact.findFirstOrThrow({
-              where: {
-                oid: step.step.artifactToDownloadOid!,
-                workflowOid: ctx.run.workflowOid
-              }
-            });
+            let artifact = step.step.artifactToDownload;
+            if (!artifact) throw new Error('Artifact to download not found');
+
             let res = await storage.getPublicURL(
               artifact.bucket,
               artifact.storageKey,
@@ -162,11 +150,7 @@
               `echo "Download complete."`
             ];
           } else if (step.step?.type == 'upload_artifact') {
-            let uploadInfo =
-              await workflowArtifactService.putArtifactFromBuilderStart({
-                run: ctx.run,
-                expirationSecs: 60 * 60 * 6
-              });
+            let uploadInfo = await ctx.getArtifactUploadInfo();

             artifactData[step.id] = {
               bucket: uploadInfo.bucket,
@@ -225,16 +209,10 @@ let waitForBuildQueue = createQueue<{
   buildId: string;
   attemptNo: number;

-  artifactData: Record<
-    string,
-    {
-      bucket: string;
-      storageKey: string;
-    }
-  >;
+  artifactData: Record<string, { bucket: string; storageKey: string }>;
 }>({
   redisUrl: env.service.REDIS_URL,
-  name: 'forge/aws-cbld/wait'
+  name: 'frg/aws.cb/bld/wait'
 });

 let waitForBuildQueueProcessor = waitForBuildQueue.process(async data => {
@@ -283,16 +261,10 @@ let startedBuildQueue = createQueue<{
   buildId: string;

   cloudwatch: { groupName: string; streamName: string };

-  artifactData: Record<
-    string,
-    {
-      bucket: string;
-      storageKey: string;
-    }
-  >;
+  artifactData: Record<string, { bucket: string; storageKey: string }>;
 }>({
   redisUrl: env.service.REDIS_URL,
-  name: 'forge/aws-cbld/started'
+  name: 'frg/aws.cb/bld/started'
 });

 let startedBuildQueueProcessor = startedBuildQueue.process(async data => {
@@ -326,22 +298,18 @@ let monitorBuildOutputQueue = createQueue<{

   currentStepOid?: bigint;

-  artifactData: Record<
-    string,
-    {
-      bucket: string;
-      storageKey: string;
-    }
-  >;
+  artifactData: Record<string, { bucket: string; storageKey: string }>;

   afterCheckNo?: number;
 }>({
   redisUrl: env.service.REDIS_URL,
-  name: 'forge/aws-cbld/mopt'
+  name: 'frg/aws.cb/bld/mopt'
 });

 let monitorBuildOutputQueueProcessor = monitorBuildOutputQueue.process(async data => {
   if (!logsClient || !codebuild) throw new Error('CodeBuild client not initialized');

+  let ctx = await BuildContext.of(data.runId);
+
   let buildInfo = await codebuild.send(
     new BatchGetBuildsCommand({
       ids: [data.buildId]
@@ -356,18 +324,17 @@
       logStreamName: data.cloudwatch.streamName,
       nextToken: data.nextToken,
-      startFromHead: true
+      startFromHead: true,
+      limit: 1000
     })
   );

-  let buildStarted = !!data.buildStarted;
-  let buildEndedNaturally = !!data.buildEnded;
-
-  let currentStepOid = data.currentStepOid;
-
   let collectedMessages = new Map<bigint, string>();

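+  // A page holds up to 1000 events (limit above); 500+ in one page is
+  // treated as a backlog, shortening the re-poll delay from 1s to 50ms below.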
+  let events = logResp.events || [];
+  let hasManyEvents = events.length >= 500;
+
+  for (let event of events) {
     let message = (event.message || '').trim();

     if (message.startsWith('[Container]')) continue;
@@ -375,84 +342,65 @@
     if (systemLog) {
       if (systemLog.type === 'build.start') {
-        await db.workflowRun.update({
-          where: { id: data.runId },
-          data: {
-            status: 'running',
-            startedAt: build.startTime ? new Date(build.startTime) : new Date()
-          }
+        await ctx.startRun({
+          startedAt: event.timestamp ? new Date(event.timestamp) : new Date()
         });
-
-        buildStarted = true;
+        data.buildStarted = true;
       } else if (systemLog.type === 'build.end') {
-        buildEndedNaturally = true;
+        data.buildEnded = true;
      } else if (systemLog.type === 'step.start') {
-        let step = await db.workflowRunStep.update({
-          where: { id: systemLog.stepId, runOid: data.runOid },
-          data: {
-            status: 'running',
-            startedAt: event.timestamp ? new Date(event.timestamp) : new Date()
-          }
+        let step = await ctx.startStep({
+          stepId: systemLog.stepId,
+          startedAt: event.timestamp ? new Date(event.timestamp) : new Date()
         });

-        currentStepOid = step.oid;
+        data.currentStepOid = step.oid;
       } else if (systemLog.type === 'step.end') {
-        let step = await db.workflowRunStep.update({
-          where: { id: systemLog.stepId, runOid: data.runOid },
-          data: {
-            status: 'succeeded',
-            endedAt: event.timestamp ? new Date(event.timestamp) : new Date()
-          }
+        let step = await ctx.completeStep({
+          stepId: systemLog.stepId,
+          status: 'succeeded',
+          endedAt: event.timestamp ? new Date(event.timestamp) : new Date()
         });

-        if (currentStepOid === step.oid) currentStepOid = undefined;
+        if (data.currentStepOid === step.oid) data.currentStepOid = undefined;
       } else if (systemLog.type === 'upload-artifact.register') {
-        let step = await db.workflowRunStep.findFirst({
-          where: {
-            id: systemLog.stepId,
-            runOid: data.runOid
-          },
-          include: { step: true, run: true }
-        });
+        let step = await ctx.getStepById(systemLog.stepId);

         let artifactData = step ? data.artifactData[step.id] : null;

-        if (artifactData && step?.step?.type === 'upload_artifact') {
-          await workflowArtifactService.putArtifactFromBuilderFinish({
-            run: step.run,
-            name: step.step.artifactToUploadName!,
-            type: 'output',
+        if (artifactData && step) {
+          await ctx.completeArtifactUpload({
+            step,
             artifactData
           });
         }
       }
-    } else if (buildStarted && !buildEndedNaturally && currentStepOid) {
-      let string = collectedMessages.get(currentStepOid) || '';
+    } else if (data.buildStarted && !data.buildEnded && data.currentStepOid) {
+      let string = collectedMessages.get(data.currentStepOid) || '';
       string += JSON.stringify([event.timestamp || 0, message]) + '\n';
-      collectedMessages.set(currentStepOid, string);
+      collectedMessages.set(data.currentStepOid, string);
     }
   }

   for (let [stepOid, msg] of collectedMessages.entries()) {
-    let step = await db.workflowRunStep.findFirstOrThrow({
-      where: { oid: stepOid, runOid: data.runOid }
-    });
-    await db.workflowRunOutputTemp.create({
-      data: {
-        oid: snowflake.nextId(),
-        runOid: data.runOid,
-        stepOid: step.oid,
-        output: msg.trim()
-      }
+    await ctx.storeTempOutput({
+      stepOid,
+      message: msg
     });
   }

   let finalAfterCheckNo = data.afterCheckNo !== undefined && data.afterCheckNo >= 5;

   // Build ended as we expected or we've waited long enough after it ended
-  if (buildEndedNaturally || finalAfterCheckNo) {
-    await buildEndedQueue.add({ runId: data.runId, buildId: data.buildId });
+  if (data.buildEnded || finalAfterCheckNo) {
+    await buildEndedQueue.add({
+      runId: data.runId,
+      buildId: data.buildId,
+      artifactData: data.artifactData
+    });
     return;
   }

   let buildEndedUnexpectedly = build.buildStatus != 'IN_PROGRESS';
+  data.buildEnded = data.buildEnded || buildEndedUnexpectedly;
+
   let afterCheckNo = buildEndedUnexpectedly ? (data.afterCheckNo || 0) + 1 : undefined;

   if (logResp.nextForwardToken) {
@@ -460,22 +408,26 @@
       {
         ...data,
         nextToken: logResp.nextForwardToken,
-        buildEnded: buildEndedNaturally,
-        buildStarted,
-        currentStepOid,
         afterCheckNo
       },
-      { delay: 1000 }
+      { delay: hasManyEvents ? 50 : 1000 }
     );
   } else {
     // If we don't have a new token, we can end the build
-    await buildEndedQueue.add({ runId: data.runId, buildId: data.buildId });
+    await buildEndedQueue.add(
+      { runId: data.runId, buildId: data.buildId, artifactData: data.artifactData },
+      { delay: 5000 }
+    );
   }
 });

-let buildEndedQueue = createQueue<{ runId: string; buildId: string }>({
+let buildEndedQueue = createQueue<{
+  runId: string;
+  buildId: string;
+  artifactData: Record<string, { bucket: string; storageKey: string }>;
+}>({
   redisUrl: env.service.REDIS_URL,
-  name: 'forge/aws-cbld/end'
+  name: 'frg/aws.cb/bld/end'
 });

 let buildEndedQueueProcessor = buildEndedQueue.process(async data => {
@@ -489,81 +441,19 @@
   let build = buildInfo.builds?.[0];
   if (!build) return;

-  let ctx = await BuildContext.of(data.runId);
-
-  let failed = build.buildStatus != 'SUCCEEDED';
-
-  await db.workflowRunStep.updateMany({
-    where: { runOid: ctx.run.oid, status: 'running' },
-    data: { status: failed ? 'failed' : 'succeeded', endedAt: new Date() }
-  });
-  await db.workflowRunStep.updateMany({
-    where: { runOid: ctx.run.oid, status: 'pending' },
-    data: { status: 'canceled' }
-  });
-
-  await db.workflowRun.updateMany({
-    where: { id: data.runId },
-    data: {
-      status: failed ? 'failed' : 'succeeded',
-      endedAt: build.endTime ? new Date(build.endTime) : new Date()
-    }
-  });
-
-  await storeOutputQueue.add({ runId: data.runId });
-});
-
-let storeOutputQueue = createQueue<{ runId: string }>({
-  redisUrl: env.service.REDIS_URL,
-  name: 'forge/aws-cbld/storou'
-});
-
-let storeOutputQueueProcessor = storeOutputQueue.process(async data => {
-  let run = await db.workflowRun.findFirst({ where: { id: data.runId } });
-  if (!run) return;
-
-  let steps = await db.workflowRunStep.findMany({
-    where: { runOid: run.oid }
-  });
-
-  for (let step of steps) {
-    let outputs = await db.workflowRunOutputTemp.findMany({
-      where: { stepOid: step.oid },
-      orderBy: { createdAt: 'asc' }
-    });
-
-    let fullOutput = outputs.map(o => o.output).join('\n');
-
-    let outputBucket = env.storage.LOG_BUCKET_NAME;
-    let outputStorageKey = `runs/${run.id}/log/${step.id}`;
-
-    await storage.putObject(outputBucket, outputStorageKey, fullOutput);
-
-    await db.workflowRunStep.updateMany({
-      where: { oid: step.oid },
-      data: {
-        outputBucket,
-        outputStorageKey
-      }
-    });
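+  // Wait for a terminal status: anything other than FAILED/SUCCEEDED is
+  // re-checked after 3 seconds instead of being finalized prematurely.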
+  if (build.buildStatus != 'FAILED' && build.buildStatus != 'SUCCEEDED') {
+    await buildEndedQueue.add(data, { delay: 3000 });
+    return;
   }

-  await storeOutputCleanupQueue.add({ runOid: run.oid }, { delay: 10000 });
-});
-
-let storeOutputCleanupQueue = createQueue<{ runOid: bigint }>({
-  redisUrl: env.service.REDIS_URL,
-  name: 'forge/aws-cbld/stoclu'
-});
-
-let storeOutputCleanupQueueProcessor = storeOutputCleanupQueue.process(async data => {
-  await db.workflowRunOutputTemp.deleteMany({
-    where: { runOid: data.runOid }
-  });
+  let ctx = await BuildContext.of(data.runId);

-  await db.workflowRun.updateMany({
-    where: { oid: data.runOid },
-    data: { encryptedEnvironmentVariables: '' }
+  await ctx.completeBuild({
+    status: build.buildStatus == 'FAILED' ? 'failed' : 'succeeded',
+    stepArtifacts: Object.entries(data.artifactData).map(([stepId, info]) => ({
+      stepId,
+      ...info
+    }))
   });
 });

@@ -572,7 +462,5 @@ export let awsCodeBuildProcessors = combineQueueProcessors([
   waitForBuildQueueProcessor,
   startedBuildQueueProcessor,
   monitorBuildOutputQueueProcessor,
-  buildEndedQueueProcessor,
-  storeOutputQueueProcessor,
-  storeOutputCleanupQueueProcessor
+  buildEndedQueueProcessor
 ]);
diff --git a/service/src/worker.ts b/service/src/worker.ts
index 97bc4cc..5d0629c 100644
--- a/service/src/worker.ts
+++ b/service/src/worker.ts
@@ -1,4 +1,5 @@
 import { runQueueProcessors } from '@lowerdeck/queue';
+import { buildQueueProcessors } from './providers/_lib/queues';
 import { awsCodeBuildProcessors } from './providers/aws-codebuild';
 import { deleteWorkflowProcessors } from './queues/deleteWorkflow';
 import { deleteWorkflowArtifactProcessors } from './queues/deleteWorkflowArtifact';
@@ -9,5 +10,7 @@ await runQueueProcessors([
   deleteWorkflowRunProcessors,
   deleteWorkflowArtifactProcessors,
-  awsCodeBuildProcessors
+  awsCodeBuildProcessors,
+
+  buildQueueProcessors
 ]);