Skip to content

Commit 7e10f6f

Browse files
committed
Merge branch 'rescrv/cache-manifest-etag' into rescrv/scout-logs-uses-head
2 parents 0dcdabb + a24196a commit 7e10f6f

File tree

5 files changed

+38
-25
lines changed

5 files changed

+38
-25
lines changed

rust/log-service/src/lib.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use uuid::Uuid;
4646
use wal3::{
4747
Cursor, CursorName, CursorStore, CursorStoreOptions, Fragment, GarbageCollectionOptions,
4848
Limits, LogPosition, LogReader, LogReaderOptions, LogWriter, LogWriterOptions, Manifest,
49-
MarkDirty as MarkDirtyTrait, Witness,
49+
ManifestAndETag, MarkDirty as MarkDirtyTrait, Witness,
5050
};
5151

5252
pub mod state_hash_table;
@@ -288,10 +288,10 @@ async fn get_log_from_handle_with_mutex_held<'a>(
288288
})
289289
}
290290

291-
////////////////////////////////////// cache_key_for_manifest //////////////////////////////////////
291+
////////////////////////////////////////// cache_key_for_* /////////////////////////////////////////
292292

293-
fn cache_key_for_manifest(collection_id: CollectionUuid) -> String {
294-
format!("{collection_id}::MANIFEST")
293+
fn cache_key_for_manifest_and_etag(collection_id: CollectionUuid) -> String {
294+
format!("{collection_id}::MANIFEST/ETAG")
295295
}
296296

297297
fn cache_key_for_cursor(collection_id: CollectionUuid, name: &CursorName) -> String {
@@ -1104,11 +1104,11 @@ impl LogServer {
11041104
Err(err) => return Err(Status::new(err.code().into(), err.to_string())),
11051105
};
11061106
if let Some(cache) = self.cache.as_ref() {
1107-
let cache_key = cache_key_for_manifest(collection_id);
1108-
if let Some(manifest) = log.manifest() {
1109-
if let Ok(manifest_bytes) = serde_json::to_vec(&manifest) {
1107+
let cache_key = cache_key_for_manifest_and_etag(collection_id);
1108+
if let Some(manifest_and_etag) = log.manifest_and_etag() {
1109+
if let Ok(manifest_and_etag_bytes) = serde_json::to_vec(&manifest_and_etag) {
11101110
let cache_value = CachedBytes {
1111-
bytes: manifest_bytes,
1111+
bytes: manifest_and_etag_bytes,
11121112
};
11131113
cache.insert(cache_key, cache_value).await;
11141114
}
@@ -1181,16 +1181,17 @@ impl LogServer {
11811181
pull_logs: &PullLogsRequest,
11821182
) -> Option<Vec<Fragment>> {
11831183
if let Some(cache) = self.cache.as_ref() {
1184-
let cache_key = cache_key_for_manifest(collection_id);
1184+
let cache_key = cache_key_for_manifest_and_etag(collection_id);
11851185
let cached_bytes = cache.get(&cache_key).await.ok().flatten()?;
1186-
let manifest: Manifest = serde_json::from_slice(&cached_bytes.bytes).ok()?;
1186+
let manifest_and_etag: ManifestAndETag =
1187+
serde_json::from_slice(&cached_bytes.bytes).ok()?;
11871188
let limits = Limits {
11881189
max_files: Some(pull_logs.batch_size as u64 + 1),
11891190
max_bytes: None,
11901191
max_records: Some(pull_logs.batch_size as u64),
11911192
};
11921193
LogReader::scan_from_manifest(
1193-
&manifest,
1194+
&manifest_and_etag.manifest,
11941195
LogPosition::from_offset(pull_logs.start_from_offset as u64),
11951196
limits,
11961197
)
@@ -1715,7 +1716,7 @@ impl LogServer {
17151716
let collection_id = Uuid::parse_str(&x)
17161717
.map(CollectionUuid)
17171718
.map_err(|_| Status::invalid_argument("Failed to parse collection id"))?;
1718-
Some(cache_key_for_manifest(collection_id))
1719+
Some(cache_key_for_manifest_and_etag(collection_id))
17191720
}
17201721
Some(EntryToEvict::Fragment(f)) => {
17211722
let collection_id = Uuid::parse_str(&f.collection_id)

rust/wal3/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ pub use copy::copy;
2323
pub use cursors::{Cursor, CursorName, CursorStore, Witness};
2424
pub use destroy::destroy;
2525
pub use gc::{Garbage, GarbageCollector};
26-
pub use manifest::{unprefixed_snapshot_path, Manifest, Snapshot, SnapshotPointer};
26+
pub use manifest::{
27+
unprefixed_snapshot_path, Manifest, ManifestAndETag, Snapshot, SnapshotPointer,
28+
};
2729
pub use manifest_manager::ManifestManager;
2830
pub use reader::{Limits, LogReader};
2931
pub use snapshot_cache::SnapshotCache;

rust/wal3/src/manifest.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,14 @@ impl Manifest {
925925
}
926926
}
927927

928+
////////////////////////////////////////// ManifestAndETag /////////////////////////////////////////
929+
930+
#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
931+
pub struct ManifestAndETag {
932+
pub manifest: Manifest,
933+
pub e_tag: ETag,
934+
}
935+
928936
/////////////////////////////////////////////// tests //////////////////////////////////////////////
929937

930938
#[cfg(test)]

rust/wal3/src/manifest_manager.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,14 @@ use std::time::Instant;
44
use chroma_storage::{ETag, Storage};
55

66
use crate::gc::Garbage;
7-
use crate::manifest::{Manifest, Snapshot};
7+
use crate::manifest::{Manifest, ManifestAndETag, Snapshot};
88
use crate::reader::read_fragment;
99
use crate::writer::MarkDirty;
1010
use crate::{
1111
unprefixed_fragment_path, Error, Fragment, FragmentSeqNo, GarbageCollectionOptions,
1212
LogPosition, SnapshotCache, SnapshotOptions, SnapshotPointerOrFragmentSeqNo, ThrottleOptions,
1313
};
1414

15-
////////////////////////////////////////// ManifestAndETag /////////////////////////////////////////
16-
17-
#[derive(Debug)]
18-
struct ManifestAndETag {
19-
manifest: Manifest,
20-
e_tag: ETag,
21-
}
22-
2315
////////////////////////////////////////////// Staging /////////////////////////////////////////////
2416

2517
#[derive(Debug)]
@@ -270,9 +262,9 @@ impl ManifestManager {
270262
}
271263

272264
/// Return the latest stable manifest
273-
pub fn latest(&self) -> Manifest {
265+
pub fn latest(&self) -> ManifestAndETag {
274266
let staging = self.staging.lock().unwrap();
275-
staging.stable.manifest.clone()
267+
staging.stable.clone()
276268
}
277269

278270
/// Recover from a fault in writing. It is possible that fragments have been written that are

rust/wal3/src/writer.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;
1818
use crate::{
1919
unprefixed_fragment_path, BatchManager, CursorStore, CursorStoreOptions, Error,
2020
ExponentialBackoff, Fragment, FragmentSeqNo, Garbage, GarbageCollectionOptions, LogPosition,
21-
LogReader, LogReaderOptions, LogWriterOptions, Manifest, ManifestManager, ThrottleOptions,
21+
LogReader, LogReaderOptions, LogWriterOptions, Manifest, ManifestAndETag, ManifestManager,
22+
ThrottleOptions,
2223
};
2324

2425
/// The epoch writer is a counting writer. Every epoch exists. An epoch goes
@@ -275,6 +276,15 @@ impl LogWriter {
275276
}
276277

277278
pub fn manifest(&self) -> Option<Manifest> {
279+
// SAFETY(rescrv): Mutex poisoning.
280+
let inner = self.inner.lock().unwrap();
281+
inner
282+
.writer
283+
.as_ref()
284+
.map(|writer| writer.manifest_manager.latest().manifest)
285+
}
286+
287+
pub fn manifest_and_etag(&self) -> Option<ManifestAndETag> {
278288
// SAFETY(rescrv): Mutex poisoning.
279289
let inner = self.inner.lock().unwrap();
280290
inner

0 commit comments

Comments
 (0)