diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 06eeffc3f09..eccf780b02f 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -244,24 +244,56 @@ impl Receiver { /// Receives the next values for this receiver and extends `buffer`. /// - /// This method extends `buffer` by no more than a fixed number of values - /// as specified by `limit`. If `limit` is zero, the function immediately - /// returns `0`. The return value is the number of values added to `buffer`. - /// - /// For `limit > 0`, if there are no messages in the channel's queue, but - /// the channel has not yet been closed, this method will sleep until a - /// message is sent or the channel is closed. Note that if [`close`] is - /// called, but there are still outstanding [`Permits`] from before it was - /// closed, the channel is not considered closed by `recv_many` until the - /// permits are released. - /// - /// For non-zero values of `limit`, this method will never return `0` unless - /// the channel has been closed and there are no remaining messages in the - /// channel's queue. This indicates that no further values can ever be - /// received from this `Receiver`. The channel is closed when all senders - /// have been dropped, or when [`close`] is called. - /// - /// The capacity of `buffer` is increased as needed. + /// This method waits for at least one message and pushes it into the buffer, + /// then tries to extend `buffer` with up to `limit - 1` additional + /// messages that are immediately available in the channel buffer. + /// + /// It does not receive any message in the following cases: + /// - The channel is closed, and there is no active [`Permit`]. + /// - The `limit` is zero. + /// + /// In other words, it is a more efficient version of the + /// following implementation. + /// ``` + /// # struct Mock { + /// # _p: std::marker::PhantomData, + /// # } + /// # impl Mock { + /// # + /// # async fn recv(&mut self) -> Option { + /// # todo!() + /// # } + /// # + /// # fn try_recv(&mut self) -> Result { + /// # todo!() + /// # } + /// async fn recv_many(&mut self, buffer: &mut Vec, limit: usize) -> usize { + /// if limit == 0 { + /// return 0; + /// } + /// + /// // Wait for at least one message (or channel close). + /// let Some(first_message) = self.recv().await else { + /// return 0; + /// }; + /// + /// buffer.push(first_message); + /// let mut num_pushed = 1; + /// + /// // Try to get more messages, but don't sleep. + /// while num_pushed < limit { + /// if let Ok(msg) = self.try_recv() { + /// buffer.push(msg); + /// num_pushed += 1; + /// } else { + /// break; + /// } + /// } + /// + /// num_pushed + /// } + /// # } + /// ``` /// /// # Cancel safety ///