Skip to content

Commit 999a600

Browse files
taiki-e authored and carllerche committed
io: add async BufReader/BufWriter (#1438)
1 parent fb9809c commit 999a600

File tree

6 files changed

+342
-3
lines changed

6 files changed

+342
-3
lines changed

tokio-io/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ Core I/O primitives for asynchronous I/O in Rust.
2020
categories = ["asynchronous"]
2121

2222
[features]
23-
util = ["memchr"]
23+
util = ["memchr", "pin-utils"]
2424

2525
[dependencies]
2626
bytes = "0.4.7"
2727
log = "0.4"
2828
futures-core-preview = "=0.3.0-alpha.18"
2929
memchr = { version = "2.2", optional = true }
30+
pin-utils = { version = "=0.1.0-alpha.4", optional = true }
3031

3132
[dev-dependencies]
3233
tokio = { version = "=0.2.0-alpha.1", path = "../tokio" }

tokio-io/src/io/buf_reader.rs

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
use super::DEFAULT_BUF_SIZE;
2+
use crate::{AsyncBufRead, AsyncRead};
3+
use futures_core::ready;
4+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
5+
use std::io::{self, Read};
6+
use std::pin::Pin;
7+
use std::task::{Context, Poll};
8+
use std::{cmp, fmt};
9+
10+
/// The `BufReader` struct adds buffering to any reader.
///
/// It can be excessively inefficient to work directly with an [`AsyncRead`]
/// instance. A `BufReader` performs large, infrequent reads on the underlying
/// [`AsyncRead`] and maintains an in-memory buffer of the results.
///
/// `BufReader` can improve the speed of programs that make *small* and
/// *repeated* read calls to the same file or network socket. It does not
/// help when reading very large amounts at once, or reading just one or a few
/// times. It also provides no advantage when reading from a source that is
/// already in memory, like a `Vec<u8>`.
///
/// When the `BufReader` is dropped, the contents of its buffer will be
/// discarded. Creating multiple instances of a `BufReader` on the same
/// stream can cause data loss.
///
/// [`AsyncRead`]: tokio_io::AsyncRead
// TODO: Examples
pub struct BufReader<R> {
    /// The wrapped reader that buffered data is pulled from.
    inner: R,
    /// Backing storage; only `buf[pos..cap]` holds data that has been read
    /// from `inner` but not yet consumed.
    buf: Box<[u8]>,
    /// Index in `buf` of the next unconsumed byte.
    pos: usize,
    /// Index in `buf` one past the last byte filled by the most recent read.
    cap: usize,
}
35+
36+
impl<R: AsyncRead> BufReader<R> {
37+
unsafe_pinned!(inner: R);
38+
unsafe_unpinned!(pos: usize);
39+
unsafe_unpinned!(cap: usize);
40+
41+
/// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
42+
/// but may change in the future.
43+
pub fn new(inner: R) -> Self {
44+
Self::with_capacity(DEFAULT_BUF_SIZE, inner)
45+
}
46+
47+
/// Creates a new `BufReader` with the specified buffer capacity.
48+
pub fn with_capacity(capacity: usize, inner: R) -> Self {
49+
unsafe {
50+
let mut buffer = Vec::with_capacity(capacity);
51+
buffer.set_len(capacity);
52+
inner.prepare_uninitialized_buffer(&mut buffer);
53+
Self {
54+
inner,
55+
buf: buffer.into_boxed_slice(),
56+
pos: 0,
57+
cap: 0,
58+
}
59+
}
60+
}
61+
62+
/// Gets a reference to the underlying reader.
63+
///
64+
/// It is inadvisable to directly read from the underlying reader.
65+
pub fn get_ref(&self) -> &R {
66+
&self.inner
67+
}
68+
69+
/// Gets a mutable reference to the underlying reader.
70+
///
71+
/// It is inadvisable to directly read from the underlying reader.
72+
pub fn get_mut(&mut self) -> &mut R {
73+
&mut self.inner
74+
}
75+
76+
/// Gets a pinned mutable reference to the underlying reader.
77+
///
78+
/// It is inadvisable to directly read from the underlying reader.
79+
pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
80+
self.inner()
81+
}
82+
83+
/// Consumes this `BufWriter`, returning the underlying reader.
84+
///
85+
/// Note that any leftover data in the internal buffer is lost.
86+
pub fn into_inner(self) -> R {
87+
self.inner
88+
}
89+
90+
/// Returns a reference to the internally buffered data.
91+
///
92+
/// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
93+
pub fn buffer(&self) -> &[u8] {
94+
&self.buf[self.pos..self.cap]
95+
}
96+
97+
/// Invalidates all data in the internal buffer.
98+
#[inline]
99+
fn discard_buffer(mut self: Pin<&mut Self>) {
100+
*self.as_mut().pos() = 0;
101+
*self.cap() = 0;
102+
}
103+
}
104+
105+
impl<R: AsyncRead> AsyncRead for BufReader<R> {
106+
fn poll_read(
107+
mut self: Pin<&mut Self>,
108+
cx: &mut Context<'_>,
109+
buf: &mut [u8],
110+
) -> Poll<io::Result<usize>> {
111+
// If we don't have any buffered data and we're doing a massive read
112+
// (larger than our internal buffer), bypass our internal buffer
113+
// entirely.
114+
if self.pos == self.cap && buf.len() >= self.buf.len() {
115+
let res = ready!(self.as_mut().inner().poll_read(cx, buf));
116+
self.discard_buffer();
117+
return Poll::Ready(res);
118+
}
119+
let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
120+
let nread = rem.read(buf)?;
121+
self.consume(nread);
122+
Poll::Ready(Ok(nread))
123+
}
124+
125+
// we can't skip unconditionally because of the large buffer case in read.
126+
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
127+
self.inner.prepare_uninitialized_buffer(buf)
128+
}
129+
}
130+
131+
impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
132+
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
133+
let Self {
134+
inner,
135+
buf,
136+
cap,
137+
pos,
138+
} = unsafe { self.get_unchecked_mut() };
139+
let mut inner = unsafe { Pin::new_unchecked(inner) };
140+
141+
// If we've reached the end of our internal buffer then we need to fetch
142+
// some more data from the underlying reader.
143+
// Branch using `>=` instead of the more correct `==`
144+
// to tell the compiler that the pos..cap slice is always valid.
145+
if *pos >= *cap {
146+
debug_assert!(*pos == *cap);
147+
*cap = ready!(inner.as_mut().poll_read(cx, buf))?;
148+
*pos = 0;
149+
}
150+
Poll::Ready(Ok(&buf[*pos..*cap]))
151+
}
152+
153+
fn consume(mut self: Pin<&mut Self>, amt: usize) {
154+
*self.as_mut().pos() = cmp::min(self.pos + amt, self.cap);
155+
}
156+
}
157+
158+
impl<R: AsyncRead + fmt::Debug> fmt::Debug for BufReader<R> {
    /// Formats the reader plus a `unread/total` summary of the buffer instead
    /// of dumping the buffer contents.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let unread = self.cap - self.pos;
        let total = self.buf.len();

