Skip to content

Commit 6763cf0

Browse files
committed
feat: add actor queue
1 parent c73f767 commit 6763cf0

File tree

23 files changed

+3710
-952
lines changed

23 files changed

+3710
-952
lines changed

Cargo.lock

Lines changed: 19 additions & 390 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docker/dev-full/grafana/dashboards/pegboard.json

Lines changed: 2082 additions & 0 deletions
Large diffs are not rendered by default.

examples/system-test-actor/src/managerClient.ts

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -119,6 +119,7 @@ function encodeFrame(payload: any): Buffer {
119119
payloadLength.writeUInt32BE(json.length, 0);
120120

121121
const header = Buffer.alloc(4); // All zeros for now
122+
122123
return Buffer.concat([payloadLength, header, Buffer.from(json)]);
123124
}
124125

packages/common/fdb-util/src/keys.rs

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -41,17 +41,19 @@ pub const ENV: usize = 39;
4141
pub const PORT: usize = 40;
4242
pub const INGRESS: usize = 41;
4343
pub const PROXIED: usize = 42;
44-
pub const CLIENTS_BY_REMAINING_MEM: usize = 43;
44+
pub const CLIENT_BY_REMAINING_MEM: usize = 43;
4545
pub const SQLITE: usize = 44;
4646
pub const INTERNAL: usize = 45;
4747
pub const METADATA: usize = 46;
4848
pub const COMPRESSED_DATA: usize = 47;
4949
pub const RUNNER: usize = 48;
50-
pub const RUNNERS_BY_REMAINING_SLOTS: usize = 49;
50+
pub const RUNNER_BY_REMAINING_SLOTS: usize = 49;
5151
pub const REMAINING_SLOTS: usize = 50;
5252
pub const TOTAL_SLOTS: usize = 51;
5353
pub const IMAGE_ID: usize = 52;
5454
pub const ACTOR2: usize = 53;
55+
pub const PENDING_ACTOR: usize = 54;
56+
pub const PENDING_ACTOR_BY_IMAGE_ID: usize = 55;
5557

5658
// Directories with fdbrs must use string paths instead of tuples
5759
pub mod dir {
@@ -105,17 +107,19 @@ pub fn key_from_str(key: &str) -> Option<usize> {
105107
"port" => Some(PORT),
106108
"ingress" => Some(INGRESS),
107109
"proxied" => Some(PROXIED),
108-
"clients_by_remaining_mem" => Some(CLIENTS_BY_REMAINING_MEM),
110+
"client_by_remaining_mem" => Some(CLIENT_BY_REMAINING_MEM),
109111
"sqlite" => Some(SQLITE),
110112
"internal" => Some(INTERNAL),
111113
"metadata" => Some(METADATA),
112114
"compressed_data" => Some(COMPRESSED_DATA),
113115
"runner" => Some(RUNNER),
114-
"runners_by_remaining_slots" => Some(RUNNERS_BY_REMAINING_SLOTS),
116+
"runner_by_remaining_slots" => Some(RUNNER_BY_REMAINING_SLOTS),
115117
"remaining_slots" => Some(REMAINING_SLOTS),
116118
"total_slots" => Some(TOTAL_SLOTS),
117119
"image_id" => Some(IMAGE_ID),
118120
"actor2" => Some(ACTOR2),
121+
"pending_actor" => Some(PENDING_ACTOR),
122+
"pending_actor_by_image_id" => Some(PENDING_ACTOR_BY_IMAGE_ID),
119123
_ => None,
120124
}
121125
}

packages/common/server-cli/src/util/fdb.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,8 @@ impl SimpleTupleValue {
3636
SimpleTupleValue::F64(v)
3737
} else if let Ok(v) = Uuid::from_str(value) {
3838
SimpleTupleValue::Uuid(v)
39+
} else if let Ok(v) = rivet_util::Id::from_str(value) {
40+
SimpleTupleValue::Id(v)
3941
} else {
4042
SimpleTupleValue::String(unescape(value))
4143
}

packages/core/services/build/src/ops/create.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@ pub struct Input {
1919
pub kind: BuildKind,
2020
pub compression: BuildCompression,
2121
pub allocation_type: BuildAllocationType,
22-
pub allocation_total_slots: u64,
22+
pub allocation_total_slots: u32,
2323
pub resources: Option<BuildResources>,
2424
}
2525

packages/core/services/build/src/types.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -51,7 +51,7 @@ pub struct Build {
5151
pub kind: BuildKind,
5252
pub compression: BuildCompression,
5353
pub allocation_type: BuildAllocationType,
54-
pub allocation_total_slots: u64,
54+
pub allocation_total_slots: u32,
5555
pub resources: Option<BuildResources>,
5656
pub tags: HashMap<String, String>,
5757
}

packages/edge/api/actor/src/route/actors.rs

Lines changed: 93 additions & 57 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,10 @@
11
use std::collections::HashMap;
22

