Skip to content

Commit 6ea0835

Browse files
committed
chore(pegboard): replace computing image size manually with using tar bytes read
1 parent dfc1837 commit 6ea0835

File tree

4 files changed

+62
-54
lines changed

4 files changed

+62
-54
lines changed

packages/edge/infra/client/isolate-v8-runner/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ async fn read_packet(
233233
return Ok(Packet::None);
234234
}
235235
Some(Ok(Message::Pong(_))) => {
236-
tracing::debug!("received pong");
236+
tracing::trace!("received pong");
237237
return Ok(Packet::Pong);
238238
}
239239
Some(Ok(msg)) => bail!("unexpected message: {msg:?}"),

packages/edge/infra/client/manager/src/ctx.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ impl Ctx {
371371

372372
self.process_packet(packet).await?;
373373
}
374-
Message::Pong(_) => tracing::debug!("received pong"),
374+
Message::Pong(_) => tracing::trace!("received pong"),
375375
Message::Close(Some(close_frame)) => {
376376
return Err(RuntimeError::SocketClosed(
377377
close_frame.code,

packages/edge/infra/client/manager/src/image_download_handler.rs

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -220,11 +220,18 @@ impl ImageDownloadHandler {
220220
// Release lock on sqlite pool
221221
drop(conn);
222222

223-
self.download_inner(ctx, image_config).await?;
223+
// Download image & compute size
224+
//
225+
// `image_size` is a slight over-estimate of the image size, since this is
226+
// counting the number of bytes read from the tar. This is fine since
227+
// over-estimating is safe for caching.
228+
let download_start_instant = Instant::now();
229+
let image_size = self.download_inner(ctx, image_config).await?;
230+
let download_duration = download_start_instant.elapsed().as_secs_f64();
231+
232+
let convert_start_instant = Instant::now();
224233
self.convert(ctx, image_config).await?;
225-
226-
// Calculate dir size after unpacking image and save to db
227-
let image_size = utils::total_dir_size(&image_path).await?;
234+
let convert_duration = convert_start_instant.elapsed().as_secs_f64();
228235

229236
// Update metrics after unpacking
230237
metrics::IMAGE_CACHE_SIZE.set(images_dir_size + image_size as i64 - removed_bytes);
@@ -245,9 +252,14 @@ impl ImageDownloadHandler {
245252
.execute(&mut *ctx.sql().await?)
246253
.await?;
247254

248-
let duration = start_instant.elapsed().as_secs_f64();
249-
crate::metrics::DOWNLOAD_IMAGE_DURATION.observe(duration);
250-
tracing::info!(duration_seconds = duration, "image download completed");
255+
let total_duration = start_instant.elapsed().as_secs_f64();
256+
crate::metrics::DOWNLOAD_IMAGE_DURATION.observe(total_duration);
257+
tracing::info!(
258+
total_duration,
259+
download_duration,
260+
convert_duration,
261+
"image download completed"
262+
);
251263

252264
// The lock on entry is held until this point. After this any other parallel downloaders will
253265
// continue with the image already downloaded
@@ -258,7 +270,7 @@ impl ImageDownloadHandler {
258270
Ok(())
259271
}
260272

261-
async fn download_inner(&self, ctx: &Ctx, image_config: &protocol::Image) -> Result<()> {
273+
async fn download_inner(&self, ctx: &Ctx, image_config: &protocol::Image) -> Result<u64> {
262274
let image_path = ctx.image_path(image_config.id);
263275

264276
let addresses = self.get_image_addresses(ctx, image_config).await?;
@@ -318,7 +330,7 @@ impl ImageDownloadHandler {
318330

319331
// Use curl piped to tar for extraction
320332
format!(
321-
"curl -sSfL '{}' | tar -x -C '{}'",
333+
"curl -sSfL '{}' | tar -x --totals -C '{}'",
322334
url,
323335
image_path.display()
324336
)
@@ -336,7 +348,7 @@ impl ImageDownloadHandler {
336348

337349
// Use curl piped to lz4 for decompression, then to tar for extraction
338350
format!(
339-
"curl -sSfL '{}' | lz4 -d | tar -x -C '{}'",
351+
"curl -sSfL '{}' | lz4 -d | tar -x --totals -C '{}'",
340352
url,
341353
image_path.display()
342354
)
@@ -349,9 +361,27 @@ impl ImageDownloadHandler {
349361

350362
match cmd_result {
351363
Ok(output) if output.status.success() => {
352-
tracing::info!(image_id=?image_config.id, ?url, "successfully downloaded image");
364+
// Parse bytes read from tar to get dir size efficiently
365+
//
366+
// This is an over-estimate since the size on disk is smaller than the actual
367+
// tar
368+
let stderr = String::from_utf8_lossy(&output.stderr);
369+
let bytes_read = match parse_tar_total_bytes(&stderr) {
370+
Some(x) => x,
371+
None =>{
372+
tracing::error!(%stderr, "failed to parse bytes read from tar output");
373+
bail!("failed to parse bytes read from tar output")
374+
}
375+
};
353376

354-
return Ok(());
377+
tracing::info!(
378+
image_id=?image_config.id,
379+
?url,
380+
bytes_read=?bytes_read,
381+
"successfully downloaded image"
382+
);
383+
384+
return Ok(bytes_read);
355385
}
356386
Ok(output) => {
357387
// Command ran but failed
@@ -488,3 +518,21 @@ impl ImageDownloadHandler {
488518
Ok(addresses)
489519
}
490520
}
521+
522+
/// Parses total bytes read from tar output.
///
/// Scans `stderr` line by line for the summary line that `tar --totals`
/// prints, e.g. `Total bytes read: 646737920 (617MiB, 339MiB/s)`, and returns
/// the byte count from the first such line whose count parses as a `u64`.
///
/// Returns `None` when no line starts with `Total bytes read: ` or when none
/// of the matching lines carries a valid unsigned integer.
fn parse_tar_total_bytes(stderr: &str) -> Option<u64> {
    const PREFIX: &str = "Total bytes read: ";

    stderr.lines().find_map(|line| {
        // The count is the first whitespace-delimited token after the prefix.
        // Tokenising (instead of searching for a following space) also accepts
        // a line that ends right after the number.
        line.strip_prefix(PREFIX)?
            .split_whitespace()
            .next()?
            .parse::<u64>()
            .ok()
    })
}

packages/edge/infra/client/manager/src/utils/mod.rs

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -371,43 +371,3 @@ pub async fn copy_dir_all<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> Res
371371
Ok(())
372372
}
373373

374-
/// Calculates the total size of a folder in bytes.
375-
pub async fn total_dir_size<P: AsRef<Path>>(path: P) -> Result<u64> {
376-
let path = path.as_ref();
377-
378-
ensure!(path.is_dir(), "path is not a directory: {}", path.display());
379-
380-
let mut total_size = 0;
381-
let mut read_dir = fs::read_dir(path).await.context("failed to read dir")?;
382-
383-
while let Some(entry) = read_dir.next_entry().await.transpose() {
384-
let entry = match entry {
385-
Ok(entry) => entry,
386-
Err(err) => {
387-
tracing::debug!(?err, "failed to read entry");
388-
continue;
389-
}
390-
};
391-
let entry_path = entry.path();
392-
393-
if entry_path.is_dir() {
394-
match Box::pin(total_dir_size(entry_path)).await {
395-
Ok(size) => total_size += size,
396-
Err(err) => {
397-
tracing::debug!(?err, p=?entry.path().display(), "failed to calculate size for directory");
398-
continue;
399-
}
400-
}
401-
} else {
402-
match fs::metadata(entry_path).await {
403-
Ok(metadata) => total_size += metadata.len(),
404-
Err(err) => {
405-
tracing::debug!(?err, p=?entry.path().display(), "failed to get metadata for file");
406-
continue;
407-
}
408-
}
409-
}
410-
}
411-
412-
Ok(total_size)
413-
}

0 commit comments

Comments
 (0)