diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index 81466914e5..0c802abc4a 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -519,6 +519,8 @@ where > + Send + Sync, { + type SubscriptionClient = Self; + /// Send a subscription request to the server. Not implemented for HTTP; will always return /// [`Error::HttpNotImplemented`]. fn subscribe<'a, N, Params>( @@ -526,7 +528,7 @@ where _subscribe_method: &'a str, _params: Params, _unsubscribe_method: &'a str, - ) -> impl Future, Error>> + ) -> impl Future, Error>> where Params: ToRpcParams + Send, N: DeserializeOwned, @@ -535,7 +537,10 @@ where } /// Subscribe to a specific method. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`]. - fn subscribe_to_method(&self, _method: &str) -> impl Future, Error>> + fn subscribe_to_method( + &self, + _method: &str, + ) -> impl Future, Error>> where N: DeserializeOwned, { diff --git a/client/ws-client/src/tests.rs b/client/ws-client/src/tests.rs index eb0fd25110..c46d5cf762 100644 --- a/client/ws-client/src/tests.rs +++ b/client/ws-client/src/tests.rs @@ -159,7 +159,7 @@ async fn subscription_works() { let uri = to_ws_uri_string(server.local_addr()); let client = WsClientBuilder::default().build(&uri).with_default_timeout().await.unwrap().unwrap(); { - let mut sub: Subscription = client + let mut sub: Subscription<_, String> = client .subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello") .with_default_timeout() .await @@ -183,7 +183,7 @@ async fn notification_handler_works() { let uri = to_ws_uri_string(server.local_addr()); let client = WsClientBuilder::default().build(&uri).with_default_timeout().await.unwrap().unwrap(); { - let mut nh: Subscription = + let mut nh: Subscription<_, String> = client.subscribe_to_method("test").with_default_timeout().await.unwrap().unwrap(); let response: String = nh.next().with_default_timeout().await.unwrap().unwrap().unwrap(); assert_eq!("server originated notification works".to_owned(), response); @@ -203,7 +203,7 @@ async fn notification_no_params() { let uri = to_ws_uri_string(server.local_addr()); let client = WsClientBuilder::default().build(&uri).with_default_timeout().await.unwrap().unwrap(); { - let mut nh: Subscription = + let mut nh: Subscription<_, serde_json::Value> = client.subscribe_to_method("no_params").with_default_timeout().await.unwrap().unwrap(); let response = nh.next().with_default_timeout().await.unwrap().unwrap().unwrap(); assert_eq!(response, serde_json::Value::Null); @@ -244,7 +244,7 @@ async fn batched_notifs_works() { // Ensure that subscription is returned back to the correct handle // and is handled separately from ordinary notifications. { - let mut nh: Subscription = + let mut nh: Subscription<_, String> = client.subscribe("sub", rpc_params![], "unsub").with_default_timeout().await.unwrap().unwrap(); let response: String = nh.next().with_default_timeout().await.unwrap().unwrap().unwrap(); assert_eq!("sub_notif", response); @@ -252,7 +252,7 @@ async fn batched_notifs_works() { // Ensure that method notif is returned back to the correct handle. { - let mut nh: Subscription = + let mut nh: Subscription<_, String> = client.subscribe_to_method("sub").with_default_timeout().await.unwrap().unwrap(); let response: String = nh.next().with_default_timeout().await.unwrap().unwrap().unwrap(); assert_eq!("method_notif", response); @@ -279,7 +279,7 @@ async fn notification_close_on_lagging() { .await .unwrap() .unwrap(); - let mut nh: Subscription = + let mut nh: Subscription<_, String> = client.subscribe_to_method("test").with_default_timeout().await.unwrap().unwrap(); // Don't poll the notification stream for 2 seconds, should be full now. @@ -297,7 +297,7 @@ async fn notification_close_on_lagging() { assert!(nh.next().with_default_timeout().await.unwrap().is_none()); // The same subscription should be possible to register again. - let mut other_nh: Subscription = + let mut other_nh: Subscription<_, String> = client.subscribe_to_method("test").with_default_timeout().await.unwrap().unwrap(); // check that the new subscription works. diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 35695096ba..c9996ef84e 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -360,14 +360,16 @@ impl ClientBuilder { tokio::spawn(wait_for_shutdown(send_receive_task_sync_rx, client_dropped_rx, disconnect_reason.clone())); - Client { + let inner = ClientInner { to_back: to_back.clone(), service: self.service_builder.service(RpcService::new(to_back.clone())), request_timeout: self.request_timeout, error: ErrorFromBack::new(to_back, disconnect_reason), id_manager: RequestIdManager::new(self.id_kind), on_exit: Some(client_dropped_tx), - } + }; + + Client { inner: Arc::new(inner) } } /// Build the client with given transport. @@ -419,20 +421,21 @@ impl ClientBuilder { disconnect_reason.clone(), )); - Client { + let inner = ClientInner { to_back: to_back.clone(), service: self.service_builder.service(RpcService::new(to_back.clone())), request_timeout: self.request_timeout, error: ErrorFromBack::new(to_back, disconnect_reason), id_manager: RequestIdManager::new(self.id_kind), on_exit: Some(client_dropped_tx), - } + }; + + Client { inner: Arc::new(inner) } } } -/// Generic asynchronous client. #[derive(Debug)] -pub struct Client> { +struct ClientInner> { /// Channel to send requests to the background task. to_back: mpsc::Sender, error: ErrorFromBack, @@ -445,6 +448,21 @@ pub struct Client> { service: L, } +impl Drop for ClientInner { + fn drop(&mut self) { + if let Some(e) = self.on_exit.take() { + let _ = e.send(()); + } + } +} + +/// Generic asynchronous client. +#[derive(Debug)] +#[repr(transparent)] +pub struct Client> { + inner: Arc> +} + impl Client { /// Create a builder for the client. pub fn builder() -> ClientBuilder { @@ -455,13 +473,13 @@ impl Client { impl Client { /// Checks if the client is connected to the target. pub fn is_connected(&self) -> bool { - !self.to_back.is_closed() + !self.inner.to_back.is_closed() } async fn run_future_until_timeout(&self, fut: impl Future>) -> Result { tokio::pin!(fut); - match futures_util::future::select(fut, futures_timer::Delay::new(self.request_timeout)).await { + match futures_util::future::select(fut, futures_timer::Delay::new(self.inner.request_timeout)).await { Either::Left((Ok(r), _)) => Ok(r), Either::Left((Err(Error::ServiceDisconnect), _)) => Err(self.on_disconnect().await), Either::Left((Err(e), _)) => Err(e), @@ -476,20 +494,18 @@ impl Client { /// /// This method is cancel safe. pub async fn on_disconnect(&self) -> Error { - self.error.read_error().await + self.inner.error.read_error().await } /// Returns configured request timeout. pub fn request_timeout(&self) -> Duration { - self.request_timeout + self.inner.request_timeout } } -impl Drop for Client { - fn drop(&mut self) { - if let Some(e) = self.on_exit.take() { - let _ = e.send(()); - } +impl Clone for Client { + fn clone(&self) -> Self { + Self { inner: self.inner.clone() } } } @@ -508,9 +524,9 @@ where { async { // NOTE: we use this to guard against max number of concurrent requests. - let _req_id = self.id_manager.next_request_id(); + let _req_id = self.inner.id_manager.next_request_id(); let params = params.to_rpc_params()?.map(StdCow::Owned); - let fut = self.service.notification(jsonrpsee_types::Notification::new(method.into(), params)); + let fut = self.inner.service.notification(jsonrpsee_types::Notification::new(method.into(), params)); self.run_future_until_timeout(fut).await?; Ok(()) } @@ -522,9 +538,9 @@ where Params: ToRpcParams + Send, { async { - let id = self.id_manager.next_request_id(); + let id = self.inner.id_manager.next_request_id(); let params = params.to_rpc_params()?; - let fut = self.service.call(Request::borrowed(method, params.as_deref(), id.clone())); + let fut = self.inner.service.call(Request::borrowed(method, params.as_deref(), id.clone())); let rp = self.run_future_until_timeout(fut).await?; let success = ResponseSuccess::try_from(rp.into_response().into_inner())?; @@ -541,7 +557,7 @@ where { async { let batch = batch.build()?; - let id = self.id_manager.next_request_id(); + let id = self.inner.id_manager.next_request_id(); let id_range = generate_batch_id_range(id, batch.len() as u64)?; let mut b = Batch::with_capacity(batch.len()); @@ -549,7 +565,7 @@ where for ((method, params), id) in batch.into_iter().zip(id_range.clone()) { b.push(Request { jsonrpc: TwoPointZero, - id: self.id_manager.as_id_kind().into_id(id), + id: self.inner.id_manager.as_id_kind().into_id(id), method: method.into(), params: params.map(StdCow::Owned), extensions: Extensions::new(), @@ -558,7 +574,7 @@ where b.extensions_mut().insert(IsBatch { id_range }); - let fut = self.service.batch(b); + let fut = self.inner.service.batch(b); let json_values = self.run_future_until_timeout(fut).await?; let mut responses = Vec::with_capacity(json_values.len()); @@ -592,6 +608,8 @@ where > + Send + Sync, { + type SubscriptionClient = Self; + /// Send a subscription request to the server. /// /// The `subscribe_method` and `params` are used to ask for the subscription towards the @@ -601,7 +619,7 @@ where subscribe_method: &'a str, params: Params, unsubscribe_method: &'a str, - ) -> impl Future, Error>> + Send + ) -> impl Future, Error>> + Send where Params: ToRpcParams + Send, Notif: DeserializeOwned, @@ -611,8 +629,8 @@ where return Err(RegisterMethodError::SubscriptionNameConflict(unsubscribe_method.to_owned()).into()); } - let req_id_sub = self.id_manager.next_request_id(); - let req_id_unsub = self.id_manager.next_request_id(); + let req_id_sub = self.inner.id_manager.next_request_id(); + let req_id_unsub = self.inner.id_manager.next_request_id(); let params = params.to_rpc_params()?; let mut ext = Extensions::new(); @@ -626,24 +644,25 @@ where extensions: ext, }; - let fut = self.service.call(req); + let fut = self.inner.service.call(req); let sub = self .run_future_until_timeout(fut) .await? .into_subscription() .expect("Extensions set to subscription, must return subscription; qed"); - Ok(Subscription::new(self.to_back.clone(), sub.stream, SubscriptionKind::Subscription(sub.sub_id))) + Ok(Subscription::new(self.clone(), self.inner.to_back.clone(), sub.stream, SubscriptionKind::Subscription(sub.sub_id))) } } /// Subscribe to a specific method. - fn subscribe_to_method(&self, method: &str) -> impl Future, Error>> + Send + fn subscribe_to_method(&self, method: &str) -> impl Future, Error>> + Send where N: DeserializeOwned, { async { let (send_back_tx, send_back_rx) = oneshot::channel(); if self + .inner .to_back .clone() .send(FrontToBack::RegisterNotification(RegisterNotificationMessage { @@ -656,7 +675,7 @@ where return Err(self.on_disconnect().await); } - let res = call_with_timeout(self.request_timeout, send_back_rx).await; + let res = call_with_timeout(self.inner.request_timeout, send_back_rx).await; let (rx, method) = match res { Ok(Ok(val)) => val, @@ -664,7 +683,7 @@ where Err(_) => return Err(self.on_disconnect().await), }; - Ok(Subscription::new(self.to_back.clone(), rx, SubscriptionKind::Method(method))) + Ok(Subscription::new(self.clone(), self.inner.to_back.clone(), rx, SubscriptionKind::Method(method))) } } } diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index ff9371b9e8..f9ede2fa09 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -119,6 +119,8 @@ pub trait ClientT { /// [JSON-RPC](https://www.jsonrpc.org/specification) client interface that can make requests, notifications and subscriptions. pub trait SubscriptionClientT: ClientT { + type SubscriptionClient; + /// Initiate a subscription by performing a JSON-RPC method call where the server responds with /// a `Subscription ID` that is used to fetch messages on that subscription, /// @@ -136,7 +138,7 @@ pub trait SubscriptionClientT: ClientT { subscribe_method: &'a str, params: Params, unsubscribe_method: &'a str, - ) -> impl Future, Error>> + Send + ) -> impl Future, Error>> + Send where Params: ToRpcParams + Send, Notif: DeserializeOwned; @@ -148,7 +150,7 @@ pub trait SubscriptionClientT: ClientT { fn subscribe_to_method( &self, method: &str, - ) -> impl Future, Error>> + Send + ) -> impl Future, Error>> + Send where Notif: DeserializeOwned; } @@ -272,7 +274,7 @@ pub enum SubscriptionCloseReason { /// You can call [`Subscription::close_reason`] to determine why /// the subscription was closed. #[derive(Debug)] -pub struct Subscription { +pub struct Subscription { is_closed: bool, /// Channel to send requests to the background task. to_back: mpsc::Sender, @@ -282,16 +284,18 @@ pub struct Subscription { kind: Option, /// Marker in order to pin the `Notif` parameter. marker: PhantomData, + /// Keep client alive at least until subscription is dropped + _client: Client, } // `Subscription` does not automatically implement this due to `PhantomData`, // but type type has no need to be pinned. -impl std::marker::Unpin for Subscription {} +impl std::marker::Unpin for Subscription {} -impl Subscription { +impl Subscription { /// Create a new subscription. - fn new(to_back: mpsc::Sender, rx: SubscriptionReceiver, kind: SubscriptionKind) -> Self { - Self { to_back, rx, kind: Some(kind), marker: PhantomData, is_closed: false } + fn new(client: Client, to_back: mpsc::Sender, rx: SubscriptionReceiver, kind: SubscriptionKind) -> Self { + Self { _client: client, to_back, rx, kind: Some(kind), marker: PhantomData, is_closed: false } } /// Return the subscription type and, if applicable, ID. @@ -404,7 +408,7 @@ enum FrontToBack { SubscriptionClosed(SubscriptionId<'static>), } -impl Subscription +impl Subscription where Notif: DeserializeOwned, { @@ -421,7 +425,7 @@ where } } -impl Stream for Subscription +impl Stream for Subscription where Notif: DeserializeOwned, { @@ -439,7 +443,7 @@ where } } -impl Drop for Subscription { +impl Drop for Subscription { fn drop(&mut self) { // We can't actually guarantee that this goes through. If the background task is busy, then // the channel's buffer will be full. diff --git a/examples/examples/client_subscription_drop_oldest_item.rs b/examples/examples/client_subscription_drop_oldest_item.rs index 12414c7ff9..2679e41682 100644 --- a/examples/examples/client_subscription_drop_oldest_item.rs +++ b/examples/examples/client_subscription_drop_oldest_item.rs @@ -48,7 +48,7 @@ async fn main() -> anyhow::Result<()> { let client = WsClientBuilder::default().build(&url).await?; - let sub: Subscription = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?; + let sub: Subscription<_, i32> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?; // drop oldest messages from subscription: let mut sub = drop_oldest_when_lagging(sub, 10); @@ -73,8 +73,8 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -fn drop_oldest_when_lagging( - mut sub: Subscription, +fn drop_oldest_when_lagging( + mut sub: Subscription, buffer_size: usize, ) -> impl Stream> { let (tx, rx) = tokio::sync::broadcast::channel(buffer_size); diff --git a/examples/examples/proc_macro.rs b/examples/examples/proc_macro.rs index 3e8bb32a8c..ee2e94961a 100644 --- a/examples/examples/proc_macro.rs +++ b/examples/examples/proc_macro.rs @@ -102,7 +102,7 @@ async fn main() -> anyhow::Result<()> { let client = WsClientBuilder::default().build(&url).await?; assert_eq!(client.storage_keys(vec![1, 2, 3, 4], None::).await.unwrap(), vec![vec![1, 2, 3, 4]]); - let mut sub: Subscription> = + let mut sub: Subscription<_, Vec> = RpcClient::::subscribe_storage(&client, None).await.unwrap(); assert_eq!(Some(vec![[0; 32]]), sub.next().await.transpose().unwrap()); diff --git a/examples/examples/ws_pubsub_broadcast.rs b/examples/examples/ws_pubsub_broadcast.rs index df68af5ad0..e5d974c5a9 100644 --- a/examples/examples/ws_pubsub_broadcast.rs +++ b/examples/examples/ws_pubsub_broadcast.rs @@ -55,8 +55,8 @@ async fn main() -> anyhow::Result<()> { WsClientBuilder::default().set_rpc_middleware(RpcServiceBuilder::new().rpc_logger(1024)).build(&url).await?; let client2 = WsClientBuilder::default().set_rpc_middleware(RpcServiceBuilder::new().rpc_logger(1024)).build(&url).await?; - let sub1: Subscription = client1.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?; - let sub2: Subscription = client2.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?; + let sub1: Subscription<_, i32> = client1.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?; + let sub2: Subscription<_, i32> = client2.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?; let fut1 = sub1.take(NUM_SUBSCRIPTION_RESPONSES).for_each(|r| async move { tracing::info!("sub1 rx: {:?}", r) }); let fut2 = sub2.take(NUM_SUBSCRIPTION_RESPONSES).for_each(|r| async move { tracing::info!("sub2 rx: {:?}", r) }); diff --git a/examples/examples/ws_pubsub_with_params.rs b/examples/examples/ws_pubsub_with_params.rs index 39bd77c52f..a3d57876cf 100644 --- a/examples/examples/ws_pubsub_with_params.rs +++ b/examples/examples/ws_pubsub_with_params.rs @@ -49,12 +49,12 @@ async fn main() -> anyhow::Result<()> { let client = WsClientBuilder::default().build(&url).await?; // Subscription with a single parameter - let mut sub_params_one: Subscription> = + let mut sub_params_one: Subscription<_, Option> = client.subscribe("sub_one_param", rpc_params![3], "unsub_one_param").await?; tracing::info!("subscription with one param: {:?}", sub_params_one.next().await); // Subscription with multiple parameters - let mut sub_params_two: Subscription = + let mut sub_params_two: Subscription<_, String> = client.subscribe("sub_params_two", rpc_params![2, 5], "unsub_params_two").await?; tracing::info!("subscription with two params: {:?}", sub_params_two.next().await); diff --git a/proc-macros/src/render_client.rs b/proc-macros/src/render_client.rs index 3f4adc200d..52c9736547 100644 --- a/proc-macros/src/render_client.rs +++ b/proc-macros/src/render_client.rs @@ -185,8 +185,9 @@ impl RpcDescription { // `returns` represent the return type of the *rust method*, which is wrapped // into the `Subscription` object. let sub_type = self.jrps_client_item(quote! { core::client::Subscription }); + let super_trait = self.jrps_client_item(quote! { core::client::SubscriptionClientT }); let item = &sub.item; - let returns = quote! { impl core::future::Future, #jrps_error>> + Send }; + let returns = quote! { impl core::future::Future::SubscriptionClient, #item>, #jrps_error>> + Send }; // Encoded parameters for the request. let parameter_builder = self.encode_params(&sub.params, &sub.param_kind, &sub.signature); diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index fde43082ec..39a5209df8 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -69,9 +69,9 @@ async fn ws_subscription_works() { let server_addr = server_with_subscription().await; let server_url = format!("ws://{}", server_addr); let client = WsClientBuilder::default().build(&server_url).await.unwrap(); - let mut hello_sub: Subscription = + let mut hello_sub: Subscription<_, String> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); - let mut foo_sub: Subscription = + let mut foo_sub: Subscription<_, u64> = client.subscribe("subscribe_foo", rpc_params![], "unsubscribe_foo").await.unwrap(); for _ in 0..10 { @@ -92,9 +92,9 @@ async fn ws_subscription_works_over_proxy_stream() { let socks_stream = connect_over_socks_stream(server_addr).await; let client = WsClientBuilder::default().build_with_stream(target_url, socks_stream).await.unwrap(); - let mut hello_sub: Subscription = + let mut hello_sub: Subscription<_, String> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); - let mut foo_sub: Subscription = + let mut foo_sub: Subscription<_, u64> = client.subscribe("subscribe_foo", rpc_params![], "unsubscribe_foo").await.unwrap(); for _ in 0..10 { @@ -114,7 +114,7 @@ async fn ws_unsubscription_works() { let server_url = format!("ws://{}", server_addr); let client = WsClientBuilder::default().build(&server_url).await.unwrap(); - let sub: Subscription = + let sub: Subscription<_, usize> = client.subscribe("subscribe_sleep", rpc_params![], "unsubscribe_sleep").await.unwrap(); sub.unsubscribe().with_default_timeout().await.unwrap().unwrap(); @@ -135,7 +135,7 @@ async fn ws_unsubscription_works_over_proxy_stream() { let socks_stream = connect_over_socks_stream(server_addr).await; let client = WsClientBuilder::default().build_with_stream(&server_url, socks_stream).await.unwrap(); - let sub: Subscription = + let sub: Subscription<_, usize> = client.subscribe("subscribe_sleep", rpc_params![], "unsubscribe_sleep").await.unwrap(); sub.unsubscribe().with_default_timeout().await.unwrap().unwrap(); @@ -152,7 +152,7 @@ async fn ws_subscription_with_input_works() { let server_addr = server_with_subscription().await; let server_url = format!("ws://{}", server_addr); let client = WsClientBuilder::default().build(&server_url).await.unwrap(); - let mut add_one: Subscription = + let mut add_one: Subscription<_, u64> = client.subscribe("subscribe_add_one", rpc_params![1], "unsubscribe_add_one").await.unwrap(); for i in 2..4 { @@ -171,7 +171,7 @@ async fn ws_subscription_with_input_works_over_proxy_stream() { let socks_stream = connect_over_socks_stream(server_addr).await; let client = WsClientBuilder::default().build_with_stream(&server_url, socks_stream).await.unwrap(); - let mut add_one: Subscription = + let mut add_one: Subscription<_, u64> = client.subscribe("subscribe_add_one", rpc_params![1], "unsubscribe_add_one").await.unwrap(); for i in 2..4 { @@ -310,9 +310,9 @@ async fn ws_subscription_several_clients() { let mut clients = Vec::with_capacity(10); for _ in 0..10 { let client = WsClientBuilder::default().build(&server_url).await.unwrap(); - let hello_sub: Subscription = + let hello_sub: Subscription<_, JsonValue> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); - let foo_sub: Subscription = + let foo_sub: Subscription<_, JsonValue> = client.subscribe("subscribe_foo", rpc_params![], "unsubscribe_foo").await.unwrap(); clients.push((client, hello_sub, foo_sub)) } @@ -332,9 +332,9 @@ async fn ws_subscription_several_clients_with_drop() { .build(&server_url) .await .unwrap(); - let hello_sub: Subscription = + let hello_sub: Subscription<_, String> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); - let foo_sub: Subscription = + let foo_sub: Subscription<_, u64> = client.subscribe("subscribe_foo", rpc_params![], "unsubscribe_foo").await.unwrap(); clients.push((client, hello_sub, foo_sub)) } @@ -378,7 +378,7 @@ async fn ws_subscription_close_on_lagging() { let server_url = format!("ws://{}", server_addr); let client = WsClientBuilder::default().max_buffer_capacity_per_subscription(4).build(&server_url).await.unwrap(); - let mut hello_sub: Subscription = + let mut hello_sub: Subscription<_, JsonValue> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); // Don't poll the subscription stream for 2 seconds, should be full now. @@ -399,7 +399,7 @@ async fn ws_subscription_close_on_lagging() { let _hello_req: JsonValue = client.request("say_hello", rpc_params![]).await.unwrap(); // The same subscription should be possible to register again. - let mut other_sub: Subscription = + let mut other_sub: Subscription<_, JsonValue> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); assert!(other_sub.next().with_default_timeout().await.unwrap().is_some()); @@ -453,10 +453,10 @@ async fn ws_unsubscribe_releases_request_slots() { let client = WsClientBuilder::default().max_concurrent_requests(1).build(&server_url).await.unwrap(); - let sub1: Subscription = + let sub1: Subscription<_, JsonValue> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); drop(sub1); - let _: Subscription = + let _: Subscription<_, JsonValue> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); } @@ -469,7 +469,7 @@ async fn server_should_be_able_to_close_subscriptions() { let client = WsClientBuilder::default().build(&server_url).await.unwrap(); - let mut sub: Subscription = + let mut sub: Subscription<_, String> = client.subscribe("subscribe_noop", rpc_params![], "unsubscribe_noop").await.unwrap(); assert!(sub.next().await.is_none()); @@ -484,7 +484,7 @@ async fn ws_close_pending_subscription_when_server_terminated() { let c1 = WsClientBuilder::default().build(&server_url).await.unwrap(); - let mut sub: Subscription = + let mut sub: Subscription<_, String> = c1.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); assert!(matches!(sub.next().await, Some(Ok(_)))); @@ -492,7 +492,7 @@ async fn ws_close_pending_subscription_when_server_terminated() { server_handle.stop().unwrap(); server_handle.stopped().await; - let sub2: Result, _> = + let sub2: Result, _> = c1.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await; // no new request should be accepted. @@ -512,7 +512,7 @@ async fn ws_close_pending_subscription_when_server_terminated() { } #[tokio::test] -async fn ws_server_should_stop_subscription_after_client_drop() { +async fn ws_server_should_stop_subscription_after_client_and_subscriptions_dropped() { use futures::{SinkExt, StreamExt, channel::mpsc}; use jsonrpsee::{RpcModule, server::ServerBuilder}; @@ -546,13 +546,14 @@ async fn ws_server_should_stop_subscription_after_client_drop() { let client = WsClientBuilder::default().build(&server_url).await.unwrap(); - let mut sub: Subscription = + let mut sub: Subscription<_, usize> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); let res = sub.next().await.unwrap().unwrap(); assert_eq!(res, 1); drop(client); + drop(sub); let close_err = rx.next().await.unwrap(); // assert that the server received `SubscriptionClosed` after the client was dropped. @@ -674,6 +675,7 @@ async fn ws_server_cancels_subscriptions_on_reset_conn() { } // terminate connection. + drop(subs); drop(client); let rx_len = rx.take(10).fold(0, |acc, _| async move { acc + 1 }).await; @@ -689,9 +691,9 @@ async fn ws_server_subscribe_with_stream() { let server_url = format!("ws://{}", addr); let client = WsClientBuilder::default().build(&server_url).await.unwrap(); - let mut sub1: Subscription = + let mut sub1: Subscription<_, usize> = client.subscribe("subscribe_5_ints", rpc_params![], "unsubscribe_5_ints").await.unwrap(); - let mut sub2: Subscription = + let mut sub2: Subscription<_, usize> = client.subscribe("subscribe_5_ints", rpc_params![], "unsubscribe_5_ints").await.unwrap(); let (r1, r2) = futures::future::try_join( diff --git a/tests/wasm-tests/tests/wasm.rs b/tests/wasm-tests/tests/wasm.rs index 8d90ff687f..adadbb2fa7 100644 --- a/tests/wasm-tests/tests/wasm.rs +++ b/tests/wasm-tests/tests/wasm.rs @@ -47,7 +47,7 @@ async fn rpc_method_call_works() { async fn rpc_subcription_works() { let client = WasmClientBuilder::default().build("ws://localhost:9944").await.unwrap(); - let mut sub: Subscription = + let mut sub: Subscription<_, serde_json::Value> = client.subscribe("state_subscribeStorage", rpc_params![], "state_unsubscribeStorage").await.unwrap(); for _ in 0..3 {