From f64697c2f4060346c54efc416389b42d62ee45c2 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Fri, 18 Jul 2025 18:49:20 +0000 Subject: [PATCH] fix: make query_as macros retry --- .../core/src/db/crdb_nats/debug.rs | 4 +- .../pools/src/utils/sql_query_macros.rs | 162 +++++++++++++++--- .../core/services/build/ops/get/src/lib.rs | 2 +- .../core/services/build/src/ops/patch_tags.rs | 4 +- .../core/services/cdn/ops/site-get/src/lib.rs | 2 +- .../cf-custom-hostname/ops/get/src/lib.rs | 2 +- .../cloud/ops/game-config-get/src/lib.rs | 2 +- .../cloud/ops/namespace-get/src/lib.rs | 2 +- .../services/cloud/ops/version-get/src/lib.rs | 2 +- .../src/workers/version_name_reserve.rs | 2 +- .../cluster/src/ops/datacenter/get.rs | 2 +- .../cluster/src/ops/server/lost_list.rs | 2 +- .../src/ops/server/prune_with_filter.rs | 2 +- .../cluster/src/workflows/datacenter/mod.rs | 6 +- .../cluster/src/workflows/server/mod.rs | 2 +- .../core/services/game/ops/get/src/lib.rs | 2 +- .../ops/namespace-resolve-name-id/src/lib.rs | 2 +- .../services/game/ops/version-get/src/lib.rs | 2 +- .../services/linode/standalone/gc/src/lib.rs | 2 +- .../load-test/standalone/sqlx/src/lib.rs | 2 +- .../mm-config/ops/lobby-group-get/src/lib.rs | 2 +- .../lobby-group-resolve-version/src/lib.rs | 2 +- .../mm-config/ops/namespace-get/src/lib.rs | 2 +- .../mm/ops/lobby-for-run-id/src/lib.rs | 2 +- .../core/services/mm/ops/lobby-get/src/lib.rs | 2 +- .../services/mm/ops/player-get/src/lib.rs | 2 +- .../core/services/route/src/ops/upsert.rs | 4 +- .../core/services/team/ops/get/src/lib.rs | 2 +- .../services/team/ops/member-get/src/lib.rs | 2 +- .../core/services/upload/ops/get/src/lib.rs | 2 +- .../core/services/user/ops/get/src/lib.rs | 2 +- .../user/ops/profile-validate/src/lib.rs | 2 +- 32 files changed, 177 insertions(+), 57 deletions(-) diff --git a/packages/common/chirp-workflow/core/src/db/crdb_nats/debug.rs b/packages/common/chirp-workflow/core/src/db/crdb_nats/debug.rs index c1e7c8ad5f..d9db5e2539 100644 --- a/packages/common/chirp-workflow/core/src/db/crdb_nats/debug.rs +++ b/packages/common/chirp-workflow/core/src/db/crdb_nats/debug.rs @@ -49,7 +49,7 @@ impl DatabaseDebug for DatabaseCrdbNats { WHERE workflow_id = ANY($1) ", - workflow_ids, + &workflow_ids, ) .await?; @@ -493,7 +493,7 @@ impl DatabaseDebug for DatabaseCrdbNats { FROM db_workflow.tagged_signals WHERE signal_id = ANY($1) ", - signal_ids, + &signal_ids, ) .await?; diff --git a/packages/common/pools/src/utils/sql_query_macros.rs b/packages/common/pools/src/utils/sql_query_macros.rs index 2a67aa02c1..c57a4bd579 100644 --- a/packages/common/pools/src/utils/sql_query_macros.rs +++ b/packages/common/pools/src/utils/sql_query_macros.rs @@ -203,18 +203,58 @@ macro_rules! __sql_query { }; ([$ctx:expr, @tx $tx:expr] $sql:expr, $($bind:expr),* $(,)?) => { async { - let query = sqlx::query($crate::__opt_indoc!($sql)) - $( - .bind($bind) - )*; - // Execute query $crate::__sql_query_metrics_acquire!(_acquire); $crate::__sql_query_metrics_start!($ctx, execute, _acquire, _start); - let res = query.execute(&mut **$tx).await.map_err(Into::::into); + + let mut backoff = $crate::__rivet_util::Backoff::new( + 4, + None, + $crate::utils::sql_query_macros::QUERY_RETRY_MS, + 50 + ); + let mut i = 0; + + // Retry loop + let res = loop { + let query = sqlx::query($crate::__opt_indoc!($sql)) + $( + .bind($bind) + )*; + + match query.execute(&mut **$tx).await { + Err(err) => { + i += 1; + if i > $crate::utils::sql_query_macros::MAX_QUERY_RETRIES { + break Err( + sqlx::Error::Io( + std::io::Error::new( + std::io::ErrorKind::Other, + $crate::utils::sql_query_macros::Error::MaxSqlRetries(err), + ) + ) + ); + } + + use sqlx::Error::*; + match &err { + // Retry other errors with a backoff + Database(_) | Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed + | WorkerCrashed => { + tracing::warn!(?err, "query retry ({i}/{})", $crate::utils::sql_query_macros::MAX_QUERY_RETRIES); + backoff.tick().await; + } + // Throw error + _ => break Err(err), + } + } + x => break x, + } + }; + $crate::__sql_query_metrics_finish!($ctx, execute, _start); - res + res.map_err(Into::::into) } .instrument(tracing::info_span!("sql_query")) }; @@ -229,11 +269,6 @@ macro_rules! __sql_query_as { async { use sqlx::Acquire; - let query = sqlx::query_as::<_, $rv>($crate::__opt_indoc!($sql)) - $( - .bind($bind) - )*; - // Acquire connection $crate::__sql_query_metrics_acquire!(_acquire); let driver = $driver; @@ -241,27 +276,112 @@ macro_rules! __sql_query_as { // Execute query $crate::__sql_query_metrics_start!($ctx, $action, _acquire, _start); - let res = query.$action(&mut *conn).await.map_err(Into::::into); + + let mut backoff = $crate::__rivet_util::Backoff::new( + 4, + None, + $crate::utils::sql_query_macros::QUERY_RETRY_MS, + 50 + ); + let mut i = 0; + + // Retry loop + let res = loop { + let query = sqlx::query_as::<_, $rv>($crate::__opt_indoc!($sql)) + $( + .bind($bind) + )*; + + match query.$action(&mut *conn).await { + Err(err) => { + i += 1; + if i > $crate::utils::sql_query_macros::MAX_QUERY_RETRIES { + break Err( + sqlx::Error::Io( + std::io::Error::new( + std::io::ErrorKind::Other, + $crate::utils::sql_query_macros::Error::MaxSqlRetries(err), + ) + ) + ); + } + + use sqlx::Error::*; + match &err { + // Retry other errors with a backoff + Database(_) | Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed + | WorkerCrashed => { + tracing::warn!(?err, "query retry ({i}/{})", $crate::utils::sql_query_macros::MAX_QUERY_RETRIES); + backoff.tick().await; + } + // Throw error + _ => break Err(err), + } + } + x => break x, + } + }; + $crate::__sql_query_metrics_finish!($ctx, $action, _start); - res + res.map_err(Into::::into) } .instrument(tracing::info_span!("sql_query_as")) }; ([$ctx:expr, $rv:ty, $action:ident, @tx $tx:expr] $sql:expr, $($bind:expr),* $(,)?) => { async { - let query = sqlx::query_as::<_, $rv>($crate::__opt_indoc!($sql)) - $( - .bind($bind) - )*; - // Execute query $crate::__sql_query_metrics_acquire!(_acquire); $crate::__sql_query_metrics_start!($ctx, $action, _acquire, _start); - let res = query.$action(&mut **$tx).await.map_err(Into::::into); + + let mut backoff = $crate::__rivet_util::Backoff::new( + 4, + None, + $crate::utils::sql_query_macros::QUERY_RETRY_MS, + 50 + ); + let mut i = 0; + + // Retry loop + let res = loop { + let query = sqlx::query_as::<_, $rv>($crate::__opt_indoc!($sql)) + $( + .bind($bind) + )*; + + match query.$action(&mut **$tx).await { + Err(err) => { + i += 1; + if i > $crate::utils::sql_query_macros::MAX_QUERY_RETRIES { + break Err( + sqlx::Error::Io( + std::io::Error::new( + std::io::ErrorKind::Other, + $crate::utils::sql_query_macros::Error::MaxSqlRetries(err), + ) + ) + ); + } + + use sqlx::Error::*; + match &err { + // Retry other errors with a backoff + Database(_) | Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed + | WorkerCrashed => { + tracing::warn!(?err, "query retry ({i}/{})", $crate::utils::sql_query_macros::MAX_QUERY_RETRIES); + backoff.tick().await; + } + // Throw error + _ => break Err(err), + } + } + x => break x, + } + }; + $crate::__sql_query_metrics_finish!($ctx, $action, _start); - res + res.map_err(Into::::into) } .instrument(tracing::info_span!("sql_query_as")) }; diff --git a/packages/core/services/build/ops/get/src/lib.rs b/packages/core/services/build/ops/get/src/lib.rs index b681be6433..3b0bc2a71c 100644 --- a/packages/core/services/build/ops/get/src/lib.rs +++ b/packages/core/services/build/ops/get/src/lib.rs @@ -43,7 +43,7 @@ async fn handle(ctx: OperationContext) -> GlobalResult GlobalResult GlobalResult) -> GlobalResult GlobalRe input.provider as i64, &input.provider_datacenter_id, &input.provider_api_token, - pools_buf, + &pools_buf, input.build_delivery_method as i64, input.prebakes_enabled, util::timestamp::now(), - gph_dns_parent, - gph_static + &gph_dns_parent, + &gph_static, ) .await?; diff --git a/packages/core/services/cluster/src/workflows/server/mod.rs b/packages/core/services/cluster/src/workflows/server/mod.rs index b21e44709c..c75892ff34 100644 --- a/packages/core/services/cluster/src/workflows/server/mod.rs +++ b/packages/core/services/cluster/src/workflows/server/mod.rs @@ -597,7 +597,7 @@ async fn get_vlan_ip(ctx: &ActivityCtx, input: &GetVlanIpInput) -> GlobalResult< // Choose a random index to start from for better index spread rand::thread_rng().gen_range(0i64..max_idx), max_idx, - shared_net_pool_types, + &shared_net_pool_types, input.datacenter_id, input.server_id, ) diff --git a/packages/core/services/game/ops/get/src/lib.rs b/packages/core/services/game/ops/get/src/lib.rs index 0b42e47bb8..bddb8b0557 100644 --- a/packages/core/services/game/ops/get/src/lib.rs +++ b/packages/core/services/game/ops/get/src/lib.rs @@ -68,7 +68,7 @@ async fn handle(ctx: OperationContext) -> GlobalResult>(), + name_ids.iter().map(|(_, name_id)| name_id).collect::>(), ) .await?; diff --git a/packages/core/services/game/ops/version-get/src/lib.rs b/packages/core/services/game/ops/version-get/src/lib.rs index 08e0e1cfe8..2dab399679 100644 --- a/packages/core/services/game/ops/version-get/src/lib.rs +++ b/packages/core/services/game/ops/version-get/src/lib.rs @@ -43,7 +43,7 @@ async fn handle( WHERE version_id = ANY($1) ORDER BY create_ts DESC ", - version_ids, + &version_ids, ) .await?; diff --git a/packages/core/services/linode/standalone/gc/src/lib.rs b/packages/core/services/linode/standalone/gc/src/lib.rs index 7e172f17d7..3d3cbbfad7 100644 --- a/packages/core/services/linode/standalone/gc/src/lib.rs +++ b/packages/core/services/linode/standalone/gc/src/lib.rs @@ -106,7 +106,7 @@ async fn run_for_linode_account( complete_ts IS NULL RETURNING image_id ", - image_ids, + &image_ids, util::timestamp::now(), ) .await?; diff --git a/packages/core/services/load-test/standalone/sqlx/src/lib.rs b/packages/core/services/load-test/standalone/sqlx/src/lib.rs index 18a5eb7bd1..3c75da5d07 100644 --- a/packages/core/services/load-test/standalone/sqlx/src/lib.rs +++ b/packages/core/services/load-test/standalone/sqlx/src/lib.rs @@ -104,7 +104,7 @@ async fn exec(ctx: OperationContext<()>) -> GlobalResult<()> { sql_fetch_one!( [ctx, (String,)] "SELECT display_name FROM db_user.users WHERE user_id = ANY($1)", - user_ids, + &user_ids, ), sql_fetch_all!( [ctx, (Uuid, String, i64)] diff --git a/packages/core/services/mm-config/ops/lobby-group-get/src/lib.rs b/packages/core/services/mm-config/ops/lobby-group-get/src/lib.rs index 54199b2bcc..e8d88d0aff 100644 --- a/packages/core/services/mm-config/ops/lobby-group-get/src/lib.rs +++ b/packages/core/services/mm-config/ops/lobby-group-get/src/lib.rs @@ -33,7 +33,7 @@ async fn handle( FROM db_mm_config.lobby_groups AS lg WHERE lobby_group_id = ANY($1) ", - lobby_group_ids, + &lobby_group_ids, ) .await?; diff --git a/packages/core/services/mm-config/ops/lobby-group-resolve-version/src/lib.rs b/packages/core/services/mm-config/ops/lobby-group-resolve-version/src/lib.rs index f90988ad2b..eeefca07cc 100644 --- a/packages/core/services/mm-config/ops/lobby-group-resolve-version/src/lib.rs +++ b/packages/core/services/mm-config/ops/lobby-group-resolve-version/src/lib.rs @@ -42,7 +42,7 @@ async fn handle( FROM db_mm_config.lobby_groups WHERE lobby_group_id = ANY($1) ", - lobby_group_ids, + &lobby_group_ids, ) .await? .into_iter(); diff --git a/packages/core/services/mm-config/ops/namespace-get/src/lib.rs b/packages/core/services/mm-config/ops/namespace-get/src/lib.rs index f06ae56c35..64168e6568 100644 --- a/packages/core/services/mm-config/ops/namespace-get/src/lib.rs +++ b/packages/core/services/mm-config/ops/namespace-get/src/lib.rs @@ -36,7 +36,7 @@ async fn handle( FROM db_mm_config.game_namespaces WHERE namespace_id = ANY($1) ", - namespace_ids, + &namespace_ids, ) .await? .into_iter() diff --git a/packages/core/services/mm/ops/lobby-for-run-id/src/lib.rs b/packages/core/services/mm/ops/lobby-for-run-id/src/lib.rs index 76e8e8c878..5c39ab4ade 100644 --- a/packages/core/services/mm/ops/lobby-for-run-id/src/lib.rs +++ b/packages/core/services/mm/ops/lobby-for-run-id/src/lib.rs @@ -33,7 +33,7 @@ pub async fn handle( FROM db_mm_state.lobbies WHERE run_id = ANY($1) ", - run_ids, + &run_ids, ) .await? .into_iter() diff --git a/packages/core/services/mm/ops/lobby-get/src/lib.rs b/packages/core/services/mm/ops/lobby-get/src/lib.rs index f2986cc01c..15d834c4c3 100644 --- a/packages/core/services/mm/ops/lobby-get/src/lib.rs +++ b/packages/core/services/mm/ops/lobby-get/src/lib.rs @@ -83,7 +83,7 @@ async fn handle( FROM db_mm_state.lobbies WHERE lobby_id = ANY($1) ", - lobby_ids, + &lobby_ids, ) .await? .into_iter() diff --git a/packages/core/services/mm/ops/player-get/src/lib.rs b/packages/core/services/mm/ops/player-get/src/lib.rs index db700d3bdf..e53b914d07 100644 --- a/packages/core/services/mm/ops/player-get/src/lib.rs +++ b/packages/core/services/mm/ops/player-get/src/lib.rs @@ -43,7 +43,7 @@ async fn handle( FROM db_mm_state.players WHERE player_id = ANY($1) ", - player_ids, + &player_ids, ) .await? .into_iter() diff --git a/packages/core/services/route/src/ops/upsert.rs b/packages/core/services/route/src/ops/upsert.rs index 12479c65f7..0157091b3a 100644 --- a/packages/core/services/route/src/ops/upsert.rs +++ b/packages/core/services/route/src/ops/upsert.rs @@ -111,7 +111,7 @@ pub async fn upsert(ctx: &OperationCtx, input: &Input) -> GlobalResult { input_route_subpaths, input_strip_prefix, 0, // Actors type - actors_selector_tags_json, + &actors_selector_tags_json, now, existing_id, input_namespace_id @@ -140,7 +140,7 @@ pub async fn upsert(ctx: &OperationCtx, input: &Input) -> GlobalResult { input_route_subpaths, input_strip_prefix, 0, // Actors type - actors_selector_tags_json, + &actors_selector_tags_json, now, now ) diff --git a/packages/core/services/team/ops/get/src/lib.rs b/packages/core/services/team/ops/get/src/lib.rs index 600f8baf0a..3c82e1e068 100644 --- a/packages/core/services/team/ops/get/src/lib.rs +++ b/packages/core/services/team/ops/get/src/lib.rs @@ -38,7 +38,7 @@ async fn handle(ctx: OperationContext) -> GlobalResult