Skip to content

Commit 3e706f3

Browse files
committed
New filesystem test for eviction breaking
1 parent e7f29fe commit 3e706f3

File tree

3 files changed

+197
-28
lines changed

3 files changed

+197
-28
lines changed

nativelink-store/src/filesystem_store.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,16 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
220220
pub struct FileEntryImpl {
221221
data_size: u64,
222222
block_size: u64,
223-
encoded_file_path: RwLock<EncodedFilePath>,
223+
encoded_file_path: Arc<RwLock<EncodedFilePath>>,
224224
}
225225

226226
impl FileEntryImpl {
227-
pub fn get_shared_context_for_test(&mut self) -> Arc<SharedContext> {
228-
self.encoded_file_path.get_mut().shared_context.clone()
227+
pub async fn get_shared_context_for_test(&mut self) -> Arc<SharedContext> {
228+
self.encoded_file_path
229+
.read_arc()
230+
.await
231+
.shared_context
232+
.clone()
229233
}
230234
}
231235

@@ -234,7 +238,7 @@ impl FileEntry for FileEntryImpl {
234238
Self {
235239
data_size,
236240
block_size,
237-
encoded_file_path,
241+
encoded_file_path: Arc::new(encoded_file_path),
238242
}
239243
}
240244

@@ -362,8 +366,11 @@ impl LenEntry for FileEntryImpl {
362366
// target file location to the new temp file. `unref()` should only ever be called once.
363367
#[inline]
364368
async fn unref(&self) {
365-
{
366-
let mut encoded_file_path = self.encoded_file_path.write().await;
369+
// If this is called during emplace_file, it'll block, so we need to spawn a new task.
370+
let local_encoded_path = self.encoded_file_path.clone();
371+
debug!("Spawning filesystem_unref");
372+
background_spawn!("filesystem_unref", async move {
373+
let mut encoded_file_path = local_encoded_path.write().await;
367374
if encoded_file_path.path_type == PathType::Temp {
368375
// We are already a temp file that is now marked for deletion on drop.
369376
// This is very rare, but most likely the rename into the content path failed.
@@ -393,7 +400,7 @@ impl LenEntry for FileEntryImpl {
393400
encoded_file_path.path_type = PathType::Temp;
394401
encoded_file_path.key = new_key;
395402
}
396-
}
403+
});
397404
}
398405
}
399406

nativelink-store/tests/fast_slow_store_test.rs

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ use std::sync::{Arc, Mutex};
1818

1919
use async_trait::async_trait;
2020
use bytes::Bytes;
21-
use nativelink_config::stores::{FastSlowSpec, MemorySpec, NoopSpec, StoreDirection, StoreSpec};
21+
use nativelink_config::stores::{
22+
EvictionPolicy, FastSlowSpec, MemorySpec, NoopSpec, StoreDirection, StoreSpec,
23+
};
2224
use nativelink_error::{Code, Error, ResultExt, make_err};
2325
use nativelink_macro::nativelink_test;
2426
use nativelink_metric::MetricsComponent;
@@ -32,6 +34,7 @@ use nativelink_util::store_trait::{RemoveItemCallback, Store, StoreDriver, Store
3234
use pretty_assertions::assert_eq;
3335
use rand::rngs::SmallRng;
3436
use rand::{Rng, SeedableRng};
37+
use tracing::debug;
3538

3639
const MEGABYTE_SZ: usize = 1024 * 1024;
3740

@@ -570,3 +573,97 @@ async fn fast_readonly_only_not_updated_on_get() -> Result<(), Error> {
570573
);
571574
Ok(())
572575
}
576+
577+
#[ignore]
578+
#[nativelink_test]
579+
async fn slow_store_safe_eviction() -> Result<(), Error> {
580+
let fast_store_spec = MemorySpec {
581+
eviction_policy: Some(EvictionPolicy {
582+
max_bytes: 100,
583+
..Default::default()
584+
}),
585+
};
586+
let slow_store_spec = MemorySpec {
587+
eviction_policy: Some(EvictionPolicy {
588+
max_bytes: 101,
589+
..Default::default()
590+
}),
591+
};
592+
let fast_store = Store::new(MemoryStore::new(&fast_store_spec));
593+
let slow_store = Store::new(MemoryStore::new(&slow_store_spec));
594+
let fast_slow_store = Store::new(FastSlowStore::new(
595+
&FastSlowSpec {
596+
fast: StoreSpec::Memory(fast_store_spec),
597+
slow: StoreSpec::Memory(slow_store_spec),
598+
fast_direction: StoreDirection::default(),
599+
slow_direction: StoreDirection::default(),
600+
},
601+
fast_store.clone(),
602+
slow_store.clone(),
603+
));
604+
605+
let data1 = make_random_data(100);
606+
let digest1 = DigestInfo::try_new(VALID_HASH, data1.len()).unwrap();
607+
608+
let data2 = make_random_data(10);
609+
let digest2 = DigestInfo::try_new(VALID_HASH, data2.len()).unwrap();
610+
611+
assert_eq!(
612+
fast_slow_store
613+
.has(digest1)
614+
.await
615+
.merge(fast_slow_store.has(digest2).await),
616+
Ok(None),
617+
"Expected data to not exist in store"
618+
);
619+
620+
debug!("Uploading first data...");
621+
fast_slow_store
622+
.update_oneshot(digest1, data1.clone().into())
623+
.await?;
624+
625+
debug!("Uploading second data...");
626+
fast_slow_store
627+
.update_oneshot(digest2, data2.clone().into())
628+
.await?;
629+
630+
debug!("Both uploads complete");
631+
632+
assert_eq!(
633+
fast_slow_store.has(digest1).await,
634+
Ok(Some(100)),
635+
"Expected first data to not exist in store, because eviction"
636+
);
637+
638+
let (tx, mut rx) = make_buf_channel_pair();
639+
640+
assert_eq!(
641+
fast_slow_store.get(digest1, tx).await,
642+
Ok(()),
643+
"Expected first data to not exist in store, because eviction"
644+
);
645+
646+
rx.recv().await.unwrap();
647+
assert_eq!(rx.get_bytes_received(), 100);
648+
649+
assert_eq!(
650+
fast_slow_store.has(digest2).await,
651+
Ok(Some(10)),
652+
"Expected first data to not exist in store, because eviction"
653+
);
654+
655+
let (tx, mut rx) = make_buf_channel_pair();
656+
657+
assert_eq!(
658+
fast_slow_store.get(digest2, tx).await,
659+
Ok(()),
660+
"Expected second data to not exist in store, because eviction"
661+
);
662+
663+
rx.recv().await.unwrap();
664+
assert_eq!(rx.get_bytes_received(), 10);
665+
666+
assert!(false);
667+
668+
Ok(())
669+
}

0 commit comments

Comments
 (0)