@@ -171,6 +171,98 @@ int MqttClient::messageRetain() const
171
171
return -1 ;
172
172
}
173
173
174
+ void MqttClient::setClient (arduino::Client* client) {
175
+ if (_client != nullptr && _client->connected ()) {
176
+ // TODO if the current client is connected we cannot perform the change, first call disconnect
177
+ return ;
178
+ }
179
+
180
+ _client = client;
181
+ }
182
+
183
+ void MqttClient::setReceiveCallback (MqttReceiveCallback cbk) {
184
+ _cbk = cbk;
185
+ }
186
+
187
+ class MqttReadStream : public IStream {
188
+ public:
189
+ MqttReadStream (MqttClient& ref, int available)
190
+ : ref(ref), _available(available) { }
191
+
192
+ size_t readBytes (uint8_t * buf, size_t s) override {
193
+ size_t to_read = s < _available ? s : _available;
194
+ to_read = ref.readBytes (buf, to_read);
195
+ _available -= to_read;
196
+ return to_read;
197
+ }
198
+
199
+ int available () override { return _available; }
200
+
201
+ int read () override {
202
+ if (_available > 0 ) {
203
+ _available--;
204
+ return ref.read ();
205
+ } else {
206
+ return -1 ; // TODO return proper error code
207
+ }
208
+ }
209
+ private:
210
+ MqttClient& ref;
211
+ int _available;
212
+ };
213
+
214
+ class ArduinoMqttOStream : public MqttOStream {
215
+ public:
216
+ // TODO change pointer to reference, since it won't change
217
+ ArduinoMqttOStream (MqttClient &ref, error_t err=0 )
218
+ : MqttOStream(err), ref(ref) { }// TODO replace err default value with success
219
+
220
+ ~ArduinoMqttOStream () {
221
+ ref.endMessage ();
222
+ }
223
+
224
+ size_t write (uint8_t a) override {
225
+ if (rc == 1 ) {
226
+ return ref.write (a);
227
+ }
228
+ return 0 ;
229
+ }
230
+
231
+ size_t write (const uint8_t *buffer, size_t size) override {
232
+ if (rc == 1 ) {
233
+ return ref.write (buffer, size);
234
+ }
235
+ return 0 ;
236
+ }
237
+
238
+ int availableForWrite () override { return 0 ; }
239
+
240
+ private:
241
+ MqttClient& ref;
242
+ };
243
+
244
+
245
+ error_t MqttClient::publish (Topic t, uint8_t payload[], size_t size, MqttQos qos, MqttPublishFlag flags) {
246
+ int error = this ->beginMessage (t, (flags & RetainEnabled) == RetainEnabled, qos, (flags & DupEnabled) == DupEnabled);
247
+
248
+ if (error == 0 ) { // TODO replace this with a proper enum value
249
+ return error;
250
+ }
251
+
252
+ int res = this ->write (payload, size);
253
+ this ->endMessage ();
254
+
255
+ return res;
256
+ }
257
+
258
+ MqttOStream&& MqttClient::publish(Topic t, MqttQos qos, MqttPublishFlag flags) {
259
+ int error = this ->beginMessage (
260
+ t, (flags & RetainEnabled) == RetainEnabled,
261
+ static_cast <uint8_t >(qos), (flags & DupEnabled) == DupEnabled);
262
+
263
+ return std::move (ArduinoMqttOStream (*this , error));
264
+ }
265
+
174
266
int MqttClient::beginMessage (const char * topic, unsigned long size, bool retain, uint8_t qos, bool dup)
175
267
{
176
268
_txMessageTopic = topic;
@@ -259,6 +351,20 @@ int MqttClient::endMessage()
259
351
return 1 ;
260
352
}
261
353
354
+ void MqttClient::setWill (
355
+ Topic willTopic, const uint8_t * will_message, size_t will_size, MqttQos qos, MqttPublishFlag flags) {
356
+ int error = this ->beginWill (willTopic, (flags & RetainEnabled) == RetainEnabled, qos, (flags & DupEnabled) == DupEnabled);
357
+
358
+ if (error == 0 ) { // TODO replace this with a proper enum value
359
+ return ;
360
+ }
361
+
362
+ int res = this ->write (will_message, will_size);
363
+ this ->endWill ();
364
+
365
+ return ;
366
+ }
367
+
262
368
int MqttClient::beginWill (const char * topic, unsigned short size, bool retain, uint8_t qos)
263
369
{
264
370
int topicLength = strlen (topic);
@@ -314,7 +420,12 @@ int MqttClient::endWill()
314
420
return 1 ;
315
421
}
316
422
317
- int MqttClient::subscribe (const char * topic, uint8_t qos)
423
+ error_t MqttClient::subscribe (Topic topic, MqttQos qos)
424
+ {
425
+ return subscribe (topic, qos);
426
+ }
427
+
428
+ error_t MqttClient::subscribe (Topic topic, uint8_t qos)
318
429
{
319
430
int topicLength = strlen (topic);
320
431
int remainingLength = topicLength + 5 ;
@@ -362,12 +473,12 @@ int MqttClient::subscribe(const char* topic, uint8_t qos)
362
473
return 0 ;
363
474
}
364
475
365
- int MqttClient::subscribe (const String& topic, uint8_t qos)
476
+ error_t MqttClient::subscribe (const String& topic, MqttQos qos)
366
477
{
367
478
return subscribe (topic.c_str (), qos);
368
479
}
369
480
370
- int MqttClient::unsubscribe (const char * topic)
481
+ error_t MqttClient::unsubscribe (Topic topic)
371
482
{
372
483
int topicLength = strlen (topic);
373
484
int remainingLength = topicLength + 4 ;
@@ -565,16 +676,19 @@ void MqttClient::poll()
565
676
} else {
566
677
_rxState = MQTT_CLIENT_RX_STATE_READ_PUBLISH_PAYLOAD;
567
678
568
- if (_onMessage) {
679
+ if (_cbk) {
680
+ MqttReadStream stream (*this , _rxLength);
681
+ _cbk (_rxMessageTopic.c_str (), stream);
682
+ } else if (_onMessage) {
569
683
#ifdef MQTT_CLIENT_STD_FUNCTION_CALLBACK
570
684
_onMessage (this ,_rxLength);
571
685
#else
572
686
_onMessage (_rxLength);
573
687
#endif
688
+ }
574
689
575
- if (_rxLength == 0 ) {
576
- _rxState = MQTT_CLIENT_RX_STATE_READ_TYPE;
577
- }
690
+ if ((_onMessage || _cbk) && _rxLength == 0 ) {
691
+ _rxState = MQTT_CLIENT_RX_STATE_READ_TYPE;
578
692
}
579
693
}
580
694
}
@@ -592,7 +706,10 @@ void MqttClient::poll()
592
706
593
707
_rxState = MQTT_CLIENT_RX_STATE_READ_PUBLISH_PAYLOAD;
594
708
595
- if (_onMessage) {
709
+ if (_cbk) {
710
+ MqttReadStream stream (*this , _rxLength);
711
+ _cbk (_rxMessageTopic.c_str (), stream);
712
+ } else if (_onMessage) {
596
713
#ifdef MQTT_CLIENT_STD_FUNCTION_CALLBACK
597
714
_onMessage (this ,_rxLength);
598
715
#else
@@ -647,12 +764,12 @@ void MqttClient::poll()
647
764
}
648
765
}
649
766
650
- int MqttClient::connect (IPAddress ip, uint16_t port)
767
+ error_t MqttClient::connect (IPAddress ip, uint16_t port)
651
768
{
652
769
return connect (ip, NULL , port);
653
770
}
654
771
655
- int MqttClient::connect (const char *host, uint16_t port)
772
+ error_t MqttClient::connect (const char *host, uint16_t port)
656
773
{
657
774
return connect ((uint32_t )0 , host, port);
658
775
}
@@ -833,7 +950,7 @@ int MqttClient::subscribeQoS() const
833
950
return _subscribeQos;
834
951
}
835
952
836
- int MqttClient::connect (IPAddress ip, const char * host, uint16_t port)
953
+ error_t MqttClient::connect (IPAddress ip, const char * host, uint16_t port)
837
954
{
838
955
if (clientConnected ()) {
839
956
_client->stop ();
@@ -1041,7 +1158,7 @@ void MqttClient::pubcomp(uint16_t id)
1041
1158
endPacket ();
1042
1159
}
1043
1160
1044
- void MqttClient::ping ()
1161
+ error_t MqttClient::ping ()
1045
1162
{
1046
1163
uint8_t packetBuffer[2 ];
1047
1164
0 commit comments