Skip to content

Commit e61b03c

Browse files
committed
netty: Fix getAttributes() data races in NettyClientTransportTest
Since approximately the LBv2 API (the current API) was introduced, gRPC won't use a transport until it is ready. Long ago, transports could be used before they were ready and these old tests were not waiting for the negotiator to complete before starting. We need them to wait for the handshake to complete to avoid a test-only data race in getAttributes() noticed by TSAN. Throwing away data frames in the Noop handshaker is necessary to act like a normal handshaker; they don't allow data frames to pass until the handshake is complete. Without the handling, it goes through invalid code paths in NettyClientHandler where a terminated transport becomes ready, and a similar data race. ``` Write of size 4 at 0x00008db31e2c by thread T37: #0 io.grpc.netty.NettyClientHandler.handleProtocolNegotiationCompleted(Lio/grpc/Attributes;Lio/grpc/InternalChannelz$Security;)V NettyClientHandler.java:517 #1 io.grpc.netty.ProtocolNegotiators$GrpcNegotiationHandler.userEventTriggered(Lio/netty/channel/ChannelHandlerContext;Ljava/lang/Object;)V ProtocolNegotiators.java:937 #2 io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(Ljava/lang/Object;)V AbstractChannelHandlerContext.java:398 #3 io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(Lio/netty/channel/AbstractChannelHandlerContext;Ljava/lang/Object;)V AbstractChannelHandlerContext.java:376 #4 io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(Ljava/lang/Object;)Lio/netty/channel/ChannelHandlerContext; AbstractChannelHandlerContext.java:368 grpc#5 io.grpc.netty.ProtocolNegotiators$ProtocolNegotiationHandler.fireProtocolNegotiationEvent(Lio/netty/channel/ChannelHandlerContext;)V ProtocolNegotiators.java:1107 grpc#6 io.grpc.netty.ProtocolNegotiators$WaitUntilActiveHandler.channelActive(Lio/netty/channel/ChannelHandlerContext;)V ProtocolNegotiators.java:1011 ... Previous read of size 4 at 0x00008db31e2c by thread T4 (mutexes: write M0, write M1, write M2, write M3): #0 io.grpc.netty.NettyClientHandler.getAttributes()Lio/grpc/Attributes; NettyClientHandler.java:345 #1 io.grpc.netty.NettyClientTransport.getAttributes()Lio/grpc/Attributes; NettyClientTransport.java:387 #2 io.grpc.netty.NettyClientTransport.newStream(Lio/grpc/MethodDescriptor;Lio/grpc/Metadata;Lio/grpc/CallOptions;[Lio/grpc/ClientStreamTracer;)Lio/grpc/internal/ClientStream; NettyClientTransport.java:198 #3 io.grpc.netty.NettyClientTransportTest$Rpc.<init>(Lio/grpc/netty/NettyClientTransport;Lio/grpc/Metadata;)V NettyClientTransportTest.java:953 #4 io.grpc.netty.NettyClientTransportTest.huffmanCodingShouldNotBePerformed()V NettyClientTransportTest.java:631 ... ``` ``` Read of size 4 at 0x00008f983a3c by thread T4 (mutexes: write M0, write M1): #0 io.grpc.netty.NettyClientHandler.getAttributes()Lio/grpc/Attributes; NettyClientHandler.java:345 #1 io.grpc.netty.NettyClientTransport.getAttributes()Lio/grpc/Attributes; NettyClientTransport.java:387 #2 io.grpc.netty.NettyClientTransport.newStream(Lio/grpc/MethodDescriptor;Lio/grpc/Metadata;Lio/grpc/CallOptions;[Lio/grpc/ClientStreamTracer;)Lio/grpc/internal/ClientStream; NettyClientTransport.java:198 #3 io.grpc.netty.NettyClientTransportTest$Rpc.<init>(Lio/grpc/netty/NettyClientTransport;Lio/grpc/Metadata;)V NettyClientTransportTest.java:973 #4 io.grpc.netty.NettyClientTransportTest$Rpc.<init>(Lio/grpc/netty/NettyClientTransport;)V NettyClientTransportTest.java:969 grpc#5 io.grpc.netty.NettyClientTransportTest.handlerExceptionDuringNegotiatonPropagatesToStatus()V NettyClientTransportTest.java:425 ... Previous write of size 4 at 0x00008f983a3c by thread T56: #0 io.grpc.netty.NettyClientHandler$FrameListener.onSettingsRead(Lio/netty/channel/ChannelHandlerContext;Lio/netty/handler/codec/http2/Http2Settings;)V NettyClientHandler.java:960 ... ```
1 parent ae10972 commit e61b03c

