Skip to content

Commit c14acc4

Browse files
author
Your Name
committed
splittable RequestStream
1 parent 022d7ef commit c14acc4

File tree

5 files changed

+151
-51
lines changed

5 files changed

+151
-51
lines changed

h3/src/client.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -261,17 +261,11 @@ impl Builder {
261261
}
262262
}
263263

264-
pub struct RequestStream<S, B>
265-
where
266-
S: quic::RecvStream,
267-
{
268-
inner: connection::RequestStream<FrameStream<S>, B>,
264+
pub struct RequestStream<S, B> {
265+
inner: connection::RequestStream<S, B>,
269266
}
270267

271-
impl<S, B> ConnectionState for RequestStream<S, B>
272-
where
273-
S: quic::RecvStream,
274-
{
268+
impl<S, B> ConnectionState for RequestStream<S, B> {
275269
fn shared_state(&self) -> &SharedStateRef {
276270
&self.inner.conn_state
277271
}
@@ -339,7 +333,7 @@ where
339333

340334
impl<S, B> RequestStream<S, B>
341335
where
342-
S: quic::RecvStream + quic::SendStream<B>,
336+
S: quic::SendStream<B>,
343337
B: Buf,
344338
{
345339
pub async fn send_data(&mut self, buf: B) -> Result<(), Error> {
@@ -354,3 +348,19 @@ where
354348
self.inner.finish().await
355349
}
356350
}
351+
352+
impl<S, B> RequestStream<S, B>
353+
where
354+
S: quic::BidiStream<B>,
355+
B: Buf,
356+
{
357+
pub fn split(
358+
self,
359+
) -> (
360+
RequestStream<S::SendStream, B>,
361+
RequestStream<S::RecvStream, B>,
362+
) {
363+
let (send, recv) = self.inner.split();
364+
(RequestStream { inner: send }, RequestStream { inner: recv })
365+
}
366+
}

h3/src/connection.rs

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use std::{
22
convert::TryFrom,
3-
marker::PhantomData,
43
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
54
task::{Context, Poll},
65
};
@@ -79,9 +78,9 @@ where
7978
pub(super) shared: SharedStateRef,
8079
conn: C,
8180
control_send: C::SendStream,
82-
control_recv: Option<FrameStream<C::RecvStream>>,
83-
decoder_recv: Option<AcceptedRecvStream<C::RecvStream>>,
84-
encoder_recv: Option<AcceptedRecvStream<C::RecvStream>>,
81+
control_recv: Option<FrameStream<C::RecvStream, B>>,
82+
decoder_recv: Option<AcceptedRecvStream<C::RecvStream, B>>,
83+
encoder_recv: Option<AcceptedRecvStream<C::RecvStream, B>>,
8584
pending_recv_streams: Vec<AcceptRecvStream<C::RecvStream>>,
8685
// The id of the last stream received by this connection:
8786
// request and push stream for server and clients respectively.
@@ -307,21 +306,23 @@ where
307306
}
308307

309308
pub struct RequestStream<S, B> {
310-
pub(super) stream: S,
309+
pub(super) stream: FrameStream<S, B>,
311310
pub(super) trailers: Option<Bytes>,
312311
pub(super) conn_state: SharedStateRef,
313312
pub(super) max_field_section_size: u64,
314-
_phantom_buffer: PhantomData<B>,
315313
}
316314

317315
impl<S, B> RequestStream<S, B> {
318-
pub fn new(stream: S, max_field_section_size: u64, conn_state: SharedStateRef) -> Self {
316+
pub fn new(
317+
stream: FrameStream<S, B>,
318+
max_field_section_size: u64,
319+
conn_state: SharedStateRef,
320+
) -> Self {
319321
Self {
320322
stream,
321323
conn_state,
322324
max_field_section_size,
323325
trailers: None,
324-
_phantom_buffer: PhantomData,
325326
}
326327
}
327328
}
@@ -332,7 +333,7 @@ impl<S, B> ConnectionState for RequestStream<S, B> {
332333
}
333334
}
334335

335-
impl<S, B> RequestStream<FrameStream<S>, B>
336+
impl<S, B> RequestStream<S, B>
336337
where
337338
S: quic::RecvStream,
338339
{
@@ -406,9 +407,9 @@ where
406407
}
407408
}
408409

409-
impl<S, B> RequestStream<FrameStream<S>, B>
410+
impl<S, B> RequestStream<S, B>
410411
where
411-
S: quic::SendStream<B> + quic::RecvStream,
412+
S: quic::SendStream<B>,
412413
B: Buf,
413414
{
414415
/// Send some data on the response body.
@@ -449,3 +450,33 @@ where
449450
.map_err(|e| self.maybe_conn_err(e))
450451
}
451452
}
453+
454+
impl<S, B> RequestStream<S, B>
455+
where
456+
S: quic::BidiStream<B>,
457+
B: Buf,
458+
{
459+
pub(crate) fn split(
460+
self,
461+
) -> (
462+
RequestStream<S::SendStream, B>,
463+
RequestStream<S::RecvStream, B>,
464+
) {
465+
let (send, recv) = self.stream.split();
466+
467+
(
468+
RequestStream {
469+
stream: send,
470+
trailers: None,
471+
conn_state: self.conn_state.clone(),
472+
max_field_section_size: 0,
473+
},
474+
RequestStream {
475+
stream: recv,
476+
trailers: self.trailers,
477+
conn_state: self.conn_state,
478+
max_field_section_size: self.max_field_section_size,
479+
},
480+
)
481+
}
482+
}

