From f027913c78c35980b1f5abb3be695aec6aa77c47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Tue, 5 Aug 2025 17:31:04 +0000 Subject: [PATCH] Refactor the waitlist to another module --- src/conn/pool/futures/disconnect_pool.rs | 2 +- src/conn/pool/futures/get_conn.rs | 2 +- src/conn/pool/mod.rs | 136 ++-------------------- src/conn/pool/waitlist.rs | 139 +++++++++++++++++++++++ 4 files changed, 149 insertions(+), 130 deletions(-) create mode 100644 src/conn/pool/waitlist.rs diff --git a/src/conn/pool/futures/disconnect_pool.rs b/src/conn/pool/futures/disconnect_pool.rs index b8a07d4d..78e8d52e 100644 --- a/src/conn/pool/futures/disconnect_pool.rs +++ b/src/conn/pool/futures/disconnect_pool.rs @@ -16,7 +16,7 @@ use futures_core::ready; use tokio::sync::mpsc::UnboundedSender; use crate::{ - conn::pool::{Inner, Pool, QUEUE_END_ID}, + conn::pool::{waitlist::QUEUE_END_ID, Inner, Pool}, error::Error, Conn, }; diff --git a/src/conn/pool/futures/get_conn.rs b/src/conn/pool/futures/get_conn.rs index b89f9bc6..07451465 100644 --- a/src/conn/pool/futures/get_conn.rs +++ b/src/conn/pool/futures/get_conn.rs @@ -22,7 +22,7 @@ use { use crate::{ conn::{ - pool::{Pool, QueueId}, + pool::{waitlist::QueueId, Pool}, Conn, }, error::*, diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index 53688d9b..783b0189 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -7,17 +7,13 @@ // modified, or distributed except according to those terms. use futures_util::FutureExt; -use keyed_priority_queue::KeyedPriorityQueue; use tokio::sync::mpsc; use std::{ - borrow::Borrow, - cmp::Reverse, collections::VecDeque, - hash::{Hash, Hasher}, str::FromStr, sync::{atomic, Arc, Mutex}, - task::{Context, Poll, Waker}, + task::{Context, Poll}, time::{Duration, Instant}, }; @@ -29,12 +25,14 @@ use crate::{ }; pub use metrics::Metrics; +use waitlist::{QueueId, Waitlist}; mod recycler; // this is a really unfortunate name for a module pub mod futures; mod metrics; mod ttl_check_inerval; +mod waitlist; /// Connection that is idling in the pool. #[derive(Debug)] @@ -104,86 +102,6 @@ impl Exchange { } } -#[derive(Default, Debug)] -struct Waitlist { - queue: KeyedPriorityQueue, -} - -impl Waitlist { - /// Returns `true` if pushed. - fn push(&mut self, waker: Waker, queue_id: QueueId) -> bool { - // The documentation of Future::poll says: - // Note that on multiple calls to poll, only the Waker from - // the Context passed to the most recent call should be - // scheduled to receive a wakeup. - // - // But the the documentation of KeyedPriorityQueue::push says: - // Adds new element to queue if missing key or replace its - // priority if key exists. In second case doesn’t replace key. - // - // This means we have to remove first to have the most recent - // waker in the queue. - let occupied = self.remove(queue_id); - self.queue.push(QueuedWaker { queue_id, waker }, queue_id); - !occupied - } - - fn pop(&mut self) -> Option { - match self.queue.pop() { - Some((qw, _)) => Some(qw.waker), - None => None, - } - } - - /// Returns `true` if removed. - fn remove(&mut self, id: QueueId) -> bool { - self.queue.remove(&id).is_some() - } - - fn peek_id(&mut self) -> Option { - self.queue.peek().map(|(qw, _)| qw.queue_id) - } -} - -const QUEUE_END_ID: QueueId = QueueId(Reverse(u64::MAX)); - -#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -pub(crate) struct QueueId(Reverse); - -impl QueueId { - fn next() -> Self { - static NEXT_QUEUE_ID: atomic::AtomicU64 = atomic::AtomicU64::new(0); - let id = NEXT_QUEUE_ID.fetch_add(1, atomic::Ordering::SeqCst); - QueueId(Reverse(id)) - } -} - -#[derive(Debug)] -struct QueuedWaker { - queue_id: QueueId, - waker: Waker, -} - -impl Eq for QueuedWaker {} - -impl Borrow for QueuedWaker { - fn borrow(&self) -> &QueueId { - &self.queue_id - } -} - -impl PartialEq for QueuedWaker { - fn eq(&self, other: &Self) -> bool { - self.queue_id == other.queue_id - } -} - -impl Hash for QueuedWaker { - fn hash(&self, state: &mut H) { - self.queue_id.hash(state) - } -} - /// Connection pool data. #[derive(Debug)] pub struct Inner { @@ -474,20 +392,16 @@ mod test { use waker_fn::waker_fn; use std::{ - cmp::Reverse, future::Future, pin::pin, sync::{Arc, OnceLock}, - task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, + task::{Context, Poll}, time::Duration, }; use crate::{ - conn::pool::{Pool, QueueId, Waitlist, QUEUE_END_ID}, - opts::PoolOpts, - prelude::*, - test_misc::get_opts, - PoolConstraints, Row, TxOpts, Value, + conn::pool::Pool, opts::PoolOpts, prelude::*, test_misc::get_opts, PoolConstraints, Row, + TxOpts, Value, }; macro_rules! conn_ex_field { @@ -1016,7 +930,7 @@ mod test { } drop(only_conn); - assert_eq!(0, pool.inner.exchange.lock().unwrap().waiting.queue.len()); + assert_eq!(0, pool.inner.exchange.lock().unwrap().waiting.len()); // metrics should catch up with waiting queue (see #335) assert_eq!( 0, @@ -1070,40 +984,6 @@ mod test { Ok(()) } - #[test] - fn waitlist_integrity() { - const DATA: *const () = &(); - const NOOP_CLONE_FN: unsafe fn(*const ()) -> RawWaker = |_| RawWaker::new(DATA, &RW_VTABLE); - const NOOP_FN: unsafe fn(*const ()) = |_| {}; - static RW_VTABLE: RawWakerVTable = - RawWakerVTable::new(NOOP_CLONE_FN, NOOP_FN, NOOP_FN, NOOP_FN); - let w = unsafe { Waker::from_raw(RawWaker::new(DATA, &RW_VTABLE)) }; - - let mut waitlist = Waitlist::default(); - assert_eq!(0, waitlist.queue.len()); - - waitlist.push(w.clone(), QueueId(Reverse(4))); - waitlist.push(w.clone(), QueueId(Reverse(2))); - waitlist.push(w.clone(), QueueId(Reverse(8))); - waitlist.push(w.clone(), QUEUE_END_ID); - waitlist.push(w.clone(), QueueId(Reverse(10))); - - waitlist.remove(QueueId(Reverse(8))); - - assert_eq!(4, waitlist.queue.len()); - - let (_, id) = waitlist.queue.pop().unwrap(); - assert_eq!(2, id.0 .0); - let (_, id) = waitlist.queue.pop().unwrap(); - assert_eq!(4, id.0 .0); - let (_, id) = waitlist.queue.pop().unwrap(); - assert_eq!(10, id.0 .0); - let (_, id) = waitlist.queue.pop().unwrap(); - assert_eq!(QUEUE_END_ID, id); - - assert_eq!(0, waitlist.queue.len()); - } - #[tokio::test] async fn check_absolute_connection_ttl() -> super::Result<()> { let constraints = PoolConstraints::new(1, 3).unwrap(); @@ -1185,7 +1065,7 @@ mod test { let queue_len = || { let exchange = pool.inner.exchange.lock().unwrap(); - exchange.waiting.queue.len() + exchange.waiting.len() }; // Get a connection, so we know the next futures will be diff --git a/src/conn/pool/waitlist.rs b/src/conn/pool/waitlist.rs new file mode 100644 index 00000000..186de15d --- /dev/null +++ b/src/conn/pool/waitlist.rs @@ -0,0 +1,139 @@ +use keyed_priority_queue::KeyedPriorityQueue; + +use std::{ + borrow::Borrow, + cmp::Reverse, + hash::{Hash, Hasher}, + sync::atomic, + task::Waker, +}; + +pub(crate) const QUEUE_END_ID: QueueId = QueueId(Reverse(u64::MAX)); + +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub(crate) struct QueueId(Reverse); + +impl QueueId { + pub(crate) fn next() -> Self { + static NEXT_QUEUE_ID: atomic::AtomicU64 = atomic::AtomicU64::new(0); + let id = NEXT_QUEUE_ID.fetch_add(1, atomic::Ordering::SeqCst); + QueueId(Reverse(id)) + } +} + +#[derive(Debug)] +struct QueuedWaker { + queue_id: QueueId, + waker: Waker, +} + +impl Eq for QueuedWaker {} + +impl Borrow for QueuedWaker { + fn borrow(&self) -> &QueueId { + &self.queue_id + } +} + +impl PartialEq for QueuedWaker { + fn eq(&self, other: &Self) -> bool { + self.queue_id == other.queue_id + } +} + +impl Hash for QueuedWaker { + fn hash(&self, state: &mut H) { + self.queue_id.hash(state) + } +} + +#[derive(Default, Debug)] +pub(crate) struct Waitlist { + queue: KeyedPriorityQueue, +} + +impl Waitlist { + /// Returns `true` if pushed. + pub(crate) fn push(&mut self, waker: Waker, queue_id: QueueId) -> bool { + // The documentation of Future::poll says: + // Note that on multiple calls to poll, only the Waker from + // the Context passed to the most recent call should be + // scheduled to receive a wakeup. + // + // But the the documentation of KeyedPriorityQueue::push says: + // Adds new element to queue if missing key or replace its + // priority if key exists. In second case doesn’t replace key. + // + // This means we have to remove first to have the most recent + // waker in the queue. + let occupied = self.remove(queue_id); + self.queue.push(QueuedWaker { queue_id, waker }, queue_id); + !occupied + } + + pub(crate) fn pop(&mut self) -> Option { + match self.queue.pop() { + Some((qw, _)) => Some(qw.waker), + None => None, + } + } + + /// Returns `true` if removed. + pub(crate) fn remove(&mut self, id: QueueId) -> bool { + self.queue.remove(&id).is_some() + } + + pub(crate) fn peek_id(&mut self) -> Option { + self.queue.peek().map(|(qw, _)| qw.queue_id) + } + + // only used in tests for now + #[allow(dead_code)] + pub(crate) fn len(&self) -> usize { + self.queue.len() + } +} + +#[cfg(test)] +mod test { + use std::cmp::Reverse; + use std::task::RawWaker; + use std::task::RawWakerVTable; + use std::task::Waker; + + use super::*; + + #[test] + fn waitlist_integrity() { + const DATA: *const () = &(); + const NOOP_CLONE_FN: unsafe fn(*const ()) -> RawWaker = |_| RawWaker::new(DATA, &RW_VTABLE); + const NOOP_FN: unsafe fn(*const ()) = |_| {}; + static RW_VTABLE: RawWakerVTable = + RawWakerVTable::new(NOOP_CLONE_FN, NOOP_FN, NOOP_FN, NOOP_FN); + let w = unsafe { Waker::from_raw(RawWaker::new(DATA, &RW_VTABLE)) }; + + let mut waitlist = Waitlist::default(); + assert_eq!(0, waitlist.queue.len()); + + waitlist.push(w.clone(), QueueId(Reverse(4))); + waitlist.push(w.clone(), QueueId(Reverse(2))); + waitlist.push(w.clone(), QueueId(Reverse(8))); + waitlist.push(w.clone(), QUEUE_END_ID); + waitlist.push(w.clone(), QueueId(Reverse(10))); + + waitlist.remove(QueueId(Reverse(8))); + + assert_eq!(4, waitlist.queue.len()); + + let (_, id) = waitlist.queue.pop().unwrap(); + assert_eq!(2, id.0 .0); + let (_, id) = waitlist.queue.pop().unwrap(); + assert_eq!(4, id.0 .0); + let (_, id) = waitlist.queue.pop().unwrap(); + assert_eq!(10, id.0 .0); + let (_, id) = waitlist.queue.pop().unwrap(); + assert_eq!(QUEUE_END_ID, id); + + assert_eq!(0, waitlist.queue.len()); + } +}