Skip to content

Commit c73f767

Browse files
committed
feat: add actor kv to runners
1 parent 0bb001f commit c73f767

File tree

25 files changed

+507
-577
lines changed

25 files changed

+507
-577
lines changed

Cargo.toml

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,6 @@ default-features = false
7676
git = "https://github.com/rivet-gg/serde_array_query"
7777
rev = "b9f8bfa"
7878

79-
[workspace.dependencies.deno_core]
80-
git = "https://github.com/rivet-gg/deno_core"
81-
rev = "8a313913fa73d58f4f9532565b0084e723bc34ad"
82-
83-
[workspace.dependencies.deno_runtime]
84-
git = "https://github.com/rivet-gg/deno"
85-
rev = "a6903d67063e07b82836399f63c7a0fa5be8bf56"
86-
8779
[workspace.dependencies.api-helper]
8880
path = "packages/common/api-helper/build"
8981

examples/system-test-actor/src/managerClient.ts

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import * as net from "net";
22
import * as fs from "fs";
33
import { setInterval, clearInterval } from "timers";
4+
import * as util from "util";
45

56
export function connectToManager() {
67
const socketPath = process.env.RIVET_MANAGER_SOCKET_PATH;
@@ -31,8 +32,9 @@ export function connectToManager() {
3132

3233
client.on("data", (data) => {
3334
const packets = decodeFrames(data);
34-
packets.forEach((packet) => {
35-
console.log("Received packet from manager:", packet);
35+
36+
for (let packet of packets) {
37+
console.log("Received packet from manager:", util.inspect(packet, { depth: null }));
3638

3739
if (packet.start_actor) {
3840
const response = {
@@ -45,6 +47,41 @@ export function connectToManager() {
4547
},
4648
};
4749
client.write(encodeFrame(response));
50+
51+
const kvMessage = {
52+
kv: {
53+
actor_id: packet.start_actor.actor_id,
54+
generation: packet.start_actor.generation,
55+
request_id: 1,
56+
data: {
57+
put: {
58+
keys: [
59+
[[1, 2, 3], [4, 5, 6]],
60+
],
61+
values: [
62+
[11, 12, 13, 14, 15, 16]
63+
],
64+
}
65+
}
66+
}
67+
};
68+
client.write(encodeFrame(kvMessage));
69+
70+
const kvMessage2 = {
71+
kv: {
72+
actor_id: packet.start_actor.actor_id,
73+
generation: packet.start_actor.generation,
74+
request_id: 2,
75+
data: {
76+
get: {
77+
keys: [
78+
[[1, 2, 3], [4, 5, 6]]
79+
],
80+
}
81+
}
82+
}
83+
};
84+
client.write(encodeFrame(kvMessage2));
4885
} else if (packet.signal_actor) {
4986
const response = {
5087
actor_state_update: {
@@ -59,7 +96,7 @@ export function connectToManager() {
5996
};
6097
client.write(encodeFrame(response));
6198
}
62-
});
99+
}
63100
});
64101

65102
client.on("error", (error) => {
@@ -98,7 +135,7 @@ function decodeFrames(buffer: Buffer): any[] {
98135
offset += 4;
99136

100137
if (buffer.length - offset < payloadLength) break; // Incomplete frame data
101-
const json = buffer.slice(offset, offset + payloadLength).toString();
138+
const json = buffer.subarray(offset, offset + payloadLength).toString();
102139
packets.push(JSON.parse(json));
103140
offset += payloadLength;
104141
}

packages/common/fdb-util/src/codes.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
// === Copied from foundationdbrs ===
2+
pub const NIL: u8 = 0x00;
3+
pub const NESTED: u8 = 0x05;
4+
pub const ESCAPE: u8 = 0xff;
5+
16
// FDB defines a range (0x40-0x4f) of user type codes for use with its tuple encoding system.
27
// https://github.com/apple/foundationdb/blob/main/design/tuple.md#user-type-codes
38

packages/edge/infra/client/actor-kv/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ license = "Apache-2.0"
77

88
[dependencies]
99
anyhow.workspace = true
10-
deno_core.workspace = true
1110
fdb-util.workspace = true
1211
foundationdb.workspace = true
1312
futures-util = { version = "0.3" }
1413
indexmap = { version = "2.0" }
1514
prost = "0.13.3"
15+
rivet-util-id.workspace = true
1616
serde = { version = "1.0.195", features = ["derive"] }
1717
serde_json = "1.0.111"
1818
tokio-tungstenite = "0.23.1"

packages/edge/infra/client/actor-kv/src/entry.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use anyhow::*;
22
use foundationdb as fdb;
33
use prost::Message;
4-
use serde::Serialize;
4+
use serde::{Deserialize, Serialize};
55

66
use crate::{key::Key, metadata::Metadata};
77

@@ -49,7 +49,7 @@ impl EntryBuilder {
4949
}
5050

5151
/// Represents a Rivet KV value.
52-
#[derive(Serialize)]
52+
#[derive(Clone, Debug, Serialize, Deserialize)]
5353
pub struct Entry {
5454
pub metadata: Metadata,
5555
pub value: Vec<u8>,
Lines changed: 34 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,11 @@
1-
use deno_core::JsBuffer;
21
use foundationdb::tuple::{
3-
Bytes, PackError, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset,
2+
Bytes, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset,
43
};
5-
use serde::Deserialize;
4+
use serde::{Serialize, Deserialize};
65

76
// TODO: Custom deser impl that uses arrays instead of objects?
8-
#[derive(Clone, Deserialize)]
9-
#[serde(rename_all = "camelCase")]
10-
pub enum Key {
11-
/// Contains references to v8-owned buffers. Requires no copies.
12-
JsInKey(Vec<JsBuffer>),
13-
/// Cant use `ToJsBuffer` because of its API, so it gets converted to ToJsBuffer in the KV ext.
14-
JsOutKey(Vec<Vec<u8>>),
15-
}
7+
#[derive(Clone, Serialize, Deserialize)]
8+
pub struct Key(Vec<Vec<u8>>);
169

1710
impl std::fmt::Debug for Key {
1811
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -22,49 +15,24 @@ impl std::fmt::Debug for Key {
2215

2316
impl PartialEq for Key {
2417
fn eq(&self, other: &Self) -> bool {
25-
match (self, other) {
26-
(Key::JsInKey(a), Key::JsInKey(b)) => a
27-
.iter()
28-
.map(|x| x.as_ref())
29-
.eq(b.iter().map(|x| x.as_ref())),
30-
(Key::JsOutKey(a), Key::JsOutKey(b)) => a == b,
31-
(Key::JsInKey(a), Key::JsOutKey(b)) => a.iter().map(|x| x.as_ref()).eq(b.iter()),
32-
(Key::JsOutKey(a), Key::JsInKey(b)) => a.iter().eq(b.iter().map(|x| x.as_ref())),
33-
}
18+
self.0 == other.0
3419
}
3520
}
3621

3722
impl Eq for Key {}
3823

3924
impl std::hash::Hash for Key {
4025
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
41-
match self {
42-
Key::JsInKey(js_in_key) => {
43-
for buffer in js_in_key {
44-
state.write(buffer.as_ref());
45-
}
46-
}
47-
Key::JsOutKey(out_key) => {
48-
for buffer in out_key {
49-
state.write(buffer);
50-
}
51-
}
26+
for buffer in &self.0 {
27+
state.write(buffer);
5228
}
5329
}
5430
}
5531

5632
impl Key {
5733
pub fn len(&self) -> usize {
58-
match self {
59-
Key::JsInKey(js_in_key) => {
60-
// Arbitrary 4 accounting for nesting overhead
61-
js_in_key.iter().fold(0, |acc, x| acc + x.len()) + 4 * js_in_key.len()
62-
}
63-
Key::JsOutKey(out_key) => {
64-
// Arbitrary 4 accounting for nesting overhead
65-
out_key.iter().fold(0, |acc, x| acc + x.len()) + 4 * out_key.len()
66-
}
67-
}
34+
// Arbitrary 4 accounting for nesting overhead
35+
self.0.iter().fold(0, |acc, x| acc + x.len()) + 4 * self.0.len()
6836
}
6937
}
7038

@@ -74,30 +42,25 @@ impl TuplePack for Key {
7442
w: &mut W,
7543
tuple_depth: TupleDepth,
7644
) -> std::io::Result<VersionstampOffset> {
77-
match self {
78-
Key::JsInKey(tuple) => {
79-
let mut offset = VersionstampOffset::None { size: 0 };
45+
let mut offset = VersionstampOffset::None { size: 0 };
8046

81-
w.write_all(&[NESTED])?;
82-
offset += 1;
47+
w.write_all(&[fdb_util::codes::NESTED])?;
48+
offset += 1;
8349

84-
for v in tuple.iter() {
85-
offset += v.as_ref().pack(w, tuple_depth.increment())?;
86-
}
50+
for v in self.0.iter() {
51+
offset += v.pack(w, tuple_depth.increment())?;
52+
}
8753

88-
w.write_all(&[NIL])?;
89-
offset += 1;
54+
w.write_all(&[fdb_util::codes::NIL])?;
55+
offset += 1;
9056

91-
Ok(offset)
92-
}
93-
Key::JsOutKey(_) => unreachable!("should not be packing out keys"),
94-
}
57+
Ok(offset)
9558
}
9659
}
9760

9861
impl<'de> TupleUnpack<'de> for Key {
9962
fn unpack(mut input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
100-
input = parse_code(input, NESTED)?;
63+
input = fdb_util::parse_code(input, fdb_util::codes::NESTED)?;
10164

10265
let mut vec = Vec::new();
10366
while !is_end_of_tuple(input, true) {
@@ -106,15 +69,21 @@ impl<'de> TupleUnpack<'de> for Key {
10669
vec.push(v.into_owned());
10770
}
10871

109-
input = parse_code(input, NIL)?;
72+
input = fdb_util::parse_code(input, fdb_util::codes::NIL)?;
11073

111-
Ok((input, Key::JsOutKey(vec)))
74+
Ok((input, Key(vec)))
11275
}
11376
}
11477

115-
/// Same as Key::JsInKey except when packing, it leaves off the NIL byte to allow for an open range.
116-
#[derive(Deserialize)]
117-
pub struct ListKey(Vec<JsBuffer>);
78+
/// Same as Key: except when packing, it leaves off the NIL byte to allow for an open range.
79+
#[derive(Clone, Serialize, Deserialize)]
80+
pub struct ListKey(Vec<Vec<u8>>);
81+
82+
impl std::fmt::Debug for ListKey {
83+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84+
write!(f, "ListKey({})", self.len())
85+
}
86+
}
11887

11988
impl TuplePack for ListKey {
12089
fn pack<W: std::io::Write>(
@@ -124,11 +93,11 @@ impl TuplePack for ListKey {
12493
) -> std::io::Result<VersionstampOffset> {
12594
let mut offset = VersionstampOffset::None { size: 0 };
12695

127-
w.write_all(&[NESTED])?;
96+
w.write_all(&[fdb_util::codes::NESTED])?;
12897
offset += 1;
12998

130-
for v in self.0.iter() {
131-
offset += v.as_ref().pack(w, tuple_depth.increment())?;
99+
for v in &self.0 {
100+
offset += v.pack(w, tuple_depth.increment())?;
132101
}
133102

134103
// No ending NIL byte compared to `Key::pack`
@@ -144,37 +113,11 @@ impl ListKey {
144113
}
145114
}
146115

147-
// === Copied from foundationdbrs ===
148-
const NIL: u8 = 0x00;
149-
const NESTED: u8 = 0x05;
150-
const ESCAPE: u8 = 0xff;
151-
152-
#[inline]
153-
fn parse_byte(input: &[u8]) -> PackResult<(&[u8], u8)> {
154-
if input.is_empty() {
155-
Err(PackError::MissingBytes)
156-
} else {
157-
Ok((&input[1..], input[0]))
158-
}
159-
}
160-
161-
fn parse_code(input: &[u8], expected: u8) -> PackResult<&[u8]> {
162-
let (input, found) = parse_byte(input)?;
163-
if found == expected {
164-
Ok(input)
165-
} else {
166-
Err(PackError::BadCode {
167-
found,
168-
expected: Some(expected),
169-
})
170-
}
171-
}
172-
173116
fn is_end_of_tuple(input: &[u8], nested: bool) -> bool {
174117
match input.first() {
175118
None => true,
176119
_ if !nested => false,
177-
Some(&NIL) => Some(&ESCAPE) != input.get(1),
120+
Some(&fdb_util::codes::NIL) => Some(&fdb_util::codes::ESCAPE) != input.get(1),
178121
_ => false,
179122
}
180123
}

0 commit comments

Comments
 (0)