Skip to content

Commit ba62978

Browse files
MasterPtatoabcxff
authored andcommitted
feat: db sh for workflows
1 parent 549dc3f commit ba62978

File tree

12 files changed

+400
-52
lines changed

12 files changed

+400
-52
lines changed

packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use uuid::Uuid;
1717

1818
use super::{
1919
keys,
20-
sqlite::{db_name_internal, SqlStub},
20+
sqlite::SqlStub,
2121
DatabaseFdbSqliteNats,
2222
};
2323
use crate::{
@@ -602,10 +602,10 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
602602

603603
for key in sub_workflow_wake_keys {
604604
tracing::warn!(
605-
"workflow {} is being waited on by sub workflow {}, silencing anyway",
606-
key.workflow_id,
607-
key.sub_workflow_id
608-
);
605+
"workflow {} is being waited on by sub workflow {}, silencing anyway",
606+
key.workflow_id,
607+
key.sub_workflow_id
608+
);
609609
}
610610

611611
for key in tag_keys {
@@ -741,7 +741,7 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
741741
) -> Result<Option<HistoryData>> {
742742
let pool = &self
743743
.pools
744-
.sqlite(db_name_internal(workflow_id), true)
744+
.sqlite(crate::db::sqlite_db_name_internal(workflow_id), true)
745745
.await?;
746746

747747
let (wf_data, event_rows, error_rows) = tokio::try_join!(

packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ impl DatabaseFdbSqliteNats {
142142
self.pools
143143
.sqlite_manager()
144144
.evict(vec![
145-
sqlite::db_name_internal(workflow_id),
145+
crate::db::sqlite_db_name_internal(workflow_id),
146146
crate::db::sqlite_db_name_data(workflow_id),
147147
])
148148
.await?;
@@ -1277,7 +1277,7 @@ impl Database for DatabaseFdbSqliteNats {
12771277
async move {
12781278
let pool = &self
12791279
.pools
1280-
.sqlite(sqlite::db_name_internal(partial.workflow_id), false)
1280+
.sqlite(crate::db::sqlite_db_name_internal(partial.workflow_id), false)
12811281
.await?;
12821282

12831283
// Handle error during sqlite init
@@ -1876,7 +1876,7 @@ impl Database for DatabaseFdbSqliteNats {
18761876
) -> WorkflowResult<Option<SignalData>> {
18771877
let pool = &self
18781878
.pools
1879-
.sqlite(sqlite::db_name_internal(workflow_id), false)
1879+
.sqlite(crate::db::sqlite_db_name_internal(workflow_id), false)
18801880
.await?;
18811881

18821882
let owned_filter = filter
@@ -2323,7 +2323,7 @@ impl Database for DatabaseFdbSqliteNats {
23232323
) -> WorkflowResult<()> {
23242324
let pool = &self
23252325
.pools
2326-
.sqlite(sqlite::db_name_internal(from_workflow_id), false)
2326+
.sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
23272327
.await?;
23282328

23292329
// Insert history event
@@ -2351,7 +2351,7 @@ impl Database for DatabaseFdbSqliteNats {
23512351
.sqlite_manager()
23522352
.flush(
23532353
vec![
2354-
sqlite::db_name_internal(from_workflow_id),
2354+
crate::db::sqlite_db_name_internal(from_workflow_id),
23552355
crate::db::sqlite_db_name_data(from_workflow_id),
23562356
],
23572357
false,
@@ -2413,7 +2413,7 @@ impl Database for DatabaseFdbSqliteNats {
24132413
) -> WorkflowResult<Uuid> {
24142414
let pool = &self
24152415
.pools
2416-
.sqlite(sqlite::db_name_internal(workflow_id), false)
2416+
.sqlite(crate::db::sqlite_db_name_internal(workflow_id), false)
24172417
.await?;
24182418

24192419
// Insert history event
@@ -2448,7 +2448,7 @@ impl Database for DatabaseFdbSqliteNats {
24482448
.sqlite_manager()
24492449
.flush(
24502450
vec![
2451-
sqlite::db_name_internal(workflow_id),
2451+
crate::db::sqlite_db_name_internal(workflow_id),
24522452
crate::db::sqlite_db_name_data(workflow_id),
24532453
],
24542454
false,
@@ -2596,7 +2596,7 @@ impl Database for DatabaseFdbSqliteNats {
25962596
) -> WorkflowResult<()> {
25972597
let pool = &self
25982598
.pools
2599-
.sqlite(sqlite::db_name_internal(workflow_id), false)
2599+
.sqlite(crate::db::sqlite_db_name_internal(workflow_id), false)
26002600
.await?;
26012601
let input_hash = event_id.input_hash.to_be_bytes();
26022602

@@ -2702,7 +2702,7 @@ impl Database for DatabaseFdbSqliteNats {
27022702
.sqlite_manager()
27032703
.flush(
27042704
vec![
2705-
sqlite::db_name_internal(from_workflow_id),
2705+
crate::db::sqlite_db_name_internal(from_workflow_id),
27062706
crate::db::sqlite_db_name_data(from_workflow_id),
27072707
],
27082708
false,
@@ -2711,7 +2711,7 @@ impl Database for DatabaseFdbSqliteNats {
27112711

27122712
let pool = &self
27132713
.pools
2714-
.sqlite(sqlite::db_name_internal(from_workflow_id), false)
2714+
.sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
27152715
.await?;
27162716

27172717
sql_execute!(
@@ -2751,7 +2751,7 @@ impl Database for DatabaseFdbSqliteNats {
27512751
) -> WorkflowResult<()> {
27522752
let pool = &self
27532753
.pools
2754-
.sqlite(sqlite::db_name_internal(workflow_id), false)
2754+
.sqlite(crate::db::sqlite_db_name_internal(workflow_id), false)
27552755
.await?;
27562756

27572757
self.txn(|| async {
@@ -3040,7 +3040,7 @@ impl Database for DatabaseFdbSqliteNats {
30403040
) -> WorkflowResult<()> {
30413041
let pool = &self
30423042
.pools
3043-
.sqlite(sqlite::db_name_internal(from_workflow_id), false)
3043+
.sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
30443044
.await?;
30453045

30463046
sql_execute!(
@@ -3072,7 +3072,7 @@ impl Database for DatabaseFdbSqliteNats {
30723072
) -> WorkflowResult<()> {
30733073
let pool = &self
30743074
.pools
3075-
.sqlite(sqlite::db_name_internal(from_workflow_id), false)
3075+
.sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
30763076
.await?;
30773077

30783078
sql_execute!(
@@ -3100,7 +3100,7 @@ impl Database for DatabaseFdbSqliteNats {
31003100
) -> WorkflowResult<()> {
31013101
let pool = &self
31023102
.pools
3103-
.sqlite(sqlite::db_name_internal(from_workflow_id), false)
3103+
.sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
31043104
.await?;
31053105

31063106
sql_execute!(
@@ -3132,7 +3132,7 @@ impl Database for DatabaseFdbSqliteNats {
31323132
) -> WorkflowResult<()> {
31333133
let pool = &self
31343134
.pools
3135-
.sqlite(sqlite::db_name_internal(from_workflow_id), false)
3135+
.sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
31363136
.await?;
31373137

31383138
sql_execute!(
@@ -3164,7 +3164,7 @@ impl Database for DatabaseFdbSqliteNats {
31643164
) -> WorkflowResult<()> {
31653165
let pool = &self
31663166
.pools
3167-
.sqlite(sqlite::db_name_internal(from_workflow_id), false)
3167+
.sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
31683168
.await?;
31693169

31703170
sql_execute!(
@@ -3220,7 +3220,7 @@ async fn flush_handler(pools: rivet_pools::Pools, mut flush_rx: mpsc::UnboundedR
32203220
.sqlite_manager()
32213221
.flush(
32223222
vec![
3223-
sqlite::db_name_internal(workflow_id),
3223+
crate::db::sqlite_db_name_internal(workflow_id),
32243224
crate::db::sqlite_db_name_data(workflow_id),
32253225
],
32263226
true,

packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/sqlite/mod.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use std::collections::HashMap;
22

3-
use fdb_util::keys::*;
43
use include_dir::{include_dir, Dir, File};
54
use indoc::indoc;
65
use rivet_pools::prelude::*;
@@ -407,8 +406,3 @@ pub fn build_history(
407406

408407
Ok(events_by_location)
409408
}
410-
411-
/// Database name for the workflow internal state.
412-
pub fn db_name_internal(workflow_id: Uuid) -> (usize, Uuid, usize) {
413-
(WORKFLOW, workflow_id, INTERNAL)
414-
}

packages/common/chirp-workflow/core/src/db/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,3 +343,8 @@ pub struct SignalData {
343343
pub fn sqlite_db_name_data(workflow_id: Uuid) -> (usize, Uuid, usize) {
344344
(WORKFLOW, workflow_id, DATA)
345345
}
346+
347+
/// Database name for the workflow internal state.
348+
pub fn sqlite_db_name_internal(workflow_id: Uuid) -> (usize, Uuid, usize) {
349+
(WORKFLOW, workflow_id, INTERNAL)
350+
}

packages/common/pools/src/db/sqlite/mod.rs

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use uuid::Uuid;
2828

2929
use crate::{metrics, Error, FdbPool};
3030

31-
mod keys;
31+
pub mod keys;
3232

3333
#[cfg(test)]
3434
mod tests;
@@ -146,7 +146,7 @@ impl SqliteWriterEntry {
146146
/// DB key in packed form. This is not the full FDB key, this is the DB name segment in DbDataKey.
147147
///
148148
/// Stored in an `Arc` since this is frequently copied around.
149-
type KeyPacked = Arc<Vec<u8>>;
149+
pub type KeyPacked = Arc<Vec<u8>>;
150150

151151
pub type SqlitePoolManagerHandle = Arc<SqlitePoolManager>;
152152
pub type SqlitePoolManagerHandleWeak = Weak<SqlitePoolManager>;
@@ -293,22 +293,15 @@ impl SqlitePoolManager {
293293

294294
// MARK: Private helpers
295295
impl SqlitePoolManager {
296-
fn db_info(&self, key_packed: &KeyPacked) -> (PathBuf, String) {
296+
fn db_path(&self, key_packed: &KeyPacked) -> PathBuf {
297297
let hex_key_str = hex::encode(&**key_packed);
298298

299299
match &self.storage {
300-
SqliteStorage::Local { path } => {
301-
// Determine the persistent location of this database
302-
let db_path = path.join(format!("{hex_key_str}.db"));
303-
let db_url = format!("sqlite://{}", db_path.display());
304-
(db_path, db_url)
305-
}
300+
// Determine the persistent location of this database
301+
SqliteStorage::Local { path } => path.join(format!("{hex_key_str}.db")),
302+
// Generate temporary file location so multiple readers don't clobber each other
306303
SqliteStorage::FoundationDb { path } => {
307-
// Generate temporary file location so multiple readers don't clobber each other
308-
let db_path =
309-
path.join(format!("rivet-sqlite-{hex_key_str}-{}.db", Uuid::new_v4()));
310-
let db_url = format!("sqlite://{}", db_path.display());
311-
(db_path, db_url)
304+
path.join(format!("rivet-sqlite-{hex_key_str}-{}.db", Uuid::new_v4()))
312305
}
313306
}
314307
}
@@ -344,7 +337,7 @@ impl SqlitePoolManager {
344337
}
345338
}
346339
},
347-
clear_db_files(&self.storage, self.db_info(&key_packed).0),
340+
clear_db_files(&self.storage, self.db_path(&key_packed)),
348341
);
349342
}
350343
}
@@ -746,13 +739,13 @@ pub struct SqlitePoolInner {
746739
}
747740

748741
impl SqlitePoolInner {
749-
#[tracing::instrument(name = "sqlite_pool_new", skip_all)]
750742
async fn new(
751743
key_packed: KeyPacked,
752744
conn_type: SqliteConnType,
753745
manager: SqlitePoolManagerHandle,
754746
) -> Result<SqlitePool, Error> {
755-
let (db_path, db_url) = manager.db_info(&key_packed);
747+
let db_path = manager.db_path(&key_packed);
748+
let db_url = format!("sqlite://{}", db_path.display());
756749

757750
// Load database
758751
match &manager.storage {
@@ -893,6 +886,7 @@ impl SqlitePoolInner {
893886
}
894887

895888
impl SqlitePoolInner {
889+
// TODO: Doesn't need a result type
896890
#[tracing::instrument(name = "sqlite_pool_snapshot", skip_all)]
897891
pub async fn snapshot(&self, vacuum: bool) -> GlobalResult<bool> {
898892
match self
@@ -910,9 +904,21 @@ impl SqlitePoolInner {
910904
}
911905
}
912906
}
907+
908+
#[tracing::instrument(name = "sqlite_pool_evict", skip_all)]
909+
pub async fn evict(&self) -> GlobalResult<()> {
910+
self
911+
.manager
912+
.evict_with_key(&[self.key_packed.clone()])
913+
.await
914+
}
913915
}
914916

915917
impl SqlitePoolInner {
918+
pub fn db_path(&self) -> &Path {
919+
&self.db_path
920+
}
921+
916922
#[tracing::instrument(skip_all)]
917923
pub async fn conn(&self) -> Result<PoolConnection<Sqlite>, sqlx::Error> {
918924
// Attempt to use an existing connection

packages/common/server-cli/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ chirp-workflow.workspace = true
1111
chrono = "0.4.38"
1212
clap = { version = "4.3", features = ["derive"] }
1313
colored_json = "5.0.0"
14+
fdb-util.workspace = true
15+
foundationdb.workspace = true
1416
futures-util = "0.3"
1517
global-error.workspace = true
1618
hex.workspace = true
1719
include_dir = "0.7.4"
1820
indoc = "2.0.5"
21+
lz4_flex = "0.11.3"
1922
reqwest = "0.12.9"
20-
foundationdb.workspace = true
2123
rivet-api.workspace = true
2224
rivet-config.workspace = true
2325
rivet-logs.workspace = true

packages/common/server-cli/src/commands/db/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ pub enum DatabaseType {
2929
Redis,
3030
#[clap(alias = "ch")]
3131
Clickhouse,
32+
#[clap(alias = "wfd")]
33+
WorkflowData,
34+
#[clap(alias = "wfi")]
35+
WorkflowInternal,
3236
}
3337

3438
impl SubCommand {
@@ -56,6 +60,12 @@ impl SubCommand {
5660
DatabaseType::Clickhouse => {
5761
crate::util::db::clickhouse_shell(config, shell_ctx).await?
5862
}
63+
DatabaseType::WorkflowData => {
64+
crate::util::db::wf_sqlite_shell(config, shell_ctx, false).await?
65+
}
66+
DatabaseType::WorkflowInternal => {
67+
crate::util::db::wf_sqlite_shell(config, shell_ctx, true).await?
68+
}
5969
}
6070

6171
Ok(())

0 commit comments

Comments
 (0)