Skip to content

Commit 96a895a

Browse files
simensrostadjorgenmk
authored andcommitted
samples: net: MQTT: transport: Respect Run To Completion
Ensure that state changes are executed in order by queueing an event on callbacks from the internal MQTT thread. This ensures correct order and state changes in the state machine. Signed-off-by: Simen S. Røstad <[email protected]>
1 parent 486e73a commit 96a895a

File tree

1 file changed

+68
-3
lines changed

1 file changed

+68
-3
lines changed

samples/net/mqtt/src/modules/transport/transport.c

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,24 @@ static char client_id[CONFIG_MQTT_SAMPLE_TRANSPORT_CLIENT_ID_BUFFER_SIZE];
4646
static uint8_t pub_topic[sizeof(client_id) + sizeof(CONFIG_MQTT_SAMPLE_TRANSPORT_PUBLISH_TOPIC)];
4747
static uint8_t sub_topic[sizeof(client_id) + sizeof(CONFIG_MQTT_SAMPLE_TRANSPORT_SUBSCRIBE_TOPIC)];
4848

49+
enum transport_event_type {
50+
CONNECTED,
51+
DISCONNECTED,
52+
};
53+
54+
struct transport_event {
55+
enum transport_event_type type;
56+
};
57+
58+
/* Private channel for internal events */
59+
ZBUS_CHAN_DEFINE(TRANSPORT_PRIVATE_CHANNEL,
60+
struct transport_event,
61+
NULL,
62+
NULL,
63+
ZBUS_OBSERVERS(transport),
64+
ZBUS_MSG_INIT(0)
65+
);
66+
4967
/* User defined state object.
5068
* Used to transfer data between state changes.
5169
*/
@@ -70,15 +88,37 @@ static struct s_object {
7088
static void on_mqtt_connack(enum mqtt_conn_return_code return_code, bool session_present)
7189
{
7290
ARG_UNUSED(return_code);
91+
ARG_UNUSED(session_present);
92+
93+
int err;
94+
struct transport_event event = {
95+
.type = CONNECTED,
96+
};
7397

74-
smf_set_state(SMF_CTX(&s_obj), &state[MQTT_CONNECTED]);
98+
if (return_code != MQTT_CONNECTION_ACCEPTED) {
99+
LOG_ERR("MQTT broker rejected connection, return code: %d", return_code);
100+
return;
101+
}
102+
103+
err = zbus_chan_pub(&TRANSPORT_PRIVATE_CHANNEL, &event, K_SECONDS(1));
104+
if (err) {
105+
LOG_ERR("zbus_chan_pub, error: %d", err);
106+
SEND_FATAL_ERROR();
107+
}
75108
}
76109

77110
static void on_mqtt_disconnect(int result)
78111
{
79-
ARG_UNUSED(result);
112+
int err;
113+
struct transport_event event = {
114+
.type = DISCONNECTED,
115+
};
80116

81-
smf_set_state(SMF_CTX(&s_obj), &state[MQTT_DISCONNECTED]);
117+
err = zbus_chan_pub(&TRANSPORT_PRIVATE_CHANNEL, &event, K_SECONDS(1));
118+
if (err) {
119+
LOG_ERR("zbus_chan_pub, error: %d", err);
120+
SEND_FATAL_ERROR();
121+
}
82122
}
83123

84124
static void on_mqtt_publish(struct mqtt_helper_buf topic, struct mqtt_helper_buf payload)
@@ -381,6 +421,31 @@ static void transport_task(void)
381421
return;
382422
}
383423
}
424+
425+
if (&TRANSPORT_PRIVATE_CHANNEL == chan) {
426+
427+
struct transport_event event;
428+
429+
err = zbus_chan_read(&TRANSPORT_PRIVATE_CHANNEL, &event, K_SECONDS(1));
430+
if (err) {
431+
LOG_ERR("zbus_chan_read, error: %d", err);
432+
SEND_FATAL_ERROR();
433+
return;
434+
}
435+
436+
/* Process MQTT events and change state in the correct thread context */
437+
switch (event.type) {
438+
case CONNECTED:
439+
smf_set_state(SMF_CTX(&s_obj), &state[MQTT_CONNECTED]);
440+
break;
441+
case DISCONNECTED:
442+
smf_set_state(SMF_CTX(&s_obj), &state[MQTT_DISCONNECTED]);
443+
break;
444+
default:
445+
LOG_WRN("Unknown MQTT event type: %d", event.type);
446+
break;
447+
}
448+
}
384449
}
385450
}
386451

0 commit comments

Comments
 (0)