Skip to content

Commit f31e0fe

Browse files
committed
fix(pegboard): convert ws manager socket to unix socket
1 parent 4f10a0d commit f31e0fe

File tree

6 files changed

+156
-175
lines changed

6 files changed

+156
-175
lines changed

packages/edge/infra/client/config/src/manager.rs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,6 @@ pub struct Runner {
8585
/// Whether or not to use a mount for actor file systems.
8686
pub use_mounts: Option<bool>,
8787

88-
/// Address of the WebSocket server for runners. Should exist on a network interface that both the host
89-
/// and containers can access.
90-
pub ip: Option<IpAddr>,
91-
92-
/// WebSocket port for runners on this machine to connect to.
93-
pub port: Option<u16>,
94-
9588
pub container_runner_binary_path: Option<PathBuf>,
9689
}
9790

@@ -100,14 +93,6 @@ impl Runner {
10093
self.use_mounts.unwrap_or(true)
10194
}
10295

103-
pub fn ip(&self) -> IpAddr {
104-
self.ip.unwrap_or(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
105-
}
106-
107-
pub fn port(&self) -> u16 {
108-
self.port.unwrap_or(6080)
109-
}
110-
11196
pub fn container_runner_binary_path(&self) -> PathBuf {
11297
self.container_runner_binary_path
11398
.clone()

packages/edge/infra/client/config/src/runner_protocol.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,24 @@ pub enum ActorState {
3838
Running,
3939
Exited { exit_code: Option<i32> },
4040
}
41+
42+
pub fn codec() -> LengthDelimitedCodec {
43+
LengthDelimitedCodec::builder()
44+
.length_field_type::<u32>()
45+
.length_field_length(4)
46+
// No offset
47+
.length_field_offset(0)
48+
// Header length is not included in the length calculation
49+
.length_adjustment(4)
50+
// header is included in the returned bytes
51+
.num_skip(0)
52+
.new_codec()
53+
}
54+
55+
pub fn encode_frame() {
56+
57+
}
58+
59+
pub fn decode_frame() {
60+
61+
}

packages/edge/infra/client/manager/src/ctx.rs

Lines changed: 4 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -220,44 +220,6 @@ impl Ctx {
220220

221221
self.receive_init(&mut rx).await?;
222222

223-
// Start runner socket and attaches incoming connections to their corresponding runner
224-
let self2 = self.clone();
225-
let runner_socket: tokio::task::JoinHandle<Result<()>> = tokio::spawn(async move {
226-
tracing::info!(port=%self2.config().runner.port(), "listening for runner sockets");
227-
228-
let listener =
229-
TcpListener::bind((self2.config().runner.ip(), self2.config().runner.port()))
230-
.await
231-
.map_err(RuntimeError::RunnerSocketListenFailed)?;
232-
233-
loop {
234-
match listener.accept().await {
235-
Ok((stream, _)) => {
236-
let mut ws_stream = Some(tokio_tungstenite::accept_async(stream).await?);
237-
238-
tracing::info!("received new socket");
239-
240-
if let Err(err) = self2.receive_runner_init_message(&mut ws_stream).await {
241-
tracing::error!(
242-
?err,
243-
"failed to receive init message from runner socket"
244-
);
245-
}
246-
247-
// Close stream
248-
if let Some(mut ws_stream) = ws_stream {
249-
let close_frame = CloseFrame {
250-
code: CloseCode::Error,
251-
reason: "init failed".into(),
252-
};
253-
ws_stream.send(Message::Close(Some(close_frame))).await?;
254-
}
255-
}
256-
Err(err) => tracing::error!(?err, "failed to connect websocket"),
257-
}
258-
}
259-
});
260-
261223
// Start ping thread after init packet is received because ping denotes this client as "ready"
262224
let self2 = self.clone();
263225
let ping_thread: tokio::task::JoinHandle<Result<()>> = tokio::spawn(async move {
@@ -549,63 +511,6 @@ impl Ctx {
549511
Ok(())
550512
}
551513

552-
async fn receive_runner_init_message(
553-
self: &Arc<Self>,
554-
ws_stream: &mut Option<WebSocketStream<TcpStream>>,
555-
) -> Result<()> {
556-
match tokio::time::timeout(
557-
RUNNER_INIT_TIMEOUT,
558-
ws_stream.as_mut().context("ws_stream should exist")?.next(),
559-
)
560-
.await
561-
{
562-
Ok(msg) => match msg {
563-
Some(Ok(Message::Close(_))) | None => {
564-
tracing::debug!("runner socket closed");
565-
Ok(())
566-
}
567-
Some(Ok(Message::Binary(buf))) => {
568-
match serde_json::from_slice::<runner_protocol::ToManager>(&buf) {
569-
Ok(runner_protocol::ToManager::Init { access_token }) => {
570-
let claims = Claims::decode(&access_token, self.secret())
571-
.context("could not decode access token from init packet")?;
572-
573-
// Read runner id from claims
574-
let runner_id = 'ent: {
575-
for ent in claims.ent() {
576-
#[allow(irrefutable_let_patterns)]
577-
if let Entitlement::Runner { runner_id } = ent {
578-
break 'ent runner_id;
579-
}
580-
}
581-
582-
bail!("no runner entitlement");
583-
};
584-
585-
if let Some(runner) = self.runners.read().await.get(&runner_id) {
586-
runner
587-
.attach_socket(
588-
self,
589-
ws_stream.take().context("ws_stream should exist")?,
590-
)
591-
.await?;
592-
593-
Ok(())
594-
} else {
595-
bail!("killing unknown runner: {runner_id}");
596-
}
597-
}
598-
Ok(packet) => bail!("invalid init packet from runner: {packet:?}"),
599-
Err(err) => Err(err).context("invalid init packet from runner"),
600-
}
601-
}
602-
Some(Ok(packet)) => bail!("invalid init packet from runner: {packet:?}"),
603-
Some(Err(err)) => Err(err).context("runner socket error"),
604-
},
605-
Err(_) => bail!("runner socket init timed out"),
606-
}
607-
}
608-
609514
/// Returns None if the runner could not be found in the runners map on time.
610515
async fn get_or_create_runner(
611516
&self,
@@ -892,6 +797,10 @@ impl Ctx {
892797
let runner = runner.clone();
893798
let self2 = self.clone();
894799
tokio::spawn(async move {
800+
// The socket file already exists, this will reconnect and spawn the appropriate task to
801+
// handle the connection
802+
runner.start_socket(&self2).await?;
803+
895804
if let Err(err) = runner.observe(&self2, false).await {
896805
tracing::error!(?runner_id, ?err, "observe failed");
897806
}

packages/edge/infra/client/manager/src/runner/mod.rs

Lines changed: 47 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl Runner {
111111
pub async fn attach_socket(
112112
self: &Arc<Self>,
113113
ctx: &Arc<Ctx>,
114-
ws_stream: WebSocketStream<TcpStream>,
114+
stream: WebSocketStream<TcpStream>,
115115
) -> Result<()> {
116116
match &self.comms {
117117
Comms::Basic => bail!("attempt to attach socket to basic runner"),
@@ -120,33 +120,36 @@ impl Runner {
120120

121121
let mut guard = tx.lock().await;
122122

123-
if let Some(existing_ws_tx) = &mut *guard {
123+
if let Some(existing_tx) = &mut *guard {
124124
tracing::info!(runner_id=?self.runner_id, "runner received another socket, closing old one");
125125

126126
// Close the old socket
127127
let close_frame = CloseFrame {
128128
code: CloseCode::Error,
129129
reason: "replacing with new socket".into(),
130130
};
131-
existing_ws_tx
132-
.send(Message::Close(Some(close_frame)))
133-
.await?;
131+
132+
if let Err(err) = existing_tx.send(Message::Close(Some(close_frame))).await {
133+
tracing::error!(runner_id=?self.runner_id, ?err, "failed to close old socket");
134+
};
134135

135136
tracing::info!(runner_id=?self.runner_id, "socket replaced");
136137
} else {
137138
tracing::info!(runner_id=?self.runner_id, "socket attached");
138139
}
139140

140-
let (ws_tx, ws_rx) = ws_stream.split();
141+
// Wrap the stream in a framed transport
142+
let mut framed = Framed::new(stream, runner_protocol::codec());
143+
let (tx, rx) = stream.split();
141144

142-
*guard = Some(ws_tx);
145+
*guard = Some(tx);
143146
self.bump();
144147

145148
// Spawn a new thread to handle incoming messages
146149
let self2 = self.clone();
147150
let ctx2 = ctx.clone();
148151
tokio::task::spawn(async move {
149-
if let Err(err) = self2.receive_messages(&ctx2, ws_rx).await {
152+
if let Err(err) = self2.receive_frames(&ctx2, rx).await {
150153
tracing::error!(runner_id=?self2.runner_id, ?err, "socket error, killing runner");
151154

152155
if let Err(err) = self2.signal(Signal::SIGKILL).await {
@@ -160,52 +163,46 @@ impl Runner {
160163
Ok(())
161164
}
162165

163-
async fn receive_messages(
164-
&self,
165-
ctx: &Ctx,
166-
mut ws_rx: SplitStream<WebSocketStream<TcpStream>>,
167-
) -> Result<()> {
166+
async fn receive_frames(&self, ctx: &Ctx, mut ws_rx: SplitStream<UnixStream>) -> Result<()> {
168167
loop {
169-
match tokio::time::timeout(PING_TIMEOUT, ws_rx.next()).await {
170-
Ok(msg) => match msg {
171-
Some(Ok(Message::Ping(_))) => {
172-
// Pongs are sent automatically
173-
}
174-
Some(Ok(Message::Close(_))) | None => {
175-
tracing::debug!(runner_id=?self.runner_id, "runner socket closed");
176-
break Ok(());
177-
}
178-
Some(Ok(Message::Binary(buf))) => {
179-
let packet = serde_json::from_slice::<runner_protocol::ToManager>(&buf)?;
168+
let Ok(frame) = tokio::time::timeout(PING_TIMEOUT, ws_rx.next()).await else {
169+
bail!("runner socket ping timed out");
170+
};
180171

181-
self.process_packet(ctx, packet).await?;
182-
}
183-
Some(Ok(packet)) => bail!("runner socket unexpected packet: {packet:?}"),
184-
Some(Err(err)) => break Err(err).context("runner socket error"),
185-
},
186-
Err(_) => bail!("socket ping timed out"),
187-
}
188-
}
189-
}
172+
let Some(buf) = frame else {
173+
tracing::debug!(runner_id=?self.runner_id, "runner socket closed");
174+
break Ok(());
175+
};
176+
177+
let (_, packet) = runner_protocol::decode_frame::<runner_protocol::ToManager>(&buf)?;
178+
179+
tracing::debug!(?packet, "runner received packet");
190180

191-
async fn process_packet(&self, ctx: &Ctx, packet: runner_protocol::ToManager) -> Result<()> {
192-
tracing::debug!(?packet, "runner received packet");
193-
194-
match packet {
195-
runner_protocol::ToManager::Init { .. } => bail!("unexpected second init packet"),
196-
runner_protocol::ToManager::ActorStateUpdate {
197-
actor_id,
198-
generation,
199-
state,
200-
} => {
201-
// NOTE: We don't have to verify if the actor id given here is valid because only valid actors
202-
// are listening to this runner's `actor_observer_tx`. This means invalid messages are ignored.
203-
// NOTE: No receivers is not an error
204-
let _ = self.actor_observer_tx.send((actor_id, generation, state));
181+
match packet {
182+
runner_protocol::ToManager::Ping { .. } => {
183+
let socket = socket
184+
.lock()
185+
.await
186+
.context("must have socket if receiving ping")?;
187+
188+
let buf = runner_protocol::encode_frame(runner_protocol::ToRunner::Pong)?;
189+
socket
190+
.send(buf)
191+
.await
192+
.context("failed to send packet to socket")?;
193+
}
194+
runner_protocol::ToManager::ActorStateUpdate {
195+
actor_id,
196+
generation,
197+
state,
198+
} => {
199+
// NOTE: We don't have to verify if the actor id given here is valid because only valid actors
200+
// are listening to this runner's `actor_observer_tx`. This means invalid messages are ignored.
201+
// NOTE: No receivers is not an error
202+
let _ = self.actor_observer_tx.send((actor_id, generation, state));
203+
}
205204
}
206205
}
207-
208-
Ok(())
209206
}
210207

211208
pub async fn send(&self, packet: &runner_protocol::ToRunner) -> Result<()> {
@@ -618,7 +615,7 @@ impl Runner {
618615

619616
pub enum Comms {
620617
Basic,
621-
Socket(Mutex<Option<SplitSink<WebSocketStream<TcpStream>, Message>>>),
618+
Socket(Mutex<Option<SplitSink<UnixStream, u32>>>),
622619
}
623620

624621
impl Comms {

packages/edge/infra/client/manager/src/runner/oci_config.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub struct ConfigOpts<'a> {
1414
pub cpu: u64,
1515
pub memory: u64,
1616
pub memory_max: u64,
17+
pub mount_socket: bool,
1718
}
1819

1920
/// Generates base config.json for an OCI bundle.
@@ -160,7 +161,7 @@ fn capabilities() -> Vec<&'static str> {
160161
}
161162

162163
fn mounts(opts: &ConfigOpts) -> Result<serde_json::Value> {
163-
Ok(json!([
164+
let mut mounts = json!([
164165
{
165166
"destination": "/proc",
166167
"type": "proc",
@@ -245,7 +246,18 @@ fn mounts(opts: &ConfigOpts) -> Result<serde_json::Value> {
245246
"source": opts.runner_path.join("resolv.conf").to_str().context("resolv.conf path")?,
246247
"options": ["rbind", "rprivate"]
247248
},
248-
]))
249+
]);
250+
251+
if let Some(socket_path) = opts.mount_socket {
252+
mounts.as_array().unwrap().push(json!({
253+
"destination": socket_mount_dest_path().to_str().context("manager.sock dest path")?,
254+
"type": "bind",
255+
"source": opts.runner_path.join("manager.sock").to_str().context("manager.sock source path")?,
256+
"options": ["rbind", "ro"]
257+
},));
258+
}
259+
260+
Ok()
249261
}
250262

251263
fn linux_resources_devices() -> serde_json::Value {
@@ -322,3 +334,8 @@ fn linux_resources_devices() -> serde_json::Value {
322334
}
323335
])
324336
}
337+
338+
/// Mounting path of the manager socket inside of the container.
339+
pub fn socket_mount_dest_path() -> Path {
340+
Path::new("/srv/sockets/manager.sock")
341+
}

0 commit comments

Comments
 (0)