Skip to content

Commit ad0ed8a

Browse files
committed
Move socket to beginning of worker startup
- Moved socket connection to beginning of worker startup. - Seemed to be causing landlock violations that didn't happen before. - Removed the conversion to a tokio socket, to rule this out as a cause (we have to eventually remove the tokio dependency anyway). - Realized we were still removing the socket on the host-side, which wasn't needed (on failed rendezvous the host will just wipe the whole worker dir). Removed this removal, to remove a potential cause. - Thought we needed a landlock exception for the socket, so I expanded `try_restrict` to allow for multiple exceptions (wasn't needed in the end, but left it in because why not) - Saw that Landlock was ignoring exceptions for files that didn't exist. I added a check for this case. It clued me in to the actual problem. - Added a bunch of logging and some tests to try to narrow down this befuddling issue. - The socket was fine. Tthe issue turned out to be that landlock exceptions are based on fd's and not paths. We were creating new files for each new job, so except for the first excepted files, no exceptions were accepted. - Applied an exception to the whole worker dir and called it a day.
1 parent a696d40 commit ad0ed8a

File tree

10 files changed

+288
-217
lines changed

10 files changed

+288
-217
lines changed

polkadot/node/core/pvf/common/src/execute.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub struct Handshake {
2929
}
3030

3131
/// The response from an execution job on the worker.
32-
#[derive(Encode, Decode)]
32+
#[derive(Debug, Encode, Decode)]
3333
pub enum Response {
3434
/// The job completed successfully.
3535
Ok {

polkadot/node/core/pvf/common/src/lib.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,11 @@ pub use sp_tracing;
3131

3232
const LOG_TARGET: &str = "parachain::pvf-common";
3333

34-
use std::mem;
35-
use tokio::io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
34+
use std::{
35+
io::{Read, Write},
36+
mem,
37+
};
38+
use tokio::io;
3639

3740
#[cfg(feature = "test-utils")]
3841
pub mod tests {
@@ -51,20 +54,22 @@ pub struct SecurityStatus {
5154
pub can_unshare_user_namespace_and_change_root: bool,
5255
}
5356

54-
/// Write some data prefixed by its length into `w`.
55-
pub async fn framed_send(w: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> io::Result<()> {
57+
/// Write some data prefixed by its length into `w`. Sync version of `framed_send` to avoid
58+
/// dependency on tokio.
59+
pub fn framed_send_blocking(w: &mut (impl Write + Unpin), buf: &[u8]) -> io::Result<()> {
5660
let len_buf = buf.len().to_le_bytes();
57-
w.write_all(&len_buf).await?;
58-
w.write_all(buf).await?;
61+
w.write_all(&len_buf)?;
62+
w.write_all(buf)?;
5963
Ok(())
6064
}
6165

62-
/// Read some data prefixed by its length from `r`.
63-
pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8>> {
66+
/// Read some data prefixed by its length from `r`. Sync version of `framed_recv` to avoid
67+
/// dependency on tokio.
68+
pub fn framed_recv_blocking(r: &mut (impl Read + Unpin)) -> io::Result<Vec<u8>> {
6469
let mut len_buf = [0u8; mem::size_of::<usize>()];
65-
r.read_exact(&mut len_buf).await?;
70+
r.read_exact(&mut len_buf)?;
6671
let len = usize::from_le_bytes(len_buf);
6772
let mut buf = vec![0; len];
68-
r.read_exact(&mut buf).await?;
73+
r.read_exact(&mut buf)?;
6974
Ok(buf)
7075
}

polkadot/node/core/pvf/common/src/worker/mod.rs

Lines changed: 79 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ use futures::never::Never;
2424
use std::{
2525
any::Any,
2626
fmt,
27-
os::unix::net::UnixStream as StdUnixStream,
27+
os::unix::net::UnixStream,
2828
path::PathBuf,
2929
sync::mpsc::{Receiver, RecvTimeoutError},
3030
time::Duration,
3131
};
32-
use tokio::{io, net::UnixStream, runtime::Runtime};
32+
use tokio::{io, runtime::Runtime};
3333

3434
/// Use this macro to declare a `fn main() {}` that will create an executable that can be used for
3535
/// spawning the desired worker.
@@ -50,6 +50,8 @@ macro_rules! decl_worker_main {
5050
// See <https://github.com/paritytech/polkadot/issues/7117>.
5151
$crate::sp_tracing::try_init_simple();
5252

53+
let worker_pid = std::process::id();
54+
5355
let args = std::env::args().collect::<Vec<_>>();
5456
if args.len() == 1 {
5557
print_help($expected_command);
@@ -75,20 +77,25 @@ macro_rules! decl_worker_main {
7577
},
7678
"--check-can-unshare-user-namespace-and-change-root" => {
7779
#[cfg(target_os = "linux")]
78-
let status = if security::unshare_user_namespace_and_change_root(
79-
$crate::worker::WorkerKind::Execute,
80+
let status = if let Err(err) = security::unshare_user_namespace_and_change_root(
81+
$crate::worker::WorkerKind::CheckPivotRoot,
82+
worker_pid,
8083
// We're not accessing any files, so we can try to pivot_root in the temp
8184
// dir without conflicts with other processes.
8285
&std::env::temp_dir(),
83-
)
84-
.is_ok()
85-
{
86-
0
87-
} else {
86+
) {
87+
// Write the error to stderr, log it on the host-side.
88+
eprintln!("{}", err);
8889
-1
90+
} else {
91+
0
8992
};
9093
#[cfg(not(target_os = "linux"))]
91-
let status = -1;
94+
let status = {
95+
// Write the error to stderr, log it on the host-side.
96+
eprintln!("not available on macos");
97+
-1
98+
};
9299
std::process::exit(status)
93100
},
94101

@@ -153,13 +160,15 @@ pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50);
153160
pub enum WorkerKind {
154161
Prepare,
155162
Execute,
163+
CheckPivotRoot,
156164
}
157165

158166
impl fmt::Display for WorkerKind {
159167
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160168
match self {
161169
Self::Prepare => write!(f, "prepare"),
162170
Self::Execute => write!(f, "execute"),
171+
Self::CheckPivotRoot => write!(f, "check pivot root"),
163172
}
164173
}
165174
}
@@ -178,13 +187,61 @@ pub fn worker_event_loop<F, Fut>(
178187
Fut: futures::Future<Output = io::Result<Never>>,
179188
{
180189
let worker_pid = std::process::id();
181-
gum::debug!(target: LOG_TARGET, %worker_pid, ?worker_dir_path, "starting pvf worker ({})", worker_kind);
190+
gum::debug!(
191+
target: LOG_TARGET,
192+
%worker_pid,
193+
?worker_dir_path,
194+
?security_status,
195+
"starting pvf worker ({})",
196+
worker_kind
197+
);
198+
199+
// Check for a mismatch between the node and worker versions.
200+
if let (Some(node_version), Some(worker_version)) = (node_version, worker_version) {
201+
if node_version != worker_version {
202+
gum::error!(
203+
target: LOG_TARGET,
204+
%worker_kind,
205+
%worker_pid,
206+
%node_version,
207+
%worker_version,
208+
"Node and worker version mismatch, node needs restarting, forcing shutdown",
209+
);
210+
kill_parent_node_in_emergency();
211+
worker_shutdown_message(worker_kind, worker_pid, "Version mismatch");
212+
return
213+
}
214+
}
215+
216+
// Make sure that we can read the worker dir path, and log its contents.
217+
let entries = || -> Result<Vec<_>, io::Error> {
218+
std::fs::read_dir(&worker_dir_path)?
219+
.map(|res| res.map(|e| e.file_name()))
220+
.collect()
221+
}();
222+
match entries {
223+
Ok(entries) =>
224+
gum::trace!(target: LOG_TARGET, %worker_pid, ?worker_dir_path, "content of worker dir: {:?}", entries),
225+
Err(err) => {
226+
gum::error!(
227+
target: LOG_TARGET,
228+
%worker_kind,
229+
%worker_pid,
230+
?worker_dir_path,
231+
"Could not read worker dir: {}",
232+
err.to_string()
233+
);
234+
worker_shutdown_message(worker_kind, worker_pid, &err.to_string());
235+
return
236+
},
237+
}
182238

183239
// Connect to the socket.
184-
let stream = || -> std::io::Result<StdUnixStream> {
185-
let socket_path = worker_dir::socket(&worker_dir_path);
186-
let stream = StdUnixStream::connect(&socket_path)?;
187-
stream.set_nonblocking(true)?; // See note for `from_std`.
240+
let socket_path = worker_dir::socket(&worker_dir_path);
241+
let stream = || -> std::io::Result<UnixStream> {
242+
let stream = UnixStream::connect(&socket_path)?;
243+
// Remove the socket here. We don't also need to do this on the host-side; on failed
244+
// rendezvous, the host will delete the whole worker dir.
188245
std::fs::remove_file(&socket_path)?;
189246
Ok(stream)
190247
}();
@@ -203,23 +260,6 @@ pub fn worker_event_loop<F, Fut>(
203260
},
204261
};
205262

206-
// Check for a mismatch between the node and worker versions.
207-
if let (Some(node_version), Some(worker_version)) = (node_version, worker_version) {
208-
if node_version != worker_version {
209-
gum::error!(
210-
target: LOG_TARGET,
211-
%worker_kind,
212-
%worker_pid,
213-
%node_version,
214-
%worker_version,
215-
"Node and worker version mismatch, node needs restarting, forcing shutdown",
216-
);
217-
kill_parent_node_in_emergency();
218-
worker_shutdown_message(worker_kind, worker_pid, "Version mismatch");
219-
return
220-
}
221-
}
222-
223263
// Enable some security features.
224264
{
225265
// Call based on whether we can change root. Error out if it should work but fails.
@@ -230,9 +270,11 @@ pub fn worker_event_loop<F, Fut>(
230270
// > CLONE_NEWUSER requires that the calling process is not threaded.
231271
#[cfg(target_os = "linux")]
232272
if security_status.can_unshare_user_namespace_and_change_root {
233-
if let Err(err) =
234-
security::unshare_user_namespace_and_change_root(worker_kind, &worker_dir_path)
235-
{
273+
if let Err(err) = security::unshare_user_namespace_and_change_root(
274+
worker_kind,
275+
worker_pid,
276+
&worker_dir_path,
277+
) {
236278
// The filesystem may be in an inconsistent state, bail out.
237279
gum::error!(
238280
target: LOG_TARGET,
@@ -251,7 +293,7 @@ pub fn worker_event_loop<F, Fut>(
251293
#[cfg(target_os = "linux")]
252294
if security_status.can_enable_landlock {
253295
let landlock_status =
254-
security::landlock::enable_for_worker(worker_kind, &worker_dir_path);
296+
security::landlock::enable_for_worker(worker_kind, worker_pid, &worker_dir_path);
255297
if !matches!(landlock_status, Ok(landlock::RulesetStatus::FullyEnforced)) {
256298
// We previously were able to enable, so this should never happen.
257299
//
@@ -284,10 +326,7 @@ pub fn worker_event_loop<F, Fut>(
284326
// Run the main worker loop.
285327
let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
286328
let err = rt
287-
.block_on(async move {
288-
let stream = UnixStream::from_std(stream)?;
289-
event_loop(stream, worker_dir_path).await
290-
})
329+
.block_on(event_loop(stream, worker_dir_path))
291330
// It's never `Ok` because it's `Ok(Never)`.
292331
.unwrap_err();
293332

0 commit comments

Comments
 (0)