diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index f57cada52e9..06fafbb6cf1 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -17,12 +17,9 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; -import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; -import com.google.protobuf.Struct; import io.grpc.Attributes; import io.grpc.EquivalentAddressGroup; import io.grpc.HttpConnectProxiedSocketAddress; @@ -30,16 +27,8 @@ import io.grpc.LoadBalancer; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; -import io.grpc.NameResolver; -import io.grpc.NameResolver.ResolutionResult; import io.grpc.Status; import io.grpc.StatusOr; -import io.grpc.SynchronizationContext; -import io.grpc.SynchronizationContext.ScheduledHandle; -import io.grpc.internal.BackoffPolicy; -import io.grpc.internal.ExponentialBackoffPolicy; -import io.grpc.internal.ObjectPool; -import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.util.GracefulSwitchLoadBalancer; import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig; import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig; @@ -51,94 +40,46 @@ import io.grpc.xds.EnvoyServerProtoData.FailurePercentageEjection; import io.grpc.xds.EnvoyServerProtoData.OutlierDetection; import io.grpc.xds.EnvoyServerProtoData.SuccessRateEjection; -import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig; import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig; +import io.grpc.xds.XdsConfig.XdsClusterConfig; import io.grpc.xds.XdsEndpointResource.EdsUpdate; -import io.grpc.xds.client.Bootstrapper.ServerInfo; import io.grpc.xds.client.Locality; -import io.grpc.xds.client.XdsClient; -import io.grpc.xds.client.XdsClient.ResourceWatcher; import io.grpc.xds.client.XdsLogger; import io.grpc.xds.client.XdsLogger.XdsLogLevel; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.net.URI; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; /** * Load balancer for cluster_resolver_experimental LB policy. This LB policy is the child LB policy * of the cds_experimental LB policy and the parent LB policy of the priority_experimental LB - * policy in the xDS load balancing hierarchy. This policy resolves endpoints of non-aggregate + * policy in the xDS load balancing hierarchy. This policy converts endpoints of non-aggregate * clusters (e.g., EDS or Logical DNS) and groups endpoints in priorities and localities to be * used in the downstream LB policies for fine-grained load balancing purposes. */ final class ClusterResolverLoadBalancer extends LoadBalancer { - // DNS-resolved endpoints do not have the definition of the locality it belongs to, just hardcode - // to an empty locality. - private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", ""); private final XdsLogger logger; - private final SynchronizationContext syncContext; - private final ScheduledExecutorService timeService; private final LoadBalancerRegistry lbRegistry; - private final BackoffPolicy.Provider backoffPolicyProvider; - private final GracefulSwitchLoadBalancer delegate; - private ObjectPool xdsClientPool; - private XdsClient xdsClient; - private ClusterResolverConfig config; - - ClusterResolverLoadBalancer(Helper helper) { - this(helper, LoadBalancerRegistry.getDefaultRegistry(), - new ExponentialBackoffPolicy.Provider()); - } + private final LoadBalancer delegate; + private ClusterState clusterState; - @VisibleForTesting - ClusterResolverLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry, - BackoffPolicy.Provider backoffPolicyProvider) { + ClusterResolverLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry) { + this.delegate = lbRegistry.getProvider(PRIORITY_POLICY_NAME).newLoadBalancer(helper); this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry"); - this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); - this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); - this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService"); - delegate = new GracefulSwitchLoadBalancer(helper); logger = XdsLogger.withLogId( InternalLogId.allocate("cluster-resolver-lb", helper.getAuthority())); logger.log(XdsLogLevel.INFO, "Created"); } - @Override - public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { - logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); - if (xdsClientPool == null) { - xdsClientPool = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CLIENT_POOL); - xdsClient = xdsClientPool.getObject(); - } - ClusterResolverConfig config = - (ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); - if (!Objects.equals(this.config, config)) { - logger.log(XdsLogLevel.DEBUG, "Config: {0}", config); - this.config = config; - Object gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( - new ClusterResolverLbStateFactory(), config); - delegate.handleResolvedAddresses( - resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(gracefulConfig).build()); - } - return Status.OK; - } - @Override public void handleNameResolutionError(Status error) { logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error); @@ -149,575 +90,213 @@ public void handleNameResolutionError(Status error) { public void shutdown() { logger.log(XdsLogLevel.INFO, "Shutdown"); delegate.shutdown(); - if (xdsClientPool != null) { - xdsClientPool.returnObject(xdsClient); - } - } - - private final class ClusterResolverLbStateFactory extends LoadBalancer.Factory { - @Override - public LoadBalancer newLoadBalancer(Helper helper) { - return new ClusterResolverLbState(helper); - } } - /** - * The state of a cluster_resolver LB working session. A new instance is created whenever - * the cluster_resolver LB receives a new config. The old instance is replaced when the - * new one is ready to handle new RPCs. - */ - private final class ClusterResolverLbState extends LoadBalancer { - private final Helper helper; - private ClusterState clusterState; - private String cluster; - private Object endpointLbConfig; - private ResolvedAddresses resolvedAddresses; - private LoadBalancer childLb; - - ClusterResolverLbState(Helper helper) { - this.helper = new RefreshableHelper(checkNotNull(helper, "helper")); - logger.log(XdsLogLevel.DEBUG, "New ClusterResolverLbState"); - } - - @Override - public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { - this.resolvedAddresses = resolvedAddresses; - ClusterResolverConfig config = - (ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); - endpointLbConfig = config.lbConfig; - DiscoveryMechanism instance = config.discoveryMechanism; - cluster = instance.cluster; - if (instance.type == DiscoveryMechanism.Type.EDS) { - clusterState = new EdsClusterState(instance.cluster, instance.edsServiceName, - instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext, - instance.filterMetadata, instance.outlierDetection); - } else { // logical DNS - clusterState = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName, - instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext, - instance.filterMetadata); - } - clusterState.start(); - return Status.OK; - } + @Override + public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { + logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); + ClusterResolverConfig config = + (ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); + XdsConfig xdsConfig = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CONFIG); - @Override - public void handleNameResolutionError(Status error) { - if (childLb != null) { - childLb.handleNameResolutionError(error); - } else { - helper.updateBalancingState( - TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); - } + DiscoveryMechanism instance = config.discoveryMechanism; + String cluster = instance.cluster; + if (clusterState == null) { + clusterState = new ClusterState(); } - @Override - public void shutdown() { - clusterState.shutdown(); - if (childLb != null) { - childLb.shutdown(); - } + StatusOr edsUpdate = getEdsUpdate(xdsConfig, cluster); + StatusOr statusOrResult = + clusterState.edsUpdateToResult(config, instance, edsUpdate); + if (!statusOrResult.hasValue()) { + Status status = Status.UNAVAILABLE + .withDescription(statusOrResult.getStatus().getDescription()) + .withCause(statusOrResult.getStatus().getCause()); + delegate.handleNameResolutionError(status); + return status; } - - private void handleEndpointResourceUpdate() { - List addresses = new ArrayList<>(); - Map priorityChildConfigs = new HashMap<>(); - List priorities = new ArrayList<>(); // totally ordered priority list - - Status endpointNotFound = Status.OK; - // Propagate endpoints to the child LB policy only after all clusters have been resolved. - if (!clusterState.resolved && clusterState.status.isOk()) { - return; - } - if (clusterState.result != null) { - addresses.addAll(clusterState.result.addresses); - priorityChildConfigs.putAll(clusterState.result.priorityChildConfigs); - priorities.addAll(clusterState.result.priorities); - } else { - endpointNotFound = clusterState.status; - } - if (addresses.isEmpty()) { - if (endpointNotFound.isOk()) { - endpointNotFound = Status.UNAVAILABLE.withDescription( - "No usable endpoint from cluster: " + cluster); - } else { - endpointNotFound = - Status.UNAVAILABLE.withCause(endpointNotFound.getCause()) - .withDescription(endpointNotFound.getDescription()); - } - helper.updateBalancingState( - TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(endpointNotFound))); - if (childLb != null) { - childLb.shutdown(); - childLb = null; - } - return; - } - PriorityLbConfig childConfig = - new PriorityLbConfig(Collections.unmodifiableMap(priorityChildConfigs), - Collections.unmodifiableList(priorities)); - if (childLb == null) { - childLb = lbRegistry.getProvider(PRIORITY_POLICY_NAME).newLoadBalancer(helper); - } - childLb.handleResolvedAddresses( - resolvedAddresses.toBuilder() - .setLoadBalancingPolicyConfig(childConfig) - .setAddresses(Collections.unmodifiableList(addresses)) - .build()); + ClusterResolutionResult result = statusOrResult.getValue(); + List addresses = result.addresses; + if (addresses.isEmpty()) { + Status status = Status.UNAVAILABLE + .withDescription("No usable endpoint from cluster: " + cluster); + delegate.handleNameResolutionError(status); + return status; } + PriorityLbConfig childConfig = + new PriorityLbConfig( + Collections.unmodifiableMap(result.priorityChildConfigs), + Collections.unmodifiableList(result.priorities)); + return delegate.acceptResolvedAddresses( + resolvedAddresses.toBuilder() + .setLoadBalancingPolicyConfig(childConfig) + .setAddresses(Collections.unmodifiableList(addresses)) + .build()); + } - private void handleEndpointResolutionError() { - if (!clusterState.status.isOk()) { - if (childLb != null) { - childLb.handleNameResolutionError(clusterState.status); - } else { - helper.updateBalancingState( - TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(clusterState.status))); - } - } + private static StatusOr getEdsUpdate(XdsConfig xdsConfig, String cluster) { + StatusOr clusterConfig = xdsConfig.getClusters().get(cluster); + if (clusterConfig == null) { + return StatusOr.fromStatus(Status.INTERNAL + .withDescription("BUG: cluster resolver could not find cluster in xdsConfig")); } - - /** - * Wires re-resolution requests from downstream LB policies with DNS resolver. - */ - private final class RefreshableHelper extends ForwardingLoadBalancerHelper { - private final Helper delegate; - - private RefreshableHelper(Helper delegate) { - this.delegate = checkNotNull(delegate, "delegate"); - } - - @Override - public void refreshNameResolution() { - if (clusterState instanceof LogicalDnsClusterState) { - ((LogicalDnsClusterState) clusterState).refresh(); - } - } - - @Override - protected Helper delegate() { - return delegate; - } + if (!clusterConfig.hasValue()) { + return StatusOr.fromStatus(clusterConfig.getStatus()); } - - /** - * Resolution state of an underlying cluster. - */ - private abstract class ClusterState { - // Name of the cluster to be resolved. - protected final String name; - @Nullable - protected final ServerInfo lrsServerInfo; - @Nullable - protected final Long maxConcurrentRequests; - @Nullable - protected final UpstreamTlsContext tlsContext; - protected final Map filterMetadata; - @Nullable - protected final OutlierDetection outlierDetection; - // Resolution status, may contain most recent error encountered. - protected Status status = Status.OK; - // True if has received resolution result. - protected boolean resolved; - // Most recently resolved addresses and config, or null if resource not exists. - @Nullable - protected ClusterResolutionResult result; - - protected boolean shutdown; - - private ClusterState(String name, @Nullable ServerInfo lrsServerInfo, - @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, - Map filterMetadata, @Nullable OutlierDetection outlierDetection) { - this.name = name; - this.lrsServerInfo = lrsServerInfo; - this.maxConcurrentRequests = maxConcurrentRequests; - this.tlsContext = tlsContext; - this.filterMetadata = ImmutableMap.copyOf(filterMetadata); - this.outlierDetection = outlierDetection; - } - - abstract void start(); - - void shutdown() { - shutdown = true; - } + if (!(clusterConfig.getValue().getChildren() instanceof XdsClusterConfig.EndpointConfig)) { + return StatusOr.fromStatus(Status.INTERNAL + .withDescription("BUG: cluster resolver cluster with children of unknown type")); } + XdsClusterConfig.EndpointConfig endpointConfig = + (XdsClusterConfig.EndpointConfig) clusterConfig.getValue().getChildren(); + return endpointConfig.getEndpoint(); + } - private final class EdsClusterState extends ClusterState implements ResourceWatcher { - @Nullable - private final String edsServiceName; - private Map localityPriorityNames = Collections.emptyMap(); - int priorityNameGenId = 1; - - private EdsClusterState(String name, @Nullable String edsServiceName, - @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext, Map filterMetadata, - @Nullable OutlierDetection outlierDetection) { - super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, - outlierDetection); - this.edsServiceName = edsServiceName; - } - - @Override - void start() { - String resourceName = edsServiceName != null ? edsServiceName : name; - logger.log(XdsLogLevel.INFO, "Start watching EDS resource {0}", resourceName); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), - resourceName, this, syncContext); - } - - @Override - protected void shutdown() { - super.shutdown(); - String resourceName = edsServiceName != null ? edsServiceName : name; - logger.log(XdsLogLevel.INFO, "Stop watching EDS resource {0}", resourceName); - xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), resourceName, this); - } - - @Override - public void onChanged(final EdsUpdate update) { - class EndpointsUpdated implements Runnable { - @Override - public void run() { - if (shutdown) { - return; - } - logger.log(XdsLogLevel.DEBUG, "Received endpoint update {0}", update); - if (logger.isLoggable(XdsLogLevel.INFO)) { - logger.log(XdsLogLevel.INFO, "Cluster {0}: {1} localities, {2} drop categories", - update.clusterName, update.localityLbEndpointsMap.size(), - update.dropPolicies.size()); + private final class ClusterState { + private Map localityPriorityNames = Collections.emptyMap(); + int priorityNameGenId = 1; + + StatusOr edsUpdateToResult( + ClusterResolverConfig config, DiscoveryMechanism discovery, StatusOr updateOr) { + if (!updateOr.hasValue()) { + return StatusOr.fromStatus(updateOr.getStatus()); + } + EdsUpdate update = updateOr.getValue(); + logger.log(XdsLogLevel.DEBUG, "Received endpoint update {0}", update); + if (logger.isLoggable(XdsLogLevel.INFO)) { + logger.log(XdsLogLevel.INFO, "Cluster {0}: {1} localities, {2} drop categories", + discovery.cluster, update.localityLbEndpointsMap.size(), + update.dropPolicies.size()); + } + Map localityLbEndpoints = + update.localityLbEndpointsMap; + List dropOverloads = update.dropPolicies; + List addresses = new ArrayList<>(); + Map> prioritizedLocalityWeights = new HashMap<>(); + List sortedPriorityNames = + generatePriorityNames(discovery.cluster, localityLbEndpoints); + for (Locality locality : localityLbEndpoints.keySet()) { + LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality); + String priorityName = localityPriorityNames.get(locality); + boolean discard = true; + for (LbEndpoint endpoint : localityLbInfo.endpoints()) { + if (endpoint.isHealthy()) { + discard = false; + long weight = localityLbInfo.localityWeight(); + if (endpoint.loadBalancingWeight() != 0) { + weight *= endpoint.loadBalancingWeight(); } - Map localityLbEndpoints = - update.localityLbEndpointsMap; - List dropOverloads = update.dropPolicies; - List addresses = new ArrayList<>(); - Map> prioritizedLocalityWeights = new HashMap<>(); - List sortedPriorityNames = generatePriorityNames(name, localityLbEndpoints); - for (Locality locality : localityLbEndpoints.keySet()) { - LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality); - String priorityName = localityPriorityNames.get(locality); - boolean discard = true; - for (LbEndpoint endpoint : localityLbInfo.endpoints()) { - if (endpoint.isHealthy()) { - discard = false; - long weight = localityLbInfo.localityWeight(); - if (endpoint.loadBalancingWeight() != 0) { - weight *= endpoint.loadBalancingWeight(); - } - String localityName = localityName(locality); - Attributes attr = - endpoint.eag().getAttributes().toBuilder() - .set(XdsAttributes.ATTR_LOCALITY, locality) - .set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, localityName) - .set(XdsAttributes.ATTR_LOCALITY_WEIGHT, - localityLbInfo.localityWeight()) - .set(XdsAttributes.ATTR_SERVER_WEIGHT, weight) - .set(XdsAttributes.ATTR_ADDRESS_NAME, endpoint.hostname()) - .build(); - - EquivalentAddressGroup eag; - if (config.isHttp11ProxyAvailable()) { - List rewrittenAddresses = new ArrayList<>(); - for (SocketAddress addr : endpoint.eag().getAddresses()) { - rewrittenAddresses.add(rewriteAddress( - addr, endpoint.endpointMetadata(), localityLbInfo.localityMetadata())); - } - eag = new EquivalentAddressGroup(rewrittenAddresses, attr); - } else { - eag = new EquivalentAddressGroup(endpoint.eag().getAddresses(), attr); - } - eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName)); - addresses.add(eag); - } - } - if (discard) { - logger.log(XdsLogLevel.INFO, - "Discard locality {0} with 0 healthy endpoints", locality); - continue; + String localityName = localityName(locality); + Attributes attr = + endpoint.eag().getAttributes().toBuilder() + .set(XdsAttributes.ATTR_LOCALITY, locality) + .set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, localityName) + .set(XdsAttributes.ATTR_LOCALITY_WEIGHT, + localityLbInfo.localityWeight()) + .set(XdsAttributes.ATTR_SERVER_WEIGHT, weight) + .set(XdsAttributes.ATTR_ADDRESS_NAME, endpoint.hostname()) + .build(); + EquivalentAddressGroup eag; + if (config.isHttp11ProxyAvailable()) { + List rewrittenAddresses = new ArrayList<>(); + for (SocketAddress addr : endpoint.eag().getAddresses()) { + rewrittenAddresses.add(rewriteAddress( + addr, endpoint.endpointMetadata(), localityLbInfo.localityMetadata())); } - if (!prioritizedLocalityWeights.containsKey(priorityName)) { - prioritizedLocalityWeights.put(priorityName, new HashMap()); - } - prioritizedLocalityWeights.get(priorityName).put( - locality, localityLbInfo.localityWeight()); - } - if (prioritizedLocalityWeights.isEmpty()) { - // Will still update the result, as if the cluster resource is revoked. - logger.log(XdsLogLevel.INFO, - "Cluster {0} has no usable priority/locality/endpoint", update.clusterName); + eag = new EquivalentAddressGroup(rewrittenAddresses, attr); + } else { + eag = new EquivalentAddressGroup(endpoint.eag().getAddresses(), attr); } - sortedPriorityNames.retainAll(prioritizedLocalityWeights.keySet()); - Map priorityChildConfigs = - generateEdsBasedPriorityChildConfigs( - name, edsServiceName, lrsServerInfo, maxConcurrentRequests, tlsContext, - filterMetadata, outlierDetection, endpointLbConfig, lbRegistry, - prioritizedLocalityWeights, dropOverloads); - status = Status.OK; - resolved = true; - result = new ClusterResolutionResult(addresses, priorityChildConfigs, - sortedPriorityNames); - handleEndpointResourceUpdate(); - } - } - - new EndpointsUpdated().run(); - } - - private SocketAddress rewriteAddress(SocketAddress addr, - ImmutableMap endpointMetadata, - ImmutableMap localityMetadata) { - if (!(addr instanceof InetSocketAddress)) { - return addr; - } - - SocketAddress proxyAddress; - try { - proxyAddress = (SocketAddress) endpointMetadata.get( - "envoy.http11_proxy_transport_socket.proxy_address"); - if (proxyAddress == null) { - proxyAddress = (SocketAddress) localityMetadata.get( - "envoy.http11_proxy_transport_socket.proxy_address"); + eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName)); + addresses.add(eag); } - } catch (ClassCastException e) { - return addr; } - - if (proxyAddress == null) { - return addr; + if (discard) { + logger.log(XdsLogLevel.INFO, + "Discard locality {0} with 0 healthy endpoints", locality); + continue; } - - return HttpConnectProxiedSocketAddress.newBuilder() - .setTargetAddress((InetSocketAddress) addr) - .setProxyAddress(proxyAddress) - .build(); - } - - private List generatePriorityNames(String name, - Map localityLbEndpoints) { - TreeMap> todo = new TreeMap<>(); - for (Locality locality : localityLbEndpoints.keySet()) { - int priority = localityLbEndpoints.get(locality).priority(); - if (!todo.containsKey(priority)) { - todo.put(priority, new ArrayList<>()); - } - todo.get(priority).add(locality); + if (!prioritizedLocalityWeights.containsKey(priorityName)) { + prioritizedLocalityWeights.put(priorityName, new HashMap()); } - Map newNames = new HashMap<>(); - Set usedNames = new HashSet<>(); - List ret = new ArrayList<>(); - for (Integer priority: todo.keySet()) { - String foundName = ""; - for (Locality locality : todo.get(priority)) { - if (localityPriorityNames.containsKey(locality) - && usedNames.add(localityPriorityNames.get(locality))) { - foundName = localityPriorityNames.get(locality); - break; - } - } - if ("".equals(foundName)) { - foundName = String.format(Locale.US, "%s[child%d]", name, priorityNameGenId++); - } - for (Locality locality : todo.get(priority)) { - newNames.put(locality, foundName); - } - ret.add(foundName); - } - localityPriorityNames = newNames; - return ret; - } - - @Override - public void onResourceDoesNotExist(final String resourceName) { - if (shutdown) { - return; - } - logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName); - status = Status.OK; - resolved = true; - result = null; // resource revoked - handleEndpointResourceUpdate(); - } - - @Override - public void onError(final Status error) { - if (shutdown) { - return; - } - String resourceName = edsServiceName != null ? edsServiceName : name; - status = Status.UNAVAILABLE - .withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s", - resourceName, error.getCode(), error.getDescription())) - .withCause(error.getCause()); - logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error); - handleEndpointResolutionError(); - } + prioritizedLocalityWeights.get(priorityName).put( + locality, localityLbInfo.localityWeight()); + } + if (prioritizedLocalityWeights.isEmpty()) { + // Will still update the result, as if the cluster resource is revoked. + logger.log(XdsLogLevel.INFO, + "Cluster {0} has no usable priority/locality/endpoint", discovery.cluster); + } + sortedPriorityNames.retainAll(prioritizedLocalityWeights.keySet()); + Map priorityChildConfigs = + generatePriorityChildConfigs( + discovery, config.lbConfig, lbRegistry, + prioritizedLocalityWeights, dropOverloads); + return StatusOr.fromValue(new ClusterResolutionResult(addresses, priorityChildConfigs, + sortedPriorityNames)); } - private final class LogicalDnsClusterState extends ClusterState { - private final String dnsHostName; - private final NameResolver.Factory nameResolverFactory; - private final NameResolver.Args nameResolverArgs; - private NameResolver resolver; - @Nullable - private BackoffPolicy backoffPolicy; - @Nullable - private ScheduledHandle scheduledRefresh; - - private LogicalDnsClusterState(String name, String dnsHostName, - @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext, Map filterMetadata) { - super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null); - this.dnsHostName = checkNotNull(dnsHostName, "dnsHostName"); - nameResolverFactory = - checkNotNull(helper.getNameResolverRegistry().asFactory(), "nameResolverFactory"); - nameResolverArgs = checkNotNull(helper.getNameResolverArgs(), "nameResolverArgs"); + private SocketAddress rewriteAddress(SocketAddress addr, + ImmutableMap endpointMetadata, + ImmutableMap localityMetadata) { + if (!(addr instanceof InetSocketAddress)) { + return addr; } - @Override - void start() { - URI uri; - try { - uri = new URI("dns", "", "/" + dnsHostName, null); - } catch (URISyntaxException e) { - status = Status.INTERNAL.withDescription( - "Bug, invalid URI creation: " + dnsHostName).withCause(e); - handleEndpointResolutionError(); - return; - } - resolver = nameResolverFactory.newNameResolver(uri, nameResolverArgs); - if (resolver == null) { - status = Status.INTERNAL.withDescription("Xds cluster resolver lb for logical DNS " - + "cluster [" + name + "] cannot find DNS resolver with uri:" + uri); - handleEndpointResolutionError(); - return; - } - resolver.start(new NameResolverListener(dnsHostName)); - } - - void refresh() { - if (resolver == null) { - return; - } - cancelBackoff(); - resolver.refresh(); - } - - @Override - void shutdown() { - super.shutdown(); - if (resolver != null) { - resolver.shutdown(); - } - cancelBackoff(); - } - - private void cancelBackoff() { - if (scheduledRefresh != null) { - scheduledRefresh.cancel(); - scheduledRefresh = null; - backoffPolicy = null; + SocketAddress proxyAddress; + try { + proxyAddress = (SocketAddress) endpointMetadata.get( + "envoy.http11_proxy_transport_socket.proxy_address"); + if (proxyAddress == null) { + proxyAddress = (SocketAddress) localityMetadata.get( + "envoy.http11_proxy_transport_socket.proxy_address"); } + } catch (ClassCastException e) { + return addr; } - private class DelayedNameResolverRefresh implements Runnable { - @Override - public void run() { - scheduledRefresh = null; - if (!shutdown) { - resolver.refresh(); - } - } + if (proxyAddress == null) { + return addr; } - private class NameResolverListener extends NameResolver.Listener2 { - private final String dnsHostName; - - NameResolverListener(String dnsHostName) { - this.dnsHostName = dnsHostName; - } + return HttpConnectProxiedSocketAddress.newBuilder() + .setTargetAddress((InetSocketAddress) addr) + .setProxyAddress(proxyAddress) + .build(); + } - @Override - public void onResult(final ResolutionResult resolutionResult) { - syncContext.execute(() -> onResult2(resolutionResult)); + private List generatePriorityNames(String name, + Map localityLbEndpoints) { + TreeMap> todo = new TreeMap<>(); + for (Locality locality : localityLbEndpoints.keySet()) { + int priority = localityLbEndpoints.get(locality).priority(); + if (!todo.containsKey(priority)) { + todo.put(priority, new ArrayList<>()); } - - @Override - public Status onResult2(final ResolutionResult resolutionResult) { - if (shutdown) { - return Status.OK; - } - // Arbitrary priority notation for all DNS-resolved endpoints. - String priorityName = priorityName(name, 0); // value doesn't matter - List addresses = new ArrayList<>(); - StatusOr> addressesOrError = - resolutionResult.getAddressesOrError(); - if (addressesOrError.hasValue()) { - backoffPolicy = null; // reset backoff sequence if succeeded - for (EquivalentAddressGroup eag : addressesOrError.getValue()) { - // No weight attribute is attached, all endpoint-level LB policy should be able - // to handle such it. - String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY); - Attributes attr = eag.getAttributes().toBuilder() - .set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY) - .set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, localityName) - .set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName) - .build(); - eag = new EquivalentAddressGroup(eag.getAddresses(), attr); - eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName)); - addresses.add(eag); - } - PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig( - name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, - lbRegistry, Collections.emptyList()); - status = Status.OK; - resolved = true; - result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig); - handleEndpointResourceUpdate(); - return Status.OK; - } else { - handleErrorInSyncContext(addressesOrError.getStatus()); - return addressesOrError.getStatus(); + todo.get(priority).add(locality); + } + Map newNames = new HashMap<>(); + Set usedNames = new HashSet<>(); + List ret = new ArrayList<>(); + for (Integer priority: todo.keySet()) { + String foundName = ""; + for (Locality locality : todo.get(priority)) { + if (localityPriorityNames.containsKey(locality) + && usedNames.add(localityPriorityNames.get(locality))) { + foundName = localityPriorityNames.get(locality); + break; } } - - @Override - public void onError(final Status error) { - syncContext.execute(() -> handleErrorInSyncContext(error)); + if ("".equals(foundName)) { + foundName = priorityName(name, priorityNameGenId++); } - - private void handleErrorInSyncContext(final Status error) { - if (shutdown) { - return; - } - status = error; - // NameResolver.Listener API cannot distinguish between address-not-found and - // transient errors. If the error occurs in the first resolution, treat it as - // address not found. Otherwise, either there is previously resolved addresses - // previously encountered error, propagate the error to downstream/upstream and - // let downstream/upstream handle it. - if (!resolved) { - resolved = true; - handleEndpointResourceUpdate(); - } else { - handleEndpointResolutionError(); - } - if (scheduledRefresh != null && scheduledRefresh.isPending()) { - return; - } - if (backoffPolicy == null) { - backoffPolicy = backoffPolicyProvider.get(); - } - long delayNanos = backoffPolicy.nextBackoffNanos(); - logger.log(XdsLogLevel.DEBUG, - "Logical DNS resolver for cluster {0} encountered name resolution " - + "error: {1}, scheduling DNS resolution backoff for {2} ns", - name, error, delayNanos); - scheduledRefresh = - syncContext.schedule( - new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS, - timeService); + for (Locality locality : todo.get(priority)) { + newNames.put(locality, foundName); } + ret.add(foundName); } + localityPriorityNames = newNames; + return ret; } } @@ -729,12 +308,6 @@ private static class ClusterResolutionResult { // List of priority names ordered in descending priorities. private final List priorities; - ClusterResolutionResult(List addresses, String priority, - PriorityChildConfig config) { - this(addresses, Collections.singletonMap(priority, config), - Collections.singletonList(priority)); - } - ClusterResolutionResult(List addresses, Map configs, List priorities) { this.addresses = addresses; @@ -744,46 +317,24 @@ private static class ClusterResolutionResult { } /** - * Generates the config to be used in the priority LB policy for the single priority of - * logical DNS cluster. - * - *

priority LB -> cluster_impl LB (single hardcoded priority) -> pick_first - */ - private static PriorityChildConfig generateDnsBasedPriorityChildConfig( - String cluster, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext, Map filterMetadata, - LoadBalancerRegistry lbRegistry, List dropOverloads) { - // Override endpoint-level LB policy with pick_first for logical DNS cluster. - Object endpointLbConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( - lbRegistry.getProvider("pick_first"), null); - ClusterImplConfig clusterImplConfig = - new ClusterImplConfig(cluster, null, lrsServerInfo, maxConcurrentRequests, - dropOverloads, endpointLbConfig, tlsContext, filterMetadata); - LoadBalancerProvider clusterImplLbProvider = - lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); - Object clusterImplPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( - clusterImplLbProvider, clusterImplConfig); - return new PriorityChildConfig(clusterImplPolicy, false /* ignoreReresolution*/); - } - - /** - * Generates configs to be used in the priority LB policy for priorities in an EDS cluster. + * Generates configs to be used in the priority LB policy for priorities in a cluster. * *

priority LB -> cluster_impl LB (one per priority) -> (weighted_target LB * -> round_robin / least_request_experimental (one per locality)) / ring_hash_experimental */ - private static Map generateEdsBasedPriorityChildConfigs( - String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, - @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, - Map filterMetadata, - @Nullable OutlierDetection outlierDetection, Object endpointLbConfig, - LoadBalancerRegistry lbRegistry, Map> prioritizedLocalityWeights, List dropOverloads) { + private static Map generatePriorityChildConfigs( + DiscoveryMechanism discovery, + Object endpointLbConfig, + LoadBalancerRegistry lbRegistry, + Map> prioritizedLocalityWeights, + List dropOverloads) { Map configs = new HashMap<>(); for (String priority : prioritizedLocalityWeights.keySet()) { ClusterImplConfig clusterImplConfig = - new ClusterImplConfig(cluster, edsServiceName, lrsServerInfo, maxConcurrentRequests, - dropOverloads, endpointLbConfig, tlsContext, filterMetadata); + new ClusterImplConfig( + discovery.cluster, discovery.edsServiceName, discovery.lrsServerInfo, + discovery.maxConcurrentRequests, dropOverloads, endpointLbConfig, + discovery.tlsContext, discovery.filterMetadata); LoadBalancerProvider clusterImplLbProvider = lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); Object priorityChildPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( @@ -791,16 +342,17 @@ private static Map generateEdsBasedPriorityChildCon // If outlier detection has been configured we wrap the child policy in the outlier detection // load balancer. - if (outlierDetection != null) { + if (discovery.outlierDetection != null) { LoadBalancerProvider outlierDetectionProvider = lbRegistry.getProvider( "outlier_detection_experimental"); priorityChildPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( outlierDetectionProvider, - buildOutlierDetectionLbConfig(outlierDetection, priorityChildPolicy)); + buildOutlierDetectionLbConfig(discovery.outlierDetection, priorityChildPolicy)); } + boolean isEds = discovery.type == DiscoveryMechanism.Type.EDS; PriorityChildConfig priorityChildConfig = - new PriorityChildConfig(priorityChildPolicy, true /* ignoreReresolution */); + new PriorityChildConfig(priorityChildPolicy, isEds /* ignoreReresolution */); configs.put(priority, priorityChildConfig); } return configs; diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java index 48101cd9c54..8cff272fcba 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java @@ -25,6 +25,7 @@ import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; import io.grpc.NameResolver.ConfigOrError; import io.grpc.Status; import io.grpc.xds.EnvoyServerProtoData.OutlierDetection; @@ -41,6 +42,15 @@ */ @Internal public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvider { + private final LoadBalancerRegistry lbRegistry; + + public ClusterResolverLoadBalancerProvider() { + this.lbRegistry = null; + } + + ClusterResolverLoadBalancerProvider(LoadBalancerRegistry lbRegistry) { + this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry"); + } @Override public boolean isAvailable() { @@ -65,7 +75,11 @@ public ConfigOrError parseLoadBalancingPolicyConfig(Map rawLoadBalanc @Override public LoadBalancer newLoadBalancer(Helper helper) { - return new ClusterResolverLoadBalancer(helper); + LoadBalancerRegistry lbRegistry = this.lbRegistry; + if (lbRegistry == null) { + lbRegistry = LoadBalancerRegistry.getDefaultRegistry(); + } + return new ClusterResolverLoadBalancer(helper, lbRegistry); } static final class ClusterResolverConfig { diff --git a/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java b/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java index fd2a1d2a069..9c2ee641423 100644 --- a/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java +++ b/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java @@ -328,7 +328,7 @@ static OutlierDetection fromEnvoyOutlierDetection( Integer minimumHosts = envoyOutlierDetection.hasSuccessRateMinimumHosts() ? envoyOutlierDetection.getSuccessRateMinimumHosts().getValue() : null; Integer requestVolume = envoyOutlierDetection.hasSuccessRateRequestVolume() - ? envoyOutlierDetection.getSuccessRateMinimumHosts().getValue() : null; + ? envoyOutlierDetection.getSuccessRateRequestVolume().getValue() : null; successRateEjection = SuccessRateEjection.create(stdevFactor, enforcementPercentage, minimumHosts, requestVolume); diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java index 5ed3821b7e4..21b0ad7dc66 100644 --- a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java +++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java @@ -41,6 +41,7 @@ import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClient.ResourceWatcher; import io.grpc.xds.client.XdsResourceType; +import java.net.SocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -83,7 +84,7 @@ private enum TrackedWatcherTypeEnum { private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37 - static boolean enableLogicalDns = false; + static boolean enableLogicalDns = true; private final String listenerName; private final XdsClient xdsClient; @@ -394,10 +395,13 @@ private static StatusOr dnsToEdsUpdate( return StatusOr.fromStatus(dnsData.getStatus()); } - List endpoints = new ArrayList<>(); + List addresses = new ArrayList<>(); for (EquivalentAddressGroup eag : dnsData.getValue()) { - endpoints.add(Endpoints.LbEndpoint.create(eag, 1, true, dnsHostName, ImmutableMap.of())); + addresses.addAll(eag.getAddresses()); } + EquivalentAddressGroup eag = new EquivalentAddressGroup(addresses); + List endpoints = ImmutableList.of( + Endpoints.LbEndpoint.create(eag, 1, true, dnsHostName, ImmutableMap.of())); LocalityLbEndpoints lbEndpoints = LocalityLbEndpoints.create(endpoints, 1, 0, ImmutableMap.of()); return StatusOr.fromValue(new XdsEndpointResource.EdsUpdate( diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index 58d1ff769fe..55059dc4a5a 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -234,6 +234,13 @@ public void start(Listener2 listener) { resolveState.start(); } + @Override + public void refresh() { + if (resolveState != null) { + resolveState.refresh(); + } + } + private static String expandPercentS(String template, String replacement) { return template.replace("%s", replacement); } @@ -323,7 +330,10 @@ private void updateResolutionResult(XdsConfig xdsConfig) { .setAttributes(attrs) .setServiceConfig(parsedServiceConfig) .build(); - listener.onResult2(result); + if (!listener.onResult2(result).isOk()) { + // TODO: check if this is right + resolveState.xdsDependencyManager.requestReresolution(); + } } /** @@ -662,6 +672,10 @@ void start() { xdsDependencyManager.start(this); } + void refresh() { + xdsDependencyManager.requestReresolution(); + } + private void shutdown() { if (stopped) { return; diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java index 982677d24da..be68018792b 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java @@ -21,18 +21,41 @@ import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.WRR_LOCALITY_POLICY_NAME; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_CDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_EDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_LDS; +import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_RDS; import static java.util.stream.Collectors.toList; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.testing.EqualsTester; +import com.google.protobuf.Any; +import com.google.protobuf.Duration; +import com.google.protobuf.UInt32Value; +import com.google.protobuf.UInt64Value; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.OutlierDetection; +import io.envoyproxy.envoy.config.core.v3.Address; +import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; +import io.envoyproxy.envoy.config.core.v3.ConfigSource; +import io.envoyproxy.envoy.config.core.v3.HealthStatus; +import io.envoyproxy.envoy.config.core.v3.Locality; +import io.envoyproxy.envoy.config.core.v3.Metadata; +import io.envoyproxy.envoy.config.core.v3.SocketAddress; +import io.envoyproxy.envoy.config.core.v3.TransportSocket; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.Endpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; +import io.envoyproxy.envoy.extensions.transport_sockets.http_11_proxy.v3.Http11ProxyUpstreamTransport; import io.grpc.Attributes; import io.grpc.ChannelLogger; import io.grpc.ConnectivityState; @@ -53,25 +76,23 @@ import io.grpc.NameResolverProvider; import io.grpc.NameResolverRegistry; import io.grpc.Status; -import io.grpc.Status.Code; import io.grpc.StatusOr; import io.grpc.SynchronizationContext; -import io.grpc.internal.BackoffPolicy; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.internal.FakeClock; -import io.grpc.internal.FakeClock.ScheduledTask; import io.grpc.internal.GrpcUtil; -import io.grpc.internal.ObjectPool; +import io.grpc.testing.GrpcCleanupRule; import io.grpc.util.GracefulSwitchLoadBalancer; import io.grpc.util.GracefulSwitchLoadBalancerAccessor; +import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig; import io.grpc.util.OutlierDetectionLoadBalancerProvider; +import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig; import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; import io.grpc.xds.Endpoints.DropOverload; -import io.grpc.xds.Endpoints.LbEndpoint; -import io.grpc.xds.Endpoints.LocalityLbEndpoints; import io.grpc.xds.EnvoyServerProtoData.FailurePercentageEjection; -import io.grpc.xds.EnvoyServerProtoData.OutlierDetection; import io.grpc.xds.EnvoyServerProtoData.SuccessRateEjection; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; import io.grpc.xds.LeastRequestLoadBalancer.LeastRequestConfig; @@ -79,24 +100,16 @@ import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig; import io.grpc.xds.RingHashLoadBalancer.RingHashConfig; import io.grpc.xds.WrrLocalityLoadBalancer.WrrLocalityConfig; -import io.grpc.xds.XdsEndpointResource.EdsUpdate; import io.grpc.xds.client.Bootstrapper.ServerInfo; -import io.grpc.xds.client.Locality; import io.grpc.xds.client.XdsClient; -import io.grpc.xds.client.XdsResourceType; import io.grpc.xds.internal.security.CommonTlsContextTestsUtil; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.net.URI; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; +import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.junit.After; @@ -107,9 +120,7 @@ import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; import org.mockito.Captor; -import org.mockito.InOrder; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -117,39 +128,43 @@ @RunWith(JUnit4.class) public class ClusterResolverLoadBalancerTest { @Rule public final MockitoRule mocks = MockitoJUnit.rule(); + @Rule + public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); - private static final String AUTHORITY = "api.google.com"; - private static final String CLUSTER1 = "cluster-foo.googleapis.com"; - private static final String CLUSTER2 = "cluster-bar.googleapis.com"; - private static final String CLUSTER_DNS = "cluster-dns.googleapis.com"; - private static final String EDS_SERVICE_NAME1 = "backend-service-foo.googleapis.com"; - private static final String EDS_SERVICE_NAME2 = "backend-service-bar.googleapis.com"; + private static final String SERVER_NAME = "example.com"; + private static final String CLUSTER = "cluster-foo.googleapis.com"; + private static final String EDS_SERVICE_NAME = "backend-service-foo.googleapis.com"; private static final String DNS_HOST_NAME = "dns-service.googleapis.com"; - private static final ServerInfo LRS_SERVER_INFO = - ServerInfo.create("lrs.googleapis.com", InsecureChannelCredentials.create()); - private final Locality locality1 = - Locality.create("test-region-1", "test-zone-1", "test-subzone-1"); - private final Locality locality2 = - Locality.create("test-region-2", "test-zone-2", "test-subzone-2"); - private final Locality locality3 = - Locality.create("test-region-3", "test-zone-3", "test-subzone-3"); - private final UpstreamTlsContext tlsContext = - CommonTlsContextTestsUtil.buildUpstreamTlsContext("google_cloud_private_spiffe", true); - private final OutlierDetection outlierDetection = OutlierDetection.create( - 100L, 100L, 100L, 100, SuccessRateEjection.create(100, 100, 100, 100), - FailurePercentageEjection.create(100, 100, 100, 100)); - private final DiscoveryMechanism edsDiscoveryMechanism1 = - DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L, tlsContext, - Collections.emptyMap(), null); - private final DiscoveryMechanism edsDiscoveryMechanism2 = - DiscoveryMechanism.forEds(CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_INFO, 200L, tlsContext, - Collections.emptyMap(), null); - private final DiscoveryMechanism edsDiscoveryMechanismWithOutlierDetection = - DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L, tlsContext, - Collections.emptyMap(), outlierDetection); - private final DiscoveryMechanism logicalDnsDiscoveryMechanism = - DiscoveryMechanism.forLogicalDns(CLUSTER_DNS, DNS_HOST_NAME, LRS_SERVER_INFO, 300L, null, - Collections.emptyMap()); + private static final Cluster EDS_CLUSTER = Cluster.newBuilder() + .setName(CLUSTER) + .setType(Cluster.DiscoveryType.EDS) + .setEdsClusterConfig(Cluster.EdsClusterConfig.newBuilder() + .setServiceName(EDS_SERVICE_NAME) + .setEdsConfig(ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.newBuilder()))) + .build(); + private static final Cluster LOGICAL_DNS_CLUSTER = Cluster.newBuilder() + .setName(CLUSTER) + .setType(Cluster.DiscoveryType.LOGICAL_DNS) + .setLoadAssignment(ClusterLoadAssignment.newBuilder() + .addEndpoints(LocalityLbEndpoints.newBuilder() + .addLbEndpoints(newSocketLbEndpoint(DNS_HOST_NAME, 9000)))) + .build(); + private static final Locality LOCALITY1 = Locality.newBuilder() + .setRegion("test-region-1") + .setZone("test-zone-1") + .setSubZone("test-subzone-1") + .build(); + private static final Locality LOCALITY2 = Locality.newBuilder() + .setRegion("test-region-2") + .setZone("test-zone-2") + .setSubZone("test-subzone-2") + .build(); + private static final Locality LOCALITY3 = Locality.newBuilder() + .setRegion("test-region-3") + .setZone("test-zone-3") + .setSubZone("test-subzone-3") + .build(); private final SynchronizationContext syncContext = new SynchronizationContext( new Thread.UncaughtExceptionHandler() { @@ -161,50 +176,32 @@ public void uncaughtException(Thread t, Throwable e) { private final FakeClock fakeClock = new FakeClock(); private final LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry(); private final NameResolverRegistry nsRegistry = new NameResolverRegistry(); - private final Object roundRobin = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( - new FakeLoadBalancerProvider("wrr_locality_experimental"), new WrrLocalityConfig( - GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( - new FakeLoadBalancerProvider("round_robin"), null))); - private final Object ringHash = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( - new FakeLoadBalancerProvider("ring_hash_experimental"), new RingHashConfig(10L, 100L, "")); - private final Object leastRequest = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( - new FakeLoadBalancerProvider("wrr_locality_experimental"), new WrrLocalityConfig( - GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( - new FakeLoadBalancerProvider("least_request_experimental"), - new LeastRequestConfig(3)))); private final List childBalancers = new ArrayList<>(); private final List resolvers = new ArrayList<>(); - private final FakeXdsClient xdsClient = new FakeXdsClient(); - private final ObjectPool xdsClientPool = new ObjectPool() { - @Override - public XdsClient getObject() { - xdsClientRefs++; - return xdsClient; - } - - @Override - public XdsClient returnObject(Object object) { - xdsClientRefs--; - return null; - } - }; - + private final XdsTestControlPlaneService controlPlaneService = new XdsTestControlPlaneService(); + private final XdsClient xdsClient = XdsTestUtils.createXdsClient( + Arrays.asList("control-plane.example.com"), + serverInfo -> new GrpcXdsTransportFactory.GrpcXdsTransport( + InProcessChannelBuilder + .forName(serverInfo.target()) + .directExecutor() + .build()), + fakeClock); + + + private XdsDependencyManager xdsDepManager; @Mock private Helper helper; - @Mock - private BackoffPolicy.Provider backoffPolicyProvider; - @Mock - private BackoffPolicy backoffPolicy1; - @Mock - private BackoffPolicy backoffPolicy2; @Captor private ArgumentCaptor pickerCaptor; - private int xdsClientRefs; - private ClusterResolverLoadBalancer loadBalancer; - private NameResolverProvider fakeNameResolverProvider; + private CdsLoadBalancer2 loadBalancer; + private boolean originalIsEnabledXdsHttpConnect; @Before - public void setUp() throws URISyntaxException { + public void setUp() throws Exception { + lbRegistry.register(new ClusterResolverLoadBalancerProvider(lbRegistry)); + lbRegistry.register(new RingHashLoadBalancerProvider()); + lbRegistry.register(new WrrLocalityLoadBalancerProvider()); lbRegistry.register(new FakeLoadBalancerProvider(PRIORITY_POLICY_NAME)); lbRegistry.register(new FakeLoadBalancerProvider(CLUSTER_IMPL_POLICY_NAME)); lbRegistry.register(new FakeLoadBalancerProvider(WEIGHTED_TARGET_POLICY_NAME)); @@ -217,81 +214,112 @@ public void setUp() throws URISyntaxException { .setSynchronizationContext(syncContext) .setServiceConfigParser(mock(ServiceConfigParser.class)) .setChannelLogger(mock(ChannelLogger.class)) + .setScheduledExecutorService(fakeClock.getScheduledExecutorService()) + .setNameResolverRegistry(nsRegistry) .build(); - fakeNameResolverProvider = new FakeNameResolverProvider(false); - nsRegistry.register(fakeNameResolverProvider); - when(helper.getNameResolverRegistry()).thenReturn(nsRegistry); - when(helper.getNameResolverArgs()).thenReturn(args); - when(helper.getSynchronizationContext()).thenReturn(syncContext); - when(helper.getScheduledExecutorService()).thenReturn(fakeClock.getScheduledExecutorService()); - when(helper.getAuthority()).thenReturn(AUTHORITY); - when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2); - when(backoffPolicy1.nextBackoffNanos()) - .thenReturn(TimeUnit.SECONDS.toNanos(1L), TimeUnit.SECONDS.toNanos(10L)); - when(backoffPolicy2.nextBackoffNanos()) - .thenReturn(TimeUnit.SECONDS.toNanos(5L), TimeUnit.SECONDS.toNanos(50L)); - loadBalancer = new ClusterResolverLoadBalancer(helper, lbRegistry, backoffPolicyProvider); + + xdsDepManager = new XdsDependencyManager( + xdsClient, + syncContext, + SERVER_NAME, + SERVER_NAME, + args); + + cleanupRule.register(InProcessServerBuilder + .forName("control-plane.example.com") + .addService(controlPlaneService) + .directExecutor() + .build() + .start()); + + controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, ImmutableMap.of( + SERVER_NAME, ControlPlaneRule.buildClientListener(SERVER_NAME, "my-route"))); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_RDS, ImmutableMap.of( + "my-route", XdsTestUtils.buildRouteConfiguration(SERVER_NAME, "my-route", CLUSTER))); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + CLUSTER, EDS_CLUSTER)); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, ImmutableMap.of( + EDS_SERVICE_NAME, ControlPlaneRule.buildClusterLoadAssignment( + "127.0.0.1", "", 8080, EDS_SERVICE_NAME))); + + nsRegistry.register(new FakeNameResolverProvider()); + when(helper.getAuthority()).thenReturn("api.google.com"); + doAnswer((inv) -> { + xdsDepManager.requestReresolution(); + return null; + }).when(helper).refreshNameResolution(); + loadBalancer = new CdsLoadBalancer2(helper, lbRegistry); + + originalIsEnabledXdsHttpConnect = XdsClusterResource.isEnabledXdsHttpConnect; } @After - public void tearDown() { + public void tearDown() throws Exception { + XdsClusterResource.isEnabledXdsHttpConnect = originalIsEnabledXdsHttpConnect; loadBalancer.shutdown(); + if (xdsDepManager != null) { + xdsDepManager.shutdown(); + } + assertThat(xdsClient.getSubscribedResourcesMetadataSnapshot().get()).isEmpty(); + xdsClient.shutdown(); + assertThat(childBalancers).isEmpty(); assertThat(resolvers).isEmpty(); - assertThat(xdsClient.watchers).isEmpty(); - assertThat(xdsClientRefs).isEqualTo(0); assertThat(fakeClock.getPendingTasks()).isEmpty(); } @Test - public void edsClustersWithRingHashEndpointLbPolicy() { - ClusterResolverConfig config = new ClusterResolverConfig( - edsDiscoveryMechanism1, ringHash, false); - deliverLbConfig(config); - assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); - assertThat(childBalancers).isEmpty(); - + public void edsClustersWithRingHashEndpointLbPolicy() throws Exception { + Cluster cluster = EDS_CLUSTER.toBuilder() + .setLbPolicy(Cluster.LbPolicy.RING_HASH) + .setRingHashLbConfig(Cluster.RingHashLbConfig.newBuilder() + .setMinimumRingSize(UInt64Value.of(10)) + .setMaximumRingSize(UInt64Value.of(100)) + .build()) + .build(); // One priority with two localities of different weights. - EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); - EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); - EquivalentAddressGroup endpoint3 = makeAddress("endpoint-addr-3"); - LocalityLbEndpoints localityLbEndpoints1 = - LocalityLbEndpoints.create( - Arrays.asList( - LbEndpoint.create(endpoint1, 0 /* loadBalancingWeight */, - true, "hostname1", ImmutableMap.of()), - LbEndpoint.create(endpoint2, 0 /* loadBalancingWeight */, - true, "hostname2", ImmutableMap.of())), - 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); - LocalityLbEndpoints localityLbEndpoints2 = - LocalityLbEndpoints.create( - Collections.singletonList( - LbEndpoint.create( - endpoint3, 60 /* loadBalancingWeight */, true, - "hostname3", ImmutableMap.of())), - 50 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); - xdsClient.deliverClusterLoadAssignment( - EDS_SERVICE_NAME1, - ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2)); + ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder() + .setClusterName(EDS_SERVICE_NAME) + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(10)) + .setLocality(LOCALITY1) + .addLbEndpoints(newSocketLbEndpoint("127.0.0.1", 8080)) + .addLbEndpoints(newSocketLbEndpoint("127.0.0.2", 8080))) + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(50)) + .setLocality(LOCALITY2) + .addLbEndpoints(newSocketLbEndpoint("127.0.1.1", 8080) + .setLoadBalancingWeight(UInt32Value.of(60)))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + CLUSTER, cluster)); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, ImmutableMap.of( + EDS_SERVICE_NAME, clusterLoadAssignment)); + startXdsDepManager(); + + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.addresses).hasSize(3); EquivalentAddressGroup addr1 = childBalancer.addresses.get(0); EquivalentAddressGroup addr2 = childBalancer.addresses.get(1); EquivalentAddressGroup addr3 = childBalancer.addresses.get(2); - // Endpoints in locality1 have no endpoint-level weight specified, so all endpoints within - // locality1 are equally weighted. - assertThat(addr1.getAddresses()).isEqualTo(endpoint1.getAddresses()); + // Endpoints in LOCALITY1 have no endpoint-level weight specified, so all endpoints within + // LOCALITY1 are equally weighted. + assertThat(addr1.getAddresses()) + .isEqualTo(Arrays.asList(newInetSocketAddress("127.0.0.1", 8080))); assertThat(addr1.getAttributes().get(XdsAttributes.ATTR_SERVER_WEIGHT)) .isEqualTo(10); - assertThat(addr2.getAddresses()).isEqualTo(endpoint2.getAddresses()); + assertThat(addr2.getAddresses()) + .isEqualTo(Arrays.asList(newInetSocketAddress("127.0.0.2", 8080))); assertThat(addr2.getAttributes().get(XdsAttributes.ATTR_SERVER_WEIGHT)) .isEqualTo(10); - assertThat(addr3.getAddresses()).isEqualTo(endpoint3.getAddresses()); + assertThat(addr3.getAddresses()) + .isEqualTo(Arrays.asList(newInetSocketAddress("127.0.1.1", 8080))); assertThat(addr3.getAttributes().get(XdsAttributes.ATTR_SERVER_WEIGHT)) .isEqualTo(50 * 60); assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME); PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config; - assertThat(priorityLbConfig.priorities).containsExactly(CLUSTER1 + "[child1]"); + assertThat(priorityLbConfig.priorities).containsExactly(CLUSTER + "[child1]"); PriorityChildConfig priorityChildConfig = Iterables.getOnlyElement(priorityLbConfig.childConfigs.values()); assertThat(priorityChildConfig.ignoreReresolution).isTrue(); @@ -300,8 +328,8 @@ public void edsClustersWithRingHashEndpointLbPolicy() { .isEqualTo(CLUSTER_IMPL_POLICY_NAME); ClusterImplConfig clusterImplConfig = (ClusterImplConfig) GracefulSwitchLoadBalancerAccessor.getChildConfig(priorityChildConfig.childConfig); - assertClusterImplConfig(clusterImplConfig, CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L, - tlsContext, Collections.emptyList(), "ring_hash_experimental"); + assertClusterImplConfig(clusterImplConfig, CLUSTER, EDS_SERVICE_NAME, null, null, + null, Collections.emptyList(), "ring_hash_experimental"); RingHashConfig ringHashConfig = (RingHashConfig) GracefulSwitchLoadBalancerAccessor.getChildConfig(clusterImplConfig.childConfig); assertThat(ringHashConfig.minRingSize).isEqualTo(10L); @@ -310,31 +338,33 @@ public void edsClustersWithRingHashEndpointLbPolicy() { @Test public void edsClustersWithLeastRequestEndpointLbPolicy() { - ClusterResolverConfig config = new ClusterResolverConfig( - edsDiscoveryMechanism1, leastRequest, false); - deliverLbConfig(config); - assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); - assertThat(childBalancers).isEmpty(); - + Cluster cluster = EDS_CLUSTER.toBuilder() + .setLbPolicy(Cluster.LbPolicy.LEAST_REQUEST) + .build(); // Simple case with one priority and one locality - EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1"); - LocalityLbEndpoints localityLbEndpoints = - LocalityLbEndpoints.create( - Arrays.asList( - LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true, - "hostname1", ImmutableMap.of())), - 100 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); - xdsClient.deliverClusterLoadAssignment( - EDS_SERVICE_NAME1, - ImmutableMap.of(locality1, localityLbEndpoints)); + ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder() + .setClusterName(EDS_SERVICE_NAME) + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(100)) + .setLocality(LOCALITY1) + .addLbEndpoints(newSocketLbEndpoint("127.0.0.1", 8080))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + CLUSTER, cluster)); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, ImmutableMap.of( + EDS_SERVICE_NAME, clusterLoadAssignment)); + startXdsDepManager(); + + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.addresses).hasSize(1); EquivalentAddressGroup addr = childBalancer.addresses.get(0); - assertThat(addr.getAddresses()).isEqualTo(endpoint.getAddresses()); + assertThat(addr.getAddresses()) + .isEqualTo(Arrays.asList(newInetSocketAddress("127.0.0.1", 8080))); assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME); PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config; - assertThat(priorityLbConfig.priorities).containsExactly(CLUSTER1 + "[child1]"); + assertThat(priorityLbConfig.priorities).containsExactly(CLUSTER + "[child1]"); PriorityChildConfig priorityChildConfig = Iterables.getOnlyElement(priorityLbConfig.childConfigs.values()); assertThat(GracefulSwitchLoadBalancerAccessor.getChildProvider(priorityChildConfig.childConfig) @@ -342,8 +372,8 @@ public void edsClustersWithLeastRequestEndpointLbPolicy() { .isEqualTo(CLUSTER_IMPL_POLICY_NAME); ClusterImplConfig clusterImplConfig = (ClusterImplConfig) GracefulSwitchLoadBalancerAccessor.getChildConfig(priorityChildConfig.childConfig); - assertClusterImplConfig(clusterImplConfig, CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L, - tlsContext, Collections.emptyList(), WRR_LOCALITY_POLICY_NAME); + assertClusterImplConfig(clusterImplConfig, CLUSTER, EDS_SERVICE_NAME, null, null, + null, Collections.emptyList(), WRR_LOCALITY_POLICY_NAME); WrrLocalityConfig wrrLocalityConfig = (WrrLocalityConfig) GracefulSwitchLoadBalancerAccessor.getChildConfig(clusterImplConfig.childConfig); LoadBalancerProvider childProvider = @@ -357,23 +387,22 @@ public void edsClustersWithLeastRequestEndpointLbPolicy() { @Test public void edsClustersEndpointHostname_addedToAddressAttribute() { - ClusterResolverConfig config = new ClusterResolverConfig( - edsDiscoveryMechanismWithOutlierDetection, leastRequest, false); - deliverLbConfig(config); - assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); - assertThat(childBalancers).isEmpty(); - // Simple case with one priority and one locality - EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1"); - LocalityLbEndpoints localityLbEndpoints = - LocalityLbEndpoints.create( - Arrays.asList( - LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true, - "hostname1", ImmutableMap.of())), - 100 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); - xdsClient.deliverClusterLoadAssignment( - EDS_SERVICE_NAME1, - ImmutableMap.of(locality1, localityLbEndpoints)); + ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder() + .setClusterName(EDS_SERVICE_NAME) + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(100)) + .setLocality(LOCALITY1) + .addLbEndpoints(LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setHostname("hostname1") + .setAddress(newAddress("127.0.0.1", 8000))))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, ImmutableMap.of( + EDS_SERVICE_NAME, clusterLoadAssignment)); + startXdsDepManager(); + + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); @@ -384,85 +413,87 @@ public void edsClustersEndpointHostname_addedToAddressAttribute() { @Test public void endpointAddressRewritten_whenProxyMetadataIsInEndpointMetadata() { - ClusterResolverConfig config = new ClusterResolverConfig( - edsDiscoveryMechanismWithOutlierDetection, leastRequest, true); - deliverLbConfig(config); - assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); - assertThat(childBalancers).isEmpty(); - - EquivalentAddressGroup endpoint = - new EquivalentAddressGroup(InetSocketAddress.createUnresolved("127.0.0.1", 8080)); - - // Proxy address in endpointMetadata (use FakeSocketAddress directly) - SocketAddress proxyAddress = new FakeSocketAddress("127.0.0.2"); - ImmutableMap endpointMetadata = - ImmutableMap.of("envoy.http11_proxy_transport_socket.proxy_address", proxyAddress); - - // No proxy in locality metadata - ImmutableMap localityMetadata = ImmutableMap.of(); - - LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints.create( - Arrays.asList( - LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true, - "hostname1", endpointMetadata)), - 100 /* localityWeight */, 1 /* priority */, localityMetadata); - - xdsClient.deliverClusterLoadAssignment( - EDS_SERVICE_NAME1, - ImmutableMap.of(locality1, localityLbEndpoints)); + XdsClusterResource.isEnabledXdsHttpConnect = true; + Cluster cluster = EDS_CLUSTER.toBuilder() + .setTransportSocket(TransportSocket.newBuilder() + .setName( + "type.googleapis.com/" + Http11ProxyUpstreamTransport.getDescriptor().getFullName()) + .setTypedConfig(Any.pack(Http11ProxyUpstreamTransport.getDefaultInstance()))) + .build(); + // Proxy address in endpointMetadata, and no proxy in locality metadata + ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder() + .setClusterName(EDS_SERVICE_NAME) + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(100)) + .setLocality(LOCALITY1) + .addLbEndpoints(newSocketLbEndpoint("127.0.0.1", 8080) + .setMetadata(Metadata.newBuilder() + .putTypedFilterMetadata( + "envoy.http11_proxy_transport_socket.proxy_address", + Any.pack(newAddress("127.0.0.2", 8081).build())))) + .addLbEndpoints(newSocketLbEndpoint("127.0.0.3", 8082))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + CLUSTER, cluster)); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, ImmutableMap.of( + EDS_SERVICE_NAME, clusterLoadAssignment)); + startXdsDepManager(); + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); // Get the rewritten address - SocketAddress rewrittenAddress = + java.net.SocketAddress rewrittenAddress = childBalancer.addresses.get(0).getAddresses().get(0); assertThat(rewrittenAddress).isInstanceOf(HttpConnectProxiedSocketAddress.class); HttpConnectProxiedSocketAddress proxiedSocket = (HttpConnectProxiedSocketAddress) rewrittenAddress; // Assert that the target address is the original address - assertThat(proxiedSocket.getTargetAddress()) - .isEqualTo(endpoint.getAddresses().get(0)); + assertThat(proxiedSocket.getTargetAddress()).isEqualTo(newInetSocketAddress("127.0.0.1", 8080)); // Assert that the proxy address is correctly set - assertThat(proxiedSocket.getProxyAddress()).isEqualTo(proxyAddress); + assertThat(proxiedSocket.getProxyAddress()).isEqualTo(newInetSocketAddress("127.0.0.2", 8081)); + + // Check the non-rewritten address + java.net.SocketAddress normalAddress = childBalancer.addresses.get(1).getAddresses().get(0); + assertThat(normalAddress).isEqualTo(newInetSocketAddress("127.0.0.3", 8082)); } @Test public void endpointAddressRewritten_whenProxyMetadataIsInLocalityMetadata() { - ClusterResolverConfig config = new ClusterResolverConfig( - edsDiscoveryMechanismWithOutlierDetection, leastRequest, true); - deliverLbConfig(config); - assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); - assertThat(childBalancers).isEmpty(); - - EquivalentAddressGroup endpoint = - new EquivalentAddressGroup(InetSocketAddress.createUnresolved("127.0.0.2", 8080)); - - // No proxy in endpointMetadata - ImmutableMap endpointMetadata = ImmutableMap.of(); - - // Proxy address is now in localityMetadata - SocketAddress proxyAddress = new FakeSocketAddress("proxy-addr"); - ImmutableMap localityMetadata = - ImmutableMap.of("envoy.http11_proxy_transport_socket.proxy_address", proxyAddress); - - LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints.create( - Arrays.asList( - LbEndpoint.create(endpoint, 0 /* loadBalancingWeight */, true, - "hostname2", endpointMetadata)), - 100 /* localityWeight */, 1 /* priority */, localityMetadata); - - xdsClient.deliverClusterLoadAssignment( - EDS_SERVICE_NAME1, - ImmutableMap.of(locality1, localityLbEndpoints)); + XdsClusterResource.isEnabledXdsHttpConnect = true; + Cluster cluster = EDS_CLUSTER.toBuilder() + .setTransportSocket(TransportSocket.newBuilder() + .setName( + "type.googleapis.com/" + Http11ProxyUpstreamTransport.getDescriptor().getFullName()) + .setTypedConfig(Any.pack(Http11ProxyUpstreamTransport.getDefaultInstance()))) + .build(); + // No proxy address in endpointMetadata, and proxy in locality metadata + ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder() + .setClusterName(EDS_SERVICE_NAME) + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(100)) + .setLocality(LOCALITY1) + .addLbEndpoints(newSocketLbEndpoint("127.0.0.1", 8080)) + .setMetadata(Metadata.newBuilder() + .putTypedFilterMetadata( + "envoy.http11_proxy_transport_socket.proxy_address", + Any.pack(newAddress("127.0.0.2", 8081).build())))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + CLUSTER, cluster)); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, ImmutableMap.of( + EDS_SERVICE_NAME, clusterLoadAssignment)); + startXdsDepManager(); + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); // Get the rewritten address - SocketAddress rewrittenAddress = childBalancer.addresses.get(0).getAddresses().get(0); + java.net.SocketAddress rewrittenAddress = childBalancer.addresses.get(0).getAddresses().get(0); // Assert that the address was rewritten assertThat(rewrittenAddress).isInstanceOf(HttpConnectProxiedSocketAddress.class); @@ -470,47 +501,37 @@ public void endpointAddressRewritten_whenProxyMetadataIsInLocalityMetadata() { (HttpConnectProxiedSocketAddress) rewrittenAddress; // Assert that the target address is the original address - assertThat(proxiedSocket.getTargetAddress()).isEqualTo(endpoint.getAddresses().get(0)); + assertThat(proxiedSocket.getTargetAddress()).isEqualTo(newInetSocketAddress("127.0.0.1", 8080)); // Assert that the proxy address is correctly set from locality metadata - assertThat(proxiedSocket.getProxyAddress()).isEqualTo(proxyAddress); + assertThat(proxiedSocket.getProxyAddress()).isEqualTo(newInetSocketAddress("127.0.0.2", 8081)); } @Test public void onlyEdsClusters_receivedEndpoints() { - ClusterResolverConfig config = new ClusterResolverConfig( - edsDiscoveryMechanism2, roundRobin, false); - deliverLbConfig(config); - assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME2); - assertThat(childBalancers).isEmpty(); - // CLUSTER1 has priority 1 (priority3), which has locality 2, which has endpoint3. - // CLUSTER2 has priority 1 (priority1) and 2 (priority2); priority1 has locality1, - // which has endpoint1 and endpoint2; priority2 has locality3, which has endpoint4. - EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); - EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); - EquivalentAddressGroup endpoint4 = makeAddress("endpoint-addr-4"); - LocalityLbEndpoints localityLbEndpoints1 = - LocalityLbEndpoints.create( - Arrays.asList( - LbEndpoint.create(endpoint1, 100, - true, "hostname1", ImmutableMap.of()), - LbEndpoint.create(endpoint2, 100, - true, "hostname1", ImmutableMap.of())), - 70 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); - LocalityLbEndpoints localityLbEndpoints3 = - LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create(endpoint4, 100, true, - "hostname3", ImmutableMap.of())), - 20 /* localityWeight */, 2 /* priority */, ImmutableMap.of()); - String priority1 = CLUSTER2 + "[child1]"; - String priority2 = CLUSTER2 + "[child2]"; - - // CLUSTER2: locality1 with priority 1 and locality3 with priority 2. - xdsClient.deliverClusterLoadAssignment( - EDS_SERVICE_NAME2, - ImmutableMap.of(locality1, localityLbEndpoints1, locality3, localityLbEndpoints3)); - - // Endpoints of all clusters have been resolved. + // Has two localities with different priorities + ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder() + .setClusterName(EDS_SERVICE_NAME) + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(70)) + .setPriority(0) + .setLocality(LOCALITY1) + .addLbEndpoints(newSocketLbEndpoint("127.0.0.1", 8080)) + .addLbEndpoints(newSocketLbEndpoint("127.0.0.2", 8080))) + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(30)) + .setPriority(1) + .setLocality(LOCALITY2) + .addLbEndpoints(newSocketLbEndpoint("127.0.0.3", 8080))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, ImmutableMap.of( + EDS_SERVICE_NAME, clusterLoadAssignment)); + startXdsDepManager(); + + String priority1 = CLUSTER + "[child1]"; + String priority2 = CLUSTER + "[child2]"; + + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME); @@ -525,8 +546,8 @@ public void onlyEdsClusters_receivedEndpoints() { .isEqualTo(CLUSTER_IMPL_POLICY_NAME); ClusterImplConfig clusterImplConfig1 = (ClusterImplConfig) GracefulSwitchLoadBalancerAccessor.getChildConfig(priorityChildConfig1.childConfig); - assertClusterImplConfig(clusterImplConfig1, CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_INFO, 200L, - tlsContext, Collections.emptyList(), WRR_LOCALITY_POLICY_NAME); + assertClusterImplConfig(clusterImplConfig1, CLUSTER, EDS_SERVICE_NAME, null, null, + null, Collections.emptyList(), WRR_LOCALITY_POLICY_NAME); WrrLocalityConfig wrrLocalityConfig1 = (WrrLocalityConfig) GracefulSwitchLoadBalancerAccessor.getChildConfig(clusterImplConfig1.childConfig); LoadBalancerProvider childProvider1 = @@ -540,8 +561,8 @@ public void onlyEdsClusters_receivedEndpoints() { .isEqualTo(CLUSTER_IMPL_POLICY_NAME); ClusterImplConfig clusterImplConfig2 = (ClusterImplConfig) GracefulSwitchLoadBalancerAccessor.getChildConfig(priorityChildConfig2.childConfig); - assertClusterImplConfig(clusterImplConfig2, CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_INFO, 200L, - tlsContext, Collections.emptyList(), WRR_LOCALITY_POLICY_NAME); + assertClusterImplConfig(clusterImplConfig2, CLUSTER, EDS_SERVICE_NAME, null, null, + null, Collections.emptyList(), WRR_LOCALITY_POLICY_NAME); WrrLocalityConfig wrrLocalityConfig2 = (WrrLocalityConfig) GracefulSwitchLoadBalancerAccessor.getChildConfig(clusterImplConfig1.childConfig); LoadBalancerProvider childProvider2 = @@ -554,36 +575,41 @@ public void onlyEdsClusters_receivedEndpoints() { GracefulSwitchLoadBalancerAccessor.getChildProvider(wrrLocalityConfig3.childConfig); assertThat(childProvider3.getPolicyName()).isEqualTo("round_robin"); + io.grpc.xds.client.Locality locality1 = io.grpc.xds.client.Locality.create( + LOCALITY1.getRegion(), LOCALITY1.getZone(), LOCALITY1.getSubZone()); + io.grpc.xds.client.Locality locality2 = io.grpc.xds.client.Locality.create( + LOCALITY2.getRegion(), LOCALITY2.getZone(), LOCALITY2.getSubZone()); for (EquivalentAddressGroup eag : childBalancer.addresses) { - if (eag.getAttributes().get(XdsAttributes.ATTR_LOCALITY) == locality1) { + io.grpc.xds.client.Locality locality = eag.getAttributes().get(XdsAttributes.ATTR_LOCALITY); + if (locality.equals(locality1)) { assertThat(eag.getAttributes().get(XdsAttributes.ATTR_LOCALITY_WEIGHT)) .isEqualTo(70); - } - if (eag.getAttributes().get(XdsAttributes.ATTR_LOCALITY) == locality2) { - assertThat(eag.getAttributes().get(XdsAttributes.ATTR_LOCALITY_WEIGHT)) - .isEqualTo(10); - } - if (eag.getAttributes().get(XdsAttributes.ATTR_LOCALITY) == locality3) { + } else if (locality.equals(locality2)) { assertThat(eag.getAttributes().get(XdsAttributes.ATTR_LOCALITY_WEIGHT)) - .isEqualTo(20); + .isEqualTo(30); + } else { + throw new AssertionError("Unexpected locality region: " + locality.region()); } } } @SuppressWarnings("unchecked") - private void verifyEdsPriorityNames(List want, - Map... updates) { - ClusterResolverConfig config = new ClusterResolverConfig( - edsDiscoveryMechanism2, roundRobin, false); - deliverLbConfig(config); - assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME2); - assertThat(childBalancers).isEmpty(); - - for (Map update: updates) { - xdsClient.deliverClusterLoadAssignment( - EDS_SERVICE_NAME2, - update); + private void verifyEdsPriorityNames(List want, List... updates) { + Iterator edsUpdates = Arrays.asList(updates).stream() + .map(update -> ClusterLoadAssignment.newBuilder() + .setClusterName(EDS_SERVICE_NAME) + .addAllEndpoints(update) + .build()) + .iterator(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, ImmutableMap.of( + EDS_SERVICE_NAME, edsUpdates.next())); + startXdsDepManager(); + + while (edsUpdates.hasNext()) { + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, ImmutableMap.of( + EDS_SERVICE_NAME, edsUpdates.next())); } + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME); @@ -594,151 +620,198 @@ private void verifyEdsPriorityNames(List want, @Test @SuppressWarnings("unchecked") public void edsUpdatePriorityName_twoPriorities() { - verifyEdsPriorityNames(Arrays.asList(CLUSTER2 + "[child1]", CLUSTER2 + "[child2]"), - ImmutableMap.of(locality1, createEndpoints(1), - locality2, createEndpoints(2) - )); + verifyEdsPriorityNames(Arrays.asList(CLUSTER + "[child1]", CLUSTER + "[child2]"), + Arrays.asList(createEndpoints(LOCALITY1, 0), createEndpoints(LOCALITY2, 1))); } @Test @SuppressWarnings("unchecked") public void edsUpdatePriorityName_addOnePriority() { - verifyEdsPriorityNames(Arrays.asList(CLUSTER2 + "[child2]"), - ImmutableMap.of(locality1, createEndpoints(1)), - ImmutableMap.of(locality2, createEndpoints(1) - )); + verifyEdsPriorityNames(Arrays.asList(CLUSTER + "[child2]"), + Arrays.asList(createEndpoints(LOCALITY1, 0)), + Arrays.asList(createEndpoints(LOCALITY2, 0))); } @Test @SuppressWarnings("unchecked") public void edsUpdatePriorityName_swapTwoPriorities() { - verifyEdsPriorityNames(Arrays.asList(CLUSTER2 + "[child2]", CLUSTER2 + "[child1]", - CLUSTER2 + "[child3]"), - ImmutableMap.of(locality1, createEndpoints(1), - locality2, createEndpoints(2), - locality3, createEndpoints(3) - ), - ImmutableMap.of(locality1, createEndpoints(2), - locality2, createEndpoints(1), - locality3, createEndpoints(3)) - ); + verifyEdsPriorityNames(Arrays.asList(CLUSTER + "[child2]", CLUSTER + "[child1]", + CLUSTER + "[child3]"), + Arrays.asList( + createEndpoints(LOCALITY1, 0), + createEndpoints(LOCALITY2, 1), + createEndpoints(LOCALITY3, 2)), + Arrays.asList( + createEndpoints(LOCALITY1, 1), + createEndpoints(LOCALITY2, 0), + createEndpoints(LOCALITY3, 2))); } @Test @SuppressWarnings("unchecked") public void edsUpdatePriorityName_mergeTwoPriorities() { - verifyEdsPriorityNames(Arrays.asList(CLUSTER2 + "[child3]", CLUSTER2 + "[child1]"), - ImmutableMap.of(locality1, createEndpoints(1), - locality3, createEndpoints(3), - locality2, createEndpoints(2)), - ImmutableMap.of(locality1, createEndpoints(2), - locality3, createEndpoints(1), - locality2, createEndpoints(1) - )); + verifyEdsPriorityNames(Arrays.asList(CLUSTER + "[child3]", CLUSTER + "[child1]"), + Arrays.asList( + createEndpoints(LOCALITY1, 0), + createEndpoints(LOCALITY3, 2), + createEndpoints(LOCALITY2, 1)), + Arrays.asList( + createEndpoints(LOCALITY1, 1), + createEndpoints(LOCALITY3, 0), + createEndpoints(LOCALITY2, 0))); } - private LocalityLbEndpoints createEndpoints(int priority) { - return LocalityLbEndpoints.create( - Arrays.asList( - LbEndpoint.create(makeAddress("endpoint-addr-1"), 100, - true, "hostname1", ImmutableMap.of()), - LbEndpoint.create(makeAddress("endpoint-addr-2"), 100, - true, "hostname2", ImmutableMap.of())), - 70 /* localityWeight */, priority /* priority */, ImmutableMap.of()); + private LocalityLbEndpoints createEndpoints(Locality locality, int priority) { + return LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(70)) + .setLocality(locality) + .setPriority(priority) + .addLbEndpoints(newSocketLbEndpoint("127.0." + priority + ".1", 8080)) + .build(); } @Test public void onlyEdsClusters_resourceNeverExist_returnErrorPicker() { - ClusterResolverConfig config = new ClusterResolverConfig( - edsDiscoveryMechanism1, roundRobin, false); - deliverLbConfig(config); - assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); - assertThat(childBalancers).isEmpty(); - reset(helper); - xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME1); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of()); + startXdsDepManager(); verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); assertPicker( pickerCaptor.getValue(), Status.UNAVAILABLE.withDescription( - "No usable endpoint from cluster: " + CLUSTER1), + "CDS resource " + CLUSTER + " does not exist nodeID: node-id"), null); } @Test - public void edsCluster_resourcesRevoked_shutDownChildLbPolicy() { - ClusterResolverConfig config = new ClusterResolverConfig( - edsDiscoveryMechanism1, roundRobin, false); - deliverLbConfig(config); - assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); - assertThat(childBalancers).isEmpty(); - reset(helper); - EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); - LocalityLbEndpoints localityLbEndpoints1 = - LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create(endpoint1, 100, true, - "hostname1", ImmutableMap.of())), - 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); - xdsClient.deliverClusterLoadAssignment( - EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints1)); + public void cdsMissing_handledDirectly() { + ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder() + .setClusterName(EDS_SERVICE_NAME) + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(100)) + .setLocality(LOCALITY1) + .addLbEndpoints(newSocketLbEndpoint("127.0.0.1", 8000))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of()); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, ImmutableMap.of( + EDS_SERVICE_NAME, clusterLoadAssignment)); + + startXdsDepManager(); + assertThat(childBalancers).hasSize(0); // no child LB policy created + verify(helper).updateBalancingState( + eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); + Status expectedError = Status.UNAVAILABLE.withDescription( + "CDS resource " + CLUSTER + " does not exist nodeID: node-id"); + assertPicker(pickerCaptor.getValue(), expectedError, null); + } + + @Test + public void cdsRevoked_handledDirectly() { + ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder() + .setClusterName(EDS_SERVICE_NAME) + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(100)) + .setLocality(LOCALITY1) + .addLbEndpoints(newSocketLbEndpoint("127.0.0.1", 8000))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, ImmutableMap.of( + EDS_SERVICE_NAME, clusterLoadAssignment)); + + startXdsDepManager(); assertThat(childBalancers).hasSize(1); // child LB policy created FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(((PriorityLbConfig) childBalancer.config).priorities).hasSize(1); - assertAddressesEqual(Arrays.asList(endpoint1), childBalancer.addresses); + assertThat(childBalancer.addresses).hasSize(1); + assertAddressesEqual( + Arrays.asList(newInetSocketAddressEag("127.0.0.1", 8000)), + childBalancer.addresses); - xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME1); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of()); verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); Status expectedError = Status.UNAVAILABLE.withDescription( - "No usable endpoint from cluster: " + CLUSTER1); + "CDS resource " + CLUSTER + " does not exist nodeID: node-id"); assertPicker(pickerCaptor.getValue(), expectedError, null); + assertThat(childBalancer.shutdown).isTrue(); + } + + @Test + public void edsMissing_handledByChildPolicy() { + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, ImmutableMap.of()); + + startXdsDepManager(); + assertThat(childBalancers).hasSize(1); // child LB policy created + FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); + assertThat(childBalancer.upstreamError).isNotNull(); + assertThat(childBalancer.upstreamError.getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(childBalancer.upstreamError.getDescription()) + .isEqualTo("EDS resource " + EDS_SERVICE_NAME + " does not exist nodeID: node-id"); + assertThat(childBalancer.shutdown).isFalse(); + } + + @Test + public void logicalDnsLookupFailed_handledByChildPolicy() { + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + CLUSTER, LOGICAL_DNS_CLUSTER)); + startXdsDepManager(new CdsConfig(CLUSTER), /* forwardTime= */ false); + FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME + ":9000"); + assertThat(childBalancers).isEmpty(); + resolver.deliverError(Status.UNAVAILABLE.withDescription("OH NO! Who would have guessed?")); + + assertThat(childBalancers).hasSize(1); // child LB policy created + FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); + assertThat(childBalancer.upstreamError).isNotNull(); + assertThat(childBalancer.upstreamError.getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(childBalancer.upstreamError.getDescription()) + .isEqualTo("OH NO! Who would have guessed?"); + assertThat(childBalancer.shutdown).isFalse(); } @Test public void handleEdsResource_ignoreUnhealthyEndpoints() { - ClusterResolverConfig config = new ClusterResolverConfig( - edsDiscoveryMechanism1, roundRobin, false); - deliverLbConfig(config); - EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); - EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); - LocalityLbEndpoints localityLbEndpoints = - LocalityLbEndpoints.create( - Arrays.asList( - LbEndpoint.create(endpoint1, 100, false /* isHealthy */, - "hostname1", ImmutableMap.of()), - LbEndpoint.create(endpoint2, 100, true /* isHealthy */, - "hostname2", ImmutableMap.of())), - 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); - xdsClient.deliverClusterLoadAssignment( - EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); + ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder() + .setClusterName(EDS_SERVICE_NAME) + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(100)) + .setLocality(LOCALITY1) + .addLbEndpoints(newSocketLbEndpoint("127.0.0.1", 8000) + .setHealthStatus(HealthStatus.UNHEALTHY)) + .addLbEndpoints(newSocketLbEndpoint("127.0.0.2", 8000))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, ImmutableMap.of( + EDS_SERVICE_NAME, clusterLoadAssignment)); + startXdsDepManager(); + + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); - assertThat(childBalancer.addresses).hasSize(1); - assertAddressesEqual(Collections.singletonList(endpoint2), childBalancer.addresses); + assertAddressesEqual( + Arrays.asList(new EquivalentAddressGroup(newInetSocketAddress("127.0.0.2", 8000))), + childBalancer.addresses); } @Test public void handleEdsResource_ignoreLocalitiesWithNoHealthyEndpoints() { - ClusterResolverConfig config = new ClusterResolverConfig( - edsDiscoveryMechanism1, roundRobin, false); - deliverLbConfig(config); - EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); - EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); - LocalityLbEndpoints localityLbEndpoints1 = - LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create(endpoint1, 100, false /* isHealthy */, - "hostname1", ImmutableMap.of())), - 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); - LocalityLbEndpoints localityLbEndpoints2 = - LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create(endpoint2, 100, true /* isHealthy */, - "hostname2", ImmutableMap.of())), - 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); - xdsClient.deliverClusterLoadAssignment( - EDS_SERVICE_NAME1, - ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2)); + ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder() + .setClusterName(EDS_SERVICE_NAME) + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(100)) + .setLocality(LOCALITY1) + .addLbEndpoints(newSocketLbEndpoint("127.0.0.1", 8000) + .setHealthStatus(HealthStatus.UNHEALTHY))) + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(100)) + .setLocality(LOCALITY2) + .addLbEndpoints(newSocketLbEndpoint("127.0.0.2", 8000))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, ImmutableMap.of( + EDS_SERVICE_NAME, clusterLoadAssignment)); + startXdsDepManager(); + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); + io.grpc.xds.client.Locality locality2 = io.grpc.xds.client.Locality.create( + LOCALITY2.getRegion(), LOCALITY2.getZone(), LOCALITY2.getSubZone()); for (EquivalentAddressGroup eag : childBalancer.addresses) { assertThat(eag.getAttributes().get(XdsAttributes.ATTR_LOCALITY)).isEqualTo(locality2); } @@ -746,76 +819,65 @@ public void handleEdsResource_ignoreLocalitiesWithNoHealthyEndpoints() { @Test public void handleEdsResource_ignorePrioritiesWithNoHealthyEndpoints() { - ClusterResolverConfig config = new ClusterResolverConfig( - edsDiscoveryMechanism1, roundRobin, false); - deliverLbConfig(config); - EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); - EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); - LocalityLbEndpoints localityLbEndpoints1 = - LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create(endpoint1, 100, false /* isHealthy */, - "hostname1", ImmutableMap.of())), - 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); - LocalityLbEndpoints localityLbEndpoints2 = - LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create(endpoint2, 200, true /* isHealthy */, - "hostname2", ImmutableMap.of())), - 10 /* localityWeight */, 2 /* priority */, ImmutableMap.of()); - String priority2 = CLUSTER1 + "[child2]"; - xdsClient.deliverClusterLoadAssignment( - EDS_SERVICE_NAME1, - ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2)); + ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder() + .setClusterName(EDS_SERVICE_NAME) + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(100)) + .setLocality(LOCALITY1) + .setPriority(0) + .addLbEndpoints(newSocketLbEndpoint("127.0.0.1", 8000) + .setHealthStatus(HealthStatus.UNHEALTHY))) + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(100)) + .setLocality(LOCALITY2) + .setPriority(1) + .addLbEndpoints(newSocketLbEndpoint("127.0.0.2", 8000))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, ImmutableMap.of( + EDS_SERVICE_NAME, clusterLoadAssignment)); + startXdsDepManager(); + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); + String priority2 = CLUSTER + "[child2]"; FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(((PriorityLbConfig) childBalancer.config).priorities).containsExactly(priority2); } @Test public void handleEdsResource_noHealthyEndpoint() { - ClusterResolverConfig config = new ClusterResolverConfig( - edsDiscoveryMechanism1, roundRobin, false); - deliverLbConfig(config); - EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1"); - LocalityLbEndpoints localityLbEndpoints = - LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create(endpoint, 100, false /* isHealthy */, - "hostname1", ImmutableMap.of())), - 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); - xdsClient.deliverClusterLoadAssignment(EDS_SERVICE_NAME1, - Collections.singletonMap(locality1, localityLbEndpoints)); // single endpoint, unhealthy + ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder() + .setClusterName(EDS_SERVICE_NAME) + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLoadBalancingWeight(UInt32Value.of(100)) + .setLocality(LOCALITY1) + .addLbEndpoints(newSocketLbEndpoint("127.0.0.1", 8000) + .setHealthStatus(HealthStatus.UNHEALTHY))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, ImmutableMap.of( + EDS_SERVICE_NAME, clusterLoadAssignment)); + startXdsDepManager(); - assertThat(childBalancers).isEmpty(); - verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - assertPicker( - pickerCaptor.getValue(), - Status.UNAVAILABLE.withDescription( - "No usable endpoint from cluster: " + CLUSTER1), - null); + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); + assertThat(childBalancers).hasSize(1); + FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); + assertThat(childBalancer.upstreamError).isNotNull(); + assertThat(childBalancer.upstreamError.getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(childBalancer.upstreamError.getDescription()) + .isEqualTo("No usable endpoint from cluster: " + CLUSTER); } @Test public void onlyLogicalDnsCluster_endpointsResolved() { - do_onlyLogicalDnsCluster_endpointsResolved(); - } - - @Test - public void oldListenerCallback_onlyLogicalDnsCluster_endpointsResolved() { - nsRegistry.deregister(fakeNameResolverProvider); - nsRegistry.register(new FakeNameResolverProvider(true)); - do_onlyLogicalDnsCluster_endpointsResolved(); - } - - void do_onlyLogicalDnsCluster_endpointsResolved() { - ClusterResolverConfig config = new ClusterResolverConfig( - logicalDnsDiscoveryMechanism, roundRobin, false); - deliverLbConfig(config); - FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + CLUSTER, LOGICAL_DNS_CLUSTER)); + startXdsDepManager(new CdsConfig(CLUSTER), /* forwardTime= */ false); + FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME + ":9000"); assertThat(childBalancers).isEmpty(); - EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); - EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); - resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2)); + resolver.deliverEndpointAddresses(Arrays.asList( + newInetSocketAddressEag("127.0.2.1", 9000), newInetSocketAddressEag("127.0.2.2", 9000))); + fakeClock.forwardTime(10, TimeUnit.MINUTES); + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME); @@ -828,263 +890,132 @@ void do_onlyLogicalDnsCluster_endpointsResolved() { .isEqualTo(CLUSTER_IMPL_POLICY_NAME); ClusterImplConfig clusterImplConfig = (ClusterImplConfig) GracefulSwitchLoadBalancerAccessor.getChildConfig(priorityChildConfig.childConfig); - assertClusterImplConfig(clusterImplConfig, CLUSTER_DNS, null, LRS_SERVER_INFO, 300L, null, - Collections.emptyList(), "pick_first"); - assertAddressesEqual(Arrays.asList(endpoint1, endpoint2), childBalancer.addresses); + assertClusterImplConfig(clusterImplConfig, CLUSTER, null, null, null, null, + Collections.emptyList(), "wrr_locality_experimental"); + assertAddressesEqual( + Arrays.asList(new EquivalentAddressGroup(Arrays.asList( + newInetSocketAddress("127.0.2.1", 9000), newInetSocketAddress("127.0.2.2", 9000)))), + childBalancer.addresses); assertThat(childBalancer.addresses.get(0).getAttributes() - .get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME); - assertThat(childBalancer.addresses.get(1).getAttributes() - .get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME); + .get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME + ":9000"); } @Test public void onlyLogicalDnsCluster_handleRefreshNameResolution() { - ClusterResolverConfig config = new ClusterResolverConfig( - logicalDnsDiscoveryMechanism, roundRobin, false); - deliverLbConfig(config); - FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + CLUSTER, LOGICAL_DNS_CLUSTER)); + startXdsDepManager(new CdsConfig(CLUSTER), /* forwardTime= */ false); + FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME + ":9000"); assertThat(childBalancers).isEmpty(); - EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); - EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); - resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2)); - assertThat(resolver.refreshCount).isEqualTo(0); - FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); - childBalancer.helper.refreshNameResolution(); - assertThat(resolver.refreshCount).isEqualTo(1); - } - - @Test - public void resolutionError_backoffAndRefresh() { - do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh(); - } - - @Test - public void oldListenerCallback_resolutionError_backoffAndRefresh() { - nsRegistry.deregister(fakeNameResolverProvider); - nsRegistry.register(new FakeNameResolverProvider(true)); - do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh(); - } + resolver.deliverEndpointAddresses(Arrays.asList(newInetSocketAddressEag("127.0.2.1", 9000))); + fakeClock.forwardTime(10, TimeUnit.MINUTES); - void do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() { - InOrder inOrder = Mockito.inOrder(helper, backoffPolicyProvider, - backoffPolicy1, backoffPolicy2); - ClusterResolverConfig config = new ClusterResolverConfig( - logicalDnsDiscoveryMechanism, roundRobin, false); - deliverLbConfig(config); - FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME); - assertThat(childBalancers).isEmpty(); - Status error = Status.UNAVAILABLE.withDescription("cannot reach DNS server"); - resolver.deliverError(error); - inOrder.verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - assertPicker(pickerCaptor.getValue(), error, null); + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(resolver.refreshCount).isEqualTo(0); - inOrder.verify(backoffPolicyProvider).get(); - inOrder.verify(backoffPolicy1).nextBackoffNanos(); - assertThat(fakeClock.getPendingTasks()).hasSize(1); - assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.SECONDS)) - .isEqualTo(1L); - fakeClock.forwardTime(1L, TimeUnit.SECONDS); - assertThat(resolver.refreshCount).isEqualTo(1); - - error = Status.UNKNOWN.withDescription("I am lost"); - resolver.deliverError(error); - inOrder.verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - inOrder.verify(backoffPolicy1).nextBackoffNanos(); - assertPicker(pickerCaptor.getValue(), error, null); - assertThat(fakeClock.getPendingTasks()).hasSize(1); - assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.SECONDS)) - .isEqualTo(10L); - fakeClock.forwardTime(10L, TimeUnit.SECONDS); - assertThat(resolver.refreshCount).isEqualTo(2); - - // Succeed. - EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); - EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); - resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2)); - assertThat(childBalancers).hasSize(1); - assertAddressesEqual(Arrays.asList(endpoint1, endpoint2), - Iterables.getOnlyElement(childBalancers).addresses); - - assertThat(fakeClock.getPendingTasks()).isEmpty(); - inOrder.verifyNoMoreInteractions(); - } - - @Test - public void onlyLogicalDnsCluster_refreshNameResolutionRaceWithResolutionError() { - InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2); - ClusterResolverConfig config = new ClusterResolverConfig( - logicalDnsDiscoveryMechanism, roundRobin, false); - deliverLbConfig(config); - FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME); - assertThat(childBalancers).isEmpty(); - EquivalentAddressGroup endpoint = makeAddress("endpoint-addr"); - resolver.deliverEndpointAddresses(Collections.singletonList(endpoint)); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); - assertAddressesEqual(Collections.singletonList(endpoint), childBalancer.addresses); - assertThat(resolver.refreshCount).isEqualTo(0); - childBalancer.helper.refreshNameResolution(); assertThat(resolver.refreshCount).isEqualTo(1); - resolver.deliverError(Status.UNAVAILABLE.withDescription("I am lost")); - inOrder.verify(backoffPolicyProvider).get(); - inOrder.verify(backoffPolicy1).nextBackoffNanos(); - assertThat(fakeClock.getPendingTasks()).hasSize(1); - ScheduledTask task = Iterables.getOnlyElement(fakeClock.getPendingTasks()); - assertThat(task.getDelay(TimeUnit.SECONDS)).isEqualTo(1L); - - fakeClock.forwardTime( 100L, TimeUnit.MILLISECONDS); - childBalancer.helper.refreshNameResolution(); - assertThat(resolver.refreshCount).isEqualTo(2); - assertThat(task.isCancelled()).isTrue(); - assertThat(fakeClock.getPendingTasks()).isEmpty(); - resolver.deliverError(Status.UNAVAILABLE.withDescription("I am still lost")); - inOrder.verify(backoffPolicyProvider).get(); // active refresh resets backoff sequence - inOrder.verify(backoffPolicy2).nextBackoffNanos(); - task = Iterables.getOnlyElement(fakeClock.getPendingTasks()); - assertThat(task.getDelay(TimeUnit.SECONDS)).isEqualTo(5L); - - fakeClock.forwardTime(5L, TimeUnit.SECONDS); - assertThat(resolver.refreshCount).isEqualTo(3); - inOrder.verifyNoMoreInteractions(); } @Test - public void resolutionErrorAfterChildLbCreated_propagateError() { - ClusterResolverConfig config = new ClusterResolverConfig( - edsDiscoveryMechanism1, roundRobin, false); - deliverLbConfig(config); - assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); - assertThat(childBalancers).isEmpty(); - reset(helper); - EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1"); - LocalityLbEndpoints localityLbEndpoints = - LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create(endpoint, 100, true, - "hostname1", ImmutableMap.of())), - 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); - xdsClient.deliverClusterLoadAssignment( - EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); - - assertThat(childBalancers).hasSize(1); - FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); // child LB created - assertThat(childBalancer.upstreamError).isNull(); // should not propagate error to child LB - assertAddressesEqual(Collections.singletonList(endpoint), childBalancer.addresses); - - xdsClient.deliverError(Status.RESOURCE_EXHAUSTED.withDescription("out of memory")); - assertThat(childBalancer.upstreamError).isNotNull(); // last cluster's (DNS) error propagated - assertThat(childBalancer.upstreamError.getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(childBalancer.upstreamError.getDescription()) - .isEqualTo("Unable to load EDS backend-service-foo.googleapis.com. xDS server returned: " - + "RESOURCE_EXHAUSTED: out of memory"); - assertThat(childBalancer.shutdown).isFalse(); - verify(helper, never()).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), any(SubchannelPicker.class)); - } - - @Test - public void resolutionErrorBeforeChildLbCreated_returnErrorPicker() { - ClusterResolverConfig config = new ClusterResolverConfig( - edsDiscoveryMechanism1, roundRobin, false); - deliverLbConfig(config); - assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); - assertThat(childBalancers).isEmpty(); - reset(helper); - xdsClient.deliverError(Status.RESOURCE_EXHAUSTED.withDescription("OOM")); - assertThat(childBalancers).isEmpty(); - verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); - Status actualStatus = result.getStatus(); - assertThat(actualStatus.getCode()).isEqualTo(Status.Code.UNAVAILABLE); - assertThat(actualStatus.getDescription()).contains("RESOURCE_EXHAUSTED: OOM"); - } - - @Test - public void handleNameResolutionErrorFromUpstream_eds_beforeChildLbCreated_returnErrorPicker() { - ClusterResolverConfig config = new ClusterResolverConfig( - edsDiscoveryMechanism1, roundRobin, false); - deliverLbConfig(config); - assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); - assertThat(childBalancers).isEmpty(); - reset(helper); - Status upstreamError = Status.UNAVAILABLE.withDescription("unreachable"); - loadBalancer.handleNameResolutionError(upstreamError); - verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - assertPicker(pickerCaptor.getValue(), upstreamError, null); - } - - @Test - public void handleNameResolutionErrorFromUpstream_lDns_beforeChildLbCreated_returnErrorPicker() { - ClusterResolverConfig config = new ClusterResolverConfig( - logicalDnsDiscoveryMechanism, roundRobin, false); - deliverLbConfig(config); - assertResolverCreated("/" + DNS_HOST_NAME); - assertThat(childBalancers).isEmpty(); - reset(helper); - Status upstreamError = Status.UNAVAILABLE.withDescription("unreachable"); - loadBalancer.handleNameResolutionError(upstreamError); - verify(helper).updateBalancingState( - eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - assertPicker(pickerCaptor.getValue(), upstreamError, null); - } + public void outlierDetection_disabledConfig() { + Cluster cluster = EDS_CLUSTER.toBuilder() + .setOutlierDetection(OutlierDetection.newBuilder() + .setEnforcingSuccessRate(UInt32Value.of(0)) + .setEnforcingFailurePercentage(UInt32Value.of(0))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + CLUSTER, cluster)); + startXdsDepManager(); - @Test - public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_eds_fallThrough() { - ClusterResolverConfig config = new ClusterResolverConfig( - edsDiscoveryMechanism1, roundRobin, false); - deliverLbConfig(config); - assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); - assertThat(childBalancers).isEmpty(); - reset(helper); - EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); - LocalityLbEndpoints localityLbEndpoints = - LocalityLbEndpoints.create( - Collections.singletonList(LbEndpoint.create(endpoint1, 100, true, - "hostname1", ImmutableMap.of())), - 10 /* localityWeight */, 1 /* priority */, ImmutableMap.of()); - xdsClient.deliverClusterLoadAssignment( - EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); - assertThat(((PriorityLbConfig) childBalancer.config).priorities) - .containsExactly(CLUSTER1 + "[child1]"); - assertAddressesEqual(Arrays.asList(endpoint1), childBalancer.addresses); - - loadBalancer.handleNameResolutionError(Status.UNAVAILABLE.withDescription("unreachable")); - assertThat(childBalancer.upstreamError.getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(childBalancer.upstreamError.getDescription()).isEqualTo("unreachable"); - verify(helper, never()).updateBalancingState( - any(ConnectivityState.class), any(SubchannelPicker.class)); + assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME); + PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config; + PriorityChildConfig priorityChildConfig = + Iterables.getOnlyElement(priorityLbConfig.childConfigs.values()); + OutlierDetectionLoadBalancerConfig outlier = (OutlierDetectionLoadBalancerConfig) + GracefulSwitchLoadBalancerAccessor.getChildConfig(priorityChildConfig.childConfig); + assertThat(outlier.successRateEjection).isNull(); + assertThat(outlier.failurePercentageEjection).isNull(); } @Test - public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_logicalDns_fallThrough() { - ClusterResolverConfig config = new ClusterResolverConfig( - logicalDnsDiscoveryMechanism, roundRobin, false); - deliverLbConfig(config); - FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME); - assertThat(childBalancers).isEmpty(); - reset(helper); - EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); - resolver.deliverEndpointAddresses(Collections.singletonList(endpoint2)); + public void outlierDetection_fullConfig() { + Cluster cluster = EDS_CLUSTER.toBuilder() + .setLbPolicy(Cluster.LbPolicy.ROUND_ROBIN) + .setOutlierDetection(OutlierDetection.newBuilder() + .setInterval(Duration.newBuilder().setNanos(101)) + .setBaseEjectionTime(Duration.newBuilder().setNanos(102)) + .setMaxEjectionTime(Duration.newBuilder().setNanos(103)) + .setMaxEjectionPercent(UInt32Value.of(80)) + .setSuccessRateStdevFactor(UInt32Value.of(105)) + .setEnforcingSuccessRate(UInt32Value.of(81)) + .setSuccessRateMinimumHosts(UInt32Value.of(107)) + .setSuccessRateRequestVolume(UInt32Value.of(108)) + .setFailurePercentageThreshold(UInt32Value.of(82)) + .setEnforcingFailurePercentage(UInt32Value.of(83)) + .setFailurePercentageMinimumHosts(UInt32Value.of(111)) + .setFailurePercentageRequestVolume(UInt32Value.of(112))) + .build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.of( + CLUSTER, cluster)); + startXdsDepManager(); + + verify(helper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), any()); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); - assertThat(((PriorityLbConfig) childBalancer.config).priorities) - .containsExactly(CLUSTER_DNS + "[child0]"); - assertAddressesEqual(Arrays.asList(endpoint2), childBalancer.addresses); - - loadBalancer.handleNameResolutionError(Status.UNAVAILABLE.withDescription("unreachable")); - assertThat(childBalancer.upstreamError.getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(childBalancer.upstreamError.getDescription()).isEqualTo("unreachable"); - verify(helper, never()).updateBalancingState( - any(ConnectivityState.class), any(SubchannelPicker.class)); + assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME); + PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config; + PriorityChildConfig priorityChildConfig = + Iterables.getOnlyElement(priorityLbConfig.childConfigs.values()); + OutlierDetectionLoadBalancerConfig outlier = (OutlierDetectionLoadBalancerConfig) + GracefulSwitchLoadBalancerAccessor.getChildConfig(priorityChildConfig.childConfig); + assertThat(outlier.intervalNanos).isEqualTo(101); + assertThat(outlier.baseEjectionTimeNanos).isEqualTo(102); + assertThat(outlier.maxEjectionTimeNanos).isEqualTo(103); + assertThat(outlier.maxEjectionPercent).isEqualTo(80); + assertThat(outlier.successRateEjection.stdevFactor).isEqualTo(105); + assertThat(outlier.successRateEjection.enforcementPercentage).isEqualTo(81); + assertThat(outlier.successRateEjection.minimumHosts).isEqualTo(107); + assertThat(outlier.successRateEjection.requestVolume).isEqualTo(108); + assertThat(outlier.failurePercentageEjection.threshold).isEqualTo(82); + assertThat(outlier.failurePercentageEjection.enforcementPercentage).isEqualTo(83); + assertThat(outlier.failurePercentageEjection.minimumHosts).isEqualTo(111); + assertThat(outlier.failurePercentageEjection.requestVolume).isEqualTo(112); + assertClusterImplConfig( + (ClusterImplConfig) GracefulSwitchLoadBalancerAccessor.getChildConfig(outlier.childConfig), + CLUSTER, EDS_SERVICE_NAME, null, null, null, Collections.emptyList(), + "wrr_locality_experimental"); } @Test public void config_equalsTester() { + ServerInfo lrsServerInfo = + ServerInfo.create("lrs.googleapis.com", InsecureChannelCredentials.create()); + UpstreamTlsContext tlsContext = + CommonTlsContextTestsUtil.buildUpstreamTlsContext("google_cloud_private_spiffe", true); + DiscoveryMechanism edsDiscoveryMechanism1 = + DiscoveryMechanism.forEds(CLUSTER, EDS_SERVICE_NAME, lrsServerInfo, 100L, tlsContext, + Collections.emptyMap(), null); + io.grpc.xds.EnvoyServerProtoData.OutlierDetection outlierDetection = + io.grpc.xds.EnvoyServerProtoData.OutlierDetection.create( + 100L, 100L, 100L, 100, SuccessRateEjection.create(100, 100, 100, 100), + FailurePercentageEjection.create(100, 100, 100, 100)); + DiscoveryMechanism edsDiscoveryMechanismWithOutlierDetection = + DiscoveryMechanism.forEds(CLUSTER, EDS_SERVICE_NAME, lrsServerInfo, 100L, tlsContext, + Collections.emptyMap(), outlierDetection); + Object roundRobin = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( + new FakeLoadBalancerProvider("wrr_locality_experimental"), new WrrLocalityConfig( + GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( + new FakeLoadBalancerProvider("round_robin"), null))); + Object leastRequest = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( + new FakeLoadBalancerProvider("wrr_locality_experimental"), new WrrLocalityConfig( + GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig( + new FakeLoadBalancerProvider("least_request_experimental"), + new LeastRequestConfig(3)))); + new EqualsTester() .addEqualityGroup( new ClusterResolverConfig( @@ -1102,17 +1033,36 @@ public void config_equalsTester() { .testEquals(); } - private void deliverLbConfig(ClusterResolverConfig config) { - loadBalancer.acceptResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(Collections.emptyList()) - .setAttributes( - // Other attributes not used by cluster_resolver LB are omitted. - Attributes.newBuilder() - .set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool) - .build()) - .setLoadBalancingPolicyConfig(config) - .build()); + private void startXdsDepManager() { + startXdsDepManager(new CdsConfig(CLUSTER)); + } + + private void startXdsDepManager(final CdsConfig cdsConfig) { + startXdsDepManager(cdsConfig, true); + } + + private void startXdsDepManager(final CdsConfig cdsConfig, boolean forwardTime) { + xdsDepManager.start( + xdsConfig -> { + if (!xdsConfig.hasValue()) { + throw new AssertionError("" + xdsConfig.getStatus()); + } + if (loadBalancer == null) { + return; + } + loadBalancer.acceptResolvedAddresses(ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes(Attributes.newBuilder() + .set(XdsAttributes.XDS_CONFIG, xdsConfig.getValue()) + .set(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, xdsDepManager) + .build()) + .setLoadBalancingPolicyConfig(cdsConfig) + .build()); + }); + if (forwardTime) { + // trigger does not exist timer, so broken config is more obvious + fakeClock.forwardTime(10, TimeUnit.MINUTES); + } } private FakeNameResolver assertResolverCreated(String uriPath) { @@ -1151,112 +1101,41 @@ private static void assertClusterImplConfig(ClusterImplConfig config, String clu /** Asserts two list of EAGs contains same addresses, regardless of attributes. */ private static void assertAddressesEqual( List expected, List actual) { - List> expectedAddresses + List> expectedAddresses = expected.stream().map(EquivalentAddressGroup::getAddresses).collect(toList()); - List> actualAddresses + List> actualAddresses = actual.stream().map(EquivalentAddressGroup::getAddresses).collect(toList()); assertThat(actualAddresses).isEqualTo(expectedAddresses); } - private static EquivalentAddressGroup makeAddress(final String name) { - return new EquivalentAddressGroup(new FakeSocketAddress(name)); + @SuppressWarnings("AddressSelection") + private static InetSocketAddress newInetSocketAddress(String ip, int port) { + return new InetSocketAddress(ip, port); } - static class FakeSocketAddress extends SocketAddress { - private final String name; - - private FakeSocketAddress(String name) { - this.name = name; - } - - @Override - public int hashCode() { - return Objects.hash(name); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof FakeSocketAddress)) { - return false; - } - FakeSocketAddress that = (FakeSocketAddress) o; - return Objects.equals(name, that.name); - } - - @Override - public String toString() { - return name; - } + private static EquivalentAddressGroup newInetSocketAddressEag(String ip, int port) { + return new EquivalentAddressGroup(newInetSocketAddress(ip, port)); } - private static final class FakeXdsClient extends XdsClient { - - private final Map> watchers = new HashMap<>(); - - @Override - @SuppressWarnings("unchecked") - public void watchXdsResource(XdsResourceType type, - String resourceName, - ResourceWatcher watcher, - Executor syncContext) { - assertThat(type.typeName()).isEqualTo("EDS"); - assertThat(watchers).doesNotContainKey(resourceName); - watchers.put(resourceName, (ResourceWatcher) watcher); - } - - @Override - @SuppressWarnings("unchecked") - public void cancelXdsResourceWatch(XdsResourceType type, - String resourceName, - ResourceWatcher watcher) { - assertThat(type.typeName()).isEqualTo("EDS"); - assertThat(watchers).containsKey(resourceName); - watchers.remove(resourceName); - } - - void deliverClusterLoadAssignment( - String resource, Map localityLbEndpointsMap) { - deliverClusterLoadAssignment( - resource, Collections.emptyList(), localityLbEndpointsMap); - } - - void deliverClusterLoadAssignment(String resource, List dropOverloads, - Map localityLbEndpointsMap) { - if (watchers.containsKey(resource)) { - watchers.get(resource).onChanged( - new XdsEndpointResource.EdsUpdate(resource, localityLbEndpointsMap, dropOverloads)); - } - } - - void deliverResourceNotFound(String resource) { - if (watchers.containsKey(resource)) { - watchers.get(resource).onResourceDoesNotExist(resource); - } - } + private static LbEndpoint.Builder newSocketLbEndpoint(String ip, int port) { + return LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setAddress(newAddress(ip, port))) + .setHealthStatus(HealthStatus.HEALTHY); + } - void deliverError(Status error) { - for (ResourceWatcher watcher : watchers.values()) { - watcher.onError(error); - } - } + private static Address.Builder newAddress(String ip, int port) { + return Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress(ip) + .setPortValue(port)); } private class FakeNameResolverProvider extends NameResolverProvider { - private final boolean useOldListenerCallback; - - private FakeNameResolverProvider(boolean useOldListenerCallback) { - this.useOldListenerCallback = useOldListenerCallback; - } - @Override public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) { assertThat(targetUri.getScheme()).isEqualTo("dns"); - FakeNameResolver resolver = useOldListenerCallback - ? new FakeNameResolverUsingOldListenerCallback(targetUri) - : new FakeNameResolver(targetUri); + FakeNameResolver resolver = new FakeNameResolver(targetUri); resolvers.add(resolver); return resolver; } @@ -1321,23 +1200,6 @@ protected void deliverError(Status error) { } } - private class FakeNameResolverUsingOldListenerCallback extends FakeNameResolver { - private FakeNameResolverUsingOldListenerCallback(URI targetUri) { - super(targetUri); - } - - @Override - protected void deliverEndpointAddresses(List addresses) { - listener.onResult(ResolutionResult.newBuilder() - .setAddressesOrError(StatusOr.fromValue(addresses)).build()); - } - - @Override - protected void deliverError(Status error) { - listener.onError(error); - } - } - private final class FakeLoadBalancerProvider extends LoadBalancerProvider { private final String policyName; diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index b50bdfce01f..e80fcd008bd 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -205,6 +205,8 @@ public ConfigOrError parseServiceConfig(Map rawServiceConfig) { @Before public void setUp() { + lenient().doReturn(Status.OK).when(mockListener).onResult2(any()); + try { targetUri = new URI(AUTHORITY); } catch (URISyntaxException e) { @@ -435,6 +437,7 @@ public void resolving_ldsResourceUpdateRdsName() { (Map) resolutionResultCaptor.getValue().getServiceConfig().getConfig()); reset(mockListener); + when(mockListener.onResult2(any())).thenReturn(Status.OK); ArgumentCaptor resultCaptor = ArgumentCaptor.forClass(ResolutionResult.class); String alternativeRdsResource = "route-configuration-alter.googleapis.com"; @@ -492,11 +495,13 @@ public void resolving_ldsResourceRevokedAndAddedBack() { (Map) resolutionResultCaptor.getValue().getServiceConfig().getConfig()); reset(mockListener); + when(mockListener.onResult2(any())).thenReturn(Status.OK); xdsClient.deliverLdsResourceNotFound(); // revoke LDS resource assertThat(xdsClient.rdsWatchers.keySet()).isEmpty(); // stop subscribing to stale RDS resource assertEmptyResolutionResult(expectedLdsResourceName); reset(mockListener); + when(mockListener.onResult2(any())).thenReturn(Status.OK); xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME); // No name resolution result until new RDS resource update is received. Do not use stale config verifyNoInteractions(mockListener); @@ -533,11 +538,13 @@ public void resolving_rdsResourceRevokedAndAddedBack() { (Map) resolutionResultCaptor.getValue().getServiceConfig().getConfig()); reset(mockListener); + when(mockListener.onResult2(any())).thenReturn(Status.OK); xdsClient.deliverRdsResourceNotFound(RDS_RESOURCE_NAME); // revoke RDS resource assertEmptyResolutionResult(RDS_RESOURCE_NAME); // Simulate management server adds back the previously used RDS resource. reset(mockListener); + when(mockListener.onResult2(any())).thenReturn(Status.OK); xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost)); createAndDeliverClusterUpdates(xdsClient, cluster1); verify(mockListener).onResult2(resolutionResultCaptor.capture()); @@ -941,6 +948,7 @@ public void resolved_rpcHashingByChannelId() { // A different resolver/Channel. resolver.shutdown(); reset(mockListener); + when(mockListener.onResult2(any())).thenReturn(Status.OK); when(mockRandom.nextLong()).thenReturn(123L); resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, @@ -1044,6 +1052,7 @@ public void resolved_resourceUpdateAfterCallStarted() { TestCall firstCall = testCall; reset(mockListener); + when(mockListener.onResult2(any())).thenReturn(Status.OK); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdate( Arrays.asList( @@ -1086,6 +1095,7 @@ public void resolved_resourceUpdateAfterCallStarted() { public void resolved_resourceUpdatedBeforeCallStarted() { InternalConfigSelector configSelector = resolveToClusters(); reset(mockListener); + when(mockListener.onResult2(any())).thenReturn(Status.OK); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdate( Arrays.asList( @@ -1122,6 +1132,7 @@ public void resolved_raceBetweenCallAndRepeatedResourceUpdate() { assertCallSelectClusterResult(call1, configSelector, cluster1, 15.0); reset(mockListener); + when(mockListener.onResult2(any())).thenReturn(Status.OK); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdate( Arrays.asList( @@ -1546,6 +1557,7 @@ public void filterState_shutdown_onLdsNotFound() { // LDS 2: resource not found. reset(mockListener); + when(mockListener.onResult2(any())).thenReturn(Status.OK); xdsClient.deliverLdsResourceNotFound(); assertEmptyResolutionResult(expectedLdsResourceName); // Verify shutdown. @@ -1603,6 +1615,7 @@ public void filterState_shutdown_onRdsNotFound() { // RDS 2: RDS_RESOURCE_NAME not found. reset(mockListener); + when(mockListener.onResult2(any())).thenReturn(Status.OK); xdsClient.deliverRdsResourceNotFound(RDS_RESOURCE_NAME); assertEmptyResolutionResult(RDS_RESOURCE_NAME); assertThat(lds1Filter1.isShutdown()).isTrue();