        let mut out = f.debug_struct("BufReader");
        out.field("reader", &self.inner);
        out.field("buffer", &format_args!("{}/{}", unread, total));
        out.finish()
    }
}

tokio-io/src/io/buf_writer.rs

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
use super::DEFAULT_BUF_SIZE;
2+
use crate::AsyncWrite;
3+
use futures_core::ready;
4+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
5+
use std::fmt;
6+
use std::io::{self, Write};
7+
use std::pin::Pin;
8+
use std::task::{Context, Poll};
9+
10+
/// Wraps a writer and buffers its output.
///
/// It can be excessively inefficient to work directly with something that
/// implements [`AsyncWrite`]. A `BufWriter` keeps an in-memory buffer of data and
/// writes it to an underlying writer in large, infrequent batches.
///
/// `BufWriter` can improve the speed of programs that make *small* and
/// *repeated* write calls to the same file or network socket. It does not
/// help when writing very large amounts at once, or writing just one or a few
/// times. It also provides no advantage when writing to a destination that is
/// in memory, like a `Vec<u8>`.
///
/// When the `BufWriter` is dropped, the contents of its buffer will be
/// discarded. Creating multiple instances of a `BufWriter` on the same
/// stream can cause data loss. If you need to write out the contents of its
/// buffer, you must manually call [`flush`] before the writer is dropped.
///
/// [`AsyncWrite`]: tokio_io::AsyncWrite
/// [`flush`]: super::AsyncWriteExt::flush
// TODO: Examples
pub struct BufWriter<W> {
    /// The wrapped writer that buffered data is eventually written to.
    inner: W,
    /// Bytes accepted from callers but not yet fully written to `inner`.
    buf: Vec<u8>,
    /// Number of bytes at the front of `buf` already written to `inner` by a
    /// flush that has not yet completed.
    written: usize,
}
36+
37+
impl<W: AsyncWrite> BufWriter<W> {
38+
unsafe_pinned!(inner: W);
39+
unsafe_unpinned!(buf: Vec<u8>);
40+
41+
/// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB,
42+
/// but may change in the future.
43+
pub fn new(inner: W) -> Self {
44+
Self::with_capacity(DEFAULT_BUF_SIZE, inner)
45+
}
46+
47+
/// Creates a new `BufWriter` with the specified buffer capacity.
48+
pub fn with_capacity(cap: usize, inner: W) -> Self {
49+
Self {
50+
inner,
51+
buf: Vec::with_capacity(cap),
52+
written: 0,
53+
}
54+
}
55+
56+
fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
57+
let Self {
58+
inner,
59+
buf,
60+
written,
61+
} = unsafe { self.get_unchecked_mut() };
62+
let mut inner = unsafe { Pin::new_unchecked(inner) };
63+
64+
let len = buf.len();
65+
let mut ret = Ok(());
66+
while *written < len {
67+
match ready!(inner.as_mut().poll_write(cx, &buf[*written..])) {
68+
Ok(0) => {
69+
ret = Err(io::Error::new(
70+
io::ErrorKind::WriteZero,
71+
"failed to write the buffered data",
72+
));
73+
break;
74+
}
75+
Ok(n) => *written += n,
76+
Err(e) => {
77+
ret = Err(e);
78+
break;
79+
}
80+
}
81+
}
82+
if *written > 0 {
83+
buf.drain(..*written);
84+
}
85+
*written = 0;
86+
Poll::Ready(ret)
87+
}
88+
89+
/// Gets a reference to the underlying writer.
90+
pub fn get_ref(&self) -> &W {
91+
&self.inner
92+
}
93+
94+
/// Gets a mutable reference to the underlying writer.
95+
///
96+
/// It is inadvisable to directly write to the underlying writer.
97+
pub fn get_mut(&mut self) -> &mut W {
98+
&mut self.inner
99+
}
100+
101+
/// Gets a pinned mutable reference to the underlying writer.
102+
///
103+
/// It is inadvisable to directly write to the underlying writer.
104+
pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
105+
self.inner()
106+
}
107+
108+
/// Consumes this `BufWriter`, returning the underlying writer.
109+
///
110+
/// Note that any leftover data in the internal buffer is lost.
111+
pub fn into_inner(self) -> W {
112+
self.inner
113+
}
114+
115+
/// Returns a reference to the internally buffered data.
116+
pub fn buffer(&self) -> &[u8] {
117+
&self.buf
118+
}
119+
}
120+
121+
impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
122+
fn poll_write(
123+
mut self: Pin<&mut Self>,
124+
cx: &mut Context<'_>,
125+
buf: &[u8],
126+
) -> Poll<io::Result<usize>> {
127+
if self.buf.len() + buf.len() > self.buf.capacity() {
128+
ready!(self.as_mut().flush_buf(cx))?;
129+
}
130+
if buf.len() >= self.buf.capacity() {
131+
self.inner().poll_write(cx, buf)
132+
} else {
133+
Poll::Ready(self.buf().write(buf))
134+
}
135+
}
136+
137+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
138+
ready!(self.as_mut().flush_buf(cx))?;
139+
self.inner().poll_flush(cx)
140+
}
141+
142+
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
143+
ready!(self.as_mut().flush_buf(cx))?;
144+
self.inner().poll_shutdown(cx)
145+
}
146+
}
147+
148+
impl<W: AsyncWrite + fmt::Debug> fmt::Debug for BufWriter<W> {
    /// Formats the writer, a `len/capacity` summary of the buffer, and the
    /// in-progress flush offset, without dumping the buffer contents.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let pending = self.buf.len();
        let capacity = self.buf.capacity();

