159 changes: 86 additions & 73 deletions beacon_node/beacon_processor/src/lib.rs
@@ -627,7 +627,7 @@ impl<E: EthSpec> fmt::Debug for Work<E> {
     }
 }

-#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone)]
+#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone, Copy)]
 #[strum(serialize_all = "snake_case")]
 pub enum WorkType {
     GossipAttestation,
@@ -1022,7 +1022,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
                     .is_some_and(|event| event.drop_during_sync);

                 let idle_tx = idle_tx.clone();
-                let modified_queue_id = match work_event {
+                match work_event {
                     // There is no new work event, but we are able to spawn a new worker.
                     //
                     // We don't check the `work.drop_during_sync` here. We assume that if it made
@@ -1254,11 +1254,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
                         };

                         if let Some(work_event) = work_event {
-                            let work_type = work_event.to_type();
                             self.spawn_worker(work_event, created_timestamp, idle_tx);
-                            Some(work_type)
-                        } else {
-                            None
                         }
                     }
                     // There is no new work event and we are unable to spawn a new worker.
@@ -1269,7 +1265,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
                             msg = "no new work and cannot spawn worker",
                             "Unexpected gossip processor condition"
                         );
-                        None
                     }
                     // The chain is syncing and this event should be dropped during sync.
                     Some(work_event)
Expand All @@ -1286,14 +1281,95 @@ impl<E: EthSpec> BeaconProcessor<E> {
work_id = work_id,
"Gossip processor skipping work"
);
None
}
// There is a new work event and the chain is not syncing. Process it or queue
// it.
Some(WorkEvent { work, .. }) => {
let work_id = work.str_id();
let work_type = work.to_type();

// Sample the length of the queue for this event type. It is important that
// we only sample upon receiving events, in order to make the full set of
// samples representative.
//
// See: https://github.com/sigp/lighthouse/pull/8020
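+                        // (The removed code further down sampled after any queue was
+                        // modified, including pops that occur when a worker frees up, which
+                        // tied the sampling rate to processing activity as well as arrivals.)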

+                        // This closure gets the current length of the queue for a given
+                        // `work_type`.
+                        //
+                        // It should be relocated when we shift the queues from being local
+                        // variables to fields of the struct.
+                        let get_queue_len = |work_type| match work_type {
+                            WorkType::GossipAttestation => attestation_queue.len(),
+                            WorkType::GossipAttestationToConvert => {
+                                attestation_to_convert_queue.len()
+                            }
+                            WorkType::UnknownBlockAttestation => {
+                                unknown_block_attestation_queue.len()
+                            }
+                            WorkType::GossipAttestationBatch => 0, // No queue
+                            WorkType::GossipAggregate => aggregate_queue.len(),
+                            WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(),
+                            WorkType::UnknownLightClientOptimisticUpdate => {
+                                unknown_light_client_update_queue.len()
+                            }
+                            WorkType::GossipAggregateBatch => 0, // No queue
+                            WorkType::GossipBlock => gossip_block_queue.len(),
+                            WorkType::GossipBlobSidecar => gossip_blob_queue.len(),
+                            WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(),
+                            WorkType::DelayedImportBlock => delayed_block_queue.len(),
+                            WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(),
+                            WorkType::GossipProposerSlashing => {
+                                gossip_proposer_slashing_queue.len()
+                            }
+                            WorkType::GossipAttesterSlashing => {
+                                gossip_attester_slashing_queue.len()
+                            }
+                            WorkType::GossipSyncSignature => sync_message_queue.len(),
+                            WorkType::GossipSyncContribution => sync_contribution_queue.len(),
+                            WorkType::GossipLightClientFinalityUpdate => {
+                                lc_gossip_finality_update_queue.len()
+                            }
+                            WorkType::GossipLightClientOptimisticUpdate => {
+                                lc_gossip_optimistic_update_queue.len()
+                            }
+                            WorkType::RpcBlock => rpc_block_queue.len(),
+                            WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(),
+                            WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(),
+                            WorkType::ColumnReconstruction => column_reconstruction_queue.len(),
+                            WorkType::ChainSegment => chain_segment_queue.len(),
+                            WorkType::ChainSegmentBackfill => backfill_chain_segment.len(),
+                            WorkType::Status => status_queue.len(),
+                            WorkType::BlocksByRangeRequest => blbrange_queue.len(),
+                            WorkType::BlocksByRootsRequest => blbroots_queue.len(),
+                            WorkType::BlobsByRangeRequest => bbrange_queue.len(),
+                            WorkType::BlobsByRootsRequest => bbroots_queue.len(),
+                            WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(),
+                            WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(),
+                            WorkType::GossipBlsToExecutionChange => {
+                                gossip_bls_to_execution_change_queue.len()
+                            }
+                            WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(),
+                            WorkType::LightClientOptimisticUpdateRequest => {
+                                lc_rpc_optimistic_update_queue.len()
+                            }
+                            WorkType::LightClientFinalityUpdateRequest => {
+                                lc_rpc_finality_update_queue.len()
+                            }
+                            WorkType::LightClientUpdatesByRangeRequest => {
+                                lc_update_range_queue.len()
+                            }
+                            WorkType::ApiRequestP0 => api_request_p0_queue.len(),
+                            WorkType::ApiRequestP1 => api_request_p1_queue.len(),
+                            WorkType::Reprocess => 0,
+                        };
+
+                        metrics::observe_vec(
+                            &metrics::BEACON_PROCESSOR_QUEUE_LENGTH,
+                            &[work_type.into()],
+                            get_queue_len(work_type) as f64,
+                        );

                         match work {
                             Work::Reprocess(work_event) => {
                                 if let Err(e) = reprocess_work_tx.try_send(work_event) {
@@ -1399,72 +1475,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
                             Work::ApiRequestP0 { .. } => api_request_p0_queue.push(work, work_id),
                             Work::ApiRequestP1 { .. } => api_request_p1_queue.push(work, work_id),
                         };
-                        Some(work_type)
                     }
-                };
-
-                if let Some(modified_queue_id) = modified_queue_id {
-                    let queue_len = match modified_queue_id {
-                        WorkType::GossipAttestation => attestation_queue.len(),
-                        WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(),
-                        WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(),
-                        WorkType::GossipAttestationBatch => 0, // No queue
-                        WorkType::GossipAggregate => aggregate_queue.len(),
-                        WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(),
-                        WorkType::UnknownLightClientOptimisticUpdate => {
-                            unknown_light_client_update_queue.len()
-                        }
-                        WorkType::GossipAggregateBatch => 0, // No queue
-                        WorkType::GossipBlock => gossip_block_queue.len(),
-                        WorkType::GossipBlobSidecar => gossip_blob_queue.len(),
-                        WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(),
-                        WorkType::DelayedImportBlock => delayed_block_queue.len(),
-                        WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(),
-                        WorkType::GossipProposerSlashing => gossip_proposer_slashing_queue.len(),
-                        WorkType::GossipAttesterSlashing => gossip_attester_slashing_queue.len(),
-                        WorkType::GossipSyncSignature => sync_message_queue.len(),
-                        WorkType::GossipSyncContribution => sync_contribution_queue.len(),
-                        WorkType::GossipLightClientFinalityUpdate => {
-                            lc_gossip_finality_update_queue.len()
-                        }
-                        WorkType::GossipLightClientOptimisticUpdate => {
-                            lc_gossip_optimistic_update_queue.len()
-                        }
-                        WorkType::RpcBlock => rpc_block_queue.len(),
-                        WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(),
-                        WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(),
-                        WorkType::ColumnReconstruction => column_reconstruction_queue.len(),
-                        WorkType::ChainSegment => chain_segment_queue.len(),
-                        WorkType::ChainSegmentBackfill => backfill_chain_segment.len(),
-                        WorkType::Status => status_queue.len(),
-                        WorkType::BlocksByRangeRequest => blbrange_queue.len(),
-                        WorkType::BlocksByRootsRequest => blbroots_queue.len(),
-                        WorkType::BlobsByRangeRequest => bbrange_queue.len(),
-                        WorkType::BlobsByRootsRequest => bbroots_queue.len(),
-                        WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(),
-                        WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(),
-                        WorkType::GossipBlsToExecutionChange => {
-                            gossip_bls_to_execution_change_queue.len()
-                        }
-                        WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(),
-                        WorkType::LightClientOptimisticUpdateRequest => {
-                            lc_rpc_optimistic_update_queue.len()
-                        }
-                        WorkType::LightClientFinalityUpdateRequest => {
-                            lc_rpc_finality_update_queue.len()
-                        }
-                        WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(),
-                        WorkType::ApiRequestP0 => api_request_p0_queue.len(),
-                        WorkType::ApiRequestP1 => api_request_p1_queue.len(),
-                        WorkType::Reprocess => 0,
-                    };
-                    metrics::observe_vec(
-                        &metrics::BEACON_PROCESSOR_QUEUE_LENGTH,
-                        &[modified_queue_id.into()],
-                        queue_len as f64,
-                    );
-                }

                 if aggregate_queue.is_full() && aggregate_debounce.elapsed() {
                     error!(
                         msg = "the system has insufficient resources for load",
@@ -1690,10 +1703,10 @@ impl Drop for SendOnDrop {
     fn drop(&mut self) {
         metrics::dec_gauge_vec(
             &metrics::BEACON_PROCESSOR_WORKERS_ACTIVE_GAUGE_BY_TYPE,
-            &[self.work_type.clone().into()],
+            &[self.work_type.into()],
         );

-        if let Err(e) = self.tx.try_send(self.work_type.clone()) {
+        if let Err(e) = self.tx.try_send(self.work_type) {
             warn!(
                 msg = "did not free worker, shutdown may be underway",
                 error = %e,
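
Note on the `Copy` derive: `WorkType` is a fieldless enum, so deriving `Copy` lets call sites such as `SendOnDrop::drop` pass the value directly instead of calling `.clone()`, which is what the first and last hunks above change. Below is a minimal standalone sketch of the same pattern, assuming a dependency on the `strum` crate for `IntoStaticStr` (as the file itself uses); the two variants shown are just a subset for illustration.

use strum_macros::IntoStaticStr;

// A fieldless enum can derive `Copy`: values are duplicated implicitly,
// so explicit `.clone()` calls at the call sites become unnecessary.
#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone, Copy)]
#[strum(serialize_all = "snake_case")]
enum WorkType {
    GossipAttestation,
    GossipAggregate,
}

fn main() {
    let work_type = WorkType::GossipAttestation;

    // `IntoStaticStr` with snake_case serialization, as used for the
    // metric labels in the diff above.
    let label: &'static str = work_type.into();
    assert_eq!(label, "gossip_attestation");

    // Passing by value copies the enum; before the `Copy` derive this
    // would have required `work_type.clone()`.
    let freed = work_type;
    assert_eq!(freed, work_type);
}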