Skip to content

fix: convert runner protocol to protobuf #2702

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 07-04-fix_even_more_actor_api_changes
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/edge/infra/client/actor-kv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ fdb-util.workspace = true
foundationdb.workspace = true
futures-util = { version = "0.3" }
indexmap = { version = "2.0" }
prost = "0.13.3"
pegboard-config.workspace = true
prost = "0.14"
rivet-util-id.workspace = true
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"
Expand Down
52 changes: 37 additions & 15 deletions packages/edge/infra/client/actor-kv/src/entry.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,39 @@
use anyhow::*;
use foundationdb as fdb;
use pegboard_config::runner_protocol::proto::kv;
use prost::Message;
use serde::{Deserialize, Serialize};

use crate::{key::Key, metadata::Metadata};
use crate::key::Key;

/// Represents a Rivet KV value.
#[derive(Clone, Debug)]
pub struct Entry {
inner: kv::Entry,
}

impl std::ops::Deref for Entry {
type Target = kv::Entry;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl From<kv::Entry> for Entry {
fn from(value: kv::Entry) -> Entry {
Entry { inner: value }
}
}

impl From<Entry> for kv::Entry {
fn from(value: Entry) -> kv::Entry {
value.inner
}
}

#[derive(Default)]
pub(crate) struct EntryBuilder {
metadata: Option<Metadata>,
metadata: Option<kv::Metadata>,
value: Vec<u8>,
next_idx: usize,
}
Expand All @@ -19,7 +45,7 @@ impl EntryBuilder {
// We ignore setting the metadata again because it means the same key was given twice in the
// input keys for `ActorKv::get`. We don't perform automatic deduplication.
if self.metadata.is_none() {
self.metadata = Some(Metadata::decode(value.value())?);
self.metadata = Some(kv::Metadata::decode(value.value())?);
}
}
SubKey::Chunk(idx, value) => {
Expand All @@ -40,21 +66,17 @@ impl EntryBuilder {
ensure!(!self.value.is_empty(), "empty value at key {key:?}");

Ok(Entry {
metadata: self
.metadata
.with_context(|| format!("no metadata for key {key:?}"))?,
value: self.value,
inner: kv::Entry {
metadata: Some(
self.metadata
.with_context(|| format!("no metadata for key {key:?}"))?,
),
value: self.value,
},
})
}
}

/// Represents a Rivet KV value.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Entry {
pub metadata: Metadata,
pub value: Vec<u8>,
}

