diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index df5abb0f6b..f5c5121937 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -496,10 +496,10 @@ private Map peerProperties() { throw new StreamException("Error when establishing stream connection", request.error()); } } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException("Error while trying to exchange peer properties", e); } } @@ -572,10 +572,10 @@ private SaslAuthenticateResponse sendSaslAuthenticate( request.block(); return request.response.get(); } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException("Error while trying to authenticate", e); } } @@ -602,10 +602,10 @@ private Map open(String virtualHost) { } return request.response.get().connectionProperties; } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException("Error during open command", e); } } @@ -646,10 +646,10 @@ private void sendClose(short code, String reason) { + formatConstant(request.response.get().getResponseCode())); } } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException("Error while closing connection", e); } } @@ -669,10 +669,10 @@ private List getSaslMechanisms() { request.block(); return request.response.get(); } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException("Error while exchanging SASL mechanisms", e); } } @@ -699,10 +699,10 @@ public Response create(String stream, Map arguments) { request.block(); return request.response.get(); } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException(format("Error while creating stream '%s'", stream), e); } } @@ -750,10 +750,10 @@ Response createSuperStream( request.block(); return request.response.get(); } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException(format("Error while creating super stream '%s'", superStream), e); } } @@ -776,10 +776,10 @@ Response deleteSuperStream(String superStream) { request.block(); return request.response.get(); } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException(format("Error while deleting stream '%s'", superStream), e); } } @@ -860,10 +860,10 @@ public Response delete(String stream) { request.block(); return request.response.get(); } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException(format("Error while deleting stream '%s'", stream), e); } } @@ -891,10 +891,10 @@ public Map metadata(String... streams) { request.block(); return request.response.get(); } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException( format("Error while getting metadata for stream(s) '%s'", join(",", streams)), e); } @@ -930,10 +930,10 @@ public Response declarePublisher(byte publisherId, String publisherReference, St request.block(); return request.response.get(); } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException( format("Error while declaring publisher for stream '%s'", stream), e); } @@ -955,10 +955,10 @@ public Response deletePublisher(byte publisherId) { request.block(); return request.response.get(); } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException("Error while deleting publisher", e); } } @@ -1293,10 +1293,10 @@ public Response subscribe( request.block(); return request.response.get(); } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException( format("Error while trying to subscribe to stream '%s'", stream), e); } @@ -1351,10 +1351,10 @@ public QueryOffsetResponse queryOffset(String reference, String stream) { QueryOffsetResponse response = request.response.get(); return response; } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException( format( "Error while querying offset for reference '%s' on stream '%s'", reference, stream), @@ -1397,10 +1397,10 @@ public long queryPublisherSequence(String publisherReference, String stream) { } return response.getSequence(); } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException( format( "Error while querying publisher sequence for '%s' on stream '%s'", @@ -1425,10 +1425,10 @@ public Response unsubscribe(byte subscriptionId) { request.block(); return request.response.get(); } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException("Error while unsubscribing", e); } } @@ -1450,6 +1450,7 @@ void closingSequence(ShutdownContext.ShutdownReason reason) { this.shutdownListenerCallback.accept(reason); } this.nettyClosing.run(); + this.failOutstandingRequests(); if (this.closeDispatchingExecutorService != null) { this.closeDispatchingExecutorService.accept(this.dispatchingExecutorService); } @@ -1458,6 +1459,24 @@ void closingSequence(ShutdownContext.ShutdownReason reason) { } } + private void failOutstandingRequests() { + try { + Exception cause = null; + for (OutstandingRequest request : this.outstandingRequests.values()) { + if (cause == null) { + cause = new ConnectionStreamException("Connection is closed"); + } + try { + request.completeExceptionally(cause); + } catch (Exception e) { + LOGGER.debug("Error while failing outstanding request: {}", e.getMessage()); + } + } + } catch (Exception e) { + LOGGER.debug("Error while failing outstanding requests: {}", e.getMessage()); + } + } + private void closeNetty() { try { if (this.channel != null && this.channel.isOpen()) { @@ -1567,10 +1586,10 @@ public List route(String routingKey, String superStream) { request.block(); return request.response.get(); } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException( format( "Error while querying route for routing key '%s' on super stream '%s'", @@ -1600,10 +1619,10 @@ public List partitions(String superStream) { request.block(); return request.response.get(); } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException( format("Error while querying partitions for super stream '%s'", superStream), e); } @@ -1632,10 +1651,10 @@ List exchangeCommandVersions() { request.block(); return request.response.get(); } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException("Error while exchanging command version", e); } } @@ -1661,10 +1680,10 @@ StreamStatsResponse streamStats(String stream) { request.block(); return request.response.get(); } catch (StreamException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw e; } catch (RuntimeException e) { - outstandingRequests.remove(correlationId); + this.handleRpcError(correlationId, e); throw new StreamException( format("Error while querying statistics for stream '%s'", stream), e); } @@ -2950,4 +2969,11 @@ private void debug(Supplier format, Object... args) { LOGGER.debug("Connection '" + this.clientConnectionName + "': " + format.get(), args); } } + + private void handleRpcError(int correlationId, Exception e) { + OutstandingRequest request = this.outstandingRequests.remove(correlationId); + if (request != null) { + request.completeExceptionally(e); + } + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java index 9e04942d17..0174f37956 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java @@ -62,11 +62,10 @@ import java.util.stream.IntStream; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +@StreamTestInfrastructure public class ClientTest { static final Charset UTF8 = StandardCharsets.UTF_8; diff --git a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java index 4118b78b16..85b805bf69 100644 --- a/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/FilteringTest.java @@ -41,14 +41,13 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.NullSource; import org.junit.jupiter.params.provider.ValueSource; @DisabledIfFilteringNotSupported -@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +@StreamTestInfrastructure public class FilteringTest { private static final Duration CONDITION_TIMEOUT = Duration.ofSeconds(5); diff --git a/src/test/java/com/rabbitmq/stream/impl/MetadataTest.java b/src/test/java/com/rabbitmq/stream/impl/MetadataTest.java index de02f23093..2a2a295719 100644 --- a/src/test/java/com/rabbitmq/stream/impl/MetadataTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/MetadataTest.java @@ -32,12 +32,11 @@ import java.util.stream.IntStream; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; -@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +@StreamTestInfrastructure public class MetadataTest { TestUtils.ClientFactory cf; diff --git a/src/test/java/com/rabbitmq/stream/impl/OutboundMappingCallbackTest.java b/src/test/java/com/rabbitmq/stream/impl/OutboundMappingCallbackTest.java index d424070a5d..00fcc68edb 100644 --- a/src/test/java/com/rabbitmq/stream/impl/OutboundMappingCallbackTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/OutboundMappingCallbackTest.java @@ -28,9 +28,8 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +@StreamTestInfrastructure public class OutboundMappingCallbackTest { TestUtils.ClientFactory cf; diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java index e2d43d7aa1..e2aeef1a27 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java @@ -40,9 +40,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.api.extension.ExtendWith; -@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +@StreamTestInfrastructure public class SuperStreamProducerTest { EventLoopGroup eventLoopGroup; diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java index 0f3021126e..e69826eaff 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java @@ -43,9 +43,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.api.extension.ExtendWith; -@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +@StreamTestInfrastructure public class SuperStreamTest { EventLoopGroup eventLoopGroup;