@@ -1128,28 +1128,64 @@ impl LogServer {
11281128 let collection_id = Uuid :: parse_str ( & scout_logs. collection_id )
11291129 . map ( CollectionUuid )
11301130 . map_err ( |_| Status :: invalid_argument ( "Failed to parse collection id" ) ) ?;
1131-
11321131 let prefix = collection_id. storage_prefix_for_log ( ) ;
11331132 let log_reader = LogReader :: new (
11341133 self . config . reader . clone ( ) ,
11351134 Arc :: clone ( & self . storage ) ,
11361135 prefix,
11371136 ) ;
1138- let ( start_position, limit_position) = match log_reader. manifest ( ) . await {
1139- Ok ( Some ( manifest) ) => ( manifest. oldest_timestamp ( ) , manifest. next_write_timestamp ( ) ) ,
1140- Ok ( None ) => ( LogPosition :: from_offset ( 1 ) , LogPosition :: from_offset ( 1 ) ) ,
1141- Err ( wal3:: Error :: UninitializedLog ) => {
1142- return Err ( Status :: not_found ( format ! (
1143- "collection {collection_id} not found"
1144- ) ) ) ;
1137+ let cache_key = cache_key_for_manifest_and_etag ( collection_id) ;
1138+ let mut cached_manifest_and_e_tag = None ;
1139+ if let Some ( cache) = self . cache . as_ref ( ) {
1140+ if let Some ( cache_bytes) = cache. get ( & cache_key) . await . ok ( ) . flatten ( ) {
1141+ let met = serde_json:: from_slice :: < ManifestAndETag > ( & cache_bytes. bytes ) . ok ( ) ;
1142+ cached_manifest_and_e_tag = met;
11451143 }
1146- Err ( err) => {
1147- return Err ( Status :: new (
1148- err. code ( ) . into ( ) ,
1149- format ! ( "could not scout logs: {err:?}" ) ,
1150- ) ) ;
1144+ }
1145+ // NOTE(rescrv): We verify and if verification fails, we take the cached manifest to fall
1146+ // back to the uncached path.
1147+ if let Some ( cached) = cached_manifest_and_e_tag. as_ref ( ) {
1148+ if !log_reader. verify ( cached) . await . unwrap_or_default ( ) {
1149+ cached_manifest_and_e_tag. take ( ) ;
11511150 }
1152- } ;
1151+ }
1152+ let ( start_position, limit_position) =
1153+ if let Some ( manifest_and_e_tag) = cached_manifest_and_e_tag {
1154+ (
1155+ manifest_and_e_tag. manifest . oldest_timestamp ( ) ,
1156+ manifest_and_e_tag. manifest . next_write_timestamp ( ) ,
1157+ )
1158+ } else {
1159+ let ( start_position, limit_position) = match log_reader. manifest_and_e_tag ( ) . await {
1160+ Ok ( Some ( manifest_and_e_tag) ) => {
1161+ if let Some ( cache) = self . cache . as_ref ( ) {
1162+ let json = serde_json:: to_string ( & manifest_and_e_tag)
1163+ . map_err ( |err| Status :: unknown ( err. to_string ( ) ) ) ?;
1164+ let cached_bytes = CachedBytes {
1165+ bytes : Vec :: from ( json) ,
1166+ } ;
1167+ cache. insert ( cache_key, cached_bytes) . await ;
1168+ }
1169+ (
1170+ manifest_and_e_tag. manifest . oldest_timestamp ( ) ,
1171+ manifest_and_e_tag. manifest . next_write_timestamp ( ) ,
1172+ )
1173+ }
1174+ Ok ( None ) => ( LogPosition :: from_offset ( 1 ) , LogPosition :: from_offset ( 1 ) ) ,
1175+ Err ( wal3:: Error :: UninitializedLog ) => {
1176+ return Err ( Status :: not_found ( format ! (
1177+ "collection {collection_id} not found"
1178+ ) ) ) ;
1179+ }
1180+ Err ( err) => {
1181+ return Err ( Status :: new (
1182+ err. code ( ) . into ( ) ,
1183+ format ! ( "could not scout logs: {err:?}" ) ,
1184+ ) ) ;
1185+ }
1186+ } ;
1187+ ( start_position, limit_position)
1188+ } ;
11531189 let start_offset = start_position. offset ( ) as i64 ;
11541190 let limit_offset = limit_position. offset ( ) as i64 ;
11551191 Ok ( Response :: new ( ScoutLogsResponse {
0 commit comments