Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions bin/reth-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ reth-primitives-traits.workspace = true
reth-tracing.workspace = true

# alloy
alloy-rlp.workspace = true
alloy-eips.workspace = true
alloy-json-rpc.workspace = true
alloy-consensus.workspace = true
alloy-primitives.workspace = true
alloy-provider = { workspace = true, features = ["engine-api", "reqwest-rustls-tls"], default-features = false }
alloy-pubsub.workspace = true
Expand Down
167 changes: 167 additions & 0 deletions bin/reth-bench/src/bench/block_storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
//! Block storage module for reading blocks from files

use alloy_consensus::{
transaction::{Either, EthereumTxEnvelope},
Block, Header, TxEip4844Variant,
};
use alloy_rlp::Decodable;
use eyre::Context;
use op_alloy_consensus::OpTxEnvelope;
use std::{
fs::File,
io::{BufReader, Read},
path::Path,
};
use tracing::{debug, info};

pub(crate) enum ConsensusBlock {
Ethereum(Block<EthereumTxEnvelope<TxEip4844Variant>, Header>),
Op(Block<OpTxEnvelope, Header>),
}

impl ConsensusBlock {
pub(crate) fn header(&self) -> &Header {
match self {
Self::Ethereum(block) => &block.header,
Self::Op(block) => &block.header,
}
}

pub(crate) fn into_either_block(
self,
) -> Block<Either<EthereumTxEnvelope<TxEip4844Variant>, OpTxEnvelope>, Header> {
match self {
Self::Ethereum(block) => block.map_transactions(Either::Left),
Self::Op(block) => block.map_transactions(Either::Right),
}
}
}

/// Detect if a block file contains Optimism blocks
///
/// This works by attempting to decode the first block as both Ethereum and Optimism types.
/// OP blocks have additional fields that Ethereum blocks don't have, so we check for those.
pub(crate) fn detect_optimism_from_file(path: &Path) -> eyre::Result<bool> {
let file = File::open(path)?;
let mut reader = BufReader::new(file);

// Read length prefix
let mut len_bytes = [0u8; 4];
reader.read_exact(&mut len_bytes)?;
let len = u32::from_le_bytes(len_bytes) as usize;

// Read block data
let mut block_bytes = vec![0u8; len];
reader.read_exact(&mut block_bytes)?;

// Try decoding as both types
let op_decode = Block::<OpTxEnvelope, Header>::decode(&mut block_bytes.as_slice());
let eth_decode =
Block::<EthereumTxEnvelope<TxEip4844Variant>, Header>::decode(&mut block_bytes.as_slice());

match (op_decode, eth_decode) {
(Ok(_), Err(_)) => {
// Only OP decode succeeded - definitely Optimism
info!("Detected Optimism chain from block file");
Ok(true)
}
(Err(_), Ok(_)) => {
// Only Ethereum decode succeeded - definitely Ethereum
info!("Detected Ethereum chain from block file");
Ok(false)
}
(Ok(_), Ok(_)) => {
// Need to check whether this can happen in practice
info!("Block file format is ambiguous, defaulting to Ethereum");
Ok(false)
}
(Err(op_err), Err(eth_err)) => Err(eyre::eyre!(
"Failed to decode block as either Ethereum or Optimism.\nOP error: {}\nETH error: {}",
op_err,
eth_err
)),
}
}

/// Block file reader that streams blocks in batches
pub(crate) struct BlockFileReader {
reader: BufReader<File>,
batch_size: usize,
blocks_read: usize,
is_optimism: bool,
}

impl BlockFileReader {
/// Create a new block file reader
pub(crate) fn new(path: &Path, batch_size: usize, is_optimism: bool) -> eyre::Result<Self> {
let file =
File::open(path).wrap_err_with(|| format!("Failed to open block file: {:?}", path))?;

info!("Opened block file: {:?}", path);

Ok(Self { reader: BufReader::new(file), batch_size, blocks_read: 0, is_optimism })
}

/// Read the next batch of blocks from the file
pub(crate) fn read_batch(&mut self) -> eyre::Result<Vec<ConsensusBlock>> {
let mut batch = Vec::with_capacity(self.batch_size);

for _ in 0..self.batch_size {
match self.read_next_block()? {
Some(block) => batch.push(block),
None => break, // EOF
}
}

if !batch.is_empty() {
debug!("Read batch of {} blocks (total read: {})", batch.len(), self.blocks_read);
}

Ok(batch)
}

/// Read a single block from the file
fn read_next_block(&mut self) -> eyre::Result<Option<ConsensusBlock>> {
// Read length prefix (4 bytes, little-endian)
let mut len_bytes = [0u8; 4];
match self.reader.read_exact(&mut len_bytes) {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
return Ok(None);
}
Err(e) => return Err(e.into()),
}

let len = u32::from_le_bytes(len_bytes) as usize;

// Read RLP-encoded block data
let mut block_bytes = vec![0u8; len];
self.reader.read_exact(&mut block_bytes).wrap_err("Failed to read block data")?;

let block = if self.is_optimism {
let block = Block::<OpTxEnvelope, Header>::decode(&mut block_bytes.as_slice())?;
ConsensusBlock::Op(block)
} else {
let block = Block::<EthereumTxEnvelope<TxEip4844Variant>, Header>::decode(
&mut block_bytes.as_slice(),
)?;
ConsensusBlock::Ethereum(block)
};

// Skip newline separator if present
let mut newline = [0u8; 1];
let _ = self.reader.read_exact(&mut newline);

self.blocks_read += 1;

Ok(Some(block))
}
}

