Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ protected InternalServer newServer(List<ServerStreamTracer.Factory> streamTracer
.setListenAddress(addr)
.setExecutorPool(serverExecutorPool)
.setExecutorServicePool(executorServicePool)
.setOffloadExecutorPool(offloadExecutorPool)
.setStreamTracerFactories(streamTracerFactories)
.build();

Expand Down
22 changes: 22 additions & 0 deletions binder/src/main/java/io/grpc/binder/BinderServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.ServerImplBuilder;
import java.io.File;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

/** Builder for a server that services requests from an Android Service. */
Expand Down Expand Up @@ -141,6 +142,27 @@ public BinderServerBuilder useTransportSecurity(File certChain, File privateKey)
throw new UnsupportedOperationException("TLS not supported in BinderServer");
}

/**
* Provides an executor to be used for operations that block or are expensive.
*
* <p>For example, {@link SecurityPolicy}s may be evaluated on this executor as implementations
* commonly require one or more blocking IPC round trips to Android's system server. This allows
* the host application to segregate its threads by workload.
*
* <p>Optional. By default, the executor associated with {@link ServerBuilder#executor(Executor)}
* will be used for this purpose.
*
* <p>The server won't take ownership of the given executor. Callers must ensure that it remains
* usable (not shutdown) until the built server terminates.
*
* @return this
*/
public BinderServerBuilder offloadExecutor(Executor executor) {
internalBuilder.setOffloadExecutorPool(
new FixedObjectPool<>(checkNotNull(executor, "offloadExecutor")));
return this;
}

