diff --git a/Cargo.lock b/Cargo.lock index b8c0da68164..7c5012b4b53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8084,7 +8084,6 @@ dependencies = [ "reth-testing-utils", "reth-tracing", "reth-trie", - "reth-trie-db", "reth-trie-parallel", "reth-trie-sparse", "reth-trie-sparse-parallel", @@ -10770,7 +10769,6 @@ dependencies = [ "proptest-arbitrary-interop", "rand 0.9.2", "rayon", - "reth-db-api", "reth-execution-errors", "reth-metrics", "reth-primitives-traits", diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 503b5af2630..ba99898a842 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -30,7 +30,6 @@ reth-prune.workspace = true reth-revm.workspace = true reth-stages-api.workspace = true reth-tasks.workspace = true -reth-trie-db.workspace = true reth-trie-parallel.workspace = true reth-trie-sparse = { workspace = true, features = ["std", "metrics"] } reth-trie-sparse-parallel = { workspace = true, features = ["std"] } @@ -134,7 +133,6 @@ test-utils = [ "reth-trie/test-utils", "reth-trie-sparse/test-utils", "reth-prune-types?/test-utils", - "reth-trie-db/test-utils", "reth-trie-parallel/test-utils", "reth-ethereum-primitives/test-utils", "reth-node-ethereum/test-utils", diff --git a/crates/engine/tree/benches/state_root_task.rs b/crates/engine/tree/benches/state_root_task.rs index 70d9e037e9d..e13ad26bc6b 100644 --- a/crates/engine/tree/benches/state_root_task.rs +++ b/crates/engine/tree/benches/state_root_task.rs @@ -20,11 +20,10 @@ use reth_evm::OnStateHook; use reth_evm_ethereum::EthEvmConfig; use reth_primitives_traits::{Account as RethAccount, Recovered, StorageEntry}; use reth_provider::{ - providers::{BlockchainProvider, ConsistentDbView}, + providers::{BlockchainProvider, OverlayStateProviderFactory}, test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB}, AccountReader, ChainSpecProvider, HashingWriter, ProviderFactory, }; -use reth_trie::TrieInput; use revm_primitives::{HashMap, U256}; use revm_state::{Account as RevmAccount, AccountInfo, AccountStatus, EvmState, EvmStorageSlot}; use std::{hint::black_box, sync::Arc}; @@ -238,8 +237,7 @@ fn bench_state_root(c: &mut Criterion) { >, >(), StateProviderBuilder::new(provider.clone(), genesis_hash, None), - ConsistentDbView::new_with_latest_tip(provider).unwrap(), - TrieInput::default(), + OverlayStateProviderFactory::new(provider), &TreeConfig::default(), ) .map_err(|(err, ..)| err) diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 751356fc399..12482b1a162 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -142,7 +142,10 @@ where &self, blocks: Vec>, ) -> Result, PersistenceError> { - debug!(target: "engine::persistence", first=?blocks.first().map(|b| b.recovered_block.num_hash()), last=?blocks.last().map(|b| b.recovered_block.num_hash()), "Saving range of blocks"); + let first_block_hash = blocks.first().map(|b| b.recovered_block.num_hash()); + let last_block_hash = blocks.last().map(|b| b.recovered_block.num_hash()); + debug!(target: "engine::persistence", first=?first_block_hash, last=?last_block_hash, "Saving range of blocks"); + let start_time = Instant::now(); let last_block_hash_num = blocks.last().map(|block| BlockNumHash { hash: block.recovered_block().hash(), @@ -155,6 +158,9 @@ where provider_rw.save_blocks(blocks)?; provider_rw.commit()?; } + + debug!(target: "engine::persistence", first=?first_block_hash, last=?last_block_hash, "Saved range of 
blocks"); + self.metrics.save_blocks_duration_seconds.record(start_time.elapsed()); Ok(last_block_hash_num) } diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index a189b643f98..324e3375d2c 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -29,9 +29,8 @@ use reth_payload_primitives::{ }; use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader}; use reth_provider::{ - providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, HashedPostStateProvider, - ProviderError, StateProviderBox, StateProviderFactory, StateReader, TransactionVariant, - TrieReader, + BlockReader, DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StateProviderBox, + StateProviderFactory, StateReader, TransactionVariant, TrieReader, }; use reth_revm::database::StateProviderDatabase; use reth_stages_api::ControlFlow; diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index ac16c60dd67..7e54d8a38e2 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -26,12 +26,12 @@ use reth_evm::{ ConfigureEvm, EvmEnvFor, OnStateHook, SpecFor, TxEnvFor, }; use reth_primitives_traits::NodePrimitives; -use reth_provider::{ - providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateProviderFactory, - StateReader, -}; +use reth_provider::{BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader}; use reth_revm::{db::BundleState, state::EvmState}; -use reth_trie::TrieInput; +use reth_trie::{ + hashed_cursor::HashedCursorFactory, prefix_set::TriePrefixSetsMut, + trie_cursor::TrieCursorFactory, +}; use reth_trie_parallel::{ proof_task::{ProofTaskCtx, ProofWorkerHandle}, root::ParallelStateRootError, @@ -121,8 +121,6 @@ where >, /// Whether to disable the parallel sparse trie. disable_parallel_sparse_trie: bool, - /// A cleared trie input, kept around to be reused so allocations can be minimized. - trie_input: Option, /// Maximum concurrency for prewarm task. prewarm_max_concurrency: usize, } @@ -149,7 +147,6 @@ where precompile_cache_disabled: config.precompile_cache_disabled(), precompile_cache_map, sparse_state_trie: Arc::default(), - trie_input: None, disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(), prewarm_max_concurrency: config.prewarm_max_concurrency(), } @@ -200,50 +197,45 @@ where name = "payload processor", skip_all )] - pub fn spawn>( + pub fn spawn>( &mut self, env: ExecutionEnv, transactions: I, provider_builder: StateProviderBuilder, - consistent_view: ConsistentDbView
<P>
, - trie_input: TrieInput, + multiproof_provider_factory: F, config: &TreeConfig, ) -> Result< PayloadHandle, I::Tx>, I::Error>, (ParallelStateRootError, I, ExecutionEnv, StateProviderBuilder), > where - P: DatabaseProviderFactory - + BlockReader - + StateProviderFactory - + StateReader + P: BlockReader + StateProviderFactory + StateReader + Clone + 'static, + F: DatabaseProviderROFactory + Clone + + Send + 'static, { let span = tracing::Span::current(); let (to_sparse_trie, sparse_trie_rx) = channel(); - // spawn multiproof task, save the trie input - let (trie_input, state_root_config) = MultiProofConfig::from_input(trie_input); - self.trie_input = Some(trie_input); + + // We rely on the cursor factory to provide whatever DB overlay is necessary to see a + // consistent view of the database, including the trie tables. Because of this there is no + // need for an overarching prefix set to invalidate any section of the trie tables, and so + // we use an empty prefix set. + let prefix_sets = Arc::new(TriePrefixSetsMut::default()); // Create and spawn the storage proof task - let task_ctx = ProofTaskCtx::new( - state_root_config.nodes_sorted.clone(), - state_root_config.state_sorted.clone(), - state_root_config.prefix_sets.clone(), - ); + let task_ctx = ProofTaskCtx::new(multiproof_provider_factory, prefix_sets); let storage_worker_count = config.storage_worker_count(); let account_worker_count = config.account_worker_count(); let proof_handle = ProofWorkerHandle::new( self.executor.handle().clone(), - consistent_view, task_ctx, storage_worker_count, account_worker_count, ); let multi_proof_task = MultiProofTask::new( - state_root_config, proof_handle.clone(), to_sparse_trie, config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()), @@ -393,11 +385,6 @@ where CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics } } - /// Takes the trie input from the inner payload processor, if it exists. - pub const fn take_trie_input(&mut self) -> Option { - self.trie_input.take() - } - /// Returns the cache for the given parent hash. 
/// /// If the given hash is different then what is recently cached, then this will create a new @@ -718,12 +705,12 @@ mod tests { use reth_evm_ethereum::EthEvmConfig; use reth_primitives_traits::{Account, Recovered, StorageEntry}; use reth_provider::{ - providers::{BlockchainProvider, ConsistentDbView}, + providers::{BlockchainProvider, OverlayStateProviderFactory}, test_utils::create_test_provider_factory_with_chain_spec, ChainSpecProvider, HashingWriter, }; use reth_testing_utils::generators; - use reth_trie::{test_utils::state_root, HashedPostState, TrieInput}; + use reth_trie::{test_utils::state_root, HashedPostState}; use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256}; use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot}; use std::sync::Arc; @@ -905,7 +892,9 @@ mod tests { &TreeConfig::default(), PrecompileCacheMap::default(), ); - let provider = BlockchainProvider::new(factory).unwrap(); + + let provider_factory = BlockchainProvider::new(factory).unwrap(); + let mut handle = payload_processor .spawn( @@ -913,9 +902,8 @@ mod tests { core::iter::empty::< Result, core::convert::Infallible>, >(), - StateProviderBuilder::new(provider.clone(), genesis_hash, None), - ConsistentDbView::new_with_latest_tip(provider).unwrap(), - TrieInput::from_state(hashed_state), + StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None), + OverlayStateProviderFactory::new(provider_factory), &TreeConfig::default(), ) .map_err(|(err, ..)| err) diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 321de725bec..a000e7a5adf 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -57,8 +57,8 @@ impl SparseTrieUpdate { } /// Common configuration for multi proof tasks -#[derive(Debug, Clone)] -pub(super) struct MultiProofConfig { +#[derive(Debug, Clone, Default)] +pub(crate) struct MultiProofConfig { /// The sorted collection of cached in-memory intermediate trie nodes that /// can be reused for computation. pub nodes_sorted: Arc, @@ -75,7 +75,7 @@ impl MultiProofConfig { /// /// This returns a cleared [`TrieInput`] so that we can reuse any allocated space in the /// [`TrieInput`]. - pub(super) fn from_input(mut input: TrieInput) -> (TrieInput, Self) { + pub(crate) fn from_input(mut input: TrieInput) -> (TrieInput, Self) { let config = Self { nodes_sorted: Arc::new(input.nodes.drain_into_sorted()), state_sorted: Arc::new(input.state.drain_into_sorted()), @@ -289,7 +289,6 @@ impl StorageMultiproofInput { /// Input parameters for dispatching a multiproof calculation. #[derive(Debug)] struct MultiproofInput { - config: MultiProofConfig, source: Option, hashed_state_update: HashedPostState, proof_targets: MultiProofTargets, @@ -458,7 +457,6 @@ impl MultiproofManager { /// Dispatches a single multiproof calculation to worker pool. 
fn dispatch_multiproof(&mut self, multiproof_input: MultiproofInput) { let MultiproofInput { - config, source, hashed_state_update, proof_targets, @@ -485,7 +483,7 @@ impl MultiproofManager { // Extend prefix sets with targets let frozen_prefix_sets = - ParallelProof::extend_prefix_sets_with_targets(&config.prefix_sets, &proof_targets); + ParallelProof::extend_prefix_sets_with_targets(&Default::default(), &proof_targets); // Dispatch account multiproof to worker pool with result sender let input = AccountMultiproofInput { @@ -671,8 +669,6 @@ pub(super) struct MultiProofTask { /// The size of proof targets chunk to spawn in one calculation. /// If None, chunking is disabled and all targets are processed in a single proof. chunk_size: Option, - /// Task configuration. - config: MultiProofConfig, /// Receiver for state root related messages (prefetch, state updates, finish signal). rx: CrossbeamReceiver, /// Sender for state root related messages. @@ -696,7 +692,6 @@ pub(super) struct MultiProofTask { impl MultiProofTask { /// Creates a new multi proof task with the unified message channel pub(super) fn new( - config: MultiProofConfig, proof_worker_handle: ProofWorkerHandle, to_sparse_trie: std::sync::mpsc::Sender, chunk_size: Option, @@ -707,7 +702,6 @@ impl MultiProofTask { Self { chunk_size, - config, rx, tx, proof_result_rx, @@ -761,7 +755,6 @@ impl MultiProofTask { let mut dispatch = |proof_targets| { self.multiproof_manager.dispatch( MultiproofInput { - config: self.config.clone(), source: None, hashed_state_update: Default::default(), proof_targets, @@ -909,7 +902,6 @@ impl MultiProofTask { self.multiproof_manager.dispatch( MultiproofInput { - config: self.config.clone(), source: Some(source), hashed_state_update, proof_targets, @@ -1253,10 +1245,11 @@ mod tests { use super::*; use alloy_primitives::map::B256Set; use reth_provider::{ - providers::ConsistentDbView, test_utils::create_test_provider_factory, BlockReader, - DatabaseProviderFactory, + providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory, + BlockReader, DatabaseProviderFactory, PruneCheckpointReader, StageCheckpointReader, + TrieReader, }; - use reth_trie::{MultiProof, TrieInput}; + use reth_trie::MultiProof; use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle}; use revm_primitives::{B256, U256}; use std::sync::OnceLock; @@ -1275,20 +1268,19 @@ mod tests { fn create_test_state_root_task(factory: F) -> MultiProofTask where - F: DatabaseProviderFactory + Clone + 'static, + F: DatabaseProviderFactory< + Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader, + > + Clone + + Send + + 'static, { let rt_handle = get_test_runtime_handle(); - let (_trie_input, config) = MultiProofConfig::from_input(TrieInput::default()); - let task_ctx = ProofTaskCtx::new( - config.nodes_sorted.clone(), - config.state_sorted.clone(), - config.prefix_sets.clone(), - ); - let consistent_view = ConsistentDbView::new(factory, None); - let proof_handle = ProofWorkerHandle::new(rt_handle, consistent_view, task_ctx, 1, 1); + let overlay_factory = OverlayStateProviderFactory::new(factory); + let task_ctx = ProofTaskCtx::new(overlay_factory, Default::default()); + let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1); let (to_sparse_trie, _receiver) = std::sync::mpsc::channel(); - MultiProofTask::new(config, proof_handle, to_sparse_trie, Some(1)) + MultiProofTask::new(proof_handle, to_sparse_trie, Some(1)) } #[test] diff --git 
a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs index 2770d9a3f9d..ecc475dd53a 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -5,17 +5,17 @@ use crate::tree::{ error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError}, executor::WorkloadExecutor, instrumented_state::InstrumentedStateProvider, - payload_processor::PayloadProcessor, + payload_processor::{multiproof::MultiProofConfig, PayloadProcessor}, persistence_state::CurrentPersistenceAction, precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap}, sparse_trie::StateRootComputeOutcome, - ConsistentDbView, EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, - PersistenceState, PersistingKind, StateProviderBuilder, StateProviderDatabase, TreeConfig, + EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, PersistenceState, + PersistingKind, StateProviderBuilder, StateProviderDatabase, TreeConfig, }; use alloy_consensus::transaction::Either; use alloy_eips::{eip1898::BlockWithParent, NumHash}; use alloy_evm::Evm; -use alloy_primitives::B256; +use alloy_primitives::{BlockNumber, B256}; use reth_chain_state::{CanonicalInMemoryState, ExecutedBlock}; use reth_consensus::{ConsensusError, FullConsensus}; use reth_engine_primitives::{ @@ -33,16 +33,13 @@ use reth_primitives_traits::{ AlloyBlockHeader, BlockTy, GotExpected, NodePrimitives, RecoveredBlock, SealedHeader, }; use reth_provider::{ - BlockExecutionOutput, BlockNumReader, BlockReader, DBProvider, DatabaseProviderFactory, - ExecutionOutcome, HashedPostStateProvider, ProviderError, StateProvider, StateProviderFactory, - StateReader, StateRootProvider, TrieReader, + providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockNumReader, BlockReader, + DBProvider, DatabaseProviderFactory, ExecutionOutcome, HashedPostStateProvider, ProviderError, + PruneCheckpointReader, StageCheckpointReader, StateProvider, StateProviderFactory, StateReader, + StateRootProvider, TrieReader, }; use reth_revm::db::State; -use reth_trie::{ - updates::{TrieUpdates, TrieUpdatesSorted}, - HashedPostState, KeccakKeyHasher, TrieInput, -}; -use reth_trie_db::DatabaseHashedPostState; +use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError}; use std::{collections::HashMap, sync::Arc, time::Instant}; use tracing::{debug, debug_span, error, info, instrument, trace, warn}; @@ -162,13 +159,16 @@ where metrics: EngineApiMetrics, /// Validator for the payload. validator: V, + /// A cleared trie input, kept around to be reused so allocations can be minimized. + trie_input: Option, } impl BasicEngineValidator where N: NodePrimitives, - P: DatabaseProviderFactory - + BlockReader

+ P: DatabaseProviderFactory< + Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader, + > + BlockReader
+ StateProviderFactory + StateReader + HashedPostStateProvider @@ -204,6 +204,7 @@ where invalid_block_hook, metrics: EngineApiMetrics::default(), validator, + trie_input: Default::default(), } } @@ -407,8 +408,7 @@ where let env = ExecutionEnv { evm_env, hash: input.hash(), parent_hash: input.parent_hash() }; // Plan the strategy used for state root computation. - let state_root_plan = self.plan_state_root_computation(&input, &ctx); - let persisting_kind = state_root_plan.persisting_kind; + let state_root_plan = self.plan_state_root_computation(); let strategy = state_root_plan.strategy; debug!( @@ -425,7 +425,6 @@ where env.clone(), txs, provider_builder, - persisting_kind, parent_hash, ctx.state(), strategy, @@ -495,7 +494,6 @@ where StateRootStrategy::Parallel => { debug!(target: "engine::tree::payload_validator", "Using parallel state root algorithm"); match self.compute_state_root_parallel( - persisting_kind, block.parent_hash(), &hashed_state, ctx.state(), @@ -530,7 +528,7 @@ where if self.config.state_root_fallback() { debug!(target: "engine::tree::payload_validator", "Using state root fallback for testing"); } else { - warn!(target: "engine::tree::payload_validator", ?persisting_kind, "Failed to compute state root in parallel"); + warn!(target: "engine::tree::payload_validator", "Failed to compute state root in parallel"); self.metrics.block_validation.state_root_parallel_fallback_total.increment(1); } @@ -678,24 +676,35 @@ where #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)] fn compute_state_root_parallel( &self, - persisting_kind: PersistingKind, parent_hash: B256, hashed_state: &HashedPostState, state: &EngineApiTreeState, ) -> Result<(B256, TrieUpdates), ParallelStateRootError> { - let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?; + let provider = self.provider.database_provider_ro()?; + + let (mut input, block_number) = + self.compute_trie_input(provider, parent_hash, state, None)?; - let mut input = self.compute_trie_input( - persisting_kind, - consistent_view.provider_ro()?, - parent_hash, - state, - None, - )?; // Extend with block we are validating root for. input.append_ref(hashed_state); - ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates() + // Convert the TrieInput into a MultiProofConfig, since everything uses the sorted + // forms of the state/trie fields. + let (_, multiproof_config) = MultiProofConfig::from_input(input); + + let factory = OverlayStateProviderFactory::new(self.provider.clone()) + .with_block_number(Some(block_number)) + .with_trie_overlay(Some(multiproof_config.nodes_sorted)) + .with_hashed_state_overlay(Some(multiproof_config.state_sorted)); + + // The `hashed_state` argument is already taken into account as part of the overlay, but we + // need to use the prefix sets which were generated from it to indicate to the + // ParallelStateRoot which parts of the trie need to be recomputed. + let prefix_sets = Arc::into_inner(multiproof_config.prefix_sets) + .expect("MultiProofConfig was never cloned") + .freeze(); + + ParallelStateRoot::new(factory, prefix_sets).incremental_root_with_updates() } /// Validates the block after execution. 
@@ -777,7 +786,6 @@ where env: ExecutionEnv, txs: T, provider_builder: StateProviderBuilder, - persisting_kind: PersistingKind, parent_hash: B256, state: &EngineApiTreeState, strategy: StateRootStrategy, @@ -793,17 +801,13 @@ > { match strategy { StateRootStrategy::StateRootTask => { - // use background tasks for state root calc - let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?; - // get allocated trie input if it exists - let allocated_trie_input = self.payload_processor.take_trie_input(); + let allocated_trie_input = self.trie_input.take(); // Compute trie input let trie_input_start = Instant::now(); - let trie_input = self.compute_trie_input( - persisting_kind, - consistent_view.provider_ro()?, + let (trie_input, block_number) = self.compute_trie_input( + self.provider.database_provider_ro()?, parent_hash, state, allocated_trie_input, @@ -814,50 +818,49 @@ .trie_input_duration .record(trie_input_start.elapsed().as_secs_f64()); + // Convert the TrieInput into a MultiProofConfig, since everything uses the sorted + // forms of the state/trie fields. + let (trie_input, multiproof_config) = MultiProofConfig::from_input(trie_input); + self.trie_input.replace(trie_input); + + // Create OverlayStateProviderFactory with the multiproof config, for use with + // multiproofs. + let multiproof_provider_factory = + OverlayStateProviderFactory::new(self.provider.clone()) + .with_block_number(Some(block_number)) + .with_trie_overlay(Some(multiproof_config.nodes_sorted)) + .with_hashed_state_overlay(Some(multiproof_config.state_sorted)); + // Use state root task only if prefix sets are empty, otherwise proof generation is // too expensive because it requires walking all paths in every proof. let spawn_start = Instant::now(); - let (handle, strategy) = if trie_input.prefix_sets.is_empty() { - match self.payload_processor.spawn( - env, - txs, - provider_builder, - consistent_view, - trie_input, - &self.config, - ) { - Ok(handle) => { - // Successfully spawned with state root task support - (handle, StateRootStrategy::StateRootTask) - } - Err((error, txs, env, provider_builder)) => { - // Failed to spawn proof workers, fallback to parallel state root - error!( - target: "engine::tree::payload_validator", - ?error, - "Failed to spawn proof workers, falling back to parallel state root" - ); - ( - self.payload_processor.spawn_cache_exclusive( - env, - txs, - provider_builder, - ), - StateRootStrategy::Parallel, - ) - } + let (handle, strategy) = match self.payload_processor.spawn( + env, + txs, + provider_builder, + multiproof_provider_factory, + &self.config, + ) { + Ok(handle) => { + // Successfully spawned with state root task support + (handle, StateRootStrategy::StateRootTask) + } + Err((error, txs, env, provider_builder)) => { + // Failed to spawn proof workers, fallback to parallel state root + error!( + target: "engine::tree::payload_validator", + ?error, + "Failed to spawn proof workers, falling back to parallel state root" + ); + ( + self.payload_processor.spawn_cache_exclusive( + env, + txs, + provider_builder, + ), + StateRootStrategy::Parallel, + ) } - // if prefix sets are not empty, we spawn a task that exclusively handles cache - // prewarming for transaction execution - } else { - debug!( - target: "engine::tree::payload_validator", - "Disabling state root task due to non-empty prefix sets" - ); - ( - self.payload_processor.spawn_cache_exclusive(env, txs, provider_builder), - StateRootStrategy::Parallel, - ) }; // record prewarming initialization 
duration @@ -915,48 +918,24 @@ where Ok(None) } - /// Determines the state root computation strategy based on persistence state and configuration. + /// Determines the state root computation strategy based on configuration. #[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)] - fn plan_state_root_computation>>( - &self, - input: &BlockOrPayload, - ctx: &TreeCtx<'_, N>, - ) -> StateRootPlan { - // We only run the parallel state root if we are not currently persisting any blocks or - // persisting blocks that are all ancestors of the one we are executing. - // - // If we're committing ancestor blocks, then: any trie updates being committed are a subset - // of the in-memory trie updates collected before fetching reverts. So any diff in - // reverts (pre vs post commit) is already covered by the in-memory trie updates we - // collect in `compute_state_root_parallel`. - // - // See https://github.com/paradigmxyz/reth/issues/12688 for more details - let persisting_kind = ctx.persisting_kind_for(input.block_with_parent()); - let can_run_parallel = - persisting_kind.can_run_parallel_state_root() && !self.config.state_root_fallback(); - - // Decide on the strategy. - // Use state root task only if: - // 1. No persistence is in progress - // 2. Config allows it - let strategy = if can_run_parallel { - if self.config.use_state_root_task() { - StateRootStrategy::StateRootTask - } else { - StateRootStrategy::Parallel - } - } else { + fn plan_state_root_computation(&self) -> StateRootPlan { + let strategy = if self.config.state_root_fallback() { StateRootStrategy::Synchronous + } else if self.config.use_state_root_task() { + StateRootStrategy::StateRootTask + } else { + StateRootStrategy::Parallel }; debug!( target: "engine::tree::payload_validator", - block=?input.num_hash(), ?strategy, "Planned state root computation strategy" ); - StateRootPlan { strategy, persisting_kind } + StateRootPlan { strategy } } /// Called when an invalid block is encountered during validation. @@ -975,7 +954,8 @@ where self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates); } - /// Computes the trie input at the provided parent hash. + /// Computes the trie input at the provided parent hash, as well as the block number of the + /// highest persisted ancestor. /// /// The goal of this function is to take in-memory blocks and generate a [`TrieInput`] that /// serves as an overlay to the database blocks. @@ -994,105 +974,40 @@ where level = "debug", target = "engine::tree::payload_validator", skip_all, - fields(persisting_kind, parent_hash) + fields(parent_hash) )] fn compute_trie_input( &self, - persisting_kind: PersistingKind, provider: TP, parent_hash: B256, state: &EngineApiTreeState, allocated_trie_input: Option, - ) -> ProviderResult { + ) -> ProviderResult<(TrieInput, BlockNumber)> { // get allocated trie input or use a default trie input let mut input = allocated_trie_input.unwrap_or_default(); - let best_block_number = provider.best_block_number()?; - - let (mut historical, mut blocks) = state + let (historical, blocks) = state .tree_state .blocks_by_hash(parent_hash) .map_or_else(|| (parent_hash.into(), vec![]), |(hash, blocks)| (hash.into(), blocks)); - // If the current block is a descendant of the currently persisting blocks, then we need to - // filter in-memory blocks, so that none of them are already persisted in the database. 
- let _enter = - debug_span!(target: "engine::tree::payload_validator", "filter in-memory blocks", len = blocks.len()) - .entered(); - if persisting_kind.is_descendant() { - // Iterate over the blocks from oldest to newest. - while let Some(block) = blocks.last() { - let recovered_block = block.recovered_block(); - if recovered_block.number() <= best_block_number { - // Remove those blocks that lower than or equal to the highest database - // block. - blocks.pop(); - } else { - // If the block is higher than the best block number, stop filtering, as it's - // the first block that's not in the database. - break - } - } - - historical = if let Some(block) = blocks.last() { - // If there are any in-memory blocks left after filtering, set the anchor to the - // parent of the oldest block. - (block.recovered_block().number() - 1).into() - } else { - // Otherwise, set the anchor to the original provided parent hash. - parent_hash.into() - }; - } - drop(_enter); - - let blocks_empty = blocks.is_empty(); - if blocks_empty { + if blocks.is_empty() { debug!(target: "engine::tree::payload_validator", "Parent found on disk"); } else { debug!(target: "engine::tree::payload_validator", %historical, blocks = blocks.len(), "Parent found in memory"); } - // Convert the historical block to the block number. + // Convert the historical block to the block number let block_number = provider .convert_hash_or_number(historical)? .ok_or_else(|| ProviderError::BlockHashNotFound(historical.as_hash().unwrap()))?; - let _enter = - debug_span!(target: "engine::tree::payload_validator", "revert state", blocks_empty) - .entered(); - // Retrieve revert state for historical block. - let (revert_state, revert_trie) = if block_number == best_block_number { - // We do not check against the `last_block_number` here because - // `HashedPostState::from_reverts` / `trie_reverts` only use the database tables, and - // not static files. - debug!(target: "engine::tree::payload_validator", block_number, best_block_number, "Empty revert state"); - (HashedPostState::default(), TrieUpdatesSorted::default()) - } else { - let revert_state = HashedPostState::from_reverts::( - provider.tx_ref(), - block_number + 1.., - ) - .map_err(ProviderError::from)?; - let revert_trie = provider.trie_reverts(block_number + 1)?; - debug!( - target: "engine::tree::payload_validator", - block_number, - best_block_number, - accounts = revert_state.accounts.len(), - storages = revert_state.storages.len(), - "Non-empty revert state" - ); - (revert_state, revert_trie) - }; - - input.append_cached(revert_trie.into(), revert_state); - // Extend with contents of parent in-memory blocks. input.extend_with_blocks( blocks.iter().rev().map(|block| (block.hashed_state(), block.trie_updates())), ); - Ok(input) + Ok((input, block_number)) } } @@ -1114,8 +1029,6 @@ enum StateRootStrategy { struct StateRootPlan { /// Strategy that should be attempted for computing the state root. strategy: StateRootStrategy, - /// The persisting kind for this block. - persisting_kind: PersistingKind, } /// Type that validates the payloads processed by the engine. @@ -1171,8 +1084,9 @@ pub trait EngineValidator< impl EngineValidator for BasicEngineValidator where - P: DatabaseProviderFactory - + BlockReader
+ P: DatabaseProviderFactory< + Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader, + > + BlockReader
+ StateProviderFactory + StateReader + HashedPostStateProvider diff --git a/crates/engine/tree/src/tree/state.rs b/crates/engine/tree/src/tree/state.rs index f38faf6524c..a10d26e3f27 100644 --- a/crates/engine/tree/src/tree/state.rs +++ b/crates/engine/tree/src/tree/state.rs @@ -76,7 +76,8 @@ impl TreeState { } /// Returns all available blocks for the given hash that lead back to the canonical chain, from - /// newest to oldest. And the parent hash of the oldest block that is missing from the buffer. + /// newest to oldest, and the parent hash of the oldest returned block. This parent hash is the + /// highest persisted block connected to this chain. /// /// Returns `None` if the block for the given hash is not found. pub(crate) fn blocks_by_hash(&self, hash: B256) -> Option<(B256, Vec>)> { diff --git a/crates/storage/provider/src/providers/state/overlay.rs b/crates/storage/provider/src/providers/state/overlay.rs index 98bd17aa4f9..28f04f9f767 100644 --- a/crates/storage/provider/src/providers/state/overlay.rs +++ b/crates/storage/provider/src/providers/state/overlay.rs @@ -4,7 +4,8 @@ use reth_errors::ProviderError; use reth_prune_types::PruneSegment; use reth_stages_types::StageId; use reth_storage_api::{ - DBProvider, DatabaseProviderFactory, PruneCheckpointReader, StageCheckpointReader, TrieReader, + DBProvider, DatabaseProviderFactory, DatabaseProviderROFactory, PruneCheckpointReader, + StageCheckpointReader, TrieReader, }; use reth_trie::{ hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory}, @@ -34,11 +35,7 @@ pub struct OverlayStateProviderFactory { hashed_state_overlay: Option>, } -impl OverlayStateProviderFactory -where - F: DatabaseProviderFactory, - F::Provider: Clone + TrieReader + StageCheckpointReader + PruneCheckpointReader, -{ +impl OverlayStateProviderFactory { /// Create a new overlay state provider factory pub const fn new(factory: F) -> Self { Self { factory, block_number: None, trie_overlay: None, hashed_state_overlay: None } @@ -69,7 +66,13 @@ where self.hashed_state_overlay = hashed_state_overlay; self } +} +impl OverlayStateProviderFactory +where + F: DatabaseProviderFactory, + F::Provider: TrieReader + StageCheckpointReader + PruneCheckpointReader, +{ /// Validates that there are sufficient changesets to revert to the requested block number. /// /// Returns an error if the `MerkleChangeSets` checkpoint doesn't cover the requested block. @@ -104,13 +107,8 @@ where let prune_lower_bound = prune_checkpoint.and_then(|chk| chk.block_number.map(|block| block + 1)); - // Use the higher of the two lower bounds (or error if neither is available) - let Some(lower_bound) = stage_lower_bound.max(prune_lower_bound) else { - return Err(ProviderError::InsufficientChangesets { - requested: requested_block, - available: 0..=upper_bound, - }) - }; + // Use the higher of the two lower bounds. If neither is available assume unbounded. + let lower_bound = stage_lower_bound.max(prune_lower_bound).unwrap_or(0); let available_range = lower_bound..=upper_bound; @@ -124,9 +122,17 @@ where Ok(()) } +} + +impl DatabaseProviderROFactory for OverlayStateProviderFactory +where + F: DatabaseProviderFactory, + F::Provider: TrieReader + StageCheckpointReader + PruneCheckpointReader, +{ + type Provider = OverlayStateProvider; /// Create a read-only [`OverlayStateProvider`]. 
- pub fn provider_ro(&self) -> Result, ProviderError> { + fn database_provider_ro(&self) -> Result, ProviderError> { // Get a read-only provider let provider = self.factory.database_provider_ro()?; @@ -184,7 +190,7 @@ where /// This provider uses in-memory trie updates and hashed post state as an overlay /// on top of a database provider, implementing [`TrieCursorFactory`] and [`HashedCursorFactory`] /// using the in-memory overlay factories. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct OverlayStateProvider { provider: Provider, trie_updates: Arc, diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index 4b3829cf8ed..16388de91ae 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -1,9 +1,9 @@ use crate::{ traits::{BlockSource, ReceiptProvider}, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, - ChainSpecProvider, ChangeSetReader, HeaderProvider, ReceiptProviderIdExt, StateProvider, - StateProviderBox, StateProviderFactory, StateReader, StateRootProvider, TransactionVariant, - TransactionsProvider, + ChainSpecProvider, ChangeSetReader, HeaderProvider, PruneCheckpointReader, + ReceiptProviderIdExt, StateProvider, StateProviderBox, StateProviderFactory, StateReader, + StateRootProvider, TransactionVariant, TransactionsProvider, }; use alloy_consensus::{ constants::EMPTY_ROOT_HASH, @@ -29,7 +29,7 @@ use reth_primitives_traits::{ Account, Block, BlockBody, Bytecode, GotExpected, NodePrimitives, RecoveredBlock, SealedHeader, SignerRecoverable, }; -use reth_prune_types::PruneModes; +use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment}; use reth_stages_types::{StageCheckpoint, StageId}; use reth_storage_api::{ BlockBodyIndicesProvider, BytecodeReader, DBProvider, DatabaseProviderFactory, @@ -756,6 +756,21 @@ impl StageCheckpointReader } } +impl PruneCheckpointReader + for MockEthProvider +{ + fn get_prune_checkpoint( + &self, + _segment: PruneSegment, + ) -> ProviderResult> { + Ok(None) + } + + fn get_prune_checkpoints(&self) -> ProviderResult> { + Ok(vec![]) + } +} + impl StateRootProvider for MockEthProvider where T: NodePrimitives, diff --git a/crates/storage/storage-api/src/database_provider.rs b/crates/storage/storage-api/src/database_provider.rs index c0e94a044bf..8b5d8281f42 100644 --- a/crates/storage/storage-api/src/database_provider.rs +++ b/crates/storage/storage-api/src/database_provider.rs @@ -160,6 +160,29 @@ pub trait DatabaseProviderFactory: Send + Sync { /// Helper type alias to get the associated transaction type from a [`DatabaseProviderFactory`]. pub type FactoryTx = <::DB as Database>::TX; +/// A trait which can be used to describe any factory-like type which returns a read-only provider. +pub trait DatabaseProviderROFactory { + /// Provider type returned by this factory. + /// + /// This type is intentionally left unconstrained; constraints can be added as-needed when this + /// is used. + type Provider; + + /// Creates and returns a Provider. 
+ fn database_provider_ro(&self) -> ProviderResult; +} + +impl DatabaseProviderROFactory for T +where + T: DatabaseProviderFactory, +{ + type Provider = T::Provider; + + fn database_provider_ro(&self) -> ProviderResult { + ::database_provider_ro(self) + } +} + fn range_size_hint(range: &impl RangeBounds) -> Option { let start = match range.start_bound().cloned() { Bound::Included(start) => start, diff --git a/crates/trie/parallel/Cargo.toml b/crates/trie/parallel/Cargo.toml index b4463d9ede3..9fb882b44a5 100644 --- a/crates/trie/parallel/Cargo.toml +++ b/crates/trie/parallel/Cargo.toml @@ -13,12 +13,10 @@ workspace = true [dependencies] # reth -reth-db-api.workspace = true reth-execution-errors.workspace = true reth-provider.workspace = true reth-storage-errors.workspace = true reth-trie-common.workspace = true -reth-trie-db.workspace = true reth-trie-sparse = { workspace = true, features = ["std"] } reth-trie.workspace = true @@ -46,6 +44,7 @@ metrics = { workspace = true, optional = true } # reth reth-primitives-traits.workspace = true reth-provider = { workspace = true, features = ["test-utils"] } +reth-trie-db.workspace = true reth-trie = { workspace = true, features = ["test-utils"] } # misc @@ -59,7 +58,6 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros"] } default = ["metrics"] metrics = ["reth-metrics", "dep:metrics", "reth-trie/metrics", "reth-trie-sparse/metrics"] test-utils = [ - "reth-db-api/test-utils", "reth-primitives-traits/test-utils", "reth-provider/test-utils", "reth-trie-common/test-utils", diff --git a/crates/trie/parallel/benches/root.rs b/crates/trie/parallel/benches/root.rs index 48657cc8a70..53719892748 100644 --- a/crates/trie/parallel/benches/root.rs +++ b/crates/trie/parallel/benches/root.rs @@ -5,7 +5,8 @@ use proptest::{prelude::*, strategy::ValueTree, test_runner::TestRunner}; use proptest_arbitrary_interop::arb; use reth_primitives_traits::Account; use reth_provider::{ - providers::ConsistentDbView, test_utils::create_test_provider_factory, StateWriter, TrieWriter, + providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory, StateWriter, + TrieWriter, }; use reth_trie::{ hashed_cursor::HashedPostStateCursorFactory, HashedPostState, HashedStorage, StateRoot, @@ -37,7 +38,7 @@ pub fn calculate_state_root(c: &mut Criterion) { provider_rw.commit().unwrap(); } - let view = ConsistentDbView::new(provider_factory.clone(), None); + let factory = OverlayStateProviderFactory::new(provider_factory.clone()); // state root group.bench_function(BenchmarkId::new("sync root", size), |b| { @@ -65,10 +66,8 @@ pub fn calculate_state_root(c: &mut Criterion) { group.bench_function(BenchmarkId::new("parallel root", size), |b| { b.iter_with_setup( || { - ParallelStateRoot::new( - view.clone(), - TrieInput::from_state(updated_state.clone()), - ) + let trie_input = TrieInput::from_state(updated_state.clone()); + ParallelStateRoot::new(factory.clone(), trie_input.prefix_sets.freeze()) }, |calculator| calculator.incremental_root(), ); diff --git a/crates/trie/parallel/src/proof.rs b/crates/trie/parallel/src/proof.rs index 4d54359d1bf..09f5e56e771 100644 --- a/crates/trie/parallel/src/proof.rs +++ b/crates/trie/parallel/src/proof.rs @@ -14,9 +14,7 @@ use reth_execution_errors::StorageRootError; use reth_storage_errors::db::DatabaseError; use reth_trie::{ prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets, TriePrefixSetsMut}, - updates::TrieUpdatesSorted, - DecodedMultiProof, DecodedStorageMultiProof, HashedPostState, 
HashedPostStateSorted, - MultiProofTargets, Nibbles, + DecodedMultiProof, DecodedStorageMultiProof, HashedPostState, MultiProofTargets, Nibbles, }; use reth_trie_common::added_removed_keys::MultiAddedRemovedKeys; use std::{sync::Arc, time::Instant}; @@ -28,14 +26,7 @@ use tracing::trace; /// that has proof targets. #[derive(Debug)] pub struct ParallelProof { - /// The sorted collection of cached in-memory intermediate trie nodes that - /// can be reused for computation. - pub nodes_sorted: Arc, - /// The sorted in-memory overlay hashed state. - pub state_sorted: Arc, - /// The collection of prefix sets for the computation. Since the prefix sets _always_ - /// invalidate the in-memory nodes, not all keys from `state_sorted` might be present here, - /// if we have cached nodes for them. + /// The collection of prefix sets for the computation. pub prefix_sets: Arc, /// Flag indicating whether to include branch node masks in the proof. collect_branch_node_masks: bool, @@ -53,15 +44,11 @@ pub struct ParallelProof { impl ParallelProof { /// Create new state proof generator. pub fn new( - nodes_sorted: Arc, - state_sorted: Arc, prefix_sets: Arc, missed_leaves_storage_roots: Arc>, proof_worker_handle: ProofWorkerHandle, ) -> Self { Self { - nodes_sorted, - state_sorted, prefix_sets, missed_leaves_storage_roots, collect_branch_node_masks: false, @@ -272,9 +259,7 @@ mod tests { }; use rand::Rng; use reth_primitives_traits::{Account, StorageEntry}; - use reth_provider::{ - providers::ConsistentDbView, test_utils::create_test_provider_factory, HashingWriter, - }; + use reth_provider::{test_utils::create_test_provider_factory, HashingWriter}; use reth_trie::proof::Proof; use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory}; use tokio::runtime::Runtime; @@ -282,7 +267,6 @@ mod tests { #[test] fn random_parallel_proof() { let factory = create_test_provider_factory(); - let consistent_view = ConsistentDbView::new(factory.clone(), None); let mut rng = rand::rng(); let state = (0..100) @@ -344,20 +328,14 @@ mod tests { let rt = Runtime::new().unwrap(); - let task_ctx = - ProofTaskCtx::new(Default::default(), Default::default(), Default::default()); - let proof_worker_handle = - ProofWorkerHandle::new(rt.handle().clone(), consistent_view, task_ctx, 1, 1); - - let parallel_result = ParallelProof::new( - Default::default(), - Default::default(), - Default::default(), - Default::default(), - proof_worker_handle.clone(), - ) - .decoded_multiproof(targets.clone()) - .unwrap(); + let factory = reth_provider::providers::OverlayStateProviderFactory::new(factory); + let task_ctx = ProofTaskCtx::new(factory, Default::default()); + let proof_worker_handle = ProofWorkerHandle::new(rt.handle().clone(), task_ctx, 1, 1); + + let parallel_result = + ParallelProof::new(Default::default(), Default::default(), proof_worker_handle.clone()) + .decoded_multiproof(targets.clone()) + .unwrap(); let sequential_result_raw = Proof::new(trie_cursor_factory, hashed_cursor_factory) .multiproof(targets.clone()) diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index 1b50dbe73ef..7e453cbc7c3 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -28,9 +28,6 @@ //! | v //! ProofResultMessage <-------- ProofResultSender --- //! ``` -//! -//! Individual [`ProofTaskTx`] instances manage a dedicated [`InMemoryTrieCursorFactory`] and -//! [`HashedPostStateCursorFactory`], which are each backed by a database transaction. 
use crate::{ root::ParallelStateRootError, @@ -44,29 +41,24 @@ use alloy_primitives::{ use alloy_rlp::{BufMut, Encodable}; use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; use dashmap::DashMap; -use reth_db_api::transaction::DbTx; use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind}; -use reth_provider::{ - providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, ProviderError, -}; +use reth_provider::{DatabaseProviderROFactory, ProviderError}; use reth_storage_errors::db::DatabaseError; use reth_trie::{ - hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory}, + hashed_cursor::HashedCursorFactory, node_iter::{TrieElement, TrieNodeIter}, prefix_set::{TriePrefixSets, TriePrefixSetsMut}, - proof::{ProofTrieNodeProviderFactory, StorageProof}, - trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory}, - updates::TrieUpdatesSorted, + proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider, StorageProof}, + trie_cursor::TrieCursorFactory, walker::TrieWalker, - DecodedMultiProof, DecodedStorageMultiProof, HashBuilder, HashedPostState, - HashedPostStateSorted, MultiProofTargets, Nibbles, TRIE_ACCOUNT_RLP_MAX_SIZE, + DecodedMultiProof, DecodedStorageMultiProof, HashBuilder, HashedPostState, MultiProofTargets, + Nibbles, TRIE_ACCOUNT_RLP_MAX_SIZE, }; use reth_trie_common::{ added_removed_keys::MultiAddedRemovedKeys, prefix_set::{PrefixSet, PrefixSetMut}, proof::{DecodedProofNodes, ProofRetainer}, }; -use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory}; use reth_trie_sparse::provider::{RevealedNode, TrieNodeProvider, TrieNodeProviderFactory}; use std::{ sync::{ @@ -214,19 +206,20 @@ enum StorageWorkerJob { /// /// Worker shuts down when the crossbeam channel closes (all senders dropped). fn storage_worker_loop( - view: ConsistentDbView, - task_ctx: ProofTaskCtx, + task_ctx: ProofTaskCtx, work_rx: CrossbeamReceiver, worker_id: usize, available_workers: Arc, #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics, ) where - Factory: DatabaseProviderFactory, + Factory: DatabaseProviderROFactory, { - // Create db transaction before entering work loop - let provider = - view.provider_ro().expect("Storage worker failed to initialize: database unavailable"); - let proof_tx = ProofTaskTx::new(provider.into_tx(), task_ctx, worker_id); + // Create provider from factory + let provider = task_ctx + .factory + .database_provider_ro() + .expect("Storage worker failed to initialize: unable to create provider"); + let proof_tx = ProofTaskTx::new(provider, task_ctx.prefix_sets, worker_id); trace!( target: "trie::proof_task", @@ -234,16 +227,6 @@ fn storage_worker_loop( "Storage worker started" ); - // Create factories once at worker startup to avoid recreation overhead. 
- let (trie_cursor_factory, hashed_cursor_factory) = proof_tx.create_factories(); - - // Create blinded provider factory once for all blinded node requests - let blinded_provider_factory = ProofTrieNodeProviderFactory::new( - trie_cursor_factory.clone(), - hashed_cursor_factory.clone(), - proof_tx.task_ctx.prefix_sets.clone(), - ); - let mut storage_proofs_processed = 0u64; let mut storage_nodes_processed = 0u64; @@ -270,12 +253,7 @@ fn storage_worker_loop( ); let proof_start = Instant::now(); - - let result = proof_tx.compute_storage_proof( - input, - trie_cursor_factory.clone(), - hashed_cursor_factory.clone(), - ); + let result = proof_tx.compute_storage_proof(input); let proof_elapsed = proof_start.elapsed(); storage_proofs_processed += 1; @@ -325,9 +303,15 @@ fn storage_worker_loop( "Processing blinded storage node" ); + let storage_node_provider = ProofBlindedStorageProvider::new( + &proof_tx.provider, + &proof_tx.provider, + proof_tx.prefix_sets.clone(), + account, + ); + let start = Instant::now(); - let result = - blinded_provider_factory.storage_node_provider(account).trie_node(&path); + let result = storage_node_provider.trie_node(&path); let elapsed = start.elapsed(); storage_nodes_processed += 1; @@ -393,20 +377,21 @@ fn storage_worker_loop( /// /// Worker shuts down when the crossbeam channel closes (all senders dropped). fn account_worker_loop( - view: ConsistentDbView, - task_ctx: ProofTaskCtx, + task_ctx: ProofTaskCtx, work_rx: CrossbeamReceiver, storage_work_tx: CrossbeamSender, worker_id: usize, available_workers: Arc, #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics, ) where - Factory: DatabaseProviderFactory, + Factory: DatabaseProviderROFactory, { - // Create db transaction before entering work loop - let provider = - view.provider_ro().expect("Account worker failed to initialize: database unavailable"); - let proof_tx = ProofTaskTx::new(provider.into_tx(), task_ctx, worker_id); + // Create provider from factory + let provider = task_ctx + .factory + .database_provider_ro() + .expect("Account worker failed to initialize: unable to create provider"); + let proof_tx = ProofTaskTx::new(provider, task_ctx.prefix_sets, worker_id); trace!( target: "trie::proof_task", @@ -414,16 +399,6 @@ fn account_worker_loop( "Account worker started" ); - // Create factories once at worker startup to avoid recreation overhead. 
- let (trie_cursor_factory, hashed_cursor_factory) = proof_tx.create_factories(); - - // Create blinded provider factory once for all blinded node requests - let blinded_provider_factory = ProofTrieNodeProviderFactory::new( - trie_cursor_factory.clone(), - hashed_cursor_factory.clone(), - proof_tx.task_ctx.prefix_sets.clone(), - ); - let mut account_proofs_processed = 0u64; let mut account_nodes_processed = 0u64; @@ -511,8 +486,7 @@ fn account_worker_loop( }; let result = build_account_multiproof_with_storage_roots( - trie_cursor_factory.clone(), - hashed_cursor_factory.clone(), + &proof_tx.provider, ctx, &mut tracker, ); @@ -568,8 +542,14 @@ fn account_worker_loop( "Processing blinded account node" ); + let account_node_provider = ProofBlindedAccountProvider::new( + &proof_tx.provider, + &proof_tx.provider, + proof_tx.prefix_sets.clone(), + ); + let start = Instant::now(); - let result = blinded_provider_factory.account_node_provider().trie_node(&path); + let result = account_node_provider.trie_node(&path); let elapsed = start.elapsed(); account_nodes_processed += 1; @@ -617,22 +597,20 @@ fn account_worker_loop( /// enabling interleaved parallelism between account trie traversal and storage proof computation. /// /// Returns a `DecodedMultiProof` containing the account subtree and storage proofs. -fn build_account_multiproof_with_storage_roots( - trie_cursor_factory: C, - hashed_cursor_factory: H, +fn build_account_multiproof_with_storage_roots
<P>
( + provider: &P, ctx: AccountMultiproofParams<'_>, tracker: &mut ParallelTrieTracker, ) -> Result where - C: TrieCursorFactory + Clone, - H: HashedCursorFactory + Clone, + P: TrieCursorFactory + HashedCursorFactory, { let accounts_added_removed_keys = ctx.multi_added_removed_keys.as_ref().map(|keys| keys.get_accounts()); // Create the walker. let walker = TrieWalker::<_>::state_trie( - trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?, + provider.account_trie_cursor().map_err(ProviderError::Database)?, ctx.prefix_set, ) .with_added_removed_keys(accounts_added_removed_keys) @@ -656,7 +634,7 @@ where let mut account_rlp = Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE); let mut account_node_iter = TrieNodeIter::state_trie( walker, - hashed_cursor_factory.hashed_account_cursor().map_err(ProviderError::Database)?, + provider.hashed_account_cursor().map_err(ProviderError::Database)?, ); let mut storage_proof_receivers = ctx.storage_proof_receivers; @@ -708,23 +686,23 @@ where match ctx.missed_leaves_storage_roots.entry(hashed_address) { dashmap::Entry::Occupied(occ) => *occ.get(), dashmap::Entry::Vacant(vac) => { - let root = StorageProof::new_hashed( - trie_cursor_factory.clone(), - hashed_cursor_factory.clone(), - hashed_address, - ) - .with_prefix_set_mut(Default::default()) - .storage_multiproof( - ctx.targets.get(&hashed_address).cloned().unwrap_or_default(), - ) - .map_err(|e| { - ParallelStateRootError::StorageRoot( - reth_execution_errors::StorageRootError::Database( - DatabaseError::Other(e.to_string()), - ), - ) - })? - .root; + let root = + StorageProof::new_hashed(provider, provider, hashed_address) + .with_prefix_set_mut(Default::default()) + .storage_multiproof( + ctx.targets + .get(&hashed_address) + .cloned() + .unwrap_or_default(), + ) + .map_err(|e| { + ParallelStateRootError::StorageRoot( + reth_execution_errors::StorageRootError::Database( + DatabaseError::Other(e.to_string()), + ), + ) + })? + .root; vac.insert(root); root @@ -835,64 +813,35 @@ fn dispatch_storage_proofs( Ok(storage_proof_receivers) } -/// Type alias for the factory tuple returned by `create_factories` -type ProofFactories<'a, Tx> = ( - InMemoryTrieCursorFactory, &'a TrieUpdatesSorted>, - HashedPostStateCursorFactory, &'a HashedPostStateSorted>, -); - /// This contains all information shared between all storage proof instances. #[derive(Debug)] -pub struct ProofTaskTx { - /// The tx that is reused for proof calculations. - tx: Tx, +pub struct ProofTaskTx { + /// The provider that implements `TrieCursorFactory` and `HashedCursorFactory`. + provider: Provider, - /// Trie updates, prefix sets, and state updates - task_ctx: ProofTaskCtx, + /// The prefix sets for the computation. + prefix_sets: Arc, /// Identifier for the worker within the worker pool, used only for tracing. id: usize, } -impl ProofTaskTx { - /// Initializes a [`ProofTaskTx`] using the given transaction and a [`ProofTaskCtx`]. The id is - /// used only for tracing. - const fn new(tx: Tx, task_ctx: ProofTaskCtx, id: usize) -> Self { - Self { tx, task_ctx, id } +impl ProofTaskTx { + /// Initializes a [`ProofTaskTx`] with the given provider, prefix sets, and ID. 
+ const fn new(provider: Provider, prefix_sets: Arc, id: usize) -> Self { + Self { provider, prefix_sets, id } } } -impl ProofTaskTx +impl ProofTaskTx where - Tx: DbTx, + Provider: TrieCursorFactory + HashedCursorFactory, { - #[inline] - fn create_factories(&self) -> ProofFactories<'_, Tx> { - let trie_cursor_factory = InMemoryTrieCursorFactory::new( - DatabaseTrieCursorFactory::new(&self.tx), - self.task_ctx.nodes_sorted.as_ref(), - ); - - let hashed_cursor_factory = HashedPostStateCursorFactory::new( - DatabaseHashedCursorFactory::new(&self.tx), - self.task_ctx.state_sorted.as_ref(), - ); - - (trie_cursor_factory, hashed_cursor_factory) - } - - /// Compute storage proof with pre-created factories. + /// Compute storage proof. /// - /// Accepts cursor factories as parameters to allow reuse across multiple proofs. - /// Used by storage workers in the worker pool to avoid factory recreation - /// overhead on each proof computation. + /// Used by storage workers in the worker pool to compute storage proofs. #[inline] - fn compute_storage_proof( - &self, - input: StorageProofInput, - trie_cursor_factory: impl TrieCursorFactory, - hashed_cursor_factory: impl HashedCursorFactory, - ) -> StorageProofResult { + fn compute_storage_proof(&self, input: StorageProofInput) -> StorageProofResult { // Consume the input so we can move large collections (e.g. target slots) without cloning. let StorageProofInput { hashed_address, @@ -919,7 +868,7 @@ where // Compute raw storage multiproof let raw_proof_result = - StorageProof::new_hashed(trie_cursor_factory, hashed_cursor_factory, hashed_address) + StorageProof::new_hashed(&self.provider, &self.provider, hashed_address) .with_prefix_set_mut(PrefixSetMut::from(prefix_set.iter().copied())) .with_branch_node_masks(with_branch_node_masks) .with_added_removed_keys(added_removed_keys) @@ -1034,27 +983,20 @@ enum AccountWorkerJob { } /// Data used for initializing cursor factories that is shared across all storage proof instances. -#[derive(Debug, Clone)] -pub struct ProofTaskCtx { - /// The sorted collection of cached in-memory intermediate trie nodes that can be reused for - /// computation. - nodes_sorted: Arc, - /// The sorted in-memory overlay hashed state. - state_sorted: Arc, +#[derive(Clone, Debug)] +pub struct ProofTaskCtx { + /// The factory for creating state providers. + factory: Factory, /// The collection of prefix sets for the computation. Since the prefix sets _always_ /// invalidate the in-memory nodes, not all keys from `state_sorted` might be present here, /// if we have cached nodes for them. prefix_sets: Arc, } -impl ProofTaskCtx { - /// Creates a new [`ProofTaskCtx`] with the given sorted nodes and state. - pub const fn new( - nodes_sorted: Arc, - state_sorted: Arc, - prefix_sets: Arc, - ) -> Self { - Self { nodes_sorted, state_sorted, prefix_sets } +impl ProofTaskCtx { + /// Creates a new [`ProofTaskCtx`] with the given factory and prefix sets. 
@@ -1085,19 +1027,20 @@ impl ProofWorkerHandle {
     ///
     /// # Parameters
     /// - `executor`: Tokio runtime handle for spawning blocking tasks
-    /// - `view`: Consistent database view for creating transactions
-    /// - `task_ctx`: Shared context with trie updates and prefix sets
+    /// - `task_ctx`: Shared context with the state provider factory and prefix sets
     /// - `storage_worker_count`: Number of storage workers to spawn
     /// - `account_worker_count`: Number of account workers to spawn
     pub fn new(
         executor: Handle,
-        view: ConsistentDbView<Factory>,
-        task_ctx: ProofTaskCtx,
+        task_ctx: ProofTaskCtx<Factory>,
         storage_worker_count: usize,
         account_worker_count: usize,
     ) -> Self
     where
-        Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
+        Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
+            + Clone
+            + Send
+            + 'static,
     {
         let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
         let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();
@@ -1120,7 +1063,6 @@
         // Spawn storage workers
         for worker_id in 0..storage_worker_count {
             let span = debug_span!(target: "trie::proof_task", "storage worker", ?worker_id);
-            let view_clone = view.clone();
             let task_ctx_clone = task_ctx.clone();
             let work_rx_clone = storage_work_rx.clone();
             let storage_available_workers_clone = storage_available_workers.clone();
@@ -1131,7 +1073,6 @@
                 let _guard = span.enter();

                 storage_worker_loop(
-                    view_clone,
                     task_ctx_clone,
                     work_rx_clone,
                     worker_id,
@@ -1149,7 +1090,6 @@
         // Spawn account workers
         for worker_id in 0..account_worker_count {
             let span = debug_span!(target: "trie::proof_task", "account worker", ?worker_id);
-            let view_clone = view.clone();
             let task_ctx_clone = task_ctx.clone();
             let work_rx_clone = account_work_rx.clone();
             let storage_work_tx_clone = storage_work_tx.clone();
@@ -1161,7 +1101,6 @@
                 let _guard = span.enter();

                 account_worker_loop(
-                    view_clone,
                     task_ctx_clone,
                     work_rx_clone,
                     storage_work_tx_clone,
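
Spawning the worker pool no longer threads a `ConsistentDbView` through: the handle takes the runtime, the context, and the two worker counts. A sketch mirroring the updated test below (worker counts are illustrative only):

use std::sync::Arc;

use reth_provider::{
    providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
};
use reth_trie_common::prefix_set::TriePrefixSetsMut;
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle};

#[tokio::main]
async fn main() {
    let handle = tokio::runtime::Handle::current();
    let factory = OverlayStateProviderFactory::new(create_test_provider_factory());
    let ctx = ProofTaskCtx::new(factory, Arc::new(TriePrefixSetsMut::default()));
    // 4 storage workers and 2 account workers; pick counts for the workload.
    let _proof_handle = ProofWorkerHandle::new(handle, ctx, 4, 2);
}
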
@@ -1357,24 +1296,13 @@ impl TrieNodeProvider for ProofTaskTrieNodeProvider {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use alloy_primitives::map::B256Map;
-    use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory};
-    use reth_trie_common::{
-        prefix_set::TriePrefixSetsMut, updates::TrieUpdatesSorted, HashedAccountsSorted,
-        HashedPostStateSorted,
-    };
+    use reth_provider::test_utils::create_test_provider_factory;
+    use reth_trie_common::prefix_set::TriePrefixSetsMut;
     use std::sync::Arc;
     use tokio::{runtime::Builder, task};

-    fn test_ctx() -> ProofTaskCtx {
-        ProofTaskCtx::new(
-            Arc::new(TrieUpdatesSorted::default()),
-            Arc::new(HashedPostStateSorted::new(
-                HashedAccountsSorted::default(),
-                B256Map::default(),
-            )),
-            Arc::new(TriePrefixSetsMut::default()),
-        )
+    fn test_ctx<Factory>(factory: Factory) -> ProofTaskCtx<Factory> {
+        ProofTaskCtx::new(factory, Arc::new(TriePrefixSetsMut::default()))
     }

     /// Ensures `ProofWorkerHandle::new` spawns workers correctly.
@@ -1383,11 +1311,12 @@ mod tests {
         let runtime = Builder::new_multi_thread().worker_threads(1).enable_all().build().unwrap();
         runtime.block_on(async {
             let handle = tokio::runtime::Handle::current();
-            let factory = create_test_provider_factory();
-            let view = ConsistentDbView::new(factory, None);
-            let ctx = test_ctx();
+            let provider_factory = create_test_provider_factory();
+            let factory =
+                reth_provider::providers::OverlayStateProviderFactory::new(provider_factory);
+            let ctx = test_ctx(factory);

-            let proof_handle = ProofWorkerHandle::new(handle.clone(), view, ctx, 5, 3);
+            let proof_handle = ProofWorkerHandle::new(handle.clone(), ctx, 5, 3);

             // Verify handle can be cloned
             let _cloned_handle = proof_handle.clone();
diff --git a/crates/trie/parallel/src/root.rs b/crates/trie/parallel/src/root.rs
index 61d8f69a1d2..5c9294e8f92 100644
--- a/crates/trie/parallel/src/root.rs
+++ b/crates/trie/parallel/src/root.rs
@@ -5,22 +5,20 @@ use alloy_primitives::B256;
 use alloy_rlp::{BufMut, Encodable};
 use itertools::Itertools;
 use reth_execution_errors::StorageRootError;
-use reth_provider::{
-    providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, ProviderError,
-};
+use reth_provider::{DatabaseProviderROFactory, ProviderError};
 use reth_storage_errors::db::DatabaseError;
 use reth_trie::{
-    hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
+    hashed_cursor::HashedCursorFactory,
     node_iter::{TrieElement, TrieNodeIter},
-    trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
+    prefix_set::TriePrefixSets,
+    trie_cursor::TrieCursorFactory,
     updates::TrieUpdates,
     walker::TrieWalker,
-    HashBuilder, Nibbles, StorageRoot, TrieInput, TRIE_ACCOUNT_RLP_MAX_SIZE,
+    HashBuilder, Nibbles, StorageRoot, TRIE_ACCOUNT_RLP_MAX_SIZE,
 };
-use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
 use std::{
     collections::HashMap,
-    sync::{mpsc, Arc, OnceLock},
+    sync::{mpsc, OnceLock},
     time::Duration,
 };
 use thiserror::Error;
@@ -34,20 +32,15 @@ use tracing::*;
 /// nodes in the process. Upon encountering a leaf node, it will poll the storage root
 /// task for the corresponding hashed address.
 ///
-/// Internally, the calculator uses [`ConsistentDbView`] since
-/// it needs to rely on database state saying the same until
-/// the last transaction is open.
-/// See docs of using [`ConsistentDbView`] for caveats.
-///
 /// Note: This implementation only serves as a fallback for the sparse trie-based
 /// state root calculation. The sparse trie approach is more efficient as it avoids traversing
 /// the entire trie, only operating on the modified parts.
 #[derive(Debug)]
 pub struct ParallelStateRoot<Factory> {
-    /// Consistent view of the database.
-    view: ConsistentDbView<Factory>,
-    /// Trie input.
-    input: TrieInput,
+    /// Factory for creating state providers.
+    factory: Factory,
+    /// Prefix sets indicating which portions of the trie need to be recomputed.
+    prefix_sets: TriePrefixSets,
     /// Parallel state root metrics.
     #[cfg(feature = "metrics")]
     metrics: ParallelStateRootMetrics,
 }
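
With `TrieInput` gone, callers split its roles: prefix sets are derived from the hashed state and passed to the constructor introduced just below, while the state itself becomes a factory overlay. A condensed sketch of that sequence, mirroring the updated tests (`HashedPostState::default()` stands in for real overlaid state):

use std::sync::Arc;

use reth_provider::{
    providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
};
use reth_trie::HashedPostState;
use reth_trie_parallel::root::ParallelStateRoot;

#[tokio::main]
async fn main() {
    let hashed_state = HashedPostState::default();

    // Prefix sets mark which portions of the trie must be recomputed...
    let prefix_sets = hashed_state.construct_prefix_sets();
    // ...while the state itself rides along as an overlay on the factory.
    let overlay_factory = OverlayStateProviderFactory::new(create_test_provider_factory())
        .with_hashed_state_overlay(Some(Arc::new(hashed_state.into_sorted())));

    let root = ParallelStateRoot::new(overlay_factory, prefix_sets.freeze())
        .incremental_root()
        .unwrap();
    println!("state root: {root}");
}
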
 impl<Factory> ParallelStateRoot<Factory> {
     /// Create new parallel state root calculator.
-    pub fn new(view: ConsistentDbView<Factory>, input: TrieInput) -> Self {
+    pub fn new(factory: Factory, prefix_sets: TriePrefixSets) -> Self {
         Self {
-            view,
-            input,
+            factory,
+            prefix_sets,
             #[cfg(feature = "metrics")]
             metrics: ParallelStateRootMetrics::default(),
         }
@@ -67,7 +60,10 @@ impl<Factory> ParallelStateRoot<Factory> {
 impl<Factory> ParallelStateRoot<Factory>
 where
-    Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + Send + Sync + 'static,
+    Factory: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
+        + Clone
+        + Send
+        + 'static,
 {
     /// Calculate incremental state root in parallel.
     pub fn incremental_root(self) -> Result<B256, ParallelStateRootError> {
@@ -88,12 +84,12 @@
         retain_updates: bool,
     ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
         let mut tracker = ParallelTrieTracker::default();
-        let trie_nodes_sorted = Arc::new(self.input.nodes.into_sorted());
-        let hashed_state_sorted = Arc::new(self.input.state.into_sorted());
-        let prefix_sets = self.input.prefix_sets.freeze();
         let storage_root_targets = StorageRootTargets::new(
-            prefix_sets.account_prefix_set.iter().map(|nibbles| B256::from_slice(&nibbles.pack())),
-            prefix_sets.storage_prefix_sets,
+            self.prefix_sets
+                .account_prefix_set
+                .iter()
+                .map(|nibbles| B256::from_slice(&nibbles.pack())),
+            self.prefix_sets.storage_prefix_sets,
         );

         // Pre-calculate storage roots in parallel for accounts which were changed.
@@ -107,9 +103,7 @@
         for (hashed_address, prefix_set) in
             storage_root_targets.into_iter().sorted_unstable_by_key(|(address, _)| *address)
         {
-            let view = self.view.clone();
-            let hashed_state_sorted = hashed_state_sorted.clone();
-            let trie_nodes_sorted = trie_nodes_sorted.clone();
+            let factory = self.factory.clone();
             #[cfg(feature = "metrics")]
             let metrics = self.metrics.storage_trie.clone();
@@ -118,18 +112,10 @@
             // Spawn a blocking task to calculate account's storage root from database I/O
             drop(handle.spawn_blocking(move || {
                 let result = (|| -> Result<_, ParallelStateRootError> {
-                    let provider_ro = view.provider_ro()?;
-                    let trie_cursor_factory = InMemoryTrieCursorFactory::new(
-                        DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
-                        &trie_nodes_sorted,
-                    );
-                    let hashed_state = HashedPostStateCursorFactory::new(
-                        DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
-                        &hashed_state_sorted,
-                    );
+                    let provider = factory.database_provider_ro()?;
                     Ok(StorageRoot::new_hashed(
-                        trie_cursor_factory,
-                        hashed_state,
+                        &provider,
+                        &provider,
                         hashed_address,
                         prefix_set,
                         #[cfg(feature = "metrics")]
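
Each blocking storage-root task now moves a cheap clone of the factory and opens its own read-only provider, instead of capturing a shared view plus two sorted overlays. A self-contained sketch of that ownership pattern with stand-in types (the real code calls `DatabaseProviderROFactory::database_provider_ro`):

use tokio::runtime::Handle;

// Stand-in for a cheaply cloneable provider factory.
#[derive(Clone)]
struct Factory;

impl Factory {
    // Stand-in for `database_provider_ro()`: every task opens its own provider.
    fn database_provider_ro(&self) -> Result<String, String> {
        Ok("read-only provider".to_owned())
    }
}

#[tokio::main]
async fn main() {
    let handle = Handle::current();
    let factory = Factory;
    for _ in 0..4 {
        // Only the factory clone crosses into the blocking task.
        let factory = factory.clone();
        handle.spawn_blocking(move || {
            let provider = factory.database_provider_ro().unwrap();
            drop(provider);
        });
    }
}
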
@@ -145,24 +131,16 @@
         trace!(target: "trie::parallel_state_root", "calculating state root");
         let mut trie_updates = TrieUpdates::default();
-        let provider_ro = self.view.provider_ro()?;
-        let trie_cursor_factory = InMemoryTrieCursorFactory::new(
-            DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
-            &trie_nodes_sorted,
-        );
-        let hashed_cursor_factory = HashedPostStateCursorFactory::new(
-            DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
-            &hashed_state_sorted,
-        );
+        let provider = self.factory.database_provider_ro()?;
         let walker = TrieWalker::<_>::state_trie(
-            trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?,
-            prefix_sets.account_prefix_set,
+            provider.account_trie_cursor().map_err(ProviderError::Database)?,
+            self.prefix_sets.account_prefix_set,
         )
         .with_deletions_retained(retain_updates);
         let mut account_node_iter = TrieNodeIter::state_trie(
             walker,
-            hashed_cursor_factory.hashed_account_cursor().map_err(ProviderError::Database)?,
+            provider.hashed_account_cursor().map_err(ProviderError::Database)?,
         );
         let mut hash_builder = HashBuilder::default().with_updates(retain_updates);
@@ -186,8 +164,8 @@
                     None => {
                         tracker.inc_missed_leaves();
                         StorageRoot::new_hashed(
-                            trie_cursor_factory.clone(),
-                            hashed_cursor_factory.clone(),
+                            &provider,
+                            &provider,
                             hashed_address,
                             Default::default(),
                             #[cfg(feature = "metrics")]
@@ -223,7 +201,7 @@
         let root = hash_builder.root();

         let removed_keys = account_node_iter.walker.take_removed_keys();
-        trie_updates.finalize(hash_builder, removed_keys, prefix_sets.destroyed_accounts);
+        trie_updates.finalize(hash_builder, removed_keys, self.prefix_sets.destroyed_accounts);

         let stats = tracker.finish();
@@ -306,11 +284,13 @@ mod tests {
     use reth_primitives_traits::{Account, StorageEntry};
     use reth_provider::{test_utils::create_test_provider_factory, HashingWriter};
     use reth_trie::{test_utils, HashedPostState, HashedStorage};
+    use std::sync::Arc;

     #[tokio::test]
     async fn random_parallel_root() {
         let factory = create_test_provider_factory();
-        let consistent_view = ConsistentDbView::new(factory.clone(), None);
+        let mut overlay_factory =
+            reth_provider::providers::OverlayStateProviderFactory::new(factory.clone());

         let mut rng = rand::rng();
         let mut state = (0..100)
@@ -353,7 +333,7 @@
         }

         assert_eq!(
-            ParallelStateRoot::new(consistent_view.clone(), Default::default())
+            ParallelStateRoot::new(overlay_factory.clone(), Default::default())
                 .incremental_root()
                 .unwrap(),
             test_utils::state_root(state.clone())
@@ -384,8 +364,12 @@
             }
         }

+        let prefix_sets = hashed_state.construct_prefix_sets();
+        overlay_factory =
+            overlay_factory.with_hashed_state_overlay(Some(Arc::new(hashed_state.into_sorted())));
+
         assert_eq!(
-            ParallelStateRoot::new(consistent_view, TrieInput::from_state(hashed_state))
+            ParallelStateRoot::new(overlay_factory, prefix_sets.freeze())
                 .incremental_root()
                 .unwrap(),
             test_utils::state_root(state)
diff --git a/crates/trie/trie/src/proof/trie_node.rs b/crates/trie/trie/src/proof/trie_node.rs
index 3d964cf5e8b..3e197072d49 100644
--- a/crates/trie/trie/src/proof/trie_node.rs
+++ b/crates/trie/trie/src/proof/trie_node.rs
@@ -81,19 +81,18 @@ impl<T, H> ProofBlindedAccountProvider<T, H> {
 impl<T, H> TrieNodeProvider for ProofBlindedAccountProvider<T, H>
 where
-    T: TrieCursorFactory + Clone + Send + Sync,
-    H: HashedCursorFactory + Clone + Send + Sync,
+    T: TrieCursorFactory,
+    H: HashedCursorFactory,
 {
     fn trie_node(&self, path: &Nibbles) -> Result<Option<RevealedNode>, SparseTrieError> {
         let start = enabled!(target: "trie::proof::blinded", Level::TRACE).then(Instant::now);
         let targets = MultiProofTargets::from_iter([(pad_path_to_key(path), HashSet::default())]);
-        let mut proof =
-            Proof::new(self.trie_cursor_factory.clone(), self.hashed_cursor_factory.clone())
-                .with_prefix_sets_mut(self.prefix_sets.as_ref().clone())
-                .with_branch_node_masks(true)
-                .multiproof(targets)
-                .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
+        let mut proof = Proof::new(&self.trie_cursor_factory, &self.hashed_cursor_factory)
+            .with_prefix_sets_mut(self.prefix_sets.as_ref().clone())
+            .with_branch_node_masks(true)
+            .multiproof(targets)
+            .map_err(|error| SparseTrieErrorKind::Other(Box::new(error)))?;
         let node = proof.account_subtree.into_inner().remove(path);
         let tree_mask = proof.branch_node_tree_masks.remove(path);
         let hash_mask = proof.branch_node_hash_masks.remove(path);
@@ -138,8 +137,8 @@ impl<T, H> ProofBlindedStorageProvider<T, H> {
 impl<T, H> TrieNodeProvider for ProofBlindedStorageProvider<T, H>
 where
-    T: TrieCursorFactory + Clone + Send + Sync,
-    H: HashedCursorFactory + Clone + Send + Sync,
+    T: TrieCursorFactory,
+    H: HashedCursorFactory,
 {
     fn trie_node(&self, path: &Nibbles) -> Result<Option<RevealedNode>, SparseTrieError> {
         let start = enabled!(target: "trie::proof::blinded", Level::TRACE).then(Instant::now);

         let storage_prefix_set =
             self.prefix_sets.storage_prefix_sets.get(&self.account).cloned().unwrap_or_default();
         let mut proof = StorageProof::new_hashed(
-            self.trie_cursor_factory.clone(),
-            self.hashed_cursor_factory.clone(),
+            &self.trie_cursor_factory,
+            &self.hashed_cursor_factory,
             self.account,
         )
         .with_prefix_set_mut(storage_prefix_set)
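
These call sites can drop the `Clone + Send + Sync` bounds because the cursor-factory traits are also implemented for shared references, so `Proof::new(&t, &h)` borrows what used to be cloned. A self-contained analogue of that pattern with stand-in traits (reth's real factory traits follow the same shape):

// Stand-in for a cursor-factory trait.
trait CursorFactory {
    fn open_cursor(&self) -> u32;
}

// Implementing the trait for `&T` lets APIs take factories by value while
// callers keep ownership and lend `&factory` to any number of computations.
impl<T: CursorFactory> CursorFactory for &T {
    fn open_cursor(&self) -> u32 {
        (**self).open_cursor()
    }
}

struct DbFactory;

impl CursorFactory for DbFactory {
    fn open_cursor(&self) -> u32 {
        42
    }
}

// Takes the factory by value, like `Proof::new` / `StorageProof::new_hashed`.
fn make_proof(factory: impl CursorFactory) -> u32 {
    factory.open_cursor()
}

fn main() {
    let factory = DbFactory;
    // No `.clone()`: each call borrows the same factory.
    assert_eq!(make_proof(&factory), 42);
    assert_eq!(make_proof(&factory), 42);
}
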