Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/assets/check_wasm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ exclude_crates=(
reth-payload-builder # reth-metrics
reth-provider # tokio
reth-prune # tokio
reth-prune-static-files # reth-provider
reth-stages-api # reth-provider, reth-prune
reth-static-file # tokio
reth-transaction-pool # c-kzg
Expand Down
41 changes: 3 additions & 38 deletions crates/node/builder/src/launch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ use crate::{
hooks::OnComponentInitializedHook,
BuilderContext, ExExLauncher, NodeAdapter, PrimitivesTy,
};
use alloy_consensus::BlockHeader as _;
use alloy_eips::eip2124::Head;
use alloy_primitives::{BlockNumber, B256};
use eyre::Context;
use rayon::ThreadPoolBuilder;
use reth_chainspec::{Chain, EthChainSpec, EthereumHardfork, EthereumHardforks};
use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
use reth_config::{config::EtlConfig, PruneConfig};
use reth_consensus::noop::NoopConsensus;
use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
Expand Down Expand Up @@ -67,8 +66,8 @@ use reth_node_metrics::{
};
use reth_provider::{
providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
BlockHashReader, BlockNumReader, BlockReaderIdExt, ProviderError, ProviderFactory,
ProviderResult, StageCheckpointReader, StaticFileProviderFactory,
BlockHashReader, BlockNumReader, ProviderError, ProviderFactory, ProviderResult,
StageCheckpointReader, StaticFileProviderFactory,
};
use reth_prune::{PruneModes, PrunerBuilder};
use reth_rpc_builder::config::RethRpcServerConfig;
Expand Down Expand Up @@ -945,40 +944,6 @@ where
Ok(None)
}

/// Expire the pre-merge transactions if the node is configured to do so and the chain has a
/// merge block.
///
/// If the node is configured to prune pre-merge transactions and it has synced past the merge
/// block, it will delete the pre-merge transaction static files if they still exist.
pub fn expire_pre_merge_transactions(&self) -> eyre::Result<()>
where
    T: FullNodeTypes<Provider: StaticFileProviderFactory>,
{
    // Nothing to do unless pre-merge body pruning was requested.
    if !self.node_config().pruning.bodies_pre_merge {
        return Ok(())
    }

    // The chain must have a known merge (Paris) activation block number.
    let Some(merge_block) =
        self.chain_spec().ethereum_fork_activation(EthereumHardfork::Paris).block_number()
    else {
        return Ok(())
    };

    // Ensure we only expire transactions after we synced past the merge block.
    let Some(latest) = self.blockchain_db().latest_header()? else { return Ok(()) };
    if latest.number() > merge_block {
        let provider = self.blockchain_db().static_file_provider();
        // Only delete if there are still transaction static files below the merge block.
        if provider
            .get_lowest_transaction_static_file_block()
            .is_some_and(|lowest| lowest < merge_block)
        {
            info!(target: "reth::cli", merge_block, "Expiring pre-merge transactions");
            provider.delete_transactions_below(merge_block)?;
        } else {
            debug!(target: "reth::cli", merge_block, "No pre-merge transactions to expire");
        }
    }

    Ok(())
}

/// Returns the metrics sender.
pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
self.right().db_provider_container.metrics_sender.clone()
Expand Down
3 changes: 0 additions & 3 deletions crates/node/builder/src/launch/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@ impl EngineNodeLauncher {
})?
.with_components(components_builder, on_component_initialized).await?;

// Try to expire pre-merge transaction history if configured
ctx.expire_pre_merge_transactions()?;

// spawn exexs if any
let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;

