From 5c607af746e0c8710951e8a7bd2c9394b9f226b1 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Fri, 6 Jun 2025 00:36:12 +0000 Subject: [PATCH] feat(pegboard): add draining state to alloc metrics --- .../edge/services/pegboard/src/metrics.rs | 8 +- .../pegboard/src/workflows/actor/analytics.rs | 3 +- .../pegboard/src/workflows/client/mod.rs | 138 +++++++++++++++--- 3 files changed, 125 insertions(+), 24 deletions(-) diff --git a/packages/edge/services/pegboard/src/metrics.rs b/packages/edge/services/pegboard/src/metrics.rs index 24a7941054..0eb40d908e 100644 --- a/packages/edge/services/pegboard/src/metrics.rs +++ b/packages/edge/services/pegboard/src/metrics.rs @@ -11,28 +11,28 @@ lazy_static::lazy_static! { pub static ref CLIENT_MEMORY_TOTAL: IntGaugeVec = register_int_gauge_vec_with_registry!( "pegboard_client_memory_total", "Total MiB of memory available on a client.", - &["client_id", "flavor"], + &["client_id", "flavor", "state"], *REGISTRY ).unwrap(); pub static ref CLIENT_CPU_TOTAL: IntGaugeVec = register_int_gauge_vec_with_registry!( "pegboard_client_cpu_total", "Total millicores of cpu available on a client.", - &["client_id", "flavor"], + &["client_id", "flavor", "state"], *REGISTRY ).unwrap(); pub static ref CLIENT_MEMORY_ALLOCATED: IntGaugeVec = register_int_gauge_vec_with_registry!( "pegboard_client_memory_allocated", "Total MiB of memory allocated on a client.", - &["client_id", "flavor"], + &["client_id", "flavor", "state"], *REGISTRY ).unwrap(); pub static ref CLIENT_CPU_ALLOCATED: IntGaugeVec = register_int_gauge_vec_with_registry!( "pegboard_client_cpu_allocated", "Total millicores of cpu allocated on a client.", - &["client_id", "flavor"], + &["client_id", "flavor", "state"], *REGISTRY ).unwrap(); diff --git a/packages/edge/services/pegboard/src/workflows/actor/analytics.rs b/packages/edge/services/pegboard/src/workflows/actor/analytics.rs index 9b42c73697..3d3f29e197 100644 --- a/packages/edge/services/pegboard/src/workflows/actor/analytics.rs +++ b/packages/edge/services/pegboard/src/workflows/actor/analytics.rs @@ -277,7 +277,8 @@ pub async fn insert_clickhouse( selected_memory_mib: state_row.selected_resources_memory_mib.unwrap_or_default() as u32, root_user_enabled: state_row.root_user_enabled, env_vars: state_row.environment.len() as i64, - env_var_bytes: state_row.environment + env_var_bytes: state_row + .environment .iter() .map(|(k, v)| k.len() + v.len()) .sum::() as i64, diff --git a/packages/edge/services/pegboard/src/workflows/client/mod.rs b/packages/edge/services/pegboard/src/workflows/client/mod.rs index c5e31c5c5f..238c732d24 100644 --- a/packages/edge/services/pegboard/src/workflows/client/mod.rs +++ b/packages/edge/services/pegboard/src/workflows/client/mod.rs @@ -109,6 +109,7 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu activity(UpdateMetricsInput { client_id, flavor, + draining: state.drain_timeout_ts.is_some(), clear: false, }), )) @@ -125,6 +126,7 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu activity(UpdateMetricsInput { client_id, flavor, + draining: state.drain_timeout_ts.is_some(), clear: false, }), )) @@ -254,6 +256,7 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu ctx.activity(UpdateMetricsInput { client_id: input.client_id, flavor: input.flavor, + draining: false, clear: true, }) .await?; @@ -691,6 +694,7 @@ pub async fn handle_commands( activity(UpdateMetricsInput { client_id, flavor, + draining: drain_timeout_ts.is_some(), clear: false, }), )) @@ -933,24 +937,75 @@ async fn check_expired(ctx: &ActivityCtx, input: &CheckExpiredInput) -> GlobalRe struct UpdateMetricsInput { client_id: Uuid, flavor: ClientFlavor, + #[serde(default)] + draining: bool, clear: bool, } #[activity(UpdateMetrics)] async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> GlobalResult<()> { if input.clear { + metrics::CLIENT_MEMORY_TOTAL + .with_label_values(&[ + &input.client_id.to_string(), + &input.flavor.to_string(), + "active", + ]) + .set(0); + metrics::CLIENT_CPU_TOTAL + .with_label_values(&[ + &input.client_id.to_string(), + &input.flavor.to_string(), + "active", + ]) + .set(0); + metrics::CLIENT_MEMORY_TOTAL + .with_label_values(&[ + &input.client_id.to_string(), + &input.flavor.to_string(), + "draining", + ]) + .set(0); + metrics::CLIENT_CPU_TOTAL + .with_label_values(&[ + &input.client_id.to_string(), + &input.flavor.to_string(), + "draining", + ]) + .set(0); metrics::CLIENT_MEMORY_ALLOCATED - .with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()]) + .with_label_values(&[ + &input.client_id.to_string(), + &input.flavor.to_string(), + "active", + ]) .set(0); - metrics::CLIENT_CPU_ALLOCATED - .with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()]) + .with_label_values(&[ + &input.client_id.to_string(), + &input.flavor.to_string(), + "active", + ]) + .set(0); + metrics::CLIENT_MEMORY_ALLOCATED + .with_label_values(&[ + &input.client_id.to_string(), + &input.flavor.to_string(), + "draining", + ]) + .set(0); + metrics::CLIENT_CPU_ALLOCATED + .with_label_values(&[ + &input.client_id.to_string(), + &input.flavor.to_string(), + "draining", + ]) .set(0); return Ok(()); } - let (total_mem, total_cpu, remaining_mem, remaining_cpu) = + let (total_mem, remaining_mem, total_cpu, remaining_cpu) = ctx.fdb() .await? .run(|tx, _mc| async move { @@ -992,35 +1047,80 @@ async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> Global ) .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?; - Ok(( - total_mem, - remaining_mem, - total_cpu, - remaining_cpu, - )) + Ok((total_mem, remaining_mem, total_cpu, remaining_cpu)) }) .custom_instrument(tracing::info_span!("client_update_metrics_tx")) .await?; + let (state, other_state) = if input.draining { + ("draining", "active") + } else { + ("active", "draining") + }; + let allocated_mem = total_mem.saturating_sub(remaining_mem); + let allocated_cpu = total_cpu.saturating_sub(remaining_cpu); + metrics::CLIENT_MEMORY_TOTAL - .with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()]) + .with_label_values(&[ + &input.client_id.to_string(), + &input.flavor.to_string(), + state, + ]) .set(total_mem.try_into()?); - metrics::CLIENT_CPU_TOTAL - .with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()]) + .with_label_values(&[ + &input.client_id.to_string(), + &input.flavor.to_string(), + state, + ]) .set(total_cpu.try_into()?); - let allocated_mem = total_mem.saturating_sub(remaining_mem); - let allocated_cpu = total_cpu.saturating_sub(remaining_cpu); - metrics::CLIENT_MEMORY_ALLOCATED - .with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()]) + .with_label_values(&[ + &input.client_id.to_string(), + &input.flavor.to_string(), + state, + ]) .set(allocated_mem.try_into()?); - metrics::CLIENT_CPU_ALLOCATED - .with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()]) + .with_label_values(&[ + &input.client_id.to_string(), + &input.flavor.to_string(), + state, + ]) .set(allocated_cpu.try_into()?); + // Clear other state + metrics::CLIENT_MEMORY_TOTAL + .with_label_values(&[ + &input.client_id.to_string(), + &input.flavor.to_string(), + other_state, + ]) + .set(0); + metrics::CLIENT_CPU_TOTAL + .with_label_values(&[ + &input.client_id.to_string(), + &input.flavor.to_string(), + other_state, + ]) + .set(0); + + metrics::CLIENT_MEMORY_ALLOCATED + .with_label_values(&[ + &input.client_id.to_string(), + &input.flavor.to_string(), + other_state, + ]) + .set(0); + metrics::CLIENT_CPU_ALLOCATED + .with_label_values(&[ + &input.client_id.to_string(), + &input.flavor.to_string(), + other_state, + ]) + .set(0); + Ok(()) }