Skip to content

Commit 0774a37

Browse files
committed
Deprecated flow step onConfigUpdate method
Now flow script have to use the shared KV store to share data such as entity metadata or measurement units Signed-off-by: Didier Wenzek <[email protected]>
1 parent c79126a commit 0774a37

File tree

7 files changed

+7
-98
lines changed

7 files changed

+7
-98
lines changed

crates/extensions/tedge_flows/src/config.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,6 @@ pub struct StepConfig {
4040
#[serde(default)]
4141
#[serde(deserialize_with = "parse_human_duration")]
4242
interval: Duration,
43-
44-
#[serde(default)]
45-
meta_topics: Vec<String>,
4643
}
4744

4845
#[derive(Deserialize)]
@@ -159,7 +156,6 @@ impl FlowConfig {
159156
script: ScriptSpec::JavaScript(script),
160157
config: None,
161158
interval: Duration::default(),
162-
meta_topics: vec![],
163159
};
164160
Self {
165161
input: InputConfig::Mqtt {
@@ -214,11 +210,7 @@ impl StepConfig {
214210
let script = JsScript::new(flow.to_owned(), index, path)
215211
.with_config(self.config)
216212
.with_interval(self.interval);
217-
let config_topics = topic_filters(self.meta_topics)?;
218-
Ok(FlowStep {
219-
script,
220-
config_topics,
221-
})
213+
Ok(FlowStep { script })
222214
}
223215
}
224216

crates/extensions/tedge_flows/src/flow.rs

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ pub struct Flow {
4444
/// A message transformation step
4545
pub struct FlowStep {
4646
pub script: JsScript,
47-
pub config_topics: TopicFilter,
4847
}
4948

5049
pub enum SourceTag {
@@ -156,33 +155,7 @@ impl Flow {
156155
}
157156

158157
pub fn topics(&self) -> TopicFilter {
159-
let mut topics = self.input.topics();
160-
for step in self.steps.iter() {
161-
topics.add_all(step.config_topics.clone())
162-
}
163-
topics
164-
}
165-
166-
pub async fn on_config_update(
167-
&mut self,
168-
js_runtime: &JsRuntime,
169-
message: &Message,
170-
) -> FlowResult {
171-
let result = self.on_config_update_steps(js_runtime, message).await;
172-
self.publish(result)
173-
}
174-
175-
async fn on_config_update_steps(
176-
&mut self,
177-
js_runtime: &JsRuntime,
178-
message: &Message,
179-
) -> Result<Vec<Message>, FlowError> {
180-
for step in self.steps.iter_mut() {
181-
if step.config_topics.accept_topic_name(&message.topic) {
182-
step.script.on_config_update(js_runtime, message).await?
183-
}
184-
}
185-
Ok(vec![])
158+
self.input.topics()
186159
}
187160

188161
pub fn accept_message(&self, _source: &SourceTag, message: &Message) -> bool {
@@ -377,9 +350,6 @@ impl FlowStep {
377350
if script.no_js_on_message_fun {
378351
warn!(target: "flows", "Flow script with no 'onMessage' function: {}", script.path);
379352
}
380-
if script.no_js_on_config_update_fun && !self.config_topics.is_empty() {
381-
warn!(target: "flows", "Flow script with no 'onConfigUpdate' function: {}; but configured with 'config_topics' in {flow}", script.path);
382-
}
383353
if script.no_js_on_interval_fun && !script.interval.is_zero() {
384354
warn!(target: "flows", "Flow script with no 'onInterval' function: {}; but configured with an 'interval' in {flow}", script.path);
385355
}

crates/extensions/tedge_flows/src/js_runtime.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ impl JsRuntime {
5353
for export in exports {
5454
match export {
5555
"onMessage" => script.no_js_on_message_fun = false,
56-
"onConfigUpdate" => script.no_js_on_config_update_fun = false,
5756
"onInterval" => script.no_js_on_interval_fun = false,
5857
_ => (),
5958
}
@@ -78,7 +77,7 @@ impl JsRuntime {
7877
) -> Result<Vec<&'static str>, LoadError> {
7978
let (sender, receiver) = oneshot::channel();
8079
let source = source.into();
81-
let imports = vec!["onMessage", "onConfigUpdate", "onInterval"];
80+
let imports = vec!["onMessage", "onInterval"];
8281
TIME_CREDITS.store(100000, std::sync::atomic::Ordering::Relaxed);
8382
self.send(
8483
receiver,

crates/extensions/tedge_flows/src/js_script.rs

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ pub struct JsScript {
1818
pub interval: Duration,
1919
pub next_execution: Option<Instant>,
2020
pub no_js_on_message_fun: bool,
21-
pub no_js_on_config_update_fun: bool,
2221
pub no_js_on_interval_fun: bool,
2322
}
2423

@@ -32,7 +31,6 @@ impl JsScript {
3231
interval: Duration::ZERO,
3332
next_execution: None,
3433
no_js_on_message_fun: true,
35-
no_js_on_config_update_fun: true,
3634
no_js_on_interval_fun: true,
3735
}
3836
}
@@ -69,7 +67,7 @@ impl JsScript {
6967
/// The "onMessage" function of the JS module is passed 3 arguments
7068
/// - the current timestamp
7169
/// - the message to be transformed
72-
/// - the flow step config (as configured for the flow step, possibly updated by onConfigUpdate messages)
70+
/// - the flow step config (as configured in the flow toml)
7371
///
7472
/// The returned value is expected to be an array of messages.
7573
pub async fn on_message(
@@ -94,32 +92,6 @@ impl JsScript {
9492
.try_into()
9593
}
9694

97-
/// Update the flow step config using a metadata message
98-
///
99-
/// The "onConfigUpdate" function of the JS module is passed 2 arguments
100-
/// - the message
101-
/// - the current flow step config
102-
///
103-
/// The value returned by this function is used as the updated flow step config
104-
pub async fn on_config_update(
105-
&mut self,
106-
js: &JsRuntime,
107-
message: &Message,
108-
) -> Result<(), FlowError> {
109-
debug!(target: "flows", "{}: onConfigUpdate({message})", self.module_name());
110-
if self.no_js_on_config_update_fun {
111-
return Ok(());
112-
}
113-
114-
let input = vec![message.clone().into(), self.config.clone()];
115-
let config = js
116-
.call_function(&self.module_name(), "onConfigUpdate", input)
117-
.await
118-
.map_err(flow::error_from_js)?;
119-
self.config = config;
120-
Ok(())
121-
}
122-
12395
/// Initialize the next execution time for this script's interval
12496
/// Should be called after the script is loaded and interval is set
12597
pub fn init_next_execution(&mut self) {

crates/extensions/tedge_flows/src/runtime.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,14 +104,6 @@ impl<Registry: FlowRegistryExt + Send> MessageProcessor<Registry> {
104104

105105
let mut out_messages = vec![];
106106
for flow in self.registry.flows_mut() {
107-
let config_result = flow
108-
.as_mut()
109-
.on_config_update(&self.js_runtime, message)
110-
.await;
111-
if config_result.is_err() {
112-
out_messages.push(config_result);
113-
continue;
114-
}
115107
if flow.as_ref().accept_message(source, message) {
116108
let flow_output = flow
117109
.as_mut()

crates/extensions/tedge_flows/src/stats.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ pub enum Dimension {
1414
Flow(String),
1515
OnMessage(String),
1616
OnInterval(String),
17-
OnConfigUpdate(String),
1817
}
1918

2019
pub enum Sample {
@@ -177,7 +176,6 @@ impl Display for Dimension {
177176
Dimension::Flow(toml) => write!(f, "flow {toml}"),
178177
Dimension::OnMessage(js) => write!(f, "onMessage step {js}"),
179178
Dimension::OnInterval(js) => write!(f, "onInterval step {js}"),
180-
Dimension::OnConfigUpdate(js) => write!(f, "onConfigUpdate step {js}"),
181179
}
182180
}
183181
}
@@ -187,7 +185,6 @@ impl Dimension {
187185
match f {
188186
"onMessage" => Some(Dimension::OnMessage(js.to_owned())),
189187
"onInterval" => Some(Dimension::OnInterval(js.to_owned())),
190-
"onConfigUpdate" => Some(Dimension::OnConfigUpdate(js.to_owned())),
191188
_ => None,
192189
}
193190
}

docs/src/references/mappers/flows.md

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,14 @@ A transformation *script* is a JavaScript or TypeScript module that exports:
5656

5757
- at least, a function `onMessage()`, aimed to transform one input message into zero, one or more output messages,
5858
- possibly, a function `onInterval()`, called at regular intervals to produce aggregated messages,
59-
- possibly, a function `onConfigUpdate()`, used to update the step config.
6059

6160
```ts
6261
interface FlowStep {
6362
// transform one input message into zero, one or more output messages
6463
onMessage(message: Message, config: object): null | Message | Message[],
6564

6665
// called at regular intervals to produce aggregated messages
67-
onInterval(time: Date, config: object): null | Message | Message[],
68-
69-
// update the step config given a config update message
70-
onConfigUpdate(message: Message, config: object): object
66+
onInterval(time: Date, config: object): null | Message | Message[]
7167
}
7268
```
7369

@@ -92,11 +88,6 @@ The `onMessage` function is called for each message to be transformed
9288
- The config as read from the flow config or updated by the script
9389
- The function is expected to return zero, one or many transformed messages `[{ topic: string, payload: string }]`
9490
- An exception can be thrown if the input message cannot be transformed.
95-
- If defined and associated in the step config with `meta_topics`, the `onConfigUpdate` function is called on each message received on these `meta_topics`.
96-
- The arguments are:
97-
- The message to be interpreted as a config update `{ topic: string, payload: string }`
98-
- The current config
99-
- The returned value (an arbitrary JSON value) is then used as the new config for the flow script.
10091
- A flow script can also export a `onInterval` function
10192
- This function is called at a regular pace with the current time and config.
10293
- The flow script can then return zero, one or many transformed messages
@@ -112,18 +103,15 @@ The `onMessage` function is called for each message to be transformed
112103
- A step is defined by a JavaScript file with an `.mjs` or `.js` extension.
113104
- This can also be a TypeScript module with a `.ts` extension.
114105
- The definition of flow defines its input, output and error sink as well as a list of transformation steps.
115-
- Each step is built from a javascript and is possibly given a config (arbitrary json that will be passed to the script)
116-
- Each step can also subscribe to a list of MQTT meta topics where the metadata about the actual data message is stored
117-
(e.g, meta topic of a measurement type where its units threshold values are defined).
118-
The messages received on these topics will be passed to the `onConfigUpdate` letting the script update its config.
106+
- Each step is built from a javascript and is possibly given a config (arbitrary json that will be passed to the script)
119107
120108
```toml
121109
input.mqtt.topics = ["te/+/+/+/+/m/+"]
122110

123111
steps = [
124112
{ script = "add_timestamp.js" },
125113
{ script = "drop_stragglers.js", config = { max_delay = 60 } },
126-
{ script = "te_to_c8y.js", meta_topics = ["te/+/+/+/+/m/+/meta"] }
114+
{ script = "te_to_c8y.js" }
127115
]
128116
```
129117

@@ -216,7 +204,6 @@ This mapper:
216204
- loads all the flows defined in `/etc/tedge/flows`
217205
- reloads any flow or script that is created, updated or deleted while the mapper is running
218206
- subscribes to each flow `input.mqtt.topics`, dispatching the messages to the `onMessage` functions
219-
- subscribes to each step `meta_topics`, dispatching the messages to the `onConfigUpdate` functions
220207
- triggers at the configured pace the `onInterval` functions
221208
- publishes memory usage statistics
222209
- publishes flows and steps usage statistics

0 commit comments

Comments
 (0)