Skip to content

Commit 65c5498

Browse files
OlegDokukaOlegDokuka
authored andcommitted
wip
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 59c04e2 commit 65c5498

File tree

10 files changed

+127
-18
lines changed

10 files changed

+127
-18
lines changed

rsocket-core/src/jcstress/java/io/rsocket/core/TestRequesterResponderSupport.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.rsocket.DuplexConnection;
66
import io.rsocket.RSocket;
77
import io.rsocket.frame.decoder.PayloadDecoder;
8+
import reactor.core.publisher.Sinks;
89
import reactor.util.annotation.Nullable;
910

1011
public class TestRequesterResponderSupport extends RequesterResponderSupport implements RSocket {
@@ -27,7 +28,8 @@ public TestRequesterResponderSupport(
2728
PayloadDecoder.ZERO_COPY,
2829
connection,
2930
streamIdSupplier,
30-
__ -> null);
31+
__ -> null,
32+
Sinks.empty());
3133
this.requesterLeaseTracker = requesterLeaseTracker;
3234
}
3335

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ public double availability() {
201201

202202
@Override
203203
public void dispose() {
204+
if (terminationError != null) {
205+
return;
206+
}
207+
204208
getDuplexConnection().sendErrorAndClose(new ConnectionErrorException("Disposed"));
205209
}
206210

rsocket-core/src/test/java/io/rsocket/core/AbstractSocketRule.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@ public void init() {
4444
}
4545

4646
protected void doInit() {
47-
if (socket != null) {
48-
socket.dispose();
49-
}
5047
if (connection != null) {
5148
connection.dispose();
5249
}
50+
if (socket != null) {
51+
socket.dispose();
52+
}
5353
connection = new TestDuplexConnection(allocator);
5454
socket = newRSocket();
5555
}

rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -644,10 +644,10 @@ public static class ClientSocketRule extends AbstractSocketRule<RSocket> {
644644
protected Sinks.One<RSocket> producer;
645645

646646
protected Sinks.Empty<Void> onGracefulShutdownStartedSink;
647-
648647
protected Sinks.Empty<Void> otherGracefulShutdownSink;
649-
650648
protected Sinks.Empty<Void> thisGracefulShutdownSink;
649+
protected Sinks.Empty<Void> thisClosedSink;
650+
protected Sinks.Empty<Void> otherClosedSink;
651651

652652
@Override
653653
protected void doInit() {
@@ -666,6 +666,11 @@ protected void doInit() {
666666

667667
@Override
668668
protected RSocket newRSocket() {
669+
this.onGracefulShutdownStartedSink = Sinks.empty();
670+
this.otherGracefulShutdownSink = Sinks.empty();
671+
this.thisGracefulShutdownSink = Sinks.empty();
672+
this.thisClosedSink = Sinks.empty();
673+
this.otherClosedSink = Sinks.empty();
669674
return new RSocketRequester(
670675
connection,
671676
PayloadDecoder.ZERO_COPY,
@@ -678,9 +683,11 @@ protected RSocket newRSocket() {
678683
null,
679684
__ -> null,
680685
null,
681-
onGracefulShutdownStartedSink,
682-
thisGracefulShutdownSink,
683-
);
686+
onGracefulShutdownStartedSink,
687+
thisGracefulShutdownSink,
688+
thisClosedSink,
689+
otherGracefulShutdownSink.asMono().and(thisGracefulShutdownSink.asMono()),
690+
otherClosedSink.asMono().and(thisClosedSink.asMono()));
684691
}
685692
}
686693
}

rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ class RSocketLeaseTest {
9191

9292
private Sinks.Many<Lease> leaseSender = Sinks.many().multicast().onBackpressureBuffer();
9393
private RequesterLeaseTracker requesterLeaseTracker;
94+
protected Sinks.Empty<Void> onGracefulShutdownStartedSink;
95+
protected Sinks.Empty<Void> otherGracefulShutdownSink;
96+
protected Sinks.Empty<Void> thisGracefulShutdownSink;
97+
protected Sinks.Empty<Void> thisClosedSink;
98+
protected Sinks.Empty<Void> otherClosedSink;
9499

95100
@BeforeEach
96101
void setUp() {
@@ -100,6 +105,11 @@ void setUp() {
100105
connection = new TestDuplexConnection(byteBufAllocator);
101106
requesterLeaseTracker = new RequesterLeaseTracker(TAG, 0);
102107
responderLeaseTracker = new ResponderLeaseTracker(TAG, connection, () -> leaseSender.asFlux());
108+
this.onGracefulShutdownStartedSink = Sinks.empty();
109+
this.otherGracefulShutdownSink = Sinks.empty();
110+
this.thisGracefulShutdownSink = Sinks.empty();
111+
this.thisClosedSink = Sinks.empty();
112+
this.otherClosedSink = Sinks.empty();
103113

104114
ClientServerInputMultiplexer multiplexer =
105115
new ClientServerInputMultiplexer(connection, new InitializingInterceptorRegistry(), true);
@@ -115,7 +125,12 @@ void setUp() {
115125
0,
116126
null,
117127
__ -> null,
118-
requesterLeaseTracker);
128+
requesterLeaseTracker,
129+
onGracefulShutdownStartedSink,
130+
thisGracefulShutdownSink,
131+
thisClosedSink,
132+
otherGracefulShutdownSink.asMono().and(thisGracefulShutdownSink.asMono()),
133+
otherClosedSink.asMono().and(thisClosedSink.asMono()));
119134

120135
mockRSocketHandler = mock(RSocket.class);
121136
when(mockRSocketHandler.metadataPush(any()))
@@ -182,7 +197,10 @@ protected void hookOnError(Throwable throwable) {
182197
0,
183198
FRAME_LENGTH_MASK,
184199
Integer.MAX_VALUE,
185-
__ -> null);
200+
__ -> null,
201+
otherGracefulShutdownSink,
202+
otherClosedSink,
203+
onGracefulShutdownStartedSink.asMono());
186204
}
187205

188206
@Test

rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.reactivestreams.Publisher;
4545
import reactor.core.publisher.Flux;
4646
import reactor.core.publisher.Mono;
47+
import reactor.core.publisher.Sinks;
4748
import reactor.test.util.RaceTestUtils;
4849

4950
class RSocketRequesterSubscribersTest {
@@ -60,11 +61,21 @@ class RSocketRequesterSubscribersTest {
6061
private LeaksTrackingByteBufAllocator allocator;
6162
private RSocket rSocketRequester;
6263
private TestDuplexConnection connection;
64+
protected Sinks.Empty<Void> onGracefulShutdownStartedSink;
65+
protected Sinks.Empty<Void> otherGracefulShutdownSink;
66+
protected Sinks.Empty<Void> thisGracefulShutdownSink;
67+
protected Sinks.Empty<Void> thisClosedSink;
68+
protected Sinks.Empty<Void> otherClosedSink;
6369

6470
@BeforeEach
6571
void setUp() {
6672
allocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
6773
connection = new TestDuplexConnection(allocator);
74+
this.onGracefulShutdownStartedSink = Sinks.empty();
75+
this.otherGracefulShutdownSink = Sinks.empty();
76+
this.thisGracefulShutdownSink = Sinks.empty();
77+
this.thisClosedSink = Sinks.empty();
78+
this.otherClosedSink = Sinks.empty();
6879
rSocketRequester =
6980
new RSocketRequester(
7081
connection,
@@ -77,7 +88,12 @@ void setUp() {
7788
0,
7889
null,
7990
__ -> null,
80-
null);
91+
null,
92+
onGracefulShutdownStartedSink,
93+
thisGracefulShutdownSink,
94+
thisClosedSink,
95+
otherGracefulShutdownSink.asMono().and(thisGracefulShutdownSink.asMono()),
96+
otherClosedSink.asMono().and(thisClosedSink.asMono()));
8197
}
8298

8399
@ParameterizedTest

rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1453,8 +1453,20 @@ public void testWorkaround959(String type) {
14531453
}
14541454

14551455
public static class ClientSocketRule extends AbstractSocketRule<RSocketRequester> {
1456+
1457+
protected Sinks.Empty<Void> onGracefulShutdownStartedSink;
1458+
protected Sinks.Empty<Void> otherGracefulShutdownSink;
1459+
protected Sinks.Empty<Void> thisGracefulShutdownSink;
1460+
protected Sinks.Empty<Void> thisClosedSink;
1461+
protected Sinks.Empty<Void> otherClosedSink;
1462+
14561463
@Override
14571464
protected RSocketRequester newRSocket() {
1465+
this.onGracefulShutdownStartedSink = Sinks.empty();
1466+
this.otherGracefulShutdownSink = Sinks.empty();
1467+
this.thisGracefulShutdownSink = Sinks.empty();
1468+
this.thisClosedSink = Sinks.empty();
1469+
this.otherClosedSink = Sinks.empty();
14581470
return new RSocketRequester(
14591471
connection,
14601472
PayloadDecoder.ZERO_COPY,
@@ -1466,7 +1478,12 @@ protected RSocketRequester newRSocket() {
14661478
Integer.MAX_VALUE,
14671479
null,
14681480
(__) -> null,
1469-
null);
1481+
null,
1482+
onGracefulShutdownStartedSink,
1483+
thisGracefulShutdownSink,
1484+
thisClosedSink,
1485+
otherGracefulShutdownSink.asMono().and(thisGracefulShutdownSink.asMono()),
1486+
otherClosedSink.asMono().and(thisClosedSink.asMono()));
14701487
}
14711488

14721489
public int getStreamIdForRequestType(FrameType expectedFrameType) {

rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1184,6 +1184,9 @@ public static class ServerSocketRule extends AbstractSocketRule<RSocketResponder
11841184
private RSocket acceptingSocket;
11851185
private volatile int prefetch;
11861186
private RequestInterceptor requestInterceptor;
1187+
protected Sinks.Empty<Void> onGracefulShutdownSink;
1188+
protected Sinks.Empty<Void> onCloseSink;
1189+
protected Sinks.Empty<Void> onGracefulShutdownStartedSink;
11871190

11881191
@Override
11891192
protected void doInit() {
@@ -1220,6 +1223,9 @@ public void setAcceptingSocket(RSocket acceptingSocket, int prefetch) {
12201223

12211224
@Override
12221225
protected RSocketResponder newRSocket() {
1226+
onGracefulShutdownSink = Sinks.empty();
1227+
onCloseSink = Sinks.empty();
1228+
onGracefulShutdownStartedSink = Sinks.empty();
12231229
return new RSocketResponder(
12241230
connection,
12251231
acceptingSocket,
@@ -1228,7 +1234,10 @@ protected RSocketResponder newRSocket() {
12281234
0,
12291235
maxFrameLength,
12301236
maxInboundPayloadSize,
1231-
__ -> requestInterceptor);
1237+
__ -> requestInterceptor,
1238+
onGracefulShutdownSink,
1239+
onCloseSink,
1240+
onGracefulShutdownStartedSink.asMono());
12321241
}
12331242

12341243
private void sendRequest(int streamId, FrameType frameType) {

rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,11 @@ public static class SocketRule {
509509
private RSocket requestAcceptor;
510510

511511
private LeaksTrackingByteBufAllocator allocator;
512+
protected Sinks.Empty<Void> onGracefulShutdownStartedSink;
513+
protected Sinks.Empty<Void> otherGracefulShutdownSink;
514+
protected Sinks.Empty<Void> thisGracefulShutdownSink;
515+
protected Sinks.Empty<Void> thisClosedSink;
516+
protected Sinks.Empty<Void> otherClosedSink;
512517

513518
public LeaksTrackingByteBufAllocator alloc() {
514519
return allocator;
@@ -519,6 +524,12 @@ public void init() {
519524
serverProcessor = Sinks.many().multicast().directBestEffort();
520525
clientProcessor = Sinks.many().multicast().directBestEffort();
521526

527+
this.onGracefulShutdownStartedSink = Sinks.empty();
528+
this.otherGracefulShutdownSink = Sinks.empty();
529+
this.thisGracefulShutdownSink = Sinks.empty();
530+
this.thisClosedSink = Sinks.empty();
531+
this.otherClosedSink = Sinks.empty();
532+
522533
LocalDuplexConnection serverConnection =
523534
new LocalDuplexConnection("server", allocator, clientProcessor, serverProcessor);
524535
LocalDuplexConnection clientConnection =
@@ -566,7 +577,10 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
566577
0,
567578
FRAME_LENGTH_MASK,
568579
Integer.MAX_VALUE,
569-
__ -> null);
580+
__ -> null,
581+
otherGracefulShutdownSink,
582+
otherClosedSink,
583+
onGracefulShutdownStartedSink.asMono());
570584

571585
crs =
572586
new RSocketRequester(
@@ -580,7 +594,12 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
580594
0,
581595
null,
582596
__ -> null,
583-
null);
597+
null,
598+
onGracefulShutdownStartedSink,
599+
thisGracefulShutdownSink,
600+
thisClosedSink,
601+
otherGracefulShutdownSink.asMono().and(thisGracefulShutdownSink.asMono()),
602+
otherClosedSink.asMono().and(thisClosedSink.asMono()));
584603
}
585604

586605
public void setRequestAcceptor(RSocket requestAcceptor) {

rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ void requesterStreamsTerminatedOnZeroErrorFrame() {
6969
LeaksTrackingByteBufAllocator allocator =
7070
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
7171
TestDuplexConnection conn = new TestDuplexConnection(allocator);
72+
Sinks.Empty<Void> onGracefulShutdownStartedSink = Sinks.empty();
73+
Sinks.Empty<Void> onGracefulShutdownSink = Sinks.empty();
74+
Sinks.Empty<Void> onThisSideClosedSink = Sinks.empty();
75+
7276
RSocketRequester rSocket =
7377
new RSocketRequester(
7478
conn,
@@ -81,7 +85,12 @@ void requesterStreamsTerminatedOnZeroErrorFrame() {
8185
0,
8286
null,
8387
__ -> null,
84-
null);
88+
null,
89+
onGracefulShutdownStartedSink,
90+
onGracefulShutdownSink,
91+
onThisSideClosedSink,
92+
onGracefulShutdownSink.asMono(),
93+
onThisSideClosedSink.asMono());
8594

8695
String errorMsg = "error";
8796

@@ -107,6 +116,9 @@ void requesterNewStreamsTerminatedAfterZeroErrorFrame() {
107116
LeaksTrackingByteBufAllocator allocator =
108117
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
109118
TestDuplexConnection conn = new TestDuplexConnection(allocator);
119+
Sinks.Empty<Void> onGracefulShutdownStartedSink = Sinks.empty();
120+
Sinks.Empty<Void> onGracefulShutdownSink = Sinks.empty();
121+
Sinks.Empty<Void> onThisSideClosedSink = Sinks.empty();
110122
RSocketRequester rSocket =
111123
new RSocketRequester(
112124
conn,
@@ -119,7 +131,12 @@ void requesterNewStreamsTerminatedAfterZeroErrorFrame() {
119131
0,
120132
null,
121133
__ -> null,
122-
null);
134+
null,
135+
onGracefulShutdownStartedSink,
136+
onGracefulShutdownSink,
137+
onThisSideClosedSink,
138+
onGracefulShutdownSink.asMono(),
139+
onThisSideClosedSink.asMono());
123140

124141
conn.addToReceivedBuffer(
125142
ErrorFrameCodec.encode(ByteBufAllocator.DEFAULT, 0, new RejectedSetupException("error")));

0 commit comments

Comments
 (0)