Skip to content

Commit ca55e27

Browse files
authored
Merge pull request #813 from rabbitmq/race-condition-closing
Fix race condition on connection closing
2 parents 63b0497 + 0b5e77f commit ca55e27

File tree

1 file changed

+16
-14
lines changed

1 file changed

+16
-14
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ public long applyAsLong(Object value) {
184184
private final Map<String, String> connectionProperties;
185185
private final Duration rpcTimeout;
186186
private final List<String> saslMechanisms;
187-
private volatile ShutdownReason shutdownReason = null;
187+
private AtomicReference<ShutdownReason> shutdownReason = new AtomicReference<>();
188188
private final Runnable streamStatsCommandVersionsCheck;
189189
private final boolean filteringSupported;
190190
private final Runnable superStreamManagementCommandVersionsCheck;
@@ -1435,17 +1435,21 @@ public Response unsubscribe(byte subscriptionId) {
14351435

14361436
public void close() {
14371437
if (closing.compareAndSet(false, true)) {
1438-
LOGGER.debug("Closing client");
1439-
1440-
sendClose(RESPONSE_CODE_OK, "OK");
1441-
1442-
closingSequence(ShutdownContext.ShutdownReason.CLIENT_CLOSE);
1443-
1438+
LOGGER.debug("Closing client, channel still active? {}", this.channel.isActive());
1439+
ShutdownReason reason;
1440+
if (this.channel.isActive()) {
1441+
sendClose(RESPONSE_CODE_OK, "OK");
1442+
reason = ShutdownReason.CLIENT_CLOSE;
1443+
} else {
1444+
reason = ShutdownReason.UNKNOWN;
1445+
}
1446+
closingSequence(reason);
14441447
LOGGER.debug("Client closed");
14451448
}
14461449
}
14471450

14481451
void closingSequence(ShutdownContext.ShutdownReason reason) {
1452+
this.shutdownReason(reason);
14491453
if (reason != null) {
14501454
this.shutdownListenerCallback.accept(reason);
14511455
}
@@ -1713,7 +1717,7 @@ public void consumerUpdateResponse(
17131717
}
17141718

17151719
void shutdownReason(ShutdownReason reason) {
1716-
this.shutdownReason = reason;
1720+
this.shutdownReason.compareAndSet(null, reason);
17171721
}
17181722

17191723
public SocketAddress localAddress() {
@@ -2858,16 +2862,14 @@ public void channelInactive(ChannelHandlerContext ctx) {
28582862
// the event is actually dispatched to the listener, emitting
28592863
// an UNKNOWN reason instead of SERVER_CLOSE. So we skip the closing here
28602864
// because it will be handled later anyway.
2861-
if (shutdownReason == null) {
2865+
if (shutdownReason.get() == null) {
2866+
LOGGER.debug("No shutdown reason");
28622867
if (closing.compareAndSet(false, true)) {
2868+
LOGGER.debug("Closing with 'unknown' shutdown reason");
28632869
if (executorService == null) {
28642870
// the TCP connection is closed before the state is initialized
28652871
// we do our best the execute the closing sequence
2866-
new Thread(
2867-
() -> {
2868-
closingSequence(ShutdownReason.UNKNOWN);
2869-
})
2870-
.start();
2872+
new Thread(() -> closingSequence(ShutdownReason.UNKNOWN)).start();
28712873
} else {
28722874
executorService.submit(() -> closingSequence(ShutdownReason.UNKNOWN));
28732875
}

0 commit comments

Comments
 (0)