Skip to content

Commit 3e2c9d1

Browse files
authored
splittable RequestStream (#97)
Closes #83
1 parent bddccee commit 3e2c9d1

File tree

5 files changed

+158
-58
lines changed

5 files changed

+158
-58
lines changed

h3/src/client.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -261,17 +261,11 @@ impl Builder {
261261
}
262262
}
263263

264-
pub struct RequestStream<S, B>
265-
where
266-
S: quic::RecvStream,
267-
{
268-
inner: connection::RequestStream<FrameStream<S>, B>,
264+
pub struct RequestStream<S, B> {
265+
inner: connection::RequestStream<S, B>,
269266
}
270267

271-
impl<S, B> ConnectionState for RequestStream<S, B>
272-
where
273-
S: quic::RecvStream,
274-
{
268+
impl<S, B> ConnectionState for RequestStream<S, B> {
275269
fn shared_state(&self) -> &SharedStateRef {
276270
&self.inner.conn_state
277271
}
@@ -339,7 +333,7 @@ where
339333

340334
impl<S, B> RequestStream<S, B>
341335
where
342-
S: quic::RecvStream + quic::SendStream<B>,
336+
S: quic::SendStream<B>,
343337
B: Buf,
344338
{
345339
pub async fn send_data(&mut self, buf: B) -> Result<(), Error> {
@@ -354,3 +348,19 @@ where
354348
self.inner.finish().await
355349
}
356350
}
351+
352+
impl<S, B> RequestStream<S, B>
353+
where
354+
S: quic::BidiStream<B>,
355+
B: Buf,
356+
{
357+
pub fn split(
358+
self,
359+
) -> (
360+
RequestStream<S::SendStream, B>,
361+
RequestStream<S::RecvStream, B>,
362+
) {
363+
let (send, recv) = self.inner.split();
364+
(RequestStream { inner: send }, RequestStream { inner: recv })
365+
}
366+
}

h3/src/connection.rs

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use std::{
22
convert::TryFrom,
3-
marker::PhantomData,
43
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
54
task::{Context, Poll},
65
};
@@ -79,9 +78,9 @@ where
7978
pub(super) shared: SharedStateRef,
8079
conn: C,
8180
control_send: C::SendStream,
82-
control_recv: Option<FrameStream<C::RecvStream>>,
83-
decoder_recv: Option<AcceptedRecvStream<C::RecvStream>>,
84-
encoder_recv: Option<AcceptedRecvStream<C::RecvStream>>,
81+
control_recv: Option<FrameStream<C::RecvStream, B>>,
82+
decoder_recv: Option<AcceptedRecvStream<C::RecvStream, B>>,
83+
encoder_recv: Option<AcceptedRecvStream<C::RecvStream, B>>,
8584
pending_recv_streams: Vec<AcceptRecvStream<C::RecvStream>>,
8685
// The id of the last stream received by this connection:
8786
// request and push stream for server and clients respectively.
@@ -307,21 +306,23 @@ where
307306
}
308307

309308
pub struct RequestStream<S, B> {
310-
pub(super) stream: S,
309+
pub(super) stream: FrameStream<S, B>,
311310
pub(super) trailers: Option<Bytes>,
312311
pub(super) conn_state: SharedStateRef,
313312
pub(super) max_field_section_size: u64,
314-
_phantom_buffer: PhantomData<B>,
315313
}
316314

317315
impl<S, B> RequestStream<S, B> {
318-
pub fn new(stream: S, max_field_section_size: u64, conn_state: SharedStateRef) -> Self {
316+
pub fn new(
317+
stream: FrameStream<S, B>,
318+
max_field_section_size: u64,
319+
conn_state: SharedStateRef,
320+
) -> Self {
319321
Self {
320322
stream,
321323
conn_state,
322324
max_field_section_size,
323325
trailers: None,
324-
_phantom_buffer: PhantomData,
325326
}
326327
}
327328
}
@@ -332,7 +333,7 @@ impl<S, B> ConnectionState for RequestStream<S, B> {
332333
}
333334
}
334335

335-
impl<S, B> RequestStream<FrameStream<S>, B>
336+
impl<S, B> RequestStream<S, B>
336337
where
337338
S: quic::RecvStream,
338339
{
@@ -406,9 +407,9 @@ where
406407
}
407408
}
408409

409-
impl<S, B> RequestStream<FrameStream<S>, B>
410+
impl<S, B> RequestStream<S, B>
410411
where
411-
S: quic::SendStream<B> + quic::RecvStream,
412+
S: quic::SendStream<B>,
412413
B: Buf,
413414
{
414415
/// Send some data on the response body.
@@ -449,3 +450,33 @@ where
449450
.map_err(|e| self.maybe_conn_err(e))
450451
}
451452
}
453+
454+
impl<S, B> RequestStream<S, B>
455+
where
456+
S: quic::BidiStream<B>,
457+
B: Buf,
458+
{
459+
pub(crate) fn split(
460+
self,
461+
) -> (
462+
RequestStream<S::SendStream, B>,
463+
RequestStream<S::RecvStream, B>,
464+
) {
465+
let (send, recv) = self.stream.split();
466+
467+
(
468+
RequestStream {
469+
stream: send,
470+
trailers: None,
471+
conn_state: self.conn_state.clone(),
472+
max_field_section_size: 0,
473+
},
474+
RequestStream {
475+
stream: recv,
476+
trailers: self.trailers,
477+
conn_state: self.conn_state,
478+
max_field_section_size: self.max_field_section_size,
479+
},
480+
)
481+
}
482+
}