impl Iterator for BlockFileReader {
type Item = eyre::Result<ConsensusBlock>;

fn next(&mut self) -> Option<Self::Item> {
self.read_next_block().transpose()
}
}
116 changes: 75 additions & 41 deletions bin/reth-bench/src/bench/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ use tracing::info;
pub(crate) struct BenchContext {
/// The auth provider is used for engine API queries.
pub(crate) auth_provider: RootProvider<AnyNetwork>,
/// The block provider is used for block queries.
pub(crate) block_provider: RootProvider<AnyNetwork>,
/// The block provider is used for block queries if we are using an RPC as the source of
/// blocks.
pub(crate) block_provider: Option<RootProvider<AnyNetwork>>,
/// The benchmark mode, which defines whether the benchmark should run for a closed or open
/// range of blocks.
pub(crate) benchmark_mode: BenchMode,
Expand All @@ -34,9 +35,10 @@ pub(crate) struct BenchContext {
impl BenchContext {
/// This is the initialization code for most benchmarks, taking in a [`BenchmarkArgs`] and
/// returning the providers needed to run a benchmark.
pub(crate) async fn new(bench_args: &BenchmarkArgs, rpc_url: String) -> eyre::Result<Self> {
info!("Running benchmark using data from RPC URL: {}", rpc_url);

pub(crate) async fn new(
bench_args: &BenchmarkArgs,
rpc_url: Option<String>,
) -> eyre::Result<Self> {
// Ensure that output directory exists and is a directory
if let Some(output) = &bench_args.output {
if output.is_file() {
Expand All @@ -49,34 +51,48 @@ impl BenchContext {
}
}

// set up alloy client for blocks
let client = ClientBuilder::default()
.layer(RetryBackoffLayer::new(10, 800, u64::MAX))
.http(rpc_url.parse()?);
let block_provider = RootProvider::<AnyNetwork>::new(client);
// Set up client if using RPC
let block_provider = if let Some(ref rpc_url) = rpc_url {
info!("Setting up block provider for RPC: {}", rpc_url);
let client = ClientBuilder::default()
.layer(RetryBackoffLayer::new(10, 800, u64::MAX))
.http(rpc_url.parse()?);
Some(RootProvider::<AnyNetwork>::new(client))
} else {
None
};

// Check if this is an OP chain by checking code at a predeploy address.
let is_optimism = !block_provider
.get_code_at(address!("0x420000000000000000000000000000000000000F"))
.await?
.is_empty();
// Determine if this is Optimism
let is_optimism = if let Some(file_path) = &bench_args.from_file {
// Detect from file
info!("Reading blocks from file: {:?}", file_path);
let is_op = crate::bench::block_storage::detect_optimism_from_file(file_path)?;
is_op
} else {
// Check if this is an OP chain by checking code at a predeploy address
let is_op = !block_provider
.as_ref()
.unwrap()
.get_code_at(address!("0x420000000000000000000000000000000000000F"))
.await?
.is_empty();
is_op
};

// construct the authenticated provider
// Construct the authenticated provider
let auth_jwt = bench_args
.auth_jwtsecret
.clone()
.ok_or_else(|| eyre::eyre!("--jwt-secret must be provided for authenticated RPC"))?;

// fetch jwt from file
//
// the jwt is hex encoded so we will decode it after
// Fetch jwt from file (hex encoded)
let jwt = std::fs::read_to_string(auth_jwt)?;
let jwt = JwtSecret::from_hex(jwt)?;

// get engine url
// Get engine url
let auth_url = Url::parse(&bench_args.engine_rpc_url)?;

// construct the authed transport
// Construct the authed transport
info!("Connecting to Engine RPC at {} for replay", auth_url);
let auth_transport = AuthenticatedTransportConnect::new(auth_url, jwt);
let client = ClientBuilder::default().connect_with(auth_transport).await?;
Expand Down Expand Up @@ -107,31 +123,49 @@ impl BenchContext {
// starting at the latest block.
let mut benchmark_mode = BenchMode::new(from, to)?;

let first_block = match benchmark_mode {
BenchMode::Continuous => {
// fetch Latest block
block_provider.get_block_by_number(BlockNumberOrTag::Latest).full().await?.unwrap()
}
BenchMode::Range(ref mut range) => {
match range.next() {
Some(block_number) => {
// fetch first block in range
block_provider
.get_block_by_number(block_number.into())
.full()
.await?
.unwrap()
}
None => {
return Err(eyre::eyre!(
"Benchmark mode range is empty, please provide a larger range"
));
// Only fetch first block if doing RPC benchmark
let next_block = if let Some(ref provider) = block_provider {
match benchmark_mode {
BenchMode::Continuous => {
// Fetch latest block
provider
.get_block_by_number(BlockNumberOrTag::Latest)
.full()
.await?
.ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?
.header
.number +
1
}
BenchMode::Range(ref mut range) => {
match range.next() {
Some(block_number) => {
// Fetch first block in range
provider
.get_block_by_number(block_number.into())
.full()
.await?
.ok_or_else(|| {
eyre::eyre!("Failed to fetch block {}", block_number)
})?
.header
.number +
1
}
None => {
return Err(eyre::eyre!(
"Benchmark mode range is empty, please provide a larger range"
));
}
}
}
}
} else {
// For file-based benchmarks, next_block isn't used by the fetching logic
// Set to 0 as a sentinel value
0
};

let next_block = first_block.header.number + 1;
Ok(Self { auth_provider, block_provider, benchmark_mode, next_block, is_optimism })
}
}
Loading