24 changes: 24 additions & 0 deletions beacon_node/network/src/metrics.rs
@@ -491,6 +491,30 @@ pub static SYNC_UNKNOWN_NETWORK_REQUESTS: LazyLock<Result<IntCounterVec>> = Lazy
&["type"],
)
});
pub static SYNC_RPC_REQUEST_SUCCESSES: LazyLock<Result<IntCounterVec>> = LazyLock::new(|| {
try_create_int_counter_vec(
"sync_rpc_requests_success_total",
"Total count of sync RPC requests successes",
&["protocol"],
)
});
pub static SYNC_RPC_REQUEST_ERRORS: LazyLock<Result<IntCounterVec>> = LazyLock::new(|| {
try_create_int_counter_vec(
"sync_rpc_requests_error_total",
"Total count of sync RPC requests errors",
&["protocol", "error"],
)
});
pub static SYNC_RPC_REQUEST_TIME: LazyLock<Result<HistogramVec>> = LazyLock::new(|| {
try_create_histogram_vec_with_buckets(
"sync_rpc_request_duration_sec",
"Time to complete a successful sync RPC requesst",
Ok(vec![
0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 1.0, 2.0,
]),
&["protocol"],
)
});

/*
* Block Delay Metrics
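Taken together, the three metrics track per-protocol outcome counts and latency. A minimal sketch of how they would be recorded when a request resolves — the helper function and label values here are illustrative only, not part of the PR; the real call sites are inside ActiveRequests::on_response in requests.rs below:

use std::time::Duration;

// Illustrative helper only: the PR records these directly in
// ActiveRequests::on_response rather than through a function like this.
fn record_sync_rpc_outcome(protocol: &'static str, outcome: Result<Duration, &'static str>) {
    match outcome {
        Ok(duration) => {
            // One success per completed request, plus its wall-clock duration.
            metrics::inc_counter_vec(&metrics::SYNC_RPC_REQUEST_SUCCESSES, &[protocol]);
            metrics::observe_timer_vec(&metrics::SYNC_RPC_REQUEST_TIME, &[protocol], duration);
        }
        Err(error) => {
            // Errors are labelled by protocol plus a static error string.
            metrics::inc_counter_vec(&metrics::SYNC_RPC_REQUEST_ERRORS, &[protocol, error]);
        }
    }
}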
37 changes: 8 additions & 29 deletions beacon_node/network/src/sync/network_context.rs
@@ -1379,7 +1379,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
})
});
self.on_rpc_response_result(id, "BlocksByRoot", resp, peer_id, |_| 1)
self.on_rpc_response_result(resp, peer_id)
}

pub(crate) fn on_single_blob_response(
@@ -1408,7 +1408,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
})
});
self.on_rpc_response_result(id, "BlobsByRoot", resp, peer_id, |_| 1)
self.on_rpc_response_result(resp, peer_id)
}

#[allow(clippy::type_complexity)]
@@ -1421,7 +1421,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let resp = self
.data_columns_by_root_requests
.on_response(id, rpc_event);
self.on_rpc_response_result(id, "DataColumnsByRoot", resp, peer_id, |_| 1)
self.on_rpc_response_result(resp, peer_id)
}

#[allow(clippy::type_complexity)]
@@ -1432,7 +1432,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
rpc_event: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
) -> Option<RpcResponseResult<Vec<Arc<SignedBeaconBlock<T::EthSpec>>>>> {
let resp = self.blocks_by_range_requests.on_response(id, rpc_event);
self.on_rpc_response_result(id, "BlocksByRange", resp, peer_id, |b| b.len())
self.on_rpc_response_result(resp, peer_id)
}

#[allow(clippy::type_complexity)]
@@ -1443,7 +1443,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
) -> Option<RpcResponseResult<Vec<Arc<BlobSidecar<T::EthSpec>>>>> {
let resp = self.blobs_by_range_requests.on_response(id, rpc_event);
self.on_rpc_response_result(id, "BlobsByRangeRequest", resp, peer_id, |b| b.len())
self.on_rpc_response_result(resp, peer_id)
}

#[allow(clippy::type_complexity)]
@@ -1456,36 +1456,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let resp = self
.data_columns_by_range_requests
.on_response(id, rpc_event);
self.on_rpc_response_result(id, "DataColumnsByRange", resp, peer_id, |d| d.len())
self.on_rpc_response_result(resp, peer_id)
}

fn on_rpc_response_result<I: std::fmt::Display, R, F: FnOnce(&R) -> usize>(
/// Common handler for consistent scoring of RpcResponseError
fn on_rpc_response_result<R>(
&mut self,
id: I,
method: &'static str,
resp: Option<RpcResponseResult<R>>,
peer_id: PeerId,
get_count: F,
) -> Option<RpcResponseResult<R>> {
match &resp {
None => {}
Some(Ok((v, _))) => {
debug!(
%id,
method,
count = get_count(v),
"Sync RPC request completed"
);
}
Some(Err(e)) => {
debug!(
%id,
method,
error = ?e,
"Sync RPC request error"
);
}
}
if let Some(Err(RpcResponseError::VerifyError(e))) = &resp {
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
}
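With the logging match removed, each per-protocol handler simply forwards the accumulated result and delegates peer scoring to the shared helper. Reading the surviving lines straight through, the helper presumably ends up looking roughly like this — the trailing return is collapsed in the diff above and is assumed here:

    /// Common handler for consistent scoring of RpcResponseError
    fn on_rpc_response_result<R>(
        &mut self,
        resp: Option<RpcResponseResult<R>>,
        peer_id: PeerId,
    ) -> Option<RpcResponseResult<R>> {
        // Only verification failures downscore the peer; every other outcome is
        // passed back to the caller untouched.
        if let Some(Err(RpcResponseError::VerifyError(e))) = &resp {
            self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
        }
        resp
    }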
54 changes: 48 additions & 6 deletions beacon_node/network/src/sync/network_context/requests.rs
@@ -1,9 +1,11 @@
use std::time::Instant;
use std::{collections::hash_map::Entry, hash::Hash};

use beacon_chain::validator_monitor::timestamp_now;
use fnv::FnvHashMap;
use lighthouse_network::PeerId;
use strum::IntoStaticStr;
use tracing::debug;
use types::{Hash256, Slot};

pub use blobs_by_range::BlobsByRangeRequestItems;
@@ -17,7 +19,7 @@ pub use data_columns_by_root::{

use crate::metrics;

use super::{RpcEvent, RpcResponseResult};
use super::{RpcEvent, RpcResponseError, RpcResponseResult};

mod blobs_by_range;
mod blobs_by_root;
@@ -50,6 +52,7 @@ struct ActiveRequest<T: ActiveRequestItems> {
peer_id: PeerId,
// Error if the request terminates before receiving max expected responses
expect_max_responses: bool,
start_instant: Instant,
}

enum State<T> {
@@ -58,7 +61,7 @@ enum State<T> {
Errored,
}

impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
impl<K: Copy + Eq + Hash + std::fmt::Display, T: ActiveRequestItems> ActiveRequests<K, T> {
pub fn new(name: &'static str) -> Self {
Self {
requests: <_>::default(),
@@ -73,6 +76,7 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
state: State::Active(items),
peer_id,
expect_max_responses,
start_instant: Instant::now(),
},
);
}
@@ -96,7 +100,7 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
return None;
};

match rpc_event {
let result = match rpc_event {
// Handler of a success ReqResp chunk. Adds the item to the request accumulator.
// `ActiveRequestItems` validates the item before appending to its internal state.
RpcEvent::Response(item, seen_timestamp) => {
@@ -109,7 +113,7 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
Ok(true) => {
let items = items.consume();
request.state = State::CompletedEarly;
Some(Ok((items, seen_timestamp)))
Some(Ok((items, seen_timestamp, request.start_instant.elapsed())))
}
// Received item, but we are still expecting more
Ok(false) => None,
@@ -145,7 +149,11 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
}
.into()))
} else {
Some(Ok((items.consume(), timestamp_now())))
Some(Ok((
items.consume(),
timestamp_now(),
request.start_instant.elapsed(),
)))
}
}
// Items already returned, ignore stream termination
@@ -168,7 +176,41 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
State::Errored => None,
}
}
}
};

result.map(|result| match result {
Ok((items, seen_timestamp, duration)) => {
metrics::inc_counter_vec(&metrics::SYNC_RPC_REQUEST_SUCCESSES, &[self.name]);
metrics::observe_timer_vec(&metrics::SYNC_RPC_REQUEST_TIME, &[self.name], duration);
debug!(
%id,
method = self.name,
count = items.len(),
"Sync RPC request completed"
);
[Author comment] Moved the match from on_rpc_response_result to here as I need access to start_instant


Ok((items, seen_timestamp))
}
Err(e) => {
let err_str: &'static str = match &e {
RpcResponseError::RpcError(e) => e.into(),
RpcResponseError::VerifyError(e) => e.into(),
RpcResponseError::CustodyRequestError(_) => "CustodyRequestError",
RpcResponseError::BlockComponentCouplingError(_) => {
"BlockComponentCouplingError"
}
};
metrics::inc_counter_vec(&metrics::SYNC_RPC_REQUEST_ERRORS, &[self.name, err_str]);
debug!(
%id,
method = self.name,
error = ?e,
"Sync RPC request error"
);

Err(e)
}
})
}

pub fn active_requests_of_peer(&self, peer_id: &PeerId) -> Vec<&K> {
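The duration plumbing itself is the standard Instant pattern: stamp the request when it is inserted, take elapsed() on the terminal event, and drop the extra tuple element before handing the result back to sync. A self-contained sketch, with names chosen for illustration rather than taken from the crate:

use std::time::{Duration, Instant};

struct TimedRequest {
    // Stamped once when the request is registered, mirroring the new
    // start_instant field on ActiveRequest.
    start_instant: Instant,
}

impl TimedRequest {
    fn new() -> Self {
        Self { start_instant: Instant::now() }
    }

    // Called on the final chunk or on stream termination; the caller records
    // the duration in the histogram and then forwards only the payload.
    fn finish<T>(&self, items: Vec<T>) -> (Vec<T>, Duration) {
        (items, self.start_instant.elapsed())
    }
}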