From ef58180567feef80487a2efbcd18b11f3f2f7c57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Thu, 24 Jul 2025 21:43:10 +0000 Subject: [PATCH 1/4] Drop a lock earlier Once we have the connection, we can unlock the exchange. --- src/conn/pool/recycler.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/conn/pool/recycler.rs b/src/conn/pool/recycler.rs index 2809dc0..4a83451 100644 --- a/src/conn/pool/recycler.rs +++ b/src/conn/pool/recycler.rs @@ -165,9 +165,12 @@ impl Future for Recycler { // if we've been asked to close, reclaim any idle connections if close || self.eof { - while let Some(IdlingConn { conn, .. }) = - self.inner.exchange.lock().unwrap().available.pop_front() - { + loop { + let Some(IdlingConn { conn, .. }) = + self.inner.exchange.lock().unwrap().available.pop_front() + else { + break; + }; assert!(conn.inner.pool.is_none()); conn_decision!(self, conn); } From b69c1cc5136ecbf63f34b29f097e87c0a3eed133 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Thu, 24 Jul 2025 21:34:25 +0000 Subject: [PATCH 2/4] Convert the conn_return macro into a function --- src/conn/pool/recycler.rs | 74 ++++++++++++++++++--------------------- 1 file changed, 34 insertions(+), 40 deletions(-) diff --git a/src/conn/pool/recycler.rs b/src/conn/pool/recycler.rs index 4a83451..beb7eb0 100644 --- a/src/conn/pool/recycler.rs +++ b/src/conn/pool/recycler.rs @@ -54,6 +54,38 @@ impl Recycler { eof: false, } } + + fn conn_return(&mut self, conn: Conn, pool_is_closed: bool) { + let mut exchange = self.inner.exchange.lock().unwrap(); + if pool_is_closed || exchange.available.len() >= self.pool_opts.active_bound() { + drop(exchange); + self.inner + .metrics + .discarded_superfluous_connection + .fetch_add(1, Ordering::Relaxed); + self.discard.push(conn.close_conn().boxed()); + } else { + self.inner + .metrics + .connection_returned_to_pool + .fetch_add(1, 
Ordering::Relaxed); + #[cfg(feature = "hdrhistogram")] + self.inner + .metrics + .connection_active_duration + .lock() + .unwrap() + .saturating_record(conn.inner.active_since.elapsed().as_micros() as u64); + exchange.available.push_back(conn.into()); + self.inner + .metrics + .connections_in_pool + .store(exchange.available.len(), Ordering::Relaxed); + if let Some(w) = exchange.waiting.pop() { + w.wake(); + } + } + } } impl Future for Recycler { @@ -62,44 +94,6 @@ impl Future for Recycler { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut close = self.inner.close.load(Ordering::Acquire); - macro_rules! conn_return { - ($self:ident, $conn:ident, $pool_is_closed: expr) => {{ - let mut exchange = $self.inner.exchange.lock().unwrap(); - if $pool_is_closed || exchange.available.len() >= $self.pool_opts.active_bound() { - drop(exchange); - $self - .inner - .metrics - .discarded_superfluous_connection - .fetch_add(1, Ordering::Relaxed); - $self.discard.push($conn.close_conn().boxed()); - } else { - $self - .inner - .metrics - .connection_returned_to_pool - .fetch_add(1, Ordering::Relaxed); - #[cfg(feature = "hdrhistogram")] - $self - .inner - .metrics - .connection_active_duration - .lock() - .unwrap() - .saturating_record($conn.inner.active_since.elapsed().as_micros() as u64); - exchange.available.push_back($conn.into()); - $self - .inner - .metrics - .connections_in_pool - .store(exchange.available.len(), Ordering::Relaxed); - if let Some(w) = exchange.waiting.pop() { - w.wake(); - } - } - }}; - } - macro_rules! 
conn_decision { ($self:ident, $conn:ident) => { if $conn.inner.stream.is_none() || $conn.inner.disconnected { @@ -132,7 +126,7 @@ impl Future for Recycler { .fetch_add(1, Ordering::Relaxed); $self.reset.push($conn.reset_for_pool().boxed()); } else { - conn_return!($self, $conn, false); + $self.conn_return($conn, false); } }; } @@ -202,7 +196,7 @@ impl Future for Recycler { loop { match Pin::new(&mut self.reset).poll_next(cx) { Poll::Pending | Poll::Ready(None) => break, - Poll::Ready(Some(Ok(conn))) => conn_return!(self, conn, close), + Poll::Ready(Some(Ok(conn))) => self.conn_return(conn, close), Poll::Ready(Some(Err(e))) => { // an error during reset. // replace with a new connection From 3b79b05186e25da0b619f1dbc7220dace619ce11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Tue, 5 Aug 2025 17:31:04 +0000 Subject: [PATCH 3/4] Refactor the waitlist to another module --- src/conn/pool/futures/disconnect_pool.rs | 2 +- src/conn/pool/futures/get_conn.rs | 2 +- src/conn/pool/mod.rs | 158 ++------------------- src/conn/pool/waitlist.rs | 166 +++++++++++++++++++++++ 4 files changed, 177 insertions(+), 151 deletions(-) create mode 100644 src/conn/pool/waitlist.rs diff --git a/src/conn/pool/futures/disconnect_pool.rs b/src/conn/pool/futures/disconnect_pool.rs index b8a07d4..78e8d52 100644 --- a/src/conn/pool/futures/disconnect_pool.rs +++ b/src/conn/pool/futures/disconnect_pool.rs @@ -16,7 +16,7 @@ use futures_core::ready; use tokio::sync::mpsc::UnboundedSender; use crate::{ - conn::pool::{Inner, Pool, QUEUE_END_ID}, + conn::pool::{waitlist::QUEUE_END_ID, Inner, Pool}, error::Error, Conn, }; diff --git a/src/conn/pool/futures/get_conn.rs b/src/conn/pool/futures/get_conn.rs index b89f9bc..0745146 100644 --- a/src/conn/pool/futures/get_conn.rs +++ b/src/conn/pool/futures/get_conn.rs @@ -22,7 +22,7 @@ use { use crate::{ conn::{ - pool::{Pool, QueueId}, + pool::{waitlist::QueueId, Pool}, Conn, }, error::*, diff --git 
a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index de68544..8213edd 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -7,17 +7,13 @@ // modified, or distributed except according to those terms. use futures_util::FutureExt; -use keyed_priority_queue::KeyedPriorityQueue; use tokio::sync::mpsc; use std::{ - borrow::Borrow, - cmp::Reverse, collections::VecDeque, - hash::{Hash, Hasher}, str::FromStr, sync::{atomic, Arc, Mutex}, - task::{Context, Poll, Waker}, + task::{Context, Poll}, time::{Duration, Instant}, }; @@ -29,12 +25,14 @@ use crate::{ }; pub use metrics::Metrics; +use waitlist::{QueueId, Waitlist}; mod recycler; // this is a really unfortunate name for a module pub mod futures; mod metrics; mod ttl_check_inerval; +mod waitlist; /// Connection that is idling in the pool. #[derive(Debug)] @@ -104,103 +102,6 @@ impl Exchange { } } -#[derive(Default, Debug)] -struct Waitlist { - queue: KeyedPriorityQueue, - metrics: Arc, -} - -impl Waitlist { - /// Returns `true` if pushed. - fn push(&mut self, waker: Waker, queue_id: QueueId) -> bool { - // The documentation of Future::poll says: - // Note that on multiple calls to poll, only the Waker from - // the Context passed to the most recent call should be - // scheduled to receive a wakeup. - // - // But the the documentation of KeyedPriorityQueue::push says: - // Adds new element to queue if missing key or replace its - // priority if key exists. In second case doesn’t replace key. - // - // This means we have to remove first to have the most recent - // waker in the queue. 
- let occupied = self.remove(queue_id); - self.queue.push(QueuedWaker { queue_id, waker }, queue_id); - - self.metrics - .active_wait_requests - .fetch_add(1, atomic::Ordering::Relaxed); - - !occupied - } - - fn pop(&mut self) -> Option { - match self.queue.pop() { - Some((qw, _)) => { - self.metrics - .active_wait_requests - .fetch_sub(1, atomic::Ordering::Relaxed); - Some(qw.waker) - } - None => None, - } - } - - /// Returns `true` if removed. - fn remove(&mut self, id: QueueId) -> bool { - let is_removed = self.queue.remove(&id).is_some(); - if is_removed { - self.metrics - .active_wait_requests - .fetch_sub(1, atomic::Ordering::Relaxed); - } - - is_removed - } - - fn peek_id(&mut self) -> Option { - self.queue.peek().map(|(qw, _)| qw.queue_id) - } -} - -const QUEUE_END_ID: QueueId = QueueId(Reverse(u64::MAX)); -#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -pub(crate) struct QueueId(Reverse); - -impl QueueId { - fn next() -> Self { - static NEXT_QUEUE_ID: atomic::AtomicU64 = atomic::AtomicU64::new(0); - let id = NEXT_QUEUE_ID.fetch_add(1, atomic::Ordering::SeqCst); - QueueId(Reverse(id)) - } -} - -#[derive(Debug)] -struct QueuedWaker { - queue_id: QueueId, - waker: Waker, -} - -impl Eq for QueuedWaker {} - -impl Borrow for QueuedWaker { - fn borrow(&self) -> &QueueId { - &self.queue_id - } -} - -impl PartialEq for QueuedWaker { - fn eq(&self, other: &Self) -> bool { - self.queue_id == other.queue_id - } -} - -impl Hash for QueuedWaker { - fn hash(&self, state: &mut H) { - self.queue_id.hash(state) - } -} - /// Connection pool data. 
#[derive(Debug)] pub struct Inner { @@ -248,10 +149,7 @@ impl Pool { metrics: metrics.clone(), exchange: Mutex::new(Exchange { available: VecDeque::with_capacity(pool_opts.constraints().max()), - waiting: Waitlist { - queue: KeyedPriorityQueue::default(), - metrics, - }, + waiting: Waitlist::new(metrics), exist: 0, recycler: Some((rx, pool_opts)), }), @@ -480,20 +378,16 @@ mod test { use waker_fn::waker_fn; use std::{ - cmp::Reverse, future::Future, pin::pin, sync::{Arc, OnceLock}, - task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, + task::{Context, Poll}, time::Duration, }; use crate::{ - conn::pool::{Pool, QueueId, Waitlist, QUEUE_END_ID}, - opts::PoolOpts, - prelude::*, - test_misc::get_opts, - PoolConstraints, Row, TxOpts, Value, + conn::pool::Pool, opts::PoolOpts, prelude::*, test_misc::get_opts, PoolConstraints, Row, + TxOpts, Value, }; macro_rules! conn_ex_field { @@ -1022,7 +916,7 @@ mod test { } drop(only_conn); - assert_eq!(0, pool.inner.exchange.lock().unwrap().waiting.queue.len()); + assert_eq!(0, pool.inner.exchange.lock().unwrap().waiting.len()); // metrics should catch up with waiting queue (see #335) assert_eq!( 0, @@ -1076,40 +970,6 @@ mod test { Ok(()) } - #[test] - fn waitlist_integrity() { - const DATA: *const () = &(); - const NOOP_CLONE_FN: unsafe fn(*const ()) -> RawWaker = |_| RawWaker::new(DATA, &RW_VTABLE); - const NOOP_FN: unsafe fn(*const ()) = |_| {}; - static RW_VTABLE: RawWakerVTable = - RawWakerVTable::new(NOOP_CLONE_FN, NOOP_FN, NOOP_FN, NOOP_FN); - let w = unsafe { Waker::from_raw(RawWaker::new(DATA, &RW_VTABLE)) }; - - let mut waitlist = Waitlist::default(); - assert_eq!(0, waitlist.queue.len()); - - waitlist.push(w.clone(), QueueId(Reverse(4))); - waitlist.push(w.clone(), QueueId(Reverse(2))); - waitlist.push(w.clone(), QueueId(Reverse(8))); - waitlist.push(w.clone(), QUEUE_END_ID); - waitlist.push(w.clone(), QueueId(Reverse(10))); - - waitlist.remove(QueueId(Reverse(8))); - - assert_eq!(4, waitlist.queue.len()); - - let 
(_, id) = waitlist.queue.pop().unwrap(); - assert_eq!(2, id.0 .0); - let (_, id) = waitlist.queue.pop().unwrap(); - assert_eq!(4, id.0 .0); - let (_, id) = waitlist.queue.pop().unwrap(); - assert_eq!(10, id.0 .0); - let (_, id) = waitlist.queue.pop().unwrap(); - assert_eq!(QUEUE_END_ID, id); - - assert_eq!(0, waitlist.queue.len()); - } - #[tokio::test] async fn check_absolute_connection_ttl() -> super::Result<()> { let constraints = PoolConstraints::new(1, 3).unwrap(); @@ -1191,7 +1051,7 @@ mod test { let queue_len = || { let exchange = pool.inner.exchange.lock().unwrap(); - exchange.waiting.queue.len() + exchange.waiting.len() }; // Get a connection, so we know the next futures will be diff --git a/src/conn/pool/waitlist.rs b/src/conn/pool/waitlist.rs new file mode 100644 index 0000000..390eb1d --- /dev/null +++ b/src/conn/pool/waitlist.rs @@ -0,0 +1,166 @@ +use keyed_priority_queue::KeyedPriorityQueue; + +use std::{ + borrow::Borrow, + cmp::Reverse, + hash::{Hash, Hasher}, + sync::atomic, + sync::Arc, + task::Waker, +}; + +use crate::Metrics; + +#[derive(Debug)] +pub(crate) struct Waitlist { + queue: KeyedPriorityQueue, + metrics: Arc, +} + +impl Waitlist { + pub(crate) fn new(metrics: Arc) -> Waitlist { + Waitlist { + queue: Default::default(), + metrics, + } + } + + /// Returns `true` if pushed. + pub(crate) fn push(&mut self, waker: Waker, queue_id: QueueId) -> bool { + // The documentation of Future::poll says: + // Note that on multiple calls to poll, only the Waker from + // the Context passed to the most recent call should be + // scheduled to receive a wakeup. + // + // But the the documentation of KeyedPriorityQueue::push says: + // Adds new element to queue if missing key or replace its + // priority if key exists. In second case doesn’t replace key. + // + // This means we have to remove first to have the most recent + // waker in the queue. 
+ let occupied = self.remove(queue_id); + self.queue.push(QueuedWaker { queue_id, waker }, queue_id); + + self.metrics + .active_wait_requests + .fetch_add(1, atomic::Ordering::Relaxed); + + !occupied + } + + pub(crate) fn pop(&mut self) -> Option { + match self.queue.pop() { + Some((qw, _)) => { + self.metrics + .active_wait_requests + .fetch_sub(1, atomic::Ordering::Relaxed); + Some(qw.waker) + } + None => None, + } + } + + /// Returns `true` if removed. + pub(crate) fn remove(&mut self, id: QueueId) -> bool { + let is_removed = self.queue.remove(&id).is_some(); + if is_removed { + self.metrics + .active_wait_requests + .fetch_sub(1, atomic::Ordering::Relaxed); + } + + is_removed + } + + pub(crate) fn peek_id(&mut self) -> Option { + self.queue.peek().map(|(qw, _)| qw.queue_id) + } + + // only used in tests for now + #[allow(dead_code)] + pub(crate) fn len(&self) -> usize { + self.queue.len() + } +} + +pub(crate) const QUEUE_END_ID: QueueId = QueueId(Reverse(u64::MAX)); +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub(crate) struct QueueId(Reverse); + +impl QueueId { + pub(crate) fn next() -> Self { + static NEXT_QUEUE_ID: atomic::AtomicU64 = atomic::AtomicU64::new(0); + let id = NEXT_QUEUE_ID.fetch_add(1, atomic::Ordering::SeqCst); + QueueId(Reverse(id)) + } +} + +#[derive(Debug)] +struct QueuedWaker { + queue_id: QueueId, + waker: Waker, +} + +impl Eq for QueuedWaker {} + +impl Borrow for QueuedWaker { + fn borrow(&self) -> &QueueId { + &self.queue_id + } +} + +impl PartialEq for QueuedWaker { + fn eq(&self, other: &Self) -> bool { + self.queue_id == other.queue_id + } +} + +impl Hash for QueuedWaker { + fn hash(&self, state: &mut H) { + self.queue_id.hash(state) + } +} + +#[cfg(test)] +mod test { + use std::cmp::Reverse; + use std::task::RawWaker; + use std::task::RawWakerVTable; + use std::task::Waker; + + use super::*; + + #[test] + fn waitlist_integrity() { + const DATA: *const () = &(); + const NOOP_CLONE_FN: unsafe fn(*const ()) 
-> RawWaker = |_| RawWaker::new(DATA, &RW_VTABLE); + const NOOP_FN: unsafe fn(*const ()) = |_| {}; + static RW_VTABLE: RawWakerVTable = + RawWakerVTable::new(NOOP_CLONE_FN, NOOP_FN, NOOP_FN, NOOP_FN); + let w = unsafe { Waker::from_raw(RawWaker::new(DATA, &RW_VTABLE)) }; + + let mut waitlist = Waitlist::new(Default::default()); + assert_eq!(0, waitlist.queue.len()); + + waitlist.push(w.clone(), QueueId(Reverse(4))); + waitlist.push(w.clone(), QueueId(Reverse(2))); + waitlist.push(w.clone(), QueueId(Reverse(8))); + waitlist.push(w.clone(), QUEUE_END_ID); + waitlist.push(w.clone(), QueueId(Reverse(10))); + + waitlist.remove(QueueId(Reverse(8))); + + assert_eq!(4, waitlist.queue.len()); + + let (_, id) = waitlist.queue.pop().unwrap(); + assert_eq!(2, id.0 .0); + let (_, id) = waitlist.queue.pop().unwrap(); + assert_eq!(4, id.0 .0); + let (_, id) = waitlist.queue.pop().unwrap(); + assert_eq!(10, id.0 .0); + let (_, id) = waitlist.queue.pop().unwrap(); + assert_eq!(QUEUE_END_ID, id); + + assert_eq!(0, waitlist.queue.len()); + } +} From 5c740bcbc54911a730d06d2f84a0b55da8bed156 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20=C3=81vila=20de=20Esp=C3=ADndola?= Date: Tue, 12 Aug 2025 16:26:16 +0000 Subject: [PATCH 4/4] Make the Waitlist responsible for calling wake This api is harder to misuse, as one cannot forget to call wake after pop. --- src/conn/pool/mod.rs | 4 +--- src/conn/pool/recycler.rs | 12 +++--------- src/conn/pool/waitlist.rs | 8 +++++--- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index 8213edd..ae55bed 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -229,9 +229,7 @@ impl Pool { .connection_count .store(exchange.exist, atomic::Ordering::Relaxed); // we just enabled the creation of a new connection! - if let Some(w) = exchange.waiting.pop() { - w.wake(); - } + exchange.waiting.wake(); } /// Poll the pool for an available connection. 
diff --git a/src/conn/pool/recycler.rs b/src/conn/pool/recycler.rs index beb7eb0..f63eccd 100644 --- a/src/conn/pool/recycler.rs +++ b/src/conn/pool/recycler.rs @@ -81,9 +81,7 @@ impl Recycler { .metrics .connections_in_pool .store(exchange.available.len(), Ordering::Relaxed); - if let Some(w) = exchange.waiting.pop() { - w.wake(); - } + exchange.waiting.wake(); } } } @@ -244,9 +242,7 @@ impl Future for Recycler { .connection_count .store(exchange.exist, Ordering::Relaxed); for _ in 0..self.discarded { - if let Some(w) = exchange.waiting.pop() { - w.wake(); - } + exchange.waiting.wake(); } drop(exchange); self.discarded = 0; @@ -282,9 +278,7 @@ impl Future for Recycler { if self.inner.closed.load(Ordering::Acquire) { // `DisconnectPool` might still wait to be woken up. let mut exchange = self.inner.exchange.lock().unwrap(); - while let Some(w) = exchange.waiting.pop() { - w.wake(); - } + while exchange.waiting.wake() {} // we're about to exit, so there better be no outstanding connections assert_eq!(exchange.exist, 0); assert_eq!(exchange.available.len(), 0); diff --git a/src/conn/pool/waitlist.rs b/src/conn/pool/waitlist.rs index 390eb1d..5aeacc7 100644 --- a/src/conn/pool/waitlist.rs +++ b/src/conn/pool/waitlist.rs @@ -48,15 +48,17 @@ impl Waitlist { !occupied } - pub(crate) fn pop(&mut self) -> Option { + /// Returns `true` if anyone was woken up + pub(crate) fn wake(&mut self) -> bool { match self.queue.pop() { Some((qw, _)) => { self.metrics .active_wait_requests .fetch_sub(1, atomic::Ordering::Relaxed); - Some(qw.waker) + qw.waker.wake(); + true } - None => None, + None => false, } }