Skip to content

Commit 1eac431

Browse files
MasterPtatoNathanFlurry
authored andcommitted
feat: add pb usage metrics, server state
1 parent ecbaa48 commit 1eac431

File tree

13 files changed

+247
-176
lines changed

13 files changed

+247
-176
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ rev = "b2c6f366ee68a7956fb69dd4f39357b3c184bd15"
6565

6666
[workspace.dependencies.rivet-term]
6767
git = "https://github.com/rivet-gg/rivet-term"
68-
rev = "b21d7a2"
68+
rev = "55e328470b68c557fb9bc8298369f90182d35b6d"
6969

7070
[workspace.dependencies.redis]
7171
git = "https://github.com/rivet-gg/redis-rs"

packages/core/services/cluster/src/ops/server/get.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::{
55

66
use chirp_workflow::prelude::*;
77

8-
use crate::types::{PoolType, Server};
8+
use crate::types::{PoolType, Server, ServerState};
99

1010
#[derive(Debug)]
1111
pub struct Input {
@@ -26,6 +26,7 @@ pub(crate) struct ServerRow {
2626
vlan_ip: Option<IpAddr>,
2727
public_ip: Option<IpAddr>,
2828
cloud_destroy_ts: Option<i64>,
29+
state: i64,
2930
}
3031

3132
impl TryFrom<ServerRow> for Server {
@@ -40,6 +41,7 @@ impl TryFrom<ServerRow> for Server {
4041
lan_ip: value.vlan_ip,
4142
wan_ip: value.public_ip,
4243
cloud_destroy_ts: value.cloud_destroy_ts,
44+
state: unwrap!(ServerState::from_repr(value.state.try_into()?)),
4345
})
4446
}
4547
}
@@ -56,7 +58,16 @@ pub async fn cluster_server_get(ctx: &OperationCtx, input: &Input) -> GlobalResu
5658
provider_server_id,
5759
vlan_ip,
5860
public_ip,
59-
cloud_destroy_ts
61+
cloud_destroy_ts,
62+
CASE
63+
WHEN cloud_destroy_ts IS NOT NULL THEN 6 -- Destroyed
64+
WHEN taint_ts IS NOT NULL AND drain_ts IS NOT NULL THEN 5 -- TaintedDraining
65+
WHEN drain_ts IS NOT NULL THEN 4 -- Draining
66+
WHEN taint_ts IS NOT NULL THEN 3 -- Tainted
67+
WHEN install_complete_ts IS NOT NULL THEN 2 -- Running
68+
WHEN provision_complete_ts IS NOT NULL THEN 1 -- Installing
69+
ELSE 0 -- Provisioning
70+
END AS state
6071
FROM db_cluster.servers
6172
WHERE server_id = ANY($1)
6273
",

packages/core/services/cluster/src/ops/server/list.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,16 @@ pub async fn cluster_server_list(ctx: &OperationCtx, input: &Input) -> GlobalRes
3030
s.provider_server_id,
3131
s.vlan_ip,
3232
s.public_ip,
33-
s.cloud_destroy_ts
33+
s.cloud_destroy_ts,
34+
CASE
35+
WHEN s.cloud_destroy_ts IS NOT NULL THEN 6 -- Destroyed
36+
WHEN s.taint_ts IS NOT NULL AND s.drain_ts IS NOT NULL THEN 5 -- TaintedDraining
37+
WHEN s.drain_ts IS NOT NULL THEN 4 -- Draining
38+
WHEN s.taint_ts IS NOT NULL THEN 3 -- Tainted
39+
WHEN s.install_complete_ts IS NOT NULL THEN 2 -- Running
40+
WHEN s.provision_complete_ts IS NOT NULL THEN 1 -- Installing
41+
ELSE 0 -- Provisioning
42+
END AS state
3443
FROM db_cluster.servers AS s
3544
JOIN db_cluster.datacenters AS d
3645
ON s.datacenter_id = d.datacenter_id

packages/core/services/cluster/src/types.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,18 @@ pub struct Server {
149149
pub lan_ip: Option<IpAddr>,
150150
pub wan_ip: Option<IpAddr>,
151151
pub cloud_destroy_ts: Option<i64>,
152+
pub state: ServerState,
153+
}
154+
155+
#[derive(Serialize, Deserialize, Hash, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromRepr)]
156+
pub enum ServerState {
157+
Provisioning = 0,
158+
Installing = 1,
159+
Running = 2,
160+
Tainted = 3,
161+
Draining = 4,
162+
TaintedDraining = 5,
163+
Destroyed = 6,
152164
}
153165

154166
#[derive(Debug, Default, Clone)]

packages/edge/infra/edge-server/src/run_config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result<RunConfigData> {
2222
ServiceKind::Standalone,
2323
|config, pools| Box::pin(edge_monolith_workflow_worker::start(config, pools)),
2424
),
25+
Service::new(
26+
"pegboard_usage_metrics_publish",
27+
ServiceKind::Singleton,
28+
|config, pools| Box::pin(pegboard_usage_metrics_publish::start(config, pools)),
29+
),
2530
];
2631

