Skip to content

Commit da2e172

Browse files
committed
fix: convert runner protocol to protobuf
1 parent afc89a9 commit da2e172

File tree

12 files changed

+283
-183
lines changed

12 files changed

+283
-183
lines changed

packages/edge/infra/client/actor-kv/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ fdb-util.workspace = true
1111
foundationdb.workspace = true
1212
futures-util = { version = "0.3" }
1313
indexmap = { version = "2.0" }
14-
prost = "0.13.3"
14+
pegboard-config.workspace = true
15+
prost = "0.14"
1516
rivet-util-id.workspace = true
1617
serde = { version = "1.0.195", features = ["derive"] }
1718
serde_json = "1.0.111"

packages/edge/infra/client/actor-kv/src/entry.rs

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,27 @@
11
use anyhow::*;
22
use foundationdb as fdb;
3+
use pegboard_config::runner_protocol::proto::kv;
34
use prost::Message;
4-
use serde::{Deserialize, Serialize};
55

6-
use crate::{key::Key, metadata::Metadata};
6+
use crate::key::Key;
7+
8+
/// Represents a Rivet KV value.
9+
#[derive(Clone, Debug)]
10+
pub struct Entry {
11+
inner: kv::Entry,
12+
}
13+
14+
impl std::ops::Deref for Entry {
15+
type Target = kv::Entry;
16+
17+
fn deref(&self) -> &Self::Target {
18+
&self.inner
19+
}
20+
}
721

822
#[derive(Default)]
923
pub(crate) struct EntryBuilder {
10-
metadata: Option<Metadata>,
24+
metadata: Option<kv::Metadata>,
1125
value: Vec<u8>,
1226
next_idx: usize,
1327
}
@@ -19,7 +33,7 @@ impl EntryBuilder {
1933
// We ignore setting the metadata again because it means the same key was given twice in the
2034
// input keys for `ActorKv::get`. We don't perform automatic deduplication.
2135
if self.metadata.is_none() {
22-
self.metadata = Some(Metadata::decode(value.value())?);
36+
self.metadata = Some(kv::Metadata::decode(value.value())?);
2337
}
2438
}
2539
SubKey::Chunk(idx, value) => {
@@ -40,21 +54,17 @@ impl EntryBuilder {
4054
ensure!(!self.value.is_empty(), "empty value at key {key:?}");
4155

4256
Ok(Entry {
43-
metadata: self
44-
.metadata
45-
.with_context(|| format!("no metadata for key {key:?}"))?,
46-
value: self.value,
57+
inner: kv::Entry {
58+
metadata: Some(
59+
self.metadata
60+
.with_context(|| format!("no metadata for key {key:?}"))?,
61+
),
62+
value: self.value,
63+
},
4764
})
4865
}
4966
}
5067