Expand Down
6 changes: 4 additions & 2 deletions crates/node/core/src/args/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,10 @@ impl PruningArgs {
receipts: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
account_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
storage_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
// TODO: set default to pre-merge block if available
bodies_history: None,
bodies_history: chain_spec
.ethereum_fork_activation(EthereumHardfork::Paris)
.block_number()
.map(PruneMode::Before),
Comment on lines +134 to +137
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we should do this in this pr

I'd prefer a dedicated pr for this

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather have it in the same PR, because it removes the explicit ctx.expire_pre_merge_transactions() call, and we now rely on the pruner running on node startup with new bodies pruning.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, makes sense

merkle_changesets: PruneMode::Distance(MINIMUM_PRUNING_DISTANCE),
#[expect(deprecated)]
receipts_log_filter: (),
Expand Down
4 changes: 2 additions & 2 deletions crates/prune/prune/src/segments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ pub use set::SegmentSet;
use std::{fmt::Debug, ops::RangeInclusive};
use tracing::error;
pub use user::{
AccountHistory, MerkleChangeSets, Receipts as UserReceipts, SenderRecovery, StorageHistory,
TransactionLookup,
AccountHistory, Bodies, MerkleChangeSets, Receipts as UserReceipts, SenderRecovery,
StorageHistory, TransactionLookup,
};

/// A segment represents a pruning of some portion of the data.
Expand Down
8 changes: 5 additions & 3 deletions crates/prune/prune/src/segments/set.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::segments::{
AccountHistory, MerkleChangeSets, Segment, SenderRecovery, StorageHistory, TransactionLookup,
UserReceipts,
AccountHistory, Bodies, MerkleChangeSets, Segment, SenderRecovery, StorageHistory,
TransactionLookup, UserReceipts,
};
use alloy_eips::eip2718::Encodable2718;
use reth_db_api::{table::Value, transaction::DbTxMut};
Expand Down Expand Up @@ -66,12 +66,14 @@ where
receipts,
account_history,
storage_history,
bodies_history: _,
bodies_history,
merkle_changesets,
receipts_log_filter: (),
} = prune_modes;

