diff --git a/custom_components/meshtastic/aiomeshtastic/interface.py b/custom_components/meshtastic/aiomeshtastic/interface.py index 1c6b770..ce02c18 100644 --- a/custom_components/meshtastic/aiomeshtastic/interface.py +++ b/custom_components/meshtastic/aiomeshtastic/interface.py @@ -5,6 +5,7 @@ import functools import itertools import random +import ssl from collections import defaultdict, deque from collections.abc import AsyncIterator, Awaitable, Callable, Mapping, MutableMapping from dataclasses import dataclass @@ -16,15 +17,12 @@ Self, ) -from homeassistant.util.ssl import get_default_context - try: - import aiomqtt - from aiomqtt import MqttError + import paho.mqtt.client as mqtt - _has_aiomqtt = True + _has_mqtt = True except ImportError: - _has_aiomqtt = False + _has_mqtt = False import google from google.protobuf.message import Message @@ -168,15 +166,14 @@ def __init__( # noqa: PLR0913 # MQTT client for persistent connection self._mqtt_proxy_enabled = enable_mqtt_proxy - if self._mqtt_proxy_enabled and not _has_aiomqtt: - self._logger.warning("Could not enable MQTT proxy because aiomqtt is not installed") + if self._mqtt_proxy_enabled and not _has_mqtt: + self._logger.warning("Could not enable MQTT proxy because paho-mqtt is not installed") self._mqtt_proxy_enabled = False if self._mqtt_proxy_enabled: - self._mqtt_client: aiomqtt.Client | None = None - self._mqtt_connected = False - self._mqtt_connection_task: asyncio.Task | None = None - self._mqtt_config: dict[str, str] | None = None + self._mqtt_client: mqtt.Client | None = None + self._mqtt_message_queue = deque() + self._mqtt_retry_task = None def add_packet_app_listener( self, @@ -346,9 +343,12 @@ async def stop(self) -> None: await self._cancel_processing_tasks() await self._cancel_background_tasks() - # MQTT client will be closed automatically when the context manager exits - self._mqtt_client = None - self._mqtt_connected = False + # Clean up MQTT client + if self._mqtt_client is not None: + self._logger.debug("Stopping MQTT client") + self._mqtt_client.disconnect() + self._mqtt_client.loop_stop() + self._mqtt_client = None with contextlib.suppress(Exception): await self._connection.send_disconnect() @@ -397,6 +397,10 @@ async def connected_node_ready(self) -> bool: async def _init_mqtt_client(self) -> None: """Initialize the MQTT client if MQTT is enabled in the module config.""" + if self._mqtt_client is not None: + self._logger.debug("MQTT client already initialized, skipping.") + return + if not await self.connected_node_ready(): self._logger.debug("Node not ready, not initializing MQTT client") return @@ -406,103 +410,94 @@ async def _init_mqtt_client(self) -> None: self._logger.debug("MQTT not enabled in module config, not initializing client") return - # Get MQTT configuration - broker = mqtt_config.address or "mqtt.meshtastic.org" + broker = mqtt_config.address username = mqtt_config.username password = mqtt_config.password use_tls = mqtt_config.tls_enabled - # Parse broker address hostname = broker.split(":", 1)[0] port = int(broker.split(":", 1)[1]) if ":" in broker else 1883 - # Get node ID for client identifier node_id = self._connected_node_info.my_node_num - client_id = f"!{node_id:08x}" + client_id = f"!{node_id:08x}-ha" self._logger.info("Initializing MQTT client") - # Create MQTT client configuration - self._mqtt_config = { - "hostname": hostname, - "port": port, - "username": username or None, - "password": password or None, - "tls_context": get_default_context() if use_tls else None, - "identifier": client_id, - } - - # Start connection task - self._mqtt_connection_task = self._add_background_task(self._maintain_mqtt_connection(), name="mqtt-connection") - - async def _maintain_mqtt_connection(self) -> None: - """Maintains the MQTT connection and handles reconnections.""" - while self.is_running: - try: - self._logger.debug("Connecting to MQTT broker") + self._mqtt_client = mqtt.Client(client_id=client_id) - self._mqtt_client = aiomqtt.Client(**self._mqtt_config) + self._mqtt_client.on_connect = self._on_mqtt_connect + self._mqtt_client.on_disconnect = self._on_mqtt_disconnect - # When the context manager exits, the connection is closed - async with self._mqtt_client: - self._mqtt_connected = True - self._logger.debug("Connected to MQTT broker") + if use_tls: + self._mqtt_client.tls_set( + ca_certs=None, # Use default CA certs + certfile=None, + keyfile=None, + cert_reqs=ssl.CERT_REQUIRED, + tls_version=ssl.PROTOCOL_TLS, + ciphers=None, + ) - # Wait until the interface is stopped - await self._is_stopped.wait() + if username and password: + self._mqtt_client.username_pw_set(username, password) - # Interface stopped, don't reconnect - break - except MqttError as e: - self._logger.warning("Meshtastic MQTT proxy connection error: %s", e) - finally: - self._mqtt_connected = False - self._logger.debug("MQTT connection closed") + self._logger.debug("Connecting to MQTT broker %s:%d", hostname, port) + self._mqtt_client.connect_async(hostname, port) - # Wait before attempting to reconnect - self._logger.debug("Reconnecting MQTT in 5 seconds") - try: - await asyncio.sleep(5) - except asyncio.CancelledError: - break + self._mqtt_client.loop_start() + + def _on_mqtt_connect(self, _client: mqtt.Client, _userdata: Any, _flags: dict[str, Any], rc: int) -> None: + if rc == 0: + self._logger.info("Connected to MQTT broker") + else: + self._logger.warning("Failed to connect to MQTT broker, return code: %d", rc) + + def _on_mqtt_disconnect(self, _client: mqtt.Client, _userdata: Any, rc: int) -> None: + self._logger.info("Disconnected from MQTT broker, return code: %d", rc) async def _handle_mqtt_client_proxy_message(self, message: mesh_pb2.MqttClientProxyMessage) -> None: """ Handle MQTT client proxy messages from the radio. This receives MqttClientProxyMessage messages from the radio and forwards them to the - configured MQTT broker. + configured MQTT broker. Will retry up to 3 times on failure. """ - if ( - not hasattr(self._connected_node_module_config, "mqtt") - or not self._connected_node_module_config.mqtt.enabled - or not self._connected_node_module_config.mqtt.proxy_to_client_enabled - ): - return - - if not self._mqtt_connected or self._mqtt_client is None: - self._logger.debug("MQTT client not yet connected") + if self._mqtt_client is None: + self._logger.debug("No MQTT client available, message will be lost") return - self._logger.debug("Publishing MQTT message") + self._logger.debug("Publishing MQTT message to topic: {message.topic}") try: if message.HasField("data"): - await self._mqtt_client.publish( - message.topic, - payload=message.data, - retain=message.retained, - qos=1, - ) + payload = message.data elif message.HasField("text"): - await self._mqtt_client.publish( - message.topic, - payload=message.text.encode("utf-8"), - retain=message.retained, - qos=1, - ) + payload = message.text.encode("utf-8") else: self._logger.debug("No payload in MQTT message, ignoring") + return + + max_retries = 3 + retry_count = 0 + + while retry_count < max_retries: + result = self._mqtt_client.publish(message.topic, payload=payload, qos=2, retain=message.retained) + + if result.rc == mqtt.MQTT_ERR_SUCCESS: + return + + self._logger.warning( + "Error publishing MQTT message: %s, attempt %d of %d", + mqtt.error_string(result.rc), + retry_count + 1, + max_retries, + ) + + retry_count += 1 + if retry_count < max_retries: + await asyncio.sleep(1) + + self._logger.error("Failed to publish MQTT message after %d attempts", max_retries) except Exception: self._logger.exception("Error publishing MQTT message") diff --git a/custom_components/meshtastic/manifest.json b/custom_components/meshtastic/manifest.json index 1a14d5b..85d6545 100644 --- a/custom_components/meshtastic/manifest.json +++ b/custom_components/meshtastic/manifest.json @@ -25,7 +25,7 @@ "issue_tracker": "https://github.com/meshtastic/home-assistant/issues", "requirements": [ "pyserial-asyncio==0.6", - "aiomqtt" + "paho-mqtt>=2.1.0" ], "usb": [ { diff --git a/requirements.txt b/requirements.txt index 6931f93..4b3144b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,4 @@ ruff==0.11.2 bleak~=0.22.3 pyserial-asyncio~=0.6 -aiomqtt>=1.2.0 +paho-mqtt>=2.1.0