Skip to content

Commit c0d01c1

Browse files
chore: don't hardcode event type in conversion func
1 parent da17999 commit c0d01c1

File tree

7 files changed

+203
-55
lines changed

7 files changed

+203
-55
lines changed

packages/events/src/kafka/event-dispatch/deployment-variable.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { eq, takeFirst, takeFirstOrNull } from "@ctrlplane/db";
55
import { db } from "@ctrlplane/db/client";
66
import * as schema from "@ctrlplane/db/schema";
77

8+
import type { GoEventPayload, GoMessage } from "../events.js";
89
import { createSpanWrapper } from "../../span.js";
910
import { sendGoEvent, sendNodeEvent } from "../client.js";
1011
import { Event } from "../events.js";
@@ -34,9 +35,10 @@ const getWorkspaceIdForVariable = async (variableId: string) =>
3435
const convertDeploymentVariableToNodeEvent = (
3536
deploymentVariable: schema.DeploymentVariable,
3637
workspaceId: string,
38+
eventType: Event,
3739
) => ({
3840
workspaceId,
39-
eventType: Event.DeploymentVariableCreated,
41+
eventType,
4042
eventId: deploymentVariable.id,
4143
timestamp: Date.now(),
4244
source: "api" as const,
@@ -91,9 +93,10 @@ const getOapiDeploymentVariable = async (
9193
const convertDeploymentVariableToGoEvent = async (
9294
deploymentVariable: schema.DeploymentVariable,
9395
workspaceId: string,
94-
) => ({
96+
eventType: keyof GoEventPayload,
97+
): Promise<GoMessage<keyof GoEventPayload>> => ({
9598
workspaceId,
96-
eventType: Event.DeploymentVariableCreated as const,
99+
eventType,
97100
data: await getOapiDeploymentVariable(deploymentVariable),
98101
timestamp: Date.now(),
99102
});
@@ -110,13 +113,17 @@ export const dispatchDeploymentVariableCreated = createSpanWrapper(
110113
);
111114
span.setAttribute("workspace.id", workspaceId);
112115

116+
const eventType = Event.DeploymentVariableCreated;
117+
113118
const nodeEvent = convertDeploymentVariableToNodeEvent(
114119
deploymentVariable,
115120
workspaceId,
121+
eventType,
116122
);
117123
const goEvent = await convertDeploymentVariableToGoEvent(
118124
deploymentVariable,
119125
workspaceId,
126+
eventType as keyof GoEventPayload,
120127
);
121128
await Promise.all([sendNodeEvent(nodeEvent), sendGoEvent(goEvent)]);
122129
},
@@ -135,10 +142,11 @@ export const dispatchDeploymentVariableUpdated = createSpanWrapper(
135142

136143
const workspaceId = await getWorkspaceIdForDeployment(current.deploymentId);
137144
span.setAttribute("workspace.id", workspaceId);
145+
const eventType = Event.DeploymentVariableUpdated;
138146

139147
const nodeEvent = {
140148
workspaceId,
141-
eventType: Event.DeploymentVariableUpdated,
149+
eventType,
142150
eventId: current.id,
143151
timestamp: Date.now(),
144152
source: "api" as const,
@@ -148,6 +156,7 @@ export const dispatchDeploymentVariableUpdated = createSpanWrapper(
148156
const goEvent = await convertDeploymentVariableToGoEvent(
149157
current,
150158
workspaceId,
159+
eventType as keyof GoEventPayload,
151160
);
152161
await Promise.all([sendNodeEvent(nodeEvent), sendGoEvent(goEvent)]);
153162
},
@@ -165,13 +174,16 @@ export const dispatchDeploymentVariableDeleted = createSpanWrapper(
165174
);
166175
span.setAttribute("workspace.id", workspaceId);
167176

177+
const eventType = Event.DeploymentVariableDeleted;
168178
const nodeEvent = convertDeploymentVariableToNodeEvent(
169179
deploymentVariable,
170180
workspaceId,
181+
eventType,
171182
);
172183
const goEvent = await convertDeploymentVariableToGoEvent(
173184
deploymentVariable,
174185
workspaceId,
186+
eventType as keyof GoEventPayload,
175187
);
176188
await Promise.all([sendNodeEvent(nodeEvent), sendGoEvent(goEvent)]);
177189
},

packages/events/src/kafka/event-dispatch/deployment-version.ts

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { eq, takeFirst } from "@ctrlplane/db";
66
import { db as dbClient } from "@ctrlplane/db/client";
77
import * as schema from "@ctrlplane/db/schema";
88

9+
import type { GoEventPayload, GoMessage } from "../events.js";
910
import { createSpanWrapper } from "../../span.js";
1011
import { sendGoEvent, sendNodeEvent } from "../client.js";
1112
import { Event } from "../events.js";
@@ -26,9 +27,10 @@ const getWorkspaceId = async (tx: Tx, deploymentVersionId: string) =>
2627
const convertVersionToNodeEvent = (
2728
deploymentVersion: schema.DeploymentVersion,
2829
workspaceId: string,
30+
eventType: Event,
2931
) => ({
3032
workspaceId,
31-
eventType: Event.DeploymentVersionCreated,
33+
eventType,
3234
eventId: deploymentVersion.id,
3335
timestamp: Date.now(),
3436
source: "api" as const,
@@ -52,9 +54,10 @@ const getOapiDeploymentVersion = (
5254
const convertVersionToGoEvent = (
5355
deploymentVersion: schema.DeploymentVersion,
5456
workspaceId: string,
55-
) => ({
57+
eventType: keyof GoEventPayload,
58+
): GoMessage<keyof GoEventPayload> => ({
5659
workspaceId,
57-
eventType: Event.DeploymentVersionCreated as const,
60+
eventType,
5861
data: getOapiDeploymentVersion(deploymentVersion),
5962
timestamp: Date.now(),
6063
});
@@ -70,9 +73,18 @@ export const dispatchDeploymentVersionCreated = createSpanWrapper(
7073
const workspaceId = await getWorkspaceId(tx, deploymentVersion.id);
7174
span.setAttribute("workspace.id", workspaceId);
7275

76+
const eventType = Event.DeploymentVersionCreated;
7377
await Promise.all([
74-
sendNodeEvent(convertVersionToNodeEvent(deploymentVersion, workspaceId)),
75-
sendGoEvent(convertVersionToGoEvent(deploymentVersion, workspaceId)),
78+
sendNodeEvent(
79+
convertVersionToNodeEvent(deploymentVersion, workspaceId, eventType),
80+
),
81+
sendGoEvent(
82+
convertVersionToGoEvent(
83+
deploymentVersion,
84+
workspaceId,
85+
eventType as keyof GoEventPayload,
86+
),
87+
),
7688
]);
7789
},
7890
);
@@ -93,16 +105,23 @@ export const dispatchDeploymentVersionUpdated = createSpanWrapper(
93105
const workspaceId = await getWorkspaceId(tx, current.id);
94106
span.setAttribute("workspace.id", workspaceId);
95107

108+
const eventType = Event.DeploymentVersionUpdated;
96109
await Promise.all([
97110
sendNodeEvent({
98111
workspaceId,
99-
eventType: Event.DeploymentVersionUpdated,
112+
eventType,
100113
eventId: current.id,
101114
timestamp: Date.now(),
102115
source: "api" as const,
103116
payload: { previous, current },
104117
}),
105-
sendGoEvent(convertVersionToGoEvent(current, workspaceId)),
118+
sendGoEvent(
119+
convertVersionToGoEvent(
120+
current,
121+
workspaceId,
122+
eventType as keyof GoEventPayload,
123+
),
124+
),
106125
]);
107126
},
108127
);
@@ -118,9 +137,18 @@ export const dispatchDeploymentVersionDeleted = createSpanWrapper(
118137
const workspaceId = await getWorkspaceId(tx, deploymentVersion.id);
119138
span.setAttribute("workspace.id", workspaceId);
120139

140+
const eventType = Event.DeploymentVersionDeleted;
121141
await Promise.all([
122-
sendNodeEvent(convertVersionToNodeEvent(deploymentVersion, workspaceId)),
123-
sendGoEvent(convertVersionToGoEvent(deploymentVersion, workspaceId)),
142+
sendNodeEvent(
143+
convertVersionToNodeEvent(deploymentVersion, workspaceId, eventType),
144+
),
145+
sendGoEvent(
146+
convertVersionToGoEvent(
147+
deploymentVersion,
148+
workspaceId,
149+
eventType as keyof GoEventPayload,
150+
),
151+
),
124152
]);
125153
},
126154
);

packages/events/src/kafka/event-dispatch/deployment.ts

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ const getSystem = async (tx: Tx, systemId: string) =>
2121
const convertDeploymentToNodeEvent = (
2222
deployment: schema.Deployment,
2323
workspaceId: string,
24+
eventType: Event,
2425
) => ({
2526
workspaceId,
26-
eventType: Event.DeploymentCreated,
27+
eventType,
2728
eventId: deployment.id,
2829
timestamp: Date.now(),
2930
source: "api" as const,
@@ -46,9 +47,10 @@ const getOapiDeployment = (
4647
const convertDeploymentToGoEvent = (
4748
deployment: schema.Deployment,
4849
workspaceId: string,
50+
eventType: keyof GoEventPayload,
4951
) => ({
5052
workspaceId,
51-
eventType: Event.DeploymentCreated as const,
53+
eventType,
5254
data: getOapiDeployment(deployment),
5355
timestamp: Date.now(),
5456
});
@@ -64,11 +66,18 @@ export const dispatchDeploymentCreated = createSpanWrapper(
6466
const system = await getSystem(tx, deployment.systemId);
6567
span.setAttribute("workspace.id", system.workspaceId);
6668

69+
const eventType = Event.DeploymentCreated;
6770
await Promise.all([
6871
sendNodeEvent(
69-
convertDeploymentToNodeEvent(deployment, system.workspaceId),
72+
convertDeploymentToNodeEvent(deployment, system.workspaceId, eventType),
73+
),
74+
sendGoEvent(
75+
convertDeploymentToGoEvent(
76+
deployment,
77+
system.workspaceId,
78+
eventType as keyof GoEventPayload,
79+
),
7080
),
71-
sendGoEvent(convertDeploymentToGoEvent(deployment, system.workspaceId)),
7281
]);
7382
},
7483
);
@@ -89,16 +98,23 @@ export const dispatchDeploymentUpdated = createSpanWrapper(
8998
const system = await getSystem(tx, current.systemId);
9099
span.setAttribute("workspace.id", system.workspaceId);
91100

101+
const eventType = Event.DeploymentUpdated;
92102
await Promise.all([
93103
sendNodeEvent({
94104
workspaceId: system.workspaceId,
95-
eventType: Event.DeploymentUpdated,
105+
eventType,
96106
eventId: current.id,
97107
timestamp: Date.now(),
98108
source: "api" as const,
99109
payload: { previous, current },
100110
}),
101-
sendGoEvent(convertDeploymentToGoEvent(current, system.workspaceId)),
111+
sendGoEvent(
112+
convertDeploymentToGoEvent(
113+
current,
114+
system.workspaceId,
115+
eventType as keyof GoEventPayload,
116+
),
117+
),
102118
]);
103119
},
104120
);
@@ -114,11 +130,18 @@ export const dispatchDeploymentDeleted = createSpanWrapper(
114130
const system = await getSystem(tx, deployment.systemId);
115131
span.setAttribute("workspace.id", system.workspaceId);
116132

133+
const eventType = Event.DeploymentDeleted;
117134
await Promise.all([
118135
sendNodeEvent(
119-
convertDeploymentToNodeEvent(deployment, system.workspaceId),
136+
convertDeploymentToNodeEvent(deployment, system.workspaceId, eventType),
137+
),
138+
sendGoEvent(
139+
convertDeploymentToGoEvent(
140+
deployment,
141+
system.workspaceId,
142+
eventType as keyof GoEventPayload,
143+
),
120144
),
121-
sendGoEvent(convertDeploymentToGoEvent(deployment, system.workspaceId)),
122145
]);
123146
},
124147
);

packages/events/src/kafka/event-dispatch/environment.ts

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ const getSystem = async (tx: Tx, systemId: string) =>
2121
const convertEnvironmentToNodeEvent = (
2222
environment: schema.Environment,
2323
workspaceId: string,
24+
eventType: Event,
2425
) => ({
2526
workspaceId,
26-
eventType: Event.EnvironmentCreated,
27+
eventType,
2728
eventId: environment.id,
2829
timestamp: Date.now(),
2930
source: "api" as const,
@@ -44,9 +45,10 @@ const getOapiEnvironment = (
4445
const convertEnvironmentToGoEvent = (
4546
environment: schema.Environment,
4647
workspaceId: string,
48+
eventType: keyof GoEventPayload,
4749
) => ({
4850
workspaceId,
49-
eventType: Event.EnvironmentCreated as const,
51+
eventType,
5052
data: getOapiEnvironment(environment),
5153
timestamp: Date.now(),
5254
});
@@ -67,11 +69,22 @@ export const dispatchEnvironmentCreated = createSpanWrapper(
6769
const system = await getSystem(tx, environment.systemId);
6870
span.setAttribute("workspace.id", system.workspaceId);
6971

72+
const eventType = Event.EnvironmentCreated;
7073
await Promise.all([
7174
sendNodeEvent(
72-
convertEnvironmentToNodeEvent(environment, system.workspaceId),
75+
convertEnvironmentToNodeEvent(
76+
environment,
77+
system.workspaceId,
78+
eventType,
79+
),
80+
),
81+
sendGoEvent(
82+
convertEnvironmentToGoEvent(
83+
environment,
84+
system.workspaceId,
85+
eventType as keyof GoEventPayload,
86+
),
7387
),
74-
sendGoEvent(convertEnvironmentToGoEvent(environment, system.workspaceId)),
7588
]);
7689
},
7790
);
@@ -93,16 +106,23 @@ export const dispatchEnvironmentUpdated = createSpanWrapper(
93106
const system = await getSystem(tx, current.systemId);
94107
span.setAttribute("workspace.id", system.workspaceId);
95108

109+
const eventType = Event.EnvironmentUpdated;
96110
await Promise.all([
97111
sendNodeEvent({
98112
workspaceId: system.workspaceId,
99-
eventType: Event.EnvironmentUpdated,
113+
eventType,
100114
eventId: current.id,
101115
timestamp: Date.now(),
102116
source: source ?? "api",
103117
payload: { previous, current },
104118
}),
105-
sendGoEvent(convertEnvironmentToGoEvent(current, system.workspaceId)),
119+
sendGoEvent(
120+
convertEnvironmentToGoEvent(
121+
current,
122+
system.workspaceId,
123+
eventType as keyof GoEventPayload,
124+
),
125+
),
106126
]);
107127
},
108128
);
@@ -123,11 +143,22 @@ export const dispatchEnvironmentDeleted = createSpanWrapper(
123143
const system = await getSystem(tx, environment.systemId);
124144
span.setAttribute("workspace.id", system.workspaceId);
125145

146+
const eventType = Event.EnvironmentDeleted;
126147
await Promise.all([
127148
sendNodeEvent(
128-
convertEnvironmentToNodeEvent(environment, system.workspaceId),
149+
convertEnvironmentToNodeEvent(
150+
environment,
151+
system.workspaceId,
152+
eventType,
153+
),
154+
),
155+
sendGoEvent(
156+
convertEnvironmentToGoEvent(
157+
environment,
158+
system.workspaceId,
159+
eventType as keyof GoEventPayload,
160+
),
129161
),
130-
sendGoEvent(convertEnvironmentToGoEvent(environment, system.workspaceId)),
131162
]);
132163
},
133164
);

0 commit comments

Comments
 (0)