2732
Ok(RunConfigData {

packages/edge/services/pegboard/src/keys/client.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,10 @@ impl ActorKey {
246246
pub fn subspace(client_id: Uuid) -> ActorSubspaceKey {
247247
ActorSubspaceKey::new(client_id)
248248
}
249+
250+
pub fn entire_subspace() -> ActorSubspaceKey {
251+
ActorSubspaceKey::entire()
252+
}
249253
}
250254

251255
impl FormalKey for ActorKey {
@@ -290,12 +294,16 @@ impl<'de> TupleUnpack<'de> for ActorKey {
290294
}
291295

292296
pub struct ActorSubspaceKey {
293-
client_id: Uuid,
297+
client_id: Option<Uuid>,
294298
}
295299

296300
impl ActorSubspaceKey {
297301
fn new(client_id: Uuid) -> Self {
298-
ActorSubspaceKey { client_id }
302+
ActorSubspaceKey { client_id: Some(client_id) }
303+
}
304+
305+
fn entire() -> Self {
306+
ActorSubspaceKey { client_id: None }
299307
}
300308
}
301309

@@ -305,8 +313,16 @@ impl TuplePack for ActorSubspaceKey {
305313
w: &mut W,
306314
tuple_depth: TupleDepth,
307315
) -> std::io::Result<VersionstampOffset> {
308-
let t = (CLIENT, ACTOR, self.client_id);
309-
t.pack(w, tuple_depth)
316+
let mut offset = VersionstampOffset::None { size: 0 };
317+
318+
let t = (CLIENT, ACTOR);
319+
offset += t.pack(w, tuple_depth)?;
320+
321+
if let Some(client_id) = &self.client_id {
322+
offset += client_id.pack(w, tuple_depth)?;
323+
}
324+
325+
Ok(offset)
310326
}
311327
}
312328

packages/edge/services/pegboard/src/metrics.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,18 @@ lazy_static::lazy_static! {
3737
BUCKETS.to_vec(),
3838
*REGISTRY,
3939
).unwrap();
40+
41+
pub static ref ENV_CPU_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!(
42+
"pegboard_env_cpu_usage",
43+
"Total percent of CPU (per core) used by an environment.",
44+
&["env_id", "flavor"],
45+
*REGISTRY,
46+
).unwrap();
47+
48+
pub static ref ENV_MEMORY_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!(
49+
"pegboard_env_memory_usage",
50+
"Total MiB of memory used by an environment.",
51+
&["env_id", "flavor"],
52+
*REGISTRY,
53+
).unwrap();
4054
}
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
11
pub mod update_allocation_idx;
2-
pub mod usage_get;

packages/edge/services/pegboard/src/ops/client/usage_get.rs

Lines changed: 0 additions & 162 deletions
This file was deleted.

packages/edge/services/pegboard/src/workflows/client/mod.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu
106106
config,
107107
system,
108108
}),
109-
activity(UpdateMetricsInput { client_id, flavor }),
109+
activity(UpdateMetricsInput { client_id, flavor, clear: false }),
110110
))
111111
.await?;
112112
}
@@ -118,7 +118,7 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu
118118
client_id,
119119
events: events.clone(),
120120
}),
121-
activity(UpdateMetricsInput { client_id, flavor }),
121+
activity(UpdateMetricsInput { client_id, flavor, clear: false }),
122122
))
123123
.await?;
124124

@@ -243,6 +243,8 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu
243243
})
244244
.await?;
245245

246+
ctx.activity(UpdateMetricsInput { client_id: input.client_id, flavor: input.flavor, clear: true }).await?;
247+
246248
let actors = ctx
247249
.activity(FetchRemainingActorsInput {
248250
client_id: input.client_id,
@@ -673,7 +675,7 @@ pub async fn handle_commands(
673675
activity(InsertCommandsInput {
674676
commands: raw_commands.clone(),
675677
}),
676-
activity(UpdateMetricsInput { client_id, flavor }),
678+
activity(UpdateMetricsInput { client_id, flavor, clear: false }),
677679
))
678680
.await?;
679681

@@ -914,11 +916,14 @@ async fn check_expired(ctx: &ActivityCtx, input: &CheckExpiredInput) -> GlobalRe
914916
struct UpdateMetricsInput {
915917
client_id: Uuid,
916918
flavor: ClientFlavor,
919+
clear: bool,
917920
}
918921

919922
#[activity(UpdateMetrics)]
920923
async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> GlobalResult<()> {
921-
let (memory, cpu) =
924+
let (memory, cpu) = if input.clear {
925+
(0, 0)
926+
} else {
922927
ctx.fdb()
923928
.await?
924929
.run(|tx, _mc| async move {
@@ -966,7 +971,8 @@ async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> Global
966971
))
967972
})
968973
.custom_instrument(tracing::info_span!("client_update_metrics_tx"))
969-
.await?;
974+
.await?
975+
};
970976

971977
metrics::CLIENT_CPU_ALLOCATED
972978
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])

0 commit comments

Comments
 (0)