From 2c0311e2542253b58b0de6f084c54a421b3de9e2 Mon Sep 17 00:00:00 2001 From: SunnysidedJ Date: Thu, 17 Apr 2025 02:05:39 +0100 Subject: [PATCH 1/6] init commit, test fails --- .../network_beacon_processor/rpc_methods.rs | 12 ++++ .../src/network_beacon_processor/tests.rs | 56 +++++++++++++++++-- 2 files changed, 64 insertions(+), 4 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index bc97f884922..ffc2f2c58b7 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -880,6 +880,7 @@ impl NetworkBeaconProcessor { "Received BlobsByRange Request" ); + // Check Deneb is enabled. Blobs are available since then. let request_start_slot = Slot::from(req.start_slot); let data_availability_boundary_slot = match self.chain.data_availability_boundary() { @@ -917,6 +918,17 @@ impl NetworkBeaconProcessor { }; } + // Check Fulu/PeerDAS is enabled. Blobs are not stored since then. + // Only need to check if the last slot is a Fulu slot or above. + let request_end_slot = Slot::from(req.start_slot + req.count); + let request_end_epoch = request_end_slot.epoch(T::EthSpec::slots_per_epoch()); + if self.chain.spec.is_peer_das_enabled_for_epoch(request_end_epoch) { + return Err(( + RpcErrorResponse::InvalidRequest, + "Req including Fulu slots", + )) + } + let block_roots = self.get_block_roots_for_slot_range(req.start_slot, req.count, "BlobsByRange")?; diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 292e894870f..077da2b537d 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -1,4 +1,4 @@ -#![cfg(not(debug_assertions))] // Tests are too slow in debug. +//#![cfg(not(debug_assertions))] // Tests are too slow in debug. #![cfg(test)] use crate::{ @@ -429,13 +429,13 @@ impl TestRig { } } - pub fn enqueue_blobs_by_range_request(&self, count: u64) { + pub fn enqueue_blobs_by_range_request(&self, start_slot: u64, count: u64) { self.network_beacon_processor .send_blobs_by_range_request( PeerId::random(), InboundRequestId::new_unchecked(42, 24), BlobsByRangeRequest { - start_slot: 0, + start_slot, count, }, ) @@ -1216,9 +1216,55 @@ async fn test_blobs_by_range() { if test_spec::().deneb_fork_epoch.is_none() { return; }; + let rig_slot = 64; + let mut rig = TestRig::new(rig_slot).await; + let slot_count = 32; + // can I use slots_per_epoch from types::EthSpec? + rig.enqueue_blobs_by_range_request(0, slot_count); + + let mut blob_count = 0; + for slot in 0..slot_count { + let root = rig + .chain + .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) + .unwrap(); + blob_count += root + .map(|root| { + rig.chain + .get_blobs(&root) + .map(|list| list.len()) + .unwrap_or(0) + }) + .unwrap_or(0); + } + let mut actual_count = 0; + while let Some(next) = rig._network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::BlobsByRange(blob), + inbound_request_id: _, + } = next + { + if blob.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + assert_eq!(blob_count, actual_count); +} + +#[tokio::test] +async fn test_blobs_by_range_fulu() { + if !test_spec::().is_peer_das_scheduled() { + return; + }; let mut rig = TestRig::new(64).await; let slot_count = 32; - rig.enqueue_blobs_by_range_request(slot_count); + rig.enqueue_blobs_by_range_request(0, slot_count); let mut blob_count = 0; for slot in 0..slot_count { @@ -1252,5 +1298,7 @@ async fn test_blobs_by_range() { panic!("unexpected message {:?}", next); } } + assert_eq!(blob_count, 0); + assert_eq!(actual_count, 0); assert_eq!(blob_count, actual_count); } From f79d74182b9318ec5834ff00b49d5844ad13e073 Mon Sep 17 00:00:00 2001 From: SunnysidedJ Date: Wed, 30 Apr 2025 21:28:37 +0900 Subject: [PATCH 2/6] implementation complete with existing test passed --- beacon_node/beacon_chain/src/beacon_chain.rs | 5 ++++ .../network_beacon_processor/rpc_methods.rs | 23 +++++++++++-------- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 64ef5ef17e5..bd1c83ebb33 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -7146,6 +7146,11 @@ impl BeaconChain { let end_slot = start_slot.saturating_add(count); let mut roots = vec![]; + // let's explicitly check count = 0 since it's a public function for readabiilty purpose. + if count == 0 { + return roots; + } + for (root, slot) in block_roots_iter { if slot < end_slot && slot >= start_slot { roots.push(root); diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index ffc2f2c58b7..3aae137780e 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -880,9 +880,16 @@ impl NetworkBeaconProcessor { "Received BlobsByRange Request" ); - // Check Deneb is enabled. Blobs are available since then. + if req.count == 0 { + return Err((RpcErrorResponse::InvalidRequest, "Request count is zero")); + } + + let mut req = req; let request_start_slot = Slot::from(req.start_slot); + let request_end_slot = Slot::from(req.start_slot + req.count - 1); + let request_end_epoch = request_end_slot.epoch(T::EthSpec::slots_per_epoch()); + // Check Deneb is enabled. Blobs are available since then. let data_availability_boundary_slot = match self.chain.data_availability_boundary() { Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()), None => { @@ -918,15 +925,13 @@ impl NetworkBeaconProcessor { }; } - // Check Fulu/PeerDAS is enabled. Blobs are not stored since then. - // Only need to check if the last slot is a Fulu slot or above. - let request_end_slot = Slot::from(req.start_slot + req.count); - let request_end_epoch = request_end_slot.epoch(T::EthSpec::slots_per_epoch()); + // Check Fulu/PeerDAS is in the range. Blobs are not served since then. if self.chain.spec.is_peer_das_enabled_for_epoch(request_end_epoch) { - return Err(( - RpcErrorResponse::InvalidRequest, - "Req including Fulu slots", - )) + // Fulu epoch is Some type by the condition above. + let fulu_epoch = self.chain.spec.fulu_fork_epoch.unwrap(); + let fulu_start_slot = fulu_epoch.start_slot(T::EthSpec::slots_per_epoch()); + // See the justification for the formula in PR https://github.com/sigp/lighthouse/pull/7328 + req.count = fulu_start_slot.as_u64().saturating_sub(req.start_slot); } let block_roots = From 6d18d892d75e0a203d6fb36937ec78ccdcd379ed Mon Sep 17 00:00:00 2001 From: SunnysidedJ Date: Fri, 2 May 2025 13:10:50 +0900 Subject: [PATCH 3/6] tests blocked. non-zero start slot not working --- .../src/network_beacon_processor/tests.rs | 122 ++++++++++-------- 1 file changed, 71 insertions(+), 51 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 077da2b537d..87a55589432 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -26,11 +26,12 @@ use lighthouse_network::{ Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response, }; use slot_clock::SlotClock; +use tracing::debug; use std::iter::Iterator; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::blob_sidecar::FixedBlobSidecarList; +use types::{blob_sidecar::FixedBlobSidecarList, ChainSpec}; use types::{ Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, DataColumnSidecarList, DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, @@ -91,11 +92,28 @@ impl TestRig { } pub async fn new_parametric(chain_length: u64, enable_backfill_rate_limiting: bool) -> Self { - // This allows for testing voluntary exits without building out a massive chain. let mut spec = test_spec::(); spec.shard_committee_period = 2; let spec = Arc::new(spec); + Self::new_parametric_inner(spec, chain_length, enable_backfill_rate_limiting).await + } + + pub async fn new_parametric_with_custom_fulu_epoch( + chain_length: u64, + enable_backfill_rate_limiting: bool, + fulu_epoch: Epoch + ) -> Self { + let mut spec = test_spec::(); + spec.shard_committee_period = 2; + spec.fulu_fork_epoch = Some(fulu_epoch); + let spec = Arc::new(spec); + + Self::new_parametric_inner(spec, chain_length, enable_backfill_rate_limiting).await + } + + async fn new_parametric_inner(spec: Arc, chain_length: u64, enable_backfill_rate_limiting: bool) -> Self { + // This allows for testing voluntary exits without building out a massive chain. let harness = BeaconChainHarness::builder(MainnetEthSpec) .spec(spec.clone()) .deterministic_keypairs(VALIDATOR_COUNT) @@ -1211,23 +1229,21 @@ async fn test_backfill_sync_processing_rate_limiting_disabled() { .await; } -#[tokio::test] -async fn test_blobs_by_range() { - if test_spec::().deneb_fork_epoch.is_none() { - return; - }; - let rig_slot = 64; - let mut rig = TestRig::new(rig_slot).await; - let slot_count = 32; - // can I use slots_per_epoch from types::EthSpec? - rig.enqueue_blobs_by_range_request(0, slot_count); +async fn test_blobs_by_range( + rig: &mut TestRig, + start_slot: u64, + slot_count: u64, + //blob_count_expected: usize, +) { + rig.enqueue_blobs_by_range_request(start_slot, slot_count); let mut blob_count = 0; - for slot in 0..slot_count { + for slot in start_slot..slot_count { let root = rig .chain .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) .unwrap(); + debug!("slot and root {} {:?}", slot, root); blob_count += root .map(|root| { rig.chain @@ -1254,51 +1270,55 @@ async fn test_blobs_by_range() { panic!("unexpected message {:?}", next); } } + debug!("{} {} {} {}", start_slot, slot_count, blob_count, actual_count); assert_eq!(blob_count, actual_count); } #[tokio::test] -async fn test_blobs_by_range_fulu() { - if !test_spec::().is_peer_das_scheduled() { +async fn test_blobs_by_range_pre_fulu() { + if test_spec::().deneb_fork_epoch.is_none() { return; }; - let mut rig = TestRig::new(64).await; + let rig_slot = 64; + let mut rig = TestRig::new(rig_slot).await; let slot_count = 32; - rig.enqueue_blobs_by_range_request(0, slot_count); + test_blobs_by_range(&mut rig, 0, slot_count).await; + test_blobs_by_range(&mut rig, 0, slot_count).await; + test_blobs_by_range(&mut rig, 0, 16).await; + test_blobs_by_range(&mut rig, 8, 16).await; + test_blobs_by_range(&mut rig, 32, slot_count).await; +} - let mut blob_count = 0; - for slot in 0..slot_count { - let root = rig - .chain - .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) - .unwrap(); - blob_count += root - .map(|root| { - rig.chain - .get_blobs(&root) - .map(|list| list.len()) - .unwrap_or(0) - }) - .unwrap_or(0); - } - let mut actual_count = 0; - while let Some(next) = rig._network_rx.recv().await { - if let NetworkMessage::SendResponse { - peer_id: _, - response: Response::BlobsByRange(blob), - inbound_request_id: _, - } = next - { - if blob.is_some() { - actual_count += 1; - } else { - break; - } - } else { - panic!("unexpected message {:?}", next); - } +#[tokio::test] +async fn test_blobs_by_range_fulu() { + //if !test_spec::().is_peer_das_scheduled() { + // return; + //} + + let fulu_epoch = Slot::new(SLOTS_PER_EPOCH * 4).epoch(SLOTS_PER_EPOCH); + + let pre_fulu_start_slot = 0;//SLOTS_PER_EPOCH;// + (3 % SLOTS_PER_EPOCH); + let fulu_start_slot = SLOTS_PER_EPOCH * 6;// + (7 % SLOTS_PER_EPOCH); + let slot_count = SLOTS_PER_EPOCH;// + 1; + let slot_count_long = SLOTS_PER_EPOCH * 5;// + (2 % SLOTS_PER_EPOCH); + + let test_cases = vec![ + (0, slot_count_long), + //(pre_fulu_start_slot, slot_count), + //(pre_fulu_start_slot, 0), + //(pre_fulu_start_slot, slot_count_long), + //(fulu_start_slot, slot_count), + //(fulu_start_slot, 0), + ]; + + for (start_slot, count) in test_cases { + let mut rig = TestRig::new_parametric_with_custom_fulu_epoch( + SLOTS_PER_EPOCH * 8, + BeaconProcessorConfig::default().enable_backfill_rate_limiting, + fulu_epoch, + ) + .await; + + test_blobs_by_range(&mut rig, start_slot, count).await; } - assert_eq!(blob_count, 0); - assert_eq!(actual_count, 0); - assert_eq!(blob_count, actual_count); } From e8581909347ad917f796688e0128b83cc9b4d1d3 Mon Sep 17 00:00:00 2001 From: SunnysidedJ Date: Mon, 26 May 2025 11:23:41 +0900 Subject: [PATCH 4/6] add BlobsByRoot avoiding its tests. Also, fix for other tests --- .../network_beacon_processor/rpc_methods.rs | 33 +++- .../src/network_beacon_processor/tests.rs | 170 +++++++++++++----- beacon_node/store/src/hot_cold_store.rs | 8 + 3 files changed, 161 insertions(+), 50 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 3aae137780e..15f68eeef1b 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -3,6 +3,7 @@ use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERA use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::SyncMessage; +use beacon_chain::block_verification_types::AsBlock; use beacon_chain::{BeaconChainError, BeaconChainTypes, WhenSlotSkipped}; use itertools::{process_results, Itertools}; use lighthouse_network::rpc::methods::{ @@ -278,7 +279,34 @@ impl NetworkBeaconProcessor { let mut send_blob_count = 0; let mut blob_list_results = HashMap::new(); + let mut is_fulu_by_root = HashMap::new(); for id in request.blob_ids.as_slice() { + let BlobIdentifier { + block_root: root, + index, + } = id; + + if !is_fulu_by_root.contains_key(root) { + let epoch = if let Some(block) = self.chain.data_availability_checker.get_execution_valid_block(&root) { + block.message().epoch() + } else if let Some(block) = self.chain.early_attester_cache.get_block(*root) { + block.message().epoch() + } else if let Some(block) = self.chain.store.get_cached_block(root) { + block.message().epoch() + } else if let Ok(Some(block)) = self.chain.store.get_blinded_block(root) { + block.message().epoch() + } else { + // If we can't find the block in either cache, we can't determine the epoch. + continue; + }; + + is_fulu_by_root.insert(*root, self.chain.spec.is_peer_das_enabled_for_epoch(epoch)); + } + + if *is_fulu_by_root.get(root).unwrap() { + continue; + } + // First attempt to get the blobs from the RPC cache. if let Ok(Some(blob)) = self.chain.data_availability_checker.get_blob(id) { self.send_response( @@ -288,11 +316,6 @@ impl NetworkBeaconProcessor { ); send_blob_count += 1; } else { - let BlobIdentifier { - block_root: root, - index, - } = id; - let blob_list_result = match blob_list_results.entry(root) { Entry::Vacant(entry) => { entry.insert(self.chain.get_blobs_checking_early_attester_cache(root)) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 87a55589432..e6bb4a6a651 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -1,4 +1,4 @@ -//#![cfg(not(debug_assertions))] // Tests are too slow in debug. +#![cfg(not(debug_assertions))] // Tests are too slow in debug. #![cfg(test)] use crate::{ @@ -17,7 +17,7 @@ use beacon_chain::test_utils::{ use beacon_chain::{BeaconChain, WhenSlotSkipped}; use beacon_processor::{work_reprocessing_queue::*, *}; use itertools::Itertools; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3}; +use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest, MetaDataV3}; use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::{ discv5::enr::{self, CombinedKey}, @@ -26,12 +26,11 @@ use lighthouse_network::{ Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response, }; use slot_clock::SlotClock; -use tracing::debug; use std::iter::Iterator; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::{blob_sidecar::FixedBlobSidecarList, ChainSpec}; +use types::{blob_sidecar::FixedBlobSidecarList, BlobIdentifier, ChainSpec, RuntimeVariableList}; use types::{ Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, DataColumnSidecarList, DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, @@ -447,6 +446,18 @@ impl TestRig { } } + pub fn enqueue_blobs_by_root_request(&self, blob_ids: RuntimeVariableList) { + self.network_beacon_processor + .send_blobs_by_roots_request( + PeerId::random(), + InboundRequestId::new_unchecked(42, 24), + BlobsByRootRequest { + blob_ids, + }, + ) + .unwrap(); + } + pub fn enqueue_blobs_by_range_request(&self, start_slot: u64, count: u64) { self.network_beacon_processor .send_blobs_by_range_request( @@ -1233,18 +1244,16 @@ async fn test_blobs_by_range( rig: &mut TestRig, start_slot: u64, slot_count: u64, - //blob_count_expected: usize, ) { rig.enqueue_blobs_by_range_request(start_slot, slot_count); let mut blob_count = 0; - for slot in start_slot..slot_count { + for slot in start_slot..(start_slot + slot_count) { let root = rig .chain .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) .unwrap(); - debug!("slot and root {} {:?}", slot, root); - blob_count += root + let slot_blob_count = root .map(|root| { rig.chain .get_blobs(&root) @@ -1252,7 +1261,13 @@ async fn test_blobs_by_range( .unwrap_or(0) }) .unwrap_or(0); + if rig.chain.spec.is_peer_das_enabled_for_epoch(Slot::new(slot).epoch(SLOTS_PER_EPOCH)) { + // If peer DAS is enabled, we expect the slot to have 0 blobs. + assert_eq!(slot_blob_count, 0); + } + blob_count += slot_blob_count; } + let mut actual_count = 0; while let Some(next) = rig._network_rx.recv().await { if let NetworkMessage::SendResponse { @@ -1270,55 +1285,120 @@ async fn test_blobs_by_range( panic!("unexpected message {:?}", next); } } - debug!("{} {} {} {}", start_slot, slot_count, blob_count, actual_count); assert_eq!(blob_count, actual_count); } #[tokio::test] async fn test_blobs_by_range_pre_fulu() { - if test_spec::().deneb_fork_epoch.is_none() { + if test_spec::().deneb_fork_epoch.is_none() || test_spec::().is_peer_das_scheduled() { return; }; - let rig_slot = 64; + let rig_slot = 2 * SLOTS_PER_EPOCH; let mut rig = TestRig::new(rig_slot).await; - let slot_count = 32; - test_blobs_by_range(&mut rig, 0, slot_count).await; - test_blobs_by_range(&mut rig, 0, slot_count).await; - test_blobs_by_range(&mut rig, 0, 16).await; - test_blobs_by_range(&mut rig, 8, 16).await; - test_blobs_by_range(&mut rig, 32, slot_count).await; + + // Test blobs by range. + test_blobs_by_range(&mut rig, 0, 32).await; + // Test duplicated request. + test_blobs_by_range(&mut rig, 0, 32).await; + // Test more random ranges. + test_blobs_by_range(&mut rig, 7, 23).await; + test_blobs_by_range(&mut rig, 0, 64).await; + test_blobs_by_range(&mut rig, 14, 64).await; } #[tokio::test] async fn test_blobs_by_range_fulu() { - //if !test_spec::().is_peer_das_scheduled() { - // return; - //} - - let fulu_epoch = Slot::new(SLOTS_PER_EPOCH * 4).epoch(SLOTS_PER_EPOCH); - - let pre_fulu_start_slot = 0;//SLOTS_PER_EPOCH;// + (3 % SLOTS_PER_EPOCH); - let fulu_start_slot = SLOTS_PER_EPOCH * 6;// + (7 % SLOTS_PER_EPOCH); - let slot_count = SLOTS_PER_EPOCH;// + 1; - let slot_count_long = SLOTS_PER_EPOCH * 5;// + (2 % SLOTS_PER_EPOCH); - - let test_cases = vec![ - (0, slot_count_long), - //(pre_fulu_start_slot, slot_count), - //(pre_fulu_start_slot, 0), - //(pre_fulu_start_slot, slot_count_long), - //(fulu_start_slot, slot_count), - //(fulu_start_slot, 0), - ]; - - for (start_slot, count) in test_cases { - let mut rig = TestRig::new_parametric_with_custom_fulu_epoch( - SLOTS_PER_EPOCH * 8, - BeaconProcessorConfig::default().enable_backfill_rate_limiting, - fulu_epoch, - ) - .await; + if !test_spec::().is_peer_das_scheduled() { + return; + } + + let fulu_epoch = Slot::new(2 * SLOTS_PER_EPOCH).epoch(SLOTS_PER_EPOCH); + + let mut rig = TestRig::new_parametric_with_custom_fulu_epoch( + SLOTS_PER_EPOCH * 5, + BeaconProcessorConfig::default().enable_backfill_rate_limiting, + fulu_epoch, + ) + .await; + + // Test deprecation from Fulu epoch. + test_blobs_by_range(&mut rig, 0, 96).await; + test_blobs_by_range(&mut rig, 32, 96).await; + test_blobs_by_range(&mut rig, 64, 128).await; + test_blobs_by_range(&mut rig, 93, 121).await; +} + +async fn test_blobs_by_root( + rig: &mut TestRig, + slots_and_indices: &[(u64, u64)], +) { + let mut blob_count = 0; + let mut blob_ids = RuntimeVariableList::empty(slots_and_indices.len()); + for (slot, index) in slots_and_indices { + let root = rig + .chain + .block_root_at_slot(Slot::new(*slot), WhenSlotSkipped::None) + .unwrap(); + let slot_blob_count = root + .map(|root| { + rig.chain + .get_blobs(&root) + .map(|list| list.len()) + .unwrap_or(0) + }) + .unwrap_or(0); + if rig.chain.spec.is_peer_das_enabled_for_epoch(Slot::new(*slot).epoch(SLOTS_PER_EPOCH)) { + // If peer DAS is enabled, we expect the slot to have 0 blobs. + assert_eq!(slot_blob_count, 0); + } + blob_count += slot_blob_count; + let blob_id = BlobIdentifier { + block_root: root.unwrap(), + index: *index, + }; + blob_ids.push(blob_id).unwrap(); + } + + rig.enqueue_blobs_by_root_request(blob_ids); - test_blobs_by_range(&mut rig, start_slot, count).await; + let mut actual_count = 0; + while let Some(next) = rig._network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::BlobsByRange(blob), + inbound_request_id: _, + } = next + { + if blob.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } } + assert_eq!(blob_count, actual_count); } + +#[tokio::test] +async fn test_blobs_by_root_fulu() { + if !test_spec::().is_peer_das_scheduled() { + return; + } + + let fulu_epoch = Slot::new(2 * SLOTS_PER_EPOCH).epoch(SLOTS_PER_EPOCH); + + let mut rig = TestRig::new_parametric_with_custom_fulu_epoch( + SLOTS_PER_EPOCH * 5, + BeaconProcessorConfig::default().enable_backfill_rate_limiting, + fulu_epoch, + ) + .await; + + // Test blobs by root. Fulu slots should not have blobs. + test_blobs_by_root(&mut rig, &[(0, 0)]).await; + test_blobs_by_root(&mut rig, &[(3, 0), (4, 0), (5, 0)]).await; + test_blobs_by_root(&mut rig, &[(32, 0), (65, 0), (120, 0)]).await; + test_blobs_by_root(&mut rig, &[(93, 0), (9, 0), (25, 0)]).await; +} \ No newline at end of file diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 362c5d8014e..e7acfe985ca 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -2031,6 +2031,14 @@ impl, Cold: ItemStore> HotColdDB }) } + pub fn get_cached_block(&self, block_root: &Hash256) -> Option> { + if let Some(block) = self.block_cache.lock().get_block(block_root) { + Some(block.clone()) + } else { + None + } + } + /// Fetch columns for a given block from the store. pub fn get_data_columns( &self, From ea2bbad09250bcd88a40925824c28eb5fda31286 Mon Sep 17 00:00:00 2001 From: SunnysidedJ Date: Mon, 26 May 2025 17:38:49 +0900 Subject: [PATCH 5/6] cargo-fmt and lint --- .../network_beacon_processor/rpc_methods.rs | 14 ++++-- .../src/network_beacon_processor/tests.rs | 44 +++++++++---------- beacon_node/store/src/hot_cold_store.rs | 6 +-- 3 files changed, 34 insertions(+), 30 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 221bfebd2dd..3d75076eece 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -288,9 +288,13 @@ impl NetworkBeaconProcessor { } = id; if !is_fulu_by_root.contains_key(root) { - let epoch = if let Some(block) = self.chain.data_availability_checker.get_execution_valid_block(&root) { + let epoch = if let Some(block) = self + .chain + .data_availability_checker + .get_execution_valid_block(root) + { block.message().epoch() - } else if let Some(block) = self.chain.early_attester_cache.get_block(*root) { + } else if let Some(block) = self.chain.early_attester_cache.get_block(*root) { block.message().epoch() } else if let Some(block) = self.chain.store.get_cached_block(root) { block.message().epoch() @@ -951,7 +955,11 @@ impl NetworkBeaconProcessor { } // Check Fulu/PeerDAS is in the range. Blobs are not served since then. - if self.chain.spec.is_peer_das_enabled_for_epoch(request_end_epoch) { + if self + .chain + .spec + .is_peer_das_enabled_for_epoch(request_end_epoch) + { // Fulu epoch is Some type by the condition above. let fulu_epoch = self.chain.spec.fulu_fork_epoch.unwrap(); let fulu_start_slot = fulu_epoch.start_slot(T::EthSpec::slots_per_epoch()); diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index e6bb4a6a651..98f3029710e 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -101,17 +101,21 @@ impl TestRig { pub async fn new_parametric_with_custom_fulu_epoch( chain_length: u64, enable_backfill_rate_limiting: bool, - fulu_epoch: Epoch + fulu_epoch: Epoch, ) -> Self { let mut spec = test_spec::(); spec.shard_committee_period = 2; spec.fulu_fork_epoch = Some(fulu_epoch); let spec = Arc::new(spec); - + Self::new_parametric_inner(spec, chain_length, enable_backfill_rate_limiting).await } - async fn new_parametric_inner(spec: Arc, chain_length: u64, enable_backfill_rate_limiting: bool) -> Self { + async fn new_parametric_inner( + spec: Arc, + chain_length: u64, + enable_backfill_rate_limiting: bool, + ) -> Self { // This allows for testing voluntary exits without building out a massive chain. let harness = BeaconChainHarness::builder(MainnetEthSpec) .spec(spec.clone()) @@ -451,9 +455,7 @@ impl TestRig { .send_blobs_by_roots_request( PeerId::random(), InboundRequestId::new_unchecked(42, 24), - BlobsByRootRequest { - blob_ids, - }, + BlobsByRootRequest { blob_ids }, ) .unwrap(); } @@ -463,10 +465,7 @@ impl TestRig { .send_blobs_by_range_request( PeerId::random(), InboundRequestId::new_unchecked(42, 24), - BlobsByRangeRequest { - start_slot, - count, - }, + BlobsByRangeRequest { start_slot, count }, ) .unwrap(); } @@ -1240,11 +1239,7 @@ async fn test_backfill_sync_processing_rate_limiting_disabled() { .await; } -async fn test_blobs_by_range( - rig: &mut TestRig, - start_slot: u64, - slot_count: u64, -) { +async fn test_blobs_by_range(rig: &mut TestRig, start_slot: u64, slot_count: u64) { rig.enqueue_blobs_by_range_request(start_slot, slot_count); let mut blob_count = 0; @@ -1261,7 +1256,11 @@ async fn test_blobs_by_range( .unwrap_or(0) }) .unwrap_or(0); - if rig.chain.spec.is_peer_das_enabled_for_epoch(Slot::new(slot).epoch(SLOTS_PER_EPOCH)) { + if rig + .chain + .spec + .is_peer_das_enabled_for_epoch(Slot::new(slot).epoch(SLOTS_PER_EPOCH)) + { // If peer DAS is enabled, we expect the slot to have 0 blobs. assert_eq!(slot_blob_count, 0); } @@ -1328,10 +1327,7 @@ async fn test_blobs_by_range_fulu() { test_blobs_by_range(&mut rig, 93, 121).await; } -async fn test_blobs_by_root( - rig: &mut TestRig, - slots_and_indices: &[(u64, u64)], -) { +async fn test_blobs_by_root(rig: &mut TestRig, slots_and_indices: &[(u64, u64)]) { let mut blob_count = 0; let mut blob_ids = RuntimeVariableList::empty(slots_and_indices.len()); for (slot, index) in slots_and_indices { @@ -1347,7 +1343,11 @@ async fn test_blobs_by_root( .unwrap_or(0) }) .unwrap_or(0); - if rig.chain.spec.is_peer_das_enabled_for_epoch(Slot::new(*slot).epoch(SLOTS_PER_EPOCH)) { + if rig + .chain + .spec + .is_peer_das_enabled_for_epoch(Slot::new(*slot).epoch(SLOTS_PER_EPOCH)) + { // If peer DAS is enabled, we expect the slot to have 0 blobs. assert_eq!(slot_blob_count, 0); } @@ -1401,4 +1401,4 @@ async fn test_blobs_by_root_fulu() { test_blobs_by_root(&mut rig, &[(3, 0), (4, 0), (5, 0)]).await; test_blobs_by_root(&mut rig, &[(32, 0), (65, 0), (120, 0)]).await; test_blobs_by_root(&mut rig, &[(93, 0), (9, 0), (25, 0)]).await; -} \ No newline at end of file +} diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index e48dc4c0a04..4e953136ab4 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -2029,11 +2029,7 @@ impl, Cold: ItemStore> HotColdDB } pub fn get_cached_block(&self, block_root: &Hash256) -> Option> { - if let Some(block) = self.block_cache.lock().get_block(block_root) { - Some(block.clone()) - } else { - None - } + self.block_cache.lock().get_block(block_root).cloned() } /// Fetch all columns for a given block from the store. From 6f731d0050411ad1b3cb802311d9abf1cc074ca9 Mon Sep 17 00:00:00 2001 From: SunnysidedJ Date: Mon, 26 May 2025 18:28:16 +0900 Subject: [PATCH 6/6] Fix for by root tests --- beacon_node/network/src/network_beacon_processor/tests.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 98f3029710e..97f0c6aeda5 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -1350,8 +1350,9 @@ async fn test_blobs_by_root(rig: &mut TestRig, slots_and_indices: &[(u64, u64)]) { // If peer DAS is enabled, we expect the slot to have 0 blobs. assert_eq!(slot_blob_count, 0); + } else if slot_blob_count > 0 { + blob_count += 1; } - blob_count += slot_blob_count; let blob_id = BlobIdentifier { block_root: root.unwrap(), index: *index, @@ -1365,7 +1366,7 @@ async fn test_blobs_by_root(rig: &mut TestRig, slots_and_indices: &[(u64, u64)]) while let Some(next) = rig._network_rx.recv().await { if let NetworkMessage::SendResponse { peer_id: _, - response: Response::BlobsByRange(blob), + response: Response::BlobsByRoot(blob), inbound_request_id: _, } = next {