From e758a03ea1961195195f6feabc29899cfe45c6d4 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Thu, 3 Jul 2025 01:31:17 +0000 Subject: [PATCH] fix: convert runner protocol to protobuf --- .../edge/infra/client/actor-kv/Cargo.toml | 3 +- .../edge/infra/client/actor-kv/src/entry.rs | 52 ++- .../edge/infra/client/actor-kv/src/key.rs | 72 +++- .../edge/infra/client/actor-kv/src/lib.rs | 7 +- .../infra/client/actor-kv/src/list_query.rs | 84 +++-- .../infra/client/actor-kv/src/metadata.rs | 9 - .../edge/infra/client/actor-kv/src/utils.rs | 2 +- packages/edge/infra/client/config/Cargo.toml | 7 +- packages/edge/infra/client/config/build.rs | 13 + .../client/config/resources/proto/kv.proto | 104 ++++++ .../resources/proto/runner_protocol.proto | 148 ++++++++ .../client/config/src/runner_protocol.rs | 138 ++----- .../infra/client/manager/src/actor/mod.rs | 349 ++++++++++++++---- .../infra/client/manager/src/runner/mod.rs | 48 ++- .../edge/services/pegboard/src/protocol.rs | 1 + .../src/workflows/actor/v1/runtime.rs | 7 +- .../usage-metrics-publish/src/lib.rs | 2 +- 17 files changed, 765 insertions(+), 281 deletions(-) delete mode 100644 packages/edge/infra/client/actor-kv/src/metadata.rs create mode 100644 packages/edge/infra/client/config/build.rs create mode 100644 packages/edge/infra/client/config/resources/proto/kv.proto create mode 100644 packages/edge/infra/client/config/resources/proto/runner_protocol.proto diff --git a/packages/edge/infra/client/actor-kv/Cargo.toml b/packages/edge/infra/client/actor-kv/Cargo.toml index 23d54865d3..ce9c12b004 100644 --- a/packages/edge/infra/client/actor-kv/Cargo.toml +++ b/packages/edge/infra/client/actor-kv/Cargo.toml @@ -11,7 +11,8 @@ fdb-util.workspace = true foundationdb.workspace = true futures-util = { version = "0.3" } indexmap = { version = "2.0" } -prost = "0.13.3" +pegboard-config.workspace = true +prost = "0.14" rivet-util-id.workspace = true serde = { version = "1.0.195", features = ["derive"] } serde_json = "1.0.111" diff --git a/packages/edge/infra/client/actor-kv/src/entry.rs b/packages/edge/infra/client/actor-kv/src/entry.rs index d9ffce45df..66aa7a733d 100644 --- a/packages/edge/infra/client/actor-kv/src/entry.rs +++ b/packages/edge/infra/client/actor-kv/src/entry.rs @@ -1,13 +1,39 @@ use anyhow::*; use foundationdb as fdb; +use pegboard_config::runner_protocol::proto::kv; use prost::Message; -use serde::{Deserialize, Serialize}; -use crate::{key::Key, metadata::Metadata}; +use crate::key::Key; + +/// Represents a Rivet KV value. +#[derive(Clone, Debug)] +pub struct Entry { + inner: kv::Entry, +} + +impl std::ops::Deref for Entry { + type Target = kv::Entry; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl From for Entry { + fn from(value: kv::Entry) -> Entry { + Entry { inner: value } + } +} + +impl From for kv::Entry { + fn from(value: Entry) -> kv::Entry { + value.inner + } +} #[derive(Default)] pub(crate) struct EntryBuilder { - metadata: Option, + metadata: Option, value: Vec, next_idx: usize, } @@ -19,7 +45,7 @@ impl EntryBuilder { // We ignore setting the metadata again because it means the same key was given twice in the // input keys for `ActorKv::get`. We don't perform automatic deduplication. if self.metadata.is_none() { - self.metadata = Some(Metadata::decode(value.value())?); + self.metadata = Some(kv::Metadata::decode(value.value())?); } } SubKey::Chunk(idx, value) => { @@ -40,21 +66,17 @@ impl EntryBuilder { ensure!(!self.value.is_empty(), "empty value at key {key:?}"); Ok(Entry { - metadata: self - .metadata - .with_context(|| format!("no metadata for key {key:?}"))?, - value: self.value, + inner: kv::Entry { + metadata: Some( + self.metadata + .with_context(|| format!("no metadata for key {key:?}"))?, + ), + value: self.value, + }, }) } } -/// Represents a Rivet KV value. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Entry { - pub metadata: Metadata, - pub value: Vec, -} - /// Represents FDB keys within a Rivet KV key. pub(crate) enum SubKey { Metadata(fdb::future::FdbValue), diff --git a/packages/edge/infra/client/actor-kv/src/key.rs b/packages/edge/infra/client/actor-kv/src/key.rs index 44c432411f..33d6ea7647 100644 --- a/packages/edge/infra/client/actor-kv/src/key.rs +++ b/packages/edge/infra/client/actor-kv/src/key.rs @@ -1,11 +1,22 @@ use foundationdb::tuple::{ Bytes, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, }; -use serde::{Deserialize, Serialize}; +use pegboard_config::runner_protocol::proto::kv; +use prost::Message; // TODO: Custom deser impl that uses arrays instead of objects? -#[derive(Clone, Serialize, Deserialize)] -pub struct Key(Vec>); +#[derive(Clone)] +#[repr(transparent)] +pub struct Key { + inner: kv::Key, +} + +impl Key { + pub fn convert_vec(value: Vec) -> Vec { + // SAFETY: Key is a wrapper around kv::Kky, identical memory layout + unsafe { std::mem::transmute(value) } + } +} impl std::fmt::Debug for Key { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -15,7 +26,7 @@ impl std::fmt::Debug for Key { impl PartialEq for Key { fn eq(&self, other: &Self) -> bool { - self.0 == other.0 + self.inner == other.inner } } @@ -23,7 +34,7 @@ impl Eq for Key {} impl std::hash::Hash for Key { fn hash(&self, state: &mut H) { - for buffer in &self.0 { + for buffer in &self.inner.segments { state.write(buffer); } } @@ -31,8 +42,7 @@ impl std::hash::Hash for Key { impl Key { pub fn len(&self) -> usize { - // Arbitrary 4 accounting for nesting overhead - self.0.iter().fold(0, |acc, x| acc + x.len()) + 4 * self.0.len() + self.inner.encoded_len() } } @@ -47,7 +57,7 @@ impl TuplePack for Key { w.write_all(&[fdb_util::codes::NESTED])?; offset += 1; - for v in self.0.iter() { + for v in self.inner.segments.iter() { offset += v.pack(w, tuple_depth.increment())?; } @@ -62,22 +72,41 @@ impl<'de> TupleUnpack<'de> for Key { fn unpack(mut input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { input = fdb_util::parse_code(input, fdb_util::codes::NESTED)?; - let mut vec = Vec::new(); + let mut segments = Vec::new(); while !is_end_of_tuple(input, true) { let (rem, v) = Bytes::unpack(input, tuple_depth.increment())?; input = rem; - vec.push(v.into_owned()); + segments.push(v.into_owned()); } input = fdb_util::parse_code(input, fdb_util::codes::NIL)?; - Ok((input, Key(vec))) + Ok(( + input, + Key { + inner: kv::Key { segments }, + }, + )) + } +} + +impl From for Key { + fn from(value: kv::Key) -> Key { + Key { inner: value } + } +} + +impl From for kv::Key { + fn from(value: Key) -> kv::Key { + value.inner } } /// Same as Key: except when packing, it leaves off the NIL byte to allow for an open range. -#[derive(Clone, Serialize, Deserialize)] -pub struct ListKey(Vec>); +#[derive(Clone)] +pub struct ListKey { + inner: kv::Key, +} impl std::fmt::Debug for ListKey { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -96,7 +125,7 @@ impl TuplePack for ListKey { w.write_all(&[fdb_util::codes::NESTED])?; offset += 1; - for v in &self.0 { + for v in &self.inner.segments { offset += v.pack(w, tuple_depth.increment())?; } @@ -108,8 +137,19 @@ impl TuplePack for ListKey { impl ListKey { pub fn len(&self) -> usize { - // Arbitrary 4 accounting for nesting overhead - self.0.iter().fold(0, |acc, x| acc + x.len()) + 4 * self.0.len() + self.inner.encoded_len() + } +} + +impl From for ListKey { + fn from(value: kv::Key) -> ListKey { + ListKey { inner: value } + } +} + +impl From for kv::Key { + fn from(value: ListKey) -> kv::Key { + value.inner } } diff --git a/packages/edge/infra/client/actor-kv/src/lib.rs b/packages/edge/infra/client/actor-kv/src/lib.rs index 7e8ac6dc2f..81605b899d 100644 --- a/packages/edge/infra/client/actor-kv/src/lib.rs +++ b/packages/edge/infra/client/actor-kv/src/lib.rs @@ -14,7 +14,7 @@ use indexmap::IndexMap; pub use key::Key; use list_query::ListLimitReached; pub use list_query::ListQuery; -pub use metadata::Metadata; +use pegboard_config::runner_protocol::proto::kv; use prost::Message; use tokio::sync::Mutex; use utils::{validate_entries, validate_keys, TransactionExt}; @@ -22,7 +22,6 @@ use utils::{validate_entries, validate_keys, TransactionExt}; mod entry; pub mod key; mod list_query; -mod metadata; mod utils; const MAX_KEY_SIZE: usize = 2 * 1024; @@ -325,8 +324,8 @@ impl ActorKv { // Clear previous before setting tx.clear_subspace_range(&key_subspace); - let metadata = Metadata { - kv_version: self.version.as_bytes().to_vec(), + let metadata = kv::Metadata { + version: self.version.as_bytes().to_vec(), create_ts: utils::now(), }; let mut buf = Vec::new(); diff --git a/packages/edge/infra/client/actor-kv/src/list_query.rs b/packages/edge/infra/client/actor-kv/src/list_query.rs index c214337e46..7ee6dcff11 100644 --- a/packages/edge/infra/client/actor-kv/src/list_query.rs +++ b/packages/edge/infra/client/actor-kv/src/list_query.rs @@ -1,7 +1,7 @@ use anyhow::*; use foundationdb::tuple::Subspace; use indexmap::IndexMap; -use serde::{Deserialize, Serialize}; +use pegboard_config::runner_protocol::proto::kv; use crate::{ entry::EntryBuilder, @@ -9,12 +9,14 @@ use crate::{ MAX_KEY_SIZE, }; -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] +#[derive(Clone, Debug)] pub enum ListQuery { All, - RangeInclusive(ListKey, Key), - RangeExclusive(ListKey, Key), + Range { + start: ListKey, + end: Key, + exclusive: bool, + }, Prefix(ListKey), } @@ -22,13 +24,17 @@ impl ListQuery { pub(crate) fn range(&self, subspace: &Subspace) -> (Vec, Vec) { match self { ListQuery::All => subspace.range(), - ListQuery::RangeInclusive(start, end) => ( + ListQuery::Range { + start, + end, + exclusive, + } => ( subspace.subspace(&start).range().0, - subspace.subspace(&end).range().1, - ), - ListQuery::RangeExclusive(start, end) => ( - subspace.subspace(&start).range().0, - subspace.subspace(&end).range().1, + if *exclusive { + subspace.subspace(&end).range().0 + } else { + subspace.subspace(&end).range().1 + }, ), ListQuery::Prefix(prefix) => subspace.subspace(&prefix).range(), } @@ -37,7 +43,7 @@ impl ListQuery { pub(crate) fn validate(&self) -> Result<()> { match self { ListQuery::All => {} - ListQuery::RangeInclusive(start, end) => { + ListQuery::Range { start, end, .. } => { ensure!( start.len() <= MAX_KEY_SIZE, "start key is too long (max 2048 bytes)" @@ -47,16 +53,6 @@ impl ListQuery { "end key is too long (max 2048 bytes)" ); } - ListQuery::RangeExclusive(start, end) => { - ensure!( - start.len() <= MAX_KEY_SIZE, - "startAfter key is too long (max 2048 bytes)" - ); - ensure!( - end.len() <= MAX_KEY_SIZE, - "end key is too long (max 2048 bytes)" - ); - } ListQuery::Prefix(prefix) => { ensure!( prefix.len() <= MAX_KEY_SIZE, @@ -69,6 +65,50 @@ impl ListQuery { } } +impl TryFrom for ListQuery { + type Error = Error; + + fn try_from(value: kv::ListQuery) -> Result { + match value.kind.context("ListQuery.kind")? { + kv::list_query::Kind::All(_) => Ok(ListQuery::All), + kv::list_query::Kind::Range(range) => Ok(ListQuery::Range { + start: range.start.context("Range.start")?.into(), + end: range.end.context("Range.end")?.into(), + exclusive: range.exclusive, + }), + kv::list_query::Kind::Prefix(prefix) => { + Ok(ListQuery::Prefix(prefix.key.context("Prefix.key")?.into())) + } + } + } +} + +impl From for kv::ListQuery { + fn from(value: ListQuery) -> kv::ListQuery { + match value { + ListQuery::All => kv::ListQuery { + kind: Some(kv::list_query::Kind::All(kv::list_query::All {})), + }, + ListQuery::Range { + start, + end, + exclusive, + } => kv::ListQuery { + kind: Some(kv::list_query::Kind::Range(kv::list_query::Range { + start: Some(start.into()), + end: Some(end.into()), + exclusive, + })), + }, + ListQuery::Prefix(key) => kv::ListQuery { + kind: Some(kv::list_query::Kind::Prefix(kv::list_query::Prefix { + key: Some(key.into()), + })), + }, + } + } +} + // Used to short circuit after the pub struct ListLimitReached(pub IndexMap); diff --git a/packages/edge/infra/client/actor-kv/src/metadata.rs b/packages/edge/infra/client/actor-kv/src/metadata.rs deleted file mode 100644 index a1af22fea6..0000000000 --- a/packages/edge/infra/client/actor-kv/src/metadata.rs +++ /dev/null @@ -1,9 +0,0 @@ -use serde::{Deserialize, Serialize}; - -#[derive(Clone, PartialEq, ::prost::Message, Deserialize, Serialize)] -pub struct Metadata { - #[prost(bytes = "vec", tag = "1")] - pub kv_version: Vec, - #[prost(int64, tag = "2")] - pub create_ts: i64, -} diff --git a/packages/edge/infra/client/actor-kv/src/utils.rs b/packages/edge/infra/client/actor-kv/src/utils.rs index 2fd4274fb9..040c92cde2 100644 --- a/packages/edge/infra/client/actor-kv/src/utils.rs +++ b/packages/edge/infra/client/actor-kv/src/utils.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, result::Result::Ok}; +use std::result::Result::Ok; use anyhow::*; use foundationdb as fdb; diff --git a/packages/edge/infra/client/config/Cargo.toml b/packages/edge/infra/client/config/Cargo.toml index caffc30362..9b3ee0772a 100644 --- a/packages/edge/infra/client/config/Cargo.toml +++ b/packages/edge/infra/client/config/Cargo.toml @@ -10,11 +10,14 @@ anyhow = "1.0" indexmap = { version = "2.0" } ipnet = { version = "2.10.1", features = ["serde"] } pegboard.workspace = true -pegboard-actor-kv.workspace = true +prost = "0.14" rivet-util-id.workspace = true schemars = { version = "0.8.21", features = ["url", "uuid1"] } serde = { version = "1.0.195", features = ["derive"] } serde_json = "1.0" tokio-util = { version = "0.7", features = ["codec"] } url = "2.2.2" -uuid = { version = "1.6.1", features = ["v4"] } \ No newline at end of file +uuid = { version = "1.6.1", features = ["v4"] } + +[build-dependencies] +prost-build = "0.14" diff --git a/packages/edge/infra/client/config/build.rs b/packages/edge/infra/client/config/build.rs new file mode 100644 index 0000000000..6ab03227ce --- /dev/null +++ b/packages/edge/infra/client/config/build.rs @@ -0,0 +1,13 @@ +use std::io::Result; + +fn main() -> Result<()> { + prost_build::compile_protos( + &[ + "resources/proto/kv.proto", + "resources/proto/runner_protocol.proto", + ], + &["resources/proto/"], + )?; + + Ok(()) +} diff --git a/packages/edge/infra/client/config/resources/proto/kv.proto b/packages/edge/infra/client/config/resources/proto/kv.proto new file mode 100644 index 0000000000..89e7c0f0f9 --- /dev/null +++ b/packages/edge/infra/client/config/resources/proto/kv.proto @@ -0,0 +1,104 @@ +syntax = "proto3"; + +package rivet.pegboard.kv; + +message Request { + message Get { + repeated Key keys = 1; + } + + message List { + ListQuery query = 1; + bool reverse = 2; + optional uint64 limit = 3; + } + + message Put { + repeated Key keys = 1; + repeated bytes values = 2; + } + + message Delete { + repeated Key keys = 1; + } + + message Drop {} + + string actor_id = 1; + uint32 generation = 2; + uint32 request_id = 3; + + oneof data { + Get get = 4; + List list = 5; + Put put = 6; + Delete delete = 7; + Drop drop = 8; + } +} + +message Response { + message Error { + string message = 1; + } + + message Get { + repeated Key keys = 1; + repeated Entry values = 2; + } + + message List { + repeated Key keys = 1; + repeated Entry values = 2; + } + + message Put {} + + message Delete {} + + message Drop {} + + uint32 request_id = 1; + oneof data { + Error error = 2; + Get get = 3; + List list = 4; + Put put = 5; + Delete delete = 6; + Drop drop = 7; + } +} + +message Key { + repeated bytes segments = 1; +} + +message Entry { + bytes value = 1; + optional Metadata metadata = 2; +} + +message Metadata { + bytes _version = 1; + int64 create_ts = 2; +} + +message ListQuery { + message All {} + + message Range { + Key start = 1; + Key end = 2; + bool exclusive = 3; + } + + message Prefix { + Key key = 1; + } + + oneof kind { + All all = 1; + Range range = 2; + Prefix prefix = 3; + } +} diff --git a/packages/edge/infra/client/config/resources/proto/runner_protocol.proto b/packages/edge/infra/client/config/resources/proto/runner_protocol.proto new file mode 100644 index 0000000000..9bf3417975 --- /dev/null +++ b/packages/edge/infra/client/config/resources/proto/runner_protocol.proto @@ -0,0 +1,148 @@ +syntax = "proto3"; + +package rivet.pegboard.runner_protocol; + +import "kv.proto"; + +message ToManager { + message ActorStateUpdate { + string actor_id = 1; + uint32 generation = 2; + ActorState state = 3; + } + + message Ping {} + + oneof message { + ActorStateUpdate actor_state_update = 1; + Ping ping = 2; + rivet.pegboard.kv.Request kv = 3; + } +} + +message ToRunner { + message Init { + bytes input = 1; + } + + message Pong {} + + message Close { + optional string reason = 1; + } + + message StartActor { + string actor_id = 1; + uint32 generation = 2; + map env = 3; + ActorMetadata metadata = 4; + } + + message SignalActor { + string actor_id = 1; + uint32 generation = 2; + int32 signal = 3; + bool persist_storage = 4; + } + + oneof message { + Init init = 1; + Pong pong = 2; + Close close = 3; + StartActor start_actor = 4; + SignalActor signal_actor = 5; + rivet.pegboard.kv.Response kv = 6; + } +} + +message ActorMetadata { + message Actor { + string actor_id = 1; + map tags = 2; + int64 create_ts = 3; + } + + message Network { + map ports = 1; + } + + message Project { + string project_id = 1; + string slug = 2; + } + + message Environment { + string env_id = 1; + string slug = 2; + } + + message Datacenter { + string name_id = 1; + string display_name = 2; + } + + message Cluster { + string cluster_id = 1; + } + + message Build { + string build_id = 1; + } + + Actor actor = 1; + Network network = 2; + Project project = 3; + Environment environment = 4; + Datacenter datacenter = 5; + Cluster cluster = 6; + Build build = 7; +} + +message Port { + optional int32 internal_port = 1; + optional string public_hostname = 2; + optional int32 public_port = 3; + optional string public_path = 4; + Routing routing = 5; +} + +message Routing { + message GameGuard { + GameGuardProtocol protocol = 1; + } + + message Host { + HostProtocol protocol = 1; + } + + oneof routing { + GameGuard game_guard = 1; + Host host = 2; + } +} + +enum GameGuardProtocol { + GG_HTTP = 0; + GG_HTTPS = 1; + GG_TCP = 2; + GG_TCP_TLS = 3; + GG_UDP = 4; +} + +enum HostProtocol { + HOST_TCP = 0; + HOST_UDP = 1; +} + +message ActorState { + message Running {} + + message Exited { + optional int32 exit_code = 1; + } + + oneof state { + Running running = 1; + Exited exited = 2; + } +} diff --git a/packages/edge/infra/client/config/src/runner_protocol.rs b/packages/edge/infra/client/config/src/runner_protocol.rs index ca6bc98dc9..0600a2e762 100644 --- a/packages/edge/infra/client/config/src/runner_protocol.rs +++ b/packages/edge/infra/client/config/src/runner_protocol.rs @@ -1,114 +1,29 @@ -use std::io::{Cursor, Write}; - use anyhow::*; -use pegboard::protocol; -use pegboard_actor_kv as kv; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use prost::Message; use tokio_util::codec::LengthDelimitedCodec; -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case", deny_unknown_fields)] -pub enum ToManager { - ActorStateUpdate { - actor_id: rivet_util_id::Id, - generation: u32, - state: ActorState, - }, - Ping, - Kv(KvRequest), -} +// Include generated protobuf code +pub mod proto { + pub mod kv { + include!(concat!(env!("OUT_DIR"), "/rivet.pegboard.kv.rs")); + } -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case", deny_unknown_fields)] -pub enum ToRunner { - Pong, - Close { - reason: Option, - }, - - StartActor { - actor_id: rivet_util_id::Id, - generation: u32, - env: protocol::HashableMap, - metadata: protocol::Raw, - }, - SignalActor { - actor_id: rivet_util_id::Id, - generation: u32, - signal: i32, - persist_storage: bool, - }, - Kv(KvResponse), -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case", deny_unknown_fields)] -pub enum ActorState { - Running, - Exited { exit_code: Option }, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct KvRequest { - pub actor_id: rivet_util_id::Id, - // TODO: This shouldn't require generation since all gens share the same kv - pub generation: u32, - /// Deduplication id. - pub request_id: u32, - pub data: KvRequestData, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case", deny_unknown_fields)] -pub enum KvRequestData { - Get { - keys: Vec, - }, - List { - query: kv::ListQuery, - reverse: bool, - limit: Option, - }, - Put { - keys: Vec, - values: Vec>, - }, - Delete { - keys: Vec, - }, - Drop {}, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct KvResponse { - /// Deduplication id. - pub request_id: u32, - pub data: Option, - pub error: Option, -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case", deny_unknown_fields)] -pub enum KvResponseData { - Get { - keys: Vec, - values: Vec, - }, - List { - keys: Vec, - values: Vec, - }, - Put {}, - Delete {}, - Drop {}, + pub mod runner_protocol { + include!(concat!( + env!("OUT_DIR"), + "/rivet.pegboard.runner_protocol.rs" + )); + } + pub use runner_protocol::*; } // Small subset of the ToRunner enum that gets proxied to the actor -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "snake_case", deny_unknown_fields)] +#[derive(Debug, Clone)] pub enum ToActor { - StateUpdate { state: ActorState }, - Kv(KvRequest), + StateUpdate { + state: proto::ActorState, + }, + Kv(proto::kv::Request), } pub fn codec() -> LengthDelimitedCodec { @@ -124,27 +39,24 @@ pub fn codec() -> LengthDelimitedCodec { .new_codec() } -pub fn encode_frame(payload: &T) -> Result> { - let mut buf = Vec::with_capacity(4); - let mut cursor = Cursor::new(&mut buf); - - cursor.write(&[0u8; 4])?; // header (currently unused) +pub fn encode_frame(payload: &T) -> Result> { + let mut buf = Vec::with_capacity(4 + payload.encoded_len()); - serde_json::to_writer(&mut cursor, payload)?; + buf.extend_from_slice(&[0u8; 4]); // header (currently unused) - cursor.flush()?; + payload.encode(&mut buf)?; Ok(buf) } -pub fn decode_frame(frame: &[u8]) -> Result<([u8; 4], T)> { +pub fn decode_frame(frame: &[u8]) -> Result<([u8; 4], T)> { ensure!(frame.len() >= 4, "Frame too short"); // Extract the header (first 4 bytes) let header = [frame[0], frame[1], frame[2], frame[3]]; - // Deserialize the rest of the frame (payload after the header) - let payload = serde_json::from_slice(&frame[4..])?; + // Decode the rest of the frame (payload after the header) + let payload = T::decode(&frame[4..])?; Ok((header, payload)) } diff --git a/packages/edge/infra/client/manager/src/actor/mod.rs b/packages/edge/infra/client/manager/src/actor/mod.rs index 80d758171b..4dec369805 100644 --- a/packages/edge/infra/client/manager/src/actor/mod.rs +++ b/packages/edge/infra/client/manager/src/actor/mod.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, result::Result::{Err, Ok}, sync::Arc, }; @@ -133,11 +134,19 @@ impl Actor { } protocol::ImageAllocationType::Multi => { self.runner - .send(&runner_protocol::ToRunner::StartActor { - actor_id: self.actor_id, - generation: self.generation, - env: self.config.env.clone(), - metadata: self.config.metadata.clone(), + .send(&runner_protocol::proto::ToRunner { + message: Some( + runner_protocol::proto::to_runner::Message::StartActor( + runner_protocol::proto::to_runner::StartActor { + actor_id: self.actor_id.to_string(), + generation: self.generation, + env: self.runner.config().env.clone().into(), + metadata: Some(convert_actor_metadata_to_proto( + &self.config.metadata.deserialize()?, + )), + }, + ), + ), }) .await?; } @@ -152,11 +161,19 @@ impl Actor { } protocol::ImageAllocationType::Multi => { self.runner - .send(&runner_protocol::ToRunner::StartActor { - actor_id: self.actor_id, - generation: self.generation, - env: self.config.env.clone(), - metadata: self.config.metadata.clone(), + .send(&runner_protocol::proto::ToRunner { + message: Some( + runner_protocol::proto::to_runner::Message::StartActor( + runner_protocol::proto::to_runner::StartActor { + actor_id: self.actor_id.to_string(), + generation: self.generation, + env: self.runner.config().env.clone().into(), + metadata: Some(convert_actor_metadata_to_proto( + &self.config.metadata.deserialize()?, + )), + }, + ), + ), }) .await?; } @@ -187,8 +204,8 @@ impl Actor { match res { runner_protocol::ToActor::StateUpdate { state } => { - match state { - runner_protocol::ActorState::Running => { + match state.state.context("ActorState.state")? { + runner_protocol::proto::actor_state::State::Running(_) => { tracing::info!( actor_id=?self.actor_id, generation=?self.generation, @@ -202,76 +219,163 @@ impl Actor { self.set_running(ctx, pid, ports).await?; }, - runner_protocol::ActorState::Exited { - exit_code, - } => break exit_code, + runner_protocol::proto::actor_state::State::Exited(state) => { + break state.exit_code; + } } } runner_protocol::ToActor::Kv(req) => { // TODO: Add queue and bg thread for processing kv ops // Run kv operation - match req.data { - runner_protocol::KvRequestData::Get { keys } => { - let res = self.kv.get(keys).await; - let error = res.as_ref().err().map(|x| x.to_string()); - - self.runner.send(&runner_protocol::ToRunner::Kv(runner_protocol::KvResponse { - request_id: req.request_id, - data: res.ok().map(|entries| { - let (keys, values) = entries.into_iter().unzip(); - runner_protocol::KvResponseData::Get { - keys, - values, + match req.data.context("Request.data")? { + runner_protocol::proto::kv::request::Data::Get(body) => { + let res = self.kv.get(pegboard_actor_kv::Key::convert_vec(body.keys)).await; + + self.runner.send(&runner_protocol::proto::ToRunner { + message: Some(runner_protocol::proto::to_runner::Message::Kv( + runner_protocol::proto::kv::Response { + request_id: req.request_id, + data: match res { + Ok(entries) => { + let (keys, values) = entries + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .unzip(); + Some(runner_protocol::proto::kv::response::Data::Get( + runner_protocol::proto::kv::response::Get { + keys, + values, + } + )) + } + Err(err) => { + Some(runner_protocol::proto::kv::response::Data::Error( + runner_protocol::proto::kv::response::Error { + message: err.to_string(), + } + )) + } + }, } - }), - error, - })).await?; + )), + }).await?; } - runner_protocol::KvRequestData::List { query, reverse, limit } => { - let res = self.kv.list(query, reverse, limit).await; - let error = res.as_ref().err().map(|x| x.to_string()); - - self.runner.send(&runner_protocol::ToRunner::Kv(runner_protocol::KvResponse { - request_id: req.request_id, - data: res.ok().map(|entries| { - let (keys, values) = entries.into_iter().unzip(); - runner_protocol::KvResponseData::List { - keys, - values, + runner_protocol::proto::kv::request::Data::List(body) => { + let res = self.kv.list( + body.query.context("List.query")?.try_into()?, + body.reverse, + body.limit.map(TryInto::try_into).transpose()? + ).await; + + self.runner.send(&runner_protocol::proto::ToRunner { + message: Some(runner_protocol::proto::to_runner::Message::Kv( + runner_protocol::proto::kv::Response { + request_id: req.request_id, + data: match res { + Ok(entries) => { + let (keys, values) = entries + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .unzip(); + Some(runner_protocol::proto::kv::response::Data::List( + runner_protocol::proto::kv::response::List { + keys, + values, + } + )) + } + Err(err) => { + Some(runner_protocol::proto::kv::response::Data::Error( + runner_protocol::proto::kv::response::Error { + message: err.to_string(), + } + )) + } + }, } - }), - error, - })).await?; + )), + }).await?; } - runner_protocol::KvRequestData::Put { keys, values } => { - let res = self.kv.put(keys.into_iter().zip(values.into_iter()).collect()).await; - let error = res.as_ref().err().map(|x| x.to_string()); - - self.runner.send(&runner_protocol::ToRunner::Kv(runner_protocol::KvResponse { - request_id: req.request_id, - data: res.ok().map(|_| runner_protocol::KvResponseData::Put {}), - error, - })).await?; + runner_protocol::proto::kv::request::Data::Put(body) => { + let res = self.kv.put( + body.keys + .into_iter() + .map(|x| x.into()) + .zip(body.values.into_iter().map(|x| x.into())) + .collect() + ).await; + + self.runner.send(&runner_protocol::proto::ToRunner { + message: Some(runner_protocol::proto::to_runner::Message::Kv( + runner_protocol::proto::kv::Response { + request_id: req.request_id, + data: match res { + Ok(_) => { + Some(runner_protocol::proto::kv::response::Data::Put( + runner_protocol::proto::kv::response::Put {} + )) + } + Err(err) => { + Some(runner_protocol::proto::kv::response::Data::Error( + runner_protocol::proto::kv::response::Error { + message: err.to_string(), + } + )) + } + }, + } + )), + }).await?; } - runner_protocol::KvRequestData::Delete { keys } => { - let res = self.kv.delete(keys).await; - let error = res.as_ref().err().map(|x| x.to_string()); - - self.runner.send(&runner_protocol::ToRunner::Kv(runner_protocol::KvResponse { - request_id: req.request_id, - data: res.ok().map(|_| runner_protocol::KvResponseData::Delete {}), - error, - })).await?; + runner_protocol::proto::kv::request::Data::Delete(body) => { + let res = self.kv.delete(pegboard_actor_kv::Key::convert_vec(body.keys)).await; + + self.runner.send(&runner_protocol::proto::ToRunner { + message: Some(runner_protocol::proto::to_runner::Message::Kv( + runner_protocol::proto::kv::Response { + request_id: req.request_id, + data: match res { + Ok(_) => { + Some(runner_protocol::proto::kv::response::Data::Delete( + runner_protocol::proto::kv::response::Delete {} + )) + } + Err(err) => { + Some(runner_protocol::proto::kv::response::Data::Error( + runner_protocol::proto::kv::response::Error { + message: err.to_string(), + } + )) + } + }, + } + )), + }).await?; } - runner_protocol::KvRequestData::Drop { } => { + runner_protocol::proto::kv::request::Data::Drop(_) => { let res = self.kv.delete_all().await; - let error = res.as_ref().err().map(|x| x.to_string()); - self.runner.send(&runner_protocol::ToRunner::Kv(runner_protocol::KvResponse { - request_id: req.request_id, - data: res.ok().map(|_| runner_protocol::KvResponseData::Drop {}), - error, - })).await?; + self.runner.send(&runner_protocol::proto::ToRunner { + message: Some(runner_protocol::proto::to_runner::Message::Kv( + runner_protocol::proto::kv::Response { + request_id: req.request_id, + data: match res { + Ok(_) => { + Some(runner_protocol::proto::kv::response::Data::Drop( + runner_protocol::proto::kv::response::Drop {} + )) + } + Err(err) => { + Some(runner_protocol::proto::kv::response::Data::Error( + runner_protocol::proto::kv::response::Error { + message: err.to_string(), + } + )) + } + }, + } + )), + }).await?; } } } @@ -318,11 +422,15 @@ impl Actor { // Send message if self.runner.has_socket() { self.runner - .send(&runner_protocol::ToRunner::SignalActor { - actor_id: self.actor_id, - generation: self.generation, - signal: signal as i32, - persist_storage, + .send(&runner_protocol::proto::ToRunner { + message: Some(runner_protocol::proto::to_runner::Message::SignalActor( + runner_protocol::proto::to_runner::SignalActor { + actor_id: self.actor_id.to_string(), + generation: self.generation, + signal: signal as i32, + persist_storage, + }, + )), }) .await?; } @@ -465,3 +573,96 @@ impl Actor { Ok(()) } } + +/// Convert from serde ActorMetadata to proto ActorMetadata +pub fn convert_actor_metadata_to_proto( + metadata: &protocol::ActorMetadata, +) -> runner_protocol::proto::ActorMetadata { + use runner_protocol::proto::{ + actor_metadata, routing, ActorMetadata, GameGuardProtocol, HostProtocol, Port, Routing, + }; + + let mut proto_ports = HashMap::new(); + for (key, port) in &metadata + .network + .as_ref() + .expect("should have network") + .ports + { + proto_ports.insert( + key.clone(), + Port { + internal_port: port.internal_port, + public_hostname: port.public_hostname.clone(), + public_port: port.public_port, + public_path: port.public_path.clone(), + routing: Some(Routing { + routing: Some(match &port.routing { + pegboard::types::Routing::GameGuard { protocol } => { + routing::Routing::GameGuard(routing::GameGuard { + protocol: match protocol { + pegboard::types::GameGuardProtocol::Http => { + GameGuardProtocol::GgHttp as i32 + } + pegboard::types::GameGuardProtocol::Https => { + GameGuardProtocol::GgHttps as i32 + } + pegboard::types::GameGuardProtocol::Tcp => { + GameGuardProtocol::GgTcp as i32 + } + pegboard::types::GameGuardProtocol::TcpTls => { + GameGuardProtocol::GgTcpTls as i32 + } + pegboard::types::GameGuardProtocol::Udp => { + GameGuardProtocol::GgUdp as i32 + } + }, + }) + } + pegboard::types::Routing::Host { protocol } => { + routing::Routing::Host(routing::Host { + protocol: match protocol { + pegboard::types::HostProtocol::Tcp => { + HostProtocol::HostTcp as i32 + } + pegboard::types::HostProtocol::Udp => { + HostProtocol::HostUdp as i32 + } + }, + }) + } + }), + }), + }, + ); + } + let network = actor_metadata::Network { ports: proto_ports }; + + // Create the final ActorMetadata + ActorMetadata { + actor: Some(actor_metadata::Actor { + actor_id: metadata.actor.actor_id.to_string(), + tags: metadata.actor.tags.clone().into(), + create_ts: metadata.actor.create_ts, + }), + network: Some(network), + project: Some(actor_metadata::Project { + project_id: metadata.project.project_id.to_string(), + slug: metadata.project.slug.clone(), + }), + environment: Some(actor_metadata::Environment { + env_id: metadata.environment.env_id.to_string(), + slug: metadata.environment.slug.clone(), + }), + datacenter: Some(actor_metadata::Datacenter { + name_id: metadata.datacenter.name_id.clone(), + display_name: metadata.datacenter.display_name.clone(), + }), + cluster: Some(actor_metadata::Cluster { + cluster_id: metadata.cluster.cluster_id.to_string(), + }), + build: Some(actor_metadata::Build { + build_id: metadata.build.build_id.to_string(), + }), + } +} diff --git a/packages/edge/infra/client/manager/src/runner/mod.rs b/packages/edge/infra/client/manager/src/runner/mod.rs index 0c5ee6f758..e69defe3b7 100644 --- a/packages/edge/infra/client/manager/src/runner/mod.rs +++ b/packages/edge/infra/client/manager/src/runner/mod.rs @@ -124,8 +124,12 @@ impl Runner { tracing::info!(runner_id=?self.runner_id, "runner received another socket, closing old one"); // Close the old socket - let buf = runner_protocol::encode_frame(&runner_protocol::ToRunner::Close { - reason: Some("replacing with new socket".into()), + let buf = runner_protocol::encode_frame(&runner_protocol::proto::ToRunner { + message: Some(runner_protocol::proto::to_runner::Message::Close( + runner_protocol::proto::to_runner::Close { + reason: Some("replacing with new socket".into()), + }, + )), })?; if let Err(err) = existing_tx.send(buf.into()).await { @@ -178,21 +182,23 @@ impl Runner { break Ok(()); }; - let (_, packet) = runner_protocol::decode_frame::(&buf?) - .context("failed to decode frame")?; + let (_, packet) = + runner_protocol::decode_frame::(&buf?) + .context("failed to decode frame")?; tracing::debug!(?packet, "runner received packet"); - match packet { - runner_protocol::ToManager::Ping { .. } => { + match packet.message.context("ToManager.message")? { + runner_protocol::proto::to_manager::Message::Ping(_) => { // TODO: Rate limit? - self.send(&runner_protocol::ToRunner::Pong).await?; + self.send(&runner_protocol::proto::ToRunner { + message: Some(runner_protocol::proto::to_runner::Message::Pong( + runner_protocol::proto::to_runner::Pong {}, + )), + }) + .await?; } - runner_protocol::ToManager::ActorStateUpdate { - actor_id, - generation, - state, - } => { + runner_protocol::proto::to_manager::Message::ActorStateUpdate(msg) => { match self.config.image.allocation_type { protocol::ImageAllocationType::Single => { tracing::debug!("unexpected state update from non-multi runner"); @@ -202,25 +208,27 @@ impl Runner { // are listening to this runner's `actor_proxy_tx`. This means invalid messages are ignored. // NOTE: No receivers is not an error let _ = self.actor_proxy_tx.send(( - actor_id, - generation, - runner_protocol::ToActor::StateUpdate { state }, + rivet_util::Id::parse(&msg.actor_id)?, + msg.generation, + runner_protocol::ToActor::StateUpdate { + state: msg.state.context("ActorStateUpdate.state")?, + }, )); } } } - runner_protocol::ToManager::Kv(req) => { + runner_protocol::proto::to_manager::Message::Kv(msg) => { let _ = self.actor_proxy_tx.send(( - req.actor_id, - req.generation, - runner_protocol::ToActor::Kv(req), + rivet_util::Id::parse(&msg.actor_id)?, + msg.generation, + runner_protocol::ToActor::Kv(msg), )); } } } } - pub async fn send(&self, packet: &runner_protocol::ToRunner) -> Result<()> { + pub async fn send(&self, packet: &runner_protocol::proto::ToRunner) -> Result<()> { match &self.comms { Comms::Basic => bail!("cannot send socket message to basic runner"), Comms::Socket(socket) => { diff --git a/packages/edge/services/pegboard/src/protocol.rs b/packages/edge/services/pegboard/src/protocol.rs index 8b32a2ee71..167b36689f 100644 --- a/packages/edge/services/pegboard/src/protocol.rs +++ b/packages/edge/services/pegboard/src/protocol.rs @@ -245,6 +245,7 @@ pub struct Resources { #[derive(Debug, Serialize, Deserialize, Clone, Hash)] pub struct ActorMetadata { pub actor: ActorMetadataActor, + // TODO: Make not optional when all old actors are gone #[serde(default)] pub network: Option, pub project: ActorMetadataProject, diff --git a/packages/edge/services/pegboard/src/workflows/actor/v1/runtime.rs b/packages/edge/services/pegboard/src/workflows/actor/v1/runtime.rs index 9ad5454904..3bbe17b2e7 100644 --- a/packages/edge/services/pegboard/src/workflows/actor/v1/runtime.rs +++ b/packages/edge/services/pegboard/src/workflows/actor/v1/runtime.rs @@ -687,11 +687,12 @@ pub async fn spawn_actor( fallback_artifact_url: actor_setup.fallback_artifact_url.clone(), kind: actor_setup.meta.build_kind.into(), compression: actor_setup.meta.build_compression.into(), - // Always single, this is the old actor wf - allocation_type: protocol::ImageAllocationType::Single, - + // Calculated on the manager for old actors artifact_size: 0, + + // Always single, this is the old actor wf + allocation_type: protocol::ImageAllocationType::Single, }, root_user_enabled: input.root_user_enabled, env: input.environment.clone(), diff --git a/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs b/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs index 5aa03f03f7..72fa4175e5 100644 --- a/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs +++ b/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs @@ -217,7 +217,7 @@ pub async fn run_from_env( // NOTE: actor resources here is slightly different from build resources because it is // selected based on tier if let Some(resources) = &actor.resources { - match build.runtime { + match &build.runtime { None | Some(BuildRuntime::Container { .. }) => { env_usage.cpu += resources.cpu_millicores as u64; env_usage.memory += resources.memory_mib as u64;