Self::default()
// Bodies - run first since file deletion is fast
.segment_opt(bodies_history.map(Bodies::new))
// Merkle changesets
.segment(MerkleChangeSets::new(merkle_changesets))
// Account history
Expand Down
210 changes: 210 additions & 0 deletions crates/prune/prune/src/segments/user/bodies.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
use crate::{
segments::{PruneInput, Segment},
PrunerError,
};
use reth_provider::{BlockReader, StaticFileProviderFactory};
use reth_prune_types::{
PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;

/// Segment responsible for pruning transactions in static files.
///
/// This segment is controlled by the `bodies_history` configuration.
#[derive(Debug)]
pub struct Bodies {
    // User-configured prune mode deciding which blocks' transactions are expired.
    mode: PruneMode,
}

impl Bodies {
/// Creates a new [`Bodies`] segment with the given prune mode.
pub const fn new(mode: PruneMode) -> Self {
Self { mode }
}
}

impl<Provider> Segment<Provider> for Bodies
where
    Provider: StaticFileProviderFactory + BlockReader,
{
    /// This segment prunes [`PruneSegment::Bodies`].
    fn segment(&self) -> PruneSegment {
        PruneSegment::Bodies
    }

    /// Returns the user-configured prune mode (always present for this segment).
    fn mode(&self) -> Option<PruneMode> {
        Some(self.mode)
    }

    /// Bodies pruning is user-requested rather than protocol-mandated.
    fn purpose(&self) -> PrunePurpose {
        PrunePurpose::User
    }

    /// Deletes transaction static files below the prune target and reports
    /// how many transactions were removed.
    ///
    /// NOTE(review): this presumably deletes at static-file granularity, i.e. a
    /// file that straddles `input.to_block` is kept — confirm against
    /// `delete_segment_below_block`'s contract.
    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
        // `+ 1` because `to_block` is inclusive in the prune target while the
        // provider API deletes strictly below the given block.
        let deleted_headers = provider
            .static_file_provider()
            .delete_segment_below_block(StaticFileSegment::Transactions, input.to_block + 1)?;

        // Nothing deleted: report the segment as done without a checkpoint.
        if deleted_headers.is_empty() {
            return Ok(SegmentOutput::done())
        }

        // Transaction ranges covered by the deleted files; files without a
        // transaction range are skipped by `filter_map`.
        let tx_ranges = deleted_headers.iter().filter_map(|header| header.tx_range());

        // Total transactions removed across all deleted files.
        let pruned = tx_ranges.clone().map(|range| range.len()).sum::<u64>() as usize;

        Ok(SegmentOutput {
            progress: PruneProgress::Finished,
            pruned,
            checkpoint: Some(SegmentOutputCheckpoint {
                block_number: Some(input.to_block),
                // Highest transaction number among the deleted ranges.
                tx_number: tx_ranges.map(|range| range.end()).max(),
            }),
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::Pruner;
    use alloy_primitives::BlockNumber;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::{
        test_utils::{create_test_provider_factory, MockNodeTypesWithDB},
        ProviderFactory, StaticFileWriter,
    };
    use reth_prune_types::{PruneMode, PruneProgress, PruneSegment};
    use reth_static_file_types::{
        SegmentHeader, SegmentRangeInclusive, StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
    };

    /// Creates empty static file jars at 500k block intervals up to the tip block.
    ///
    /// Each jar contains sequential transaction ranges for testing deletion logic.
    fn setup_static_file_jars<P: StaticFileProviderFactory>(provider: &P, tip_block: u64) {
        // One jar per DEFAULT_BLOCKS_PER_STATIC_FILE blocks, covering [0, tip_block].
        let num_jars = (tip_block + 1) / DEFAULT_BLOCKS_PER_STATIC_FILE;
        // Each jar gets a fixed-size, sequential, non-overlapping tx range.
        let txs_per_jar = 1000;
        let static_file_provider = provider.static_file_provider();

        let mut writer =
            static_file_provider.latest_writer(StaticFileSegment::Transactions).unwrap();

        for jar_idx in 0..num_jars {
            // Block range covered by this jar (clamped to the tip for the last one).
            let block_start = jar_idx * DEFAULT_BLOCKS_PER_STATIC_FILE;
            let block_end = ((jar_idx + 1) * DEFAULT_BLOCKS_PER_STATIC_FILE - 1).min(tip_block);

            // Sequential transaction range assigned to this jar.
            let tx_start = jar_idx * txs_per_jar;
            let tx_end = tx_start + txs_per_jar - 1;

            *writer.user_header_mut() = SegmentHeader::new(
                SegmentRangeInclusive::new(block_start, block_end),
                Some(SegmentRangeInclusive::new(block_start, block_end)),
                Some(SegmentRangeInclusive::new(tx_start, tx_end)),
                StaticFileSegment::Transactions,
            );

            // Mark dirty so the otherwise-empty jar header is actually committed.
            writer.inner().set_dirty();
            writer.commit().expect("commit empty jar");

            // Advance the writer to the next jar, except after the last one.
            if jar_idx < num_jars - 1 {
                writer.increment_block(block_end + 1).expect("increment block");
            }
        }

        static_file_provider.initialize_index().expect("initialize index");
    }

    /// One pruning scenario: a prune mode plus the outcome expected after running it.
    struct PruneTestCase {
        // Prune mode under test.
        prune_mode: PruneMode,
        // Total transactions expected to be reported as pruned.
        expected_pruned: usize,
        // Expected value reported by `get_lowest_static_file_block` afterwards.
        expected_lowest_block: Option<BlockNumber>,
    }

    /// Runs one [`PruneTestCase`] through a real [`Pruner`] wired with only the
    /// [`Bodies`] segment and asserts the reported output and the static-file
    /// block bounds afterwards.
    fn run_prune_test(
        factory: &ProviderFactory<MockNodeTypesWithDB>,
        finished_exex_height_rx: &tokio::sync::watch::Receiver<FinishedExExHeight>,
        test_case: PruneTestCase,
        tip: BlockNumber,
    ) {
        let bodies = Bodies::new(test_case.prune_mode);
        let segments: Vec<Box<dyn Segment<_>>> = vec![Box::new(bodies)];

        let mut pruner = Pruner::new_with_factory(
            factory.clone(),
            segments,
            5,
            10000,
            None,
            finished_exex_height_rx.clone(),
        );

        let result = pruner.run(tip).expect("pruner run");

        assert_eq!(result.progress, PruneProgress::Finished);
        assert_eq!(result.segments.len(), 1);

        // Exactly one segment ran: ours, with the expected pruned count.
        let (segment, output) = &result.segments[0];
        assert_eq!(*segment, PruneSegment::Bodies);
        assert_eq!(output.pruned, test_case.expected_pruned);

        // Deletions move the lower static-file bound; the highest block must
        // always remain at the tip (the newest jar is never deleted).
        let static_provider = factory.static_file_provider();
        assert_eq!(
            static_provider.get_lowest_static_file_block(StaticFileSegment::Transactions),
            test_case.expected_lowest_block
        );
        assert_eq!(
            static_provider.get_highest_static_file_block(StaticFileSegment::Transactions),
            Some(tip)
        );
    }

    /// End-to-end check of the [`Bodies`] segment through the pruner.
    ///
    /// The cases run sequentially against the same factory, so each case's
    /// expectations build on the deletions performed by the previous ones.
    #[test]
    fn bodies_prune_through_pruner() {
        let factory = create_test_provider_factory();
        let tip = 2_499_999;
        setup_static_file_jars(&factory, tip);

        let (_, finished_exex_height_rx) = tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        let test_cases = vec![
            // Test 1: PruneMode::Before(750_000) → deletes jar 1 (0-499_999)
            PruneTestCase {
                prune_mode: PruneMode::Before(750_000),
                expected_pruned: 1000,
                expected_lowest_block: Some(999_999),
            },
            // Test 2: PruneMode::Before(850_000) → no deletion (jar 2: 500_000-999_999 contains
            // target)
            PruneTestCase {
                prune_mode: PruneMode::Before(850_000),
                expected_pruned: 0,
                expected_lowest_block: Some(999_999),
            },
            // Test 3: PruneMode::Before(1_599_999) → deletes jar 2 (500_000-999_999) and jar 3
            // (1_000_000-1_499_999)
            PruneTestCase {
                prune_mode: PruneMode::Before(1_599_999),
                expected_pruned: 2000,
                expected_lowest_block: Some(1_999_999),
            },
            // Test 4: PruneMode::Distance(500_000) with tip=2_499_999 → deletes jar 4
            // (1_500_000-1_999_999)
            PruneTestCase {
                prune_mode: PruneMode::Distance(500_000),
                expected_pruned: 1000,
                expected_lowest_block: Some(2_499_999),
            },
            // Test 5: PruneMode::Before(2_300_000) → no deletion (jar 5: 2_000_000-2_499_999
            // contains target)
            PruneTestCase {
                prune_mode: PruneMode::Before(2_300_000),
                expected_pruned: 0,
                expected_lowest_block: Some(2_499_999),
            },
        ];

        for test_case in test_cases {
            run_prune_test(&factory, &finished_exex_height_rx, test_case, tip);
        }
    }
}
2 changes: 2 additions & 0 deletions crates/prune/prune/src/segments/user/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod account_history;
mod bodies;
mod history;
mod merkle_change_sets;
mod receipts;
Expand All @@ -7,6 +8,7 @@ mod storage_history;
mod transaction_lookup;

pub use account_history::AccountHistory;
pub use bodies::Bodies;
pub use merkle_change_sets::MerkleChangeSets;
pub use receipts::Receipts;
pub use sender_recovery::SenderRecovery;
Expand Down
3 changes: 3 additions & 0 deletions crates/prune/types/src/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub enum PruneSegment {
/// Prune segment responsible for all rows in `AccountsTrieChangeSets` and
/// `StoragesTrieChangeSets` table.
MerkleChangeSets,
/// Prune segment responsible for bodies (transactions in static files).
Bodies,
}

#[cfg(test)]
Expand All @@ -56,6 +58,7 @@ impl PruneSegment {
Self::AccountHistory |
Self::StorageHistory |
Self::MerkleChangeSets |
Self::Bodies |
Self::Receipts => MINIMUM_PRUNING_DISTANCE,
#[expect(deprecated)]
#[expect(clippy::match_same_arms)]
Expand Down
Loading