Skip to content

Commit 739bb0b

Browse files
committed
wip
1 parent 07e398a commit 739bb0b

File tree

8 files changed

+1251
-1324
lines changed

8 files changed

+1251
-1324
lines changed

Cargo.lock

Lines changed: 51 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,12 @@ unused-async = "warn"
3535
blake3 = "1.8"
3636
bytes = { version = "1.7", features = ["serde"] }
3737
data-encoding = "2.6.0"
38-
derive_more = { version = "2.0.1", features = [
39-
"add",
40-
"debug",
41-
"deref",
42-
"display",
43-
"from",
44-
"try_into",
45-
"into",
46-
] }
47-
ed25519-dalek = { version = "3.0.0-pre.1", features = ["serde", "rand_core"] }
38+
derive_more = { version = "1.0.0", features = ["add", "debug", "deref", "display", "from", "try_into", "into", "deref_mut"] }
4839
hex = "0.4.3"
4940
indexmap = "2.0"
5041
iroh-metrics = { version = "0.36", default-features = false }
5142
iroh-base = { version = "0.93", default-features = false, features = ["key"] }
52-
n0-future = "0.1.2"
43+
n0-future = "0.2"
5344
postcard = { version = "1", default-features = false, features = [
5445
"alloc",
5546
"use-std",
@@ -64,13 +55,16 @@ futures-concurrency = { version = "7.6.1", optional = true }
6455
futures-util = { version = "0.3.30", optional = true }
6556
iroh = { version = "0.93", default-features = false, optional = true }
6657
tokio = { version = "1", optional = true, features = ["io-util", "sync"] }
67-
tokio-util = { version = "0.7.12", optional = true, features = ["codec"] }
58+
tokio-util = { version = "0.7.12", optional = true, features = ["codec", "time"] }
6859
tracing = "0.1"
6960
irpc = { version = "0.9.0", optional = true, default-features = false, features = [
7061
"derive",
7162
"stream",
7263
"spans",
7364
] }
65+
irpc-iroh = { version = "0.9.0", optional = true }
66+
n0-watcher = { version = "0.3.0", optional = true }
67+
strum = { version = "0.27.2", features = ["derive"], optional = true }
7468
n0-snafu = { version = "0.2.2", optional = true }
7569
nested_enum_utils = { version = "0.2.2", optional = true }
7670
snafu = { version = "0.8.5", features = ["rust_1_81"], optional = true }
@@ -103,6 +97,7 @@ tokio = { version = "1", features = [
10397
"fs",
10498
] }
10599
clap = { version = "4", features = ["derive"] }
100+
ed25519-dalek = { version = "3.0.0-pre.1", features = ["serde", "rand_core"] }
106101
humantime-serde = { version = "1.1.1" }
107102
iroh = { version = "0.93", default-features = false, features = [
108103
"metrics",
@@ -117,16 +112,19 @@ url = "2.4.0"
117112
[features]
118113
default = ["net", "metrics"]
119114
net = [
120-
"dep:irpc",
115+
"dep:futures-concurrency",
121116
"dep:futures-lite",
122-
"dep:iroh",
123-
"dep:tokio",
124-
"dep:tokio-util",
125117
"dep:futures-util",
126-
"dep:futures-concurrency",
127-
"dep:nested_enum_utils",
118+
"dep:iroh",
119+
"dep:irpc",
120+
"dep:irpc-iroh",
128121
"dep:n0-snafu",
122+
"dep:n0-watcher",
123+
"dep:nested_enum_utils",
129124
"dep:snafu",
125+
"dep:strum",
126+
"dep:tokio",
127+
"dep:tokio-util",
130128
]
131129
rpc = [
132130
"dep:irpc",

src/api.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ pub struct Message {
373373
}
374374

375375
/// Command for a gossip topic.
376-
#[derive(Serialize, Deserialize, derive_more::Debug, Clone)]
376+
#[derive(Serialize, Deserialize, derive_more::Debug, Clone, strum::Display)]
377377
pub enum Command {
378378
/// Broadcasts a message to all nodes in the swarm.
379379
Broadcast(#[debug("Bytes({})", _0.len())] Bytes),
@@ -383,6 +383,18 @@ pub enum Command {
383383
JoinPeers(Vec<NodeId>),
384384
}
385385

386+
impl From<Command> for crate::proto::Command<NodeId> {
387+
fn from(value: Command) -> Self {
388+
match value {
389+
Command::Broadcast(bytes) => Self::Broadcast(bytes, crate::proto::Scope::Swarm),
390+
Command::BroadcastNeighbors(bytes) => {
391+
Self::Broadcast(bytes, crate::proto::Scope::Neighbors)
392+
}
393+
Command::JoinPeers(peers) => Self::Join(peers),
394+
}
395+
}
396+
}
397+
386398
/// Options for joining a gossip topic.
387399
#[derive(Serialize, Deserialize, Debug)]
388400
pub struct JoinOptions {
@@ -424,7 +436,7 @@ mod tests {
424436

425437
use crate::{
426438
api::{Event, GossipApi},
427-
net::{test::create_endpoint, Gossip},
439+
net::{tests::create_endpoint, Gossip},
428440
proto::TopicId,
429441
ALPN,
430442
};

src/metrics.rs

Lines changed: 80 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
//! Metrics for iroh-gossip
22
33
use iroh_metrics::{Counter, MetricsGroup};
4+
use serde::Serialize;
5+
6+
use crate::proto::{
7+
self,
8+
state::MessageKind,
9+
topic::{InEvent, OutEvent},
10+
};
411

512
/// Enum of metrics for the module
613
#[derive(Debug, Default, MetricsGroup)]
@@ -26,20 +33,79 @@ pub struct Metrics {
2633
pub neighbor_up: Counter,
2734
/// Number of times we disconnected from a peer
2835
pub neighbor_down: Counter,
36+
/// Number of messages we broadcasted to all nodes
37+
pub msgs_broadcast_swarm: Counter,
38+
/// Number of messages we broadcasted to direct neighbors
39+
pub msgs_broadcast_neighbors: Counter,
40+
/// Number of topcis we joined.
41+
pub topics_joined: Counter,
42+
/// Number of topcis we left.
43+
pub topics_quit: Counter,
44+
/// Number of times we successfully dialed a remote node.
45+
pub peers_dialed_success: Counter,
46+
/// Number of times we failed to dial a remote node.
47+
pub peers_dialed_failure: Counter,
48+
/// Number of times we accepted a connection from a remote node.
49+
pub peers_accepted: Counter,
2950
/// Number of times the main actor loop ticked
3051
pub actor_tick_main: Counter,
31-
/// Number of times the actor ticked for a message received
32-
pub actor_tick_rx: Counter,
33-
/// Number of times the actor ticked for an endpoint event
34-
pub actor_tick_endpoint: Counter,
35-
/// Number of times the actor ticked for a dialer event
36-
pub actor_tick_dialer: Counter,
37-
/// Number of times the actor ticked for a successful dialer event
38-
pub actor_tick_dialer_success: Counter,
39-
/// Number of times the actor ticked for a failed dialer event
40-
pub actor_tick_dialer_failure: Counter,
41-
/// Number of times the actor ticked for an incoming event
42-
pub actor_tick_in_event_rx: Counter,
43-
/// Number of times the actor ticked for a timer event
44-
pub actor_tick_timers: Counter,
52+
}
53+
54+
impl Metrics {
55+
/// Track an [`InEvent`].
56+
pub fn track_in_event<PI: Serialize>(&self, in_event: &InEvent<PI>) {
57+
match in_event {
58+
InEvent::RecvMessage(_, message) => match message.kind() {
59+
MessageKind::Data => {
60+
self.msgs_data_recv.inc();
61+
self.msgs_data_recv_size
62+
.inc_by(message.size().unwrap_or(0) as u64);
63+
}
64+
MessageKind::Control => {
65+
self.msgs_ctrl_recv.inc();
66+
self.msgs_ctrl_recv_size
67+
.inc_by(message.size().unwrap_or(0) as u64);
68+
}
69+
},
70+
InEvent::Command(cmd) => match cmd {
71+
proto::Command::Broadcast(_, scope) => match scope {
72+
proto::Scope::Swarm => inc(&self.msgs_broadcast_swarm),
73+
proto::Scope::Neighbors => inc(&self.msgs_broadcast_neighbors),
74+
},
75+
proto::Command::Join(_) => {}
76+
proto::Command::Quit => {}
77+
},
78+
InEvent::TimerExpired(_) => {}
79+
InEvent::PeerDisconnected(_) => {}
80+
InEvent::UpdatePeerData(_) => {}
81+
}
82+
}
83+
84+
/// Track an [`OutEvent`].
85+
pub fn track_out_event<PI: Serialize>(&self, out_event: &OutEvent<PI>) {
86+
match out_event {
87+
OutEvent::SendMessage(_to, message) => match message.kind() {
88+
MessageKind::Data => {
89+
self.msgs_data_sent.inc();
90+
self.msgs_data_sent_size
91+
.inc_by(message.size().unwrap_or(0) as u64);
92+
}
93+
MessageKind::Control => {
94+
self.msgs_ctrl_sent.inc();
95+
self.msgs_ctrl_sent_size
96+
.inc_by(message.size().unwrap_or(0) as u64);
97+
}
98+
},
99+
OutEvent::EmitEvent(event) => match event {
100+
proto::Event::NeighborUp(_peer) => inc(&self.neighbor_up),
101+
proto::Event::NeighborDown(_peer) => inc(&self.neighbor_down),
102+
_ => {}
103+
},
104+
_ => {}
105+
}
106+
}
107+
}
108+
109+
pub(crate) fn inc(counter: &Counter) {
110+
counter.inc();
45111
}

0 commit comments

Comments
 (0)