Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 66 additions & 40 deletions src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -496,10 +496,10 @@ private Map<String, String> peerProperties() {
throw new StreamException("Error when establishing stream connection", request.error());
}
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException("Error while trying to exchange peer properties", e);
}
}
Expand Down Expand Up @@ -572,10 +572,10 @@ private SaslAuthenticateResponse sendSaslAuthenticate(
request.block();
return request.response.get();
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException("Error while trying to authenticate", e);
}
}
Expand All @@ -602,10 +602,10 @@ private Map<String, String> open(String virtualHost) {
}
return request.response.get().connectionProperties;
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException("Error during open command", e);
}
}
Expand Down Expand Up @@ -646,10 +646,10 @@ private void sendClose(short code, String reason) {
+ formatConstant(request.response.get().getResponseCode()));
}
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException("Error while closing connection", e);
}
}
Expand All @@ -669,10 +669,10 @@ private List<String> getSaslMechanisms() {
request.block();
return request.response.get();
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException("Error while exchanging SASL mechanisms", e);
}
}
Expand All @@ -699,10 +699,10 @@ public Response create(String stream, Map<String, String> arguments) {
request.block();
return request.response.get();
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException(format("Error while creating stream '%s'", stream), e);
}
}
Expand Down Expand Up @@ -750,10 +750,10 @@ Response createSuperStream(
request.block();
return request.response.get();
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException(format("Error while creating super stream '%s'", superStream), e);
}
}
Expand All @@ -776,10 +776,10 @@ Response deleteSuperStream(String superStream) {
request.block();
return request.response.get();
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException(format("Error while deleting stream '%s'", superStream), e);
}
}
Expand Down Expand Up @@ -860,10 +860,10 @@ public Response delete(String stream) {
request.block();
return request.response.get();
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException(format("Error while deleting stream '%s'", stream), e);
}
}
Expand Down Expand Up @@ -891,10 +891,10 @@ public Map<String, StreamMetadata> metadata(String... streams) {
request.block();
return request.response.get();
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException(
format("Error while getting metadata for stream(s) '%s'", join(",", streams)), e);
}
Expand Down Expand Up @@ -930,10 +930,10 @@ public Response declarePublisher(byte publisherId, String publisherReference, St
request.block();
return request.response.get();
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException(
format("Error while declaring publisher for stream '%s'", stream), e);
}
Expand All @@ -955,10 +955,10 @@ public Response deletePublisher(byte publisherId) {
request.block();
return request.response.get();
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException("Error while deleting publisher", e);
}
}
Expand Down Expand Up @@ -1293,10 +1293,10 @@ public Response subscribe(
request.block();
return request.response.get();
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException(
format("Error while trying to subscribe to stream '%s'", stream), e);
}
Expand Down Expand Up @@ -1351,10 +1351,10 @@ public QueryOffsetResponse queryOffset(String reference, String stream) {
QueryOffsetResponse response = request.response.get();
return response;
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException(
format(
"Error while querying offset for reference '%s' on stream '%s'", reference, stream),
Expand Down Expand Up @@ -1397,10 +1397,10 @@ public long queryPublisherSequence(String publisherReference, String stream) {
}
return response.getSequence();
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException(
format(
"Error while querying publisher sequence for '%s' on stream '%s'",
Expand All @@ -1425,10 +1425,10 @@ public Response unsubscribe(byte subscriptionId) {
request.block();
return request.response.get();
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException("Error while unsubscribing", e);
}
}
Expand All @@ -1450,6 +1450,7 @@ void closingSequence(ShutdownContext.ShutdownReason reason) {
this.shutdownListenerCallback.accept(reason);
}
this.nettyClosing.run();
this.failOutstandingRequests();
if (this.closeDispatchingExecutorService != null) {
this.closeDispatchingExecutorService.accept(this.dispatchingExecutorService);
}
Expand All @@ -1458,6 +1459,24 @@ void closingSequence(ShutdownContext.ShutdownReason reason) {
}
}

private void failOutstandingRequests() {
try {
Exception cause = null;
for (OutstandingRequest<?> request : this.outstandingRequests.values()) {
if (cause == null) {
cause = new ConnectionStreamException("Connection is closed");
}
try {
request.completeExceptionally(cause);
} catch (Exception e) {
LOGGER.debug("Error while failing outstanding request: {}", e.getMessage());
}
}
} catch (Exception e) {
LOGGER.debug("Error while failing outstanding requests: {}", e.getMessage());
}
}

private void closeNetty() {
try {
if (this.channel != null && this.channel.isOpen()) {
Expand Down Expand Up @@ -1567,10 +1586,10 @@ public List<String> route(String routingKey, String superStream) {
request.block();
return request.response.get();
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException(
format(
"Error while querying route for routing key '%s' on super stream '%s'",
Expand Down Expand Up @@ -1600,10 +1619,10 @@ public List<String> partitions(String superStream) {
request.block();
return request.response.get();
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException(
format("Error while querying partitions for super stream '%s'", superStream), e);
}
Expand Down Expand Up @@ -1632,10 +1651,10 @@ List<FrameHandlerInfo> exchangeCommandVersions() {
request.block();
return request.response.get();
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException("Error while exchanging command version", e);
}
}
Expand All @@ -1661,10 +1680,10 @@ StreamStatsResponse streamStats(String stream) {
request.block();
return request.response.get();
} catch (StreamException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw e;
} catch (RuntimeException e) {
outstandingRequests.remove(correlationId);
this.handleRpcError(correlationId, e);
throw new StreamException(
format("Error while querying statistics for stream '%s'", stream), e);
}
Expand Down Expand Up @@ -2950,4 +2969,11 @@ private void debug(Supplier<String> format, Object... args) {
LOGGER.debug("Connection '" + this.clientConnectionName + "': " + format.get(), args);
}
}

private void handleRpcError(int correlationId, Exception e) {
OutstandingRequest<?> request = this.outstandingRequests.remove(correlationId);
if (request != null) {
request.completeExceptionally(e);
}
}
}
3 changes: 1 addition & 2 deletions src/test/java/com/rabbitmq/stream/impl/ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,10 @@
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
@StreamTestInfrastructure
public class ClientTest {

static final Charset UTF8 = StandardCharsets.UTF_8;
Expand Down
3 changes: 1 addition & 2 deletions src/test/java/com/rabbitmq/stream/impl/FilteringTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,13 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;

@DisabledIfFilteringNotSupported
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
@StreamTestInfrastructure
public class FilteringTest {

private static final Duration CONDITION_TIMEOUT = Duration.ofSeconds(5);
Expand Down
3 changes: 1 addition & 2 deletions src/test/java/com/rabbitmq/stream/impl/MetadataTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,11 @@
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
@StreamTestInfrastructure
public class MetadataTest {

TestUtils.ClientFactory cf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
@StreamTestInfrastructure
public class OutboundMappingCallbackTest {

TestUtils.ClientFactory cf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
@StreamTestInfrastructure
public class SuperStreamProducerTest {

EventLoopGroup eventLoopGroup;
Expand Down
3 changes: 1 addition & 2 deletions src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
@StreamTestInfrastructure
public class SuperStreamTest {

EventLoopGroup eventLoopGroup;
Expand Down