
Conversation

@liuzsen

@liuzsen liuzsen commented Oct 11, 2025

PR Type

Bug Fix

PR Checklist

  • Tests for the changes have been added / updated.
  • Documentation comments have been added / updated.
  • A changelog entry has been made for the appropriate packages.
  • Format code with the nightly rustfmt (cargo +nightly fmt).

Overview

Current Behavior (Bug)

When using AggregatedMessageStream to process WebSocket continuation frames, the stream can become permanently stuck if multiple continuation frames are received in a single payload. Specifically:

  1. When a batch of continuation frames (e.g., FirstText + Continue + Last) arrives in one TCP packet
  2. MessageStream decodes all frames and stores them in its internal messages queue
  3. AggregatedMessageStream.poll_next() receives the Continue frame and returns Poll::Pending while waiting for the Last frame
  4. However, the Last frame remains in MessageStream.messages queue unprocessed
  5. Since MessageStream doesn't wake the task when returning the first message, AggregatedMessageStream never gets polled again
  6. The stream hangs indefinitely at AggregatedMessageStream.next().await
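
For illustration, a consumer that reproduces the hang looks roughly like the sketch below. It follows the usual actix-ws handle / aggregate_continuations echo pattern; the handler name and the echo body are placeholders, not code from this repository:

use actix_web::{web, HttpRequest, HttpResponse};
use actix_ws::AggregatedMessage;
use futures_util::StreamExt as _;

async fn ws_handler(req: HttpRequest, body: web::Payload) -> actix_web::Result<HttpResponse> {
    let (response, mut session, msg_stream) = actix_ws::handle(&req, body)?;
    let mut aggregated = msg_stream.aggregate_continuations();

    actix_web::rt::spawn(async move {
        // With the bug, a FirstText + Continue + Last batch decoded in one poll
        // leaves the Last frame queued inside MessageStream, so this loop is
        // never woken again and hangs at `.next().await`.
        while let Some(Ok(msg)) = aggregated.next().await {
            if let AggregatedMessage::Text(text) = msg {
                let _ = session.text(text).await;
            }
        }
    });

    Ok(response)
}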

New Behavior (Fix)

This PR fixes the issue by ensuring the task is woken up when MessageStream has additional messages queued after returning the first message. The change is minimal and surgical:

// In MessageStream::poll_next()
if let Some(msg) = this.messages.pop_front() {
    if this.messages.front().is_some() {
        cx.waker().wake_by_ref(); // NEW: Wake task if more messages exist
    }
    return Poll::Ready(Some(Ok(msg)));
}

Technical Impact

  • Fixed: Deadlock in AggregatedMessageStream when processing multiple continuation frames
  • Preserved: All existing behavior for single-message scenarios
  • No Breaking Changes: The fix maintains backward compatibility

Root Cause Analysis

The bug occurs due to a missing wakeup signal in the async coordination between:

  • Producer (MessageStream): Has buffered continuation frames ready
  • Consumer (AggregatedMessageStream): Waits for Last frame after processing Continue

Without the wakeup, the consumer never knows more data is available in the producer's buffer.

Affected Scenarios

  • Large WebSocket messages fragmented into continuation frames
  • High-frequency WebSocket applications with message batching
  • Any application using AggregatedMessageStream for handling continuation frames

Risk Assessment

Risk Level: Very Low

  • Change Scope: 3 lines added to existing code
  • No API Changes: Public interface unchanged

@liuzsen liuzsen changed the title fix(actix-ws): wake task when multiple continuation frames exist in W… fix(actix-ws): wake task when multiple continuation frames exist in WebSocket stream Oct 11, 2025
@asonix
Contributor

asonix commented Oct 11, 2025

I think this is the wrong solution to this problem. The bug is not that the message stream fails to produce a value when asked. The problem is that the AggregatedMessageStream isn't asking enough.

I took a quick look through the AggregatedMessageStream code and it does seem like rather than returning Poll::Pending for continuations, we should either loop in the current poll_next or recursively call poll_next. That way we could continue to process existing messages without inserting calls to wake that would affect more than just this case.

@liuzsen
Author

liuzsen commented Oct 12, 2025

Thank you for your review. I actually considered the approach you suggested at first, but ultimately decided to go with the current solution for the following reasons:

  • From the documentation and design perspective, the current fix aligns well with the intended use of Wakers in async Rust—it precisely addresses the scenario where a task needs to be notified that more data is immediately available.
  • This fix is minimal, straightforward, and completely resolves the issue without introducing complex control flow.
  • From a layering standpoint, requiring AggregatedMessageStream to loop internally (or recursively call poll_next) would force its implementer to be aware of internal details of MessageStream—specifically, that it buffers multiple messages in a VecDeque. Even though both structs were authored by the same person, this coupling increases cognitive overhead. Moreover, if other developers later build their own MyAggregatedMessageStream on top of MessageStream, they shouldn't need to know about such internal buffering behavior to avoid subtle bugs. In fact, it took me several hours to debug this exact issue precisely because the problem wasn't obvious from the public interface.

@asonix
Contributor

asonix commented Oct 12, 2025

But the bug is not with MessageStream. If you poll a message stream it will give you an item if it has one. If another future is not polling MessageStream, then it isn't MessageStream's job to decide to wake that other future.

For basic implementations like the following, inserting a call to wake here will result in potentially waking this task when there is nothing to do, which could incur a performance cost. For an application handling many websockets, this could lead to a large number of needless wakes.

while let Some(message) = message_stream.try_next().await? {
    do_other_stuff(message).await;
}

@liuzsen
Author

liuzsen commented Oct 13, 2025

You're absolutely right, and I appreciate you pointing this out. On reflection, I agree that my original fix isn't ideal. It introduces an extra wake in MessageStream that could be confusing, since we're waking while returning Poll::Ready, and it isn't strictly necessary when MessageStream is used directly.

I was overly focused on fixing the AggregatedMessageStream and didn't fully consider the broader implications for MessageStream's standalone usage, even if that scenario is uncommon.

I now agree that the cleaner solution is to modify AggregatedMessageStream::poll_next to keep polling the inner MessageStream in a loop until it either completes an aggregated message or exhausts all immediately available frames. This keeps the logic where it belongs and avoids unnecessary wakes.
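
Roughly, the shape I have in mind is the sketch below; the field and helper names (stream, fold_continuation, convert) are stand-ins for the actual internals, just to show the control flow:

// inside impl Stream for AggregatedMessageStream
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    let this = self.get_mut();

    loop {
        // Poll the inner MessageStream again instead of returning Poll::Pending,
        // so frames it has already buffered are drained within this single poll.
        let msg = match Pin::new(&mut this.stream).poll_next(cx) {
            Poll::Ready(Some(Ok(msg))) => msg,
            Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
            Poll::Ready(None) => return Poll::Ready(None),
            // Only return Pending when the inner stream itself is Pending;
            // in that case it has registered the waker for us.
            Poll::Pending => return Poll::Pending,
        };

        match msg {
            Message::Continuation(item) => {
                // Fold the frame into the in-progress aggregate; return only a
                // completed message, otherwise loop and poll the inner stream again.
                if let Some(complete) = this.fold_continuation(item) {
                    return Poll::Ready(Some(Ok(complete)));
                }
            }
            other => return Poll::Ready(Some(this.convert(other))),
        }
    }
}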

I'll update the PR with this approach. Thanks again for the thoughtful feedback!

@asonix
Contributor

asonix commented Oct 13, 2025

I also don't appreciate the use of chatgpt to talk to me in this PR. I'd rather you just talk to me yourself.

@liuzsen
Author

liuzsen commented Oct 13, 2025

I'm sorry. I can read English, but I can't express my thoughts in it accurately enough, so I used AI to translate my content and proofread it repeatedly. What I can guarantee is that everything I posted is my own genuine expression, not AI-generated (of course, the AI may use some phrasing that doesn't sound human, and I may not be able to tell).

@liuzsen
Author

liuzsen commented Oct 13, 2025

I understand how you feel—it really is a waste of time talking to a bot. Going forward, I’ll post content directly in Chinese and use AI for strict translation without any polishing or embellishment.

@liuzsen
Author

liuzsen commented Oct 13, 2025

I've updated the PR, fixing the issue using the approach mentioned earlier. I performed a quick test with my own web project, and it indeed resolved the problem.

Contributor

@asonix asonix left a comment

looks good, thanks!

Member

@JohnTitor JohnTitor left a comment

While the implementation itself makes sense, it'd also be great if you could add a regression test in some form, e.g. by queueing and simulating messages like in your issue. Otherwise a regression could be introduced in future development.

asonix added a commit to asonix/actix-extras that referenced this pull request Nov 10, 2025
This includes an ignored failing test for actix#607
@asonix asonix mentioned this pull request Nov 10, 2025
@asonix
Contributor

asonix commented Nov 11, 2025

There is now an ignored test for this in the main branch.

@liuzsen
Author

liuzsen commented Nov 18, 2025

Sorry for the long silence—my work situation had some serious changes recently, and I just didn’t have energy to focus on anything else. Do I still need to do anything here?

@asonix
Contributor

asonix commented Nov 28, 2025

I think what's left to do is merge main into this branch, then enable the test and see if it passes
