Skip to content

Commit 7e81f73

Browse files
committed
chore(pegboard): fix local cache bugs
1 parent 799de6e commit 7e81f73

File tree

7 files changed

+69
-21
lines changed

7 files changed

+69
-21
lines changed

packages/edge/infra/client/manager/src/actor/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,7 @@ impl Actor {
155155
let (_, ports) = tokio::try_join!(
156156
async {
157157
self.download_image(&ctx).await?;
158-
self.make_fs(&ctx).await?;
159-
Result::<(), anyhow::Error>::Ok(())
158+
self.make_fs(&ctx).await
160159
},
161160
async {
162161
let ports = self.bind_ports(ctx).await?;

packages/edge/infra/client/manager/src/actor/setup.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,8 @@ impl Actor {
391391
.arg("add")
392392
.arg(netns_path.file_name().context("bad netns path")?)
393393
.output()
394-
.await?;
394+
.await
395+
.context("failed to run `ip`")?;
395396
ensure!(
396397
cmd_out.status.success(),
397398
"failed `ip netns` command\n{}",
@@ -413,7 +414,8 @@ impl Actor {
413414
.env("CNI_IFNAME", &ctx.config().cni.network_interface)
414415
.env("CAP_ARGS", cni_params_json)
415416
.output()
416-
.await?;
417+
.await
418+
.context("failed to run `cnitool`")?;
417419
ensure!(
418420
cmd_out.status.success(),
419421
"failed `cnitool` command\n{}",

packages/edge/infra/client/manager/src/image_download_handler.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,11 +230,11 @@ impl ImageDownloadHandler {
230230
metrics::IMAGE_CACHE_SIZE.set(images_dir_size + image_size as i64 - removed_bytes);
231231

232232
// Update state to signify download completed successfully
233-
sqlx::query(indoc!(
233+
let foo = sqlx::query(indoc!(
234234
"
235235
UPDATE images_cache
236236
SET
237-
download_complete_ts = ?2 AND
237+
download_complete_ts = ?2,
238238
size = ?3
239239
WHERE image_id = ?1
240240
",

packages/edge/infra/client/manager/src/main.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use tokio::{
1616
fs,
1717
runtime::{Builder, Runtime},
1818
};
19-
use tracing_subscriber::prelude::*;
19+
use tracing_subscriber::{prelude::*, EnvFilter};
2020
use url::Url;
2121

2222
mod actor;
@@ -239,8 +239,34 @@ fn init_tracing() {
239239
tracing_subscriber::registry()
240240
.with(
241241
tracing_logfmt::builder()
242+
.with_span_name(std::env::var("RUST_LOG_SPAN_NAME").map_or(false, |x| x == "1"))
243+
.with_span_path(std::env::var("RUST_LOG_SPAN_PATH").map_or(false, |x| x == "1"))
244+
.with_target(std::env::var("RUST_LOG_TARGET").map_or(false, |x| x == "1"))
245+
.with_location(std::env::var("RUST_LOG_LOCATION").map_or(false, |x| x == "1"))
246+
.with_module_path(std::env::var("RUST_LOG_MODULE_PATH").map_or(false, |x| x == "1"))
247+
.with_ansi_color(std::env::var("RUST_LOG_ANSI_COLOR").map_or(false, |x| x == "1"))
242248
.layer()
243-
.with_filter(tracing_subscriber::filter::LevelFilter::INFO),
249+
.with_filter(env_filter("RUST_LOG")),
244250
)
245251
.init();
246252
}
253+
254+
fn env_filter(env_var: &str) -> EnvFilter {
255+
// Create env filter
256+
let mut env_filter = EnvFilter::default()
257+
// Default filter
258+
.add_directive("info".parse().unwrap())
259+
// Disable verbose logs
260+
.add_directive("tokio_cron_scheduler=warn".parse().unwrap())
261+
.add_directive("tokio=warn".parse().unwrap())
262+
.add_directive("hyper=warn".parse().unwrap())
263+
.add_directive("h2=warn".parse().unwrap());
264+
265+
if let Ok(filter) = std::env::var(env_var) {
266+
for s in filter.split(',').filter(|x| !x.is_empty()) {
267+
env_filter = env_filter.add_directive(s.parse().expect("invalid env filter"));
268+
}
269+
}
270+
271+
env_filter
272+
}

packages/edge/infra/client/manager/src/utils/mod.rs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -378,15 +378,34 @@ pub async fn total_dir_size<P: AsRef<Path>>(path: P) -> Result<u64> {
378378
ensure!(path.is_dir(), "path is not a directory: {}", path.display());
379379

380380
let mut total_size = 0;
381-
let mut read_dir = fs::read_dir(path).await?;
382-
383-
while let Some(entry) = read_dir.next_entry().await? {
381+
let mut read_dir = fs::read_dir(path).await.context("failed to read dir")?;
382+
383+
while let Some(entry) = read_dir.next_entry().await.transpose() {
384+
let entry = match entry {
385+
Ok(entry) => entry,
386+
Err(err) => {
387+
tracing::debug!(?err, "failed to read entry");
388+
continue;
389+
}
390+
};
384391
let entry_path = entry.path();
385392

386393
if entry_path.is_dir() {
387-
total_size += Box::pin(total_dir_size(entry_path)).await?;
394+
match Box::pin(total_dir_size(entry_path)).await {
395+
Ok(size) => total_size += size,
396+
Err(err) => {
397+
tracing::debug!(?err, p=?entry.path().display(), "failed to calculate size for directory");
398+
continue;
399+
}
400+
}
388401
} else {
389-
total_size += fs::metadata(entry_path).await?.len();
402+
match fs::metadata(entry_path).await {
403+
Ok(metadata) => total_size += metadata.len(),
404+
Err(err) => {
405+
tracing::debug!(?err, p=?entry.path().display(), "failed to get metadata for file");
406+
continue;
407+
}
408+
}
390409
}
391410
}
392411

packages/edge/infra/client/manager/tests/container_lifecycle.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,6 @@ async fn handle_connection(
8383
send_init_packet(&mut tx).await;
8484

8585
start_echo_actor(&mut tx, actor_id).await;
86-
start_echo_actor(&mut tx, Uuid::new_v4()).await;
87-
88-
tokio::time::sleep(std::time::Duration::from_millis(10000)).await;
8986
}
9087
protocol::ToServer::Events(events) => {
9188
for event in events {
@@ -188,7 +185,7 @@ async fn handle_connection(
188185
);
189186
}
190187

191-
tokio::time::sleep(Duration::from_millis(50)).await;
188+
tokio::time::sleep(Duration::from_millis(1000)).await;
192189

193190
// Verify client state
194191
let actors = ctx.actors().read().await;

packages/edge/infra/client/manager/tests/isolate_lifecycle.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,13 @@ async fn isolate_lifecycle() {
4040

4141
// Init project directories
4242
let tmp_dir = tempfile::TempDir::new().unwrap();
43-
let config = init_client(&gen_tmp_dir_path, tmp_dir.path()).await;
44-
tracing::info!(path=%tmp_dir.path().display(), "client dir");
43+
let path = tmp_dir.path();
44+
// let path = std::path::Path::new(
45+
// "/home/rivet/rivet-ee/oss/packages/edge/infra/client/manager/tests/foo",
46+
// );
47+
48+
let config = init_client(&gen_tmp_dir_path, &path).await;
49+
tracing::info!(path=%path.display(), "client dir");
4550

4651
start_client(config, ctx_wrapper, close_rx.clone(), port).await;
4752
}
@@ -121,7 +126,7 @@ async fn handle_connection(
121126
"actor not in client memory"
122127
);
123128

124-
tokio::time::sleep(std::time::Duration::from_millis(250))
129+
tokio::time::sleep(std::time::Duration::from_millis(1000))
125130
.await;
126131

127132
tracing::info!("sending echo");
@@ -180,7 +185,7 @@ async fn handle_connection(
180185
);
181186
}
182187

183-
tokio::time::sleep(Duration::from_millis(5)).await;
188+
tokio::time::sleep(Duration::from_millis(50)).await;
184189

185190
// Verify client state
186191
let actors = ctx.actors().read().await;

0 commit comments

Comments
 (0)