Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bun.lock
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"@lowerdeck/error": "^1.0.8",
"@lowerdeck/hash": "^1.0.4",
"@lowerdeck/id": "^1.0.5",
"@lowerdeck/lock": "^1.0.3",
"@lowerdeck/once": "^1.0.4",
"@lowerdeck/pagination": "^1.0.4",
"@lowerdeck/queue": "^1.0.4",
Expand Down Expand Up @@ -439,6 +440,8 @@

"@lowerdeck/id": ["@lowerdeck/[email protected]", "", { "dependencies": { "@lowerdeck/error": "^1.0.8", "@lowerdeck/hash": "^1.0.4", "nanoid": "^5.0.7", "short-uuid": "^5.2.0", "snowflake-uuid": "^1.0.0" } }, "sha512-8Xik3NbqE7mkfYTliMG+zxWjn+my1wK09BIHiKdxylvkdtAKaxBzls+XoALFO2JZ+VSZIdLG2DvtndXjFSwPQg=="],

"@lowerdeck/lock": ["@lowerdeck/[email protected]", "", { "dependencies": { "@lowerdeck/delay": "^1.0.4", "@lowerdeck/redis": "^1.0.3", "@types/bun": "^1.2.11", "ioredis": "^5.4.1", "redlock": "^5.0.0-beta.2", "superjson": "^2.2.5" } }, "sha512-1f/oC7E//s0ylFIWwKUtvFbyeZYn00Pt2KTtbB6s0fMrxI4Wm5BNFTbRvVSGGrafgl025QPYeOVLRh/X214idw=="],

"@lowerdeck/memo": ["@lowerdeck/[email protected]", "", {}, "sha512-Fyhi14j1EocQeAjZ6sqeYZaydPlCt23P+CEv99QVPb46Y6n77RGrslbHE7HDPgV0f9CJi4UWxSbfhk+Spmmj5g=="],

"@lowerdeck/once": ["@lowerdeck/[email protected]", "", {}, "sha512-na46nHHIVxO0Pc5Umm0T9CffI3rD7Z7+KSjODW543c1pLX/dmC9MG3X0QRp9wHQwcAF8c4Y3IOpTUMMdEN0EzA=="],
Expand Down Expand Up @@ -1483,6 +1486,8 @@

"redis-parser": ["[email protected]", "", { "dependencies": { "redis-errors": "^1.0.0" } }, "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A=="],

"redlock": ["[email protected]", "", { "dependencies": { "node-abort-controller": "^3.0.1" } }, "sha512-2RDWXg5jgRptDrB1w9O/JgSZC0j7y4SlaXnor93H/UJm/QyDiFgBKNtrh0TI6oCXqYSaSoXxFh6Sd3VtYfhRXw=="],

"reflect.getprototypeof": ["[email protected]", "", { "dependencies": { "call-bind": "^1.0.8", "define-properties": "^1.2.1", "es-abstract": "^1.23.9", "es-errors": "^1.3.0", "es-object-atoms": "^1.0.0", "get-intrinsic": "^1.2.7", "get-proto": "^1.0.1", "which-builtin-type": "^1.2.1" } }, "sha512-00o4I+DVrefhv+nX0ulyi3biSHCPDe+yLv5o/p6d/UVlirijB8E16FtfwSAi4g3tcqrQ4lRAqQSoFEZJehYEcw=="],

"regenerate": ["[email protected]", "", {}, "sha512-zrceR/XhGYU/d/opr2EKO7aRHUeiBI8qjtfHqADTwZd6Szfy16la6kqD0MIUs5z5hx6AaKa+PixpPrR289+I0A=="],
Expand Down
1 change: 1 addition & 0 deletions service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"@lowerdeck/error": "^1.0.8",
"@lowerdeck/hash": "^1.0.4",
"@lowerdeck/id": "^1.0.5",
"@lowerdeck/lock": "^1.0.3",
"@lowerdeck/once": "^1.0.4",
"@lowerdeck/pagination": "^1.0.4",
"@lowerdeck/queue": "^1.0.4",
Expand Down
111 changes: 108 additions & 3 deletions service/src/providers/_lib/buildContext.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
import { notFoundError, ServiceError } from '@lowerdeck/error';
import type { Workflow, WorkflowRun } from '../../../prisma/generated/client';
import type { WorkflowVersionStep } from '../../../prisma/generated/browser';
import type { Workflow, WorkflowRun, WorkflowRunStep } from '../../../prisma/generated/client';
import { db } from '../../db';
import { encryption } from '../../encryption';
import { snowflake } from '../../id';
import { workflowArtifactService } from '../../services';
import { buildEndedQueue } from './queues';

