Skip to content

Commit 0583378

Browse files
fixup! Making MqttClient inherit from MqttClientInterface
1 parent b75cead commit 0583378

File tree

2 files changed

+23
-83
lines changed

2 files changed

+23
-83
lines changed

src/MqttClient.cpp

Lines changed: 17 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -151,14 +151,25 @@ int MqttClient::messageDup() const
151151
return -1;
152152
}
153153

154-
int MqttClient::messageQoS() const
154+
uint16_t MqttClient::messageId() const
155155
{
156156
if (_rxState == MQTT_CLIENT_RX_STATE_READ_PUBLISH_PAYLOAD) {
157157
// message received and ready for reading
158-
return _rxMessageQoS;
158+
// return _rxMessageId; // TODO
159159
}
160160

161-
return -1;
161+
return 0;
162+
}
163+
164+
165+
MqttQos MqttClient::messageQoS() const
166+
{
167+
if (_rxState == MQTT_CLIENT_RX_STATE_READ_PUBLISH_PAYLOAD) {
168+
// message received and ready for reading
169+
return static_cast<MqttQos>(_rxMessageQoS);
170+
}
171+
172+
return QosDefault;
162173
}
163174

164175
int MqttClient::messageRetain() const
@@ -184,64 +195,6 @@ void MqttClient::setReceiveCallback(MqttReceiveCallback cbk) {
184195
_cbk = cbk;
185196
}
186197

187-
class MqttReadStream: public IStream {
188-
public:
189-
MqttReadStream(MqttClient& ref, int available)
190-
: ref(ref), _available(available) { }
191-
192-
size_t readBytes(uint8_t* buf, size_t s) override {
193-
size_t to_read = s < _available ? s : _available;
194-
to_read = ref.readBytes(buf, to_read);
195-
_available -= to_read;
196-
return to_read;
197-
}
198-
199-
int available() override { return _available; }
200-
201-
int read() override {
202-
if(_available > 0) {
203-
_available--;
204-
return ref.read();
205-
} else {
206-
return -1; // TODO return proper error code
207-
}
208-
}
209-
private:
210-
MqttClient& ref;
211-
int _available;
212-
};
213-
214-
class ArduinoMqttOStream: public MqttOStream {
215-
public:
216-
// TODO change pointer to reference, since it won't change
217-
ArduinoMqttOStream(MqttClient &ref, error_t err=0)
218-
: MqttOStream(err), ref(ref) { }// TODO replace err default value with success
219-
220-
~ArduinoMqttOStream() {
221-
ref.endMessage();
222-
}
223-
224-
size_t write(uint8_t a) override {
225-
if(rc == 1) {
226-
return ref.write(a);
227-
}
228-
return 0;
229-
}
230-
231-
size_t write(const uint8_t *buffer, size_t size) override {
232-
if(rc == 1) {
233-
return ref.write(buffer, size);
234-
}
235-
return 0;
236-
}
237-
238-
int availableForWrite() override { return 0; }
239-
240-
private:
241-
MqttClient& ref;
242-
};
243-
244-
245198
error_t MqttClient::publish(Topic t, uint8_t payload[], size_t size, MqttQos qos, MqttPublishFlag flags) {
246199
int error = this->beginMessage(t, (flags & RetainEnabled) == RetainEnabled, qos, (flags & DupEnabled) == DupEnabled);
247200

@@ -255,14 +208,6 @@ error_t MqttClient::publish(Topic t, uint8_t payload[], size_t size, MqttQos qos
255208
return res;
256209
}
257210

258-
MqttOStream&& MqttClient::publish(Topic t, MqttQos qos, MqttPublishFlag flags) {
259-
int error = this->beginMessage(
260-
t, (flags & RetainEnabled) == RetainEnabled,
261-
static_cast<uint8_t>(qos), (flags & DupEnabled) == DupEnabled);
262-
263-
return std::move(ArduinoMqttOStream(*this, error));
264-
}
265-
266211
int MqttClient::beginMessage(const char* topic, unsigned long size, bool retain, uint8_t qos, bool dup)
267212
{
268213
_txMessageTopic = topic;
@@ -672,8 +617,7 @@ void MqttClient::poll()
672617
_rxState = MQTT_CLIENT_RX_STATE_READ_PUBLISH_PAYLOAD;
673618

674619
if(_cbk) {
675-
MqttReadStream stream(*this, _rxLength);
676-
_cbk(_rxMessageTopic.c_str(), stream);
620+
_cbk(_rxMessageTopic.c_str());
677621
} else if (_onMessage) {
678622
#ifdef MQTT_CLIENT_STD_FUNCTION_CALLBACK
679623
_onMessage(this,_rxLength);
@@ -702,8 +646,7 @@ void MqttClient::poll()
702646
_rxState = MQTT_CLIENT_RX_STATE_READ_PUBLISH_PAYLOAD;
703647

704648
if(_cbk) {
705-
MqttReadStream stream(*this, _rxLength);
706-
_cbk(_rxMessageTopic.c_str(), stream);
649+
_cbk(_rxMessageTopic.c_str());
707650
} else if (_onMessage) {
708651
#ifdef MQTT_CLIENT_STD_FUNCTION_CALLBACK
709652
_onMessage(this,_rxLength);
@@ -818,7 +761,7 @@ int MqttClient::read()
818761
return b;
819762
}
820763

821-
int MqttClient::read(uint8_t *buf, size_t size)
764+
int MqttClient::read(uint8_t buf[], size_t size)
822765
{
823766
size_t result = 0;
824767

src/MqttClient.h

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,6 @@ class MqttClient : public MqttClientInterface, public Client {
6060
size_t size, MqttQos qos = QosDefault,
6161
MqttPublishFlag flags = MqttPublishFlags::None);
6262

63-
MqttOStream&& publish(
64-
Topic t, MqttQos qos = QosDefault,
65-
MqttPublishFlag flags = MqttPublishFlags::None);
66-
6763
error_t ping() override;
6864

6965
void setReceiveCallback(MqttReceiveCallback cbk);
@@ -76,10 +72,11 @@ class MqttClient : public MqttClientInterface, public Client {
7672
void setClient(arduino::Client*) override;
7773

7874
int parseMessage();
79-
String messageTopic() const;
80-
int messageDup() const;
81-
int messageQoS() const;
82-
int messageRetain() const;
75+
String messageTopic() const override;
76+
int messageDup() const override;
77+
uint16_t messageId() const override;
78+
MqttQos messageQoS() const override;
79+
int messageRetain() const override;
8380

8481
int beginMessage(const char* topic, unsigned long size, bool retain = false, uint8_t qos = QosDefault, bool dup = false);
8582
int beginMessage(const String& topic, unsigned long size, bool retain = false, uint8_t qos = QosDefault, bool dup = false);
@@ -110,7 +107,7 @@ class MqttClient : public MqttClientInterface, public Client {
110107
virtual size_t write(const uint8_t *buf, size_t size);
111108
virtual int available();
112109
virtual int read();
113-
virtual int read(uint8_t *buf, size_t size);
110+
virtual int read(uint8_t buf[], size_t size);
114111
virtual int peek();
115112
virtual void flush();
116113
virtual void stop();

0 commit comments

Comments
 (0)