h3/src/frame.rs

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::marker::PhantomData;
12
use std::task::{Context, Poll};
23

34
use bytes::{Buf, Bytes};
@@ -12,26 +13,21 @@ use crate::{
1213
frame::{self, Frame, PayloadLen},
1314
stream::StreamId,
1415
},
15-
quic::{RecvStream, SendStream},
16+
quic::{BidiStream, RecvStream, SendStream},
1617
stream::WriteBuf,
1718
};
1819

19-
pub struct FrameStream<S>
20-
where
21-
S: RecvStream,
22-
{
20+
pub struct FrameStream<S, B> {
2321
stream: S,
2422
bufs: BufList<Bytes>,
2523
decoder: FrameDecoder,
2624
remaining_data: usize,
2725
/// Set to true when `stream` reaches the end.
2826
is_eos: bool,
27+
_phantom_buffer: PhantomData<B>,
2928
}
3029

31-
impl<S> FrameStream<S>
32-
where
33-
S: RecvStream,
34-
{
30+
impl<S, B> FrameStream<S, B> {
3531
pub fn new(stream: S) -> Self {
3632
Self::with_bufs(stream, BufList::new())
3733
}
@@ -43,9 +39,15 @@ where
4339
decoder: FrameDecoder::default(),
4440
remaining_data: 0,
4541
is_eos: false,
42+
_phantom_buffer: PhantomData,
4643
}
4744
}
45+
}
4846

47+
impl<S, B> FrameStream<S, B>
48+
where
49+
S: RecvStream,
50+
{
4951
pub fn poll_next(
5052
&mut self,
5153
cx: &mut Context<'_>,
@@ -136,9 +138,9 @@ where
136138
}
137139
}
138140

139-
impl<T, B> SendStream<B> for FrameStream<T>
141+
impl<T, B> SendStream<B> for FrameStream<T, B>
140142
where
141-
T: SendStream<B> + RecvStream,
143+
T: SendStream<B>,
142144
B: Buf,
143145
{
144146
type Error = <T as SendStream<B>>::Error;
@@ -164,6 +166,34 @@ where
164166
}
165167
}
166168