export class BuildContext {
private constructor(public readonly workflow: Workflow, public readonly run: WorkflowRun) {}
private constructor(
public readonly workflow: Workflow,
public readonly run: WorkflowRun
) {}

static async of(runId: string): Promise<BuildContext> {
let run = await db.workflowRun.findUnique({
Expand All @@ -15,6 +23,17 @@ export class BuildContext {
return new BuildContext(run.workflow, run);
}

/**
 * Decrypts this run's environment variables and returns them as plain text.
 * Named DANGEROUSLY_* because the result contains secrets — never log it.
 */
async DANGEROUSLY_getDecryptedEnvVars() {
  let plaintext = await encryption.decrypt({
    entityId: this.run.id,
    encrypted: this.run.encryptedEnvironmentVariables
  });

  // NOTE(review): assumes the ciphertext decodes to a flat JSON object of
  // string -> string pairs — confirm against the encryption call site.
  let envVars: Record<string, string> = JSON.parse(plaintext);
  return envVars;
}

async listArtifacts() {
return await db.workflowArtifact.findMany({
where: { runOid: this.run.oid }
Expand All @@ -24,7 +43,7 @@ export class BuildContext {
/**
 * Lists every step of this run, including each step's definition and the
 * artifact that definition downloads (when any).
 */
async listSteps() {
  // The pasted diff carried both the old `include: { step: true }` line and
  // the new one; only the new (post-change) include is kept here.
  return await db.workflowRunStep.findMany({
    where: { runOid: this.run.oid },
    include: { step: { include: { artifactToDownload: true } } }
  });
}

Expand All @@ -35,4 +54,90 @@ export class BuildContext {
if (!version) throw new ServiceError(notFoundError('workflow.version'));
return version;
}

/** Begins a builder artifact upload; the upload grant lasts six hours. */
async getArtifactUploadInfo() {
  const SIX_HOURS_IN_SECS = 6 * 60 * 60;
  return await workflowArtifactService.putArtifactFromBuilderStart({
    run: this.run,
    expirationSecs: SIX_HOURS_IN_SECS
  });
}

/**
 * Records a finished artifact upload for an `upload_artifact` step.
 * No-op for any other step type.
 */
async completeArtifactUpload(d: {
  step: WorkflowRunStep & { step: WorkflowVersionStep | null };
  artifactData: { bucket: string; storageKey: string };
}) {
  if (d.step.step?.type != 'upload_artifact') return;

  await workflowArtifactService.putArtifactFromBuilderFinish({
    run: this.run,
    // Fall back to 'artifact' like the createArtifactQueue processor does,
    // instead of a non-null assertion that would pass undefined at runtime.
    name: d.step.step.artifactToUploadName || 'artifact',
    type: 'output',
    artifactData: d.artifactData
  });
}

/**
 * Transitions the run from `pending` to `running`. The status filter in the
 * updateMany makes this a no-op for runs that already started or finished.
 */
async startRun(d: { startedAt?: Date }) {
  let startedAt = d.startedAt ?? new Date();
  await db.workflowRun.updateMany({
    where: { oid: this.run.oid, status: 'pending' },
    data: { status: 'running', startedAt }
  });
}

/** Marks a single step as running and returns the updated row. */
async startStep(d: { stepId: string; startedAt?: Date }) {
  let startedAt = d.startedAt ?? new Date();
  return await db.workflowRunStep.update({
    where: { id: d.stepId },
    data: { status: 'running', startedAt }
  });
}

/** Finalizes a step with the given outcome and returns the updated row. */
async completeStep(d: { stepId: string; status: 'succeeded' | 'failed'; endedAt?: Date }) {
  let endedAt = d.endedAt ?? new Date();
  return await db.workflowRunStep.update({
    where: { id: d.stepId },
    data: { status: d.status, endedAt }
  });
}

/**
 * Fetches one of this run's steps together with its definition; resolves to
 * null when the id does not belong to this run.
 */
async getStepById(stepId: string) {
  let where = { id: stepId, runOid: this.run.oid };
  return await db.workflowRunStep.findFirst({ where, include: { step: true } });
}

/**
 * Appends one chunk of builder output for a step into the temporary output
 * table; rows are later concatenated into object storage and cleaned up.
 */
async storeTempOutput(d: { stepOid: bigint; message: string }) {
  let row = {
    oid: snowflake.nextId(),
    runOid: this.run.oid,
    stepOid: d.stepOid,
    output: d.message.trim()
  };
  await db.workflowRunOutputTemp.create({ data: row });
}

/**
 * Enqueues end-of-build processing (status finalization, artifact creation,
 * output persistence) rather than performing it inline.
 */
async completeBuild(d: {
  status: 'succeeded' | 'failed';
  endedAt?: Date;
  stepArtifacts?: { stepId: string; bucket: string; storageKey: string }[];
}) {
  let payload = {
    runId: this.run.id,
    status: d.status,
    endedAt: d.endedAt ?? new Date(),
    stepArtifacts: d.stepArtifacts ?? []
  };
  await buildEndedQueue.add(payload);
}
}
172 changes: 172 additions & 0 deletions service/src/providers/_lib/queues.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import { createLock } from '@lowerdeck/lock';
import { combineQueueProcessors, createQueue } from '@lowerdeck/queue';
import { db } from '../../db';
import { env } from '../../env';
import { workflowArtifactService } from '../../services';
import { storage } from '../../storage';

// Distributed lock keyed by run id so end-of-build processing for a given
// run never executes concurrently across workers.
let runLock = createLock({
  name: 'frg/bctx/runlock',
  redisUrl: env.service.REDIS_URL
});

// Entry point for end-of-build processing. The payload carries the run's
// final status plus any artifacts the builder uploaded per step.
export let buildEndedQueue = createQueue<{
  runId: string;
  status: 'failed' | 'succeeded';
  endedAt: Date;
  stepArtifacts: { stepId: string; bucket: string; storageKey: string }[];
}>({
  redisUrl: env.service.REDIS_URL,
  name: 'frg/bctx/end'
});

// Finalizes a run once its build ends. Serialized per run via runLock, and
// idempotent: a run already in a terminal state is left untouched.
let buildEndedQueueProcessor = buildEndedQueue.process(async data => {
  await runLock.usingLock(data.runId, async () => {
    let run = await db.workflowRun.findFirst({ where: { id: data.runId } });
    // Idempotency guard: skip runs that are missing or already terminal.
    if (!run || run.status == 'succeeded' || run.status == 'failed') return;

    await db.workflowRun.updateMany({
      where: { id: data.runId },
      data: { status: data.status, endedAt: data.endedAt }
    });
    // Steps still running inherit the run's final status and end time…
    await db.workflowRunStep.updateMany({
      where: { runOid: run.oid, status: 'running' },
      data: { status: data.status, endedAt: data.endedAt }
    });
    // …while steps that never started are canceled.
    await db.workflowRunStep.updateMany({
      where: { runOid: run.oid, status: 'pending' },
      data: { status: 'canceled' }
    });

    // Artifact records are only created for successful builds.
    if (data.status == 'succeeded') {
      await createArtifactsQueue.add({
        runId: data.runId,
        stepArtifacts: data.stepArtifacts
      });
    }

    // Output is persisted regardless of outcome.
    await storeOutputQueue.add({ runId: data.runId });
  });
});

// Batch job: one entry per finished run, fanned out into per-artifact jobs
// by its processor below.
let createArtifactsQueue = createQueue<{
  runId: string;
  stepArtifacts: { stepId: string; bucket: string; storageKey: string }[];
}>({
  redisUrl: env.service.REDIS_URL,
  name: 'frg/bctx/patar/many'
});

// Fans one "create all artifacts" job out into one job per step artifact.
let createArtifactsQueueProcessor = createArtifactsQueue.process(async data => {
  let jobs = data.stepArtifacts.map(({ stepId, bucket, storageKey }) => ({
    runId: data.runId,
    stepId,
    bucket,
    storageKey
  }));
  await createArtifactQueue.addMany(jobs);
});

// Per-artifact job: records a single uploaded step artifact in the database.
let createArtifactQueue = createQueue<{
  runId: string;
  stepId: string;
  bucket: string;
  storageKey: string;
}>({
  redisUrl: env.service.REDIS_URL,
  name: 'frg/bctx/patar/single'
});

// Creates the DB record for one uploaded artifact. Idempotent: it exits early
// when the artifact row already exists or the object was never uploaded.
let createArtifactQueueProcessor = createArtifactQueue.process(async data => {
  let artifact = await db.workflowArtifact.findFirst({
    where: {
      run: { id: data.runId },
      storageKey: data.storageKey
    }
  });
  if (artifact) return;

  try {
    await storage.headObject(data.bucket, data.storageKey);
  } catch {
    // Artifact does not exist in storage
    return;
  }

  // The run and step lookups are independent — fetch them in parallel.
  let [run, step] = await Promise.all([
    db.workflowRun.findFirst({ where: { id: data.runId } }),
    db.workflowRunStep.findFirst({
      where: { id: data.stepId },
      include: { step: true }
    })
  ]);
  if (!run || !step) return;

  await workflowArtifactService.putArtifactFromBuilderFinish({
    run,
    name: step.step?.artifactToUploadName || 'artifact',
    type: 'output',
    artifactData: { bucket: data.bucket, storageKey: data.storageKey }
  });
});

// Persists each step's accumulated temp output rows into object storage.
let storeOutputQueue = createQueue<{ runId: string }>({
  redisUrl: env.service.REDIS_URL,
  name: 'frg/bctx/storou'
});

// Concatenates each step's temp output rows into one object per step, records
// the storage location on the step, then schedules cleanup of the temp rows.
let storeOutputQueueProcessor = storeOutputQueue.process(async data => {
  let run = await db.workflowRun.findFirst({ where: { id: data.runId } });
  if (!run) return;

  let steps = await db.workflowRunStep.findMany({
    where: { runOid: run.oid }
  });

  // Fetch the whole run's temp output in one query instead of one query per
  // step (N+1). The global createdAt ordering preserves each step's order.
  let outputs = await db.workflowRunOutputTemp.findMany({
    where: { runOid: run.oid },
    orderBy: { createdAt: 'asc' }
  });
  let outputsByStep = new Map<bigint, string[]>();
  for (let o of outputs) {
    let group = outputsByStep.get(o.stepOid);
    if (group) group.push(o.output);
    else outputsByStep.set(o.stepOid, [o.output]);
  }

  for (let step of steps) {
    // Steps with no recorded output still get an (empty) log object,
    // matching the previous per-step-query behavior.
    let fullOutput = (outputsByStep.get(step.oid) ?? []).join('\n');

    let outputBucket = env.storage.LOG_BUCKET_NAME;
    let outputStorageKey = `runs/${run.id}/log/${step.id}`;

    await storage.putObject(outputBucket, outputStorageKey, fullOutput);

    await db.workflowRunStep.updateMany({
      where: { oid: step.oid },
      data: {
        outputBucket,
        outputStorageKey
      }
    });
  }

  // Delay cleanup slightly so any in-flight readers of the temp rows finish.
  await storeOutputCleanupQueue.add({ runOid: run.oid }, { delay: 10000 });
});

// Final cleanup after logs are persisted: deletes temp output rows and
// scrubs the run's encrypted environment variables.
let storeOutputCleanupQueue = createQueue<{ runOid: bigint }>({
  redisUrl: env.service.REDIS_URL,
  name: 'frg/bctx/stoclu'
});

// Removes the run's temp output rows, then blanks the stored (encrypted)
// environment variables so secrets don't outlive the build.
let storeOutputCleanupQueueProcessor = storeOutputCleanupQueue.process(async data => {
  let byRun = { runOid: data.runOid };
  await db.workflowRunOutputTemp.deleteMany({ where: byRun });

  await db.workflowRun.updateMany({
    where: { oid: data.runOid },
    data: { encryptedEnvironmentVariables: '' }
  });
});

// Single handle the service uses to start/stop all build-related processors.
export let buildQueueProcessors = combineQueueProcessors([
  buildEndedQueueProcessor,
  storeOutputQueueProcessor,
  storeOutputCleanupQueueProcessor,
  createArtifactsQueueProcessor,
  createArtifactQueueProcessor
]);
Loading