diff --git a/Cargo.lock b/Cargo.lock index 481fe71df06..716b990af15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3825,6 +3825,7 @@ name = "fork_choice" version = "0.1.0" dependencies = [ "beacon_chain", + "criterion", "ethereum_ssz", "ethereum_ssz_derive", "logging", diff --git a/consensus/fork_choice/Cargo.toml b/consensus/fork_choice/Cargo.toml index 0a244c2ba19..07b7eb7a03a 100644 --- a/consensus/fork_choice/Cargo.toml +++ b/consensus/fork_choice/Cargo.toml @@ -18,5 +18,10 @@ types = { workspace = true } [dev-dependencies] beacon_chain = { workspace = true } +criterion = { workspace = true } store = { workspace = true } tokio = { workspace = true } + +[[bench]] +name = "benches" +harness = false diff --git a/consensus/fork_choice/benches/benches.rs b/consensus/fork_choice/benches/benches.rs new file mode 100644 index 00000000000..e94f28d1f9c --- /dev/null +++ b/consensus/fork_choice/benches/benches.rs @@ -0,0 +1,58 @@ +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use fork_choice::{QueuedAttestation, dequeue_attestations}; +use std::collections::VecDeque; +use types::{Epoch, Hash256, Slot}; + +fn all_benches(c: &mut Criterion) { + let num_attestations = 1_500_000_usize / 16; + let unique_slots = 2_usize; + let num_iterations = 64; + let attestations_per_slot = num_attestations / unique_slots; + let queued_attestations = (0..num_attestations) + .map(|i| QueuedAttestation { + slot: Slot::from(i / attestations_per_slot), + attesting_indices: vec![], + block_root: Hash256::ZERO, + target_epoch: Epoch::new(0), + }) + .collect::>(); + + c.bench_with_input( + BenchmarkId::new("dequeue_attestations", num_attestations), + &queued_attestations, + |b, attestations| { + b.iter(|| { + // Simulate dequeueing and queuing of attestations over multiple slots. + let mut attestations = attestations.clone(); + let end_slot = unique_slots; + for i in 1..=num_iterations { + let dequeued = dequeue_attestations(Slot::from(i), &mut attestations); + assert_eq!(dequeued.len(), attestations_per_slot); + + // Capacity should be unchanged. + assert_eq!(attestations.capacity(), num_attestations); + + let next_slot = end_slot + i - 1; + let new_attestations = std::iter::repeat_n( + QueuedAttestation { + slot: Slot::from(next_slot), + attesting_indices: vec![], + block_root: Hash256::ZERO, + target_epoch: Epoch::new(0), + }, + attestations_per_slot, + ); + + for attestation in new_attestations { + attestations.push_back(attestation); + } + + assert_eq!(attestations.len(), num_attestations); + } + }) + }, + ); +} + +criterion_group!(benches, all_benches); +criterion_main!(benches); diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 6565e7cdaf6..ccc3279a1b8 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -11,7 +11,7 @@ use state_processing::{ per_block_processing::errors::AttesterSlashingValidationError, per_epoch_processing, }; use std::cmp::Ordering; -use std::collections::BTreeSet; +use std::collections::{BTreeSet, VecDeque}; use std::marker::PhantomData; use std::time::Duration; use superstruct::superstruct; @@ -234,12 +234,12 @@ fn compute_start_slot_at_epoch(epoch: Epoch) -> Slot { /// Used for queuing attestations from the current slot. Only contains the minimum necessary /// information about the attestation. -#[derive(Clone, PartialEq, Encode, Decode)] +#[derive(Debug, Clone, PartialEq, Encode, Decode)] pub struct QueuedAttestation { - slot: Slot, - attesting_indices: Vec, - block_root: Hash256, - target_epoch: Epoch, + pub slot: Slot, + pub attesting_indices: Vec, + pub block_root: Hash256, + pub target_epoch: Epoch, } impl<'a, E: EthSpec> From> for QueuedAttestation { @@ -253,25 +253,35 @@ impl<'a, E: EthSpec> From> for QueuedAttestation { } } -/// Returns all values in `self.queued_attestations` that have a slot that is earlier than the -/// current slot. Also removes those values from `self.queued_attestations`. -fn dequeue_attestations( +/// Returns all values in `queued_attestations` that have a slot that is earlier than the +/// current slot. Also removes those values from `queued_attestations`. +pub fn dequeue_attestations( current_slot: Slot, - queued_attestations: &mut Vec, -) -> Vec { - let remaining = queued_attestations.split_off( - queued_attestations - .iter() - .position(|a| a.slot >= current_slot) - .unwrap_or(queued_attestations.len()), - ); + queued_attestations: &mut VecDeque, +) -> VecDeque { + // Find the position of the first attestation to keep in the queue, or equivalently the number + // of attestations to pop from the front of the queue. + // + // We are safe to use `partition_point` as we know the queue is sorted by ascending slot. + // Benchmarks show that `partition_point` is substantially faster (-18%) than using `find`. + let to_pop = queued_attestations.partition_point(|a| a.slot < current_slot); + + // Rotate the entries to remove into the *end* of the vec deque. + queued_attestations.rotate_left(to_pop); + + // Use `split_off` to remove the attestations we want to pop from the end of the queue, while + // keeping the same allocation for `queued_attestations` (preserving the capacity so we don't + // need to reallocate on future pushes). + let to_keep = queued_attestations.len().saturating_sub(to_pop); + + let popped = queued_attestations.split_off(to_keep); metrics::inc_counter_by( &metrics::FORK_CHOICE_DEQUEUED_ATTESTATIONS, - queued_attestations.len() as u64, + popped.len() as u64, ); - std::mem::replace(queued_attestations, remaining) + popped } /// Denotes whether an attestation we are processing was received from a block or from gossip. @@ -317,7 +327,7 @@ pub struct ForkChoice { /// The underlying representation of the block DAG. proto_array: ProtoArrayForkChoice, /// Attestations that arrived at the current slot and must be queued for later processing. - queued_attestations: Vec, + queued_attestations: VecDeque, /// Stores a cache of the values required to be sent to the execution layer. forkchoice_update_parameters: ForkchoiceUpdateParameters, _phantom: PhantomData, @@ -399,7 +409,7 @@ where let mut fork_choice = Self { fc_store, proto_array, - queued_attestations: vec![], + queued_attestations: VecDeque::new(), // This will be updated during the next call to `Self::get_head`. forkchoice_update_parameters: ForkchoiceUpdateParameters { head_hash: None, @@ -1115,7 +1125,7 @@ where // Delay consideration in the fork choice until their slot is in the past. // ``` self.queued_attestations - .push(QueuedAttestation::from(attestation)); + .push_back(QueuedAttestation::from(attestation)); } Ok(()) @@ -1384,7 +1394,7 @@ where } /// Returns a reference to the currently queued attestations. - pub fn queued_attestations(&self) -> &[QueuedAttestation] { + pub fn queued_attestations(&self) -> &VecDeque { &self.queued_attestations } @@ -1471,7 +1481,7 @@ where let mut fork_choice = Self { fc_store, proto_array, - queued_attestations: persisted.queued_attestations, + queued_attestations: persisted.queued_attestations.into(), // Will be updated in the following call to `Self::get_head`. forkchoice_update_parameters: ForkchoiceUpdateParameters { head_hash: None, @@ -1513,7 +1523,7 @@ where proto_array: self .proto_array() .as_ssz_container(self.justified_checkpoint(), self.finalized_checkpoint()), - queued_attestations: self.queued_attestations().to_vec(), + queued_attestations: self.queued_attestations().iter().cloned().collect(), } } @@ -1596,7 +1606,7 @@ mod tests { } } - fn get_queued_attestations() -> Vec { + fn get_queued_attestations() -> VecDeque { (1..4) .map(|i| QueuedAttestation { slot: Slot::new(i), @@ -1607,7 +1617,7 @@ mod tests { .collect() } - fn get_slots(queued_attestations: &[QueuedAttestation]) -> Vec { + fn get_slots(queued_attestations: &VecDeque) -> Vec { queued_attestations.iter().map(|a| a.slot.into()).collect() } diff --git a/consensus/fork_choice/src/lib.rs b/consensus/fork_choice/src/lib.rs index afe06dee1bc..da6a3ece716 100644 --- a/consensus/fork_choice/src/lib.rs +++ b/consensus/fork_choice/src/lib.rs @@ -6,6 +6,7 @@ pub use crate::fork_choice::{ AttestationFromBlock, Error, ForkChoice, ForkChoiceView, ForkchoiceUpdateParameters, InvalidAttestation, InvalidBlock, PayloadVerificationStatus, PersistedForkChoice, PersistedForkChoiceV17, PersistedForkChoiceV28, QueuedAttestation, ResetPayloadStatuses, + dequeue_attestations, }; pub use fork_choice_store::ForkChoiceStore; pub use proto_array::{ diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 67b792ef0d8..1260cd5986e 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -11,6 +11,7 @@ use fork_choice::{ ForkChoiceStore, InvalidAttestation, InvalidBlock, PayloadVerificationStatus, QueuedAttestation, }; use state_processing::state_advance::complete_state_advance; +use std::collections::VecDeque; use std::fmt; use std::sync::Mutex; use std::time::Duration; @@ -135,7 +136,7 @@ impl ForkChoiceTest { /// Inspect the queued attestations in fork choice. pub fn inspect_queued_attestations(self, mut func: F) -> Self where - F: FnMut(&[QueuedAttestation]), + F: FnMut(&VecDeque), { self.harness .chain