From 0dfb28e0a9b7d5b3a0c87f74ef25c8ffd6d7a516 Mon Sep 17 00:00:00 2001 From: "heecheol.park" Date: Fri, 29 Aug 2025 18:12:43 +0900 Subject: [PATCH 1/3] feat: add stable file locking using std::fs APIs - Use Rust 1.89+ stabilized std::fs::File locking APIs - Platform-specific implementation: Unix uses try_lock_shared(), Windows uses try_lock() - Maintains retry logic and advisory file locking behavior - No external dependencies required for file locking --- codex-rs/core/src/message_history.rs | 134 ++++++++++++--------------- 1 file changed, 61 insertions(+), 73 deletions(-) diff --git a/codex-rs/core/src/message_history.rs b/codex-rs/core/src/message_history.rs index 9f13ababb7..6d041556df 100644 --- a/codex-rs/core/src/message_history.rs +++ b/codex-rs/core/src/message_history.rs @@ -105,46 +105,35 @@ pub(crate) async fn append_entry(text: &str, session_id: &Uuid, config: &Config) // Ensure permissions. ensure_owner_only_permissions(&history_file).await?; - // Lock file. - acquire_exclusive_lock_with_retry(&history_file).await?; - - // We use sync I/O with spawn_blocking() because we are using a - // [`std::fs::File`] instead of a [`tokio::fs::File`] to leverage an - // advisory file locking API that is not available in the async API. + // Perform a blocking write under an advisory write lock using std::fs. tokio::task::spawn_blocking(move || -> Result<()> { - history_file.write_all(line.as_bytes())?; - history_file.flush()?; - Ok(()) + // Retry a few times to avoid indefinite blocking when contended. + for _ in 0..MAX_RETRIES { + match history_file.try_lock() { + Ok(()) => { + // While holding the exclusive lock, write the full line. + history_file.write_all(line.as_bytes())?; + history_file.flush()?; + return Ok(()); + } + Err(std::fs::TryLockError::WouldBlock) => { + std::thread::sleep(RETRY_SLEEP); + } + Err(e) => return Err(e.into()), + } + } + + Err(std::io::Error::new( + std::io::ErrorKind::WouldBlock, + "could not acquire exclusive lock on history file after multiple attempts", + )) }) .await??; Ok(()) } -/// Attempt to acquire an exclusive advisory lock on `file`, retrying up to 10 -/// times if the lock is currently held by another process. This prevents a -/// potential indefinite wait while still giving other writers some time to -/// finish their operation. -async fn acquire_exclusive_lock_with_retry(file: &File) -> Result<()> { - use tokio::time::sleep; - - for _ in 0..MAX_RETRIES { - match file.try_lock() { - Ok(()) => return Ok(()), - Err(e) => match e { - std::fs::TryLockError::WouldBlock => { - sleep(RETRY_SLEEP).await; - } - other => return Err(other.into()), - }, - } - } - - Err(std::io::Error::new( - std::io::ErrorKind::WouldBlock, - "could not acquire exclusive lock on history file after multiple attempts", - )) -} +// Exclusive lock handled inline in append_entry using std::fs::File::try_lock. /// Asynchronously fetch the history file's *identifier* (inode on Unix) and /// the current number of entries by counting newline characters. @@ -221,29 +210,46 @@ pub(crate) fn lookup(log_id: u64, offset: usize, config: &Config) -> Option l, - Err(e) => { - tracing::warn!(error = %e, "failed to read line from history file"); + // Open & lock file for reading using a shared lock. + // Retry a few times to avoid indefinite blocking. + for _ in 0..MAX_RETRIES { + // Note: On Windows, try_lock_shared behaves identically to try_lock + #[cfg(windows)] + let lock_result = file.try_lock(); + #[cfg(not(windows))] + let lock_result = file.try_lock_shared(); + + match lock_result { + Ok(()) => { + let reader = BufReader::new(&file); + for (idx, line_res) in reader.lines().enumerate() { + let line = match line_res { + Ok(l) => l, + Err(e) => { + tracing::warn!(error = %e, "failed to read line from history file"); + return None; + } + }; + + if idx == offset { + match serde_json::from_str::(&line) { + Ok(entry) => return Some(entry), + Err(e) => { + tracing::warn!(error = %e, "failed to parse history entry"); + return None; + } + } + } + } + // Not found at requested offset. return None; } - }; - - if idx == offset { - match serde_json::from_str::(&line) { - Ok(entry) => return Some(entry), - Err(e) => { - tracing::warn!(error = %e, "failed to parse history entry"); - return None; - } + Err(std::fs::TryLockError::WouldBlock) => { + std::thread::sleep(RETRY_SLEEP); + } + Err(e) => { + tracing::warn!(error = %e, "failed to acquire shared lock on history file"); + return None; } } } @@ -258,25 +264,7 @@ pub(crate) fn lookup(log_id: u64, offset: usize, config: &Config) -> Option Result<()> { - for _ in 0..MAX_RETRIES { - match file.try_lock_shared() { - Ok(()) => return Ok(()), - Err(e) => match e { - std::fs::TryLockError::WouldBlock => { - std::thread::sleep(RETRY_SLEEP); - } - other => return Err(other.into()), - }, - } - } - - Err(std::io::Error::new( - std::io::ErrorKind::WouldBlock, - "could not acquire shared lock on history file after multiple attempts", - )) -} +// Shared lock handled inline in lookup using std::fs::File::try_lock_shared (Unix) or try_lock (Windows). /// On Unix systems ensure the file permissions are `0o600` (rw-------). If the /// permissions cannot be changed the error is propagated to the caller. From 402ae36121f720268e7f90acf7c099cec8bccf6c Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Tue, 2 Sep 2025 23:26:52 -0700 Subject: [PATCH 2/3] Update message_history.rs Remove comment that talks about deleted code (was probably written by Codex?). --- codex-rs/core/src/message_history.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/codex-rs/core/src/message_history.rs b/codex-rs/core/src/message_history.rs index 6d041556df..ae68e71a4c 100644 --- a/codex-rs/core/src/message_history.rs +++ b/codex-rs/core/src/message_history.rs @@ -264,8 +264,6 @@ pub(crate) fn lookup(log_id: u64, offset: usize, config: &Config) -> Option Date: Tue, 2 Sep 2025 23:33:30 -0700 Subject: [PATCH 3/3] Update message_history.rs Fix another comment and also remove a cfg(windows) gate that was inside a function that was cfg(unix) (so it was effectively unreachable code). --- codex-rs/core/src/message_history.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/codex-rs/core/src/message_history.rs b/codex-rs/core/src/message_history.rs index ae68e71a4c..75bc5aa75c 100644 --- a/codex-rs/core/src/message_history.rs +++ b/codex-rs/core/src/message_history.rs @@ -133,8 +133,6 @@ pub(crate) async fn append_entry(text: &str, session_id: &Uuid, config: &Config) Ok(()) } -// Exclusive lock handled inline in append_entry using std::fs::File::try_lock. - /// Asynchronously fetch the history file's *identifier* (inode on Unix) and /// the current number of entries by counting newline characters. pub(crate) async fn history_metadata(config: &Config) -> (u64, usize) { @@ -213,10 +211,6 @@ pub(crate) fn lookup(log_id: u64, offset: usize, config: &Config) -> Option