Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use uuid::Uuid;

use super::{
keys,
sqlite::{db_name_internal, SqlStub},
sqlite::SqlStub,
DatabaseFdbSqliteNats,
};
use crate::{
Expand Down Expand Up @@ -602,10 +602,10 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {

for key in sub_workflow_wake_keys {
tracing::warn!(
"workflow {} is being waited on by sub workflow {}, silencing anyway",
key.workflow_id,
key.sub_workflow_id
);
"workflow {} is being waited on by sub workflow {}, silencing anyway",
key.workflow_id,
key.sub_workflow_id
);
}

for key in tag_keys {
Expand Down Expand Up @@ -741,7 +741,7 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
) -> Result<Option<HistoryData>> {
let pool = &self
.pools
.sqlite(db_name_internal(workflow_id), true)
.sqlite(crate::db::sqlite_db_name_internal(workflow_id), true)
.await?;

let (wf_data, event_rows, error_rows) = tokio::try_join!(
Expand Down
34 changes: 17 additions & 17 deletions packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl DatabaseFdbSqliteNats {
self.pools
.sqlite_manager()
.evict(vec![
sqlite::db_name_internal(workflow_id),
crate::db::sqlite_db_name_internal(workflow_id),
crate::db::sqlite_db_name_data(workflow_id),
])
.await?;
Expand Down Expand Up @@ -1277,7 +1277,7 @@ impl Database for DatabaseFdbSqliteNats {
async move {
let pool = &self
.pools
.sqlite(sqlite::db_name_internal(partial.workflow_id), false)
.sqlite(crate::db::sqlite_db_name_internal(partial.workflow_id), false)
.await?;

// Handle error during sqlite init
Expand Down Expand Up @@ -1876,7 +1876,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult<Option<SignalData>> {
let pool = &self
.pools
.sqlite(sqlite::db_name_internal(workflow_id), false)
.sqlite(crate::db::sqlite_db_name_internal(workflow_id), false)
.await?;

let owned_filter = filter
Expand Down Expand Up @@ -2323,7 +2323,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult<()> {
let pool = &self
.pools
.sqlite(sqlite::db_name_internal(from_workflow_id), false)
.sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
.await?;

// Insert history event
Expand Down Expand Up @@ -2351,7 +2351,7 @@ impl Database for DatabaseFdbSqliteNats {
.sqlite_manager()
.flush(
vec![
sqlite::db_name_internal(from_workflow_id),
crate::db::sqlite_db_name_internal(from_workflow_id),
crate::db::sqlite_db_name_data(from_workflow_id),
],
false,
Expand Down Expand Up @@ -2413,7 +2413,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult<Uuid> {
let pool = &self
.pools
.sqlite(sqlite::db_name_internal(workflow_id), false)
.sqlite(crate::db::sqlite_db_name_internal(workflow_id), false)
.await?;

// Insert history event
Expand Down Expand Up @@ -2448,7 +2448,7 @@ impl Database for DatabaseFdbSqliteNats {
.sqlite_manager()
.flush(
vec![
sqlite::db_name_internal(workflow_id),
crate::db::sqlite_db_name_internal(workflow_id),
crate::db::sqlite_db_name_data(workflow_id),
],
false,
Expand Down Expand Up @@ -2596,7 +2596,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult<()> {
let pool = &self
.pools
.sqlite(sqlite::db_name_internal(workflow_id), false)
.sqlite(crate::db::sqlite_db_name_internal(workflow_id), false)
.await?;
let input_hash = event_id.input_hash.to_be_bytes();

Expand Down Expand Up @@ -2702,7 +2702,7 @@ impl Database for DatabaseFdbSqliteNats {
.sqlite_manager()
.flush(
vec![
sqlite::db_name_internal(from_workflow_id),
crate::db::sqlite_db_name_internal(from_workflow_id),
crate::db::sqlite_db_name_data(from_workflow_id),
],
false,
Expand All @@ -2711,7 +2711,7 @@ impl Database for DatabaseFdbSqliteNats {

let pool = &self
.pools
.sqlite(sqlite::db_name_internal(from_workflow_id), false)
.sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
.await?;

sql_execute!(
Expand Down Expand Up @@ -2751,7 +2751,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult<()> {
let pool = &self
.pools
.sqlite(sqlite::db_name_internal(workflow_id), false)
.sqlite(crate::db::sqlite_db_name_internal(workflow_id), false)
.await?;

self.txn(|| async {
Expand Down Expand Up @@ -3040,7 +3040,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult<()> {
let pool = &self
.pools
.sqlite(sqlite::db_name_internal(from_workflow_id), false)
.sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
.await?;

sql_execute!(
Expand Down Expand Up @@ -3072,7 +3072,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult<()> {
let pool = &self
.pools
.sqlite(sqlite::db_name_internal(from_workflow_id), false)
.sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
.await?;

sql_execute!(
Expand Down Expand Up @@ -3100,7 +3100,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult<()> {
let pool = &self
.pools
.sqlite(sqlite::db_name_internal(from_workflow_id), false)
.sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
.await?;

sql_execute!(
Expand Down Expand Up @@ -3132,7 +3132,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult<()> {
let pool = &self
.pools
.sqlite(sqlite::db_name_internal(from_workflow_id), false)
.sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
.await?;

sql_execute!(
Expand Down Expand Up @@ -3164,7 +3164,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult<()> {
let pool = &self
.pools
.sqlite(sqlite::db_name_internal(from_workflow_id), false)
.sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
.await?;

sql_execute!(
Expand Down Expand Up @@ -3220,7 +3220,7 @@ async fn flush_handler(pools: rivet_pools::Pools, mut flush_rx: mpsc::UnboundedR
.sqlite_manager()
.flush(
vec![
sqlite::db_name_internal(workflow_id),
crate::db::sqlite_db_name_internal(workflow_id),
crate::db::sqlite_db_name_data(workflow_id),
],
true,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::HashMap;

use fdb_util::keys::*;
use include_dir::{include_dir, Dir, File};
use indoc::indoc;
use rivet_pools::prelude::*;
Expand Down Expand Up @@ -407,8 +406,3 @@ pub fn build_history(

Ok(events_by_location)
}

/// Database name for the workflow internal state.
pub fn db_name_internal(workflow_id: Uuid) -> (usize, Uuid, usize) {
(WORKFLOW, workflow_id, INTERNAL)
}
5 changes: 5 additions & 0 deletions packages/common/chirp-workflow/core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,8 @@ pub struct SignalData {
pub fn sqlite_db_name_data(workflow_id: Uuid) -> (usize, Uuid, usize) {
(WORKFLOW, workflow_id, DATA)
}

/// Database name for the workflow internal state.
pub fn sqlite_db_name_internal(workflow_id: Uuid) -> (usize, Uuid, usize) {
(WORKFLOW, workflow_id, INTERNAL)
}
Comment on lines +347 to +350
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Add documentation about what INTERNAL constant represents vs DATA

40 changes: 23 additions & 17 deletions packages/common/pools/src/db/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use uuid::Uuid;

use crate::{metrics, Error, FdbPool};

mod keys;
pub mod keys;

#[cfg(test)]
mod tests;
Expand Down Expand Up @@ -146,7 +146,7 @@ impl SqliteWriterEntry {
/// DB key in packed form. This is not the full FDB key, this is the DB name segment in DbDataKey.
///
/// Stored in an `Arc` since this is frequently copied around.
type KeyPacked = Arc<Vec<u8>>;
pub type KeyPacked = Arc<Vec<u8>>;

pub type SqlitePoolManagerHandle = Arc<SqlitePoolManager>;
pub type SqlitePoolManagerHandleWeak = Weak<SqlitePoolManager>;
Expand Down Expand Up @@ -293,22 +293,15 @@ impl SqlitePoolManager {

// MARK: Private helpers
impl SqlitePoolManager {
fn db_info(&self, key_packed: &KeyPacked) -> (PathBuf, String) {
fn db_path(&self, key_packed: &KeyPacked) -> PathBuf {
let hex_key_str = hex::encode(&**key_packed);

match &self.storage {
SqliteStorage::Local { path } => {
// Determine the persistent location of this database
let db_path = path.join(format!("{hex_key_str}.db"));
let db_url = format!("sqlite://{}", db_path.display());
(db_path, db_url)
}
// Determine the persistent location of this database
SqliteStorage::Local { path } => path.join(format!("{hex_key_str}.db")),
// Generate temporary file location so multiple readers don't clobber each other
SqliteStorage::FoundationDb { path } => {
// Generate temporary file location so multiple readers don't clobber each other
let db_path =
path.join(format!("rivet-sqlite-{hex_key_str}-{}.db", Uuid::new_v4()));
let db_url = format!("sqlite://{}", db_path.display());
(db_path, db_url)
path.join(format!("rivet-sqlite-{hex_key_str}-{}.db", Uuid::new_v4()))
}
}
}
Expand Down Expand Up @@ -344,7 +337,7 @@ impl SqlitePoolManager {
}
}
},
clear_db_files(&self.storage, self.db_info(&key_packed).0),
clear_db_files(&self.storage, self.db_path(&key_packed)),
);
}
}
Expand Down Expand Up @@ -746,13 +739,13 @@ pub struct SqlitePoolInner {
}

impl SqlitePoolInner {
#[tracing::instrument(name = "sqlite_pool_new", skip_all)]
async fn new(
key_packed: KeyPacked,
conn_type: SqliteConnType,
manager: SqlitePoolManagerHandle,
) -> Result<SqlitePool, Error> {
let (db_path, db_url) = manager.db_info(&key_packed);
let db_path = manager.db_path(&key_packed);
let db_url = format!("sqlite://{}", db_path.display());

// Load database
match &manager.storage {
Expand Down Expand Up @@ -893,6 +886,7 @@ impl SqlitePoolInner {
}

impl SqlitePoolInner {
// TODO: Doesn't need a result type
#[tracing::instrument(name = "sqlite_pool_snapshot", skip_all)]
pub async fn snapshot(&self, vacuum: bool) -> GlobalResult<bool> {
match self
Expand All @@ -910,9 +904,21 @@ impl SqlitePoolInner {
}
}
}

#[tracing::instrument(name = "sqlite_pool_evict", skip_all)]
pub async fn evict(&self) -> GlobalResult<()> {
self
.manager
.evict_with_key(&[self.key_packed.clone()])
.await
}
}

impl SqlitePoolInner {
pub fn db_path(&self) -> &Path {
&self.db_path
}

#[tracing::instrument(skip_all)]
pub async fn conn(&self) -> Result<PoolConnection<Sqlite>, sqlx::Error> {
// Attempt to use an existing connection
Expand Down
4 changes: 3 additions & 1 deletion packages/common/server-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ chirp-workflow.workspace = true
chrono = "0.4.38"
clap = { version = "4.3", features = ["derive"] }
colored_json = "5.0.0"
fdb-util.workspace = true
foundationdb.workspace = true
futures-util = "0.3"
global-error.workspace = true
hex.workspace = true
include_dir = "0.7.4"
indoc = "2.0.5"
lz4_flex = "0.11.3"
reqwest = "0.12.9"
foundationdb.workspace = true
rivet-api.workspace = true
rivet-config.workspace = true
rivet-logs.workspace = true
Expand Down
10 changes: 10 additions & 0 deletions packages/common/server-cli/src/commands/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ pub enum DatabaseType {
Redis,
#[clap(alias = "ch")]
Clickhouse,
#[clap(alias = "wfd")]
WorkflowData,
#[clap(alias = "wfi")]
WorkflowInternal,
}

impl SubCommand {
Expand Down Expand Up @@ -56,6 +60,12 @@ impl SubCommand {
DatabaseType::Clickhouse => {
crate::util::db::clickhouse_shell(config, shell_ctx).await?
}
DatabaseType::WorkflowData => {
crate::util::db::wf_sqlite_shell(config, shell_ctx, false).await?
}
DatabaseType::WorkflowInternal => {
crate::util::db::wf_sqlite_shell(config, shell_ctx, true).await?
}
}

Ok(())
Expand Down
Loading
Loading