Skip to content

Commit 29e785f

Browse files
committed
fix(pb): get new actor ids working e2e
1 parent 7861f1b commit 29e785f

File tree

9 files changed

+107
-114
lines changed

9 files changed

+107
-114
lines changed

packages/core/services/cluster/src/ops/datacenter/get_for_label.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ async fn get_dcs(ctx: OperationCtx, labels: Vec<u16>) -> GlobalResult<Vec<Datace
5757
FROM db_cluster.datacenters@datacenter_label_idx
5858
WHERE label = ANY($1)
5959
",
60-
labels.into_iter().map(|x| x as i64).collect::<Vec<_>>(),
60+
labels.into_iter().map(|x| x.to_be_bytes()).collect::<Vec<_>>(),
6161
)
6262
.await?;
6363

packages/core/services/cluster/src/workflows/server/install/install_scripts/components/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ pub mod lz4 {
233233
indoc!(
234234
r#"
235235
echo 'Downloading lz4'
236-
curl -L https://releases.rivet.gg/tools/lz4/1.10.0/debian11-amd64/lz4 -o /usr/local/bin/lz4
236+
curl -Lfo /usr/local/bin/lz4 https://releases.rivet.gg/tools/lz4/1.10.0/debian11-amd64/lz4
237237
chmod +x /usr/local/bin/lz4
238238
"#
239239
)
@@ -286,7 +286,7 @@ pub mod umoci {
286286
indoc!(
287287
r#"
288288
echo 'Downloading umoci'
289-
curl -Lf -o /usr/bin/umoci "https://github.com/opencontainers/umoci/releases/download/v0.4.7/umoci.amd64"
289+
curl -Lfo /usr/bin/umoci "https://github.com/opencontainers/umoci/releases/download/v0.4.7/umoci.amd64"
290290
chmod +x /usr/bin/umoci
291291
"#
292292
).to_string()
@@ -300,7 +300,7 @@ pub mod cni {
300300
indoc!(
301301
r#"
302302
echo 'Downloading cnitool'
303-
curl -Lf -o /usr/bin/cnitool "https://github.com/rivet-gg/cni/releases/download/v1.1.2-build3/cnitool"
303+
curl -Lfo /usr/bin/cnitool "https://github.com/rivet-gg/cni/releases/download/v1.1.2-build3/cnitool"
304304
chmod +x /usr/bin/cnitool
305305
"#
306306
).to_string()

packages/edge/infra/client/echo/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ authors = ["Rivet Gaming, LLC <[email protected]>"]
66
license = "Apache-2.0"
77

88
[dependencies]
9+
anyhow = "1.0"
910
bytes = "1.0"
1011
futures-util = "0.3"
1112
http = "0.2"
13+
serde = { version = "1.0", features = ["derive"] }
1214
serde_json = "1.0"
1315
tokio = { version = "1.40", features = ["full",] }
14-
tokio-tungstenite = "0.23.1"
16+
tokio-util = "0.7"
1517
uuid = { version = "1", features = ["v4", "serde"] }
1618
warp = "0.3.7"
Lines changed: 88 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
use std::{env, net::SocketAddr, sync::Arc, time::Duration};
1+
use std::{env, io::Cursor, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
22

3+
use anyhow::*;
4+
use bytes::Bytes;
35
use futures_util::{SinkExt, StreamExt};
6+
use serde::{de::DeserializeOwned, Serialize};
47
use serde_json::json;
5-
use tokio::sync::Mutex;
6-
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
7-
use uuid::Uuid;
8+
use tokio::{net::UnixStream, sync::Mutex};
9+
use tokio_util::codec::{Framed, LengthDelimitedCodec};
810
use warp::Filter;
911

1012
const PING_INTERVAL: Duration = Duration::from_secs(1);
@@ -18,20 +20,20 @@ async fn main() {
1820
}
1921

2022
// Get manager connection details from env vars
21-
let manager_ip = env::var("RIVET_MANAGER_IP").expect("RIVET_MANAGER_IP not set");
22-
let manager_port = env::var("RIVET_MANAGER_PORT").expect("RIVET_MANAGER_PORT not set");
23-
let manager_addr = format!("ws://{}:{}", manager_ip, manager_port);
23+
let manager_socket_path = PathBuf::from(
24+
env::var("RIVET_MANAGER_SOCKET_PATH").expect("RIVET_MANAGER_SOCKET_PATH not set"),
25+
);
2426

2527
// Get HTTP server port from env var or use default
2628
let http_port = env::var("PORT_MAIN")
2729
.expect("PORT_MAIN not set")
2830
.parse::<u16>()
2931
.expect("bad PORT_MAIN");
3032

31-
// Spawn the WebSocket client
33+
// Spawn the unix socket client
3234
tokio::spawn(async move {
33-
if let Err(e) = run_websocket_client(&manager_addr).await {
34-
eprintln!("WebSocket client error: {}", e);
35+
if let Err(e) = run_socket_client(manager_socket_path).await {
36+
eprintln!("Socket client error: {}", e);
3537
}
3638
});
3739

@@ -53,25 +55,28 @@ async fn main() {
5355
warp::serve(echo).run(http_addr).await;
5456
}
5557

56-
async fn run_websocket_client(url: &str) -> Result<(), Box<dyn std::error::Error>> {
57-
println!("Connecting to WebSocket at {}", url);
58+
async fn run_socket_client(socket_path: PathBuf) -> Result<()> {
59+
println!("Connecting to socket at {}", socket_path.display());
5860

59-
// Connect to the WebSocket server
60-
let (ws_stream, _) = connect_async(url).await?;
61-
println!("WebSocket connection established");
61+
// Connect to the socket server
62+
let stream = UnixStream::connect(socket_path).await?;
63+
println!("Socket connection established");
6264

63-
// Split the stream
64-
let (mut write, mut read) = ws_stream.split();
65+
let codec = LengthDelimitedCodec::builder()
66+
.length_field_type::<u32>()
67+
.length_field_length(4)
68+
// No offset
69+
.length_field_offset(0)
70+
// Header length is not included in the length calculation
71+
.length_adjustment(4)
72+
// header is included in the returned bytes
73+
.num_skip(0)
74+
.new_codec();
6575

66-
let payload = json!({
67-
"init": {
68-
"access_token": env::var("RIVET_ACCESS_TOKEN").expect("RIVET_ACCESS_TOKEN not set"),
69-
},
70-
});
76+
let framed = Framed::new(stream, codec);
7177

72-
let data = serde_json::to_vec(&payload)?;
73-
write.send(Message::Binary(data)).await?;
74-
println!("Sent init message");
78+
// Split the stream
79+
let (write, mut read) = framed.split();
7580

7681
// Ping thread
7782
let write = Arc::new(Mutex::new(write));
@@ -80,10 +85,14 @@ async fn run_websocket_client(url: &str) -> Result<(), Box<dyn std::error::Error
8085
loop {
8186
tokio::time::sleep(PING_INTERVAL).await;
8287

88+
let payload = json!({
89+
"ping": {}
90+
});
91+
8392
if write2
8493
.lock()
8594
.await
86-
.send(Message::Ping(Vec::new()))
95+
.send(encode_frame(&payload).unwrap())
8796
.await
8897
.is_err()
8998
{
@@ -93,53 +102,61 @@ async fn run_websocket_client(url: &str) -> Result<(), Box<dyn std::error::Error
93102
});
94103

95104
// Process incoming messages
96-
while let Some(message) = read.next().await {
97-
match message {
98-
Ok(msg) => match msg {
99-
Message::Pong(_) => {}
100-
Message::Binary(buf) => {
101-
let packet = serde_json::from_slice::<serde_json::Value>(&buf)?;
102-
println!("Received packet: {packet:?}");
103-
104-
if let Some(packet) = packet.get("start_actor") {
105-
let payload = json!({
106-
"actor_state_update": {
107-
"actor_id": packet["actor_id"],
108-
"generation": packet["generation"],
109-
"state": {
110-
"running": null,
111-
},
112-
},
113-
});
114-
115-
let data = serde_json::to_vec(&payload)?;
116-
write.lock().await.send(Message::Binary(data)).await?;
117-
} else if let Some(packet) = packet.get("signal_actor") {
118-
let payload = json!({
119-
"actor_state_update": {
120-
"actor_id": packet["actor_id"],
121-
"generation": packet["generation"],
122-
"state": {
123-
"exited": {
124-
"exit_code": null,
125-
},
126-
},
127-
},
128-
});
129-
130-
let data = serde_json::to_vec(&payload)?;
131-
write.lock().await.send(Message::Binary(data)).await?;
132-
}
133-
}
134-
msg => eprintln!("Unexpected message: {msg:?}"),
135-
},
136-
Err(e) => {
137-
eprintln!("Error reading message: {}", e);
138-
break;
139-
}
105+
while let Some(frame) = read.next().await.transpose()? {
106+
let (_, packet) = decode_frame::<serde_json::Value>(&frame.freeze())?;
107+
println!("Received packet: {packet:?}");
108+
109+
if let Some(packet) = packet.get("start_actor") {
110+
let payload = json!({
111+
"actor_state_update": {
112+
"actor_id": packet["actor_id"],
113+
"generation": packet["generation"],
114+
"state": {
115+
"running": null,
116+
},
117+
},
118+
});
119+
120+
write.lock().await.send(encode_frame(&payload)?).await?;
121+
} else if let Some(packet) = packet.get("signal_actor") {
122+
let payload = json!({
123+
"actor_state_update": {
124+
"actor_id": packet["actor_id"],
125+
"generation": packet["generation"],
126+
"state": {
127+
"exited": {
128+
"exit_code": null,
129+
},
130+
},
131+
},
132+
});
133+
134+
write.lock().await.send(encode_frame(&payload)?).await?;
140135
}
141136
}
142137

143-
println!("WebSocket connection closed");
138+
println!("Socket connection closed");
144139
Ok(())
145140
}
141+
142+
fn decode_frame<T: DeserializeOwned>(frame: &Bytes) -> Result<([u8; 4], T)> {
143+
ensure!(frame.len() >= 4, "Frame too short");
144+
145+
// Extract the header (first 4 bytes)
146+
let header = [frame[0], frame[1], frame[2], frame[3]];
147+
148+
// Deserialize the rest of the frame (payload after the header)
149+
let payload = serde_json::from_slice(&frame[4..])?;
150+
151+
Ok((header, payload))
152+
}
153+
154+
fn encode_frame<T: Serialize>(payload: &T) -> Result<Bytes> {
155+
let mut buf = Vec::with_capacity(4);
156+
buf.extend_from_slice(&[0u8; 4]); // header (currently unused)
157+
158+
let mut cursor = Cursor::new(&mut buf);
159+
serde_json::to_writer(&mut cursor, payload)?;
160+
161+
Ok(buf.into())
162+
}

packages/edge/infra/client/manager/src/utils/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ async fn build_sqlite_pool(db_url: &str) -> Result<SqlitePool> {
128128
.busy_timeout(Duration::from_secs(5))
129129
// Enable foreign key constraint enforcement
130130
.foreign_keys(true)
131+
// Increases write performance
132+
.journal_mode(SqliteJournalMode::Wal)
131133
// Enable auto vacuuming and set it to incremental mode for gradual space reclaiming
132134
.auto_vacuum(SqliteAutoVacuum::Incremental)
133135
// Set synchronous mode to NORMAL for performance and data safety balance
@@ -241,7 +243,8 @@ async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> {
241243
generation INTEGER NOT NULL,
242244
config BLOB NOT NULL, -- JSONB
243245
244-
runner_id NOT NULL, -- Already exists in `config`, set here for ease of querying
246+
-- Already exists in `config`, set here for ease of querying
247+
runner_id BLOB NOT NULL, -- UUID
245248
246249
start_ts INTEGER NOT NULL,
247250
running_ts INTEGER,

packages/edge/infra/guard/core/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ pub mod proxy_service;
55
pub mod request_context;
66
mod server;
77
pub mod types;
8-
pub mod util;
98

109
pub use cert_resolver::CertResolverFn;
1110
pub use proxy_service::{MiddlewareFn, ProxyService, ProxyState, RouteTarget, RoutingFn};

packages/edge/infra/guard/core/src/util.rs

Lines changed: 0 additions & 23 deletions
This file was deleted.

packages/edge/infra/guard/server/src/tls.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,11 @@ pub async fn create_cert_resolver(
157157
}
158158
Ok(None) => {
159159
tracing::warn!(
160-
"Could not build dynamic hostname actor routing regex - pattern will be skipped"
161-
);
160+
"Could not build dynamic hostname actor routing regex - pattern will be skipped"
161+
);
162162
None
163163
}
164-
Err(err) => bail!(
165-
"Failed to build dynamic hostname actor routing regex: {}",
166-
err
167-
),
164+
Err(e) => bail!("Failed to build dynamic hostname actor routing regex: {}", e),
168165
};
169166
let actor_hostname_regex_static =
170167
match build_actor_hostname_and_path_regex(EndpointType::Path, guard_hostname) {
@@ -178,9 +175,7 @@ pub async fn create_cert_resolver(
178175
);
179176
None
180177
}
181-
Err(e) => {
182-
bail!("Failed to build static path actor routing regex: {}", e);
183-
}
178+
Err(e) => bail!("Failed to build static path actor routing regex: {}", e),
184179
};
185180

186181
// Create resolver function that matches the routing logic

packages/edge/services/pegboard/src/util.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use regex::Regex;
55
use crate::types::{EndpointType, GameGuardProtocol};
66

77
// Constants for regex patterns
8-
const UUID_PATTERN: &str = r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
8+
const ID_PATTERN: &str = r"[a-zA-Z0-9-]+";
99
const PORT_NAME_PATTERN: &str = r"[a-zA-Z0-9-_]+";
1010

1111
pub fn build_actor_hostname_and_path(
@@ -59,7 +59,7 @@ pub fn build_actor_hostname_and_path_regex(
5959
// server in the subdomain is a convenience
6060
(EndpointType::Hostname, GuardPublicHostname::DnsParent(dns_parent)) => {
6161
let hostname_regex = Regex::new(&format!(
62-
r"^(?P<actor_id>{UUID_PATTERN})-(?P<port_name>{PORT_NAME_PATTERN})\.actor\.{}$",
62+
r"^(?P<actor_id>{ID_PATTERN})-(?P<port_name>{PORT_NAME_PATTERN})\.actor\.{}$",
6363
regex::escape(dns_parent.as_str())
6464
))?;
6565
Ok(Some((hostname_regex, None)))
@@ -81,7 +81,7 @@ pub fn build_actor_hostname_and_path_regex(
8181
))?;
8282

8383
let path_regex = Regex::new(&format!(
84-
r"^/(?P<actor_id>{UUID_PATTERN})-(?P<port_name>{PORT_NAME_PATTERN})(?:/.*)?$"
84+
r"^/(?P<actor_id>{ID_PATTERN})-(?P<port_name>{PORT_NAME_PATTERN})(?:/.*)?$"
8585
))?;
8686

8787
Ok(Some((hostname_regex, Some(path_regex))))
@@ -91,7 +91,7 @@ pub fn build_actor_hostname_and_path_regex(
9191
let hostname_regex = Regex::new(&format!(r"^{}$", regex::escape(static_.as_str())))?;
9292

9393
let path_regex = Regex::new(&format!(
94-
r"^/(?P<actor_id>{UUID_PATTERN})-(?P<port_name>{PORT_NAME_PATTERN})(?:/.*)?$"
94+
r"^/(?P<actor_id>{ID_PATTERN})-(?P<port_name>{PORT_NAME_PATTERN})(?:/.*)?$"
9595
))?;
9696

9797
Ok(Some((hostname_regex, Some(path_regex))))

0 commit comments

Comments
 (0)