        let mut out = f.debug_struct("BufWriter");
        out.field("writer", &self.inner);
        out.field("buffer", &format_args!("{}/{}", pending, capacity));
        out.field("written", &self.written);
        out.finish()
    }
}

tokio-io/src/io/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
mod async_buf_read_ext;
4040
mod async_read_ext;
4141
mod async_write_ext;
42+
mod buf_reader;
43+
mod buf_writer;
4244
mod copy;
4345
mod flush;
4446
mod lines;
@@ -58,3 +60,11 @@ pub use self::async_buf_read_ext::AsyncBufReadExt;
5860
pub use self::async_read_ext::AsyncReadExt;
5961
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
6062
pub use self::async_write_ext::AsyncWriteExt;
63+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
64+
pub use self::buf_reader::BufReader;
65+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
66+
pub use self::buf_writer::BufWriter;
67+
68+
// used by `BufReader` and `BufWriter`
69+
// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
70+
const DEFAULT_BUF_SIZE: usize = 8 * 1024;

tokio-io/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub use self::async_read::AsyncRead;
2727
pub use self::async_write::AsyncWrite;
2828

2929
#[cfg(feature = "util")]
30-
pub use self::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
30+
pub use self::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
3131

3232
// Re-export `Buf` and `BufMut` since they are part of the API
3333
pub use bytes::{Buf, BufMut};

tokio/src/io.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@
4040
#[cfg(feature = "fs")]
4141
pub use tokio_fs::{stderr, stdin, stdout, Stderr, Stdin, Stdout};
4242
pub use tokio_io::{
43-
AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt,
43+
AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader,
44+
BufWriter,
4445
};
4546

4647
// Re-export io::Error so that users don't have to deal

0 commit comments

Comments
 (0)