Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions crates/engine/tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ reth-prune.workspace = true
reth-revm.workspace = true
reth-stages-api.workspace = true
reth-tasks.workspace = true
reth-trie-db.workspace = true
reth-trie-parallel.workspace = true
reth-trie-sparse = { workspace = true, features = ["std", "metrics"] }
reth-trie-sparse-parallel = { workspace = true, features = ["std"] }
Expand Down Expand Up @@ -134,7 +133,6 @@ test-utils = [
"reth-trie/test-utils",
"reth-trie-sparse/test-utils",
"reth-prune-types?/test-utils",
"reth-trie-db/test-utils",
"reth-trie-parallel/test-utils",
"reth-ethereum-primitives/test-utils",
"reth-node-ethereum/test-utils",
Expand Down
6 changes: 2 additions & 4 deletions crates/engine/tree/benches/state_root_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ use reth_evm::OnStateHook;
use reth_evm_ethereum::EthEvmConfig;
use reth_primitives_traits::{Account as RethAccount, Recovered, StorageEntry};
use reth_provider::{
providers::{BlockchainProvider, ConsistentDbView},
providers::{BlockchainProvider, OverlayStateProviderFactory},
test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},
AccountReader, ChainSpecProvider, HashingWriter, ProviderFactory,
};
use reth_trie::TrieInput;
use revm_primitives::{HashMap, U256};
use revm_state::{Account as RevmAccount, AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
use std::{hint::black_box, sync::Arc};
Expand Down Expand Up @@ -238,8 +237,7 @@ fn bench_state_root(c: &mut Criterion) {
>,
>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::default(),
OverlayStateProviderFactory::new(provider),
&TreeConfig::default(),
)
.map_err(|(err, ..)| err)
Expand Down
8 changes: 7 additions & 1 deletion crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,10 @@ where
&self,
blocks: Vec<ExecutedBlock<N::Primitives>>,
) -> Result<Option<BlockNumHash>, PersistenceError> {
debug!(target: "engine::persistence", first=?blocks.first().map(|b| b.recovered_block.num_hash()), last=?blocks.last().map(|b| b.recovered_block.num_hash()), "Saving range of blocks");
let first_block_hash = blocks.first().map(|b| b.recovered_block.num_hash());
let last_block_hash = blocks.last().map(|b| b.recovered_block.num_hash());
debug!(target: "engine::persistence", first=?first_block_hash, last=?last_block_hash, "Saving range of blocks");

let start_time = Instant::now();
let last_block_hash_num = blocks.last().map(|block| BlockNumHash {
hash: block.recovered_block().hash(),
Expand All @@ -155,6 +158,9 @@ where
provider_rw.save_blocks(blocks)?;
provider_rw.commit()?;
}

debug!(target: "engine::persistence", first=?first_block_hash, last=?last_block_hash, "Saved range of blocks");

self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
Ok(last_block_hash_num)
}
Expand Down
5 changes: 2 additions & 3 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ use reth_payload_primitives::{
};
use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader};
use reth_provider::{
providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, HashedPostStateProvider,
ProviderError, StateProviderBox, StateProviderFactory, StateReader, TransactionVariant,
TrieReader,
BlockReader, DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StateProviderBox,
StateProviderFactory, StateReader, TransactionVariant, TrieReader,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow;
Expand Down
60 changes: 24 additions & 36 deletions crates/engine/tree/src/tree/payload_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ use reth_evm::{
ConfigureEvm, EvmEnvFor, OnStateHook, SpecFor, TxEnvFor,
};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateProviderFactory,
StateReader,
};
use reth_provider::{BlockReader, DatabaseProviderROFactory, StateProviderFactory, StateReader};
use reth_revm::{db::BundleState, state::EvmState};
use reth_trie::TrieInput;
use reth_trie::{
hashed_cursor::HashedCursorFactory, prefix_set::TriePrefixSetsMut,
trie_cursor::TrieCursorFactory,
};
use reth_trie_parallel::{
proof_task::{ProofTaskCtx, ProofWorkerHandle},
root::ParallelStateRootError,
Expand Down Expand Up @@ -121,8 +121,6 @@ where
>,
/// Whether to disable the parallel sparse trie.
disable_parallel_sparse_trie: bool,
/// A cleared trie input, kept around to be reused so allocations can be minimized.
trie_input: Option<TrieInput>,
Comment on lines -124 to -125
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

epic

/// Maximum concurrency for prewarm task.
prewarm_max_concurrency: usize,
}
Expand All @@ -149,7 +147,6 @@ where
precompile_cache_disabled: config.precompile_cache_disabled(),
precompile_cache_map,
sparse_state_trie: Arc::default(),
trie_input: None,
disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
prewarm_max_concurrency: config.prewarm_max_concurrency(),
}
Expand Down Expand Up @@ -200,50 +197,45 @@ where
name = "payload processor",
skip_all
)]
pub fn spawn<P, I: ExecutableTxIterator<Evm>>(
pub fn spawn<P, F, I: ExecutableTxIterator<Evm>>(
&mut self,
env: ExecutionEnv<Evm>,
transactions: I,
provider_builder: StateProviderBuilder<N, P>,
consistent_view: ConsistentDbView<P>,
trie_input: TrieInput,
multiproof_provider_factory: F,
config: &TreeConfig,
) -> Result<
PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>,
(ParallelStateRootError, I, ExecutionEnv<Evm>, StateProviderBuilder<N, P>),
>
where
P: DatabaseProviderFactory<Provider: BlockReader>
+ BlockReader
+ StateProviderFactory
+ StateReader
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
F: DatabaseProviderROFactory<Provider: TrieCursorFactory + HashedCursorFactory>
+ Clone
+ Send
+ 'static,
{
let span = tracing::Span::current();
let (to_sparse_trie, sparse_trie_rx) = channel();
// spawn multiproof task, save the trie input
let (trie_input, state_root_config) = MultiProofConfig::from_input(trie_input);
self.trie_input = Some(trie_input);

// We rely on the cursor factory to provide whatever DB overlay is necessary to see a
// consistent view of the database, including the trie tables. Because of this there is no
// need for an overarching prefix set to invalidate any section of the trie tables, and so
// we use an empty prefix set.
let prefix_sets = Arc::new(TriePrefixSetsMut::default());
Comment on lines +220 to +225
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can prefix_sets be removed from ProofTaskCtx too then?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so yes, I had thought it would be good to keep it around in case we wanted to/needed to use it for some future use-case, but at the moment removing it would be possible.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can do as a follow-up, just nicer to not pass defaults everywhere if it's unused anyway


// Create and spawn the storage proof task
let task_ctx = ProofTaskCtx::new(
state_root_config.nodes_sorted.clone(),
state_root_config.state_sorted.clone(),
state_root_config.prefix_sets.clone(),
);
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory, prefix_sets);
let storage_worker_count = config.storage_worker_count();
let account_worker_count = config.account_worker_count();
let proof_handle = ProofWorkerHandle::new(
self.executor.handle().clone(),
consistent_view,
task_ctx,
storage_worker_count,
account_worker_count,
);

let multi_proof_task = MultiProofTask::new(
state_root_config,
proof_handle.clone(),
to_sparse_trie,
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
Expand Down Expand Up @@ -393,11 +385,6 @@ where
CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
}

/// Takes the trie input from the inner payload processor, if it exists.
pub const fn take_trie_input(&mut self) -> Option<TrieInput> {
self.trie_input.take()
}

/// Returns the cache for the given parent hash.
///
/// If the given hash is different then what is recently cached, then this will create a new
Expand Down Expand Up @@ -718,12 +705,12 @@ mod tests {
use reth_evm_ethereum::EthEvmConfig;
use reth_primitives_traits::{Account, Recovered, StorageEntry};
use reth_provider::{
providers::{BlockchainProvider, ConsistentDbView},
providers::{BlockchainProvider, OverlayStateProviderFactory},
test_utils::create_test_provider_factory_with_chain_spec,
ChainSpecProvider, HashingWriter,
};
use reth_testing_utils::generators;
use reth_trie::{test_utils::state_root, HashedPostState, TrieInput};
use reth_trie::{test_utils::state_root, HashedPostState};
use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
use std::sync::Arc;
Expand Down Expand Up @@ -905,17 +892,18 @@ mod tests {
&TreeConfig::default(),
PrecompileCacheMap::default(),
);
let provider = BlockchainProvider::new(factory).unwrap();

let provider_factory = BlockchainProvider::new(factory).unwrap();

let mut handle =
payload_processor
.spawn(
Default::default(),
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
ConsistentDbView::new_with_latest_tip(provider).unwrap(),
TrieInput::from_state(hashed_state),
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory),
&TreeConfig::default(),
)
.map_err(|(err, ..)| err)
Expand Down
42 changes: 17 additions & 25 deletions crates/engine/tree/src/tree/payload_processor/multiproof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ impl SparseTrieUpdate {
}

/// Common configuration for multi proof tasks
#[derive(Debug, Clone)]
pub(super) struct MultiProofConfig {
#[derive(Debug, Clone, Default)]
pub(crate) struct MultiProofConfig {
/// The sorted collection of cached in-memory intermediate trie nodes that
/// can be reused for computation.
pub nodes_sorted: Arc<TrieUpdatesSorted>,
Expand All @@ -75,7 +75,7 @@ impl MultiProofConfig {
///
/// This returns a cleared [`TrieInput`] so that we can reuse any allocated space in the
/// [`TrieInput`].
pub(super) fn from_input(mut input: TrieInput) -> (TrieInput, Self) {
pub(crate) fn from_input(mut input: TrieInput) -> (TrieInput, Self) {
let config = Self {
nodes_sorted: Arc::new(input.nodes.drain_into_sorted()),
state_sorted: Arc::new(input.state.drain_into_sorted()),
Expand Down Expand Up @@ -289,7 +289,6 @@ impl StorageMultiproofInput {
/// Input parameters for dispatching a multiproof calculation.
#[derive(Debug)]
struct MultiproofInput {
config: MultiProofConfig,
source: Option<StateChangeSource>,
hashed_state_update: HashedPostState,
proof_targets: MultiProofTargets,
Expand Down Expand Up @@ -458,7 +457,6 @@ impl MultiproofManager {
/// Dispatches a single multiproof calculation to worker pool.
fn dispatch_multiproof(&mut self, multiproof_input: MultiproofInput) {
let MultiproofInput {
config,
source,
hashed_state_update,
proof_targets,
Expand All @@ -485,7 +483,7 @@ impl MultiproofManager {

// Extend prefix sets with targets
let frozen_prefix_sets =
ParallelProof::extend_prefix_sets_with_targets(&config.prefix_sets, &proof_targets);
ParallelProof::extend_prefix_sets_with_targets(&Default::default(), &proof_targets);

// Dispatch account multiproof to worker pool with result sender
let input = AccountMultiproofInput {
Expand Down Expand Up @@ -671,8 +669,6 @@ pub(super) struct MultiProofTask {
/// The size of proof targets chunk to spawn in one calculation.
/// If None, chunking is disabled and all targets are processed in a single proof.
chunk_size: Option<usize>,
/// Task configuration.
config: MultiProofConfig,
/// Receiver for state root related messages (prefetch, state updates, finish signal).
rx: CrossbeamReceiver<MultiProofMessage>,
/// Sender for state root related messages.
Expand All @@ -696,7 +692,6 @@ pub(super) struct MultiProofTask {
impl MultiProofTask {
/// Creates a new multi proof task with the unified message channel
pub(super) fn new(
config: MultiProofConfig,
proof_worker_handle: ProofWorkerHandle,
to_sparse_trie: std::sync::mpsc::Sender<SparseTrieUpdate>,
chunk_size: Option<usize>,
Expand All @@ -707,7 +702,6 @@ impl MultiProofTask {

Self {
chunk_size,
config,
rx,
tx,
proof_result_rx,
Expand Down Expand Up @@ -761,7 +755,6 @@ impl MultiProofTask {
let mut dispatch = |proof_targets| {
self.multiproof_manager.dispatch(
MultiproofInput {
config: self.config.clone(),
source: None,
hashed_state_update: Default::default(),
proof_targets,
Expand Down Expand Up @@ -909,7 +902,6 @@ impl MultiProofTask {

self.multiproof_manager.dispatch(
MultiproofInput {
config: self.config.clone(),
source: Some(source),
hashed_state_update,
proof_targets,
Expand Down Expand Up @@ -1253,10 +1245,11 @@ mod tests {
use super::*;
use alloy_primitives::map::B256Set;
use reth_provider::{
providers::ConsistentDbView, test_utils::create_test_provider_factory, BlockReader,
DatabaseProviderFactory,
providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
BlockReader, DatabaseProviderFactory, PruneCheckpointReader, StageCheckpointReader,
TrieReader,
};
use reth_trie::{MultiProof, TrieInput};
use reth_trie::MultiProof;
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle};
use revm_primitives::{B256, U256};
use std::sync::OnceLock;
Expand All @@ -1275,20 +1268,19 @@ mod tests {

fn create_test_state_root_task<F>(factory: F) -> MultiProofTask
where
F: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
F: DatabaseProviderFactory<
Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
> + Clone
+ Send
+ 'static,
{
let rt_handle = get_test_runtime_handle();
let (_trie_input, config) = MultiProofConfig::from_input(TrieInput::default());
let task_ctx = ProofTaskCtx::new(
config.nodes_sorted.clone(),
config.state_sorted.clone(),
config.prefix_sets.clone(),
);
let consistent_view = ConsistentDbView::new(factory, None);
let proof_handle = ProofWorkerHandle::new(rt_handle, consistent_view, task_ctx, 1, 1);
let overlay_factory = OverlayStateProviderFactory::new(factory);
let task_ctx = ProofTaskCtx::new(overlay_factory, Default::default());
let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1);
let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();

MultiProofTask::new(config, proof_handle, to_sparse_trie, Some(1))
MultiProofTask::new(proof_handle, to_sparse_trie, Some(1))
}

#[test]
Expand Down
Loading
Loading