Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 55 additions & 75 deletions codex-rs/core/src/message_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,47 +105,34 @@ pub(crate) async fn append_entry(text: &str, session_id: &Uuid, config: &Config)
// Ensure permissions.
ensure_owner_only_permissions(&history_file).await?;

// Lock file.
acquire_exclusive_lock_with_retry(&history_file).await?;

// We use sync I/O with spawn_blocking() because we are using a
// [`std::fs::File`] instead of a [`tokio::fs::File`] to leverage an
// advisory file locking API that is not available in the async API.
// Perform a blocking write under an advisory write lock using std::fs.
tokio::task::spawn_blocking(move || -> Result<()> {
history_file.write_all(line.as_bytes())?;
history_file.flush()?;
Ok(())
// Retry a few times to avoid indefinite blocking when contended.
for _ in 0..MAX_RETRIES {
match history_file.try_lock() {
Ok(()) => {
// While holding the exclusive lock, write the full line.
history_file.write_all(line.as_bytes())?;
history_file.flush()?;
return Ok(());
}
Err(std::fs::TryLockError::WouldBlock) => {
std::thread::sleep(RETRY_SLEEP);
}
Err(e) => return Err(e.into()),
}
}

Err(std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"could not acquire exclusive lock on history file after multiple attempts",
))
})
.await??;

Ok(())
}

/// Attempt to acquire an exclusive advisory lock on `file`, retrying up to 10
/// times if the lock is currently held by another process. This prevents a
/// potential indefinite wait while still giving other writers some time to
/// finish their operation.
async fn acquire_exclusive_lock_with_retry(file: &File) -> Result<()> {
use tokio::time::sleep;

for _ in 0..MAX_RETRIES {
match file.try_lock() {
Ok(()) => return Ok(()),
Err(e) => match e {
std::fs::TryLockError::WouldBlock => {
sleep(RETRY_SLEEP).await;
}
other => return Err(other.into()),
},
}
}

Err(std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"could not acquire exclusive lock on history file after multiple attempts",
))
}

/// Asynchronously fetch the history file's *identifier* (inode on Unix) and
/// the current number of entries by counting newline characters.
pub(crate) async fn history_metadata(config: &Config) -> (u64, usize) {
Expand Down Expand Up @@ -221,29 +208,42 @@ pub(crate) fn lookup(log_id: u64, offset: usize, config: &Config) -> Option<Hist
return None;
}

// Open & lock file for reading.
if let Err(e) = acquire_shared_lock_with_retry(&file) {
tracing::warn!(error = %e, "failed to acquire shared lock on history file");
return None;
}

let reader = BufReader::new(&file);
for (idx, line_res) in reader.lines().enumerate() {
let line = match line_res {
Ok(l) => l,
Err(e) => {
tracing::warn!(error = %e, "failed to read line from history file");
// Open & lock file for reading using a shared lock.
// Retry a few times to avoid indefinite blocking.
for _ in 0..MAX_RETRIES {
let lock_result = file.try_lock_shared();

match lock_result {
Ok(()) => {
let reader = BufReader::new(&file);
for (idx, line_res) in reader.lines().enumerate() {
let line = match line_res {
Ok(l) => l,
Err(e) => {
tracing::warn!(error = %e, "failed to read line from history file");
return None;
}
};

if idx == offset {
match serde_json::from_str::<HistoryEntry>(&line) {
Ok(entry) => return Some(entry),
Err(e) => {
tracing::warn!(error = %e, "failed to parse history entry");
return None;
}
}
}
}
// Not found at requested offset.
return None;
}
};

if idx == offset {
match serde_json::from_str::<HistoryEntry>(&line) {
Ok(entry) => return Some(entry),
Err(e) => {
tracing::warn!(error = %e, "failed to parse history entry");
return None;
}
Err(std::fs::TryLockError::WouldBlock) => {
std::thread::sleep(RETRY_SLEEP);
}
Err(e) => {
tracing::warn!(error = %e, "failed to acquire shared lock on history file");
return None;
}
}
}
Expand All @@ -258,26 +258,6 @@ pub(crate) fn lookup(log_id: u64, offset: usize, config: &Config) -> Option<Hist
None
}

#[cfg(unix)]
fn acquire_shared_lock_with_retry(file: &File) -> Result<()> {
for _ in 0..MAX_RETRIES {
match file.try_lock_shared() {
Ok(()) => return Ok(()),
Err(e) => match e {
std::fs::TryLockError::WouldBlock => {
std::thread::sleep(RETRY_SLEEP);
}
other => return Err(other.into()),
},
}
}

Err(std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"could not acquire shared lock on history file after multiple attempts",
))
}

/// On Unix systems ensure the file permissions are `0o600` (rw-------). If the
/// permissions cannot be changed the error is propagated to the caller.
#[cfg(unix)]
Expand Down