Skip to content

Commit 9537bdd

Browse files
committed
New filesystem test for eviction breaking
1 parent 437a785 commit 9537bdd

File tree

3 files changed

+126
-45
lines changed

3 files changed

+126
-45
lines changed

nativelink-store/src/filesystem_store.rs

Lines changed: 52 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use nativelink_util::store_trait::{
4040
};
4141
use tokio::io::{AsyncReadExt, AsyncWriteExt, Take};
4242
use tokio_stream::wrappers::ReadDirStream;
43-
use tracing::{debug, error, warn};
43+
use tracing::{debug, error, info, warn};
4444

4545
use crate::callback_utils::RemoveItemCallbackHolder;
4646
use crate::cas_utils::is_zero_digest;
@@ -129,7 +129,8 @@ impl Drop for EncodedFilePath {
129129
.fetch_add(1, Ordering::Relaxed)
130130
+ 1;
131131
debug!(
132-
?current_active_drop_spawns,
132+
%current_active_drop_spawns,
133+
?file_path,
133134
"Spawned a filesystem_delete_file"
134135
);
135136
background_spawn!("filesystem_delete_file", async move {
@@ -148,6 +149,7 @@ impl Drop for EncodedFilePath {
148149
- 1;
149150
debug!(
150151
?current_active_drop_spawns,
152+
?file_path,
151153
"Dropped a filesystem_delete_file"
152154
);
153155
});
@@ -220,6 +222,7 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
220222
pub struct FileEntryImpl {
221223
data_size: u64,
222224
block_size: u64,
225+
// We lock around this as it gets rewritten when we move between temp and content types
223226
encoded_file_path: RwLock<EncodedFilePath>,
224227
}
225228

@@ -362,37 +365,38 @@ impl LenEntry for FileEntryImpl {
362365
// target file location to the new temp file. `unref()` should only ever be called once.
363366
#[inline]
364367
async fn unref(&self) {
365-
{
366-
let mut encoded_file_path = self.encoded_file_path.write().await;
367-
if encoded_file_path.path_type == PathType::Temp {
368-
// We are already a temp file that is now marked for deletion on drop.
369-
// This is very rare, but most likely the rename into the content path failed.
370-
return;
371-
}
372-
let from_path = encoded_file_path.get_file_path();
373-
let new_key = make_temp_key(&encoded_file_path.key);
374-
375-
let to_path =
376-
to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);
377-
378-
if let Err(err) = fs::rename(&from_path, &to_path).await {
379-
warn!(
380-
key = ?encoded_file_path.key,
381-
?from_path,
382-
?to_path,
383-
?err,
384-
"Failed to rename file",
385-
);
386-
} else {
387-
debug!(
388-
key = ?encoded_file_path.key,
389-
?from_path,
390-
?to_path,
391-
"Renamed file",
392-
);
393-
encoded_file_path.path_type = PathType::Temp;
394-
encoded_file_path.key = new_key;
395-
}
368+
let mut encoded_file_path = self.encoded_file_path.write().await;
369+
if encoded_file_path.path_type == PathType::Temp {
370+
// We are already a temp file that is now marked for deletion on drop.
371+
// This is very rare, but most likely the rename into the content path failed.
372+
warn!(
373+
key = ?encoded_file_path.key,
374+
"File is already a temp file",
375+
);
376+
return;
377+
}
378+
let from_path = encoded_file_path.get_file_path();
379+
let new_key = make_temp_key(&encoded_file_path.key);
380+
381+
let to_path = to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);
382+
383+
if let Err(err) = fs::rename(&from_path, &to_path).await {
384+
warn!(
385+
key = ?encoded_file_path.key,
386+
?from_path,
387+
?to_path,
388+
?err,
389+
"Failed to rename file",
390+
);
391+
} else {
392+
debug!(
393+
key = ?encoded_file_path.key,
394+
?from_path,
395+
?to_path,
396+
"Renamed file (unref)",
397+
);
398+
encoded_file_path.path_type = PathType::Temp;
399+
encoded_file_path.key = new_key;
396400
}
397401
}
398402
}
@@ -531,7 +535,7 @@ async fn add_files_to_cache<Fe: FileEntry>(
531535
if let Err(err) = rename_fn(&from_file, &to_file) {
532536
warn!(?from_file, ?to_file, ?err, "Failed to rename file",);
533537
} else {
534-
debug!(?from_file, ?to_file, "Renamed file",);
538+
debug!(?from_file, ?to_file, "Renamed file (old cache)",);
535539
}
536540
}
537541
Ok(())
@@ -751,6 +755,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
751755
.await
752756
.err_tip(|| "Failed to sync_data in filesystem store")?;
753757

758+
debug!(?temp_file, "Dropping file to update_file");
754759
drop(temp_file);
755760

756761
*entry.data_size_mut() = data_size;
@@ -781,17 +786,25 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
781786
// We need to guarantee that this will get to the end even if the parent future is dropped.
782787
// See: https://github.com/TraceMachina/nativelink/issues/495
783788
background_spawn!("filesystem_store_emplace_file", async move {
789+
evicting_map
790+
.insert(key.borrow().into_owned().into(), entry.clone())
791+
.await;
792+
793+
// The insert might have resulted in an eviction/unref so we need to check
794+
// it still exists in there. But first, get the lock...
784795
let mut encoded_file_path = entry.get_encoded_file_path().write().await;
796+
// Then check it's still in there...
797+
if evicting_map.get(&key).await.is_none() {
798+
info!(%key, "Got eviction while emplacing, dropping");
799+
return Ok(());
800+
}
801+
785802
let final_path = get_file_path_raw(
786803
&PathType::Content,
787804
encoded_file_path.shared_context.as_ref(),
788805
&key,
789806
);
790807

791-
evicting_map
792-
.insert(key.borrow().into_owned().into(), entry.clone())
793-
.await;
794-
795808
let from_path = encoded_file_path.get_file_path();
796809
// Internally tokio spawns fs commands onto a blocking thread anyways.
797810
// Since we are already on a blocking thread, we just need the `fs` wrapper to manage
@@ -941,6 +954,7 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
941954
);
942955
// We are done with the file, if we hold a reference to the file here, it could
943956
// result in a deadlock if `emplace_file()` also needs file descriptors.
957+
debug!(?file, "Dropping file to to update_with_whole_file");
944958
drop(file);
945959
self.emplace_file(key.into_owned(), Arc::new(entry))
946960
.await

nativelink-store/tests/filesystem_store_test.rs

Lines changed: 69 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use bytes::Bytes;
2626
use futures::executor::block_on;
2727
use futures::task::Poll;
2828
use futures::{Future, FutureExt, poll};
29-
use nativelink_config::stores::FilesystemSpec;
29+
use nativelink_config::stores::{EvictionPolicy, FilesystemSpec};
3030
use nativelink_error::{Code, Error, ResultExt, make_err};
3131
use nativelink_macro::nativelink_test;
3232
use nativelink_store::filesystem_store::{
@@ -41,7 +41,8 @@ use nativelink_util::{background_spawn, spawn};
4141
use opentelemetry::context::{Context, FutureExt as OtelFutureExt};
4242
use parking_lot::Mutex;
4343
use pretty_assertions::assert_eq;
44-
use rand::Rng;
44+
use rand::rngs::SmallRng;
45+
use rand::{Rng, SeedableRng};
4546
use sha2::{Digest, Sha256};
4647
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Take};
4748
use tokio::sync::{Barrier, Semaphore};
@@ -50,6 +51,15 @@ use tokio_stream::StreamExt;
5051
use tokio_stream::wrappers::ReadDirStream;
5152
use tracing::Instrument;
5253

54+
const VALID_HASH: &str = "0123456789abcdef000000000000000000010000000000000123456789abcdef";
55+
56+
fn make_random_data(sz: usize) -> Vec<u8> {
57+
let mut value = vec![0u8; sz];
58+
let mut rng = SmallRng::seed_from_u64(1);
59+
rng.fill(&mut value[..]);
60+
value
61+
}
62+
5363
trait FileEntryHooks {
5464
fn on_make_and_open(
5565
_encoded_file_path: &EncodedFilePath,
@@ -331,7 +341,7 @@ async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> {
331341
FilesystemStore::<TestFileEntry<LocalHooks>>::new(&FilesystemSpec {
332342
content_path: content_path.clone(),
333343
temp_path: temp_path.clone(),
334-
eviction_policy: Some(nativelink_config::stores::EvictionPolicy {
344+
eviction_policy: Some(EvictionPolicy {
335345
max_count: 3,
336346
..Default::default()
337347
}),
@@ -404,7 +414,7 @@ async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error>
404414
FilesystemStore::<TestFileEntry<LocalHooks>>::new(&FilesystemSpec {
405415
content_path: content_path.clone(),
406416
temp_path: temp_path.clone(),
407-
eviction_policy: Some(nativelink_config::stores::EvictionPolicy {
417+
eviction_policy: Some(EvictionPolicy {
408418
max_count: 3,
409419
..Default::default()
410420
}),
@@ -512,7 +522,7 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
512522
FilesystemStore::<TestFileEntry<LocalHooks>>::new(&FilesystemSpec {
513523
content_path: content_path.clone(),
514524
temp_path: temp_path.clone(),
515-
eviction_policy: Some(nativelink_config::stores::EvictionPolicy {
525+
eviction_policy: Some(EvictionPolicy {
516526
max_count: 1,
517527
..Default::default()
518528
}),
@@ -658,7 +668,7 @@ async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> {
658668
FilesystemStore::<TestFileEntry<LocalHooks>>::new(&FilesystemSpec {
659669
content_path: make_temp_path("content_path"),
660670
temp_path: make_temp_path("temp_path"),
661-
eviction_policy: Some(nativelink_config::stores::EvictionPolicy {
671+
eviction_policy: Some(EvictionPolicy {
662672
max_bytes: 5,
663673
..Default::default()
664674
}),
@@ -1400,3 +1410,56 @@ async fn file_slot_taken_when_ready() -> Result<(), Error> {
14001410
.map_err(|_| make_err!(Code::Internal, "Deadlock detected"))?;
14011411
res_1.merge(res_2).merge(res_3).merge(res_4)
14021412
}
1413+
1414+
// If we insert a file larger than the max_bytes eviction policy, it should be safely
1415+
// evicted, without deadlocking.
1416+
#[nativelink_test]
1417+
async fn safe_small_safe_eviction() -> Result<(), Error> {
1418+
let store_spec = FilesystemSpec {
1419+
content_path: "/tmp/nativelink/safe_fs".into(),
1420+
temp_path: "/tmp/nativelink/safe_fs_temp".into(),
1421+
eviction_policy: Some(EvictionPolicy {
1422+
max_bytes: 1,
1423+
..Default::default()
1424+
}),
1425+
..Default::default()
1426+
};
1427+
let store = Store::new(<FilesystemStore>::new(&store_spec).await?);
1428+
1429+
// > than the max_bytes
1430+
let bytes = 2;
1431+
1432+
let data = make_random_data(bytes);
1433+
let digest = DigestInfo::try_new(VALID_HASH, data.len()).unwrap();
1434+
1435+
assert_eq!(
1436+
store.has(digest).await,
1437+
Ok(None),
1438+
"Expected data to not exist in store"
1439+
);
1440+
1441+
store.update_oneshot(digest, data.clone().into()).await?;
1442+
1443+
assert_eq!(
1444+
store.has(digest).await,
1445+
Ok(None),
1446+
"Expected data to not exist in store, because eviction"
1447+
);
1448+
1449+
let (tx, mut rx) = make_buf_channel_pair();
1450+
1451+
assert_eq!(
1452+
store.get(digest, tx).await,
1453+
Err(Error {
1454+
code: Code::NotFound,
1455+
messages: vec![format!(
1456+
"{VALID_HASH}-{bytes} not found in filesystem store here"
1457+
)],
1458+
}),
1459+
"Expected data to not exist in store, because eviction"
1460+
);
1461+
1462+
assert!(rx.recv().await.is_err());
1463+
1464+
Ok(())
1465+
}

nativelink-util/src/fs.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use rlimit::increase_nofile_limit;
2727
pub use tokio::fs::DirEntry;
2828
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite, ReadBuf, SeekFrom, Take};
2929
use tokio::sync::{Semaphore, SemaphorePermit};
30-
use tracing::{error, info, warn};
30+
use tracing::{error, info, trace, warn};
3131

3232
use crate::spawn_blocking;
3333

@@ -121,6 +121,10 @@ pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FI
121121
/// Try to acquire a permit from the open file semaphore.
122122
#[inline]
123123
pub async fn get_permit() -> Result<SemaphorePermit<'static>, Error> {
124+
trace!(
125+
available_permits = OPEN_FILE_SEMAPHORE.available_permits(),
126+
"getting FS permit"
127+
);
124128
OPEN_FILE_SEMAPHORE
125129
.acquire()
126130
.await

0 commit comments

Comments
 (0)