Skip to content

Commit c79126a

Browse files
committed
KV store shared by all the flows
This is first step toward a context used by flow scripts to access shared data, such as entity metadata or measurement units. For this first step, the store is not passed as an argument to the onMessage and onInterval method. The scripts has to create a `new FlowStore()` providing a `get()` method to get the value attached to some key - typically the metadata attached to an MQTT topic. The scripts only have a read access to the shared store which is populated by regular flows which output is configured to be the context. Signed-off-by: Didier Wenzek <[email protected]>
1 parent deb7204 commit c79126a

File tree

11 files changed

+182
-43
lines changed

11 files changed

+182
-43
lines changed

crates/extensions/tedge_flows/src/actor.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,12 @@ impl FlowsMapper {
409409
error!(target: "flows", "{flow}: cannot flush {path}: {err}");
410410
}
411411
}
412+
FlowOutput::Context => {
413+
// Any context output reaching this point is an error
414+
for message in messages {
415+
error!(target: "flows", "{flow}: cannot store context value: {}", message.payload_str().unwrap_or_default());
416+
}
417+
}
412418
}
413419
Ok(())
414420
}

crates/extensions/tedge_flows/src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ pub enum OutputConfig {
8888

8989
#[serde(rename = "file")]
9090
File { path: Utf8PathBuf },
91+
92+
#[serde(rename = "context")]
93+
Context,
9194
}
9295

9396
#[derive(thiserror::Error, Debug)]
@@ -269,6 +272,7 @@ impl TryFrom<OutputConfig> for FlowOutput {
269272
topic: topic.map(into_topic).transpose()?,
270273
},
271274
OutputConfig::File { path } => FlowOutput::File { path },
275+
OutputConfig::Context => FlowOutput::Context,
272276
})
273277
}
274278
}

crates/extensions/tedge_flows/src/flow.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ pub enum FlowInput {
8787
pub enum FlowOutput {
8888
Mqtt { topic: Option<Topic> },
8989
File { path: Utf8PathBuf },
90+
Context,
9091
}
9192

9293
/// The final outcome of a sequence of transformations applied by a flow to a message
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
use crate::js_value::JsonValue;
2+
use rquickjs::class::Trace;
3+
use rquickjs::Class;
4+
use rquickjs::Ctx;
5+
use rquickjs::JsLifetime;
6+
use rquickjs::Result;
7+
use std::collections::BTreeMap;
8+
use std::ops::Deref;
9+
use std::sync::Arc;
10+
use std::sync::Mutex;
11+
12+
#[derive(Clone, Default, JsLifetime)]
13+
pub struct KVStore {
14+
data: Arc<Mutex<BTreeMap<String, JsonValue>>>,
15+
}
16+
17+
impl KVStore {
18+
pub fn init(&self, ctx: &Ctx<'_>) {
19+
let globals = ctx.globals();
20+
let _ = Class::<FlowStore>::define(&globals);
21+
self.store_as_userdata(ctx)
22+
}
23+
24+
pub fn get(&self, key: &str) -> JsonValue {
25+
let data = self.data.lock().unwrap();
26+
data.get(key).cloned().unwrap_or(JsonValue::Null)
27+
}
28+
29+
pub fn insert(&self, key: impl Into<String>, value: impl Into<JsonValue>) {
30+
match value.into() {
31+
JsonValue::Null => self.remove(&key.into()),
32+
value => {
33+
let mut data = self.data.lock().unwrap();
34+
data.insert(key.into(), value);
35+
}
36+
}
37+
}
38+
39+
pub fn remove(&self, key: &str) {
40+
let mut data = self.data.lock().unwrap();
41+
data.remove(key);
42+
}
43+
44+
fn store_as_userdata(&self, ctx: &Ctx<'_>) {
45+
let _ = ctx.store_userdata(self.clone());
46+
}
47+
48+
fn get_from_userdata(ctx: &Ctx<'_>) -> Self {
49+
match ctx.userdata::<Self>() {
50+
None => {
51+
let store = KVStore::default();
52+
store.store_as_userdata(ctx);
53+
store
54+
}
55+
Some(userdata) => userdata.deref().clone(),
56+
}
57+
}
58+
}
59+
60+
#[derive(Clone, Trace, JsLifetime)]
61+
#[rquickjs::class(frozen)]
62+
pub struct FlowStore {}
63+
64+
#[rquickjs::methods]
65+
impl<'js> FlowStore {
66+
#[qjs(constructor)]
67+
fn new(_ctx: Ctx<'js>) -> Result<FlowStore> {
68+
Ok(FlowStore {})
69+
}
70+
71+
fn get(&self, ctx: Ctx<'js>, key: String) -> Result<JsonValue> {
72+
let data = KVStore::get_from_userdata(&ctx);
73+
Ok(data.get(&key))
74+
}
75+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pub mod console;
2+
pub mod kv_store;
23
pub mod text_decoder;
34
pub mod text_encoder;

crates/extensions/tedge_flows/src/js_runtime.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::js_lib;
2+
use crate::js_lib::kv_store::KVStore;
23
use crate::js_script::JsScript;
34
use crate::js_value::JsonValue;
45
use crate::LoadError;
@@ -17,6 +18,7 @@ use tracing::debug;
1718

1819
pub struct JsRuntime {
1920
runtime: rquickjs::AsyncRuntime,
21+
pub(crate) store: KVStore,
2022
worker: mpsc::Sender<JsRequest>,
2123
execution_timeout: Duration,
2224
}
@@ -35,10 +37,12 @@ impl JsRuntime {
3537
})))
3638
.await;
3739
let context = rquickjs::AsyncContext::full(&runtime).await?;
38-
let worker = JsWorker::spawn(context).await;
40+
let store = KVStore::default();
41+
let worker = JsWorker::spawn(context, store.clone()).await;
3942
let execution_timeout = Duration::from_secs(5);
4043
Ok(JsRuntime {
4144
runtime,
45+
store,
4246
worker,
4347
execution_timeout,
4448
})
@@ -164,20 +168,21 @@ struct JsWorker {
164168
}
165169

