Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.

Commit 6a4ec8b

Browse files
committed
feat(engine): add engine driver
1 parent 0433897 commit 6a4ec8b

18 files changed

+849
-95
lines changed

.gitattributes

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@
77

88
*.png binary
99
*.jpg binary
10+
*.tgz binary
1011

packages/core/package.json

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -158,20 +158,21 @@
158158
},
159159
"dependencies": {
160160
"@hono/standard-validator": "^0.1.3",
161-
"cbor-x": "^1.6.0",
161+
"@hono/zod-openapi": "^0.19.10",
162162
"@rivetkit/fast-json-patch": "^3.1.2",
163+
"cbor-x": "^1.6.0",
164+
"hono": "^4.7.0",
163165
"invariant": "^2.2.4",
164166
"nanoevents": "^9.1.0",
165167
"on-change": "^5.0.1",
166168
"p-retry": "^6.2.1",
167-
"zod": "^3.25.76",
168-
"@hono/zod-openapi": "^0.19.10",
169-
"hono": "^4.7.0"
169+
"zod": "^3.25.76"
170170
},
171171
"devDependencies": {
172-
"@hono/node-server": "^1.14.0",
172+
"@hono/node-server": "^1.18.2",
173173
"@hono/node-ws": "^1.1.1",
174174
"@rivet-gg/actor-core": "^25.1.0",
175+
"@rivetkit/engine-runner": "https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner@390",
175176
"@types/invariant": "^2",
176177
"@types/node": "^22.13.1",
177178
"@types/ws": "^8",

packages/core/src/drivers/default.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
import { UserError } from "@/actor/errors";
12
import { logger } from "@/actor/log";
23
import { createFileSystemOrMemoryDriver } from "@/drivers/file-system/mod";
3-
import { type DriverConfig, UserError } from "@/mod";
4+
import type { DriverConfig } from "@/registry/run-config";
45
import { getEnvUniversal } from "@/utils";
56

67
/**
Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
import type {
2+
ActorConfig as RunnerActorConfig,
3+
RunnerConfig,
4+
} from "@rivetkit/engine-runner";
5+
import { Runner } from "@rivetkit/engine-runner";
6+
import * as cbor from "cbor-x";
7+
import { ActionContext } from "@/actor/action";
8+
import { generateConnId, generateConnToken } from "@/actor/connection";
9+
import {
10+
CONN_DRIVER_GENERIC_HTTP,
11+
type GenericHttpDriverState,
12+
} from "@/actor/generic-conn-driver";
13+
import * as protoHttpAction from "@/actor/protocol/http/action";
14+
import { deserialize, serialize } from "@/actor/protocol/serde";
15+
import { ActorHandle } from "@/client/actor-handle";
16+
import type { Client } from "@/client/client";
17+
import type {
18+
ActorDriver,
19+
AnyActorInstance,
20+
ManagerDriver,
21+
} from "@/driver-helpers/mod";
22+
import {
23+
type ActorDriver,
24+
serializeEmptyPersistData,
25+
} from "@/driver-helpers/mod";
26+
import type {
27+
ActorRouter,
28+
AnyActorInstance as CoreAnyActorInstance,
29+
RegistryConfig,
30+
RunConfig,
31+
} from "@/mod";
32+
import {
33+
createActorRouter,
34+
createGenericConnDrivers,
35+
GenericConnGlobalState,
36+
lookupInRegistry,
37+
} from "@/mod";
38+
import type { Config } from "./config";
39+
import { logger } from "./log";
40+
import { deserializeKey } from "./utils";
41+
42+
interface ActorHandler {
43+
actor?: AnyActorInstance;
44+
actorPromise?: PromiseWithResolvers<void>;
45+
genericConnGlobalState: GenericConnGlobalState;
46+
metadata: RunnerActorConfig["metadata"];
47+
persistedData?: Uint8Array;
48+
}
49+
50+
export type DriverContext = {};
51+
52+
export class EngineActorDriver implements ActorDriver {
53+
#registryConfig: RegistryConfig;
54+
#runConfig: RunConfig;
55+
#managerDriver: ManagerDriver;
56+
#inlineClient: Client<any>;
57+
#config: Config;
58+
#runner: Runner;
59+
#actors: Map<string, ActorHandler> = new Map();
60+
#actorRouter: ActorRouter;
61+
#version: number = 1; // Version for the runner protocol
62+
63+
constructor(
64+
registryConfig: RegistryConfig,
65+
runConfig: RunConfig,
66+
managerDriver: ManagerDriver,
67+
inlineClient: Client<any>,
68+
config: Config,
69+
) {
70+
this.#registryConfig = registryConfig;
71+
this.#runConfig = runConfig;
72+
this.#managerDriver = managerDriver;
73+
this.#inlineClient = inlineClient;
74+
this.#config = config;
75+
this.#actorRouter = createActorRouter(runConfig, this);
76+
77+
// Create runner configuration
78+
const runnerConfig: RunnerConfig = {
79+
version: this.#version,
80+
endpoint: config.endpoint,
81+
pegboardEndpoint: config.pegboardEndpoint,
82+
namespace: config.namespace,
83+
addresses: config.addresses,
84+
totalSlots: config.totalSlots,
85+
runnerName: config.runnerName,
86+
fetch: this.#runnerFetch.bind(this),
87+
onActorStart: this.#runnerOnActorStart.bind(this),
88+
onActorStop: this.#runnerOnActorStop.bind(this),
89+
onActorSleep: this.#runerOnActorSleep.bind(this),
90+
};
91+
92+
// Create and start runner
93+
this.#runner = new Runner(runnerConfig);
94+
this.#runner.start();
95+
logger().info("engine runner started", {
96+
endpoint: config.endpoint,
97+
namespace: config.namespace,
98+
runnerName: config.runnerName,
99+
});
100+
}
101+
102+
async #loadActorHandler(actorId: string): Promise<ActorHandler> {
103+
// Check if actor is already loaded
104+
let handler = this.#actors.get(actorId);
105+
if (handler) {
106+
if (handler.actorPromise) await handler.actorPromise.promise;
107+
if (!handler.actor) throw new Error("Actor should be loaded");
108+
return handler;
109+
}
110+
111+
// Create new actor handler
112+
handler = {
113+
genericConnGlobalState: new GenericConnGlobalState(),
114+
};
115+
this.#actors.set(actorId, handler);
116+
117+
// This will resolve in onActorStart
118+
handler.actorPromise = Promise.withResolvers();
119+
120+
// Wait for actor to be loaded (will be resolved in onActorStart)
121+
await handler.actorPromise.promise;
122+
123+
if (!handler.actor) {
124+
throw new Error(`Actor ${actorId} failed to load`);
125+
}
126+
127+
return handler;
128+
}
129+
130+
async loadActor(actorId: string): Promise<AnyActorInstance> {
131+
const handler = await this.#loadActorHandler(actorId);
132+
if (!handler.actor) throw new Error(`Actor ${actorId} failed to load`);
133+
return handler.actor;
134+
}
135+
136+
getGenericConnGlobalState(actorId: string): GenericConnGlobalState {
137+
const handler = this.#actors.get(actorId);
138+
if (!handler) {
139+
throw new Error(`Actor ${actorId} not loaded`);
140+
}
141+
return handler.genericConnGlobalState;
142+
}
143+
144+
getContext(actorId: string): DriverContext {
145+
// Return empty context for now
146+
return {};
147+
}
148+
149+
async readPersistedData(actorId: string): Promise<Uint8Array | undefined> {
150+
const handler = this.#actors.get(actorId);
151+
if (!handler) {
152+
throw new Error(`Actor ${actorId} not loaded`);
153+
}
154+
return handler.persistedData;
155+
}
156+
157+
async writePersistedData(actorId: string, data: Uint8Array): Promise<void> {
158+
const handler = this.#actors.get(actorId);
159+
if (!handler) {
160+
throw new Error(`Actor ${actorId} not loaded`);
161+
}
162+
handler.persistedData = data;
163+
}
164+
165+
async setAlarm(actor: AnyActorInstance, timestamp: number): Promise<void> {
166+
// Not implemented for engine driver
167+
throw new Error("Alarms not implemented for engine driver");
168+
}
169+
170+
async getDatabase(actorId: string): Promise<unknown | undefined> {
171+
// Not implemented for engine driver
172+
throw new Error("Database not implemented for engine driver");
173+
}
174+
175+
// Runner lifecycle callbacks
176+
async #runnerOnActorStart(
177+
actorId: string,
178+
generation: number,
179+
config: RunnerActorConfig,
180+
): Promise<void> {
181+
logger().info("actor starting", { actorId, generation });
182+
183+
// Deserialize input
184+
let input;
185+
if (config.input) {
186+
input = cbor.decode(config.input);
187+
}
188+
189+
// Get or create handler
190+
let handler = this.#actors.get(actorId);
191+
if (!handler) {
192+
handler = {
193+
genericConnGlobalState: new GenericConnGlobalState(),
194+
actorPromise: Promise.withResolvers(),
195+
metadata: config.metadata,
196+
persistedData: serializeEmptyPersistData(input),
197+
};
198+
this.#actors.set(actorId, handler);
199+
}
200+
201+
const name = config.metadata.actor.name as string;
202+
const key = deserializeKey(config.metadata.actor.keys[0]);
203+
204+
// Create actor instance
205+
const definition = lookupInRegistry(
206+
this.#registryConfig,
207+
config.metadata.actor.name as string, // TODO: Remove cast
208+
);
209+
handler.actor = definition.instantiate();
210+
211+
// Start actor
212+
const connDrivers = createGenericConnDrivers(
213+
handler.genericConnGlobalState,
214+
);
215+
await handler.actor.start(
216+
connDrivers,
217+
this,
218+
this.#inlineClient,
219+
actorId,
220+
name,
221+
key,
222+
"unknown", // TODO: Add regions
223+
);
224+
225+
// Resolve promise if waiting
226+
handler.actorPromise?.resolve();
227+
handler.actorPromise = undefined;
228+
229+
logger().info("actor started", { actorId, name, key });
230+
}
231+
232+
async #runnerOnActorStop(actorId: string, generation: number): Promise<void> {
233+
logger().info("actor stopping", { actorId, generation });
234+
235+
const handler = this.#actors.get(actorId);
236+
if (handler?.actor) {
237+
await handler.actor.stop();
238+
this.#actors.delete(actorId);
239+
}
240+
241+
logger().info("actor stopped", { actorId });
242+
}
243+
244+
async #runerOnActorSleep(actorId: string, generation: number): Promise<void> {
245+
logger().info("actor sleeping", { actorId, generation });
246+
// No special handling needed for sleep in this implementation
247+
}
248+
249+
async #runnerFetch(actorId: string, request: Request): Promise<Response> {
250+
return await this.#actorRouter.fetch(request, { actorId });
251+
}
252+
253+
async stop(): Promise<void> {
254+
logger().info("stopping engine actor driver");
255+
await this.#runner.stop();
256+
}
257+
}
258+
259+
export function createEngineActorDriverBuilder(config: Config) {
260+
return (
261+
registryConfig: RegistryConfig,
262+
runConfig: RunConfig,
263+
managerDriver: ManagerDriver,
264+
inlineClient: Client<any>,
265+
) => {
266+
return new EngineActorDriver(
267+
registryConfig,
268+
runConfig,
269+
managerDriver,
270+
inlineClient,
271+
config,
272+
);
273+
};
274+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import type { Hono } from "hono";
2+
import { z } from "zod";
3+
import { RunConfigSchema } from "@/registry/run-config";
4+
5+
export const ConfigSchema = z
6+
.object({
7+
app: z.custom<Hono>().optional(),
8+
endpoint: z.string().default("http://localhost:6420"),
9+
pegboardEndpoint: z.string().optional(),
10+
namespace: z.string().default("default"),
11+
runnerName: z.string().default("rivetkit-runner"),
12+
totalSlots: z.number().default(100),
13+
addresses: z
14+
.record(
15+
z.object({
16+
host: z.string(),
17+
port: z.number(),
18+
}),
19+
)
20+
.default({ main: { host: "127.0.0.1", port: 5051 } }),
21+
guardPort: z.number().default(7080),
22+
})
23+
.default({});
24+
25+
export type InputConfig = z.input<typeof ConfigSchema>;
26+
export type Config = z.infer<typeof ConfigSchema>;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import { getLogger } from "@/common/log";
2+
3+
export const LOGGER_NAME = "driver-engine-runner";
4+
5+
export function logger() {
6+
return getLogger(LOGGER_NAME);
7+
}

0 commit comments

Comments
 (0)