Skip to content

Commit c4605cd

Browse files
committed
New filesystem test for eviction breaking
1 parent e7f29fe commit c4605cd

File tree

4 files changed

+98
-60
lines changed

4 files changed

+98
-60
lines changed

nativelink-store/src/filesystem_store.rs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,17 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
220220
pub struct FileEntryImpl {
221221
data_size: u64,
222222
block_size: u64,
223-
encoded_file_path: RwLock<EncodedFilePath>,
223+
// We lock around this as it gets rewritten when we move between temp and content types
224+
encoded_file_path: Arc<RwLock<EncodedFilePath>>,
224225
}
225226

226227
impl FileEntryImpl {
227-
pub fn get_shared_context_for_test(&mut self) -> Arc<SharedContext> {
228-
self.encoded_file_path.get_mut().shared_context.clone()
228+
pub async fn get_shared_context_for_test(&mut self) -> Arc<SharedContext> {
229+
self.encoded_file_path
230+
.read_arc()
231+
.await
232+
.shared_context
233+
.clone()
229234
}
230235
}
231236

@@ -234,7 +239,7 @@ impl FileEntry for FileEntryImpl {
234239
Self {
235240
data_size,
236241
block_size,
237-
encoded_file_path,
242+
encoded_file_path: Arc::new(encoded_file_path),
238243
}
239244
}
240245

@@ -362,11 +367,18 @@ impl LenEntry for FileEntryImpl {
362367
// target file location to the new temp file. `unref()` should only ever be called once.
363368
#[inline]
364369
async fn unref(&self) {
365-
{
366-
let mut encoded_file_path = self.encoded_file_path.write().await;
370+
// If this is called during emplace_file, it'll block, so we need to spawn a new task.
371+
let local_encoded_path = self.encoded_file_path.clone();
372+
debug!("Spawning filesystem_unref");
373+
background_spawn!("filesystem_unref", async move {
374+
let mut encoded_file_path = local_encoded_path.write().await;
367375
if encoded_file_path.path_type == PathType::Temp {
368376
// We are already a temp file that is now marked for deletion on drop.
369377
// This is very rare, but most likely the rename into the content path failed.
378+
warn!(
379+
key = ?encoded_file_path.key,
380+
"File is already a temp file",
381+
);
370382
return;
371383
}
372384
let from_path = encoded_file_path.get_file_path();
@@ -388,12 +400,12 @@ impl LenEntry for FileEntryImpl {
388400
key = ?encoded_file_path.key,
389401
?from_path,
390402
?to_path,
391-
"Renamed file",
403+
"Renamed file (unref)",
392404
);
393405
encoded_file_path.path_type = PathType::Temp;
394406
encoded_file_path.key = new_key;
395407
}
396-
}
408+
});
397409
}
398410
}
399411

@@ -531,7 +543,7 @@ async fn add_files_to_cache<Fe: FileEntry>(
531543
if let Err(err) = rename_fn(&from_file, &to_file) {
532544
warn!(?from_file, ?to_file, ?err, "Failed to rename file",);
533545
} else {
534-
debug!(?from_file, ?to_file, "Renamed file",);
546+
debug!(?from_file, ?to_file, "Renamed file (old cache)",);
535547
}
536548
}
537549
Ok(())

nativelink-store/tests/fast_slow_store_test.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ use std::sync::{Arc, Mutex};
1818

