@@ -40,7 +40,7 @@ use nativelink_util::store_trait::{
4040} ;
4141use tokio:: io:: { AsyncReadExt , AsyncWriteExt , Take } ;
4242use tokio_stream:: wrappers:: ReadDirStream ;
43- use tracing:: { debug, error, warn} ;
43+ use tracing:: { debug, error, info , warn} ;
4444
4545use crate :: callback_utils:: RemoveItemCallbackHolder ;
4646use crate :: cas_utils:: is_zero_digest;
@@ -129,7 +129,8 @@ impl Drop for EncodedFilePath {
129129 . fetch_add ( 1 , Ordering :: Relaxed )
130130 + 1 ;
131131 debug ! (
132- ?current_active_drop_spawns,
132+ %current_active_drop_spawns,
133+ ?file_path,
133134 "Spawned a filesystem_delete_file"
134135 ) ;
135136 background_spawn ! ( "filesystem_delete_file" , async move {
@@ -148,6 +149,7 @@ impl Drop for EncodedFilePath {
148149 - 1 ;
149150 debug!(
150151 ?current_active_drop_spawns,
152+ ?file_path,
151153 "Dropped a filesystem_delete_file"
152154 ) ;
153155 } ) ;
@@ -220,6 +222,7 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
220222pub struct FileEntryImpl {
221223 data_size : u64 ,
222224 block_size : u64 ,
225+ // We lock around this as it gets rewritten when we move between temp and content types
223226 encoded_file_path : RwLock < EncodedFilePath > ,
224227}
225228
@@ -362,37 +365,38 @@ impl LenEntry for FileEntryImpl {
362365 // target file location to the new temp file. `unref()` should only ever be called once.
363366 #[ inline]
364367 async fn unref ( & self ) {
365- {
366- let mut encoded_file_path = self . encoded_file_path . write ( ) . await ;
367- if encoded_file_path. path_type == PathType :: Temp {
368- // We are already a temp file that is now marked for deletion on drop.
369- // This is very rare, but most likely the rename into the content path failed.
370- return ;
371- }
372- let from_path = encoded_file_path. get_file_path ( ) ;
373- let new_key = make_temp_key ( & encoded_file_path. key ) ;
374-
375- let to_path =
376- to_full_path_from_key ( & encoded_file_path. shared_context . temp_path , & new_key) ;
377-
378- if let Err ( err) = fs:: rename ( & from_path, & to_path) . await {
379- warn ! (
380- key = ?encoded_file_path. key,
381- ?from_path,
382- ?to_path,
383- ?err,
384- "Failed to rename file" ,
385- ) ;
386- } else {
387- debug ! (
388- key = ?encoded_file_path. key,
389- ?from_path,
390- ?to_path,
391- "Renamed file" ,
392- ) ;
393- encoded_file_path. path_type = PathType :: Temp ;
394- encoded_file_path. key = new_key;
395- }
368+ let mut encoded_file_path = self . encoded_file_path . write ( ) . await ;
369+ if encoded_file_path. path_type == PathType :: Temp {
370+ // We are already a temp file that is now marked for deletion on drop.
371+ // This is very rare, but most likely the rename into the content path failed.
372+ warn ! (
373+ key = ?encoded_file_path. key,
374+ "File is already a temp file" ,
375+ ) ;
376+ return ;
377+ }
378+ let from_path = encoded_file_path. get_file_path ( ) ;
379+ let new_key = make_temp_key ( & encoded_file_path. key ) ;
380+
381+ let to_path = to_full_path_from_key ( & encoded_file_path. shared_context . temp_path , & new_key) ;
382+
383+ if let Err ( err) = fs:: rename ( & from_path, & to_path) . await {
384+ warn ! (
385+ key = ?encoded_file_path. key,
386+ ?from_path,
387+ ?to_path,
388+ ?err,
389+ "Failed to rename file" ,
390+ ) ;
391+ } else {
392+ debug ! (
393+ key = ?encoded_file_path. key,
394+ ?from_path,
395+ ?to_path,
396+ "Renamed file (unref)" ,
397+ ) ;
398+ encoded_file_path. path_type = PathType :: Temp ;
399+ encoded_file_path. key = new_key;
396400 }
397401 }
398402}
@@ -531,7 +535,7 @@ async fn add_files_to_cache<Fe: FileEntry>(
531535 if let Err ( err) = rename_fn ( & from_file, & to_file) {
532536 warn ! ( ?from_file, ?to_file, ?err, "Failed to rename file" , ) ;
533537 } else {
534- debug ! ( ?from_file, ?to_file, "Renamed file" , ) ;
538+ debug ! ( ?from_file, ?to_file, "Renamed file (old cache) " , ) ;
535539 }
536540 }
537541 Ok ( ( ) )
@@ -740,6 +744,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
740744 . await
741745 . err_tip ( || "Failed to sync_data in filesystem store" ) ?;
742746
747+ debug ! ( ?temp_file, "Dropping file to update_file" ) ;
743748 drop ( temp_file) ;
744749
745750 * entry. data_size_mut ( ) = data_size;
@@ -770,17 +775,25 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
770775 // We need to guarantee that this will get to the end even if the parent future is dropped.
771776 // See: https://github.com/TraceMachina/nativelink/issues/495
772777 background_spawn ! ( "filesystem_store_emplace_file" , async move {
778+ evicting_map
779+ . insert( key. borrow( ) . into_owned( ) . into( ) , entry. clone( ) )
780+ . await ;
781+
782+ // The insert might have resulted in an eviction/unref so we need to check
783+ // it still exists in there. But first, get the lock...
773784 let mut encoded_file_path = entry. get_encoded_file_path( ) . write( ) . await ;
785+ // Then check it's still in there...
786+ if evicting_map. get( & key) . await . is_none( ) {
787+ info!( %key, "Got eviction while emplacing, dropping" ) ;
788+ return Ok ( ( ) ) ;
789+ }
790+
774791 let final_path = get_file_path_raw(
775792 & PathType :: Content ,
776793 encoded_file_path. shared_context. as_ref( ) ,
777794 & key,
778795 ) ;
779796
780- evicting_map
781- . insert( key. borrow( ) . into_owned( ) . into( ) , entry. clone( ) )
782- . await ;
783-
784797 let from_path = encoded_file_path. get_file_path( ) ;
785798 // Internally tokio spawns fs commands onto a blocking thread anyways.
786799 // Since we are already on a blocking thread, we just need the `fs` wrapper to manage
@@ -921,6 +934,7 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
921934 ) ;
922935 // We are done with the file, if we hold a reference to the file here, it could
923936 // result in a deadlock if `emplace_file()` also needs file descriptors.
937+ debug ! ( ?file, "Dropping file to to update_with_whole_file" ) ;
924938 drop ( file) ;
925939 self . emplace_file ( key. into_owned ( ) , Arc :: new ( entry) )
926940 . await
0 commit comments