File tree

1 file changed

+26
-1
lines changed

1 file changed

+26
-1
lines changed

netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import static org.junit.Assert.assertTrue;
3838
import static org.junit.Assert.fail;
3939
import static org.mockito.ArgumentMatchers.any;
40+
import static org.mockito.Mockito.timeout;
41+
import static org.mockito.Mockito.verify;
4042
import static org.mockito.Mockito.when;
4143

4244
import com.google.common.base.Optional;
@@ -95,6 +97,7 @@
9597
import io.netty.handler.ssl.ClientAuth;
9698
import io.netty.handler.ssl.SslContext;
9799
import io.netty.util.AsciiString;
100+
import io.netty.util.ReferenceCountUtil;
98101
import java.io.ByteArrayInputStream;
99102
import java.io.IOException;
100103
import java.io.InputStream;
@@ -187,6 +190,7 @@ public void addDefaultUserAgent() throws Exception {
187190
startServer();
188191
NettyClientTransport transport = newTransport(newNegotiator());
189192
callMeMaybe(transport.start(clientTransportListener));
193+
verify(clientTransportListener, timeout(5000)).transportReady();
190194

191195
// Send a single RPC and wait for the response.
192196
new Rpc(transport).halfClose().waitForResponse();
@@ -244,6 +248,7 @@ public void overrideDefaultUserAgent() throws Exception {
244248
NettyClientTransport transport = newTransport(newNegotiator(),
245249
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true);
246250
callMeMaybe(transport.start(clientTransportListener));
251+
verify(clientTransportListener, timeout(5000)).transportReady();
247252

248253
new Rpc(transport, new Metadata()).halfClose().waitForResponse();
249254

@@ -261,6 +266,7 @@ public void maxMessageSizeShouldBeEnforced() throws Throwable {
261266
NettyClientTransport transport = newTransport(newNegotiator(),
262267
1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null, true);
263268
callMeMaybe(transport.start(clientTransportListener));
269+
verify(clientTransportListener, timeout(5000)).transportReady();
264270

265271
try {
266272
// Send a single RPC and wait for the response.
@@ -287,6 +293,7 @@ public void creatingMultipleTlsTransportsShouldSucceed() throws Exception {
287293
NettyClientTransport transport = newTransport(negotiator);
288294
callMeMaybe(transport.start(clientTransportListener));
289295
}
296+
verify(clientTransportListener, timeout(5000).times(2)).transportReady();
290297

291298
// Send a single RPC on each transport.
292299
final List<Rpc> rpcs = new ArrayList<>(transports.size());
@@ -316,6 +323,7 @@ public void run() {
316323
failureStatus.asRuntimeException());
317324
}
318325
});
326+
verify(clientTransportListener, timeout(5000)).transportTerminated();
319327

320328
Rpc rpc = new Rpc(transport).halfClose();
321329
try {
@@ -349,6 +357,7 @@ public void tlsNegotiationFailurePropagatesToStatus() throws Exception {
349357
ProtocolNegotiator negotiator = ProtocolNegotiators.tls(clientContext);
350358
final NettyClientTransport transport = newTransport(negotiator);
351359
callMeMaybe(transport.start(clientTransportListener));
360+
verify(clientTransportListener, timeout(5000)).transportTerminated();
352361

353362
Rpc rpc = new Rpc(transport).halfClose();
354363
try {
@@ -378,6 +387,7 @@ public void channelExceptionDuringNegotiatonPropagatesToStatus() throws Exceptio
378387
callMeMaybe(transport.start(clientTransportListener));
379388
final Status failureStatus = Status.UNAVAILABLE.withDescription("oh noes!");
380389
transport.channel().pipeline().fireExceptionCaught(failureStatus.asRuntimeException());
390+
verify(clientTransportListener, timeout(5000)).transportTerminated();
381391

382392
Rpc rpc = new Rpc(transport).halfClose();
383393
try {
@@ -409,6 +419,7 @@ public void run() {
409419
}
410420
}
411421
});
422+
verify(clientTransportListener, timeout(5000)).transportTerminated();
412423

413424
Rpc rpc = new Rpc(transport).halfClose();
414425
try {
@@ -428,6 +439,7 @@ public void bufferedStreamsShouldBeClosedWhenConnectionTerminates() throws Excep
428439

429440
NettyClientTransport transport = newTransport(newNegotiator());
430441
callMeMaybe(transport.start(clientTransportListener));
442+
verify(clientTransportListener, timeout(5000)).transportReady();
431443

432444
// Send a dummy RPC in order to ensure that the updated SETTINGS_MAX_CONCURRENT_STREAMS
433445
// has been received by the remote endpoint.
@@ -579,6 +591,7 @@ public void maxHeaderListSizeShouldBeEnforcedOnClient() throws Exception {
579591
NettyClientTransport transport =
580592
newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null, true);
581593
callMeMaybe(transport.start(clientTransportListener));
594+
verify(clientTransportListener, timeout(5000)).transportReady();
582595

583596
try {
584597
// Send a single RPC and wait for the response.
@@ -612,6 +625,7 @@ public void huffmanCodingShouldNotBePerformed() throws Exception {
612625
longStringOfA);
613626

614627
callMeMaybe(transport.start(clientTransportListener));
628+
verify(clientTransportListener, timeout(5000)).transportReady();
615629

616630
AtomicBoolean foundExpectedHeaderBytes = new AtomicBoolean(false);
617631

@@ -641,6 +655,7 @@ public void maxHeaderListSizeShouldBeEnforcedOnServer() throws Exception {
641655

642656
NettyClientTransport transport = newTransport(newNegotiator());
643657
callMeMaybe(transport.start(clientTransportListener));
658+
verify(clientTransportListener, timeout(5000)).transportReady();
644659

645660
try {
646661
// Send a single RPC and wait for the response.
@@ -685,6 +700,7 @@ public void clientStreamGetsAttributes() throws Exception {
685700
startServer();
686701
NettyClientTransport transport = newTransport(newNegotiator());
687702
callMeMaybe(transport.start(clientTransportListener));
703+
verify(clientTransportListener, timeout(5000)).transportReady();
688704
Rpc rpc = new Rpc(transport).halfClose();
689705
rpc.waitForResponse();
690706

@@ -703,6 +719,7 @@ public void keepAliveEnabled() throws Exception {
703719
NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
704720
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, true /* keep alive */);
705721
callMeMaybe(transport.start(clientTransportListener));
722+
verify(clientTransportListener, timeout(5000)).transportReady();
706723
Rpc rpc = new Rpc(transport).halfClose();
707724
rpc.waitForResponse();
708725

@@ -715,6 +732,7 @@ public void keepAliveDisabled() throws Exception {
715732
NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
716733
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, false /* keep alive */);
717734
callMeMaybe(transport.start(clientTransportListener));
735+
verify(clientTransportListener, timeout(5000)).transportReady();
718736
Rpc rpc = new Rpc(transport).halfClose();
719737
rpc.waitForResponse();
720738

@@ -808,6 +826,7 @@ public void tlsNegotiationServerExecutorShouldSucceed() throws Exception {
808826
assertEquals(true, clientExecutorPool.isInUse());
809827
final NettyClientTransport transport = newTransport(negotiator);
810828
callMeMaybe(transport.start(clientTransportListener));
829+
verify(clientTransportListener, timeout(5000)).transportReady();
811830
Rpc rpc = new Rpc(transport).halfClose();
812831
rpc.waitForResponse();
813832
// closing the negotiators should return the executors back to pool, and release the resource
@@ -1098,9 +1117,15 @@ public NoopHandler(GrpcHttp2ConnectionHandler grpcHandler) {
10981117
this.grpcHandler = grpcHandler;
10991118
}
11001119

1120+
@Override
1121+
public void channelRead(ChannelHandlerContext ctx, Object msg) {
1122+
// Prevent any data being passed to NettyClientHandler
1123+
ReferenceCountUtil.release(msg);
1124+
}
1125+
11011126
@Override
11021127
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
1103-
ctx.pipeline().addBefore(ctx.name(), null, grpcHandler);
1128+
ctx.pipeline().addAfter(ctx.name(), null, grpcHandler);
11041129
}
11051130

11061131
public void fail(ChannelHandlerContext ctx, Throwable cause) {

0 commit comments

Comments
 (0)