h3/src/frame.rs

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::marker::PhantomData;
12
use std::task::{Context, Poll};
23

34
use bytes::{Buf, Bytes};
@@ -12,26 +13,21 @@ use crate::{
1213
frame::{self, Frame, PayloadLen},
1314
stream::StreamId,
1415
},
15-
quic::{RecvStream, SendStream},
16+
quic::{BidiStream, RecvStream, SendStream},
1617
stream::WriteBuf,
1718
};
1819

19-
pub struct FrameStream<S>
20-
where
21-
S: RecvStream,
22-
{
20+
pub struct FrameStream<S, B> {
2321
stream: S,
2422
bufs: BufList<Bytes>,
2523
decoder: FrameDecoder,
2624
remaining_data: usize,
2725
/// Set to true when `stream` reaches the end.
2826
is_eos: bool,
27+
_phantom_buffer: PhantomData<B>,
2928
}
3029

31-
impl<S> FrameStream<S>
32-
where
33-
S: RecvStream,
34-
{
30+
impl<S, B> FrameStream<S, B> {
3531
pub fn new(stream: S) -> Self {
3632
Self::with_bufs(stream, BufList::new())
3733
}
@@ -43,9 +39,15 @@ where
4339
decoder: FrameDecoder::default(),
4440
remaining_data: 0,
4541
is_eos: false,
42+
_phantom_buffer: PhantomData,
4643
}
4744
}
45+
}
4846

47+
impl<S, B> FrameStream<S, B>
48+
where
49+
S: RecvStream,
50+
{
4951
pub fn poll_next(
5052
&mut self,
5153
cx: &mut Context<'_>,
@@ -136,9 +138,9 @@ where
136138
}
137139
}
138140

139-
impl<T, B> SendStream<B> for FrameStream<T>
141+
impl<T, B> SendStream<B> for FrameStream<T, B>
140142
where
141-
T: SendStream<B> + RecvStream,
143+
T: SendStream<B>,
142144
B: Buf,
143145
{
144146
type Error = <T as SendStream<B>>::Error;
@@ -164,6 +166,34 @@ where
164166
}
165167
}
166168

169+
impl<S, B> FrameStream<S, B>
170+
where
171+
S: BidiStream<B>,
172+
B: Buf,
173+
{
174+
pub(crate) fn split(self) -> (FrameStream<S::SendStream, B>, FrameStream<S::RecvStream, B>) {
175+
let (send, recv) = self.stream.split();
176+
(
177+
FrameStream {
178+
stream: send,
179+
bufs: BufList::new(),
180+
decoder: FrameDecoder::default(),
181+
remaining_data: 0,
182+
is_eos: false,
183+
_phantom_buffer: PhantomData,
184+
},
185+
FrameStream {
186+
stream: recv,
187+
bufs: self.bufs,
188+
decoder: self.decoder,
189+
remaining_data: self.remaining_data,
190+
is_eos: self.is_eos,
191+
_phantom_buffer: PhantomData,
192+
},
193+
)
194+
}
195+
}
196+
167197
#[derive(Default)]
168198
pub struct FrameDecoder {
169199
expected: Option<usize>,

h3/src/server.rs

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{
22
collections::HashSet,
33
convert::TryFrom,
4+
sync::Arc,
45
task::{Context, Poll},
56
};
67

@@ -107,8 +108,10 @@ where
107108
};
108109

