Skip to content

Commit 63b0497

Browse files
authored
Merge pull request #812 from rabbitmq/clean-up-outstanding-requests
Clean up outstanding requests on error and on closing
2 parents a48291e + 0a987ce commit 63b0497

File tree

7 files changed

+72
-52
lines changed

7 files changed

+72
-52
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 66 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -496,10 +496,10 @@ private Map<String, String> peerProperties() {
496496
throw new StreamException("Error when establishing stream connection", request.error());
497497
}
498498
} catch (StreamException e) {
499-
outstandingRequests.remove(correlationId);
499+
this.handleRpcError(correlationId, e);
500500
throw e;
501501
} catch (RuntimeException e) {
502-
outstandingRequests.remove(correlationId);
502+
this.handleRpcError(correlationId, e);
503503
throw new StreamException("Error while trying to exchange peer properties", e);
504504
}
505505
}
@@ -572,10 +572,10 @@ private SaslAuthenticateResponse sendSaslAuthenticate(
572572
request.block();
573573
return request.response.get();
574574
} catch (StreamException e) {
575-
outstandingRequests.remove(correlationId);
575+
this.handleRpcError(correlationId, e);
576576
throw e;
577577
} catch (RuntimeException e) {
578-
outstandingRequests.remove(correlationId);
578+
this.handleRpcError(correlationId, e);
579579
throw new StreamException("Error while trying to authenticate", e);
580580
}
581581
}
@@ -602,10 +602,10 @@ private Map<String, String> open(String virtualHost) {
602602
}
603603
return request.response.get().connectionProperties;
604604
} catch (StreamException e) {
605-
outstandingRequests.remove(correlationId);
605+
this.handleRpcError(correlationId, e);
606606
throw e;
607607
} catch (RuntimeException e) {
608-
outstandingRequests.remove(correlationId);
608+
this.handleRpcError(correlationId, e);
609609
throw new StreamException("Error during open command", e);
610610
}
611611
}
@@ -646,10 +646,10 @@ private void sendClose(short code, String reason) {
646646
+ formatConstant(request.response.get().getResponseCode()));
647647
}
648648
} catch (StreamException e) {
649-
outstandingRequests.remove(correlationId);
649+
this.handleRpcError(correlationId, e);
650650
throw e;
651651
} catch (RuntimeException e) {
652-
outstandingRequests.remove(correlationId);
652+
this.handleRpcError(correlationId, e);
653653
throw new StreamException("Error while closing connection", e);
654654
}
655655
}
@@ -669,10 +669,10 @@ private List<String> getSaslMechanisms() {
669669
request.block();
670670
return request.response.get();
671671
} catch (StreamException e) {
672-
outstandingRequests.remove(correlationId);
672+
this.handleRpcError(correlationId, e);
673673
throw e;
674674
} catch (RuntimeException e) {
675-
outstandingRequests.remove(correlationId);
675+
this.handleRpcError(correlationId, e);
676676
throw new StreamException("Error while exchanging SASL mechanisms", e);
677677
}
678678
}
@@ -699,10 +699,10 @@ public Response create(String stream, Map<String, String> arguments) {
699699
request.block();
700700
return request.response.get();
701701
} catch (StreamException e) {
702-
outstandingRequests.remove(correlationId);
702+
this.handleRpcError(correlationId, e);
703703
throw e;
704704
} catch (RuntimeException e) {
705-
outstandingRequests.remove(correlationId);
705+
this.handleRpcError(correlationId, e);
706706
throw new StreamException(format("Error while creating stream '%s'", stream), e);
707707
}
708708
}
@@ -750,10 +750,10 @@ Response createSuperStream(
750750
request.block();
751751
return request.response.get();
752752
} catch (StreamException e) {
753-
outstandingRequests.remove(correlationId);
753+
this.handleRpcError(correlationId, e);
754754
throw e;
755755
} catch (RuntimeException e) {
756-
outstandingRequests.remove(correlationId);
756+
this.handleRpcError(correlationId, e);
757757
throw new StreamException(format("Error while creating super stream '%s'", superStream), e);
758758
}
759759
}
@@ -776,10 +776,10 @@ Response deleteSuperStream(String superStream) {
776776
request.block();
777777
return request.response.get();
778778
} catch (StreamException e) {
779-
outstandingRequests.remove(correlationId);
779+
this.handleRpcError(correlationId, e);
780780
throw e;
781781
} catch (RuntimeException e) {
782-
outstandingRequests.remove(correlationId);
782+
this.handleRpcError(correlationId, e);
783783
throw new StreamException(format("Error while deleting stream '%s'", superStream), e);
784784
}
785785
}
@@ -860,10 +860,10 @@ public Response delete(String stream) {
860860
request.block();
861861
return request.response.get();
862862
} catch (StreamException e) {
863-
outstandingRequests.remove(correlationId);
863+
this.handleRpcError(correlationId, e);
864864
throw e;
865865
} catch (RuntimeException e) {
866-
outstandingRequests.remove(correlationId);
866+
this.handleRpcError(correlationId, e);
867867
throw new StreamException(format("Error while deleting stream '%s'", stream), e);
868868
}
869869
}
@@ -891,10 +891,10 @@ public Map<String, StreamMetadata> metadata(String... streams) {
891891
request.block();
892892
return request.response.get();
893893
} catch (StreamException e) {
894-
outstandingRequests.remove(correlationId);
894+
this.handleRpcError(correlationId, e);
895895
throw e;
896896
} catch (RuntimeException e) {
897-
outstandingRequests.remove(correlationId);
897+
this.handleRpcError(correlationId, e);
898898
throw new StreamException(
899899
format("Error while getting metadata for stream(s) '%s'", join(",", streams)), e);
900900
}
@@ -930,10 +930,10 @@ public Response declarePublisher(byte publisherId, String publisherReference, St
930930
request.block();
931931
return request.response.get();
932932
} catch (StreamException e) {
933-
outstandingRequests.remove(correlationId);
933+
this.handleRpcError(correlationId, e);
934934
throw e;
935935
} catch (RuntimeException e) {
936-
outstandingRequests.remove(correlationId);
936+
this.handleRpcError(correlationId, e);
937937
throw new StreamException(
938938
format("Error while declaring publisher for stream '%s'", stream), e);
939939
}
@@ -955,10 +955,10 @@ public Response deletePublisher(byte publisherId) {
955955
request.block();
956956
return request.response.get();
957957
} catch (StreamException e) {
958-
outstandingRequests.remove(correlationId);
958+
this.handleRpcError(correlationId, e);
959959
throw e;
960960
} catch (RuntimeException e) {
961-
outstandingRequests.remove(correlationId);
961+
this.handleRpcError(correlationId, e);
962962
throw new StreamException("Error while deleting publisher", e);
963963
}
964964
}
@@ -1293,10 +1293,10 @@ public Response subscribe(
12931293
request.block();
12941294
return request.response.get();
12951295
} catch (StreamException e) {
1296-
outstandingRequests.remove(correlationId);
1296+
this.handleRpcError(correlationId, e);
12971297
throw e;
12981298
} catch (RuntimeException e) {
1299-
outstandingRequests.remove(correlationId);
1299+
this.handleRpcError(correlationId, e);
13001300
throw new StreamException(
13011301
format("Error while trying to subscribe to stream '%s'", stream), e);
13021302
}
@@ -1351,10 +1351,10 @@ public QueryOffsetResponse queryOffset(String reference, String stream) {
13511351
QueryOffsetResponse response = request.response.get();
13521352
return response;
13531353
} catch (StreamException e) {
1354-
outstandingRequests.remove(correlationId);
1354+
this.handleRpcError(correlationId, e);
13551355
throw e;
13561356
} catch (RuntimeException e) {
1357-
outstandingRequests.remove(correlationId);
1357+
this.handleRpcError(correlationId, e);
13581358
throw new StreamException(
13591359
format(
13601360
"Error while querying offset for reference '%s' on stream '%s'", reference, stream),
@@ -1397,10 +1397,10 @@ public long queryPublisherSequence(String publisherReference, String stream) {
13971397
}
13981398
return response.getSequence();
13991399
} catch (StreamException e) {
1400-
outstandingRequests.remove(correlationId);
1400+
this.handleRpcError(correlationId, e);
14011401
throw e;
14021402
} catch (RuntimeException e) {
1403-
outstandingRequests.remove(correlationId);
1403+
this.handleRpcError(correlationId, e);
14041404
throw new StreamException(
14051405
format(
14061406
"Error while querying publisher sequence for '%s' on stream '%s'",
@@ -1425,10 +1425,10 @@ public Response unsubscribe(byte subscriptionId) {
14251425
request.block();
14261426
return request.response.get();
14271427
} catch (StreamException e) {
1428-
outstandingRequests.remove(correlationId);
1428+
this.handleRpcError(correlationId, e);
14291429
throw e;
14301430
} catch (RuntimeException e) {
1431-
outstandingRequests.remove(correlationId);
1431+
this.handleRpcError(correlationId, e);
14321432
throw new StreamException("Error while unsubscribing", e);
14331433
}
14341434
}
@@ -1450,6 +1450,7 @@ void closingSequence(ShutdownContext.ShutdownReason reason) {
14501450
this.shutdownListenerCallback.accept(reason);
14511451
}
14521452
this.nettyClosing.run();
1453+
this.failOutstandingRequests();
14531454
if (this.closeDispatchingExecutorService != null) {
14541455
this.closeDispatchingExecutorService.accept(this.dispatchingExecutorService);
14551456
}
@@ -1458,6 +1459,24 @@ void closingSequence(ShutdownContext.ShutdownReason reason) {
14581459
}
14591460
}
14601461

1462+
private void failOutstandingRequests() {
1463+
try {
1464+
Exception cause = null;
1465+
for (OutstandingRequest<?> request : this.outstandingRequests.values()) {
1466+
if (cause == null) {
1467+
cause = new ConnectionStreamException("Connection is closed");
1468+
}
1469+
try {
1470+
request.completeExceptionally(cause);
1471+
} catch (Exception e) {
1472+
LOGGER.debug("Error while failing outstanding request: {}", e.getMessage());
1473+
}
1474+
}
1475+
} catch (Exception e) {
1476+
LOGGER.debug("Error while failing outstanding requests: {}", e.getMessage());
1477+
}
1478+
}
1479+
14611480
private void closeNetty() {
14621481
try {
14631482
if (this.channel != null && this.channel.isOpen()) {
@@ -1567,10 +1586,10 @@ public List<String> route(String routingKey, String superStream) {
15671586
request.block();
15681587
return request.response.get();
15691588
} catch (StreamException e) {
1570-
outstandingRequests.remove(correlationId);
1589+
this.handleRpcError(correlationId, e);
15711590
throw e;
15721591
} catch (RuntimeException e) {
1573-
outstandingRequests.remove(correlationId);
1592+
this.handleRpcError(correlationId, e);
15741593
throw new StreamException(
15751594
format(
15761595
"Error while querying route for routing key '%s' on super stream '%s'",
@@ -1600,10 +1619,10 @@ public List<String> partitions(String superStream) {
16001619
request.block();
16011620
return request.response.get();
16021621
} catch (StreamException e) {
1603-
outstandingRequests.remove(correlationId);
1622+
this.handleRpcError(correlationId, e);
16041623
throw e;
16051624
} catch (RuntimeException e) {
1606-
outstandingRequests.remove(correlationId);
1625+
this.handleRpcError(correlationId, e);
16071626
throw new StreamException(
16081627
format("Error while querying partitions for super stream '%s'", superStream), e);
16091628
}
@@ -1632,10 +1651,10 @@ List<FrameHandlerInfo> exchangeCommandVersions() {
16321651
request.block();
16331652
return request.response.get();
16341653
} catch (StreamException e) {
1635-
outstandingRequests.remove(correlationId);
1654+
this.handleRpcError(correlationId, e);
16361655
throw e;
16371656
} catch (RuntimeException e) {
1638-
outstandingRequests.remove(correlationId);
1657+
this.handleRpcError(correlationId, e);
16391658
throw new StreamException("Error while exchanging command version", e);
16401659
}
16411660
}
@@ -1661,10 +1680,10 @@ StreamStatsResponse streamStats(String stream) {
16611680
request.block();
16621681
return request.response.get();
16631682
} catch (StreamException e) {
1664-
outstandingRequests.remove(correlationId);
1683+
this.handleRpcError(correlationId, e);
16651684
throw e;
16661685
} catch (RuntimeException e) {
1667-
outstandingRequests.remove(correlationId);
1686+
this.handleRpcError(correlationId, e);
16681687
throw new StreamException(
16691688
format("Error while querying statistics for stream '%s'", stream), e);
16701689
}
@@ -2950,4 +2969,11 @@ private void debug(Supplier<String> format, Object... args) {
29502969
LOGGER.debug("Connection '" + this.clientConnectionName + "': " + format.get(), args);
29512970
}
29522971
}
2972+
2973+
private void handleRpcError(int correlationId, Exception e) {
2974+
OutstandingRequest<?> request = this.outstandingRequests.remove(correlationId);
2975+
if (request != null) {
2976+
request.completeExceptionally(e);
2977+
}
2978+
}
29532979
}

src/test/java/com/rabbitmq/stream/impl/ClientTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,10 @@
6262
import java.util.stream.IntStream;
6363
import org.junit.jupiter.api.Test;
6464
import org.junit.jupiter.api.TestInfo;
65-
import org.junit.jupiter.api.extension.ExtendWith;
6665
import org.junit.jupiter.params.ParameterizedTest;
6766
import org.junit.jupiter.params.provider.ValueSource;
6867

69-
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
68+
@StreamTestInfrastructure
7069
public class ClientTest {
7170

7271
static final Charset UTF8 = StandardCharsets.UTF_8;

src/test/java/com/rabbitmq/stream/impl/FilteringTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,13 @@
4141
import org.junit.jupiter.api.BeforeEach;
4242
import org.junit.jupiter.api.Test;
4343
import org.junit.jupiter.api.TestInfo;
44-
import org.junit.jupiter.api.extension.ExtendWith;
4544
import org.junit.jupiter.params.ParameterizedTest;
4645
import org.junit.jupiter.params.provider.CsvSource;
4746
import org.junit.jupiter.params.provider.NullSource;
4847
import org.junit.jupiter.params.provider.ValueSource;
4948

5049
@DisabledIfFilteringNotSupported
51-
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
50+
@StreamTestInfrastructure
5251
public class FilteringTest {
5352

5453
private static final Duration CONDITION_TIMEOUT = Duration.ofSeconds(5);

src/test/java/com/rabbitmq/stream/impl/MetadataTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,11 @@
3232
import java.util.stream.IntStream;
3333
import org.junit.jupiter.api.Test;
3434
import org.junit.jupiter.api.TestInfo;
35-
import org.junit.jupiter.api.extension.ExtendWith;
3635
import org.junit.jupiter.params.ParameterizedTest;
3736
import org.junit.jupiter.params.provider.CsvSource;
3837
import org.junit.jupiter.params.provider.ValueSource;
3938

40-
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
39+
@StreamTestInfrastructure
4140
public class MetadataTest {
4241

4342
TestUtils.ClientFactory cf;

src/test/java/com/rabbitmq/stream/impl/OutboundMappingCallbackTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,8 @@
2828
import java.util.stream.Collectors;
2929
import java.util.stream.IntStream;
3030
import org.junit.jupiter.api.Test;
31-
import org.junit.jupiter.api.extension.ExtendWith;
3231

33-
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
32+
@StreamTestInfrastructure
3433
public class OutboundMappingCallbackTest {
3534

3635
TestUtils.ClientFactory cf;

src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,8 @@
4040
import org.junit.jupiter.api.BeforeEach;
4141
import org.junit.jupiter.api.Test;
4242
import org.junit.jupiter.api.TestInfo;
43-
import org.junit.jupiter.api.extension.ExtendWith;
4443

45-
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
44+
@StreamTestInfrastructure
4645
public class SuperStreamProducerTest {
4746

4847
EventLoopGroup eventLoopGroup;

src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,8 @@
4343
import org.junit.jupiter.api.BeforeEach;
4444
import org.junit.jupiter.api.Test;
4545
import org.junit.jupiter.api.TestInfo;
46-
import org.junit.jupiter.api.extension.ExtendWith;
4746

48-
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
47+
@StreamTestInfrastructure
4948
public class SuperStreamTest {
5049

5150
EventLoopGroup eventLoopGroup;

0 commit comments

Comments
 (0)