Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 51 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 16 additions & 18 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,12 @@ unused-async = "warn"
blake3 = "1.8"
bytes = { version = "1.7", features = ["serde"] }
data-encoding = "2.6.0"
derive_more = { version = "2.0.1", features = [
"add",
"debug",
"deref",
"display",
"from",
"try_into",
"into",
] }
ed25519-dalek = { version = "3.0.0-pre.1", features = ["serde", "rand_core"] }
derive_more = { version = "1.0.0", features = ["add", "debug", "deref", "display", "from", "try_into", "into", "deref_mut"] }
hex = "0.4.3"
indexmap = "2.0"
iroh-metrics = { version = "0.36", default-features = false }
iroh-base = { version = "0.93", default-features = false, features = ["key"] }
n0-future = "0.1.2"
n0-future = "0.2"
postcard = { version = "1", default-features = false, features = [
"alloc",
"use-std",
Expand All @@ -64,13 +55,16 @@ futures-concurrency = { version = "7.6.1", optional = true }
futures-util = { version = "0.3.30", optional = true }
iroh = { version = "0.93", default-features = false, optional = true }
tokio = { version = "1", optional = true, features = ["io-util", "sync"] }
tokio-util = { version = "0.7.12", optional = true, features = ["codec"] }
tokio-util = { version = "0.7.12", optional = true, features = ["codec", "time"] }
tracing = "0.1"
irpc = { version = "0.9.0", optional = true, default-features = false, features = [
"derive",
"stream",
"spans",
] }
irpc-iroh = { version = "0.9.0", optional = true }
n0-watcher = { version = "0.3.0", optional = true }
strum = { version = "0.27.2", features = ["derive"], optional = true }
n0-snafu = { version = "0.2.2", optional = true }
nested_enum_utils = { version = "0.2.2", optional = true }
snafu = { version = "0.8.5", features = ["rust_1_81"], optional = true }
Expand Down Expand Up @@ -103,6 +97,7 @@ tokio = { version = "1", features = [
"fs",
] }
clap = { version = "4", features = ["derive"] }
ed25519-dalek = { version = "3.0.0-pre.1", features = ["serde", "rand_core"] }
humantime-serde = { version = "1.1.1" }
iroh = { version = "0.93", default-features = false, features = [
"metrics",
Expand All @@ -117,16 +112,19 @@ url = "2.4.0"
[features]
default = ["net", "metrics"]
net = [
"dep:irpc",
"dep:futures-concurrency",
"dep:futures-lite",
"dep:iroh",
"dep:tokio",
"dep:tokio-util",
"dep:futures-util",
"dep:futures-concurrency",
"dep:nested_enum_utils",
"dep:iroh",
"dep:irpc",
"dep:irpc-iroh",
"dep:n0-snafu",
"dep:n0-watcher",
"dep:nested_enum_utils",
"dep:snafu",
"dep:strum",
"dep:tokio",
"dep:tokio-util",
]
rpc = [
"dep:irpc",
Expand Down
16 changes: 14 additions & 2 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ pub struct Message {
}

/// Command for a gossip topic.
#[derive(Serialize, Deserialize, derive_more::Debug, Clone)]
#[derive(Serialize, Deserialize, derive_more::Debug, Clone, strum::Display)]
pub enum Command {
/// Broadcasts a message to all nodes in the swarm.
Broadcast(#[debug("Bytes({})", _0.len())] Bytes),
Expand All @@ -383,6 +383,18 @@ pub enum Command {
JoinPeers(Vec<NodeId>),
}

impl From<Command> for crate::proto::Command<NodeId> {
fn from(value: Command) -> Self {
match value {
Command::Broadcast(bytes) => Self::Broadcast(bytes, crate::proto::Scope::Swarm),
Command::BroadcastNeighbors(bytes) => {
Self::Broadcast(bytes, crate::proto::Scope::Neighbors)
}
Command::JoinPeers(peers) => Self::Join(peers),
}
}
}

/// Options for joining a gossip topic.
#[derive(Serialize, Deserialize, Debug)]
pub struct JoinOptions {
Expand Down Expand Up @@ -424,7 +436,7 @@ mod tests {

use crate::{
api::{Event, GossipApi},
net::{test::create_endpoint, Gossip},
net::{tests::create_endpoint, Gossip},
proto::TopicId,
ALPN,
};
Expand Down
94 changes: 80 additions & 14 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
//! Metrics for iroh-gossip

use iroh_metrics::{Counter, MetricsGroup};
use serde::Serialize;

use crate::proto::{
self,
state::MessageKind,
topic::{InEvent, OutEvent},
};

/// Enum of metrics for the module
#[derive(Debug, Default, MetricsGroup)]
Expand All @@ -26,20 +33,79 @@ pub struct Metrics {
pub neighbor_up: Counter,
/// Number of times we disconnected from a peer
pub neighbor_down: Counter,
/// Number of messages we broadcasted to all nodes
pub msgs_broadcast_swarm: Counter,
/// Number of messages we broadcasted to direct neighbors
pub msgs_broadcast_neighbors: Counter,
/// Number of topcis we joined.
pub topics_joined: Counter,
/// Number of topcis we left.
pub topics_quit: Counter,
/// Number of times we successfully dialed a remote node.
pub peers_dialed_success: Counter,
/// Number of times we failed to dial a remote node.
pub peers_dialed_failure: Counter,
/// Number of times we accepted a connection from a remote node.
pub peers_accepted: Counter,
/// Number of times the main actor loop ticked
pub actor_tick_main: Counter,
/// Number of times the actor ticked for a message received
pub actor_tick_rx: Counter,
/// Number of times the actor ticked for an endpoint event
pub actor_tick_endpoint: Counter,
/// Number of times the actor ticked for a dialer event
pub actor_tick_dialer: Counter,
/// Number of times the actor ticked for a successful dialer event
pub actor_tick_dialer_success: Counter,
/// Number of times the actor ticked for a failed dialer event
pub actor_tick_dialer_failure: Counter,
/// Number of times the actor ticked for an incoming event
pub actor_tick_in_event_rx: Counter,
/// Number of times the actor ticked for a timer event
pub actor_tick_timers: Counter,
}

impl Metrics {
/// Track an [`InEvent`].
pub fn track_in_event<PI: Serialize>(&self, in_event: &InEvent<PI>) {
match in_event {
InEvent::RecvMessage(_, message) => match message.kind() {
MessageKind::Data => {
self.msgs_data_recv.inc();
self.msgs_data_recv_size
.inc_by(message.size().unwrap_or(0) as u64);
}
MessageKind::Control => {
self.msgs_ctrl_recv.inc();
self.msgs_ctrl_recv_size
.inc_by(message.size().unwrap_or(0) as u64);
}
},
InEvent::Command(cmd) => match cmd {
proto::Command::Broadcast(_, scope) => match scope {
proto::Scope::Swarm => inc(&self.msgs_broadcast_swarm),
proto::Scope::Neighbors => inc(&self.msgs_broadcast_neighbors),
},
proto::Command::Join(_) => {}
proto::Command::Quit => {}
},
InEvent::TimerExpired(_) => {}
InEvent::PeerDisconnected(_) => {}
InEvent::UpdatePeerData(_) => {}
}
}

/// Track an [`OutEvent`].
pub fn track_out_event<PI: Serialize>(&self, out_event: &OutEvent<PI>) {
match out_event {
OutEvent::SendMessage(_to, message) => match message.kind() {
MessageKind::Data => {
self.msgs_data_sent.inc();
self.msgs_data_sent_size
.inc_by(message.size().unwrap_or(0) as u64);
}
MessageKind::Control => {
self.msgs_ctrl_sent.inc();
self.msgs_ctrl_sent_size
.inc_by(message.size().unwrap_or(0) as u64);
}
},
OutEvent::EmitEvent(event) => match event {
proto::Event::NeighborUp(_peer) => inc(&self.neighbor_up),
proto::Event::NeighborDown(_peer) => inc(&self.neighbor_down),
_ => {}
},
_ => {}
}
}
}

pub(crate) fn inc(counter: &Counter) {
counter.inc();
}
Loading
Loading