Skip to content

Add a test to demonstrate connection logical deadlock under high concurrency #852

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 201 additions & 0 deletions tests/h2-tests/tests/deadlock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
use std::{
mem,
net::SocketAddr,
sync::atomic::{self, AtomicUsize},
time::Instant,
};

use futures::StreamExt;
use h2_support::prelude::*;
use tokio::{net::TcpStream, runtime::Runtime};

// Total number of requests completed across all client tasks. Incremented by the
// request tasks and polled by the watchdog loop in `logical_deadlock`.
static REQUESTS_COMPLETED: AtomicUsize = AtomicUsize::new(0);

// The magic happens when concurrency is much greater than max_concurrent_streams.
const CONCURRENCY: usize = 100;
const MAX_CONCURRENT_STREAMS: u32 = 20;

// If we successfully complete this many requests per task, we can call the test a success.
// Typically, this test seems to fail in the 10-25 request range but no doubt there are many
// variables in play, as failures are somewhat random and timing-dependent.
const TARGET_REQUESTS_PER_TASK: usize = 100;

// The test is successful if all the expected requests have been completed.
const TARGET_REQUESTS_COMPLETED: usize = CONCURRENCY * TARGET_REQUESTS_PER_TASK;

// If no requests have been completed in this long, we consider the connection deadlocked.
const MUST_MAKE_PROGRESS_INTERVAL: Duration = Duration::from_secs(2);
// How often the watchdog loop re-samples the completion counter.
const CHECK_FOR_PROGRESS_INTERVAL: Duration = Duration::from_millis(100);

/// Attempts to reproduce a logical deadlock on a single h2 client connection by
/// issuing far more concurrent requests (`CONCURRENCY`) than the server permits
/// concurrent streams (`MAX_CONCURRENT_STREAMS`), then watching the shared
/// completion counter for stalls.
///
/// Panics if no request completes for `MUST_MAKE_PROGRESS_INTERVAL`, which the
/// test treats as evidence that the connection has deadlocked.
#[test]
fn logical_deadlock() {
    let server_addr = serve();

    let runtime = Runtime::new().unwrap();

    runtime.block_on(async move {
        let tcp_connection = TcpStream::connect(server_addr).await.unwrap();
        let (client, h2_connection) = client::handshake(tcp_connection).await.unwrap();

        tokio::spawn(async move {
            // This just does "connection stuff" independent of any requests.
            h2_connection.await.unwrap();
        });

        let mut join_handles = Vec::new();

        // Spawn CONCURRENCY tasks, each issuing TARGET_REQUESTS_PER_TASK
        // sequential requests over clones of the same connection handle.
        for _ in 0..CONCURRENCY {
            join_handles.push(tokio::spawn({
                let mut client = client.clone();

                async move {
                    for _ in 0..TARGET_REQUESTS_PER_TASK {
                        let request = Request::builder()
                            .method(Method::POST)
                            .uri("/")
                            .body(())
                            .unwrap();

                        let (response_future, mut request_body) =
                            client.send_request(request, false).unwrap();

                        request_body
                            .send_data(Bytes::from_static(REQUEST_BODY), true)
                            .unwrap();

                        // The anomaly we expect to see will result in this await never completing
                        // for all `CONCURRENCY` concurrent requests enqueued on this connection.
                        let mut response = match response_future.await {
                            Ok(response) => response,
                            Err(e)
                                if e.is_reset() && e.reason() == Some(Reason::REFUSED_STREAM) =>
                            {
                                // Not relevant - it means the server sent a max_concurrent_streams
                                // limit and the client has not yet seen it, so it is not enforced.
                                // Not an ideal outcome in general but not relevant to this test
                                // scenario - the anomaly we expect to see results in deadlocked
                                // requests, not failing requests.
                                REQUESTS_COMPLETED.fetch_add(1, atomic::Ordering::Relaxed);
                                continue;
                            }
                            Err(e) => {
                                // This failure is not expected as part of the test scenario, neither for fail nor pass results.
                                panic!("Request failed unexpectedly: {:?}", e);
                            }
                        };

                        // Drain the response body; the server always sends 10 KB.
                        let response_body_stream = response.body_mut();
                        let mut total_length: usize = 0;

                        while let Some(Ok(chunk)) = response_body_stream.next().await {
                            total_length += chunk.len();
                        }

                        assert_eq!(total_length, 10 * 1024);

                        REQUESTS_COMPLETED.fetch_add(1, atomic::Ordering::Relaxed);
                    }
                }
            }));
        }

        // Watchdog loop: fail the test if the completion counter stalls.
        let mut last_progress = Instant::now();
        let mut last_requests_completed = 0;

        loop {
            // Load the counter exactly once per iteration so the exit check and
            // the progress bookkeeping operate on the same snapshot (previously
            // it was loaded twice, which could observe two different values).
            let requests_completed = REQUESTS_COMPLETED.load(atomic::Ordering::Relaxed);

            if requests_completed >= TARGET_REQUESTS_COMPLETED {
                break;
            }

            println!("Requests completed: {requests_completed} of {TARGET_REQUESTS_COMPLETED}");

            if requests_completed > last_requests_completed {
                last_progress = Instant::now();
            }

            last_requests_completed = requests_completed;

            if last_progress.elapsed() > MUST_MAKE_PROGRESS_INTERVAL {
                panic!(
                    "No requests completed in the last {:?}, deadlock likely occurred",
                    MUST_MAKE_PROGRESS_INTERVAL
                );
            }

            tokio::time::sleep(CHECK_FOR_PROGRESS_INTERVAL).await;
        }

        println!("All requests completed successfully.");

        // Surface any panics from the request tasks.
        for handle in join_handles {
            handle.await.unwrap();
        }
    });
}

