Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/common/chirp-workflow/core/src/db/crdb_nats/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl DatabaseDebug for DatabaseCrdbNats {
WHERE
workflow_id = ANY($1)
",
workflow_ids,
&workflow_ids,
)
.await?;

Expand Down Expand Up @@ -493,7 +493,7 @@ impl DatabaseDebug for DatabaseCrdbNats {
FROM db_workflow.tagged_signals
WHERE signal_id = ANY($1)
",
signal_ids,
&signal_ids,
)
.await?;

Expand Down
162 changes: 141 additions & 21 deletions packages/common/pools/src/utils/sql_query_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,18 +203,58 @@ macro_rules! __sql_query {
};
([$ctx:expr, @tx $tx:expr] $sql:expr, $($bind:expr),* $(,)?) => {
async {
let query = sqlx::query($crate::__opt_indoc!($sql))
$(
.bind($bind)
)*;

// Execute query
$crate::__sql_query_metrics_acquire!(_acquire);
$crate::__sql_query_metrics_start!($ctx, execute, _acquire, _start);
let res = query.execute(&mut **$tx).await.map_err(Into::<GlobalError>::into);

let mut backoff = $crate::__rivet_util::Backoff::new(
4,
None,
$crate::utils::sql_query_macros::QUERY_RETRY_MS,
50
);
Comment on lines +210 to +215
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify the Backoff initialization parameters.

The Backoff is initialized with parameters (4, None, QUERY_RETRY_MS, 50) in multiple places. The first parameter 4 is unclear - please verify what this represents in the __rivet_util::Backoff::new() constructor and ensure it's appropriate for SQL query retries.

Also applies to: 280-285, 337-342


🏁 Script executed:

#!/bin/bash
# Search for the Backoff struct definition to understand the parameters
ast-grep --pattern 'impl Backoff {
  $$$
  fn new($$$) {
    $$$
  }
  $$$
}'

# Also search for any documentation on Backoff
rg -A 10 "struct Backoff" --type rust

Length of output: 738


🏁 Script executed:

#!/bin/bash
# Show the Backoff::new signature and docs
rg -n -A5 -B2 "pub fn new" packages/common/util/core/src/lib.rs

# Show the implementation of next() to understand retry behavior
rg -n -A5 -B3 "fn next" packages/common/util/core/src/lib.rs

Length of output: 435


Clarify Backoff::new parameters

The call

Backoff::new(4, None, QUERY_RETRY_MS, 50)

maps to

pub fn new(
    max_exponent: usize,    // first arg
    max_retries: Option<usize>,
    wait: usize,            // base wait in ms
    randomness: usize,      // max jitter in ms
)
  • max_exponent = 4 ⇒ delays = wait * 2^n up to 2⁴ → 100 ms → 1 600 ms (± jitter)
  • max_retries = Noneunbounded retry attempts
  • wait = QUERY_RETRY_MS (100 ms) and randomness = 50 ms jitter

Please confirm that:

  • A maximum exponent of 4 (1.6 s cap) is sufficient.
  • Unlimited retries are desired. Otherwise, pass Some(MAX_QUERY_RETRIES) instead of None to bound attempts.

Applies at:

  • packages/common/pools/src/utils/sql_query_macros.rs:210–215
  • packages/common/pools/src/utils/sql_query_macros.rs:280–285
  • packages/common/pools/src/utils/sql_query_macros.rs:337–342
🤖 Prompt for AI Agents
In packages/common/pools/src/utils/sql_query_macros.rs around lines 210 to 215,
review the Backoff::new call parameters to ensure they match desired retry
behavior. Confirm if a max_exponent of 4 (capping delay at about 1.6 seconds) is
sufficient for your use case and whether unlimited retries (max_retries set to
None) are intended. If you want to limit retry attempts, replace None with
Some(MAX_QUERY_RETRIES) to bound the number of retries. Apply the same
confirmation and changes as needed at lines 280–285 and 337–342.

let mut i = 0;

// Retry loop
let res = loop {
let query = sqlx::query($crate::__opt_indoc!($sql))
$(
.bind($bind)
)*;

match query.execute(&mut **$tx).await {
Err(err) => {
i += 1;
if i > $crate::utils::sql_query_macros::MAX_QUERY_RETRIES {
break Err(
sqlx::Error::Io(
std::io::Error::new(
std::io::ErrorKind::Other,
$crate::utils::sql_query_macros::Error::MaxSqlRetries(err),
)
)
);
}

use sqlx::Error::*;
match &err {
// Retry other errors with a backoff
Database(_) | Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed
| WorkerCrashed => {
tracing::warn!(?err, "query retry ({i}/{})", $crate::utils::sql_query_macros::MAX_QUERY_RETRIES);
backoff.tick().await;
}
// Throw error
_ => break Err(err),
}
}
x => break x,
}
};

$crate::__sql_query_metrics_finish!($ctx, execute, _start);

res
res.map_err(Into::<GlobalError>::into)
}
.instrument(tracing::info_span!("sql_query"))
};
Expand All @@ -229,39 +269,119 @@ macro_rules! __sql_query_as {
async {
use sqlx::Acquire;

let query = sqlx::query_as::<_, $rv>($crate::__opt_indoc!($sql))
$(
.bind($bind)
)*;

// Acquire connection
$crate::__sql_query_metrics_acquire!(_acquire);
let driver = $driver;
let mut conn = $crate::__sql_acquire!($ctx, driver);

// Execute query
$crate::__sql_query_metrics_start!($ctx, $action, _acquire, _start);
let res = query.$action(&mut *conn).await.map_err(Into::<GlobalError>::into);

let mut backoff = $crate::__rivet_util::Backoff::new(
4,
None,
$crate::utils::sql_query_macros::QUERY_RETRY_MS,
50
);
let mut i = 0;

// Retry loop
let res = loop {
let query = sqlx::query_as::<_, $rv>($crate::__opt_indoc!($sql))
$(
.bind($bind)
)*;

match query.$action(&mut *conn).await {
Err(err) => {
i += 1;
if i > $crate::utils::sql_query_macros::MAX_QUERY_RETRIES {
break Err(
sqlx::Error::Io(
std::io::Error::new(
std::io::ErrorKind::Other,
$crate::utils::sql_query_macros::Error::MaxSqlRetries(err),
)
)
);
}

use sqlx::Error::*;
match &err {
// Retry other errors with a backoff
Database(_) | Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed
| WorkerCrashed => {
tracing::warn!(?err, "query retry ({i}/{})", $crate::utils::sql_query_macros::MAX_QUERY_RETRIES);
backoff.tick().await;
}
// Throw error
_ => break Err(err),
}
}
x => break x,
}
};

$crate::__sql_query_metrics_finish!($ctx, $action, _start);

res
res.map_err(Into::<GlobalError>::into)
}
.instrument(tracing::info_span!("sql_query_as"))
};
([$ctx:expr, $rv:ty, $action:ident, @tx $tx:expr] $sql:expr, $($bind:expr),* $(,)?) => {
async {
let query = sqlx::query_as::<_, $rv>($crate::__opt_indoc!($sql))
$(
.bind($bind)
)*;

// Execute query
$crate::__sql_query_metrics_acquire!(_acquire);
$crate::__sql_query_metrics_start!($ctx, $action, _acquire, _start);
let res = query.$action(&mut **$tx).await.map_err(Into::<GlobalError>::into);

let mut backoff = $crate::__rivet_util::Backoff::new(
4,
None,
$crate::utils::sql_query_macros::QUERY_RETRY_MS,
50
);
let mut i = 0;

// Retry loop
let res = loop {
let query = sqlx::query_as::<_, $rv>($crate::__opt_indoc!($sql))
$(
.bind($bind)
)*;

match query.$action(&mut **$tx).await {
Err(err) => {
i += 1;
if i > $crate::utils::sql_query_macros::MAX_QUERY_RETRIES {
break Err(
sqlx::Error::Io(
std::io::Error::new(
std::io::ErrorKind::Other,
$crate::utils::sql_query_macros::Error::MaxSqlRetries(err),
)
)
);
}

use sqlx::Error::*;
match &err {
// Retry other errors with a backoff
Database(_) | Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed
| WorkerCrashed => {
tracing::warn!(?err, "query retry ({i}/{})", $crate::utils::sql_query_macros::MAX_QUERY_RETRIES);
backoff.tick().await;
}
// Throw error
_ => break Err(err),
}
}
x => break x,
}
};

$crate::__sql_query_metrics_finish!($ctx, $action, _start);

res
res.map_err(Into::<GlobalError>::into)
}
.instrument(tracing::info_span!("sql_query_as"))
};
Expand Down
2 changes: 1 addition & 1 deletion packages/core/services/build/ops/get/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn handle(ctx: OperationContext<build::get::Request>) -> GlobalResult<buil
WHERE
build_id = ANY($1)
",
build_ids,
&build_ids,
)
.await?
.into_iter()
Expand Down
4 changes: 2 additions & 2 deletions packages/core/services/build/src/ops/patch_tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ pub async fn patch_tags(ctx: &OperationCtx, input: &Input) -> GlobalResult<Outpu
WHERE b.build_id = f2.build_id
",
build_id,
exclusive_tags_json,
&exclusive_tags_json,
)
.await?;
}
Expand All @@ -147,7 +147,7 @@ pub async fn patch_tags(ctx: &OperationCtx, input: &Input) -> GlobalResult<Outpu
WHERE build_id = $1
",
build_id,
tags_json,
&tags_json,
)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion packages/core/services/cdn/ops/site-get/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn handle(
FROM db_cdn.sites
WHERE site_id = ANY($1)
",
site_ids,
&site_ids,
)
.await?
.into_iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn handle(
FROM db_cf_custom_hostname.custom_hostnames
WHERE identifier = ANY($1)
",
identifiers,
&identifiers,
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn handle(
FROM db_cloud.game_configs
WHERE game_id = ANY($1)
",
game_ids,
&game_ids,
)
.await?
.into_iter()
Expand Down
2 changes: 1 addition & 1 deletion packages/core/services/cloud/ops/namespace-get/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn handle(
FROM db_cloud.game_namespaces
WHERE namespace_id = ANY($1)
",
namespace_ids,
&namespace_ids,
)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion packages/core/services/cloud/ops/version-get/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn handle(
FROM db_cloud.game_versions
WHERE version_id = ANY($1)
",
req_version_ids,
&req_version_ids,
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn worker(
RETURNING version_display_name;
"#,
game_id,
date_prefix,
&date_prefix,
ctx.ts(),
)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion packages/core/services/cluster/src/ops/datacenter/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async fn get_dcs(ctx: OperationCtx, datacenter_ids: Vec<Uuid>) -> GlobalResult<V
FROM db_cluster.datacenters
WHERE datacenter_id = ANY($1)
",
datacenter_ids,
&datacenter_ids,
)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion packages/core/services/cluster/src/ops/server/lost_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ async fn run_for_linode_account(
($5 IS NULL OR s.pool_type = ANY($5)) AND
($6 IS NULL OR s.public_ip = ANY($6))
",
server_ids,
&server_ids,
&filter.server_ids,
&filter.datacenter_ids,
&filter.cluster_ids,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ async fn prune_linode(
provider_api_token IS NOT NULL AND
datacenter_id = ANY($1)
",
dc_ids,
&dc_ids,
)
.await?
.into_iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,12 @@ async fn insert_db_inner(ctx: &ActivityCtx, input: &InsertDbInputV2) -> GlobalRe
input.provider as i64,
&input.provider_datacenter_id,
&input.provider_api_token,
pools_buf,
&pools_buf,
input.build_delivery_method as i64,
input.prebakes_enabled,
util::timestamp::now(),
gph_dns_parent,
gph_static
&gph_dns_parent,
&gph_static,
)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion packages/core/services/cluster/src/workflows/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ async fn get_vlan_ip(ctx: &ActivityCtx, input: &GetVlanIpInput) -> GlobalResult<
// Choose a random index to start from for better index spread
rand::thread_rng().gen_range(0i64..max_idx),
max_idx,
shared_net_pool_types,
&shared_net_pool_types,
input.datacenter_id,
input.server_id,
)
Expand Down
2 changes: 1 addition & 1 deletion packages/core/services/game/ops/get/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async fn handle(ctx: OperationContext<game::get::Request>) -> GlobalResult<game:
FROM db_game.games
WHERE game_id = ANY($1)
",
game_ids,
&game_ids,
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn handle(
WHERE game_id = $1 AND name_id = ANY($2)
",
game_id,
name_ids.into_iter().map(|(_, name_id)| name_id).collect::<Vec<_>>(),
name_ids.iter().map(|(_, name_id)| name_id).collect::<Vec<_>>(),
)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion packages/core/services/game/ops/version-get/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn handle(
WHERE version_id = ANY($1)
ORDER BY create_ts DESC
",
version_ids,
&version_ids,
)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion packages/core/services/linode/standalone/gc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async fn run_for_linode_account(
complete_ts IS NULL
RETURNING image_id
",
image_ids,
&image_ids,
util::timestamp::now(),
)
.await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async fn exec(ctx: OperationContext<()>) -> GlobalResult<()> {
sql_fetch_one!(
[ctx, (String,)]
"SELECT display_name FROM db_user.users WHERE user_id = ANY($1)",
user_ids,
&user_ids,
),
sql_fetch_all!(
[ctx, (Uuid, String, i64)]
Expand Down
Loading
Loading