Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ console-subscriber = "0.4"
criterion = { version = "0.5", features = ["async_tokio", "html_reports"] }
fast-socks5 = "0.10"
futures = { version = "0.3.14", default-features = false, features = ["std"] }
ouroboros = "0.18.5"
pprof = { version = "0.15", features = ["flamegraph", "criterion"] }
socket2 = "0.6.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand Down
11 changes: 7 additions & 4 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,12 +485,12 @@ where
{
/// Send a subscription request to the server. Not implemented for HTTP; will always return
/// [`Error::HttpNotImplemented`].
fn subscribe<'a, N, Params>(
&self,
fn subscribe<'a, 'client, N, Params>(
&'client self,
_subscribe_method: &'a str,
_params: Params,
_unsubscribe_method: &'a str,
) -> impl Future<Output = Result<Subscription<N>, Error>>
) -> impl Future<Output = Result<Subscription<'client, Self, N>, Error>>
where
Params: ToRpcParams + Send,
N: DeserializeOwned,
Expand All @@ -499,7 +499,10 @@ where
}

/// Subscribe to a specific method. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`].
fn subscribe_to_method<N>(&self, _method: &str) -> impl Future<Output = Result<Subscription<N>, Error>>
fn subscribe_to_method<'client, N>(
&'client self,
_method: &str,
) -> impl Future<Output = Result<Subscription<'client, Self, N>, Error>>
where
N: DeserializeOwned,
{
Expand Down
14 changes: 7 additions & 7 deletions client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async fn subscription_works() {
let uri = to_ws_uri_string(server.local_addr());
let client = WsClientBuilder::default().build(&uri).with_default_timeout().await.unwrap().unwrap();
{
let mut sub: Subscription<String> = client
let mut sub: Subscription<_, String> = client
.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello")
.with_default_timeout()
.await
Expand All @@ -183,7 +183,7 @@ async fn notification_handler_works() {
let uri = to_ws_uri_string(server.local_addr());
let client = WsClientBuilder::default().build(&uri).with_default_timeout().await.unwrap().unwrap();
{
let mut nh: Subscription<String> =
let mut nh: Subscription<_, String> =
client.subscribe_to_method("test").with_default_timeout().await.unwrap().unwrap();
let response: String = nh.next().with_default_timeout().await.unwrap().unwrap().unwrap();
assert_eq!("server originated notification works".to_owned(), response);
Expand All @@ -203,7 +203,7 @@ async fn notification_no_params() {
let uri = to_ws_uri_string(server.local_addr());
let client = WsClientBuilder::default().build(&uri).with_default_timeout().await.unwrap().unwrap();
{
let mut nh: Subscription<serde_json::Value> =
let mut nh: Subscription<_, serde_json::Value> =
client.subscribe_to_method("no_params").with_default_timeout().await.unwrap().unwrap();
let response = nh.next().with_default_timeout().await.unwrap().unwrap().unwrap();
assert_eq!(response, serde_json::Value::Null);
Expand Down Expand Up @@ -244,15 +244,15 @@ async fn batched_notifs_works() {
// Ensure that subscription is returned back to the correct handle
// and is handled separately from ordinary notifications.
{
let mut nh: Subscription<String> =
let mut nh: Subscription<_, String> =
client.subscribe("sub", rpc_params![], "unsub").with_default_timeout().await.unwrap().unwrap();
let response: String = nh.next().with_default_timeout().await.unwrap().unwrap().unwrap();
assert_eq!("sub_notif", response);
}

// Ensure that method notif is returned back to the correct handle.
{
let mut nh: Subscription<String> =
let mut nh: Subscription<_, String> =
client.subscribe_to_method("sub").with_default_timeout().await.unwrap().unwrap();
let response: String = nh.next().with_default_timeout().await.unwrap().unwrap().unwrap();
assert_eq!("method_notif", response);
Expand All @@ -279,7 +279,7 @@ async fn notification_close_on_lagging() {
.await
.unwrap()
.unwrap();
let mut nh: Subscription<String> =
let mut nh: Subscription<_, String> =
client.subscribe_to_method("test").with_default_timeout().await.unwrap().unwrap();

// Don't poll the notification stream for 2 seconds, should be full now.
Expand All @@ -297,7 +297,7 @@ async fn notification_close_on_lagging() {
assert!(nh.next().with_default_timeout().await.unwrap().is_none());

// The same subscription should be possible to register again.
let mut other_nh: Subscription<String> =
let mut other_nh: Subscription<_, String> =
client.subscribe_to_method("test").with_default_timeout().await.unwrap().unwrap();

// check that the new subscription works.
Expand Down
12 changes: 6 additions & 6 deletions core/src/client/async_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,12 +596,12 @@ where
///
/// The `subscribe_method` and `params` are used to ask for the subscription towards the
/// server. The `unsubscribe_method` is used to close the subscription.
fn subscribe<'a, Notif, Params>(
&self,
fn subscribe<'a, 'client, Notif, Params>(
&'client self,
subscribe_method: &'a str,
params: Params,
unsubscribe_method: &'a str,
) -> impl Future<Output = Result<Subscription<Notif>, Error>> + Send
) -> impl Future<Output = Result<Subscription<'client, Self, Notif>, Error>> + Send
where
Params: ToRpcParams + Send,
Notif: DeserializeOwned,
Expand Down Expand Up @@ -632,12 +632,12 @@ where
.await?
.into_subscription()
.expect("Extensions set to subscription, must return subscription; qed");
Ok(Subscription::new(self.to_back.clone(), sub.stream, SubscriptionKind::Subscription(sub.sub_id)))
Ok(Subscription::new(self, self.to_back.clone(), sub.stream, SubscriptionKind::Subscription(sub.sub_id)))
}
}

/// Subscribe to a specific method.
fn subscribe_to_method<N>(&self, method: &str) -> impl Future<Output = Result<Subscription<N>, Error>> + Send
fn subscribe_to_method<'client, N>(&'client self, method: &str) -> impl Future<Output = Result<Subscription<'client, Self, N>, Error>> + Send
where
N: DeserializeOwned,
{
Expand All @@ -664,7 +664,7 @@ where
Err(_) => return Err(self.on_disconnect().await),
};

Ok(Subscription::new(self.to_back.clone(), rx, SubscriptionKind::Method(method)))
Ok(Subscription::new(self, self.to_back.clone(), rx, SubscriptionKind::Method(method)))
}
}
}
Expand Down
29 changes: 15 additions & 14 deletions core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,12 @@ pub trait SubscriptionClientT: ClientT {
///
/// The `Notif` param is a generic type to receive generic subscriptions, see [`Subscription`] for further
/// documentation.
fn subscribe<'a, Notif, Params>(
&self,
fn subscribe<'a, 'client, Notif, Params>(
&'client self,
subscribe_method: &'a str,
params: Params,
unsubscribe_method: &'a str,
) -> impl Future<Output = Result<Subscription<Notif>, Error>> + Send
) -> impl Future<Output = Result<Subscription<'client, Self, Notif>, Error>> + Send
where
Params: ToRpcParams + Send,
Notif: DeserializeOwned;
Expand All @@ -145,10 +145,10 @@ pub trait SubscriptionClientT: ClientT {
///
/// The `Notif` param is a generic type to receive generic subscriptions, see [`Subscription`] for further
/// documentation.
fn subscribe_to_method<Notif>(
&self,
fn subscribe_to_method<'client, Notif>(
&'client self,
method: &str,
) -> impl Future<Output = Result<Subscription<Notif>, Error>> + Send
) -> impl Future<Output = Result<Subscription<'client, Self, Notif>, Error>> + Send
where
Notif: DeserializeOwned;
}
Expand Down Expand Up @@ -272,26 +272,27 @@ pub enum SubscriptionCloseReason {
/// You can call [`Subscription::close_reason`] to determine why
/// the subscription was closed.
#[derive(Debug)]
pub struct Subscription<Notif> {
pub struct Subscription<'a, Client: ?Sized, Notif> {
is_closed: bool,
/// Channel to send requests to the background task.
to_back: mpsc::Sender<FrontToBack>,
/// Channel from which we receive notifications from the server, as encoded JSON.
rx: SubscriptionReceiver,
/// Callback kind.
kind: Option<SubscriptionKind>,
_client: &'a Client,
/// Marker in order to pin the `Notif` parameter.
marker: PhantomData<Notif>,
}

// `Subscription` does not automatically implement this due to `PhantomData<Notif>`,
// but type type has no need to be pinned.
impl<Notif> std::marker::Unpin for Subscription<Notif> {}
impl<'a, Client: ?Sized, Notif> std::marker::Unpin for Subscription<'a, Client, Notif> {}

impl<Notif> Subscription<Notif> {
impl<'a, Client: ?Sized, Notif> Subscription<'a, Client, Notif> {
/// Create a new subscription.
fn new(to_back: mpsc::Sender<FrontToBack>, rx: SubscriptionReceiver, kind: SubscriptionKind) -> Self {
Self { to_back, rx, kind: Some(kind), marker: PhantomData, is_closed: false }
fn new(client: &'a Client, to_back: mpsc::Sender<FrontToBack>, rx: SubscriptionReceiver, kind: SubscriptionKind) -> Self {
Self { to_back, rx, kind: Some(kind), _client: client, marker: PhantomData, is_closed: false }
}

/// Return the subscription type and, if applicable, ID.
Expand Down Expand Up @@ -404,7 +405,7 @@ enum FrontToBack {
SubscriptionClosed(SubscriptionId<'static>),
}

impl<Notif> Subscription<Notif>
impl<'a, Client: ?Sized, Notif> Subscription<'a, Client, Notif>
where
Notif: DeserializeOwned,
{
Expand All @@ -421,7 +422,7 @@ where
}
}

impl<Notif> Stream for Subscription<Notif>
impl<'a, Client: ?Sized, Notif> Stream for Subscription<'a, Client, Notif>
where
Notif: DeserializeOwned,
{
Expand All @@ -439,7 +440,7 @@ where
}
}

impl<Notif> Drop for Subscription<Notif> {
impl<'a, Client: ?Sized, Notif> Drop for Subscription<'a, Client, Notif> {
fn drop(&mut self) {
// We can't actually guarantee that this goes through. If the background task is busy, then
// the channel's buffer will be full.
Expand Down
9 changes: 5 additions & 4 deletions examples/examples/client_subscription_drop_oldest_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::time::Duration;

use futures::{Stream, StreamExt};
use jsonrpsee::core::DeserializeOwned;
use jsonrpsee::core::client::{Subscription, SubscriptionClientT};
use jsonrpsee::core::client::{Client, Subscription, SubscriptionClientT};
use jsonrpsee::rpc_params;
use jsonrpsee::server::{RpcModule, Server};
use jsonrpsee::ws_client::WsClientBuilder;
Expand All @@ -47,8 +47,9 @@ async fn main() -> anyhow::Result<()> {
let url = format!("ws://{}", addr);

let client = WsClientBuilder::default().build(&url).await?;
let client: &'static Client = Box::leak(Box::new(client));

let sub: Subscription<i32> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;
let sub: Subscription<_, i32> = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;

// drop oldest messages from subscription:
let mut sub = drop_oldest_when_lagging(sub, 10);
Expand All @@ -73,8 +74,8 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}

fn drop_oldest_when_lagging<T: Clone + DeserializeOwned + Send + Sync + 'static>(
mut sub: Subscription<T>,
fn drop_oldest_when_lagging<S: Sync + 'static, T: Clone + DeserializeOwned + Send + Sync + 'static>(
mut sub: Subscription<'static, S, T>,
buffer_size: usize,
) -> impl Stream<Item = Result<T, BroadcastStreamRecvError>> {
let (tx, rx) = tokio::sync::broadcast::channel(buffer_size);
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/proc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async fn main() -> anyhow::Result<()> {
let client = WsClientBuilder::default().build(&url).await?;
assert_eq!(client.storage_keys(vec![1, 2, 3, 4], None::<ExampleHash>).await.unwrap(), vec![vec![1, 2, 3, 4]]);

let mut sub: Subscription<Vec<ExampleHash>> =
let mut sub: Subscription<_, Vec<ExampleHash>> =
RpcClient::<ExampleHash, ExampleStorageKey>::subscribe_storage(&client, None).await.unwrap();
assert_eq!(Some(vec![[0; 32]]), sub.next().await.transpose().unwrap());

Expand Down
4 changes: 2 additions & 2 deletions examples/examples/ws_pubsub_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ async fn main() -> anyhow::Result<()> {
WsClientBuilder::default().set_rpc_middleware(RpcServiceBuilder::new().rpc_logger(1024)).build(&url).await?;
let client2 =
WsClientBuilder::default().set_rpc_middleware(RpcServiceBuilder::new().rpc_logger(1024)).build(&url).await?;
let sub1: Subscription<i32> = client1.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;
let sub2: Subscription<i32> = client2.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;
let sub1: Subscription<_, i32> = client1.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;
let sub2: Subscription<_, i32> = client2.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await?;

let fut1 = sub1.take(NUM_SUBSCRIPTION_RESPONSES).for_each(|r| async move { tracing::info!("sub1 rx: {:?}", r) });
let fut2 = sub2.take(NUM_SUBSCRIPTION_RESPONSES).for_each(|r| async move { tracing::info!("sub2 rx: {:?}", r) });
Expand Down
4 changes: 2 additions & 2 deletions examples/examples/ws_pubsub_with_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ async fn main() -> anyhow::Result<()> {
let client = WsClientBuilder::default().build(&url).await?;

// Subscription with a single parameter
let mut sub_params_one: Subscription<Option<char>> =
let mut sub_params_one: Subscription<_, Option<char>> =
client.subscribe("sub_one_param", rpc_params![3], "unsub_one_param").await?;
tracing::info!("subscription with one param: {:?}", sub_params_one.next().await);

// Subscription with multiple parameters
let mut sub_params_two: Subscription<String> =
let mut sub_params_two: Subscription<_, String> =
client.subscribe("sub_params_two", rpc_params![2, 5], "unsub_params_two").await?;
tracing::info!("subscription with two params: {:?}", sub_params_two.next().await);

Expand Down
3 changes: 2 additions & 1 deletion proc-macros/src/render_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ impl RpcDescription {
// into the `Subscription` object.
let sub_type = self.jrps_client_item(quote! { core::client::Subscription });
let item = &sub.item;
let returns = quote! { impl core::future::Future<Output = Result<#sub_type<#item>, #jrps_error>> + Send };
let returns =
quote! { impl core::future::Future<Output = Result<#sub_type<'_, Self, #item>, #jrps_error>> + Send };

// Encoded parameters for the request.
let parameter_builder = self.encode_params(&sub.params, &sub.param_kind, &sub.signature);
Expand Down
1 change: 1 addition & 0 deletions tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ hyper = { workspace = true }
hyper-util = { workspace = true, features = ["http1", "client", "client-legacy"] }
jsonrpsee = { path = "../jsonrpsee", features = ["server", "client-core", "http-client", "ws-client", "macros"] }
jsonrpsee-test-utils = { path = "../test-utils" }
ouroboros = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "time"] }
Expand Down
Loading