1919
use async_trait::async_trait;
2020
use bytes::Bytes;
21-
use nativelink_config::stores::{FastSlowSpec, MemorySpec, NoopSpec, StoreDirection, StoreSpec};
21+
use nativelink_config::stores::{
22+
EvictionPolicy, FastSlowSpec, MemorySpec, NoopSpec, StoreDirection, StoreSpec,
23+
};
2224
use nativelink_error::{Code, Error, ResultExt, make_err};
2325
use nativelink_macro::nativelink_test;
2426
use nativelink_metric::MetricsComponent;
@@ -32,6 +34,7 @@ use nativelink_util::store_trait::{RemoveItemCallback, Store, StoreDriver, Store
3234
use pretty_assertions::assert_eq;
3335
use rand::rngs::SmallRng;
3436
use rand::{Rng, SeedableRng};
37+
use tracing::debug;
3538

3639
const MEGABYTE_SZ: usize = 1024 * 1024;
3740

nativelink-store/tests/filesystem_store_test.rs

Lines changed: 71 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use bytes::Bytes;
2626
use futures::executor::block_on;
2727
use futures::task::Poll;
2828
use futures::{Future, FutureExt, poll};
29-
use nativelink_config::stores::FilesystemSpec;
29+
use nativelink_config::stores::{EvictionPolicy, FilesystemSpec};
3030
use nativelink_error::{Code, Error, ResultExt, make_err};
3131
use nativelink_macro::nativelink_test;
3232
use nativelink_store::filesystem_store::{
@@ -41,7 +41,8 @@ use nativelink_util::{background_spawn, spawn};
4141
use opentelemetry::context::{Context, FutureExt as OtelFutureExt};
4242
use parking_lot::Mutex;
4343
use pretty_assertions::assert_eq;
44-
use rand::Rng;
44+
use rand::rngs::SmallRng;
45+
use rand::{Rng, SeedableRng};
4546
use sha2::{Digest, Sha256};
4647
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Take};
4748
use tokio::sync::{Barrier, Semaphore};
@@ -50,6 +51,15 @@ use tokio_stream::StreamExt;
5051
use tokio_stream::wrappers::ReadDirStream;
5152
use tracing::Instrument;
5253

54+
const VALID_HASH: &str = "0123456789abcdef000000000000000000010000000000000123456789abcdef";
55+
56+
fn make_random_data(sz: usize) -> Vec<u8> {
57+
let mut value = vec![0u8; sz];
58+
let mut rng = SmallRng::seed_from_u64(1);
59+
rng.fill(&mut value[..]);
60+
value
61+
}
62+
5363
trait FileEntryHooks {
5464
fn on_make_and_open(
5565
_encoded_file_path: &EncodedFilePath,
@@ -156,14 +166,14 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> LenEntry for TestFileEntry<H
156166
impl<Hooks: FileEntryHooks + 'static + Sync + Send> Drop for TestFileEntry<Hooks> {
157167
fn drop(&mut self) {
158168
let mut inner = self.inner.take().unwrap();
159-
let shared_context = inner.get_shared_context_for_test();
160169
let current_context = Context::current();
161170

162171
// We do this complicated bit here because tokio does not give a way to run a
163172
// command that will wait for all tasks and sub spawns to complete.
164173
// Sadly we need to rely on `active_drop_spawns` to hit zero to ensure that
165174
// all tasks have completed.
166175
let fut = async move {
176+
let shared_context = inner.get_shared_context_for_test().await;
167177
// Drop the FileEntryImpl in a controlled setting then wait for the
168178
// `active_drop_spawns` to hit zero.
169179
drop(inner);
@@ -331,7 +341,7 @@ async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> {
331341
FilesystemStore::<TestFileEntry<LocalHooks>>::new(&FilesystemSpec {
332342
content_path: content_path.clone(),
333343
temp_path: temp_path.clone(),
334-
eviction_policy: Some(nativelink_config::stores::EvictionPolicy {
344+
eviction_policy: Some(EvictionPolicy {
335345
max_count: 3,
336346
..Default::default()
337347
}),
@@ -404,7 +414,7 @@ async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error>
404414
FilesystemStore::<TestFileEntry<LocalHooks>>::new(&FilesystemSpec {
405415
content_path: content_path.clone(),
406416
temp_path: temp_path.clone(),
407-
eviction_policy: Some(nativelink_config::stores::EvictionPolicy {
417+
eviction_policy: Some(EvictionPolicy {
408418
max_count: 3,
409419
..Default::default()
410420
}),
@@ -439,7 +449,7 @@ async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error>
439449
}
440450

441451
// Replace content.
442-
store.update_oneshot(digest1, VALUE2.into()).await?;
452+
store.update_oneshot(digest1, VALUE1.into()).await?;
443453

444454
// Ensure we let any background tasks finish.
445455
tokio::task::yield_now().await;
@@ -512,7 +522,7 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
512522
FilesystemStore::<TestFileEntry<LocalHooks>>::new(&FilesystemSpec {
513523
content_path: content_path.clone(),
514524
temp_path: temp_path.clone(),
515-
eviction_policy: Some(nativelink_config::stores::EvictionPolicy {
525+
eviction_policy: Some(EvictionPolicy {
516526
max_count: 1,
517527
..Default::default()
518528
}),
@@ -586,47 +596,6 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
586596
check_temp_empty(&temp_path).await
587597
}
588598

589-
// Test to ensure that if we are holding a reference to `FileEntry` and the contents are
590-
// replaced, the `FileEntry` continues to use the old data.
591-
// `FileEntry` file contents should be immutable for the lifetime of the object.
592-
#[nativelink_test]
593-
async fn digest_contents_replaced_continues_using_old_data() -> Result<(), Error> {
594-
let digest = DigestInfo::try_new(HASH1, VALUE1.len())?;
595-
596-
let store = Box::pin(
597-
FilesystemStore::<FileEntryImpl>::new(&FilesystemSpec {
598-
content_path: make_temp_path("content_path"),
599-
temp_path: make_temp_path("temp_path"),
600-
eviction_policy: None,
601-
..Default::default()
602-
})
603-
.await?,
604-
);
605-
// Insert data into store.
606-
store.update_oneshot(digest, VALUE1.into()).await?;
607-
let file_entry = store.get_file_entry_for_digest(&digest).await?;
608-
{
609-
// The file contents should equal our initial data.
610-
let mut reader = file_entry.read_file_part(0, u64::MAX).await?;
611-
let mut file_contents = String::new();
612-
reader.read_to_string(&mut file_contents).await?;
613-
assert_eq!(file_contents, VALUE1);
614-
}
615-
616-
// Now replace the data.
617-
store.update_oneshot(digest, VALUE2.into()).await?;
618-
619-
{
620-
// The file contents still equal our old data.
621-
let mut reader = file_entry.read_file_part(0, u64::MAX).await?;
622-
let mut file_contents = String::new();
623-
reader.read_to_string(&mut file_contents).await?;
624-
assert_eq!(file_contents, VALUE1);
625-
}
626-
627-
Ok(())
628-
}
629-
630599
#[nativelink_test]
631600
async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> {
632601
const SMALL_VALUE: &str = "01";
@@ -658,7 +627,7 @@ async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> {
658627
FilesystemStore::<TestFileEntry<LocalHooks>>::new(&FilesystemSpec {
659628
content_path: make_temp_path("content_path"),
660629
temp_path: make_temp_path("temp_path"),
661-
eviction_policy: Some(nativelink_config::stores::EvictionPolicy {
630+
eviction_policy: Some(EvictionPolicy {
662631
max_bytes: 5,
663632
..Default::default()
664633
}),
@@ -1345,3 +1314,56 @@ async fn file_slot_taken_when_ready() -> Result<(), Error> {
13451314
.map_err(|_| make_err!(Code::Internal, "Deadlock detected"))?;
13461315
res_1.merge(res_2).merge(res_3).merge(res_4)
13471316
}
1317+
1318+
// If we insert a file larger than the max_bytes eviction policy, it should be safely
1319+
// evicted, without deadlocking.
1320+
#[nativelink_test]
1321+
async fn safe_small_safe_eviction() -> Result<(), Error> {
1322+
let store_spec = FilesystemSpec {
1323+
content_path: "/tmp/nativelink/safe_fs".into(),
1324+
temp_path: "/tmp/nativelink/safe_fs_temp".into(),
1325+
eviction_policy: Some(EvictionPolicy {
1326+
max_bytes: 1,
1327+
..Default::default()
1328+
}),
1329+
..Default::default()
1330+
};
1331+
let store = Store::new(<FilesystemStore>::new(&store_spec).await?);
1332+
1333+
// > than the max_bytes
1334+
let bytes = 2;
1335+
1336+
let data = make_random_data(bytes);
1337+
let digest = DigestInfo::try_new(VALID_HASH, data.len()).unwrap();
1338+
1339+
assert_eq!(
1340+
store.has(digest).await,
1341+
Ok(None),
1342+
"Expected data to not exist in store"
1343+
);
1344+
1345+
store.update_oneshot(digest, data.clone().into()).await?;
1346+
1347+
assert_eq!(
1348+
store.has(digest).await,
1349+
Ok(None),
1350+
"Expected data to not exist in store, because eviction"
1351+
);
1352+
1353+
let (tx, mut rx) = make_buf_channel_pair();
1354+
1355+
assert_eq!(
1356+
store.get(digest, tx).await,
1357+
Err(Error {
1358+
code: Code::NotFound,
1359+
messages: vec![format!(
1360+
"{VALID_HASH}-{bytes} not found in filesystem store here"
1361+
)],
1362+
}),
1363+
"Expected data to not exist in store, because eviction"
1364+
);
1365+
1366+
assert!(rx.recv().await.is_err());
1367+
1368+
Ok(())
1369+
}

nativelink-util/src/fs.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use rlimit::increase_nofile_limit;
2727
pub use tokio::fs::DirEntry;
2828
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite, ReadBuf, SeekFrom, Take};
2929
use tokio::sync::{Semaphore, SemaphorePermit};
30-
use tracing::{error, info, warn};
30+
use tracing::{debug, error, info, trace, warn};
3131

3232
use crate::spawn_blocking;
3333

@@ -121,6 +121,7 @@ pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FI
121121
/// Try to acquire a permit from the open file semaphore.
122122
#[inline]
123123
pub async fn get_permit() -> Result<SemaphorePermit<'static>, Error> {
124+
trace!(available_permits = OPEN_FILE_SEMAPHORE.available_permits(), "getting FS permit");
124125
OPEN_FILE_SEMAPHORE
125126
.acquire()
126127
.await

0 commit comments

Comments
 (0)