Skip to content

Commit 6a71c38

Browse files
committed
feat(pb): get multi-actors working e2e, add resources for builds
1 parent 869e278 commit 6a71c38

File tree

147 files changed

+2317
-4371
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

147 files changed

+2317
-4371
lines changed

Cargo.lock

Lines changed: 313 additions & 3703 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs-internal/infrastructure/pegboard/ISOLATE_RUNNER.md

Lines changed: 0 additions & 19 deletions
This file was deleted.

examples/system-test-actor/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,4 @@ RUN chown -R rivet:rivet /app/dist
2323
USER rivet
2424

2525
# Start the server
26-
CMD ["node", "dist/src/container/main.js"]
26+
CMD ["node", "dist/src/main.js"]

examples/system-test-actor/rivet.jsonc

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
{
22
"builds": {
3-
// "ws-isolate": {
4-
// "script": "src/isolate/main.ts"
5-
// },
6-
"ws-container": {
3+
"ws": {
74
"dockerfile": "Dockerfile"
85
}
96
}

examples/system-test-actor/src/container/main.ts

Lines changed: 0 additions & 52 deletions
This file was deleted.

examples/system-test-actor/src/shared/server.ts renamed to examples/system-test-actor/src/httpServer.ts

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import type { UpgradeWebSocket } from "hono/ws";
33

44
type GetUpgradeWebSocketFn = (app: Hono) => UpgradeWebSocket;
55

6-
export function createAndStartServer(
6+
export function createAndStartHttpServer(
77
getUpgradeWebSocket: GetUpgradeWebSocketFn,
88
): { app: Hono; port: number } {
99
// Setup auto-exit timer
@@ -12,8 +12,7 @@ export function createAndStartServer(
1212
"Actor should've been destroyed by now. Automatically exiting.",
1313
);
1414

15-
if (typeof Deno !== "undefined") Deno.exit(1);
16-
else process.exit(1);
15+
process.exit(1);
1716
}, 60 * 1000);
1817

1918
let tickIndex = 0;
@@ -23,10 +22,7 @@ export function createAndStartServer(
2322
}, 1000);
2423

2524
// Get port from environment
26-
const portEnv =
27-
typeof Deno !== "undefined"
28-
? Deno.env.get("PORT_HTTP")
29-
: process.env.PORT_HTTP;
25+
const portEnv = process.env.PORT_HTTP;
3026
if (!portEnv) {
3127
throw new Error("missing PORT_HTTP");
3228
}
@@ -41,8 +37,7 @@ export function createAndStartServer(
4137
const query = c.req.query("code");
4238
const exitCode = query ? Number(query) : 0;
4339

44-
if (typeof Deno != "undefined") Deno.exit(exitCode);
45-
else process.exit(exitCode);
40+
process.exit(exitCode);
4641

4742
return c.text("unreachable");
4843
});

examples/system-test-actor/src/isolate/main.ts

Lines changed: 0 additions & 83 deletions
This file was deleted.
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { serve } from "@hono/node-server";
2+
import { createNodeWebSocket } from "@hono/node-ws";
3+
import { createAndStartHttpServer } from "./httpServer.js";
4+
import { createAndStartUdpServer } from "./udpServer.js";
5+
import { connectToManager } from "./managerClient.js";
6+
7+
let injectWebSocket: any;
8+
const { app, port } = createAndStartHttpServer((app) => {
9+
// Get Node.js WebSocket handler
10+
const result = createNodeWebSocket({ app });
11+
injectWebSocket = result.injectWebSocket;
12+
return result.upgradeWebSocket;
13+
});
14+
15+
const server = serve({ fetch: app.fetch, port });
16+
injectWebSocket(server);
17+
18+
createAndStartUdpServer();
19+
20+
if (process.env.MULTI) {
21+
connectToManager();
22+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import WebSocket from "ws";
2+
3+
export function connectToManager() {
4+
let managerIp = process.env.RIVET_MANAGER_IP;
5+
let managerPort = process.env.RIVET_MANAGER_PORT;
6+
let pingInterval: NodeJS.Timeout;
7+
8+
if (!managerIp || !managerPort) {
9+
console.error("Missing RIVET_MANAGER_IP or RIVET_MANAGER_PORT environment variables");
10+
return;
11+
}
12+
13+
let wsUrl = `ws://${managerIp}:${managerPort}`;
14+
console.log(`Connecting to manager WebSocket at ${wsUrl}`);
15+
16+
let ws = new WebSocket(wsUrl);
17+
18+
ws.on("open", () => {
19+
console.log("Connected to manager WebSocket");
20+
21+
let message = {
22+
init: {
23+
runner_id: process.env.RIVET_RUNNER_ID
24+
}
25+
};
26+
let buffer = Buffer.from(JSON.stringify(message));
27+
ws.send(buffer);
28+
29+
// Start ping loop to keep connection alive
30+
pingInterval = setInterval(() => {
31+
if (ws.readyState === WebSocket.OPEN) {
32+
ws.ping();
33+
}
34+
}, 2000);
35+
});
36+
37+
ws.on("message", (data) => {
38+
let json = data.toString();
39+
40+
console.log("Received message from manager:", json);
41+
42+
let packet = JSON.parse(json);
43+
44+
if (packet.start_actor) {
45+
let message = {
46+
actor_state_update: {
47+
actor_id: packet.start_actor.actor_id,
48+
generation: packet.start_actor.generation,
49+
state: {
50+
running: null,
51+
},
52+
}
53+
};
54+
let buffer = Buffer.from(JSON.stringify(message));
55+
ws.send(buffer);
56+
} else if (packet.signal_actor) {
57+
let message = {
58+
actor_state_update: {
59+
actor_id: packet.start_actor.actor_id,
60+
generation: packet.start_actor.generation,
61+
state: {
62+
exited: {
63+
exit_code: 0,
64+
}
65+
},
66+
}
67+
};
68+
let buffer = Buffer.from(JSON.stringify(message));
69+
ws.send(buffer);
70+
}
71+
});
72+
73+
ws.on("error", (error) => {
74+
console.error("WebSocket error:", error);
75+
});
76+
77+
ws.on("close", code => {
78+
console.log("WebSocket connection closed, attempting to reconnect...", code);
79+
80+
// Clear ping interval when connection closes
81+
if (pingInterval) clearInterval(pingInterval);
82+
83+
setTimeout(connectToManager, 5000);
84+
});
85+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import dgram from 'dgram';
2+
3+
export function createAndStartUdpServer() {
4+
// Get port from environment
5+
const portEnv = process.env.PORT_UDP;
6+
7+
if (portEnv) {
8+
// Create a UDP socket
9+
const udpServer = dgram.createSocket('udp4');
10+
11+
// Listen for incoming messages
12+
udpServer.on('message', (msg, rinfo) => {
13+
console.log(`UDP server received: ${msg} from ${rinfo.address}:${rinfo.port}`);
14+
15+
// Echo the message back to the sender
16+
udpServer.send(msg, rinfo.port, rinfo.address, (err) => {
17+
if (err) console.error('Failed to send UDP response:', err);
18+
});
19+
});
20+
21+
// Handle errors
22+
udpServer.on('error', (err) => {
23+
console.error('UDP server error:', err);
24+
udpServer.close();
25+
});
26+
27+
28+
const port2 = Number.parseInt(portEnv);
29+
30+
udpServer.bind(port2, () => {
31+
console.log(`UDP echo server running on port ${port2}`);
32+
});
33+
} else {
34+
console.warn("missing PORT_UDP");
35+
}
36+
}

0 commit comments

Comments
 (0)