Skip to content

Commit 7ab9bac

Browse files
committed
fix: do not assign capacity for pending streams
`Prioritize::send_data` has a check to prevent assigning capacity to streams that are not yet open. Assigning flow control window to pending streams could starve already open streams. This change adds a similar check to `Prioritize::reserve_capacity`. Test `capacity_not_assigned_to_unopened_streams` in `flow_control.rs` demonstrates the fix. A number of other tests must be changed because they were assuming that pending streams immediately received connection capacity. This may be related to #853.
1 parent e793b24 commit 7ab9bac

File tree

4 files changed

+145
-25
lines changed

4 files changed

+145
-25
lines changed

src/proto/streams/prioritize.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,13 +186,7 @@ impl Prioritize {
186186

187187
// `try_assign_capacity` will queue the stream to `pending_capacity` if the capcaity
188188
// cannot be assigned at the time it is called.
189-
//
190-
// Streams over the max concurrent count will still call `send_data` so we should be
191-
// careful not to put it into `pending_capacity` as it will starve the connection
192-
// capacity for other streams
193-
if !stream.is_pending_open {
194-
self.try_assign_capacity(stream);
195-
}
189+
self.try_assign_capacity(stream);
196190
}
197191

198192
if frame.is_end_stream() {
@@ -414,6 +408,12 @@ impl Prioritize {
414408

415409
/// Request capacity to send data
416410
fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
411+
// Streams over the max concurrent count should not have capacity assign to avoid starving the connection
412+
// capacity for open streams
413+
if stream.is_pending_open {
414+
return;
415+
}
416+
417417
let total_requested = stream.requested_send_capacity;
418418

419419
// Total requested should never go below actual assigned

tests/h2-tests/tests/flow_control.rs

Lines changed: 121 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ async fn stream_close_by_data_frame_releases_capacity() {
469469

470470
// The capacity should be immediately available as nothing else is
471471
// happening on the stream.
472-
assert_eq!(s1.capacity(), window_size);
472+
let mut s1 = h2.drive(util::wait_for_capacity(s1, window_size)).await;
473473

474474
let request = Request::builder()
475475
.method(Method::POST)
@@ -492,7 +492,7 @@ async fn stream_close_by_data_frame_releases_capacity() {
492492
s1.send_data("".into(), true).unwrap();
493493

494494
// The capacity should be available
495-
assert_eq!(s2.capacity(), 5);
495+
let mut s2 = h2.drive(util::wait_for_capacity(s2, 5)).await;
496496

497497
// Send the frame
498498
s2.send_data("hello".into(), true).unwrap();
@@ -539,9 +539,7 @@ async fn stream_close_by_trailers_frame_releases_capacity() {
539539
// This effectively reserves the entire connection window
540540
s1.reserve_capacity(window_size);
541541

542-
// The capacity should be immediately available as nothing else is
543-
// happening on the stream.
544-
assert_eq!(s1.capacity(), window_size);
542+
let mut s1 = h2.drive(util::wait_for_capacity(s1, window_size)).await;
545543

546544
let request = Request::builder()
547545
.method(Method::POST)
@@ -564,7 +562,7 @@ async fn stream_close_by_trailers_frame_releases_capacity() {
564562
s1.send_trailers(Default::default()).unwrap();
565563

566564
// The capacity should be available
567-
assert_eq!(s2.capacity(), 5);
565+
let mut s2 = h2.drive(util::wait_for_capacity(s2, 5)).await;
568566

569567
// Send the frame
570568
s2.send_data("hello".into(), true).unwrap();
@@ -997,10 +995,10 @@ async fn recv_no_init_window_then_receive_some_init_window() {
997995

998996
let (response, mut stream) = client.send_request(request, false).unwrap();
999997

1000-
stream.reserve_capacity(11);
998+
stream.reserve_capacity(10);
1001999

1002-
let mut stream = h2.drive(util::wait_for_capacity(stream, 11)).await;
1003-
assert_eq!(stream.capacity(), 11);
1000+
let mut stream = h2.drive(util::wait_for_capacity(stream, 10)).await;
1001+
assert_eq!(stream.capacity(), 10);
10041002

10051003
stream.send_data("hello world".into(), true).unwrap();
10061004

@@ -2068,6 +2066,120 @@ async fn reclaim_reserved_capacity() {
20682066
join(mock, h2).await;
20692067
}
20702068

2069+
#[tokio::test]
2070+
async fn capacity_not_assigned_to_unopened_streams() {
2071+
h2_support::trace_init!();
2072+
2073+
let (io, mut srv) = mock::new();
2074+
2075+
let mock = async move {
2076+
let mut settings = frame::Settings::default();
2077+
settings.set_max_concurrent_streams(Some(1));
2078+
let settings = srv.assert_client_handshake_with_settings(settings).await;
2079+
assert_default_settings!(settings);
2080+
2081+
srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
2082+
.await;
2083+
srv.recv_frame(frames::data(1, "hello")).await;
2084+
srv.recv_frame(frames::data(1, "world").eos()).await;
2085+
srv.send_frame(frames::headers(1).response(200).eos()).await;
2086+
2087+
srv.recv_frame(frames::headers(3).request("POST", "https://www.example.com/"))
2088+
.await;
2089+
srv.send_frame(frames::window_update(
2090+
0,
2091+
frame::DEFAULT_INITIAL_WINDOW_SIZE + 10,
2092+
))
2093+
.await;
2094+
srv.recv_frame(frames::reset(3).cancel()).await;
2095+
};
2096+
2097+
let h2 = async move {
2098+
let (mut client, mut h2) = client::handshake(io).await.unwrap();
2099+
let request = Request::builder()
2100+
.method(Method::POST)
2101+
.uri("https://www.example.com/")
2102+
.body(())
2103+
.unwrap();
2104+
2105+
let (response1, mut stream1) = client.send_request(request.clone(), false).unwrap();
2106+
stream1.send_data("hello".into(), false).unwrap();
2107+
let (_, mut stream2) = client.send_request(request.clone(), false).unwrap();
2108+
stream2.reserve_capacity(frame::DEFAULT_INITIAL_WINDOW_SIZE as usize);
2109+
stream1.send_data("world".into(), true).unwrap();
2110+
h2.drive(response1).await.unwrap();
2111+
let stream2 = h2
2112+
.drive(util::wait_for_capacity(
2113+
stream2,
2114+
frame::DEFAULT_INITIAL_WINDOW_SIZE as usize,
2115+
))
2116+
.await;
2117+
drop(stream2);
2118+
h2.await.unwrap();
2119+
};
2120+
2121+
join(mock, h2).await;
2122+
}
2123+
2124+
#[tokio::test]
2125+
async fn new_initial_window_size_capacity_not_assigned_to_unopened_streams() {
2126+
h2_support::trace_init!();
2127+
2128+
let (io, mut srv) = mock::new();
2129+
2130+
let mock = async move {
2131+
let mut settings = frame::Settings::default();
2132+
settings.set_max_concurrent_streams(Some(1));
2133+
settings.set_initial_window_size(Some(10));
2134+
let settings = srv.assert_client_handshake_with_settings(settings).await;
2135+
assert_default_settings!(settings);
2136+
2137+
srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
2138+
.await;
2139+
srv.recv_frame(frames::data(1, "hello")).await;
2140+
srv.send_frame(frames::settings().initial_window_size(frame::DEFAULT_INITIAL_WINDOW_SIZE))
2141+
.await;
2142+
srv.recv_frame(frames::settings_ack()).await;
2143+
srv.send_frame(frames::headers(1).response(200).eos()).await;
2144+
srv.recv_frame(frames::data(1, "world").eos()).await;
2145+
2146+
srv.recv_frame(frames::headers(3).request("POST", "https://www.example.com/"))
2147+
.await;
2148+
srv.send_frame(frames::window_update(
2149+
0,
2150+
frame::DEFAULT_INITIAL_WINDOW_SIZE + 10,
2151+
))
2152+
.await;
2153+
srv.recv_frame(frames::reset(3).cancel()).await;
2154+
};
2155+
2156+
let h2 = async move {
2157+
let (mut client, mut h2) = client::handshake(io).await.unwrap();
2158+
let request = Request::builder()
2159+
.method(Method::POST)
2160+
.uri("https://www.example.com/")
2161+
.body(())
2162+
.unwrap();
2163+
2164+
let (response1, mut stream1) = client.send_request(request.clone(), false).unwrap();
2165+
stream1.send_data("hello".into(), false).unwrap();
2166+
let (_, mut stream2) = client.send_request(request.clone(), false).unwrap();
2167+
stream2.reserve_capacity(frame::DEFAULT_INITIAL_WINDOW_SIZE as usize);
2168+
h2.drive(response1).await.unwrap();
2169+
stream1.send_data("world".into(), true).unwrap();
2170+
let stream2 = h2
2171+
.drive(util::wait_for_capacity(
2172+
stream2,
2173+
frame::DEFAULT_INITIAL_WINDOW_SIZE as usize,
2174+
))
2175+
.await;
2176+
drop(stream2);
2177+
h2.await.unwrap();
2178+
};
2179+
2180+
join(mock, h2).await;
2181+
}
2182+
20712183
// ==== abusive window updates ====
20722184

20732185
#[tokio::test]

tests/h2-tests/tests/prioritization.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ async fn single_stream_send_large_body() {
5252
stream.reserve_capacity(payload.len());
5353

5454
// The capacity should be immediately allocated
55-
assert_eq!(stream.capacity(), payload.len());
55+
let mut stream = h2
56+
.drive(util::wait_for_capacity(stream, payload.len()))
57+
.await;
5658

5759
// Send the data
5860
stream.send_data(payload.into(), true).unwrap();
@@ -108,7 +110,9 @@ async fn multiple_streams_with_payload_greater_than_default_window() {
108110
// The capacity should be immediately
109111
// allocated to default window size (smaller than payload)
110112
stream1.reserve_capacity(payload_clone.len());
111-
assert_eq!(stream1.capacity(), DEFAULT_WINDOW_SIZE);
113+
let mut stream1 = conn
114+
.drive(util::wait_for_capacity(stream1, DEFAULT_WINDOW_SIZE))
115+
.await;
112116

113117
stream2.reserve_capacity(payload_clone.len());
114118
assert_eq!(stream2.capacity(), 0);
@@ -179,7 +183,9 @@ async fn single_stream_send_extra_large_body_multi_frames_one_buffer() {
179183
stream.reserve_capacity(payload.len());
180184

181185
// The capacity should be immediately allocated
182-
assert_eq!(stream.capacity(), payload.len());
186+
let mut stream = h2
187+
.drive(util::wait_for_capacity(stream, payload.len()))
188+
.await;
183189

184190
// Send the data
185191
stream.send_data(payload.into(), true).unwrap();
@@ -296,13 +302,13 @@ async fn single_stream_send_extra_large_body_multi_frames_multi_buffer() {
296302
0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, 172, 75, 143, 168, 233, 25, 151,
297303
33, 233, 132,
298304
])
305+
.write(frames::SETTINGS_ACK)
306+
.read(frames::SETTINGS_ACK)
299307
.write(&[
300308
// DATA
301309
0, 64, 0, 0, 0, 0, 0, 0, 1,
302310
])
303311
.write(&payload[0..16_384])
304-
.write(frames::SETTINGS_ACK)
305-
.read(frames::SETTINGS_ACK)
306312
.wait(Duration::from_millis(10))
307313
.write(&[
308314
// DATA
@@ -326,7 +332,9 @@ async fn single_stream_send_extra_large_body_multi_frames_multi_buffer() {
326332
stream.reserve_capacity(payload.len());
327333

328334
// The capacity should be immediately allocated
329-
assert_eq!(stream.capacity(), payload.len());
335+
let mut stream = h2
336+
.drive(util::wait_for_capacity(stream, payload.len()))
337+
.await;
330338

331339
// Send the data
332340
stream.send_data(payload.into(), true).unwrap();

tests/h2-tests/tests/stream_states.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ async fn send_recv_data() {
5151
0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, 172, 75, 143, 168, 233, 25, 151,
5252
33, 233, 132,
5353
])
54+
.write(frames::SETTINGS_ACK)
5455
.write(&[
5556
// DATA
5657
0, 0, 5, 0, 1, 0, 0, 0, 1, 104, 101, 108, 108, 111,
5758
])
58-
.write(frames::SETTINGS_ACK)
5959
// Read response
6060
.read(&[
6161
// HEADERS
@@ -78,10 +78,10 @@ async fn send_recv_data() {
7878
// Reserve send capacity
7979
stream.reserve_capacity(5);
8080

81-
assert_eq!(stream.capacity(), 5);
81+
let mut stream = h2.drive(util::wait_for_capacity(stream, 5)).await;
8282

8383
// Send the data
84-
stream.send_data("hello".as_bytes(), true).unwrap();
84+
stream.send_data("hello".into(), true).unwrap();
8585

8686
// Get the response
8787
let resp = h2.run(response).await.unwrap();

0 commit comments

Comments
 (0)