Skip to content

Commit 1f02007

Browse files
MasterPtatoNathanFlurry
authored andcommitted
fix(workflows): fix signal publish/listen race condition
1 parent 5a3772a commit 1f02007

File tree

3 files changed

+26
-104
lines changed

3 files changed

+26
-104
lines changed

packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1915,9 +1915,9 @@ impl Database for DatabaseFdbSqliteNats {
19151915
limit: Some(1),
19161916
..(&pending_signal_subspace).into()
19171917
},
1918-
// NOTE: This does not have to be SERIALIZABLE because the conflict occurs
1919-
// with acking which is a separate row. See below
1920-
SNAPSHOT,
1918+
// NOTE: This is serializable because any insert into this subspace
1919+
// should cause a conflict and retry of this txn
1920+
SERIALIZABLE,
19211921
)
19221922
})
19231923
.collect::<Vec<_>>();
@@ -2245,7 +2245,7 @@ impl Database for DatabaseFdbSqliteNats {
22452245
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?,
22462246
);
22472247

2248-
// Write ray id ts
2248+
// Write ray id
22492249
let ray_id_key = keys::signal::RayIdKey::new(signal_id);
22502250
tx.set(
22512251
&self.subspace.pack(&ray_id_key),

packages/common/chirp-workflow/core/tests/integration.rs

Lines changed: 19 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -39,68 +39,23 @@ async fn fdb_sqlite_nats_driver() {
3939
// .await
4040
// .unwrap();
4141

42-
// let res = db
43-
// .find_workflow(
44-
// "workflow_name",
45-
// &json!({
46-
// "bald": "eagle",
47-
// "fat": "man"
48-
// }),
49-
// )
50-
// .await
51-
// .unwrap();
52-
// tracing::info!(?res);
53-
54-
// db.update_workflow_tags(
55-
// workflow_id,
56-
// "workflow_name",
57-
// &json!({
58-
// "bald": "eagle",
59-
// "fat": "man"
60-
// }),
61-
// )
62-
// .await
63-
// .unwrap();
42+
let workflow_id = ctx.workflow(def::Input { })
43+
.dispatch()
44+
.await
45+
.unwrap();
6446

65-
// let res = db
66-
// .find_workflow(
67-
// "workflow_name",
68-
// &json!({
69-
// "bald": "eagle",
70-
// "fat": "man"
71-
// }),
72-
// )
73-
// .await
74-
// .unwrap();
75-
// tracing::info!(?res);
76-
77-
if std::env::var("SPAWN_WF").unwrap_or_default() == "1" {
78-
for _ in 0..1 {
79-
let ctx2 = ctx.clone();
80-
tokio::spawn(async move {
81-
ctx2.workflow(def::Input {})
82-
.tag("foo", "bar")
83-
.dispatch()
84-
.await
85-
.unwrap();
86-
});
87-
}
88-
}
47+
let ctx2 = ctx.clone();
48+
tokio::spawn(async move {
49+
tokio::time::sleep(Duration::from_millis(110)).await;
8950

90-
// let ctx2 = ctx.clone();
91-
// tokio::spawn(async move {
92-
// for _ in 0..10 {
93-
// tokio::time::sleep(Duration::from_secs(2)).await;
94-
// ctx2.signal(def::MySignal {
95-
// test: Uuid::new_v4(),
96-
// })
97-
// .to_workflow::<def::Workflow>()
98-
// .tag("foo", "bar")
99-
// .send()
100-
// .await
101-
// .unwrap();
102-
// }
103-
// });
51+
ctx2.signal(def::MySignal {
52+
test: Uuid::new_v4(),
53+
})
54+
.to_workflow_id(workflow_id)
55+
.send()
56+
.await
57+
.unwrap();
58+
});
10459

10560
let worker = Worker::new(reg.clone(), db.clone());
10661

@@ -120,43 +75,14 @@ mod def {
12075
pub async fn test(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
12176
tracing::info!(w=?ctx.workflow_id(), "hello from workflow");
12277

123-
ctx.activity(TestActivityInput {
124-
foo: "bar".to_string(),
125-
})
126-
.await?;
127-
128-
// let workflow_id = ctx.workflow_id();
129-
// ctx.signal(MySignal {
130-
// test: Uuid::new_v4(),
78+
// ctx.activity(TestActivityInput {
79+
// foo: "bar".to_string(),
13180
// })
132-
// .to_workflow_id(workflow_id)
133-
// .send()
13481
// .await?;
13582

136-
ctx.repeat(|ctx| {
137-
async move {
138-
let sig = ctx.listen_with_timeout::<MySignal>(5 * 1000).await?;
139-
tracing::info!(?sig);
140-
141-
let start = std::time::Instant::now();
142-
143-
ctx.activity(TestActivityInput {
144-
foo: "bar".to_string(),
145-
})
146-
.await?;
83+
let sig = ctx.listen::<MySignal>().await?;
14784

148-
ctx.activity(TestActivityInput {
149-
foo: "bar".to_string(),
150-
})
151-
.await?;
152-
153-
tracing::info!(dt=?start.elapsed(), "-------------");
154-
155-
Ok(Loop::<()>::Continue)
156-
}
157-
.boxed()
158-
})
159-
.await?;
85+
tracing::info!(?sig, "signal recv ------------------");
16086

16187
Ok(())
16288
}

packages/edge/services/pegboard/src/ops/actor/allocate_ingress_ports.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use chirp_workflow::prelude::*;
2-
use fdb_util::{end_of_key_range, FormalKey, SNAPSHOT};
2+
use fdb_util::{end_of_key_range, FormalKey, SERIALIZABLE};
33
use foundationdb::{
44
self as fdb,
55
options::{ConflictRangeType, StreamingMode},
@@ -80,9 +80,7 @@ pub(crate) async fn pegboard_actor_allocate_ingress_ports(
8080
mode: StreamingMode::Iterator,
8181
..(start_key, end_key.clone()).into()
8282
},
83-
// NOTE: This is not SERIALIZABLE because we don't want to conflict with all of the keys,
84-
// just the one we choose
85-
SNAPSHOT,
83+
SERIALIZABLE,
8684
);
8785

8886
// Continue iterating over the same stream until all of the required ports are found
@@ -111,9 +109,7 @@ pub(crate) async fn pegboard_actor_allocate_ingress_ports(
111109
limit: Some(old_start as usize),
112110
..(start_key, end_key.clone()).into()
113111
},
114-
// NOTE: This is not SERIALIZABLE because we don't want to conflict
115-
// with all of the keys, just the one we choose
116-
SNAPSHOT,
112+
SERIALIZABLE,
117113
);
118114

119115
continue;

0 commit comments

Comments
 (0)