Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions consensus/fork_choice/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,10 @@ types = { workspace = true }

[dev-dependencies]
beacon_chain = { workspace = true }
criterion = { workspace = true }
store = { workspace = true }
tokio = { workspace = true }

[[bench]]
name = "benches"
harness = false
58 changes: 58 additions & 0 deletions consensus/fork_choice/benches/benches.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use fork_choice::{QueuedAttestation, dequeue_attestations};
use std::collections::VecDeque;
use types::{Epoch, Hash256, Slot};

fn all_benches(c: &mut Criterion) {
let num_attestations = 1_500_000_usize / 16;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is ~2 slots worth of attestations on a 1.5M validator network

let unique_slots = 2_usize;
let num_iterations = 64;
let attestations_per_slot = num_attestations / unique_slots;
let queued_attestations = (0..num_attestations)
.map(|i| QueuedAttestation {
slot: Slot::from(i / attestations_per_slot),
attesting_indices: vec![],
block_root: Hash256::ZERO,
target_epoch: Epoch::new(0),
})
.collect::<VecDeque<_>>();

c.bench_with_input(
BenchmarkId::new("dequeue_attestations", num_attestations),
&queued_attestations,
|b, attestations| {
b.iter(|| {
// Simulate dequeueing and queuing of attestations over multiple slots.
let mut attestations = attestations.clone();
let end_slot = unique_slots;
for i in 1..=num_iterations {
let dequeued = dequeue_attestations(Slot::from(i), &mut attestations);
assert_eq!(dequeued.len(), attestations_per_slot);

// Capacity should be unchanged.
assert_eq!(attestations.capacity(), num_attestations);

let next_slot = end_slot + i - 1;
let new_attestations = std::iter::repeat_n(
QueuedAttestation {
slot: Slot::from(next_slot),
attesting_indices: vec![],
block_root: Hash256::ZERO,
target_epoch: Epoch::new(0),
},
attestations_per_slot,
);

for attestation in new_attestations {
attestations.push_back(attestation);
}

assert_eq!(attestations.len(), num_attestations);
}
})
},
);
}

