From 9b1a734428130187b99206af0366cf1010fcbc18 Mon Sep 17 00:00:00 2001 From: Charlie Mack Date: Mon, 27 Oct 2025 09:47:14 +0100 Subject: [PATCH 1/6] feat(bench): refactor for cleaner implementation --- Cargo.lock | 2 + bin/reth-bench/Cargo.toml | 2 + bin/reth-bench/src/bench/block_storage.rs | 339 +++++++++++++++++++ bin/reth-bench/src/bench/context.rs | 9 +- bin/reth-bench/src/bench/mod.rs | 5 + bin/reth-bench/src/bench/new_payload_fcu.rs | 179 +++++++--- bin/reth-bench/src/bench/new_payload_only.rs | 100 ++++-- bin/reth-bench/src/bench/output.rs | 3 + bin/reth-bench/src/valid_payload.rs | 16 +- crates/node/core/src/args/benchmark_args.rs | 9 + 10 files changed, 581 insertions(+), 83 deletions(-) create mode 100644 bin/reth-bench/src/bench/block_storage.rs diff --git a/Cargo.lock b/Cargo.lock index b8c0da68164..831892a020b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7324,11 +7324,13 @@ dependencies = [ name = "reth-bench" version = "1.8.2" dependencies = [ + "alloy-consensus", "alloy-eips", "alloy-json-rpc", "alloy-primitives", "alloy-provider", "alloy-pubsub", + "alloy-rlp", "alloy-rpc-client", "alloy-rpc-types-engine", "alloy-transport", diff --git a/bin/reth-bench/Cargo.toml b/bin/reth-bench/Cargo.toml index 891fa4f9780..ec818ff8256 100644 --- a/bin/reth-bench/Cargo.toml +++ b/bin/reth-bench/Cargo.toml @@ -25,6 +25,8 @@ reth-tracing.workspace = true # alloy alloy-eips.workspace = true alloy-json-rpc.workspace = true +alloy-consensus.workspace = true +alloy-rlp.workspace = true alloy-primitives.workspace = true alloy-provider = { workspace = true, features = ["engine-api", "reqwest-rustls-tls"], default-features = false } alloy-pubsub.workspace = true diff --git a/bin/reth-bench/src/bench/block_storage.rs b/bin/reth-bench/src/bench/block_storage.rs new file mode 100644 index 00000000000..83da676a346 --- /dev/null +++ b/bin/reth-bench/src/bench/block_storage.rs @@ -0,0 +1,339 @@ +use crate::bench::{context::BenchContext, output::BLOCK_STORAGE_OUTPUT_SUFFIX}; +use alloy_provider::{network::AnyNetwork, Provider, RootProvider}; + +use alloy_consensus::{ + transaction::{Either, EthereumTxEnvelope}, + Block, BlockBody, Header, TxEip4844Variant, +}; +use alloy_rlp::{Decodable, Encodable}; +use clap::Parser; +use eyre::{Context, OptionExt}; +use op_alloy_consensus::OpTxEnvelope; +use reth_cli_runner::CliContext; +use reth_node_core::args::BenchmarkArgs; +use std::{ + fs::File, + io::{BufReader, BufWriter, Read, Write}, + path::Path, +}; +use tracing::{debug, info}; + +//type alias for the consensus block +type ConsensusBlock = Block, OpTxEnvelope>, Header>; + +/// File format version for future compatibility +const FILE_FORMAT_VERSION: u8 = 1; + +/// Magic bytes to identify the file format +const MAGIC_BYTES: &[u8] = b"RETH"; + +/// `reth benchmark generate-file` command +#[derive(Debug, Parser)] +pub struct Command { + /// The RPC url to use for getting data. 
+ #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)] + rpc_url: String, + + #[command(flatten)] + benchmark: BenchmarkArgs, +} + +/// Block file header +#[derive(Debug)] +pub(crate) struct BlockFileHeader { + version: u8, + block_type: BlockType, + from_block: u64, + to_block: u64, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub(crate) enum BlockType { + Ethereum = 0, + Optimism = 1, +} + +impl BlockFileHeader { + fn new(is_optimism: bool, from_block: u64, to_block: u64) -> Self { + Self { + version: FILE_FORMAT_VERSION, + block_type: if is_optimism { BlockType::Optimism } else { BlockType::Ethereum }, + from_block, + to_block, + } + } + + /// Get the block range from the header + pub(super) fn block_range(&self) -> (u64, u64) { + (self.from_block, self.to_block) + } + + fn write_to(&self, writer: &mut impl Write) -> eyre::Result<()> { + writer.write_all(MAGIC_BYTES)?; + writer.write_all(&[self.version])?; + writer.write_all(&[self.block_type as u8])?; + writer.write_all(&self.from_block.to_le_bytes())?; + writer.write_all(&self.to_block.to_le_bytes())?; + Ok(()) + } + + /// Read header from file (for the decoder) + fn read_from(reader: &mut impl std::io::Read) -> eyre::Result { + let mut magic = [0u8; 4]; + reader.read_exact(&mut magic)?; + if magic != MAGIC_BYTES { + return Err(eyre::eyre!("Invalid file format")); + } + + let mut version = [0u8; 1]; + reader.read_exact(&mut version)?; + if version[0] != FILE_FORMAT_VERSION { + return Err(eyre::eyre!("Unsupported file version: {}", version[0])); + } + + let mut block_type = [0u8; 1]; + reader.read_exact(&mut block_type)?; + + let mut from_block = [0u8; 8]; + reader.read_exact(&mut from_block)?; + let from_block = u64::from_le_bytes(from_block); + + let mut to_block = [0u8; 8]; + reader.read_exact(&mut to_block)?; + let to_block = u64::from_le_bytes(to_block); + + Ok(Self { + version: version[0], + block_type: match block_type[0] { + 0 => BlockType::Ethereum, + 1 => BlockType::Optimism, + _ => return Err(eyre::eyre!("Unknown block type: {}", block_type[0])), + }, + from_block, + to_block, + }) + } +} + +/// Writer for block storage files +struct BlockFileWriter { + writer: BufWriter, + blocks_written: usize, +} + +impl BlockFileWriter { + fn new(path: &Path, header: BlockFileHeader) -> eyre::Result { + let mut writer = BufWriter::new(File::create(path)?); + header.write_to(&mut writer)?; + + Ok(Self { writer, blocks_written: 0 }) + } + + fn write_block(&mut self, rlp_data: &[u8]) -> eyre::Result<()> { + // Write length as 4 bytes (supports blocks up to 4GB) + self.writer.write_all(&(rlp_data.len() as u32).to_le_bytes())?; + self.writer.write_all(rlp_data)?; + self.blocks_written += 1; + Ok(()) + } + + fn finish(mut self) -> eyre::Result { + self.writer.flush()?; + Ok(self.blocks_written) + } +} + +/// Block file reader that streams blocks in batches +pub(crate) struct BlockFileReader { + reader: BufReader, + batch_size: usize, + blocks_read: usize, + block_type: BlockType, +} + +impl BlockFileReader { + /// Create a new block file reader + pub(crate) fn new(path: &Path, batch_size: usize) -> eyre::Result { + let file = + File::open(path).wrap_err_with(|| format!("Failed to open block file: {:?}", path))?; + + let mut reader = BufReader::new(file); + + info!("Opened block file: {:?}", path); + + let header = BlockFileHeader::read_from(&mut reader)?; + + let block_type = header.block_type; + + Ok(Self { reader, batch_size, blocks_read: 0, block_type }) + } + + /// Get the header from the file + pub(crate) fn 
get_header(path: &Path) -> eyre::Result { + let mut file = + File::open(path).wrap_err_with(|| format!("Failed to open block file: {:?}", path))?; + BlockFileHeader::read_from(&mut file) + } + + /// Read the next batch of blocks from the file + pub(crate) fn read_batch(&mut self) -> eyre::Result> { + let mut batch = Vec::with_capacity(self.batch_size); + + for _ in 0..self.batch_size { + match self.read_next_block()? { + Some(block) => batch.push(block), + None => break, // EOF + } + } + + if !batch.is_empty() { + debug!("Read batch of {} blocks (total read: {})", batch.len(), self.blocks_read); + } + + Ok(batch) + } + + /// Read a single block from the file + fn read_next_block(&mut self) -> eyre::Result> { + // Read length prefix (4 bytes, little-endian) + let mut len_bytes = [0u8; 4]; + match self.reader.read_exact(&mut len_bytes) { + Ok(_) => {} + Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { + return Ok(None); + } + Err(e) => return Err(e.into()), + } + + let len = u32::from_le_bytes(len_bytes) as usize; + + // Read RLP-encoded block data + let mut block_bytes = vec![0u8; len]; + self.reader.read_exact(&mut block_bytes).wrap_err("Failed to read block data")?; + + let block: ConsensusBlock = match self.block_type { + BlockType::Ethereum => Block::, Header>::decode( + &mut block_bytes.as_slice(), + )? + .map_transactions(|tx| Either::Left(tx)), + BlockType::Optimism => { + Block::::decode(&mut block_bytes.as_slice())? + .map_transactions(|tx| Either::Right(tx)) + } + }; + + self.blocks_read += 1; + + Ok(Some(block)) + } +} + +impl Command { + /// Execute `benchmark block-storage` command + pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> { + info!("Generating file from RPC: {}", self.rpc_url); + + let BenchContext { block_provider, benchmark_mode, mut next_block, is_optimism, .. } = + BenchContext::new(&self.benchmark, self.rpc_url.clone()).await?; + + // Initialize file writer with header + let output_path = self + .benchmark + .output + .as_ref() + .ok_or_eyre("--output is required")? + .join(BLOCK_STORAGE_OUTPUT_SUFFIX); + + let from_block = self.benchmark.from.ok_or_eyre("--from is required")?; + let to_block = self.benchmark.to.ok_or_eyre("--to is required")?; + + let header = BlockFileHeader::new(is_optimism, from_block, to_block); + let mut file_writer = BlockFileWriter::new(&output_path, header)?; + + info!( + "Writing {} blocks to {:?}", + if is_optimism { "Optimism" } else { "Ethereum" }, + output_path + ); + + // Process blocks + while benchmark_mode.contains(next_block) { + info!("Processing block {}", next_block); + + // Fetch and encode block + let rlp_data = + self.fetch_and_encode_block(&block_provider, next_block, is_optimism).await?; + + // Write to file + file_writer.write_block(&rlp_data)?; + + next_block += 1; + } + + let blocks_written = file_writer.finish()?; + info!("Successfully wrote {} blocks to file", blocks_written); + + Ok(()) + } + + async fn fetch_and_encode_block( + &self, + provider: &RootProvider, + block_number: u64, + is_optimism: bool, + ) -> eyre::Result> { + // Fetch block + let block = provider + .get_block_by_number(block_number.into()) + .full() + .await? 
+ .ok_or_eyre(format!("Block {} not found", block_number))?; + + let inner_block = block.into_inner(); + + let rlp = if is_optimism { + // Create Optimism block + let op_block = Block { + header: inner_block.header.inner.into_header_with_defaults(), + body: BlockBody { + transactions: inner_block + .transactions + .into_transactions() + .into_iter() + .map(|tx| tx.try_into()) + .collect::, _>>() + .map_err(|e| eyre::eyre!("Failed to convert to Optimism tx: {:?}", e))?, + ommers: vec![], + withdrawals: inner_block.withdrawals, + }, + }; + + let mut buf = Vec::with_capacity(op_block.length()); + op_block.encode(&mut buf); + buf + } else { + // Create Ethereum block + let eth_block = Block { + header: inner_block.header.inner.into_header_with_defaults(), + body: BlockBody { + transactions: inner_block + .transactions + .into_transactions() + .into_iter() + .map(|tx| tx.try_into_envelope()) + .collect::>, _>>() + .map_err(|e| eyre::eyre!("Failed to convert to Ethereum tx: {:?}", e))?, + ommers: vec![], + withdrawals: inner_block.withdrawals, + }, + }; + + let mut buf = Vec::with_capacity(eth_block.length()); + eth_block.encode(&mut buf); + buf + }; + + Ok(rlp) + } +} diff --git a/bin/reth-bench/src/bench/context.rs b/bin/reth-bench/src/bench/context.rs index 1d53ce8e1a3..8f9e32bbd25 100644 --- a/bin/reth-bench/src/bench/context.rs +++ b/bin/reth-bench/src/bench/context.rs @@ -1,7 +1,10 @@ //! This contains the [`BenchContext`], which is information that all replay-based benchmarks need. //! The initialization code is also the same, so this can be shared across benchmark commands. -use crate::{authenticated_transport::AuthenticatedTransportConnect, bench_mode::BenchMode}; +use crate::{ + authenticated_transport::AuthenticatedTransportConnect, bench::block_storage::BlockFileReader, + bench_mode::BenchMode, +}; use alloy_eips::BlockNumberOrTag; use alloy_primitives::address; use alloy_provider::{network::AnyNetwork, Provider, RootProvider}; @@ -99,6 +102,10 @@ impl BenchContext { .ok_or_else(|| eyre::eyre!("Failed to fetch latest block for --advance"))?; let head_number = head_block.header.number; (Some(head_number), Some(head_number + advance)) + } else if bench_args.from_file.is_some() { + let file = BlockFileReader::get_header(bench_args.from_file.as_ref().unwrap())?; + let (start, end) = file.block_range(); + (Some(start), Some(end)) } else { (bench_args.from, bench_args.to) }; diff --git a/bin/reth-bench/src/bench/mod.rs b/bin/reth-bench/src/bench/mod.rs index da3ccb1a8bb..97e8106efa5 100644 --- a/bin/reth-bench/src/bench/mod.rs +++ b/bin/reth-bench/src/bench/mod.rs @@ -5,6 +5,7 @@ use reth_cli_runner::CliContext; use reth_node_core::args::LogArgs; use reth_tracing::FileWorkerGuard; +mod block_storage; mod context; mod new_payload_fcu; mod new_payload_only; @@ -41,6 +42,9 @@ pub enum Subcommands { /// `cast block latest --full --json | reth-bench send-payload --rpc-url localhost:5000 /// --jwt-secret $(cat ~/.local/share/reth/mainnet/jwt.hex)` SendPayload(send_payload::Command), + + /// Command for generating a file of blocks from an RPC endpoint. 
+ BlockStorage(block_storage::Command), } impl BenchmarkCommand { @@ -53,6 +57,7 @@ impl BenchmarkCommand { Subcommands::NewPayloadFcu(command) => command.execute(ctx).await, Subcommands::NewPayloadOnly(command) => command.execute(ctx).await, Subcommands::SendPayload(command) => command.execute(ctx).await, + Subcommands::BlockStorage(command) => command.execute(ctx).await, } } diff --git a/bin/reth-bench/src/bench/new_payload_fcu.rs b/bin/reth-bench/src/bench/new_payload_fcu.rs index ce094895ee3..e189ef1d396 100644 --- a/bin/reth-bench/src/bench/new_payload_fcu.rs +++ b/bin/reth-bench/src/bench/new_payload_fcu.rs @@ -3,14 +3,19 @@ use crate::{ bench::{ + block_storage::BlockFileReader, context::BenchContext, output::{ CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, COMBINED_OUTPUT_SUFFIX, GAS_OUTPUT_SUFFIX, }, }, - valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload}, + valid_payload::{ + block_to_new_payload, call_forkchoice_updated, call_new_payload, + consensus_block_to_new_payload, + }, }; +use alloy_consensus::Header; use alloy_provider::Provider; use alloy_rpc_types_engine::ForkchoiceState; use clap::Parser; @@ -64,67 +69,133 @@ impl Command { let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel(); let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size); - tokio::task::spawn(async move { - while benchmark_mode.contains(next_block) { - let block_res = block_provider - .get_block_by_number(next_block.into()) - .full() - .await - .wrap_err_with(|| format!("Failed to fetch block by number {next_block}")); - let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) { - Ok(block) => block, + // Choose block source based on --from-file argument + if let Some(file_path) = self.benchmark.from_file.clone() { + // Use file reader for blocks + tokio::task::spawn(async move { + let mut reader = match BlockFileReader::new(&file_path, buffer_size) { + Ok(r) => r, Err(e) => { - tracing::error!("Failed to fetch block {next_block}: {e}"); + tracing::error!("Failed to create block reader: {e}"); let _ = error_sender.send(e); - break; + return; } }; - let header = block.header.clone(); - let (version, params) = match block_to_new_payload(block, is_optimism) { - Ok(result) => result, - Err(e) => { - tracing::error!("Failed to convert block to new payload: {e}"); - let _ = error_sender.send(e); + // At anypoint if we fail to process a block, or read a batch of blocks, it means + // the file is corrupted somehow so the benchmark should exit as the + // results are invalidated + loop { + let blocks = match reader.read_batch() { + Ok(batch) if batch.is_empty() => break, // EOF + Ok(batch) => batch, + Err(e) => { + tracing::error!("Failed to read blocks from file: {e}"); + let _ = error_sender.send(e); + return; + } + }; + + for block in blocks { + let header: Header = block.header.clone(); + + let (version, params) = + match consensus_block_to_new_payload(block, is_optimism) { + Ok(result) => result, + Err(e) => { + tracing::error!("Failed to convert block to new payload: {e}"); + let _ = error_sender.send(e); + return; + } + }; + + let head_block_hash = header.hash_slow(); + // For file-based blocks, use the same hash for safe/finalized + let safe_block_hash = head_block_hash; + let finalized_block_hash = head_block_hash; + + if let Err(e) = sender + .send(( + header, + version, + params, + head_block_hash, + safe_block_hash, + finalized_block_hash, + )) + .await + { + tracing::error!("Failed to send block data: 
{e}"); + return; + } + } + } + }); + } else { + // Use RPC provider for blocks (existing code) + tokio::task::spawn(async move { + while benchmark_mode.contains(next_block) { + let block_res = block_provider + .get_block_by_number(next_block.into()) + .full() + .await + .wrap_err_with(|| format!("Failed to fetch block by number {next_block}")); + let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) { + Ok(block) => block, + Err(e) => { + tracing::error!("Failed to fetch block {next_block}: {e}"); + let _ = error_sender.send(e); + break; + } + }; + + let header = block.header.clone().inner.into_header_with_defaults(); + + let (version, params) = match block_to_new_payload(block, is_optimism) { + Ok(result) => result, + Err(e) => { + tracing::error!("Failed to convert block to new payload: {e}"); + let _ = error_sender.send(e); + break; + } + }; + let head_block_hash = header.hash_slow(); + let safe_block_hash = + block_provider.get_block_by_number(header.number.saturating_sub(32).into()); + + let finalized_block_hash = + block_provider.get_block_by_number(header.number.saturating_sub(64).into()); + + let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,); + + let safe_block_hash = match safe { + Ok(Some(block)) => block.header.hash, + Ok(None) | Err(_) => head_block_hash, + }; + + let finalized_block_hash = match finalized { + Ok(Some(block)) => block.header.hash, + Ok(None) | Err(_) => head_block_hash, + }; + + next_block += 1; + if let Err(e) = sender + .send(( + header, + version, + params, + head_block_hash, + safe_block_hash, + finalized_block_hash, + )) + .await + { + tracing::error!("Failed to send block data: {e}"); break; } - }; - let head_block_hash = header.hash; - let safe_block_hash = - block_provider.get_block_by_number(header.number.saturating_sub(32).into()); - - let finalized_block_hash = - block_provider.get_block_by_number(header.number.saturating_sub(64).into()); - - let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,); - - let safe_block_hash = match safe { - Ok(Some(block)) => block.header.hash, - Ok(None) | Err(_) => head_block_hash, - }; - - let finalized_block_hash = match finalized { - Ok(Some(block)) => block.header.hash, - Ok(None) | Err(_) => head_block_hash, - }; - - next_block += 1; - if let Err(e) = sender - .send(( - header, - version, - params, - head_block_hash, - safe_block_hash, - finalized_block_hash, - )) - .await - { - tracing::error!("Failed to send block data: {e}"); - break; } - } - }); + }); + } // put results in a summary vec so they can be printed at the end let mut results = Vec::new(); diff --git a/bin/reth-bench/src/bench/new_payload_only.rs b/bin/reth-bench/src/bench/new_payload_only.rs index 3dfa619ec7b..e3c61393cbb 100644 --- a/bin/reth-bench/src/bench/new_payload_only.rs +++ b/bin/reth-bench/src/bench/new_payload_only.rs @@ -2,14 +2,16 @@ use crate::{ bench::{ + block_storage::BlockFileReader, context::BenchContext, output::{ NewPayloadResult, TotalGasOutput, TotalGasRow, GAS_OUTPUT_SUFFIX, NEW_PAYLOAD_OUTPUT_SUFFIX, }, }, - valid_payload::{block_to_new_payload, call_new_payload}, + valid_payload::{block_to_new_payload, call_new_payload, consensus_block_to_new_payload}, }; +use alloy_consensus::Header; use alloy_provider::Provider; use clap::Parser; use csv::Writer; @@ -57,39 +59,85 @@ impl Command { let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel(); let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size); - tokio::task::spawn(async 
move { - while benchmark_mode.contains(next_block) { - let block_res = block_provider - .get_block_by_number(next_block.into()) - .full() - .await - .wrap_err_with(|| format!("Failed to fetch block by number {next_block}")); - let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) { - Ok(block) => block, + if let Some(file_path) = self.benchmark.from_file.clone() { + // Use file reader for blocks + tokio::task::spawn(async move { + let mut reader = match BlockFileReader::new(&file_path, buffer_size) { + Ok(r) => r, Err(e) => { - tracing::error!("Failed to fetch block {next_block}: {e}"); + tracing::error!("Failed to create block reader: {e}"); let _ = error_sender.send(e); - break; + return; } }; - let header = block.header.clone(); - let (version, params) = match block_to_new_payload(block, is_optimism) { - Ok(result) => result, - Err(e) => { - tracing::error!("Failed to convert block to new payload: {e}"); - let _ = error_sender.send(e); + // At anypoint if we fail to process a block, or read a batch of blocks, it means + // the file is corrupted somehow so the benchmark should exit as the + // results are invalidated + loop { + let blocks = match reader.read_batch() { + Ok(batch) if batch.is_empty() => break, // EOF + Ok(batch) => batch, + Err(e) => { + tracing::error!("Failed to read blocks from file: {e}"); + let _ = error_sender.send(e); + return; + } + }; + for block in blocks { + let header: Header = block.header.clone(); + let (version, params) = + match consensus_block_to_new_payload(block, is_optimism) { + Ok(result) => result, + Err(e) => { + tracing::error!("Failed to convert block to new payload: {e}"); + let _ = error_sender.send(e); + return; + } + }; + + if let Err(e) = sender.send((header, version, params)).await { + tracing::error!("Failed to send block data: {e}"); + return; + } + } + } + }); + } else { + tokio::task::spawn(async move { + while benchmark_mode.contains(next_block) { + let block_res = block_provider + .get_block_by_number(next_block.into()) + .full() + .await + .wrap_err_with(|| format!("Failed to fetch block by number {next_block}")); + let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) { + Ok(block) => block, + Err(e) => { + tracing::error!("Failed to fetch block {next_block}: {e}"); + let _ = error_sender.send(e); + break; + } + }; + let header = block.header.clone().inner.into_header_with_defaults(); + + let (version, params) = match block_to_new_payload(block, is_optimism) { + Ok(result) => result, + Err(e) => { + tracing::error!("Failed to convert block to new payload: {e}"); + let _ = error_sender.send(e); + break; + } + }; + + next_block += 1; + if let Err(e) = sender.send((header, version, params)).await { + tracing::error!("Failed to send block data: {e}"); break; } - }; - - next_block += 1; - if let Err(e) = sender.send((header, version, params)).await { - tracing::error!("Failed to send block data: {e}"); - break; } - } - }); + }); + } // put results in a summary vec so they can be printed at the end let mut results = Vec::new(); diff --git a/bin/reth-bench/src/bench/output.rs b/bin/reth-bench/src/bench/output.rs index 794cd2768df..09b6125fedc 100644 --- a/bin/reth-bench/src/bench/output.rs +++ b/bin/reth-bench/src/bench/output.rs @@ -15,6 +15,9 @@ pub(crate) const COMBINED_OUTPUT_SUFFIX: &str = "combined_latency.csv"; /// This is the suffix for new payload output csv files. 
pub(crate) const NEW_PAYLOAD_OUTPUT_SUFFIX: &str = "new_payload_latency.csv"; +/// This is the suffix for block output files. +pub(crate) const BLOCK_STORAGE_OUTPUT_SUFFIX: &str = "blocks.bin"; + /// This represents the results of a single `newPayload` call in the benchmark, containing the gas /// used and the `newPayload` latency. #[derive(Debug)] diff --git a/bin/reth-bench/src/valid_payload.rs b/bin/reth-bench/src/valid_payload.rs index d253506b22b..305919551a3 100644 --- a/bin/reth-bench/src/valid_payload.rs +++ b/bin/reth-bench/src/valid_payload.rs @@ -2,7 +2,8 @@ //! response. This is useful for benchmarking, as it allows us to wait for a payload to be valid //! before sending additional calls. -use alloy_eips::eip7685::Requests; +use alloy_consensus::{Block, BlockHeader, Sealable, Transaction}; +use alloy_eips::{eip7685::Requests, Encodable2718}; use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider}; use alloy_rpc_types_engine::{ ExecutionPayload, ExecutionPayloadInputV2, ForkchoiceState, ForkchoiceUpdated, @@ -146,6 +147,17 @@ pub(crate) fn block_to_new_payload( })? .into_consensus(); + consensus_block_to_new_payload(block, is_optimism) +} + +pub(crate) fn consensus_block_to_new_payload( + block: Block, + is_optimism: bool, +) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> +where + T: Encodable2718 + Transaction, + H: BlockHeader + Sealable, +{ // Convert to execution payload let (payload, sidecar) = ExecutionPayload::from_block_slow(&block); @@ -160,7 +172,7 @@ pub(crate) fn block_to_new_payload( serde_json::to_value(( OpExecutionPayloadV4 { payload_inner: payload, - withdrawals_root: block.withdrawals_root.unwrap(), + withdrawals_root: block.withdrawals_root().unwrap(), }, cancun.versioned_hashes.clone(), cancun.parent_beacon_block_root, diff --git a/crates/node/core/src/args/benchmark_args.rs b/crates/node/core/src/args/benchmark_args.rs index 2865054ded1..2dc96551caa 100644 --- a/crates/node/core/src/args/benchmark_args.rs +++ b/crates/node/core/src/args/benchmark_args.rs @@ -45,6 +45,15 @@ pub struct BenchmarkArgs { )] pub engine_rpc_url: String, + /// The path to the file that contains blocks to be used for the benchmark. + #[arg( + long = "from-file", + value_name = "FILE", + conflicts_with_all = &["from", "to", "advance"], + verbatim_doc_comment + )] + pub from_file: Option, + /// The path to the output directory for granular benchmark results. 
#[arg(long, short, value_name = "BENCHMARK_OUTPUT", verbatim_doc_comment)] pub output: Option, From 6cb2c5470035f57ea7bebaf479f95937a5722770 Mon Sep 17 00:00:00 2001 From: Charlie Mack Date: Mon, 27 Oct 2025 18:27:19 +0100 Subject: [PATCH 2/6] feat(bench): make RPC url optional input when loading blocks from a file --- bin/reth-bench/src/bench/block_storage.rs | 12 +++- bin/reth-bench/src/bench/context.rs | 59 ++++++++++++++------ bin/reth-bench/src/bench/new_payload_fcu.rs | 5 +- bin/reth-bench/src/bench/new_payload_only.rs | 5 +- 4 files changed, 60 insertions(+), 21 deletions(-) diff --git a/bin/reth-bench/src/bench/block_storage.rs b/bin/reth-bench/src/bench/block_storage.rs index 83da676a346..c4a7b957272 100644 --- a/bin/reth-bench/src/bench/block_storage.rs +++ b/bin/reth-bench/src/bench/block_storage.rs @@ -69,6 +69,11 @@ impl BlockFileHeader { (self.from_block, self.to_block) } + /// Get the block type from the header + pub(super) fn block_type(&self) -> BlockType { + self.block_type + } + fn write_to(&self, writer: &mut impl Write) -> eyre::Result<()> { writer.write_all(MAGIC_BYTES)?; writer.write_all(&[self.version])?; @@ -235,7 +240,10 @@ impl Command { info!("Generating file from RPC: {}", self.rpc_url); let BenchContext { block_provider, benchmark_mode, mut next_block, is_optimism, .. } = - BenchContext::new(&self.benchmark, self.rpc_url.clone()).await?; + BenchContext::new(&self.benchmark, Some(self.rpc_url.clone())).await?; + + let block_provider = + block_provider.as_ref().ok_or_eyre("Block provider not found").unwrap(); // Initialize file writer with header let output_path = self @@ -263,7 +271,7 @@ impl Command { // Fetch and encode block let rlp_data = - self.fetch_and_encode_block(&block_provider, next_block, is_optimism).await?; + self.fetch_and_encode_block(block_provider, next_block, is_optimism).await?; // Write to file file_writer.write_block(&rlp_data)?; diff --git a/bin/reth-bench/src/bench/context.rs b/bin/reth-bench/src/bench/context.rs index 8f9e32bbd25..be825c76a0e 100644 --- a/bin/reth-bench/src/bench/context.rs +++ b/bin/reth-bench/src/bench/context.rs @@ -2,7 +2,8 @@ //! The initialization code is also the same, so this can be shared across benchmark commands. use crate::{ - authenticated_transport::AuthenticatedTransportConnect, bench::block_storage::BlockFileReader, + authenticated_transport::AuthenticatedTransportConnect, + bench::block_storage::{BlockFileReader, BlockType}, bench_mode::BenchMode, }; use alloy_eips::BlockNumberOrTag; @@ -24,7 +25,7 @@ pub(crate) struct BenchContext { /// The auth provider is used for engine API queries. pub(crate) auth_provider: RootProvider, /// The block provider is used for block queries. - pub(crate) block_provider: RootProvider, + pub(crate) block_provider: Option>, /// The benchmark mode, which defines whether the benchmark should run for a closed or open /// range of blocks. pub(crate) benchmark_mode: BenchMode, @@ -37,9 +38,10 @@ pub(crate) struct BenchContext { impl BenchContext { /// This is the initialization code for most benchmarks, taking in a [`BenchmarkArgs`] and /// returning the providers needed to run a benchmark. 
- pub(crate) async fn new(bench_args: &BenchmarkArgs, rpc_url: String) -> eyre::Result { - info!("Running benchmark using data from RPC URL: {}", rpc_url); - + pub(crate) async fn new( + bench_args: &BenchmarkArgs, + rpc_url: Option, + ) -> eyre::Result { // Ensure that output directory exists and is a directory if let Some(output) = &bench_args.output { if output.is_file() { @@ -52,17 +54,31 @@ impl BenchContext { } } - // set up alloy client for blocks - let client = ClientBuilder::default() - .layer(RetryBackoffLayer::new(10, 800, u64::MAX)) - .http(rpc_url.parse()?); - let block_provider = RootProvider::::new(client); + let block_provider = if let Some(rpc_url) = rpc_url { + info!("Running benchmark using data from RPC URL: {}", rpc_url); + let client = ClientBuilder::default() + .layer(RetryBackoffLayer::new(10, 800, u64::MAX)) + .http(rpc_url.parse()?); + Some(RootProvider::::new(client)) + } else { + info!("Running benchmark using data from file: {:?}", bench_args.from_file); + None + }; // Check if this is an OP chain by checking code at a predeploy address. - let is_optimism = !block_provider - .get_code_at(address!("0x420000000000000000000000000000000000000F")) - .await? - .is_empty(); + let is_optimism = if let Some(ref provider) = block_provider { + !provider + .get_code_at(address!("0x420000000000000000000000000000000000000F")) + .await? + .is_empty() + } else { + // When loading from file, read the block type from the file header + let file_header = + BlockFileReader::get_header(bench_args.from_file.as_ref().ok_or_else(|| { + eyre::eyre!("Either rpc_url or --from-file must be provided") + })?)?; + file_header.block_type() == BlockType::Optimism + }; // construct the authenticated provider let auth_jwt = bench_args @@ -103,8 +119,8 @@ impl BenchContext { let head_number = head_block.header.number; (Some(head_number), Some(head_number + advance)) } else if bench_args.from_file.is_some() { - let file = BlockFileReader::get_header(bench_args.from_file.as_ref().unwrap())?; - let (start, end) = file.block_range(); + let file_header = BlockFileReader::get_header(bench_args.from_file.as_ref().unwrap())?; + let (start, end) = file_header.block_range(); (Some(start), Some(end)) } else { (bench_args.from, bench_args.to) @@ -117,13 +133,21 @@ impl BenchContext { let first_block = match benchmark_mode { BenchMode::Continuous => { // fetch Latest block - block_provider.get_block_by_number(BlockNumberOrTag::Latest).full().await?.unwrap() + block_provider + .as_ref() + .unwrap() + .get_block_by_number(BlockNumberOrTag::Latest) + .full() + .await? + .unwrap() } BenchMode::Range(ref mut range) => { match range.next() { Some(block_number) => { // fetch first block in range block_provider + .as_ref() + .unwrap() .get_block_by_number(block_number.into()) .full() .await? @@ -139,6 +163,7 @@ impl BenchContext { }; let next_block = first_block.header.number + 1; + Ok(Self { auth_provider, block_provider, benchmark_mode, next_block, is_optimism }) } } diff --git a/bin/reth-bench/src/bench/new_payload_fcu.rs b/bin/reth-bench/src/bench/new_payload_fcu.rs index e189ef1d396..f754807b6a9 100644 --- a/bin/reth-bench/src/bench/new_payload_fcu.rs +++ b/bin/reth-bench/src/bench/new_payload_fcu.rs @@ -32,7 +32,7 @@ use tracing::{debug, info}; pub struct Command { /// The RPC url to use for getting data. #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)] - rpc_url: String, + rpc_url: Option, /// How long to wait after a forkchoice update before sending the next payload. 
#[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)] @@ -135,6 +135,9 @@ impl Command { // Use RPC provider for blocks (existing code) tokio::task::spawn(async move { while benchmark_mode.contains(next_block) { + let block_provider = + block_provider.as_ref().ok_or_eyre("Block provider not found").unwrap(); + let block_res = block_provider .get_block_by_number(next_block.into()) .full() diff --git a/bin/reth-bench/src/bench/new_payload_only.rs b/bin/reth-bench/src/bench/new_payload_only.rs index e3c61393cbb..650583d623c 100644 --- a/bin/reth-bench/src/bench/new_payload_only.rs +++ b/bin/reth-bench/src/bench/new_payload_only.rs @@ -26,7 +26,7 @@ use tracing::{debug, info}; pub struct Command { /// The RPC url to use for getting data. #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)] - rpc_url: String, + rpc_url: Option, /// The size of the block buffer (channel capacity) for prefetching blocks from the RPC /// endpoint. @@ -106,6 +106,9 @@ impl Command { } else { tokio::task::spawn(async move { while benchmark_mode.contains(next_block) { + let block_provider = + block_provider.as_ref().ok_or_eyre("Block provider not found").unwrap(); + let block_res = block_provider .get_block_by_number(next_block.into()) .full() From 04252622be0c7e7bfb9b95d50b80fc2e018a0896 Mon Sep 17 00:00:00 2001 From: Charlie Mack Date: Mon, 27 Oct 2025 19:47:41 +0100 Subject: [PATCH 3/6] feat(bench): Add concrete type for block source for cleaner implementation --- bin/reth-bench/src/bench/block_storage.rs | 27 +-- bin/reth-bench/src/bench/context.rs | 186 ++++++++-------- bin/reth-bench/src/bench/new_payload_fcu.rs | 219 ++++++++++--------- bin/reth-bench/src/bench/new_payload_only.rs | 152 ++++++------- 4 files changed, 289 insertions(+), 295 deletions(-) diff --git a/bin/reth-bench/src/bench/block_storage.rs b/bin/reth-bench/src/bench/block_storage.rs index c4a7b957272..abc0969f825 100644 --- a/bin/reth-bench/src/bench/block_storage.rs +++ b/bin/reth-bench/src/bench/block_storage.rs @@ -1,4 +1,7 @@ -use crate::bench::{context::BenchContext, output::BLOCK_STORAGE_OUTPUT_SUFFIX}; +use crate::bench::{ + context::{BenchContext, BlockSource}, + output::BLOCK_STORAGE_OUTPUT_SUFFIX, +}; use alloy_provider::{network::AnyNetwork, Provider, RootProvider}; use alloy_consensus::{ @@ -64,11 +67,6 @@ impl BlockFileHeader { } } - /// Get the block range from the header - pub(super) fn block_range(&self) -> (u64, u64) { - (self.from_block, self.to_block) - } - /// Get the block type from the header pub(super) fn block_type(&self) -> BlockType { self.block_type @@ -239,12 +237,16 @@ impl Command { pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> { info!("Generating file from RPC: {}", self.rpc_url); - let BenchContext { block_provider, benchmark_mode, mut next_block, is_optimism, .. } = + let BenchContext { block_source, is_optimism, .. } = BenchContext::new(&self.benchmark, Some(self.rpc_url.clone())).await?; - let block_provider = - block_provider.as_ref().ok_or_eyre("Block provider not found").unwrap(); - + // Extract RPC provider, next_block, and mode + let (provider, mut next_block, mode) = match block_source { + BlockSource::Rpc { provider, next_block, mode } => (provider, next_block, mode), + BlockSource::File { .. 
} => { + return Err(eyre::eyre!("block-storage command requires RPC mode, not file mode")); + } + }; // Initialize file writer with header let output_path = self .benchmark @@ -266,12 +268,11 @@ impl Command { ); // Process blocks - while benchmark_mode.contains(next_block) { + while mode.contains(next_block) { info!("Processing block {}", next_block); // Fetch and encode block - let rlp_data = - self.fetch_and_encode_block(block_provider, next_block, is_optimism).await?; + let rlp_data = self.fetch_and_encode_block(&provider, next_block, is_optimism).await?; // Write to file file_writer.write_block(&rlp_data)?; diff --git a/bin/reth-bench/src/bench/context.rs b/bin/reth-bench/src/bench/context.rs index be825c76a0e..6b316936148 100644 --- a/bin/reth-bench/src/bench/context.rs +++ b/bin/reth-bench/src/bench/context.rs @@ -1,6 +1,8 @@ //! This contains the [`BenchContext`], which is information that all replay-based benchmarks need. //! The initialization code is also the same, so this can be shared across benchmark commands. +use std::path::PathBuf; + use crate::{ authenticated_transport::AuthenticatedTransportConnect, bench::block_storage::{BlockFileReader, BlockType}, @@ -12,25 +14,39 @@ use alloy_provider::{network::AnyNetwork, Provider, RootProvider}; use alloy_rpc_client::ClientBuilder; use alloy_rpc_types_engine::JwtSecret; use alloy_transport::layers::RetryBackoffLayer; +use eyre::OptionExt; use reqwest::Url; use reth_node_core::args::BenchmarkArgs; use tracing::info; -/// This is intended to be used by benchmarks that replay blocks from an RPC. +/// Represents the source of blocks for benchmarking +pub(crate) enum BlockSource { + /// Blocks are fetched from an RPC endpoint + Rpc { + /// The RPC provider for fetching blocks + provider: RootProvider, + /// The next block number to process + next_block: u64, + /// The benchmark mode (range or continuous) + mode: BenchMode, + }, + /// Blocks are loaded from a file + File { + /// Path to the block file + path: PathBuf, + }, +} + +/// This is intended to be used by benchmarks that replay blocks from an RPC or a file. /// -/// It contains an authenticated provider for engine API queries, a block provider for block -/// queries, a [`BenchMode`] to determine whether the benchmark should run for a closed or open -/// range of blocks, and the next block to fetch. +/// It contains an authenticated provider for engine API queries, +/// the block source, and a [`BenchMode`] to determine whether the benchmark should run for a +/// closed, open, or file-based range of blocks. pub(crate) struct BenchContext { /// The auth provider is used for engine API queries. pub(crate) auth_provider: RootProvider, - /// The block provider is used for block queries. - pub(crate) block_provider: Option>, - /// The benchmark mode, which defines whether the benchmark should run for a closed or open - /// range of blocks. - pub(crate) benchmark_mode: BenchMode, - /// The next block to fetch. - pub(crate) next_block: u64, + /// The source of blocks for the benchmark. + pub(crate) block_source: BlockSource, /// Whether the chain is an OP rollup. 
pub(crate) is_optimism: bool, } @@ -54,116 +70,90 @@ impl BenchContext { } } - let block_provider = if let Some(rpc_url) = rpc_url { - info!("Running benchmark using data from RPC URL: {}", rpc_url); - let client = ClientBuilder::default() - .layer(RetryBackoffLayer::new(10, 800, u64::MAX)) - .http(rpc_url.parse()?); - Some(RootProvider::::new(client)) - } else { - info!("Running benchmark using data from file: {:?}", bench_args.from_file); - None - }; - - // Check if this is an OP chain by checking code at a predeploy address. - let is_optimism = if let Some(ref provider) = block_provider { - !provider - .get_code_at(address!("0x420000000000000000000000000000000000000F")) - .await? - .is_empty() - } else { - // When loading from file, read the block type from the file header - let file_header = - BlockFileReader::get_header(bench_args.from_file.as_ref().ok_or_else(|| { - eyre::eyre!("Either rpc_url or --from-file must be provided") - })?)?; - file_header.block_type() == BlockType::Optimism - }; - - // construct the authenticated provider + // Construct the authenticated provider first (needed for --advance) let auth_jwt = bench_args .auth_jwtsecret .clone() .ok_or_else(|| eyre::eyre!("--jwt-secret must be provided for authenticated RPC"))?; - // fetch jwt from file - // - // the jwt is hex encoded so we will decode it after + // Fetch jwt from file (the jwt is hex encoded so we will decode it after) let jwt = std::fs::read_to_string(auth_jwt)?; let jwt = JwtSecret::from_hex(jwt)?; - // get engine url + // Get engine url let auth_url = Url::parse(&bench_args.engine_rpc_url)?; - // construct the authed transport + // Construct the authed transport info!("Connecting to Engine RPC at {} for replay", auth_url); let auth_transport = AuthenticatedTransportConnect::new(auth_url, jwt); let client = ClientBuilder::default().connect_with(auth_transport).await?; let auth_provider = RootProvider::::new(client); - // Computes the block range for the benchmark. - // - // - If `--advance` is provided, fetches the latest block and sets: - // - `from = head + 1` - // - `to = head + advance` - // - Otherwise, uses the values from `--from` and `--to`. - let (from, to) = if let Some(advance) = bench_args.advance { - if advance == 0 { - return Err(eyre::eyre!("--advance must be greater than 0")); - } + // Determine block source, benchmark mode, and whether it's optimism - all in one place + let (block_source, is_optimism) = if let Some(file_path) = &bench_args.from_file { + // File-based loading + info!("Running benchmark using data from file: {:?}", file_path); - let head_block = auth_provider - .get_block_by_number(BlockNumberOrTag::Latest) - .await? 
- .ok_or_else(|| eyre::eyre!("Failed to fetch latest block for --advance"))?; - let head_number = head_block.header.number; - (Some(head_number), Some(head_number + advance)) - } else if bench_args.from_file.is_some() { - let file_header = BlockFileReader::get_header(bench_args.from_file.as_ref().unwrap())?; - let (start, end) = file_header.block_range(); - (Some(start), Some(end)) + // Read block type from file header + let file_header = BlockFileReader::get_header(file_path)?; + let is_optimism = file_header.block_type() == BlockType::Optimism; + + let block_source = BlockSource::File { path: file_path.clone() }; + + (block_source, is_optimism) } else { - (bench_args.from, bench_args.to) - }; + // RPC-based loading + let rpc_url = rpc_url + .ok_or_else(|| eyre::eyre!("Either --rpc-url or --from-file must be provided"))?; + + info!("Running benchmark using data from RPC URL: {}", rpc_url); + + let client = ClientBuilder::default() + .layer(RetryBackoffLayer::new(10, 800, u64::MAX)) + .http(rpc_url.parse()?); + let provider = RootProvider::::new(client); + + // Check if this is an OP chain by checking code at a predeploy address + let is_optimism = !provider + .get_code_at(address!("0x420000000000000000000000000000000000000F")) + .await? + .is_empty(); - // If neither `--from` nor `--to` are provided, we will run the benchmark continuously, - // starting at the latest block. - let mut benchmark_mode = BenchMode::new(from, to)?; + // Compute the block range for the benchmark + let (from, to) = if let Some(advance) = bench_args.advance { + if advance == 0 { + return Err(eyre::eyre!("--advance must be greater than 0")); + } - let first_block = match benchmark_mode { - BenchMode::Continuous => { - // fetch Latest block - block_provider - .as_ref() - .unwrap() + let head_block = auth_provider .get_block_by_number(BlockNumberOrTag::Latest) - .full() .await? - .unwrap() - } - BenchMode::Range(ref mut range) => { - match range.next() { - Some(block_number) => { - // fetch first block in range - block_provider - .as_ref() - .unwrap() - .get_block_by_number(block_number.into()) - .full() - .await? - .unwrap() - } - None => { - return Err(eyre::eyre!( - "Benchmark mode range is empty, please provide a larger range" - )); - } + .ok_or_else(|| eyre::eyre!("Failed to fetch latest block for --advance"))?; + let head_number = head_block.header.number; + (Some(head_number), Some(head_number + advance)) + } else { + (bench_args.from, bench_args.to) + }; + + let mode = BenchMode::new(from, to)?; + + // Determine next_block based on benchmark mode + let next_block = match &mode { + BenchMode::Continuous => { + let latest = provider + .get_block_by_number(BlockNumberOrTag::Latest) + .await? 
+ .ok_or_eyre("Failed to fetch latest block")?; + latest.header.number + 1 } - } - }; + BenchMode::Range(range) => *range.start() + 1, + }; - let next_block = first_block.header.number + 1; + let block_source = BlockSource::Rpc { provider, next_block, mode }; + + (block_source, is_optimism) + }; - Ok(Self { auth_provider, block_provider, benchmark_mode, next_block, is_optimism }) + Ok(Self { auth_provider, block_source, is_optimism }) } } diff --git a/bin/reth-bench/src/bench/new_payload_fcu.rs b/bin/reth-bench/src/bench/new_payload_fcu.rs index f754807b6a9..172a328bcfc 100644 --- a/bin/reth-bench/src/bench/new_payload_fcu.rs +++ b/bin/reth-bench/src/bench/new_payload_fcu.rs @@ -4,7 +4,7 @@ use crate::{ bench::{ block_storage::BlockFileReader, - context::BenchContext, + context::{BenchContext, BlockSource}, output::{ CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, COMBINED_OUTPUT_SUFFIX, GAS_OUTPUT_SUFFIX, @@ -55,13 +55,8 @@ pub struct Command { impl Command { /// Execute `benchmark new-payload-fcu` command pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> { - let BenchContext { - benchmark_mode, - block_provider, - auth_provider, - mut next_block, - is_optimism, - } = BenchContext::new(&self.benchmark, self.rpc_url).await?; + let BenchContext { block_source, auth_provider, is_optimism } = + BenchContext::new(&self.benchmark, self.rpc_url).await?; let buffer_size = self.rpc_block_buffer_size; @@ -69,51 +64,124 @@ impl Command { let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel(); let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size); - // Choose block source based on --from-file argument - if let Some(file_path) = self.benchmark.from_file.clone() { - // Use file reader for blocks - tokio::task::spawn(async move { - let mut reader = match BlockFileReader::new(&file_path, buffer_size) { - Ok(r) => r, - Err(e) => { - tracing::error!("Failed to create block reader: {e}"); - let _ = error_sender.send(e); - return; - } - }; - - // At anypoint if we fail to process a block, or read a batch of blocks, it means - // the file is corrupted somehow so the benchmark should exit as the - // results are invalidated - loop { - let blocks = match reader.read_batch() { - Ok(batch) if batch.is_empty() => break, // EOF - Ok(batch) => batch, + // Spawn task based on block source + match block_source { + BlockSource::File { path } => { + // Use file reader for blocks + tokio::task::spawn(async move { + let mut reader = match BlockFileReader::new(&path, buffer_size) { + Ok(r) => r, Err(e) => { - tracing::error!("Failed to read blocks from file: {e}"); + tracing::error!("Failed to create block reader: {e}"); let _ = error_sender.send(e); return; } }; - for block in blocks { - let header: Header = block.header.clone(); - - let (version, params) = - match consensus_block_to_new_payload(block, is_optimism) { - Ok(result) => result, + // At anypoint if we fail to process a block, or read a batch of blocks, it + // means the file is corrupted somehow so the benchmark should exit as the + // results are invalidated + loop { + let blocks = match reader.read_batch() { + Ok(batch) if batch.is_empty() => break, // EOF + Ok(batch) => batch, + Err(e) => { + tracing::error!("Failed to read blocks from file: {e}"); + let _ = error_sender.send(e); + return; + } + }; + + for block in blocks { + let header: Header = block.header.clone(); + + let (version, params) = + match consensus_block_to_new_payload(block, is_optimism) { + Ok(result) => result, + Err(e) => { + 
tracing::error!( + "Failed to convert block to new payload: {e}" + ); + let _ = error_sender.send(e); + return; + } + }; + + let head_block_hash = header.hash_slow(); + // For file-based blocks, use the same hash for safe/finalized + let safe_block_hash = head_block_hash; + let finalized_block_hash = head_block_hash; + + if let Err(e) = sender + .send(( + header, + version, + params, + head_block_hash, + safe_block_hash, + finalized_block_hash, + )) + .await + { + tracing::error!("Failed to send block data: {e}"); + return; + } + } + } + }); + } + BlockSource::Rpc { provider, mut next_block, mode } => { + // Use RPC provider for blocks + tokio::task::spawn(async move { + while mode.contains(next_block) { + let block_res = provider + .get_block_by_number(next_block.into()) + .full() + .await + .wrap_err_with(|| { + format!("Failed to fetch block by number {next_block}") + }); + let block = + match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) { + Ok(block) => block, Err(e) => { - tracing::error!("Failed to convert block to new payload: {e}"); + tracing::error!("Failed to fetch block {next_block}: {e}"); let _ = error_sender.send(e); - return; + break; } }; + let header = block.header.clone().inner.into_header_with_defaults(); + + let (version, params) = match block_to_new_payload(block, is_optimism) { + Ok(result) => result, + Err(e) => { + tracing::error!("Failed to convert block to new payload: {e}"); + let _ = error_sender.send(e); + break; + } + }; let head_block_hash = header.hash_slow(); - // For file-based blocks, use the same hash for safe/finalized - let safe_block_hash = head_block_hash; - let finalized_block_hash = head_block_hash; + let safe_block_hash = + provider.get_block_by_number(header.number.saturating_sub(32).into()); + + let finalized_block_hash = + provider.get_block_by_number(header.number.saturating_sub(64).into()); + let (safe, finalized) = + tokio::join!(safe_block_hash, finalized_block_hash,); + + let safe_block_hash = match safe { + Ok(Some(block)) => block.header.hash, + Ok(None) | Err(_) => head_block_hash, + }; + + let finalized_block_hash = match finalized { + Ok(Some(block)) => block.header.hash, + Ok(None) | Err(_) => head_block_hash, + }; + + next_block += 1; if let Err(e) = sender .send(( header, @@ -126,78 +194,11 @@ impl Command { .await { tracing::error!("Failed to send block data: {e}"); - return; - } - } - } - }); - } else { - // Use RPC provider for blocks (existing code) - tokio::task::spawn(async move { - while benchmark_mode.contains(next_block) { - let block_provider = - block_provider.as_ref().ok_or_eyre("Block provider not found").unwrap(); - - let block_res = block_provider - .get_block_by_number(next_block.into()) - .full() - .await - .wrap_err_with(|| format!("Failed to fetch block by number {next_block}")); - let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) { - Ok(block) => block, - Err(e) => { - tracing::error!("Failed to fetch block {next_block}: {e}"); - let _ = error_sender.send(e); break; } - }; - - let header = block.header.clone().inner.into_header_with_defaults(); - - let (version, params) = match block_to_new_payload(block, is_optimism) { - Ok(result) => result, - Err(e) => { - tracing::error!("Failed to convert block to new payload: {e}"); - let _ = error_sender.send(e); - break; - } - }; - let head_block_hash = header.hash_slow(); - let safe_block_hash = - block_provider.get_block_by_number(header.number.saturating_sub(32).into()); - - let finalized_block_hash = - 
block_provider.get_block_by_number(header.number.saturating_sub(64).into()); - - let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,); - - let safe_block_hash = match safe { - Ok(Some(block)) => block.header.hash, - Ok(None) | Err(_) => head_block_hash, - }; - - let finalized_block_hash = match finalized { - Ok(Some(block)) => block.header.hash, - Ok(None) | Err(_) => head_block_hash, - }; - - next_block += 1; - if let Err(e) = sender - .send(( - header, - version, - params, - head_block_hash, - safe_block_hash, - finalized_block_hash, - )) - .await - { - tracing::error!("Failed to send block data: {e}"); - break; } - } - }); + }); + } } // put results in a summary vec so they can be printed at the end diff --git a/bin/reth-bench/src/bench/new_payload_only.rs b/bin/reth-bench/src/bench/new_payload_only.rs index 650583d623c..b15a50742fb 100644 --- a/bin/reth-bench/src/bench/new_payload_only.rs +++ b/bin/reth-bench/src/bench/new_payload_only.rs @@ -3,7 +3,7 @@ use crate::{ bench::{ block_storage::BlockFileReader, - context::BenchContext, + context::{BenchContext, BlockSource}, output::{ NewPayloadResult, TotalGasOutput, TotalGasRow, GAS_OUTPUT_SUFFIX, NEW_PAYLOAD_OUTPUT_SUFFIX, @@ -45,13 +45,8 @@ pub struct Command { impl Command { /// Execute `benchmark new-payload-only` command pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> { - let BenchContext { - benchmark_mode, - block_provider, - auth_provider, - mut next_block, - is_optimism, - } = BenchContext::new(&self.benchmark, self.rpc_url).await?; + let BenchContext { block_source, auth_provider, is_optimism } = + BenchContext::new(&self.benchmark, self.rpc_url).await?; let buffer_size = self.rpc_block_buffer_size; @@ -59,87 +54,94 @@ impl Command { let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel(); let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size); - if let Some(file_path) = self.benchmark.from_file.clone() { - // Use file reader for blocks - tokio::task::spawn(async move { - let mut reader = match BlockFileReader::new(&file_path, buffer_size) { - Ok(r) => r, - Err(e) => { - tracing::error!("Failed to create block reader: {e}"); - let _ = error_sender.send(e); - return; - } - }; - - // At anypoint if we fail to process a block, or read a batch of blocks, it means - // the file is corrupted somehow so the benchmark should exit as the - // results are invalidated - loop { - let blocks = match reader.read_batch() { - Ok(batch) if batch.is_empty() => break, // EOF - Ok(batch) => batch, + // Spawn task based on block source + match block_source { + BlockSource::File { path } => { + // Use file reader for blocks + tokio::task::spawn(async move { + let mut reader = match BlockFileReader::new(&path, buffer_size) { + Ok(r) => r, Err(e) => { - tracing::error!("Failed to read blocks from file: {e}"); + tracing::error!("Failed to create block reader: {e}"); let _ = error_sender.send(e); return; } }; - for block in blocks { - let header: Header = block.header.clone(); - let (version, params) = - match consensus_block_to_new_payload(block, is_optimism) { - Ok(result) => result, + + // At anypoint if we fail to process a block, or read a batch of blocks, it + // means the file is corrupted somehow so the benchmark should exit as the + // results are invalidated + loop { + let blocks = match reader.read_batch() { + Ok(batch) if batch.is_empty() => break, // EOF + Ok(batch) => batch, + Err(e) => { + tracing::error!("Failed to read blocks from file: {e}"); + let _ = 
error_sender.send(e); + return; + } + }; + for block in blocks { + let header: Header = block.header.clone(); + let (version, params) = + match consensus_block_to_new_payload(block, is_optimism) { + Ok(result) => result, + Err(e) => { + tracing::error!( + "Failed to convert block to new payload: {e}" + ); + let _ = error_sender.send(e); + return; + } + }; + + if let Err(e) = sender.send((header, version, params)).await { + tracing::error!("Failed to send block data: {e}"); + return; + } + } + } + }); + } + BlockSource::Rpc { provider, mut next_block, mode } => { + // Use RPC provider for blocks + tokio::task::spawn(async move { + while mode.contains(next_block) { + let block_res = provider + .get_block_by_number(next_block.into()) + .full() + .await + .wrap_err_with(|| { + format!("Failed to fetch block by number {next_block}") + }); + let block = + match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) { + Ok(block) => block, Err(e) => { - tracing::error!("Failed to convert block to new payload: {e}"); + tracing::error!("Failed to fetch block {next_block}: {e}"); let _ = error_sender.send(e); - return; + break; } }; - + let header = block.header.clone().inner.into_header_with_defaults(); + + let (version, params) = match block_to_new_payload(block, is_optimism) { + Ok(result) => result, + Err(e) => { + tracing::error!("Failed to convert block to new payload: {e}"); + let _ = error_sender.send(e); + break; + } + }; + + next_block += 1; if let Err(e) = sender.send((header, version, params)).await { tracing::error!("Failed to send block data: {e}"); - return; - } - } - } - }); - } else { - tokio::task::spawn(async move { - while benchmark_mode.contains(next_block) { - let block_provider = - block_provider.as_ref().ok_or_eyre("Block provider not found").unwrap(); - - let block_res = block_provider - .get_block_by_number(next_block.into()) - .full() - .await - .wrap_err_with(|| format!("Failed to fetch block by number {next_block}")); - let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) { - Ok(block) => block, - Err(e) => { - tracing::error!("Failed to fetch block {next_block}: {e}"); - let _ = error_sender.send(e); - break; - } - }; - let header = block.header.clone().inner.into_header_with_defaults(); - - let (version, params) = match block_to_new_payload(block, is_optimism) { - Ok(result) => result, - Err(e) => { - tracing::error!("Failed to convert block to new payload: {e}"); - let _ = error_sender.send(e); break; } - }; - - next_block += 1; - if let Err(e) = sender.send((header, version, params)).await { - tracing::error!("Failed to send block data: {e}"); - break; } - } - }); + }); + } } // put results in a summary vec so they can be printed at the end From 77a728f750158000041b0bed9c63c2d9d4ae7d0a Mon Sep 17 00:00:00 2001 From: Charlie Mack Date: Tue, 28 Oct 2025 10:05:39 +0100 Subject: [PATCH 4/6] feat(bench): Add tests for block_storage --- bin/reth-bench/src/bench/block_storage.rs | 113 ++++++++++++++++++++-- 1 file changed, 104 insertions(+), 9 deletions(-) diff --git a/bin/reth-bench/src/bench/block_storage.rs b/bin/reth-bench/src/bench/block_storage.rs index abc0969f825..6fd97c08675 100644 --- a/bin/reth-bench/src/bench/block_storage.rs +++ b/bin/reth-bench/src/bench/block_storage.rs @@ -1,3 +1,6 @@ +//! Contains functionality for generating a file of blocks from an RPC endpoint. +//! The file can then be used as input for replaying benchmarks from a file. 
+ use crate::bench::{ context::{BenchContext, BlockSource}, output::BLOCK_STORAGE_OUTPUT_SUFFIX, @@ -21,7 +24,7 @@ use std::{ }; use tracing::{debug, info}; -//type alias for the consensus block +/// Type alias for the consensus block type ConsensusBlock = Block, OpTxEnvelope>, Header>; /// File format version for future compatibility @@ -30,7 +33,7 @@ const FILE_FORMAT_VERSION: u8 = 1; /// Magic bytes to identify the file format const MAGIC_BYTES: &[u8] = b"RETH"; -/// `reth benchmark generate-file` command +/// `reth benchmark block-storage` command #[derive(Debug, Parser)] pub struct Command { /// The RPC url to use for getting data. @@ -41,6 +44,14 @@ pub struct Command { benchmark: BenchmarkArgs, } +// The type of block that is stored in the file +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[repr(u8)] +pub(crate) enum BlockType { + Ethereum = 0, + Optimism = 1, +} + /// Block file header #[derive(Debug)] pub(crate) struct BlockFileHeader { @@ -50,13 +61,6 @@ pub(crate) struct BlockFileHeader { to_block: u64, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -#[repr(u8)] -pub(crate) enum BlockType { - Ethereum = 0, - Optimism = 1, -} - impl BlockFileHeader { fn new(is_optimism: bool, from_block: u64, to_block: u64) -> Self { Self { @@ -346,3 +350,94 @@ impl Command { Ok(rlp) } } + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Cursor; + + #[test] + fn test_header_roundtrip_ethereum() { + let header = BlockFileHeader::new(false, 100, 200); + let mut buffer = Vec::new(); + + header.write_to(&mut buffer).unwrap(); + + let mut cursor = Cursor::new(buffer); + let read_header = BlockFileHeader::read_from(&mut cursor).unwrap(); + + assert_eq!(read_header.version, FILE_FORMAT_VERSION); + assert_eq!(read_header.block_type, BlockType::Ethereum); + assert_eq!(read_header.from_block, 100); + assert_eq!(read_header.to_block, 200); + } + + #[test] + fn test_header_roundtrip_optimism() { + let header = BlockFileHeader::new(true, 500, 1000); + let mut buffer = Vec::new(); + + header.write_to(&mut buffer).unwrap(); + + let mut cursor = Cursor::new(buffer); + let read_header = BlockFileHeader::read_from(&mut cursor).unwrap(); + + assert_eq!(read_header.version, FILE_FORMAT_VERSION); + assert_eq!(read_header.block_type, BlockType::Optimism); + assert_eq!(read_header.from_block, 500); + assert_eq!(read_header.to_block, 1000); + } + + #[test] + fn test_invalid_magic_bytes() { + let mut buffer = vec![0xFF, 0xFF, 0xFF, 0xFF]; // Wrong magic + buffer.push(FILE_FORMAT_VERSION); + buffer.push(0); // BlockType::Ethereum + buffer.extend_from_slice(&100u64.to_le_bytes()); + buffer.extend_from_slice(&200u64.to_le_bytes()); + + let mut cursor = Cursor::new(buffer); + let result = BlockFileHeader::read_from(&mut cursor); + + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Invalid file format")); + } + + #[test] + fn test_unsupported_version() { + let mut buffer = Vec::new(); + buffer.extend_from_slice(MAGIC_BYTES); + buffer.push(99); // Invalid version + buffer.push(0); + buffer.extend_from_slice(&100u64.to_le_bytes()); + buffer.extend_from_slice(&200u64.to_le_bytes()); + + let mut cursor = Cursor::new(buffer); + let result = BlockFileHeader::read_from(&mut cursor); + + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Unsupported file version")); + } + + #[test] + fn test_unknown_block_type() { + let mut buffer = Vec::new(); + buffer.extend_from_slice(MAGIC_BYTES); + buffer.push(FILE_FORMAT_VERSION); + buffer.push(99); // Invalid block type + 
buffer.extend_from_slice(&100u64.to_le_bytes());
+        buffer.extend_from_slice(&200u64.to_le_bytes());
+
+        let mut cursor = Cursor::new(buffer);
+        let result = BlockFileHeader::read_from(&mut cursor);
+
+        assert!(result.is_err());
+        assert!(result.unwrap_err().to_string().contains("Unknown block type"));
+    }
+
+    #[test]
+    fn test_block_type_values() {
+        assert_eq!(BlockType::Ethereum as u8, 0);
+        assert_eq!(BlockType::Optimism as u8, 1);
+    }
+}

From d143b05136ff43d4ad975c3ace864b73603a2ff0 Mon Sep 17 00:00:00 2001
From: Charlie Mack
Date: Tue, 28 Oct 2025 11:36:42 +0100
Subject: [PATCH 5/6] feat(bench): Add support for early Optimism blocks in valid_payload

---
 bin/reth-bench/src/valid_payload.rs | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/bin/reth-bench/src/valid_payload.rs b/bin/reth-bench/src/valid_payload.rs
index 305919551a3..04dca7281bb 100644
--- a/bin/reth-bench/src/valid_payload.rs
+++ b/bin/reth-bench/src/valid_payload.rs
@@ -210,7 +210,12 @@ where
             (EngineApiMessageVersion::V2, serde_json::to_value((input,))?)
         }
         ExecutionPayload::V1(payload) => {
-            (EngineApiMessageVersion::V1, serde_json::to_value((payload,))?)
+            // Optimism doesn't support EngineApiMessageVersion::V1, so we use V2 instead
+            if is_optimism {
+                (EngineApiMessageVersion::V2, serde_json::to_value((payload,))?)
+            } else {
+                (EngineApiMessageVersion::V1, serde_json::to_value((payload,))?)
+            }
         }
     };

From e83d2ff78c3752cc169cdff360017624341ce522 Mon Sep 17 00:00:00 2001
From: Charlie Mack
Date: Tue, 28 Oct 2025 12:09:43 +0100
Subject: [PATCH 6/6] feat(bench): Make JWT secret optional for file-based loading (not required)

---
 bin/reth-bench/src/bench/context.rs          | 38 +++++++++-----------
 bin/reth-bench/src/bench/new_payload_fcu.rs  |  3 ++
 bin/reth-bench/src/bench/new_payload_only.rs |  3 ++
 3 files changed, 23 insertions(+), 21 deletions(-)

diff --git a/bin/reth-bench/src/bench/context.rs b/bin/reth-bench/src/bench/context.rs
index 6b316936148..62f1876d348 100644
--- a/bin/reth-bench/src/bench/context.rs
+++ b/bin/reth-bench/src/bench/context.rs
@@ -44,7 +44,7 @@ pub(crate) enum BlockSource {
 /// closed, open, or file-based range of blocks.
 pub(crate) struct BenchContext {
     /// The auth provider is used for engine API queries.
-    pub(crate) auth_provider: RootProvider<AnyNetwork>,
+    pub(crate) auth_provider: Option<RootProvider<AnyNetwork>>,
     /// The source of blocks for the benchmark.
     pub(crate) block_source: BlockSource,
     /// Whether the chain is an OP rollup.
@@ -70,24 +70,19 @@ impl BenchContext {
             }
         }
 
-        // Construct the authenticated provider first (needed for --advance)
-        let auth_jwt = bench_args
-            .auth_jwtsecret
-            .clone()
-            .ok_or_else(|| eyre::eyre!("--jwt-secret must be provided for authenticated RPC"))?;
+        // Construct the authenticated provider only if a JWT secret was provided
+        let auth_provider = if let Some(auth_jwt) = bench_args.auth_jwtsecret.clone() {
+            let jwt = std::fs::read_to_string(auth_jwt)?;
+            let jwt = JwtSecret::from_hex(jwt)?;
+            let auth_url = Url::parse(&bench_args.engine_rpc_url)?;
 
-        // Fetch jwt from file (the jwt is hex encoded so we will decode it after)
-        let jwt = std::fs::read_to_string(auth_jwt)?;
-        let jwt = JwtSecret::from_hex(jwt)?;
-
-        // Get engine url
-        let auth_url = Url::parse(&bench_args.engine_rpc_url)?;
-
-        // Construct the authed transport
-        info!("Connecting to Engine RPC at {} for replay", auth_url);
-        let auth_transport = AuthenticatedTransportConnect::new(auth_url, jwt);
-        let client = ClientBuilder::default().connect_with(auth_transport).await?;
-        let auth_provider = RootProvider::<AnyNetwork>::new(client);
+            info!("Connecting to Engine RPC at {} for replay", auth_url);
+            let auth_transport = AuthenticatedTransportConnect::new(auth_url, jwt);
+            let client = ClientBuilder::default().connect_with(auth_transport).await?;
+            Some(RootProvider::<AnyNetwork>::new(client))
+        } else {
+            None
+        };
 
         // Determine block source, benchmark mode, and whether it's optimism - all in one place
         let (block_source, is_optimism) = if let Some(file_path) = &bench_args.from_file {
@@ -103,8 +98,7 @@ impl BenchContext {
             (block_source, is_optimism)
         } else {
             // RPC-based loading
-            let rpc_url = rpc_url
-                .ok_or_else(|| eyre::eyre!("Either --rpc-url or --from-file must be provided"))?;
+            let rpc_url = rpc_url.ok_or_eyre("Either --rpc-url or --from-file must be provided")?;
 
             info!("Running benchmark using data from RPC URL: {}", rpc_url);
 
@@ -126,9 +120,11 @@ impl BenchContext {
             }
 
             let head_block = auth_provider
+                .as_ref()
+                .ok_or_eyre("--jwt-secret must be provided for authenticated RPC")?
                 .get_block_by_number(BlockNumberOrTag::Latest)
                 .await?
- .ok_or_else(|| eyre::eyre!("Failed to fetch latest block for --advance"))?; + .ok_or_eyre("Failed to fetch latest block for --advance")?; let head_number = head_block.header.number; (Some(head_number), Some(head_number + advance)) } else { diff --git a/bin/reth-bench/src/bench/new_payload_fcu.rs b/bin/reth-bench/src/bench/new_payload_fcu.rs index 172a328bcfc..320177908b3 100644 --- a/bin/reth-bench/src/bench/new_payload_fcu.rs +++ b/bin/reth-bench/src/bench/new_payload_fcu.rs @@ -60,6 +60,9 @@ impl Command { let buffer_size = self.rpc_block_buffer_size; + let auth_provider = + auth_provider.ok_or_eyre("--jwt-secret must be provided for authenticated RPC")?; + // Use a oneshot channel to propagate errors from the spawned task let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel(); let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size); diff --git a/bin/reth-bench/src/bench/new_payload_only.rs b/bin/reth-bench/src/bench/new_payload_only.rs index b15a50742fb..cf49a2f42c1 100644 --- a/bin/reth-bench/src/bench/new_payload_only.rs +++ b/bin/reth-bench/src/bench/new_payload_only.rs @@ -50,6 +50,9 @@ impl Command { let buffer_size = self.rpc_block_buffer_size; + let auth_provider = + auth_provider.ok_or_eyre("--jwt-secret must be provided for authenticated RPC")?; + // Use a oneshot channel to propagate errors from the spawned task let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel(); let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
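
For reference, the file-replay path that both benchmark commands now share reduces to the loop below. This is a minimal sketch rather than code from the patches: it assumes the `BlockFileReader` and `consensus_block_to_new_payload` helpers shown in the diffs above, uses an illustrative `blocks.rlp` path and a batch size of 64, and simply propagates errors instead of forwarding them over the oneshot error channel.

// Minimal sketch of the file-replay loop (see the assumptions above).
fn replay_from_file(is_optimism: bool) -> eyre::Result<()> {
    // Open the block file produced by the block-storage command; 64 is an illustrative batch size.
    let mut reader = BlockFileReader::new(std::path::Path::new("blocks.rlp"), 64)?;
    loop {
        // An empty batch signals end-of-file.
        let blocks = reader.read_batch()?;
        if blocks.is_empty() {
            break;
        }
        for block in blocks {
            // Keep the header for bookkeeping (block number, gas used, timings).
            let header = block.header.clone();
            // Convert the stored consensus block into engine newPayload arguments.
            let (version, params) = consensus_block_to_new_payload(block, is_optimism)?;
            // ...hand (header, version, params) to the engine API consumer here...
            let _ = (header, version, params);
        }
    }
    Ok(())
}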