1
1
use std:: collections:: HashMap ;
2
2
3
3
use api_helper:: { anchor:: WatchIndexQuery , ctx:: Ctx } ;
4
+ use chirp_workflow:: prelude:: * ;
4
5
use futures_util:: { FutureExt , StreamExt , TryStreamExt } ;
5
6
use rivet_api:: models;
6
7
use rivet_convert:: { ApiInto , ApiTryInto } ;
7
- use rivet_operation:: prelude:: * ;
8
8
use serde:: Deserialize ;
9
9
use serde_json:: json;
10
10
use util:: serde:: AsHashableExt ;
@@ -152,14 +152,14 @@ pub async fn create(
152
152
}
153
153
} ;
154
154
155
- let allocated_fut = if network. wait_ready . unwrap_or_default ( ) {
155
+ let created_fut = if network. wait_ready . unwrap_or_default ( ) {
156
156
std:: future:: pending ( ) . boxed ( )
157
157
} else {
158
- let mut allocated_sub = ctx
159
- . subscribe :: < pegboard:: workflows:: actor:: Allocated > ( ( "actor_id" , actor_id) )
158
+ let mut created_sub = ctx
159
+ . subscribe :: < pegboard:: workflows:: actor:: CreateComplete > ( ( "actor_id" , actor_id) )
160
160
. await ?;
161
161
162
- async move { allocated_sub . next ( ) . await } . boxed ( )
162
+ async move { created_sub . next ( ) . await } . boxed ( )
163
163
} ;
164
164
let mut ready_sub = ctx
165
165
. subscribe :: < pegboard:: workflows:: actor:: Ready > ( ( "actor_id" , actor_id) )
@@ -239,9 +239,9 @@ pub async fn create(
239
239
. tag ( "actor_id" , actor_id)
240
240
. dispatch ( )
241
241
. await ?;
242
- // Wait for allocated /ready, fail, or destroy
242
+ // Wait for create /ready, fail, or destroy
243
243
tokio:: select! {
244
- res = allocated_fut => { res?; } ,
244
+ res = created_fut => { res?; } ,
245
245
res = ready_sub. next( ) => { res?; } ,
246
246
res = fail_sub. next( ) => {
247
247
let msg = res?;
@@ -258,14 +258,14 @@ pub async fn create(
258
258
let actor_id = util:: Id :: new_v1 ( ctx. config ( ) . server ( ) ?. rivet . edge ( ) ?. datacenter_label ( ) ) ;
259
259
tracing:: info!( ?actor_id, ?tags, "creating actor with tags" ) ;
260
260
261
- let allocated_fut = if network. wait_ready . unwrap_or_default ( ) {
261
+ let created_fut = if network. wait_ready . unwrap_or_default ( ) {
262
262
std:: future:: pending ( ) . boxed ( )
263
263
} else {
264
- let mut allocated_sub = ctx
265
- . subscribe :: < pegboard:: workflows:: actor2:: Allocated > ( ( "actor_id" , actor_id) )
264
+ let mut created_sub = ctx
265
+ . subscribe :: < pegboard:: workflows:: actor2:: CreateComplete > ( ( "actor_id" , actor_id) )
266
266
. await ?;
267
267
268
- async move { allocated_sub . next ( ) . await } . boxed ( )
268
+ async move { created_sub . next ( ) . await } . boxed ( )
269
269
} ;
270
270
let mut ready_sub = ctx
271
271
. subscribe :: < pegboard:: workflows:: actor2:: Ready > ( ( "actor_id" , actor_id) )
@@ -348,7 +348,7 @@ pub async fn create(
348
348
349
349
// Wait for create/ready, fail, or destroy
350
350
tokio:: select! {
351
- res = allocated_fut => { res?; } ,
351
+ res = created_fut => { res?; } ,
352
352
res = ready_sub. next( ) => { res?; } ,
353
353
res = fail_sub. next( ) => {
354
354
let msg = res?;
@@ -425,6 +425,9 @@ pub async fn destroy(
425
425
) ;
426
426
427
427
let mut sub = ctx
428
+ . subscribe :: < pegboard:: workflows:: actor2:: DestroyStarted > ( ( "actor_id" , actor_id) )
429
+ . await ?;
430
+ let mut old_sub = ctx
428
431
. subscribe :: < pegboard:: workflows:: actor:: DestroyStarted > ( ( "actor_id" , actor_id) )
429
432
. await ?;
430
433
@@ -436,15 +439,33 @@ pub async fn destroy(
436
439
return Ok ( json ! ( { } ) ) ;
437
440
}
438
441
439
- ctx. signal ( pegboard:: workflows:: actor:: Destroy {
440
- override_kill_timeout_ms : query. override_kill_timeout ,
441
- } )
442
- . to_workflow :: < pegboard:: workflows:: actor:: Workflow > ( )
443
- . tag ( "actor_id" , actor_id)
444
- . send ( )
445
- . await ?;
442
+ // Try actor2 first
443
+ let res = ctx
444
+ . signal ( pegboard:: workflows:: actor2:: Destroy {
445
+ override_kill_timeout_ms : query. override_kill_timeout ,
446
+ } )
447
+ . to_workflow :: < pegboard:: workflows:: actor2:: Workflow > ( )
448
+ . tag ( "actor_id" , actor_id)
449
+ . send ( )
450
+ . await ;
451
+
452
+ if let Some ( WorkflowError :: WorkflowNotFound ) = res. as_workflow_error ( ) {
453
+ // Try old actors
454
+ ctx
455
+ . signal ( pegboard:: workflows:: actor:: Destroy {
456
+ override_kill_timeout_ms : query. override_kill_timeout ,
457
+ } )
458
+ . to_workflow :: < pegboard:: workflows:: actor:: Workflow > ( )
459
+ . tag ( "actor_id" , actor_id)
460
+ . send ( )
461
+ . await ?;
462
+
463
+ old_sub. next ( ) . await ?;
464
+ } else {
465
+ res?;
446
466
447
- sub. next ( ) . await ?;
467
+ sub. next ( ) . await ?;
468
+ }
448
469
449
470
Ok ( json ! ( { } ) )
450
471
}
@@ -481,21 +502,29 @@ pub async fn upgrade(
481
502
)
482
503
. await ?;
483
504
484
- // TODO: Add back once we figure out how to cleanly handle if a wf is already complete when
485
- // upgrading
486
- // let mut sub = ctx
487
- // .subscribe::<pegboard::workflows::actor::UpgradeStarted>(("actor_id", actor_id))
488
- // .await?;
489
-
490
- ctx. signal ( pegboard:: workflows:: actor:: Upgrade {
491
- image_id : build. build_id ,
492
- } )
493
- . to_workflow :: < pegboard:: workflows:: actor:: Workflow > ( )
494
- . tag ( "actor_id" , actor_id)
495
- . send ( )
496
- . await ?;
497
-
498
- // sub.next().await?;
505
+ // Try actor2 first
506
+ let res = ctx
507
+ . signal ( pegboard:: workflows:: actor2:: Upgrade {
508
+ image_id : build. build_id ,
509
+ } )
510
+ . to_workflow :: < pegboard:: workflows:: actor2:: Workflow > ( )
511
+ . tag ( "actor_id" , actor_id)
512
+ . send ( )
513
+ . await ;
514
+
515
+ if let Some ( WorkflowError :: WorkflowNotFound ) = res. as_workflow_error ( ) {
516
+ // Try old actors
517
+ ctx
518
+ . signal ( pegboard:: workflows:: actor:: Upgrade {
519
+ image_id : build. build_id ,
520
+ } )
521
+ . to_workflow :: < pegboard:: workflows:: actor:: Workflow > ( )
522
+ . tag ( "actor_id" , actor_id)
523
+ . send ( )
524
+ . await ?;
525
+ } else {
526
+ res?;
527
+ }
499
528
500
529
Ok ( json ! ( { } ) )
501
530
}
@@ -589,35 +618,42 @@ pub async fn upgrade_all(
589
618
// cursor of [created_at, actor_id] that we pass to the fdb range
590
619
created_before = list_res. actors . last ( ) . map ( |x| x. create_ts - 1 ) ;
591
620
592
- // TODO: Add back once we figure out how to cleanly handle if a wf is already complete when
593
- // upgrading
594
- // let subs = futures_util::stream::iter(list_res.actor_ids.clone())
595
- // .map(|actor_id| {
596
- // ctx.subscribe::<pegboard::workflows::actor::UpgradeStarted>(("actor_id", actor_id))
597
- // })
598
- // .buffer_unordered(32)
599
- // .try_collect::<Vec<_>>()
600
- // .await?;
601
-
621
+ let ctx = ( * ctx) . clone ( ) ;
602
622
futures_util:: stream:: iter ( list_res. actors )
603
623
. map ( |actor| {
604
- ctx. signal ( pegboard:: workflows:: actor:: Upgrade {
605
- image_id : build. build_id ,
606
- } )
607
- . to_workflow :: < pegboard:: workflows:: actor:: Workflow > ( )
608
- . tag ( "actor_id" , actor. actor_id )
609
- . send ( )
624
+ let ctx = ctx. clone ( ) ;
625
+ async move {
626
+ // Try actor2 first
627
+ let res = ctx
628
+ . signal ( pegboard:: workflows:: actor2:: Upgrade {
629
+ image_id : build. build_id ,
630
+ } )
631
+ . to_workflow :: < pegboard:: workflows:: actor2:: Workflow > ( )
632
+ . tag ( "actor_id" , actor. actor_id )
633
+ . send ( )
634
+ . await ;
635
+
636
+ if let Some ( WorkflowError :: WorkflowNotFound ) = res. as_workflow_error ( ) {
637
+ // Try old actors
638
+ ctx
639
+ . signal ( pegboard:: workflows:: actor:: Upgrade {
640
+ image_id : build. build_id ,
641
+ } )
642
+ . to_workflow :: < pegboard:: workflows:: actor:: Workflow > ( )
643
+ . tag ( "actor_id" , actor. actor_id )
644
+ . send ( )
645
+ . await ?;
646
+ } else {
647
+ res?;
648
+ }
649
+
650
+ GlobalResult :: Ok ( ( ) )
651
+ }
610
652
} )
611
653
. buffer_unordered ( 32 )
612
654
. try_collect :: < Vec < _ > > ( )
613
655
. await ?;
614
656
615
- // futures_util::stream::iter(subs)
616
- // .map(|mut sub| async move { sub.next().await })
617
- // .buffer_unordered(32)
618
- // .try_collect::<Vec<_>>()
619
- // .await?;
620
-
621
657
if count < 10_000 {
622
658
break ;
623
659
}
0 commit comments