33
use api_helper::{anchor::WatchIndexQuery, ctx::Ctx};
4+
use chirp_workflow::prelude::*;
45
use futures_util::{FutureExt, StreamExt, TryStreamExt};
56
use rivet_api::models;
67
use rivet_convert::{ApiInto, ApiTryInto};
7-
use rivet_operation::prelude::*;
88
use serde::Deserialize;
99
use serde_json::json;
1010
use util::serde::AsHashableExt;
@@ -152,14 +152,14 @@ pub async fn create(
152152
}
153153
};
154154

155-
let allocated_fut = if network.wait_ready.unwrap_or_default() {
155+
let created_fut = if network.wait_ready.unwrap_or_default() {
156156
std::future::pending().boxed()
157157
} else {
158-
let mut allocated_sub = ctx
159-
.subscribe::<pegboard::workflows::actor::Allocated>(("actor_id", actor_id))
158+
let mut created_sub = ctx
159+
.subscribe::<pegboard::workflows::actor::CreateComplete>(("actor_id", actor_id))
160160
.await?;
161161

162-
async move { allocated_sub.next().await }.boxed()
162+
async move { created_sub.next().await }.boxed()
163163
};
164164
let mut ready_sub = ctx
165165
.subscribe::<pegboard::workflows::actor::Ready>(("actor_id", actor_id))
@@ -239,9 +239,9 @@ pub async fn create(
239239
.tag("actor_id", actor_id)
240240
.dispatch()
241241
.await?;
242-
// Wait for allocated/ready, fail, or destroy
242+
// Wait for create/ready, fail, or destroy
243243
tokio::select! {
244-
res = allocated_fut => { res?; },
244+
res = created_fut => { res?; },
245245
res = ready_sub.next() => { res?; },
246246
res = fail_sub.next() => {
247247
let msg = res?;
@@ -258,14 +258,14 @@ pub async fn create(
258258
let actor_id = util::Id::new_v1(ctx.config().server()?.rivet.edge()?.datacenter_label());
259259
tracing::info!(?actor_id, ?tags, "creating actor with tags");
260260

261-
let allocated_fut = if network.wait_ready.unwrap_or_default() {
261+
let created_fut = if network.wait_ready.unwrap_or_default() {
262262
std::future::pending().boxed()
263263
} else {
264-
let mut allocated_sub = ctx
265-
.subscribe::<pegboard::workflows::actor2::Allocated>(("actor_id", actor_id))
264+
let mut created_sub = ctx
265+
.subscribe::<pegboard::workflows::actor2::CreateComplete>(("actor_id", actor_id))
266266
.await?;
267267

268-
async move { allocated_sub.next().await }.boxed()
268+
async move { created_sub.next().await }.boxed()
269269
};
270270
let mut ready_sub = ctx
271271
.subscribe::<pegboard::workflows::actor2::Ready>(("actor_id", actor_id))
@@ -348,7 +348,7 @@ pub async fn create(
348348

349349
// Wait for create/ready, fail, or destroy
350350
tokio::select! {
351-
res = allocated_fut => { res?; },
351+
res = created_fut => { res?; },
352352
res = ready_sub.next() => { res?; },
353353
res = fail_sub.next() => {
354354
let msg = res?;
@@ -425,6 +425,9 @@ pub async fn destroy(
425425
);
426426

427427
let mut sub = ctx
428+
.subscribe::<pegboard::workflows::actor2::DestroyStarted>(("actor_id", actor_id))
429+
.await?;
430+
let mut old_sub = ctx
428431
.subscribe::<pegboard::workflows::actor::DestroyStarted>(("actor_id", actor_id))
429432
.await?;
430433

@@ -436,15 +439,33 @@ pub async fn destroy(
436439
return Ok(json!({}));
437440
}
438441

439-
ctx.signal(pegboard::workflows::actor::Destroy {
440-
override_kill_timeout_ms: query.override_kill_timeout,
441-
})
442-
.to_workflow::<pegboard::workflows::actor::Workflow>()
443-
.tag("actor_id", actor_id)
444-
.send()
445-
.await?;
442+
// Try actor2 first
443+
let res = ctx
444+
.signal(pegboard::workflows::actor2::Destroy {
445+
override_kill_timeout_ms: query.override_kill_timeout,
446+
})
447+
.to_workflow::<pegboard::workflows::actor2::Workflow>()
448+
.tag("actor_id", actor_id)
449+
.send()
450+
.await;
451+
452+
if let Some(WorkflowError::WorkflowNotFound) = res.as_workflow_error() {
453+
// Try old actors
454+
ctx
455+
.signal(pegboard::workflows::actor::Destroy {
456+
override_kill_timeout_ms: query.override_kill_timeout,
457+
})
458+
.to_workflow::<pegboard::workflows::actor::Workflow>()
459+
.tag("actor_id", actor_id)
460+
.send()
461+
.await?;
462+
463+
old_sub.next().await?;
464+
} else {
465+
res?;
446466

447-
sub.next().await?;
467+
sub.next().await?;
468+
}
448469

449470
Ok(json!({}))
450471
}
@@ -481,21 +502,29 @@ pub async fn upgrade(
481502
)
482503
.await?;
483504

484-
// TODO: Add back once we figure out how to cleanly handle if a wf is already complete when
485-
// upgrading
486-
// let mut sub = ctx
487-
// .subscribe::<pegboard::workflows::actor::UpgradeStarted>(("actor_id", actor_id))
488-
// .await?;
489-
490-
ctx.signal(pegboard::workflows::actor::Upgrade {
491-
image_id: build.build_id,
492-
})
493-
.to_workflow::<pegboard::workflows::actor::Workflow>()
494-
.tag("actor_id", actor_id)
495-
.send()
496-
.await?;
497-
498-
// sub.next().await?;
505+
// Try actor2 first
506+
let res = ctx
507+
.signal(pegboard::workflows::actor2::Upgrade {
508+
image_id: build.build_id,
509+
})
510+
.to_workflow::<pegboard::workflows::actor2::Workflow>()
511+
.tag("actor_id", actor_id)
512+
.send()
513+
.await;
514+
515+
if let Some(WorkflowError::WorkflowNotFound) = res.as_workflow_error() {
516+
// Try old actors
517+
ctx
518+
.signal(pegboard::workflows::actor::Upgrade {
519+
image_id: build.build_id,
520+
})
521+
.to_workflow::<pegboard::workflows::actor::Workflow>()
522+
.tag("actor_id", actor_id)
523+
.send()
524+
.await?;
525+
} else {
526+
res?;
527+
}
499528

500529
Ok(json!({}))
501530
}
@@ -589,35 +618,42 @@ pub async fn upgrade_all(
589618
// cursor of [created_at, actor_id] that we pass to the fdb range
590619
created_before = list_res.actors.last().map(|x| x.create_ts - 1);
591620

592-
// TODO: Add back once we figure out how to cleanly handle if a wf is already complete when
593-
// upgrading
594-
// let subs = futures_util::stream::iter(list_res.actor_ids.clone())
595-
// .map(|actor_id| {
596-
// ctx.subscribe::<pegboard::workflows::actor::UpgradeStarted>(("actor_id", actor_id))
597-
// })
598-
// .buffer_unordered(32)
599-
// .try_collect::<Vec<_>>()
600-
// .await?;
601-
621+
let ctx = (*ctx).clone();
602622
futures_util::stream::iter(list_res.actors)
603623
.map(|actor| {
604-
ctx.signal(pegboard::workflows::actor::Upgrade {
605-
image_id: build.build_id,
606-
})
607-
.to_workflow::<pegboard::workflows::actor::Workflow>()
608-
.tag("actor_id", actor.actor_id)
609-
.send()
624+
let ctx = ctx.clone();
625+
async move {
626+
// Try actor2 first
627+
let res = ctx
628+
.signal(pegboard::workflows::actor2::Upgrade {
629+
image_id: build.build_id,
630+
})
631+
.to_workflow::<pegboard::workflows::actor2::Workflow>()
632+
.tag("actor_id", actor.actor_id)
633+
.send()
634+
.await;
635+
636+
if let Some(WorkflowError::WorkflowNotFound) = res.as_workflow_error() {
637+
// Try old actors
638+
ctx
639+
.signal(pegboard::workflows::actor::Upgrade {
640+
image_id: build.build_id,
641+
})
642+
.to_workflow::<pegboard::workflows::actor::Workflow>()
643+
.tag("actor_id", actor.actor_id)
644+
.send()
645+
.await?;
646+
} else {
647+
res?;
648+
}
649+
650+
GlobalResult::Ok(())
651+
}
610652
})
611653
.buffer_unordered(32)
612654
.try_collect::<Vec<_>>()
613655
.await?;
614656

615-
// futures_util::stream::iter(subs)
616-
// .map(|mut sub| async move { sub.next().await })
617-
// .buffer_unordered(32)
618-
// .try_collect::<Vec<_>>()
619-
// .await?;
620-
621657
if count < 10_000 {
622658
break;
623659
}

packages/edge/infra/client/manager/src/main.rs

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -128,6 +128,12 @@ async fn init() -> Result<Init> {
128128
),
129129
};
130130

131+
// SAFETY: No other task has spawned yet.
132+
// Set client_id env var so it can be read by the prometheus registry
133+
unsafe {
134+
std::env::set_var("CLIENT_ID", config.client.cluster.client_id.to_string());
135+
}
136+
131137
if config.client.logs.redirect_logs() {
132138
rivet_logs::Logs::new(
133139
config.client.data_dir().join("logs"),
@@ -137,12 +143,6 @@ async fn init() -> Result<Init> {
137143
.await?;
138144
}
139145

140-
// SAFETY: No other task has spawned yet.
141-
// Set client_id env var so it can be read by the prometheus registry
142-
unsafe {
143-
std::env::set_var("CLIENT_ID", config.client.cluster.client_id.to_string());
144-
}
145-
146146
// Read system metrics
147147
let system = crate::system_info::fetch().await?;
148148

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,2 @@
1+
ALTER TABLE actors
2+
DROP COLUMN runner_id;

0 commit comments

Comments
 (0)