Skip to content

cronet: Add API in channel builder for specifying buffer size to use to read from the transport #12214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
@ExperimentalApi("There is no plan to make this API stable, given transport API instability")
public final class CronetChannelBuilder extends ForwardingChannelBuilder2<CronetChannelBuilder> {

private static final int DEFAULT_READ_BUFFER_SIZE = 4 * 1024;

/** BidirectionalStream.Builder factory used for getting the gRPC BidirectionalStream. */
public static abstract class StreamBuilderFactory {
public abstract BidirectionalStream.Builder newBidirectionalStreamBuilder(
Expand Down Expand Up @@ -109,6 +111,7 @@ public static CronetChannelBuilder forAddress(String name, int port) {
private boolean trafficStatsUidSet;
private int trafficStatsUid;
private Network network;
private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;

private CronetChannelBuilder(String host, int port, CronetEngine cronetEngine) {
final class CronetChannelTransportFactoryBuilder implements ClientTransportFactoryBuilder {
Expand Down Expand Up @@ -142,6 +145,15 @@ public CronetChannelBuilder maxMessageSize(int maxMessageSize) {
return this;
}

/**
* Sets the buffer size to read from the network. Default to {@link #DEFAULT_READ_BUFFER_SIZE}.
*/
public CronetChannelBuilder readBufferSize(int readBufferSize) {
checkArgument(readBufferSize >= 0, "readBufferSize must be >= 0");
this.readBufferSize = readBufferSize;
return this;
}

/**
* Sets the Cronet channel to always use PUT instead of POST. Defaults to false.
*/
Expand Down Expand Up @@ -233,7 +245,8 @@ ClientTransportFactory buildTransportFactory() {
alwaysUsePut,
transportTracerFactory.create(),
useGetForSafeMethods,
usePutForIdempotentMethods);
usePutForIdempotentMethods,
readBufferSize);
}

@VisibleForTesting
Expand All @@ -247,6 +260,7 @@ static class CronetTransportFactory implements ClientTransportFactory {
private final boolean usingSharedScheduler;
private final boolean useGetForSafeMethods;
private final boolean usePutForIdempotentMethods;
private final int readBufferSize;

private CronetTransportFactory(
StreamBuilderFactory streamFactory,
Expand All @@ -256,7 +270,8 @@ private CronetTransportFactory(
boolean alwaysUsePut,
TransportTracer transportTracer,
boolean useGetForSafeMethods,
boolean usePutForIdempotentMethods) {
boolean usePutForIdempotentMethods,
int readBufferSize) {
usingSharedScheduler = timeoutService == null;
this.timeoutService = usingSharedScheduler
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : timeoutService;
Expand All @@ -267,6 +282,7 @@ private CronetTransportFactory(
this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
this.useGetForSafeMethods = useGetForSafeMethods;
this.usePutForIdempotentMethods = usePutForIdempotentMethods;
this.readBufferSize = readBufferSize;
}

@Override
Expand All @@ -275,7 +291,8 @@ public ConnectionClientTransport newClientTransport(
InetSocketAddress inetSocketAddr = (InetSocketAddress) addr;
return new CronetClientTransport(streamFactory, inetSocketAddr, options.getAuthority(),
options.getUserAgent(), options.getEagAttributes(), executor, maxMessageSize,
alwaysUsePut, transportTracer, useGetForSafeMethods, usePutForIdempotentMethods);
alwaysUsePut, transportTracer, useGetForSafeMethods, usePutForIdempotentMethods,
readBufferSize);
}

@Override
Expand Down
10 changes: 6 additions & 4 deletions cronet/src/main/java/io/grpc/cronet/CronetClientStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
* Client stream for the cronet transport.
*/
class CronetClientStream extends AbstractClientStream {
private static final int READ_BUFFER_CAPACITY = 4 * 1024;
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0);
private static final String LOG_TAG = "grpc-java-cronet";

Expand All @@ -85,6 +84,7 @@ class CronetClientStream extends AbstractClientStream {
private final Collection<Object> annotations;
private final TransportState state;
private final Sink sink = new Sink();
private final int readBufferSize;
private StreamBuilderFactory streamFactory;

CronetClientStream(
Expand All @@ -102,7 +102,8 @@ class CronetClientStream extends AbstractClientStream {
CallOptions callOptions,
TransportTracer transportTracer,
boolean useGetForSafeMethods,
boolean usePutForIdempotentMethods) {
boolean usePutForIdempotentMethods,
int readBufferSize) {
super(
new CronetWritableBufferAllocator(), statsTraceCtx, transportTracer, headers, callOptions,
useGetForSafeMethods && method.isSafe());
Expand All @@ -120,6 +121,7 @@ class CronetClientStream extends AbstractClientStream {
this.annotations = callOptions.getOption(CRONET_ANNOTATIONS_KEY);
this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer,
callOptions);
this.readBufferSize = readBufferSize;

// Tests expect the "plain" deframer behavior, not MigratingDeframer
// https://github.com/grpc/grpc-java/issues/7140
Expand Down Expand Up @@ -309,7 +311,7 @@ public void bytesRead(int processedBytes) {
if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
Log.v(LOG_TAG, "BidirectionalStream.read");
}
stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
stream.read(ByteBuffer.allocateDirect(readBufferSize));
}
}

Expand Down Expand Up @@ -429,7 +431,7 @@ public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInf
Log.v(LOG_TAG, "BidirectionalStream.read");
}
reportHeaders(info.getAllHeadersAsList(), false);
stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
stream.read(ByteBuffer.allocateDirect(readBufferSize));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class CronetClientTransport implements ConnectionClientTransport {
private final boolean useGetForSafeMethods;
private final boolean usePutForIdempotentMethods;
private final StreamBuilderFactory streamFactory;
private final int readBufferSize;
// Indicates the transport is in go-away state: no new streams will be processed,
// but existing streams may continue.
@GuardedBy("lock")
Expand Down Expand Up @@ -92,7 +93,8 @@ class CronetClientTransport implements ConnectionClientTransport {
boolean alwaysUsePut,
TransportTracer transportTracer,
boolean useGetForSafeMethods,
boolean usePutForIdempotentMethods) {
boolean usePutForIdempotentMethods,
int readBufferSize) {
this.address = Preconditions.checkNotNull(address, "address");
this.logId = InternalLogId.allocate(getClass(), address.toString());
this.authority = authority;
Expand All @@ -108,6 +110,7 @@ class CronetClientTransport implements ConnectionClientTransport {
.build();
this.useGetForSafeMethods = useGetForSafeMethods;
this.usePutForIdempotentMethods = usePutForIdempotentMethods;
this.readBufferSize = readBufferSize;
}

@Override
Expand All @@ -132,7 +135,7 @@ class StartCallback implements Runnable {
final CronetClientStream clientStream = new CronetClientStream(
url, userAgent, executor, headers, CronetClientTransport.this, this, lock, maxMessageSize,
alwaysUsePut, method, statsTraceCtx, callOptions, transportTracer, useGetForSafeMethods,
usePutForIdempotentMethods);
usePutForIdempotentMethods, readBufferSize);

@Override
public void run() {
Expand Down
21 changes: 14 additions & 7 deletions cronet/src/test/java/io/grpc/cronet/CronetClientStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public void setUp() {
CallOptions.DEFAULT,
transportTracer,
false,
false);
false,
4 * 1024);
callback.setStream(clientStream);
when(factory.newBidirectionalStreamBuilder(
any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
Expand Down Expand Up @@ -591,7 +592,8 @@ public void addCronetRequestAnnotation_deprecated() {
CallOptions.DEFAULT.withOption(CronetClientStream.CRONET_ANNOTATION_KEY, annotation),
transportTracer,
false,
false);
false,
4 * 1024);
callback.setStream(stream);
when(factory.newBidirectionalStreamBuilder(
any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
Expand Down Expand Up @@ -626,7 +628,8 @@ public void withAnnotation() {
callOptions,
transportTracer,
false,
false);
false,
4 * 1024);
callback.setStream(stream);
when(factory.newBidirectionalStreamBuilder(
any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
Expand Down Expand Up @@ -666,7 +669,8 @@ public void getUnaryRequest() {
CallOptions.DEFAULT,
transportTracer,
true,
false);
false,
4 * 1024);
callback.setStream(stream);
BidirectionalStream.Builder getBuilder =
mock(BidirectionalStream.Builder.class);
Expand Down Expand Up @@ -723,7 +727,8 @@ public void idempotentMethod_usesHttpPut() {
CallOptions.DEFAULT,
transportTracer,
true,
true);
true,
4 * 1024);
callback.setStream(stream);
BidirectionalStream.Builder builder =
mock(BidirectionalStream.Builder.class);
Expand Down Expand Up @@ -755,7 +760,8 @@ public void alwaysUsePutOption_usesHttpPut() {
CallOptions.DEFAULT,
transportTracer,
true,
true);
true,
4 * 1024);
callback.setStream(stream);
BidirectionalStream.Builder builder =
mock(BidirectionalStream.Builder.class);
Expand Down Expand Up @@ -795,7 +801,8 @@ public void reservedHeadersStripped() {
CallOptions.DEFAULT,
transportTracer,
false,
false);
false,
4 * 1024);
callback.setStream(stream);
BidirectionalStream.Builder builder =
mock(BidirectionalStream.Builder.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public void setUp() {
false,
TransportTracer.getDefaultFactory().create(),
false,
false);
false,
4 * 1024);
Runnable callback = transport.start(clientTransportListener);
assertNotNull(callback);
callback.run();
Expand Down