Skip to content

Commit 55d1a0f

Browse files
committed
feat: Add ackable events
1 parent a4e5287 commit 55d1a0f

File tree

7 files changed

+358
-51
lines changed

7 files changed

+358
-51
lines changed

socketio/src/asynchronous/client/builder.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,51 @@ impl ClientBuilder {
196196
self
197197
}
198198

199+
/// Registers a new callback for a certain [`crate::event::Event`] that expects the client to
200+
/// ack. The event could either be one of the common events like `message`, `error`, `open`,
201+
/// `close` or a custom event defined by a string, e.g. `onPayment` or `foo`.
202+
///
203+
/// # Example
204+
/// ```rust
205+
/// use rust_socketio::{asynchronous::{ClientBuilder, Client}, i32, Payload};
206+
/// use futures_util::FutureExt;
207+
///
208+
/// #[tokio::main]
209+
/// async fn main() {
210+
/// let socket = ClientBuilder::new("http://localhost:4200/")
211+
/// .namespace("/admin")
212+
/// .on_with_ack("test", |payload: Payload, client: Client, ack: i32| {
213+
/// async move {
214+
/// match payload {
215+
/// Payload::Text(values) => println!("Received: {:#?}", values),
216+
/// Payload::Binary(bin_data) => println!("Received bytes: {:#?}", bin_data),
217+
/// // This is deprecated, use Payload::Text instead
218+
/// Payload::String(str) => println!("Received: {}", str),
219+
/// }
220+
/// client.ack(ack, "received").await;
221+
/// }
222+
/// .boxed()
223+
/// })
224+
/// .on("error", |err, _| async move { eprintln!("Error: {:#?}", err) }.boxed())
225+
/// .connect()
226+
/// .await;
227+
/// }
228+
///
229+
#[cfg(feature = "async-callbacks")]
230+
pub fn on_with_ack<T: Into<Event>, F>(mut self, event: T, callback: F) -> Self
231+
where
232+
F: for<'a> std::ops::FnMut(Payload, Client, i32) -> BoxFuture<'static, ()>
233+
+ 'static
234+
+ Send
235+
+ Sync,
236+
{
237+
self.on.insert(
238+
event.into(),
239+
Callback::<DynAsyncCallback>::new_with_ack(callback),
240+
);
241+
self
242+
}
243+
199244
/// Registers a callback for reconnect events. The event handler must return
200245
/// a [ReconnectSettings] struct with the settings that should be updated.
201246
///
@@ -263,6 +308,41 @@ impl ClientBuilder {
263308
self
264309
}
265310

311+
/// Registers a Callback for all [`crate::event::Event::Custom`] and
312+
/// [`crate::event::Event::Message`] that expect the client to ack.
313+
///
314+
/// # Example
315+
/// ```rust
316+
/// use rust_socketio::{asynchronous::ClientBuilder, Payload};
317+
/// use futures_util::future::FutureExt;
318+
///
319+
/// #[tokio::main]
320+
/// async fn main() {
321+
/// let client = ClientBuilder::new("http://localhost:4200/")
322+
/// .namespace("/admin")
323+
/// .on_any_with_ack(|event, payload, client, ack| {
324+
/// async {
325+
/// if let Payload::String(str) = payload {
326+
/// println!("{}: {}", String::from(event), str);
327+
/// }
328+
/// client.ack(ack, "received").await;
329+
/// }.boxed()
330+
/// })
331+
/// .connect()
332+
/// .await;
333+
/// }
334+
/// ```
335+
pub fn on_any_with_ack<F>(mut self, callback: F) -> Self
336+
where
337+
F: for<'a> FnMut(Event, Payload, Client, i32) -> BoxFuture<'static, ()>
338+
+ 'static
339+
+ Send
340+
+ Sync,
341+
{
342+
self.on_any = Some(Callback::<DynAsyncAnyCallback>::new_with_ack(callback));
343+
self
344+
}
345+
266346
/// Uses a preconfigured TLS connector for secure communication. This configures
267347
/// both the `polling` as well as the `websocket` transport type.
268348
/// # Example

socketio/src/asynchronous/client/callback.rs

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
use futures_util::future::BoxFuture;
1+
use futures_util::{future::BoxFuture, FutureExt};
22
use std::{
33
fmt::Debug,
4+
future::Future,
45
ops::{Deref, DerefMut},
56
};
67

@@ -9,11 +10,18 @@ use crate::{Event, Payload};
910
use super::client::{Client, ReconnectSettings};
1011

