Skip to content

Avoid attempting to serve blobs after Fulu fork #7756

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 82 additions & 21 deletions beacon_node/network/src/network_beacon_processor/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,21 +268,51 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
inbound_request_id: InboundRequestId,
request: BlobsByRootRequest,
) -> Result<(), (RpcErrorResponse, &'static str)> {
let Some(requested_root) = request.blob_ids.as_slice().first().map(|id| id.block_root)
else {
// No blob ids requested.
return Ok(());
};
let requested_indices = request
.blob_ids
.as_slice()
.iter()
.map(|id| id.index)
.collect::<Vec<_>>();
let mut send_blob_count = 0;

let fulu_start_slot = self
.chain
.spec
.fulu_fork_epoch
.map(|epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()));

let mut blob_list_results = HashMap::new();
let mut slots_by_block_root = HashMap::new();
// For logging purposes, so we can display one log entry per block root
let mut logging_index_by_block_root = HashMap::new();
for id in request.blob_ids.as_slice() {
let BlobIdentifier {
block_root: root,
index,
} = id;

// Get the slot the blob belongs to from the HashMap or the caches, without touching the database
let slot = if let Some(slot) = slots_by_block_root.get(root) {
Some(*slot)
} else {
// Try to get block from caches to extract slot
if let Some(block) = self
.chain
.data_availability_checker
.get_execution_valid_block(root)
.or_else(|| self.chain.early_attester_cache.get_block(*root))
{
let slot = block.slot();
slots_by_block_root.insert(*root, slot);
Some(slot)
} else {
// Block not found in the caches; return None so the blobs are queried from the database later
None
}
};

// Skip if slot is >= fulu_start_slot
if let (Some(slot), Some(fulu_slot)) = (slot, fulu_start_slot)
&& slot >= fulu_slot
{
continue;
}

// First attempt to get the blobs from the RPC cache.
if let Ok(Some(blob)) = self.chain.data_availability_checker.get_blob(id) {
self.send_response(
Expand All @@ -291,12 +321,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Response::BlobsByRoot(Some(blob)),
);
send_blob_count += 1;
logging_index_by_block_root
.entry(*root)
.or_insert(Vec::new())
.push(*index);
} else {
let BlobIdentifier {
block_root: root,
index,
} = id;

let blob_list_result = match blob_list_results.entry(root) {
Entry::Vacant(entry) => {
entry.insert(self.chain.get_blobs_checking_early_attester_cache(root))
Expand All @@ -314,6 +343,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Response::BlobsByRoot(Some(blob_sidecar.clone())),
);
send_blob_count += 1;
logging_index_by_block_root
.entry(*root)
.or_insert(Vec::new())
.push(*index);
break 'inner;
}
}
Expand All @@ -329,10 +362,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
}

debug!(
%peer_id,
%requested_root,
?requested_indices,
block_root = ?logging_index_by_block_root.keys(),
returned = send_blob_count,
"BlobsByRoot outgoing response processed"
);
Expand Down Expand Up @@ -893,6 +926,34 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);

let request_start_slot = Slot::from(req.start_slot);
let request_start_epoch = request_start_slot.epoch(T::EthSpec::slots_per_epoch());
let fork_name = self.chain.spec.fork_name_at_epoch(request_start_epoch);
// Should not send more than the maximum number of blob sidecars allowed per request
if req.max_blobs_requested(request_start_epoch, &self.chain.spec)
> self.chain.spec.max_request_blob_sidecars(fork_name) as u64
{
return Err((
RpcErrorResponse::InvalidRequest,
"Request exceeded `MAX_REQUEST_BLOBS_SIDECARS_ELECTRA`",
));
}

let effective_count = if let Some(fulu_epoch) = self.chain.spec.fulu_fork_epoch {
let fulu_start_slot = fulu_epoch.start_slot(T::EthSpec::slots_per_epoch());
let request_end_slot = request_start_slot.saturating_add(req.count) - 1;

// If the request_start_slot is at or after a Fulu slot, return an empty response
if request_start_slot >= fulu_start_slot {
return Ok(());
// Handle the case where the requested slot range spans the Fulu fork slot
} else if request_end_slot >= fulu_start_slot {
(fulu_start_slot - request_start_slot).as_u64()
} else {
req.count
}
} else {
req.count
};

let data_availability_boundary_slot = match self.chain.data_availability_boundary() {
Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()),
Expand Down Expand Up @@ -930,7 +991,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}

let block_roots =
self.get_block_roots_for_slot_range(req.start_slot, req.count, "BlobsByRange")?;
self.get_block_roots_for_slot_range(req.start_slot, effective_count, "BlobsByRange")?;

let current_slot = self
.chain
Expand All @@ -957,7 +1018,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// Due to skip slots, blobs could be out of the range, we ensure they
// are in the range before sending
if blob_sidecar.slot() >= request_start_slot
&& blob_sidecar.slot() < request_start_slot + req.count
&& blob_sidecar.slot() < request_start_slot + effective_count
{
blobs_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
Expand Down Expand Up @@ -1024,7 +1085,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
if req.max_requested::<T::EthSpec>() > self.chain.spec.max_request_data_column_sidecars {
return Err((
RpcErrorResponse::InvalidRequest,
"Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`",
"Request exceeded `MAX_REQUEST_DATA_COLUMN_SIDECARS`",
));
}

Expand Down
Loading
Loading