/**
* Builds a {@link Server} according to this builder's parameters and stores its listening {@link
* IBinder} in the {@link IBinderReceiver} passed to {@link #forAddress(AndroidComponentAddress,
Expand Down
13 changes: 5 additions & 8 deletions binder/src/main/java/io/grpc/binder/ServerSecurityPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.grpc.Status;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.annotation.CheckReturnValue;

/**
Expand Down Expand Up @@ -66,22 +67,18 @@ public Status checkAuthorizationForService(int uid, String serviceName) {
*
* @param uid The Android UID to authenticate.
* @param serviceName The name of the gRPC service being called.
* @param offloadExecutor for evaluating the relevant SecurityPolicy if it's not natively async
* @return a future with the result of the authorization check. A failed future represents a
* failure to perform the authorization check, not that the access is denied.
*/
@CheckReturnValue
ListenableFuture<Status> checkAuthorizationForServiceAsync(int uid, String serviceName) {
ListenableFuture<Status> checkAuthorizationForServiceAsync(
int uid, String serviceName, Executor offloadExecutor) {
SecurityPolicy securityPolicy = perServicePolicies.getOrDefault(serviceName, defaultPolicy);
if (securityPolicy instanceof AsyncSecurityPolicy) {
return ((AsyncSecurityPolicy) securityPolicy).checkAuthorizationAsync(uid);
}

try {
Status status = securityPolicy.checkAuthorization(uid);
return Futures.immediateFuture(status);
} catch (Exception e) {
return Futures.immediateFailedFuture(e);
}
return Futures.submit(() -> securityPolicy.checkAuthorization(uid), offloadExecutor);
}

public static Builder newBuilder() {
Expand Down
27 changes: 26 additions & 1 deletion binder/src/main/java/io/grpc/binder/internal/BinderServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public final class BinderServer implements InternalServer, LeakSafeOneWayBinder.

private final ObjectPool<ScheduledExecutorService> executorServicePool;
private final ObjectPool<? extends Executor> executorPool;
private final ObjectPool<? extends Executor> offloadExecutorPool;

private final ImmutableList<ServerStreamTracer.Factory> streamTracerFactories;
private final AndroidComponentAddress listenAddress;
private final LeakSafeOneWayBinder hostServiceBinder;
Expand All @@ -81,13 +83,21 @@ public final class BinderServer implements InternalServer, LeakSafeOneWayBinder.
@GuardedBy("this")
private Executor executor;

@Nullable // Before start() and after termination.
@GuardedBy("this")
private Executor offloadExecutor;

@GuardedBy("this")
private boolean shutdown;

private BinderServer(Builder builder) {
this.listenAddress = checkNotNull(builder.listenAddress);
this.executorPool = checkNotNull(builder.executorPool);
this.executorServicePool = builder.executorServicePool;
this.offloadExecutorPool =
builder.offloadExecutorPool != null
? builder.offloadExecutorPool
: builder.executorServicePool;
this.streamTracerFactories =
ImmutableList.copyOf(checkNotNull(builder.streamTracerFactories, "streamTracerFactories"));
this.serverPolicyChecker = BinderInternal.createPolicyChecker(builder.serverSecurityPolicy);
Expand All @@ -105,6 +115,7 @@ public synchronized void start(ServerListener serverListener) throws IOException
listener = new ActiveTransportTracker(serverListener, this::onTerminated);
executorService = executorServicePool.getObject();
executor = executorPool.getObject();
offloadExecutor = offloadExecutorPool.getObject();
}

@Override
Expand Down Expand Up @@ -142,6 +153,7 @@ public synchronized void shutdown() {

private synchronized void onTerminated() {
executor = executorPool.returnObject(executor);
offloadExecutor = offloadExecutorPool.returnObject(offloadExecutor);
}

@Override
Expand Down Expand Up @@ -176,7 +188,9 @@ public synchronized boolean handleTransaction(int code, Parcel parcel) {
attrsBuilder,
callingUid,
serverPolicyChecker,
checkNotNull(executor, "Not started?"));
checkNotNull(executor, "Not started?"),
checkNotNull(offloadExecutor, "Not started?"));

// Create a new transport and let our listener know about it.
BinderTransport.BinderServerTransport transport =
new BinderTransport.BinderServerTransport(
Expand Down Expand Up @@ -219,6 +233,7 @@ public boolean handleTransaction(int code, Parcel parcel) {
public static class Builder {
@Nullable AndroidComponentAddress listenAddress;
@Nullable List<? extends ServerStreamTracer.Factory> streamTracerFactories;
@Nullable ObjectPool<? extends Executor> offloadExecutorPool;
@Nullable ObjectPool<? extends Executor> executorPool;

ObjectPool<ScheduledExecutorService> executorServicePool =
Expand Down Expand Up @@ -275,6 +290,16 @@ public Builder setExecutorServicePool(
return this;
}

/**
* Sets the executor to be used for blocking work.
*
* <p>Optional. If unset, 'executorPool' will be used for this work (not recommended).
*/
public Builder setOffloadExecutorPool(ObjectPool<? extends Executor> offloadExecutorPool) {
this.offloadExecutorPool = offloadExecutorPool;
return this;
}

/**
* Sets the {@link ServerSecurityPolicy} to be used for built servers.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,20 @@ public static void installAuthInterceptor(ServerBuilder<?> serverBuilder) {
* @param remoteUid The remote UID of the transport.
* @param serverPolicyChecker The policy checker for this transport.
* @param executor used for calling into the application. Must outlive the transport.
* @param offloadExecutor used for blocking or expensive work. Must outlive the transport.
*/
@Internal
public static void attachAuthAttrs(
Attributes.Builder builder,
int remoteUid,
ServerPolicyChecker serverPolicyChecker,
Executor executor) {
Executor executor,
Executor offloadExecutor) {
builder
.set(
TRANSPORT_AUTHORIZATION_STATE,
new TransportAuthorizationState(remoteUid, serverPolicyChecker, executor))
new TransportAuthorizationState(
remoteUid, serverPolicyChecker, executor, offloadExecutor))
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY);
}

Expand All @@ -97,9 +100,9 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ListenableFuture<Status> authStatusFuture =
transportAuthState.checkAuthorization(call.getMethodDescriptor());

// Most SecurityPolicy will have synchronous implementations that provide an
// immediately-resolved Future. In that case, short-circuit to avoid unnecessary allocations
// and asynchronous code if the authorization result is already present.
// Auth decisions are cached so this future will often already be complete. In that case, we
// use a fast path below that avoids unnecessary allocations and asynchronous code since the
// authorization result is already known.
if (!authStatusFuture.isDone()) {
return newServerCallListenerForPendingAuthResult(
authStatusFuture, transportAuthState.executor, call, headers, next);
Expand Down Expand Up @@ -166,15 +169,21 @@ private static final class TransportAuthorizationState {
private final ServerPolicyChecker serverPolicyChecker;
private final ConcurrentHashMap<String, ListenableFuture<Status>> serviceAuthorization;
private final Executor executor;
private final Executor offloadExecutor;

/**
* @param executor used for calling into the application. Must outlive the transport.
* @param offloadExecutor used to check a non-async SecurityPolicy. Must outlive the transport.
*/
TransportAuthorizationState(
int uid, ServerPolicyChecker serverPolicyChecker, Executor executor) {
int uid,
ServerPolicyChecker serverPolicyChecker,
Executor executor,
Executor offloadExecutor) {
this.uid = uid;
this.serverPolicyChecker = serverPolicyChecker;
this.executor = executor;
this.offloadExecutor = offloadExecutor;
serviceAuthorization = new ConcurrentHashMap<>(8);
}

Expand Down Expand Up @@ -202,7 +211,7 @@ ListenableFuture<Status> checkAuthorization(MethodDescriptor<?, ?> method) {
// TODO(10669): evaluate if there should be at most a single pending authorization check per
// (uid, serviceName) pair at any given time.
ListenableFuture<Status> authorization =
serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName);
serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName, offloadExecutor);
if (useCache) {
serviceAuthorization.putIfAbsent(serviceName, authorization);
Futures.addCallback(
Expand All @@ -227,7 +236,7 @@ public void onFailure(Throwable t) {
*
* <p>This class provides the asynchronous version of {@link io.grpc.binder.SecurityPolicy},
* allowing implementations of authorization logic that involves slow or asynchronous calls
* without necessarily blocking the calling thread.
* without ever blocking the calling thread.
*
* @see io.grpc.binder.SecurityPolicy
*/
Expand All @@ -238,11 +247,15 @@ public interface ServerPolicyChecker {
* <p>This method never throws an exception. If the execution of the security policy check
* fails, a failed future with such exception is returned.
*
* <p>This method never blocks the calling thread.
*
* @param uid The Android UID to authenticate.
* @param serviceName The name of the gRPC service being called.
* @param offloadExecutor used for blocking or expensive work if necessary
* @return a future with the result of the authorization check. A failed future represents a
* failure to perform the authorization check, not that the access is denied.
*/
ListenableFuture<Status> checkAuthorizationForServiceAsync(int uid, String serviceName);
ListenableFuture<Status> checkAuthorizationForServiceAsync(
int uid, String serviceName, Executor offloadExecutor);
}
}
30 changes: 18 additions & 12 deletions binder/src/test/java/io/grpc/binder/ServerSecurityPolicyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,24 @@
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
import static org.robolectric.Shadows.shadowOf;

import android.os.Looper;
import android.os.Process;
import androidx.core.content.ContextCompat;
import androidx.test.core.app.ApplicationProvider;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import io.grpc.Status;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -49,6 +53,8 @@ public final class ServerSecurityPolicyTest {
private static final int OTHER_UID = MY_UID + 1;

ServerSecurityPolicy policy;
final Executor executor =
ContextCompat.getMainExecutor(ApplicationProvider.getApplicationContext());

@Test
public void testDefaultInternalOnly() throws Exception {
Expand Down Expand Up @@ -153,8 +159,7 @@ public void testPerServiceAsync() throws Exception {
// Add some extra future transformation to confirm that a chain
// of futures gets properly handled.
ListenableFuture<Void> dependency = Futures.immediateVoidFuture();
return Futures.transform(
dependency, unused -> Status.OK, MoreExecutors.directExecutor());
return Futures.transform(dependency, unused -> Status.OK, executor);
}))
.build();

Expand All @@ -181,8 +186,8 @@ public void testPerService_failedSecurityPolicyFuture_returnsAFailedFuture() {
.build();

ListenableFuture<Status> statusFuture =
policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1);

policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1, executor);
shadowOf(Looper.getMainLooper()).idle();
assertThrows(ExecutionException.class, statusFuture::get);
}

Expand All @@ -194,8 +199,8 @@ public void testPerServiceAsync_cancelledFuture_propagatesStatus() {
.build();

ListenableFuture<Status> statusFuture =
policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1);

policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1, executor);
shadowOf(Looper.getMainLooper()).idle();
assertThrows(CancellationException.class, statusFuture::get);
}

Expand Down Expand Up @@ -231,8 +236,8 @@ public void testPerServiceAsync_interrupted_cancelledFuture() {
}))
.build();
ListenableFuture<Status> statusFuture =
policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1);

policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1, executor);
shadowOf(Looper.getMainLooper()).idle();
assertThrows(InterruptedException.class, statusFuture::get);
listeningExecutorService.shutdownNow();
}
Expand Down Expand Up @@ -337,11 +342,12 @@ public void testPerServiceNoDefaultAsync() throws Exception {
* Shortcut for invoking {@link ServerSecurityPolicy#checkAuthorizationForServiceAsync} without
* dealing with concurrency details. Returns a {link @Status.Code} for convenience.
*/
private static Status.Code checkAuthorizationForServiceAsync(
private Status.Code checkAuthorizationForServiceAsync(
ServerSecurityPolicy policy, int callerUid, String service) throws ExecutionException {
ListenableFuture<Status> statusFuture =
policy.checkAuthorizationForServiceAsync(callerUid, service);
return Uninterruptibles.getUninterruptibly(statusFuture).getCode();
policy.checkAuthorizationForServiceAsync(callerUid, service, executor);
shadowOf(Looper.getMainLooper()).idle();
return Futures.getDone(statusFuture).getCode();
}

private static SecurityPolicy policy(Function<Integer, Status> func) {
Expand Down