Skip to content

Commit b1d3e71

Browse files
committed
feat(actors): expose container system metrics
1 parent b7c477e commit b1d3e71

File tree

67 files changed

+2121
-35
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+2121
-35
lines changed

docker/dev-full/docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,8 @@ services:
233233
# We only reserve 100 ports instead of the default 22,000. See
234234
# rivet-guard for explanation.
235235
- "7600-7699:7600-7699"
236+
# cAdvisor metrics endpoint
237+
- "7780:7780"
236238
networks:
237239
- rivet-network
238240

docker/dev-full/otel-collector/config.yaml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@ receivers:
55
endpoint: 0.0.0.0:4317
66
http:
77
endpoint: 0.0.0.0:4318
8+
prometheus:
9+
config:
10+
scrape_configs:
11+
- job_name: 'cadvisor'
12+
static_configs:
13+
- targets: ['rivet-client:7780']
14+
metrics_path: /metrics
15+
scrape_interval: 30s
816

917
processors:
1018
batch:
@@ -52,7 +60,7 @@ service:
5260
processors: [batch]
5361
exporters: [clickhouse]
5462
metrics:
55-
receivers: [otlp]
63+
receivers: [otlp, prometheus]
5664
processors: [batch]
5765
exporters: [clickhouse]
5866

docker/universal/Dockerfile

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ RUN apt-get update -y && \
120120
apt-get install -y ca-certificates openssl curl tini curl && \
121121
curl -Lf -o /lib/libfdb_c.so "https://github.com/apple/foundationdb/releases/download/7.1.60/libfdb_c.x86_64.so"
122122

123-
# MARK: Runner (Full)
124-
FROM --platform=linux/amd64 base-runner AS client-full
123+
# MARK: Runner (Slim)
124+
FROM --platform=linux/amd64 base-runner AS client-slim
125125
ARG CNI_PLUGINS_VERSION=1.3.0
126126
RUN apt-get install -y skopeo iproute2 runc dnsutils && \
127127
echo "Downloading lz4" && \
@@ -142,6 +142,23 @@ COPY ./docker/dev-full/rivet-client/rivet-actor.conflist /opt/cni/config/rivet-a
142142
COPY --from=builder /app/dist/rivet-client /app/dist/rivet-container-runner /usr/local/bin/
143143
ENTRYPOINT ["/usr/bin/tini", "--", "entrypoint.sh"]
144144

