Skip to content

Commit b5ea8a1

Browse files
committed
fix: convert runner protocol to protobuf
1 parent f86bfc5 commit b5ea8a1

File tree

17 files changed

+765
-281
lines changed

17 files changed

+765
-281
lines changed

packages/edge/infra/client/actor-kv/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ fdb-util.workspace = true
1111
foundationdb.workspace = true
1212
futures-util = { version = "0.3" }
1313
indexmap = { version = "2.0" }
14-
prost = "0.13.3"
14+
pegboard-config.workspace = true
15+
prost = "0.14"
1516
rivet-util-id.workspace = true
1617
serde = { version = "1.0.195", features = ["derive"] }
1718
serde_json = "1.0.111"

packages/edge/infra/client/actor-kv/src/entry.rs

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,39 @@
11
use anyhow::*;
22
use foundationdb as fdb;
3+
use pegboard_config::runner_protocol::proto::kv;
34
use prost::Message;
4-
use serde::{Deserialize, Serialize};
55

6-
use crate::{key::Key, metadata::Metadata};
6+
use crate::key::Key;
7+
8+
/// Represents a Rivet KV value.
9+
#[derive(Clone, Debug)]
10+
pub struct Entry {
11+
inner: kv::Entry,
12+
}
13+
14+
impl std::ops::Deref for Entry {
15+
type Target = kv::Entry;
16+
17+
fn deref(&self) -> &Self::Target {
18+
&self.inner
19+
}
20+
}
21+
22+
impl From<kv::Entry> for Entry {
23+
fn from(value: kv::Entry) -> Entry {
24+
Entry { inner: value }
25+
}
26+
}
27+
28+
impl From<Entry> for kv::Entry {
29+
fn from(value: Entry) -> kv::Entry {
30+
value.inner
31+
}
32+
}
733

834
#[derive(Default)]
935
pub(crate) struct EntryBuilder {
10-
metadata: Option<Metadata>,
36+
metadata: Option<kv::Metadata>,
1137
value: Vec<u8>,
1238
next_idx: usize,
1339
}
@@ -19,7 +45,7 @@ impl EntryBuilder {
1945
// We ignore setting the metadata again because it means the same key was given twice in the
2046
// input keys for `ActorKv::get`. We don't perform automatic deduplication.
2147
if self.metadata.is_none() {
22-
self.metadata = Some(Metadata::decode(value.value())?);
48+
self.metadata = Some(kv::Metadata::decode(value.value())?);
2349
}
2450
}
2551
SubKey::Chunk(idx, value) => {
@@ -40,21 +66,17 @@ impl EntryBuilder {
4066
ensure!(!self.value.is_empty(), "empty value at key {key:?}");
4167

4268
Ok(Entry {
43-
metadata: self
44-
.metadata
45-
.with_context(|| format!("no metadata for key {key:?}"))?,
46-
value: self.value,
69+
inner: kv::Entry {
70+
metadata: Some(
71+
self.metadata
72+
.with_context(|| format!("no metadata for key {key:?}"))?,
73+
),
74+
value: self.value,
75+
},
4776
})
4877
}
4978
}
5079

