Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion http-body-util/src/collected.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
task::{Context, Poll},
};

use bytes::{Buf, Bytes};
use bytes::{Buf, Bytes, BytesMut};
use http::HeaderMap;
use http_body::{Body, Frame};

Expand Down Expand Up @@ -38,6 +38,11 @@ impl<B: Buf> Collected<B> {
self.bufs.copy_to_bytes(self.bufs.remaining())
}

/// Convert this body into a [`BytesMut`].
pub fn to_bytes_mut(mut self) -> BytesMut {
self.bufs.copy_to_bytes_mut(self.bufs.remaining())
}

pub(crate) fn push_frame(&mut self, frame: Frame<B>) {
let frame = match frame.into_data() {
Ok(data) => {
Expand Down Expand Up @@ -125,6 +130,18 @@ mod tests {
assert_eq!(&buf.copy_to_bytes(buf.remaining())[..], b"helloworld!");
}

#[tokio::test]
async fn segmented_body_mut() {
let bufs = [&b"hello"[..], &b"world"[..], &b"!"[..]];
let body = StreamBody::new(stream::iter(bufs.map(Frame::data).map(Ok::<_, Infallible>)));

let buffered = body.collect().await.unwrap();

let mut buf = buffered.to_bytes_mut();

assert_eq!(&buf.copy_to_bytes(buf.remaining())[..], b"helloworld!");
}

#[tokio::test]
async fn delayed_segments() {
let one = stream::once(async { Ok::<_, Infallible>(Frame::data(&b"hello "[..])) });
Expand Down
14 changes: 10 additions & 4 deletions http-body-util/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ impl<T: Buf> BufList<T> {
pub(crate) fn pop(&mut self) -> Option<T> {
self.bufs.pop_front()
}

#[inline]
pub(crate) fn copy_to_bytes_mut(&mut self, len: usize) -> BytesMut {
assert!(len <= self.remaining(), "`len` greater than remaining");
let mut bm = BytesMut::with_capacity(len);
bm.put(self.take(len));
bm
}
}

impl<T: Buf> Buf for BufList<T> {
Expand Down Expand Up @@ -77,10 +85,8 @@ impl<T: Buf> Buf for BufList<T> {
}
Some(front) if front.remaining() > len => front.copy_to_bytes(len),
_ => {
assert!(len <= self.remaining(), "`len` greater than remaining");
let mut bm = BytesMut::with_capacity(len);
bm.put(self.take(len));
bm.freeze()
let bytes_mut = self.copy_to_bytes_mut(len);
bytes_mut.freeze()
}
}
}
Expand Down