From e0aa917ab988535f77615ba67922340410d13e34 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Thu, 17 Jul 2025 12:52:19 +0800 Subject: [PATCH 01/18] add fulu_start_slot empty response --- .../src/network_beacon_processor/rpc_methods.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 4004305f83c..68da3dbfb59 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -885,6 +885,22 @@ impl NetworkBeaconProcessor { let request_start_slot = Slot::from(req.start_slot); + // Check if the request slot is after a Fulu slot; if so, return empty response + if let Some(fulu_epoch) = self.chain.spec.fulu_fork_epoch { + let fulu_start_slot = fulu_epoch.start_slot(T::EthSpec::slots_per_epoch()); + + if request_start_slot >= fulu_start_slot { + debug!( + %peer_id, + %request_start_slot, + %fulu_start_slot, + returned = 0, + "BlobsByRange request is at or after a Fulu slot, returning empty response" + ); + return Ok(()); + } + } + let data_availability_boundary_slot = match self.chain.data_availability_boundary() { Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()), None => { From 4f8be4fc5a51774d6c653519c8f52f63e1962b7f Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Thu, 17 Jul 2025 21:21:13 +0800 Subject: [PATCH 02/18] BlobsByRange that spans Fulu fork --- .../network_beacon_processor/rpc_methods.rs | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 68da3dbfb59..2bdfbe8e9c7 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -884,11 +884,14 @@ impl NetworkBeaconProcessor { ); let request_start_slot = Slot::from(req.start_slot); + // This variable may only change when the request_start_slot + req.count spans across the Fulu fork slot + let mut effective_count = req.count; - // Check if the request slot is after a Fulu slot; if so, return empty response if let Some(fulu_epoch) = self.chain.spec.fulu_fork_epoch { let fulu_start_slot = fulu_epoch.start_slot(T::EthSpec::slots_per_epoch()); + let request_end_slot = request_start_slot + req.count - 1; + // If the request_start_slot is at or after a Fulu slot, return empty response if request_start_slot >= fulu_start_slot { debug!( %peer_id, @@ -898,6 +901,17 @@ impl NetworkBeaconProcessor { "BlobsByRange request is at or after a Fulu slot, returning empty response" ); return Ok(()); + // For the case that the request slots spans across the Fulu fork slot + } else if request_start_slot < fulu_start_slot && request_end_slot >= fulu_start_slot { + effective_count = (fulu_start_slot - request_start_slot).as_u64(); + debug!( + %peer_id, + %request_start_slot, + %fulu_start_slot, + requested_count = req.count, + served_count = effective_count, + "BlobsByRange request spans across Fulu fork, only serving blobs before Fulu slots" + ) } } @@ -937,7 +951,7 @@ impl NetworkBeaconProcessor { } let block_roots = - self.get_block_roots_for_slot_range(req.start_slot, req.count, "BlobsByRange")?; + self.get_block_roots_for_slot_range(req.start_slot, effective_count, "BlobsByRange")?; let current_slot = self .chain @@ -964,7 +978,7 @@ impl NetworkBeaconProcessor { // Due to skip slots, blobs could be out of the range, we ensure they // are in the range before sending if blob_sidecar.slot() >= request_start_slot - && blob_sidecar.slot() < request_start_slot + req.count + && blob_sidecar.slot() < request_start_slot + effective_count { blobs_sent += 1; self.send_network_message(NetworkMessage::SendResponse { From cbc4306c12632da942a8087d5e84d54c2f890e20 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Thu, 17 Jul 2025 21:44:24 +0800 Subject: [PATCH 03/18] rename --- .../network/src/network_beacon_processor/rpc_methods.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 2bdfbe8e9c7..53594c6a352 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -908,8 +908,8 @@ impl NetworkBeaconProcessor { %peer_id, %request_start_slot, %fulu_start_slot, - requested_count = req.count, - served_count = effective_count, + requested = req.count, + returned = effective_count, "BlobsByRange request spans across Fulu fork, only serving blobs before Fulu slots" ) } From b9149f6126e19e0517d06b3d353a8ff980f73e83 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Fri, 18 Jul 2025 21:52:12 +0800 Subject: [PATCH 04/18] Add BlobsByRange test --- .../src/network_beacon_processor/tests.rs | 104 +++++++++++++++++- 1 file changed, 98 insertions(+), 6 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 109c361ebe8..a540e402315 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -417,15 +417,12 @@ impl TestRig { } } - pub fn enqueue_blobs_by_range_request(&self, count: u64) { + pub fn enqueue_blobs_by_range_request(&self, start_slot: u64, count: u64) { self.network_beacon_processor .send_blobs_by_range_request( PeerId::random(), InboundRequestId::new_unchecked(42, 24), - BlobsByRangeRequest { - start_slot: 0, - count, - }, + BlobsByRangeRequest { start_slot, count }, ) .unwrap(); } @@ -1325,8 +1322,9 @@ async fn test_blobs_by_range() { return; }; let mut rig = TestRig::new(64).await; + let start_slot = 0; let slot_count = 32; - rig.enqueue_blobs_by_range_request(slot_count); + rig.enqueue_blobs_by_range_request(start_slot, slot_count); let mut blob_count = 0; for slot in 0..slot_count { @@ -1362,3 +1360,97 @@ async fn test_blobs_by_range() { } assert_eq!(blob_count, actual_count); } + +#[tokio::test] +async fn test_blobs_by_range_post_fulu_should_returns_empty() { + // Only test for Fulu fork + if test_spec::().fulu_fork_epoch.is_none() { + return; + }; + let mut rig = TestRig::new(64).await; + let start_slot = 0; + let slot_count = 32; + rig.enqueue_blobs_by_range_request(start_slot, slot_count); + + let mut actual_count = 0; + + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::BlobsByRange(blob), + inbound_request_id: _, + } = next + { + if blob.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + // Post-Fulu should return 0 blobs + assert_eq!(0, actual_count); +} + +#[tokio::test] +async fn test_blobs_by_range_spans_fulu_fork() { + // Only test for Electra & Fulu fork transition + if test_spec::().electra_fork_epoch.is_none() { + return; + }; + let mut spec = test_spec::(); + spec.fulu_fork_epoch = Some(Epoch::new(1)); + + let mut rig = TestRig::new_parametric(64, BeaconProcessorConfig::default(), spec).await; + + let start_slot = 16; + // This will span from epoch 0 (Electra) to epoch 1 (Fulu) + let slot_count = 32; + + println!("Deneb fork epoch is: {:?}", rig.chain.spec.deneb_fork_epoch); + println!( + "Electra fork epoch is: {:?}", + rig.chain.spec.electra_fork_epoch + ); + println!("Fulu fork epoch is: {:?}", rig.chain.spec.fulu_fork_epoch); + + rig.enqueue_blobs_by_range_request(start_slot, slot_count); + + let mut blob_count = 0; + for slot in start_slot..slot_count { + let root = rig + .chain + .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) + .unwrap(); + blob_count += root + .map(|root| { + rig.chain + .get_blobs(&root) + .map(|list| list.len()) + .unwrap_or(0) + }) + .unwrap_or(0); + } + + let mut actual_count = 0; + + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::BlobsByRange(blob), + inbound_request_id: _, + } = next + { + if blob.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + assert_eq!(blob_count, actual_count); +} From befef72f92f742fab2846203deae4e326207274b Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Tue, 22 Jul 2025 07:50:26 +0800 Subject: [PATCH 05/18] add Fuly BlobsByRoot --- .../network_beacon_processor/rpc_methods.rs | 44 ++++++++++++++++--- 1 file changed, 39 insertions(+), 5 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 53594c6a352..39a663d3660 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -279,10 +279,35 @@ impl NetworkBeaconProcessor { .collect::>(); let mut send_blob_count = 0; + let fulu_start_slot = self + .chain + .spec + .fulu_fork_epoch + .map(|epoch| epoch.start_slot(T::EthSpec::slots_per_epoch())); + let mut blob_list_results = HashMap::new(); for id in request.blob_ids.as_slice() { + let BlobIdentifier { + block_root: root, + index, + } = id; + // First attempt to get the blobs from the RPC cache. if let Ok(Some(blob)) = self.chain.data_availability_checker.get_blob(id) { + // Check if the blob requested is from a Fulu slot, if so, skip the current blob id and proceed to the next + if let Some(fulu_slot) = fulu_start_slot { + if blob.slot() >= fulu_slot { + debug!( + %peer_id, + request_root = %root, + blob_slot = %blob.slot(), + %fulu_slot, + "BlobsByRoot request is at or after Fulu slot, returning empty response" + ); + continue; + } + } + self.send_response( peer_id, inbound_request_id, @@ -290,11 +315,6 @@ impl NetworkBeaconProcessor { ); send_blob_count += 1; } else { - let BlobIdentifier { - block_root: root, - index, - } = id; - let blob_list_result = match blob_list_results.entry(root) { Entry::Vacant(entry) => { entry.insert(self.chain.get_blobs_checking_early_attester_cache(root)) @@ -306,6 +326,20 @@ impl NetworkBeaconProcessor { Ok(blobs_sidecar_list) => { 'inner: for blob_sidecar in blobs_sidecar_list.iter() { if blob_sidecar.index == *index { + // Same logic as above to check for Fulu slot + if let Some(fulu_slot) = fulu_start_slot { + if blob_sidecar.slot() >= fulu_slot { + debug!( + %peer_id, + request_root = %root, + blob_slot = %blob_sidecar.slot(), + %fulu_slot, + "BlobsByRoot request is at or after Fulu slot, returning empty response" + ); + break 'inner; + } + } + self.send_response( peer_id, inbound_request_id, From 18322732cee92f5a218c98216e8ef69cef77d859 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Tue, 22 Jul 2025 08:19:29 +0800 Subject: [PATCH 06/18] test --- .../src/network_beacon_processor/tests.rs | 75 ++++++++++++++++--- 1 file changed, 63 insertions(+), 12 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index a540e402315..f55958a8a95 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -20,7 +20,7 @@ use beacon_chain::{BeaconChain, WhenSlotSkipped}; use beacon_processor::{work_reprocessing_queue::*, *}; use gossipsub::MessageAcceptance; use itertools::Itertools; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3}; +use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest, MetaDataV3}; use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::{ discv5::enr::{self, CombinedKey}, @@ -34,11 +34,12 @@ use std::iter::Iterator; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::blob_sidecar::FixedBlobSidecarList; +use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; use types::{ AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList, - DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, - SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot, SubnetId, + DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, RuntimeVariableList, + SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot, + SubnetId, }; type E = MainnetEthSpec; @@ -427,6 +428,16 @@ impl TestRig { .unwrap(); } + pub fn enqueue_blobs_by_root_request(&self, blob_ids: RuntimeVariableList) { + self.network_beacon_processor + .send_blobs_by_roots_request( + PeerId::random(), + InboundRequestId::new_unchecked(42, 24), + BlobsByRootRequest { blob_ids }, + ) + .unwrap(); + } + pub fn enqueue_backfill_batch(&self) { self.network_beacon_processor .send_chain_segment( @@ -1362,7 +1373,7 @@ async fn test_blobs_by_range() { } #[tokio::test] -async fn test_blobs_by_range_post_fulu_should_returns_empty() { +async fn test_blobs_by_range_post_fulu_should_return_empty() { // Only test for Fulu fork if test_spec::().fulu_fork_epoch.is_none() { return; @@ -1409,13 +1420,6 @@ async fn test_blobs_by_range_spans_fulu_fork() { // This will span from epoch 0 (Electra) to epoch 1 (Fulu) let slot_count = 32; - println!("Deneb fork epoch is: {:?}", rig.chain.spec.deneb_fork_epoch); - println!( - "Electra fork epoch is: {:?}", - rig.chain.spec.electra_fork_epoch - ); - println!("Fulu fork epoch is: {:?}", rig.chain.spec.fulu_fork_epoch); - rig.enqueue_blobs_by_range_request(start_slot, slot_count); let mut blob_count = 0; @@ -1454,3 +1458,50 @@ async fn test_blobs_by_range_spans_fulu_fork() { } assert_eq!(blob_count, actual_count); } + +#[tokio::test] +async fn test_blobs_by_root_post_fulu_should_return_empty() { + // Only test for Fulu fork + if test_spec::().fulu_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + + // Get the block root of a sample slot, e.g., slot 1 + let block_root = rig + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + + let blob_ids = vec![BlobIdentifier { + block_root, + index: 0, + }]; + + let blob_ids_list = RuntimeVariableList::new(blob_ids, 1).unwrap(); + + rig.enqueue_blobs_by_root_request(blob_ids_list); + + let mut actual_count = 0; + + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::BlobsByRoot(blob), + inbound_request_id: _, + } = next + { + if blob.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + // Post-Fulu should return 0 blobs + assert_eq!(0, actual_count); +} From bf208808a65bea6765b5071cf6039f18232827b0 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Fri, 1 Aug 2025 11:40:18 +0800 Subject: [PATCH 07/18] fix BlobsByRoot --- .../network_beacon_processor/rpc_methods.rs | 54 +++++++++++++------ 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 39a663d3660..643c6c39177 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -286,28 +286,48 @@ impl NetworkBeaconProcessor { .map(|epoch| epoch.start_slot(T::EthSpec::slots_per_epoch())); let mut blob_list_results = HashMap::new(); + let mut retrieve_blob_slot = HashMap::new(); for id in request.blob_ids.as_slice() { let BlobIdentifier { block_root: root, index, } = id; - // First attempt to get the blobs from the RPC cache. - if let Ok(Some(blob)) = self.chain.data_availability_checker.get_blob(id) { - // Check if the blob requested is from a Fulu slot, if so, skip the current blob id and proceed to the next - if let Some(fulu_slot) = fulu_start_slot { - if blob.slot() >= fulu_slot { - debug!( - %peer_id, - request_root = %root, - blob_slot = %blob.slot(), - %fulu_slot, - "BlobsByRoot request is at or after Fulu slot, returning empty response" - ); - continue; - } + // Get the slot for where the blob belongs to from the HashMap or cache without touching the database + let slot = if let Some(slot) = retrieve_blob_slot.get(root) { + *slot + } else { + // Try to get block from caches to extract slot + if let Some(block) = self + .chain + .data_availability_checker + .get_execution_valid_block(root) + .or_else(|| self.chain.early_attester_cache.get_block(*root)) + { + let slot = block.slot(); + retrieve_blob_slot.insert(*root, slot); + slot + } else { + continue; } + }; + // Skip if slot is >= fulu_start_slot + if let Some(fulu_slot) = fulu_start_slot { + if slot >= fulu_slot { + debug!( + %peer_id, + request_root = %root, + %slot, + %fulu_slot, + "BlobsByRoot request is at or after Fulu slot, returning empty response" + ); + continue; + } + } + + // First attempt to get the blobs from the RPC cache. + if let Ok(Some(blob)) = self.chain.data_availability_checker.get_blob(id) { self.send_response( peer_id, inbound_request_id, @@ -925,7 +945,7 @@ impl NetworkBeaconProcessor { let fulu_start_slot = fulu_epoch.start_slot(T::EthSpec::slots_per_epoch()); let request_end_slot = request_start_slot + req.count - 1; - // If the request_start_slot is at or after a Fulu slot, return empty response + // If the request_start_slot is at or after a Fulu slot, return an empty response if request_start_slot >= fulu_start_slot { debug!( %peer_id, @@ -936,14 +956,14 @@ impl NetworkBeaconProcessor { ); return Ok(()); // For the case that the request slots spans across the Fulu fork slot - } else if request_start_slot < fulu_start_slot && request_end_slot >= fulu_start_slot { + } else if request_end_slot >= fulu_start_slot { effective_count = (fulu_start_slot - request_start_slot).as_u64(); debug!( %peer_id, %request_start_slot, %fulu_start_slot, requested = req.count, - returned = effective_count, + effective_count, "BlobsByRange request spans across Fulu fork, only serving blobs before Fulu slots" ) } From 792c0d320590e18bb92dffd7b79e37506099fd0b Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Fri, 1 Aug 2025 16:13:30 +0800 Subject: [PATCH 08/18] add check --- .../src/network_beacon_processor/rpc_methods.rs | 13 ++++++++++++- consensus/types/src/chain_spec.rs | 2 +- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 643c6c39177..23a724e6349 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -938,12 +938,23 @@ impl NetworkBeaconProcessor { ); let request_start_slot = Slot::from(req.start_slot); + let request_start_epoch = request_start_slot.epoch(T::EthSpec::slots_per_epoch()); + // Should not send more than max request blob sidecars + if req.max_blobs_requested(request_start_epoch, &self.chain.spec) + > self.chain.spec.max_request_blob_sidecars_electra + { + return Err(( + RpcErrorResponse::InvalidRequest, + "Request exceeded `MAX_REQUEST_BLOBS_SIDECARS_ELECTRA`", + )); + } + // This variable may only change when the request_start_slot + req.count spans across the Fulu fork slot let mut effective_count = req.count; if let Some(fulu_epoch) = self.chain.spec.fulu_fork_epoch { let fulu_start_slot = fulu_epoch.start_slot(T::EthSpec::slots_per_epoch()); - let request_end_slot = request_start_slot + req.count - 1; + let request_end_slot = request_start_slot.saturating_add(req.count) - 1; // If the request_start_slot is at or after a Fulu slot, return an empty response if request_start_slot >= fulu_start_slot { diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 4476cd69b3a..7f62ffc2a26 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -241,7 +241,7 @@ pub struct ChainSpec { */ max_blobs_per_block_electra: u64, blob_sidecar_subnet_count_electra: u64, - max_request_blob_sidecars_electra: u64, + pub max_request_blob_sidecars_electra: u64, /* * Networking Fulu From 32c3ab0d4acbd7f7ccbc0e331bdceffb76329b46 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Fri, 1 Aug 2025 16:25:48 +0800 Subject: [PATCH 09/18] remove mut in effective_count --- .../network_beacon_processor/rpc_methods.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 23a724e6349..df881a02bce 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -949,10 +949,7 @@ impl NetworkBeaconProcessor { )); } - // This variable may only change when the request_start_slot + req.count spans across the Fulu fork slot - let mut effective_count = req.count; - - if let Some(fulu_epoch) = self.chain.spec.fulu_fork_epoch { + let effective_count = if let Some(fulu_epoch) = self.chain.spec.fulu_fork_epoch { let fulu_start_slot = fulu_epoch.start_slot(T::EthSpec::slots_per_epoch()); let request_end_slot = request_start_slot.saturating_add(req.count) - 1; @@ -968,17 +965,22 @@ impl NetworkBeaconProcessor { return Ok(()); // For the case that the request slots spans across the Fulu fork slot } else if request_end_slot >= fulu_start_slot { - effective_count = (fulu_start_slot - request_start_slot).as_u64(); + let count = (fulu_start_slot - request_start_slot).as_u64(); debug!( %peer_id, %request_start_slot, %fulu_start_slot, requested = req.count, - effective_count, + effective_count = count, "BlobsByRange request spans across Fulu fork, only serving blobs before Fulu slots" - ) + ); + count + } else { + req.count } - } + } else { + req.count + }; let data_availability_boundary_slot = match self.chain.data_availability_boundary() { Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()), From 415ec1da86e8dfde664c2bb96e3e4c64a7bd7223 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Mon, 4 Aug 2025 11:03:04 +0800 Subject: [PATCH 10/18] revise --- .../network/src/network_beacon_processor/rpc_methods.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index df881a02bce..9202aba1efc 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -308,7 +308,14 @@ impl NetworkBeaconProcessor { retrieve_blob_slot.insert(*root, slot); slot } else { - continue; + match self.chain.get_blinded_block(root) { + Ok(Some(block)) => { + let slot = block.slot(); + retrieve_blob_slot.insert(*root, slot); + slot + } + _ => continue, + } } }; From c762e918ce1b49e83f3ab9dc093434a459ac7ad3 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Mon, 4 Aug 2025 12:13:36 +0800 Subject: [PATCH 11/18] revise logging --- beacon_node/network/src/network_beacon_processor/rpc_methods.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 9202aba1efc..035ec0c144a 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -1118,7 +1118,7 @@ impl NetworkBeaconProcessor { if req.max_requested::() > self.chain.spec.max_request_data_column_sidecars { return Err(( RpcErrorResponse::InvalidRequest, - "Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`", + "Request exceeded `MAX_REQUEST_DATA_COLUMN_SIDECARS`", )); } From f79124465c4546733967c9cb978abdbbc0607818 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Wed, 6 Aug 2025 15:35:58 +0800 Subject: [PATCH 12/18] Improve logging --- .../network_beacon_processor/rpc_methods.rs | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 035ec0c144a..e6e61d55ee2 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -266,17 +266,12 @@ impl NetworkBeaconProcessor { inbound_request_id: InboundRequestId, request: BlobsByRootRequest, ) -> Result<(), (RpcErrorResponse, &'static str)> { - let Some(requested_root) = request.blob_ids.as_slice().first().map(|id| id.block_root) + let Some(_requested_root) = request.blob_ids.as_slice().first().map(|id| id.block_root) else { // No blob ids requested. return Ok(()); }; - let requested_indices = request - .blob_ids - .as_slice() - .iter() - .map(|id| id.index) - .collect::>(); + let mut send_blob_count = 0; let fulu_start_slot = self @@ -287,6 +282,8 @@ impl NetworkBeaconProcessor { let mut blob_list_results = HashMap::new(); let mut retrieve_blob_slot = HashMap::new(); + // For logging purpose, to display one log per block root + let mut logging = HashMap::new(); for id in request.blob_ids.as_slice() { let BlobIdentifier { block_root: root, @@ -341,6 +338,7 @@ impl NetworkBeaconProcessor { Response::BlobsByRoot(Some(blob)), ); send_blob_count += 1; + logging.entry(*root).or_insert(Vec::new()).push(*index); } else { let blob_list_result = match blob_list_results.entry(root) { Entry::Vacant(entry) => { @@ -373,6 +371,7 @@ impl NetworkBeaconProcessor { Response::BlobsByRoot(Some(blob_sidecar.clone())), ); send_blob_count += 1; + logging.entry(*root).or_insert(Vec::new()).push(*index); break 'inner; } } @@ -388,13 +387,17 @@ impl NetworkBeaconProcessor { } } } - debug!( - %peer_id, - %requested_root, - ?requested_indices, - returned = send_blob_count, - "BlobsByRoot outgoing response processed" - ); + + // log once per block_root + for (block_root, blobs_indices) in &logging { + debug!( + %peer_id, + %block_root, + ?blobs_indices, + returned = send_blob_count, + "BlobsByRoot outgoing response processed" + ); + } Ok(()) } From 029e418ffa41cd793dfccc2a692155459f2ab84b Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Wed, 6 Aug 2025 16:41:20 +0800 Subject: [PATCH 13/18] revise --- .../network_beacon_processor/rpc_methods.rs | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index e6e61d55ee2..1e02a8934bb 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -292,7 +292,7 @@ impl NetworkBeaconProcessor { // Get the slot for where the blob belongs to from the HashMap or cache without touching the database let slot = if let Some(slot) = retrieve_blob_slot.get(root) { - *slot + Some(*slot) } else { // Try to get block from caches to extract slot if let Some(block) = self @@ -303,21 +303,15 @@ impl NetworkBeaconProcessor { { let slot = block.slot(); retrieve_blob_slot.insert(*root, slot); - slot + Some(slot) } else { - match self.chain.get_blinded_block(root) { - Ok(Some(block)) => { - let slot = block.slot(); - retrieve_blob_slot.insert(*root, slot); - slot - } - _ => continue, - } + // Blobs not found in cache, return None to avoid hitting the database + None } }; // Skip if slot is >= fulu_start_slot - if let Some(fulu_slot) = fulu_start_slot { + if let (Some(slot), Some(fulu_slot)) = (slot, fulu_start_slot) { if slot >= fulu_slot { debug!( %peer_id, @@ -351,7 +345,7 @@ impl NetworkBeaconProcessor { Ok(blobs_sidecar_list) => { 'inner: for blob_sidecar in blobs_sidecar_list.iter() { if blob_sidecar.index == *index { - // Same logic as above to check for Fulu slot + // Check for Fulu slot if let Some(fulu_slot) = fulu_start_slot { if blob_sidecar.slot() >= fulu_slot { debug!( From 1fe8f8276ab19a5257c00fea0fc76250a36864b8 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Thu, 7 Aug 2025 15:20:50 +0800 Subject: [PATCH 14/18] lint --- beacon_node/network/src/network_beacon_processor/tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index bba89c311ee..310879d150f 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -38,8 +38,8 @@ use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; use types::{ AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList, DataColumnSubnetId, Epoch, EthSpec, Hash256, MainnetEthSpec, ProposerSlashing, - SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot, - SubnetId, + RuntimeVariableList, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, + SingleAttestation, Slot, SubnetId, }; type E = MainnetEthSpec; From 00687f27d377a93693592f5bc8e7c2e4f2e2412e Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Wed, 13 Aug 2025 11:34:01 +0800 Subject: [PATCH 15/18] if loop lint --- .../network_beacon_processor/rpc_methods.rs | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 9817d05d9b4..95549e15e59 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -313,17 +313,17 @@ impl NetworkBeaconProcessor { }; // Skip if slot is >= fulu_start_slot - if let (Some(slot), Some(fulu_slot)) = (slot, fulu_start_slot) { - if slot >= fulu_slot { - debug!( - %peer_id, - request_root = %root, - %slot, - %fulu_slot, - "BlobsByRoot request is at or after Fulu slot, returning empty response" - ); - continue; - } + if let (Some(slot), Some(fulu_slot)) = (slot, fulu_start_slot) + && slot >= fulu_slot + { + debug!( + %peer_id, + request_root = %root, + %slot, + %fulu_slot, + "BlobsByRoot request is at or after Fulu slot, returning empty response" + ); + continue; } // First attempt to get the blobs from the RPC cache. @@ -348,17 +348,17 @@ impl NetworkBeaconProcessor { 'inner: for blob_sidecar in blobs_sidecar_list.iter() { if blob_sidecar.index == *index { // Check for Fulu slot - if let Some(fulu_slot) = fulu_start_slot { - if blob_sidecar.slot() >= fulu_slot { - debug!( - %peer_id, - request_root = %root, - blob_slot = %blob_sidecar.slot(), - %fulu_slot, - "BlobsByRoot request is at or after Fulu slot, returning empty response" - ); - break 'inner; - } + if let Some(fulu_slot) = fulu_start_slot + && blob_sidecar.slot() >= fulu_slot + { + debug!( + %peer_id, + request_root = %root, + blob_slot = %blob_sidecar.slot(), + %fulu_slot, + "BlobsByRoot request is at or after Fulu slot, returning empty response" + ); + break 'inner; } self.send_response( From 0cbd9fd57ead7accaf76f4b33f1a7608323b7e8a Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Wed, 20 Aug 2025 09:33:03 +0800 Subject: [PATCH 16/18] revise --- .../network_beacon_processor/rpc_methods.rs | 84 +++++-------------- .../src/network_beacon_processor/tests.rs | 65 +++++++++++++- consensus/types/src/chain_spec.rs | 2 +- 3 files changed, 87 insertions(+), 64 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 95549e15e59..130ca994d05 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -268,12 +268,6 @@ impl NetworkBeaconProcessor { inbound_request_id: InboundRequestId, request: BlobsByRootRequest, ) -> Result<(), (RpcErrorResponse, &'static str)> { - let Some(_requested_root) = request.blob_ids.as_slice().first().map(|id| id.block_root) - else { - // No blob ids requested. - return Ok(()); - }; - let mut send_blob_count = 0; let fulu_start_slot = self @@ -283,9 +277,9 @@ impl NetworkBeaconProcessor { .map(|epoch| epoch.start_slot(T::EthSpec::slots_per_epoch())); let mut blob_list_results = HashMap::new(); - let mut retrieve_blob_slot = HashMap::new(); + let mut slots_by_block_root = HashMap::new(); // For logging purpose, to display one log per block root - let mut logging = HashMap::new(); + let mut logging_index_by_block_root = HashMap::new(); for id in request.blob_ids.as_slice() { let BlobIdentifier { block_root: root, @@ -293,7 +287,7 @@ impl NetworkBeaconProcessor { } = id; // Get the slot for where the blob belongs to from the HashMap or cache without touching the database - let slot = if let Some(slot) = retrieve_blob_slot.get(root) { + let slot = if let Some(slot) = slots_by_block_root.get(root) { Some(*slot) } else { // Try to get block from caches to extract slot @@ -304,10 +298,10 @@ impl NetworkBeaconProcessor { .or_else(|| self.chain.early_attester_cache.get_block(*root)) { let slot = block.slot(); - retrieve_blob_slot.insert(*root, slot); + slots_by_block_root.insert(*root, slot); Some(slot) } else { - // Blobs not found in cache, return None to avoid hitting the database + // Block not found in cache, returning None to query blobs from the database later None } }; @@ -316,13 +310,6 @@ impl NetworkBeaconProcessor { if let (Some(slot), Some(fulu_slot)) = (slot, fulu_start_slot) && slot >= fulu_slot { - debug!( - %peer_id, - request_root = %root, - %slot, - %fulu_slot, - "BlobsByRoot request is at or after Fulu slot, returning empty response" - ); continue; } @@ -334,7 +321,10 @@ impl NetworkBeaconProcessor { Response::BlobsByRoot(Some(blob)), ); send_blob_count += 1; - logging.entry(*root).or_insert(Vec::new()).push(*index); + logging_index_by_block_root + .entry(*root) + .or_insert(Vec::new()) + .push(*index); } else { let blob_list_result = match blob_list_results.entry(root) { Entry::Vacant(entry) => { @@ -347,27 +337,16 @@ impl NetworkBeaconProcessor { Ok(blobs_sidecar_list) => { 'inner: for blob_sidecar in blobs_sidecar_list.iter() { if blob_sidecar.index == *index { - // Check for Fulu slot - if let Some(fulu_slot) = fulu_start_slot - && blob_sidecar.slot() >= fulu_slot - { - debug!( - %peer_id, - request_root = %root, - blob_slot = %blob_sidecar.slot(), - %fulu_slot, - "BlobsByRoot request is at or after Fulu slot, returning empty response" - ); - break 'inner; - } - self.send_response( peer_id, inbound_request_id, Response::BlobsByRoot(Some(blob_sidecar.clone())), ); send_blob_count += 1; - logging.entry(*root).or_insert(Vec::new()).push(*index); + logging_index_by_block_root + .entry(*root) + .or_insert(Vec::new()) + .push(*index); break 'inner; } } @@ -384,16 +363,12 @@ impl NetworkBeaconProcessor { } } - // log once per block_root - for (block_root, blobs_indices) in &logging { - debug!( - %peer_id, - %block_root, - ?blobs_indices, - returned = send_blob_count, - "BlobsByRoot outgoing response processed" - ); - } + debug!( + %peer_id, + block_root = ?logging_index_by_block_root.keys(), + returned = send_blob_count, + "BlobsByRoot outgoing response processed" + ); Ok(()) } @@ -952,9 +927,10 @@ impl NetworkBeaconProcessor { let request_start_slot = Slot::from(req.start_slot); let request_start_epoch = request_start_slot.epoch(T::EthSpec::slots_per_epoch()); + let fork_name = self.chain.spec.fork_name_at_epoch(request_start_epoch); // Should not send more than max request blob sidecars if req.max_blobs_requested(request_start_epoch, &self.chain.spec) - > self.chain.spec.max_request_blob_sidecars_electra + > self.chain.spec.max_request_blob_sidecars(fork_name) as u64 { return Err(( RpcErrorResponse::InvalidRequest, @@ -968,26 +944,10 @@ impl NetworkBeaconProcessor { // If the request_start_slot is at or after a Fulu slot, return an empty response if request_start_slot >= fulu_start_slot { - debug!( - %peer_id, - %request_start_slot, - %fulu_start_slot, - returned = 0, - "BlobsByRange request is at or after a Fulu slot, returning empty response" - ); return Ok(()); // For the case that the request slots spans across the Fulu fork slot } else if request_end_slot >= fulu_start_slot { - let count = (fulu_start_slot - request_start_slot).as_u64(); - debug!( - %peer_id, - %request_start_slot, - %fulu_start_slot, - requested = req.count, - effective_count = count, - "BlobsByRange request spans across Fulu fork, only serving blobs before Fulu slots" - ); - count + (fulu_start_slot - request_start_slot).as_u64() } else { req.count } diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index ee8fe686336..8c6e76c44b3 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -1461,6 +1461,70 @@ async fn test_blobs_by_range_spans_fulu_fork() { assert_eq!(blob_count, actual_count); } +#[tokio::test] +async fn test_blobs_by_root() { + if test_spec::().deneb_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + + // Get the block root of a sample slot, e.g., slot 1 + let block_root = rig + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + + let blobs = rig.chain.get_blobs(&block_root).unwrap(); + let blob_count = blobs.len(); + + let blob_ids: Vec = (0..blob_count) + .map(|index| BlobIdentifier { + block_root, + index: index as u64, + }) + .collect(); + + let blob_ids_list = RuntimeVariableList::new(blob_ids, blob_count).unwrap(); + + rig.enqueue_blobs_by_root_request(blob_ids_list); + + let mut blob_count = 0; + let root = rig + .chain + .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) + .unwrap(); + blob_count += root + .map(|root| { + rig.chain + .get_blobs(&root) + .map(|list| list.len()) + .unwrap_or(0) + }) + .unwrap_or(0); + + let mut actual_count = 0; + + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::BlobsByRoot(blob), + inbound_request_id: _, + } = next + { + if blob.is_some() { + actual_count += 1; + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + assert_eq!(blob_count, actual_count); +} + #[tokio::test] async fn test_blobs_by_root_post_fulu_should_return_empty() { // Only test for Fulu fork @@ -1470,7 +1534,6 @@ async fn test_blobs_by_root_post_fulu_should_return_empty() { let mut rig = TestRig::new(64).await; - // Get the block root of a sample slot, e.g., slot 1 let block_root = rig .chain .block_root_at_slot(Slot::new(1), WhenSlotSkipped::None) diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 55612f0d508..659bd1d23f3 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -243,7 +243,7 @@ pub struct ChainSpec { */ max_blobs_per_block_electra: u64, blob_sidecar_subnet_count_electra: u64, - pub max_request_blob_sidecars_electra: u64, + max_request_blob_sidecars_electra: u64, /* * Networking Fulu From 7f0a3f3c6ee486e0c045f8040a419148d3f4445f Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Wed, 27 Aug 2025 12:12:27 +0800 Subject: [PATCH 17/18] fix --- beacon_node/network/src/network_beacon_processor/tests.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index d2693f0fb03..eefd1aa388a 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -1608,6 +1608,7 @@ async fn test_blobs_by_root_post_fulu_should_return_empty() { } // Post-Fulu should return 0 blobs assert_eq!(0, actual_count); +} /// Ensure that data column processing that results in block import sends a sync notification #[tokio::test] @@ -1666,4 +1667,4 @@ async fn test_data_column_import_notifies_sync() { other => panic!("expected GossipBlockProcessResult, got {:?}", other), } } -} \ No newline at end of file +} From 763af594e74d879df97317a7b9f99123247020d0 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Wed, 27 Aug 2025 16:42:21 +0800 Subject: [PATCH 18/18] Merge conflict and fix test by adding gloas_fork_epoch --- beacon_node/network/src/network_beacon_processor/tests.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index eefd1aa388a..4f6261700c5 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -21,7 +21,9 @@ use beacon_processor::{work_reprocessing_queue::*, *}; use gossipsub::MessageAcceptance; use itertools::Itertools; use lighthouse_network::rpc::InboundRequestId; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest, MetaDataV3}; +use lighthouse_network::rpc::methods::{ + BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, MetaDataV3, +}; use lighthouse_network::{ Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response, discv5::enr::{self, CombinedKey}, @@ -1454,6 +1456,7 @@ async fn test_blobs_by_range_spans_fulu_fork() { }; let mut spec = test_spec::(); spec.fulu_fork_epoch = Some(Epoch::new(1)); + spec.gloas_fork_epoch = Some(Epoch::new(2)); let mut rig = TestRig::new_parametric(64, BeaconProcessorConfig::default(), spec).await;