diff --git a/cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java b/cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java index f42dabdd55a..e37d4b36d06 100644 --- a/cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java +++ b/cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java @@ -54,6 +54,8 @@ @ExperimentalApi("There is no plan to make this API stable, given transport API instability") public final class CronetChannelBuilder extends ForwardingChannelBuilder2 { + private static final int DEFAULT_READ_BUFFER_SIZE = 4 * 1024; + /** BidirectionalStream.Builder factory used for getting the gRPC BidirectionalStream. */ public static abstract class StreamBuilderFactory { public abstract BidirectionalStream.Builder newBidirectionalStreamBuilder( @@ -109,6 +111,7 @@ public static CronetChannelBuilder forAddress(String name, int port) { private boolean trafficStatsUidSet; private int trafficStatsUid; private Network network; + private int readBufferSize = DEFAULT_READ_BUFFER_SIZE; private CronetChannelBuilder(String host, int port, CronetEngine cronetEngine) { final class CronetChannelTransportFactoryBuilder implements ClientTransportFactoryBuilder { @@ -142,6 +145,15 @@ public CronetChannelBuilder maxMessageSize(int maxMessageSize) { return this; } + /** + * Sets the buffer size to read from the network. Default to {@link #DEFAULT_READ_BUFFER_SIZE}. + */ + public CronetChannelBuilder readBufferSize(int readBufferSize) { + checkArgument(readBufferSize >= 0, "readBufferSize must be >= 0"); + this.readBufferSize = readBufferSize; + return this; + } + /** * Sets the Cronet channel to always use PUT instead of POST. Defaults to false. */ @@ -233,7 +245,8 @@ ClientTransportFactory buildTransportFactory() { alwaysUsePut, transportTracerFactory.create(), useGetForSafeMethods, - usePutForIdempotentMethods); + usePutForIdempotentMethods, + readBufferSize); } @VisibleForTesting @@ -247,6 +260,7 @@ static class CronetTransportFactory implements ClientTransportFactory { private final boolean usingSharedScheduler; private final boolean useGetForSafeMethods; private final boolean usePutForIdempotentMethods; + private final int readBufferSize; private CronetTransportFactory( StreamBuilderFactory streamFactory, @@ -256,7 +270,8 @@ private CronetTransportFactory( boolean alwaysUsePut, TransportTracer transportTracer, boolean useGetForSafeMethods, - boolean usePutForIdempotentMethods) { + boolean usePutForIdempotentMethods, + int readBufferSize) { usingSharedScheduler = timeoutService == null; this.timeoutService = usingSharedScheduler ? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : timeoutService; @@ -267,6 +282,7 @@ private CronetTransportFactory( this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer"); this.useGetForSafeMethods = useGetForSafeMethods; this.usePutForIdempotentMethods = usePutForIdempotentMethods; + this.readBufferSize = readBufferSize; } @Override @@ -275,7 +291,8 @@ public ConnectionClientTransport newClientTransport( InetSocketAddress inetSocketAddr = (InetSocketAddress) addr; return new CronetClientTransport(streamFactory, inetSocketAddr, options.getAuthority(), options.getUserAgent(), options.getEagAttributes(), executor, maxMessageSize, - alwaysUsePut, transportTracer, useGetForSafeMethods, usePutForIdempotentMethods); + alwaysUsePut, transportTracer, useGetForSafeMethods, usePutForIdempotentMethods, + readBufferSize); } @Override diff --git a/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java b/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java index fcba49a7ae1..1d418d871ca 100644 --- a/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java +++ b/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java @@ -59,7 +59,6 @@ * Client stream for the cronet transport. */ class CronetClientStream extends AbstractClientStream { - private static final int READ_BUFFER_CAPACITY = 4 * 1024; private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0); private static final String LOG_TAG = "grpc-java-cronet"; @@ -85,6 +84,7 @@ class CronetClientStream extends AbstractClientStream { private final Collection annotations; private final TransportState state; private final Sink sink = new Sink(); + private final int readBufferSize; private StreamBuilderFactory streamFactory; CronetClientStream( @@ -102,7 +102,8 @@ class CronetClientStream extends AbstractClientStream { CallOptions callOptions, TransportTracer transportTracer, boolean useGetForSafeMethods, - boolean usePutForIdempotentMethods) { + boolean usePutForIdempotentMethods, + int readBufferSize) { super( new CronetWritableBufferAllocator(), statsTraceCtx, transportTracer, headers, callOptions, useGetForSafeMethods && method.isSafe()); @@ -120,6 +121,7 @@ class CronetClientStream extends AbstractClientStream { this.annotations = callOptions.getOption(CRONET_ANNOTATIONS_KEY); this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer, callOptions); + this.readBufferSize = readBufferSize; // Tests expect the "plain" deframer behavior, not MigratingDeframer // https://github.com/grpc/grpc-java/issues/7140 @@ -309,7 +311,7 @@ public void bytesRead(int processedBytes) { if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { Log.v(LOG_TAG, "BidirectionalStream.read"); } - stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY)); + stream.read(ByteBuffer.allocateDirect(readBufferSize)); } } @@ -429,7 +431,7 @@ public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInf Log.v(LOG_TAG, "BidirectionalStream.read"); } reportHeaders(info.getAllHeadersAsList(), false); - stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY)); + stream.read(ByteBuffer.allocateDirect(readBufferSize)); } @Override diff --git a/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java b/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java index 465df8b2cc9..0be0117c32a 100644 --- a/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java +++ b/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java @@ -65,6 +65,7 @@ class CronetClientTransport implements ConnectionClientTransport { private final boolean useGetForSafeMethods; private final boolean usePutForIdempotentMethods; private final StreamBuilderFactory streamFactory; + private final int readBufferSize; // Indicates the transport is in go-away state: no new streams will be processed, // but existing streams may continue. @GuardedBy("lock") @@ -92,7 +93,8 @@ class CronetClientTransport implements ConnectionClientTransport { boolean alwaysUsePut, TransportTracer transportTracer, boolean useGetForSafeMethods, - boolean usePutForIdempotentMethods) { + boolean usePutForIdempotentMethods, + int readBufferSize) { this.address = Preconditions.checkNotNull(address, "address"); this.logId = InternalLogId.allocate(getClass(), address.toString()); this.authority = authority; @@ -108,6 +110,7 @@ class CronetClientTransport implements ConnectionClientTransport { .build(); this.useGetForSafeMethods = useGetForSafeMethods; this.usePutForIdempotentMethods = usePutForIdempotentMethods; + this.readBufferSize = readBufferSize; } @Override @@ -132,7 +135,7 @@ class StartCallback implements Runnable { final CronetClientStream clientStream = new CronetClientStream( url, userAgent, executor, headers, CronetClientTransport.this, this, lock, maxMessageSize, alwaysUsePut, method, statsTraceCtx, callOptions, transportTracer, useGetForSafeMethods, - usePutForIdempotentMethods); + usePutForIdempotentMethods, readBufferSize); @Override public void run() { diff --git a/cronet/src/test/java/io/grpc/cronet/CronetClientStreamTest.java b/cronet/src/test/java/io/grpc/cronet/CronetClientStreamTest.java index e2b0e0b26ca..32e391dfa9f 100644 --- a/cronet/src/test/java/io/grpc/cronet/CronetClientStreamTest.java +++ b/cronet/src/test/java/io/grpc/cronet/CronetClientStreamTest.java @@ -127,7 +127,8 @@ public void setUp() { CallOptions.DEFAULT, transportTracer, false, - false); + false, + 4 * 1024); callback.setStream(clientStream); when(factory.newBidirectionalStreamBuilder( any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class))) @@ -591,7 +592,8 @@ public void addCronetRequestAnnotation_deprecated() { CallOptions.DEFAULT.withOption(CronetClientStream.CRONET_ANNOTATION_KEY, annotation), transportTracer, false, - false); + false, + 4 * 1024); callback.setStream(stream); when(factory.newBidirectionalStreamBuilder( any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class))) @@ -626,7 +628,8 @@ public void withAnnotation() { callOptions, transportTracer, false, - false); + false, + 4 * 1024); callback.setStream(stream); when(factory.newBidirectionalStreamBuilder( any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class))) @@ -666,7 +669,8 @@ public void getUnaryRequest() { CallOptions.DEFAULT, transportTracer, true, - false); + false, + 4 * 1024); callback.setStream(stream); BidirectionalStream.Builder getBuilder = mock(BidirectionalStream.Builder.class); @@ -723,7 +727,8 @@ public void idempotentMethod_usesHttpPut() { CallOptions.DEFAULT, transportTracer, true, - true); + true, + 4 * 1024); callback.setStream(stream); BidirectionalStream.Builder builder = mock(BidirectionalStream.Builder.class); @@ -755,7 +760,8 @@ public void alwaysUsePutOption_usesHttpPut() { CallOptions.DEFAULT, transportTracer, true, - true); + true, + 4 * 1024); callback.setStream(stream); BidirectionalStream.Builder builder = mock(BidirectionalStream.Builder.class); @@ -795,7 +801,8 @@ public void reservedHeadersStripped() { CallOptions.DEFAULT, transportTracer, false, - false); + false, + 4 * 1024); callback.setStream(stream); BidirectionalStream.Builder builder = mock(BidirectionalStream.Builder.class); diff --git a/cronet/src/test/java/io/grpc/cronet/CronetClientTransportTest.java b/cronet/src/test/java/io/grpc/cronet/CronetClientTransportTest.java index 03c31f93329..cffda6361db 100644 --- a/cronet/src/test/java/io/grpc/cronet/CronetClientTransportTest.java +++ b/cronet/src/test/java/io/grpc/cronet/CronetClientTransportTest.java @@ -89,7 +89,8 @@ public void setUp() { false, TransportTracer.getDefaultFactory().create(), false, - false); + false, + 4 * 1024); Runnable callback = transport.start(clientTransportListener); assertNotNull(callback); callback.run();