diff --git a/zenoh-plugin-mqtt/src/lib.rs b/zenoh-plugin-mqtt/src/lib.rs index 89bdb86..ee65b24 100644 --- a/zenoh-plugin-mqtt/src/lib.rs +++ b/zenoh-plugin-mqtt/src/lib.rs @@ -684,6 +684,7 @@ async fn control_v3( session.client_id, topic.as_str() ); + session.state().unsubscribe(topic.as_str()).await?; } Ok(msg.ack()) } @@ -821,6 +822,7 @@ async fn control_v5( session.client_id, topic.as_str() ); + session.state().unsubscribe(topic.as_str()).await?; } Ok(msg.ack()) } diff --git a/zenoh-plugin-mqtt/src/mqtt_session_state.rs b/zenoh-plugin-mqtt/src/mqtt_session_state.rs index e64ab49..c43d8eb 100644 --- a/zenoh-plugin-mqtt/src/mqtt_session_state.rs +++ b/zenoh-plugin-mqtt/src/mqtt_session_state.rs @@ -133,6 +133,25 @@ impl MqttSessionState { .allowed_destination(destination) .await } + + pub(crate) async fn unsubscribe(&self, topic: &str) -> ZResult<()> { + let mut subs = self.subs.write().await; + if let Some(sub) = subs.remove(topic) { + drop(sub); + tracing::debug!( + "MQTT Client {}: unsubscribed from Zenoh topic '{}'", + self.client_id, + topic + ); + } else { + tracing::error!( + "MQTT Client {}: no subscription to unsubscribe for Zenoh topic '{}'", + self.client_id, + topic + ); + } + Ok(()) + } } fn route_zenoh_to_mqtt( @@ -188,3 +207,4 @@ fn spawn_mqtt_publisher(client_id: String, rx: Receiver<(ByteString, Bytes)>, si } }); } + diff --git a/zenoh-plugin-mqtt/tests/test.rs b/zenoh-plugin-mqtt/tests/test.rs index 833da60..c02dbe1 100644 --- a/zenoh-plugin-mqtt/tests/test.rs +++ b/zenoh-plugin-mqtt/tests/test.rs @@ -288,4 +288,4 @@ fn test_zenoh_pub_mqtt_sub() { let payload = result.expect("Receiver timeout"); assert_eq!(payload, TEST_PAYLOAD); -} +} \ No newline at end of file