criterion_group!(benches, all_benches);
criterion_main!(benches);
64 changes: 37 additions & 27 deletions consensus/fork_choice/src/fork_choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use state_processing::{
per_block_processing::errors::AttesterSlashingValidationError, per_epoch_processing,
};
use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::collections::{BTreeSet, VecDeque};
use std::marker::PhantomData;
use std::time::Duration;
use superstruct::superstruct;
Expand Down Expand Up @@ -234,12 +234,12 @@ fn compute_start_slot_at_epoch<E: EthSpec>(epoch: Epoch) -> Slot {

/// Used for queuing attestations from the current slot. Only contains the minimum necessary
/// information about the attestation.
#[derive(Clone, PartialEq, Encode, Decode)]
#[derive(Debug, Clone, PartialEq, Encode, Decode)]
pub struct QueuedAttestation {
slot: Slot,
attesting_indices: Vec<u64>,
block_root: Hash256,
target_epoch: Epoch,
pub slot: Slot,
pub attesting_indices: Vec<u64>,
pub block_root: Hash256,
pub target_epoch: Epoch,
}

impl<'a, E: EthSpec> From<IndexedAttestationRef<'a, E>> for QueuedAttestation {
Expand All @@ -253,25 +253,35 @@ impl<'a, E: EthSpec> From<IndexedAttestationRef<'a, E>> for QueuedAttestation {
}
}

/// Returns all values in `self.queued_attestations` that have a slot that is earlier than the
/// current slot. Also removes those values from `self.queued_attestations`.
fn dequeue_attestations(
/// Returns all values in `queued_attestations` that have a slot that is earlier than the
/// current slot. Also removes those values from `queued_attestations`.
pub fn dequeue_attestations(
current_slot: Slot,
queued_attestations: &mut Vec<QueuedAttestation>,
) -> Vec<QueuedAttestation> {
let remaining = queued_attestations.split_off(
queued_attestations
.iter()
.position(|a| a.slot >= current_slot)
.unwrap_or(queued_attestations.len()),
);
queued_attestations: &mut VecDeque<QueuedAttestation>,
) -> VecDeque<QueuedAttestation> {
// Find the position of the first attestation to keep in the queue, or equivalently the number
// of attestations to pop from the front of the queue.
//
// We are safe to use `partition_point` as we know the queue is sorted by ascending slot.
// Benchmarks show that `partition_point` is substantially faster (-18%) than using `find`.
let to_pop = queued_attestations.partition_point(|a| a.slot < current_slot);

// Rotate the entries to remove into the *end* of the vec deque.
queued_attestations.rotate_left(to_pop);

// Use `split_off` to remove the attestations we want to pop from the end of the queue, while
// keeping the same allocation for `queued_attestations` (preserving the capacity so we don't
// need to reallocate on future pushes).
let to_keep = queued_attestations.len().saturating_sub(to_pop);

let popped = queued_attestations.split_off(to_keep);

metrics::inc_counter_by(
&metrics::FORK_CHOICE_DEQUEUED_ATTESTATIONS,
queued_attestations.len() as u64,
popped.len() as u64,
);

std::mem::replace(queued_attestations, remaining)
popped
}

/// Denotes whether an attestation we are processing was received from a block or from gossip.
Expand Down Expand Up @@ -317,7 +327,7 @@ pub struct ForkChoice<T, E> {
/// The underlying representation of the block DAG.
proto_array: ProtoArrayForkChoice,
/// Attestations that arrived at the current slot and must be queued for later processing.
queued_attestations: Vec<QueuedAttestation>,
queued_attestations: VecDeque<QueuedAttestation>,
/// Stores a cache of the values required to be sent to the execution layer.
forkchoice_update_parameters: ForkchoiceUpdateParameters,
_phantom: PhantomData<E>,
Expand Down Expand Up @@ -399,7 +409,7 @@ where
let mut fork_choice = Self {
fc_store,
proto_array,
queued_attestations: vec![],
queued_attestations: VecDeque::new(),
// This will be updated during the next call to `Self::get_head`.
forkchoice_update_parameters: ForkchoiceUpdateParameters {
head_hash: None,
Expand Down Expand Up @@ -1115,7 +1125,7 @@ where
// Delay consideration in the fork choice until their slot is in the past.
// ```
self.queued_attestations
.push(QueuedAttestation::from(attestation));
.push_back(QueuedAttestation::from(attestation));
}

Ok(())
Expand Down Expand Up @@ -1384,7 +1394,7 @@ where
}

/// Returns a reference to the currently queued attestations.
pub fn queued_attestations(&self) -> &[QueuedAttestation] {
pub fn queued_attestations(&self) -> &VecDeque<QueuedAttestation> {
&self.queued_attestations
}

Expand Down Expand Up @@ -1471,7 +1481,7 @@ where
let mut fork_choice = Self {
fc_store,
proto_array,
queued_attestations: persisted.queued_attestations,
queued_attestations: persisted.queued_attestations.into(),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This conversion Vec => VecDeque is trivial and cheap, but only happens once on startup anyway.

// Will be updated in the following call to `Self::get_head`.
forkchoice_update_parameters: ForkchoiceUpdateParameters {
head_hash: None,
Expand Down Expand Up @@ -1513,7 +1523,7 @@ where
proto_array: self
.proto_array()
.as_ssz_container(self.justified_checkpoint(), self.finalized_checkpoint()),
queued_attestations: self.queued_attestations().to_vec(),
queued_attestations: self.queued_attestations().iter().cloned().collect(),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is potentially slower than if we implemented Encode for VecDeque. The conversion from VecDeque to Vec is non-trivial in most cases where the VecDeque doesn't start at index 0:

https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVecDeque%3CT,+A%3E%3E-for-Vec%3CT,+A%3E

I'll check fork choice persistence times to make sure this isn't having a substantial impact.

}
}

Expand Down Expand Up @@ -1596,7 +1606,7 @@ mod tests {
}
}

fn get_queued_attestations() -> Vec<QueuedAttestation> {
fn get_queued_attestations() -> VecDeque<QueuedAttestation> {
(1..4)
.map(|i| QueuedAttestation {
slot: Slot::new(i),
Expand All @@ -1607,7 +1617,7 @@ mod tests {
.collect()
}

fn get_slots(queued_attestations: &[QueuedAttestation]) -> Vec<u64> {
fn get_slots(queued_attestations: &VecDeque<QueuedAttestation>) -> Vec<u64> {
queued_attestations.iter().map(|a| a.slot.into()).collect()
}

Expand Down
1 change: 1 addition & 0 deletions consensus/fork_choice/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub use crate::fork_choice::{
AttestationFromBlock, Error, ForkChoice, ForkChoiceView, ForkchoiceUpdateParameters,
InvalidAttestation, InvalidBlock, PayloadVerificationStatus, PersistedForkChoice,
PersistedForkChoiceV17, PersistedForkChoiceV28, QueuedAttestation, ResetPayloadStatuses,
dequeue_attestations,
};
pub use fork_choice_store::ForkChoiceStore;
pub use proto_array::{
Expand Down
3 changes: 2 additions & 1 deletion consensus/fork_choice/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use fork_choice::{
ForkChoiceStore, InvalidAttestation, InvalidBlock, PayloadVerificationStatus, QueuedAttestation,
};
use state_processing::state_advance::complete_state_advance;
use std::collections::VecDeque;
use std::fmt;
use std::sync::Mutex;
use std::time::Duration;
Expand Down Expand Up @@ -135,7 +136,7 @@ impl ForkChoiceTest {
/// Inspect the queued attestations in fork choice.
pub fn inspect_queued_attestations<F>(self, mut func: F) -> Self
where
F: FnMut(&[QueuedAttestation]),
F: FnMut(&VecDeque<QueuedAttestation>),
{
self.harness
.chain
Expand Down