Commit 1211aaf

Backfill peer attribution (#7762)
Squashed commit of the following:

- 41ad1a9 (Jimmy Chen, Thu Jul 24 22:28:27 2025 +1000): Run devnet sync test on `Kurtosis` runner.
- 646033b (Jimmy Chen, Thu Jul 24 14:13:55 2025 +1000): Use self-hosted runners for Fusaka devnet testing.
- 718e703 (Pawan Dhananjay, Wed Jul 23 19:41:28 2025 -0700): lint
- 5246a20 (Pawan Dhananjay, Wed Jul 23 19:38:24 2025 -0700): Add max retry logic
- d5bcf9f (merge of 432f68c and 9911f34; Pawan Dhananjay, Wed Jul 23 17:49:41 2025 -0700): Merge branch 'unstable' into peer-attribution-backfill
- 432f68c (Pawan Dhananjay, Wed Jul 23 17:49:06 2025 -0700): Cleanup
- 9bd0f28 (Pawan Dhananjay, Fri Jul 18 16:46:07 2025 -0500): Add retries on backfill
- 0293d0a (Pawan Dhananjay, Fri Jul 18 14:26:44 2025 -0500): Address some of lion's comments from the other PR
1 parent: 744e979 · commit: 1211aaf

File tree

6 files changed: +315 -137 lines


.github/workflows/local-testnet.yml

Lines changed: 2 additions & 1 deletion
@@ -173,7 +173,8 @@ jobs:
   # Tests checkpoint syncing to a live network (current fork) and a running devnet (usually next scheduled fork)
   checkpoint-sync-test:
     name: checkpoint-sync-test-${{ matrix.network }}
-    runs-on: ubuntu-latest
+    # Use self-hosted runner for Fusaka devnet testing as GitHub hosted ones aren't able to keep up with the chain.
+    runs-on: ${{ github.repository == 'sigp/lighthouse' && matrix.network == 'devnet' && fromJson('["self-hosted", "linux", "Kurtosis", "large"]') || 'ubuntu-latest' }}
     needs: dockerfile-ubuntu
     if: contains(github.event.pull_request.labels.*.name, 'syncing')
     continue-on-error: true
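A note on the new `runs-on` expression: GitHub Actions `&&`/`||` operators return one of their operand values rather than a plain boolean, so when the repository is `sigp/lighthouse` and the matrix network is `devnet` the expression resolves to the `fromJson(...)` label array (the self-hosted Kurtosis runner), and to the `'ubuntu-latest'` string in every other case.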

beacon_node/lighthouse_network/src/peer_manager/peerdb.rs

Lines changed: 29 additions & 11 deletions
@@ -253,15 +253,17 @@ impl<E: EthSpec> PeerDB<E> {
     ///
     /// If `earliest_available_slot` info is not available, then return peer anyway assuming it has the
     /// required data.
+    ///
+    /// If `allowed_peers` is `Some`, then filters for the epoch only for those peers.
     pub fn synced_peers_for_epoch<'a>(
         &'a self,
         epoch: Epoch,
-        allowed_peers: &'a HashSet<PeerId>,
+        allowed_peers: Option<&'a HashSet<PeerId>>,
     ) -> impl Iterator<Item = &'a PeerId> {
         self.peers
             .iter()
             .filter(move |(peer_id, info)| {
-                allowed_peers.contains(peer_id)
+                allowed_peers.is_none_or(|allowed| allowed.contains(peer_id))
                     && info.is_connected()
                     && match info.sync_status() {
                         SyncStatus::Synced { info } => {
@@ -270,7 +272,9 @@ impl<E: EthSpec> PeerDB<E> {
                         SyncStatus::Advanced { info } => {
                             info.has_slot(epoch.end_slot(E::slots_per_epoch()))
                         }
-                        _ => false,
+                        SyncStatus::IrrelevantPeer
+                        | SyncStatus::Behind { .. }
+                        | SyncStatus::Unknown => false,
                     }
             })
             .map(|(peer_id, _)| peer_id)
@@ -320,22 +324,36 @@ impl<E: EthSpec> PeerDB<E> {
     }
 
     /// Returns an iterator of all peers that are supposed to be custodying
-    /// the given subnet id that also belong to `allowed_peers`.
-    pub fn good_range_sync_custody_subnet_peer<'a>(
-        &'a self,
+    /// the given subnet id.
+    pub fn good_range_sync_custody_subnet_peers(
+        &self,
         subnet: DataColumnSubnetId,
-        allowed_peers: &'a HashSet<PeerId>,
-    ) -> impl Iterator<Item = &'a PeerId> {
+    ) -> impl Iterator<Item = &PeerId> {
         self.peers
             .iter()
-            .filter(move |(peer_id, info)| {
+            .filter(move |(_, info)| {
                 // The custody_subnets hashset can be populated via enr or metadata
-                let is_custody_subnet_peer = info.is_assigned_to_custody_subnet(&subnet);
-                allowed_peers.contains(peer_id) && info.is_connected() && is_custody_subnet_peer
+                info.is_connected() && info.is_assigned_to_custody_subnet(&subnet)
             })
             .map(|(peer_id, _)| peer_id)
     }
 
+    /// Returns `true` if the given peer is assigned to the given subnet.
+    /// else returns `false`
+    ///
+    /// Returns `false` if peer doesn't exist in peerdb.
+    pub fn is_good_range_sync_custody_subnet_peer(
+        &self,
+        subnet: DataColumnSubnetId,
+        peer: &PeerId,
+    ) -> bool {
+        if let Some(info) = self.peers.get(peer) {
+            info.is_connected() && info.is_assigned_to_custody_subnet(&subnet)
+        } else {
+            false
+        }
+    }
+
     /// Gives the ids of all known disconnected peers.
     pub fn disconnected_peers(&self) -> impl Iterator<Item = &PeerId> {
         self.peers
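Changing the parameter to `allowed_peers: Option<&HashSet<PeerId>>` lets backfill pass `None` to mean "any suitable peer", while other callers can still restrict the search to a specific peer set. Below is a minimal, self-contained sketch of the `Option::is_none_or` filtering pattern used above; `filter_allowed` and the plain `String`s standing in for `PeerId` are illustrative only, not code from this commit:

```rust
use std::collections::HashSet;

/// Keep only peers that pass an optional allow-list, mirroring the
/// `allowed_peers.is_none_or(|allowed| allowed.contains(peer_id))` check
/// in `synced_peers_for_epoch`: `None` means "no restriction".
fn filter_allowed<'a>(
    peers: &'a [String],
    allowed: Option<&'a HashSet<String>>,
) -> impl Iterator<Item = &'a String> {
    peers
        .iter()
        .filter(move |peer| allowed.is_none_or(|set| set.contains(*peer)))
}

fn main() {
    let peers = vec!["peer_a".to_string(), "peer_b".to_string()];
    let allow: HashSet<String> = std::iter::once("peer_b".to_string()).collect();

    // With an allow-list, only the listed peer survives the filter.
    assert_eq!(filter_allowed(&peers, Some(&allow)).count(), 1);
    // With `None`, every peer is a candidate.
    assert_eq!(filter_allowed(&peers, None).count(), 2);
}
```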

beacon_node/network/src/sync/backfill_sync/mod.rs

Lines changed: 135 additions & 3 deletions
@@ -9,6 +9,7 @@
 //! sync as failed, log an error and attempt to retry once a new peer joins the node.
 
 use crate::network_beacon_processor::ChainSegmentProcessId;
+use crate::sync::block_sidecar_coupling::CouplingError;
 use crate::sync::manager::BatchProcessResult;
 use crate::sync::network_context::{
     RangeRequestId, RpcRequestSendError, RpcResponseError, SyncNetworkContext,
@@ -28,7 +29,7 @@ use std::collections::{
 };
 use std::sync::Arc;
 use tracing::{debug, error, info, instrument, warn};
-use types::{Epoch, EthSpec};
+use types::{ColumnIndex, Epoch, EthSpec};
 
 /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
 /// blocks per batch are requested _at most_. A batch may request less blocks to account for
@@ -223,9 +224,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
             .network_globals
            .peers
             .read()
-            .synced_peers()
+            .synced_peers_for_epoch(self.to_be_downloaded, None)
             .next()
             .is_some()
+            // backfill can't progress if we do not have peers in the required subnets post peerdas.
+            && self.good_peers_on_sampling_subnets(self.to_be_downloaded, network)
         {
             // If there are peers to resume with, begin the resume.
             debug!(start_epoch = ?self.current_start, awaiting_batches = self.batches.len(), processing_target = ?self.processing_target, "Resuming backfill sync");
@@ -334,6 +337,48 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         err: RpcResponseError,
     ) -> Result<(), BackFillError> {
         if let Some(batch) = self.batches.get_mut(&batch_id) {
+            if let RpcResponseError::BlockComponentCouplingError(coupling_error) = &err {
+                match coupling_error {
+                    CouplingError::PeerFailure {
+                        error,
+                        faulty_peers,
+                        action,
+                    } => {
+                        debug!(?batch_id, error, "Block components coupling error");
+                        // Note: we don't fail the batch here because a `CouplingError` is
+                        // recoverable by requesting from other honest peers.
+                        let mut failed_columns = HashSet::new();
+                        let mut failed_peers = HashSet::new();
+                        for (column, peer) in faulty_peers {
+                            failed_columns.insert(*column);
+                            failed_peers.insert(*peer);
+                        }
+                        for peer in failed_peers.iter() {
+                            network.report_peer(*peer, *action, "failed to return columns");
+                        }
+
+                        return self.retry_partial_batch(
+                            network,
+                            batch_id,
+                            request_id,
+                            failed_columns,
+                            failed_peers,
+                        );
+                    }
+                    CouplingError::ExceededMaxRetries(peers, action) => {
+                        for peer in peers.iter() {
+                            network.report_peer(
+                                *peer,
+                                *action,
+                                "failed to return columns, exceeded retry attempts",
+                            );
+                        }
+                    }
+                    CouplingError::InternalError(msg) => {
+                        debug!(?batch_id, msg, "Block components coupling internal error");
+                    }
+                }
+            }
             // A batch could be retried without the peer failing the request (disconnecting/
             // sending an error /timeout) if the peer is removed from the chain for other
             // reasons. Check that this block belongs to the expected peer
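`CouplingError` is defined in `block_sidecar_coupling.rs`, which is not part of this diff. Judging only from the match arms above, `PeerFailure` carries per-column peer attribution, `ExceededMaxRetries` carries the peers to penalise, and `InternalError` carries a message. The sketch below shows that inferred shape together with the same attribution-dedup step; the field types, the `PeerId`/`PeerAction` stand-ins, and `split_attribution` are assumptions for illustration, not the actual definitions:

```rust
use std::collections::HashSet;

type ColumnIndex = u64;
// Stand-ins for the libp2p `PeerId` and lighthouse `PeerAction` types.
type PeerId = &'static str;
#[allow(dead_code)]
#[derive(Clone, Copy, Debug)]
struct PeerAction;

/// Inferred shape only; the real enum lives in
/// beacon_node/network/src/sync/block_sidecar_coupling.rs and may differ.
#[allow(dead_code)]
enum CouplingError {
    /// Specific peers failed to return the columns they served; retryable
    /// against other peers.
    PeerFailure {
        error: String,
        faulty_peers: Vec<(ColumnIndex, PeerId)>,
        action: PeerAction,
    },
    /// The failed columns were already re-requested the maximum number of times.
    ExceededMaxRetries(Vec<PeerId>, PeerAction),
    /// A failure on our side that cannot be attributed to any peer.
    InternalError(String),
}

/// Mirrors the handler above: collapse the (column, peer) attribution pairs into
/// the set of columns to re-request and the set of peers to report, so a peer
/// that failed several columns is only reported once.
fn split_attribution(
    faulty_peers: &[(ColumnIndex, PeerId)],
) -> (HashSet<ColumnIndex>, HashSet<PeerId>) {
    let mut failed_columns = HashSet::new();
    let mut failed_peers = HashSet::new();
    for (column, peer) in faulty_peers {
        failed_columns.insert(*column);
        failed_peers.insert(*peer);
    }
    (failed_columns, failed_peers)
}

fn main() {
    // Two columns failed, both attributed to the same peer.
    let faulty: Vec<(ColumnIndex, PeerId)> = vec![(3, "peer_a"), (7, "peer_a")];
    let (columns, peers) = split_attribution(&faulty);
    assert_eq!(columns.len(), 2);
    assert_eq!(peers.len(), 1);
}
```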
@@ -903,12 +948,16 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         network: &mut SyncNetworkContext<T>,
         batch_id: BatchId,
     ) -> Result<(), BackFillError> {
+        if matches!(self.state(), BackFillState::Paused) {
+            return Err(BackFillError::Paused);
+        }
         if let Some(batch) = self.batches.get_mut(&batch_id) {
+            debug!(?batch_id, "Sending backfill batch");
             let synced_peers = self
                 .network_globals
                 .peers
                 .read()
-                .synced_peers()
+                .synced_peers_for_epoch(batch_id, None)
                 .cloned()
                 .collect::<HashSet<_>>();
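Two behavioural changes in `send_batch` are worth calling out: it now bails out with `BackFillError::Paused` instead of trying to send while the sync is paused, and it selects peers with `synced_peers_for_epoch(batch_id, None)` (a backfill `BatchId` is an epoch) rather than plain `synced_peers()`, so a batch is only requested from peers whose advertised sync status actually covers that epoch.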

@@ -967,6 +1016,54 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         Ok(())
     }
 
+    /// Retries partial column requests within the batch by creating new requests for the failed columns.
+    #[instrument(parent = None,
+        fields(service = "backfill_sync"),
+        name = "backfill_sync",
+        skip_all
+    )]
+    pub fn retry_partial_batch(
+        &mut self,
+        network: &mut SyncNetworkContext<T>,
+        batch_id: BatchId,
+        id: Id,
+        failed_columns: HashSet<ColumnIndex>,
+        mut failed_peers: HashSet<PeerId>,
+    ) -> Result<(), BackFillError> {
+        if let Some(batch) = self.batches.get_mut(&batch_id) {
+            failed_peers.extend(&batch.failed_peers());
+            let req = batch.to_blocks_by_range_request().0;
+
+            let synced_peers = network
+                .network_globals()
+                .peers
+                .read()
+                .synced_peers()
+                .cloned()
+                .collect::<HashSet<_>>();
+
+            match network.retry_columns_by_range(
+                id,
+                &synced_peers,
+                &failed_peers,
+                req,
+                &failed_columns,
+            ) {
+                Ok(_) => {
+                    debug!(
+                        ?batch_id,
+                        id, "Retried column requests from different peers"
+                    );
+                    return Ok(());
+                }
+                Err(e) => {
+                    debug!(?batch_id, id, e, "Failed to retry partial batch");
+                }
+            }
+        }
+        Ok(())
+    }
+
     /// When resuming a chain, this function searches for batches that need to be re-downloaded and
     /// transitions their state to redownload the batch.
     #[instrument(parent = None,
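`retry_columns_by_range` is part of `SyncNetworkContext` and is not shown in this diff, so the following is only a sketch of the peer-selection idea the call implies: re-request just the failed columns, drawing replacement peers from the synced set minus the peers that already failed. `retry_candidates` and the stand-in types are hypothetical, not the actual network-context API:

```rust
use std::collections::HashSet;

type ColumnIndex = u64;
// Stand-in for the libp2p `PeerId` type.
type PeerId = &'static str;

/// Pick candidate peers for a retry: any synced peer that is not in the failed set.
/// Returns `None` when no alternative peer is available, in which case the caller
/// can only give up or wait for new peers rather than retry immediately.
fn retry_candidates<'a>(
    synced_peers: &'a HashSet<PeerId>,
    failed_peers: &'a HashSet<PeerId>,
) -> Option<Vec<&'a PeerId>> {
    let candidates: Vec<_> = synced_peers.difference(failed_peers).collect();
    if candidates.is_empty() {
        None
    } else {
        Some(candidates)
    }
}

fn main() {
    let synced: HashSet<PeerId> = ["peer_a", "peer_b", "peer_c"].into_iter().collect();
    let failed: HashSet<PeerId> = ["peer_b"].into_iter().collect();

    let candidates = retry_candidates(&synced, &failed).expect("other synced peers exist");
    assert!(!candidates.contains(&&"peer_b"));

    // Only the columns that failed (not the whole batch) would be re-requested
    // from these candidates, e.g. columns {3, 7} after a partial failure.
    let _failed_columns: HashSet<ColumnIndex> = [3, 7].into_iter().collect();
}
```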
@@ -1057,6 +1154,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
             return None;
         }
 
+        if !self.good_peers_on_sampling_subnets(self.to_be_downloaded, network) {
+            debug!("Waiting for peers to be available on custody column subnets");
+            return None;
+        }
+
         let batch_id = self.to_be_downloaded;
         // this batch could have been included already being an optimistic batch
         match self.batches.entry(batch_id) {
@@ -1089,6 +1191,36 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         }
     }
 
+    /// Checks all sampling column subnets for peers. Returns `true` if there is at least one peer in
+    /// every sampling column subnet.
+    ///
+    /// Returns `true` if peerdas isn't enabled for the epoch.
+    fn good_peers_on_sampling_subnets(
+        &self,
+        epoch: Epoch,
+        network: &SyncNetworkContext<T>,
+    ) -> bool {
+        if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) {
+            // Require peers on all sampling column subnets before sending batches
+            let peers_on_all_custody_subnets = network
+                .network_globals()
+                .sampling_subnets()
+                .iter()
+                .all(|subnet_id| {
+                    let peer_count = network
+                        .network_globals()
+                        .peers
+                        .read()
+                        .good_range_sync_custody_subnet_peers(*subnet_id)
+                        .count();
+                    peer_count > 0
+                });
+            peers_on_all_custody_subnets
+        } else {
+            true
+        }
+    }
+
     /// Resets the start epoch based on the beacon chain.
     ///
     /// This errors if the beacon chain indicates that backfill sync has already completed or is
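`good_peers_on_sampling_subnets` gates both resuming backfill and creating new batches: once PeerDAS is enabled for the epoch, every sampling column subnet must have at least one connected custody peer, otherwise backfill waits rather than failing. A small standalone sketch of that all-subnets-covered predicate; `SubnetId`, the peer-count map, and `all_subnets_covered` are illustrative stand-ins for the `network_globals` plumbing:

```rust
use std::collections::HashMap;

type SubnetId = u64;

/// Returns `true` only if every sampling subnet has at least one usable peer,
/// mirroring the `.iter().all(|subnet_id| peer_count > 0)` check above.
fn all_subnets_covered(
    sampling_subnets: &[SubnetId],
    peers_per_subnet: &HashMap<SubnetId, usize>,
) -> bool {
    sampling_subnets
        .iter()
        .all(|subnet| peers_per_subnet.get(subnet).copied().unwrap_or(0) > 0)
}

fn main() {
    let sampling_subnets = vec![0, 1, 2, 3];
    let mut peers_per_subnet = HashMap::new();
    peers_per_subnet.insert(0, 2);
    peers_per_subnet.insert(1, 1);
    peers_per_subnet.insert(2, 3);

    // Subnet 3 has no peers yet, so backfill would keep waiting.
    assert!(!all_subnets_covered(&sampling_subnets, &peers_per_subnet));

    peers_per_subnet.insert(3, 1);
    assert!(all_subnets_covered(&sampling_subnets, &peers_per_subnet));
}
```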
