Skip to content

Commit 3de646c

Browse files
authored
Enable reconstruction for nodes custodying more than 50% of columns and instrument tracing (#8052)
Co-Authored-By: Jimmy Chen <[email protected]> Co-Authored-By: Jimmy Chen <[email protected]>
1 parent 242bdfc commit 3de646c

File tree

7 files changed

+76
-72
lines changed

7 files changed

+76
-72
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3299,10 +3299,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
32993299

33003300
let data_availability_checker = self.data_availability_checker.clone();
33013301

3302+
let current_span = Span::current();
33023303
let result = self
33033304
.task_executor
33043305
.spawn_blocking_handle(
3305-
move || data_availability_checker.reconstruct_data_columns(&block_root),
3306+
move || {
3307+
let _guard = current_span.enter();
3308+
data_availability_checker.reconstruct_data_columns(&block_root)
3309+
},
33063310
"reconstruct_data_columns",
33073311
)
33083312
.ok_or(BeaconChainError::RuntimeShutdown)?

beacon_node/beacon_chain/src/data_availability_checker.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
547547
}
548548
}
549549

550+
#[instrument(skip_all, level = "debug")]
550551
pub fn reconstruct_data_columns(
551552
&self,
552553
block_root: &Hash256,

beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -592,9 +592,9 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
592592

593593
/// Check whether data column reconstruction should be attempted.
594594
///
595-
/// Potentially trigger reconstruction if:
596-
/// - Our custody requirement is all columns (supernode), and we haven't got all columns
597-
/// - We have >= 50% of columns, but not all columns
595+
/// Potentially trigger reconstruction if all the following satisfy:
596+
/// - Our custody requirement is more than 50% of total columns,
597+
/// - We haven't received all required columns
598598
/// - Reconstruction hasn't been started for the block
599599
///
600600
/// If reconstruction is required, returns `PendingComponents` which contains the
@@ -609,15 +609,25 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
609609
return ReconstructColumnsDecision::No("block already imported");
610610
};
611611

612-
// If we're sampling all columns, it means we must be custodying all columns.
612+
let Some(epoch) = pending_components
613+
.verified_data_columns
614+
.first()
615+
.map(|c| c.as_data_column().epoch())
616+
else {
617+
return ReconstructColumnsDecision::No("not enough columns");
618+
};
619+
613620
let total_column_count = T::EthSpec::number_of_columns();
621+
let sampling_column_count = self
622+
.custody_context
623+
.num_of_data_columns_to_sample(epoch, &self.spec);
614624
let received_column_count = pending_components.verified_data_columns.len();
615625

616626
if pending_components.reconstruction_started {
617627
return ReconstructColumnsDecision::No("already started");
618628
}
619-
if received_column_count >= total_column_count {
620-
return ReconstructColumnsDecision::No("all columns received");
629+
if received_column_count >= sampling_column_count {
630+
return ReconstructColumnsDecision::No("all sampling columns received");
621631
}
622632
if received_column_count < total_column_count / 2 {
623633
return ReconstructColumnsDecision::No("not enough columns");

beacon_node/beacon_chain/src/validator_custody.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ pub struct CustodyContext<E: EthSpec> {
130130
/// and enr values.
131131
validator_custody_count: AtomicU64,
132132
/// Is the node run as a supernode based on current cli parameters.
133-
pub current_is_supernode: bool,
133+
current_is_supernode: bool,
134134
/// The persisted value for `is_supernode` based on the previous run of this node.
135135
///
136136
/// Note: We require this value because if a user restarts the node with a higher cli custody
@@ -307,6 +307,14 @@ impl<E: EthSpec> CustodyContext<E> {
307307
.expect("should compute node sampling size from valid chain spec")
308308
}
309309

310+
/// Returns whether the node should attempt reconstruction at a given epoch.
311+
pub fn should_attempt_reconstruction(&self, epoch: Epoch, spec: &ChainSpec) -> bool {
312+
let min_columns_for_reconstruction = E::number_of_columns() / 2;
313+
// performing reconstruction is not necessary if sampling column count is exactly 50%,
314+
// because the node doesn't need the remaining columns.
315+
self.num_of_data_columns_to_sample(epoch, spec) > min_columns_for_reconstruction
316+
}
317+
310318
/// Returns the ordered list of column indices that should be sampled for data availability checking at the given epoch.
311319
///
312320
/// # Parameters

beacon_node/network/src/network_beacon_processor/gossip_methods.rs

Lines changed: 36 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ use std::path::PathBuf;
3434
use std::sync::Arc;
3535
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
3636
use store::hot_cold_store::HotColdDBError;
37-
use tokio::sync::mpsc::error::TrySendError;
3837
use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
3938
use types::{
4039
Attestation, AttestationData, AttestationRef, AttesterSlashing, BlobSidecar, DataColumnSidecar,
@@ -1054,36 +1053,43 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
10541053
"Processed data column, waiting for other components"
10551054
);
10561055

1057-
// Instead of triggering reconstruction immediately, schedule it to be run. If
1058-
// another column arrives it either completes availability or pushes
1059-
// reconstruction back a bit.
1060-
let cloned_self = Arc::clone(self);
1061-
let block_root = *block_root;
1062-
let send_result = self.beacon_processor_send.try_send(WorkEvent {
1063-
drop_during_sync: false,
1064-
work: Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction(
1065-
QueuedColumnReconstruction {
1066-
block_root,
1067-
slot: *slot,
1068-
process_fn: Box::pin(async move {
1069-
cloned_self
1070-
.attempt_data_column_reconstruction(block_root)
1071-
.await;
1072-
}),
1073-
},
1074-
)),
1075-
});
1076-
if let Err(TrySendError::Full(WorkEvent {
1077-
work:
1078-
Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction(
1079-
reconstruction,
1080-
)),
1081-
..
1082-
})) = send_result
1056+
if self
1057+
.chain
1058+
.data_availability_checker
1059+
.custody_context()
1060+
.should_attempt_reconstruction(
1061+
slot.epoch(T::EthSpec::slots_per_epoch()),
1062+
&self.chain.spec,
1063+
)
10831064
{
1084-
warn!("Unable to send reconstruction to reprocessing");
1085-
// Execute it immediately instead.
1086-
reconstruction.process_fn.await;
1065+
// Instead of triggering reconstruction immediately, schedule it to be run. If
1066+
// another column arrives, it either completes availability or pushes
1067+
// reconstruction back a bit.
1068+
let cloned_self = Arc::clone(self);
1069+
let block_root = *block_root;
1070+
1071+
if self
1072+
.beacon_processor_send
1073+
.try_send(WorkEvent {
1074+
drop_during_sync: false,
1075+
work: Work::Reprocess(
1076+
ReprocessQueueMessage::DelayColumnReconstruction(
1077+
QueuedColumnReconstruction {
1078+
block_root,
1079+
slot: *slot,
1080+
process_fn: Box::pin(async move {
1081+
cloned_self
1082+
.attempt_data_column_reconstruction(block_root)
1083+
.await;
1084+
}),
1085+
},
1086+
),
1087+
),
1088+
})
1089+
.is_err()
1090+
{
1091+
warn!("Unable to send reconstruction to reprocessing");
1092+
}
10871093
}
10881094
}
10891095
},

