From baf27b9b839cb5aeef69d8d650a27489962f0c24 Mon Sep 17 00:00:00 2001 From: Andrea Gilardoni Date: Mon, 28 Jul 2025 13:25:44 +0200 Subject: [PATCH 1/8] added Mqtt client Interface definition --- src/MqttInterface.h | 128 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100644 src/MqttInterface.h diff --git a/src/MqttInterface.h b/src/MqttInterface.h new file mode 100644 index 0000000..df968ae --- /dev/null +++ b/src/MqttInterface.h @@ -0,0 +1,128 @@ +#pragma once +#include + +// The Idea for this section of the library is to allow the usage of different implementation for Mqtt Clients +// while preserving the possibility of having an Arduino standardized interface for Mqtt protocol +// One should implement MqttClientInterface and provide a way to instantiate the implementation + +// namespace arduino { // namespace net { namespace mqtt { +typedef int error_t; // TODO move this to be generally available + +using Topic = const char* const; + +#if defined(__has_include) + // Check if string_view is available and usable + #if __has_include() + #include + + // for incoming published messages + using MqttReceiveCallback = std::function; + #endif +#else + typedef void(*MqttReceiveCallback)(Topic)>; +#endif + +// TODO MQTT 5.0 stuff + +// TODO define callback for mqtt events. one should be the default, but the user can always change it + +// copied from zephyr +// enum mqtt_conn_return_code: error_t { +// /** Connection accepted. */ +// MQTT_CONNECTION_ACCEPTED = 0x00, + +// /** The Server does not support the level of the MQTT protocol +// * requested by the Client. +// */ +// MQTT_UNACCEPTABLE_PROTOCOL_VERSION = 0x01, + +// /** The Client identifier is correct UTF-8 but not allowed by the +// * Server. +// */ +// MQTT_IDENTIFIER_REJECTED = 0x02, + +// /** The Network Connection has been made but the MQTT service is +// * unavailable. +// */ +// MQTT_SERVER_UNAVAILABLE = 0x03, + +// /** The data in the user name or password is malformed. */ +// MQTT_BAD_USER_NAME_OR_PASSWORD = 0x04, + +// /** The Client is not authorized to connect. */ +// MQTT_NOT_AUTHORIZED = 0x05 +// }; + +constexpr error_t NotImplementedError= -0x100; // TODO define a proper value + +enum MqttQos: uint8_t { + MqttQos0 = 0, // At Most once + MqttQos1 = 1, // At least once + MqttQos2 = 2, // Exactly once +}; + +typedef uint8_t MqttPublishFlag; +enum MqttPublishFlags: MqttPublishFlag { + None = 0, + RetainEnabled = 1, + DupEnabled = 2, +}; + +// TODO define mqtt version + +constexpr MqttQos QosDefault = MqttQos0; +// constexpr size_t MqttClientIdMaxLength = 256; +constexpr size_t MqttClientIdMaxLength = 40; + +// TODO it shouldn't be called client, since it is not an arduino client +class MqttClientInterface { +public: + virtual ~MqttClientInterface() = default; + + virtual error_t connect(IPAddress ip, uint16_t port) = 0; + virtual error_t connect(const char *host, uint16_t port) = 0; // TODO should host be string instead of c-string? + virtual void disconnect() = 0; + + virtual error_t subscribe(Topic t, MqttQos qos = QosDefault) = 0; + + virtual error_t publish( + Topic t, uint8_t payload[], + size_t size, MqttQos qos = QosDefault, + MqttPublishFlag flags = MqttPublishFlags::None) = 0; + + virtual error_t unsubscribe(Topic t) = 0; + virtual void poll() = 0; + virtual error_t ping() = 0; + + virtual void setReceiveCallback(MqttReceiveCallback cbk) = 0; + + // nullptr means generate it randomly + virtual void setId(const char* client_id = nullptr) = 0; + + // password may be null, if username is null password won't be used + virtual void setUsernamePassword(const char* username, const char* password=nullptr) = 0; + + virtual void setWill( + Topic willTopic, const uint8_t* will_message, + size_t will_size, MqttQos qos=QosDefault, + MqttPublishFlag flags = MqttPublishFlags::None) = 0; + + virtual void setClient(arduino::Client*) = 0; + + // FIXME the following definition may cause errors since one can easily pass a context dependent object + // virtual void setClient(Client&) = 0; + // Could this be a better solution? + // virtual void setClient(Client&&) = 0; + + virtual int read() = 0; + virtual int read(uint8_t payload[], size_t size) = 0; + virtual int available() = 0; + + // The following methods should return the current rx message parameters. + // FIXME what if none? + virtual String messageTopic() const = 0; + virtual int messageDup() const = 0; + virtual uint16_t messageId() const = 0; + virtual MqttQos messageQoS() const = 0; + virtual int messageRetain() const = 0; +}; From 8ed0d8e18a060a8ca12b9dcdb2e181462791d5cf Mon Sep 17 00:00:00 2001 From: Andrea Gilardoni Date: Tue, 29 Jul 2025 09:52:57 +0200 Subject: [PATCH 2/8] updating entrypoint for ArduinoMqttClient --- src/ArduinoMqttClient.h | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/ArduinoMqttClient.h b/src/ArduinoMqttClient.h index ff0a8cc..8059a6e 100644 --- a/src/ArduinoMqttClient.h +++ b/src/ArduinoMqttClient.h @@ -20,6 +20,14 @@ #ifndef _ARDUINO_MQTT_CLIENT_H_ #define _ARDUINO_MQTT_CLIENT_H_ +#if defined(__ZEPHYR__) && __ZEPHYR__ == 1 && defined(CONFIG_MQTT_LIB) +#include "implementations/ZephyrMqttClient.h" + +using MqttClient = ZephyrMqttClient; +#else #include "MqttClient.h" +using MqttClient = arduino::MqttClient; +#endif + #endif From f821ff04ac0d694124f70b9ca5a4a0fdd78c00f1 Mon Sep 17 00:00:00 2001 From: Andrea Gilardoni Date: Tue, 29 Jul 2025 09:59:33 +0200 Subject: [PATCH 3/8] Added Zephyr implementation for Mqtt client --- src/implementations/ZephyrMqttClient.cpp | 415 +++++++++++++++++++++++ src/implementations/ZephyrMqttClient.h | 72 ++++ 2 files changed, 487 insertions(+) create mode 100644 src/implementations/ZephyrMqttClient.cpp create mode 100644 src/implementations/ZephyrMqttClient.h diff --git a/src/implementations/ZephyrMqttClient.cpp b/src/implementations/ZephyrMqttClient.cpp new file mode 100644 index 0000000..a589610 --- /dev/null +++ b/src/implementations/ZephyrMqttClient.cpp @@ -0,0 +1,415 @@ + +#if defined(__ZEPHYR__) && __ZEPHYR__ == 1 && defined(CONFIG_MQTT_LIB) +#include "ZephyrMqttClient.h" +#include +#include + +extern "C" { + /** Handler for asynchronous MQTT events */ + static void _mqtt_event_handler(struct mqtt_client *const client, const struct mqtt_evt *evt) { + ((ZephyrMqttClient*)client->user_data)->mqtt_event_handler( + client, + evt); + } +} + +static inline uint8_t convertQosValue(MqttQos qos) { + return static_cast(qos); +} + +/* This helper function is used to generalize the allocation and deallocation + * for zephyr mqtt_utf8 structs. It deallocates the pointed to object if no value is provided + * it allocates the struct if a value is provided + * mqtt_topic could also be used, since the struct respects mqtt_utf8 fields + */ +static inline void __mqtt_utf8_allocation_helper( + struct mqtt_utf8** s, + const uint8_t* value=nullptr, + size_t size=0, + size_t struct_size=sizeof(struct mqtt_utf8)) { + + if(value == nullptr) { + free(*s); + *s = nullptr; + } else { + if(*s == nullptr) { + *s = (struct mqtt_utf8*) malloc(struct_size); + } + + (*s)->utf8 = (const uint8_t*)value; + (*s)->size = size; + } +} + +ZephyrMqttClient::ZephyrMqttClient() +: last_message(nullptr) { + fds.fd = -1; + mqtt_client_init(&client); +} + +ZephyrMqttClient::~ZephyrMqttClient() { + __mqtt_utf8_allocation_helper(&client.user_name); + __mqtt_utf8_allocation_helper(&client.password); + __mqtt_utf8_allocation_helper((struct mqtt_utf8**)&client.will_topic); + __mqtt_utf8_allocation_helper(&client.will_message); +} + +int ZephyrMqttClient::connect(IPAddress ip, uint16_t port) { + struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker; + broker4->sin_addr.s_addr = (uint32_t)ip; + + broker4->sin_family = AF_INET; + broker4->sin_port = htons(port); + + return connect(broker); +} + +int ZephyrMqttClient::connect(const char *host, uint16_t port) { + int rc = 0; + struct addrinfo *result; + struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker; + + const struct addrinfo hints = { + .ai_family = AF_INET, + .ai_socktype = SOCK_STREAM + }; + + /* Resolve IP address of MQTT broker */ + // FIXME port should be a string + rc = getaddrinfo(host, "1883", &hints, &result); + if (rc != 0) { + return -EIO; + } + + // TODO this may not be compatible with ipv6? + broker4->sin_addr.s_addr = ((struct sockaddr_in *)result->ai_addr)->sin_addr.s_addr; + broker4->sin_family = AF_INET; + broker4->sin_port = htons(port); + + freeaddrinfo(result); + + return connect(broker); +} + +int ZephyrMqttClient::connect(sockaddr_storage addr) { + client_init(); + + int rc = 0; + + memcpy(&broker, &addr, sizeof(broker)); + client.broker = &broker; + + rc = mqtt_connect(&client); + if (rc != 0) { + // TODO LOG ERROR? + return rc; + } + + if (client.transport.type == MQTT_TRANSPORT_NON_SECURE) { + fds.fd = client.transport.tcp.sock; + }else if (client.transport.type == MQTT_TRANSPORT_SECURE) { + fds.fd = client.transport.tls.sock; + } + + fds.events = ZSOCK_POLLIN; + + // FIXME this requires the timeout to be properly configured in the poll function + poll(); // make the client connect + return rc; +} + +void ZephyrMqttClient::client_init() { + client.protocol_version = MQTT_VERSION_3_1_1; // FIXME provide a way to set these values + + // client.client_id.utf8 = (const uint8_t*)_clientid; // TODO generate client_id if not provided + // client.client_id.size = strlen((const char*) _clientid); + + /* MQTT buffers configuration */ + client.rx_buf = rx_buffer; + client.rx_buf_size = sizeof(rx_buffer); + client.tx_buf = tx_buffer; + client.tx_buf_size = sizeof(tx_buffer); + + client.user_data = this; + client.evt_cb = _mqtt_event_handler; + + // TODO TLS settings + client.transport.type = MQTT_TRANSPORT_NON_SECURE; + + // client.transport.type = MQTT_TRANSPORT_SECURE; + // struct mqtt_sec_config *tls_config = &client.transport.tls.config; + + // tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED; + // tls_config->cipher_list = NULL; + // // tls_config->cipher_list = mbedtls_cipher_list(); + // // tls_config->cipher_count = mbedtls_cipher_list(); + // tls_config->sec_tag_list = m_sec_tags; + // tls_config->sec_tag_count = 2; + // tls_config->hostname = "iot.arduino.cc"; + // // tls_config->cert_nocopy = TLS_CERT_NOCOPY_NONE; +} + +void ZephyrMqttClient::disconnect() { // TODO call this in the destructor + mqtt_disconnect(&client, nullptr); // TODO mqtt_disconnect_param takes values in case of mqtt version 5 + + close(fds.fd); // FIXME is this correct? + fds.fd = -1; +} + +uint8_t ZephyrMqttClient::connected() { + return fds.fd != -1; // FIXME is this enough? nope +} + +ZephyrMqttClient::operator bool() { + return connected() == 1; +} + +error_t ZephyrMqttClient::subscribe(Topic t, MqttQos qos) { + // TODO allow user to subscribe to multiple topics + + struct mqtt_topic topics[] = {{ + .topic = { + .utf8 = (const uint8_t*) t, + .size = strlen(t) + }, + .qos = convertQosValue(qos), + }}; + + const struct mqtt_subscription_list sub_list { + .list = topics, + .list_count = 1, + .message_id = sys_rand16_get() + }; + + return mqtt_subscribe(&client, &sub_list); // should this parameter kept in an available storage +} + +error_t ZephyrMqttClient::publish(Topic t, uint8_t payload[], size_t size, MqttQos qos, MqttPublishFlag flags) { + struct mqtt_publish_param param; + + param.message.topic.qos = convertQosValue(qos); + param.message.topic.topic.utf8 = (uint8_t *)t; + param.message.topic.topic.size = strlen(t); + param.message.payload.data = payload; + param.message.payload.len = size; + param.message_id = sys_rand16_get(); // TODO what value should this take? + param.dup_flag = (flags & DupEnabled) == DupEnabled; + param.retain_flag = (flags & RetainEnabled) == RetainEnabled; + + return mqtt_publish(&client, ¶m); // error codes should be proxied +} + +error_t ZephyrMqttClient::unsubscribe(Topic t) { + struct mqtt_topic topic; + struct mqtt_subscription_list unsub; + + topic.topic.utf8 = (const uint8_t*)t; + topic.topic.size = strlen(t); + unsub.list = &topic; + unsub.list_count = 1U; + unsub.message_id = sys_rand16_get(); + + return mqtt_unsubscribe(&client, &unsub); +} + +void ZephyrMqttClient::poll() { + int rc; + + // TODO This defines the minimum time poll should wait till another event occurs, find a proper value + // auto _mqtt_keepalive_time_left = mqtt_keepalive_time_left(&client); + auto _mqtt_keepalive_time_left = 100; // poll returns immediately if no event occurs + + rc = zsock_poll(&fds, 1, _mqtt_keepalive_time_left); + if (rc < 0) { + // LOG_ERR("Socket poll error [%d]", rc); + return; + } + + // FIXME understand what this does + + if (rc >= 0) { + if (fds.revents & ZSOCK_POLLIN) { + /* MQTT data received */ + rc = mqtt_input(&client); + if (rc != 0) { + // LOG_ERR("MQTT Input failed [%d]", rc); + // return rc; + return; + } + + /* Socket error */ + if (fds.revents & (ZSOCK_POLLHUP | ZSOCK_POLLERR)) { + // LOG_ERR("MQTT socket closed / error"); + // return -ENOTCONN; + } + } + } else { + /* Socket poll timed out, time to call mqtt_live() */ + rc = mqtt_live(&client); + if (rc != 0) { + // LOG_ERR("MQTT Live failed [%d]", rc); + // return rc; + return; + } + } + // return rc; +} + +error_t ZephyrMqttClient::ping() { + return mqtt_ping(&client); +} + +int ZephyrMqttClient::read() { + uint8_t a; + int res = read(&a, 1); + return res <= 0 ? res : a; +} + +int ZephyrMqttClient::read(uint8_t payload[], size_t len) { + int res = mqtt_read_publish_payload_blocking(&client, payload, len); + + if(res < 0) { + return res; + } + + if(available() == 0) { + if (messageQoS() == MqttQos1) { + const struct mqtt_puback_param ack_param = { + .message_id = messageId() + }; + mqtt_publish_qos1_ack(&client, &ack_param); + } else if (messageQoS() == MqttQos2) { + const struct mqtt_pubrec_param rec_param = { + .message_id = messageId() + }; + + mqtt_publish_qos2_receive(&client, &rec_param); + } + + delete last_message; + last_message = nullptr; + } + + return res; +} + +int ZephyrMqttClient::available() { + return client.internal.remaining_payload; +} + +void ZephyrMqttClient::mqtt_event_handler( + struct mqtt_client *const client, const struct mqtt_evt *evt) { + + switch (evt->type) { + case MQTT_EVT_PUBREC: { + if (evt->result != 0) { + break; + } + + const struct mqtt_pubrel_param rel_param = { + .message_id = evt->param.pubrec.message_id + }; + + mqtt_publish_qos2_release(client, &rel_param); + break; + } + case MQTT_EVT_PUBREL: { + if (evt->result != 0) { + break; + } + + const struct mqtt_pubcomp_param rec_param = { + .message_id = evt->param.pubrel.message_id + }; + + mqtt_publish_qos2_complete(client, &rec_param); + break; + } + case MQTT_EVT_PUBLISH: { + const struct mqtt_publish_param *p = &evt->param.publish; + if(last_message == nullptr) { + last_message = new mqtt_publish_param; + } + + memcpy(last_message, p, sizeof(*last_message)); + + // Pass the stream to the user callback + if(_cbk != nullptr) { + const char* topic = (const char*)p->message.topic.topic.utf8; // will this be null terminated string? + + _cbk(topic); + } + } + default: + break; + } +} + +void ZephyrMqttClient::setId(const char* client_id) { + client.client_id.utf8 = (const uint8_t*)client_id; + client.client_id.size = strlen((const char*)client_id); +} + +void ZephyrMqttClient::setUsernamePassword(const char* username, const char* password) { + __mqtt_utf8_allocation_helper(&client.user_name, (const uint8_t*)username, strlen(username)); + __mqtt_utf8_allocation_helper(&client.password, (const uint8_t*)password, strlen(password)); +} + +void ZephyrMqttClient::setWill( + Topic will_topic, const uint8_t* will_message, size_t will_size, MqttQos qos, + MqttPublishFlag flags) { // TODO use publish flags + __mqtt_utf8_allocation_helper( + (struct mqtt_utf8**)&client.will_topic, (const uint8_t*)will_topic, + strlen(will_topic), sizeof(struct mqtt_topic)); + client.will_topic->qos = qos; + + __mqtt_utf8_allocation_helper(&client.will_message, will_message, will_size); + +} + +void ZephyrMqttClient::setReceiveCallback(MqttReceiveCallback cbk) { + this->_cbk = cbk; +} + +String ZephyrMqttClient::messageTopic() const { + if(last_message != nullptr) { + return (const char*)last_message->message.topic.topic.utf8; + } + + return ""; +} + +int ZephyrMqttClient::messageDup() const { + if(last_message != nullptr) { + return last_message->dup_flag; + } + + return -1; +} + +uint16_t ZephyrMqttClient::messageId() const { + if(last_message != nullptr) { + return last_message->message_id; + } + + return 0; +} + +MqttQos ZephyrMqttClient::messageQoS() const { + if(last_message != nullptr) { + return static_cast(last_message->message.topic.qos); + } + + return QosDefault; +} + +int ZephyrMqttClient::messageRetain() const { + if(last_message != nullptr) { + return last_message->retain_flag; + } + + return -1; +} + + +#endif // CONFIG_MQTT_LIB diff --git a/src/implementations/ZephyrMqttClient.h b/src/implementations/ZephyrMqttClient.h new file mode 100644 index 0000000..63e68bf --- /dev/null +++ b/src/implementations/ZephyrMqttClient.h @@ -0,0 +1,72 @@ +#pragma once +#include "../MqttInterface.h" +#include +#include + +// TODO calculate minimum size +#define CONFIG_MQTT_CLIENT_RX_BUFFER 128 +#define CONFIG_MQTT_CLIENT_TX_BUFFER 128 + +class ZephyrMqttClient: public MqttClientInterface { +public: + ZephyrMqttClient(); + ~ZephyrMqttClient(); + + int connect(IPAddress ip, uint16_t port) override; + int connect(const char *host, uint16_t port) override; + void disconnect() override; + + uint8_t connected(); + operator bool(); + + error_t subscribe( + Topic t, + MqttQos qos = QosDefault) override; + + error_t publish( + Topic t, uint8_t payload[], + size_t size, MqttQos qos = QosDefault, + MqttPublishFlag flags = MqttPublishFlags::None) override; + + error_t unsubscribe(Topic t) override; + void poll() override; + error_t ping() override; + + void setReceiveCallback(MqttReceiveCallback cbk) override; + void setId(const char* client_id = nullptr) override; + void setUsernamePassword(const char* username, const char* password=nullptr) override; + void setWill( + Topic willTopic, const uint8_t* will_message, + size_t will_size, MqttQos qos=QosDefault, + MqttPublishFlag flags = MqttPublishFlags::None) override; + + // TODO should this be private? + void mqtt_event_handler(struct mqtt_client *const client, const struct mqtt_evt *evt); + + // in zephyr we do not need to provide a client on which to rely on + void setClient(arduino::Client*) override {}; + + int read() override; + int read(uint8_t payload[], size_t len) override; + int available() override; + + String messageTopic() const override; + int messageDup() const override; + uint16_t messageId() const override; + MqttQos messageQoS() const override; + int messageRetain() const override; +private: + int connect(sockaddr_storage addr); + void client_init(); + + struct mqtt_client client; + struct zsock_pollfd fds; + sockaddr_storage broker; // broker address TODO should we keep it as an attribute + + uint8_t rx_buffer[CONFIG_MQTT_CLIENT_RX_BUFFER]; + uint8_t tx_buffer[CONFIG_MQTT_CLIENT_TX_BUFFER]; + + MqttReceiveCallback _cbk; + + struct mqtt_publish_param *last_message; +}; From dfe0a7dab3888b072389e87123b688952e0fc81e Mon Sep 17 00:00:00 2001 From: Andrea Gilardoni Date: Tue, 29 Jul 2025 10:24:00 +0200 Subject: [PATCH 4/8] Changing MqttClient namespace --- src/MqttClient.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/MqttClient.h b/src/MqttClient.h index 522f023..275e4ec 100644 --- a/src/MqttClient.h +++ b/src/MqttClient.h @@ -39,6 +39,8 @@ #include #endif +namespace arduino { + class MqttClient : public Client { public: MqttClient(Client* client); @@ -196,5 +198,6 @@ class MqttClient : public Client { size_t _willMessageIndex; uint8_t _willFlags; }; +} #endif From f4001e732d50c333ef9358737f1db6e5ef8b1634 Mon Sep 17 00:00:00 2001 From: Andrea Gilardoni Date: Tue, 29 Jul 2025 10:44:11 +0200 Subject: [PATCH 5/8] codestyle unification --- src/MqttClient.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/MqttClient.cpp b/src/MqttClient.cpp index 221b230..82831f8 100644 --- a/src/MqttClient.cpp +++ b/src/MqttClient.cpp @@ -448,9 +448,9 @@ void MqttClient::poll() if ((b & 0x80) == 0) { // length done bool malformedResponse = false; - if (_rxType == MQTT_CONNACK || + if (_rxType == MQTT_CONNACK || _rxType == MQTT_PUBACK || - _rxType == MQTT_PUBREC || + _rxType == MQTT_PUBREC || _rxType == MQTT_PUBCOMP || _rxType == MQTT_UNSUBACK) { malformedResponse = (_rxFlags != 0x00 || _rxLength != 2); @@ -458,7 +458,7 @@ void MqttClient::poll() malformedResponse = ((_rxFlags & 0x06) == 0x06); } else if (_rxType == MQTT_PUBREL) { malformedResponse = (_rxFlags != 0x02 || _rxLength != 2); - } else if (_rxType == MQTT_SUBACK) { + } else if (_rxType == MQTT_SUBACK) { malformedResponse = (_rxFlags != 0x00 || _rxLength != 3); } else if (_rxType == MQTT_PINGRESP) { malformedResponse = (_rxFlags != 0x00 || _rxLength != 0); @@ -531,7 +531,7 @@ void MqttClient::poll() if (_rxMessageIndex == 2) { _rxMessageTopicLength = (_rxMessageBuffer[0] << 8) | _rxMessageBuffer[1]; _rxLength -= 2; - + _rxMessageTopic = ""; _rxMessageTopic.reserve(_rxMessageTopicLength); @@ -722,7 +722,7 @@ int MqttClient::read(uint8_t *buf, size_t size) if (b == -1) { break; - } + } result++; *buf++ = b; @@ -819,7 +819,7 @@ void MqttClient::setTxPayloadSize(unsigned short size) _txPayloadBuffer = NULL; _txPayloadBufferIndex = 0; } - + _tx_payload_buffer_size = size; } From 99fd1129e335eccf9338a40fce7c2b7a023fbd0d Mon Sep 17 00:00:00 2001 From: Andrea Gilardoni Date: Tue, 29 Jul 2025 16:06:59 +0200 Subject: [PATCH 6/8] Making MqttClient inherit from MqttClientInterface --- src/MqttClient.cpp | 87 +++++++++++++++++++++++++++++++++++++--------- src/MqttClient.h | 74 ++++++++++++++++++++++++--------------- 2 files changed, 117 insertions(+), 44 deletions(-) diff --git a/src/MqttClient.cpp b/src/MqttClient.cpp index 82831f8..c9bb864 100644 --- a/src/MqttClient.cpp +++ b/src/MqttClient.cpp @@ -151,14 +151,25 @@ int MqttClient::messageDup() const return -1; } -int MqttClient::messageQoS() const +uint16_t MqttClient::messageId() const { if (_rxState == MQTT_CLIENT_RX_STATE_READ_PUBLISH_PAYLOAD) { // message received and ready for reading - return _rxMessageQoS; + return _rxPacketId; } - return -1; + return 0; +} + + +MqttQos MqttClient::messageQoS() const +{ + if (_rxState == MQTT_CLIENT_RX_STATE_READ_PUBLISH_PAYLOAD) { + // message received and ready for reading + return static_cast(_rxMessageQoS); + } + + return QosDefault; } int MqttClient::messageRetain() const @@ -171,6 +182,32 @@ int MqttClient::messageRetain() const return -1; } +void MqttClient::setClient(arduino::Client* client) { + if(_client != nullptr && _client->connected()) { + // TODO if the current client is connected we cannot perform the change, first call disconnect + return; + } + + _client = client; +} + +void MqttClient::setReceiveCallback(MqttReceiveCallback cbk) { + _cbk = cbk; +} + +error_t MqttClient::publish(Topic t, uint8_t payload[], size_t size, MqttQos qos, MqttPublishFlag flags) { + int error = this->beginMessage(t, (flags & RetainEnabled) == RetainEnabled, qos, (flags & DupEnabled) == DupEnabled); + + if(error == 0) { // TODO replace this with a proper enum value + return error; + } + + int res = this->write(payload, size); + this->endMessage(); + + return res; +} + int MqttClient::beginMessage(const char* topic, unsigned long size, bool retain, uint8_t qos, bool dup) { _txMessageTopic = topic; @@ -259,6 +296,20 @@ int MqttClient::endMessage() return 1; } +void MqttClient::setWill( + Topic willTopic, const uint8_t* will_message, size_t will_size, MqttQos qos, MqttPublishFlag flags) { + int error = this->beginWill(willTopic, (flags & RetainEnabled) == RetainEnabled, qos, (flags & DupEnabled) == DupEnabled); + + if(error == 0) { // TODO replace this with a proper enum value + return; + } + + int res = this->write(will_message, will_size); + this->endWill(); + + return; +} + int MqttClient::beginWill(const char* topic, unsigned short size, bool retain, uint8_t qos) { int topicLength = strlen(topic); @@ -314,7 +365,7 @@ int MqttClient::endWill() return 1; } -int MqttClient::subscribe(const char* topic, uint8_t qos) +error_t MqttClient::subscribe(Topic topic, MqttQos qos) { int topicLength = strlen(topic); int remainingLength = topicLength + 5; @@ -362,12 +413,12 @@ int MqttClient::subscribe(const char* topic, uint8_t qos) return 0; } -int MqttClient::subscribe(const String& topic, uint8_t qos) +error_t MqttClient::subscribe(const String& topic, MqttQos qos) { return subscribe(topic.c_str(), qos); } -int MqttClient::unsubscribe(const char* topic) +error_t MqttClient::unsubscribe(Topic topic) { int topicLength = strlen(topic); int remainingLength = topicLength + 4; @@ -565,16 +616,18 @@ void MqttClient::poll() } else { _rxState = MQTT_CLIENT_RX_STATE_READ_PUBLISH_PAYLOAD; - if (_onMessage) { + if(_cbk) { + _cbk(_rxMessageTopic.c_str()); + } else if (_onMessage) { #ifdef MQTT_CLIENT_STD_FUNCTION_CALLBACK _onMessage(this,_rxLength); #else _onMessage(_rxLength); #endif + } - if (_rxLength == 0) { - _rxState = MQTT_CLIENT_RX_STATE_READ_TYPE; - } + if ((_onMessage || _cbk) && _rxLength == 0) { + _rxState = MQTT_CLIENT_RX_STATE_READ_TYPE; } } } @@ -592,7 +645,9 @@ void MqttClient::poll() _rxState = MQTT_CLIENT_RX_STATE_READ_PUBLISH_PAYLOAD; - if (_onMessage) { + if(_cbk) { + _cbk(_rxMessageTopic.c_str()); + } else if (_onMessage) { #ifdef MQTT_CLIENT_STD_FUNCTION_CALLBACK _onMessage(this,_rxLength); #else @@ -647,12 +702,12 @@ void MqttClient::poll() } } -int MqttClient::connect(IPAddress ip, uint16_t port) +error_t MqttClient::connect(IPAddress ip, uint16_t port) { return connect(ip, NULL, port); } -int MqttClient::connect(const char *host, uint16_t port) +error_t MqttClient::connect(const char *host, uint16_t port) { return connect((uint32_t)0, host, port); } @@ -706,7 +761,7 @@ int MqttClient::read() return b; } -int MqttClient::read(uint8_t *buf, size_t size) +int MqttClient::read(uint8_t buf[], size_t size) { size_t result = 0; @@ -833,7 +888,7 @@ int MqttClient::subscribeQoS() const return _subscribeQos; } -int MqttClient::connect(IPAddress ip, const char* host, uint16_t port) +error_t MqttClient::connect(IPAddress ip, const char* host, uint16_t port) { if (clientConnected()) { _client->stop(); @@ -1041,7 +1096,7 @@ void MqttClient::pubcomp(uint16_t id) endPacket(); } -void MqttClient::ping() +error_t MqttClient::ping() { uint8_t packetBuffer[2]; diff --git a/src/MqttClient.h b/src/MqttClient.h index 275e4ec..7065e6a 100644 --- a/src/MqttClient.h +++ b/src/MqttClient.h @@ -21,7 +21,7 @@ #define _MQTT_CLIENT_H_ #include -#include +#include "MqttInterface.h" #define MQTT_CONNECTION_REFUSED -2 #define MQTT_CONNECTION_TIMEOUT -1 @@ -41,9 +41,9 @@ namespace arduino { -class MqttClient : public Client { +class MqttClient : public MqttClientInterface, public Client { public: - MqttClient(Client* client); + MqttClient(Client* client=nullptr); MqttClient(Client& client); virtual ~MqttClient(); @@ -55,34 +55,51 @@ class MqttClient : public Client { void onMessage(void(*)(int)); #endif + error_t publish( + Topic t, uint8_t payload[], + size_t size, MqttQos qos = QosDefault, + MqttPublishFlag flags = MqttPublishFlags::None); + + error_t ping() override; + + void setReceiveCallback(MqttReceiveCallback cbk); + + void setWill( + Topic willTopic, const uint8_t* will_message, + size_t will_size, MqttQos qos=QosDefault, + MqttPublishFlag flags = MqttPublishFlags::None) override; + + void setClient(arduino::Client*) override; + int parseMessage(); - String messageTopic() const; - int messageDup() const; - int messageQoS() const; - int messageRetain() const; - - int beginMessage(const char* topic, unsigned long size, bool retain = false, uint8_t qos = 0, bool dup = false); - int beginMessage(const String& topic, unsigned long size, bool retain = false, uint8_t qos = 0, bool dup = false); - int beginMessage(const char* topic, bool retain = false, uint8_t qos = 0, bool dup = false); - int beginMessage(const String& topic, bool retain = false, uint8_t qos = 0, bool dup = false); + String messageTopic() const override; + int messageDup() const override; + uint16_t messageId() const override; + MqttQos messageQoS() const override; + int messageRetain() const override; + + int beginMessage(const char* topic, unsigned long size, bool retain = false, uint8_t qos = QosDefault, bool dup = false); + int beginMessage(const String& topic, unsigned long size, bool retain = false, uint8_t qos = QosDefault, bool dup = false); + int beginMessage(const char* topic, bool retain = false, uint8_t qos = QosDefault, bool dup = false); + int beginMessage(const String& topic, bool retain = false, uint8_t qos = QosDefault, bool dup = false); int endMessage(); - int beginWill(const char* topic, unsigned short size, bool retain, uint8_t qos); - int beginWill(const String& topic, unsigned short size, bool retain, uint8_t qos); - int beginWill(const char* topic, bool retain, uint8_t qos); - int beginWill(const String& topic, bool retain, uint8_t qos); + int beginWill(const char* topic, unsigned short size, bool retain, uint8_t qos = QosDefault); + int beginWill(const String& topic, unsigned short size, bool retain, uint8_t qos = QosDefault); + int beginWill(const char* topic, bool retain, uint8_t qos = QosDefault); + int beginWill(const String& topic, bool retain, uint8_t qos = QosDefault); int endWill(); - int subscribe(const char* topic, uint8_t qos = 0); - int subscribe(const String& topic, uint8_t qos = 0); - int unsubscribe(const char* topic); - int unsubscribe(const String& topic); + error_t subscribe(Topic topic, MqttQos qos = QosDefault) override; + error_t subscribe(const String& topic, MqttQos qos = QosDefault); + error_t unsubscribe(Topic topic) override; + error_t unsubscribe(const String& topic); - void poll(); + void poll() override; // from Client - virtual int connect(IPAddress ip, uint16_t port = 1883); - virtual int connect(const char *host, uint16_t port = 1883); + error_t connect(IPAddress ip, uint16_t port = 1883) override; + error_t connect(const char *host, uint16_t port = 1883) override; #ifdef ESP8266 virtual int connect(const IPAddress& ip, uint16_t port) { return 0; }; /* ESP8266 core defines this pure virtual in Client.h */ #endif @@ -90,17 +107,17 @@ class MqttClient : public Client { virtual size_t write(const uint8_t *buf, size_t size); virtual int available(); virtual int read(); - virtual int read(uint8_t *buf, size_t size); + virtual int read(uint8_t buf[], size_t size); virtual int peek(); virtual void flush(); virtual void stop(); virtual uint8_t connected(); virtual operator bool(); - void setId(const char* id); + void setId(const char* id) override; void setId(const String& id); - void setUsernamePassword(const char* username, const char* password); + void setUsernamePassword(const char* username, const char* password) override; void setUsernamePassword(const String& username, const String& password); void setCleanSession(bool cleanSession); @@ -123,8 +140,7 @@ class MqttClient : public Client { void pubrec(uint16_t id); void pubrel(uint16_t id); void pubcomp(uint16_t id); - void ping(); - void disconnect(); + void disconnect() override; int beginPacket(uint8_t type, uint8_t flags, size_t length, uint8_t* buffer); int writeString(const char* s, uint16_t length); @@ -197,6 +213,8 @@ class MqttClient : public Client { uint16_t _willBufferIndex; size_t _willMessageIndex; uint8_t _willFlags; + + MqttReceiveCallback _cbk; }; } From 1a25bb19ebb747d52296a0440d288d38b5c15e27 Mon Sep 17 00:00:00 2001 From: Andrea Gilardoni Date: Tue, 29 Jul 2025 16:58:25 +0200 Subject: [PATCH 7/8] Adapting Example to use qos enum instead of int --- examples/WiFiAdvancedCallback/WiFiAdvancedCallback.ino | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/WiFiAdvancedCallback/WiFiAdvancedCallback.ino b/examples/WiFiAdvancedCallback/WiFiAdvancedCallback.ino index 4b1a196..4b4796f 100644 --- a/examples/WiFiAdvancedCallback/WiFiAdvancedCallback.ino +++ b/examples/WiFiAdvancedCallback/WiFiAdvancedCallback.ino @@ -118,7 +118,7 @@ void setup() { // subscribe to a topic // the second parameter sets the QoS of the subscription, // the the library supports subscribing at QoS 0, 1, or 2 - int subscribeQos = 1; + MqttQos subscribeQos = MqttQos1; mqttClient.subscribe(inTopic, subscribeQos); From 9aec4c13dee297dca912d9f91cb404f4e85ffdd4 Mon Sep 17 00:00:00 2001 From: Andrea Gilardoni Date: Wed, 30 Jul 2025 14:05:50 +0200 Subject: [PATCH 8/8] [TMP] adding an example that had been used to test the new mqtt interface This example may be deleted before merging --- .../NewInterfaceExample.ino | 154 ++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 examples/NewInterfaceExample/NewInterfaceExample.ino diff --git a/examples/NewInterfaceExample/NewInterfaceExample.ino b/examples/NewInterfaceExample/NewInterfaceExample.ino new file mode 100644 index 0000000..8ac4f49 --- /dev/null +++ b/examples/NewInterfaceExample/NewInterfaceExample.ino @@ -0,0 +1,154 @@ +/* + * The following example is added for testing purposes and may be deleted before publishing the PR. + * This had been tested with mosquitto with the following config files inside of a local directory `/mosquitto/config` + * ``` + * port 1883 + * allow_anonymous true + * password_file /mosquitto/config/passwd + * ``` + * passwd file containing auth arduino:arduino: + * ``` + * arduino:$7$101$CkoJckWT8WWX9+Z1$mzMdh0TD99P41AEAT55GXDIPf8FLKXp7LS3BTpEvhBUnXMYj4myWSGMH7aShCufuvsbtUXChk+KCW3AncUgtuA== + * ``` + * run with docker: + * ``` + * docker run -it -p 1883:1883 -v "${PWD}/mosquitto/config:/mosquitto/config" eclipse-mosquitto + * ``` + */ + +#include +#include +#include + +uint8_t payload[] = { + 0xda, 0x00, 0x01, 0x00, 0x00, 0x81, 0x58, 0x20, + 0x1c, 0x05, 0xeb, 0xfa, 0xee, 0x5a, 0x38, 0xa5, + 0x44, 0x05, 0xe9, 0x1b, 0xaa, 0xeb, 0xeb, 0x40, + 0x68, 0x3e, 0xad, 0x1f, 0x93, 0x90, 0xd8, 0x57, + 0x4a, 0xe3, 0x35, 0x9d, 0x84, 0xc9, 0x0f, 0x64, +}; + +char topic[] = "/a/d/f334982b-032f-40e9-a91a-99da31a90dbe/c/up"; +uint32_t start; +int i = 0; + +MqttClient client; +// ZephyrMqttClient client; +volatile bool packet = false; + +void mqtt_cbk(const char* topic) { + Serial.print("Received a message on topic: \""); + Serial.print(topic); + // Serial.print("\" with payload: 0x"); + + // while(stream.available() > 0) { + // int res = stream.read(); + + // // if(res < 0x10) { + // // Serial.print('0'); + // // } + // // Serial.print(res, HEX); + // } + + // while(client.available() > 0) { + // int res = client.read(); + + // if(res < 0x10) { + // Serial.print('0'); + // } + // Serial.print(res, HEX); + // } + // packet = true; + + Serial.println(); +} + +void setup() { + // put your setup code here, to run once: + Serial.begin(115200); + while (!Serial) { + ; // wait for serial port to connect. Needed for native USB port only + } + + Serial.print("begin "); + Serial.println(i); + // int err = tls_credential_add(APP_CA_CERT_TAG, TLS_CREDENTIAL_CA_CERTIFICATE, + // ca_certificate, sizeof(ca_certificate)); + // if (err < 0) { + // DEBUG_ERROR("Failed to register public certificate: %d", err); + // } + + // err = tls_credential_add(APP_KEYS, TLS_CREDENTIAL_PUBLIC_CERTIFICATE, + // client_public_key, sizeof(client_public_key)); + // if (err < 0) { + // DEBUG_ERROR("Failed to register puiblic key: %d", err); + // } + + // err = tls_credential_add(APP_KEYS, TLS_CREDENTIAL_PRIVATE_KEY, + // client_private_key, sizeof(client_private_key)); + // if (err < 0) { + // DEBUG_ERROR("Failed to register private key: %d", err); + // } + + while (Ethernet.linkStatus() != LinkON) { + Serial.println("waiting for link on"); + delay(100); + } + Ethernet.begin(); + + Serial.println(Ethernet.localIP()); + + client.setId("f334982b-032f-40e9-a91a-99da31a90dbe"); + client.setUsernamePassword("arduino", "arduino"); + + client.setReceiveCallback(mqtt_cbk); + + // int res = client.connect("iot.arduino.cc", 8885); + int res = client.connect(IPAddress(192, 168, 10, 250), 1883); + // int res = client.connect("test.mosquitto.org", 1883); + + Serial.print("connection result: "); + Serial.println(res); + + res = client.subscribe("test0", MqttQos0); + res = client.subscribe("test1", MqttQos1); + res = client.subscribe("test2", MqttQos2); + + Serial.print("subscribe result: "); + Serial.println(res); + + start = millis(); +} +void loop() { + client.poll(); + + if(client.available() > 0) { + Serial.println("packet received"); + Serial.println(client.available()); + + while(client.available() > 0) { + int res = client.read(); + + if(res < 0x10) { + Serial.print('0'); + } + Serial.print(res, HEX); + } + + packet = false; + } + + if(millis() - start > 10000) { + int res = client.publish( + topic, + payload, + sizeof(payload) + ); + + Serial.print("publish res: "); + Serial.println(res); + + start = millis(); + } + delay(100); +}