Skip to content

Commit ec71a65

Browse files
committed
fix(pegboard): revise actor rescheduling algorithm, add client metrics
1 parent e53a26f commit ec71a65

File tree

4 files changed

+94
-48
lines changed

4 files changed

+94
-48
lines changed

packages/edge/services/pegboard/src/metrics.rs

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,16 @@ lazy_static::lazy_static! {
88
*REGISTRY
99
).unwrap();
1010

11-
pub static ref CLIENT_CPU_ALLOCATED: IntGaugeVec = register_int_gauge_vec_with_registry!(
12-
"pegboard_client_cpu_allocated",
13-
"Total millicores of cpu allocated on a client.",
11+
pub static ref CLIENT_MEMORY_TOTAL: IntGaugeVec = register_int_gauge_vec_with_registry!(
12+
"pegboard_client_memory_total",
13+
"Total MiB of memory available on a client.",
14+
&["client_id", "flavor"],
15+
*REGISTRY
16+
).unwrap();
17+
18+
pub static ref CLIENT_CPU_TOTAL: IntGaugeVec = register_int_gauge_vec_with_registry!(
19+
"pegboard_client_cpu_total",
20+
"Total millicores of cpu available on a client.",
1421
&["client_id", "flavor"],
1522
*REGISTRY
1623
).unwrap();
@@ -22,6 +29,13 @@ lazy_static::lazy_static! {
2229
*REGISTRY
2330
).unwrap();
2431

32+
pub static ref CLIENT_CPU_ALLOCATED: IntGaugeVec = register_int_gauge_vec_with_registry!(
33+
"pegboard_client_cpu_allocated",
34+
"Total millicores of cpu allocated on a client.",
35+
&["client_id", "flavor"],
36+
*REGISTRY
37+
).unwrap();
38+
2539
pub static ref ACTOR_ALLOCATE_DURATION: HistogramVec = register_histogram_vec_with_registry!(
2640
"pegboard_actor_allocate_duration",
2741
"Total duration to reserve resources for an actor.",
@@ -38,16 +52,16 @@ lazy_static::lazy_static! {
3852
*REGISTRY,
3953
).unwrap();
4054

41-
pub static ref ENV_CPU_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!(
42-
"pegboard_env_cpu_usage",
43-
"Total millicores used by an environment.",
55+
pub static ref ENV_MEMORY_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!(
56+
"pegboard_env_memory_usage",
57+
"Total MiB of memory used by an environment.",
4458
&["env_id", "flavor"],
4559
*REGISTRY,
4660
).unwrap();
4761

48-
pub static ref ENV_MEMORY_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!(
49-
"pegboard_env_memory_usage",
50-
"Total MiB of memory used by an environment.",
62+
pub static ref ENV_CPU_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!(
63+
"pegboard_env_cpu_usage",
64+
"Total millicores used by an environment.",
5165
&["env_id", "flavor"],
5266
*REGISTRY,
5367
).unwrap();

packages/edge/services/pegboard/src/workflows/actor/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ const ACTOR_START_THRESHOLD_MS: i64 = util::duration::seconds(30);
2626
const ACTOR_STOP_THRESHOLD_MS: i64 = util::duration::seconds(30);
2727
/// How long to wait after stopped and not receiving an exit state before setting actor as lost.
2828
const ACTOR_EXIT_THRESHOLD_MS: i64 = util::duration::seconds(5);
29+
/// How long an actor goes without retries before it's retry count is reset to 0, effectively resetting its
30+
/// backoff to 0.
31+
const RETRY_RESET_DURATION_MS: i64 = util::duration::minutes(10);
2932

3033
#[derive(Clone, Debug, Serialize, Deserialize)]
3134
pub struct Input {
@@ -124,9 +127,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul
124127
.send()
125128
.await?;
126129

127-
let Some((client_id, client_workflow_id)) =
128-
runtime::spawn_actor(ctx, input, &initial_actor_setup, 0).await?
129-
else {
130+
let Some(res) = runtime::spawn_actor(ctx, input, &initial_actor_setup, 0).await? else {
130131
ctx.msg(Failed {
131132
message: "Failed to allocate (no availability).".into(),
132133
})
@@ -147,7 +148,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul
147148

148149
let state_res = ctx
149150
.loope(
150-
runtime::State::new(client_id, client_workflow_id, input.image_id),
151+
runtime::State::new(res.client_id, res.client_workflow_id, input.image_id),
151152
|ctx, state| {
152153
let input = input.clone();
153154

@@ -229,7 +230,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul
229230

230231
ctx.activity(runtime::UpdateFdbInput {
231232
actor_id: input.actor_id,
232-
client_id,
233+
client_id: state.client_id,
233234
state: sig.state.clone(),
234235
})
235236
.await?;

packages/edge/services/pegboard/src/workflows/actor/runtime.rs

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use util::serde::AsHashableExt;
1414
use super::{
1515
destroy::{self, KillCtx},
1616
setup, Destroy, Input, ACTOR_START_THRESHOLD_MS, BASE_RETRY_TIMEOUT_MS,
17+
RETRY_RESET_DURATION_MS,
1718
};
1819
use crate::{
1920
keys, metrics,
@@ -26,11 +27,16 @@ use crate::{
2627
#[derive(Deserialize, Serialize)]
2728
pub struct State {
2829
pub generation: u32,
30+
2931
pub client_id: Uuid,
3032
pub client_workflow_id: Uuid,
3133
pub image_id: Option<Uuid>,
34+
3235
pub drain_timeout_ts: Option<i64>,
3336
pub gc_timeout_ts: Option<i64>,
37+
38+
#[serde(default)]
39+
reschedule_state: RescheduleState,
3440
}
3541

3642
impl State {
@@ -42,6 +48,7 @@ impl State {
4248
image_id: Some(image_id),
4349
drain_timeout_ts: None,
4450
gc_timeout_ts: Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS),
51+
reschedule_state: RescheduleState::default(),
4552
}
4653
}
4754
}
@@ -51,6 +58,12 @@ pub struct StateRes {
5158
pub kill: Option<KillCtx>,
5259
}
5360

61+
#[derive(Serialize, Deserialize, Clone, Default)]
62+
struct RescheduleState {
63+
last_retry_ts: i64,
64+
retry_count: usize,
65+
}
66+
5467
#[derive(Debug, Serialize, Deserialize, Hash)]
5568
struct UpdateClientInput {
5669
client_id: Uuid,
@@ -224,9 +237,9 @@ struct AllocateActorInputV2 {
224237
}
225238

226239
#[derive(Debug, Serialize, Deserialize)]
227-
struct AllocateActorOutputV2 {
228-
client_id: Uuid,
229-
client_workflow_id: Uuid,
240+
pub struct AllocateActorOutputV2 {
241+
pub client_id: Uuid,
242+
pub client_workflow_id: Uuid,
230243
}
231244

232245
#[activity(AllocateActorV2)]
@@ -611,7 +624,7 @@ pub async fn spawn_actor(
611624
input: &Input,
612625
actor_setup: &setup::ActorSetupCtx,
613626
generation: u32,
614-
) -> GlobalResult<Option<(Uuid, Uuid)>> {
627+
) -> GlobalResult<Option<AllocateActorOutputV2>> {
615628
let res = match ctx.check_version(2).await? {
616629
1 => {
617630
ctx.activity(AllocateActorInputV1 {
@@ -744,7 +757,7 @@ pub async fn spawn_actor(
744757
.send()
745758
.await?;
746759

747-
Ok(Some((res.client_id, res.client_workflow_id)))
760+
Ok(Some(res))
748761
}
749762

750763
pub async fn reschedule_actor(
@@ -769,7 +782,7 @@ pub async fn reschedule_actor(
769782

770783
// Waits for the actor to be ready (or destroyed) and automatically retries if failed to allocate.
771784
let res = ctx
772-
.loope(RescheduleState::default(), |ctx, state| {
785+
.loope(state.reschedule_state.clone(), |ctx, state| {
773786
let input = input.clone();
774787
let actor_setup = actor_setup.clone();
775788

@@ -778,14 +791,13 @@ pub async fn reschedule_actor(
778791
let mut backoff =
779792
util::Backoff::new_at(8, None, BASE_RETRY_TIMEOUT_MS, 500, state.retry_count);
780793

781-
// If the last retry ts is more than 2 * backoff ago, reset retry count to 0
794+
// If the last retry ts is more than RETRY_RESET_DURATION_MS, reset retry count to 0
782795
let now = util::timestamp::now();
783-
state.retry_count =
784-
if state.last_retry_ts < now - i64::try_from(2 * backoff.current_duration())? {
785-
0
786-
} else {
787-
state.retry_count + 1
788-
};
796+
state.retry_count = if state.last_retry_ts < now - RETRY_RESET_DURATION_MS {
797+
0
798+
} else {
799+
state.retry_count + 1
800+
};
789801
state.last_retry_ts = now;
790802

791803
// Don't sleep for first retry
@@ -797,14 +809,14 @@ pub async fn reschedule_actor(
797809
.listen_with_timeout::<Destroy>(Instant::from(next) - Instant::now())
798810
.await?
799811
{
800-
tracing::debug!("destroying before actor start");
812+
tracing::debug!("destroying before actor reschedule");
801813

802814
return Ok(Loop::Break(Err(sig)));
803815
}
804816
}
805817

806818
if let Some(res) = spawn_actor(ctx, &input, &actor_setup, next_generation).await? {
807-
Ok(Loop::Break(Ok(res)))
819+
Ok(Loop::Break(Ok((state.clone(), res))))
808820
} else {
809821
tracing::debug!(actor_id=?input.actor_id, "failed to reschedule actor, retrying");
810822

@@ -817,10 +829,13 @@ pub async fn reschedule_actor(
817829

818830
// Update loop state
819831
match res {
820-
Ok((client_id, client_workflow_id)) => {
832+
Ok((reschedule_state, res)) => {
821833
state.generation = next_generation;
822-
state.client_id = client_id;
823-
state.client_workflow_id = client_workflow_id;
834+
state.client_id = res.client_id;
835+
state.client_workflow_id = res.client_workflow_id;
836+
837+
// Save reschedule state in global state
838+
state.reschedule_state = reschedule_state;
824839

825840
// Reset gc timeout once allocated
826841
state.gc_timeout_ts = Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS);
@@ -831,12 +846,6 @@ pub async fn reschedule_actor(
831846
}
832847
}
833848

834-
#[derive(Serialize, Deserialize, Default)]
835-
struct RescheduleState {
836-
last_retry_ts: i64,
837-
retry_count: usize,
838-
}
839-
840849
#[derive(Debug, Serialize, Deserialize, Hash)]
841850
struct ClearPortsAndResourcesInput {
842851
actor_id: Uuid,

packages/edge/services/pegboard/src/workflows/client/mod.rs

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -938,9 +938,19 @@ struct UpdateMetricsInput {
938938

939939
#[activity(UpdateMetrics)]
940940
async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> GlobalResult<()> {
941-
let (memory, cpu) = if input.clear {
942-
(0, 0)
943-
} else {
941+
if input.clear {
942+
metrics::CLIENT_MEMORY_ALLOCATED
943+
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
944+
.set(0);
945+
946+
metrics::CLIENT_CPU_ALLOCATED
947+
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
948+
.set(0);
949+
950+
return Ok(());
951+
}
952+
953+
let (total_mem, total_cpu, remaining_mem, remaining_cpu) =
944954
ctx.fdb()
945955
.await?
946956
.run(|tx, _mc| async move {
@@ -983,21 +993,33 @@ async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> Global
983993
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?;
984994

985995
Ok((
986-
total_mem.saturating_sub(remaining_mem),
987-
total_cpu.saturating_sub(remaining_cpu),
996+
total_mem,
997+
remaining_mem,
998+
total_cpu,
999+
remaining_cpu,
9881000
))
9891001
})
9901002
.custom_instrument(tracing::info_span!("client_update_metrics_tx"))
991-
.await?
992-
};
1003+
.await?;
9931004

994-
metrics::CLIENT_CPU_ALLOCATED
1005+
metrics::CLIENT_MEMORY_TOTAL
1006+
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
1007+
.set(total_mem.try_into()?);
1008+
1009+
metrics::CLIENT_CPU_TOTAL
9951010
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
996-
.set(cpu.try_into()?);
1011+
.set(total_cpu.try_into()?);
1012+
1013+
let allocated_mem = total_mem.saturating_sub(remaining_mem);
1014+
let allocated_cpu = total_cpu.saturating_sub(remaining_cpu);
9971015

9981016
metrics::CLIENT_MEMORY_ALLOCATED
9991017
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
1000-
.set(memory.try_into()?);
1018+
.set(allocated_mem.try_into()?);
1019+
1020+
metrics::CLIENT_CPU_ALLOCATED
1021+
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
1022+
.set(allocated_cpu.try_into()?);
10011023

10021024
Ok(())
10031025
}

0 commit comments

Comments
 (0)