beacon_node/network/src/network_beacon_processor/mod.rs

Lines changed: 9 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use std::sync::Arc;
2828
use std::time::Duration;
2929
use task_executor::TaskExecutor;
3030
use tokio::sync::mpsc::{self, error::TrySendError};
31-
use tracing::{debug, error, trace, warn};
31+
use tracing::{debug, error, instrument, trace, warn};
3232
use types::*;
3333

3434
pub use sync_methods::ChainSegmentProcessId;
@@ -825,30 +825,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
825825
}
826826
}
827827

828-
/// Attempt to reconstruct all data columns if the following conditions satisfies:
829-
/// - Our custody requirement is all columns
830-
/// - We have >= 50% of columns, but not all columns
831-
///
832-
/// Returns `Some(AvailabilityProcessingStatus)` if reconstruction is successfully performed,
833-
/// otherwise returns `None`.
834-
///
835-
/// The `publish_columns` parameter controls whether reconstructed columns should be published
836-
/// to the gossip network.
837-
async fn attempt_data_column_reconstruction(
838-
self: &Arc<Self>,
839-
block_root: Hash256,
840-
) -> Option<AvailabilityProcessingStatus> {
841-
// Only supernodes attempt reconstruction
842-
if !self
843-
.chain
844-
.data_availability_checker
845-
.custody_context()
846-
.current_is_supernode
847-
{
848-
return None;
849-
}
850-
828+
/// Attempts to reconstruct all data columns if the conditions checked in
829+
/// [`DataAvailabilityCheckerInner::check_and_set_reconstruction_started`] are satisfied.
830+
#[instrument(level = "debug", skip_all, fields(?block_root))]
831+
async fn attempt_data_column_reconstruction(self: &Arc<Self>, block_root: Hash256) {
851832
let result = self.chain.reconstruct_data_columns(block_root).await;
833+
852834
match result {
853835
Ok(Some((availability_processing_status, data_columns_to_publish))) => {
854836
self.publish_data_columns_gradually(data_columns_to_publish, block_root);
@@ -864,29 +846,25 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
864846
AvailabilityProcessingStatus::MissingComponents(_, _) => {
865847
debug!(
866848
result = "imported all custody columns",
867-
block_hash = %block_root,
849+
%block_root,
868850
"Block components still missing block after reconstruction"
869851
);
870852
}
871853
}
872-
873-
Some(availability_processing_status)
874854
}
875855
Ok(None) => {
876856
// reason is tracked via the `KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL` metric
877857
trace!(
878-
block_hash = %block_root,
858+
%block_root,
879859
"Reconstruction not required for block"
880860
);
881-
None
882861
}
883862
Err(e) => {
884863
error!(
885864
%block_root,
886865
error = ?e,
887866
"Error during data column reconstruction"
888867
);
889-
None
890868
}
891869
}
892870
}
@@ -975,6 +953,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
975953
/// by some nodes on the network as soon as possible. Our hope is that some columns arrive from
976954
/// other nodes in the meantime, obviating the need for us to publish them. If no other
977955
/// publisher exists for a column, it will eventually get published here.
956+
#[instrument(level="debug", skip_all, fields(?block_root, data_column_count=data_columns_to_publish.len()))]
978957
fn publish_data_columns_gradually(
979958
self: &Arc<Self>,
980959
mut data_columns_to_publish: DataColumnSidecarList<T::EthSpec>,

beacon_node/network/src/network_beacon_processor/tests.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,10 +1009,6 @@ async fn import_gossip_block_acceptably_early() {
10091009
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
10101010
.await;
10111011
}
1012-
if num_data_columns > 0 {
1013-
rig.assert_event_journal_completes(&[WorkType::ColumnReconstruction])
1014-
.await;
1015-
}
10161012

10171013
// Note: this section of the code is a bit race-y. We're assuming that we can set the slot clock
10181014
// and check the head in the time between the block arrived early and when its due for

0 commit comments

Comments
 (0)