Skip to content

Commit 568b89c

Browse files
committed
[ENH] Scout logs issues a HEAD for cached manifests.
1 parent 8f7bef7 commit 568b89c

File tree

8 files changed

+144
-16
lines changed

8 files changed

+144
-16
lines changed

rust/log-service/src/lib.rs

Lines changed: 50 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1129,28 +1129,64 @@ impl LogServer {
11291129
let collection_id = Uuid::parse_str(&scout_logs.collection_id)
11301130
.map(CollectionUuid)
11311131
.map_err(|_| Status::invalid_argument("Failed to parse collection id"))?;
1132-
11331132
let prefix = collection_id.storage_prefix_for_log();
11341133
let log_reader = LogReader::new(
11351134
self.config.reader.clone(),
11361135
Arc::clone(&self.storage),
11371136
prefix,
11381137
);
1139-
let (start_position, limit_position) = match log_reader.manifest().await {
1140-
Ok(Some(manifest)) => (manifest.oldest_timestamp(), manifest.next_write_timestamp()),
1141-
Ok(None) => (LogPosition::from_offset(1), LogPosition::from_offset(1)),
1142-
Err(wal3::Error::UninitializedLog) => {
1143-
return Err(Status::not_found(format!(
1144-
"collection {collection_id} not found"
1145-
)));
1138+
let cache_key = cache_key_for_manifest_and_etag(collection_id);
1139+
let mut cached_manifest_and_e_tag = None;
1140+
if let Some(cache) = self.cache.as_ref() {
1141+
if let Some(cache_bytes) = cache.get(&cache_key).await.ok().flatten() {
1142+
let met = serde_json::from_slice::<ManifestAndETag>(&cache_bytes.bytes).ok();
1143+
cached_manifest_and_e_tag = met;
11461144
}
1147-
Err(err) => {
1148-
return Err(Status::new(
1149-
err.code().into(),
1150-
format!("could not scout logs: {err:?}"),
1151-
));
1145+
}
1146+
// NOTE(rescrv): We verify the cached manifest against storage. If verification fails or
1147+
// errors, we discard the cached manifest so that we fall back to the uncached path.
1148+
if let Some(cached) = cached_manifest_and_e_tag.as_ref() {
1149+
if !log_reader.verify(cached).await.unwrap_or_default() {
1150+
cached_manifest_and_e_tag.take();
11521151
}
1153-
};
1152+
}
1153+
let (start_position, limit_position) =
1154+
if let Some(manifest_and_e_tag) = cached_manifest_and_e_tag {
1155+
(
1156+
manifest_and_e_tag.manifest.oldest_timestamp(),
1157+
manifest_and_e_tag.manifest.next_write_timestamp(),
1158+
)
1159+
} else {
1160+
let (start_position, limit_position) = match log_reader.manifest_and_e_tag().await {
1161+
Ok(Some(manifest_and_e_tag)) => {
1162+
if let Some(cache) = self.cache.as_ref() {
1163+
let json = serde_json::to_string(&manifest_and_e_tag)
1164+
.map_err(|err| Status::unknown(err.to_string()))?;
1165+
let cached_bytes = CachedBytes {
1166+
bytes: Vec::from(json),
1167+
};
1168+
cache.insert(cache_key, cached_bytes).await;
1169+
}
1170+
(
1171+
manifest_and_e_tag.manifest.oldest_timestamp(),
1172+
manifest_and_e_tag.manifest.next_write_timestamp(),
1173+
)
1174+
}
1175+
Ok(None) => (LogPosition::from_offset(1), LogPosition::from_offset(1)),
1176+
Err(wal3::Error::UninitializedLog) => {
1177+
return Err(Status::not_found(format!(
1178+
"collection {collection_id} not found"
1179+
)));
1180+
}
1181+
Err(err) => {
1182+
return Err(Status::new(
1183+
err.code().into(),
1184+
format!("could not scout logs: {err:?}"),
1185+
));
1186+
}
1187+
};
1188+
(start_position, limit_position)
1189+
};
11541190
let start_offset = start_position.offset() as i64;
11551191
let limit_offset = limit_position.offset() as i64;
11561192
Ok(Response::new(ScoutLogsResponse {

rust/storage/src/admissioncontrolleds3.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,23 @@ impl AdmissionControlledS3Storage {
458458
.await
459459
}
460460

461+
/// Checks whether the object at `key` still matches `e_tag` by delegating to
/// `self.storage.confirm_same`, while tracking the call in the outstanding-read metrics.
///
/// The result of the delegated call is returned unchanged.
pub async fn confirm_same(&self, key: &str, e_tag: &ETag) -> Result<bool, StorageError> {
462+
// Record the outstanding-read count as observed before this request is added to it.
self.metrics.nac_outstanding_read_requests.record(
463+
self.metrics
464+
.outstanding_read_requests
465+
.load(Ordering::Relaxed) as u64,
466+
&self.metrics.hostname_attribute,
467+
);
468+
// Count this request as in flight for the duration of the delegated call.
self.metrics
469+
.outstanding_read_requests
470+
.fetch_add(1, Ordering::Relaxed);
471+
let res = self.storage.confirm_same(key, e_tag).await;
472+
// NOTE(review): this decrement only runs once the await above completes. If the future is
// dropped while suspended there, the counter stays incremented; a drop guard would avoid that.
self.metrics
473+
.outstanding_read_requests
474+
.fetch_sub(1, Ordering::Relaxed);
475+
res
476+
}
477+
461478
async fn execute_fetch<FetchReturn, FetchFn, FetchFut>(
462479
fetch_fn: FetchFn,
463480
input: Result<(Arc<Vec<u8>>, Option<ETag>), StorageError>,

rust/storage/src/lib.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,19 @@ impl Storage {
289289
}
290290
}
291291

292+
// NOTE(rescrv): Returns Ok(true) if the file is definitely the same. Returns Ok(false) if
293+
// the file exists but cannot be confirmed to be the same. Returns Err on error. Ok(false) may
294+
// be returned even when the file on storage is in fact the same, so callers must know how
295+
// sameness is being confirmed and must treat Ok(false) as "unconfirmed", not "different".
296+
pub async fn confirm_same(&self, key: &str, e_tag: &ETag) -> Result<bool, StorageError> {
297+
// Dispatch to the `confirm_same` of whichever backend this `Storage` variant holds.
match self {
298+
Storage::ObjectStore(object_store) => object_store.confirm_same(key, e_tag).await,
299+
Storage::S3(s3) => s3.confirm_same(key, e_tag).await,
300+
Storage::Local(local) => local.confirm_same(key, e_tag).await,
301+
Storage::AdmissionControlledS3(as3) => as3.confirm_same(key, e_tag).await,
302+
}
303+
}
304+
292305
pub async fn put_file(
293306
&self,
294307
key: &str,

rust/storage/src/local.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ impl LocalStorage {
6767
Ok((bytes, Some(etag)))
6868
}
6969

70+
/// Not supported by `LocalStorage`: both arguments are ignored and the call always returns
/// `Err(StorageError::NotImplemented)`.
pub async fn confirm_same(&self, _: &str, _: &ETag) -> Result<bool, StorageError> {
71+
Err(StorageError::NotImplemented)
72+
}
73+
7074
pub async fn put_bytes(
7175
&self,
7276
key: &str,

rust/storage/src/object_store.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,10 @@ impl ObjectStore {
153153
Err(StorageError::NotImplemented)
154154
}
155155

156+
/// Not supported by `ObjectStore`: both arguments are ignored and the call always returns
/// `Err(StorageError::NotImplemented)`.
pub async fn confirm_same(&self, _: &str, _: &ETag) -> Result<bool, StorageError> {
157+
Err(StorageError::NotImplemented)
158+
}
159+
156160
pub async fn get_parallel(&self, key: &str) -> Result<Arc<Vec<u8>>, StorageError> {
157161
let meta = self.object_store.head(&Path::from(key)).await?;
158162
let file_size = meta.size;

rust/storage/src/s3.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,31 @@ impl S3Storage {
211211
}
212212
}
213213

214+
/// Sends a `head_object` request for `key` in `self.bucket` and compares the returned ETag
/// with `e_tag`.
///
/// Returns `Ok(true)` only when the response carries an ETag equal to `e_tag.0`; a response
/// with a different ETag, or with none, yields `Ok(false)`. Every SDK error is returned as
/// `StorageError::Generic`.
// NOTE(review): this allow looks unnecessary here, since the return type is just
// `Result<bool, StorageError>` — confirm and drop it if clippy agrees.
#[allow(clippy::type_complexity)]
215+
pub async fn confirm_same(&self, key: &str, e_tag: &ETag) -> Result<bool, StorageError> {
216+
let res = self
217+
.client
218+
.head_object()
219+
.bucket(self.bucket.clone())
220+
.key(key)
221+
.send()
222+
.await;
223+
match res {
224+
Ok(res) => Ok(res.e_tag() == Some(&e_tag.0)),
225+
Err(e) => match e {
226+
// For service errors, unwrap the inner error so it becomes the reported source.
SdkError::ServiceError(err) => {
227+
let inner = err.into_err();
228+
Err(StorageError::Generic {
229+
source: Arc::new(inner),
230+
})
231+
}
232+
// Every other SDK error variant is wrapped as-is.
_ => Err(StorageError::Generic {
233+
source: Arc::new(e),
234+
}),
235+
},
236+
}
237+
}
238+
214239
#[allow(clippy::type_complexity)]
215240
async fn get_stream_and_e_tag(
216241
&self,

rust/wal3/src/manifest.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,17 @@ impl Manifest {
723723
Ok(())
724724
}
725725

726+
/// Validate the e_tag against the manifest on object storage.
///
/// Builds the manifest path from `prefix` and returns the result of
/// `storage.confirm_same` for that path and `e_tag`. A storage error is wrapped in an `Arc`
/// and converted into `Error` by the `?` operator. The `ThrottleOptions` argument is
/// accepted but not used.
727+
pub async fn head(
728+
_: &ThrottleOptions,
729+
storage: &Storage,
730+
prefix: &str,
731+
e_tag: &ETag,
732+
) -> Result<bool, Error> {
733+
let path = manifest_path(prefix);
734+
Ok(storage.confirm_same(&path, e_tag).await.map_err(Arc::new)?)
735+
}
736+
726737
/// Load the latest manifest from object storage.
727738
pub async fn load(
728739
options: &ThrottleOptions,

rust/wal3/src/reader.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ use chroma_storage::{
1313
};
1414

1515
use crate::{
16-
parse_fragment_path, Error, Fragment, LogPosition, LogReaderOptions, Manifest, ScrubError,
17-
ScrubSuccess, Snapshot, SnapshotCache,
16+
parse_fragment_path, Error, Fragment, LogPosition, LogReaderOptions, Manifest, ManifestAndETag,
17+
ScrubError, ScrubSuccess, Snapshot, SnapshotCache,
1818
};
1919

2020
fn ranges_overlap(lhs: (LogPosition, LogPosition), rhs: (LogPosition, LogPosition)) -> bool {
@@ -74,6 +74,16 @@ impl LogReader {
7474
self.cache = Some(cache);
7575
}
7676

77+
/// Checks the ETag of a previously obtained manifest against storage via `Manifest::head`,
/// using this reader's throttle options, storage and prefix.
///
/// Only `manifest_and_etag.e_tag` is passed along; the manifest contents themselves are not
/// compared here. The result of `Manifest::head` is returned unchanged.
pub async fn verify(&self, manifest_and_etag: &ManifestAndETag) -> Result<bool, Error> {
78+
Manifest::head(
79+
&self.options.throttle,
80+
&self.storage,
81+
&self.prefix,
82+
&manifest_and_etag.e_tag,
83+
)
84+
.await
85+
}
86+
7787
pub async fn manifest(&self) -> Result<Option<Manifest>, Error> {
7888
Ok(
7989
Manifest::load(&self.options.throttle, &self.storage, &self.prefix)
@@ -82,6 +92,14 @@ impl LogReader {
8292
)
8393
}
8494

95+
/// Loads the manifest with `Manifest::load` and bundles it with the ETag returned alongside
/// it into a `ManifestAndETag`.
///
/// Returns `Ok(None)` when `Manifest::load` returns `Ok(None)`; errors are passed through
/// unchanged.
pub async fn manifest_and_e_tag(&self) -> Result<Option<ManifestAndETag>, Error> {
96+
match Manifest::load(&self.options.throttle, &self.storage, &self.prefix).await {
97+
Ok(Some((manifest, e_tag))) => Ok(Some(ManifestAndETag { manifest, e_tag })),
98+
Ok(None) => Ok(None),
99+
Err(err) => Err(err),
100+
}
101+
}
102+
85103
pub async fn oldest_timestamp(&self) -> Result<LogPosition, Error> {
86104
let Some((manifest, _)) =
87105
Manifest::load(&self.options.throttle, &self.storage, &self.prefix).await?

0 commit comments

Comments
 (0)