Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13,882 changes: 5,240 additions & 8,642 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 0 additions & 19 deletions docs-internal/infrastructure/pegboard/ISOLATE_RUNNER.md

This file was deleted.

2 changes: 1 addition & 1 deletion examples/system-test-actor/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ RUN chown -R rivet:rivet /app/dist
USER rivet

# Start the server
CMD ["node", "dist/src/container/main.js"]
CMD ["node", "dist/src/main.js"]
9 changes: 2 additions & 7 deletions examples/system-test-actor/rivet.jsonc
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
{
"actors": {
"ws-isolate": {
"script": "src/isolate/main.ts"
}
},
"containers": {
"ws-container": {
"builds": {
"ws": {
"dockerfile": "Dockerfile"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@ import type { UpgradeWebSocket } from "hono/ws";

type GetUpgradeWebSocketFn = (app: Hono) => UpgradeWebSocket;

export function createAndStartServer(
export function createAndStartHttpServer(
getUpgradeWebSocket: GetUpgradeWebSocketFn,
): { app: Hono; port: number } {
// Setup auto-exit timer
// setTimeout(() => {
// console.error(
// "Actor should've been destroyed by now. Automatically exiting.",
// );
//
// if (typeof Deno !== "undefined") Deno.exit(1);
// else process.exit(1);
// }, 60 * 1000);
setTimeout(() => {
console.error(
"Actor should've been destroyed by now. Automatically exiting.",
);

process.exit(1);
}, 60 * 1000);

let tickIndex = 0;
setInterval(() => {
Expand All @@ -23,10 +22,7 @@ export function createAndStartServer(
}, 1000);

// Get port from environment
const portEnv =
typeof Deno !== "undefined"
? Deno.env.get("PORT_HTTP")
: process.env.PORT_HTTP;
const portEnv = process.env.PORT_HTTP;
if (!portEnv) {
throw new Error("missing PORT_HTTP");
}
Expand All @@ -41,8 +37,7 @@ export function createAndStartServer(
const query = c.req.query("code");
const exitCode = query ? Number(query) : 0;

if (typeof Deno != "undefined") Deno.exit(exitCode);
else process.exit(exitCode);
process.exit(exitCode);

return c.text("unreachable");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: unreachable code after process.exit() should be removed

});
Expand Down
83 changes: 0 additions & 83 deletions examples/system-test-actor/src/isolate/main.ts

This file was deleted.

22 changes: 22 additions & 0 deletions examples/system-test-actor/src/main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { serve } from "@hono/node-server";
import { createNodeWebSocket } from "@hono/node-ws";
import { createAndStartHttpServer } from "./httpServer.js";
import { createAndStartUdpServer } from "./udpServer.js";
import { connectToManager } from "./managerClient.js";

let injectWebSocket: any;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: avoid using 'any' type - should define proper type from @hono/node-ws

const { app, port } = createAndStartHttpServer((app) => {
// Get Node.js WebSocket handler
const result = createNodeWebSocket({ app });
injectWebSocket = result.injectWebSocket;
return result.upgradeWebSocket;
});

const server = serve({ fetch: app.fetch, port });
injectWebSocket(server);
Comment on lines +15 to +16
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: server instance should be handled for graceful shutdown (e.g., process.on('SIGTERM'))


createAndStartUdpServer();

if (process.env.MULTI) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: use explicit boolean check for environment variable

Suggested change
if (process.env.MULTI) {
if (process.env.MULTI === 'true') {

connectToManager();
}
85 changes: 85 additions & 0 deletions examples/system-test-actor/src/managerClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import WebSocket from "ws";

export function connectToManager() {
let managerIp = process.env.RIVET_MANAGER_IP;
let managerPort = process.env.RIVET_MANAGER_PORT;
let pingInterval: NodeJS.Timeout;

if (!managerIp || !managerPort) {
console.error("Missing RIVET_MANAGER_IP or RIVET_MANAGER_PORT environment variables");
return;
}

let wsUrl = `ws://${managerIp}:${managerPort}`;
console.log(`Connecting to manager WebSocket at ${wsUrl}`);

let ws = new WebSocket(wsUrl);

ws.on("open", () => {
console.log("Connected to manager WebSocket");

let message = {
init: {
runner_id: process.env.RIVET_RUNNER_ID
}
};
let buffer = Buffer.from(JSON.stringify(message));
ws.send(buffer);

// Start ping loop to keep connection alive
pingInterval = setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.ping();
}
}, 2000);
});

ws.on("message", (data) => {
let json = data.toString();

console.log("Received message from manager:", json);

let packet = JSON.parse(json);

if (packet.start_actor) {
let message = {
actor_state_update: {
actor_id: packet.start_actor.actor_id,
generation: packet.start_actor.generation,
state: {
running: null,
},
}
};
let buffer = Buffer.from(JSON.stringify(message));
ws.send(buffer);
} else if (packet.signal_actor) {
let message = {
actor_state_update: {
actor_id: packet.start_actor.actor_id,
generation: packet.start_actor.generation,
Comment on lines +59 to +60
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Incorrect property access - using packet.start_actor instead of packet.signal_actor in the signal_actor handler

Suggested change
actor_id: packet.start_actor.actor_id,
generation: packet.start_actor.generation,
actor_id: packet.signal_actor.actor_id,
generation: packet.signal_actor.generation,

state: {
exited: {
exit_code: 0,
}
},
}
};
let buffer = Buffer.from(JSON.stringify(message));
ws.send(buffer);
}
});

ws.on("error", (error) => {
console.error("WebSocket error:", error);
});

ws.on("close", code => {
console.log("WebSocket connection closed, attempting to reconnect...", code);

// Clear ping interval when connection closes
if (pingInterval) clearInterval(pingInterval);

setTimeout(connectToManager, 5000);
});
}
36 changes: 36 additions & 0 deletions examples/system-test-actor/src/udpServer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import dgram from 'dgram';

export function createAndStartUdpServer() {
// Get port from environment
const portEnv = process.env.PORT_UDP;

if (portEnv) {
// Create a UDP socket
const udpServer = dgram.createSocket('udp4');

// Listen for incoming messages
udpServer.on('message', (msg, rinfo) => {
console.log(`UDP server received: ${msg} from ${rinfo.address}:${rinfo.port}`);

// Echo the message back to the sender
udpServer.send(msg, rinfo.port, rinfo.address, (err) => {
if (err) console.error('Failed to send UDP response:', err);
});
});

// Handle errors
udpServer.on('error', (err) => {
console.error('UDP server error:', err);
udpServer.close();
});


const port2 = Number.parseInt(portEnv);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: port validation should happen before socket creation to avoid creating resources that may need cleanup


udpServer.bind(port2, () => {
console.log(`UDP echo server running on port ${port2}`);
});
Comment on lines +30 to +32
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: bind callback should include error handling in case port binding fails

Suggested change
udpServer.bind(port2, () => {
console.log(`UDP echo server running on port ${port2}`);
});
udpServer.bind(port2, (err) => {
if (err) {
console.error('Failed to bind UDP server:', err);
udpServer.close();
return;
}
console.log(`UDP echo server running on port ${port2}`);
});

} else {
console.warn("missing PORT_UDP");
}
}
31 changes: 10 additions & 21 deletions examples/system-test-actor/tests/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@ const RIVET_PROJECT = process.env.RIVET_PROJECT;
const RIVET_ENVIRONMENT = process.env.RIVET_ENVIRONMENT;

// Determine test kind from environment variable
const BUILD_NAME = process.env.BUILD;
if (BUILD_NAME !== "ws-isolate" && BUILD_NAME !== "ws-container") {
throw new Error(
"Must specify BUILD environment variable as either 'ws-isolate' or 'ws-container'",
);
}
const BUILD_NAME = process.env.BUILD ?? "ws";

let region = process.env.REGION;
if (!region || region.length === 0) {
Expand Down Expand Up @@ -49,13 +44,6 @@ async function run() {
guard: {},
},
},
http2: {
protocol: "http",
internalPort: 8085,
routing: {
guard: {},
},
},
udp: {
protocol: "udp",
// internalPort: 80,
Expand All @@ -65,17 +53,18 @@ async function run() {
},
},
},
runtime: {
environment: {
MULTI: "1",
}
},
lifecycle: {
durable: false,
},
...(BUILD_NAME === "ws-container"
? {
resources: {
cpu: 100,
memory: 100,
},
}
: {}),
resources: {
cpu: 100,
memory: 100,
},
},
});
actorId = actor.id;
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@
"esbuild": "^0.25.5",
"actor-core": "file:./frontend/packages/actor-core.tgz"
}
}
}
Loading
Loading