Skip to content

Commit 5dac58d

Browse files
fixup! Added Zephyr implementation for Mqtt client
1 parent 0583378 commit 5dac58d

File tree

2 files changed

+85
-64
lines changed

2 files changed

+85
-64
lines changed

src/implementations/ZephyrMqttClient.cpp

Lines changed: 74 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ static inline void __mqtt_utf8_allocation_helper(
4141
}
4242
}
4343

44-
ZephyrMqttClient::ZephyrMqttClient() {
44+
ZephyrMqttClient::ZephyrMqttClient()
45+
: last_message(nullptr) {
4546
fds.fd = -1;
4647
mqtt_client_init(&client);
4748
}
@@ -198,21 +199,6 @@ error_t ZephyrMqttClient::publish(Topic t, uint8_t payload[], size_t size, MqttQ
198199
return mqtt_publish(&client, &param); // error codes should be proxied
199200
}
200201

201-
class ZephyrMqttOStream: public MqttOStream {
202-
public:
203-
ZephyrMqttOStream(): MqttOStream(NotImplementedError) {}
204-
205-
size_t write(uint8_t) override { return 0; };
206-
size_t write(const uint8_t *buffer, size_t size) override { return 0; };
207-
int availableForWrite() override { return 0; };
208-
};
209-
210-
MqttOStream&& ZephyrMqttClient::publish(
211-
Topic t, MqttQos qos,
212-
MqttPublishFlag flags) {
213-
return std::move(ZephyrMqttOStream()); // TODO is this correct?
214-
}
215-
216202
error_t ZephyrMqttClient::unsubscribe(Topic t) {
217203
struct mqtt_topic topic;
218204
struct mqtt_subscription_list unsub;
@@ -273,56 +259,43 @@ error_t ZephyrMqttClient::ping() {
273259
return mqtt_ping(&client);
274260
}
275261

276-
// this class should extend something like IStream, that doesn't exist in arduino apis
277-
class ZephyrMqttReadStream: public IStream {
278-
public:
279-
ZephyrMqttReadStream(struct mqtt_client *client, const struct mqtt_publish_param *p)
280-
: client(client), p(p) {}
281-
282-
~ZephyrMqttReadStream() { end(); }
283-
284-
size_t readBytes(uint8_t* buf, size_t s) override {
285-
return mqtt_read_publish_payload(client, buf, s);
286-
}
287-
288-
int available() override {
289-
return client->internal.remaining_payload;
290-
}
262+
int ZephyrMqttClient::read() {
263+
uint8_t a;
264+
int res = read(&a, 1);
265+
return res <= 0 ? res : a;
266+
}
291267

292-
int read() override {
293-
uint8_t r;
294-
int res = mqtt_read_publish_payload_blocking(client, &r, 1);
268+
int ZephyrMqttClient::read(uint8_t payload[], size_t len) {
269+
int res = mqtt_read_publish_payload_blocking(&client, payload, len);
295270

296-
return res==1 ? r : res;
271+
if(res < 0) {
272+
return res;
297273
}
298274

299-
void end() {
300-
if(p == nullptr) {
301-
return;
302-
}
303-
// consume all remaining payload
304-
mqtt_readall_publish_payload(client, nullptr, available());
305-
306-
// acknowledgements should be sent when the entire packet is consumed, the maximum size for an mqtt
307-
// message could be 4MB, so this should be treated like a stream.
308-
if (p->message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
275+
if(available() == 0) {
276+
if (messageQoS() == MqttQos1) {
309277
const struct mqtt_puback_param ack_param = {
310-
.message_id = p->message_id
278+
.message_id = messageId()
311279
};
312-
mqtt_publish_qos1_ack(client, &ack_param);
313-
} else if (p->message.topic.qos == MQTT_QOS_2_EXACTLY_ONCE) {
280+
mqtt_publish_qos1_ack(&client, &ack_param);
281+
} else if (messageQoS() == MqttQos2) {
314282
const struct mqtt_pubrec_param rec_param = {
315-
.message_id = p->message_id
283+
.message_id = messageId()
316284
};
317-
mqtt_publish_qos2_receive(client, &rec_param);
285+
286+
mqtt_publish_qos2_receive(&client, &rec_param);
318287
}
319288

320-
p = nullptr;
289+
delete last_message;
290+
last_message = nullptr;
321291
}
322-
private:
323-
struct mqtt_client *client;
324-
const struct mqtt_publish_param *p;
325-
};
292+
293+
return res;
294+
}
295+
296+
int ZephyrMqttClient::available() {
297+
return client.internal.remaining_payload;
298+
}
326299

327300
void ZephyrMqttClient::mqtt_event_handler(
328301
struct mqtt_client *const client, const struct mqtt_evt *evt) {
@@ -354,16 +327,18 @@ void ZephyrMqttClient::mqtt_event_handler(
354327
}
355328
case MQTT_EVT_PUBLISH: {
356329
const struct mqtt_publish_param *p = &evt->param.publish;
357-
ZephyrMqttReadStream stream(&this->client, p);
330+
if(last_message == nullptr) {
331+
last_message = new mqtt_publish_param;
332+
}
333+
334+
memcpy(last_message, p, sizeof(*last_message));
358335

359336
// Pass the stream to the user callback
360337
if(_cbk != nullptr) {
361338
const char* topic = (const char*)p->message.topic.topic.utf8; // will this be null terminated string?
362339

363-
_cbk(topic, stream);
340+
_cbk(topic);
364341
}
365-
366-
stream.end();
367342
}
368343
default:
369344
break;
@@ -396,5 +371,45 @@ void ZephyrMqttClient::setReceiveCallback(MqttReceiveCallback cbk) {
396371
this->_cbk = cbk;
397372
}
398373

374+
String ZephyrMqttClient::messageTopic() const {
375+
if(last_message != nullptr) {
376+
return (const char*)last_message->message.topic.topic.utf8;
377+
}
378+
379+
return "";
380+
}
381+
382+
int ZephyrMqttClient::messageDup() const {
383+
if(last_message != nullptr) {
384+
return last_message->dup_flag;
385+
}
386+
387+
return -1;
388+
}
389+
390+
uint16_t ZephyrMqttClient::messageId() const {
391+
if(last_message != nullptr) {
392+
return last_message->message_id;
393+
}
394+
395+
return 0;
396+
}
397+
398+
MqttQos ZephyrMqttClient::messageQoS() const {
399+
if(last_message != nullptr) {
400+
return static_cast<MqttQos>(last_message->message.topic.qos);
401+
}
402+
403+
return QosDefault;
404+
}
405+
406+
int ZephyrMqttClient::messageRetain() const {
407+
if(last_message != nullptr) {
408+
return last_message->retain_flag;
409+
}
410+
411+
return -1;
412+
}
413+
399414

400415
#endif // CONFIG_MQTT_LIB

src/implementations/ZephyrMqttClient.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,6 @@ class ZephyrMqttClient: public MqttClientInterface {
2828
size_t size, MqttQos qos = QosDefault,
2929
MqttPublishFlag flags = MqttPublishFlags::None) override;
3030

31-
// TODO zephyr mqtt publish definition doesn't support streaming a message
32-
MqttOStream&& publish(
33-
Topic t, MqttQos qos = QosDefault,
34-
MqttPublishFlag flags = MqttPublishFlags::None) override;
35-
3631
error_t unsubscribe(Topic t) override;
3732
void poll() override;
3833
error_t ping() override;
@@ -51,6 +46,15 @@ class ZephyrMqttClient: public MqttClientInterface {
5146
// in zephyr we do not need to provide a client on which to rely on
5247
void setClient(arduino::Client*) override {};
5348

49+
int read() override;
50+
int read(uint8_t payload[], size_t len) override;
51+
int available() override;
52+
53+
String messageTopic() const override;
54+
int messageDup() const override;
55+
uint16_t messageId() const override;
56+
MqttQos messageQoS() const override;
57+
int messageRetain() const override;
5458
private:
5559
int connect(sockaddr_storage addr);
5660
void client_init();
@@ -63,4 +67,6 @@ class ZephyrMqttClient: public MqttClientInterface {
6367
uint8_t tx_buffer[CONFIG_MQTT_CLIENT_TX_BUFFER];
6468

6569
MqttReceiveCallback _cbk;
70+
71+
struct mqtt_publish_param *last_message;
6672
};

0 commit comments

Comments
 (0)