Skip to content

Commit e13b6be

Browse files
committed
fix(optimism): Reconnect if ws stream ends in WsFlashBlockStream
1 parent 3d8d7ce commit e13b6be

File tree

1 file changed

+33
-6
lines changed

1 file changed

+33
-6
lines changed

crates/optimism/flashblocks/src/ws/stream.rs

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ where
8181
match ready!(this.connect.poll_unpin(cx)) {
8282
Ok((sink, stream)) => this.stream(sink, stream),
8383
Err(err) => {
84-
this.state = State::Initial;
84+
this.connect();
8585

8686
return Poll::Ready(Some(Err(err)));
8787
}
@@ -104,7 +104,9 @@ where
104104
.expect("Stream state should be unreachable without stream")
105105
.poll_next_unpin(cx))
106106
else {
107-
return Poll::Ready(None);
107+
this.connect();
108+
109+
break;
108110
};
109111

110112
match msg {
@@ -432,18 +434,43 @@ mod tests {
432434
let ws_url = "http://localhost".parse().unwrap();
433435
let stream = WsFlashBlockStream::with_connector(ws_url, messages);
434436

435-
let actual_messages: Vec<_> = stream.map(Result::unwrap).collect().await;
437+
let actual_messages: Vec<_> = stream.take(1).map(Result::unwrap).collect().await;
436438
let expected_messages = flashblocks.to_vec();
437439

438440
assert_eq!(actual_messages, expected_messages);
439441
}
440442

441443
#[tokio::test]
442444
async fn test_stream_ignores_non_binary_message() {
443-
let messages = FakeConnector::from([Ok(Message::Text(Utf8Bytes::from("test")))]);
445+
let flashblock = FlashBlock {
446+
payload_id: Default::default(),
447+
index: 0,
448+
base: Some(ExecutionPayloadBaseV1 {
449+
parent_beacon_block_root: Default::default(),
450+
parent_hash: Default::default(),
451+
fee_recipient: Default::default(),
452+
prev_randao: Default::default(),
453+
block_number: 0,
454+
gas_limit: 0,
455+
timestamp: 0,
456+
extra_data: Default::default(),
457+
base_fee_per_gas: Default::default(),
458+
}),
459+
diff: Default::default(),
460+
metadata: Default::default(),
461+
};
462+
let messages = FakeConnector::from([
463+
Ok(Message::Text(Utf8Bytes::from("test"))),
464+
to_json_message(&flashblock),
465+
]);
444466
let ws_url = "http://localhost".parse().unwrap();
445467
let mut stream = WsFlashBlockStream::with_connector(ws_url, messages);
446-
assert!(stream.next().await.is_none());
468+
469+
let expected_message = flashblock;
470+
let actual_message =
471+
stream.next().await.expect("Binary message should not be ignored").unwrap();
472+
473+
assert_eq!(actual_message, expected_message)
447474
}
448475

449476
#[tokio::test]
@@ -453,7 +480,7 @@ mod tests {
453480
let stream = WsFlashBlockStream::with_connector(ws_url, messages);
454481

455482
let actual_messages: Vec<_> =
456-
stream.map(Result::unwrap_err).map(|e| format!("{e}")).collect().await;
483+
stream.take(1).map(Result::unwrap_err).map(|e| format!("{e}")).collect().await;
457484
let expected_messages = vec!["Attack attempt detected".to_owned()];
458485

459486
assert_eq!(actual_messages, expected_messages);

0 commit comments

Comments
 (0)