From 2c3eaf4c20883b6304e29a69f3eedafeeb7eae41 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Tue, 22 Jul 2025 01:17:45 +0000 Subject: [PATCH] fix: make pb image downloads random, remove prewarm from pb protocol --- .../install_scripts/files/node_exporter.sh | 2 +- .../edge/api/intercom/src/route/pegboard.rs | 171 ++++++------------ .../manager/src/image_download_handler.rs | 18 +- .../infra/client/manager/src/metrics/mod.rs | 7 + 4 files changed, 73 insertions(+), 125 deletions(-) diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/node_exporter.sh b/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/node_exporter.sh index 48eddd719f..b7fa605786 100644 --- a/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/node_exporter.sh +++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/node_exporter.sh @@ -31,7 +31,7 @@ User=node_exporter Group=node_exporter Type=simple # Reduce cardinality -ExecStart=/usr/bin/node_exporter --collector.disable-defaults --collector.cpu --collector.netdev --collector.conntrack --collector.meminfo --collector.filesystem --collector.filesystem.mount-points-exclude=^/opt/nomad/ --collector.netstat --collector.sockstat --collector.tcpstat --collector.network_route --collector.arp --collector.filefd --collector.interrupts --collector.softirqs --collector.processes +ExecStart=/usr/bin/node_exporter --collector.disable-defaults --collector.cpu --collector.netdev --collector.conntrack --collector.meminfo --collector.diskstats --collector.filesystem --collector.filesystem.mount-points-exclude=^/opt/nomad/ --collector.netstat --collector.sockstat --collector.tcpstat --collector.network_route --collector.arp --collector.filefd --collector.interrupts --collector.softirqs --collector.processes Restart=always RestartSec=2 diff --git a/packages/edge/api/intercom/src/route/pegboard.rs b/packages/edge/api/intercom/src/route/pegboard.rs index 9388cd0f21..9cd0ac0acf 100644 --- a/packages/edge/api/intercom/src/route/pegboard.rs +++ b/packages/edge/api/intercom/src/route/pegboard.rs @@ -3,7 +3,7 @@ use chirp_workflow::prelude::*; use cluster::types::BuildDeliveryMethod; use fdb_util::SERIALIZABLE; use foundationdb::{self as fdb, options::StreamingMode}; -use futures_util::TryStreamExt; +use futures_util::StreamExt; use pegboard::protocol; use rivet_api::models; use serde_json::json; @@ -18,51 +18,22 @@ pub async fn prewarm_image( ) -> GlobalResult { ctx.auth().bypass()?; - let client_id = ctx - .fdb() - .await? - .run(|tx, _mc| async move { - let alloc_idx_subspace = pegboard::keys::subspace() - .subspace(&pegboard::keys::datacenter::ClientsByRemainingMemKey::entire_subspace()); - let ping_threshold_ts = - util::timestamp::now() - pegboard::workflows::client::CLIENT_ELIGIBLE_THRESHOLD_MS; - - let mut stream = tx.get_ranges_keyvalues( - fdb::RangeOption { - mode: StreamingMode::Iterator, - ..(&alloc_idx_subspace).into() - }, - SERIALIZABLE, - ); - - while let Some(entry) = stream.try_next().await? { - let key = pegboard::keys::subspace() - .unpack::(entry.key()) - .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?; - - // Scan by last ping - if key.last_ping_ts < ping_threshold_ts { - continue; - } - - return Ok(Some(key.client_id)); - } - - Ok(None) - }) - .custom_instrument(tracing::info_span!("prewarm_fetch_tx")) - .await?; - - let Some(client_id) = client_id else { - tracing::error!("no eligible clients found to prewarm image"); - return Ok(json!({})); - }; - let dc_id = ctx.config().server()?.rivet.edge()?.datacenter_id; - let (dc_res, builds_res) = tokio::try_join!( + let (dc_res, servers_res, builds_res) = tokio::try_join!( ctx.op(cluster::ops::datacenter::get::Input { datacenter_ids: vec![dc_id], }), + ctx.op(cluster::ops::server::list::Input { + filter: cluster::types::Filter { + datacenter_ids: Some(vec![dc_id]), + pool_types: Some(vec![cluster::types::PoolType::Ats]), + ..Default::default() + }, + include_destroyed: false, + exclude_installing: true, + exclude_draining: true, + exclude_no_vlan: true, + }), ctx.op(build::ops::get::Input { build_ids: vec![image_id], }), @@ -77,45 +48,47 @@ pub async fn prewarm_image( return Ok(json!({})); }; - // Get the artifact size - let uploads_res = op!([ctx] upload_get { - upload_ids: vec![build.upload_id.into()], - }) - .await?; - let upload = unwrap!(uploads_res.uploads.first()); - let artifact_size_bytes = upload.content_length; - - let res = ctx - .signal(pegboard::workflows::client::PrewarmImage2 { - image: protocol::Image { - id: image_id, - artifact_url_stub: pegboard::util::image_artifact_url_stub( - ctx.config(), - build.upload_id, - &build::utils::file_name(build.kind, build.compression), - )?, - // We will never need to fall back to fetching directly from S3. This short - // circuits earlier in the fn. - fallback_artifact_url: None, - artifact_size_bytes, - kind: build.kind.into(), - compression: build.compression.into(), - }, - }) - .to_workflow::() - .tag("client_id", client_id) - .send() - .await; - - if let Some(WorkflowError::WorkflowNotFound) = res.as_workflow_error() { - tracing::warn!( - ?client_id, - "client workflow not found, likely already stopped" - ); - } else { - res?; + if servers_res.servers.is_empty() { + tracing::warn!(?dc_id, "no ats nodes to prewarm"); } + let artifact_url_stub = pegboard::util::image_artifact_url_stub( + ctx.config(), + build.upload_id, + &build::utils::file_name(build.kind, build.compression), + )?; + let client = rivet_pools::reqwest::client().await?; + + futures_util::stream::iter( + servers_res + .servers + .into_iter() + .flat_map(|server| server.lan_ip.map(|lan_ip| (server, lan_ip))), + ) + .map(|(server, lan_ip)| { + let artifact_url_stub = artifact_url_stub.clone(); + let client = client.clone(); + + async move { + if let Err(err) = client + .get(format!("http://{}:8080{}", lan_ip, &artifact_url_stub)) + .send() + .await + .and_then(|res| res.error_for_status()) + { + tracing::error!( + ?err, + server_id=?server.server_id, + build_id=?build.build_id, + "failed prewarming", + ); + } + } + }) + .buffer_unordered(16) + .collect::<()>() + .await; + Ok(json!({})) } @@ -171,43 +144,3 @@ pub async fn toggle_drain_client( Ok(json!({})) } - -async fn resolve_image_fallback_artifact_url( - ctx: &Ctx, - dc_build_delivery_method: BuildDeliveryMethod, - build: &build::types::Build, -) -> GlobalResult> { - if let BuildDeliveryMethod::S3Direct = dc_build_delivery_method { - tracing::debug!("using s3 direct delivery"); - - // Build client - let s3_client = s3_util::Client::with_bucket_and_endpoint( - ctx.config(), - "bucket-build", - s3_util::EndpointKind::EdgeInternal, - ) - .await?; - - let presigned_req = s3_client - .get_object() - .bucket(s3_client.bucket()) - .key(format!( - "{upload_id}/{file_name}", - upload_id = build.upload_id, - file_name = build::utils::file_name(build.kind, build.compression), - )) - .presigned( - s3_util::aws_sdk_s3::presigning::PresigningConfig::builder() - .expires_in(std::time::Duration::from_secs(15 * 60)) - .build()?, - ) - .await?; - - let addr_str = presigned_req.uri().to_string(); - tracing::debug!(addr = %addr_str, "resolved artifact s3 presigned request"); - - Ok(Some(addr_str)) - } else { - Ok(None) - } -} diff --git a/packages/edge/infra/client/manager/src/image_download_handler.rs b/packages/edge/infra/client/manager/src/image_download_handler.rs index 60e0606565..8bfcffc57e 100644 --- a/packages/edge/infra/client/manager/src/image_download_handler.rs +++ b/packages/edge/infra/client/manager/src/image_download_handler.rs @@ -280,6 +280,10 @@ impl ImageDownloadHandler { let image_size = self.download_inner(ctx, image_config).await?; let download_duration = download_start_instant.elapsed().as_secs_f64(); + crate::metrics::DOWNLOAD_IMAGE_RATE + .with_label_values(&[&(start_instant.elapsed().as_nanos() % 100).to_string()]) + .set(image_download_size as f64 / download_duration); + let convert_start_instant = Instant::now(); self.convert(ctx, image_config).await?; let convert_duration = convert_start_instant.elapsed().as_secs_f64(); @@ -531,12 +535,16 @@ impl ImageDownloadHandler { ctx: &Ctx, image_config: &protocol::Image, ) -> Result> { - // Get hash from image id - let mut hasher = DefaultHasher::new(); - hasher.write(image_config.id.as_bytes()); - let hash = hasher.finish(); + // // Get hash from image id + // let mut hasher = DefaultHasher::new(); + // hasher.write(image_config.id.as_bytes()); + // let hash = hasher.finish(); + + // let mut rng = ChaCha12Rng::seed_from_u64(hash); - let mut rng = ChaCha12Rng::seed_from_u64(hash); + // TODO: Replaced hash based randomizer with complete randomness for now + let mut rng = + rand::rngs::StdRng::from_rng(&mut rand::thread_rng()).context("failed creating rng")?; // Shuffle based on hash let mut addresses = self diff --git a/packages/edge/infra/client/manager/src/metrics/mod.rs b/packages/edge/infra/client/manager/src/metrics/mod.rs index 6d99f02893..ae3d993b78 100644 --- a/packages/edge/infra/client/manager/src/metrics/mod.rs +++ b/packages/edge/infra/client/manager/src/metrics/mod.rs @@ -58,6 +58,13 @@ lazy_static::lazy_static! { *REGISTRY, ).unwrap(); + pub static ref DOWNLOAD_IMAGE_RATE: GaugeVec = register_gauge_vec_with_registry!( + "download_image_rate", + "Rate of image download in bytes/sec", + &["bucket"], + *REGISTRY, + ).unwrap(); + // MARK: Actor setup step duration metrics pub static ref SETUP_TOTAL_DURATION: Histogram = register_histogram_with_registry!( "actor_setup_total_duration",