Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.toml

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion examples/system-test-actor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@
"dependencies": {
"@hono/node-server": "^1.13.8",
"@hono/node-ws": "^1.1.0",
"@rivet-gg/runner-protocol": "file:../../sdks/runner-protocol",
"google-protobuf": "^3.21.2",
"hono": "^4.6.17"
},
"devDependencies": {
"@rivet-gg/actor-core": "^5.1.2",
"@rivet-gg/api": "^24.6.2",
"@types/deno": "^2.2.0",
"@types/google-protobuf": "^3.15.0",
"@types/node": "^22.13.9",
"@types/ws": "^8.18.0",
"dgram": "^1.0.1",
Expand All @@ -21,7 +24,7 @@
"ws": "^8.18.1"
},
"scripts": {
"start": "tsx ws.ts",
"start": "tsx src/main.ts",
"test": "tsx tests/client.ts",
"build": "tsc --outDir dist"
}
Expand Down
119 changes: 46 additions & 73 deletions examples/system-test-actor/src/managerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as net from "net";
import * as fs from "fs";
import { setInterval, clearInterval } from "timers";
import * as util from "util";
import { encodeFrame, decodeFrames, types as protocol } from "@rivet-gg/runner-protocol";

export function connectToManager() {
const socketPath = process.env.RIVET_MANAGER_SOCKET_PATH;
Expand All @@ -25,77 +26,79 @@ export function connectToManager() {

// Start ping loop to keep connection alive
pingInterval = setInterval(() => {
const pingMessage = { ping: null };
const pingMessage = new protocol.ToManager({
ping: new protocol.ToManager.Ping()
});
client.write(encodeFrame(pingMessage));
}, 2000);
});

client.on("data", (data) => {
const packets = decodeFrames(data);
const packets = decodeFrames(data, protocol.ToRunner);

for (let packet of packets) {
console.log("Received packet from manager:", util.inspect(packet, { depth: null }));

if (packet.start_actor) {
const response = {
actor_state_update: {
const response = new protocol.ToManager({
actor_state_update: new protocol.ToManager.ActorStateUpdate({
actor_id: packet.start_actor.actor_id,
generation: packet.start_actor.generation,
state: {
running: null,
},
},
};
state: new protocol.ActorState({
running: new protocol.ActorState.Running()
})
})
});
client.write(encodeFrame(response));

console.log(`actor_${packet.start_actor.actor_id}`, 'fweh');

const kvMessage = {
kv: {
const kvMessage = new protocol.ToManager({
kv: new rivet.pegboard.kv.Request({
actor_id: packet.start_actor.actor_id,
generation: packet.start_actor.generation,
request_id: 1,
data: {
put: {
keys: [
[[1, 2, 3], [4, 5, 6]],
],
values: [
[11, 12, 13, 14, 15, 16]
],
}
}
}
};
put: new rivet.pegboard.kv.Request.Put({
keys: [
new rivet.pegboard.kv.Key({
segments: [new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])]
})
],
values: [
new Uint8Array([11, 12, 13, 14, 15, 16])
]
})
})
});
client.write(encodeFrame(kvMessage));

const kvMessage2 = {
kv: {
const kvMessage2 = new protocol.ToManager({
kv: new rivet.pegboard.kv.Request({
actor_id: packet.start_actor.actor_id,
generation: packet.start_actor.generation,
request_id: 2,
data: {
get: {
keys: [
[[1, 2, 3], [4, 5, 6]]
],
}
}
}
};
get: new rivet.pegboard.kv.Request.Get({
keys: [
new rivet.pegboard.kv.Key({
segments: [new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])]
})
]
})
})
});
client.write(encodeFrame(kvMessage2));
} else if (packet.signal_actor) {
const response = {
actor_state_update: {
const response = new protocol.ToManager({
actor_state_update: new protocol.ToManager.ActorStateUpdate({
actor_id: packet.signal_actor.actor_id,
generation: packet.signal_actor.generation,
state: {
exited: {
exit_code: 0,
},
},
},
};
state: new protocol.ActorState({
exited: new protocol.ActorState.Exited({
exit_code: 0
})
})
})
});
client.write(encodeFrame(response));
}
}
Expand All @@ -115,33 +118,3 @@ export function connectToManager() {
});
}