1112
/// Internal type, provides a way to store futures and return them in a boxed manner.
12-
pub(crate) type DynAsyncCallback =
13-
Box<dyn for<'a> FnMut(Payload, Client) -> BoxFuture<'static, ()> + 'static + Send + Sync>;
13+
pub(crate) type DynAsyncCallback = Box<
14+
dyn for<'a> FnMut(Payload, Client, Option<i32>) -> BoxFuture<'static, ()>
15+
+ 'static
16+
+ Send
17+
+ Sync,
18+
>;
1419

1520
pub(crate) type DynAsyncAnyCallback = Box<
16-
dyn for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Send + Sync,
21+
dyn for<'a> FnMut(Event, Payload, Client, Option<i32>) -> BoxFuture<'static, ()>
22+
+ 'static
23+
+ Send
24+
+ Sync,
1725
>;
1826

1927
pub(crate) type DynAsyncReconnectSettingsCallback =
@@ -30,8 +38,10 @@ impl<T> Debug for Callback<T> {
3038
}
3139

3240
impl Deref for Callback<DynAsyncCallback> {
33-
type Target =
34-
dyn for<'a> FnMut(Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send;
41+
type Target = dyn for<'a> FnMut(Payload, Client, Option<i32>) -> BoxFuture<'static, ()>
42+
+ 'static
43+
+ Sync
44+
+ Send;
3545

3646
fn deref(&self) -> &Self::Target {
3747
self.inner.as_ref()
@@ -45,19 +55,34 @@ impl DerefMut for Callback<DynAsyncCallback> {
4555
}
4656

4757
impl Callback<DynAsyncCallback> {
48-
pub(crate) fn new<T>(callback: T) -> Self
58+
pub(crate) fn new_with_ack<T>(mut callback: T) -> Self
4959
where
50-
T: for<'a> FnMut(Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send,
60+
T: for<'a> FnMut(Payload, Client, i32) -> BoxFuture<'static, ()> + 'static + Sync + Send,
5161
{
5262
Callback {
53-
inner: Box::new(callback),
63+
inner: Box::new(move |p, c, a| match a {
64+
Some(a) => callback(p, c, a).boxed(),
65+
None => std::future::ready(()).boxed(),
66+
}),
67+
}
68+
}
69+
70+
pub(crate) fn new<T, Fut>(mut callback: T) -> Self
71+
where
72+
T: FnMut(Payload, Client) -> Fut + Sync + Send + 'static,
73+
Fut: Future<Output = ()> + 'static + Send,
74+
{
75+
Callback {
76+
inner: Box::new(move |p, c, _a| callback(p, c).boxed()),
5477
}
5578
}
5679
}
5780

5881
impl Deref for Callback<DynAsyncAnyCallback> {
59-
type Target =
60-
dyn for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send;
82+
type Target = dyn for<'a> FnMut(Event, Payload, Client, Option<i32>) -> BoxFuture<'static, ()>
83+
+ 'static
84+
+ Sync
85+
+ Send;
6186

6287
fn deref(&self) -> &Self::Target {
6388
self.inner.as_ref()
@@ -71,12 +96,28 @@ impl DerefMut for Callback<DynAsyncAnyCallback> {
7196
}
7297

7398
impl Callback<DynAsyncAnyCallback> {
74-
pub(crate) fn new<T>(callback: T) -> Self
99+
pub(crate) fn new_with_ack<T>(mut callback: T) -> Self
75100
where
76-
T: for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Sync + Send,
101+
T: for<'a> FnMut(Event, Payload, Client, i32) -> BoxFuture<'static, ()>
102+
+ 'static
103+
+ Sync
104+
+ Send,
77105
{
78106
Callback {
79-
inner: Box::new(callback),
107+
inner: Box::new(move |e, p, c, a| match a {
108+
Some(a) => callback(e, p, c, a).boxed(),
109+
None => std::future::ready(()).boxed(),
110+
}),
111+
}
112+
}
113+
114+
pub(crate) fn new<T, Fut>(mut callback: T) -> Self
115+
where
116+
T: FnMut(Event, Payload, Client) -> Fut + Sync + Send + 'static,
117+
Fut: Future<Output = ()> + 'static + Send,
118+
{
119+
Callback {
120+
inner: Box::new(move |e, p, c, _a| callback(e, p, c).boxed()),
80121
}
81122
}
82123
}

socketio/src/asynchronous/client/client.rs

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ impl Client {
188188
// We don't need to do that in the other cases, since proper server close
189189
// and manual client close are handled explicitly.
190190
if let Some(err) = client_clone
191-
.callback(&Event::Close, CloseReason::TransportClose.as_str())
191+
.callback(&Event::Close, CloseReason::TransportClose.as_str(), None)
192192
.await
193193
.err()
194194
{
@@ -410,19 +410,33 @@ impl Client {
410410
self.socket.read().await.send(socket_packet).await
411411
}
412412

413-
async fn callback<P: Into<Payload>>(&self, event: &Event, payload: P) -> Result<()> {
413+
pub async fn ack<D>(&self, ack_id: i32, data: D) -> Result<()>
414+
where
415+
D: Into<Payload>,
416+
{
417+
let socket_packet = Packet::new_ack(data.into(), &self.nsp, ack_id);
418+
419+
self.socket.read().await.send(socket_packet).await
420+
}
421+
422+
async fn callback<P: Into<Payload>>(
423+
&self,
424+
event: &Event,
425+
payload: P,
426+
ack_id: Option<i32>,
427+
) -> Result<()> {
414428
let mut builder = self.builder.write().await;
415429
let payload = payload.into();
416430

417431
if let Some(callback) = builder.on.get_mut(event) {
418-
callback(payload.clone(), self.clone()).await;
432+
callback(payload.clone(), self.clone(), ack_id).await;
419433
}
420434

421435
// Call on_any for all common and custom events.
422436
match event {
423437
Event::Message | Event::Custom(_) => {
424438
if let Some(callback) = builder.on_any.as_mut() {
425-
callback(event.clone(), payload, self.clone()).await;
439+
callback(event.clone(), payload, self.clone(), ack_id).await;
426440
}
427441
}
428442
_ => (),
@@ -445,6 +459,7 @@ impl Client {
445459
ack.callback.deref_mut()(
446460
Payload::from(payload.to_owned()),
447461
self.clone(),
462+
None,
448463
)
449464
.await;
450465
}
@@ -453,6 +468,7 @@ impl Client {
453468
ack.callback.deref_mut()(
454469
Payload::Binary(payload.to_owned()),
455470
self.clone(),
471+
None,
456472
)
457473
.await;
458474
}
@@ -480,8 +496,12 @@ impl Client {
480496

481497
if let Some(attachments) = &packet.attachments {
482498
if let Some(binary_payload) = attachments.get(0) {
483-
self.callback(&event, Payload::Binary(binary_payload.to_owned()))
484-
.await?;
499+
self.callback(
500+
&event,
501+
Payload::Binary(binary_payload.to_owned()),
502+
packet.id,
503+
)
504+
.await?;
485505
}
486506
}
487507
Ok(())
@@ -514,7 +534,7 @@ impl Client {
514534
};
515535

516536
// call the correct callback
517-
self.callback(&event, payloads.to_vec()).await?;
537+
self.callback(&event, payloads.to_vec(), packet.id).await?;
518538
}
519539

520540
Ok(())
@@ -529,23 +549,27 @@ impl Client {
529549
match packet.packet_type {
530550
PacketId::Ack | PacketId::BinaryAck => {
531551
if let Err(err) = self.handle_ack(packet).await {
532-
self.callback(&Event::Error, err.to_string()).await?;
552+
self.callback(&Event::Error, err.to_string(), None).await?;
533553
return Err(err);
534554
}
535555
}
536556
PacketId::BinaryEvent => {
537557
if let Err(err) = self.handle_binary_event(packet).await {
538-
self.callback(&Event::Error, err.to_string()).await?;
558+
self.callback(&Event::Error, err.to_string(), None).await?;
539559
}
540560
}
541561
PacketId::Connect => {
542562
*(self.disconnect_reason.write().await) = DisconnectReason::default();
543-
self.callback(&Event::Connect, "").await?;
563+
self.callback(&Event::Connect, "", None).await?;
544564
}
545565
PacketId::Disconnect => {
546566
*(self.disconnect_reason.write().await) = DisconnectReason::Server;
547-
self.callback(&Event::Close, CloseReason::IOServerDisconnect.as_str())
548-
.await?;
567+
self.callback(
568+
&Event::Close,
569+
CloseReason::IOServerDisconnect.as_str(),
570+
None,
571+
)
572+
.await?;
549573
}
550574
PacketId::ConnectError => {
551575
self.callback(
@@ -555,12 +579,13 @@ impl Client {
555579
.data
556580
.as_ref()
557581
.unwrap_or(&String::from("\"No error message provided\"")),
582+
None,
558583
)
559584
.await?;
560585
}
561586
PacketId::Event => {
562587
if let Err(err) = self.handle_event(packet).await {
563-
self.callback(&Event::Error, err.to_string()).await?;
588+
self.callback(&Event::Error, err.to_string(), None).await?;
564589
}
565590
}
566591
}
@@ -582,7 +607,7 @@ impl Client {
582607
None => None,
583608
Some(Err(err)) => {
584609
// call the error callback
585-
match self.callback(&Event::Error, err.to_string()).await {
610+
match self.callback(&Event::Error, err.to_string(), None).await {
586611
Err(callback_err) => Some((Err(callback_err), socket)),
587612
Ok(_) => Some((Err(err), socket)),
588613
}

0 commit comments

Comments
 (0)