Skip to content

Commit 0feff48

Browse files
committed
GH-9112: Workaround for Paho stopReconnectCycle
Fixes: #9112 `Mqttv5ClientManager` hangs in `stop()` if never was connected. The scheduled reconnect timer in the client is never cancelled. * Call `stopReconnectCycle()` on the client via reflection when we disconnect from the client in Spring Integration MQTT components (cherry picked from commit 937da13) # Conflicts: # spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java # spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java # spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java # spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java
1 parent 00c75ac commit 0feff48

File tree

11 files changed

+152
-8
lines changed

11 files changed

+152
-8
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mqtt.aot;
18+
19+
import java.util.stream.Stream;
20+
21+
import org.springframework.aot.hint.ExecutableMode;
22+
import org.springframework.aot.hint.ReflectionHints;
23+
import org.springframework.aot.hint.RuntimeHints;
24+
import org.springframework.aot.hint.RuntimeHintsRegistrar;
25+
import org.springframework.util.ClassUtils;
26+
import org.springframework.util.ReflectionUtils;
27+
28+
/**
29+
* {@link RuntimeHintsRegistrar} for Spring Integration MQTT module.
30+
*
31+
* @author Artem Bilan
32+
*
33+
* @since 6.1.9
34+
*/
35+
class MqttRuntimeHints implements RuntimeHintsRegistrar {
36+
37+
@Override
38+
public void registerHints(RuntimeHints hints, ClassLoader classLoader) {
39+
ReflectionHints reflectionHints = hints.reflection();
40+
// TODO until the real fix in Paho library.
41+
Stream.of("org.eclipse.paho.client.mqttv3.MqttAsyncClient", "org.eclipse.paho.mqttv5.client.MqttAsyncClient")
42+
.filter((typeName) -> ClassUtils.isPresent(typeName, classLoader))
43+
.map((typeName) -> loadClassByName(typeName, classLoader))
44+
.flatMap((type) -> Stream.ofNullable(ReflectionUtils.findMethod(type, "stopReconnectCycle")))
45+
.forEach(method -> reflectionHints.registerMethod(method, ExecutableMode.INVOKE));
46+
}
47+
48+
private static Class<?> loadClassByName(String typeName, ClassLoader classLoader) {
49+
try {
50+
return ClassUtils.forName(typeName, classLoader);
51+
}
52+
catch (ClassNotFoundException ex) {
53+
throw new IllegalArgumentException(ex);
54+
}
55+
}
56+
57+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
/**
2+
* Provides classes to support Spring AOT.
3+
*/
4+
@org.springframework.lang.NonNullApi
5+
@org.springframework.lang.NonNullFields
6+
package org.springframework.integration.mqtt.aot;

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
import org.eclipse.paho.client.mqttv3.MqttMessage;
2727

2828
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
29+
import org.springframework.integration.mqtt.support.MqttUtils;
2930
import org.springframework.util.Assert;
3031

3132
/**
@@ -147,6 +148,9 @@ public synchronized void stop() {
147148
finally {
148149
try {
149150
client.close();
151+
if (getConnectionInfo().isAutomaticReconnect()) {
152+
MqttUtils.stopClientReconnectCycle(client);
153+
}
150154
}
151155
catch (MqttException e) {
152156
logger.error("Could not close the client", e);

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
2929

3030
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
31+
import org.springframework.integration.mqtt.support.MqttUtils;
3132
import org.springframework.util.Assert;
3233

3334
/**
@@ -149,6 +150,9 @@ public synchronized void stop() {
149150
finally {
150151
try {
151152
client.close();
153+
if (getConnectionInfo().isAutomaticReconnect()) {
154+
MqttUtils.stopClientReconnectCycle(client);
155+
}
152156
}
153157
catch (MqttException e) {
154158
logger.error("Could not close the client", e);

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -216,6 +216,9 @@ protected synchronized void doStop() {
216216

217217
try {
218218
this.client.disconnectForcibly(getDisconnectCompletionTimeout());
219+
if (getConnectionInfo().isAutomaticReconnect()) {
220+
MqttUtils.stopClientReconnectCycle(this.client);
221+
}
219222
}
220223
catch (MqttException ex) {
221224
logger.error(ex, "Exception while disconnecting");

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.springframework.integration.mqtt.support.MqttHeaderMapper;
4949
import org.springframework.integration.mqtt.support.MqttHeaders;
5050
import org.springframework.integration.mqtt.support.MqttMessageConverter;
51+
import org.springframework.integration.mqtt.support.MqttUtils;
5152
import org.springframework.lang.Nullable;
5253
import org.springframework.messaging.Message;
5354
import org.springframework.messaging.MessageHeaders;
@@ -239,6 +240,9 @@ protected void doStop() {
239240
}
240241
if (getClientManager() == null) {
241242
this.mqttClient.disconnectForcibly(getDisconnectCompletionTimeout());
243+
if (getConnectionInfo().isAutomaticReconnect()) {
244+
MqttUtils.stopClientReconnectCycle(this.mqttClient);
245+
}
242246
}
243247
}
244248
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -175,6 +175,9 @@ protected void doStop() {
175175
IMqttAsyncClient theClient = this.client;
176176
if (theClient != null) {
177177
theClient.disconnect().waitForCompletion(getDisconnectCompletionTimeout());
178+
if (getConnectionInfo().isAutomaticReconnect()) {
179+
MqttUtils.stopClientReconnectCycle(theClient);
180+
}
178181
theClient.close();
179182
this.client = null;
180183
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.springframework.integration.mqtt.event.MqttProtocolErrorEvent;
4343
import org.springframework.integration.mqtt.support.MqttHeaderMapper;
4444
import org.springframework.integration.mqtt.support.MqttMessageConverter;
45+
import org.springframework.integration.mqtt.support.MqttUtils;
4546
import org.springframework.lang.Nullable;
4647
import org.springframework.messaging.Message;
4748
import org.springframework.messaging.MessageHandlingException;
@@ -185,6 +186,9 @@ protected void doStop() {
185186
try {
186187
if (getClientManager() == null) {
187188
this.mqttClient.disconnect().waitForCompletion(getDisconnectCompletionTimeout());
189+
if (getConnectionInfo().isAutomaticReconnect()) {
190+
MqttUtils.stopClientReconnectCycle(this.mqttClient);
191+
}
188192
}
189193
}
190194
catch (MqttException ex) {

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/support/MqttUtils.java

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2021 the original author or authors.
2+
* Copyright 2020-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,9 +16,13 @@
1616

1717
package org.springframework.integration.mqtt.support;
1818

19+
import java.lang.reflect.Method;
20+
1921
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
2022

2123
import org.springframework.beans.BeanUtils;
24+
import org.springframework.util.ClassUtils;
25+
import org.springframework.util.ReflectionUtils;
2226

2327
/**
2428
* MQTT Utilities.
@@ -30,6 +34,38 @@
3034
*/
3135
public final class MqttUtils {
3236

37+
private static final boolean PAHO_MQTTV3_PRESENT =
38+
ClassUtils.isPresent("org.eclipse.paho.client.mqttv3.MqttAsyncClient", null);
39+
40+
private static final boolean PAHO_MQTTV5_PRESENT =
41+
ClassUtils.isPresent("org.eclipse.paho.mqttv5.client.MqttAsyncClient", null);
42+
43+
private static final Method V3_STOP_RECONNECT_CYCLE_METHOD;
44+
45+
private static final Method V5_STOP_RECONNECT_CYCLE_METHOD;
46+
47+
static {
48+
if (PAHO_MQTTV3_PRESENT) {
49+
V3_STOP_RECONNECT_CYCLE_METHOD =
50+
ReflectionUtils.findMethod(org.eclipse.paho.client.mqttv3.MqttAsyncClient.class,
51+
"stopReconnectCycle");
52+
ReflectionUtils.makeAccessible(V3_STOP_RECONNECT_CYCLE_METHOD);
53+
}
54+
else {
55+
V3_STOP_RECONNECT_CYCLE_METHOD = null;
56+
}
57+
58+
if (PAHO_MQTTV5_PRESENT) {
59+
V5_STOP_RECONNECT_CYCLE_METHOD =
60+
ReflectionUtils.findMethod(org.eclipse.paho.mqttv5.client.MqttAsyncClient.class,
61+
"stopReconnectCycle");
62+
ReflectionUtils.makeAccessible(V5_STOP_RECONNECT_CYCLE_METHOD);
63+
}
64+
else {
65+
V5_STOP_RECONNECT_CYCLE_METHOD = null;
66+
}
67+
}
68+
3369
private MqttUtils() {
3470
}
3571

@@ -47,4 +83,26 @@ public static MqttConnectOptions cloneConnectOptions(MqttConnectOptions options)
4783
return options2;
4884
}
4985

86+
/**
87+
* Perform a {@code stopReconnectCycle()} (via reflection) method on the provided client
88+
* to clean up resources on client stop.
89+
* TODO until the real fix in Paho library.
90+
* @param client the MQTTv3 Paho client instance.
91+
* @since 6.1.9
92+
*/
93+
public static void stopClientReconnectCycle(org.eclipse.paho.client.mqttv3.IMqttAsyncClient client) {
94+
ReflectionUtils.invokeMethod(V3_STOP_RECONNECT_CYCLE_METHOD, client);
95+
}
96+
97+
/**
98+
* Perform a {@code stopReconnectCycle()} (via reflection) method on the provided client
99+
* to clean up resources on client stop.
100+
* TODO until the real fix in Paho library.
101+
* @param client the MQTTv5 Paho client instance.
102+
* @since 6.1.9
103+
*/
104+
public static void stopClientReconnectCycle(org.eclipse.paho.mqttv5.client.IMqttAsyncClient client) {
105+
ReflectionUtils.invokeMethod(V5_STOP_RECONNECT_CYCLE_METHOD, client);
106+
}
107+
50108
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
org.springframework.aot.hint.RuntimeHintsRegistrar=org.springframework.integration.mqtt.aot.MqttRuntimeHints

0 commit comments

Comments
 (0)