|
29 | 29 | from AWSIoTPythonSDK.core.protocol.internal.defaults import ALPN_PROTCOLS |
30 | 30 | from AWSIoTPythonSDK.core.protocol.internal.events import FixedEventMids |
31 | 31 | from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUCCESS |
| 32 | +from AWSIoTPythonSDK.core.protocol.paho.client import MQTT_ERR_SUBACK_ERROR |
32 | 33 | from AWSIoTPythonSDK.exception.AWSIoTExceptions import connectError |
33 | 34 | from AWSIoTPythonSDK.exception.AWSIoTExceptions import connectTimeoutException |
34 | 35 | from AWSIoTPythonSDK.exception.AWSIoTExceptions import disconnectError |
|
58 | 59 | from queue import Queue |
59 | 60 |
|
60 | 61 |
|
| 62 | +class AckPacket(object): |
| 63 | + def __init__(self): |
| 64 | + self.event = Event() |
| 65 | + self.data = None |
| 66 | + |
| 67 | + |
61 | 68 | class MqttCore(object): |
62 | 69 |
|
63 | 70 | _logger = logging.getLogger(__name__) |
@@ -298,12 +305,15 @@ def subscribe(self, topic, qos, message_callback=None): |
298 | 305 | if ClientStatus.STABLE != self._client_status.get_status(): |
299 | 306 | self._handle_offline_request(RequestTypes.SUBSCRIBE, (topic, qos, message_callback, None)) |
300 | 307 | else: |
301 | | - event = Event() |
302 | | - rc, mid = self._subscribe_async(topic, qos, self._create_blocking_ack_callback(event), message_callback) |
303 | | - if not event.wait(self._operation_timeout_sec): |
| 308 | + ack = AckPacket() |
| 309 | + rc, mid = self._subscribe_async(topic, qos, self._create_blocking_ack_callback_ret(ack), message_callback) |
| 310 | + if not ack.event.wait(self._operation_timeout_sec): |
304 | 311 | self._internal_async_client.remove_event_callback(mid) |
305 | 312 | self._logger.error("Subscribe timed out") |
306 | 313 | raise subscribeTimeoutException() |
| 314 | + if ack.data[0] == MQTT_ERR_SUBACK_ERROR: |
| 315 | + self._logger.error(f"Subscribe error: {ack.data}") |
| 316 | + raise subscribeError(ack.data) |
307 | 317 | ret = True |
308 | 318 | return ret |
309 | 319 |
|
@@ -361,6 +371,12 @@ def ack_callback(mid, data=None): |
361 | 371 | event.set() |
362 | 372 | return ack_callback |
363 | 373 |
|
| 374 | + def _create_blocking_ack_callback_ret(self, ack: AckPacket): |
| 375 | + def ack_callback(mid, data=None): |
| 376 | + ack.data = data |
| 377 | + ack.event.set() |
| 378 | + return ack_callback |
| 379 | + |
364 | 380 | def _handle_offline_request(self, type, data): |
365 | 381 | self._logger.info("Offline request detected!") |
366 | 382 | offline_request = QueueableRequest(type, data) |
|
0 commit comments