Commit e0f6026

sanityclaude and Claude Opus 4.5 authored
fix: prevent message loss in peer_connection_listener race condition (#2255)
Co-authored-by: Claude Opus 4.5 <[email protected]>
1 parent 5243d77 commit e0f6026

7 files changed: +460 −508 lines changed


crates/core/src/client_events/combinator.rs

Lines changed: 47 additions & 12 deletions
@@ -138,32 +138,67 @@ impl<const N: usize> super::ClientEventsProxy for ClientEventsCombinator<N> {
     }
 }
 
+/// Handles bidirectional communication between a client and the host.
+///
+/// Uses `tokio::select! { biased; ... }` to ensure host responses are always
+/// processed before client disconnect errors, preventing lost responses.
+/// The `biased` modifier ensures futures are polled in declaration order,
+/// and if multiple are ready, the first one wins.
 async fn client_fn(
     mut client: BoxedClient,
     mut rx: Receiver<(ClientId, HostResult)>,
     tx_host: Sender<Result<OpenRequest<'static>, ClientError>>,
 ) {
     loop {
         tokio::select! {
-            host_msg = rx.recv() => {
-                if let Some((client_id, response)) = host_msg {
-                    if client.send(client_id, response).await.is_err() {
+            biased;
+
+            // Priority 1: Host responses (highest priority)
+            // Ensures responses are sent to client before handling disconnect
+            msg = rx.recv() => {
+                match msg {
+                    Some((client_id, response)) => {
+                        if client.send(client_id, response).await.is_err() {
+                            break;
+                        }
+                    }
+                    None => {
+                        tracing::debug!("disconnected host");
                         break;
                     }
-                } else {
-                    tracing::debug!("disconnected host");
-                    break;
                 }
             }
-            client_msg = client.recv() => {
-                match client_msg {
-                    Ok(OpenRequest { client_id, request_id, request, notification_channel, token, attested_contract }) => {
-                        tracing::debug!("received msg @ combinator from external id {client_id}, msg: {request}");
-                        if tx_host.send(Ok(OpenRequest { client_id, request_id, request, notification_channel, token, attested_contract })).await.is_err() {
+
+            // Priority 2: Client requests
+            result = client.recv() => {
+                match result {
+                    Ok(OpenRequest {
+                        client_id,
+                        request_id,
+                        request,
+                        notification_channel,
+                        token,
+                        attested_contract,
+                    }) => {
+                        tracing::debug!(
+                            "received msg @ combinator from external id {client_id}, msg: {request}"
+                        );
+                        if tx_host
+                            .send(Ok(OpenRequest {
+                                client_id,
+                                request_id,
+                                request,
+                                notification_channel,
+                                token,
+                                attested_contract,
+                            }))
+                            .await
+                            .is_err()
+                        {
                             break;
                         }
                     }
-                    Err(err) if matches!(err.kind(), ErrorKind::ChannelClosed) =>{
+                    Err(err) if matches!(err.kind(), ErrorKind::ChannelClosed) => {
                         tracing::debug!("disconnected client");
                         let _ = tx_host.send(Err(err)).await;
                         break;
crates/core/src/contract/executor.rs

Lines changed: 17 additions & 10 deletions
@@ -6,9 +6,13 @@ use std::collections::{HashMap, HashSet};
 use std::fmt::Display;
 use std::future::Future;
 use std::path::PathBuf;
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};
 use std::time::{Duration, Instant};
 
+use futures::Stream;
+
 use either::Either;
 use freenet_stdlib::client_api::{
     ClientError as WsClientError, ClientRequest, ContractError as StdContractError,
@@ -343,16 +347,11 @@ impl ExecutorToEventLoopChannel<ExecutorHalve> {
 impl ExecutorToEventLoopChannel<NetworkEventListenerHalve> {
     pub async fn transaction_from_executor(&mut self) -> anyhow::Result<Transaction> {
         tracing::trace!("Waiting to receive transaction from executor channel");
-        let tx = self
-            .end
-            .waiting_for_op_rx
-            .recv()
-            .await
-            .ok_or_else(|| {
-                tracing::error!("Executor channel closed - all senders have been dropped");
-                tracing::error!("This typically happens when: 1) The executor task panicked/exited, 2) Network timeout cascaded to channel closure, 3) Resource constraints in CI");
-                anyhow::anyhow!("channel closed")
-            })?;
+        let tx = self.end.waiting_for_op_rx.recv().await.ok_or_else(|| {
+            tracing::error!("Executor channel closed - all senders have been dropped");
+            tracing::error!("This typically happens when: 1) The executor task panicked/exited, 2) Network timeout cascaded to channel closure, 3) Resource constraints in CI");
+            anyhow::anyhow!("channel closed")
+        })?;
         tracing::trace!("Successfully received transaction from executor channel");
         Ok(tx)
     }
@@ -367,6 +366,14 @@ impl ExecutorToEventLoopChannel<NetworkEventListenerHalve> {
     }
 }
 
+impl Stream for ExecutorToEventLoopChannel<NetworkEventListenerHalve> {
+    type Item = Transaction;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        Pin::new(&mut self.end.waiting_for_op_rx).poll_recv(cx)
+    }
+}
+
 impl ExecutorToEventLoopChannel<Callback> {
     pub async fn response(&mut self, result: OpEnum) {
         if self.end.response_for_tx.send(result).await.is_err() {
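The `Stream` impl above is a thin poll adapter. Shown standalone, the same pattern wraps any `tokio::sync::mpsc::Receiver`; this sketch uses an invented wrapper name, whereas the real impls delegate to the channel's `waiting_for_op_rx` (and, below, `wait_for_res_rx`) field.

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Stream;
use tokio::sync::mpsc;

// Hypothetical adapter illustrating the poll_recv-delegation pattern used above.
struct ChannelStream<T> {
    rx: mpsc::Receiver<T>,
}

impl<T> Stream for ChannelStream<T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Receiver is Unpin, so it is safe to take a mutable reference and
        // forward the waker registration to the channel.
        self.get_mut().rx.poll_recv(cx)
    }
}

This is essentially what `tokio_stream::wrappers::ReceiverStream` provides for a bare receiver; implementing `Stream` directly on the channel halves lets the event loop treat them uniformly with other `futures` streams.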

crates/core/src/contract/handler.rs

Lines changed: 12 additions & 0 deletions
@@ -9,10 +9,14 @@
 use std::collections::BTreeMap;
 use std::future::Future;
 use std::hash::Hash;
+use std::pin::Pin;
 use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
 use std::sync::Arc;
+use std::task::{Context, Poll};
 use std::time::Duration;
 
+use futures::Stream;
+
 use freenet_stdlib::client_api::DelegateRequest;
 use freenet_stdlib::prelude::*;
 use serde::{Deserialize, Serialize};
@@ -281,6 +285,14 @@ impl ContractHandlerChannel<WaitingResolution> {
     }
 }
 
+impl Stream for ContractHandlerChannel<WaitingResolution> {
+    type Item = (ClientId, WaitingTransaction);
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        Pin::new(&mut self.end.wait_for_res_rx).poll_recv(cx)
+    }
+}
+
 impl ContractHandlerChannel<SenderHalve> {
     // TODO: the timeout should be derived from whatever is the worst
     // case we are willing to accept for waiting out for an event;
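A plausible motivation for giving both channel halves a `Stream` impl is that the event loop can then drive them with `futures` combinators instead of bespoke `recv()` wrappers. Whether the event loop actually merges them this way is not shown in this hunk; the sketch below, with invented names and a generic item type, simply illustrates the kind of composition the trait enables.

use futures::stream::{self, StreamExt};

// Hypothetical consumer: merge two Stream-implementing channel halves and
// handle whichever yields an item next.
async fn run<S1, S2>(executor_rx: S1, resolution_rx: S2)
where
    S1: stream::Stream<Item = u64> + Unpin,
    S2: stream::Stream<Item = u64> + Unpin,
{
    let mut merged = stream::select(executor_rx, resolution_rx);
    while let Some(item) = merged.next().await {
        println!("event: {item}");
    }
}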

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 79 additions & 63 deletions
@@ -2075,6 +2075,11 @@ impl P2pConnManager {
             tokio::spawn(async move {
                 peer_connection_listener(rx, connection, peer_addr, conn_events).await;
             });
+            // Yield to allow the spawned peer_connection_listener task to start.
+            // This is important because on some runtimes (especially in tests with boxed_local
+            // futures), spawned tasks may not be scheduled immediately, causing messages
+            // sent to the channel to pile up without being processed.
+            tokio::task::yield_now().await;
             newly_inserted = true;
         } else {
             tracing::debug!(
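The yield is a scheduling hint rather than a correctness barrier. Isolated from the node's types, the pattern looks like the sketch below (hypothetical channel and task, current-thread runtime so the scheduling concern is visible).

use tokio::sync::mpsc;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(4);

    // Spawn the listener first...
    tokio::spawn(async move {
        while let Some(n) = rx.recv().await {
            println!("listener received {n}");
        }
    });
    // ...then yield so the spawned task is polled at least once before we
    // start queueing messages on the channel.
    tokio::task::yield_now().await;

    for n in 0..3 {
        tx.send(n).await.expect("listener alive");
    }
    // Let the listener drain what remains before main returns.
    tokio::task::yield_now().await;
}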
@@ -2669,42 +2674,47 @@ async fn notify_transport_closed(
     }
 }
 
+/// Listens for messages on a peer connection using drain-then-select pattern.
+///
+/// On each iteration, drains all pending outbound messages via `try_recv()` before
+/// waiting for either new outbound or inbound messages. This approach:
+/// 1. Ensures queued outbound messages are sent promptly
+/// 2. Avoids starving inbound (which would happen with biased select)
+/// 3. No messages are lost due to poll ordering
 async fn peer_connection_listener(
     mut rx: PeerConnChannelRecv,
     mut conn: PeerConnection,
     peer_addr: SocketAddr,
     conn_events: Sender<ConnEvent>,
 ) {
-    const MAX_IMMEDIATE_SENDS: usize = 32;
     let remote_addr = conn.remote_addr();
     tracing::debug!(
         to = %remote_addr,
         %peer_addr,
         "[CONN_LIFECYCLE] Starting peer_connection_listener task"
     );
+
     loop {
-        let mut drained = 0;
+        // Drain all pending outbound messages first before checking inbound.
+        // This ensures queued outbound messages are sent promptly without starving
+        // the inbound channel (which would happen with biased select).
         loop {
             match rx.try_recv() {
                 Ok(msg) => {
                     if let Err(error) = handle_peer_channel_message(&mut conn, msg).await {
                         tracing::debug!(
                             to = %remote_addr,
                             ?error,
-                            "[CONN_LIFECYCLE] Shutting down connection after send failure"
+                            "[CONN_LIFECYCLE] Connection closed after channel command"
                         );
                         notify_transport_closed(&conn_events, remote_addr, error).await;
                         return;
                     }
-                    drained += 1;
-                    if drained >= MAX_IMMEDIATE_SENDS {
-                        break;
-                    }
                 }
                 Err(TryRecvError::Empty) => break,
                 Err(TryRecvError::Disconnected) => {
                     tracing::warn!(
-                        to = %conn.remote_addr(),
+                        to = %remote_addr,
                         "[CONN_LIFECYCLE] peer_connection_listener channel closed without explicit DropConnection"
                     );
                     notify_transport_closed(
@@ -2718,74 +2728,80 @@ async fn peer_connection_listener(
             }
         }
 
+        // Now wait for either new outbound or inbound messages fairly
         tokio::select! {
             msg = rx.recv() => {
-                let Some(msg) = msg else {
-                    tracing::warn!(
-                        to = %conn.remote_addr(),
-                        "[CONN_LIFECYCLE] peer_connection_listener channel closed without explicit DropConnection"
-                    );
-                    notify_transport_closed(
-                        &conn_events,
-                        remote_addr,
-                        TransportError::ConnectionClosed(remote_addr),
-                    )
-                    .await;
-                    return;
-                };
-                if let Err(error) = handle_peer_channel_message(&mut conn, msg).await {
-                    tracing::debug!(
-                        to = %remote_addr,
-                        ?error,
-                        "[CONN_LIFECYCLE] Connection closed after channel command"
-                    );
-                    notify_transport_closed(&conn_events, remote_addr, error).await;
-                    return;
+                match msg {
+                    Some(msg) => {
+                        if let Err(error) = handle_peer_channel_message(&mut conn, msg).await {
+                            tracing::debug!(
+                                to = %remote_addr,
+                                ?error,
+                                "[CONN_LIFECYCLE] Connection closed after channel command"
+                            );
+                            notify_transport_closed(&conn_events, remote_addr, error).await;
+                            return;
+                        }
+                    }
+                    None => {
+                        tracing::warn!(
+                            to = %remote_addr,
+                            "[CONN_LIFECYCLE] peer_connection_listener channel closed without explicit DropConnection"
+                        );
+                        notify_transport_closed(
+                            &conn_events,
+                            remote_addr,
+                            TransportError::ConnectionClosed(remote_addr),
+                        )
+                        .await;
+                        return;
+                    }
                 }
             }
+
             msg = conn.recv() => {
                 match msg {
-                    Ok(msg) => {
-                        match decode_msg(&msg) {
-                            Ok(net_message) => {
-                                let tx = *net_message.id();
+                    Ok(msg) => match decode_msg(&msg) {
+                        Ok(net_message) => {
+                            let tx = *net_message.id();
+                            tracing::debug!(
+                                from = %remote_addr,
+                                %tx,
+                                tx_type = ?tx.transaction_type(),
+                                msg_type = %net_message,
+                                "[CONN_LIFECYCLE] Received inbound NetMessage from peer"
+                            );
+                            if conn_events
+                                .send(ConnEvent::InboundMessage(IncomingMessage::with_remote(
+                                    net_message,
+                                    remote_addr,
+                                )))
+                                .await
+                                .is_err()
+                            {
                                 tracing::debug!(
-                                    from = %conn.remote_addr(),
-                                    %tx,
-                                    tx_type = ?tx.transaction_type(),
-                                    msg_type = %net_message,
-                                    "[CONN_LIFECYCLE] Received inbound NetMessage from peer"
-                                );
-                                if conn_events.send(ConnEvent::InboundMessage(IncomingMessage::with_remote(net_message, remote_addr))).await.is_err() {
-                                    tracing::debug!(
-                                        from = %remote_addr,
-                                        "[CONN_LIFECYCLE] conn_events receiver dropped; stopping listener"
-                                    );
-                                    return;
-                                }
-                            }
-                            Err(error) => {
-                                tracing::error!(
-                                    from = %conn.remote_addr(),
-                                    ?error,
-                                    "[CONN_LIFECYCLE] Failed to deserialize inbound message; closing connection"
+                                    from = %remote_addr,
+                                    "[CONN_LIFECYCLE] conn_events receiver dropped; stopping listener"
                                 );
-                                let transport_error = TransportError::Other(anyhow!(
-                                    "Failed to deserialize inbound message from {remote_addr}: {error:?}"
-                                ));
-                                notify_transport_closed(
-                                    &conn_events,
-                                    remote_addr,
-                                    transport_error,
-                                )
-                                .await;
                                 return;
                             }
                         }
-                    }
+                        Err(error) => {
+                            tracing::error!(
+                                from = %remote_addr,
+                                ?error,
+                                "[CONN_LIFECYCLE] Failed to deserialize inbound message; closing connection"
+                            );
+                            let transport_error = TransportError::Other(anyhow!(
+                                "Failed to deserialize inbound message from {remote_addr}: {error:?}"
+                            ));
+                            notify_transport_closed(&conn_events, remote_addr, transport_error).await;
+                            return;
+                        }
+                    },
                     Err(error) => {
                         tracing::debug!(
-                            from = %conn.remote_addr(),
+                            from = %remote_addr,
                             ?error,
                             "[CONN_LIFECYCLE] peer_connection_listener terminating after recv error"
                         );
0 commit comments