51-
/// Represents a Rivet KV value.
52-
#[derive(Clone, Debug, Serialize, Deserialize)]
53-
pub struct Entry {
54-
pub metadata: Metadata,
55-
pub value: Vec<u8>,
56-
}
57-
5868
/// Represents FDB keys within a Rivet KV key.
5969
pub(crate) enum SubKey {
6070
Metadata(fdb::future::FdbValue),

packages/edge/infra/client/actor-kv/src/key.rs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
use foundationdb::tuple::{
22
Bytes, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset,
33
};
4-
use serde::{Deserialize, Serialize};
4+
use pegboard_config::runner_protocol::proto::kv;
5+
use prost::Message;
56

67
// TODO: Custom deser impl that uses arrays instead of objects?
7-
#[derive(Clone, Serialize, Deserialize)]
8-
pub struct Key(Vec<Vec<u8>>);
8+
#[derive(Clone)]
9+
pub struct Key {
10+
inner: kv::Key,
11+
}
912

1013
impl std::fmt::Debug for Key {
1114
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -15,24 +18,23 @@ impl std::fmt::Debug for Key {
1518

1619
impl PartialEq for Key {
1720
fn eq(&self, other: &Self) -> bool {
18-
self.0 == other.0
21+
self.inner == other.inner
1922
}
2023
}
2124

2225
impl Eq for Key {}
2326

2427
impl std::hash::Hash for Key {
2528
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
26-
for buffer in &self.0 {
29+
for buffer in &self.inner.segments {
2730
state.write(buffer);
2831
}
2932
}
3033
}
3134

3235
impl Key {
3336
pub fn len(&self) -> usize {
34-
// Arbitrary 4 accounting for nesting overhead
35-
self.0.iter().fold(0, |acc, x| acc + x.len()) + 4 * self.0.len()
37+
self.inner.encoded_len()
3638
}
3739
}
3840

@@ -47,7 +49,7 @@ impl TuplePack for Key {
4749
w.write_all(&[fdb_util::codes::NESTED])?;
4850
offset += 1;
4951

50-
for v in self.0.iter() {
52+
for v in self.inner.segments.iter() {
5153
offset += v.pack(w, tuple_depth.increment())?;
5254
}
5355

@@ -62,22 +64,29 @@ impl<'de> TupleUnpack<'de> for Key {
6264
fn unpack(mut input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
6365
input = fdb_util::parse_code(input, fdb_util::codes::NESTED)?;
6466

65-
let mut vec = Vec::new();
67+
let mut segments = Vec::new();
6668
while !is_end_of_tuple(input, true) {
6769
let (rem, v) = Bytes::unpack(input, tuple_depth.increment())?;
6870
input = rem;
69-
vec.push(v.into_owned());
71+
segments.push(v.into_owned());
7072
}
7173

7274
input = fdb_util::parse_code(input, fdb_util::codes::NIL)?;
7375

74-
Ok((input, Key(vec)))
76+
Ok((
77+
input,
78+
Key {
79+
inner: kv::Key { segments },
80+
},
81+
))
7582
}
7683
}
7784

7885
/// Same as Key: except when packing, it leaves off the NIL byte to allow for an open range.
79-
#[derive(Clone, Serialize, Deserialize)]
80-
pub struct ListKey(Vec<Vec<u8>>);
86+
#[derive(Clone)]
87+
pub struct ListKey {
88+
inner: kv::Key,
89+
}
8190

8291
impl std::fmt::Debug for ListKey {
8392
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -96,7 +105,7 @@ impl TuplePack for ListKey {
96105
w.write_all(&[fdb_util::codes::NESTED])?;
97106
offset += 1;
98107

99-
for v in &self.0 {
108+
for v in &self.inner.segments {
100109
offset += v.pack(w, tuple_depth.increment())?;
101110
}
102111

@@ -108,8 +117,7 @@ impl TuplePack for ListKey {
108117

109118
impl ListKey {
110119
pub fn len(&self) -> usize {
111-
// Arbitrary 4 accounting for nesting overhead
112-
self.0.iter().fold(0, |acc, x| acc + x.len()) + 4 * self.0.len()
120+
self.inner.encoded_len()
113121
}
114122
}
115123

packages/edge/infra/client/actor-kv/src/lib.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,14 @@ use indexmap::IndexMap;
1414
pub use key::Key;
1515
use list_query::ListLimitReached;
1616
pub use list_query::ListQuery;
17-
pub use metadata::Metadata;
17+
use pegboard_config::runner_protocol::proto::kv;
1818
use prost::Message;
1919
use tokio::sync::Mutex;
2020
use utils::{validate_entries, validate_keys, TransactionExt};
2121

2222
mod entry;
2323
pub mod key;
2424
mod list_query;
25-
mod metadata;
2625
mod utils;
2726

2827
const MAX_KEY_SIZE: usize = 2 * 1024;
@@ -325,8 +324,8 @@ impl ActorKv {
325324
// Clear previous before setting
326325
tx.clear_subspace_range(&key_subspace);
327326

328-
let metadata = Metadata {
329-
kv_version: self.version.as_bytes().to_vec(),
327+
let metadata = kv::Metadata {
328+
version: self.version.as_bytes().to_vec(),
330329
create_ts: utils::now(),
331330
};
332331
let mut buf = Vec::new();

packages/edge/infra/client/actor-kv/src/list_query.rs

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,39 @@
11
use anyhow::*;
22
use foundationdb::tuple::Subspace;
33
use indexmap::IndexMap;
4-
use serde::{Deserialize, Serialize};
54

65
use crate::{
76
entry::EntryBuilder,
87
key::{Key, ListKey},
98
MAX_KEY_SIZE,
109
};
1110

12-
#[derive(Clone, Debug, Serialize, Deserialize)]
13-
#[serde(rename_all = "snake_case")]
11+
#[derive(Clone, Debug)]
1412
pub enum ListQuery {
1513
All,
16-
RangeInclusive(ListKey, Key),
17-
RangeExclusive(ListKey, Key),
14+
Range {
15+
start: ListKey,
16+
end: Key,
17+
exclusive: bool,
18+
},
1819
Prefix(ListKey),
1920
}
2021

2122
impl ListQuery {
2223
pub(crate) fn range(&self, subspace: &Subspace) -> (Vec<u8>, Vec<u8>) {
2324
match self {
2425
ListQuery::All => subspace.range(),
25-
ListQuery::RangeInclusive(start, end) => (
26+
ListQuery::Range {
27+
start,
28+
end,
29+
exclusive,
30+
} => (
2631
subspace.subspace(&start).range().0,
27-
subspace.subspace(&end).range().1,
28-
),
29-
ListQuery::RangeExclusive(start, end) => (
30-
subspace.subspace(&start).range().0,
31-
subspace.subspace(&end).range().1,
32+
if *exclusive {
33+
subspace.subspace(&end).range().0
34+
} else {
35+
subspace.subspace(&end).range().1
36+
},
3237
),
3338
ListQuery::Prefix(prefix) => subspace.subspace(&prefix).range(),
3439
}
@@ -37,7 +42,7 @@ impl ListQuery {
3742
pub(crate) fn validate(&self) -> Result<()> {
3843
match self {
3944
ListQuery::All => {}
40-
ListQuery::RangeInclusive(start, end) => {
45+
ListQuery::Range { start, end, .. } => {
4146
ensure!(
4247
start.len() <= MAX_KEY_SIZE,
4348
"start key is too long (max 2048 bytes)"
@@ -47,16 +52,6 @@ impl ListQuery {
4752
"end key is too long (max 2048 bytes)"
4853
);
4954
}
50-
ListQuery::RangeExclusive(start, end) => {
51-
ensure!(
52-
start.len() <= MAX_KEY_SIZE,
53-
"startAfter key is too long (max 2048 bytes)"
54-
);
55-
ensure!(
56-
end.len() <= MAX_KEY_SIZE,
57-
"end key is too long (max 2048 bytes)"
58-
);
59-
}
6055
ListQuery::Prefix(prefix) => {
6156
ensure!(
6257
prefix.len() <= MAX_KEY_SIZE,

packages/edge/infra/client/actor-kv/src/metadata.rs

Lines changed: 0 additions & 9 deletions
This file was deleted.

packages/edge/infra/client/actor-kv/src/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{collections::HashMap, result::Result::Ok};
1+
use std::result::Result::Ok;
22

33
use anyhow::*;
44
use foundationdb as fdb;

packages/edge/infra/client/config/Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,14 @@ anyhow = "1.0"
1010
indexmap = { version = "2.0" }
1111
ipnet = { version = "2.10.1", features = ["serde"] }
1212
pegboard.workspace = true
13-
pegboard-actor-kv.workspace = true
13+
prost = "0.14"
1414
rivet-util-id.workspace = true
1515
schemars = { version = "0.8.21", features = ["url", "uuid1"] }
1616
serde = { version = "1.0.195", features = ["derive"] }
1717
serde_json = "1.0"
1818
tokio-util = { version = "0.7", features = ["codec"] }
1919
url = "2.2.2"
20-
uuid = { version = "1.6.1", features = ["v4"] }
20+
uuid = { version = "1.6.1", features = ["v4"] }
21+
22+
[build-dependencies]
23+
prost-build = "0.14"
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
use std::io::Result;
2+
3+
fn main() -> Result<()> {
4+
prost_build::compile_protos(
5+
&[
6+
"resources/proto/kv.proto",
7+
"resources/proto/runner_protocol.proto",
8+
],
9+
&["resources/proto/"],
10+
)?;
11+
12+
Ok(())
13+
}

0 commit comments

Comments
 (0)