From 746cf6851681c0c5490a474647033ea53ae0a93f Mon Sep 17 00:00:00 2001 From: Ash Manning <10554686+A-Manning@users.noreply.github.com> Date: Wed, 6 Aug 2025 16:31:15 +0800 Subject: [PATCH] Annotate subscriptions with lifetimes --- Cargo.toml | 1 + client/http-client/src/client.rs | 11 +- client/ws-client/src/tests.rs | 14 +- core/src/client/async_client/mod.rs | 12 +- core/src/client/mod.rs | 29 ++-- .../client_subscription_drop_oldest_item.rs | 9 +- examples/examples/proc_macro.rs | 2 +- examples/examples/ws_pubsub_broadcast.rs | 4 +- examples/examples/ws_pubsub_with_params.rs | 4 +- proc-macros/src/render_client.rs | 3 +- tests/Cargo.toml | 1 + tests/tests/integration_tests.rs | 126 ++++++++++++------ 12 files changed, 134 insertions(+), 82 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 93bc3e24cf..933e062f78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,6 +88,7 @@ console-subscriber = "0.4" criterion = { version = "0.5", features = ["async_tokio", "html_reports"] } fast-socks5 = "0.10" futures = { version = "0.3.14", default-features = false, features = ["std"] } +ouroboros = "0.18.5" pprof = { version = "0.15", features = ["flamegraph", "criterion"] } socket2 = "0.6.0" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index fb61cec16b..6a5ba16ec4 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -485,12 +485,12 @@ where { /// Send a subscription request to the server. Not implemented for HTTP; will always return /// [`Error::HttpNotImplemented`]. - fn subscribe<'a, N, Params>( - &self, + fn subscribe<'a, 'client, N, Params>( + &'client self, _subscribe_method: &'a str, _params: Params, _unsubscribe_method: &'a str, - ) -> impl Future, Error>> + ) -> impl Future, Error>> where Params: ToRpcParams + Send, N: DeserializeOwned, @@ -499,7 +499,10 @@ where } /// Subscribe to a specific method. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`]. - fn subscribe_to_method(&self, _method: &str) -> impl Future, Error>> + fn subscribe_to_method<'client, N>( + &'client self, + _method: &str, + ) -> impl Future, Error>> where N: DeserializeOwned, { diff --git a/client/ws-client/src/tests.rs b/client/ws-client/src/tests.rs index eb0fd25110..c46d5cf762 100644 --- a/client/ws-client/src/tests.rs +++ b/client/ws-client/src/tests.rs @@ -159,7 +159,7 @@ async fn subscription_works() { let uri = to_ws_uri_string(server.local_addr()); let client = WsClientBuilder::default().build(&uri).with_default_timeout().await.unwrap().unwrap(); { - let mut sub: Subscription = client + let mut sub: Subscription<_, String> = client .subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello") .with_default_timeout() .await @@ -183,7 +183,7 @@ async fn notification_handler_works() { let uri = to_ws_uri_string(server.local_addr()); let client = WsClientBuilder::default().build(&uri).with_default_timeout().await.unwrap().unwrap(); { - let mut nh: Subscription = + let mut nh: Subscription<_, String> = client.subscribe_to_method("test").with_default_timeout().await.unwrap().unwrap(); let response: String = nh.next().with_default_timeout().await.unwrap().unwrap().unwrap(); assert_eq!("server originated notification works".to_owned(), response); @@ -203,7 +203,7 @@ async fn notification_no_params() { let uri = to_ws_uri_string(server.local_addr()); let client = WsClientBuilder::default().build(&uri).with_default_timeout().await.unwrap().unwrap(); { - let mut nh: Subscription = + let mut nh: Subscription<_, serde_json::Value> = client.subscribe_to_method("no_params").with_default_timeout().await.unwrap().unwrap(); let response = nh.next().with_default_timeout().await.unwrap().unwrap().unwrap(); assert_eq!(response, serde_json::Value::Null); @@ -244,7 +244,7 @@ async fn batched_notifs_works() { // Ensure that subscription is returned back to the correct handle // and is handled separately from ordinary notifications. { - let mut nh: Subscription = + let mut nh: Subscription<_, String> = client.subscribe("sub", rpc_params![], "unsub").with_default_timeout().await.unwrap().unwrap(); let response: String = nh.next().with_default_timeout().await.unwrap().unwrap().unwrap(); assert_eq!("sub_notif", response); @@ -252,7 +252,7 @@ async fn batched_notifs_works() { // Ensure that method notif is returned back to the correct handle. { - let mut nh: Subscription = + let mut nh: Subscription<_, String> = client.subscribe_to_method("sub").with_default_timeout().await.unwrap().unwrap(); let response: String = nh.next().with_default_timeout().await.unwrap().unwrap().unwrap(); assert_eq!("method_notif", response); @@ -279,7 +279,7 @@ async fn notification_close_on_lagging() { .await .unwrap() .unwrap(); - let mut nh: Subscription = + let mut nh: Subscription<_, String> = client.subscribe_to_method("test").with_default_timeout().await.unwrap().unwrap(); // Don't poll the notification stream for 2 seconds, should be full now. @@ -297,7 +297,7 @@ async fn notification_close_on_lagging() { assert!(nh.next().with_default_timeout().await.unwrap().is_none()); // The same subscription should be possible to register again. - let mut other_nh: Subscription = + let mut other_nh: Subscription<_, String> = client.subscribe_to_method("test").with_default_timeout().await.unwrap().unwrap(); // check that the new subscription works. diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 2cc4463e43..ca8604c1aa 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -596,12 +596,12 @@ where /// /// The `subscribe_method` and `params` are used to ask for the subscription towards the /// server. The `unsubscribe_method` is used to close the subscription. - fn subscribe<'a, Notif, Params>( - &self, + fn subscribe<'a, 'client, Notif, Params>( + &'client self, subscribe_method: &'a str, params: Params, unsubscribe_method: &'a str, - ) -> impl Future, Error>> + Send + ) -> impl Future, Error>> + Send where Params: ToRpcParams + Send, Notif: DeserializeOwned, @@ -632,12 +632,12 @@ where .await? .into_subscription() .expect("Extensions set to subscription, must return subscription; qed"); - Ok(Subscription::new(self.to_back.clone(), sub.stream, SubscriptionKind::Subscription(sub.sub_id))) + Ok(Subscription::new(self, self.to_back.clone(), sub.stream, SubscriptionKind::Subscription(sub.sub_id))) } } /// Subscribe to a specific method. - fn subscribe_to_method(&self, method: &str) -> impl Future, Error>> + Send + fn subscribe_to_method<'client, N>(&'client self, method: &str) -> impl Future, Error>> + Send where N: DeserializeOwned, { @@ -664,7 +664,7 @@ where Err(_) => return Err(self.on_disconnect().await), }; - Ok(Subscription::new(self.to_back.clone(), rx, SubscriptionKind::Method(method))) + Ok(Subscription::new(self, self.to_back.clone(), rx, SubscriptionKind::Method(method))) } } } diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index ff9371b9e8..3f394fac65 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -131,12 +131,12 @@ pub trait SubscriptionClientT: ClientT { /// /// The `Notif` param is a generic type to receive generic subscriptions, see [`Subscription`] for further /// documentation. - fn subscribe<'a, Notif, Params>( - &self, + fn subscribe<'a, 'client, Notif, Params>( + &'client self, subscribe_method: &'a str, params: Params, unsubscribe_method: &'a str, - ) -> impl Future, Error>> + Send + ) -> impl Future, Error>> + Send where Params: ToRpcParams + Send, Notif: DeserializeOwned; @@ -145,10 +145,10 @@ pub trait SubscriptionClientT: ClientT { /// /// The `Notif` param is a generic type to receive generic subscriptions, see [`Subscription`] for further /// documentation. - fn subscribe_to_method( - &self, + fn subscribe_to_method<'client, Notif>( + &'client self, method: &str, - ) -> impl Future, Error>> + Send + ) -> impl Future, Error>> + Send where Notif: DeserializeOwned; } @@ -272,7 +272,7 @@ pub enum SubscriptionCloseReason { /// You can call [`Subscription::close_reason`] to determine why /// the subscription was closed. #[derive(Debug)] -pub struct Subscription { +pub struct Subscription<'a, Client: ?Sized, Notif> { is_closed: bool, /// Channel to send requests to the background task. to_back: mpsc::Sender, @@ -280,18 +280,19 @@ pub struct Subscription { rx: SubscriptionReceiver, /// Callback kind. kind: Option, + _client: &'a Client, /// Marker in order to pin the `Notif` parameter. marker: PhantomData, } // `Subscription` does not automatically implement this due to `PhantomData`, // but type type has no need to be pinned. -impl std::marker::Unpin for Subscription {} +impl<'a, Client: ?Sized, Notif> std::marker::Unpin for Subscription<'a, Client, Notif> {} -impl Subscription { +impl<'a, Client: ?Sized, Notif> Subscription<'a, Client, Notif> { /// Create a new subscription. - fn new(to_back: mpsc::Sender, rx: SubscriptionReceiver, kind: SubscriptionKind) -> Self { - Self { to_back, rx, kind: Some(kind), marker: PhantomData, is_closed: false } + fn new(client: &'a Client, to_back: mpsc::Sender, rx: SubscriptionReceiver, kind: SubscriptionKind) -> Self { + Self { to_back, rx, kind: Some(kind), _client: client, marker: PhantomData, is_closed: false } } /// Return the subscription type and, if applicable, ID. @@ -404,7 +405,7 @@ enum FrontToBack { SubscriptionClosed(SubscriptionId<'static>), } -impl Subscription +impl<'a, Client: ?Sized, Notif> Subscription<'a, Client, Notif> where Notif: DeserializeOwned, { @@ -421,7 +422,7 @@ where } } -impl Stream for Subscription +impl<'a, Client: ?Sized, Notif> Stream for Subscription<'a, Client, Notif> where Notif: DeserializeOwned, { @@ -439,7 +440,7 @@ where } } -impl Drop for Subscription { +impl<'a, Client: ?Sized, Notif> Drop for Subscription<'a, Client, Notif> { fn drop(&mut self) { // We can't actually guarantee that this goes through. If the background task is busy, then // the channel's buffer will be full. diff --git a/examples/examples/client_subscription_drop_oldest_item.rs b/examples/examples/client_subscription_drop_oldest_item.rs index 12414c7ff9..3b1c032454 100644 --- a/examples/examples/client_subscription_drop_oldest_item.rs +++ b/examples/examples/client_subscription_drop_oldest_item.rs @@ -29,7 +29,7 @@ use std::time::Duration; use futures::{Stream, StreamExt}; use jsonrpsee::core::DeserializeOwned; -use jsonrpsee::core::client::{Subscription, SubscriptionClientT}; +use jsonrpsee::core::client::{Client, Subscription, SubscriptionClientT}; use jsonrpsee::rpc_params; use jsonrpsee::server::{RpcModule, Server}; use jsonrpsee::ws_client::WsClientBuilder; @@ -47,8 +47,9 @@ async fn main() -> anyhow::Result<()> { let url = format!("ws://{}", addr); let client = WsClientBuilder::default().build(&url).await?; + let client: &'static Client = Box::leak(Box::new(client)); - let sub: Subscription = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?; + let sub: Subscription<_, i32> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?; // drop oldest messages from subscription: let mut sub = drop_oldest_when_lagging(sub, 10); @@ -73,8 +74,8 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -fn drop_oldest_when_lagging( - mut sub: Subscription, +fn drop_oldest_when_lagging( + mut sub: Subscription<'static, S, T>, buffer_size: usize, ) -> impl Stream> { let (tx, rx) = tokio::sync::broadcast::channel(buffer_size); diff --git a/examples/examples/proc_macro.rs b/examples/examples/proc_macro.rs index 3e8bb32a8c..ee2e94961a 100644 --- a/examples/examples/proc_macro.rs +++ b/examples/examples/proc_macro.rs @@ -102,7 +102,7 @@ async fn main() -> anyhow::Result<()> { let client = WsClientBuilder::default().build(&url).await?; assert_eq!(client.storage_keys(vec![1, 2, 3, 4], None::).await.unwrap(), vec![vec![1, 2, 3, 4]]); - let mut sub: Subscription> = + let mut sub: Subscription<_, Vec> = RpcClient::::subscribe_storage(&client, None).await.unwrap(); assert_eq!(Some(vec![[0; 32]]), sub.next().await.transpose().unwrap()); diff --git a/examples/examples/ws_pubsub_broadcast.rs b/examples/examples/ws_pubsub_broadcast.rs index df68af5ad0..e5d974c5a9 100644 --- a/examples/examples/ws_pubsub_broadcast.rs +++ b/examples/examples/ws_pubsub_broadcast.rs @@ -55,8 +55,8 @@ async fn main() -> anyhow::Result<()> { WsClientBuilder::default().set_rpc_middleware(RpcServiceBuilder::new().rpc_logger(1024)).build(&url).await?; let client2 = WsClientBuilder::default().set_rpc_middleware(RpcServiceBuilder::new().rpc_logger(1024)).build(&url).await?; - let sub1: Subscription = client1.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?; - let sub2: Subscription = client2.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?; + let sub1: Subscription<_, i32> = client1.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?; + let sub2: Subscription<_, i32> = client2.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?; let fut1 = sub1.take(NUM_SUBSCRIPTION_RESPONSES).for_each(|r| async move { tracing::info!("sub1 rx: {:?}", r) }); let fut2 = sub2.take(NUM_SUBSCRIPTION_RESPONSES).for_each(|r| async move { tracing::info!("sub2 rx: {:?}", r) }); diff --git a/examples/examples/ws_pubsub_with_params.rs b/examples/examples/ws_pubsub_with_params.rs index 39bd77c52f..a3d57876cf 100644 --- a/examples/examples/ws_pubsub_with_params.rs +++ b/examples/examples/ws_pubsub_with_params.rs @@ -49,12 +49,12 @@ async fn main() -> anyhow::Result<()> { let client = WsClientBuilder::default().build(&url).await?; // Subscription with a single parameter - let mut sub_params_one: Subscription> = + let mut sub_params_one: Subscription<_, Option> = client.subscribe("sub_one_param", rpc_params![3], "unsub_one_param").await?; tracing::info!("subscription with one param: {:?}", sub_params_one.next().await); // Subscription with multiple parameters - let mut sub_params_two: Subscription = + let mut sub_params_two: Subscription<_, String> = client.subscribe("sub_params_two", rpc_params![2, 5], "unsub_params_two").await?; tracing::info!("subscription with two params: {:?}", sub_params_two.next().await); diff --git a/proc-macros/src/render_client.rs b/proc-macros/src/render_client.rs index 3f4adc200d..074b084022 100644 --- a/proc-macros/src/render_client.rs +++ b/proc-macros/src/render_client.rs @@ -186,7 +186,8 @@ impl RpcDescription { // into the `Subscription` object. let sub_type = self.jrps_client_item(quote! { core::client::Subscription }); let item = &sub.item; - let returns = quote! { impl core::future::Future, #jrps_error>> + Send }; + let returns = + quote! { impl core::future::Future, #jrps_error>> + Send }; // Encoded parameters for the request. let parameter_builder = self.encode_params(&sub.params, &sub.param_kind, &sub.signature); diff --git a/tests/Cargo.toml b/tests/Cargo.toml index c3780a471a..7c5b82f941 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -20,6 +20,7 @@ hyper = { workspace = true } hyper-util = { workspace = true, features = ["http1", "client", "client-legacy"] } jsonrpsee = { path = "../jsonrpsee", features = ["server", "client-core", "http-client", "ws-client", "macros"] } jsonrpsee-test-utils = { path = "../test-utils" } +ouroboros = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "time"] } diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index fde43082ec..5885d2ea9e 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -34,6 +34,7 @@ use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::time::Duration; +use futures::FutureExt; use futures::stream::FuturesUnordered; use futures::{StreamExt, TryStreamExt, channel::mpsc}; use helpers::{ @@ -44,7 +45,7 @@ use http_body_util::BodyExt; use hyper::http::HeaderValue; use hyper_util::rt::TokioExecutor; use jsonrpsee::core::client::SubscriptionCloseReason; -use jsonrpsee::core::client::{ClientT, Error, IdKind, Subscription, SubscriptionClientT}; +use jsonrpsee::core::client::{Client, ClientT, Error, IdKind, Subscription, SubscriptionClientT}; use jsonrpsee::core::params::{ArrayParams, BatchRequestBuilder}; use jsonrpsee::core::{JsonValue, SubscriptionError}; use jsonrpsee::http_client::HttpClientBuilder; @@ -69,9 +70,9 @@ async fn ws_subscription_works() { let server_addr = server_with_subscription().await; let server_url = format!("ws://{}", server_addr); let client = WsClientBuilder::default().build(&server_url).await.unwrap(); - let mut hello_sub: Subscription = + let mut hello_sub: Subscription<_, String> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); - let mut foo_sub: Subscription = + let mut foo_sub: Subscription<_, u64> = client.subscribe("subscribe_foo", rpc_params![], "unsubscribe_foo").await.unwrap(); for _ in 0..10 { @@ -92,9 +93,9 @@ async fn ws_subscription_works_over_proxy_stream() { let socks_stream = connect_over_socks_stream(server_addr).await; let client = WsClientBuilder::default().build_with_stream(target_url, socks_stream).await.unwrap(); - let mut hello_sub: Subscription = + let mut hello_sub: Subscription<_, String> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); - let mut foo_sub: Subscription = + let mut foo_sub: Subscription<_, u64> = client.subscribe("subscribe_foo", rpc_params![], "unsubscribe_foo").await.unwrap(); for _ in 0..10 { @@ -114,7 +115,7 @@ async fn ws_unsubscription_works() { let server_url = format!("ws://{}", server_addr); let client = WsClientBuilder::default().build(&server_url).await.unwrap(); - let sub: Subscription = + let sub: Subscription<_, usize> = client.subscribe("subscribe_sleep", rpc_params![], "unsubscribe_sleep").await.unwrap(); sub.unsubscribe().with_default_timeout().await.unwrap().unwrap(); @@ -135,7 +136,7 @@ async fn ws_unsubscription_works_over_proxy_stream() { let socks_stream = connect_over_socks_stream(server_addr).await; let client = WsClientBuilder::default().build_with_stream(&server_url, socks_stream).await.unwrap(); - let sub: Subscription = + let sub: Subscription<_, usize> = client.subscribe("subscribe_sleep", rpc_params![], "unsubscribe_sleep").await.unwrap(); sub.unsubscribe().with_default_timeout().await.unwrap().unwrap(); @@ -152,7 +153,7 @@ async fn ws_subscription_with_input_works() { let server_addr = server_with_subscription().await; let server_url = format!("ws://{}", server_addr); let client = WsClientBuilder::default().build(&server_url).await.unwrap(); - let mut add_one: Subscription = + let mut add_one: Subscription<_, u64> = client.subscribe("subscribe_add_one", rpc_params![1], "unsubscribe_add_one").await.unwrap(); for i in 2..4 { @@ -171,7 +172,7 @@ async fn ws_subscription_with_input_works_over_proxy_stream() { let socks_stream = connect_over_socks_stream(server_addr).await; let client = WsClientBuilder::default().build_with_stream(&server_url, socks_stream).await.unwrap(); - let mut add_one: Subscription = + let mut add_one: Subscription<_, u64> = client.subscribe("subscribe_add_one", rpc_params![1], "unsubscribe_add_one").await.unwrap(); for i in 2..4 { @@ -307,14 +308,30 @@ async fn ws_subscription_several_clients() { let server_addr = server_with_subscription().await; let server_url = format!("ws://{}", server_addr); - let mut clients = Vec::with_capacity(10); + // Subscriptions with owned client + #[ouroboros::self_referencing] + struct ClientSubscriptions { + client: Client, + #[borrows(client)] + #[covariant] + hello: Subscription<'this, Client, JsonValue>, + #[borrows(client)] + #[covariant] + foo: Subscription<'this, Client, JsonValue>, + } + + let mut clients_subscriptions = Vec::with_capacity(10); for _ in 0..10 { let client = WsClientBuilder::default().build(&server_url).await.unwrap(); - let hello_sub: Subscription = - client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); - let foo_sub: Subscription = - client.subscribe("subscribe_foo", rpc_params![], "unsubscribe_foo").await.unwrap(); - clients.push((client, hello_sub, foo_sub)) + let client_subscriptions = ClientSubscriptionsAsyncTryBuilder { + client, + hello_builder: |client| client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").boxed(), + foo_builder: |client| client.subscribe("subscribe_foo", rpc_params![], "unsubscribe_foo").boxed(), + } + .try_build() + .await + .unwrap(); + clients_subscriptions.push(client_subscriptions) } } @@ -325,33 +342,58 @@ async fn ws_subscription_several_clients_with_drop() { let server_addr = server_with_subscription().await; let server_url = format!("ws://{}", server_addr); - let mut clients = Vec::with_capacity(10); + // Subscriptions with owned client + #[ouroboros::self_referencing] + struct ClientSubscriptions { + client: Client, + #[borrows(client)] + #[covariant] + hello: Subscription<'this, Client, JsonValue>, + #[borrows(client)] + #[covariant] + foo: Subscription<'this, Client, JsonValue>, + } + + impl ClientSubscriptions { + async fn next_hello<'a>(&'a mut self) -> Option> { + futures::future::poll_fn(move |cx| self.with_hello_mut(move |hello| hello.poll_next_unpin(cx))).await + } + + async fn next_foo<'a>(&'a mut self) -> Option> { + futures::future::poll_fn(move |cx| self.with_foo_mut(move |foo| foo.poll_next_unpin(cx))).await + } + } + + let mut clients_subscriptions = Vec::with_capacity(10); for _ in 0..10 { let client = WsClientBuilder::default() .max_buffer_capacity_per_subscription(u32::MAX as usize) .build(&server_url) .await .unwrap(); - let hello_sub: Subscription = - client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); - let foo_sub: Subscription = - client.subscribe("subscribe_foo", rpc_params![], "unsubscribe_foo").await.unwrap(); - clients.push((client, hello_sub, foo_sub)) + let client_subscriptions = ClientSubscriptionsAsyncTryBuilder { + client, + hello_builder: |client| client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").boxed(), + foo_builder: |client| client.subscribe("subscribe_foo", rpc_params![], "unsubscribe_foo").boxed(), + } + .try_build() + .await + .unwrap(); + clients_subscriptions.push(client_subscriptions) } for _ in 0..10 { - for (_client, hello_sub, foo_sub) in &mut clients { - let hello = hello_sub.next().await.unwrap().unwrap(); - let foo = foo_sub.next().await.unwrap().unwrap(); + for client_subscriptions in &mut clients_subscriptions { + let hello = client_subscriptions.next_hello().await.unwrap().unwrap(); + let foo = client_subscriptions.next_foo().await.unwrap().unwrap(); assert_eq!(&hello, "hello from subscription"); assert_eq!(foo, 1337); } } for i in 0..5 { - let (client, hello_sub, foo_sub) = clients.remove(i); - drop(hello_sub); - drop(foo_sub); + let client_subscriptions = clients_subscriptions.remove(i); + let client = client_subscriptions.into_heads().client; assert!(client.is_connected()); drop(client); } @@ -360,10 +402,10 @@ async fn ws_subscription_several_clients_with_drop() { // would be good to know that subscriptions actually were removed but not possible to verify at // this layer. for _ in 0..10 { - for (client, hello_sub, foo_sub) in &mut clients { - assert!(client.is_connected()); - let hello = hello_sub.next().await.unwrap().unwrap(); - let foo = foo_sub.next().await.unwrap().unwrap(); + for client_subscriptions in &mut clients_subscriptions { + assert!(client_subscriptions.borrow_client().is_connected()); + let hello = client_subscriptions.next_hello().await.unwrap().unwrap(); + let foo = client_subscriptions.next_foo().await.unwrap().unwrap(); assert_eq!(&hello, "hello from subscription"); assert_eq!(foo, 1337); } @@ -378,7 +420,7 @@ async fn ws_subscription_close_on_lagging() { let server_url = format!("ws://{}", server_addr); let client = WsClientBuilder::default().max_buffer_capacity_per_subscription(4).build(&server_url).await.unwrap(); - let mut hello_sub: Subscription = + let mut hello_sub: Subscription<_, JsonValue> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); // Don't poll the subscription stream for 2 seconds, should be full now. @@ -399,7 +441,7 @@ async fn ws_subscription_close_on_lagging() { let _hello_req: JsonValue = client.request("say_hello", rpc_params![]).await.unwrap(); // The same subscription should be possible to register again. - let mut other_sub: Subscription = + let mut other_sub: Subscription<_, JsonValue> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); assert!(other_sub.next().with_default_timeout().await.unwrap().is_some()); @@ -453,10 +495,10 @@ async fn ws_unsubscribe_releases_request_slots() { let client = WsClientBuilder::default().max_concurrent_requests(1).build(&server_url).await.unwrap(); - let sub1: Subscription = + let sub1: Subscription<_, JsonValue> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); drop(sub1); - let _: Subscription = + let _: Subscription<_, JsonValue> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); } @@ -469,7 +511,7 @@ async fn server_should_be_able_to_close_subscriptions() { let client = WsClientBuilder::default().build(&server_url).await.unwrap(); - let mut sub: Subscription = + let mut sub: Subscription<_, String> = client.subscribe("subscribe_noop", rpc_params![], "unsubscribe_noop").await.unwrap(); assert!(sub.next().await.is_none()); @@ -484,7 +526,7 @@ async fn ws_close_pending_subscription_when_server_terminated() { let c1 = WsClientBuilder::default().build(&server_url).await.unwrap(); - let mut sub: Subscription = + let mut sub: Subscription<_, String> = c1.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); assert!(matches!(sub.next().await, Some(Ok(_)))); @@ -492,7 +534,7 @@ async fn ws_close_pending_subscription_when_server_terminated() { server_handle.stop().unwrap(); server_handle.stopped().await; - let sub2: Result, _> = + let sub2: Result, _> = c1.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await; // no new request should be accepted. @@ -546,12 +588,13 @@ async fn ws_server_should_stop_subscription_after_client_drop() { let client = WsClientBuilder::default().build(&server_url).await.unwrap(); - let mut sub: Subscription = + let mut sub: Subscription<_, usize> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); let res = sub.next().await.unwrap().unwrap(); assert_eq!(res, 1); + drop(sub); drop(client); let close_err = rx.next().await.unwrap(); @@ -674,6 +717,7 @@ async fn ws_server_cancels_subscriptions_on_reset_conn() { } // terminate connection. + drop(subs); drop(client); let rx_len = rx.take(10).fold(0, |acc, _| async move { acc + 1 }).await; @@ -689,9 +733,9 @@ async fn ws_server_subscribe_with_stream() { let server_url = format!("ws://{}", addr); let client = WsClientBuilder::default().build(&server_url).await.unwrap(); - let mut sub1: Subscription = + let mut sub1: Subscription<_, usize> = client.subscribe("subscribe_5_ints", rpc_params![], "unsubscribe_5_ints").await.unwrap(); - let mut sub2: Subscription = + let mut sub2: Subscription<_, usize> = client.subscribe("subscribe_5_ints", rpc_params![], "unsubscribe_5_ints").await.unwrap(); let (r1, r2) = futures::future::try_join(