Skip to content

Commit 3493a1d

Browse files
committed
fix(workflows): fix race condition of workflow waking before commit (#2748)
<!-- Please make sure there is an issue that this PR is correlated to. --> ## Changes <!-- If there are frontend changes, please include screenshots. --> <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Enhanced CLI output to display workflow and signal timestamps with millisecond precision. * CLI now shows workflow and signal tags and workflow IDs in a formatted, readable manner. * **Improvements** * Grafana traces dashboard updated: simplified queries, improved trace table with error indicators, reorganized panels, and enhanced trace details. * Logs and error panels in the dashboard were restructured for better visibility and usability. * Worker shutdown process improved to accurately reflect remaining workflows at start. * Workflow processing logic updated to ensure timely worker wake-up and avoid race conditions. * **Bug Fixes** * Reduced the threshold for logging long signal receive lag, enabling earlier detection of delays. * **Chores** * Added a new error variant for SQL connection failures to improve error clarity. * Updated test structure: converted async test to sync with explicit runtime and removed obsolete test files. * Improved internal logging messages for metrics loop shutdown. * Minor comment and typo corrections for clarity. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 6705af9 commit 3493a1d

File tree

15 files changed

+367
-622
lines changed

15 files changed

+367
-622
lines changed

docker/dev-full/grafana/dashboards/traces.json

Lines changed: 271 additions & 365 deletions
Large diffs are not rendered by default.

docker/dev-full/otel-collector/config.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ exporters:
4848
initial_interval: 5s
4949
max_interval: 30s
5050
max_elapsed_time: 300s
51+
# debug:
52+
# verbosity: detailed
5153

5254
service:
5355
pipelines:

packages/common/chirp-workflow/core/src/ctx/listen.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl<'a> ListenCtx<'a> {
8383
.with_label_values(&[self.ctx.name(), &signal.signal_name])
8484
.observe(recv_lag);
8585

86-
if recv_lag > 15.0 {
86+
if recv_lag > 3.0 {
8787
// We print an error here so the trace of this workflow does not get dropped
8888
tracing::error!(
8989
?recv_lag,

packages/common/chirp-workflow/core/src/db/crdb_nats/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ impl DatabaseCrdbNats {
5656
Ok(conn)
5757
} else {
5858
// Create a new connection
59-
self.pool.acquire().await.map_err(WorkflowError::Sqlx)
59+
self.pool.acquire().await.map_err(WorkflowError::ConnSqlx)
6060
}
6161
}
6262

packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1548,7 +1548,8 @@ impl Database for DatabaseFdbSqliteNats {
15481548
// Evict databases before releasing lease
15491549
self.evict_wf_sqlite(workflow_id).await?;
15501550

1551-
self.pools
1551+
let wrote_to_wake_idx = self
1552+
.pools
15521553
.fdb()?
15531554
.run(|tx, _mc| {
15541555
async move {
@@ -1569,9 +1570,11 @@ impl Database for DatabaseFdbSqliteNats {
15691570
SERIALIZABLE,
15701571
);
15711572

1572-
let (_, tag_keys, wake_deadline_entry) = tokio::try_join!(
1573+
let (wrote_to_wake_idx, tag_keys, wake_deadline_entry) = tokio::try_join!(
15731574
// Check for other workflows waiting on this one, wake all
15741575
async {
1576+
let mut wrote_to_wake_idx = false;
1577+
15751578
while let Some(entry) = stream.try_next().await? {
15761579
let sub_workflow_wake_key = self
15771580
.subspace
@@ -1599,9 +1602,11 @@ impl Database for DatabaseFdbSqliteNats {
15991602

16001603
// Clear secondary index
16011604
tx.clear(entry.key());
1605+
1606+
wrote_to_wake_idx = true;
16021607
}
16031608

1604-
Result::<_, fdb::FdbBindingError>::Ok(())
1609+
Result::<_, fdb::FdbBindingError>::Ok(wrote_to_wake_idx)
16051610
},
16061611
// Read tags
16071612
async {
@@ -1695,13 +1700,16 @@ impl Database for DatabaseFdbSqliteNats {
16951700
keys::workflow::WorkerInstanceIdKey::new(workflow_id);
16961701
tx.clear(&self.subspace.pack(&worker_instance_id_key));
16971702

1698-
Ok(())
1703+
Ok(wrote_to_wake_idx)
16991704
}
17001705
})
17011706
.custom_instrument(tracing::info_span!("complete_workflows_tx"))
17021707
.await?;
17031708

1704-
self.wake_worker();
1709+
// Wake worker again in case some other workflow was waiting for this one to complete
1710+
if wrote_to_wake_idx {
1711+
self.wake_worker();
1712+
}
17051713

17061714
let dt = start_instant.elapsed().as_secs_f64();
17071715
metrics::COMPLETE_WORKFLOW_DURATION
@@ -1849,14 +1857,24 @@ impl Database for DatabaseFdbSqliteNats {
18491857
.custom_instrument(tracing::info_span!("commit_workflow_tx"))
18501858
.await?;
18511859

1852-
// Wake worker again if the deadline is before the next tick
1853-
if let Some(deadline_ts) = wake_deadline_ts {
1854-
if deadline_ts
1855-
<= rivet_util::timestamp::now() + self.worker_poll_interval().as_millis() as i64
1856-
{
1857-
self.wake_worker();
1858-
}
1859-
}
1860+
// Always wake the worker immediately again. This is an IMPORTANT implementation detail to prevent
1861+
// race conditions with workflow sleep. Imagine the scenario:
1862+
//
1863+
// 1. workflow is between user code and commit
1864+
// 2. worker reads wake condition for said workflow but cannot run it because it is already leased
1865+
// 3. workflow commits
1866+
//
1867+
// This will result in the workflow sleeping instead of immediately running again.
1868+
//
1869+
// Adding this wake_worker call ensures that if the workflow has a valid wake condition before commit
1870+
// then it will immediately wake up again.
1871+
//
1872+
// This is simpler than having this commit_workflow fn read wake conditions because:
1873+
// - the wake conditions are not indexed by wf id
1874+
// - would involve informing the worker to restart the workflow in memory instead of the usual
1875+
// workflow lifecycle
1876+
// - the worker is already designed to pull wake conditions frequently
1877+
self.wake_worker();
18601878

18611879
let dt = start_instant.elapsed().as_secs_f64();
18621880
metrics::COMMIT_WORKFLOW_DURATION
@@ -2266,7 +2284,7 @@ impl Database for DatabaseFdbSqliteNats {
22662284
let wake_signal_key =
22672285
keys::workflow::WakeSignalKey::new(workflow_id, signal_name.to_string());
22682286

2269-
// If the workflow is currently listening for this signal, wake it
2287+
// If the workflow currently has a wake signal key for this signal, wake it
22702288
if tx
22712289
.get(&self.subspace.pack(&wake_signal_key), SERIALIZABLE)
22722290
.await?

packages/common/chirp-workflow/core/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ pub enum WorkflowError {
144144
#[error("sql error: {0}")]
145145
Sqlx(#[from] sqlx::Error),
146146

147+
#[error("failed to create sql connection: {0}")]
148+
ConnSqlx(sqlx::Error),
149+
147150
#[error("fdb error: {0}")]
148151
Fdb(#[from] fdb::FdbBindingError),
149152

packages/common/chirp-workflow/core/src/worker.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ impl Worker {
112112

113113
#[tracing::instrument(skip_all)]
114114
async fn shutdown(mut self, mut sigterm: Signal) {
115+
// For an accurate count of how many remaining workflows there are, retain before checking length
116+
self.running_workflows
117+
.retain(|_, wf| !wf.handle.is_finished());
118+
115119
// Shutdown sequence
116120
tracing::info!(
117121
duration=?SHUTDOWN_DURATION,

packages/common/chirp-workflow/core/tests/common.rs

Lines changed: 0 additions & 16 deletions
This file was deleted.

packages/common/chirp-workflow/core/tests/integration.rs

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,33 @@
11
use std::time::Duration;
22

33
use chirp_workflow::prelude::*;
4-
// use serde_json::json;
5-
// use uuid::Uuid;
64

7-
mod common;
8-
use common::*;
5+
#[test]
6+
fn fdb_sqlite_nats_driver() {
7+
// SAFETY: No other threads exist yet
8+
unsafe {
9+
std::env::set_var("RUST_LOG", "DEBUG");
10+
std::env::set_var("RUST_LOG_ANSI_COLOR", "1");
11+
std::env::set_var("RIVET_OTEL_ENABLED", "1");
12+
std::env::set_var("RIVET_OTEL_SAMPLER_RATIO", "1");
13+
std::env::set_var("RIVET_SERVICE_NAME", "test");
14+
std::env::set_var("RIVET_OTEL_ENDPOINT", "http://127.0.0.1:4317");
15+
}
916

10-
#[tokio::test(flavor = "multi_thread")]
11-
async fn fdb_sqlite_nats_driver() {
12-
setup_tracing();
17+
// Build runtime
18+
chirp_workflow::prelude::__rivet_runtime::run(
19+
chirp_workflow::prelude::tracing::Instrument::instrument(
20+
async move {
21+
tracing::info!("test starting");
22+
fdb_sqlite_nats_driver_inner().await;
23+
tracing::info!("test finished");
24+
},
25+
chirp_workflow::prelude::tracing::info_span!("fdb_sqlite_nats_driver"),
26+
),
27+
);
28+
}
1329

30+
async fn fdb_sqlite_nats_driver_inner() {
1431
let ctx = chirp_workflow::prelude::TestCtx::from_env::<db::DatabaseFdbSqliteNats>(
1532
"fdb_sqlite_nats_driver",
1633
true,
@@ -62,8 +79,6 @@ async fn fdb_sqlite_nats_driver() {
6279

6380
mod def {
6481
use chirp_workflow::prelude::*;
65-
use futures_util::FutureExt;
66-
use sqlx::Acquire;
6782

6883
#[derive(Debug, Serialize, Deserialize)]
6984
pub struct Input {}

0 commit comments

Comments
 (0)