@@ -6,6 +6,7 @@ use std::{
66use moka:: future:: { Cache , CacheBuilder } ;
77use redis:: AsyncCommands ;
88use rivet_pools:: prelude:: * ;
9+ use tracing:: Instrument ;
910
1011use crate :: { error:: Error , metrics} ;
1112
@@ -21,6 +22,7 @@ pub enum Driver {
2122
2223impl Driver {
2324 /// Fetch multiple values from cache at once
25+ #[ tracing:: instrument( skip_all, fields( driver=%self ) ) ]
2426 pub async fn fetch_values < ' a > (
2527 & ' a self ,
2628 base_key : & ' a str ,
@@ -33,6 +35,7 @@ impl Driver {
3335 }
3436
3537 /// Set multiple values in cache at once
38+ #[ tracing:: instrument( skip_all, fields( driver=%self ) ) ]
3639 pub async fn set_values < ' a > (
3740 & ' a self ,
3841 base_key : & ' a str ,
@@ -45,6 +48,7 @@ impl Driver {
4548 }
4649
4750 /// Delete multiple keys from cache
51+ #[ tracing:: instrument( skip_all, fields( driver=%self ) ) ]
4852 pub async fn delete_keys < ' a > (
4953 & ' a self ,
5054 base_key : & ' a str ,
@@ -95,6 +99,7 @@ impl Driver {
9599 }
96100
97101 /// Increment a rate limit counter and return the new count
102+ #[ tracing:: instrument( skip_all, fields( driver=%self ) ) ]
98103 pub async fn rate_limit_increment < ' a > (
99104 & ' a self ,
100105 key : & ' a str ,
@@ -123,6 +128,15 @@ impl Driver {
123128 }
124129}
125130
131+ impl std:: fmt:: Display for Driver {
132+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
133+ match self {
134+ Driver :: Redis ( _) => write ! ( f, "redis" ) ,
135+ Driver :: InMemory ( _) => write ! ( f, "in_memory" ) ,
136+ }
137+ }
138+ }
139+
126140/// Redis cache driver implementation
127141#[ derive( Clone ) ]
128142pub struct RedisDriver {
@@ -176,6 +190,7 @@ impl RedisDriver {
176190
177191 match mget_cmd
178192 . query_async :: < _ , Vec < Option < CacheValue > > > ( & mut redis_conn)
193+ . instrument ( tracing:: info_span!( "redis_query" ) )
179194 . await
180195 {
181196 Ok ( values) => {
@@ -215,7 +230,11 @@ impl RedisDriver {
215230 . ignore ( ) ;
216231 }
217232
218- match pipe. query_async ( & mut redis_conn) . await {
233+ match pipe
234+ . query_async ( & mut redis_conn)
235+ . instrument ( tracing:: info_span!( "redis_query" ) )
236+ . await
237+ {
219238 Ok ( ( ) ) => {
220239 tracing:: trace!( "successfully wrote to cache" ) ;
221240 Ok ( ( ) )
@@ -242,7 +261,11 @@ impl RedisDriver {
242261 . with_label_values ( & [ & base_key] )
243262 . inc_by ( redis_keys. len ( ) as u64 ) ;
244263
245- match redis_conn. del :: < _ , ( ) > ( redis_keys) . await {
264+ match redis_conn
265+ . del :: < _ , ( ) > ( redis_keys)
266+ . instrument ( tracing:: info_span!( "redis_query" ) )
267+ . await
268+ {
246269 Ok ( _) => {
247270 tracing:: trace!( "successfully deleted keys" ) ;
248271 Ok ( ( ) )
@@ -300,7 +323,11 @@ impl RedisDriver {
300323 pipe. incr ( key, 1 ) ;
301324 pipe. pexpire ( key, ttl_ms as usize ) . ignore ( ) ;
302325
303- match pipe. query_async :: < _ , ( i64 , ) > ( & mut redis_conn) . await {
326+ match pipe
327+ . query_async :: < _ , ( i64 , ) > ( & mut redis_conn)
328+ . instrument ( tracing:: info_span!( "redis_query" ) )
329+ . await
330+ {
304331 Ok ( ( incr, ) ) => Ok ( incr) ,
305332 Err ( err) => {
306333 tracing:: error!( ?err, ?key, "failed to increment rate limit key" ) ;
@@ -415,9 +442,14 @@ impl InMemoryDriver {
415442
416443 let mut result = Vec :: with_capacity ( keys. len ( ) ) ;
417444
418- for key in keys {
419- result. push ( cache. get ( & key) . await . map ( |x| x. value . clone ( ) ) ) ;
445+ // Async block for metrics
446+ async {
447+ for key in keys {
448+ result. push ( cache. get ( & key) . await . map ( |x| x. value . clone ( ) ) ) ;
449+ }
420450 }
451+ . instrument ( tracing:: info_span!( "get" ) )
452+ . await ;
421453
422454 tracing:: debug!(
423455 cached_len = result. iter( ) . filter( |x| x. is_some( ) ) . count( ) ,
@@ -435,16 +467,21 @@ impl InMemoryDriver {
435467 ) -> Result < ( ) , Error > {
436468 let cache = self . cache . clone ( ) ;
437469
438- for ( key, value, expire_at) in keys_values {
439- // Create an entry with the value and expiration time
440- let entry = ExpiringValue {
441- value,
442- expiry_time : expire_at,
443- } ;
444-
445- // Store in cache - expiry will be handled by ValueExpiry
446- cache. insert ( key, entry) . await ;
470+ // Async block for metrics
471+ async {
472+ for ( key, value, expire_at) in keys_values {
473+ // Create an entry with the value and expiration time
474+ let entry = ExpiringValue {
475+ value,
476+ expiry_time : expire_at,
477+ } ;
478+
479+ // Store in cache - expiry will be handled by ValueExpiry
480+ cache. insert ( key, entry) . await ;
481+ }
447482 }
483+ . instrument ( tracing:: info_span!( "set" ) )
484+ . await ;
448485
449486 tracing:: trace!( "successfully wrote to in-memory cache with per-key expiry" ) ;
450487 Ok ( ( ) )
@@ -465,10 +502,15 @@ impl InMemoryDriver {
465502 . with_label_values ( & [ & base_key] )
466503 . inc_by ( keys. len ( ) as u64 ) ;
467504
468- for key in keys {
469- // Use remove instead of invalidate to ensure it's actually removed
470- cache. remove ( & key) . await ;
505+ // Async block for metrics
506+ async {
507+ for key in keys {
508+ // Use remove instead of invalidate to ensure it's actually removed
509+ cache. remove ( & key) . await ;
510+ }
471511 }
512+ . instrument ( tracing:: info_span!( "remove" ) )
513+ . await ;
472514
473515 tracing:: trace!( "successfully deleted keys from in-memory cache" ) ;
474516 Ok ( ( ) )
0 commit comments