function encodeFrame(payload: any): Buffer {
const json = JSON.stringify(payload);
const payloadLength = Buffer.alloc(4);
payloadLength.writeUInt32BE(json.length, 0);

const header = Buffer.alloc(4); // All zeros for now

return Buffer.concat([payloadLength, header, Buffer.from(json)]);
}

function decodeFrames(buffer: Buffer): any[] {
const packets = [];
let offset = 0;

while (offset < buffer.length) {
if (buffer.length - offset < 8) break; // Incomplete frame length + header
const payloadLength = buffer.readUInt32BE(offset);
offset += 4;

// Skip the header (4 bytes)
offset += 4;

if (buffer.length - offset < payloadLength) break; // Incomplete frame data
const json = buffer.subarray(offset, offset + payloadLength).toString();
packets.push(JSON.parse(json));
offset += payloadLength;
}

return packets;
}
26 changes: 26 additions & 0 deletions examples/system-test-actor/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,29 @@ __metadata:
languageName: node
linkType: hard

"@rivet-gg/runner-protocol@file:../../sdks/runner-protocol::locator=system-test-container%40workspace%3A.":
version: 1.0.0
resolution: "@rivet-gg/runner-protocol@file:../../sdks/runner-protocol#../../sdks/runner-protocol::hash=3263e1&locator=system-test-container%40workspace%3A."
dependencies:
google-protobuf: "npm:^3.21.0"
checksum: 10c0/b6c92f6d634eac885af3812bee67f905bbcab9edd7c32558540e0037703bb3dd273754e7282044a47aeedf980dfefb50f7828a144f4c5fe4a6b54099c973d7d8
languageName: node
linkType: hard

"@types/deno@npm:^2.2.0":
version: 2.2.0
resolution: "@types/deno@npm:2.2.0"
checksum: 10c0/cb45bbffe66a3008224a509c6bcb338921cc68b9045363f77ba5d84650d879b8fd4c810db24369a93fbce4a8e2855808bb141c0447feb47d911a7512ba374bde
languageName: node
linkType: hard

"@types/google-protobuf@npm:^3.15.0":
version: 3.15.12
resolution: "@types/google-protobuf@npm:3.15.12"
checksum: 10c0/721783234e627f367dd710c345a1eaa9dca4ac64910032cef0c851c1821e05d06ffb51e9d1693080f1c0797a8674f89130fa56a390395ec7791ea8506d2f3bfb
languageName: node
linkType: hard

"@types/node@npm:*, @types/node@npm:^22.13.9":
version: 22.13.9
resolution: "@types/node@npm:22.13.9"
Expand Down Expand Up @@ -847,6 +863,13 @@ __metadata:
languageName: node
linkType: hard

"google-protobuf@npm:^3.21.0, google-protobuf@npm:^3.21.2":
version: 3.21.4
resolution: "google-protobuf@npm:3.21.4"
checksum: 10c0/28f2800f7fe1a8fc55eb58ba76e158268407bfb3b90646eaf8a177dd92a2e522459b773f8132ae546e60ac3b6f5947557a1cf3d963a05bb594f43bcde640f54f
languageName: node
linkType: hard