109110
let mut request_stream = RequestStream {
110-
stream_id: stream.id(),
111-
request_end: self.request_end_send.clone(),
111+
request_end: Arc::new(RequestEnd {
112+
request_end: self.request_end_send.clone(),
113+
stream_id: stream.id(),
114+
}),
112115
inner: connection::RequestStream::new(
113116
stream,
114117
self.max_field_section_size,
@@ -296,19 +299,23 @@ impl Builder {
296299
}
297300
}
298301

299-
pub struct RequestStream<S, B>
300-
where
301-
S: quic::RecvStream,
302-
{
303-
inner: connection::RequestStream<FrameStream<S>, B>,
304-
stream_id: StreamId,
302+
pub struct RequestEnd {
305303
request_end: mpsc::UnboundedSender<StreamId>,
304+
stream_id: StreamId,
306305
}
307306

308-
impl<S, B> ConnectionState for RequestStream<S, B>
309-
where
310-
S: quic::RecvStream,
311-
{
307+
pub struct RequestStream<S, B> {
308+
inner: connection::RequestStream<S, B>,
309+
request_end: Arc<RequestEnd>,
310+
}
311+
312+
impl<S, B> AsMut<connection::RequestStream<S, B>> for RequestStream<S, B> {
313+
fn as_mut(&mut self) -> &mut connection::RequestStream<S, B> {
314+
&mut self.inner
315+
}
316+
}
317+
318+
impl<S, B> ConnectionState for RequestStream<S, B> {
312319
fn shared_state(&self) -> &SharedStateRef {
313320
&self.inner.conn_state
314321
}
@@ -329,7 +336,7 @@ where
329336

330337
impl<S, B> RequestStream<S, B>
331338
where
332-
S: quic::RecvStream + quic::SendStream<B>,
339+
S: quic::SendStream<B>,
333340
B: Buf,
334341
{
335342
pub async fn send_response(&mut self, resp: Response<()>) -> Result<(), Error> {
@@ -393,10 +400,32 @@ where
393400
}
394401
}
395402

396-
impl<S, B> Drop for RequestStream<S, B>
403+
impl<S, B> RequestStream<S, B>
397404
where
398-
S: quic::RecvStream,
405+
S: quic::BidiStream<B>,
406+
B: Buf,
399407
{
408+
pub fn split(
409+
self,
410+
) -> (
411+
RequestStream<S::SendStream, B>,
412+
RequestStream<S::RecvStream, B>,
413+
) {
414+
let (send, recv) = self.inner.split();
415+
(
416+
RequestStream {
417+
inner: send,
418+
request_end: self.request_end.clone(),
419+
},
420+
RequestStream {
421+
inner: recv,
422+
request_end: self.request_end,
423+
},
424+
)
425+
}
426+
}
427+
428+
impl Drop for RequestEnd {
400429
fn drop(&mut self) {
401430
if let Err(e) = self.request_end.send(self.stream_id) {
402431
error!(

h3/src/stream.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,12 +159,12 @@ where
159159
}
160160
}
161161

162-
pub(super) enum AcceptedRecvStream<S>
162+
pub(super) enum AcceptedRecvStream<S, B>
163163
where
164164
S: quic::RecvStream,
165165
{
166-
Control(FrameStream<S>),
167-
Push(u64, FrameStream<S>),
166+
Control(FrameStream<S, B>),
167+
Push(u64, FrameStream<S, B>),
168168
Encoder(S),
169169
Decoder(S),
170170
Reserved,
@@ -195,7 +195,7 @@ where
195195
}
196196
}
197197

198-
pub fn into_stream(self) -> Result<AcceptedRecvStream<S>, Error> {
198+
pub fn into_stream<B>(self) -> Result<AcceptedRecvStream<S, B>, Error> {
199199
Ok(match self.ty.expect("Stream type not resolved yet") {
200200
StreamType::CONTROL => {
201201
AcceptedRecvStream::Control(FrameStream::with_bufs(self.stream, self.buf))

0 commit comments

Comments
 (0)