Skip to content

Commit 2c3eaf4

Browse files
committed
fix: make pb image downloads random, remove prewarm from pb protocol
1 parent 17d427c commit 2c3eaf4

File tree

4 files changed

+73
-125
lines changed

4 files changed

+73
-125
lines changed

packages/core/services/cluster/src/workflows/server/install/install_scripts/files/node_exporter.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ User=node_exporter
3131
Group=node_exporter
3232
Type=simple
3333
# Reduce cardinality
34-
ExecStart=/usr/bin/node_exporter --collector.disable-defaults --collector.cpu --collector.netdev --collector.conntrack --collector.meminfo --collector.filesystem --collector.filesystem.mount-points-exclude=^/opt/nomad/ --collector.netstat --collector.sockstat --collector.tcpstat --collector.network_route --collector.arp --collector.filefd --collector.interrupts --collector.softirqs --collector.processes
34+
ExecStart=/usr/bin/node_exporter --collector.disable-defaults --collector.cpu --collector.netdev --collector.conntrack --collector.meminfo --collector.diskstats --collector.filesystem --collector.filesystem.mount-points-exclude=^/opt/nomad/ --collector.netstat --collector.sockstat --collector.tcpstat --collector.network_route --collector.arp --collector.filefd --collector.interrupts --collector.softirqs --collector.processes
3535
Restart=always
3636
RestartSec=2
3737

packages/edge/api/intercom/src/route/pegboard.rs

