Skip to content

Commit 6aa306c

Browse files
pchuribolinfest
andauthored
feat: add stable file locking using std::fs APIs (#2894)
## Summary This PR implements advisory file locking for the message history using Rust 1.89+ stabilized std::fs::File locking APIs, eliminating the need for external dependencies. ## Key Changes - **Stable API Usage**: Uses std::fs::File::try_lock() and try_lock_shared() APIs stabilized in Rust 1.89 - **Cross-Platform Compatibility**: - Unix systems use try_lock_shared() for advisory read locks - Windows systems use try_lock() due to different lock semantics - **Retry Logic**: Maintains existing retry behavior for concurrent access scenarios - **No External Dependencies**: Removes need for external file locking crates ## Technical Details The implementation provides advisory file locking to prevent corruption when multiple Codex processes attempt to write to the message history file simultaneously. The locking is platform-aware to handle differences in Windows vs Unix file locking behavior. ## Testing - ✅ Builds successfully on all platforms - ✅ Existing message history tests pass - ✅ File locking retry logic verified Related to discussion in #2773 about using stabilized Rust APIs instead of external dependencies. --------- Co-authored-by: Michael Bolin <[email protected]>
1 parent 44dce74 commit 6aa306c

File tree

1 file changed

+55
-75
lines changed

1 file changed

+55
-75
lines changed

codex-rs/core/src/message_history.rs

Lines changed: 55 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -105,47 +105,34 @@ pub(crate) async fn append_entry(text: &str, session_id: &Uuid, config: &Config)
105105
// Ensure permissions.
106106
ensure_owner_only_permissions(&history_file).await?;
107107

108-
// Lock file.
109-
acquire_exclusive_lock_with_retry(&history_file).await?;
110-
111-
// We use sync I/O with spawn_blocking() because we are using a
112-
// [`std::fs::File`] instead of a [`tokio::fs::File`] to leverage an
113-
// advisory file locking API that is not available in the async API.
108+
// Perform a blocking write under an advisory write lock using std::fs.
114109
tokio::task::spawn_blocking(move || -> Result<()> {
115-
history_file.write_all(line.as_bytes())?;
116-
history_file.flush()?;
117-
Ok(())
110+
// Retry a few times to avoid indefinite blocking when contended.
111+
for _ in 0..MAX_RETRIES {
112+
match history_file.try_lock() {
113+
Ok(()) => {
114+
// While holding the exclusive lock, write the full line.
115+
history_file.write_all(line.as_bytes())?;
116+
history_file.flush()?;
117+
return Ok(());
118+
}
119+
Err(std::fs::TryLockError::WouldBlock) => {
120+
std::thread::sleep(RETRY_SLEEP);
121+
}
122+
Err(e) => return Err(e.into()),
123+
}
124+
}
125+
126+
Err(std::io::Error::new(
127+
std::io::ErrorKind::WouldBlock,
128+
"could not acquire exclusive lock on history file after multiple attempts",
129+
))
118130
})
119131
.await??;
120132

121133
Ok(())
122134
}
123135

124-
/// Attempt to acquire an exclusive advisory lock on `file`, retrying up to 10
125-
/// times if the lock is currently held by another process. This prevents a
126-
/// potential indefinite wait while still giving other writers some time to
127-
/// finish their operation.
128-
async fn acquire_exclusive_lock_with_retry(file: &File) -> Result<()> {
129-
use tokio::time::sleep;
130-
131-
for _ in 0..MAX_RETRIES {
132-
match file.try_lock() {
133-
Ok(()) => return Ok(()),
134-
Err(e) => match e {
135-
std::fs::TryLockError::WouldBlock => {
136-
sleep(RETRY_SLEEP).await;
137-
}
138-
other => return Err(other.into()),
139-
},
140-
}
141-
}
142-
143-
Err(std::io::Error::new(
144-
std::io::ErrorKind::WouldBlock,
145-
"could not acquire exclusive lock on history file after multiple attempts",
146-
))
147-
}
148-
149136
/// Asynchronously fetch the history file's *identifier* (inode on Unix) and
150137
/// the current number of entries by counting newline characters.
151138
pub(crate) async fn history_metadata(config: &Config) -> (u64, usize) {
@@ -221,29 +208,42 @@ pub(crate) fn lookup(log_id: u64, offset: usize, config: &Config) -> Option<Hist
221208
return None;
222209
}
223210

224-
// Open & lock file for reading.
225-
if let Err(e) = acquire_shared_lock_with_retry(&file) {
226-
tracing::warn!(error = %e, "failed to acquire shared lock on history file");
227-
return None;
228-
}
229-
230-
let reader = BufReader::new(&file);
231-
for (idx, line_res) in reader.lines().enumerate() {
232-
let line = match line_res {
233-
Ok(l) => l,
234-
Err(e) => {
235-
tracing::warn!(error = %e, "failed to read line from history file");
211+
// Open & lock file for reading using a shared lock.
212+
// Retry a few times to avoid indefinite blocking.
213+
for _ in 0..MAX_RETRIES {
214+
let lock_result = file.try_lock_shared();
215+
216+
match lock_result {
217+
Ok(()) => {
218+
let reader = BufReader::new(&file);
219+
for (idx, line_res) in reader.lines().enumerate() {
220+
let line = match line_res {
221+
Ok(l) => l,
222+
Err(e) => {
223+
tracing::warn!(error = %e, "failed to read line from history file");
224+
return None;
225+
}
226+
};
227+
228+
if idx == offset {
229+
match serde_json::from_str::<HistoryEntry>(&line) {
230+
Ok(entry) => return Some(entry),
231+
Err(e) => {
232+
tracing::warn!(error = %e, "failed to parse history entry");
233+
return None;
234+
}
235+
}
236+
}
237+
}
238+
// Not found at requested offset.
236239
return None;
237240
}
238-
};
239-
240-
if idx == offset {
241-
match serde_json::from_str::<HistoryEntry>(&line) {
242-
Ok(entry) => return Some(entry),
243-
Err(e) => {
244-
tracing::warn!(error = %e, "failed to parse history entry");
245-
return None;
246-
}
241+
Err(std::fs::TryLockError::WouldBlock) => {
242+
std::thread::sleep(RETRY_SLEEP);
243+
}
244+
Err(e) => {
245+
tracing::warn!(error = %e, "failed to acquire shared lock on history file");
246+
return None;
247247
}
248248
}
249249
}
@@ -258,26 +258,6 @@ pub(crate) fn lookup(log_id: u64, offset: usize, config: &Config) -> Option<Hist
258258
None
259259
}
260260

261-
#[cfg(unix)]
262-
fn acquire_shared_lock_with_retry(file: &File) -> Result<()> {
263-
for _ in 0..MAX_RETRIES {
264-
match file.try_lock_shared() {
265-
Ok(()) => return Ok(()),
266-
Err(e) => match e {
267-
std::fs::TryLockError::WouldBlock => {
268-
std::thread::sleep(RETRY_SLEEP);
269-
}
270-
other => return Err(other.into()),
271-
},
272-
}
273-
}
274-
275-
Err(std::io::Error::new(
276-
std::io::ErrorKind::WouldBlock,
277-
"could not acquire shared lock on history file after multiple attempts",
278-
))
279-
}
280-
281261
/// On Unix systems ensure the file permissions are `0o600` (rw-------). If the
282262
/// permissions cannot be changed the error is propagated to the caller.
283263
#[cfg(unix)]

0 commit comments

Comments
 (0)