From 6b97985e2ded99c57b050d62dfc145125d2256a7 Mon Sep 17 00:00:00 2001 From: liuzsen Date: Sat, 11 Oct 2025 18:07:52 +0800 Subject: [PATCH] fix(actix-ws): wake task when multiple continuation frames exist in WebSocket stream --- actix-ws/src/aggregated.rs | 140 +++++++++++++++++++------------------ 1 file changed, 73 insertions(+), 67 deletions(-) diff --git a/actix-ws/src/aggregated.rs b/actix-ws/src/aggregated.rs index 24a0add2b..d15b4a73e 100644 --- a/actix-ws/src/aggregated.rs +++ b/actix-ws/src/aggregated.rs @@ -110,94 +110,100 @@ impl Stream for AggregatedMessageStream { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - let Some(msg) = ready!(Pin::new(&mut this.stream).poll_next(cx)?) else { - return Poll::Ready(None); - }; - - match msg { - Message::Continuation(item) => match item { - Item::FirstText(bytes) => { - this.continuation_kind = ContinuationKind::Text; - this.current_size += bytes.len(); - - if this.current_size > this.max_size { - this.continuations.clear(); - return size_error(); - } - - this.continuations.push(bytes); - - Poll::Pending - } + loop { + let Some(msg) = ready!(Pin::new(&mut this.stream).poll_next(cx)?) else { + return Poll::Ready(None); + }; + + match msg { + Message::Continuation(item) => match item { + Item::FirstText(bytes) => { + this.continuation_kind = ContinuationKind::Text; + this.current_size += bytes.len(); + + if this.current_size > this.max_size { + this.continuations.clear(); + return size_error(); + } - Item::FirstBinary(bytes) => { - this.continuation_kind = ContinuationKind::Binary; - this.current_size += bytes.len(); + this.continuations.push(bytes); - if this.current_size > this.max_size { - this.continuations.clear(); - return size_error(); + continue; } - this.continuations.push(bytes); + Item::FirstBinary(bytes) => { + this.continuation_kind = ContinuationKind::Binary; + this.current_size += bytes.len(); - Poll::Pending - } + if this.current_size > this.max_size { + this.continuations.clear(); + return size_error(); + } - Item::Continue(bytes) => { - this.current_size += bytes.len(); + this.continuations.push(bytes); - if this.current_size > this.max_size { - this.continuations.clear(); - return size_error(); + continue; } - this.continuations.push(bytes); + Item::Continue(bytes) => { + this.current_size += bytes.len(); - Poll::Pending - } + if this.current_size > this.max_size { + this.continuations.clear(); + return size_error(); + } - Item::Last(bytes) => { - this.current_size += bytes.len(); + this.continuations.push(bytes); - if this.current_size > this.max_size { - // reset current_size, as this is the last message for - // the current continuation - this.current_size = 0; - this.continuations.clear(); - - return size_error(); + continue; } - this.continuations.push(bytes); - let bytes = collect(&mut this.continuations); + Item::Last(bytes) => { + this.current_size += bytes.len(); - this.current_size = 0; + if this.current_size > this.max_size { + // reset current_size, as this is the last message for + // the current continuation + this.current_size = 0; + this.continuations.clear(); - match this.continuation_kind { - ContinuationKind::Text => { - Poll::Ready(Some(match ByteString::try_from(bytes) { - Ok(bytestring) => Ok(AggregatedMessage::Text(bytestring)), - Err(err) => Err(ProtocolError::Io(io::Error::new( - io::ErrorKind::InvalidData, - err.to_string(), - ))), - })) + return size_error(); } - ContinuationKind::Binary => { - Poll::Ready(Some(Ok(AggregatedMessage::Binary(bytes)))) + + this.continuations.push(bytes); + let bytes = collect(&mut this.continuations); + + this.current_size = 0; + + match this.continuation_kind { + ContinuationKind::Text => { + return Poll::Ready(Some(match ByteString::try_from(bytes) { + Ok(bytestring) => Ok(AggregatedMessage::Text(bytestring)), + Err(err) => Err(ProtocolError::Io(io::Error::new( + io::ErrorKind::InvalidData, + err.to_string(), + ))), + })) + } + ContinuationKind::Binary => { + return Poll::Ready(Some(Ok(AggregatedMessage::Binary(bytes)))) + } } } - } - }, + }, - Message::Text(text) => Poll::Ready(Some(Ok(AggregatedMessage::Text(text)))), - Message::Binary(binary) => Poll::Ready(Some(Ok(AggregatedMessage::Binary(binary)))), - Message::Ping(ping) => Poll::Ready(Some(Ok(AggregatedMessage::Ping(ping)))), - Message::Pong(pong) => Poll::Ready(Some(Ok(AggregatedMessage::Pong(pong)))), - Message::Close(close) => Poll::Ready(Some(Ok(AggregatedMessage::Close(close)))), + Message::Text(text) => return Poll::Ready(Some(Ok(AggregatedMessage::Text(text)))), + Message::Binary(binary) => { + return Poll::Ready(Some(Ok(AggregatedMessage::Binary(binary)))) + } + Message::Ping(ping) => return Poll::Ready(Some(Ok(AggregatedMessage::Ping(ping)))), + Message::Pong(pong) => return Poll::Ready(Some(Ok(AggregatedMessage::Pong(pong)))), + Message::Close(close) => { + return Poll::Ready(Some(Ok(AggregatedMessage::Close(close)))) + } - Message::Nop => unreachable!("MessageStream should not produce no-ops"), + Message::Nop => unreachable!("MessageStream should not produce no-ops"), + } } } }