Skip to content

Commit 9ae7a7b

Browse files
committed
Convert poll_new_conn to an async function
I want to move the manual async bits to the Waitlist implementation, but for that I need to change the locks, which should be done in its own patch. This at least already makes the manual async code quite a bit smaller.
1 parent 9310a69 commit 9ae7a7b

File tree

2 files changed

+44
-16
lines changed

2 files changed

+44
-16
lines changed

src/conn/pool/futures/get_conn.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
// option. All files in the project carrying such notice may not be copied,
77
// modified, or distributed except according to those terms.
88

9-
use std::{fmt, future::poll_fn, task::Context};
9+
use std::fmt;
1010

1111
use crate::{
1212
conn::{
@@ -79,8 +79,7 @@ pub(crate) async fn get_conn(pool: Pool) -> Result<Conn> {
7979
match state.inner {
8080
GetConnInner::New => {
8181
let pool = state.pool_mut();
82-
let poll_new = |cx: &mut Context<'_>| pool.poll_new_conn(cx, queue_id);
83-
let next = poll_fn(poll_new).await?;
82+
let next = pool.new_conn(queue_id).await?;
8483
match next {
8584
GetConnInner::Connecting(conn_fut) => {
8685
state.inner = GetConnInner::Connecting(conn_fut);

src/conn/pool/mod.rs

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use std::{
1414
borrow::Borrow,
1515
cmp::Reverse,
1616
collections::VecDeque,
17+
future::poll_fn,
1718
future::Future,
1819
hash::{Hash, Hasher},
1920
str::FromStr,
@@ -336,12 +337,34 @@ impl Pool {
336337
}
337338
}
338339

339-
/// Poll the pool for an available connection.
340-
fn poll_new_conn(
341-
&mut self,
342-
cx: &mut Context<'_>,
343-
queue_id: QueueId,
344-
) -> Poll<Result<GetConnInner>> {
340+
fn queue(&mut self, cx: &mut Context<'_>, queue_id: QueueId) -> Poll<()> {
341+
let mut exchange = self.inner.exchange.lock().unwrap();
342+
exchange.waiting.push(cx.waker().clone(), queue_id);
343+
Poll::Ready(())
344+
}
345+
346+
fn poll_higher_priority(&mut self, cx: &mut Context<'_>, queue_id: QueueId) -> Poll<()> {
347+
let mut exchange = self.inner.exchange.lock().unwrap();
348+
let highest = if let Some(cur) = exchange.waiting.peek_id() {
349+
queue_id > cur
350+
} else {
351+
true
352+
};
353+
if highest {
354+
Poll::Ready(())
355+
} else {
356+
// to make sure the waker is updated
357+
exchange.waiting.push(cx.waker().clone(), queue_id);
358+
Poll::Pending
359+
}
360+
}
361+
362+
async fn queue_and_wait(&mut self, queue_id: QueueId) {
363+
poll_fn(|cx| self.queue(cx, queue_id)).await;
364+
poll_fn(|cx| self.poll_higher_priority(cx, queue_id)).await;
365+
}
366+
367+
fn try_new_conn(&mut self, queue_id: QueueId) -> Result<Option<GetConnInner>> {
345368
let mut exchange = self.inner.exchange.lock().unwrap();
346369

347370
// NOTE: this load must happen while we hold the lock,
@@ -362,8 +385,7 @@ impl Pool {
362385

363386
// If we are not, just queue
364387
if !highest {
365-
exchange.waiting.push(cx.waker().clone(), queue_id);
366-
return Poll::Pending;
388+
return Ok(None);
367389
}
368390

369391
#[allow(unused_variables)] // `since` is only used when `hdrhistogram` is enabled
@@ -379,7 +401,7 @@ impl Pool {
379401
#[cfg(feature = "hdrhistogram")]
380402
let metrics = self.metrics();
381403
conn.inner.active_since = Instant::now();
382-
return Poll::Ready(Ok(GetConnInner::Checking(
404+
return Ok(Some(GetConnInner::Checking(
383405
async move {
384406
conn.stream_mut()?.check().await?;
385407
#[cfg(feature = "hdrhistogram")]
@@ -419,7 +441,7 @@ impl Pool {
419441
#[cfg(feature = "hdrhistogram")]
420442
let metrics = self.metrics();
421443

422-
return Poll::Ready(Ok(GetConnInner::Connecting(
444+
return Ok(Some(GetConnInner::Connecting(
423445
async move {
424446
let conn = Conn::new(opts).await;
425447
#[cfg(feature = "hdrhistogram")]
@@ -437,10 +459,17 @@ impl Pool {
437459
.boxed(),
438460
)));
439461
}
462+
Ok(None)
463+
}
440464

441-
// Polled, but no conn available? Back into the queue.
442-
exchange.waiting.push(cx.waker().clone(), queue_id);
443-
Poll::Pending
465+
/// Get a new connection from the pool.
466+
async fn new_conn(&mut self, queue_id: QueueId) -> Result<GetConnInner> {
467+
loop {
468+
if let Some(conn) = self.try_new_conn(queue_id)? {
469+
return Ok(conn);
470+
}
471+
self.queue_and_wait(queue_id).await;
472+
}
444473
}
445474

446475
fn unqueue(&self, queue_id: QueueId) {

0 commit comments

Comments
 (0)