Skip to content

Commit 2dd5ff4

Browse files
chore: send system events
1 parent b43d24c commit 2dd5ff4

File tree

9 files changed

+143
-5
lines changed

9 files changed

+143
-5
lines changed

apps/event-queue/src/events/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ const workspaceHandlers: Record<Event, Handler<any>> = {
7070
[Event.PolicyDeleted]: deletedPolicy,
7171
[Event.JobUpdated]: updateJob,
7272
[Event.EvaluateReleaseTarget]: evaluateReleaseTarget,
73+
[Event.SystemCreated]: () => Promise.resolve(),
74+
[Event.SystemUpdated]: () => Promise.resolve(),
75+
[Event.SystemDeleted]: () => Promise.resolve(),
76+
[Event.UserApprovalRecordCreated]: () => Promise.resolve(),
77+
[Event.UserApprovalRecordUpdated]: () => Promise.resolve(),
78+
[Event.UserApprovalRecordDeleted]: () => Promise.resolve(),
7379
};
7480

7581
export type Handler<T extends keyof EventPayload> = (

apps/webservice/src/app/api/v1/systems/[systemId]/route.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import httpStatus from "http-status";
44

55
import { eq, takeFirst } from "@ctrlplane/db";
66
import * as schema from "@ctrlplane/db/schema";
7+
import { eventDispatcher } from "@ctrlplane/events";
78
import { Permission } from "@ctrlplane/validators/auth";
89

910
import { authn, authz } from "../../auth";
@@ -58,6 +59,10 @@ export const PATCH = request()
5859
.where(eq(schema.system.id, systemId))
5960
.returning()
6061
.then(takeFirst)
62+
.then(async (system) => {
63+
await eventDispatcher.dispatchSystemUpdated(system);
64+
return system;
65+
})
6166
.then((system) => NextResponse.json(system, { status: httpStatus.OK }))
6267
.catch((error) =>
6368
NextResponse.json(
@@ -93,6 +98,10 @@ export const DELETE = request()
9398
.where(eq(schema.system.id, systemId))
9499
.returning()
95100
.then(takeFirst)
101+
.then(async (system) => {
102+
await eventDispatcher.dispatchSystemDeleted(system);
103+
return system;
104+
})
96105
.then((system) =>
97106
NextResponse.json(
98107
{ message: "System deleted", system },

apps/webservice/src/app/api/v1/systems/route.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { z } from "zod";
55

66
import { and, eq, takeFirst, takeFirstOrNull } from "@ctrlplane/db";
77
import * as schema from "@ctrlplane/db/schema";
8+
import { eventDispatcher } from "@ctrlplane/events";
89
import { logger } from "@ctrlplane/logger";
910
import { Permission } from "@ctrlplane/validators/auth";
1011

@@ -38,24 +39,26 @@ export const POST = request()
3839
.then(takeFirstOrNull);
3940

4041
if (existingSystem != null) {
41-
// Update existing system
4242
const updatedSystem = await ctx.db
4343
.update(schema.system)
4444
.set(ctx.body)
4545
.where(eq(schema.system.id, existingSystem.id))
4646
.returning()
4747
.then(takeFirst);
4848

49+
await eventDispatcher.dispatchSystemUpdated(updatedSystem);
50+
4951
return NextResponse.json(updatedSystem, { status: httpStatus.OK });
5052
}
5153

52-
// Create new system
5354
const newSystem = await ctx.db
5455
.insert(schema.system)
5556
.values(ctx.body)
5657
.returning()
5758
.then(takeFirst);
5859

60+
await eventDispatcher.dispatchSystemCreated(newSystem);
61+
5962
return NextResponse.json(newSystem, { status: httpStatus.CREATED });
6063
} catch (error) {
6164
if (error instanceof z.ZodError)

packages/api/src/router/system.ts

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import {
2828
updateSystem,
2929
workspace,
3030
} from "@ctrlplane/db/schema";
31+
import { eventDispatcher } from "@ctrlplane/events";
3132
import { Permission } from "@ctrlplane/validators/auth";
3233
import {
3334
ComparisonOperator,
@@ -199,11 +200,20 @@ export const systemRouter = createTRPCRouter({
199200
.returning()
200201
.then(takeFirst);
201202

202-
await Promise.all([
203+
await eventDispatcher.dispatchSystemCreated(sys);
204+
205+
const defaultEnvs = await Promise.all([
203206
upsertEnv(db, { systemId: sys.id, name: "Production" }),
204207
upsertEnv(db, { systemId: sys.id, name: "QA" }),
205208
upsertEnv(db, { systemId: sys.id, name: "Staging" }),
206209
]);
210+
211+
await Promise.all(
212+
defaultEnvs.map((env) =>
213+
eventDispatcher.dispatchEnvironmentCreated(env),
214+
),
215+
);
216+
207217
return sys;
208218
}),
209219
),
@@ -222,7 +232,11 @@ export const systemRouter = createTRPCRouter({
222232
.set(input.data)
223233
.where(eq(system.id, input.id))
224234
.returning()
225-
.then(takeFirst),
235+
.then(takeFirst)
236+
.then((sys) => {
237+
eventDispatcher.dispatchSystemUpdated(sys);
238+
return sys;
239+
}),
226240
),
227241

228242
delete: protectedProcedure
@@ -238,7 +252,11 @@ export const systemRouter = createTRPCRouter({
238252
.delete(system)
239253
.where(eq(system.id, input))
240254
.returning()
241-
.then(takeFirst),
255+
.then(takeFirst)
256+
.then((sys) => {
257+
eventDispatcher.dispatchSystemDeleted(sys);
258+
return sys;
259+
}),
242260
),
243261

244262
resources: protectedProcedure

packages/events/src/dispatch-jobs.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,18 @@ export const dispatchQueueJob = () => ({
163163
});
164164

165165
export class BullMQEventDispatcher implements EventDispatcher {
166+
async dispatchSystemCreated(_: schema.System): Promise<void> {
167+
return Promise.resolve();
168+
}
169+
170+
async dispatchSystemUpdated(_: schema.System): Promise<void> {
171+
return Promise.resolve();
172+
}
173+
174+
async dispatchSystemDeleted(_: schema.System): Promise<void> {
175+
return Promise.resolve();
176+
}
177+
166178
async dispatchResourceCreated(resource: schema.Resource): Promise<void> {
167179
await getQueue(Channel.NewResource).add(resource.id, resource);
168180
}

packages/events/src/event-dispatcher.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ export type FullPolicy = schema.Policy & {
1313
};
1414

1515
export interface EventDispatcher {
16+
dispatchSystemCreated(system: schema.System): Promise<void>;
17+
dispatchSystemUpdated(system: schema.System): Promise<void>;
18+
dispatchSystemDeleted(system: schema.System): Promise<void>;
19+
1620
dispatchResourceCreated(resource: schema.Resource): Promise<void>;
1721
dispatchResourceUpdated(
1822
previous: schema.Resource,

packages/events/src/kafka/event-dispatch/index.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,22 @@ import * as jobDispatch from "./job.js";
99
import * as policyDispatch from "./policy.js";
1010
import * as releaseTargetDispatch from "./release-target.js";
1111
import * as resourceDispatch from "./resource.js";
12+
import * as systemDispatch from "./system.js";
1213
import * as userApprovalRecordDispatch from "./user-approval-record.js";
1314

1415
export class KafkaEventDispatcher implements EventDispatcher {
16+
async dispatchSystemCreated(system: schema.System): Promise<void> {
17+
await systemDispatch.dispatchSystemCreated(system);
18+
}
19+
20+
async dispatchSystemUpdated(system: schema.System): Promise<void> {
21+
await systemDispatch.dispatchSystemUpdated(system);
22+
}
23+
24+
async dispatchSystemDeleted(system: schema.System): Promise<void> {
25+
await systemDispatch.dispatchSystemDeleted(system);
26+
}
27+
1528
async dispatchResourceCreated(resource: schema.Resource): Promise<void> {
1629
await resourceDispatch.dispatchResourceCreated(resource);
1730
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import type * as schema from "@ctrlplane/db/schema";
2+
import type { Span } from "@ctrlplane/logger";
3+
import type { WorkspaceEngine } from "@ctrlplane/workspace-engine-sdk";
4+
5+
import type { GoEventPayload, GoMessage } from "../events.js";
6+
import { createSpanWrapper } from "../../span.js";
7+
import { sendGoEvent } from "../client.js";
8+
import { Event } from "../events.js";
9+
10+
const getOapiSystem = (
11+
system: schema.System,
12+
): WorkspaceEngine["schemas"]["System"] => ({
13+
id: system.id,
14+
workspaceId: system.workspaceId,
15+
name: system.name,
16+
description: system.description,
17+
});
18+
19+
const convertSystemToGoEvent = (
20+
system: schema.System,
21+
eventType: keyof GoEventPayload,
22+
): GoMessage<keyof GoEventPayload> => ({
23+
workspaceId: system.workspaceId,
24+
eventType,
25+
data: getOapiSystem(system),
26+
timestamp: Date.now(),
27+
});
28+
29+
export const dispatchSystemCreated = createSpanWrapper(
30+
"dispatchSystemCreated",
31+
async (span: Span, system: schema.System) => {
32+
span.setAttribute("system.id", system.id);
33+
span.setAttribute("system.workspaceId", system.workspaceId);
34+
span.setAttribute("system.name", system.name);
35+
span.setAttribute("system.description", system.description);
36+
37+
await sendGoEvent(convertSystemToGoEvent(system, Event.SystemCreated));
38+
},
39+
);
40+
41+
export const dispatchSystemUpdated = createSpanWrapper(
42+
"dispatchSystemUpdated",
43+
async (span: Span, system: schema.System) => {
44+
span.setAttribute("system.id", system.id);
45+
span.setAttribute("system.workspaceId", system.workspaceId);
46+
span.setAttribute("system.name", system.name);
47+
span.setAttribute("system.description", system.description);
48+
49+
await sendGoEvent(convertSystemToGoEvent(system, Event.SystemUpdated));
50+
},
51+
);
52+
53+
export const dispatchSystemDeleted = createSpanWrapper(
54+
"dispatchSystemDeleted",
55+
async (span: Span, system: schema.System) => {
56+
span.setAttribute("system.id", system.id);
57+
span.setAttribute("system.workspaceId", system.workspaceId);
58+
span.setAttribute("system.name", system.name);
59+
span.setAttribute("system.description", system.description);
60+
61+
await sendGoEvent(convertSystemToGoEvent(system, Event.SystemDeleted));
62+
},
63+
);

packages/events/src/kafka/events.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ import type { WorkspaceEngine } from "@ctrlplane/workspace-engine-sdk";
44
import type * as PB from "../workspace-engine/types/index.js";
55

66
export enum Event {
7+
SystemCreated = "system.created",
8+
SystemUpdated = "system.updated",
9+
SystemDeleted = "system.deleted",
10+
711
ResourceCreated = "resource.created",
812
ResourceUpdated = "resource.updated",
913
ResourceDeleted = "resource.deleted",
@@ -130,6 +134,9 @@ export type EventPayload = {
130134
[Event.UserApprovalRecordCreated]: schema.PolicyRuleAnyApprovalRecord;
131135
[Event.UserApprovalRecordUpdated]: schema.PolicyRuleAnyApprovalRecord;
132136
[Event.UserApprovalRecordDeleted]: schema.PolicyRuleAnyApprovalRecord;
137+
[Event.SystemCreated]: schema.System;
138+
[Event.SystemUpdated]: schema.System;
139+
[Event.SystemDeleted]: schema.System;
133140
// [Event.JobCreated]: schema.Job;
134141
// [Event.JobDeleted]: schema.Job;
135142
// [Event.SystemCreated]: schema.System;
@@ -138,6 +145,9 @@ export type EventPayload = {
138145
};
139146

140147
export type GoEventPayload = {
148+
[Event.SystemCreated]: WorkspaceEngine["schemas"]["System"];
149+
[Event.SystemUpdated]: WorkspaceEngine["schemas"]["System"];
150+
[Event.SystemDeleted]: WorkspaceEngine["schemas"]["System"];
141151
[Event.ResourceCreated]: WorkspaceEngine["schemas"]["Resource"];
142152
[Event.ResourceUpdated]: WorkspaceEngine["schemas"]["Resource"];
143153
[Event.ResourceDeleted]: WorkspaceEngine["schemas"]["Resource"];

0 commit comments

Comments
 (0)