Skip to content

Commit 480c48a

Browse files
chore: send job agent events
1 parent 42b30d2 commit 480c48a

File tree

9 files changed

+157
-11
lines changed

9 files changed

+157
-11
lines changed

apps/event-queue/src/events/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ const workspaceHandlers: Record<Event, Handler<any>> = {
7676
[Event.UserApprovalRecordCreated]: () => Promise.resolve(),
7777
[Event.UserApprovalRecordUpdated]: () => Promise.resolve(),
7878
[Event.UserApprovalRecordDeleted]: () => Promise.resolve(),
79+
[Event.JobAgentCreated]: () => Promise.resolve(),
80+
[Event.JobAgentUpdated]: () => Promise.resolve(),
81+
[Event.JobAgentDeleted]: () => Promise.resolve(),
7982
};
8083

8184
export type Handler<T extends keyof EventPayload> = (

apps/webservice/src/app/api/github/installation/route.ts

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { auth } from "@ctrlplane/auth";
1111
import { eq, takeFirst, takeFirstOrNull } from "@ctrlplane/db";
1212
import { db } from "@ctrlplane/db/client";
1313
import { githubEntity, jobAgent, user, workspace } from "@ctrlplane/db/schema";
14+
import { eventDispatcher } from "@ctrlplane/events";
1415

1516
import type { AuthedOctokitClient } from "../octokit";
1617
import { env } from "~/env";
@@ -162,15 +163,21 @@ export const GET = async (req: NextRequest) => {
162163
u.id,
163164
);
164165

165-
await db.insert(jobAgent).values({
166-
workspaceId: activeWorkspace.id,
167-
name: entity.slug,
168-
type: "github-app",
169-
config: {
170-
installationId: entity.installationId,
171-
owner: entity.slug,
172-
},
173-
});
166+
const createdAgent = await db
167+
.insert(jobAgent)
168+
.values({
169+
workspaceId: activeWorkspace.id,
170+
name: entity.slug,
171+
type: "github-app",
172+
config: {
173+
installationId: entity.installationId,
174+
owner: entity.slug,
175+
},
176+
})
177+
.returning()
178+
.then(takeFirst);
179+
180+
await eventDispatcher.dispatchJobAgentCreated(createdAgent);
174181

175182
return NextResponse.redirect(
176183
`${env.BASE_URL}/${activeWorkspace.slug}/settings/workspace/integrations/github`,

apps/webservice/src/app/api/v1/job-agents/name/route.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { z } from "zod";
33

44
import { eq, takeFirst, takeFirstOrNull } from "@ctrlplane/db";
55
import { jobAgent, workspace } from "@ctrlplane/db/schema";
6+
import { eventDispatcher } from "@ctrlplane/events";
67
import { Permission } from "@ctrlplane/validators/auth";
78

89
import { authn, authz } from "~/app/api/v1/auth";
@@ -48,5 +49,7 @@ export const PATCH = request()
4849
.returning()
4950
.then(takeFirst);
5051

52+
await eventDispatcher.dispatchJobAgentCreated(tp);
53+
5154
return NextResponse.json(tp);
5255
});

packages/api/src/router/job.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
takeFirst,
1616
} from "@ctrlplane/db";
1717
import * as schema from "@ctrlplane/db/schema";
18+
import { eventDispatcher } from "@ctrlplane/events";
1819
import { updateJob } from "@ctrlplane/job-dispatch";
1920
import { Permission } from "@ctrlplane/validators/auth";
2021
import { JobAgentType } from "@ctrlplane/validators/jobs";
@@ -86,7 +87,15 @@ const jobAgentRouter = createTRPCRouter({
8687
})
8788
.input(schema.createJobAgent)
8889
.mutation(({ ctx, input }) =>
89-
ctx.db.insert(schema.jobAgent).values(input).returning().then(takeFirst),
90+
ctx.db
91+
.insert(schema.jobAgent)
92+
.values(input)
93+
.returning()
94+
.then(takeFirst)
95+
.then(async (agent) => {
96+
await eventDispatcher.dispatchJobAgentCreated(agent);
97+
return agent;
98+
}),
9099
),
91100

