From d6f26f94424444a4ad2f9f8fde9fdd11342ac6e1 Mon Sep 17 00:00:00 2001
From: Nathan Flurry
Date: Sat, 28 Jun 2025 02:59:01 +0000
Subject: [PATCH] chore: cargo fmt

---
 .../common/api-helper/build/src/macro_util.rs | 2 +-
 .../common/clickhouse-inserter/src/error.rs | 18 ++---
 .../config/src/config/server/rivet/mod.rs | 6 +-
 packages/common/pools/src/db/redis.rs | 2 +-
 packages/common/pools/src/error.rs | 6 +-
 packages/common/pools/src/pools.rs | 5 +-
 packages/common/pools/src/prelude.rs | 6 +-
 packages/common/util/core/src/format.rs | 4 +-
 packages/core/api/actor/src/route/builds.rs | 10 +--
 .../api/provision/src/route/datacenters.rs | 6 +-
 packages/core/api/status/src/route/actor.rs | 12 ++-
 packages/core/api/ui/src/route.rs | 25 ++++---
 .../build/src/ops/resolve_for_tags.rs | 22 +++---
 .../cluster/src/ops/datacenter/mod.rs | 2 +-
 .../src/ops/datacenter/server_discovery.rs | 5 +-
 .../dynamic-config/src/ops/get_config.rs | 11 ++-
 packages/edge/api/actor/src/route/actors.rs | 2 +-
 .../edge/api/intercom/src/route/pegboard.rs | 30 ++++----
 .../edge/infra/client/config/src/manager.rs | 2 +-
 .../infra/client/manager/src/actor/setup.rs | 7 +-
 .../manager/src/image_download_handler.rs | 72 +++++++++++++++---
 .../edge/infra/client/manager/src/main.rs | 1 -
 .../client/manager/src/pull_addr_handler.rs | 5 +-
 .../infra/client/manager/src/utils/mod.rs | 41 ----------
 packages/edge/infra/guard/core/src/metrics.rs | 2 +-
 .../edge/infra/guard/core/tests/common/mod.rs | 2 +-
 .../core/tests/streaming_response_test.rs | 74 +++++++++++--------
 .../infra/guard/server/src/routing/api.rs | 6 +-
 .../edge/services/pegboard/src/protocol.rs | 26 +++----
 .../toolchain/cli/src/commands/actor/logs.rs | 2 +-
 .../cli/src/commands/function/endpoint.rs | 1 -
 .../cli/src/commands/function/list.rs | 1 -
 .../cli/src/commands/route/endpoint.rs | 63 ++++++++++------
 .../toolchain/cli/src/commands/route/list.rs | 2 +-
 .../toolchain/cli/src/commands/route/mod.rs | 2 +-
 packages/toolchain/cli/src/util/deploy.rs | 1 -
 .../toolchain/src/util/actor/logs.rs | 13 ++--
 .../toolchain/src/util/docker/build_remote.rs | 67 ++++++++---------
 .../toolchain/src/util/docker/mod.rs | 2 +-
 39 files changed, 303 insertions(+), 263 deletions(-)

diff --git a/packages/common/api-helper/build/src/macro_util.rs b/packages/common/api-helper/build/src/macro_util.rs
index 03d556e0e7..20841ad68a 100644
--- a/packages/common/api-helper/build/src/macro_util.rs
+++ b/packages/common/api-helper/build/src/macro_util.rs
@@ -237,7 +237,7 @@ pub fn __deserialize_query(route: &Url) -> GlobalRes
 }

 #[doc(hidden)]
-#[tracing::instrument(skip_all, name="setup_ctx")]
+#[tracing::instrument(skip_all, name = "setup_ctx")]
 pub async fn __with_ctx<
 	A: auth::ApiAuth + Send,
 	DB: chirp_workflow::db::Database + Sync + 'static,
diff --git a/packages/common/clickhouse-inserter/src/error.rs b/packages/common/clickhouse-inserter/src/error.rs
index 9d389ecabb..eb5a600f98 100644
--- a/packages/common/clickhouse-inserter/src/error.rs
+++ b/packages/common/clickhouse-inserter/src/error.rs
@@ -2,15 +2,15 @@ use thiserror::Error;

 #[derive(Debug, Error)]
 pub enum Error {
-    #[error("failed to send event to ClickHouse inserter")]
-    ChannelSendError,
+	#[error("failed to send event to ClickHouse inserter")]
+	ChannelSendError,

-    #[error("serialization error: {0}")]
-    SerializationError(#[source] serde_json::Error),
+	#[error("serialization error: {0}")]
+	SerializationError(#[source] serde_json::Error),

-    #[error("failed to build reqwest client: {0}")]
-    ReqwestBuildError(#[source] reqwest::Error),
+	#[error("failed to build reqwest client: {0}")]
+	ReqwestBuildError(#[source] reqwest::Error),

-    #[error("failed to spawn background task")]
-    TaskSpawnError,
-}
\ No newline at end of file
+	#[error("failed to spawn background task")]
+	TaskSpawnError,
+}
diff --git a/packages/common/config/src/config/server/rivet/mod.rs b/packages/common/config/src/config/server/rivet/mod.rs
index 9ddaedf7f8..1e53d14b94 100644
--- a/packages/common/config/src/config/server/rivet/mod.rs
+++ b/packages/common/config/src/config/server/rivet/mod.rs
@@ -177,7 +177,7 @@ impl Rivet {
 	}
 }

-impl Rivet {
+impl Rivet {
 	pub fn default_cluster_id(&self) -> GlobalResult<Uuid> {
 		if let Some(default_cluster_id) = self.default_cluster_id {
 			ensure!(
@@ -190,7 +190,9 @@ impl Rivet {
 			// Return default development clusters
 			AccessKind::Development => Ok(default_dev_cluster::CLUSTER_ID),
 			// No cluster configured
-			AccessKind::Public | AccessKind::Private => bail!("`default_cluster_id` not configured"),
+			AccessKind::Public | AccessKind::Private => {
+				bail!("`default_cluster_id` not configured")
+			}
 		}
 	}
 }
diff --git a/packages/common/pools/src/db/redis.rs b/packages/common/pools/src/db/redis.rs
index 6e377f8c25..5ef0bfa3dc 100644
--- a/packages/common/pools/src/db/redis.rs
+++ b/packages/common/pools/src/db/redis.rs
@@ -65,4 +65,4 @@ pub async fn setup(config: Config) -> Result<HashMap<String, RedisPool>, Error>
 	tracing::debug!("redis connected");

 	Ok(redis)
-}
\ No newline at end of file
+}
diff --git a/packages/common/pools/src/error.rs b/packages/common/pools/src/error.rs
index 7425a3c692..77617c5c42 100644
--- a/packages/common/pools/src/error.rs
+++ b/packages/common/pools/src/error.rs
@@ -74,7 +74,7 @@ pub enum Error {
 }

 impl From<global_error::GlobalError> for Error {
-    fn from(err: global_error::GlobalError) -> Self {
-        Error::Global(err)
-    }
+	fn from(err: global_error::GlobalError) -> Self {
+		Error::Global(err)
+	}
 }
diff --git a/packages/common/pools/src/pools.rs b/packages/common/pools/src/pools.rs
index bc5623411e..04fbaa7fa9 100644
--- a/packages/common/pools/src/pools.rs
+++ b/packages/common/pools/src/pools.rs
@@ -46,8 +46,9 @@ impl Pools {
 		let clickhouse_inserter = if let Some(vector_http) =
 			config.server.as_ref().and_then(|x| x.vector_http.as_ref())
 		{
-			let inserter = clickhouse_inserter::create_inserter(&vector_http.host, vector_http.port)
-				.map_err(Error::BuildClickHouseInserter)?;
+			let inserter =
+				clickhouse_inserter::create_inserter(&vector_http.host, vector_http.port)
+					.map_err(Error::BuildClickHouseInserter)?;
 			Some(inserter)
 		} else {
 			None
diff --git a/packages/common/pools/src/prelude.rs b/packages/common/pools/src/prelude.rs
index 48d7a39030..b729a9f96d 100644
--- a/packages/common/pools/src/prelude.rs
+++ b/packages/common/pools/src/prelude.rs
@@ -4,7 +4,7 @@ pub use redis;
 pub use sqlx;

 pub use crate::{
-	ClickHouseInserterHandle, ClickHousePool, CrdbPool, FdbPool, NatsPool, RedisPool, SqlitePool, __sql_query,
-	__sql_query_as, __sql_query_as_raw, sql_execute, sql_fetch, sql_fetch_all, sql_fetch_many,
-	sql_fetch_one, sql_fetch_optional,
+	ClickHouseInserterHandle, ClickHousePool, CrdbPool, FdbPool, NatsPool, RedisPool, SqlitePool,
+	__sql_query, __sql_query_as, __sql_query_as_raw, sql_execute, sql_fetch, sql_fetch_all,
+	sql_fetch_many, sql_fetch_one, sql_fetch_optional,
 };
diff --git a/packages/common/util/core/src/format.rs b/packages/common/util/core/src/format.rs
index 7b95d1b9a3..82bc65b15c 100644
--- a/packages/common/util/core/src/format.rs
+++ b/packages/common/util/core/src/format.rs
@@ -162,7 +162,7 @@ pub fn duration(ms: i64, relative: bool) -> String {
 	let hours = (ms % 86_400_000) / 3_600_000;
 	let minutes = (ms % 3_600_000) / 60_000;
 	let seconds = (ms % 60_000) / 1_000;
-	
+
 	if days > 0 {
 		parts.push(format!("{days}d"));
 	}
@@ -181,5 +181,5 @@ pub fn duration(ms: i64, relative: bool) -> String {
 		parts.push("ago".to_string());
 	}

-    parts.join(" ")
+	parts.join(" ")
 }
diff --git a/packages/core/api/actor/src/route/builds.rs b/packages/core/api/actor/src/route/builds.rs
index 7b95d8c6f6..e4c8e6935d 100644
--- a/packages/core/api/actor/src/route/builds.rs
+++ b/packages/core/api/actor/src/route/builds.rs
@@ -517,13 +517,9 @@ pub async fn complete_build(
 				..Default::default()
 			};

-			edge_intercom_pegboard_prewarm_image(
-				&config,
-				&build_id.to_string(),
-				json!({}),
-			)
-			.await
-			.map_err(Into::<GlobalError>::into)
+			edge_intercom_pegboard_prewarm_image(&config, &build_id.to_string(), json!({}))
+				.await
+				.map_err(Into::<GlobalError>::into)
 		}
 	})
 	.buffer_unordered(16)
diff --git a/packages/core/api/provision/src/route/datacenters.rs b/packages/core/api/provision/src/route/datacenters.rs
index d5fac34aa4..4008335624 100644
--- a/packages/core/api/provision/src/route/datacenters.rs
+++ b/packages/core/api/provision/src/route/datacenters.rs
@@ -59,11 +59,7 @@ pub async fn servers(
 	let servers_res = ctx
 		.op(cluster::ops::datacenter::server_discovery::Input {
 			datacenter_id,
-			pool_types: query
-				.pools
-				.into_iter()
-				.map(ApiInto::api_into)
-				.collect(),
+			pool_types: query.pools.into_iter().map(ApiInto::api_into).collect(),
 		})
 		.await?;
diff --git a/packages/core/api/status/src/route/actor.rs b/packages/core/api/status/src/route/actor.rs
index 441a7c4ba4..d19258840a 100644
--- a/packages/core/api/status/src/route/actor.rs
+++ b/packages/core/api/status/src/route/actor.rs
@@ -231,7 +231,10 @@ pub async fn status(
 			_ => {
 				bail_with!(
 					INTERNAL_STATUS_CHECK_FAILED,
-					error = format!("unknown request error: {:?} {:?}", content.status, content.content)
+					error = format!(
+						"unknown request error: {:?} {:?}",
+						content.status, content.content
+					)
 				);
 			}
 		},
@@ -277,7 +280,7 @@ pub async fn status(
 	.instrument(tracing::info_span!("actor_destroy_request", base_path=%config.base_path))
 	.await
 	{
-		Ok(_res) => {},
+		Ok(_res) => {}
 		Err(rivet_api::apis::Error::ResponseError(content)) => match content.entity {
 			Some(Status400(body))
 			| Some(Status403(body))
			| Some(Status404(body))
@@ -293,7 +296,10 @@ pub async fn status(
 			_ => {
 				bail_with!(
 					INTERNAL_STATUS_CHECK_FAILED,
-					error = format!("unknown request error: {:?} {:?}", content.status, content.content)
+					error = format!(
+						"unknown request error: {:?} {:?}",
+						content.status, content.content
+					)
 				);
 			}
 		},
diff --git a/packages/core/api/ui/src/route.rs b/packages/core/api/ui/src/route.rs
index 02f7011ab6..51c14f420a 100644
--- a/packages/core/api/ui/src/route.rs
+++ b/packages/core/api/ui/src/route.rs
@@ -39,19 +39,23 @@ impl Router {
 		// Build proxy URL by joining request path to base URL
 		let mut proxy_url = config.server()?.rivet.ui.proxy_origin().clone();
-		
+
 		// Remove leading slash from request path since join() expects relative paths
-		let request_path = request.uri().path().strip_prefix('/').unwrap_or(request.uri().path());
-		
+		let request_path = request
+			.uri()
+			.path()
+			.strip_prefix('/')
+			.unwrap_or(request.uri().path());
+
 		// Join the request path to the base URL
 		proxy_url = match proxy_url.join(request_path) {
 			Ok(url) => url,
 			Err(e) => bail!("Failed to build proxy URL: {}", e),
 		};
-		
+
 		// Set query string if present
 		proxy_url.set_query(request.uri().query());
-		
+
 		let full_proxy_url = proxy_url.to_string();

 		tracing::debug!(
@@ -77,10 +81,13 @@ impl Router {
 		// Forward body for non-GET requests
 		if request.method() != Method::GET && request.method() != Method::HEAD {
-			let body_bytes = match hyper::body::to_bytes(std::mem::replace(request.body_mut(), Body::empty())).await {
-				Ok(bytes) => bytes,
-				Err(e) => bail!("Failed to read request body: {}", e),
-			};
+			let body_bytes =
+				match hyper::body::to_bytes(std::mem::replace(request.body_mut(), Body::empty()))
+					.await
+				{
+					Ok(bytes) => bytes,
+					Err(e) => bail!("Failed to read request body: {}", e),
+				};
 			req_builder = req_builder.body(body_bytes.to_vec());
 		}
diff --git a/packages/core/services/build/src/ops/resolve_for_tags.rs b/packages/core/services/build/src/ops/resolve_for_tags.rs
index 81daf697a5..ec64ecdab8 100644
--- a/packages/core/services/build/src/ops/resolve_for_tags.rs
+++ b/packages/core/services/build/src/ops/resolve_for_tags.rs
@@ -27,25 +27,21 @@ pub async fn build_resolve_for_tags(ctx: &OperationCtx, input: &Input) -> Global
 	unwrap!(
 		ctx.cache()
 			.ttl(util::duration::seconds(15))
-			.fetch_one_json(
-				"build",
-				(input.env_id, tags_str.as_str()),
-				{
+			.fetch_one_json("build", (input.env_id, tags_str.as_str()), {
+				let ctx = ctx.clone();
+				let tags_str = tags_str.clone();
+				move |mut cache, key| {
 					let ctx = ctx.clone();
 					let tags_str = tags_str.clone();
-					move |mut cache, key| {
-						let ctx = ctx.clone();
-						let tags_str = tags_str.clone();
-						async move {
-							let builds = get_builds(&ctx, input.env_id, &tags_str).await?;
+					async move {
+						let builds = get_builds(&ctx, input.env_id, &tags_str).await?;

-							cache.resolve(&key, builds);
+						cache.resolve(&key, builds);

-							Ok(cache)
-						}
+						Ok(cache)
 					}
 				}
-			)
+			})
 			.await?
 	)
 };
diff --git a/packages/core/services/cluster/src/ops/datacenter/mod.rs b/packages/core/services/cluster/src/ops/datacenter/mod.rs
index 5a8a194479..712a6edc5b 100644
--- a/packages/core/services/cluster/src/ops/datacenter/mod.rs
+++ b/packages/core/services/cluster/src/ops/datacenter/mod.rs
@@ -1,8 +1,8 @@
-pub mod server_discovery;
 pub mod get;
 pub mod list;
 pub mod location_get;
 pub mod resolve_for_name_id;
+pub mod server_discovery;
 pub mod server_spec_get;
 pub mod tls_get;
 pub mod topology_get;
diff --git a/packages/core/services/cluster/src/ops/datacenter/server_discovery.rs b/packages/core/services/cluster/src/ops/datacenter/server_discovery.rs
index 0e70a1a81b..d6a53b82b1 100644
--- a/packages/core/services/cluster/src/ops/datacenter/server_discovery.rs
+++ b/packages/core/services/cluster/src/ops/datacenter/server_discovery.rs
@@ -17,7 +17,10 @@ pub struct Output {
 /// Wrapper around server::list with very short cache.
 #[operation]
-pub async fn cluster_datacenter_server_discovery(ctx: &OperationCtx, input: &Input) -> GlobalResult<Output> {
+pub async fn cluster_datacenter_server_discovery(
+	ctx: &OperationCtx,
+	input: &Input,
+) -> GlobalResult<Output> {
 	let cache_keys = if input.pool_types.is_empty() {
 		vec![(input.datacenter_id, "all".to_string())]
 	} else {
diff --git a/packages/core/services/dynamic-config/src/ops/get_config.rs b/packages/core/services/dynamic-config/src/ops/get_config.rs
index 45025494f8..c27e206d13 100644
--- a/packages/core/services/dynamic-config/src/ops/get_config.rs
+++ b/packages/core/services/dynamic-config/src/ops/get_config.rs
@@ -19,12 +19,11 @@ pub async fn get_config(ctx: &OperationCtx, input: &Input) -> GlobalResult>()?
-			.timestamp_millis(),
-		})
-		.to_workflow::()
-		.tag("client_id", client_id)
-		.send()
-		.await;
+	let res = ctx
+		.signal(pegboard::workflows::client::Drain {
+			drain_timeout_ts: unwrap_with!(
+				body.drain_complete_ts,
+				API_BAD_BODY,
+				error = "missing `drain_complete_ts`"
+			)
+			.parse::<chrono::DateTime<chrono::Utc>>()?
+			.timestamp_millis(),
+		})
+		.to_workflow::()
+		.tag("client_id", client_id)
+		.send()
+		.await;

 	if let Some(WorkflowError::WorkflowNotFound) = res.as_workflow_error() {
 		tracing::warn!(
@@ -151,7 +152,8 @@ pub async fn toggle_drain_client(
 			res?;
 		}
 	} else {
-		let res = ctx.signal(pegboard::workflows::client::Undrain {})
+		let res = ctx
+			.signal(pegboard::workflows::client::Undrain {})
 			.to_workflow::()
 			.tag("client_id", client_id)
 			.send()
diff --git a/packages/edge/infra/client/config/src/manager.rs b/packages/edge/infra/client/config/src/manager.rs
index 7638c57e1f..787e7f8a06 100644
--- a/packages/edge/infra/client/config/src/manager.rs
+++ b/packages/edge/infra/client/config/src/manager.rs
@@ -101,7 +101,7 @@ pub struct Runner {
 	pub container_runner_binary_path: Option<PathBuf>,
 	pub isolate_runner_binary_path: Option<PathBuf>,
-	
+
 	/// Custom host entries to append to /etc/hosts in actor containers.
 	#[serde(default)]
 	pub custom_hosts: Option<Vec<HostEntry>>,
diff --git a/packages/edge/infra/client/manager/src/actor/setup.rs b/packages/edge/infra/client/manager/src/actor/setup.rs
index b995d19759..bd4fab3fd1 100644
--- a/packages/edge/infra/client/manager/src/actor/setup.rs
+++ b/packages/edge/infra/client/manager/src/actor/setup.rs
@@ -776,12 +776,13 @@ fn build_hosts_content(ctx: &Ctx) -> String {
 127.0.0.1	localhost
 ::1	localhost ip6-localhost ip6-loopback
 "
-	).to_string();
-	
+	)
+	.to_string();
+
 	for host_entry in ctx.config().runner.custom_hosts() {
 		content.push_str(&format!("{}\t{}\n", host_entry.ip, host_entry.hostname));
 	}
-	
+
 	content
 }
diff --git a/packages/edge/infra/client/manager/src/image_download_handler.rs b/packages/edge/infra/client/manager/src/image_download_handler.rs
index 6deed48840..0a8e051dbd 100644
--- a/packages/edge/infra/client/manager/src/image_download_handler.rs
+++ b/packages/edge/infra/client/manager/src/image_download_handler.rs
@@ -220,11 +220,18 @@ impl ImageDownloadHandler {
 		// Release lock on sqlite pool
 		drop(conn);

-		self.download_inner(ctx, image_config).await?;
+		// Download image & compute size
+		//
+		// `image_size` is a slight over-estimate of the image size, since this is
+		// counting the number of bytes read from the tar. This is fine since
+		// over-estimating is safe for caching.
+		let download_start_instant = Instant::now();
+		let image_size = self.download_inner(ctx, image_config).await?;
+		let download_duration = download_start_instant.elapsed().as_secs_f64();
+
+		let convert_start_instant = Instant::now();
 		self.convert(ctx, image_config).await?;
-
-		// Calculate dir size after unpacking image and save to db
-		let image_size = utils::total_dir_size(&image_path).await?;
+		let convert_duration = convert_start_instant.elapsed().as_secs_f64();

 		// Update metrics after unpacking
 		metrics::IMAGE_CACHE_SIZE.set(images_dir_size + image_size as i64 - removed_bytes);
@@ -245,9 +252,14 @@ impl ImageDownloadHandler {
 		.execute(&mut *ctx.sql().await?)
 		.await?;

-		let duration = start_instant.elapsed().as_secs_f64();
-		crate::metrics::DOWNLOAD_IMAGE_DURATION.observe(duration);
-		tracing::info!(duration_seconds = duration, "image download completed");
+		let total_duration = start_instant.elapsed().as_secs_f64();
+		crate::metrics::DOWNLOAD_IMAGE_DURATION.observe(total_duration);
+		tracing::info!(
+			total_duration,
+			download_duration,
+			convert_duration,
+			"image download completed"
+		);

 		// The lock on entry is held until this point. After this any other parallel downloaders will
 		// continue with the image already downloaded
@@ -258,7 +270,7 @@ impl ImageDownloadHandler {
 		Ok(())
 	}

-	async fn download_inner(&self, ctx: &Ctx, image_config: &protocol::Image) -> Result<()> {
+	async fn download_inner(&self, ctx: &Ctx, image_config: &protocol::Image) -> Result<u64> {
 		let image_path = ctx.image_path(image_config.id);

 		let addresses = self.get_image_addresses(ctx, image_config).await?;
@@ -318,7 +330,7 @@ impl ImageDownloadHandler {
 				// Use curl piped to tar for extraction
 				format!(
-					"curl -sSfL '{}' | tar -x -C '{}'",
+					"curl -sSfL '{}' | tar -x --totals -C '{}'",
 					url,
 					image_path.display()
 				)
@@ -336,7 +348,7 @@ impl ImageDownloadHandler {
 				// Use curl piped to lz4 for decompression, then to tar for extraction
 				format!(
-					"curl -sSfL '{}' | lz4 -d | tar -x -C '{}'",
+					"curl -sSfL '{}' | lz4 -d | tar -x --totals -C '{}'",
 					url,
 					image_path.display()
 				)
@@ -349,9 +361,27 @@ impl ImageDownloadHandler {
 		match cmd_result {
 			Ok(output) if output.status.success() => {
-				tracing::info!(image_id=?image_config.id, ?url, "successfully downloaded image");
+				// Parse bytes read from tar to get dir size efficiently
+				//
+				// This is an over-estimate since the size on disk is smaller than the actual
+				// tar
+				let stderr = String::from_utf8_lossy(&output.stderr);
+				let bytes_read = match parse_tar_total_bytes(&stderr) {
+					Some(x) => x,
+					None => {
+						tracing::error!(%stderr, "failed to parse bytes read from tar output");
+						bail!("failed to parse bytes read from tar output")
+					}
+				};

-				return Ok(());
+				tracing::info!(
+					image_id=?image_config.id,
+					?url,
+					bytes_read=?bytes_read,
+					"successfully downloaded image"
+				);
+
+				return Ok(bytes_read);
 			}
 			Ok(output) => {
 				// Command ran but failed
@@ -535,3 +565,21 @@ impl ImageDownloadHandler {
 		bail!("artifact url could not be resolved");
 	}
 }
+
+/// Parses total bytes read from tar output.
+fn parse_tar_total_bytes(stderr: &str) -> Option<u64> {
+	// Example: "Total bytes read: 646737920 (617MiB, 339MiB/s)"
+	for line in stderr.lines() {
+		if line.starts_with("Total bytes read: ") {
+			if let Some(bytes_str) = line.strip_prefix("Total bytes read: ") {
+				if let Some(space_pos) = bytes_str.find(' ') {
+					let bytes_part = &bytes_str[..space_pos];
+					if let Ok(bytes) = bytes_part.parse::<u64>() {
+						return Some(bytes);
+					}
+				}
+			}
+		}
+	}
+	None
+}
diff --git a/packages/edge/infra/client/manager/src/main.rs b/packages/edge/infra/client/manager/src/main.rs
index e96d05e211..7a2d7b6b52 100644
--- a/packages/edge/infra/client/manager/src/main.rs
+++ b/packages/edge/infra/client/manager/src/main.rs
@@ -193,7 +193,6 @@ async fn run(init: Init, first: bool) -> Result<()> {
 		async { metrics_thread.await?.map_err(Into::into) },
 		ctx.run(rx),
 	)?;
-
 	Ok(())
 }
diff --git a/packages/edge/infra/client/manager/src/pull_addr_handler.rs b/packages/edge/infra/client/manager/src/pull_addr_handler.rs
index 7e8b1285f7..867018e618 100644
--- a/packages/edge/infra/client/manager/src/pull_addr_handler.rs
+++ b/packages/edge/infra/client/manager/src/pull_addr_handler.rs
@@ -1,4 +1,7 @@
-use std::{net::Ipv4Addr, time::{Duration, Instant}};
+use std::{
+	net::Ipv4Addr,
+	time::{Duration, Instant},
+};

 use anyhow::*;
 use pegboard_config::{Addresses, Client};
diff --git a/packages/edge/infra/client/manager/src/utils/mod.rs b/packages/edge/infra/client/manager/src/utils/mod.rs
index 3f3c98705b..01a8cee22d 100644
--- a/packages/edge/infra/client/manager/src/utils/mod.rs
+++ b/packages/edge/infra/client/manager/src/utils/mod.rs
@@ -370,44 +370,3 @@ pub async fn copy_dir_all<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> Res
 	Ok(())
 }
-
-/// Calculates the total size of a folder in bytes.
-pub async fn total_dir_size<P: AsRef<Path>>(path: P) -> Result<u64> {
-	let path = path.as_ref();
-
-	ensure!(path.is_dir(), "path is not a directory: {}", path.display());
-
-	let mut total_size = 0;
-	let mut read_dir = fs::read_dir(path).await.context("failed to read dir")?;
-
-	while let Some(entry) = read_dir.next_entry().await.transpose() {
-		let entry = match entry {
-			Ok(entry) => entry,
-			Err(err) => {
-				tracing::debug!(?err, "failed to read entry");
-				continue;
-			}
-		};
-		let entry_path = entry.path();
-
-		if entry_path.is_dir() {
-			match Box::pin(total_dir_size(entry_path)).await {
-				Ok(size) => total_size += size,
-				Err(err) => {
-					tracing::debug!(?err, p=?entry.path().display(), "failed to calculate size for directory");
-					continue;
-				}
-			}
-		} else {
-			match fs::metadata(entry_path).await {
-				Ok(metadata) => total_size += metadata.len(),
-				Err(err) => {
-					tracing::debug!(?err, p=?entry.path().display(), "failed to get metadata for file");
-					continue;
-				}
-			}
-		}
-	}
-
-	Ok(total_size)
-}
diff --git a/packages/edge/infra/guard/core/src/metrics.rs b/packages/edge/infra/guard/core/src/metrics.rs
index ae1f3c0ced..6a91107e66 100644
--- a/packages/edge/infra/guard/core/src/metrics.rs
+++ b/packages/edge/infra/guard/core/src/metrics.rs
@@ -1,5 +1,5 @@
 use lazy_static::lazy_static;
-use rivet_metrics::{prometheus::*, REGISTRY, BUCKETS};
+use rivet_metrics::{prometheus::*, BUCKETS, REGISTRY};

 lazy_static! {
 	// MARK: Internal
diff --git a/packages/edge/infra/guard/core/tests/common/mod.rs b/packages/edge/infra/guard/core/tests/common/mod.rs
index 20f5a77436..68fa4bef65 100644
--- a/packages/edge/infra/guard/core/tests/common/mod.rs
+++ b/packages/edge/infra/guard/core/tests/common/mod.rs
@@ -552,7 +552,7 @@ pub async fn start_guard_with_middleware(
 		routing_fn_clone,
 		middleware_fn_clone,
 		rivet_guard_core::proxy_service::PortType::Http, // Default port type for tests
-		None, // No ClickHouse inserter for tests
+		None, // No ClickHouse inserter for tests
 	));

 	// Run the server until shutdown signal
diff --git a/packages/edge/infra/guard/core/tests/streaming_response_test.rs b/packages/edge/infra/guard/core/tests/streaming_response_test.rs
index c843ea6368..25e14bf53e 100644
--- a/packages/edge/infra/guard/core/tests/streaming_response_test.rs
+++ b/packages/edge/infra/guard/core/tests/streaming_response_test.rs
@@ -1,7 +1,8 @@
 mod common;

 use bytes::Bytes;
-use http_body_util::{Full, BodyExt};
+use futures_util::StreamExt;
+use http_body_util::{BodyExt, Full};
 use hyper::service::service_fn;
 use hyper::{body::Incoming, Request, Response, StatusCode};
 use hyper_util::rt::TokioIo;
@@ -9,13 +10,10 @@ use std::net::SocketAddr;
 use std::time::Duration;
 use tokio::net::TcpListener;
 use tokio::sync::mpsc;
-use futures_util::StreamExt;

-use common::{
-    create_test_config, start_guard, init_tracing,
-};
+use common::{create_test_config, init_tracing, start_guard};
 use rivet_guard_core::proxy_service::{
-    RouteConfig, RouteTarget, RoutingOutput, RoutingTimeout, RoutingFn,
+	RouteConfig, RouteTarget, RoutingFn, RoutingOutput, RoutingTimeout,
 };
 use uuid::Uuid;
@@ -44,16 +42,15 @@ async fn test_streaming_response_should_timeout() {
 	let test_timeout = Duration::from_secs(3);

 	// Create an HTTP client to make requests to the guard proxy (not directly to our server)
-	let client = hyper_util::client::legacy::Client::builder(
-		hyper_util::rt::TokioExecutor::new()
-	).build_http();
+	let client = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
+		.build_http();

 	// Construct the request URI pointing to the guard proxy
 	let uri = format!("http://{}/stream", guard_addr);
 	let request = Request::builder()
 		.method("GET")
 		.uri(&uri)
-		.header("Host", "example.com") // Required for routing
+		.header("Host", "example.com") // Required for routing
 		.header("Accept", "text/event-stream")
 		.body(Full::<Bytes>::new(Bytes::new()))
 		.expect("Failed to build request");
@@ -72,24 +69,28 @@ async fn test_streaming_response_should_timeout() {
 	match response_result {
 		Ok(Ok(response)) => {
 			println!("✅ Got response immediately: {}", response.status());
-			
+
 			// If we get here, streaming is working. Let's verify we can read data
 			let (parts, body) = response.into_parts();
-			
+
 			// Try to read the first chunk with a timeout
 			let mut body_stream = body.into_data_stream();
-			let first_chunk_result = tokio::time::timeout(
-				Duration::from_millis(500),
-				body_stream.next()
-			).await;
-			
+			let first_chunk_result =
+				tokio::time::timeout(Duration::from_millis(500), body_stream.next()).await;
+
 			match first_chunk_result {
 				Ok(Some(Ok(chunk))) => {
 					let chunk_str = String::from_utf8_lossy(&chunk);
 					println!("✅ Received first chunk: {}", chunk_str);
-					assert!(chunk_str.contains("data: "), "Chunk should contain streaming data");
-					println!("✅ Streaming is working! Received {} bytes of data", chunk.len());
-					
+					assert!(
+						chunk_str.contains("data: "),
+						"Chunk should contain streaming data"
+					);
+					println!(
+						"✅ Streaming is working! Received {} bytes of data",
+						chunk.len()
+					);
+
 					// If we got this far, streaming is working correctly!
 					// The test was designed to timeout if streaming was broken
 				}
@@ -109,8 +110,13 @@ async fn test_streaming_response_should_timeout() {
 		}
 		Err(_) => {
 			// This is what we expect to happen when streaming is broken
-			println!("❌ Test timed out after {}s - streaming is NOT working!", test_timeout.as_secs());
-			println!("❌ This indicates the proxy is buffering the entire response before returning it");
+			println!(
+				"❌ Test timed out after {}s - streaming is NOT working!",
+				test_timeout.as_secs()
+			);
+			println!(
+				"❌ This indicates the proxy is buffering the entire response before returning it"
+			);
 			panic!("Streaming response test timed out - proxy is buffering responses instead of streaming");
 		}
 	}
@@ -122,7 +128,7 @@ async fn start_streaming_server() -> (SocketAddr, mpsc::Sender<String>) {
 	// Create a channel for sending messages to the streaming endpoint
 	let (message_tx, _message_rx) = mpsc::channel::<String>(100);
-	
+
 	// Bind to a random port
 	let listener = TcpListener::bind("127.0.0.1:0")
 		.await
@@ -156,7 +162,11 @@ async fn start_streaming_server() -> (SocketAddr, mpsc::Sender<String>) {
 	tokio::spawn(async move {
 		let service = service_fn(move |req: Request<Incoming>| {
 			async move {
-				println!("Streaming server: Received request: {} {}", req.method(), req.uri());
+				println!(
+					"Streaming server: Received request: {} {}",
+					req.method(),
+					req.uri()
+				);

 				// Check if this is a streaming request
 				if req.uri().path() != "/stream" {
@@ -173,19 +183,19 @@ async fn start_streaming_server() -> (SocketAddr, mpsc::Sender<String>) {
 				// Create a large response that will take time to fully buffer
 				// This simulates streaming behavior - the proxy should return this immediately
 				// but if it buffers, it will wait for the full response
-				
+
 				// Create a large response to simulate a slow streaming endpoint
 				let mut large_data = String::new();
 				large_data.push_str("data: stream-started\n\n");
-				
+
 				// Add a lot of data to make buffering take noticeable time
 				for i in 0..1000 {
 					large_data.push_str(&format!("data: chunk-{}\n\n", i));
 				}
-				
+
 				// Add a delay to simulate network latency
 				tokio::time::sleep(Duration::from_millis(500)).await;
-				
+
 				println!("Streaming server: Returning large streaming response");
 				Ok(Response::builder()
 					.status(StatusCode::OK)
@@ -218,11 +228,11 @@ fn create_streaming_routing_fn(server_addr: SocketAddr) -> RoutingFn {
 	std::sync::Arc::new(
 		move |_hostname: &str,
-			path: &str,
-			_port_type: rivet_guard_core::proxy_service::PortType| {
+			path: &str,
+			_port_type: rivet_guard_core::proxy_service::PortType| {
 			Box::pin(async move {
 				println!("Guard: Routing request - path: {}", path);
-				
+
 				if path == "/stream" {
 					let target = RouteTarget {
 						actor_id: Some(Uuid::new_v4()),
@@ -249,4 +259,4 @@ fn create_streaming_routing_fn(server_addr: SocketAddr) -> RoutingFn {
 		})
 	},
 	)
-}
\ No newline at end of file
+}
diff --git a/packages/edge/infra/guard/server/src/routing/api.rs b/packages/edge/infra/guard/server/src/routing/api.rs
index c34655c952..4b07e2d10e 100644
--- a/packages/edge/infra/guard/server/src/routing/api.rs
+++ b/packages/edge/infra/guard/server/src/routing/api.rs
@@ -46,7 +46,11 @@ pub async fn route_api_request(
 	// NOTE: We use service discovery instead of server::list or datacenter::server_discovery because cache is not
 	// shared between edge-edge or edge-core. SD requests the core which has a single cache.
-	let url = Url::parse(&format!("http://127.0.0.1:{TUNNEL_API_EDGE_PORT}/provision/datacenters/{dc_id}/servers?pools=worker"))?;
+	let edge = ctx.config().server()?.rivet.edge()?;
+	let url = Url::parse(&format!(
+		"{}provision/datacenters/{dc_id}/servers?pools=worker",
+		edge.intercom_endpoint
+	))?;
 	let sd = ServiceDiscovery::new(url);

 	let servers = sd.fetch().await?;
diff --git a/packages/edge/services/pegboard/src/protocol.rs b/packages/edge/services/pegboard/src/protocol.rs
index 1c0052dba5..fb13aa1530 100644
--- a/packages/edge/services/pegboard/src/protocol.rs
+++ b/packages/edge/services/pegboard/src/protocol.rs
@@ -133,13 +133,13 @@ pub enum ImageKind {
 }

 impl From<build::types::BuildKind> for ImageKind {
-    fn from(kind: build::types::BuildKind) -> Self {
-        match kind {
-            build::types::BuildKind::DockerImage => ImageKind::DockerImage,
-            build::types::BuildKind::OciBundle => ImageKind::OciBundle,
-            build::types::BuildKind::JavaScript => ImageKind::JavaScript,
-        }
-    }
+	fn from(kind: build::types::BuildKind) -> Self {
+		match kind {
+			build::types::BuildKind::DockerImage => ImageKind::DockerImage,
+			build::types::BuildKind::OciBundle => ImageKind::OciBundle,
+			build::types::BuildKind::JavaScript => ImageKind::JavaScript,
+		}
+	}
 }

 impl ImageKind {
@@ -159,12 +159,12 @@ pub enum ImageCompression {
 }

 impl From<build::types::BuildCompression> for ImageCompression {
-    fn from(compression: build::types::BuildCompression) -> Self {
-        match compression {
-            build::types::BuildCompression::None => ImageCompression::None,
-            build::types::BuildCompression::Lz4 => ImageCompression::Lz4,
-        }
-    }
+	fn from(compression: build::types::BuildCompression) -> Self {
+		match compression {
+			build::types::BuildCompression::None => ImageCompression::None,
+			build::types::BuildCompression::Lz4 => ImageCompression::Lz4,
+		}
+	}
 }

 #[derive(Debug, Serialize, Deserialize, Clone, Hash)]
diff --git a/packages/toolchain/cli/src/commands/actor/logs.rs b/packages/toolchain/cli/src/commands/actor/logs.rs
index d502cf8a79..662df560f1 100644
--- a/packages/toolchain/cli/src/commands/actor/logs.rs
+++ b/packages/toolchain/cli/src/commands/actor/logs.rs
@@ -52,7 +52,7 @@ impl Opts {
 				.unwrap_or(toolchain::util::actor::logs::LogStream::All),
 			follow: !self.no_follow,
 			print_type,
-			exit_on_ctrl_c: true
+			exit_on_ctrl_c: true,
 		},
 	)
 	.await?;
diff --git a/packages/toolchain/cli/src/commands/function/endpoint.rs b/packages/toolchain/cli/src/commands/function/endpoint.rs
index f3c13bdcab..0a07b305a4 100644
--- a/packages/toolchain/cli/src/commands/function/endpoint.rs
+++ b/packages/toolchain/cli/src/commands/function/endpoint.rs
@@ -61,4 +61,3 @@ async fn get_route(
 	Ok(matching_route)
 }
-
diff --git a/packages/toolchain/cli/src/commands/function/list.rs b/packages/toolchain/cli/src/commands/function/list.rs
index 45a8a81689..7a1af78029 100644
--- a/packages/toolchain/cli/src/commands/function/list.rs
+++ b/packages/toolchain/cli/src/commands/function/list.rs
@@ -35,4 +35,3 @@ impl Opts {
 		Ok(())
 	}
 }
-
diff --git a/packages/toolchain/cli/src/commands/route/endpoint.rs b/packages/toolchain/cli/src/commands/route/endpoint.rs
index d941854370..d3afc7c24e 100644
--- a/packages/toolchain/cli/src/commands/route/endpoint.rs
+++ b/packages/toolchain/cli/src/commands/route/endpoint.rs
@@ -41,10 +41,10 @@ impl Opts {
 	pub async fn execute(&self) -> Result<()> {
 		let ctx = crate::util::login::load_or_login().await?;
 		let env = crate::util::env::get_or_select(&ctx, self.environment.as_ref()).await?;
-		
+
 		// Get existing route if it exists
 		let route = get_route(&ctx, &env, &self.name).await?;
-		
+
 		// Parse selector tags
 		let selector_tags = self
 			.selector_tags
@@ -55,20 +55,26 @@ impl Opts {
 		// Build route update body
 		let mut update_route_body = models::RoutesUpdateRouteBody {
-			hostname: route.as_ref().map(|r| r.hostname.clone()).unwrap_or_else(|| {
-				// Default hostname is project-env.domain
-				format!(
-					"{}-{}.{}",
-					ctx.project.name_id,
-					env,
-					ctx.bootstrap
-						.domains
-						.job
-						.as_ref()
-						.expect("bootstrap.domains.job")
-				)
-			}),
-			path: route.as_ref().map(|r| r.path.clone()).unwrap_or_else(|| "/".to_string()),
+			hostname: route
+				.as_ref()
+				.map(|r| r.hostname.clone())
+				.unwrap_or_else(|| {
+					// Default hostname is project-env.domain
+					format!(
+						"{}-{}.{}",
+						ctx.project.name_id,
+						env,
+						ctx.bootstrap
+							.domains
+							.job
+							.as_ref()
+							.expect("bootstrap.domains.job")
+					)
+				}),
+			path: route
+				.as_ref()
+				.map(|r| r.path.clone())
+				.unwrap_or_else(|| "/".to_string()),
 			route_subpaths: route.as_ref().map(|r| r.route_subpaths).unwrap_or(true),
 			strip_prefix: route.as_ref().map(|r| r.strip_prefix).unwrap_or(true),
 			target: Box::new(models::RoutesRouteTarget {
@@ -124,15 +130,24 @@ impl Opts {
 			Result::Ok(_) => {
 				println!(
 					"Successfully {} route: {}{}",
-					if route.is_some() { "updated" } else { "created" },
-					update_route_body.hostname,
+					if route.is_some() {
+						"updated"
+					} else {
+						"created"
+					},
+					update_route_body.hostname,
 					update_route_body.path
 				);
 				Ok(())
 			}
 			Err(err) => {
-				eprintln!("Failed to {}: {}",
-					if route.is_some() { "update route" } else { "create route" },
+				eprintln!(
+					"Failed to {}: {}",
+					if route.is_some() {
+						"update route"
+					} else {
+						"create route"
+					},
 					err
 				);
 				Err(err.into())
@@ -142,7 +157,11 @@ impl Opts {
 }

 // Helper function to get route if it exists
-async fn get_route(ctx: &ToolchainCtx, env: &str, route_id: &str) -> Result<Option<models::RoutesRoute>> {
+async fn get_route(
+	ctx: &ToolchainCtx,
+	env: &str,
+	route_id: &str,
+) -> Result<Option<models::RoutesRoute>> {
 	let routes_response = apis::routes_api::routes_list(
 		&ctx.openapi_config_cloud,
 		Some(&ctx.project.name_id.to_string()),
@@ -158,4 +177,4 @@ async fn get_route(
 	Ok(matching_route)
-}
\ No newline at end of file
+}
diff --git a/packages/toolchain/cli/src/commands/route/list.rs b/packages/toolchain/cli/src/commands/route/list.rs
index (unchanged hunk elided in source)
diff --git a/packages/toolchain/cli/src/commands/route/mod.rs b/packages/toolchain/cli/src/commands/route/mod.rs
@@ (hunk context elided in source)
 			opts.execute().await,
 		}
 	}
-}
\ No newline at end of file
+}
diff --git a/packages/toolchain/cli/src/util/deploy.rs b/packages/toolchain/cli/src/util/deploy.rs
index 4b6a9e585c..a292551cdb 100644
--- a/packages/toolchain/cli/src/util/deploy.rs
+++ b/packages/toolchain/cli/src/util/deploy.rs
@@ -432,4 +432,3 @@ async fn create_function_route(
 	Ok(())
 }
-
diff --git a/packages/toolchain/toolchain/src/util/actor/logs.rs b/packages/toolchain/toolchain/src/util/actor/logs.rs
index f2c6023dd8..adbd1cba04 100644
--- a/packages/toolchain/toolchain/src/util/actor/logs.rs
+++ b/packages/toolchain/toolchain/src/util/actor/logs.rs
@@ -1,14 +1,14 @@
+use crate::{
+	rivet_api::{apis, models},
+	ToolchainCtx,
+};
 use anyhow::*;
 use base64::{engine::general_purpose::STANDARD, Engine};
-use clap::ValueEnum;
 use chrono::{DateTime, Utc};
+use clap::ValueEnum;
 use std::time::Duration;
 use tokio::signal;
 use tokio::sync::watch;
-use crate::{
-	rivet_api::{apis, models},
-	ToolchainCtx
-};
 use uuid::Uuid;

 #[derive(ValueEnum, Clone)]
@@ -46,7 +46,7 @@ pub async fn tail(ctx: &ToolchainCtx, opts: TailOpts<'_>) -> Result<()> {
 	let (stderr_fetched_tx, stderr_fetched_rx) = watch::channel(false);

 	let exit_on_ctrl_c = opts.exit_on_ctrl_c;
-	
+
 	tokio::select! {
 		result = tail_streams(ctx, &opts, stdout_fetched_tx, stderr_fetched_tx) => result,
 		result = poll_actor_state(ctx, &opts, stdout_fetched_rx, stderr_fetched_rx) => result,
 	}
@@ -140,7 +140,6 @@ async fn tail_stream(
 		}
 	};
-
 	match &opts.print_type {
 		PrintType::Custom(callback) => {
 			(callback)(ts, decoded_line);
 		}
diff --git a/packages/toolchain/toolchain/src/util/docker/build_remote.rs b/packages/toolchain/toolchain/src/util/docker/build_remote.rs
index 7516c78798..b0812690e3 100644
--- a/packages/toolchain/toolchain/src/util/docker/build_remote.rs
+++ b/packages/toolchain/toolchain/src/util/docker/build_remote.rs
@@ -1,7 +1,13 @@
 use anyhow::*;
 use flate2::{write::GzEncoder, Compression};
 use serde_json::json;
-use std::{collections::HashMap, io::Write, path::{Path, PathBuf}, result::Result::Ok, time::Duration};
+use std::{
+	collections::HashMap,
+	io::Write,
+	path::{Path, PathBuf},
+	result::Result::Ok,
+	time::Duration,
+};
 use tempfile::NamedTempFile;
 use uuid::Uuid;
@@ -129,7 +135,7 @@ async fn get_or_create_ci_namespace(
 async fn upload_ci_manager_build(
 	ctx: &ToolchainCtx,
 	task: task::TaskCtx,
-	ci_env: &TEMPEnvironment
+	ci_env: &TEMPEnvironment,
 ) -> Result {
 	upload_ci_build(ctx, task, ci_env, "ci-manager", CI_MANAGER_RELEASE_URL).await
 }
@@ -137,7 +143,7 @@ async fn upload_ci_runner_build(
 	ctx: &ToolchainCtx,
 	task: task::TaskCtx,
-	ci_env: &TEMPEnvironment
+	ci_env: &TEMPEnvironment,
 ) -> Result {
 	upload_ci_build(ctx, task, ci_env, "ci-runner", CI_RUNNER_RELEASE_URL).await
 }
@@ -382,10 +388,7 @@ async fn get_or_create_ci_manager_actor(
 /// Moves over build context to a temp directory,
 /// ignoring all .dockerignore files.
-fn prepare_build_context_dir(
-	build_path: &Path,
-	dockerfile_path: &Path
-) -> Result<Vec<PathBuf>> {
+fn prepare_build_context_dir(build_path: &Path, dockerfile_path: &Path) -> Result<Vec<PathBuf>> {
 	const DOCKERIGNORE_FILENAME: &str = ".dockerignore";

 	let mut paths = Vec::new();
@@ -405,18 +408,16 @@ fn prepare_build_context_dir(
 		.add_custom_ignore_filename(DOCKERIGNORE_FILENAME)
 		.parents(true)
 		.build();
-	
+
 	for entry in walk {
 		let entry = entry?;

-		if entry.path() == dockerfile_path || entry.path() == &dockerignore_path {
+		if entry.path() == dockerfile_path || entry.path() == &dockerignore_path {
 			// Skip the Dockerfile or .dockerignore itself, we already added it
 			continue;
 		}

-		let is_file = entry.file_type()
-			.map(|ft| ft.is_file())
-			.unwrap_or(false);
+		let is_file = entry.file_type().map(|ft| ft.is_file()).unwrap_or(false);

 		if is_file {
 			let file_path = entry.path();
@@ -452,7 +453,8 @@ async fn create_build_context_archive(
 	// Add the prepared build file paths to the archive
 	for file_path in build_file_paths.iter() {
-		let relative_path = file_path.strip_prefix(build_path)
+		let relative_path = file_path
+			.strip_prefix(build_path)
 			.context("Failed to strip build path prefix")?;

 		tar.append_path_with_name(&file_path, relative_path)
@@ -494,9 +496,8 @@ async fn upload_build_context(
 	task.log("[Remote Build] Uploading build context...");

 	// Serialize build args if provided
-	let serialized_build_args = serde_json::to_string(
-		build_arg_flags.as_deref().unwrap_or(&[])
-	).context("Failed to serialize build args")?;
+	let serialized_build_args = serde_json::to_string(build_arg_flags.as_deref().unwrap_or(&[]))
+		.context("Failed to serialize build args")?;

 	// Create FormData-like structure using reqwest
 	let form = reqwest::multipart::Form::new()
@@ -517,7 +518,7 @@ async fn upload_build_context(
 			.file_name("context.tar.gz")
 			.mime_str("application/gzip")?,
 	);
-	
+
 	let form = if let Some(target) = build_target {
 		form.text("buildTarget", target.clone())
 	} else {
 		form
 	};
@@ -657,11 +658,14 @@ async fn poll_build_status(
 					actor_id,
 					stream: crate::util::actor::logs::LogStream::All,
 					follow: true,
-					print_type: crate::util::actor::logs::PrintType::Custom(handle_build_log_line),
-					exit_on_ctrl_c: false
+					print_type: crate::util::actor::logs::PrintType::Custom(
+						handle_build_log_line,
+					),
+					exit_on_ctrl_c: false,
 				},
 			)
-			.await {
+			.await
+			{
 				Ok(_) => {
 					task.log("[Remote Build] Build logs streaming completed.");
 				}
@@ -682,26 +686,17 @@ async fn poll_build_status(
 				}
 			}
 		} else {
-			task.log(format!(
-				"[Remote Build] Poll failed: HTTP {}",
-				res.status()
-			));
+			task.log(format!("[Remote Build] Poll failed: HTTP {}", res.status()));
 		}
 	}
 	Err(e) => {
-		task.log(format!(
-			"[Remote Build] Poll failed: {}",
-			e
-		));
+		task.log(format!("[Remote Build] Poll failed: {}", e));
 	}
 }
 }
 }

-fn handle_build_log_line(
-	_timestamp: chrono::DateTime<chrono::Utc>,
-	line: String,
-) {
+fn handle_build_log_line(_timestamp: chrono::DateTime<chrono::Utc>, line: String) {
 	let line = strip_ansi_escape_codes(&line);

 	// If the line starts with INFO[.+], it's a Kaniko log line
@@ -725,9 +720,7 @@ fn transform_log_line(line: String) -> Option<String> {
 	// If it starts with uppercase word, it's probably important
 	// since it's probably a Dockerfile instruction
-	let first_word = &line.split_whitespace()
-		.next()
-		.unwrap_or("");
+	let first_word = &line.split_whitespace().next().unwrap_or("");
 	let is_docker_instruction = first_word
 		.chars()
 		.filter(|c| c.is_alphabetic())
@@ -736,7 +729,7 @@ fn transform_log_line(line: String) -> Option<String> {
 	if is_docker_instruction {
 		return Some(line);
 	}
-	
+
 	if line.starts_with("Unpacking rootfs") {
 		return Some("Initializing image filesystem...".to_string());
 	}
@@ -757,7 +750,7 @@ fn strip_ansi_escape_codes(line: &str) -> String {
 	// as strip_ansi_escapes happens to strip tabs as well.
 	// (See https://github.com/luser/strip-ansi-escapes/issues/20)
 	if line.contains('\x1b') {
-		return strip_ansi_escapes::strip_str(line).to_string()
+		return strip_ansi_escapes::strip_str(line).to_string();
 	}

 	line.to_string()
diff --git a/packages/toolchain/toolchain/src/util/docker/mod.rs b/packages/toolchain/toolchain/src/util/docker/mod.rs
index c02609481f..1e31281c94 100644
--- a/packages/toolchain/toolchain/src/util/docker/mod.rs
+++ b/packages/toolchain/toolchain/src/util/docker/mod.rs
@@ -1,7 +1,7 @@
 pub mod archive;
 pub mod build;
-pub mod push;
 pub mod build_remote;
+pub mod push;
 pub mod users;

 use uuid::Uuid;
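
Editor's note (not part of the patch): the new size accounting above hinges on `parse_tar_total_bytes` picking the byte count out of the summary line that `tar --totals` prints to stderr. Below is a minimal, standalone sketch of that behavior, using a condensed near-equivalent of the helper added in `image_download_handler.rs` and the exact summary line quoted in its comment; the only assumption is that the extracting `tar` is GNU tar, whose `--totals` output matches this format.

```rust
// Condensed near-equivalent of the patch's parse_tar_total_bytes() helper.
// Illustrative only; assumes GNU tar's `--totals` summary format.
fn parse_tar_total_bytes(stderr: &str) -> Option<u64> {
	stderr
		.lines()
		// GNU tar prints e.g. "Total bytes read: 646737920 (617MiB, 339MiB/s)"
		.find_map(|line| line.strip_prefix("Total bytes read: "))
		// Take the digits up to the first space, then parse them.
		.and_then(|rest| rest.split(' ').next())
		.and_then(|digits| digits.parse::<u64>().ok())
}

fn main() {
	// Summary line taken from the comment in the patch.
	let stderr = "Total bytes read: 646737920 (617MiB, 339MiB/s)\n";
	assert_eq!(parse_tar_total_bytes(stderr), Some(646737920));

	// Output without the expected prefix yields None, which is why the
	// download handler logs and bails rather than caching a bogus size.
	assert_eq!(parse_tar_total_bytes("x ./rootfs/bin\n"), None);
	println!("ok");
}
```

Since the count is bytes read from the tar stream rather than bytes on disk, it slightly over-estimates the unpacked size; as the patch comments note, over-estimating is the safe direction for the `IMAGE_CACHE_SIZE` cache-eviction math.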