Skip to content

Commit c87b743

Browse files
committed
feat(pool): add a Singleton pool type
1 parent 3021828 commit c87b743

File tree

4 files changed

+342
-0
lines changed

4 files changed

+342
-0
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ futures-util = { version = "0.3.16", default-features = false, features = ["allo
4242
http-body-util = "0.1.0"
4343
tokio = { version = "1", features = ["macros", "test-util", "signal"] }
4444
tokio-test = "0.4"
45+
tower-test = "0.4"
4546
pretty_env_logger = "0.5"
4647

4748
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]
@@ -60,6 +61,7 @@ default = []
6061
full = [
6162
"client",
6263
"client-legacy",
64+
"client-pool",
6365
"client-proxy",
6466
"client-proxy-system",
6567
"server",
@@ -74,6 +76,7 @@ full = [
7476

7577
client = ["hyper/client", "tokio/net", "dep:tracing", "dep:futures-channel", "dep:tower-service"]
7678
client-legacy = ["client", "dep:socket2", "tokio/sync", "dep:libc", "dep:futures-util"]
79+
client-pool = []
7780
client-proxy = ["client", "dep:base64", "dep:ipnet", "dep:percent-encoding"]
7881
client-proxy-system = ["dep:system-configuration", "dep:windows-registry"]
7982

src/client/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,8 @@
44
#[cfg(feature = "client-legacy")]
55
pub mod legacy;
66

7+
#[cfg(feature = "client-pool")]
8+
pub mod pool;
9+
710
#[cfg(feature = "client-proxy")]
811
pub mod proxy;

src/client/pool/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
//! Composable pool services
2+
3+
pub mod singleton;

src/client/pool/singleton.rs

Lines changed: 333 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,333 @@
1+
//! Singleton pools
2+
//!
3+
//! The singleton pool combines a MakeService that should only produce a single
4+
//! active connection. It can bundle all concurrent calls to it, so that only
5+
//! one connection is made. All calls to the singleton will return a clone of
6+
//! the inner service once established.
7+
//!
8+
//! This fits the HTTP/2 case well.
9+
10+
use std::sync::{Arc, Mutex};
11+
use std::task::{self, Poll};
12+
13+
use tokio::sync::oneshot;
14+
use tower_service::Service;
15+
16+
use self::internal::{DitchGuard, SingletonError, SingletonFuture};
17+
18+
type BoxError = Box<dyn std::error::Error + Send + Sync>;
19+
20+
/// A singleton pool over an inner service.
21+
///
22+
/// The singleton wraps an inner service maker, bundling all calls to ensure
23+
/// only one service is created. Once made, it returns clones of the made
24+
/// service.
25+
#[derive(Debug)]
26+
pub struct Singleton<M, Dst>
27+
where
28+
M: Service<Dst>,
29+
{
30+
mk_svc: M,
31+
state: Arc<Mutex<State<M::Response>>>,
32+
}
33+
34+
#[derive(Debug)]
35+
enum State<S> {
36+
Empty,
37+
Making(Vec<oneshot::Sender<S>>),
38+
Made(S),
39+
}
40+
41+
impl<M, Target> Singleton<M, Target>
42+
where
43+
M: Service<Target>,
44+
M::Response: Clone,
45+
{
46+
/// Create a new singleton pool over an inner make service.
47+
pub fn new(mk_svc: M) -> Self {
48+
Singleton {
49+
mk_svc,
50+
state: Arc::new(Mutex::new(State::Empty)),
51+
}
52+
}
53+
54+
// pub fn reset?
55+
// pub fn retain?
56+
}
57+
58+
impl<M, Target> Service<Target> for Singleton<M, Target>
59+
where
60+
M: Service<Target>,
61+
M::Response: Clone,
62+
M::Error: Into<BoxError>,
63+
{
64+
type Response = M::Response;
65+
type Error = SingletonError;
66+
type Future = SingletonFuture<M::Future, M::Response>;
67+
68+
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
69+
if let State::Empty = *self.state.lock().unwrap() {
70+
return self
71+
.mk_svc
72+
.poll_ready(cx)
73+
.map_err(|e| SingletonError(e.into()));
74+
}
75+
Poll::Ready(Ok(()))
76+
}
77+
78+
fn call(&mut self, dst: Target) -> Self::Future {
79+
let mut locked = self.state.lock().unwrap();
80+
match *locked {
81+
State::Empty => {
82+
let fut = self.mk_svc.call(dst);
83+
*locked = State::Making(Vec::new());
84+
SingletonFuture::Driving {
85+
future: fut,
86+
singleton: DitchGuard(Arc::downgrade(&self.state)),
87+
}
88+
}
89+
State::Making(ref mut waiters) => {
90+
let (tx, rx) = oneshot::channel();
91+
waiters.push(tx);
92+
SingletonFuture::Waiting { rx }
93+
}
94+
State::Made(ref svc) => SingletonFuture::Made {
95+
svc: Some(svc.clone()),
96+
},
97+
}
98+
}
99+
}
100+
101+
impl<M, Target> Clone for Singleton<M, Target>
102+
where
103+
M: Service<Target> + Clone,
104+
{
105+
fn clone(&self) -> Self {
106+
Self {
107+
mk_svc: self.mk_svc.clone(),
108+
state: self.state.clone(),
109+
}
110+
}
111+
}
112+
113+
// Holds some "pub" items that otherwise shouldn't be public.
114+
mod internal {
115+
use std::future::Future;
116+
use std::pin::Pin;
117+
use std::sync::{Mutex, Weak};
118+
use std::task::{self, Poll};
119+
120+
use futures_core::ready;
121+
use pin_project_lite::pin_project;
122+
use tokio::sync::oneshot;
123+
124+
use super::{BoxError, State};
125+
126+
pin_project! {
127+
#[project = SingletonFutureProj]
128+
pub enum SingletonFuture<F, S> {
129+
Driving {
130+
#[pin]
131+
future: F,
132+
singleton: DitchGuard<S>,
133+
},
134+
Waiting {
135+
rx: oneshot::Receiver<S>,
136+
},
137+
Made {
138+
svc: Option<S>,
139+
},
140+
}
141+
}
142+
143+
// XXX: pub because of the enum SingletonFuture
144+
pub struct DitchGuard<S>(pub(super) Weak<Mutex<State<S>>>);
145+
146+
impl<F, S, E> Future for SingletonFuture<F, S>
147+
where
148+
F: Future<Output = Result<S, E>>,
149+
E: Into<BoxError>,
150+
S: Clone,
151+
{
152+
type Output = Result<S, SingletonError>;
153+
154+
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
155+
match self.project() {
156+
SingletonFutureProj::Driving { future, singleton } => {
157+
match ready!(future.poll(cx)) {
158+
Ok(svc) => {
159+
if let Some(state) = singleton.0.upgrade() {
160+
let mut locked = state.lock().unwrap();
161+
singleton.0 = Weak::new();
162+
match std::mem::replace(&mut *locked, State::Made(svc.clone())) {
163+
State::Making(waiters) => {
164+
for tx in waiters {
165+
let _ = tx.send(svc.clone());
166+
}
167+
}
168+
State::Empty | State::Made(_) => {
169+
// shouldn't happen!
170+
}
171+
}
172+
}
173+
Poll::Ready(Ok(svc))
174+
}
175+
Err(e) => {
176+
if let Some(state) = singleton.0.upgrade() {
177+
let mut locked = state.lock().unwrap();
178+
singleton.0 = Weak::new();
179+
*locked = State::Empty;
180+
}
181+
Poll::Ready(Err(SingletonError(e.into())))
182+
}
183+
}
184+
}
185+
SingletonFutureProj::Waiting { rx } => match ready!(Pin::new(rx).poll(cx)) {
186+
Ok(svc) => Poll::Ready(Ok(svc)),
187+
Err(_canceled) => Poll::Ready(Err(SingletonError(Canceled.into()))),
188+
},
189+
SingletonFutureProj::Made { svc } => Poll::Ready(Ok(svc.take().unwrap())),
190+
}
191+
}
192+
}
193+
194+
impl<S> Drop for DitchGuard<S> {
195+
fn drop(&mut self) {
196+
if let Some(state) = self.0.upgrade() {
197+
if let Ok(mut locked) = state.lock() {
198+
*locked = State::Empty;
199+
}
200+
}
201+
}
202+
}
203+
204+
// An opaque error type. By not exposing the type, nor being specifically
205+
// Box<dyn Error>, we can _change_ the type once we no longer need the Canceled
206+
// error type. This will be possible with the refactor to baton passing.
207+
#[derive(Debug)]
208+
pub struct SingletonError(pub(super) BoxError);
209+
210+
impl std::fmt::Display for SingletonError {
211+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212+
f.write_str("singleton connection error")
213+
}
214+
}
215+
216+
impl std::error::Error for SingletonError {
217+
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
218+
Some(&*self.0)
219+
}
220+
}
221+
222+
#[derive(Debug)]
223+
struct Canceled;
224+
225+
impl std::fmt::Display for Canceled {
226+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227+
f.write_str("singleton connection canceled")
228+
}
229+
}
230+
231+
impl std::error::Error for Canceled {}
232+
}
233+
234+
#[cfg(test)]
235+
mod tests {
236+
use std::future::Future;
237+
use std::pin::Pin;
238+
use std::task::Poll;
239+
240+
use tower_service::Service;
241+
242+
use super::Singleton;
243+
244+
#[tokio::test]
245+
async fn first_call_drives_subsequent_wait() {
246+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
247+
248+
let mut singleton = Singleton::new(mock_svc);
249+
250+
handle.allow(1);
251+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
252+
.await
253+
.unwrap();
254+
// First call: should go into Driving
255+
let fut1 = singleton.call(());
256+
// Second call: should go into Waiting
257+
let fut2 = singleton.call(());
258+
259+
// Expect exactly one request to the inner service
260+
let ((), send_response) = handle.next_request().await.unwrap();
261+
send_response.send_response("svc");
262+
263+
// Both futures should resolve to the same value
264+
assert_eq!(fut1.await.unwrap(), "svc");
265+
assert_eq!(fut2.await.unwrap(), "svc");
266+
}
267+
268+
#[tokio::test]
269+
async fn made_state_returns_immediately() {
270+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
271+
let mut singleton = Singleton::new(mock_svc);
272+
273+
handle.allow(1);
274+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
275+
.await
276+
.unwrap();
277+
// Drive first call to completion
278+
let fut1 = singleton.call(());
279+
let ((), send_response) = handle.next_request().await.unwrap();
280+
send_response.send_response("svc");
281+
assert_eq!(fut1.await.unwrap(), "svc");
282+
283+
// Second call should not hit inner service
284+
let res = singleton.call(()).await.unwrap();
285+
assert_eq!(res, "svc");
286+
}
287+
288+
#[tokio::test]
289+
async fn cancel_waiter_does_not_affect_others() {
290+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
291+
let mut singleton = Singleton::new(mock_svc);
292+
293+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
294+
.await
295+
.unwrap();
296+
let fut1 = singleton.call(());
297+
let fut2 = singleton.call(());
298+
drop(fut2); // cancel one waiter
299+
300+
let ((), send_response) = handle.next_request().await.unwrap();
301+
send_response.send_response("svc");
302+
303+
assert_eq!(fut1.await.unwrap(), "svc");
304+
}
305+
306+
// TODO: this should be able to be improved with a cooperative baton refactor
307+
#[tokio::test]
308+
async fn cancel_driver_cancels_all() {
309+
let (mock_svc, mut handle) = tower_test::mock::pair::<(), &'static str>();
310+
let mut singleton = Singleton::new(mock_svc);
311+
312+
crate::common::future::poll_fn(|cx| singleton.poll_ready(cx))
313+
.await
314+
.unwrap();
315+
let mut fut1 = singleton.call(());
316+
let fut2 = singleton.call(());
317+
318+
// poll driver just once, and then drop
319+
crate::common::future::poll_fn(move |cx| {
320+
let _ = Pin::new(&mut fut1).poll(cx);
321+
Poll::Ready(())
322+
})
323+
.await;
324+
325+
let ((), send_response) = handle.next_request().await.unwrap();
326+
send_response.send_response("svc");
327+
328+
assert_eq!(
329+
fut2.await.unwrap_err().0.to_string(),
330+
"singleton connection canceled"
331+
);
332+
}
333+
}

0 commit comments

Comments
 (0)