Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 73 additions & 67 deletions actix-ws/src/aggregated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,94 +110,100 @@ impl Stream for AggregatedMessageStream {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

let Some(msg) = ready!(Pin::new(&mut this.stream).poll_next(cx)?) else {
return Poll::Ready(None);
};

match msg {
Message::Continuation(item) => match item {
Item::FirstText(bytes) => {
this.continuation_kind = ContinuationKind::Text;
this.current_size += bytes.len();

if this.current_size > this.max_size {
this.continuations.clear();
return size_error();
}

this.continuations.push(bytes);

Poll::Pending
}
loop {
let Some(msg) = ready!(Pin::new(&mut this.stream).poll_next(cx)?) else {
return Poll::Ready(None);
};

match msg {
Message::Continuation(item) => match item {
Item::FirstText(bytes) => {
this.continuation_kind = ContinuationKind::Text;
this.current_size += bytes.len();

if this.current_size > this.max_size {
this.continuations.clear();
return size_error();
}

Item::FirstBinary(bytes) => {
this.continuation_kind = ContinuationKind::Binary;
this.current_size += bytes.len();
this.continuations.push(bytes);

if this.current_size > this.max_size {
this.continuations.clear();
return size_error();
continue;
}

this.continuations.push(bytes);
Item::FirstBinary(bytes) => {
this.continuation_kind = ContinuationKind::Binary;
this.current_size += bytes.len();

Poll::Pending
}
if this.current_size > this.max_size {
this.continuations.clear();
return size_error();
}

Item::Continue(bytes) => {
this.current_size += bytes.len();
this.continuations.push(bytes);

if this.current_size > this.max_size {
this.continuations.clear();
return size_error();
continue;
}

this.continuations.push(bytes);
Item::Continue(bytes) => {
this.current_size += bytes.len();

Poll::Pending
}
if this.current_size > this.max_size {
this.continuations.clear();
return size_error();
}

Item::Last(bytes) => {
this.current_size += bytes.len();
this.continuations.push(bytes);

if this.current_size > this.max_size {
// reset current_size, as this is the last message for
// the current continuation
this.current_size = 0;
this.continuations.clear();

return size_error();
continue;
}

this.continuations.push(bytes);
let bytes = collect(&mut this.continuations);
Item::Last(bytes) => {
this.current_size += bytes.len();

this.current_size = 0;
if this.current_size > this.max_size {
// reset current_size, as this is the last message for
// the current continuation
this.current_size = 0;
this.continuations.clear();

match this.continuation_kind {
ContinuationKind::Text => {
Poll::Ready(Some(match ByteString::try_from(bytes) {
Ok(bytestring) => Ok(AggregatedMessage::Text(bytestring)),
Err(err) => Err(ProtocolError::Io(io::Error::new(
io::ErrorKind::InvalidData,
err.to_string(),
))),
}))
return size_error();
}
ContinuationKind::Binary => {
Poll::Ready(Some(Ok(AggregatedMessage::Binary(bytes))))

this.continuations.push(bytes);
let bytes = collect(&mut this.continuations);

this.current_size = 0;

match this.continuation_kind {
ContinuationKind::Text => {
return Poll::Ready(Some(match ByteString::try_from(bytes) {
Ok(bytestring) => Ok(AggregatedMessage::Text(bytestring)),
Err(err) => Err(ProtocolError::Io(io::Error::new(
io::ErrorKind::InvalidData,
err.to_string(),
))),
}))
}
ContinuationKind::Binary => {
return Poll::Ready(Some(Ok(AggregatedMessage::Binary(bytes))))
}
}
}
}
},
},

Message::Text(text) => Poll::Ready(Some(Ok(AggregatedMessage::Text(text)))),
Message::Binary(binary) => Poll::Ready(Some(Ok(AggregatedMessage::Binary(binary)))),
Message::Ping(ping) => Poll::Ready(Some(Ok(AggregatedMessage::Ping(ping)))),
Message::Pong(pong) => Poll::Ready(Some(Ok(AggregatedMessage::Pong(pong)))),
Message::Close(close) => Poll::Ready(Some(Ok(AggregatedMessage::Close(close)))),
Message::Text(text) => return Poll::Ready(Some(Ok(AggregatedMessage::Text(text)))),
Message::Binary(binary) => {
return Poll::Ready(Some(Ok(AggregatedMessage::Binary(binary))))
}
Message::Ping(ping) => return Poll::Ready(Some(Ok(AggregatedMessage::Ping(ping)))),
Message::Pong(pong) => return Poll::Ready(Some(Ok(AggregatedMessage::Pong(pong)))),
Message::Close(close) => {
return Poll::Ready(Some(Ok(AggregatedMessage::Close(close))))
}

Message::Nop => unreachable!("MessageStream should not produce no-ops"),
Message::Nop => unreachable!("MessageStream should not produce no-ops"),
}
}
}
}
Expand Down
Loading