70
70
import io .netty .handler .timeout .IdleState ;
71
71
import io .netty .handler .timeout .IdleStateEvent ;
72
72
import io .netty .handler .timeout .IdleStateHandler ;
73
+ import io .netty .util .concurrent .Future ;
74
+ import io .netty .util .concurrent .GenericFutureListener ;
73
75
import java .io .IOException ;
74
76
import java .io .OutputStream ;
75
77
import java .lang .reflect .Field ;
@@ -488,7 +490,7 @@ private Map<String, String> peerProperties() {
488
490
OutstandingRequest <Map <String , String >> request = outstandingRequest ();
489
491
LOGGER .debug ("Peer properties request has correlation ID {}" , correlationId );
490
492
outstandingRequests .put (correlationId , request );
491
- channel .writeAndFlush (bb );
493
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
492
494
request .block ();
493
495
if (request .error () == null ) {
494
496
return request .response .get ();
@@ -568,7 +570,7 @@ private SaslAuthenticateResponse sendSaslAuthenticate(
568
570
}
569
571
OutstandingRequest <SaslAuthenticateResponse > request = outstandingRequest ();
570
572
outstandingRequests .put (correlationId , request );
571
- channel .writeAndFlush (bb );
573
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
572
574
request .block ();
573
575
return request .response .get ();
574
576
} catch (StreamException e ) {
@@ -593,7 +595,7 @@ private Map<String, String> open(String virtualHost) {
593
595
bb .writeBytes (virtualHost .getBytes (CHARSET ));
594
596
OutstandingRequest <OpenResponse > request = outstandingRequest ();
595
597
outstandingRequests .put (correlationId , request );
596
- channel .writeAndFlush (bb );
598
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
597
599
request .block ();
598
600
if (!request .response .get ().isOk ()) {
599
601
throw new StreamException (
@@ -635,7 +637,7 @@ private void sendClose(short code, String reason) {
635
637
bb .writeBytes (reason .getBytes (CHARSET ));
636
638
OutstandingRequest <Response > request = outstandingRequest ();
637
639
outstandingRequests .put (correlationId , request );
638
- channel .writeAndFlush (bb );
640
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
639
641
request .block ();
640
642
if (!request .response .get ().isOk ()) {
641
643
LOGGER .warn (
@@ -665,7 +667,7 @@ private List<String> getSaslMechanisms() {
665
667
bb .writeInt (correlationId );
666
668
OutstandingRequest <List <String >> request = outstandingRequest ();
667
669
outstandingRequests .put (correlationId , request );
668
- channel .writeAndFlush (bb );
670
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
669
671
request .block ();
670
672
return request .response .get ();
671
673
} catch (StreamException e ) {
@@ -695,7 +697,7 @@ public Response create(String stream, Map<String, String> arguments) {
695
697
writeMap (bb , arguments );
696
698
OutstandingRequest <Response > request = outstandingRequest ();
697
699
outstandingRequests .put (correlationId , request );
698
- channel .writeAndFlush (bb );
700
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
699
701
request .block ();
700
702
return request .response .get ();
701
703
} catch (StreamException e ) {
@@ -746,7 +748,7 @@ Response createSuperStream(
746
748
writeMap (bb , arguments );
747
749
OutstandingRequest <Response > request = outstandingRequest ();
748
750
outstandingRequests .put (correlationId , request );
749
- channel .writeAndFlush (bb );
751
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
750
752
request .block ();
751
753
return request .response .get ();
752
754
} catch (StreamException e ) {
@@ -772,7 +774,7 @@ Response deleteSuperStream(String superStream) {
772
774
bb .writeBytes (superStream .getBytes (CHARSET ));
773
775
OutstandingRequest <Response > request = outstandingRequest ();
774
776
outstandingRequests .put (correlationId , request );
775
- channel .writeAndFlush (bb );
777
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
776
778
request .block ();
777
779
return request .response .get ();
778
780
} catch (StreamException e ) {
@@ -856,7 +858,7 @@ public Response delete(String stream) {
856
858
bb .writeBytes (stream .getBytes (CHARSET ));
857
859
OutstandingRequest <Response > request = outstandingRequest ();
858
860
outstandingRequests .put (correlationId , request );
859
- channel .writeAndFlush (bb );
861
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
860
862
request .block ();
861
863
return request .response .get ();
862
864
} catch (StreamException e ) {
@@ -887,7 +889,7 @@ public Map<String, StreamMetadata> metadata(String... streams) {
887
889
writeArray (bb , streams );
888
890
OutstandingRequest <Map <String , StreamMetadata >> request = outstandingRequest ();
889
891
outstandingRequests .put (correlationId , request );
890
- channel .writeAndFlush (bb );
892
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
891
893
request .block ();
892
894
return request .response .get ();
893
895
} catch (StreamException e ) {
@@ -926,7 +928,7 @@ public Response declarePublisher(byte publisherId, String publisherReference, St
926
928
bb .writeBytes (stream .getBytes (CHARSET ));
927
929
OutstandingRequest <Response > request = outstandingRequest ();
928
930
outstandingRequests .put (correlationId , request );
929
- channel .writeAndFlush (bb );
931
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
930
932
request .block ();
931
933
return request .response .get ();
932
934
} catch (StreamException e ) {
@@ -951,7 +953,7 @@ public Response deletePublisher(byte publisherId) {
951
953
bb .writeByte (publisherId );
952
954
OutstandingRequest <Response > request = outstandingRequest ();
953
955
outstandingRequests .put (correlationId , request );
954
- channel .writeAndFlush (bb );
956
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
955
957
request .block ();
956
958
return request .response .get ();
957
959
} catch (StreamException e ) {
@@ -1289,7 +1291,7 @@ public Response subscribe(
1289
1291
subscriptionOffsets .add (
1290
1292
new SubscriptionOffset (subscriptionId , offsetSpecification .getOffset ()));
1291
1293
}
1292
- channel .writeAndFlush (bb );
1294
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
1293
1295
request .block ();
1294
1296
return request .response .get ();
1295
1297
} catch (StreamException e ) {
@@ -1346,10 +1348,9 @@ public QueryOffsetResponse queryOffset(String reference, String stream) {
1346
1348
bb .writeBytes (stream .getBytes (CHARSET ));
1347
1349
OutstandingRequest <QueryOffsetResponse > request = outstandingRequest ();
1348
1350
outstandingRequests .put (correlationId , request );
1349
- channel .writeAndFlush (bb );
1351
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
1350
1352
request .block ();
1351
- QueryOffsetResponse response = request .response .get ();
1352
- return response ;
1353
+ return request .response .get ();
1353
1354
} catch (StreamException e ) {
1354
1355
this .handleRpcError (correlationId , e );
1355
1356
throw e ;
@@ -1387,7 +1388,7 @@ public long queryPublisherSequence(String publisherReference, String stream) {
1387
1388
bb .writeBytes (stream .getBytes (CHARSET ));
1388
1389
OutstandingRequest <QueryPublisherSequenceResponse > request = outstandingRequest ();
1389
1390
outstandingRequests .put (correlationId , request );
1390
- channel .writeAndFlush (bb );
1391
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
1391
1392
request .block ();
1392
1393
QueryPublisherSequenceResponse response = request .response .get ();
1393
1394
if (!response .isOk ()) {
@@ -1421,7 +1422,7 @@ public Response unsubscribe(byte subscriptionId) {
1421
1422
bb .writeByte (subscriptionId );
1422
1423
OutstandingRequest <Response > request = outstandingRequest ();
1423
1424
outstandingRequests .put (correlationId , request );
1424
- channel .writeAndFlush (bb );
1425
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
1425
1426
request .block ();
1426
1427
return request .response .get ();
1427
1428
} catch (StreamException e ) {
@@ -1586,7 +1587,7 @@ public List<String> route(String routingKey, String superStream) {
1586
1587
bb .writeBytes (superStream .getBytes (CHARSET ));
1587
1588
OutstandingRequest <List <String >> request = outstandingRequest ();
1588
1589
outstandingRequests .put (correlationId , request );
1589
- channel .writeAndFlush (bb );
1590
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
1590
1591
request .block ();
1591
1592
return request .response .get ();
1592
1593
} catch (StreamException e ) {
@@ -1619,7 +1620,7 @@ public List<String> partitions(String superStream) {
1619
1620
bb .writeBytes (superStream .getBytes (CHARSET ));
1620
1621
OutstandingRequest <List <String >> request = outstandingRequest ();
1621
1622
outstandingRequests .put (correlationId , request );
1622
- channel .writeAndFlush (bb );
1623
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
1623
1624
request .block ();
1624
1625
return request .response .get ();
1625
1626
} catch (StreamException e ) {
@@ -1651,7 +1652,7 @@ List<FrameHandlerInfo> exchangeCommandVersions() {
1651
1652
}
1652
1653
OutstandingRequest <List <FrameHandlerInfo >> request = outstandingRequest ();
1653
1654
outstandingRequests .put (correlationId , request );
1654
- channel .writeAndFlush (bb );
1655
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
1655
1656
request .block ();
1656
1657
return request .response .get ();
1657
1658
} catch (StreamException e ) {
@@ -1680,7 +1681,7 @@ StreamStatsResponse streamStats(String stream) {
1680
1681
bb .writeBytes (stream .getBytes (CHARSET ));
1681
1682
OutstandingRequest <StreamStatsResponse > request = outstandingRequest ();
1682
1683
outstandingRequests .put (correlationId , request );
1683
- channel .writeAndFlush (bb );
1684
+ channel .writeAndFlush (bb ). addListener ( maybeFailRpc ( correlationId )) ;
1684
1685
request .block ();
1685
1686
return request .response .get ();
1686
1687
} catch (StreamException e ) {
@@ -2972,10 +2973,37 @@ private void debug(Supplier<String> format, Object... args) {
2972
2973
}
2973
2974
}
2974
2975
2975
- private void handleRpcError (int correlationId , Exception e ) {
2976
- OutstandingRequest <?> request = this .outstandingRequests .remove (correlationId );
2976
+ private GenericFutureListener <Future <? super Void >> maybeFailRpc (int correlationId ) {
2977
+ return new FailRpcFuture (this .outstandingRequests , correlationId );
2978
+ }
2979
+
2980
+ private void handleRpcError (int correlationId , Throwable e ) {
2981
+ handleRpcError (this .outstandingRequests , correlationId , e );
2982
+ }
2983
+
2984
+ private static void handleRpcError (
2985
+ Map <Integer , OutstandingRequest <?>> requests , int correlationId , Throwable e ) {
2986
+ OutstandingRequest <?> request = requests .remove (correlationId );
2977
2987
if (request != null ) {
2978
2988
request .completeExceptionally (e );
2979
2989
}
2980
2990
}
2991
+
2992
+ private static final class FailRpcFuture implements GenericFutureListener <Future <? super Void >> {
2993
+
2994
+ private final Map <Integer , OutstandingRequest <?>> requests ;
2995
+ private final int correlationId ;
2996
+
2997
+ private FailRpcFuture (Map <Integer , OutstandingRequest <?>> requests , int correlationId ) {
2998
+ this .requests = requests ;
2999
+ this .correlationId = correlationId ;
3000
+ }
3001
+
3002
+ @ Override
3003
+ public void operationComplete (Future <? super Void > f ) {
3004
+ if (!f .isSuccess ()) {
3005
+ handleRpcError (requests , correlationId , f .cause ());
3006
+ }
3007
+ }
3008
+ }
2981
3009
}
0 commit comments