@@ -31,7 +31,7 @@ User=node_exporter
Group=node_exporter
Type=simple
# Reduce cardinality
ExecStart=/usr/bin/node_exporter --collector.disable-defaults --collector.cpu --collector.netdev --collector.conntrack --collector.meminfo --collector.filesystem --collector.filesystem.mount-points-exclude=^/opt/nomad/ --collector.netstat --collector.sockstat --collector.tcpstat --collector.network_route --collector.arp --collector.filefd --collector.interrupts --collector.softirqs --collector.processes
ExecStart=/usr/bin/node_exporter --collector.disable-defaults --collector.cpu --collector.netdev --collector.conntrack --collector.meminfo --collector.diskstats --collector.filesystem --collector.filesystem.mount-points-exclude=^/opt/nomad/ --collector.netstat --collector.sockstat --collector.tcpstat --collector.network_route --collector.arp --collector.filefd --collector.interrupts --collector.softirqs --collector.processes
Restart=always
RestartSec=2

171 changes: 52 additions & 119 deletions packages/edge/api/intercom/src/route/pegboard.rs
@@ -3,7 +3,7 @@ use chirp_workflow::prelude::*;
use cluster::types::BuildDeliveryMethod;
use fdb_util::SERIALIZABLE;
use foundationdb::{self as fdb, options::StreamingMode};
use futures_util::TryStreamExt;
use futures_util::StreamExt;
use pegboard::protocol;
use rivet_api::models;
use serde_json::json;
@@ -18,51 +18,22 @@ pub async fn prewarm_image(
) -> GlobalResult<serde_json::Value> {
ctx.auth().bypass()?;

let client_id = ctx
.fdb()
.await?
.run(|tx, _mc| async move {
let alloc_idx_subspace = pegboard::keys::subspace()
.subspace(&pegboard::keys::datacenter::ClientsByRemainingMemKey::entire_subspace());
let ping_threshold_ts =
util::timestamp::now() - pegboard::workflows::client::CLIENT_ELIGIBLE_THRESHOLD_MS;

let mut stream = tx.get_ranges_keyvalues(
fdb::RangeOption {
mode: StreamingMode::Iterator,
..(&alloc_idx_subspace).into()
},
SERIALIZABLE,
);

while let Some(entry) = stream.try_next().await? {
let key = pegboard::keys::subspace()
.unpack::<pegboard::keys::datacenter::ClientsByRemainingMemKey>(entry.key())
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?;

// Scan by last ping
if key.last_ping_ts < ping_threshold_ts {
continue;
}

return Ok(Some(key.client_id));
}

Ok(None)
})
.custom_instrument(tracing::info_span!("prewarm_fetch_tx"))
.await?;

let Some(client_id) = client_id else {
tracing::error!("no eligible clients found to prewarm image");
return Ok(json!({}));
};

let dc_id = ctx.config().server()?.rivet.edge()?.datacenter_id;
let (dc_res, builds_res) = tokio::try_join!(
let (dc_res, servers_res, builds_res) = tokio::try_join!(
ctx.op(cluster::ops::datacenter::get::Input {
datacenter_ids: vec![dc_id],
}),
ctx.op(cluster::ops::server::list::Input {
filter: cluster::types::Filter {
datacenter_ids: Some(vec![dc_id]),
pool_types: Some(vec![cluster::types::PoolType::Ats]),
..Default::default()
},
include_destroyed: false,
exclude_installing: true,
exclude_draining: true,
exclude_no_vlan: true,
}),
ctx.op(build::ops::get::Input {
build_ids: vec![image_id],
}),
@@ -77,45 +48,47 @@ pub async fn prewarm_image(
return Ok(json!({}));
};

// Get the artifact size
let uploads_res = op!([ctx] upload_get {
upload_ids: vec![build.upload_id.into()],
})
.await?;
let upload = unwrap!(uploads_res.uploads.first());
let artifact_size_bytes = upload.content_length;

let res = ctx
.signal(pegboard::workflows::client::PrewarmImage2 {
image: protocol::Image {
id: image_id,
artifact_url_stub: pegboard::util::image_artifact_url_stub(
ctx.config(),
build.upload_id,
&build::utils::file_name(build.kind, build.compression),
)?,
// We will never need to fall back to fetching directly from S3. This short
// circuits earlier in the fn.
fallback_artifact_url: None,
artifact_size_bytes,
kind: build.kind.into(),
compression: build.compression.into(),
},
})
.to_workflow::<pegboard::workflows::client::Workflow>()
.tag("client_id", client_id)
.send()
.await;

if let Some(WorkflowError::WorkflowNotFound) = res.as_workflow_error() {
tracing::warn!(
?client_id,
"client workflow not found, likely already stopped"
);
} else {
res?;
if servers_res.servers.is_empty() {
tracing::warn!(?dc_id, "no ats nodes to prewarm");
}

let artifact_url_stub = pegboard::util::image_artifact_url_stub(
ctx.config(),
build.upload_id,
&build::utils::file_name(build.kind, build.compression),
)?;
let client = rivet_pools::reqwest::client().await?;

futures_util::stream::iter(
servers_res
.servers
.into_iter()
.flat_map(|server| server.lan_ip.map(|lan_ip| (server, lan_ip))),
)
.map(|(server, lan_ip)| {
let artifact_url_stub = artifact_url_stub.clone();
let client = client.clone();

async move {
if let Err(err) = client
.get(format!("http://{}:8080{}", lan_ip, &artifact_url_stub))
.send()
.await
.and_then(|res| res.error_for_status())
{
tracing::error!(
?err,
server_id=?server.server_id,
build_id=?build.build_id,
"failed prewarming",
);
}
}
})
.buffer_unordered(16)
.collect::<()>()
.await;
Comment on lines +73 to +90
The current implementation silently continues execution even if all prewarming HTTP requests fail. While errors are logged, there's no mechanism to report the overall success/failure rate or propagate errors to the caller. This could lead to a situation where the system believes prewarming occurred successfully when it actually failed completely.

Consider enhancing the error handling by:

  1. Tracking successful vs. failed requests
  2. Returning a meaningful response that indicates the prewarming status
  3. Potentially implementing retry logic for failed requests

This would provide better visibility into the prewarming process and allow callers to make informed decisions based on the actual outcome.

Suggested change
- if let Err(err) = client
-     .get(format!("http://{}:8080{}", lan_ip, &artifact_url_stub))
-     .send()
-     .await
-     .and_then(|res| res.error_for_status())
- {
-     tracing::error!(
-         ?err,
-         server_id=?server.server_id,
-         build_id=?build.build_id,
-         "failed prewarming",
-     );
- }
- }
- })
- .buffer_unordered(16)
- .collect::<()>()
- .await;
+ let result = client
+     .get(format!("http://{}:8080{}", lan_ip, &artifact_url_stub))
+     .send()
+     .await
+     .and_then(|res| res.error_for_status());
+ match result {
+     Ok(_) => Ok(()),
+     Err(err) => {
+         tracing::error!(
+             ?err,
+             server_id=?server.server_id,
+             build_id=?build.build_id,
+             "failed prewarming",
+         );
+         Err(())
+     }
+ }
+ }
+ })
+ .buffer_unordered(16)
+ .collect::<Vec<Result<(), ()>>>()
+ .await;
+ let total_requests = results.len();
+ let successful_requests = results.iter().filter(|r| r.is_ok()).count();
+ let failed_requests = total_requests - successful_requests;
+ tracing::info!(
+     total_requests,
+     successful_requests,
+     failed_requests,
+     "prewarming completed"
+ );
+ PrewarmingResult {
+     total_requests,
+     successful_requests,
+     failed_requests,
+ }

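Note that the suggestion above references a `results` binding and a `PrewarmingResult` type that aren't defined anywhere in this handler, so it won't compile as written. Below is a minimal sketch of the tally-and-report pattern it is describing, with illustrative names (`prewarm_all`, a plain list of URLs) standing in for the handler's actual server and LAN-IP plumbing:

```rust
use futures_util::StreamExt;
use serde_json::json;

/// Fans out prewarm requests, tallies outcomes, and returns the counts so the
/// caller can see how many requests actually succeeded.
async fn prewarm_all(client: &reqwest::Client, urls: Vec<String>) -> serde_json::Value {
    // Bind the collected results so they can be counted afterwards.
    let results: Vec<Result<(), reqwest::Error>> = futures_util::stream::iter(urls)
        .map(|url| {
            let client = client.clone();
            async move {
                client
                    .get(url)
                    .send()
                    .await
                    .and_then(|res| res.error_for_status())
                    .map(|_| ())
            }
        })
        // Same concurrency cap as the handler uses.
        .buffer_unordered(16)
        .collect::<Vec<_>>()
        .await;

    let total_requests = results.len();
    let successful_requests = results.iter().filter(|r| r.is_ok()).count();
    let failed_requests = total_requests - successful_requests;

    tracing::info!(
        total_requests,
        successful_requests,
        failed_requests,
        "prewarming completed"
    );

    json!({
        "total_requests": total_requests,
        "successful_requests": successful_requests,
        "failed_requests": failed_requests,
    })
}
```

In the real handler the counts could simply be folded into the `Ok(json!({}))` response at the end of `prewarm_image` rather than introducing a new return type.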
Ok(json!({}))
}

@@ -171,43 +144,3 @@ pub async fn toggle_drain_client(

Ok(json!({}))
}

async fn resolve_image_fallback_artifact_url(
ctx: &Ctx<Auth>,
dc_build_delivery_method: BuildDeliveryMethod,
build: &build::types::Build,
) -> GlobalResult<Option<String>> {
if let BuildDeliveryMethod::S3Direct = dc_build_delivery_method {
tracing::debug!("using s3 direct delivery");

// Build client
let s3_client = s3_util::Client::with_bucket_and_endpoint(
ctx.config(),
"bucket-build",
s3_util::EndpointKind::EdgeInternal,
)
.await?;

let presigned_req = s3_client
.get_object()
.bucket(s3_client.bucket())
.key(format!(
"{upload_id}/{file_name}",
upload_id = build.upload_id,
file_name = build::utils::file_name(build.kind, build.compression),
))
.presigned(
s3_util::aws_sdk_s3::presigning::PresigningConfig::builder()
.expires_in(std::time::Duration::from_secs(15 * 60))
.build()?,
)
.await?;

let addr_str = presigned_req.uri().to_string();
tracing::debug!(addr = %addr_str, "resolved artifact s3 presigned request");

Ok(Some(addr_str))
} else {
Ok(None)
}
}
18 changes: 13 additions & 5 deletions packages/edge/infra/client/manager/src/image_download_handler.rs
@@ -280,6 +280,10 @@ impl ImageDownloadHandler {
let image_size = self.download_inner(ctx, image_config).await?;
let download_duration = download_start_instant.elapsed().as_secs_f64();

crate::metrics::DOWNLOAD_IMAGE_RATE
.with_label_values(&[&(start_instant.elapsed().as_nanos() % 100).to_string()])
There appears to be a variable name inconsistency in this code. The metric is using image_download_size which isn't defined in the current scope, while the function call on line 280 returns a value stored in image_size. To fix the compilation error, please update the metric to use the correct variable:

crate::metrics::DOWNLOAD_IMAGE_RATE
    .with_label_values(&[&(start_instant.elapsed().as_nanos() % 100).to_string()])
    .set(image_size as f64 / download_duration);

.set(image_download_size as f64 / download_duration);

let convert_start_instant = Instant::now();
self.convert(ctx, image_config).await?;
let convert_duration = convert_start_instant.elapsed().as_secs_f64();
@@ -531,12 +535,16 @@ impl ImageDownloadHandler {
ctx: &Ctx,
image_config: &protocol::Image,
) -> Result<Vec<String>> {
// Get hash from image id
let mut hasher = DefaultHasher::new();
hasher.write(image_config.id.as_bytes());
let hash = hasher.finish();
// // Get hash from image id
// let mut hasher = DefaultHasher::new();
// hasher.write(image_config.id.as_bytes());
// let hash = hasher.finish();

// let mut rng = ChaCha12Rng::seed_from_u64(hash);

let mut rng = ChaCha12Rng::seed_from_u64(hash);
// TODO: Replaced hash based randomizer with complete randomness for now
let mut rng =
rand::rngs::StdRng::from_rng(&mut rand::thread_rng()).context("failed creating rng")?;

// Shuffle based on hash
let mut addresses = self
7 changes: 7 additions & 0 deletions packages/edge/infra/client/manager/src/metrics/mod.rs
@@ -58,6 +58,13 @@ lazy_static::lazy_static! {
*REGISTRY,
).unwrap();

pub static ref DOWNLOAD_IMAGE_RATE: GaugeVec = register_gauge_vec_with_registry!(
"download_image_rate",
"Rate of image download in bytes/sec",
&["bucket"],
*REGISTRY,
).unwrap();

// MARK: Actor setup step duration metrics
pub static ref SETUP_TOTAL_DURATION: Histogram = register_histogram_with_registry!(
"actor_setup_total_duration",