-
Notifications
You must be signed in to change notification settings - Fork 70
feat: tedge flows context #3869
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: tedge flows context #3869
Conversation
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! 🚀 New features to boost your workflow:
|
Robot Results
|
|
|
||
| let meta = config[`${message.topic}/meta`] || {} | ||
| let store = new FlowStore() | ||
| let meta = store.get(`${message.topic}/meta`) || {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That || {} would mask a possible race condition between this flow and the load_metadata.toml flow, right? Since both the flows could be loaded independently, this required context may not have been populated by the other flow, before this step is executed by this flow as it could theoretically receive the measurement message first, before the other flow receives and processes the metadata message for the same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That
|| {}would mask a possible race condition between this flow and theload_metadata.tomlflow, right?
As long as the flow mapper received the metadata first and the measurement second, then the mapper guarantees this line to be processed where the context metadata is available. But yes, a race can happen at the MQTT level - exactly as for any other mapper or agent logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Though wouldn't the race conditional only occur for the first message? Any message delivered in the future will then be processed after the store has been populated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Though wouldn't the race conditional only occur for the first message? Any message delivered in the future will then be processed after the store has been populated.
As discussed offline, the race condition would happen not only when the flow is loaded for the very first time, but everytime the flow is reloaded as well, on mapper restarts. Even though the subscriptions made by these flows are persistent after the flow is loaded for the first time, the race between messages from two different topics can still happen whenever the MQTT connection is re-established. This is a fundamental issue with the MQTT protocol itself that we've had to workaround for the entity store, by caching telemetry messages for an entity before it is registered. The flow scripts will also have to take a similar approach by caching the data in the flow context.
| /// } | ||
| /// } | ||
| /// ``` | ||
| export function onConfigUpdate(message, config) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Though I'm fully aligned with tedge taking over the extract and store responsibilities of the meta messages, one additional "feature" that we've lost in the process is the flows awareness of any dynamic meta changes. For example, if there was a tick based flow doing some processing with an interval of 1h, if the meta definition changed after 30 mins, the flow should have been aware of this change. The flow can choose whether to react to that change or not for the current batch of messages. But, at least getting notified would have been useful.
This is probably not a big deal in most fleets where the telemetry metadata is pretty much static.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we allow for multiple outputs? If so, then technically users could also publish the context to an MQTT topic as well, and the flow could subscribe to both the original flow messages, and the context updates (which are also published on some local MQTT topic)
db244df to
0774a37
Compare
albinsuresh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Happy to approve once the doc is updated.
A side note, unrelated to this PR: When console.log() outputs are logged in the mapper log, it appears directly as follows:
Nov 28 06:37:49 f6641df9de15 tedge-mapper[530]: JavaScript.Console: "test log"
It would have been better if the context(module_name) of that log is also included in that log statement.
|
|
||
| let entity_id = `${topic_parts[1]}/${topic_parts[2]}/${topic_parts[3]}/${topic_parts[4]}` | ||
| if (entity_id != "device/main//") { | ||
| let entity = context.mapper.get(entity_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While trying to access a non-exported context namespace like context.abc.get() from the script, it was a silent failure with no trace of that error in the log. But, testing the same broken script with the tedge flows test utility was giving the proper error message:
2025-11-28T06:29:00.114082517Z ERROR tedge::cli::flows::test: Error in /etc/tedge/flows/measurements.toml: No messages can be processed due to an incorrect setting: JS raised exception: Error: cannot read property 'get' of undefined
at onMessage (/etc/tedge/flows/measurements.toml|0|/etc/tedge/flows/add_timestamp.js:2:11)
Was it not feasible to get the same error message during live message processing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no trace of that error in the log.
This is surprising. Both tedge-mapper flows and tedge flows test should log the same. I will check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So there is indeed a difference between tedge-mapper flows and tedge flows test.
tedge flows testignores all the input, output and error configuration: all the messages are consumed from stdin and published to stdout or stderr.tedge-mapper flowsuses the configured input, output and error. And the default is to send errors tote/errors.
One can improve flow error logging with an option to tee error messages to the mapper log.
albinsuresh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approving with minor doc suggestions. The inconsistency with the error reporting and the logging of console.log() log statements, reported in the previous review, are independent issues. So, it can be handled separately as well.
This is first step toward a context used by flow scripts to access shared data, such as entity metadata or measurement units. For this first step, the store is not passed as an argument to the onMessage and onInterval method. The scripts has to create a `new FlowStore()` providing a `get()` method to get the value attached to some key - typically the metadata attached to an MQTT topic. The scripts only have a read access to the shared store which is populated by regular flows which output is configured to be the context. Signed-off-by: Didier Wenzek <[email protected]>
Now flow script have to use the shared KV store to share data such as entity metadata or measurement units Signed-off-by: Didier Wenzek <[email protected]>
Signed-off-by: Didier Wenzek <[email protected]>
Signed-off-by: Didier Wenzek <[email protected]>
Signed-off-by: Didier Wenzek <[email protected]>
This introduces 3 levels of namespace: - `mapper` shared by all the flows - `flow` shared by all the steps of a flow - `script` private to a script instance Signed-off-by: Didier Wenzek <[email protected]>
Signed-off-by: Didier Wenzek <[email protected]>
Signed-off-by: Didier Wenzek <[email protected]>
Signed-off-by: Didier Wenzek <[email protected]>
b4012f1 to
ffd9bcd
Compare
reubenmiller
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approved
Proposed changes
onConfigUpdatemethodonMessageandonIntervalto give the scripts access to the shared datacontext.mappergives the scripts a read access to a store shared by all the flowscontext.mapperis populated by regular flows which output is configured to be the context.context.flowis shared by all the scripts of a flowcontext.scriptis private to a script instancecontext.configis the config provided by the flow to a script instancekeysmethod.contextcan be updated usingget,set,removemethods.Types of changes
Paste Link to the issue
#3850
Checklist
just prepare-devonce)just formatas mentioned in CODING_GUIDELINESjust checkas mentioned in CODING_GUIDELINESFurther comments