Skip to content

Commit 6103e07

Browse files
committed
Tedge flows statistics are published over MQTT
Signed-off-by: Didier Wenzek <[email protected]>
1 parent 661d55d commit 6103e07

File tree

5 files changed

+103
-16
lines changed

5 files changed

+103
-16
lines changed

crates/extensions/tedge_flows/src/actor.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::flow::Message;
66
use crate::flow::SourceTag;
77
use crate::registry::FlowRegistryExt;
88
use crate::runtime::MessageProcessor;
9+
use crate::stats::MqttStatsPublisher;
910
use crate::InputMessage;
1011
use crate::Tick;
1112
use async_trait::async_trait;
@@ -48,6 +49,7 @@ pub struct FlowsMapper {
4849
pub(super) watched_commands: HashSet<String>,
4950
pub(super) processor: MessageProcessor<ConnectedFlowRegistry>,
5051
pub(super) next_dump: Instant,
52+
pub(super) stats_publisher: MqttStatsPublisher,
5153
}
5254

5355
#[async_trait]
@@ -258,7 +260,13 @@ impl FlowsMapper {
258260
let timestamp = SystemTime::now();
259261
if self.next_dump <= now {
260262
self.processor.dump_memory_stats().await;
261-
self.processor.dump_processing_stats().await;
263+
for record in self
264+
.processor
265+
.dump_processing_stats(&self.stats_publisher)
266+
.await
267+
{
268+
self.mqtt_sender.send(record).await?;
269+
}
262270
self.next_dump = now + STATS_DUMP_INTERVAL;
263271
}
264272
for messages in self.processor.on_interval(timestamp, now).await {

crates/extensions/tedge_flows/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub use crate::flow::*;
1818
pub use crate::registry::BaseFlowRegistry;
1919
pub use crate::registry::FlowRegistryExt;
2020
pub use crate::runtime::MessageProcessor;
21+
use crate::stats::MqttStatsPublisher;
2122
use camino::Utf8Path;
2223
use std::collections::HashSet;
2324
use std::convert::Infallible;
@@ -131,6 +132,9 @@ impl Builder<FlowsMapper> for FlowsMapperBuilder {
131132
fn build(self) -> FlowsMapper {
132133
let subscriptions = self.topics().clone();
133134
let watched_commands = HashSet::new();
135+
let stats_publisher = MqttStatsPublisher {
136+
topic_prefix: "te/device/main/service/tedge-flows/stats".to_string(),
137+
};
134138
FlowsMapper {
135139
messages: self.message_box.build(),
136140
mqtt_sender: self.mqtt_sender,
@@ -139,6 +143,7 @@ impl Builder<FlowsMapper> for FlowsMapperBuilder {
139143
watched_commands,
140144
processor: self.processor,
141145
next_dump: Instant::now() + STATS_DUMP_INTERVAL,
146+
stats_publisher,
142147
}
143148
}
144149
}

crates/extensions/tedge_flows/src/runtime.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::js_runtime::JsRuntime;
55
use crate::registry::BaseFlowRegistry;
66
use crate::registry::FlowRegistryExt;
77
use crate::stats::Counter;
8+
use crate::stats::StatsPublisher;
89
use crate::LoadError;
910
use camino::Utf8Path;
1011
use camino::Utf8PathBuf;
@@ -135,8 +136,8 @@ impl<Registry: FlowRegistryExt + Send> MessageProcessor<Registry> {
135136
out_messages
136137
}
137138

138-
pub async fn dump_processing_stats(&self) {
139-
self.stats.dump_processing_stats();
139+
pub async fn dump_processing_stats<P: StatsPublisher>(&self, publisher: &P) -> Vec<P::Record> {
140+
self.stats.dump_processing_stats(publisher)
140141
}
141142

142143
pub async fn dump_memory_stats(&self) {

crates/extensions/tedge_flows/src/stats.rs

Lines changed: 85 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
use serde_json::Value;
12
use std::collections::HashMap;
23
use std::fmt::Display;
34
use std::time::Duration;
45
use std::time::Instant;
6+
use tedge_mqtt_ext::MqttMessage;
7+
use tedge_mqtt_ext::Topic;
58

69
#[derive(Default)]
710
pub struct Counter {
@@ -113,11 +116,12 @@ impl Counter {
113116
self.from_start.entry(dim).or_default().add(sample);
114117
}
115118

116-
pub fn dump_processing_stats(&self) {
119+
pub fn dump_processing_stats<P: StatsPublisher>(&self, publisher: &P) -> Vec<P::Record> {
117120
tracing::info!(target: "flows", "Processing statistics:");
118-
for (dim, stats) in &self.from_start {
119-
stats.dump_statistics(dim)
120-
}
121+
self.from_start
122+
.iter()
123+
.filter_map(|(dim, stats)| stats.dump_statistics(dim, publisher))
124+
.collect()
121125
}
122126
}
123127

@@ -140,15 +144,27 @@ impl Stats {
140144
}
141145
}
142146

143-
pub fn dump_statistics(&self, dim: &Dimension) {
144-
tracing::info!(target: "flows", " - {dim}");
145-
tracing::info!(target: "flows", " - input count: {}", self.messages_in);
146-
tracing::info!(target: "flows", " - output count: {}", self.messages_out);
147-
tracing::info!(target: "flows", " - error count: {}", self.error_raised);
148-
if let Some(duration_stats) = &self.processing_time {
149-
tracing::info!(target: "flows", " - min processing time: {:?}", duration_stats.min);
150-
tracing::info!(target: "flows", " - max processing time: {:?}", duration_stats.max);
151-
}
147+
pub fn dump_statistics<P: StatsPublisher>(
148+
&self,
149+
dim: &Dimension,
150+
publisher: &P,
151+
) -> Option<P::Record> {
152+
let stats = match self.processing_time.as_ref() {
153+
None => serde_json::json!({
154+
"input": self.messages_in,
155+
"output": self.messages_out,
156+
"error": self.error_raised,
157+
}),
158+
Some(duration_stats) => serde_json::json!({
159+
"input": self.messages_in,
160+
"output": self.messages_out,
161+
"error": self.error_raised,
162+
"cpu-min": format!("{:?}", duration_stats.min),
163+
"cpu-max": format!("{:?}", duration_stats.max),
164+
}),
165+
};
166+
167+
publisher.publish(dim, stats)
152168
}
153169
}
154170

@@ -192,3 +208,59 @@ impl Dimension {
192208
}
193209
}
194210
}
211+
212+
pub trait StatsPublisher {
213+
type Record;
214+
215+
fn publish(&self, dim: &Dimension, stats: serde_json::Value) -> Option<Self::Record>;
216+
}
217+
218+
pub struct TracingStatsPublisher;
219+
220+
impl StatsPublisher for TracingStatsPublisher {
221+
type Record = ();
222+
223+
fn publish(&self, dim: &Dimension, stats: Value) -> Option<()> {
224+
tracing::info!(target: "flows", " - {dim}");
225+
if let Some(stats) = stats.as_object() {
226+
for (k, v) in stats {
227+
tracing::info!(target: "flows", " - {k}: {v}");
228+
}
229+
}
230+
None
231+
}
232+
}
233+
234+
pub struct MqttStatsPublisher {
235+
pub topic_prefix: String,
236+
}
237+
238+
impl StatsPublisher for MqttStatsPublisher {
239+
type Record = MqttMessage;
240+
241+
fn publish(&self, dim: &Dimension, stats: Value) -> Option<Self::Record> {
242+
match dim {
243+
Dimension::Flow(path) | Dimension::OnMessage(path) => {
244+
self.topic_for(path).map(|topic| {
245+
let payload = stats.to_string();
246+
MqttMessage::new(&topic, payload)
247+
})
248+
}
249+
250+
Dimension::Runtime => self.topic_for("runtime").map(|topic| {
251+
let payload = stats.to_string();
252+
MqttMessage::new(&topic, payload)
253+
}),
254+
255+
_ => None,
256+
}
257+
}
258+
}
259+
260+
impl MqttStatsPublisher {
261+
pub fn topic_for(&self, path: &str) -> Option<Topic> {
262+
let name = path.split('/').last().unwrap();
263+
let topic = format!("{}/{}", self.topic_prefix, name);
264+
Topic::new(&topic).ok()
265+
}
266+
}

crates/extensions/tedge_flows/tests/interval.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ async fn interval_executes_when_time_exceeds_interval() {
370370
let count = || {
371371
captured_messages
372372
.retain(|msg| !msg.topic.as_ref().contains("status"))
373+
.retain(|msg| !msg.topic.as_ref().contains("stats"))
373374
.count()
374375
};
375376

0 commit comments

Comments
 (0)