// Fixed 10 KB payloads exchanged on every request/response; the fill bytes
// (77 / 88) are arbitrary, only the lengths are asserted.
static REQUEST_BODY: &[u8] = &[77; 10 * 1024];
static RESPONSE_BODY: &[u8] = &[88; 10 * 1024];

/// Starts a test server that will open a TCP socket and listen for HTTP/2 connections.
///
/// For simplicity, it will listen forever (until the test binary terminates).
///
/// Expects to read 10 KB of request body from each request to `/`,
/// to which it responds with a 200 OK with 10 KB of response body.
fn serve() -> SocketAddr {
    let runtime = Runtime::new().unwrap();

    let bound_addr = runtime.block_on(async {
        let listener = tokio::net::TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
            .await
            .unwrap();

        let addr = listener.local_addr().unwrap();

        // Accept loop: runs until the process exits; every accepted TCP
        // connection is served on its own task.
        tokio::spawn(async move {
            loop {
                let (socket, _) = listener.accept().await.unwrap();

                tokio::spawn(async move {
                    let mut h2 = server::Builder::new()
                        .max_concurrent_streams(MAX_CONCURRENT_STREAMS)
                        .handshake(socket)
                        .await
                        .expect("Failed to handshake");

                    // One iteration per HTTP/2 stream (request) on this connection.
                    while let Some(incoming) = h2.next().await {
                        let Ok((mut request, mut responder)) = incoming else {
                            // This generally means connection terminated.
                            break;
                        };

                        // Each request is served concurrently on its own task.
                        tokio::spawn(async move {
                            // Drain the request body and check it is the expected 10 KB.
                            let mut received: usize = 0;

                            while let Some(Ok(chunk)) = request.body_mut().next().await {
                                received += chunk.len();
                            }

                            assert_eq!(received, 10 * 1024);

                            // Reply with 200 OK carrying a 10 KB body.
                            let response = Response::builder()
                                .status(StatusCode::OK)
                                .body(())
                                .unwrap();

                            let mut body_sender =
                                responder.send_response(response, false).unwrap();
                            body_sender
                                .send_data(Bytes::from_static(RESPONSE_BODY), true)
                                .unwrap();
                        });
                    }
                });
            }
        });

        addr
    });

    // Leak the runtime so the accept loop keeps running until end of process.
    mem::forget(runtime);

    bound_addr
}