Skip to content

Commit 43513e2

Browse files
committed
Update MQTT client: enable keep alive, use rx/tx topics
1 parent d1b7687 commit 43513e2

File tree

1 file changed

+44
-36
lines changed
  • tutorials/mqtt/mqtt-client

1 file changed

+44
-36
lines changed

tutorials/mqtt/mqtt-client/main.c

Lines changed: 44 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,41 @@
1-
// Copyright (c) 2023 Cesanta Software Limited
1+
// Copyright (c) 2023-2025 Cesanta Software Limited
22
// All rights reserved
33
//
44
// Example MQTT client. It performs the following steps:
55
// 1. Connects to the MQTT server specified by `s_url` variable
66
// 2. When connected, subscribes to the topic `s_sub_topic`
7-
// 3. Publishes message `hello` to the `s_pub_topic`
8-
// 4. Receives that message back from the subscribed topic and closes
9-
// 5. Timer-based reconnection logic revives the connection when it is down
7+
// 3. When it receives a message, echoes it back to `s_pub_topic`
8+
// 4. Timer-based reconnection logic revives the connection when it is down
9+
// 5. Ping server periodically. When disconnected, a last will is published
1010
//
1111
// To enable SSL/TLS, see https://mongoose.ws/tutorials/tls/#how-to-build
1212

1313
#include "mongoose.h"
1414

1515
static const char *s_url = "mqtt://broker.hivemq.com:1883";
16-
static const char *s_sub_topic = "mg/+/test"; // Subscribe topic
17-
static const char *s_pub_topic = "mg/clnt/test"; // Publish topic
18-
static int s_qos = 1; // MQTT QoS
19-
static struct mg_connection *s_conn; // Client connection
16+
static const char *s_sub_topic = "mg/123/rx"; // Subscribe topic
17+
static const char *s_pub_topic = "mg/123/tx"; // Publish topic
18+
static int s_qos = 1; // MQTT QoS
19+
static struct mg_connection *s_conn; // Client connection
2020

21-
// Handle interrupts, like Ctrl-C
22-
static int s_signo;
23-
static void signal_handler(int signo) {
24-
s_signo = signo;
21+
static void subscribe(struct mg_connection *c, const char *topic) {
22+
struct mg_mqtt_opts opts = {};
23+
memset(&opts, 0, sizeof(opts));
24+
opts.topic = mg_str(topic);
25+
opts.qos = s_qos;
26+
mg_mqtt_sub(c, &opts);
27+
MG_INFO(("%lu SUBSCRIBED to %s", c->id, topic));
28+
}
29+
30+
static void publish(struct mg_connection *c, const char *topic,
31+
const char *message) {
32+
struct mg_mqtt_opts opts = {};
33+
memset(&opts, 0, sizeof(opts));
34+
opts.topic = mg_str(topic);
35+
opts.message = mg_str(message);
36+
opts.qos = s_qos;
37+
mg_mqtt_pub(c, &opts);
38+
MG_INFO(("%lu PUBLISHED %s -> %s", c->id, topic, message));
2539
}
2640

2741
static void fn(struct mg_connection *c, int ev, void *ev_data) {
@@ -39,28 +53,18 @@ static void fn(struct mg_connection *c, int ev, void *ev_data) {
3953
MG_ERROR(("%lu ERROR %s", c->id, (char *) ev_data));
4054
} else if (ev == MG_EV_MQTT_OPEN) {
4155
// MQTT connect is successful
42-
struct mg_str subt = mg_str(s_sub_topic);
43-
struct mg_str pubt = mg_str(s_pub_topic), data = mg_str("hello");
4456
MG_INFO(("%lu CONNECTED to %s", c->id, s_url));
45-
struct mg_mqtt_opts sub_opts;
46-
memset(&sub_opts, 0, sizeof(sub_opts));
47-
sub_opts.topic = subt;
48-
sub_opts.qos = s_qos;
49-
mg_mqtt_sub(c, &sub_opts);
50-
MG_INFO(("%lu SUBSCRIBED to %.*s", c->id, (int) subt.len, subt.buf));
51-
struct mg_mqtt_opts pub_opts;
52-
memset(&pub_opts, 0, sizeof(pub_opts));
53-
pub_opts.topic = pubt;
54-
pub_opts.message = data;
55-
pub_opts.qos = s_qos, pub_opts.retain = false;
56-
mg_mqtt_pub(c, &pub_opts);
57-
MG_INFO(("%lu PUBLISHED %.*s -> %.*s", c->id, (int) data.len, data.buf,
58-
(int) pubt.len, pubt.buf));
57+
subscribe(c, s_sub_topic);
5958
} else if (ev == MG_EV_MQTT_MSG) {
6059
// When we get echo response, print it
60+
char response[100];
61+
struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;
62+
mg_snprintf(response, sizeof(response), "Got %.*s -> %.*s", mm->topic.len,
63+
mm->topic.buf, mm->data.len, mm->data.buf);
64+
publish(c, s_pub_topic, response);
65+
} else if (ev == MG_EV_MQTT_CMD) {
6166
struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;
62-
MG_INFO(("%lu RECEIVED %.*s <- %.*s", c->id, (int) mm->data.len,
63-
mm->data.buf, (int) mm->topic.len, mm->topic.buf));
67+
if (mm->cmd == MQTT_CMD_PINGREQ) mg_mqtt_pong(c);
6468
} else if (ev == MG_EV_CLOSE) {
6569
MG_INFO(("%lu CLOSED", c->id));
6670
s_conn = NULL; // Mark that we're closed
@@ -73,9 +77,14 @@ static void timer_fn(void *arg) {
7377
struct mg_mqtt_opts opts = {.clean = true,
7478
.qos = s_qos,
7579
.topic = mg_str(s_pub_topic),
80+
.keepalive = 5,
7681
.version = 4,
7782
.message = mg_str("bye")};
78-
if (s_conn == NULL) s_conn = mg_mqtt_connect(mgr, s_url, &opts, fn, NULL);
83+
if (s_conn == NULL) {
84+
s_conn = mg_mqtt_connect(mgr, s_url, &opts, fn, NULL);
85+
} else {
86+
mg_mqtt_ping(s_conn);
87+
}
7988
}
8089

8190
int main(int argc, char *argv[]) {
@@ -102,13 +111,12 @@ int main(int argc, char *argv[]) {
102111
}
103112
}
104113

105-
signal(SIGINT, signal_handler); // Setup signal handlers - exist event
106-
signal(SIGTERM, signal_handler); // manager loop on SIGINT and SIGTERM
107-
108114
mg_mgr_init(&mgr);
109115
mg_timer_add(&mgr, 3000, MG_TIMER_REPEAT | MG_TIMER_RUN_NOW, timer_fn, &mgr);
110-
while (s_signo == 0) mg_mgr_poll(&mgr, 1000); // Event loop, 1s timeout
111-
mg_mgr_free(&mgr); // Finished, cleanup
116+
for (;;) {
117+
mg_mgr_poll(&mgr, 1000);
118+
}
119+
mg_mgr_free(&mgr);
112120

113121
return 0;
114122
}

0 commit comments

Comments
 (0)