diff --git a/libraries-server/pom.xml b/libraries-server/pom.xml index b0391db60fc0..d587e5913bfe 100644 --- a/libraries-server/pom.xml +++ b/libraries-server/pom.xml @@ -1,7 +1,7 @@ + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 libraries-server 0.0.1-SNAPSHOT @@ -55,6 +55,12 @@ netty-all ${netty.version} + + + com.hivemq + hivemq-mqtt-client + ${hivemq.mqtt.client.version} + @@ -63,6 +69,7 @@ 4.1.20.Final 1.2.0 logback.xml + 1.3.12 \ No newline at end of file diff --git a/libraries-server/src/test/java/com/baeldung/mqtt/HiveMqMqttClientIntegrationTest.java b/libraries-server/src/test/java/com/baeldung/mqtt/HiveMqMqttClientIntegrationTest.java new file mode 100644 index 000000000000..31a04f0a1131 --- /dev/null +++ b/libraries-server/src/test/java/com/baeldung/mqtt/HiveMqMqttClientIntegrationTest.java @@ -0,0 +1,72 @@ +package com.baeldung.mqtt; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.nio.charset.StandardCharsets; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.Test; + +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; +import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; + +class HiveMqMqttClientIntegrationTest { + + private static final String PUBLIC_BROKER_HOST = "broker.hivemq.com"; + private static final int PUBLIC_BROKER_PORT = 1883; + + @Test + void givenSubscriber_whenMessageIsPublished_thenItIsReceived() throws Exception { + String topic = "baeldung/hivemq/test/" + UUID.randomUUID(); + String payload = "Hello from Baeldung"; + + Mqtt5AsyncClient subscriber = Mqtt5Client.builder() + .identifier("baeldung-sub-" + UUID.randomUUID()) + .serverHost(PUBLIC_BROKER_HOST) + .serverPort(PUBLIC_BROKER_PORT) + .buildAsync(); + + subscriber.connect() + .join(); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference receivedMessage = new AtomicReference<>(); + + subscriber.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> { + String message = new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8); + receivedMessage.set(message); + latch.countDown(); + }); + + subscriber.subscribeWith() + .topicFilter(topic) + .send() + .join(); + + Mqtt5BlockingClient publisher = Mqtt5Client.builder() + .identifier("baeldung-pub-" + UUID.randomUUID()) + .serverHost(PUBLIC_BROKER_HOST) + .serverPort(PUBLIC_BROKER_PORT) + .buildBlocking(); + + publisher.connect(); + + publisher.publishWith() + .topic(topic) + .payload(payload.getBytes(StandardCharsets.UTF_8)) + .send(); + + assertTrue(latch.await(2, TimeUnit.SECONDS)); + assertEquals(payload, receivedMessage.get()); + + publisher.disconnect(); + subscriber.disconnect() + .join(); + } +} \ No newline at end of file