Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 50 additions & 18 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,24 +244,56 @@ impl<T> Receiver<T> {

/// Receives the next values for this receiver and extends `buffer`.
///
/// This method extends `buffer` by no more than a fixed number of values
/// as specified by `limit`. If `limit` is zero, the function immediately
/// returns `0`. The return value is the number of values added to `buffer`.
///
/// For `limit > 0`, if there are no messages in the channel's queue, but
/// the channel has not yet been closed, this method will sleep until a
/// message is sent or the channel is closed. Note that if [`close`] is
/// called, but there are still outstanding [`Permits`] from before it was
/// closed, the channel is not considered closed by `recv_many` until the
/// permits are released.
///
/// For non-zero values of `limit`, this method will never return `0` unless
/// the channel has been closed and there are no remaining messages in the
/// channel's queue. This indicates that no further values can ever be
/// received from this `Receiver`. The channel is closed when all senders
/// have been dropped, or when [`close`] is called.
///
/// The capacity of `buffer` is increased as needed.
/// This method waits for at least one message and pushes it into the buffer,
/// then tries to extend `buffer` with up to `limit - 1` additional
/// messages that are immediately available in the channel buffer.
///
/// It does not receive any message in the following cases:
/// - The channel is closed, and there is no active [`Permit`].
/// - The `limit` is zero.
///
/// In other words, it is a more efficient version of the
/// following implementation.
/// ```
/// # struct Mock<T> {
/// # _p: std::marker::PhantomData<T>,
/// # }
/// # impl<T> Mock<T> {
/// #
/// # async fn recv(&mut self) -> Option<T> {
/// # todo!()
/// # }
/// #
/// # fn try_recv(&mut self) -> Result<T, ()> {
/// # todo!()
/// # }
/// async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
/// if limit == 0 {
/// return 0;
/// }
///
/// // Wait for at least one message (or channel close).
/// let Some(first_message) = self.recv().await else {
/// return 0;
/// };
///
/// buffer.push(first_message);
/// let mut num_pushed = 1;
///
/// // Try to get more messages, but don't sleep.
/// while num_pushed < limit {
/// if let Ok(msg) = self.try_recv() {
/// buffer.push(msg);
/// num_pushed += 1;
/// } else {
/// break;
/// }
/// }
///
/// num_pushed
/// }
/// # }
/// ```
///
/// # Cancel safety
///
Expand Down
Loading