@@ -3,75 +3,32 @@ use futures_core::task::{Context, Poll};
33use futures_io:: { AsyncRead , AsyncWrite } ;
44use std:: io;
55use std:: pin:: Pin ;
6+ use super :: { BufReader , CopyBufInto } ;
7+ use pin_utils:: unsafe_pinned;
68
79/// Future for the [`copy_into`](super::AsyncReadExt::copy_into) method.
810#[ derive( Debug ) ]
911#[ must_use = "futures do nothing unless you `.await` or poll them" ]
10- pub struct CopyInto < ' a , R : ?Sized + Unpin , W : ?Sized + Unpin > {
11- reader : & ' a mut R ,
12- read_done : bool ,
13- writer : & ' a mut W ,
14- pos : usize ,
15- cap : usize ,
16- amt : u64 ,
17- buf : Box < [ u8 ] > ,
12+ pub struct CopyInto < R : AsyncRead , W > {
13+ inner : CopyBufInto < BufReader < R > , W > ,
1814}
1915
20- impl < R : ? Sized + Unpin , W : ? Sized + Unpin > Unpin for CopyInto < ' _ , R , W > { }
16+ impl < R : AsyncRead , W > Unpin for CopyInto < R , W > where CopyBufInto < BufReader < R > , W > : Unpin { }
2117
22- impl < ' a , R : ?Sized + Unpin , W : ?Sized + Unpin > CopyInto < ' a , R , W > {
23- pub ( super ) fn new ( reader : & ' a mut R , writer : & ' a mut W ) -> Self {
18+ impl < R : AsyncRead , W > CopyInto < R , W > {
19+ unsafe_pinned ! ( inner: CopyBufInto <BufReader <R >, W >) ;
20+
21+ pub ( super ) fn new ( reader : R , writer : W ) -> Self {
2422 CopyInto {
25- reader,
26- read_done : false ,
27- writer,
28- amt : 0 ,
29- pos : 0 ,
30- cap : 0 ,
31- buf : Box :: new ( [ 0 ; 2048 ] ) ,
23+ inner : CopyBufInto :: new ( BufReader :: with_capacity ( 2048 , reader) , writer) ,
3224 }
3325 }
3426}
3527
36- impl < R , W > Future for CopyInto < ' _ , R , W >
37- where R : AsyncRead + ?Sized + Unpin ,
38- W : AsyncWrite + ?Sized + Unpin ,
39- {
28+ impl < R : AsyncRead , W : AsyncWrite > Future for CopyInto < R , W > {
4029 type Output = io:: Result < u64 > ;
4130
42- fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
43- let this = & mut * self ;
44- loop {
45- // If our buffer is empty, then we need to read some data to
46- // continue.
47- if this. pos == this. cap && !this. read_done {
48- let n = ready ! ( Pin :: new( & mut this. reader) . poll_read( cx, & mut this. buf) ) ?;
49- if n == 0 {
50- this. read_done = true ;
51- } else {
52- this. pos = 0 ;
53- this. cap = n;
54- }
55- }
56-
57- // If our buffer has some data, let's write it out!
58- while this. pos < this. cap {
59- let i = ready ! ( Pin :: new( & mut this. writer) . poll_write( cx, & this. buf[ this. pos..this. cap] ) ) ?;
60- if i == 0 {
61- return Poll :: Ready ( Err ( io:: ErrorKind :: WriteZero . into ( ) ) )
62- } else {
63- this. pos += i;
64- this. amt += i as u64 ;
65- }
66- }
67-
68- // If we've written al the data and we've seen EOF, flush out the
69- // data and finish the transfer.
70- // done with the entire transfer.
71- if this. pos == this. cap && this. read_done {
72- ready ! ( Pin :: new( & mut this. writer) . poll_flush( cx) ) ?;
73- return Poll :: Ready ( Ok ( this. amt ) ) ;
74- }
75- }
31+ fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
32+ self . inner ( ) . poll ( cx)
7633 }
7734}
0 commit comments