From d6f19c029f7374332b43eb3ca43cafb3e9a0621d Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Mon, 2 Jun 2025 19:40:46 +0000 Subject: [PATCH] fix: fix pb metrics standalone --- .../common/chirp-workflow/core/src/db/mod.rs | 1 + .../edge/services/pegboard/src/keys/client.rs | 4 ++- .../edge/services/pegboard/src/metrics.rs | 2 +- .../services/pegboard/src/ops/actor/get.rs | 11 ++++---- .../pegboard/src/workflows/client/mod.rs | 25 ++++++++++++++++--- .../usage-metrics-publish/src/lib.rs | 4 +-- 6 files changed, 33 insertions(+), 14 deletions(-) diff --git a/packages/common/chirp-workflow/core/src/db/mod.rs b/packages/common/chirp-workflow/core/src/db/mod.rs index 2a6aa13c7b..682c400f37 100644 --- a/packages/common/chirp-workflow/core/src/db/mod.rs +++ b/packages/common/chirp-workflow/core/src/db/mod.rs @@ -299,6 +299,7 @@ pub trait Database: Send { ) -> WorkflowResult<()>; } +#[derive(Debug)] pub struct WorkflowData { pub workflow_id: Uuid, input: Box, diff --git a/packages/edge/services/pegboard/src/keys/client.rs b/packages/edge/services/pegboard/src/keys/client.rs index 9900db56a8..15347d5771 100644 --- a/packages/edge/services/pegboard/src/keys/client.rs +++ b/packages/edge/services/pegboard/src/keys/client.rs @@ -299,7 +299,9 @@ pub struct ActorSubspaceKey { impl ActorSubspaceKey { fn new(client_id: Uuid) -> Self { - ActorSubspaceKey { client_id: Some(client_id) } + ActorSubspaceKey { + client_id: Some(client_id), + } } fn entire() -> Self { diff --git a/packages/edge/services/pegboard/src/metrics.rs b/packages/edge/services/pegboard/src/metrics.rs index db5515a90e..e133c444ef 100644 --- a/packages/edge/services/pegboard/src/metrics.rs +++ b/packages/edge/services/pegboard/src/metrics.rs @@ -44,7 +44,7 @@ lazy_static::lazy_static! { &["env_id", "flavor"], *REGISTRY, ).unwrap(); - + pub static ref ENV_MEMORY_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!( "pegboard_env_memory_usage", "Total MiB of memory used by an environment.", diff --git a/packages/edge/services/pegboard/src/ops/actor/get.rs b/packages/edge/services/pegboard/src/ops/actor/get.rs index 6d6300f5d6..a8103c36c6 100644 --- a/packages/edge/services/pegboard/src/ops/actor/get.rs +++ b/packages/edge/services/pegboard/src/ops/actor/get.rs @@ -117,7 +117,7 @@ pub async fn pegboard_actor_get(ctx: &OperationCtx, input: &Input) -> GlobalResu let actor_data = futures_util::stream::iter(actors_with_wf_ids) .map(|(actor_id, workflow_id)| async move { - let pool = &match ctx.sqlite_for_workflow(workflow_id).await { + let pool = match ctx.sqlite_for_workflow(workflow_id).await { Ok(x) => x, Err(err) if matches!( @@ -125,13 +125,12 @@ pub async fn pegboard_actor_get(ctx: &OperationCtx, input: &Input) -> GlobalResu Some(WorkflowError::WorkflowNotFound) ) => { - // Workflow is complete - return GlobalResult::Ok(None); - } - Err(err) => { - return GlobalResult::Err(err); + tracing::warn!(?actor_id, ?workflow_id, "actor workflow not found"); + return Ok(None); } + res => res?, }; + let pool = &pool; let (actor_row, port_ingress_rows, port_host_rows, port_proxied_rows) = tokio::try_join!( sql_fetch_one!( diff --git a/packages/edge/services/pegboard/src/workflows/client/mod.rs b/packages/edge/services/pegboard/src/workflows/client/mod.rs index 7dcb876d88..79c59357ab 100644 --- a/packages/edge/services/pegboard/src/workflows/client/mod.rs +++ b/packages/edge/services/pegboard/src/workflows/client/mod.rs @@ -106,7 +106,11 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu config, system, }), - activity(UpdateMetricsInput { client_id, flavor, clear: false }), + activity(UpdateMetricsInput { + client_id, + flavor, + clear: false, + }), )) .await?; } @@ -118,7 +122,11 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu client_id, events: events.clone(), }), - activity(UpdateMetricsInput { client_id, flavor, clear: false }), + activity(UpdateMetricsInput { + client_id, + flavor, + clear: false, + }), )) .await?; @@ -243,7 +251,12 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu }) .await?; - ctx.activity(UpdateMetricsInput { client_id: input.client_id, flavor: input.flavor, clear: true }).await?; + ctx.activity(UpdateMetricsInput { + client_id: input.client_id, + flavor: input.flavor, + clear: true, + }) + .await?; let actors = ctx .activity(FetchRemainingActorsInput { @@ -675,7 +688,11 @@ pub async fn handle_commands( activity(InsertCommandsInput { commands: raw_commands.clone(), }), - activity(UpdateMetricsInput { client_id, flavor, clear: false }), + activity(UpdateMetricsInput { + client_id, + flavor, + clear: false, + }), )) .await?; diff --git a/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs b/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs index 3cb3184a5f..1e3681dff6 100644 --- a/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs +++ b/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs @@ -32,7 +32,7 @@ pub async fn run_from_env( .wrap_new("pegboard-usage-metrics-publish"); let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())?; let ctx = StandaloneCtx::new( - db::DatabaseCrdbNats::from_pools(pools.clone())?, + db::DatabaseFdbSqliteNats::from_pools(pools.clone())?, config, rivet_connection::Connection::new(client, pools, cache), "pegboard-usage-metrics-publish", @@ -68,7 +68,7 @@ pub async fn run_from_env( .try_collect::>() .await }) - .custom_instrument(tracing::info_span!("client_fetch_remaining_actors_tx")) + .custom_instrument(tracing::info_span!("fetch_running_actors_tx")) .await?; let actors_res = ctx