169+
impl<S, B> FrameStream<S, B>
170+
where
171+
S: BidiStream<B>,
172+
B: Buf,
173+
{
174+
pub(crate) fn split(self) -> (FrameStream<S::SendStream, B>, FrameStream<S::RecvStream, B>) {
175+
let (send, recv) = self.stream.split();
176+
(
177+
FrameStream {
178+
stream: send,
179+
bufs: BufList::new(),
180+
decoder: FrameDecoder::default(),
181+
remaining_data: 0,
182+
is_eos: false,
183+
_phantom_buffer: PhantomData,
184+
},
185+
FrameStream {
186+
stream: recv,
187+
bufs: self.bufs,
188+
decoder: self.decoder,
189+
remaining_data: self.remaining_data,
190+
is_eos: self.is_eos,
191+
_phantom_buffer: PhantomData,
192+
},
193+
)
194+
}
195+
}
196+
167197
#[derive(Default)]
168198
pub struct FrameDecoder {
169199
expected: Option<usize>,
@@ -338,7 +368,7 @@ mod tests {
338368
Frame::headers(&b"trailer"[..]).encode_with_payload(&mut buf);
339369
recv.chunk(buf.freeze());
340370

341-
let mut stream = FrameStream::new(recv);
371+
let mut stream: FrameStream<_, ()> = FrameStream::new(recv);
342372

343373
assert_poll_matches!(
344374
|mut cx| stream.poll_next(&mut cx),
@@ -366,7 +396,7 @@ mod tests {
366396
Frame::headers(&b"header"[..]).encode_with_payload(&mut buf);
367397
let mut buf = buf.freeze();
368398
recv.chunk(buf.split_to(buf.len() - 1));
369-
let mut stream = FrameStream::new(recv);
399+
let mut stream: FrameStream<_, ()> = FrameStream::new(recv);
370400

371401
assert_poll_matches!(
372402
|mut cx| stream.poll_next(&mut cx),
@@ -385,7 +415,7 @@ mod tests {
385415
FrameType::DATA.encode(&mut buf);
386416
VarInt::from(4u32).encode(&mut buf);
387417
recv.chunk(buf.freeze());
388-
let mut stream = FrameStream::new(recv);
418+
let mut stream: FrameStream<_, ()> = FrameStream::new(recv);
389419

390420
assert_poll_matches!(
391421
|mut cx| stream.poll_next(&mut cx),
@@ -407,7 +437,7 @@ mod tests {
407437
let mut buf = buf.freeze();
408438
recv.chunk(buf.split_to(buf.len() - 2));
409439
recv.chunk(buf);
410-
let mut stream = FrameStream::new(recv);
440+
let mut stream: FrameStream<_, ()> = FrameStream::new(recv);
411441

412442
// We get the total size of data about to be received
413443
assert_poll_matches!(
@@ -436,7 +466,7 @@ mod tests {
436466
VarInt::from(4u32).encode(&mut buf);
437467
buf.put_slice(&b"b"[..]);
438468
recv.chunk(buf.freeze());
439-
let mut stream = FrameStream::new(recv);
469+
let mut stream: FrameStream<_, ()> = FrameStream::new(recv);
440470

441471
assert_poll_matches!(
442472
|mut cx| stream.poll_next(&mut cx),
@@ -468,7 +498,7 @@ mod tests {
468498
Frame::Data(Bytes::from("body")).encode_with_payload(&mut buf);
469499

470500
recv.chunk(buf.freeze());
471-
let mut stream = FrameStream::new(recv);
501+
let mut stream: FrameStream<_, ()> = FrameStream::new(recv);
472502

473503
assert_poll_matches!(
474504
|mut cx| stream.poll_next(&mut cx),
@@ -490,7 +520,7 @@ mod tests {
490520
buf.put_slice(&b"bo"[..]);
491521
recv.chunk(buf.clone().freeze());
492522

493-
let mut stream = FrameStream::new(recv);
523+
let mut stream: FrameStream<_, ()> = FrameStream::new(recv);
494524

495525
assert_poll_matches!(
496526
|mut cx| stream.poll_next(&mut cx),

0 commit comments

Comments
 (0)