Skip to content

Commit 99a5602

Browse files
committed
fix(optimism): Remove loop in poll_next to prevent infinite cycles
1 parent e13b6be commit 99a5602

File tree

1 file changed

+36
-37
lines changed

1 file changed

+36
-37
lines changed

crates/optimism/flashblocks/src/ws/stream.rs

Lines changed: 36 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -72,53 +72,51 @@ where
7272
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
7373
let this = self.get_mut();
7474

75-
loop {
76-
if this.state == State::Initial {
77-
this.connect();
78-
}
75+
if this.state == State::Initial {
76+
this.connect();
77+
}
7978

80-
if this.state == State::Connect {
81-
match ready!(this.connect.poll_unpin(cx)) {
82-
Ok((sink, stream)) => this.stream(sink, stream),
83-
Err(err) => {
84-
this.connect();
79+
if this.state == State::Connect {
80+
match ready!(this.connect.poll_unpin(cx)) {
81+
Ok((sink, stream)) => this.stream(sink, stream),
82+
Err(err) => {
83+
this.connect();
8584

86-
return Poll::Ready(Some(Err(err)));
87-
}
85+
return Poll::Ready(Some(Err(err)));
8886
}
8987
}
88+
}
9089

91-
while let State::Stream(pong) = &mut this.state {
92-
if pong.is_some() {
93-
let mut sink = Pin::new(this.sink.as_mut().unwrap());
94-
let _ = ready!(sink.as_mut().poll_ready(cx));
95-
if let Some(pong) = pong.take() {
96-
let _ = sink.as_mut().start_send(pong);
97-
}
98-
let _ = ready!(sink.as_mut().poll_flush(cx));
90+
while let State::Stream(pong) = &mut this.state {
91+
if pong.is_some() {
92+
let mut sink = Pin::new(this.sink.as_mut().unwrap());
93+
let _ = ready!(sink.as_mut().poll_ready(cx));
94+
if let Some(pong) = pong.take() {
95+
let _ = sink.as_mut().start_send(pong);
9996
}
97+
let _ = ready!(sink.as_mut().poll_flush(cx));
98+
}
10099

101-
let Some(msg) = ready!(this
102-
.stream
103-
.as_mut()
104-
.expect("Stream state should be unreachable without stream")
105-
.poll_next_unpin(cx))
106-
else {
107-
this.connect();
100+
let Some(msg) = ready!(this
101+
.stream
102+
.as_mut()
103+
.expect("Stream state should be unreachable without stream")
104+
.poll_next_unpin(cx))
105+
else {
106+
this.connect();
108107

109-
break;
110-
};
108+
break;
109+
};
111110

112-
match msg {
113-
Ok(Message::Binary(bytes)) => {
114-
return Poll::Ready(Some(FlashBlock::decode(bytes)))
115-
}
116-
Ok(Message::Ping(bytes)) => this.ping(bytes),
117-
Ok(msg) => debug!("Received unexpected message: {:?}", msg),
118-
Err(err) => return Poll::Ready(Some(Err(err.into()))),
119-
}
111+
match msg {
112+
Ok(Message::Binary(bytes)) => return Poll::Ready(Some(FlashBlock::decode(bytes))),
113+
Ok(Message::Ping(bytes)) => this.ping(bytes),
114+
Ok(msg) => debug!("Received unexpected message: {:?}", msg),
115+
Err(err) => return Poll::Ready(Some(Err(err.into()))),
120116
}
121117
}
118+
119+
Poll::Pending
122120
}
123121
}
124122

@@ -223,6 +221,7 @@ mod tests {
223221
use crate::ExecutionPayloadBaseV1;
224222
use alloy_primitives::bytes::Bytes;
225223
use brotli::enc::BrotliEncoderParams;
224+
use futures_util::poll;
226225
use std::{future, iter};
227226
use tokio_tungstenite::tungstenite::{Error, Utf8Bytes};
228227

@@ -510,7 +509,7 @@ mod tests {
510509
let ws_url = "http://localhost".parse().unwrap();
511510
let mut stream = WsFlashBlockStream::with_connector(ws_url, connector);
512511

513-
let _ = stream.next().await;
512+
let _ = poll!(stream.next());
514513

515514
let FakeSink(actual_buffered_messages, actual_sent_messages) = stream.sink.unwrap();
516515

0 commit comments

Comments
 (0)