From 71640b2f96d98ee9a91d81f4c4bf4c0160fadd04 Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Wed, 27 Aug 2025 14:26:16 -0700 Subject: [PATCH 1/3] [ENH] Scout logs issues a HEAD for cached manifests. --- Cargo.lock | 24 +++- rust/log-service/src/lib.rs | 64 +++++++-- rust/storage/src/admissioncontrolleds3.rs | 17 +++ rust/storage/src/lib.rs | 13 ++ rust/storage/src/local.rs | 4 + rust/storage/src/object_store.rs | 4 + rust/storage/src/s3.rs | 92 +++++++++++++ rust/wal3/src/manifest.rs | 85 ++++++++++++ rust/wal3/src/reader.rs | 157 +++++++++++++++++++++- 9 files changed, 439 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fdd8f0a44dc..2604224aeb1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4758,7 +4758,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -9137,9 +9137,9 @@ dependencies = [ [[package]] name = "utoipa-swagger-ui" -version = "9.0.0" +version = "9.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "161166ec520c50144922a625d8bc4925cc801b2dda958ab69878527c0e5c5d61" +checksum = "d047458f1b5b65237c2f6dc6db136945667f40a7668627b3490b9513a3d43a55" dependencies = [ "axum 0.8.1", "base64 0.22.1", @@ -9150,7 +9150,7 @@ dependencies = [ "serde_json", "url", "utoipa", - "zip", + "zip 3.0.0", ] [[package]] @@ -10169,6 +10169,20 @@ dependencies = [ "zstd", ] +[[package]] +name = "zip" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12598812502ed0105f607f941c386f43d441e00148fce9dec3ca5ffb0bde9308" +dependencies = [ + "arbitrary", + "crc32fast", + "flate2", + "indexmap 2.6.0", + "memchr", + "zopfli", +] + [[package]] name = "zip-extract" version = "0.2.1" @@ -10177,7 +10191,7 @@ checksum = "25a8c9e90f27d1435088a7b540b6cc8ae6ee525d992a695f16012d2f365b3d3c" dependencies = [ "log", "thiserror 1.0.69", - "zip", + "zip 2.2.2", ] [[package]] diff --git a/rust/log-service/src/lib.rs b/rust/log-service/src/lib.rs index 3c6800e0c96..2ac45448033 100644 --- a/rust/log-service/src/lib.rs +++ b/rust/log-service/src/lib.rs @@ -1129,28 +1129,64 @@ impl LogServer { let collection_id = Uuid::parse_str(&scout_logs.collection_id) .map(CollectionUuid) .map_err(|_| Status::invalid_argument("Failed to parse collection id"))?; - let prefix = collection_id.storage_prefix_for_log(); let log_reader = LogReader::new( self.config.reader.clone(), Arc::clone(&self.storage), prefix, ); - let (start_position, limit_position) = match log_reader.manifest().await { - Ok(Some(manifest)) => (manifest.oldest_timestamp(), manifest.next_write_timestamp()), - Ok(None) => (LogPosition::from_offset(1), LogPosition::from_offset(1)), - Err(wal3::Error::UninitializedLog) => { - return Err(Status::not_found(format!( - "collection {collection_id} not found" - ))); + let cache_key = cache_key_for_manifest_and_etag(collection_id); + let mut cached_manifest_and_e_tag = None; + if let Some(cache) = self.cache.as_ref() { + if let Some(cache_bytes) = cache.get(&cache_key).await.ok().flatten() { + let met = serde_json::from_slice::(&cache_bytes.bytes).ok(); + cached_manifest_and_e_tag = met; } - Err(err) => { - return Err(Status::new( - err.code().into(), - format!("could not scout logs: {err:?}"), - )); + } + // NOTE(rescrv): We verify and if verification fails, we take the cached manifest to fall + // back to the uncached path. + if let Some(cached) = cached_manifest_and_e_tag.as_ref() { + if !log_reader.verify(cached).await.unwrap_or_default() { + cached_manifest_and_e_tag.take(); } - }; + } + let (start_position, limit_position) = + if let Some(manifest_and_e_tag) = cached_manifest_and_e_tag { + ( + manifest_and_e_tag.manifest.oldest_timestamp(), + manifest_and_e_tag.manifest.next_write_timestamp(), + ) + } else { + let (start_position, limit_position) = match log_reader.manifest_and_e_tag().await { + Ok(Some(manifest_and_e_tag)) => { + if let Some(cache) = self.cache.as_ref() { + let json = serde_json::to_string(&manifest_and_e_tag) + .map_err(|err| Status::unknown(err.to_string()))?; + let cached_bytes = CachedBytes { + bytes: Vec::from(json), + }; + cache.insert(cache_key, cached_bytes).await; + } + ( + manifest_and_e_tag.manifest.oldest_timestamp(), + manifest_and_e_tag.manifest.next_write_timestamp(), + ) + } + Ok(None) => (LogPosition::from_offset(1), LogPosition::from_offset(1)), + Err(wal3::Error::UninitializedLog) => { + return Err(Status::not_found(format!( + "collection {collection_id} not found" + ))); + } + Err(err) => { + return Err(Status::new( + err.code().into(), + format!("could not scout logs: {err:?}"), + )); + } + }; + (start_position, limit_position) + }; let start_offset = start_position.offset() as i64; let limit_offset = limit_position.offset() as i64; Ok(Response::new(ScoutLogsResponse { diff --git a/rust/storage/src/admissioncontrolleds3.rs b/rust/storage/src/admissioncontrolleds3.rs index d6604581f94..e2bcc7da4d0 100644 --- a/rust/storage/src/admissioncontrolleds3.rs +++ b/rust/storage/src/admissioncontrolleds3.rs @@ -458,6 +458,23 @@ impl AdmissionControlledS3Storage { .await } + pub async fn confirm_same(&self, key: &str, e_tag: &ETag) -> Result { + self.metrics.nac_outstanding_read_requests.record( + self.metrics + .outstanding_read_requests + .load(Ordering::Relaxed) as u64, + &self.metrics.hostname_attribute, + ); + self.metrics + .outstanding_read_requests + .fetch_add(1, Ordering::Relaxed); + let res = self.storage.confirm_same(key, e_tag).await; + self.metrics + .outstanding_read_requests + .fetch_sub(1, Ordering::Relaxed); + res + } + async fn execute_fetch( fetch_fn: FetchFn, input: Result<(Arc>, Option), StorageError>, diff --git a/rust/storage/src/lib.rs b/rust/storage/src/lib.rs index 508691f9bdd..2cf3195ded5 100644 --- a/rust/storage/src/lib.rs +++ b/rust/storage/src/lib.rs @@ -289,6 +289,19 @@ impl Storage { } } + // NOTE(rescrv): Returns Ok(true) if the file is definitely the same. Returns Ok(false) if + // the file cannot be confirmed to be the same but it exists. Returns Err on error. It is up + // to the user to know how they are confirming the same and to react to Ok(false) even if the + // file is definitely the same file on storage. + pub async fn confirm_same(&self, key: &str, e_tag: &ETag) -> Result { + match self { + Storage::ObjectStore(object_store) => object_store.confirm_same(key, e_tag).await, + Storage::S3(s3) => s3.confirm_same(key, e_tag).await, + Storage::Local(local) => local.confirm_same(key, e_tag).await, + Storage::AdmissionControlledS3(as3) => as3.confirm_same(key, e_tag).await, + } + } + pub async fn put_file( &self, key: &str, diff --git a/rust/storage/src/local.rs b/rust/storage/src/local.rs index 93957658b2d..40379b140ff 100644 --- a/rust/storage/src/local.rs +++ b/rust/storage/src/local.rs @@ -67,6 +67,10 @@ impl LocalStorage { Ok((bytes, Some(etag))) } + pub async fn confirm_same(&self, _: &str, _: &ETag) -> Result { + Err(StorageError::NotImplemented) + } + pub async fn put_bytes( &self, key: &str, diff --git a/rust/storage/src/object_store.rs b/rust/storage/src/object_store.rs index 10e970ab30b..ef7368166a7 100644 --- a/rust/storage/src/object_store.rs +++ b/rust/storage/src/object_store.rs @@ -153,6 +153,10 @@ impl ObjectStore { Err(StorageError::NotImplemented) } + pub async fn confirm_same(&self, _: &str, _: &ETag) -> Result { + Err(StorageError::NotImplemented) + } + pub async fn get_parallel(&self, key: &str) -> Result>, StorageError> { let meta = self.object_store.head(&Path::from(key)).await?; let file_size = meta.size; diff --git a/rust/storage/src/s3.rs b/rust/storage/src/s3.rs index f5b030e3f62..a1bc73db942 100644 --- a/rust/storage/src/s3.rs +++ b/rust/storage/src/s3.rs @@ -211,6 +211,31 @@ impl S3Storage { } } + #[allow(clippy::type_complexity)] + pub async fn confirm_same(&self, key: &str, e_tag: &ETag) -> Result { + let res = self + .client + .head_object() + .bucket(self.bucket.clone()) + .key(key) + .send() + .await; + match res { + Ok(res) => Ok(res.e_tag() == Some(&e_tag.0)), + Err(e) => match e { + SdkError::ServiceError(err) => { + let inner = err.into_err(); + Err(StorageError::Generic { + source: Arc::new(inner), + }) + } + _ => Err(StorageError::Generic { + source: Arc::new(e), + }), + }, + } + } + #[allow(clippy::type_complexity)] async fn get_stream_and_e_tag( &self, @@ -1516,4 +1541,71 @@ mod tests { eprintln!("Successfully deleted: {:#?}", delete_result.deleted); eprintln!("Errors for non-existent files: {:#?}", delete_result.errors); } + + #[tokio::test] + async fn test_k8s_integration_confirm_same_with_matching_etag() { + let storage = setup_with_bucket(1024 * 1024 * 8, 1024 * 1024 * 8).await; + + let test_data = "test data for etag validation"; + let etag = storage + .put_bytes( + "test-confirm-same", + test_data.as_bytes().to_vec(), + PutOptions::default(), + ) + .await + .unwrap() + .expect("put_bytes should return etag"); + + let result = storage + .confirm_same("test-confirm-same", &etag) + .await + .unwrap(); + assert!(result, "confirm_same should return true for matching etag"); + } + + #[tokio::test] + async fn test_k8s_integration_confirm_same_with_non_matching_etag() { + let storage = setup_with_bucket(1024 * 1024 * 8, 1024 * 1024 * 8).await; + + let test_data = "test data for etag validation"; + let _etag = storage + .put_bytes( + "test-confirm-same", + test_data.as_bytes().to_vec(), + PutOptions::default(), + ) + .await + .unwrap() + .expect("put_bytes should return etag"); + + let fake_etag = ETag("fake-etag-wont-match".to_string()); + let result = storage + .confirm_same("test-confirm-same", &fake_etag) + .await + .unwrap(); + assert!( + !result, + "confirm_same should return false for non-matching etag" + ); + } + + #[tokio::test] + async fn test_k8s_integration_confirm_same_with_nonexistent_file() { + let storage = setup_with_bucket(1024 * 1024 * 8, 1024 * 1024 * 8).await; + + let fake_etag = ETag("fake-etag".to_string()); + let result = storage.confirm_same("nonexistent-file", &fake_etag).await; + + assert!( + result.is_err(), + "confirm_same should return error for nonexistent file" + ); + match result.unwrap_err() { + StorageError::Generic { source: _ } => { + // This is expected - the head operation will fail on nonexistent file + } + other => panic!("Expected Generic error, got: {:?}", other), + } + } } diff --git a/rust/wal3/src/manifest.rs b/rust/wal3/src/manifest.rs index 596211669a3..4723be30fa5 100644 --- a/rust/wal3/src/manifest.rs +++ b/rust/wal3/src/manifest.rs @@ -723,6 +723,17 @@ impl Manifest { Ok(()) } + /// Validate the e_tag against the manifest on object storage. + pub async fn head( + _: &ThrottleOptions, + storage: &Storage, + prefix: &str, + e_tag: &ETag, + ) -> Result { + let path = manifest_path(prefix); + Ok(storage.confirm_same(&path, e_tag).await.map_err(Arc::new)?) + } + /// Load the latest manifest from object storage. pub async fn load( options: &ThrottleOptions, @@ -1719,4 +1730,78 @@ mod tests { "Expected None when fragments_to_drop_limit < initial_seq_no" ); } + + #[tokio::test] + async fn test_k8s_integration_head_returns_true_for_matching_etag() { + use chroma_storage::s3::s3_client_for_test_with_new_bucket; + + let storage = s3_client_for_test_with_new_bucket().await; + let prefix = "test-head-matching"; + let throttle_options = crate::ThrottleOptions::default(); + + let manifest = Manifest::new_empty("test-writer"); + + Manifest::initialize_from_manifest( + &crate::LogWriterOptions::default(), + &storage, + prefix, + manifest, + ) + .await + .unwrap(); + + let (_loaded_manifest, etag) = Manifest::load(&throttle_options, &storage, prefix) + .await + .unwrap() + .unwrap(); + + let result = Manifest::head(&throttle_options, &storage, prefix, &etag) + .await + .unwrap(); + assert!(result, "head should return true for matching etag"); + } + + #[tokio::test] + async fn test_k8s_integration_head_returns_false_for_non_matching_etag() { + use chroma_storage::s3::s3_client_for_test_with_new_bucket; + + let storage = s3_client_for_test_with_new_bucket().await; + let prefix = "test-head-non-matching"; + let throttle_options = crate::ThrottleOptions::default(); + + let manifest = Manifest::new_empty("test-writer"); + + Manifest::initialize_from_manifest( + &crate::LogWriterOptions::default(), + &storage, + prefix, + manifest, + ) + .await + .unwrap(); + + let fake_etag = chroma_storage::ETag("fake-etag-wont-match".to_string()); + + let result = Manifest::head(&throttle_options, &storage, prefix, &fake_etag) + .await + .unwrap(); + assert!(!result, "head should return false for non-matching etag"); + } + + #[tokio::test] + async fn test_k8s_integration_head_returns_error_for_nonexistent_manifest() { + use chroma_storage::s3::s3_client_for_test_with_new_bucket; + + let storage = s3_client_for_test_with_new_bucket().await; + let prefix = "test-head-nonexistent"; + let throttle_options = crate::ThrottleOptions::default(); + + let fake_etag = chroma_storage::ETag("fake-etag".to_string()); + + let result = Manifest::head(&throttle_options, &storage, prefix, &fake_etag).await; + assert!( + result.is_err(), + "head should return error for nonexistent manifest" + ); + } } diff --git a/rust/wal3/src/reader.rs b/rust/wal3/src/reader.rs index f091bc5c6a6..e079e6182d1 100644 --- a/rust/wal3/src/reader.rs +++ b/rust/wal3/src/reader.rs @@ -13,8 +13,8 @@ use chroma_storage::{ }; use crate::{ - parse_fragment_path, Error, Fragment, LogPosition, LogReaderOptions, Manifest, ScrubError, - ScrubSuccess, Snapshot, SnapshotCache, + parse_fragment_path, Error, Fragment, LogPosition, LogReaderOptions, Manifest, ManifestAndETag, + ScrubError, ScrubSuccess, Snapshot, SnapshotCache, }; fn ranges_overlap(lhs: (LogPosition, LogPosition), rhs: (LogPosition, LogPosition)) -> bool { @@ -74,6 +74,16 @@ impl LogReader { self.cache = Some(cache); } + pub async fn verify(&self, manifest_and_etag: &ManifestAndETag) -> Result { + Manifest::head( + &self.options.throttle, + &self.storage, + &self.prefix, + &manifest_and_etag.e_tag, + ) + .await + } + pub async fn manifest(&self) -> Result, Error> { Ok( Manifest::load(&self.options.throttle, &self.storage, &self.prefix) @@ -82,6 +92,14 @@ impl LogReader { ) } + pub async fn manifest_and_e_tag(&self) -> Result, Error> { + match Manifest::load(&self.options.throttle, &self.storage, &self.prefix).await { + Ok(Some((manifest, e_tag))) => Ok(Some(ManifestAndETag { manifest, e_tag })), + Ok(None) => Ok(None), + Err(err) => Err(err), + } + } + pub async fn oldest_timestamp(&self) -> Result { let Some((manifest, _)) = Manifest::load(&self.options.throttle, &self.storage, &self.prefix).await? @@ -3438,4 +3456,139 @@ mod tests { } ); } + + #[tokio::test] + async fn verify_returns_true_when_manifest_etag_matches() { + let storage = Arc::new(chroma_storage::s3::s3_client_for_test_with_new_bucket().await); + let prefix = "test-prefix".to_string(); + let options = LogReaderOptions::default(); + let reader = LogReader::new(options, storage.clone(), prefix.clone()); + + let manifest = Manifest::new_empty("test-writer"); + Manifest::initialize_from_manifest( + &crate::LogWriterOptions::default(), + &storage, + &prefix, + manifest.clone(), + ) + .await + .unwrap(); + + let (loaded_manifest, etag) = Manifest::load(&reader.options.throttle, &storage, &prefix) + .await + .unwrap() + .unwrap(); + + let manifest_and_etag = ManifestAndETag { + manifest: loaded_manifest, + e_tag: etag, + }; + + let result = reader.verify(&manifest_and_etag).await.unwrap(); + assert!(result, "verify should return true for matching etag"); + } + + #[tokio::test] + async fn verify_returns_false_when_manifest_etag_does_not_match() { + let storage = Arc::new(chroma_storage::s3::s3_client_for_test_with_new_bucket().await); + let prefix = "test-prefix".to_string(); + let options = LogReaderOptions::default(); + let reader = LogReader::new(options, storage.clone(), prefix.clone()); + + let manifest = Manifest::new_empty("test-writer"); + Manifest::initialize_from_manifest( + &crate::LogWriterOptions::default(), + &storage, + &prefix, + manifest.clone(), + ) + .await + .unwrap(); + + let fake_etag = chroma_storage::ETag("fake-etag-that-wont-match".to_string()); + let manifest_and_etag = ManifestAndETag { + manifest, + e_tag: fake_etag, + }; + + let result = reader.verify(&manifest_and_etag).await.unwrap(); + assert!(!result, "verify should return false for non-matching etag"); + } + + #[tokio::test] + async fn verify_handles_storage_errors_gracefully() { + use chroma_storage::local::LocalStorage; + + let storage = Arc::new(chroma_storage::Storage::Local(LocalStorage::new( + "./test-local", + ))); + let prefix = "test-prefix".to_string(); + let options = LogReaderOptions::default(); + let reader = LogReader::new(options, storage, prefix); + + let manifest = Manifest::new_empty("test-writer"); + let fake_etag = chroma_storage::ETag("fake-etag".to_string()); + let manifest_and_etag = ManifestAndETag { + manifest, + e_tag: fake_etag, + }; + + let result = reader.verify(&manifest_and_etag).await; + match result { + Err(crate::Error::StorageError(storage_error)) => { + match storage_error.as_ref() { + chroma_storage::StorageError::NotImplemented => { + // This is expected for local storage + } + _ => panic!("Unexpected storage error: {:?}", storage_error), + } + } + _ => panic!("Expected storage error for local storage verify"), + } + } + + #[tokio::test] + async fn manifest_and_e_tag_returns_both_manifest_and_etag() { + let storage = Arc::new(chroma_storage::s3::s3_client_for_test_with_new_bucket().await); + let prefix = "test-prefix".to_string(); + let options = LogReaderOptions::default(); + let reader = LogReader::new(options, storage.clone(), prefix.clone()); + + let manifest = Manifest::new_empty("test-writer"); + Manifest::initialize_from_manifest( + &crate::LogWriterOptions::default(), + &storage, + &prefix, + manifest.clone(), + ) + .await + .unwrap(); + + let result = reader.manifest_and_e_tag().await.unwrap(); + assert!( + result.is_some(), + "manifest_and_e_tag should return Some when manifest exists" + ); + + let manifest_and_etag = result.unwrap(); + assert_eq!(manifest_and_etag.manifest.writer, "test-writer"); + assert!( + !manifest_and_etag.e_tag.0.is_empty(), + "etag should not be empty" + ); + } + + #[tokio::test] + async fn manifest_and_e_tag_returns_none_when_no_manifest() { + let storage = Arc::new(chroma_storage::s3::s3_client_for_test_with_new_bucket().await); + let prefix = "nonexistent-prefix".to_string(); + let options = LogReaderOptions::default(); + let reader = LogReader::new(options, storage, prefix); + + let result = reader.manifest_and_e_tag().await.unwrap(); + assert!( + result.is_none(), + "manifest_and_e_tag should return None when no manifest exists" + ); + } } From bbc6610233f45d5d816dd9ae3202cb4fe2aefde1 Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Tue, 9 Sep 2025 16:56:58 -0700 Subject: [PATCH 2/3] more test k8s --- rust/wal3/src/reader.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rust/wal3/src/reader.rs b/rust/wal3/src/reader.rs index e079e6182d1..b9d8b6dda0e 100644 --- a/rust/wal3/src/reader.rs +++ b/rust/wal3/src/reader.rs @@ -3458,7 +3458,7 @@ mod tests { } #[tokio::test] - async fn verify_returns_true_when_manifest_etag_matches() { + async fn test_k8s_integration_verify_returns_true_when_manifest_etag_matches() { let storage = Arc::new(chroma_storage::s3::s3_client_for_test_with_new_bucket().await); let prefix = "test-prefix".to_string(); let options = LogReaderOptions::default(); @@ -3489,7 +3489,7 @@ mod tests { } #[tokio::test] - async fn verify_returns_false_when_manifest_etag_does_not_match() { + async fn test_k8s_integration_verify_returns_false_when_manifest_etag_does_not_match() { let storage = Arc::new(chroma_storage::s3::s3_client_for_test_with_new_bucket().await); let prefix = "test-prefix".to_string(); let options = LogReaderOptions::default(); @@ -3516,7 +3516,7 @@ mod tests { } #[tokio::test] - async fn verify_handles_storage_errors_gracefully() { + async fn test_k8s_integration_verify_handles_storage_errors_gracefully() { use chroma_storage::local::LocalStorage; let storage = Arc::new(chroma_storage::Storage::Local(LocalStorage::new( @@ -3548,7 +3548,7 @@ mod tests { } #[tokio::test] - async fn manifest_and_e_tag_returns_both_manifest_and_etag() { + async fn test_k8s_integration_manifest_and_e_tag_returns_both_manifest_and_etag() { let storage = Arc::new(chroma_storage::s3::s3_client_for_test_with_new_bucket().await); let prefix = "test-prefix".to_string(); let options = LogReaderOptions::default(); @@ -3579,7 +3579,7 @@ mod tests { } #[tokio::test] - async fn manifest_and_e_tag_returns_none_when_no_manifest() { + async fn test_k8s_integration_manifest_and_e_tag_returns_none_when_no_manifest() { let storage = Arc::new(chroma_storage::s3::s3_client_for_test_with_new_bucket().await); let prefix = "nonexistent-prefix".to_string(); let options = LogReaderOptions::default(); From 8f303b138735a48d26edf33b95bec1ab3e6450d2 Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Mon, 15 Sep 2025 10:48:33 -0700 Subject: [PATCH 3/3] docs --- rust/log-service/src/lib.rs | 7 +++++++ rust/wal3/src/reader.rs | 2 ++ 2 files changed, 9 insertions(+) diff --git a/rust/log-service/src/lib.rs b/rust/log-service/src/lib.rs index 2ac45448033..781b808bc52 100644 --- a/rust/log-service/src/lib.rs +++ b/rust/log-service/src/lib.rs @@ -1146,6 +1146,13 @@ impl LogServer { // NOTE(rescrv): We verify and if verification fails, we take the cached manifest to fall // back to the uncached path. if let Some(cached) = cached_manifest_and_e_tag.as_ref() { + // Here's the linearization point. We have a cached manifest and e_tag. + // + // If we verify (perform a head), then statistically speaking, the manifest and e_tag + // we have in hand is identical (barring md5 collision) to the manifest and e_tag on + // storage. We can use the cached manifest and e_tag in this case because it is the + // identical flow whether we read the whole manifest from storage or whether we pretend + // to read it/verify it with a HEAD and then read out of cache. if !log_reader.verify(cached).await.unwrap_or_default() { cached_manifest_and_e_tag.take(); } diff --git a/rust/wal3/src/reader.rs b/rust/wal3/src/reader.rs index b9d8b6dda0e..c36e325780f 100644 --- a/rust/wal3/src/reader.rs +++ b/rust/wal3/src/reader.rs @@ -74,6 +74,8 @@ impl LogReader { self.cache = Some(cache); } + /// Verify that the reader would read the same manifest as the one provided in + /// manifest_and_etag, but do it in a way that doesn't load the whole manifest. pub async fn verify(&self, manifest_and_etag: &ManifestAndETag) -> Result { Manifest::head( &self.options.throttle,