145+
# MARK: Runner (Full)
146+
FROM client-slim AS client-full
147+
ARG CADVISOR_VERSION=v0.52.0
148+
RUN apt-get update -y && \
149+
apt-get install -y wget && \
150+
wget -O /usr/local/bin/cadvisor "https://github.com/google/cadvisor/releases/download/${CADVISOR_VERSION}/cadvisor-${CADVISOR_VERSION}-linux-amd64" && \
151+
chmod +x /usr/local/bin/cadvisor && \
152+
apt-get clean && \
153+
rm -rf /var/lib/apt/lists/*
154+
155+
COPY docker/universal/client-full-entrypoint.sh /usr/local/bin/client-full-entrypoint.sh
156+
RUN chmod +x /usr/local/bin/client-full-entrypoint.sh
157+
158+
EXPOSE 7780
159+
160+
ENTRYPOINT ["/usr/bin/tini", "--", "/usr/local/bin/client-full-entrypoint.sh"]
161+
145162
# MARK: Monlith
146163
FROM --platform=linux/amd64 debian:12.9-slim AS monolith
147164
ENV DEBIAN_FRONTEND=noninteractive
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#!/bin/bash
2+
set -e
3+
4+
# Start cadvisor in the background
5+
cadvisor \
6+
--port=7780 \
7+
--listen_ip=0.0.0.0 \
8+
--prometheus_endpoint="/metrics" \
9+
--enable_metrics=cpu,cpuLoad,memory,network,disk,diskIO,oom_event,process,tcp,udp \
10+
--docker_only=false \
11+
--disable_root_cgroup_stats=false &
12+
13+
# TODO:
14+
# --raw_cgroup_prefix_whitelist="" \
15+
16+
# Start rivet-client with all passed arguments
17+
exec rivet-client "$@"
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
name = "ACTOR_METRICS_INVALID_METRICS"
3+
description = "Invalid metrics format."
4+
http_status = 400
5+
---
6+
7+
# Invalid Metrics
8+
9+
The provided list of metrics is not in a valid JSON format. Please provide a valid JSON array of metric names.
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
name = "ACTOR_METRICS_NO_METRICS"
3+
description = "No metrics specified."
4+
http_status = 400
5+
---
6+
7+
# No Metrics
8+
9+
No metrics were specified in the request. Please provide at least one metric name to query.
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
name = "ACTOR_METRICS_UNSUPPORTED_METRICS"
3+
description = "Unsupported metrics requested."
4+
http_status = 400
5+
---
6+
7+
# Unsupported Metrics
8+
9+
The requested metrics are not supported. Supported metrics include: cpu, memory, memory_limit, network_rx_bytes, network_tx_bytes.
Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
use api_helper::{
2+
anchor::{WatchIndexQuery, WatchResponse},
3+
ctx::Ctx,
4+
};
5+
use rivet_api::models;
6+
use rivet_operation::prelude::*;
7+
use serde::Deserialize;
8+
use std::collections::HashMap;
9+
use std::time::Duration;
10+
11+
use crate::{
12+
assert,
13+
auth::{Auth, CheckOpts, CheckOutput},
14+
utils::build_global_query_compat,
15+
};
16+
17+
use super::GlobalQuery;
18+
19+
// MARK: GET /actors/metrics/history
20+
#[derive(Debug, Deserialize)]
21+
pub struct GetActorMetricsQuery {
22+
#[serde(flatten)]
23+
pub global: GlobalQuery,
24+
pub start_at: i64,
25+
pub finish_at: i64,
26+
pub interval: i64,
27+
pub actor_ids_json: String,
28+
pub metrics_json: String,
29+
}
30+
31+
#[derive(Debug, clickhouse::Row, serde::Deserialize)]
32+
pub struct MetricRow {
33+
pub time_bucket: i64,
34+
pub actor_id_str: String,
35+
pub metric_name: String,
36+
pub value: f64,
37+
}
38+
39+
#[tracing::instrument(skip_all)]
40+
pub async fn get_metrics(
41+
ctx: Ctx<Auth>,
42+
watch_index: WatchIndexQuery,
43+
query: GetActorMetricsQuery,
44+
) -> GlobalResult<models::ActorsGetActorMetricsResponse> {
45+
let CheckOutput { game_id, env_id } = ctx
46+
.auth()
47+
.check(
48+
ctx.op_ctx(),
49+
CheckOpts {
50+
query: &query.global,
51+
allow_service_token: false,
52+
opt_auth: false,
53+
},
54+
)
55+
.await?;
56+
57+
// Parse actor IDs from the JSON string
58+
let actor_ids: Vec<Uuid> = unwrap_with!(
59+
serde_json::from_str(&query.actor_ids_json).ok(),
60+
ACTOR_LOGS_INVALID_ACTOR_IDS
61+
);
62+
63+
ensure_with!(!actor_ids.is_empty(), ACTOR_LOGS_NO_ACTOR_IDS);
64+
65+
// Parse metrics from the JSON string
66+
let requested_metrics: Vec<String> = unwrap_with!(
67+
serde_json::from_str(&query.metrics_json).ok(),
68+
ACTOR_METRICS_INVALID_METRICS
69+
);
70+
71+
ensure_with!(!requested_metrics.is_empty(), ACTOR_METRICS_NO_METRICS);
72+
73+
// Filter to only valid actors for this game/env
74+
let valid_actor_ids = assert::actor_for_env(&ctx, &actor_ids, game_id, env_id, None).await?;
75+
76+
// Exit early if no valid actors
77+
ensure_with!(!valid_actor_ids.is_empty(), ACTOR_LOGS_NO_VALID_ACTOR_IDS);
78+
79+
// Use only the valid actor IDs from now on
80+
let actor_ids = valid_actor_ids;
81+
82+
// Get ClickHouse client
83+
let clickhouse = ctx.clickhouse().await?;
84+
85+
// Convert actor IDs to strings for the query
86+
let actor_id_strings: Vec<String> = actor_ids.iter().map(|id| id.to_string()).collect();
87+
88+
// Map common metric names to ClickHouse metric names
89+
let metric_mapping = get_metric_mapping();
90+
let clickhouse_metrics: Vec<String> = requested_metrics
91+
.iter()
92+
.filter_map(|m| metric_mapping.get(m).cloned())
93+
.collect();
94+
95+
ensure_with!(
96+
!clickhouse_metrics.is_empty(),
97+
ACTOR_METRICS_UNSUPPORTED_METRICS
98+
);
99+
100+
// Build the query - try to use container label first, fallback to container name parsing
101+
let query_sql = formatdoc!(
102+
"
103+
SELECT
104+
toUnixTimestamp(toStartOfInterval(TimeUnix, INTERVAL ? SECOND)) as time_bucket,
105+
COALESCE(
106+
ResourceAttributes['container_label_com_rivet_actor_id'],
107+
if(
108+
match(Attributes['container_name'], '^[0-9a-f]{{8}}-[0-9a-f]{{4}}-[0-9a-f]{{4}}-[0-9a-f]{{4}}-[0-9a-f]{{12}}-[0-9]+$'),
109+
substring(Attributes['container_name'], 1, 36),
110+
''
111+
)
112+
) as actor_id_str,
113+
MetricName as metric_name,
114+
avg(Value) as value
115+
FROM otel.otel_metrics_gauge
116+
WHERE
117+
TimeUnix >= fromUnixTimestamp(?)
118+
AND TimeUnix <= fromUnixTimestamp(?)
119+
AND (
120+
ResourceAttributes['container_label_com_rivet_actor_id'] IN ?
121+
OR substring(Attributes['container_name'], 1, 36) IN ?
122+
)
123+
AND MetricName IN ?
124+
AND actor_id_str != ''
125+
GROUP BY time_bucket, actor_id_str, metric_name
126+
ORDER BY time_bucket ASC, actor_id_str, metric_name
127+
"
128+
);
129+
130+
let rows = clickhouse
131+
.query(&query_sql)
132+
.bind(query.interval)
133+
.bind(query.start_at)
134+
.bind(query.finish_at)
135+
.bind(&actor_id_strings)
136+
.bind(&actor_id_strings) // Used twice in the query
137+
.bind(&clickhouse_metrics)
138+
.fetch_all::<MetricRow>()
139+
.await
140+
.map_err(|err| GlobalError::from(err))?;
141+
142+
// Process the results
143+
let mut actor_metrics: HashMap<String, HashMap<String, Vec<f64>>> = HashMap::new();
144+
let mut time_buckets: Vec<i64> = Vec::new();
145+
146+
// Initialize data structures
147+
for actor_id in &actor_id_strings {
148+
actor_metrics.insert(actor_id.clone(), HashMap::new());
149+
for metric in &requested_metrics {
150+
actor_metrics
151+
.get_mut(actor_id)
152+
.unwrap()
153+
.insert(metric.clone(), Vec::new());
154+
}
155+
}
156+
157+
// Generate time buckets
158+
let mut current_time = query.start_at;
159+
while current_time <= query.finish_at {
160+
time_buckets.push(current_time);
161+
current_time += query.interval;
162+
}
163+
164+
// Fill in the data
165+
let reverse_metric_mapping: HashMap<String, String> = metric_mapping
166+
.iter()
167+
.map(|(k, v)| (v.clone(), k.clone()))
168+
.collect();
169+
170+
for row in rows {
171+
if let (Some(original_metric), Some(actor_metrics_map)) = (
172+
reverse_metric_mapping.get(&row.metric_name),
173+
actor_metrics.get_mut(&row.actor_id_str),
174+
) {
175+
if let Some(metric_values) = actor_metrics_map.get_mut(original_metric) {
176+
// Find the index for this time bucket
177+
if let Some(bucket_index) = time_buckets.iter().position(|&t| t == row.time_bucket)
178+
{
179+
// Extend the vector if needed
180+
while metric_values.len() <= bucket_index {
181+
metric_values.push(0.0);
182+
}
183+
metric_values[bucket_index] = row.value;
184+
}
185+
}
186+
}
187+
}
188+
189+
// Fill in missing values with 0.0 and ensure all vectors are the same length
190+
for actor_id in &actor_id_strings {
191+
if let Some(actor_metrics_map) = actor_metrics.get_mut(actor_id) {
192+
for metric_values in actor_metrics_map.values_mut() {
193+
while metric_values.len() < time_buckets.len() {
194+
metric_values.push(0.0);
195+
}
196+
}
197+
}
198+
}
199+
200+
// Prepare the response format: metrics[metric_index][time_index] = value
201+
// The API expects metrics: Vec<Vec<f64>> where outer vec is per metric, inner vec is per time
202+
let mut response_metrics: Vec<Vec<f64>> = Vec::new();
203+
204+
for metric in &requested_metrics {
205+
let mut metric_time_series: Vec<f64> = Vec::new();
206+
207+
for time_bucket in &time_buckets {
208+
let mut total_value = 0.0;
209+
let mut count = 0;
210+
211+
// Aggregate across all actors for this metric at this time
212+
for actor_id in &actor_id_strings {
213+
if let Some(actor_metrics_map) = actor_metrics.get(actor_id) {
214+
if let Some(metric_values) = actor_metrics_map.get(metric) {
215+
if let Some(bucket_index) =
216+
time_buckets.iter().position(|&t| t == *time_bucket)
217+
{
218+
if bucket_index < metric_values.len() {
219+
total_value += metric_values[bucket_index];
220+
count += 1;
221+
}
222+
}
223+
}
224+
}
225+
}
226+
227+
// Use average value across actors
228+
metric_time_series.push(if count > 0 {
229+
total_value / count as f64
230+
} else {
231+
0.0
232+
});
233+
}
234+
235+
response_metrics.push(metric_time_series);
236+
}
237+
238+
Ok(models::ActorsGetActorMetricsResponse {
239+
actor_ids: actor_id_strings,
240+
metrics: response_metrics,
241+
})
242+
}
243+
244+
fn get_metric_mapping() -> HashMap<String, String> {
245+
let mut mapping = HashMap::new();
246+
247+
// Map user-friendly metric names to ClickHouse metric names from cAdvisor
248+
mapping.insert(
249+
"cpu".to_string(),
250+
"container_cpu_usage_seconds_total".to_string(),
251+
);
252+
mapping.insert(
253+
"memory".to_string(),
254+
"container_memory_usage_bytes".to_string(),
255+
);
256+
mapping.insert(
257+
"memory_limit".to_string(),
258+
"container_spec_memory_limit_bytes".to_string(),
259+
);
260+
mapping.insert(
261+
"network_rx_bytes".to_string(),
262+
"container_network_receive_bytes_total".to_string(),
263+
);
264+
mapping.insert(
265+
"network_tx_bytes".to_string(),
266+
"container_network_transmit_bytes_total".to_string(),
267+
);
268+
269+
mapping
270+
}

0 commit comments

Comments
 (0)