From 834e1f04aa37836eefd38656aeec73314e4605d5 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 25 Dec 2024 05:28:10 +0000 Subject: [PATCH] feat: Add queuing strategies for listener lists. Two strategies are available: - FIFO: The original round-robin queuing; listeners are inserted at the back. - LIFO: The new most-recent queuing; listeners are inserted at the front. LIFO queuing is beneficial for cache-efficiency with workloads that are tolerant of starvation. The same listener is repeatedly drawn from the list until the load dictates additional listeners be drawn from the list. These listeners expand outward as a "hot set" for optimal reuse of resources rather than continuously drawing from the coldest resources in a FIFO schedule. Signed-off-by: Jason Volk --- benches/bench.rs | 17 ++++- src/intrusive.rs | 95 ++++++++++++++--------- src/lib.rs | 48 +++++++++++- src/slab.rs | 11 ++- tests/notify.rs | 195 ++++++++++++++++++++++++++++++++++++++++------- 5 files changed, 292 insertions(+), 74 deletions(-) diff --git a/benches/bench.rs b/benches/bench.rs index d9e0db1..7ec2483 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -1,7 +1,7 @@ use std::iter; use criterion::{criterion_group, criterion_main, Criterion}; -use event_listener::{Event, Listener}; +use event_listener::{Event, Listener, QueueStrategy}; const COUNT: usize = 8000; @@ -20,6 +20,21 @@ fn bench_events(c: &mut Criterion) { } }); }); + + c.bench_function("notify_and_wait_lifo", |b| { + let ev = Event::new_with_queue_strategy(QueueStrategy::Lifo); + let mut handles = Vec::with_capacity(COUNT); + + b.iter(|| { + handles.extend(iter::repeat_with(|| ev.listen()).take(COUNT)); + + ev.notify(COUNT); + + for handle in handles.drain(..) { + handle.wait(); + } + }); + }); } criterion_group!(benches, bench_events); diff --git a/src/intrusive.rs b/src/intrusive.rs index 7150237..572880a 100644 --- a/src/intrusive.rs +++ b/src/intrusive.rs @@ -6,7 +6,7 @@ use crate::notify::{GenericNotify, Internal, Notification}; use crate::sync::atomic::Ordering; use crate::sync::cell::{Cell, UnsafeCell}; -use crate::{RegisterResult, State, TaskRef}; +use crate::{QueueStrategy, RegisterResult, State, TaskRef}; #[cfg(feature = "critical-section")] use core::cell::RefCell; @@ -42,17 +42,21 @@ struct Inner { /// The number of notified listeners. notified: usize, + + /// Strategy by which the list is organized. + strategy: QueueStrategy, } impl List { /// Create a new, empty event listener list. - pub(super) fn new() -> Self { + pub(super) fn new(strategy: QueueStrategy) -> Self { let inner = Inner { head: None, tail: None, next: None, len: 0, notified: 0, + strategy, }; #[cfg(feature = "critical-section")] @@ -149,39 +153,9 @@ impl crate::Inner { }) } - /// Add a new listener to the list. - pub(crate) fn insert(&self, mut listener: Pin<&mut Option>>) { - self.with_inner(|inner| { - listener.as_mut().set(Some(Listener { - link: UnsafeCell::new(Link { - state: Cell::new(State::Created), - prev: Cell::new(inner.tail), - next: Cell::new(None), - }), - _pin: PhantomPinned, - })); - let listener = listener.as_pin_mut().unwrap(); - - { - let entry_guard = listener.link.get(); - // SAFETY: We are locked, so we can access the inner `link`. - let entry = unsafe { entry_guard.deref() }; - - // Replace the tail with the new entry. - match inner.tail.replace(entry.into()) { - None => inner.head = Some(entry.into()), - Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) }, - }; - } - - // If there are no unnotified entries, this is the first one. - if inner.next.is_none() { - inner.next = inner.tail; - } - - // Bump the entry count. - inner.len += 1; - }); + /// Adds a listener to the list. + pub(crate) fn insert(&self, listener: Pin<&mut Option>>) { + self.with_inner(|inner| inner.insert(listener)) } /// Remove a listener from the list. @@ -248,6 +222,53 @@ impl crate::Inner { } impl Inner { + fn insert(&mut self, mut listener: Pin<&mut Option>>) { + use QueueStrategy::{Fifo, Lifo}; + + listener.as_mut().set(Some(Listener { + link: UnsafeCell::new(Link { + state: Cell::new(State::Created), + prev: Cell::new(self.tail.filter(|_| self.strategy == Fifo)), + next: Cell::new(self.head.filter(|_| self.strategy == Lifo)), + }), + _pin: PhantomPinned, + })); + let listener = listener.as_pin_mut().unwrap(); + + { + let entry_guard = listener.link.get(); + // SAFETY: We are locked, so we can access the inner `link`. + let entry = unsafe { entry_guard.deref() }; + + // Replace the head or tail with the new entry. + let replacing = match self.strategy { + Lifo => &mut self.head, + Fifo => &mut self.tail, + }; + + match replacing.replace(entry.into()) { + None => *replacing = Some(entry.into()), + Some(t) if self.strategy == Lifo => unsafe { + t.as_ref().prev.set(Some(entry.into())) + }, + Some(t) if self.strategy == Fifo => unsafe { + t.as_ref().next.set(Some(entry.into())) + }, + Some(_) => unimplemented!("unimplemented queue strategy"), + }; + } + + // If there are no unnotified entries, or if using LIFO strategy, this is the first one. + if self.strategy == Lifo { + self.next = self.head; + } else if self.next.is_none() { + self.next = self.tail; + } + + // Bump the entry count. + self.len += 1; + } + fn remove( &mut self, mut listener: Pin<&mut Option>>, @@ -413,7 +434,7 @@ mod tests { #[test] fn insert() { - let inner = crate::Inner::new(); + let inner = crate::Inner::new(QueueStrategy::Fifo); make_listeners!(listen1, listen2, listen3); // Register the listeners. @@ -434,7 +455,7 @@ mod tests { #[test] fn drop_non_notified() { - let inner = crate::Inner::new(); + let inner = crate::Inner::new(QueueStrategy::Fifo); make_listeners!(listen1, listen2, listen3); // Register the listeners. diff --git a/src/lib.rs b/src/lib.rs index cb6cdc0..1a12929 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -129,6 +129,16 @@ use sync::WithMut; use notify::NotificationPrivate; pub use notify::{IntoNotification, Notification}; +/// Queuing strategy for listeners. +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum QueueStrategy { + /// First-in-first-out listeners are added to the back of the list. + Fifo, + + /// Last-in-first-out listeners are added to the front of the list. + Lifo, +} + /// Inner state of [`Event`]. struct Inner { /// The number of notified entries, or `usize::MAX` if all of them have been notified. @@ -145,10 +155,10 @@ struct Inner { } impl Inner { - fn new() -> Self { + fn new(queue_strategy: QueueStrategy) -> Self { Self { notified: AtomicUsize::new(usize::MAX), - list: sys::List::new(), + list: sys::List::new(queue_strategy), } } } @@ -179,6 +189,11 @@ pub struct Event { /// is an `Arc` so it's important to keep in mind that it contributes to the [`Arc`]'s /// reference count. inner: AtomicPtr>, + + /// Queuing strategy. + /// + /// Listeners waiting for notification will be arranged according to the strategy. + queue_strategy: QueueStrategy, } unsafe impl Send for Event {} @@ -240,6 +255,7 @@ impl Event { pub const fn with_tag() -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + queue_strategy: QueueStrategy::Fifo, } } #[cfg(all(feature = "std", loom))] @@ -247,6 +263,7 @@ impl Event { pub fn with_tag() -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + queue_strategy: QueueStrategy::Fifo, } } @@ -473,7 +490,7 @@ impl Event { // If this is the first use, initialize the state. if inner.is_null() { // Allocate the state on the heap. - let new = Arc::new(Inner::::new()); + let new = Arc::new(Inner::::new(self.queue_strategy)); // Convert the state to a raw pointer. let new = Arc::into_raw(new) as *mut Inner; @@ -558,16 +575,39 @@ impl Event<()> { #[inline] #[cfg(not(loom))] pub const fn new() -> Self { + Self::new_with_queue_strategy(QueueStrategy::Fifo) + } + + #[inline] + #[cfg(loom)] + pub fn new() -> Self { + Self::new_with_queue_strategy(QueueStrategy::Fifo) + } + + /// Creates a new [`Event`] with specific queue strategy. + /// + /// # Examples + /// + /// ``` + /// use event_listener::{Event, QueueStrategy}; + /// + /// let event = Event::new_with_queue_strategy(QueueStrategy::Fifo); + /// ``` + #[inline] + #[cfg(not(loom))] + pub const fn new_with_queue_strategy(queue_strategy: QueueStrategy) -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + queue_strategy, } } #[inline] #[cfg(loom)] - pub fn new() -> Self { + pub fn new_with_queue_strategy(queue_strategy: QueueStrategy) -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + queue_strategy, } } diff --git a/src/slab.rs b/src/slab.rs index 59e1c21..11e9a12 100644 --- a/src/slab.rs +++ b/src/slab.rs @@ -18,7 +18,7 @@ use crate::notify::{GenericNotify, Internal, Notification}; use crate::sync::atomic::{AtomicBool, Ordering}; use crate::sync::cell::{Cell, ConstPtr, UnsafeCell}; use crate::sync::Arc; -use crate::{RegisterResult, State, Task, TaskRef}; +use crate::{QueueStrategy, RegisterResult, State, Task, TaskRef}; use core::fmt; use core::marker::PhantomData; @@ -229,7 +229,12 @@ pub(crate) struct List { } impl List { - pub(super) fn new() -> List { + pub(super) fn new(strategy: QueueStrategy) -> List { + debug_assert!( + strategy == QueueStrategy::Fifo, + "Slab list only supports FIFO strategy" + ); + List { inner: Mutex::new(ListenerSlab::new()), queue: concurrent_queue::ConcurrentQueue::unbounded(), @@ -1362,7 +1367,7 @@ mod tests { #[test] fn uncontended_inner() { - let inner = crate::Inner::new(); + let inner = crate::Inner::new(QueueStrategy::Fifo); // Register two listeners. let (mut listener1, mut listener2, mut listener3) = (None, None, None); diff --git a/tests/notify.rs b/tests/notify.rs index c37dc9a..4ad4fe9 100644 --- a/tests/notify.rs +++ b/tests/notify.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::Context; -use event_listener::{Event, EventListener}; +use event_listener::{Event, EventListener, QueueStrategy}; use waker_fn::waker_fn; #[cfg(target_family = "wasm")] @@ -17,8 +17,17 @@ fn is_notified(listener: &mut EventListener) -> bool { } #[test] -fn notify() { - let event = Event::new(); +fn notify_fifo() { + notify(QueueStrategy::Fifo) +} + +#[test] +fn notify_lifo() { + notify(QueueStrategy::Lifo) +} + +fn notify(queue_strategy: QueueStrategy) { + let event = Event::new_with_queue_strategy(queue_strategy); let mut l1 = event.listen(); let mut l2 = event.listen(); @@ -31,14 +40,32 @@ fn notify() { assert_eq!(event.notify(2), 2); assert_eq!(event.notify(1), 0); - assert!(is_notified(&mut l1)); - assert!(is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); + match queue_strategy { + QueueStrategy::Fifo => { + assert!(is_notified(&mut l1)); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); + } + QueueStrategy::Lifo => { + assert!(is_notified(&mut l3)); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l1)); + } + } } #[test] -fn notify_additional() { - let event = Event::new(); +fn notify_additional_fifo() { + notify_additional(QueueStrategy::Fifo) +} + +#[test] +fn notify_additional_lifo() { + notify_additional(QueueStrategy::Lifo) +} + +fn notify_additional(queue_strategy: QueueStrategy) { + let event = Event::new_with_queue_strategy(queue_strategy); let mut l1 = event.listen(); let mut l2 = event.listen(); @@ -48,14 +75,32 @@ fn notify_additional() { assert_eq!(event.notify(1), 0); assert_eq!(event.notify_additional(1), 1); - assert!(is_notified(&mut l1)); - assert!(is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); + match queue_strategy { + QueueStrategy::Fifo => { + assert!(is_notified(&mut l1)); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); + } + QueueStrategy::Lifo => { + assert!(is_notified(&mut l3)); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l1)); + } + } } #[test] -fn notify_one() { - let event = Event::new(); +fn notify_one_fifo() { + notify_one(QueueStrategy::Fifo) +} + +#[test] +fn notify_one_lifo() { + notify_one(QueueStrategy::Lifo) +} + +fn notify_one(queue_strategy: QueueStrategy) { + let event = Event::new_with_queue_strategy(queue_strategy); let mut l1 = event.listen(); let mut l2 = event.listen(); @@ -64,16 +109,36 @@ fn notify_one() { assert!(!is_notified(&mut l2)); assert_eq!(event.notify(1), 1); - assert!(is_notified(&mut l1)); - assert!(!is_notified(&mut l2)); + match queue_strategy { + QueueStrategy::Fifo => { + assert!(is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); + } + QueueStrategy::Lifo => { + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l1)); + } + } assert_eq!(event.notify(1), 1); - assert!(is_notified(&mut l2)); + match queue_strategy { + QueueStrategy::Fifo => assert!(is_notified(&mut l2)), + QueueStrategy::Lifo => assert!(is_notified(&mut l1)), + } } #[test] -fn notify_all() { - let event = Event::new(); +fn notify_all_fifo() { + notify_all(QueueStrategy::Fifo) +} + +#[test] +fn notify_all_lifo() { + notify_all(QueueStrategy::Lifo) +} + +fn notify_all(queue_strategy: QueueStrategy) { + let event = Event::new_with_queue_strategy(queue_strategy); let mut l1 = event.listen(); let mut l2 = event.listen(); @@ -87,8 +152,8 @@ fn notify_all() { } #[test] -fn drop_notified() { - let event = Event::new(); +fn drop_notified_fifo() { + let event = Event::new_with_queue_strategy(QueueStrategy::Fifo); let l1 = event.listen(); let mut l2 = event.listen(); @@ -101,8 +166,22 @@ fn drop_notified() { } #[test] -fn drop_notified2() { - let event = Event::new(); +fn drop_notified_lifo() { + let event = Event::new_with_queue_strategy(QueueStrategy::Lifo); + + let mut l1 = event.listen(); + let mut l2 = event.listen(); + let l3 = event.listen(); + + assert_eq!(event.notify(1), 1); + drop(l3); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l1)); +} + +#[test] +fn drop_notified2_fifo() { + let event = Event::new_with_queue_strategy(QueueStrategy::Fifo); let l1 = event.listen(); let mut l2 = event.listen(); @@ -115,8 +194,22 @@ fn drop_notified2() { } #[test] -fn drop_notified_additional() { - let event = Event::new(); +fn drop_notified2_lifo() { + let event = Event::new_with_queue_strategy(QueueStrategy::Lifo); + + let mut l1 = event.listen(); + let mut l2 = event.listen(); + let l3 = event.listen(); + + assert_eq!(event.notify(2), 2); + drop(l3); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l1)); +} + +#[test] +fn drop_notified_additional_fifo() { + let event = Event::new_with_queue_strategy(QueueStrategy::Fifo); let l1 = event.listen(); let mut l2 = event.listen(); @@ -132,8 +225,25 @@ fn drop_notified_additional() { } #[test] -fn drop_non_notified() { - let event = Event::new(); +fn drop_notified_additional_lifo() { + let event = Event::new_with_queue_strategy(QueueStrategy::Lifo); + + let mut l1 = event.listen(); + let mut l2 = event.listen(); + let mut l3 = event.listen(); + let l4 = event.listen(); + + assert_eq!(event.notify_additional(1), 1); + assert_eq!(event.notify(2), 1); + drop(l4); + assert!(is_notified(&mut l3)); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l1)); +} + +#[test] +fn drop_non_notified_fifo() { + let event = Event::new_with_queue_strategy(QueueStrategy::Fifo); let mut l1 = event.listen(); let mut l2 = event.listen(); @@ -146,8 +256,31 @@ fn drop_non_notified() { } #[test] -fn notify_all_fair() { - let event = Event::new(); +fn drop_non_notified_lifo() { + let event = Event::new_with_queue_strategy(QueueStrategy::Lifo); + + let l1 = event.listen(); + let mut l2 = event.listen(); + let mut l3 = event.listen(); + + assert_eq!(event.notify(1), 1); + drop(l1); + assert!(is_notified(&mut l3)); + assert!(!is_notified(&mut l2)); +} + +#[test] +fn notify_all_fair_fifo() { + notify_all_fair(QueueStrategy::Fifo) +} + +#[test] +fn notify_all_fair_lifo() { + notify_all_fair(QueueStrategy::Lifo) +} + +fn notify_all_fair(queue_strategy: QueueStrategy) { + let event = Event::new_with_queue_strategy(queue_strategy); let v = Arc::new(Mutex::new(vec![])); let mut l1 = event.listen(); @@ -178,7 +311,11 @@ fn notify_all_fair() { .is_pending()); assert_eq!(event.notify(usize::MAX), 3); - assert_eq!(&*v.lock().unwrap(), &[1, 2, 3]); + + match queue_strategy { + QueueStrategy::Fifo => assert_eq!(&*v.lock().unwrap(), &[1, 2, 3]), + QueueStrategy::Lifo => assert_eq!(&*v.lock().unwrap(), &[3, 2, 1]), + } assert!(Pin::new(&mut l1) .poll(&mut Context::from_waker(&waker1))