Skip to content

Commit 47dc432

Browse files
fix(reth-bench): Lower block channel capacity and make it configurable (#19226)
1 parent 712569d commit 47dc432

File tree

2 files changed

+65
-6
lines changed

2 files changed

+65
-6
lines changed

bin/reth-bench/src/bench/new_payload_fcu.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@ pub struct Command {
3333
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
3434
wait_time: Option<Duration>,
3535

36+
/// The size of the block buffer (channel capacity) for prefetching blocks from the RPC
37+
/// endpoint.
38+
#[arg(
39+
long = "rpc-block-buffer-size",
40+
value_name = "RPC_BLOCK_BUFFER_SIZE",
41+
default_value = "20",
42+
verbatim_doc_comment
43+
)]
44+
rpc_block_buffer_size: usize,
45+
3646
#[command(flatten)]
3747
benchmark: BenchmarkArgs,
3848
}
@@ -48,7 +58,12 @@ impl Command {
4858
is_optimism,
4959
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
5060

51-
let (sender, mut receiver) = tokio::sync::mpsc::channel(1000);
61+
let buffer_size = self.rpc_block_buffer_size;
62+
63+
// Use a oneshot channel to propagate errors from the spawned task
64+
let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel();
65+
let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
66+
5267
tokio::task::spawn(async move {
5368
while benchmark_mode.contains(next_block) {
5469
let block_res = block_provider
@@ -60,6 +75,7 @@ impl Command {
6075
Ok(block) => block,
6176
Err(e) => {
6277
tracing::error!("Failed to fetch block {next_block}: {e}");
78+
let _ = error_sender.send(e);
6379
break;
6480
}
6581
};
@@ -69,6 +85,7 @@ impl Command {
6985
Ok(result) => result,
7086
Err(e) => {
7187
tracing::error!("Failed to convert block to new payload: {e}");
88+
let _ = error_sender.send(e);
7289
break;
7390
}
7491
};
@@ -163,6 +180,11 @@ impl Command {
163180
results.push((gas_row, combined_result));
164181
}
165182

183+
// Check if the spawned task encountered an error
184+
if let Ok(error) = error_receiver.try_recv() {
185+
return Err(error);
186+
}
187+
166188
let (gas_output_results, combined_results): (_, Vec<CombinedResult>) =
167189
results.into_iter().unzip();
168190

bin/reth-bench/src/bench/new_payload_only.rs

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::{
1313
use alloy_provider::Provider;
1414
use clap::Parser;
1515
use csv::Writer;
16-
use eyre::Context;
16+
use eyre::{Context, OptionExt};
1717
use reth_cli_runner::CliContext;
1818
use reth_node_core::args::BenchmarkArgs;
1919
use std::time::{Duration, Instant};
@@ -26,6 +26,16 @@ pub struct Command {
2626
#[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
2727
rpc_url: String,
2828

29+
/// The size of the block buffer (channel capacity) for prefetching blocks from the RPC
30+
/// endpoint.
31+
#[arg(
32+
long = "rpc-block-buffer-size",
33+
value_name = "RPC_BLOCK_BUFFER_SIZE",
34+
default_value = "20",
35+
verbatim_doc_comment
36+
)]
37+
rpc_block_buffer_size: usize,
38+
2939
#[command(flatten)]
3040
benchmark: BenchmarkArgs,
3141
}
@@ -41,21 +51,43 @@ impl Command {
4151
is_optimism,
4252
} = BenchContext::new(&self.benchmark, self.rpc_url).await?;
4353

44-
let (sender, mut receiver) = tokio::sync::mpsc::channel(1000);
54+
let buffer_size = self.rpc_block_buffer_size;
55+
56+
// Use a oneshot channel to propagate errors from the spawned task
57+
let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel();
58+
let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
59+
4560
tokio::task::spawn(async move {
4661
while benchmark_mode.contains(next_block) {
4762
let block_res = block_provider
4863
.get_block_by_number(next_block.into())
4964
.full()
5065
.await
5166
.wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
52-
let block = block_res.unwrap().unwrap();
67+
let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
68+
Ok(block) => block,
69+
Err(e) => {
70+
tracing::error!("Failed to fetch block {next_block}: {e}");
71+
let _ = error_sender.send(e);
72+
break;
73+
}
74+
};
5375
let header = block.header.clone();
5476

55-
let (version, params) = block_to_new_payload(block, is_optimism).unwrap();
77+
let (version, params) = match block_to_new_payload(block, is_optimism) {
78+
Ok(result) => result,
79+
Err(e) => {
80+
tracing::error!("Failed to convert block to new payload: {e}");
81+
let _ = error_sender.send(e);
82+
break;
83+
}
84+
};
5685

5786
next_block += 1;
58-
sender.send((header, version, params)).await.unwrap();
87+
if let Err(e) = sender.send((header, version, params)).await {
88+
tracing::error!("Failed to send block data: {e}");
89+
break;
90+
}
5991
}
6092
});
6193

@@ -96,6 +128,11 @@ impl Command {
96128
results.push((row, new_payload_result));
97129
}
98130

131+
// Check if the spawned task encountered an error
132+
if let Ok(error) = error_receiver.try_recv() {
133+
return Err(error);
134+
}
135+
99136
let (gas_output_results, new_payload_results): (_, Vec<NewPayloadResult>) =
100137
results.into_iter().unzip();
101138

0 commit comments

Comments
 (0)