/// Represents FDB keys within a Rivet KV key.
pub(crate) enum SubKey {
Metadata(fdb::future::FdbValue),
Expand Down
72 changes: 56 additions & 16 deletions packages/edge/infra/client/actor-kv/src/key.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
use foundationdb::tuple::{
Bytes, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset,
};
use serde::{Deserialize, Serialize};
use pegboard_config::runner_protocol::proto::kv;
use prost::Message;

// TODO: Custom deser impl that uses arrays instead of objects?
#[derive(Clone, Serialize, Deserialize)]
pub struct Key(Vec<Vec<u8>>);
#[derive(Clone)]
#[repr(transparent)]
pub struct Key {
inner: kv::Key,
}

impl Key {
pub fn convert_vec(value: Vec<kv::Key>) -> Vec<Key> {
// SAFETY: Key is a wrapper around kv::Kky, identical memory layout
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a small typo in the safety comment: kv::Kky should be kv::Key. This might cause confusion for future readers trying to understand the memory layout guarantee being referenced.

Spotted by Diamond

Is this helpful? React 👍 or 👎 to let us know.

unsafe { std::mem::transmute(value) }
}
}

impl std::fmt::Debug for Key {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -15,24 +26,23 @@ impl std::fmt::Debug for Key {

impl PartialEq for Key {
fn eq(&self, other: &Self) -> bool {
self.0 == other.0
self.inner == other.inner
}
}

impl Eq for Key {}

impl std::hash::Hash for Key {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
for buffer in &self.0 {
for buffer in &self.inner.segments {
state.write(buffer);
}
}
}

impl Key {
pub fn len(&self) -> usize {
// Arbitrary 4 accounting for nesting overhead
self.0.iter().fold(0, |acc, x| acc + x.len()) + 4 * self.0.len()
self.inner.encoded_len()
}
}

Expand All @@ -47,7 +57,7 @@ impl TuplePack for Key {
w.write_all(&[fdb_util::codes::NESTED])?;
offset += 1;

for v in self.0.iter() {
for v in self.inner.segments.iter() {
offset += v.pack(w, tuple_depth.increment())?;
}

Expand All @@ -62,22 +72,41 @@ impl<'de> TupleUnpack<'de> for Key {
fn unpack(mut input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
input = fdb_util::parse_code(input, fdb_util::codes::NESTED)?;

let mut vec = Vec::new();
let mut segments = Vec::new();
while !is_end_of_tuple(input, true) {
let (rem, v) = Bytes::unpack(input, tuple_depth.increment())?;
input = rem;
vec.push(v.into_owned());
segments.push(v.into_owned());
}

input = fdb_util::parse_code(input, fdb_util::codes::NIL)?;

Ok((input, Key(vec)))
Ok((
input,
Key {
inner: kv::Key { segments },
},
))
}
}

impl From<kv::Key> for Key {
fn from(value: kv::Key) -> Key {
Key { inner: value }
}
}

impl From<Key> for kv::Key {
fn from(value: Key) -> kv::Key {
value.inner
}
}

/// Same as Key: except when packing, it leaves off the NIL byte to allow for an open range.
#[derive(Clone, Serialize, Deserialize)]
pub struct ListKey(Vec<Vec<u8>>);
#[derive(Clone)]
pub struct ListKey {
inner: kv::Key,
}

impl std::fmt::Debug for ListKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -96,7 +125,7 @@ impl TuplePack for ListKey {
w.write_all(&[fdb_util::codes::NESTED])?;
offset += 1;

for v in &self.0 {
for v in &self.inner.segments {
offset += v.pack(w, tuple_depth.increment())?;
}

Expand All @@ -108,8 +137,19 @@ impl TuplePack for ListKey {

impl ListKey {
pub fn len(&self) -> usize {
// Arbitrary 4 accounting for nesting overhead
self.0.iter().fold(0, |acc, x| acc + x.len()) + 4 * self.0.len()
self.inner.encoded_len()
}
}

impl From<kv::Key> for ListKey {
fn from(value: kv::Key) -> ListKey {
ListKey { inner: value }
}
}

impl From<ListKey> for kv::Key {
fn from(value: ListKey) -> kv::Key {
value.inner
}
}

Expand Down
7 changes: 3 additions & 4 deletions packages/edge/infra/client/actor-kv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@ use indexmap::IndexMap;
pub use key::Key;
use list_query::ListLimitReached;
pub use list_query::ListQuery;
pub use metadata::Metadata;
use pegboard_config::runner_protocol::proto::kv;
use prost::Message;
use tokio::sync::Mutex;
use utils::{validate_entries, validate_keys, TransactionExt};

mod entry;
pub mod key;
mod list_query;
mod metadata;
mod utils;

const MAX_KEY_SIZE: usize = 2 * 1024;
Expand Down Expand Up @@ -325,8 +324,8 @@ impl ActorKv {
// Clear previous before setting
tx.clear_subspace_range(&key_subspace);

let metadata = Metadata {
kv_version: self.version.as_bytes().to_vec(),
let metadata = kv::Metadata {
version: self.version.as_bytes().to_vec(),
create_ts: utils::now(),
};
let mut buf = Vec::new();
Expand Down
84 changes: 62 additions & 22 deletions packages/edge/infra/client/actor-kv/src/list_query.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,40 @@
use anyhow::*;
use foundationdb::tuple::Subspace;
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
use pegboard_config::runner_protocol::proto::kv;

use crate::{
entry::EntryBuilder,
key::{Key, ListKey},
MAX_KEY_SIZE,
};

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[derive(Clone, Debug)]
pub enum ListQuery {
All,
RangeInclusive(ListKey, Key),
RangeExclusive(ListKey, Key),
Range {
start: ListKey,
end: Key,
exclusive: bool,
},
Prefix(ListKey),
}

impl ListQuery {
pub(crate) fn range(&self, subspace: &Subspace) -> (Vec<u8>, Vec<u8>) {
match self {
ListQuery::All => subspace.range(),
ListQuery::RangeInclusive(start, end) => (
ListQuery::Range {
start,
end,
exclusive,
} => (
subspace.subspace(&start).range().0,
subspace.subspace(&end).range().1,
),
ListQuery::RangeExclusive(start, end) => (
subspace.subspace(&start).range().0,
subspace.subspace(&end).range().1,
if *exclusive {
subspace.subspace(&end).range().0
} else {
subspace.subspace(&end).range().1
},
),
ListQuery::Prefix(prefix) => subspace.subspace(&prefix).range(),
}
Expand All @@ -37,7 +43,7 @@ impl ListQuery {
pub(crate) fn validate(&self) -> Result<()> {
match self {
ListQuery::All => {}
ListQuery::RangeInclusive(start, end) => {
ListQuery::Range { start, end, .. } => {
ensure!(
start.len() <= MAX_KEY_SIZE,
"start key is too long (max 2048 bytes)"
Expand All @@ -47,16 +53,6 @@ impl ListQuery {
"end key is too long (max 2048 bytes)"
);
}
ListQuery::RangeExclusive(start, end) => {
ensure!(
start.len() <= MAX_KEY_SIZE,
"startAfter key is too long (max 2048 bytes)"
);
ensure!(
end.len() <= MAX_KEY_SIZE,
"end key is too long (max 2048 bytes)"
);
}
ListQuery::Prefix(prefix) => {
ensure!(
prefix.len() <= MAX_KEY_SIZE,
Expand All @@ -69,6 +65,50 @@ impl ListQuery {
}
}

impl TryFrom<kv::ListQuery> for ListQuery {
type Error = Error;

fn try_from(value: kv::ListQuery) -> Result<ListQuery> {
match value.kind.context("ListQuery.kind")? {
kv::list_query::Kind::All(_) => Ok(ListQuery::All),
kv::list_query::Kind::Range(range) => Ok(ListQuery::Range {
start: range.start.context("Range.start")?.into(),
end: range.end.context("Range.end")?.into(),
exclusive: range.exclusive,
}),
kv::list_query::Kind::Prefix(prefix) => {
Ok(ListQuery::Prefix(prefix.key.context("Prefix.key")?.into()))
}
}
}
}

impl From<ListQuery> for kv::ListQuery {
fn from(value: ListQuery) -> kv::ListQuery {
match value {
ListQuery::All => kv::ListQuery {
kind: Some(kv::list_query::Kind::All(kv::list_query::All {})),
},
ListQuery::Range {
start,
end,
exclusive,
} => kv::ListQuery {
kind: Some(kv::list_query::Kind::Range(kv::list_query::Range {
start: Some(start.into()),
end: Some(end.into()),
exclusive,
})),
},
ListQuery::Prefix(key) => kv::ListQuery {
kind: Some(kv::list_query::Kind::Prefix(kv::list_query::Prefix {
key: Some(key.into()),
})),
},
}
}
}

// Used to short circuit after the
pub struct ListLimitReached(pub IndexMap<Key, EntryBuilder>);

Expand Down
9 changes: 0 additions & 9 deletions packages/edge/infra/client/actor-kv/src/metadata.rs

This file was deleted.

Loading
Loading