51-
/// Represents a Rivet KV value.
52-
#[derive(Clone, Debug, Serialize, Deserialize)]
53-
pub struct Entry {
54-
pub metadata: Metadata,
55-
pub value: Vec<u8>,
56-
}
57-
5880
/// Represents FDB keys within a Rivet KV key.
5981
pub(crate) enum SubKey {
6082
Metadata(fdb::future::FdbValue),

packages/edge/infra/client/actor-kv/src/key.rs

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,22 @@
11
use foundationdb::tuple::{
22
Bytes, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset,
33
};
4-
use serde::{Deserialize, Serialize};
4+
use pegboard_config::runner_protocol::proto::kv;
5+
use prost::Message;
56

67
// TODO: Custom deser impl that uses arrays instead of objects?
7-
#[derive(Clone, Serialize, Deserialize)]
8-
pub struct Key(Vec<Vec<u8>>);
8+
#[derive(Clone)]
9+
#[repr(transparent)]
10+
pub struct Key {
11+
inner: kv::Key,
12+
}
13+
14+
impl Key {
15+
pub fn convert_vec(value: Vec<kv::Key>) -> Vec<Key> {
16+
// SAFETY: Key is a wrapper around kv::Kky, identical memory layout
17+
unsafe { std::mem::transmute(value) }
18+
}
19+
}
920

1021
impl std::fmt::Debug for Key {
1122
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -15,24 +26,23 @@ impl std::fmt::Debug for Key {
1526

1627
impl PartialEq for Key {
1728
fn eq(&self, other: &Self) -> bool {
18-
self.0 == other.0
29+
self.inner == other.inner
1930
}
2031
}
2132

2233
impl Eq for Key {}
2334

2435
impl std::hash::Hash for Key {
2536
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
26-
for buffer in &self.0 {
37+
for buffer in &self.inner.segments {
2738
state.write(buffer);
2839
}
2940
}
3041
}
3142

3243
impl Key {
3344
pub fn len(&self) -> usize {
34-
// Arbitrary 4 accounting for nesting overhead
35-
self.0.iter().fold(0, |acc, x| acc + x.len()) + 4 * self.0.len()
45+
self.inner.encoded_len()
3646
}
3747
}
3848

@@ -47,7 +57,7 @@ impl TuplePack for Key {
4757
w.write_all(&[fdb_util::codes::NESTED])?;
4858
offset += 1;
4959

50-
for v in self.0.iter() {
60+
for v in self.inner.segments.iter() {
5161
offset += v.pack(w, tuple_depth.increment())?;
5262
}
5363

@@ -62,22 +72,41 @@ impl<'de> TupleUnpack<'de> for Key {
6272
fn unpack(mut input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
6373
input = fdb_util::parse_code(input, fdb_util::codes::NESTED)?;
6474

65-
let mut vec = Vec::new();
75+
let mut segments = Vec::new();
6676
while !is_end_of_tuple(input, true) {
6777
let (rem, v) = Bytes::unpack(input, tuple_depth.increment())?;
6878
input = rem;
69-
vec.push(v.into_owned());
79+
segments.push(v.into_owned());
7080
}
7181

7282
input = fdb_util::parse_code(input, fdb_util::codes::NIL)?;
7383

74-
Ok((input, Key(vec)))
84+
Ok((
85+
input,
86+
Key {
87+
inner: kv::Key { segments },
88+
},
89+
))
90+
}
91+
}
92+
93+
impl From<kv::Key> for Key {
94+
fn from(value: kv::Key) -> Key {
95+
Key { inner: value }
96+
}
97+
}
98+
99+
impl From<Key> for kv::Key {
100+
fn from(value: Key) -> kv::Key {
101+
value.inner
75102
}
76103
}
77104

78105
/// Same as Key: except when packing, it leaves off the NIL byte to allow for an open range.
79-
#[derive(Clone, Serialize, Deserialize)]
80-
pub struct ListKey(Vec<Vec<u8>>);
106+
#[derive(Clone)]
107+
pub struct ListKey {
108+
inner: kv::Key,
109+
}
81110

82111
impl std::fmt::Debug for ListKey {
83112
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -96,7 +125,7 @@ impl TuplePack for ListKey {
96125
w.write_all(&[fdb_util::codes::NESTED])?;
97126
offset += 1;
98127

99-
for v in &self.0 {
128+
for v in &self.inner.segments {
100129
offset += v.pack(w, tuple_depth.increment())?;
101130
}
102131

@@ -108,8 +137,19 @@ impl TuplePack for ListKey {
108137

109138
impl ListKey {
110139
pub fn len(&self) -> usize {
111-
// Arbitrary 4 accounting for nesting overhead
112-
self.0.iter().fold(0, |acc, x| acc + x.len()) + 4 * self.0.len()
140+
self.inner.encoded_len()
141+
}
142+
}
143+
144+
impl From<kv::Key> for ListKey {
145+
fn from(value: kv::Key) -> ListKey {
146+
ListKey { inner: value }
147+
}
148+
}
149+
150+
impl From<ListKey> for kv::Key {
151+
fn from(value: ListKey) -> kv::Key {
152+
value.inner
113153
}
114154
}
115155

packages/edge/infra/client/actor-kv/src/lib.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,14 @@ use indexmap::IndexMap;
1414
pub use key::Key;
1515
use list_query::ListLimitReached;
1616
pub use list_query::ListQuery;
17-
pub use metadata::Metadata;
17+
use pegboard_config::runner_protocol::proto::kv;
1818
use prost::Message;
1919
use tokio::sync::Mutex;
2020
use utils::{validate_entries, validate_keys, TransactionExt};
2121

2222
mod entry;
2323
pub mod key;
2424
mod list_query;
25-
mod metadata;
2625
mod utils;
2726

2827
const MAX_KEY_SIZE: usize = 2 * 1024;
@@ -325,8 +324,8 @@ impl ActorKv {
325324
// Clear previous before setting
326325
tx.clear_subspace_range(&key_subspace);
327326

328-
let metadata = Metadata {
329-
kv_version: self.version.as_bytes().to_vec(),
327+
let metadata = kv::Metadata {
328+
version: self.version.as_bytes().to_vec(),
330329
create_ts: utils::now(),
331330
};
332331
let mut buf = Vec::new();

packages/edge/infra/client/actor-kv/src/list_query.rs

Lines changed: 62 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,40 @@
11
use anyhow::*;
22
use foundationdb::tuple::Subspace;
33
use indexmap::IndexMap;
4-
use serde::{Deserialize, Serialize};
4+
use pegboard_config::runner_protocol::proto::kv;
55

66
use crate::{
77
entry::EntryBuilder,
88
key::{Key, ListKey},
99
MAX_KEY_SIZE,
1010
};
1111

12-
#[derive(Clone, Debug, Serialize, Deserialize)]
13-
#[serde(rename_all = "snake_case")]
12+
#[derive(Clone, Debug)]
1413
pub enum ListQuery {
1514
All,
16-
RangeInclusive(ListKey, Key),
17-
RangeExclusive(ListKey, Key),
15+
Range {
16+
start: ListKey,
17+
end: Key,
18+
exclusive: bool,
19+
},
1820
Prefix(ListKey),
1921
}
2022

2123
impl ListQuery {
2224
pub(crate) fn range(&self, subspace: &Subspace) -> (Vec<u8>, Vec<u8>) {
2325
match self {
2426
ListQuery::All => subspace.range(),
25-
ListQuery::RangeInclusive(start, end) => (
27+
ListQuery::Range {
28+
start,
29+
end,
30+
exclusive,
31+
} => (
2632
subspace.subspace(&start).range().0,
27-
subspace.subspace(&end).range().1,
28-
),
29-
ListQuery::RangeExclusive(start, end) => (
30-
subspace.subspace(&start).range().0,
31-
subspace.subspace(&end).range().1,
33+
if *exclusive {
34+
subspace.subspace(&end).range().0
35+
} else {
36+
subspace.subspace(&end).range().1
37+
},
3238
),
3339
ListQuery::Prefix(prefix) => subspace.subspace(&prefix).range(),
3440
}
@@ -37,7 +43,7 @@ impl ListQuery {
3743
pub(crate) fn validate(&self) -> Result<()> {
3844
match self {
3945
ListQuery::All => {}
40-
ListQuery::RangeInclusive(start, end) => {
46+
ListQuery::Range { start, end, .. } => {
4147
ensure!(
4248
start.len() <= MAX_KEY_SIZE,
4349
"start key is too long (max 2048 bytes)"
@@ -47,16 +53,6 @@ impl ListQuery {
4753
"end key is too long (max 2048 bytes)"
4854
);
4955
}
50-
ListQuery::RangeExclusive(start, end) => {
51-
ensure!(
52-
start.len() <= MAX_KEY_SIZE,
53-
"startAfter key is too long (max 2048 bytes)"
54-
);
55-
ensure!(
56-
end.len() <= MAX_KEY_SIZE,
57-
"end key is too long (max 2048 bytes)"
58-
);
59-
}
6056
ListQuery::Prefix(prefix) => {
6157
ensure!(
6258
prefix.len() <= MAX_KEY_SIZE,
@@ -69,6 +65,50 @@ impl ListQuery {
6965
}
7066
}
7167

68+
impl TryFrom<kv::ListQuery> for ListQuery {
69+
type Error = Error;
70+
71+
fn try_from(value: kv::ListQuery) -> Result<ListQuery> {
72+
match value.kind.context("ListQuery.kind")? {
73+
kv::list_query::Kind::All(_) => Ok(ListQuery::All),
74+
kv::list_query::Kind::Range(range) => Ok(ListQuery::Range {
75+
start: range.start.context("Range.start")?.into(),
76+
end: range.end.context("Range.end")?.into(),
77+
exclusive: range.exclusive,
78+
}),
79+
kv::list_query::Kind::Prefix(prefix) => {
80+
Ok(ListQuery::Prefix(prefix.key.context("Prefix.key")?.into()))
81+
}
82+
}
83+
}
84+
}
85+
86+
impl From<ListQuery> for kv::ListQuery {
87+
fn from(value: ListQuery) -> kv::ListQuery {
88+
match value {
89+
ListQuery::All => kv::ListQuery {
90+
kind: Some(kv::list_query::Kind::All(kv::list_query::All {})),
91+
},
92+
ListQuery::Range {
93+
start,
94+
end,
95+
exclusive,
96+
} => kv::ListQuery {
97+
kind: Some(kv::list_query::Kind::Range(kv::list_query::Range {
98+
start: Some(start.into()),
99+
end: Some(end.into()),
100+
exclusive,
101+
})),
102+
},
103+
ListQuery::Prefix(key) => kv::ListQuery {
104+
kind: Some(kv::list_query::Kind::Prefix(kv::list_query::Prefix {
105+
key: Some(key.into()),
106+
})),
107+
},
108+
}
109+
}
110+
}
111+
72112
// Used to short circuit after the
73113
pub struct ListLimitReached(pub IndexMap<Key, EntryBuilder>);
74114

packages/edge/infra/client/actor-kv/src/metadata.rs

Lines changed: 0 additions & 9 deletions
This file was deleted.

0 commit comments

Comments
 (0)