Skip to content

Commit 638297c

Browse files
committed
fix(pegboard): convert ws manager socket to unix socket
1 parent 4f10a0d commit 638297c

File tree

15 files changed

+198
-194
lines changed

15 files changed

+198
-194
lines changed

docker/dev-full/prometheus/prometheus.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@ scrape_configs:
1414
- job_name: rivet-client
1515
static_configs:
1616
- targets:
17-
- rivet-client:6090
17+
- rivet-client:8091

docker/dev-full/vector-client/vector.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ sources:
88
prometheus_pegboard:
99
type: prometheus_scrape
1010
endpoints:
11-
- http://rivet-client:6090
11+
- http://rivet-client:8091
1212
scrape_interval_secs: 15
1313

1414
pegboard_manager:

docker/monolith/build-scripts/setup_s6.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ const services: Service[] = [
103103
rootUser: true,
104104
ports: {
105105
runner: 6080,
106-
metrics: 6090,
106+
metrics: 8091,
107107
},
108108
},
109109

docker/monolith/vector-client/vector.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ sources:
1212
prometheus_pegboard:
1313
type: prometheus_scrape
1414
endpoints:
15-
- http://rivet-client:6090
15+
- http://rivet-client:8091
1616
scrape_interval_secs: 15
1717

1818
pegboard_manager:

packages/common/chirp-workflow/core/src/worker.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,14 @@ impl Worker {
8383
loop {
8484
tokio::select! {
8585
_ = tick_interval.tick() => {},
86+
res = wake_sub.next() => {
87+
if res.is_none() {
88+
return Err(WorkflowError::SubscriptionUnsubscribed.into());
89+
}
90+
91+
tick_interval.reset();
92+
},
93+
8694
res = &mut gc_handle => {
8795
tracing::error!(?res, "metrics task unexpectedly stopped");
8896
break;
@@ -91,13 +99,6 @@ impl Worker {
9199
tracing::error!(?res, "metrics task unexpectedly stopped");
92100
break;
93101
},
94-
res = wake_sub.next() => {
95-
if res.is_none() {
96-
return Err(WorkflowError::SubscriptionUnsubscribed.into());
97-
}
98-
99-
tick_interval.reset();
100-
},
101102
_ = ctrl_c() => break,
102103
_ = sigterm.recv() => break,
103104
}

packages/common/metrics/src/server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1+
use std::net::SocketAddr;
2+
13
use global_error::prelude::*;
24
use hyper::{
35
header::CONTENT_TYPE,
46
service::{make_service_fn, service_fn},
57
Body, Request, Response, Server,
68
};
79
use prometheus::{Encoder, TextEncoder};
8-
use std::net::SocketAddr;
910

1011
// TODO: Record extra labels
1112

packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ pub async fn gen_initialize(
175175
"pegboard".into(),
176176
components::vector::PrometheusTarget {
177177
// Should match port from pb manager config
178-
endpoint: "http://127.0.0.1:6090".into(),
178+
endpoint: "http://127.0.0.1:8091".into(),
179179
scrape_interval: 15,
180180
},
181181
);
@@ -190,7 +190,7 @@ pub async fn gen_initialize(
190190
"pegboard".into(),
191191
components::vector::PrometheusTarget {
192192
// Should match port from pb manager config
193-
endpoint: "http://127.0.0.1:6090".into(),
193+
endpoint: "http://127.0.0.1:8091".into(),
194194
scrape_interval: 15,
195195
},
196196
);

packages/edge/infra/client/config/Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,13 @@ license.workspace = true
66
edition.workspace = true
77

88
[dependencies]
9+
anyhow = "1.0"
910
ipnet = { version = "2.10.1", features = ["serde"] }
11+
pegboard.workspace = true
12+
rivet-util.workspace = true
1013
schemars = { version = "0.8.21", features = ["url", "uuid1"] }
1114
serde = { version = "1.0.195", features = ["derive"] }
15+
serde_json = "1.0"
16+
tokio-util = { version = "0.7", features = ["codec"] }
1217
url = "2.2.2"
1318
uuid = { version = "1.6.1", features = ["v4"] }
14-
pegboard.workspace = true
15-
rivet-util.workspace = true

packages/edge/infra/client/config/src/manager.rs

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,6 @@ pub struct Runner {
8585
/// Whether or not to use a mount for actor file systems.
8686
pub use_mounts: Option<bool>,
8787

88-
/// Address of the WebSocket server for runners. Should exist on a network interface that both the host
89-
/// and containers can access.
90-
pub ip: Option<IpAddr>,
91-
92-
/// WebSocket port for runners on this machine to connect to.
93-
pub port: Option<u16>,
94-
9588
pub container_runner_binary_path: Option<PathBuf>,
9689
}
9790

@@ -100,14 +93,6 @@ impl Runner {
10093
self.use_mounts.unwrap_or(true)
10194
}
10295

103-
pub fn ip(&self) -> IpAddr {
104-
self.ip.unwrap_or(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
105-
}
106-
107-
pub fn port(&self) -> u16 {
108-
self.port.unwrap_or(6080)
109-
}
110-
11196
pub fn container_runner_binary_path(&self) -> PathBuf {
11297
self.container_runner_binary_path
11398
.clone()
@@ -247,7 +232,7 @@ pub struct Metrics {
247232

248233
impl Metrics {
249234
pub fn port(&self) -> u16 {
250-
self.port.unwrap_or(6090)
235+
self.port.unwrap_or(8091)
251236
}
252237
}
253238

packages/edge/infra/client/config/src/runner_protocol.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
use anyhow::*;
12
use pegboard::protocol;
23
use serde::{Deserialize, Serialize};
4+
use tokio_util::codec::LengthDelimitedCodec;
5+
use serde::{de::DeserializeOwned, Serialize};
6+
use bytes::Bytes;
37

48
#[derive(Debug, Serialize, Deserialize)]
59
#[serde(rename_all = "snake_case", deny_unknown_fields)]
@@ -38,3 +42,38 @@ pub enum ActorState {
3842
Running,
3943
Exited { exit_code: Option<i32> },
4044
}
45+
46+
pub fn codec() -> LengthDelimitedCodec {
47+
LengthDelimitedCodec::builder()
48+
.length_field_type::<u32>()
49+
.length_field_length(4)
50+
// No offset
51+
.length_field_offset(0)
52+
// Header length is not included in the length calculation
53+
.length_adjustment(4)
54+
// Header is included in the returned bytes
55+
.num_skip(0)
56+
.new_codec()
57+
}
58+
59+
pub fn encode_frame<T: Serialize>(payload: &T) -> Result<Vec<u8>> {
60+
let mut buf = Vec::with_capacity(4);
61+
buf.extend_from_slice(&[0u8; 4]); // header (currently unused)
62+
63+
let mut cursor = Cursor::new(&mut buf);
64+
serde_json::to_writer(&mut cursor, payload)?;
65+
66+
Ok(buf)
67+
}
68+
69+
fn decode_frame<T: DeserializeOwned>(frame: &Vec<u8>) -> Result<([u8; 4], T)> {
70+
ensure!(frame.len() >= 4, "Frame too short");
71+
72+
// Extract the header (first 4 bytes)
73+
let header = [frame[0], frame[1], frame[2], frame[3]];
74+
75+
// Deserialize the rest of the frame (payload after the header)
76+
let payload = serde_json::from_slice(&frame[4..])?;
77+
78+
Ok((header, payload))
79+
}

0 commit comments

Comments
 (0)