From 2badfb70c46de440249a3e0e8c765724ffffa481 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Wed, 10 Sep 2025 12:12:42 +1000 Subject: [PATCH 1/4] Fix beacon processor metrics for batched messages --- beacon_node/beacon_processor/src/lib.rs | 34 +++++++++++++++++++++---- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index ab9ab045f4e..d0a30675799 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -627,7 +627,7 @@ impl fmt::Debug for Work { } } -#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone)] +#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone, Copy)] #[strum(serialize_all = "snake_case")] pub enum WorkType { GossipAttestation, @@ -731,6 +731,17 @@ impl Work { } } +impl WorkType { + /// Return the type of work that was batched to create this type of work, if any. + pub fn batched_from(self) -> Option { + match self { + WorkType::GossipAggregateBatch => Some(WorkType::GossipAggregate), + WorkType::GossipAttestationBatch => Some(WorkType::GossipAttestation), + _ => None, + } + } +} + /// Unifies all the messages processed by the `BeaconProcessor`. enum InboundEvent { /// A worker has completed a task and is free. @@ -1404,7 +1415,7 @@ impl BeaconProcessor { }; if let Some(modified_queue_id) = modified_queue_id { - let queue_len = match modified_queue_id { + let get_queue_len = |work_type| match work_type { WorkType::GossipAttestation => attestation_queue.len(), WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(), WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(), @@ -1458,11 +1469,24 @@ impl BeaconProcessor { WorkType::ApiRequestP1 => api_request_p1_queue.len(), WorkType::Reprocess => 0, }; + + // Update the metric for the length of this work type's queue. metrics::observe_vec( &metrics::BEACON_PROCESSOR_QUEUE_LENGTH, &[modified_queue_id.into()], - queue_len as f64, + get_queue_len(modified_queue_id) as f64, ); + + // If the work was batched from another type of work, also update that work + // event's queue length. There was previously a bug here where those queues + // metrics could go a long time without being updated at all. + if let Some(batched_work_type) = modified_queue_id.batched_from() { + metrics::observe_vec( + &metrics::BEACON_PROCESSOR_QUEUE_LENGTH, + &[batched_work_type.into()], + get_queue_len(batched_work_type) as f64, + ); + } } if aggregate_queue.is_full() && aggregate_debounce.elapsed() { @@ -1690,10 +1714,10 @@ impl Drop for SendOnDrop { fn drop(&mut self) { metrics::dec_gauge_vec( &metrics::BEACON_PROCESSOR_WORKERS_ACTIVE_GAUGE_BY_TYPE, - &[self.work_type.clone().into()], + &[self.work_type.into()], ); - if let Err(e) = self.tx.try_send(self.work_type.clone()) { + if let Err(e) = self.tx.try_send(self.work_type) { warn!( msg = "did not free worker, shutdown may be underway", error = %e, From b033e1fc9ee7f0caf91a57a42fd62764cd57a537 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 11 Sep 2025 13:25:47 +1000 Subject: [PATCH 2/4] More bruteforce approach --- beacon_node/beacon_processor/src/lib.rs | 146 +++++++++++------------- 1 file changed, 68 insertions(+), 78 deletions(-) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index d0a30675799..d6746fd51c9 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -58,7 +58,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::Context; use std::time::{Duration, Instant}; -use strum::IntoStaticStr; +use strum::{EnumIter, IntoEnumIterator, IntoStaticStr}; use task_executor::TaskExecutor; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; @@ -627,7 +627,7 @@ impl fmt::Debug for Work { } } -#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone, Copy)] +#[derive(IntoStaticStr, EnumIter, PartialEq, Eq, Debug, Clone, Copy)] #[strum(serialize_all = "snake_case")] pub enum WorkType { GossipAttestation, @@ -1033,7 +1033,7 @@ impl BeaconProcessor { .is_some_and(|event| event.drop_during_sync); let idle_tx = idle_tx.clone(); - let modified_queue_id = match work_event { + match work_event { // There is no new work event, but we are able to spawn a new worker. // // We don't check the `work.drop_during_sync` here. We assume that if it made @@ -1414,81 +1414,6 @@ impl BeaconProcessor { } }; - if let Some(modified_queue_id) = modified_queue_id { - let get_queue_len = |work_type| match work_type { - WorkType::GossipAttestation => attestation_queue.len(), - WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(), - WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(), - WorkType::GossipAttestationBatch => 0, // No queue - WorkType::GossipAggregate => aggregate_queue.len(), - WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(), - WorkType::UnknownLightClientOptimisticUpdate => { - unknown_light_client_update_queue.len() - } - WorkType::GossipAggregateBatch => 0, // No queue - WorkType::GossipBlock => gossip_block_queue.len(), - WorkType::GossipBlobSidecar => gossip_blob_queue.len(), - WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(), - WorkType::DelayedImportBlock => delayed_block_queue.len(), - WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(), - WorkType::GossipProposerSlashing => gossip_proposer_slashing_queue.len(), - WorkType::GossipAttesterSlashing => gossip_attester_slashing_queue.len(), - WorkType::GossipSyncSignature => sync_message_queue.len(), - WorkType::GossipSyncContribution => sync_contribution_queue.len(), - WorkType::GossipLightClientFinalityUpdate => { - lc_gossip_finality_update_queue.len() - } - WorkType::GossipLightClientOptimisticUpdate => { - lc_gossip_optimistic_update_queue.len() - } - WorkType::RpcBlock => rpc_block_queue.len(), - WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(), - WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(), - WorkType::ColumnReconstruction => column_reconstruction_queue.len(), - WorkType::ChainSegment => chain_segment_queue.len(), - WorkType::ChainSegmentBackfill => backfill_chain_segment.len(), - WorkType::Status => status_queue.len(), - WorkType::BlocksByRangeRequest => blbrange_queue.len(), - WorkType::BlocksByRootsRequest => blbroots_queue.len(), - WorkType::BlobsByRangeRequest => bbrange_queue.len(), - WorkType::BlobsByRootsRequest => bbroots_queue.len(), - WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(), - WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(), - WorkType::GossipBlsToExecutionChange => { - gossip_bls_to_execution_change_queue.len() - } - WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(), - WorkType::LightClientOptimisticUpdateRequest => { - lc_rpc_optimistic_update_queue.len() - } - WorkType::LightClientFinalityUpdateRequest => { - lc_rpc_finality_update_queue.len() - } - WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(), - WorkType::ApiRequestP0 => api_request_p0_queue.len(), - WorkType::ApiRequestP1 => api_request_p1_queue.len(), - WorkType::Reprocess => 0, - }; - - // Update the metric for the length of this work type's queue. - metrics::observe_vec( - &metrics::BEACON_PROCESSOR_QUEUE_LENGTH, - &[modified_queue_id.into()], - get_queue_len(modified_queue_id) as f64, - ); - - // If the work was batched from another type of work, also update that work - // event's queue length. There was previously a bug here where those queues - // metrics could go a long time without being updated at all. - if let Some(batched_work_type) = modified_queue_id.batched_from() { - metrics::observe_vec( - &metrics::BEACON_PROCESSOR_QUEUE_LENGTH, - &[batched_work_type.into()], - get_queue_len(batched_work_type) as f64, - ); - } - } - if aggregate_queue.is_full() && aggregate_debounce.elapsed() { error!( msg = "the system has insufficient resources for load", @@ -1504,6 +1429,71 @@ impl BeaconProcessor { "Attestation queue full" ) } + + let get_queue_len = |work_type| match work_type { + WorkType::GossipAttestation => attestation_queue.len(), + WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(), + WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(), + WorkType::GossipAttestationBatch => 0, // No queue + WorkType::GossipAggregate => aggregate_queue.len(), + WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(), + WorkType::UnknownLightClientOptimisticUpdate => { + unknown_light_client_update_queue.len() + } + WorkType::GossipAggregateBatch => 0, // No queue + WorkType::GossipBlock => gossip_block_queue.len(), + WorkType::GossipBlobSidecar => gossip_blob_queue.len(), + WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(), + WorkType::DelayedImportBlock => delayed_block_queue.len(), + WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(), + WorkType::GossipProposerSlashing => gossip_proposer_slashing_queue.len(), + WorkType::GossipAttesterSlashing => gossip_attester_slashing_queue.len(), + WorkType::GossipSyncSignature => sync_message_queue.len(), + WorkType::GossipSyncContribution => sync_contribution_queue.len(), + WorkType::GossipLightClientFinalityUpdate => { + lc_gossip_finality_update_queue.len() + } + WorkType::GossipLightClientOptimisticUpdate => { + lc_gossip_optimistic_update_queue.len() + } + WorkType::RpcBlock => rpc_block_queue.len(), + WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(), + WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(), + WorkType::ColumnReconstruction => column_reconstruction_queue.len(), + WorkType::ChainSegment => chain_segment_queue.len(), + WorkType::ChainSegmentBackfill => backfill_chain_segment.len(), + WorkType::Status => status_queue.len(), + WorkType::BlocksByRangeRequest => blbrange_queue.len(), + WorkType::BlocksByRootsRequest => blbroots_queue.len(), + WorkType::BlobsByRangeRequest => bbrange_queue.len(), + WorkType::BlobsByRootsRequest => bbroots_queue.len(), + WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(), + WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(), + WorkType::GossipBlsToExecutionChange => { + gossip_bls_to_execution_change_queue.len() + } + WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(), + WorkType::LightClientOptimisticUpdateRequest => { + lc_rpc_optimistic_update_queue.len() + } + WorkType::LightClientFinalityUpdateRequest => { + lc_rpc_finality_update_queue.len() + } + WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(), + WorkType::ApiRequestP0 => api_request_p0_queue.len(), + WorkType::ApiRequestP1 => api_request_p1_queue.len(), + WorkType::Reprocess => 0, + }; + + // Update metrics for all work events. + for work_type in WorkType::iter() { + // Update the metric for the length of this work type's queue. + metrics::observe_vec( + &metrics::BEACON_PROCESSOR_QUEUE_LENGTH, + &[work_type.into()], + get_queue_len(work_type) as f64, + ); + } } }; From dd1f185e68d5cf01bd08bb587314a06fb3a6b749 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 15 Sep 2025 10:40:27 +1000 Subject: [PATCH 3/4] Revert "More bruteforce approach" This reverts commit b033e1fc9ee7f0caf91a57a42fd62764cd57a537. --- beacon_node/beacon_processor/src/lib.rs | 146 +++++++++++++----------- 1 file changed, 78 insertions(+), 68 deletions(-) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index d6746fd51c9..d0a30675799 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -58,7 +58,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::Context; use std::time::{Duration, Instant}; -use strum::{EnumIter, IntoEnumIterator, IntoStaticStr}; +use strum::IntoStaticStr; use task_executor::TaskExecutor; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; @@ -627,7 +627,7 @@ impl fmt::Debug for Work { } } -#[derive(IntoStaticStr, EnumIter, PartialEq, Eq, Debug, Clone, Copy)] +#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone, Copy)] #[strum(serialize_all = "snake_case")] pub enum WorkType { GossipAttestation, @@ -1033,7 +1033,7 @@ impl BeaconProcessor { .is_some_and(|event| event.drop_during_sync); let idle_tx = idle_tx.clone(); - match work_event { + let modified_queue_id = match work_event { // There is no new work event, but we are able to spawn a new worker. // // We don't check the `work.drop_during_sync` here. We assume that if it made @@ -1414,6 +1414,81 @@ impl BeaconProcessor { } }; + if let Some(modified_queue_id) = modified_queue_id { + let get_queue_len = |work_type| match work_type { + WorkType::GossipAttestation => attestation_queue.len(), + WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(), + WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(), + WorkType::GossipAttestationBatch => 0, // No queue + WorkType::GossipAggregate => aggregate_queue.len(), + WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(), + WorkType::UnknownLightClientOptimisticUpdate => { + unknown_light_client_update_queue.len() + } + WorkType::GossipAggregateBatch => 0, // No queue + WorkType::GossipBlock => gossip_block_queue.len(), + WorkType::GossipBlobSidecar => gossip_blob_queue.len(), + WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(), + WorkType::DelayedImportBlock => delayed_block_queue.len(), + WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(), + WorkType::GossipProposerSlashing => gossip_proposer_slashing_queue.len(), + WorkType::GossipAttesterSlashing => gossip_attester_slashing_queue.len(), + WorkType::GossipSyncSignature => sync_message_queue.len(), + WorkType::GossipSyncContribution => sync_contribution_queue.len(), + WorkType::GossipLightClientFinalityUpdate => { + lc_gossip_finality_update_queue.len() + } + WorkType::GossipLightClientOptimisticUpdate => { + lc_gossip_optimistic_update_queue.len() + } + WorkType::RpcBlock => rpc_block_queue.len(), + WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(), + WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(), + WorkType::ColumnReconstruction => column_reconstruction_queue.len(), + WorkType::ChainSegment => chain_segment_queue.len(), + WorkType::ChainSegmentBackfill => backfill_chain_segment.len(), + WorkType::Status => status_queue.len(), + WorkType::BlocksByRangeRequest => blbrange_queue.len(), + WorkType::BlocksByRootsRequest => blbroots_queue.len(), + WorkType::BlobsByRangeRequest => bbrange_queue.len(), + WorkType::BlobsByRootsRequest => bbroots_queue.len(), + WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(), + WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(), + WorkType::GossipBlsToExecutionChange => { + gossip_bls_to_execution_change_queue.len() + } + WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(), + WorkType::LightClientOptimisticUpdateRequest => { + lc_rpc_optimistic_update_queue.len() + } + WorkType::LightClientFinalityUpdateRequest => { + lc_rpc_finality_update_queue.len() + } + WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(), + WorkType::ApiRequestP0 => api_request_p0_queue.len(), + WorkType::ApiRequestP1 => api_request_p1_queue.len(), + WorkType::Reprocess => 0, + }; + + // Update the metric for the length of this work type's queue. + metrics::observe_vec( + &metrics::BEACON_PROCESSOR_QUEUE_LENGTH, + &[modified_queue_id.into()], + get_queue_len(modified_queue_id) as f64, + ); + + // If the work was batched from another type of work, also update that work + // event's queue length. There was previously a bug here where those queues + // metrics could go a long time without being updated at all. + if let Some(batched_work_type) = modified_queue_id.batched_from() { + metrics::observe_vec( + &metrics::BEACON_PROCESSOR_QUEUE_LENGTH, + &[batched_work_type.into()], + get_queue_len(batched_work_type) as f64, + ); + } + } + if aggregate_queue.is_full() && aggregate_debounce.elapsed() { error!( msg = "the system has insufficient resources for load", @@ -1429,71 +1504,6 @@ impl BeaconProcessor { "Attestation queue full" ) } - - let get_queue_len = |work_type| match work_type { - WorkType::GossipAttestation => attestation_queue.len(), - WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(), - WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(), - WorkType::GossipAttestationBatch => 0, // No queue - WorkType::GossipAggregate => aggregate_queue.len(), - WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(), - WorkType::UnknownLightClientOptimisticUpdate => { - unknown_light_client_update_queue.len() - } - WorkType::GossipAggregateBatch => 0, // No queue - WorkType::GossipBlock => gossip_block_queue.len(), - WorkType::GossipBlobSidecar => gossip_blob_queue.len(), - WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(), - WorkType::DelayedImportBlock => delayed_block_queue.len(), - WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(), - WorkType::GossipProposerSlashing => gossip_proposer_slashing_queue.len(), - WorkType::GossipAttesterSlashing => gossip_attester_slashing_queue.len(), - WorkType::GossipSyncSignature => sync_message_queue.len(), - WorkType::GossipSyncContribution => sync_contribution_queue.len(), - WorkType::GossipLightClientFinalityUpdate => { - lc_gossip_finality_update_queue.len() - } - WorkType::GossipLightClientOptimisticUpdate => { - lc_gossip_optimistic_update_queue.len() - } - WorkType::RpcBlock => rpc_block_queue.len(), - WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(), - WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(), - WorkType::ColumnReconstruction => column_reconstruction_queue.len(), - WorkType::ChainSegment => chain_segment_queue.len(), - WorkType::ChainSegmentBackfill => backfill_chain_segment.len(), - WorkType::Status => status_queue.len(), - WorkType::BlocksByRangeRequest => blbrange_queue.len(), - WorkType::BlocksByRootsRequest => blbroots_queue.len(), - WorkType::BlobsByRangeRequest => bbrange_queue.len(), - WorkType::BlobsByRootsRequest => bbroots_queue.len(), - WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(), - WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(), - WorkType::GossipBlsToExecutionChange => { - gossip_bls_to_execution_change_queue.len() - } - WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(), - WorkType::LightClientOptimisticUpdateRequest => { - lc_rpc_optimistic_update_queue.len() - } - WorkType::LightClientFinalityUpdateRequest => { - lc_rpc_finality_update_queue.len() - } - WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(), - WorkType::ApiRequestP0 => api_request_p0_queue.len(), - WorkType::ApiRequestP1 => api_request_p1_queue.len(), - WorkType::Reprocess => 0, - }; - - // Update metrics for all work events. - for work_type in WorkType::iter() { - // Update the metric for the length of this work type's queue. - metrics::observe_vec( - &metrics::BEACON_PROCESSOR_QUEUE_LENGTH, - &[work_type.into()], - get_queue_len(work_type) as f64, - ); - } } }; From b90804bf62a1e3ea27ebcd585e3cd8aedd23f18c Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 15 Sep 2025 11:48:57 +1000 Subject: [PATCH 4/4] New approach: sample length only on receive --- beacon_node/beacon_processor/src/lib.rs | 177 +++++++++++------------- 1 file changed, 83 insertions(+), 94 deletions(-) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index d0a30675799..7df2b378833 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -731,17 +731,6 @@ impl Work { } } -impl WorkType { - /// Return the type of work that was batched to create this type of work, if any. - pub fn batched_from(self) -> Option { - match self { - WorkType::GossipAggregateBatch => Some(WorkType::GossipAggregate), - WorkType::GossipAttestationBatch => Some(WorkType::GossipAttestation), - _ => None, - } - } -} - /// Unifies all the messages processed by the `BeaconProcessor`. enum InboundEvent { /// A worker has completed a task and is free. @@ -1033,7 +1022,7 @@ impl BeaconProcessor { .is_some_and(|event| event.drop_during_sync); let idle_tx = idle_tx.clone(); - let modified_queue_id = match work_event { + match work_event { // There is no new work event, but we are able to spawn a new worker. // // We don't check the `work.drop_during_sync` here. We assume that if it made @@ -1265,11 +1254,7 @@ impl BeaconProcessor { }; if let Some(work_event) = work_event { - let work_type = work_event.to_type(); self.spawn_worker(work_event, created_timestamp, idle_tx); - Some(work_type) - } else { - None } } // There is no new work event and we are unable to spawn a new worker. @@ -1280,7 +1265,6 @@ impl BeaconProcessor { msg = "no new work and cannot spawn worker", "Unexpected gossip processor condition" ); - None } // The chain is syncing and this event should be dropped during sync. Some(work_event) @@ -1297,7 +1281,6 @@ impl BeaconProcessor { work_id = work_id, "Gossip processor skipping work" ); - None } // There is a new work event and the chain is not syncing. Process it or queue // it. @@ -1305,6 +1288,88 @@ impl BeaconProcessor { let work_id = work.str_id(); let work_type = work.to_type(); + // Sample the length of the queue for this event type. It is important that + // we only sample upon receiving events, in order to make the full set of + // samples representative. + // + // See: https://github.com/sigp/lighthouse/pull/8020 + + // This closure gets the current length of the queue for a given + // `work_type`. + // + // It should be relocated when we shift the queues from being local + // variables to fields of the struct. + let get_queue_len = |work_type| match work_type { + WorkType::GossipAttestation => attestation_queue.len(), + WorkType::GossipAttestationToConvert => { + attestation_to_convert_queue.len() + } + WorkType::UnknownBlockAttestation => { + unknown_block_attestation_queue.len() + } + WorkType::GossipAttestationBatch => 0, // No queue + WorkType::GossipAggregate => aggregate_queue.len(), + WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(), + WorkType::UnknownLightClientOptimisticUpdate => { + unknown_light_client_update_queue.len() + } + WorkType::GossipAggregateBatch => 0, // No queue + WorkType::GossipBlock => gossip_block_queue.len(), + WorkType::GossipBlobSidecar => gossip_blob_queue.len(), + WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(), + WorkType::DelayedImportBlock => delayed_block_queue.len(), + WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(), + WorkType::GossipProposerSlashing => { + gossip_proposer_slashing_queue.len() + } + WorkType::GossipAttesterSlashing => { + gossip_attester_slashing_queue.len() + } + WorkType::GossipSyncSignature => sync_message_queue.len(), + WorkType::GossipSyncContribution => sync_contribution_queue.len(), + WorkType::GossipLightClientFinalityUpdate => { + lc_gossip_finality_update_queue.len() + } + WorkType::GossipLightClientOptimisticUpdate => { + lc_gossip_optimistic_update_queue.len() + } + WorkType::RpcBlock => rpc_block_queue.len(), + WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(), + WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(), + WorkType::ColumnReconstruction => column_reconstruction_queue.len(), + WorkType::ChainSegment => chain_segment_queue.len(), + WorkType::ChainSegmentBackfill => backfill_chain_segment.len(), + WorkType::Status => status_queue.len(), + WorkType::BlocksByRangeRequest => blbrange_queue.len(), + WorkType::BlocksByRootsRequest => blbroots_queue.len(), + WorkType::BlobsByRangeRequest => bbrange_queue.len(), + WorkType::BlobsByRootsRequest => bbroots_queue.len(), + WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(), + WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(), + WorkType::GossipBlsToExecutionChange => { + gossip_bls_to_execution_change_queue.len() + } + WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(), + WorkType::LightClientOptimisticUpdateRequest => { + lc_rpc_optimistic_update_queue.len() + } + WorkType::LightClientFinalityUpdateRequest => { + lc_rpc_finality_update_queue.len() + } + WorkType::LightClientUpdatesByRangeRequest => { + lc_update_range_queue.len() + } + WorkType::ApiRequestP0 => api_request_p0_queue.len(), + WorkType::ApiRequestP1 => api_request_p1_queue.len(), + WorkType::Reprocess => 0, + }; + + metrics::observe_vec( + &metrics::BEACON_PROCESSOR_QUEUE_LENGTH, + &[work_type.into()], + get_queue_len(work_type) as f64, + ); + match work { Work::Reprocess(work_event) => { if let Err(e) = reprocess_work_tx.try_send(work_event) { @@ -1410,85 +1475,9 @@ impl BeaconProcessor { Work::ApiRequestP0 { .. } => api_request_p0_queue.push(work, work_id), Work::ApiRequestP1 { .. } => api_request_p1_queue.push(work, work_id), }; - Some(work_type) } }; - if let Some(modified_queue_id) = modified_queue_id { - let get_queue_len = |work_type| match work_type { - WorkType::GossipAttestation => attestation_queue.len(), - WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(), - WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(), - WorkType::GossipAttestationBatch => 0, // No queue - WorkType::GossipAggregate => aggregate_queue.len(), - WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(), - WorkType::UnknownLightClientOptimisticUpdate => { - unknown_light_client_update_queue.len() - } - WorkType::GossipAggregateBatch => 0, // No queue - WorkType::GossipBlock => gossip_block_queue.len(), - WorkType::GossipBlobSidecar => gossip_blob_queue.len(), - WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(), - WorkType::DelayedImportBlock => delayed_block_queue.len(), - WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(), - WorkType::GossipProposerSlashing => gossip_proposer_slashing_queue.len(), - WorkType::GossipAttesterSlashing => gossip_attester_slashing_queue.len(), - WorkType::GossipSyncSignature => sync_message_queue.len(), - WorkType::GossipSyncContribution => sync_contribution_queue.len(), - WorkType::GossipLightClientFinalityUpdate => { - lc_gossip_finality_update_queue.len() - } - WorkType::GossipLightClientOptimisticUpdate => { - lc_gossip_optimistic_update_queue.len() - } - WorkType::RpcBlock => rpc_block_queue.len(), - WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(), - WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(), - WorkType::ColumnReconstruction => column_reconstruction_queue.len(), - WorkType::ChainSegment => chain_segment_queue.len(), - WorkType::ChainSegmentBackfill => backfill_chain_segment.len(), - WorkType::Status => status_queue.len(), - WorkType::BlocksByRangeRequest => blbrange_queue.len(), - WorkType::BlocksByRootsRequest => blbroots_queue.len(), - WorkType::BlobsByRangeRequest => bbrange_queue.len(), - WorkType::BlobsByRootsRequest => bbroots_queue.len(), - WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(), - WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(), - WorkType::GossipBlsToExecutionChange => { - gossip_bls_to_execution_change_queue.len() - } - WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(), - WorkType::LightClientOptimisticUpdateRequest => { - lc_rpc_optimistic_update_queue.len() - } - WorkType::LightClientFinalityUpdateRequest => { - lc_rpc_finality_update_queue.len() - } - WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(), - WorkType::ApiRequestP0 => api_request_p0_queue.len(), - WorkType::ApiRequestP1 => api_request_p1_queue.len(), - WorkType::Reprocess => 0, - }; - - // Update the metric for the length of this work type's queue. - metrics::observe_vec( - &metrics::BEACON_PROCESSOR_QUEUE_LENGTH, - &[modified_queue_id.into()], - get_queue_len(modified_queue_id) as f64, - ); - - // If the work was batched from another type of work, also update that work - // event's queue length. There was previously a bug here where those queues - // metrics could go a long time without being updated at all. - if let Some(batched_work_type) = modified_queue_id.batched_from() { - metrics::observe_vec( - &metrics::BEACON_PROCESSOR_QUEUE_LENGTH, - &[batched_work_type.into()], - get_queue_len(batched_work_type) as f64, - ); - } - } - if aggregate_queue.is_full() && aggregate_debounce.elapsed() { error!( msg = "the system has insufficient resources for load",