Skip to content

Commit e0dbc60

Browse files
committed
New filesystem test for eviction breaking
1 parent bef18b3 commit e0dbc60

File tree

3 files changed

+123
-39
lines changed

3 files changed

+123
-39
lines changed

nativelink-store/src/filesystem_store.rs

Lines changed: 49 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use nativelink_util::store_trait::{
4040
};
4141
use tokio::io::{AsyncReadExt, AsyncWriteExt, Take};
4242
use tokio_stream::wrappers::ReadDirStream;
43-
use tracing::{debug, error, warn};
43+
use tracing::{debug, error, info, warn};
4444

4545
use crate::callback_utils::RemoveItemCallbackHolder;
4646
use crate::cas_utils::is_zero_digest;
@@ -129,7 +129,8 @@ impl Drop for EncodedFilePath {
129129
.fetch_add(1, Ordering::Relaxed)
130130
+ 1;
131131
debug!(
132-
?current_active_drop_spawns,
132+
%current_active_drop_spawns,
133+
?file_path,
133134
"Spawned a filesystem_delete_file"
134135
);
135136
background_spawn!("filesystem_delete_file", async move {
@@ -148,6 +149,7 @@ impl Drop for EncodedFilePath {
148149
- 1;
149150
debug!(
150151
?current_active_drop_spawns,
152+
?file_path,
151153
"Dropped a filesystem_delete_file"
152154
);
153155
});
@@ -220,6 +222,7 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
220222
pub struct FileEntryImpl {
221223
data_size: u64,
222224
block_size: u64,
225+
// We lock around this as it gets rewritten when we move between temp and content types
223226
encoded_file_path: RwLock<EncodedFilePath>,
224227
}
225228

@@ -362,37 +365,49 @@ impl LenEntry for FileEntryImpl {
362365
// target file location to the new temp file. `unref()` should only ever be called once.
363366
#[inline]
364367
async fn unref(&self) {
365-
{
366-
let mut encoded_file_path = self.encoded_file_path.write().await;
367-
if encoded_file_path.path_type == PathType::Temp {
368-
// We are already a temp file that is now marked for deletion on drop.
369-
// This is very rare, but most likely the rename into the content path failed.
370-
return;
371-
}
372-
let from_path = encoded_file_path.get_file_path();
373-
let new_key = make_temp_key(&encoded_file_path.key);
374-
375-
let to_path =
376-
to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);
377-
378-
if let Err(err) = fs::rename(&from_path, &to_path).await {
379-
warn!(
380-
key = ?encoded_file_path.key,
381-
?from_path,
382-
?to_path,
383-
?err,
384-
"Failed to rename file",
385-
);
386-
} else {
387-
debug!(
388-
key = ?encoded_file_path.key,
389-
?from_path,
390-
?to_path,
391-
"Renamed file",
368+
let mut encoded_file_path = match self.encoded_file_path.try_write() {
369+
Some(efp) => efp,
370+
None => {
371+
// This is being held onto by something else, usually emplace_path.
372+
// We shouldn't try and unref this, because they're doing things to it instead
373+
info!(
374+
?self,
375+
"Skipping trying to do unref as someone else is editing this"
392376
);
393-
encoded_file_path.path_type = PathType::Temp;
394-
encoded_file_path.key = new_key;
377+
return;
395378
}
379+
};
380+
if encoded_file_path.path_type == PathType::Temp {
381+
// We are already a temp file that is now marked for deletion on drop.
382+
// This is very rare, but most likely the rename into the content path failed.
383+
warn!(
384+
key = ?encoded_file_path.key,
385+
"File is already a temp file",
386+
);
387+
return;
388+
}
389+
let from_path = encoded_file_path.get_file_path();
390+
let new_key = make_temp_key(&encoded_file_path.key);
391+
392+
let to_path = to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);
393+
394+
if let Err(err) = fs::rename(&from_path, &to_path).await {
395+
warn!(
396+
key = ?encoded_file_path.key,
397+
?from_path,
398+
?to_path,
399+
?err,
400+
"Failed to rename file",
401+
);
402+
} else {
403+
debug!(
404+
key = ?encoded_file_path.key,
405+
?from_path,
406+
?to_path,
407+
"Renamed file (unref)",
408+
);
409+
encoded_file_path.path_type = PathType::Temp;
410+
encoded_file_path.key = new_key;
396411
}
397412
}
398413
}
@@ -531,7 +546,7 @@ async fn add_files_to_cache<Fe: FileEntry>(
531546
if let Err(err) = rename_fn(&from_file, &to_file) {
532547
warn!(?from_file, ?to_file, ?err, "Failed to rename file",);
533548
} else {
534-
debug!(?from_file, ?to_file, "Renamed file",);
549+
debug!(?from_file, ?to_file, "Renamed file (old cache)",);
535550
}
536551
}
537552
Ok(())
@@ -740,6 +755,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
740755
.await
741756
.err_tip(|| "Failed to sync_data in filesystem store")?;
742757

758+
debug!(?temp_file, "Dropping file to update_file");
743759
drop(temp_file);
744760

745761
*entry.data_size_mut() = data_size;
@@ -921,6 +937,7 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
921937
);
922938
// We are done with the file, if we hold a reference to the file here, it could
923939
// result in a deadlock if `emplace_file()` also needs file descriptors.
940+
debug!(?file, "Dropping file to to update_with_whole_file");
924941
drop(file);
925942
self.emplace_file(key.into_owned(), Arc::new(entry))
926943
.await

nativelink-store/tests/filesystem_store_test.rs

Lines changed: 69 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use bytes::Bytes;
2626
use futures::executor::block_on;
2727
use futures::task::Poll;
2828
use futures::{Future, FutureExt, poll};
29-
use nativelink_config::stores::FilesystemSpec;
29+
use nativelink_config::stores::{EvictionPolicy, FilesystemSpec};
3030
use nativelink_error::{Code, Error, ResultExt, make_err};
3131
use nativelink_macro::nativelink_test;
3232
use nativelink_store::filesystem_store::{
@@ -41,7 +41,8 @@ use nativelink_util::{background_spawn, spawn};
4141
use opentelemetry::context::{Context, FutureExt as OtelFutureExt};
4242
use parking_lot::Mutex;
4343
use pretty_assertions::assert_eq;
44-
use rand::Rng;
44+
use rand::rngs::SmallRng;
45+
use rand::{Rng, SeedableRng};
4546
use sha2::{Digest, Sha256};
4647
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Take};
4748
use tokio::sync::{Barrier, Semaphore};
@@ -50,6 +51,15 @@ use tokio_stream::StreamExt;
5051
use tokio_stream::wrappers::ReadDirStream;
5152
use tracing::Instrument;
5253

54+
const VALID_HASH: &str = "0123456789abcdef000000000000000000010000000000000123456789abcdef";
55+
56+
fn make_random_data(sz: usize) -> Vec<u8> {
57+
let mut value = vec![0u8; sz];
58+
let mut rng = SmallRng::seed_from_u64(1);
59+
rng.fill(&mut value[..]);
60+
value
61+
}
62+
5363
trait FileEntryHooks {
5464
fn on_make_and_open(
5565
_encoded_file_path: &EncodedFilePath,
@@ -331,7 +341,7 @@ async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> {
331341
FilesystemStore::<TestFileEntry<LocalHooks>>::new(&FilesystemSpec {
332342
content_path: content_path.clone(),
333343
temp_path: temp_path.clone(),
334-
eviction_policy: Some(nativelink_config::stores::EvictionPolicy {
344+
eviction_policy: Some(EvictionPolicy {
335345
max_count: 3,
336346
..Default::default()
337347
}),
@@ -404,7 +414,7 @@ async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error>
404414
FilesystemStore::<TestFileEntry<LocalHooks>>::new(&FilesystemSpec {
405415
content_path: content_path.clone(),
406416
temp_path: temp_path.clone(),
407-
eviction_policy: Some(nativelink_config::stores::EvictionPolicy {
417+
eviction_policy: Some(EvictionPolicy {
408418
max_count: 3,
409419
..Default::default()
410420
}),
@@ -512,7 +522,7 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
512522
FilesystemStore::<TestFileEntry<LocalHooks>>::new(&FilesystemSpec {
513523
content_path: content_path.clone(),
514524
temp_path: temp_path.clone(),
515-
eviction_policy: Some(nativelink_config::stores::EvictionPolicy {
525+
eviction_policy: Some(EvictionPolicy {
516526
max_count: 1,
517527
..Default::default()
518528
}),
@@ -658,7 +668,7 @@ async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> {
658668
FilesystemStore::<TestFileEntry<LocalHooks>>::new(&FilesystemSpec {
659669
content_path: make_temp_path("content_path"),
660670
temp_path: make_temp_path("temp_path"),
661-
eviction_policy: Some(nativelink_config::stores::EvictionPolicy {
671+
eviction_policy: Some(EvictionPolicy {
662672
max_bytes: 5,
663673
..Default::default()
664674
}),
@@ -1345,3 +1355,56 @@ async fn file_slot_taken_when_ready() -> Result<(), Error> {
13451355
.map_err(|_| make_err!(Code::Internal, "Deadlock detected"))?;
13461356
res_1.merge(res_2).merge(res_3).merge(res_4)
13471357
}
1358+
1359+
// If we insert a file larger than the max_bytes eviction policy, it should be safely
1360+
// evicted, without deadlocking.
1361+
#[nativelink_test]
1362+
async fn safe_small_safe_eviction() -> Result<(), Error> {
1363+
let store_spec = FilesystemSpec {
1364+
content_path: "/tmp/nativelink/safe_fs".into(),
1365+
temp_path: "/tmp/nativelink/safe_fs_temp".into(),
1366+
eviction_policy: Some(EvictionPolicy {
1367+
max_bytes: 1,
1368+
..Default::default()
1369+
}),
1370+
..Default::default()
1371+
};
1372+
let store = Store::new(<FilesystemStore>::new(&store_spec).await?);
1373+
1374+
// > than the max_bytes
1375+
let bytes = 2;
1376+
1377+
let data = make_random_data(bytes);
1378+
let digest = DigestInfo::try_new(VALID_HASH, data.len()).unwrap();
1379+
1380+
assert_eq!(
1381+
store.has(digest).await,
1382+
Ok(None),
1383+
"Expected data to not exist in store"
1384+
);
1385+
1386+
store.update_oneshot(digest, data.clone().into()).await?;
1387+
1388+
assert_eq!(
1389+
store.has(digest).await,
1390+
Ok(None),
1391+
"Expected data to not exist in store, because eviction"
1392+
);
1393+
1394+
let (tx, mut rx) = make_buf_channel_pair();
1395+
1396+
assert_eq!(
1397+
store.get(digest, tx).await,
1398+
Err(Error {
1399+
code: Code::NotFound,
1400+
messages: vec![format!(
1401+
"{VALID_HASH}-{bytes} not found in filesystem store here"
1402+
)],
1403+
}),
1404+
"Expected data to not exist in store, because eviction"
1405+
);
1406+
1407+
assert!(rx.recv().await.is_err());
1408+
1409+
Ok(())
1410+
}

nativelink-util/src/fs.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use rlimit::increase_nofile_limit;
2727
pub use tokio::fs::DirEntry;
2828
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite, ReadBuf, SeekFrom, Take};
2929
use tokio::sync::{Semaphore, SemaphorePermit};
30-
use tracing::{error, info, warn};
30+
use tracing::{error, info, trace, warn};
3131

3232
use crate::spawn_blocking;
3333

@@ -121,6 +121,10 @@ pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FI
121121
/// Try to acquire a permit from the open file semaphore.
122122
#[inline]
123123
pub async fn get_permit() -> Result<SemaphorePermit<'static>, Error> {
124+
trace!(
125+
available_permits = OPEN_FILE_SEMAPHORE.available_permits(),
126+
"getting FS permit"
127+
);
124128
OPEN_FILE_SEMAPHORE
125129
.acquire()
126130
.await

0 commit comments

Comments
 (0)