diff --git a/Cargo.lock b/Cargo.lock index b8c0da68164..831892a020b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7324,11 +7324,13 @@ dependencies = [ name = "reth-bench" version = "1.8.2" dependencies = [ + "alloy-consensus", "alloy-eips", "alloy-json-rpc", "alloy-primitives", "alloy-provider", "alloy-pubsub", + "alloy-rlp", "alloy-rpc-client", "alloy-rpc-types-engine", "alloy-transport", diff --git a/bin/reth-bench/Cargo.toml b/bin/reth-bench/Cargo.toml index 891fa4f9780..ec818ff8256 100644 --- a/bin/reth-bench/Cargo.toml +++ b/bin/reth-bench/Cargo.toml @@ -25,6 +25,8 @@ reth-tracing.workspace = true # alloy alloy-eips.workspace = true alloy-json-rpc.workspace = true +alloy-consensus.workspace = true +alloy-rlp.workspace = true alloy-primitives.workspace = true alloy-provider = { workspace = true, features = ["engine-api", "reqwest-rustls-tls"], default-features = false } alloy-pubsub.workspace = true diff --git a/bin/reth-bench/src/bench/block_storage.rs b/bin/reth-bench/src/bench/block_storage.rs new file mode 100644 index 00000000000..abc0969f825 --- /dev/null +++ b/bin/reth-bench/src/bench/block_storage.rs @@ -0,0 +1,348 @@ +use crate::bench::{ + context::{BenchContext, BlockSource}, + output::BLOCK_STORAGE_OUTPUT_SUFFIX, +}; +use alloy_provider::{network::AnyNetwork, Provider, RootProvider}; + +use alloy_consensus::{ + transaction::{Either, EthereumTxEnvelope}, + Block, BlockBody, Header, TxEip4844Variant, +}; +use alloy_rlp::{Decodable, Encodable}; +use clap::Parser; +use eyre::{Context, OptionExt}; +use op_alloy_consensus::OpTxEnvelope; +use reth_cli_runner::CliContext; +use reth_node_core::args::BenchmarkArgs; +use std::{ + fs::File, + io::{BufReader, BufWriter, Read, Write}, + path::Path, +}; +use tracing::{debug, info}; + +//type alias for the consensus block +type ConsensusBlock = Block, OpTxEnvelope>, Header>; + +/// File format version for future compatibility +const FILE_FORMAT_VERSION: u8 = 1; + +/// Magic bytes to identify the file format +const MAGIC_BYTES: &[u8] = b"RETH"; + +/// `reth benchmark generate-file` command +#[derive(Debug, Parser)] +pub struct Command { + /// The RPC url to use for getting data. + #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)] + rpc_url: String, + + #[command(flatten)] + benchmark: BenchmarkArgs, +} + +/// Block file header +#[derive(Debug)] +pub(crate) struct BlockFileHeader { + version: u8, + block_type: BlockType, + from_block: u64, + to_block: u64, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub(crate) enum BlockType { + Ethereum = 0, + Optimism = 1, +} + +impl BlockFileHeader { + fn new(is_optimism: bool, from_block: u64, to_block: u64) -> Self { + Self { + version: FILE_FORMAT_VERSION, + block_type: if is_optimism { BlockType::Optimism } else { BlockType::Ethereum }, + from_block, + to_block, + } + } + + /// Get the block type from the header + pub(super) fn block_type(&self) -> BlockType { + self.block_type + } + + fn write_to(&self, writer: &mut impl Write) -> eyre::Result<()> { + writer.write_all(MAGIC_BYTES)?; + writer.write_all(&[self.version])?; + writer.write_all(&[self.block_type as u8])?; + writer.write_all(&self.from_block.to_le_bytes())?; + writer.write_all(&self.to_block.to_le_bytes())?; + Ok(()) + } + + /// Read header from file (for the decoder) + fn read_from(reader: &mut impl std::io::Read) -> eyre::Result { + let mut magic = [0u8; 4]; + reader.read_exact(&mut magic)?; + if magic != MAGIC_BYTES { + return Err(eyre::eyre!("Invalid file format")); + } + + let mut version = [0u8; 1]; + reader.read_exact(&mut version)?; + if version[0] != FILE_FORMAT_VERSION { + return Err(eyre::eyre!("Unsupported file version: {}", version[0])); + } + + let mut block_type = [0u8; 1]; + reader.read_exact(&mut block_type)?; + + let mut from_block = [0u8; 8]; + reader.read_exact(&mut from_block)?; + let from_block = u64::from_le_bytes(from_block); + + let mut to_block = [0u8; 8]; + reader.read_exact(&mut to_block)?; + let to_block = u64::from_le_bytes(to_block); + + Ok(Self { + version: version[0], + block_type: match block_type[0] { + 0 => BlockType::Ethereum, + 1 => BlockType::Optimism, + _ => return Err(eyre::eyre!("Unknown block type: {}", block_type[0])), + }, + from_block, + to_block, + }) + } +} + +/// Writer for block storage files +struct BlockFileWriter { + writer: BufWriter, + blocks_written: usize, +} + +impl BlockFileWriter { + fn new(path: &Path, header: BlockFileHeader) -> eyre::Result { + let mut writer = BufWriter::new(File::create(path)?); + header.write_to(&mut writer)?; + + Ok(Self { writer, blocks_written: 0 }) + } + + fn write_block(&mut self, rlp_data: &[u8]) -> eyre::Result<()> { + // Write length as 4 bytes (supports blocks up to 4GB) + self.writer.write_all(&(rlp_data.len() as u32).to_le_bytes())?; + self.writer.write_all(rlp_data)?; + self.blocks_written += 1; + Ok(()) + } + + fn finish(mut self) -> eyre::Result { + self.writer.flush()?; + Ok(self.blocks_written) + } +} + +/// Block file reader that streams blocks in batches +pub(crate) struct BlockFileReader { + reader: BufReader, + batch_size: usize, + blocks_read: usize, + block_type: BlockType, +} + +impl BlockFileReader { + /// Create a new block file reader + pub(crate) fn new(path: &Path, batch_size: usize) -> eyre::Result { + let file = + File::open(path).wrap_err_with(|| format!("Failed to open block file: {:?}", path))?; + + let mut reader = BufReader::new(file); + + info!("Opened block file: {:?}", path); + + let header = BlockFileHeader::read_from(&mut reader)?; + + let block_type = header.block_type; + + Ok(Self { reader, batch_size, blocks_read: 0, block_type }) + } + + /// Get the header from the file + pub(crate) fn get_header(path: &Path) -> eyre::Result { + let mut file = + File::open(path).wrap_err_with(|| format!("Failed to open block file: {:?}", path))?; + BlockFileHeader::read_from(&mut file) + } + + /// Read the next batch of blocks from the file + pub(crate) fn read_batch(&mut self) -> eyre::Result> { + let mut batch = Vec::with_capacity(self.batch_size); + + for _ in 0..self.batch_size { + match self.read_next_block()? { + Some(block) => batch.push(block), + None => break, // EOF + } + } + + if !batch.is_empty() { + debug!("Read batch of {} blocks (total read: {})", batch.len(), self.blocks_read); + } + + Ok(batch) + } + + /// Read a single block from the file + fn read_next_block(&mut self) -> eyre::Result> { + // Read length prefix (4 bytes, little-endian) + let mut len_bytes = [0u8; 4]; + match self.reader.read_exact(&mut len_bytes) { + Ok(_) => {} + Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { + return Ok(None); + } + Err(e) => return Err(e.into()), + } + + let len = u32::from_le_bytes(len_bytes) as usize; + + // Read RLP-encoded block data + let mut block_bytes = vec![0u8; len]; + self.reader.read_exact(&mut block_bytes).wrap_err("Failed to read block data")?; + + let block: ConsensusBlock = match self.block_type { + BlockType::Ethereum => Block::, Header>::decode( + &mut block_bytes.as_slice(), + )? + .map_transactions(|tx| Either::Left(tx)), + BlockType::Optimism => { + Block::::decode(&mut block_bytes.as_slice())? + .map_transactions(|tx| Either::Right(tx)) + } + }; + + self.blocks_read += 1; + + Ok(Some(block)) + } +} + +impl Command { + /// Execute `benchmark block-storage` command + pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> { + info!("Generating file from RPC: {}", self.rpc_url); + + let BenchContext { block_source, is_optimism, .. } = + BenchContext::new(&self.benchmark, Some(self.rpc_url.clone())).await?; + + // Extract RPC provider, next_block, and mode + let (provider, mut next_block, mode) = match block_source { + BlockSource::Rpc { provider, next_block, mode } => (provider, next_block, mode), + BlockSource::File { .. } => { + return Err(eyre::eyre!("block-storage command requires RPC mode, not file mode")); + } + }; + // Initialize file writer with header + let output_path = self + .benchmark + .output + .as_ref() + .ok_or_eyre("--output is required")? + .join(BLOCK_STORAGE_OUTPUT_SUFFIX); + + let from_block = self.benchmark.from.ok_or_eyre("--from is required")?; + let to_block = self.benchmark.to.ok_or_eyre("--to is required")?; + + let header = BlockFileHeader::new(is_optimism, from_block, to_block); + let mut file_writer = BlockFileWriter::new(&output_path, header)?; + + info!( + "Writing {} blocks to {:?}", + if is_optimism { "Optimism" } else { "Ethereum" }, + output_path + ); + + // Process blocks + while mode.contains(next_block) { + info!("Processing block {}", next_block); + + // Fetch and encode block + let rlp_data = self.fetch_and_encode_block(&provider, next_block, is_optimism).await?; + + // Write to file + file_writer.write_block(&rlp_data)?; + + next_block += 1; + } + + let blocks_written = file_writer.finish()?; + info!("Successfully wrote {} blocks to file", blocks_written); + + Ok(()) + } + + async fn fetch_and_encode_block( + &self, + provider: &RootProvider, + block_number: u64, + is_optimism: bool, + ) -> eyre::Result> { + // Fetch block + let block = provider + .get_block_by_number(block_number.into()) + .full() + .await? + .ok_or_eyre(format!("Block {} not found", block_number))?; + + let inner_block = block.into_inner(); + + let rlp = if is_optimism { + // Create Optimism block + let op_block = Block { + header: inner_block.header.inner.into_header_with_defaults(), + body: BlockBody { + transactions: inner_block + .transactions + .into_transactions() + .into_iter() + .map(|tx| tx.try_into()) + .collect::, _>>() + .map_err(|e| eyre::eyre!("Failed to convert to Optimism tx: {:?}", e))?, + ommers: vec![], + withdrawals: inner_block.withdrawals, + }, + }; + + let mut buf = Vec::with_capacity(op_block.length()); + op_block.encode(&mut buf); + buf + } else { + // Create Ethereum block + let eth_block = Block { + header: inner_block.header.inner.into_header_with_defaults(), + body: BlockBody { + transactions: inner_block + .transactions + .into_transactions() + .into_iter() + .map(|tx| tx.try_into_envelope()) + .collect::>, _>>() + .map_err(|e| eyre::eyre!("Failed to convert to Ethereum tx: {:?}", e))?, + ommers: vec![], + withdrawals: inner_block.withdrawals, + }, + }; + + let mut buf = Vec::with_capacity(eth_block.length()); + eth_block.encode(&mut buf); + buf + }; + + Ok(rlp) + } +} diff --git a/bin/reth-bench/src/bench/context.rs b/bin/reth-bench/src/bench/context.rs index 1d53ce8e1a3..6b316936148 100644 --- a/bin/reth-bench/src/bench/context.rs +++ b/bin/reth-bench/src/bench/context.rs @@ -1,32 +1,52 @@ //! This contains the [`BenchContext`], which is information that all replay-based benchmarks need. //! The initialization code is also the same, so this can be shared across benchmark commands. -use crate::{authenticated_transport::AuthenticatedTransportConnect, bench_mode::BenchMode}; +use std::path::PathBuf; + +use crate::{ + authenticated_transport::AuthenticatedTransportConnect, + bench::block_storage::{BlockFileReader, BlockType}, + bench_mode::BenchMode, +}; use alloy_eips::BlockNumberOrTag; use alloy_primitives::address; use alloy_provider::{network::AnyNetwork, Provider, RootProvider}; use alloy_rpc_client::ClientBuilder; use alloy_rpc_types_engine::JwtSecret; use alloy_transport::layers::RetryBackoffLayer; +use eyre::OptionExt; use reqwest::Url; use reth_node_core::args::BenchmarkArgs; use tracing::info; -/// This is intended to be used by benchmarks that replay blocks from an RPC. +/// Represents the source of blocks for benchmarking +pub(crate) enum BlockSource { + /// Blocks are fetched from an RPC endpoint + Rpc { + /// The RPC provider for fetching blocks + provider: RootProvider, + /// The next block number to process + next_block: u64, + /// The benchmark mode (range or continuous) + mode: BenchMode, + }, + /// Blocks are loaded from a file + File { + /// Path to the block file + path: PathBuf, + }, +} + +/// This is intended to be used by benchmarks that replay blocks from an RPC or a file. /// -/// It contains an authenticated provider for engine API queries, a block provider for block -/// queries, a [`BenchMode`] to determine whether the benchmark should run for a closed or open -/// range of blocks, and the next block to fetch. +/// It contains an authenticated provider for engine API queries, +/// the block source, and a [`BenchMode`] to determine whether the benchmark should run for a +/// closed, open, or file-based range of blocks. pub(crate) struct BenchContext { /// The auth provider is used for engine API queries. pub(crate) auth_provider: RootProvider, - /// The block provider is used for block queries. - pub(crate) block_provider: RootProvider, - /// The benchmark mode, which defines whether the benchmark should run for a closed or open - /// range of blocks. - pub(crate) benchmark_mode: BenchMode, - /// The next block to fetch. - pub(crate) next_block: u64, + /// The source of blocks for the benchmark. + pub(crate) block_source: BlockSource, /// Whether the chain is an OP rollup. pub(crate) is_optimism: bool, } @@ -34,9 +54,10 @@ pub(crate) struct BenchContext { impl BenchContext { /// This is the initialization code for most benchmarks, taking in a [`BenchmarkArgs`] and /// returning the providers needed to run a benchmark. - pub(crate) async fn new(bench_args: &BenchmarkArgs, rpc_url: String) -> eyre::Result { - info!("Running benchmark using data from RPC URL: {}", rpc_url); - + pub(crate) async fn new( + bench_args: &BenchmarkArgs, + rpc_url: Option, + ) -> eyre::Result { // Ensure that output directory exists and is a directory if let Some(output) = &bench_args.output { if output.is_file() { @@ -49,89 +70,90 @@ impl BenchContext { } } - // set up alloy client for blocks - let client = ClientBuilder::default() - .layer(RetryBackoffLayer::new(10, 800, u64::MAX)) - .http(rpc_url.parse()?); - let block_provider = RootProvider::::new(client); - - // Check if this is an OP chain by checking code at a predeploy address. - let is_optimism = !block_provider - .get_code_at(address!("0x420000000000000000000000000000000000000F")) - .await? - .is_empty(); - - // construct the authenticated provider + // Construct the authenticated provider first (needed for --advance) let auth_jwt = bench_args .auth_jwtsecret .clone() .ok_or_else(|| eyre::eyre!("--jwt-secret must be provided for authenticated RPC"))?; - // fetch jwt from file - // - // the jwt is hex encoded so we will decode it after + // Fetch jwt from file (the jwt is hex encoded so we will decode it after) let jwt = std::fs::read_to_string(auth_jwt)?; let jwt = JwtSecret::from_hex(jwt)?; - // get engine url + // Get engine url let auth_url = Url::parse(&bench_args.engine_rpc_url)?; - // construct the authed transport + // Construct the authed transport info!("Connecting to Engine RPC at {} for replay", auth_url); let auth_transport = AuthenticatedTransportConnect::new(auth_url, jwt); let client = ClientBuilder::default().connect_with(auth_transport).await?; let auth_provider = RootProvider::::new(client); - // Computes the block range for the benchmark. - // - // - If `--advance` is provided, fetches the latest block and sets: - // - `from = head + 1` - // - `to = head + advance` - // - Otherwise, uses the values from `--from` and `--to`. - let (from, to) = if let Some(advance) = bench_args.advance { - if advance == 0 { - return Err(eyre::eyre!("--advance must be greater than 0")); - } + // Determine block source, benchmark mode, and whether it's optimism - all in one place + let (block_source, is_optimism) = if let Some(file_path) = &bench_args.from_file { + // File-based loading + info!("Running benchmark using data from file: {:?}", file_path); - let head_block = auth_provider - .get_block_by_number(BlockNumberOrTag::Latest) - .await? - .ok_or_else(|| eyre::eyre!("Failed to fetch latest block for --advance"))?; - let head_number = head_block.header.number; - (Some(head_number), Some(head_number + advance)) + // Read block type from file header + let file_header = BlockFileReader::get_header(file_path)?; + let is_optimism = file_header.block_type() == BlockType::Optimism; + + let block_source = BlockSource::File { path: file_path.clone() }; + + (block_source, is_optimism) } else { - (bench_args.from, bench_args.to) - }; + // RPC-based loading + let rpc_url = rpc_url + .ok_or_else(|| eyre::eyre!("Either --rpc-url or --from-file must be provided"))?; - // If neither `--from` nor `--to` are provided, we will run the benchmark continuously, - // starting at the latest block. - let mut benchmark_mode = BenchMode::new(from, to)?; + info!("Running benchmark using data from RPC URL: {}", rpc_url); - let first_block = match benchmark_mode { - BenchMode::Continuous => { - // fetch Latest block - block_provider.get_block_by_number(BlockNumberOrTag::Latest).full().await?.unwrap() - } - BenchMode::Range(ref mut range) => { - match range.next() { - Some(block_number) => { - // fetch first block in range - block_provider - .get_block_by_number(block_number.into()) - .full() - .await? - .unwrap() - } - None => { - return Err(eyre::eyre!( - "Benchmark mode range is empty, please provide a larger range" - )); - } + let client = ClientBuilder::default() + .layer(RetryBackoffLayer::new(10, 800, u64::MAX)) + .http(rpc_url.parse()?); + let provider = RootProvider::::new(client); + + // Check if this is an OP chain by checking code at a predeploy address + let is_optimism = !provider + .get_code_at(address!("0x420000000000000000000000000000000000000F")) + .await? + .is_empty(); + + // Compute the block range for the benchmark + let (from, to) = if let Some(advance) = bench_args.advance { + if advance == 0 { + return Err(eyre::eyre!("--advance must be greater than 0")); } - } + + let head_block = auth_provider + .get_block_by_number(BlockNumberOrTag::Latest) + .await? + .ok_or_else(|| eyre::eyre!("Failed to fetch latest block for --advance"))?; + let head_number = head_block.header.number; + (Some(head_number), Some(head_number + advance)) + } else { + (bench_args.from, bench_args.to) + }; + + let mode = BenchMode::new(from, to)?; + + // Determine next_block based on benchmark mode + let next_block = match &mode { + BenchMode::Continuous => { + let latest = provider + .get_block_by_number(BlockNumberOrTag::Latest) + .await? + .ok_or_eyre("Failed to fetch latest block")?; + latest.header.number + 1 + } + BenchMode::Range(range) => *range.start() + 1, + }; + + let block_source = BlockSource::Rpc { provider, next_block, mode }; + + (block_source, is_optimism) }; - let next_block = first_block.header.number + 1; - Ok(Self { auth_provider, block_provider, benchmark_mode, next_block, is_optimism }) + Ok(Self { auth_provider, block_source, is_optimism }) } } diff --git a/bin/reth-bench/src/bench/mod.rs b/bin/reth-bench/src/bench/mod.rs index da3ccb1a8bb..97e8106efa5 100644 --- a/bin/reth-bench/src/bench/mod.rs +++ b/bin/reth-bench/src/bench/mod.rs @@ -5,6 +5,7 @@ use reth_cli_runner::CliContext; use reth_node_core::args::LogArgs; use reth_tracing::FileWorkerGuard; +mod block_storage; mod context; mod new_payload_fcu; mod new_payload_only; @@ -41,6 +42,9 @@ pub enum Subcommands { /// `cast block latest --full --json | reth-bench send-payload --rpc-url localhost:5000 /// --jwt-secret $(cat ~/.local/share/reth/mainnet/jwt.hex)` SendPayload(send_payload::Command), + + /// Command for generating a file of blocks from an RPC endpoint. + BlockStorage(block_storage::Command), } impl BenchmarkCommand { @@ -53,6 +57,7 @@ impl BenchmarkCommand { Subcommands::NewPayloadFcu(command) => command.execute(ctx).await, Subcommands::NewPayloadOnly(command) => command.execute(ctx).await, Subcommands::SendPayload(command) => command.execute(ctx).await, + Subcommands::BlockStorage(command) => command.execute(ctx).await, } } diff --git a/bin/reth-bench/src/bench/new_payload_fcu.rs b/bin/reth-bench/src/bench/new_payload_fcu.rs index ce094895ee3..172a328bcfc 100644 --- a/bin/reth-bench/src/bench/new_payload_fcu.rs +++ b/bin/reth-bench/src/bench/new_payload_fcu.rs @@ -3,14 +3,19 @@ use crate::{ bench::{ - context::BenchContext, + block_storage::BlockFileReader, + context::{BenchContext, BlockSource}, output::{ CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, COMBINED_OUTPUT_SUFFIX, GAS_OUTPUT_SUFFIX, }, }, - valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload}, + valid_payload::{ + block_to_new_payload, call_forkchoice_updated, call_new_payload, + consensus_block_to_new_payload, + }, }; +use alloy_consensus::Header; use alloy_provider::Provider; use alloy_rpc_types_engine::ForkchoiceState; use clap::Parser; @@ -27,7 +32,7 @@ use tracing::{debug, info}; pub struct Command { /// The RPC url to use for getting data. #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)] - rpc_url: String, + rpc_url: Option, /// How long to wait after a forkchoice update before sending the next payload. #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)] @@ -50,13 +55,8 @@ pub struct Command { impl Command { /// Execute `benchmark new-payload-fcu` command pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> { - let BenchContext { - benchmark_mode, - block_provider, - auth_provider, - mut next_block, - is_optimism, - } = BenchContext::new(&self.benchmark, self.rpc_url).await?; + let BenchContext { block_source, auth_provider, is_optimism } = + BenchContext::new(&self.benchmark, self.rpc_url).await?; let buffer_size = self.rpc_block_buffer_size; @@ -64,67 +64,142 @@ impl Command { let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel(); let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size); - tokio::task::spawn(async move { - while benchmark_mode.contains(next_block) { - let block_res = block_provider - .get_block_by_number(next_block.into()) - .full() - .await - .wrap_err_with(|| format!("Failed to fetch block by number {next_block}")); - let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) { - Ok(block) => block, - Err(e) => { - tracing::error!("Failed to fetch block {next_block}: {e}"); - let _ = error_sender.send(e); - break; + // Spawn task based on block source + match block_source { + BlockSource::File { path } => { + // Use file reader for blocks + tokio::task::spawn(async move { + let mut reader = match BlockFileReader::new(&path, buffer_size) { + Ok(r) => r, + Err(e) => { + tracing::error!("Failed to create block reader: {e}"); + let _ = error_sender.send(e); + return; + } + }; + + // At anypoint if we fail to process a block, or read a batch of blocks, it + // means the file is corrupted somehow so the benchmark should exit as the + // results are invalidated + loop { + let blocks = match reader.read_batch() { + Ok(batch) if batch.is_empty() => break, // EOF + Ok(batch) => batch, + Err(e) => { + tracing::error!("Failed to read blocks from file: {e}"); + let _ = error_sender.send(e); + return; + } + }; + + for block in blocks { + let header: Header = block.header.clone(); + + let (version, params) = + match consensus_block_to_new_payload(block, is_optimism) { + Ok(result) => result, + Err(e) => { + tracing::error!( + "Failed to convert block to new payload: {e}" + ); + let _ = error_sender.send(e); + return; + } + }; + + let head_block_hash = header.hash_slow(); + // For file-based blocks, use the same hash for safe/finalized + let safe_block_hash = head_block_hash; + let finalized_block_hash = head_block_hash; + + if let Err(e) = sender + .send(( + header, + version, + params, + head_block_hash, + safe_block_hash, + finalized_block_hash, + )) + .await + { + tracing::error!("Failed to send block data: {e}"); + return; + } + } } - }; - let header = block.header.clone(); - - let (version, params) = match block_to_new_payload(block, is_optimism) { - Ok(result) => result, - Err(e) => { - tracing::error!("Failed to convert block to new payload: {e}"); - let _ = error_sender.send(e); - break; + }); + } + BlockSource::Rpc { provider, mut next_block, mode } => { + // Use RPC provider for blocks + tokio::task::spawn(async move { + while mode.contains(next_block) { + let block_res = provider + .get_block_by_number(next_block.into()) + .full() + .await + .wrap_err_with(|| { + format!("Failed to fetch block by number {next_block}") + }); + let block = + match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) { + Ok(block) => block, + Err(e) => { + tracing::error!("Failed to fetch block {next_block}: {e}"); + let _ = error_sender.send(e); + break; + } + }; + + let header = block.header.clone().inner.into_header_with_defaults(); + + let (version, params) = match block_to_new_payload(block, is_optimism) { + Ok(result) => result, + Err(e) => { + tracing::error!("Failed to convert block to new payload: {e}"); + let _ = error_sender.send(e); + break; + } + }; + let head_block_hash = header.hash_slow(); + let safe_block_hash = + provider.get_block_by_number(header.number.saturating_sub(32).into()); + + let finalized_block_hash = + provider.get_block_by_number(header.number.saturating_sub(64).into()); + + let (safe, finalized) = + tokio::join!(safe_block_hash, finalized_block_hash,); + + let safe_block_hash = match safe { + Ok(Some(block)) => block.header.hash, + Ok(None) | Err(_) => head_block_hash, + }; + + let finalized_block_hash = match finalized { + Ok(Some(block)) => block.header.hash, + Ok(None) | Err(_) => head_block_hash, + }; + + next_block += 1; + if let Err(e) = sender + .send(( + header, + version, + params, + head_block_hash, + safe_block_hash, + finalized_block_hash, + )) + .await + { + tracing::error!("Failed to send block data: {e}"); + break; + } } - }; - let head_block_hash = header.hash; - let safe_block_hash = - block_provider.get_block_by_number(header.number.saturating_sub(32).into()); - - let finalized_block_hash = - block_provider.get_block_by_number(header.number.saturating_sub(64).into()); - - let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,); - - let safe_block_hash = match safe { - Ok(Some(block)) => block.header.hash, - Ok(None) | Err(_) => head_block_hash, - }; - - let finalized_block_hash = match finalized { - Ok(Some(block)) => block.header.hash, - Ok(None) | Err(_) => head_block_hash, - }; - - next_block += 1; - if let Err(e) = sender - .send(( - header, - version, - params, - head_block_hash, - safe_block_hash, - finalized_block_hash, - )) - .await - { - tracing::error!("Failed to send block data: {e}"); - break; - } + }); } - }); + } // put results in a summary vec so they can be printed at the end let mut results = Vec::new(); diff --git a/bin/reth-bench/src/bench/new_payload_only.rs b/bin/reth-bench/src/bench/new_payload_only.rs index 3dfa619ec7b..b15a50742fb 100644 --- a/bin/reth-bench/src/bench/new_payload_only.rs +++ b/bin/reth-bench/src/bench/new_payload_only.rs @@ -2,14 +2,16 @@ use crate::{ bench::{ - context::BenchContext, + block_storage::BlockFileReader, + context::{BenchContext, BlockSource}, output::{ NewPayloadResult, TotalGasOutput, TotalGasRow, GAS_OUTPUT_SUFFIX, NEW_PAYLOAD_OUTPUT_SUFFIX, }, }, - valid_payload::{block_to_new_payload, call_new_payload}, + valid_payload::{block_to_new_payload, call_new_payload, consensus_block_to_new_payload}, }; +use alloy_consensus::Header; use alloy_provider::Provider; use clap::Parser; use csv::Writer; @@ -24,7 +26,7 @@ use tracing::{debug, info}; pub struct Command { /// The RPC url to use for getting data. #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)] - rpc_url: String, + rpc_url: Option, /// The size of the block buffer (channel capacity) for prefetching blocks from the RPC /// endpoint. @@ -43,13 +45,8 @@ pub struct Command { impl Command { /// Execute `benchmark new-payload-only` command pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> { - let BenchContext { - benchmark_mode, - block_provider, - auth_provider, - mut next_block, - is_optimism, - } = BenchContext::new(&self.benchmark, self.rpc_url).await?; + let BenchContext { block_source, auth_provider, is_optimism } = + BenchContext::new(&self.benchmark, self.rpc_url).await?; let buffer_size = self.rpc_block_buffer_size; @@ -57,39 +54,95 @@ impl Command { let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel(); let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size); - tokio::task::spawn(async move { - while benchmark_mode.contains(next_block) { - let block_res = block_provider - .get_block_by_number(next_block.into()) - .full() - .await - .wrap_err_with(|| format!("Failed to fetch block by number {next_block}")); - let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) { - Ok(block) => block, - Err(e) => { - tracing::error!("Failed to fetch block {next_block}: {e}"); - let _ = error_sender.send(e); - break; + // Spawn task based on block source + match block_source { + BlockSource::File { path } => { + // Use file reader for blocks + tokio::task::spawn(async move { + let mut reader = match BlockFileReader::new(&path, buffer_size) { + Ok(r) => r, + Err(e) => { + tracing::error!("Failed to create block reader: {e}"); + let _ = error_sender.send(e); + return; + } + }; + + // At anypoint if we fail to process a block, or read a batch of blocks, it + // means the file is corrupted somehow so the benchmark should exit as the + // results are invalidated + loop { + let blocks = match reader.read_batch() { + Ok(batch) if batch.is_empty() => break, // EOF + Ok(batch) => batch, + Err(e) => { + tracing::error!("Failed to read blocks from file: {e}"); + let _ = error_sender.send(e); + return; + } + }; + for block in blocks { + let header: Header = block.header.clone(); + let (version, params) = + match consensus_block_to_new_payload(block, is_optimism) { + Ok(result) => result, + Err(e) => { + tracing::error!( + "Failed to convert block to new payload: {e}" + ); + let _ = error_sender.send(e); + return; + } + }; + + if let Err(e) = sender.send((header, version, params)).await { + tracing::error!("Failed to send block data: {e}"); + return; + } + } } - }; - let header = block.header.clone(); - - let (version, params) = match block_to_new_payload(block, is_optimism) { - Ok(result) => result, - Err(e) => { - tracing::error!("Failed to convert block to new payload: {e}"); - let _ = error_sender.send(e); - break; + }); + } + BlockSource::Rpc { provider, mut next_block, mode } => { + // Use RPC provider for blocks + tokio::task::spawn(async move { + while mode.contains(next_block) { + let block_res = provider + .get_block_by_number(next_block.into()) + .full() + .await + .wrap_err_with(|| { + format!("Failed to fetch block by number {next_block}") + }); + let block = + match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) { + Ok(block) => block, + Err(e) => { + tracing::error!("Failed to fetch block {next_block}: {e}"); + let _ = error_sender.send(e); + break; + } + }; + let header = block.header.clone().inner.into_header_with_defaults(); + + let (version, params) = match block_to_new_payload(block, is_optimism) { + Ok(result) => result, + Err(e) => { + tracing::error!("Failed to convert block to new payload: {e}"); + let _ = error_sender.send(e); + break; + } + }; + + next_block += 1; + if let Err(e) = sender.send((header, version, params)).await { + tracing::error!("Failed to send block data: {e}"); + break; + } } - }; - - next_block += 1; - if let Err(e) = sender.send((header, version, params)).await { - tracing::error!("Failed to send block data: {e}"); - break; - } + }); } - }); + } // put results in a summary vec so they can be printed at the end let mut results = Vec::new(); diff --git a/bin/reth-bench/src/bench/output.rs b/bin/reth-bench/src/bench/output.rs index 794cd2768df..09b6125fedc 100644 --- a/bin/reth-bench/src/bench/output.rs +++ b/bin/reth-bench/src/bench/output.rs @@ -15,6 +15,9 @@ pub(crate) const COMBINED_OUTPUT_SUFFIX: &str = "combined_latency.csv"; /// This is the suffix for new payload output csv files. pub(crate) const NEW_PAYLOAD_OUTPUT_SUFFIX: &str = "new_payload_latency.csv"; +/// This is the suffix for block output files. +pub(crate) const BLOCK_STORAGE_OUTPUT_SUFFIX: &str = "blocks.bin"; + /// This represents the results of a single `newPayload` call in the benchmark, containing the gas /// used and the `newPayload` latency. #[derive(Debug)] diff --git a/bin/reth-bench/src/valid_payload.rs b/bin/reth-bench/src/valid_payload.rs index d253506b22b..305919551a3 100644 --- a/bin/reth-bench/src/valid_payload.rs +++ b/bin/reth-bench/src/valid_payload.rs @@ -2,7 +2,8 @@ //! response. This is useful for benchmarking, as it allows us to wait for a payload to be valid //! before sending additional calls. -use alloy_eips::eip7685::Requests; +use alloy_consensus::{Block, BlockHeader, Sealable, Transaction}; +use alloy_eips::{eip7685::Requests, Encodable2718}; use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider}; use alloy_rpc_types_engine::{ ExecutionPayload, ExecutionPayloadInputV2, ForkchoiceState, ForkchoiceUpdated, @@ -146,6 +147,17 @@ pub(crate) fn block_to_new_payload( })? .into_consensus(); + consensus_block_to_new_payload(block, is_optimism) +} + +pub(crate) fn consensus_block_to_new_payload( + block: Block, + is_optimism: bool, +) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> +where + T: Encodable2718 + Transaction, + H: BlockHeader + Sealable, +{ // Convert to execution payload let (payload, sidecar) = ExecutionPayload::from_block_slow(&block); @@ -160,7 +172,7 @@ pub(crate) fn block_to_new_payload( serde_json::to_value(( OpExecutionPayloadV4 { payload_inner: payload, - withdrawals_root: block.withdrawals_root.unwrap(), + withdrawals_root: block.withdrawals_root().unwrap(), }, cancun.versioned_hashes.clone(), cancun.parent_beacon_block_root, diff --git a/crates/node/core/src/args/benchmark_args.rs b/crates/node/core/src/args/benchmark_args.rs index 2865054ded1..2dc96551caa 100644 --- a/crates/node/core/src/args/benchmark_args.rs +++ b/crates/node/core/src/args/benchmark_args.rs @@ -45,6 +45,15 @@ pub struct BenchmarkArgs { )] pub engine_rpc_url: String, + /// The path to the file that contains blocks to be used for the benchmark. + #[arg( + long = "from-file", + value_name = "FILE", + conflicts_with_all = &["from", "to", "advance"], + verbatim_doc_comment + )] + pub from_file: Option, + /// The path to the output directory for granular benchmark results. #[arg(long, short, value_name = "BENCHMARK_OUTPUT", verbatim_doc_comment)] pub output: Option,