Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 71 additions & 2 deletions src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,43 @@ where
}
}

/// Try to publish once without waiting.
///
/// This performs a single, non-blocking check of reader epochs. If all current readers have
/// advanced since the last swap, it performs a publish and returns `true`. If any reader may
/// still be accessing the old copy, it does nothing and returns `false`.
///
/// Unlike [`publish`](Self::publish), this never spins or waits. Use it on latency-sensitive
/// paths where skipping a publish is preferable to blocking; call again later or fall back to
/// [`publish`](Self::publish) if you must ensure visibility.
///
/// Returns `true` if a publish occurred, `false` otherwise.

pub fn try_publish(&mut self) -> bool {
let epochs = Arc::clone(&self.epochs);
let mut epochs = epochs.lock().unwrap();

// we're over-estimating here, but slab doesn't expose its max index
self.last_epochs.resize(epochs.capacity(), 0);
for (ri, epoch) in epochs.iter() {
if self.last_epochs[ri] % 2 == 0 {
continue;
}

let now = epoch.load(Ordering::Acquire);
if now != self.last_epochs[ri] {
// reader must have seen the last swap, since they have done at least one
// operation since we last looked at their epoch, which _must_ mean that they
// are no longer using the old pointer value.
continue;
} else {
return false;
}
}
self.do_publish(&mut epochs);
return true;
}

/// Publish all operations append to the log to reads.
///
/// This method needs to wait for all readers to move to the "other" copy of the data so that
Expand All @@ -312,6 +349,14 @@ where

self.wait(&mut epochs);

self.do_publish(&mut epochs)
}

/// Actual doing the publishing
fn do_publish(
&mut self,
epochs: &mut MutexGuard<'_, slab::Slab<Arc<AtomicUsize>>>,
) -> &mut Self {
if !self.first {
// all the readers have left!
// safety: we haven't freed the Box, and no readers are accessing the w_handle
Expand Down Expand Up @@ -558,8 +603,8 @@ struct CheckWriteHandleSend;

#[cfg(test)]
mod tests {
use crate::sync::{AtomicUsize, Mutex, Ordering};
use crate::Absorb;
use crate::sync::{Arc, AtomicUsize, Mutex, Ordering};
use crate::{read, Absorb};
use slab::Slab;
include!("./utilities.rs");

Expand Down Expand Up @@ -713,4 +758,28 @@ mod tests {
w.publish();
assert_eq!(w.refreshes, 4);
}

#[test]
fn try_publish() {
let (mut w, _r) = crate::new::<i32, _>();

// Case 1: A reader has not advanced (odd and unchanged) -> returns false
let mut epochs_slab = Slab::new();
let idx = epochs_slab.insert(Arc::new(AtomicUsize::new(1))); // odd epoch, "in read"
// Ensure last_epochs sees this reader as odd and unchanged
w.last_epochs = vec![0; epochs_slab.capacity()];
w.last_epochs[idx] = 1;
w.epochs = Arc::new(Mutex::new(epochs_slab));
assert_eq!(w.try_publish(), false);

// Case 2: All readers have advanced since last swap -> returns true and publishes
let mut epochs_slab_ok = Slab::new();
let idx_ok = epochs_slab_ok.insert(Arc::new(AtomicUsize::new(2))); // advanced
w.last_epochs = vec![0; epochs_slab_ok.capacity()];
w.last_epochs[idx_ok] = 1; // previously odd
w.epochs = Arc::new(Mutex::new(epochs_slab_ok));
let before = w.refreshes;
assert_eq!(w.try_publish(), true);
assert_eq!(w.refreshes, before + 1);
}
}