"gopd@npm:^1.2.0":
version: 1.2.0
resolution: "gopd@npm:1.2.0"
Expand Down Expand Up @@ -1533,10 +1556,13 @@ __metadata:
"@hono/node-ws": "npm:^1.1.0"
"@rivet-gg/actor-core": "npm:^5.1.2"
"@rivet-gg/api": "npm:^24.6.2"
"@rivet-gg/runner-protocol": "file:../../sdks/runner-protocol"
"@types/deno": "npm:^2.2.0"
"@types/google-protobuf": "npm:^3.15.0"
"@types/node": "npm:^22.13.9"
"@types/ws": "npm:^8.18.0"
dgram: "npm:^1.0.1"
google-protobuf: "npm:^3.21.2"
hono: "npm:^4.6.17"
node-fetch: "npm:^3.3.2"
tsx: "npm:^4.7.0"
Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
"devDependencies": {
"@biomejs/biome": "^1.9.4",
"@yarnpkg/plugin-exec": "^3.0.1",
"protoc-gen-js": "^3.21.2",
"protoc-gen-ts": "^0.8.7",
"turbo": "^2.0.1"
},
"resolutions": {
Expand Down
2 changes: 1 addition & 1 deletion packages/edge/infra/client/actor-kv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fdb-util.workspace = true
foundationdb.workspace = true
futures-util = { version = "0.3" }
indexmap = { version = "2.0" }
pegboard-config.workspace = true
pegboard-runner-protocol.workspace = true
prost = "0.14"
rivet-util-id.workspace = true
serde = { version = "1.0.195", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion packages/edge/infra/client/actor-kv/src/entry.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::*;
use foundationdb as fdb;
use pegboard_config::runner_protocol::proto::kv;
use pegboard_runner_protocol::proto::kv;
use prost::Message;

use crate::key::Key;
Expand Down
2 changes: 1 addition & 1 deletion packages/edge/infra/client/actor-kv/src/key.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use foundationdb::tuple::{
Bytes, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset,
};
use pegboard_config::runner_protocol::proto::kv;
use pegboard_runner_protocol::proto::kv;
use prost::Message;

// TODO: Custom deser impl that uses arrays instead of objects?
Expand Down
2 changes: 1 addition & 1 deletion packages/edge/infra/client/actor-kv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use indexmap::IndexMap;
pub use key::Key;
use list_query::ListLimitReached;
pub use list_query::ListQuery;
use pegboard_config::runner_protocol::proto::kv;
use pegboard_runner_protocol::proto::kv;
use prost::Message;
use tokio::sync::Mutex;
use utils::{validate_entries, validate_keys, TransactionExt};
Expand Down
2 changes: 1 addition & 1 deletion packages/edge/infra/client/actor-kv/src/list_query.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::*;
use foundationdb::tuple::Subspace;
use indexmap::IndexMap;
use pegboard_config::runner_protocol::proto::kv;
use pegboard_runner_protocol::proto::kv;

use crate::{
entry::EntryBuilder,
Expand Down
5 changes: 0 additions & 5 deletions packages/edge/infra/client/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,9 @@ anyhow = "1.0"
indexmap = { version = "2.0" }
ipnet = { version = "2.10.1", features = ["serde"] }
pegboard.workspace = true
prost = "0.14"
rivet-util-id.workspace = true
schemars = { version = "0.8.21", features = ["url", "uuid1"] }
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0"
tokio-util = { version = "0.7", features = ["codec"] }
url = "2.2.2"
uuid = { version = "1.6.1", features = ["v4"] }

[build-dependencies]
prost-build = "0.14"
1 change: 0 additions & 1 deletion packages/edge/infra/client/config/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
mod manager;
pub mod runner_protocol;
pub use manager::*;
5 changes: 1 addition & 4 deletions packages/edge/infra/client/echo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,9 @@ license = "Apache-2.0"

[dependencies]
anyhow = "1.0"
bytes = "1.0"
futures-util = "0.3"
http = "0.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
pegboard-runner-protocol.workspace = true
tokio = { version = "1.40", features = ["full",] }
tokio-util = "0.7"
uuid = { version = "1", features = ["v4", "serde"] }
warp = "0.3.7"
Loading
Loading