Skip to content

Commit bfa4fd3

Browse files
committed
fix(pegboard): convert ws manager socket to unix socket
1 parent 933385f commit bfa4fd3

File tree

15 files changed

+198
-186
lines changed

15 files changed

+198
-186
lines changed

docker/dev-full/prometheus/prometheus.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@ scrape_configs:
1414
- job_name: rivet-client
1515
static_configs:
1616
- targets:
17-
- rivet-client:6090
17+
- rivet-client:8091

docker/dev-full/vector-client/vector.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ sources:
88
prometheus_pegboard:
99
type: prometheus_scrape
1010
endpoints:
11-
- http://rivet-client:6090
11+
- http://rivet-client:8091
1212
scrape_interval_secs: 15
1313

1414
dynamic_events_http:

docker/monolith/build-scripts/setup_s6.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ const services: Service[] = [
103103
rootUser: true,
104104
ports: {
105105
runner: 6080,
106-
metrics: 6090,
106+
metrics: 8091,
107107
},
108108
},
109109

docker/monolith/vector-client/vector.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ sources:
1212
prometheus_pegboard:
1313
type: prometheus_scrape
1414
endpoints:
15-
- http://rivet-client:6090
15+
- http://rivet-client:8091
1616
scrape_interval_secs: 15
1717

1818
pegboard_manager:

packages/common/chirp-workflow/core/src/worker.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,14 @@ impl Worker {
8383
loop {
8484
tokio::select! {
8585
_ = tick_interval.tick() => {},
86+
res = wake_sub.next() => {
87+
if res.is_none() {
88+
return Err(WorkflowError::SubscriptionUnsubscribed.into());
89+
}
90+
91+
tick_interval.reset();
92+
},
93+
8694
res = &mut gc_handle => {
8795
tracing::error!(?res, "metrics task unexpectedly stopped");
8896
break;
@@ -91,13 +99,6 @@ impl Worker {
9199
tracing::error!(?res, "metrics task unexpectedly stopped");
92100
break;
93101
},
94-
res = wake_sub.next() => {
95-
if res.is_none() {
96-
return Err(WorkflowError::SubscriptionUnsubscribed.into());
97-
}
98-
99-
tick_interval.reset();
100-
},
101102
_ = ctrl_c() => break,
102103
_ = sigterm.recv() => break,
103104
}

packages/common/metrics/src/server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1+
use std::net::SocketAddr;
2+
13
use global_error::prelude::*;
24
use hyper::{
35
header::CONTENT_TYPE,
46
service::{make_service_fn, service_fn},
57
Body, Request, Response, Server,
68
};
79
use prometheus::{Encoder, TextEncoder};
8-
use std::net::SocketAddr;
910

1011
// TODO: Record extra labels
1112

packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ pub async fn gen_initialize(
179179
"pegboard".into(),
180180
components::vector::PrometheusTarget {
181181
// Should match port from pb manager config
182-
endpoint: "http://127.0.0.1:6090".into(),
182+
endpoint: "http://127.0.0.1:8091".into(),
183183
scrape_interval: 15,
184184
},
185185
);
@@ -194,7 +194,7 @@ pub async fn gen_initialize(
194194
"pegboard".into(),
195195
components::vector::PrometheusTarget {
196196
// Should match port from pb manager config
197-
endpoint: "http://127.0.0.1:6090".into(),
197+
endpoint: "http://127.0.0.1:8091".into(),
198198
scrape_interval: 15,
199199
},
200200
);

packages/edge/infra/client/config/Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,13 @@ license.workspace = true
66
edition.workspace = true
77

88
[dependencies]
9+
anyhow = "1.0"
910
ipnet = { version = "2.10.1", features = ["serde"] }
11+
pegboard.workspace = true
12+
rivet-util.workspace = true
1013
schemars = { version = "0.8.21", features = ["url", "uuid1"] }
1114
serde = { version = "1.0.195", features = ["derive"] }
15+
serde_json = "1.0"
16+
tokio-util = { version = "0.7", features = ["codec"] }
1217
url = "2.2.2"
1318
uuid = { version = "1.6.1", features = ["v4"] }
14-
pegboard.workspace = true
15-
rivet-util.workspace = true

packages/edge/infra/client/config/src/manager.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,6 @@ pub struct Runner {
9494
/// ````
9595
pub use_resource_constraints: Option<bool>,
9696

97-
/// WebSocket Port for runners on this machine to connect to.
98-
pub port: Option<u16>,
99-
10097
pub container_runner_binary_path: Option<PathBuf>,
10198
}
10299

@@ -109,10 +106,6 @@ impl Runner {
109106
self.use_resource_constraints.unwrap_or(true)
110107
}
111108

112-
pub fn port(&self) -> u16 {
113-
self.port.unwrap_or(6080)
114-
}
115-
116109
pub fn container_runner_binary_path(&self) -> PathBuf {
117110
self.container_runner_binary_path
118111
.clone()
@@ -259,7 +252,7 @@ pub struct Metrics {
259252

260253
impl Metrics {
261254
pub fn port(&self) -> u16 {
262-
self.port.unwrap_or(6090)
255+
self.port.unwrap_or(8091)
263256
}
264257
}
265258

packages/edge/infra/client/config/src/runner_protocol.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
use anyhow::*;
12
use pegboard::protocol;
23
use serde::{Deserialize, Serialize};
4+
use tokio_util::codec::LengthDelimitedCodec;
5+
use serde::{de::DeserializeOwned, Serialize};
6+
use bytes::Bytes;
37

48
#[derive(Debug, Serialize, Deserialize)]
59
#[serde(rename_all = "snake_case", deny_unknown_fields)]
@@ -38,3 +42,38 @@ pub enum ActorState {
3842
Running,
3943
Exited { exit_code: Option<i32> },
4044
}
45+
46+
pub fn codec() -> LengthDelimitedCodec {
47+
LengthDelimitedCodec::builder()
48+
.length_field_type::<u32>()
49+
.length_field_length(4)
50+
// No offset
51+
.length_field_offset(0)
52+
// Header length is not included in the length calculation
53+
.length_adjustment(4)
54+
// Header is included in the returned bytes
55+
.num_skip(0)
56+
.new_codec()
57+
}
58+
59+
pub fn encode_frame<T: Serialize>(payload: &T) -> Result<Vec<u8>> {
60+
let mut buf = Vec::with_capacity(4);
61+
buf.extend_from_slice(&[0u8; 4]); // header (currently unused)
62+
63+
let mut cursor = Cursor::new(&mut buf);
64+
serde_json::to_writer(&mut cursor, payload)?;
65+
66+
Ok(buf)
67+
}
68+
69+
fn decode_frame<T: DeserializeOwned>(frame: &Vec<u8>) -> Result<([u8; 4], T)> {
70+
ensure!(frame.len() >= 4, "Frame too short");
71+
72+
// Extract the header (first 4 bytes)
73+
let header = [frame[0], frame[1], frame[2], frame[3]];
74+
75+
// Deserialize the rest of the frame (payload after the header)
76+
let payload = serde_json::from_slice(&frame[4..])?;
77+
78+
Ok((header, payload))
79+
}

0 commit comments

Comments
 (0)