166170
impl JsWorker {
167-
pub async fn spawn(context: rquickjs::AsyncContext) -> mpsc::Sender<JsRequest> {
171+
pub async fn spawn(context: rquickjs::AsyncContext, store: KVStore) -> mpsc::Sender<JsRequest> {
168172
let (sender, requests) = mpsc::channel(100);
169173
tokio::spawn(async move {
170174
let worker = JsWorker { context, requests };
171-
worker.run().await
175+
worker.run(store).await
172176
});
173177
sender
174178
}
175179

176-
async fn run(mut self) {
180+
async fn run(mut self, store: KVStore) {
177181
rquickjs::async_with!(self.context => |ctx| {
178182
js_lib::console::init(&ctx);
179183
js_lib::text_decoder::init(&ctx);
180184
js_lib::text_encoder::init(&ctx);
185+
store.init(&ctx);
181186
let mut modules = JsModules::new();
182187
while let Some(request) = self.requests.recv().await {
183188
match request {

crates/extensions/tedge_flows/src/js_script.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,38 @@ export async function onMessage(message, config) {
555555
);
556556
}
557557

558+
#[tokio::test]
559+
async fn using_the_context() {
560+
let js = r#"
561+
export function onMessage(message) {
562+
let data = new FlowStore()
563+
return {
564+
topic: message.topic,
565+
payload: JSON.stringify(data.get(message.topic))
566+
}
567+
}
568+
"#;
569+
let (runtime, script) = runtime_with(js).await;
570+
571+
runtime.store.insert(
572+
"foo/bar",
573+
serde_json::json!({
574+
"hello": "world",
575+
"guess": 42,
576+
}),
577+
);
578+
579+
let input = Message::new("foo/bar", "");
580+
let output = Message::new("foo/bar", r#"{"guess":42,"hello":"world"}"#);
581+
assert_eq!(
582+
script
583+
.on_message(&runtime, SystemTime::now(), &input)
584+
.await
585+
.unwrap(),
586+
vec![output]
587+
);
588+
}
589+
558590
async fn runtime_with(js: &str) -> (JsRuntime, JsScript) {
559591
let mut runtime = JsRuntime::try_new().await.unwrap();
560592
let mut script = JsScript::new("toml".into(), 1, "js".into());

crates/extensions/tedge_flows/src/runtime.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use crate::js_runtime::JsRuntime;
55
use crate::registry::BaseFlowRegistry;
66
use crate::registry::FlowRegistryExt;
77
use crate::stats::Counter;
8+
use crate::FlowError;
9+
use crate::FlowOutput;
810
use crate::LoadError;
911
use camino::Utf8Path;
1012
use camino::Utf8PathBuf;
@@ -121,6 +123,9 @@ impl<Registry: FlowRegistryExt + Send> MessageProcessor<Registry> {
121123

122124
self.stats.runtime_on_message_done(started_at);
123125
out_messages
126+
.into_iter()
127+
.filter_map(|flow_output| self.store_context_values(flow_output))
128+
.collect()
124129
}
125130

126131
pub async fn on_interval(&mut self, timestamp: SystemTime, now: Instant) -> Vec<FlowResult> {
@@ -135,6 +140,45 @@ impl<Registry: FlowRegistryExt + Send> MessageProcessor<Registry> {
135140
out_messages
136141
}
137142

143+
fn store_context_values(&mut self, messages: FlowResult) -> Option<FlowResult> {
144+
match messages {
145+
FlowResult::Ok {
146+
messages,
147+
output: FlowOutput::Context,
148+
flow,
149+
} => {
150+
for message in messages {
151+
if let Err(error) = self.store_context_value(&message) {
152+
return Some(FlowResult::Err {
153+
flow,
154+
error,
155+
output: FlowOutput::Context,
156+
});
157+
}
158+
}
159+
None
160+
}
161+
messages => Some(messages),
162+
}
163+
}
164+
165+
pub fn store_context_value(&mut self, message: &Message) -> Result<(), FlowError> {
166+
if message.payload.is_empty() {
167+
self.js_runtime.store.remove(&message.topic)
168+
} else {
169+
let payload = message.payload_str().ok_or(FlowError::UnsupportedMessage(
170+
"Non UFT8 payload".to_string(),
171+
))?;
172+
let value: serde_json::Value = serde_json::from_str(payload)
173+
.map_err(|err| FlowError::UnsupportedMessage(format!("Non JSON payload: {err}")))?;
174+
self.js_runtime
175+
.store
176+
.insert(message.topic.to_owned(), value);
177+
}
178+
179+
Ok(())
180+
}
181+
138182
pub async fn dump_processing_stats(&self) {
139183
self.stats.dump_processing_stats();
140184
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
#
2+
# Update flows context with measurement metadata
3+
#
4+
[input.mqtt]
5+
topics = ["te/+/+/+/+/m/+/meta"]
6+
7+
[output.context]

tests/RobotFramework/tests/tedge_flows/flows/measurements.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@ input.mqtt.topics = ["te/+/+/+/+/m/+"]
22

33
steps = [
44
{ script = "add_timestamp.js" },
5-
{ script = "te_to_c8y.js", meta_topics = ["te/+/+/+/+/m/+/meta"] }
5+
{ script = "te_to_c8y.js" }
66
]

0 commit comments

Comments
 (0)