92101
update: protectedProcedure
@@ -103,7 +112,11 @@ const jobAgentRouter = createTRPCRouter({
103112
.set(input.data)
104113
.where(eq(schema.jobAgent.id, input.id))
105114
.returning()
106-
.then(takeFirst),
115+
.then(takeFirst)
116+
.then(async (agent) => {
117+
await eventDispatcher.dispatchJobAgentUpdated(agent);
118+
return agent;
119+
}),
107120
),
108121

109122
history: createTRPCRouter({

packages/events/src/dispatch-jobs.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,17 @@ export class BullMQEventDispatcher implements EventDispatcher {
324324
await Promise.resolve();
325325
}
326326

327+
async dispatchJobAgentCreated(_: schema.JobAgent): Promise<void> {
328+
return Promise.resolve();
329+
}
330+
async dispatchJobAgentUpdated(_: schema.JobAgent): Promise<void> {
331+
return Promise.resolve();
332+
}
333+
334+
async dispatchJobAgentDeleted(_: schema.JobAgent): Promise<void> {
335+
return Promise.resolve();
336+
}
337+
327338
async dispatchPolicyCreated(policy: FullPolicy): Promise<void> {
328339
await getQueue(Channel.NewPolicy).add(policy.id, policy);
329340
}

packages/events/src/event-dispatcher.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ export interface EventDispatcher {
8282
deploymentVariableValue: schema.DeploymentVariableValue,
8383
): Promise<void>;
8484

85+
dispatchJobAgentCreated(jobAgent: schema.JobAgent): Promise<void>;
86+
dispatchJobAgentUpdated(jobAgent: schema.JobAgent): Promise<void>;
87+
dispatchJobAgentDeleted(jobAgent: schema.JobAgent): Promise<void>;
88+
8589
dispatchPolicyCreated(policy: FullPolicy): Promise<void>;
8690
dispatchPolicyUpdated(
8791
previous: FullPolicy,

packages/events/src/kafka/event-dispatch/index.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import * as deploymentVariableDispatch from "./deployment-variable.js";
55
import * as deploymentVersionDispatch from "./deployment-version.js";
66
import * as deploymentDispatch from "./deployment.js";
77
import * as environmentDispatch from "./environment.js";
8+
import * as jobAgentDispatch from "./job-agent.js";
89
import * as jobDispatch from "./job.js";
910
import * as policyDispatch from "./policy.js";
1011
import * as releaseTargetDispatch from "./release-target.js";
@@ -175,6 +176,18 @@ export class KafkaEventDispatcher implements EventDispatcher {
175176
);
176177
}
177178

179+
async dispatchJobAgentCreated(jobAgent: schema.JobAgent): Promise<void> {
180+
await jobAgentDispatch.dispatchJobAgentCreated(jobAgent);
181+
}
182+
183+
async dispatchJobAgentUpdated(jobAgent: schema.JobAgent): Promise<void> {
184+
await jobAgentDispatch.dispatchJobAgentUpdated(jobAgent);
185+
}
186+
187+
async dispatchJobAgentDeleted(jobAgent: schema.JobAgent): Promise<void> {
188+
await jobAgentDispatch.dispatchJobAgentDeleted(jobAgent);
189+
}
190+
178191
async dispatchPolicyCreated(policy: FullPolicy): Promise<void> {
179192
await policyDispatch.dispatchPolicyCreated(policy);
180193
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import type * as schema from "@ctrlplane/db/schema";
2+
import type { Span } from "@ctrlplane/logger";
3+
import type { WorkspaceEngine } from "@ctrlplane/workspace-engine-sdk";
4+
5+
import type { GoEventPayload, GoMessage } from "../events.js";
6+
import { createSpanWrapper } from "../../span.js";
7+
import { sendGoEvent } from "../client.js";
8+
import { Event } from "../events.js";
9+
10+
const getOapiJobAgent = (
11+
jobAgent: schema.JobAgent,
12+
): WorkspaceEngine["schemas"]["JobAgent"] => ({
13+
id: jobAgent.id,
14+
workspaceId: jobAgent.workspaceId,
15+
name: jobAgent.name,
16+
type: jobAgent.type,
17+
config: jobAgent.config,
18+
});
19+
20+
const convertJobAgentToGoEvent = (
21+
jobAgent: schema.JobAgent,
22+
eventType: keyof GoEventPayload,
23+
): GoMessage<keyof GoEventPayload> => ({
24+
workspaceId: jobAgent.workspaceId,
25+
eventType,
26+
data: getOapiJobAgent(jobAgent),
27+
timestamp: Date.now(),
28+
});
29+
30+
export const dispatchJobAgentCreated = createSpanWrapper(
31+
"dispatchJobAgentCreated",
32+
async (span: Span, jobAgent: schema.JobAgent) => {
33+
span.setAttribute("job-agent.id", jobAgent.id);
34+
span.setAttribute("job-agent.workspaceId", jobAgent.workspaceId);
35+
span.setAttribute("job-agent.name", jobAgent.name);
36+
span.setAttribute("job-agent.type", jobAgent.type);
37+
span.setAttribute(
38+
"job-agent.config",
39+
JSON.stringify(jobAgent.config, null, 2),
40+
);
41+
42+
await sendGoEvent(
43+
convertJobAgentToGoEvent(jobAgent, Event.JobAgentCreated),
44+
);
45+
},
46+
);
47+
48+
export const dispatchJobAgentUpdated = createSpanWrapper(
49+
"dispatchJobAgentUpdated",
50+
async (span: Span, jobAgent: schema.JobAgent) => {
51+
span.setAttribute("job-agent.id", jobAgent.id);
52+
span.setAttribute("job-agent.workspaceId", jobAgent.workspaceId);
53+
span.setAttribute("job-agent.name", jobAgent.name);
54+
span.setAttribute("job-agent.type", jobAgent.type);
55+
span.setAttribute(
56+
"job-agent.config",
57+
JSON.stringify(jobAgent.config, null, 2),
58+
);
59+
60+
await sendGoEvent(
61+
convertJobAgentToGoEvent(jobAgent, Event.JobAgentUpdated),
62+
);
63+
},
64+
);
65+
66+
export const dispatchJobAgentDeleted = createSpanWrapper(
67+
"dispatchJobAgentDeleted",
68+
async (span: Span, jobAgent: schema.JobAgent) => {
69+
span.setAttribute("job-agent.id", jobAgent.id);
70+
span.setAttribute("job-agent.workspaceId", jobAgent.workspaceId);
71+
span.setAttribute("job-agent.name", jobAgent.name);
72+
span.setAttribute("job-agent.type", jobAgent.type);
73+
span.setAttribute(
74+
"job-agent.config",
75+
JSON.stringify(jobAgent.config, null, 2),
76+
);
77+
78+
await sendGoEvent(
79+
convertJobAgentToGoEvent(jobAgent, Event.JobAgentDeleted),
80+
);
81+
},
82+
);

packages/events/src/kafka/events.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ export enum Event {
3232
DeploymentVersionUpdated = "deployment-version.updated",
3333
DeploymentVersionDeleted = "deployment-version.deleted",
3434

35+
JobAgentCreated = "job-agent.created",
36+
JobAgentUpdated = "job-agent.updated",
37+
JobAgentDeleted = "job-agent.deleted",
38+
3539
EnvironmentCreated = "environment.created",
3640
EnvironmentUpdated = "environment.updated",
3741
EnvironmentDeleted = "environment.deleted",
@@ -117,6 +121,9 @@ export type EventPayload = {
117121
current: schema.DeploymentVersion;
118122
};
119123
[Event.DeploymentVersionDeleted]: schema.DeploymentVersion;
124+
[Event.JobAgentCreated]: schema.JobAgent;
125+
[Event.JobAgentUpdated]: schema.JobAgent;
126+
[Event.JobAgentDeleted]: schema.JobAgent;
120127
[Event.EnvironmentCreated]: schema.Environment;
121128
[Event.EnvironmentUpdated]: {
122129
previous: schema.Environment;
@@ -163,6 +170,9 @@ export type GoEventPayload = {
163170
[Event.DeploymentVersionCreated]: WorkspaceEngine["schemas"]["DeploymentVersion"];
164171
[Event.DeploymentVersionUpdated]: WorkspaceEngine["schemas"]["DeploymentVersion"];
165172
[Event.DeploymentVersionDeleted]: WorkspaceEngine["schemas"]["DeploymentVersion"];
173+
[Event.JobAgentCreated]: WorkspaceEngine["schemas"]["JobAgent"];
174+
[Event.JobAgentUpdated]: WorkspaceEngine["schemas"]["JobAgent"];
175+
[Event.JobAgentDeleted]: WorkspaceEngine["schemas"]["JobAgent"];
166176
[Event.EnvironmentCreated]: WorkspaceEngine["schemas"]["Environment"];
167177
[Event.EnvironmentUpdated]: WorkspaceEngine["schemas"]["Environment"];
168178
[Event.EnvironmentDeleted]: WorkspaceEngine["schemas"]["Environment"];

0 commit comments

Comments
 (0)