@@ -203,18 +203,58 @@ macro_rules! __sql_query {
203
203
} ;
204
204
( [ $ctx: expr, @tx $tx: expr] $sql: expr, $( $bind: expr) ,* $( , ) ?) => {
205
205
async {
206
- let query = sqlx:: query( $crate:: __opt_indoc!( $sql) )
207
- $(
208
- . bind( $bind)
209
- ) * ;
210
-
211
206
// Execute query
212
207
$crate:: __sql_query_metrics_acquire!( _acquire) ;
213
208
$crate:: __sql_query_metrics_start!( $ctx, execute, _acquire, _start) ;
214
- let res = query. execute( & mut * * $tx) . await . map_err( Into :: <GlobalError >:: into) ;
209
+
210
+ let mut backoff = $crate:: __rivet_util:: Backoff :: new(
211
+ 4 ,
212
+ None ,
213
+ $crate:: utils:: sql_query_macros:: QUERY_RETRY_MS ,
214
+ 50
215
+ ) ;
216
+ let mut i = 0 ;
217
+
218
+ // Retry loop
219
+ let res = loop {
220
+ let query = sqlx:: query( $crate:: __opt_indoc!( $sql) )
221
+ $(
222
+ . bind( $bind)
223
+ ) * ;
224
+
225
+ match query. execute( & mut * * $tx) . await {
226
+ Err ( err) => {
227
+ i += 1 ;
228
+ if i > $crate:: utils:: sql_query_macros:: MAX_QUERY_RETRIES {
229
+ break Err (
230
+ sqlx:: Error :: Io (
231
+ std:: io:: Error :: new(
232
+ std:: io:: ErrorKind :: Other ,
233
+ $crate:: utils:: sql_query_macros:: Error :: MaxSqlRetries ( err) ,
234
+ )
235
+ )
236
+ ) ;
237
+ }
238
+
239
+ use sqlx:: Error :: * ;
240
+ match & err {
241
+ // Retry other errors with a backoff
242
+ Database ( _) | Io ( _) | Tls ( _) | Protocol ( _) | PoolTimedOut | PoolClosed
243
+ | WorkerCrashed => {
244
+ tracing:: warn!( ?err, "query retry ({i}/{})" , $crate:: utils:: sql_query_macros:: MAX_QUERY_RETRIES ) ;
245
+ backoff. tick( ) . await ;
246
+ }
247
+ // Throw error
248
+ _ => break Err ( err) ,
249
+ }
250
+ }
251
+ x => break x,
252
+ }
253
+ } ;
254
+
215
255
$crate:: __sql_query_metrics_finish!( $ctx, execute, _start) ;
216
256
217
- res
257
+ res. map_err ( Into :: < GlobalError > :: into )
218
258
}
219
259
. instrument( tracing:: info_span!( "sql_query" ) )
220
260
} ;
@@ -229,39 +269,119 @@ macro_rules! __sql_query_as {
229
269
async {
230
270
use sqlx:: Acquire ;
231
271
232
- let query = sqlx:: query_as:: <_, $rv>( $crate:: __opt_indoc!( $sql) )
233
- $(
234
- . bind( $bind)
235
- ) * ;
236
-
237
272
// Acquire connection
238
273
$crate:: __sql_query_metrics_acquire!( _acquire) ;
239
274
let driver = $driver;
240
275
let mut conn = $crate:: __sql_acquire!( $ctx, driver) ;
241
276
242
277
// Execute query
243
278
$crate:: __sql_query_metrics_start!( $ctx, $action, _acquire, _start) ;
244
- let res = query. $action( & mut * conn) . await . map_err( Into :: <GlobalError >:: into) ;
279
+
280
+ let mut backoff = $crate:: __rivet_util:: Backoff :: new(
281
+ 4 ,
282
+ None ,
283
+ $crate:: utils:: sql_query_macros:: QUERY_RETRY_MS ,
284
+ 50
285
+ ) ;
286
+ let mut i = 0 ;
287
+
288
+ // Retry loop
289
+ let res = loop {
290
+ let query = sqlx:: query_as:: <_, $rv>( $crate:: __opt_indoc!( $sql) )
291
+ $(
292
+ . bind( $bind)
293
+ ) * ;
294
+
295
+ match query. $action( & mut * conn) . await {
296
+ Err ( err) => {
297
+ i += 1 ;
298
+ if i > $crate:: utils:: sql_query_macros:: MAX_QUERY_RETRIES {
299
+ break Err (
300
+ sqlx:: Error :: Io (
301
+ std:: io:: Error :: new(
302
+ std:: io:: ErrorKind :: Other ,
303
+ $crate:: utils:: sql_query_macros:: Error :: MaxSqlRetries ( err) ,
304
+ )
305
+ )
306
+ ) ;
307
+ }
308
+
309
+ use sqlx:: Error :: * ;
310
+ match & err {
311
+ // Retry other errors with a backoff
312
+ Database ( _) | Io ( _) | Tls ( _) | Protocol ( _) | PoolTimedOut | PoolClosed
313
+ | WorkerCrashed => {
314
+ tracing:: warn!( ?err, "query retry ({i}/{})" , $crate:: utils:: sql_query_macros:: MAX_QUERY_RETRIES ) ;
315
+ backoff. tick( ) . await ;
316
+ }
317
+ // Throw error
318
+ _ => break Err ( err) ,
319
+ }
320
+ }
321
+ x => break x,
322
+ }
323
+ } ;
324
+
245
325
$crate:: __sql_query_metrics_finish!( $ctx, $action, _start) ;
246
326
247
- res
327
+ res. map_err ( Into :: < GlobalError > :: into )
248
328
}
249
329
. instrument( tracing:: info_span!( "sql_query_as" ) )
250
330
} ;
251
331
( [ $ctx: expr, $rv: ty, $action: ident, @tx $tx: expr] $sql: expr, $( $bind: expr) ,* $( , ) ?) => {
252
332
async {
253
- let query = sqlx:: query_as:: <_, $rv>( $crate:: __opt_indoc!( $sql) )
254
- $(
255
- . bind( $bind)
256
- ) * ;
257
-
258
333
// Execute query
259
334
$crate:: __sql_query_metrics_acquire!( _acquire) ;
260
335
$crate:: __sql_query_metrics_start!( $ctx, $action, _acquire, _start) ;
261
- let res = query. $action( & mut * * $tx) . await . map_err( Into :: <GlobalError >:: into) ;
336
+
337
+ let mut backoff = $crate:: __rivet_util:: Backoff :: new(
338
+ 4 ,
339
+ None ,
340
+ $crate:: utils:: sql_query_macros:: QUERY_RETRY_MS ,
341
+ 50
342
+ ) ;
343
+ let mut i = 0 ;
344
+
345
+ // Retry loop
346
+ let res = loop {
347
+ let query = sqlx:: query_as:: <_, $rv>( $crate:: __opt_indoc!( $sql) )
348
+ $(
349
+ . bind( $bind)
350
+ ) * ;
351
+
352
+ match query. $action( & mut * * $tx) . await {
353
+ Err ( err) => {
354
+ i += 1 ;
355
+ if i > $crate:: utils:: sql_query_macros:: MAX_QUERY_RETRIES {
356
+ break Err (
357
+ sqlx:: Error :: Io (
358
+ std:: io:: Error :: new(
359
+ std:: io:: ErrorKind :: Other ,
360
+ $crate:: utils:: sql_query_macros:: Error :: MaxSqlRetries ( err) ,
361
+ )
362
+ )
363
+ ) ;
364
+ }
365
+
366
+ use sqlx:: Error :: * ;
367
+ match & err {
368
+ // Retry other errors with a backoff
369
+ Database ( _) | Io ( _) | Tls ( _) | Protocol ( _) | PoolTimedOut | PoolClosed
370
+ | WorkerCrashed => {
371
+ tracing:: warn!( ?err, "query retry ({i}/{})" , $crate:: utils:: sql_query_macros:: MAX_QUERY_RETRIES ) ;
372
+ backoff. tick( ) . await ;
373
+ }
374
+ // Throw error
375
+ _ => break Err ( err) ,
376
+ }
377
+ }
378
+ x => break x,
379
+ }
380
+ } ;
381
+
262
382
$crate:: __sql_query_metrics_finish!( $ctx, $action, _start) ;
263
383
264
- res
384
+ res. map_err ( Into :: < GlobalError > :: into )
265
385
}
266
386
. instrument( tracing:: info_span!( "sql_query_as" ) )
267
387
} ;
0 commit comments