11use std:: collections:: HashMap ;
22
33use api_helper:: { anchor:: WatchIndexQuery , ctx:: Ctx } ;
4+ use chirp_workflow:: prelude:: * ;
45use futures_util:: { FutureExt , StreamExt , TryStreamExt } ;
56use rivet_api:: models;
67use rivet_convert:: { ApiInto , ApiTryInto } ;
7- use rivet_operation:: prelude:: * ;
88use serde:: Deserialize ;
99use serde_json:: json;
1010use util:: serde:: AsHashableExt ;
@@ -152,14 +152,14 @@ pub async fn create(
152152 }
153153 } ;
154154
155- let allocated_fut = if network. wait_ready . unwrap_or_default ( ) {
155+ let created_fut = if network. wait_ready . unwrap_or_default ( ) {
156156 std:: future:: pending ( ) . boxed ( )
157157 } else {
158- let mut allocated_sub = ctx
159- . subscribe :: < pegboard:: workflows:: actor:: Allocated > ( ( "actor_id" , actor_id) )
158+ let mut created_sub = ctx
159+ . subscribe :: < pegboard:: workflows:: actor:: CreateComplete > ( ( "actor_id" , actor_id) )
160160 . await ?;
161161
162- async move { allocated_sub . next ( ) . await } . boxed ( )
162+ async move { created_sub . next ( ) . await } . boxed ( )
163163 } ;
164164 let mut ready_sub = ctx
165165 . subscribe :: < pegboard:: workflows:: actor:: Ready > ( ( "actor_id" , actor_id) )
@@ -239,9 +239,9 @@ pub async fn create(
239239 . tag ( "actor_id" , actor_id)
240240 . dispatch ( )
241241 . await ?;
242- // Wait for allocated /ready, fail, or destroy
242+ // Wait for create /ready, fail, or destroy
243243 tokio:: select! {
244- res = allocated_fut => { res?; } ,
244+ res = created_fut => { res?; } ,
245245 res = ready_sub. next( ) => { res?; } ,
246246 res = fail_sub. next( ) => {
247247 let msg = res?;
@@ -258,14 +258,14 @@ pub async fn create(
258258 let actor_id = util:: Id :: new_v1 ( ctx. config ( ) . server ( ) ?. rivet . edge ( ) ?. datacenter_label ( ) ) ;
259259 tracing:: info!( ?actor_id, ?tags, "creating actor with tags" ) ;
260260
261- let allocated_fut = if network. wait_ready . unwrap_or_default ( ) {
261+ let created_fut = if network. wait_ready . unwrap_or_default ( ) {
262262 std:: future:: pending ( ) . boxed ( )
263263 } else {
264- let mut allocated_sub = ctx
265- . subscribe :: < pegboard:: workflows:: actor2:: Allocated > ( ( "actor_id" , actor_id) )
264+ let mut created_sub = ctx
265+ . subscribe :: < pegboard:: workflows:: actor2:: CreateComplete > ( ( "actor_id" , actor_id) )
266266 . await ?;
267267
268- async move { allocated_sub . next ( ) . await } . boxed ( )
268+ async move { created_sub . next ( ) . await } . boxed ( )
269269 } ;
270270 let mut ready_sub = ctx
271271 . subscribe :: < pegboard:: workflows:: actor2:: Ready > ( ( "actor_id" , actor_id) )
@@ -348,7 +348,7 @@ pub async fn create(
348348
349349 // Wait for create/ready, fail, or destroy
350350 tokio:: select! {
351- res = allocated_fut => { res?; } ,
351+ res = created_fut => { res?; } ,
352352 res = ready_sub. next( ) => { res?; } ,
353353 res = fail_sub. next( ) => {
354354 let msg = res?;
@@ -425,6 +425,9 @@ pub async fn destroy(
425425 ) ;
426426
427427 let mut sub = ctx
428+ . subscribe :: < pegboard:: workflows:: actor2:: DestroyStarted > ( ( "actor_id" , actor_id) )
429+ . await ?;
430+ let mut old_sub = ctx
428431 . subscribe :: < pegboard:: workflows:: actor:: DestroyStarted > ( ( "actor_id" , actor_id) )
429432 . await ?;
430433
@@ -436,15 +439,33 @@ pub async fn destroy(
436439 return Ok ( json ! ( { } ) ) ;
437440 }
438441
439- ctx. signal ( pegboard:: workflows:: actor:: Destroy {
440- override_kill_timeout_ms : query. override_kill_timeout ,
441- } )
442- . to_workflow :: < pegboard:: workflows:: actor:: Workflow > ( )
443- . tag ( "actor_id" , actor_id)
444- . send ( )
445- . await ?;
442+ // Try actor2 first
443+ let res = ctx
444+ . signal ( pegboard:: workflows:: actor2:: Destroy {
445+ override_kill_timeout_ms : query. override_kill_timeout ,
446+ } )
447+ . to_workflow :: < pegboard:: workflows:: actor2:: Workflow > ( )
448+ . tag ( "actor_id" , actor_id)
449+ . send ( )
450+ . await ;
451+
452+ if let Some ( WorkflowError :: WorkflowNotFound ) = res. as_workflow_error ( ) {
453+ // Try old actors
454+ ctx
455+ . signal ( pegboard:: workflows:: actor:: Destroy {
456+ override_kill_timeout_ms : query. override_kill_timeout ,
457+ } )
458+ . to_workflow :: < pegboard:: workflows:: actor:: Workflow > ( )
459+ . tag ( "actor_id" , actor_id)
460+ . send ( )
461+ . await ?;
462+
463+ old_sub. next ( ) . await ?;
464+ } else {
465+ res?;
446466
447- sub. next ( ) . await ?;
467+ sub. next ( ) . await ?;
468+ }
448469
449470 Ok ( json ! ( { } ) )
450471}
@@ -481,21 +502,29 @@ pub async fn upgrade(
481502 )
482503 . await ?;
483504
484- // TODO: Add back once we figure out how to cleanly handle if a wf is already complete when
485- // upgrading
486- // let mut sub = ctx
487- // .subscribe::<pegboard::workflows::actor::UpgradeStarted>(("actor_id", actor_id))
488- // .await?;
489-
490- ctx. signal ( pegboard:: workflows:: actor:: Upgrade {
491- image_id : build. build_id ,
492- } )
493- . to_workflow :: < pegboard:: workflows:: actor:: Workflow > ( )
494- . tag ( "actor_id" , actor_id)
495- . send ( )
496- . await ?;
497-
498- // sub.next().await?;
505+ // Try actor2 first
506+ let res = ctx
507+ . signal ( pegboard:: workflows:: actor2:: Upgrade {
508+ image_id : build. build_id ,
509+ } )
510+ . to_workflow :: < pegboard:: workflows:: actor2:: Workflow > ( )
511+ . tag ( "actor_id" , actor_id)
512+ . send ( )
513+ . await ;
514+
515+ if let Some ( WorkflowError :: WorkflowNotFound ) = res. as_workflow_error ( ) {
516+ // Try old actors
517+ ctx
518+ . signal ( pegboard:: workflows:: actor:: Upgrade {
519+ image_id : build. build_id ,
520+ } )
521+ . to_workflow :: < pegboard:: workflows:: actor:: Workflow > ( )
522+ . tag ( "actor_id" , actor_id)
523+ . send ( )
524+ . await ?;
525+ } else {
526+ res?;
527+ }
499528
500529 Ok ( json ! ( { } ) )
501530}
@@ -589,35 +618,42 @@ pub async fn upgrade_all(
589618 // cursor of [created_at, actor_id] that we pass to the fdb range
590619 created_before = list_res. actors . last ( ) . map ( |x| x. create_ts - 1 ) ;
591620
592- // TODO: Add back once we figure out how to cleanly handle if a wf is already complete when
593- // upgrading
594- // let subs = futures_util::stream::iter(list_res.actor_ids.clone())
595- // .map(|actor_id| {
596- // ctx.subscribe::<pegboard::workflows::actor::UpgradeStarted>(("actor_id", actor_id))
597- // })
598- // .buffer_unordered(32)
599- // .try_collect::<Vec<_>>()
600- // .await?;
601-
621+ let ctx = ( * ctx) . clone ( ) ;
602622 futures_util:: stream:: iter ( list_res. actors )
603623 . map ( |actor| {
604- ctx. signal ( pegboard:: workflows:: actor:: Upgrade {
605- image_id : build. build_id ,
606- } )
607- . to_workflow :: < pegboard:: workflows:: actor:: Workflow > ( )
608- . tag ( "actor_id" , actor. actor_id )
609- . send ( )
624+ let ctx = ctx. clone ( ) ;
625+ async move {
626+ // Try actor2 first
627+ let res = ctx
628+ . signal ( pegboard:: workflows:: actor2:: Upgrade {
629+ image_id : build. build_id ,
630+ } )
631+ . to_workflow :: < pegboard:: workflows:: actor2:: Workflow > ( )
632+ . tag ( "actor_id" , actor. actor_id )
633+ . send ( )
634+ . await ;
635+
636+ if let Some ( WorkflowError :: WorkflowNotFound ) = res. as_workflow_error ( ) {
637+ // Try old actors
638+ ctx
639+ . signal ( pegboard:: workflows:: actor:: Upgrade {
640+ image_id : build. build_id ,
641+ } )
642+ . to_workflow :: < pegboard:: workflows:: actor:: Workflow > ( )
643+ . tag ( "actor_id" , actor. actor_id )
644+ . send ( )
645+ . await ?;
646+ } else {
647+ res?;
648+ }
649+
650+ GlobalResult :: Ok ( ( ) )
651+ }
610652 } )
611653 . buffer_unordered ( 32 )
612654 . try_collect :: < Vec < _ > > ( )
613655 . await ?;
614656
615- // futures_util::stream::iter(subs)
616- // .map(|mut sub| async move { sub.next().await })
617- // .buffer_unordered(32)
618- // .try_collect::<Vec<_>>()
619- // .await?;
620-
621657 if count < 10_000 {
622658 break ;
623659 }
0 commit comments