Skip to content

Commit acc626a

Browse files
authored
Support one-shot router handlers (#418)
* Make Router::add_route private. Signed-off-by: Josh Matthews <[email protected]> * Add one-shot handlers to Router API. Signed-off-by: Josh Matthews <[email protected]> * Publish 0.21.0. Signed-off-by: Josh Matthews <[email protected]> * Ensure tests shut down router thread. Signed-off-by: Josh Matthews <[email protected]> * Ignore additional messages associated with one-shot handlers. Signed-off-by: Josh Matthews <[email protected]> --------- Signed-off-by: Josh Matthews <[email protected]>
1 parent d6cab32 commit acc626a

File tree

3 files changed

+88
-21
lines changed

3 files changed

+88
-21
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ipc-channel"
3-
version = "0.20.2"
3+
version = "0.21.0"
44
description = "A multiprocess drop-in replacement for Rust channels"
55
authors = ["The Servo Project Developers"]
66
license = "MIT OR Apache-2.0"

src/router.rs

Lines changed: 61 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ pub struct RouterProxy {
3838
comm: Mutex<RouterProxyComm>,
3939
}
4040

41+
impl Drop for RouterProxy {
42+
fn drop(&mut self) {
43+
self.shutdown();
44+
}
45+
}
46+
4147
#[allow(clippy::new_without_default)]
4248
impl RouterProxy {
4349
pub fn new() -> RouterProxy {
@@ -62,11 +68,7 @@ impl RouterProxy {
6268

6369
/// Add a new (receiver, callback) pair to the router, and send a wakeup message
6470
/// to the router.
65-
///
66-
/// Consider using [add_typed_route](Self::add_typed_route) instead, which prevents
67-
/// mismatches between the receiver and callback types.
68-
#[deprecated(since = "0.19.0", note = "please use 'add_typed_route' instead")]
69-
pub fn add_route(&self, receiver: OpaqueIpcReceiver, callback: RouterHandler) {
71+
fn add_route(&self, receiver: OpaqueIpcReceiver, callback: RouterHandler) {
7072
let comm = self.comm.lock().unwrap();
7173

7274
if comm.shutdown {
@@ -81,11 +83,11 @@ impl RouterProxy {
8183

8284
/// Add a new `(receiver, callback)` pair to the router, and send a wakeup message
8385
/// to the router.
84-
///
85-
/// Unlike [add_route](Self::add_route) this method is strongly typed and guarantees
86-
/// that the `receiver` and the `callback` use the same message type.
87-
pub fn add_typed_route<T>(&self, receiver: IpcReceiver<T>, mut callback: TypedRouterHandler<T>)
88-
where
86+
pub fn add_typed_route<T>(
87+
&self,
88+
receiver: IpcReceiver<T>,
89+
mut callback: TypedRouterMultiHandler<T>,
90+
) where
8991
T: Serialize + for<'de> Deserialize<'de> + 'static,
9092
{
9193
// Before passing the message on to the callback, turn it into the appropriate type
@@ -94,8 +96,31 @@ impl RouterProxy {
9496
callback(typed_message)
9597
};
9698

97-
#[allow(deprecated)]
98-
self.add_route(receiver.to_opaque(), Box::new(modified_callback));
99+
self.add_route(
100+
receiver.to_opaque(),
101+
RouterHandler::Multi(Box::new(modified_callback)),
102+
);
103+
}
104+
105+
/// Add a new `(receiver, callback)` pair to the router, and send a wakeup message
106+
/// to the router.
107+
pub fn add_typed_one_shot_route<T>(
108+
&self,
109+
receiver: IpcReceiver<T>,
110+
callback: TypedRouterOneShotHandler<T>,
111+
) where
112+
T: Serialize + for<'de> Deserialize<'de> + 'static,
113+
{
114+
// Before passing the message on to the callback, turn it into the appropriate type
115+
let modified_callback = move |msg: IpcMessage| {
116+
let typed_message = msg.to::<T>();
117+
callback(typed_message)
118+
};
119+
120+
self.add_route(
121+
receiver.to_opaque(),
122+
RouterHandler::Once(Some(Box::new(modified_callback))),
123+
);
99124
}
100125

101126
/// Send a shutdown message to the router containing a ACK sender,
@@ -226,7 +251,16 @@ impl Router {
226251
},
227252
// Event from one of our registered receivers, call callback.
228253
IpcSelectionResult::MessageReceived(id, message) => {
229-
self.handlers.get_mut(&id).unwrap()(message)
254+
match self.handlers.get_mut(&id).unwrap() {
255+
RouterHandler::Once(handler) => {
256+
if let Some(handler) = handler.take() {
257+
(handler)(message);
258+
}
259+
},
260+
RouterHandler::Multi(ref mut handler) => {
261+
(handler)(message);
262+
},
263+
}
230264
},
231265
IpcSelectionResult::ChannelClosed(id) => {
232266
let _ = self.handlers.remove(&id).unwrap();
@@ -246,7 +280,18 @@ enum RouterMsg {
246280
}
247281

248282
/// Function to call when a new event is received from the corresponding receiver.
249-
pub type RouterHandler = Box<dyn FnMut(IpcMessage) + Send>;
283+
pub type RouterMultiHandler = Box<dyn FnMut(IpcMessage) + Send>;
284+
285+
/// Function to call the first time that a message is received from the corresponding receiver.
286+
pub type RouterOneShotHandler = Box<dyn FnOnce(IpcMessage) + Send>;
287+
288+
enum RouterHandler {
289+
Once(Option<RouterOneShotHandler>),
290+
Multi(RouterMultiHandler),
291+
}
292+
293+
/// Like [RouterMultiHandler] but includes the type that will be passed to the callback
294+
pub type TypedRouterMultiHandler<T> = Box<dyn FnMut(Result<T, bincode::Error>) + Send>;
250295

251-
/// Like [RouterHandler] but includes the type that will be passed to the callback
252-
pub type TypedRouterHandler<T> = Box<dyn FnMut(Result<T, bincode::Error>) + Send>;
296+
/// Like [RouterOneShotHandler] but includes the type that will be passed to the callback
297+
pub type TypedRouterOneShotHandler<T> = Box<dyn FnOnce(Result<T, bincode::Error>) + Send>;

src/test.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -274,11 +274,10 @@ fn router_simple_global() {
274274
tx.send(person.clone()).unwrap();
275275

276276
let (callback_fired_sender, callback_fired_receiver) = crossbeam_channel::unbounded::<Person>();
277-
#[allow(deprecated)]
278-
ROUTER.add_route(
279-
rx.to_opaque(),
277+
ROUTER.add_typed_route(
278+
rx,
280279
Box::new(move |person| {
281-
callback_fired_sender.send(person.to().unwrap()).unwrap();
280+
callback_fired_sender.send(person.unwrap()).unwrap();
282281
}),
283282
);
284283
let received_person = callback_fired_receiver.recv().unwrap();
@@ -357,6 +356,29 @@ fn router_routing_to_new_crossbeam_receiver() {
357356
assert_eq!(received_person, person);
358357
}
359358

359+
#[test]
360+
fn router_once_handler() {
361+
let person = ("Patrick Walton".to_owned(), 29);
362+
let (tx, rx) = ipc::channel().unwrap();
363+
let (tx2, rx2) = ipc::channel().unwrap();
364+
365+
let router = RouterProxy::new();
366+
let mut once_tx2 = Some(tx2);
367+
router.add_typed_one_shot_route(
368+
rx,
369+
Box::new(move |_msg| once_tx2.take().unwrap().send(()).unwrap()),
370+
);
371+
372+
// Send one single event.
373+
tx.send(person.clone()).unwrap();
374+
// Wait for acknowledgement that the callback ran.
375+
rx2.recv().unwrap();
376+
// This send should succeed but no handler should run. If it does run,
377+
// a panic will occur.
378+
tx.send(person.clone()).unwrap();
379+
assert!(rx2.recv().is_err());
380+
}
381+
360382
#[test]
361383
fn router_multiplexing() {
362384
let person = ("Patrick Walton".to_owned(), 29);

0 commit comments

Comments
 (0)