Lines changed: 52 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use chirp_workflow::prelude::*;
33
use cluster::types::BuildDeliveryMethod;
44
use fdb_util::SERIALIZABLE;
55
use foundationdb::{self as fdb, options::StreamingMode};
6-
use futures_util::TryStreamExt;
6+
use futures_util::StreamExt;
77
use pegboard::protocol;
88
use rivet_api::models;
99
use serde_json::json;
@@ -18,51 +18,22 @@ pub async fn prewarm_image(
1818
) -> GlobalResult<serde_json::Value> {
1919
ctx.auth().bypass()?;
2020

21-
let client_id = ctx
22-
.fdb()
23-
.await?
24-
.run(|tx, _mc| async move {
25-
let alloc_idx_subspace = pegboard::keys::subspace()
26-
.subspace(&pegboard::keys::datacenter::ClientsByRemainingMemKey::entire_subspace());
27-
let ping_threshold_ts =
28-
util::timestamp::now() - pegboard::workflows::client::CLIENT_ELIGIBLE_THRESHOLD_MS;
29-
30-
let mut stream = tx.get_ranges_keyvalues(
31-
fdb::RangeOption {
32-
mode: StreamingMode::Iterator,
33-
..(&alloc_idx_subspace).into()
34-
},
35-
SERIALIZABLE,
36-
);
37-
38-
while let Some(entry) = stream.try_next().await? {
39-
let key = pegboard::keys::subspace()
40-
.unpack::<pegboard::keys::datacenter::ClientsByRemainingMemKey>(entry.key())
41-
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?;
42-
43-
// Scan by last ping
44-
if key.last_ping_ts < ping_threshold_ts {
45-
continue;
46-
}
47-
48-
return Ok(Some(key.client_id));
49-
}
50-
51-
Ok(None)
52-
})
53-
.custom_instrument(tracing::info_span!("prewarm_fetch_tx"))
54-
.await?;
55-
56-
let Some(client_id) = client_id else {
57-
tracing::error!("no eligible clients found to prewarm image");
58-
return Ok(json!({}));
59-
};
60-
6121
let dc_id = ctx.config().server()?.rivet.edge()?.datacenter_id;
62-
let (dc_res, builds_res) = tokio::try_join!(
22+
let (dc_res, servers_res, builds_res) = tokio::try_join!(
6323
ctx.op(cluster::ops::datacenter::get::Input {
6424
datacenter_ids: vec![dc_id],
6525
}),
26+
ctx.op(cluster::ops::server::list::Input {
27+
filter: cluster::types::Filter {
28+
datacenter_ids: Some(vec![dc_id]),
29+
pool_types: Some(vec![cluster::types::PoolType::Ats]),
30+
..Default::default()
31+
},
32+
include_destroyed: false,
33+
exclude_installing: true,
34+
exclude_draining: true,
35+
exclude_no_vlan: true,
36+
}),
6637
ctx.op(build::ops::get::Input {
6738
build_ids: vec![image_id],
6839
}),
@@ -77,45 +48,47 @@ pub async fn prewarm_image(
7748
return Ok(json!({}));
7849
};
7950

80-
// Get the artifact size
81-
let uploads_res = op!([ctx] upload_get {
82-
upload_ids: vec![build.upload_id.into()],
83-
})
84-
.await?;
85-
let upload = unwrap!(uploads_res.uploads.first());
86-
let artifact_size_bytes = upload.content_length;
87-
88-
let res = ctx
89-
.signal(pegboard::workflows::client::PrewarmImage2 {
90-
image: protocol::Image {
91-
id: image_id,
92-
artifact_url_stub: pegboard::util::image_artifact_url_stub(
93-
ctx.config(),
94-
build.upload_id,
95-
&build::utils::file_name(build.kind, build.compression),
96-
)?,
97-
// We will never need to fall back to fetching directly from S3. This short
98-
// circuits earlier in the fn.
99-
fallback_artifact_url: None,
100-
artifact_size_bytes,
101-
kind: build.kind.into(),
102-
compression: build.compression.into(),
103-
},
104-
})
105-
.to_workflow::<pegboard::workflows::client::Workflow>()
106-
.tag("client_id", client_id)
107-
.send()
108-
.await;
109-
110-
if let Some(WorkflowError::WorkflowNotFound) = res.as_workflow_error() {
111-
tracing::warn!(
112-
?client_id,
113-
"client workflow not found, likely already stopped"
114-
);
115-
} else {
116-
res?;
51+
if servers_res.servers.is_empty() {
52+
tracing::warn!(?dc_id, "no ats nodes to prewarm");
11753
}
11854

55+
let artifact_url_stub = pegboard::util::image_artifact_url_stub(
56+
ctx.config(),
57+
build.upload_id,
58+
&build::utils::file_name(build.kind, build.compression),
59+
)?;
60+
let client = rivet_pools::reqwest::client().await?;
61+
62+
futures_util::stream::iter(
63+
servers_res
64+
.servers
65+
.into_iter()
66+
.flat_map(|server| server.lan_ip.map(|lan_ip| (server, lan_ip))),
67+
)
68+
.map(|(server, lan_ip)| {
69+
let artifact_url_stub = artifact_url_stub.clone();
70+
let client = client.clone();
71+
72+
async move {
73+
if let Err(err) = client
74+
.get(format!("http://{}:8080{}", lan_ip, &artifact_url_stub))
75+
.send()
76+
.await
77+
.and_then(|res| res.error_for_status())
78+
{
79+
tracing::error!(
80+
?err,
81+
server_id=?server.server_id,
82+
build_id=?build.build_id,
83+
"failed prewarming",
84+
);
85+
}
86+
}
87+
})
88+
.buffer_unordered(16)
89+
.collect::<()>()
90+
.await;
91+
11992
Ok(json!({}))
12093
}
12194

@@ -171,43 +144,3 @@ pub async fn toggle_drain_client(
171144

172145
Ok(json!({}))
173146
}
174-
175-
async fn resolve_image_fallback_artifact_url(
176-
ctx: &Ctx<Auth>,
177-
dc_build_delivery_method: BuildDeliveryMethod,
178-
build: &build::types::Build,
179-
) -> GlobalResult<Option<String>> {
180-
if let BuildDeliveryMethod::S3Direct = dc_build_delivery_method {
181-
tracing::debug!("using s3 direct delivery");
182-
183-
// Build client
184-
let s3_client = s3_util::Client::with_bucket_and_endpoint(
185-
ctx.config(),
186-
"bucket-build",
187-
s3_util::EndpointKind::EdgeInternal,
188-
)
189-
.await?;
190-
191-
let presigned_req = s3_client
192-
.get_object()
193-
.bucket(s3_client.bucket())
194-
.key(format!(
195-
"{upload_id}/{file_name}",
196-
upload_id = build.upload_id,
197-
file_name = build::utils::file_name(build.kind, build.compression),
198-
))
199-
.presigned(
200-
s3_util::aws_sdk_s3::presigning::PresigningConfig::builder()
201-
.expires_in(std::time::Duration::from_secs(15 * 60))
202-
.build()?,
203-
)
204-
.await?;
205-
206-
let addr_str = presigned_req.uri().to_string();
207-
tracing::debug!(addr = %addr_str, "resolved artifact s3 presigned request");
208-
209-
Ok(Some(addr_str))
210-
} else {
211-
Ok(None)
212-
}
213-
}

packages/edge/infra/client/manager/src/image_download_handler.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,10 @@ impl ImageDownloadHandler {
280280
let image_size = self.download_inner(ctx, image_config).await?;
281281
let download_duration = download_start_instant.elapsed().as_secs_f64();
282282

283+
crate::metrics::DOWNLOAD_IMAGE_RATE
284+
.with_label_values(&[&(start_instant.elapsed().as_nanos() % 100).to_string()])
285+
.set(image_download_size as f64 / download_duration);
286+
283287
let convert_start_instant = Instant::now();
284288
self.convert(ctx, image_config).await?;
285289
let convert_duration = convert_start_instant.elapsed().as_secs_f64();
@@ -531,12 +535,16 @@ impl ImageDownloadHandler {
531535
ctx: &Ctx,
532536
image_config: &protocol::Image,
533537
) -> Result<Vec<String>> {
534-
// Get hash from image id
535-
let mut hasher = DefaultHasher::new();
536-
hasher.write(image_config.id.as_bytes());
537-
let hash = hasher.finish();
538+
// // Get hash from image id
539+
// let mut hasher = DefaultHasher::new();
540+
// hasher.write(image_config.id.as_bytes());
541+
// let hash = hasher.finish();
542+
543+
// let mut rng = ChaCha12Rng::seed_from_u64(hash);
538544

539-
let mut rng = ChaCha12Rng::seed_from_u64(hash);
545+
// TODO: Replaced hash based randomizer with complete randomness for now
546+
let mut rng =
547+
rand::rngs::StdRng::from_rng(&mut rand::thread_rng()).context("failed creating rng")?;
540548

541549
// Shuffle based on hash
542550
let mut addresses = self

packages/edge/infra/client/manager/src/metrics/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@ lazy_static::lazy_static! {
5858
*REGISTRY,
5959
).unwrap();
6060

61+
pub static ref DOWNLOAD_IMAGE_RATE: GaugeVec = register_gauge_vec_with_registry!(
62+
"download_image_rate",
63+
"Rate of image download in bytes/sec",
64+
&["bucket"],
65+
*REGISTRY,
66+
).unwrap();
67+
6168
// MARK: Actor setup step duration metrics
6269
pub static ref SETUP_TOTAL_DURATION: Histogram = register_histogram_with_registry!(
6370
"actor_setup_total_duration",

0 commit comments

Comments
 (0)