diff --git a/CHANGELOG.md b/CHANGELOG.md index 5dc2d8995..65abb1826 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,3 @@ - - # Changelog Lists all changes with user impact. @@ -11,6 +9,11 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). ## [0.19.19] +### Changed +- Added support for Delta XDS + +## [0.19.19] + ### Changed - Add default access log filter configuration @@ -33,7 +36,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). ### Changed - Added flags for lua filters - +- ## [0.19.14] ### Changed diff --git a/build.gradle b/build.gradle index 925677470..b809b25c4 100644 --- a/build.gradle +++ b/build.gradle @@ -47,9 +47,9 @@ allprojects { project.ext.versions = [ kotlin : '1.6.10', - java_controlplane : '0.1.32', + java_controlplane : '0.1.35', spring_boot : '2.3.4.RELEASE', - grpc : '1.21.0', + grpc : '1.48.1', jaxb : '2.3.1', javaxactivation : '1.2.0', micrometer : '1.5.5', diff --git a/docs/changelog_symlink.md b/docs/changelog_symlink.md deleted file mode 120000 index 04c99a55c..000000000 --- a/docs/changelog_symlink.md +++ /dev/null @@ -1 +0,0 @@ -../CHANGELOG.md \ No newline at end of file diff --git a/docs/changelog_symlink.md b/docs/changelog_symlink.md new file mode 100644 index 000000000..60b7a6ed9 --- /dev/null +++ b/docs/changelog_symlink.md @@ -0,0 +1 @@ +../CHANGELOG.md diff --git a/docs/configuration.md b/docs/configuration.md index 1fc2d8a00..04f586a49 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -7,8 +7,8 @@ Property ------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------- **envoy-control.server.executor-group.type** | Group executor type. DIRECT or PARALLEL | DIRECT **envoy-control.server.executor-group.parallel-pool-size** | Pool size used for executor group in PARALLEL mode | 4 -**envoy-control.server.nio-event-loop-thread-count** | The number of threads that will be used by netty's nio event loop | 1 -**envoy-control.server.nio-event-loop-pool-size** | Pool size of NIO Event Loop | 0 (Number of CPUs * 2) +**envoy-control.server.nio-event-loop-thread-count** | The number of threads that will be used by netty's nio worker event loop | 1 +**envoy-control.server.nio-boss-event-loop-thread-count** | The number of threads that will be used by netty's nio boss event loop | 1 **envoy-control.server.netty.keep-alive-time** | Sets a custom keepalive time for Netty server | 15s **envoy-control.server.netty.permit-keep-alive-time** | Specify the most aggressive keep-alive time clients are permitted to configure (in seconds) | 10s **envoy-control.server.netty.permit-keep-alive-without-calls** | Sets whether to allow clients to send keep-alive HTTP/2 PINGs even if there are no outstanding RPCs on the connection | true @@ -89,6 +89,7 @@ Property **envoy-control.envoy.snapshot.max-host-ttl** | The TTL for hosts that are unused. Hosts that have not been used in the configured time interval will be purged | 300s **envoy-control.envoy.snapshot.rate-limit.domain** | Domain name for ratelimit service. | rl **envoy-control.envoy.snapshot.rate-limit.service-name** | ratelimit GRPC service name | ratelimit-grpc +**envoy-control.envoy.snapshot.delta-xds-enabled** | Enable detla xds | false **envoy-control.envoy.snapshot.should-audit-global-snapshot** | Enable global snapshot audits | false diff --git a/docs/deployment/observability.md b/docs/deployment/observability.md index a9567485e..ebce6bb30 100644 --- a/docs/deployment/observability.md +++ b/docs/deployment/observability.md @@ -51,14 +51,20 @@ Metric | Description #### xDS requests -Metric | Description ---------------------------| -------------------------------------------------------- -**grpc.requests.cds** | Counter of received gRPC CDS requests -**grpc.requests.eds** | Counter of received gRPC EDS requests -**grpc.requests.lds** | Counter of received gRPC LDS requests -**grpc.requests.rds** | Counter of received gRPC RDS requests -**grpc.requests.sds** | Counter of received gRPC SDS requests -**grpc.requests.unknown** | Counter of received gRPC requests for unknown resource +Metric | Description +------------------------------- | -------------------------------------------------------- +**grpc.requests.cds** | Counter of received gRPC CDS requests +**grpc.requests.eds** | Counter of received gRPC EDS requests +**grpc.requests.lds** | Counter of received gRPC LDS requests +**grpc.requests.rds** | Counter of received gRPC RDS requests +**grpc.requests.sds** | Counter of received gRPC SDS requests +**grpc.requests.unknown** | Counter of received gRPC requests for unknown resource +**grpc.requests.cds.delta** | Counter of received gRPC delta CDS requests +**grpc.requests.eds.delta** | Counter of received gRPC delta EDS requests +**grpc.requests.lds.delta** | Counter of received gRPC delta LDS requests +**grpc.requests.rds.delta** | Counter of received gRPC delta RDS requests +**grpc.requests.sds.delta** | Counter of received gRPC delta SDS requests +**grpc.requests.unknown.delta** | Counter of received gRPC delta requests for unknown resource #### Snapshot diff --git a/envoy-control-core/src/main/java/pl/allegro/tech/servicemesh/envoycontrol/GroupCacheStatusInfo.java b/envoy-control-core/src/main/java/pl/allegro/tech/servicemesh/envoycontrol/GroupCacheStatusInfo.java deleted file mode 100644 index de714b084..000000000 --- a/envoy-control-core/src/main/java/pl/allegro/tech/servicemesh/envoycontrol/GroupCacheStatusInfo.java +++ /dev/null @@ -1,44 +0,0 @@ -package pl.allegro.tech.servicemesh.envoycontrol; - -import io.envoyproxy.controlplane.cache.CacheStatusInfo; -import io.envoyproxy.controlplane.cache.StatusInfo; -import java.util.ArrayList; -import java.util.Collection; -import javax.annotation.concurrent.ThreadSafe; - -/** - * {@code GroupCacheStatusInfo} provides an implementation of {@link StatusInfo} for a group of {@link CacheStatusInfo}. - * This class is copy of {@link io.envoyproxy.controlplane.cache.GroupCacheStatusInfo} - */ -@ThreadSafe -class GroupCacheStatusInfo implements StatusInfo { - private final Collection> statuses; - - public GroupCacheStatusInfo(Collection> statuses) { - this.statuses = new ArrayList<>(statuses); - } - - /** - * {@inheritDoc} - */ - @Override - public long lastWatchRequestTime() { - return statuses.stream().mapToLong(CacheStatusInfo::lastWatchRequestTime).max().orElse(0); - } - - /** - * {@inheritDoc} - */ - @Override - public T nodeGroup() { - return statuses.stream().map(CacheStatusInfo::nodeGroup).findFirst().orElse(null); - } - - /** - * {@inheritDoc} - */ - @Override - public int numWatches() { - return statuses.stream().mapToInt(CacheStatusInfo::numWatches).sum(); - } -} \ No newline at end of file diff --git a/envoy-control-core/src/main/java/pl/allegro/tech/servicemesh/envoycontrol/SimpleCache.java b/envoy-control-core/src/main/java/pl/allegro/tech/servicemesh/envoycontrol/SimpleCache.java index c5cbe713c..0dbabcc33 100644 --- a/envoy-control-core/src/main/java/pl/allegro/tech/servicemesh/envoycontrol/SimpleCache.java +++ b/envoy-control-core/src/main/java/pl/allegro/tech/servicemesh/envoycontrol/SimpleCache.java @@ -2,19 +2,12 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.protobuf.Message; -import io.envoyproxy.controlplane.cache.CacheStatusInfo; -import io.envoyproxy.controlplane.cache.NodeGroup; -import io.envoyproxy.controlplane.cache.Resources; -import io.envoyproxy.controlplane.cache.Response; -import io.envoyproxy.controlplane.cache.Snapshot; -import io.envoyproxy.controlplane.cache.SnapshotCache; -import io.envoyproxy.controlplane.cache.StatusInfo; -import io.envoyproxy.controlplane.cache.Watch; -import io.envoyproxy.controlplane.cache.WatchCancelledException; -import io.envoyproxy.controlplane.cache.XdsRequest; +import io.envoyproxy.controlplane.cache.*; +import io.envoyproxy.controlplane.cache.GroupCacheStatusInfo; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,6 +16,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -34,6 +28,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.function.Function; +import java.util.stream.Stream; import static io.envoyproxy.controlplane.cache.Resources.RESOURCE_TYPES_IN_ORDER; @@ -53,7 +49,7 @@ public class SimpleCache implements SnapshotCache { @GuardedBy("lock") private final Map snapshots = new HashMap<>(); - private final ConcurrentMap>> statuses = new ConcurrentHashMap<>(); + private final CacheStatusInfoAggregator statuses = new CacheStatusInfoAggregator<>(); private AtomicLong watchCount = new AtomicLong(); @@ -76,10 +72,9 @@ public boolean clearSnapshot(T group) { // we take a writeLock to prevent watches from being created writeLock.lock(); try { - Map> status = statuses.get(group); // If we don't know about this group, do nothing. - if (status != null && status.values().stream().mapToLong(CacheStatusInfo::numWatches).sum() > 0) { + if (statuses.hasStatuses(group)) { LOGGER.warn("tried to clear snapshot for group with existing watches, group={}", group); return false; @@ -94,6 +89,14 @@ public boolean clearSnapshot(T group) { } } + public Watch createWatch( + boolean ads, + XdsRequest request, + Set knownResourceNames, + Consumer responseConsumer) { + return createWatch(ads, request, knownResourceNames, responseConsumer, false); + } + /** * {@inheritDoc} */ @@ -104,7 +107,7 @@ public Watch createWatch( Set knownResourceNames, Consumer responseConsumer, boolean hasClusterChanged) { - Resources.ResourceType requestResourceType = request.getResourceType(); + Resources.ResourceType requestResourceType = request.getResourceType(); Preconditions.checkNotNull(requestResourceType, "unsupported type URL %s", request.getTypeUrl()); T group; @@ -114,8 +117,7 @@ public Watch createWatch( // doesn't conflict readLock.lock(); try { - CacheStatusInfo status = statuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>()) - .computeIfAbsent(requestResourceType, s -> new CacheStatusInfo<>(group)); + CacheStatusInfo status = statuses.getOrAddStatusInfo(group, requestResourceType); status.setLastWatchRequestTime(System.currentTimeMillis()); U snapshot = snapshots.get(group); @@ -149,20 +151,7 @@ public Watch createWatch( // If the requested version is up-to-date or missing a response, leave an open watch. if (snapshot == null || request.getVersionInfo().equals(version)) { - long watchId = watchCount.incrementAndGet(); - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("open watch {} for {}[{}] from node {} for version {}", - watchId, - request.getTypeUrl(), - String.join(", ", request.getResourceNamesList()), - group, - request.getVersionInfo()); - } - - status.setWatch(watchId, watch); - - watch.setStop(() -> status.removeWatch(watchId)); + openWatch(status, watch, request.getTypeUrl(), request.getResourceNamesList(), group, request.getVersionInfo()); return watch; } @@ -171,31 +160,126 @@ public Watch createWatch( boolean responded = respond(watch, snapshot, group); if (!responded) { - long watchId = watchCount.incrementAndGet(); + openWatch(status, watch, request.getTypeUrl(), request.getResourceNamesList(), group, request.getVersionInfo()); + } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("did not respond immediately, leaving open watch {} for {}[{}] from node {} for version {}", - watchId, - request.getTypeUrl(), - String.join(", ", request.getResourceNamesList()), - group, - request.getVersionInfo()); + return watch; + } finally { + readLock.unlock(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public DeltaWatch createDeltaWatch( + DeltaXdsRequest request, + String requesterVersion, + Map resourceVersions, + Set pendingResources, + boolean isWildcard, + Consumer responseConsumer, + boolean hasClusterChanged) { + + Resources.ResourceType requestResourceType = request.getResourceType(); + Preconditions.checkNotNull(requestResourceType, "unsupported type URL %s", + request.getTypeUrl()); + T group; + group = groups.hash(request.v3Request().getNode()); + + // even though we're modifying, we take a readLock to allow multiple watches to be created in parallel since it + // doesn't conflict + readLock.lock(); + try { + DeltaCacheStatusInfo status = statuses.getOrAddDeltaStatusInfo(group, requestResourceType); + + status.setLastWatchRequestTime(System.currentTimeMillis()); + + U snapshot = snapshots.get(group); + String version = snapshot == null ? "" : snapshot.version(requestResourceType, Collections.emptyList()); + + DeltaWatch watch = new DeltaWatch(request, + ImmutableMap.copyOf(resourceVersions), + ImmutableSet.copyOf(pendingResources), + requesterVersion, + isWildcard, + responseConsumer); + + // If no snapshot, leave an open watch. + + if (snapshot == null) { + openWatch(status, watch, request.getTypeUrl(), watch.trackedResources().keySet(), group, requesterVersion); + return watch; + } + + // If the requested version is up-to-date or missing a response, leave an open watch. + if (version.equals(requesterVersion)) { + // If the request is not wildcard, we have pending resources and we have them, we should respond immediately. + if (!isWildcard && watch.pendingResources().size() != 0) { + // If any of the pending resources are in the snapshot respond immediately. If not we'll fall back to + // version comparisons. + Map> resources = snapshot.versionedResources(request.getResourceType()); + Map> requestedResources = watch.pendingResources() + .stream() + .filter(resources::containsKey) + .collect(Collectors.toMap(Function.identity(), resources::get)); + ResponseState responseState = respondDelta(watch, + requestedResources, + Collections.emptyList(), + version, + group); + if (responseState.isFinished()) { + return watch; + } + } else if (hasClusterChanged && requestResourceType.equals(Resources.ResourceType.ENDPOINT)) { + ResponseState responseState = respondDelta(request, watch, snapshot, version, group); + if (responseState.isFinished()) { + return watch; + } } - status.setWatch(watchId, watch); + openWatch(status, watch, request.getTypeUrl(), watch.trackedResources().keySet(), group, requesterVersion); - watch.setStop(() -> status.removeWatch(watchId)); + return watch; + } + + // Otherwise, version is different, the watch may be responded immediately + ResponseState responseState = respondDelta(request, watch, snapshot, version, group); + + if (responseState.isFinished()) { + return watch; } + openWatch(status, watch, request.getTypeUrl(), watch.trackedResources().keySet(), group, requesterVersion); + return watch; } finally { readLock.unlock(); } } + private > void openWatch(MutableStatusInfo status, + V watch, + String url, + Collection resources, + T group, + String version) { + long watchId = watchCount.incrementAndGet(); + status.setWatch(watchId, watch); + watch.setStop(() -> status.removeWatch(watchId)); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("open watch {} for {}[{}] from node {} for version {}", + watchId, + url, + String.join(", ", resources), + group, + version); + } + } + /** * {@inheritDoc} - * @return */ @Override public U getSnapshot(T group) { @@ -213,7 +297,7 @@ public U getSnapshot(T group) { */ @Override public Collection groups() { - return ImmutableSet.copyOf(statuses.keySet()); + return ImmutableSet.copyOf(statuses.groups()); } /** @@ -225,88 +309,157 @@ public Collection groups() { @Override public void setSnapshot(T group, U snapshot) { // we take a writeLock to prevent watches from being created while we update the snapshot - ConcurrentMap> status; + Map> status; + Map> deltaStatus; + U previousSnapshot; writeLock.lock(); try { // Update the existing snapshot entry. - snapshots.put(group, snapshot); - status = statuses.get(group); + previousSnapshot = snapshots.put(group, snapshot); + status = statuses.getStatus(group); + deltaStatus = statuses.getDeltaStatus(group); } finally { writeLock.unlock(); } - if (status == null) { + if (status.isEmpty() && deltaStatus.isEmpty()) { return; } - // Responses should be in specific order and TYPE_URLS has a list of resources in the right order. - respondWithSpecificOrder(group, snapshot, status); + // Responses should be in specific order and typeUrls has a list of resources in the right + // order. + respondWithSpecificOrder(group, previousSnapshot, snapshot, status, deltaStatus); } /** * {@inheritDoc} */ @Override - public StatusInfo statusInfo(T group) { + public StatusInfo statusInfo(T group) { readLock.lock(); try { - ConcurrentMap> statusMap = statuses.get(group); - if (statusMap == null || statusMap.isEmpty()) { + Map> statusMap = statuses.getStatus(group); + Map> deltaStatusMap = statuses.getDeltaStatus(group); + + if (statusMap.isEmpty() && deltaStatusMap.isEmpty()) { return null; } - return new GroupCacheStatusInfo<>(statusMap.values()); + List> collection = Stream.concat(statusMap.values().stream(), + deltaStatusMap.values().stream()).collect(Collectors.toList()); + + return new GroupCacheStatusInfo<>(collection); } finally { readLock.unlock(); } } @VisibleForTesting - protected void respondWithSpecificOrder(T group, U snapshot, ConcurrentMap> statusMap) { + protected void respondWithSpecificOrder(T group, + U previousSnapshot, U snapshot, + Map> statusMap, + Map> deltaStatusMap) { for (Resources.ResourceType resourceType : RESOURCE_TYPES_IN_ORDER) { CacheStatusInfo status = statusMap.get(resourceType); - if (status == null) continue; // todo: why this happens? - status.watchesRemoveIf((id, watch) -> { - if (!watch.request().getResourceType().equals(resourceType)) { - return false; - } - String version = snapshot.version(watch.request().getResourceType(), watch.request().getResourceNamesList()); + if (status != null) { + status.watchesRemoveIf((id, watch) -> { + if (!watch.request().getResourceType().equals(resourceType)) { + return false; + } + String version = snapshot.version(watch.request().getResourceType(), watch.request().getResourceNamesList()); - if (!watch.request().getVersionInfo().equals(version)) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("responding to open watch {}[{}] with new version {}", + if (!watch.request().getVersionInfo().equals(version)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("responding to open watch {}[{}] with new version {}", id, String.join(", ", watch.request().getResourceNamesList()), version); - } + } - respond(watch, snapshot, group); + respond(watch, snapshot, group); - // Discard the watch. A new watch will be created for future snapshots once envoy ACKs the response. - return true; - } + // Discard the watch. A new watch will be created for future snapshots once envoy ACKs the response. + return true; + } - // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond. - return false; - }); + // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond. + return false; + }); + } + DeltaCacheStatusInfo deltaStatus = deltaStatusMap.get(resourceType); + if (deltaStatus != null) { + Map> previousResources = previousSnapshot == null + ? Collections.emptyMap() + : previousSnapshot.versionedResources(resourceType); + Map> snapshotResources = snapshot.versionedResources(resourceType); + + Map> snapshotChangedResources = snapshotResources.entrySet() + .stream() + .filter(entry -> { + VersionedResource versionedResource = previousResources.get(entry.getKey()); + return versionedResource == null || !versionedResource + .version().equals(entry.getValue().version()); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Set snapshotRemovedResources = previousResources.keySet() + .stream() + .filter(s -> !snapshotResources.containsKey(s)) + .collect(Collectors.toSet()); + + deltaStatus.watchesRemoveIf((id, watch) -> { + String version = snapshot.version(watch.request().getResourceType(), Collections.emptyList()); + + if (!watch.version().equals(version)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("responding to open watch {}[{}] with new version {}", + id, + String.join(", ", watch.trackedResources().keySet()), + version); + } + + List removedResources = snapshotRemovedResources.stream() + .filter(s -> watch.trackedResources().get(s) != null) + .collect(Collectors.toList()); + + Map> changedResources = findChangedResources(watch, snapshotChangedResources); + + ResponseState responseState = respondDelta(watch, + changedResources, + removedResources, + version, + group); + // Discard the watch if it was responded or cancelled. + // A new watch will be created for future snapshots once envoy ACKs the response. + return responseState.isFinished(); + } + + // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond. + return false; + }); + } } } - private Response createResponse(XdsRequest request, Map resources, String version) { + private Response createResponse(XdsRequest request, Map> resources, + String version) { Collection filtered = request.getResourceNamesList().isEmpty() - ? resources.values() - : request.getResourceNamesList().stream() - .map(resources::get) - .filter(Objects::nonNull) - .collect(Collectors.toList()); + ? resources.values().stream() + .map(VersionedResource::resource) + .collect(Collectors.toList()) + : request.getResourceNamesList().stream() + .map(resources::get) + .filter(Objects::nonNull) + .map(VersionedResource::resource) + .collect(Collectors.toList()); return Response.create(request, filtered, version); } private boolean respond(Watch watch, U snapshot, T group) { - Map snapshotResources = snapshot.resources(watch.request().getResourceType()); - Map snapshotForMissingResources = Collections.emptyMap(); + Map> snapshotResources = snapshot.versionedResources(watch.request().getResourceType()); + Map> snapshotForMissingResources = Collections.emptyMap(); if (!watch.request().getResourceNamesList().isEmpty() && watch.ads()) { Collection missingNames = watch.request().getResourceNamesList().stream() @@ -336,7 +489,7 @@ private boolean respond(Watch watch, U snapshot, T group) { for (String missingName : missingNames) { snapshotForMissingResources.put( missingName, - ClusterLoadAssignment.newBuilder().setClusterName(missingName).build() + VersionedResource.create(ClusterLoadAssignment.newBuilder().setClusterName(missingName).build()) ); } } else { @@ -362,7 +515,7 @@ private boolean respond(Watch watch, U snapshot, T group) { version); Response response; if (!snapshotForMissingResources.isEmpty()) { - snapshotForMissingResources.putAll((Map) snapshotResources); + snapshotForMissingResources.putAll(snapshotResources); response = createResponse( watch.request(), snapshotForMissingResources, @@ -388,4 +541,81 @@ private boolean respond(Watch watch, U snapshot, T group) { return false; } + + private List findRemovedResources(DeltaWatch watch, Map> snapshotResources) { + // remove resources for which client has a tracked version but do not exist in snapshot + return watch.trackedResources().keySet() + .stream() + .filter(s -> !snapshotResources.containsKey(s)) + .collect(Collectors.toList()); + } + + private Map> findChangedResources(DeltaWatch watch, + Map> snapshotResources) { + return snapshotResources.entrySet() + .stream() + .filter(entry -> { + if (watch.pendingResources().contains(entry.getKey())) { + return true; + } + String resourceVersion = watch.trackedResources().get(entry.getKey()); + if (resourceVersion == null) { + // resource is not tracked, should respond it only if watch is wildcard + return watch.isWildcard(); + } + return !entry.getValue().version().equals(resourceVersion); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private ResponseState respondDelta(DeltaXdsRequest request, DeltaWatch watch, U snapshot, String version, T group) { + Map> snapshotResources = snapshot.versionedResources(request.getResourceType()); + List removedResources = findRemovedResources(watch, + snapshotResources); + Map> changedResources = findChangedResources(watch, snapshotResources); + return respondDelta(watch, + changedResources, + removedResources, + version, + group); + } + + private ResponseState respondDelta(DeltaWatch watch, + Map> resources, + List removedResources, + String version, + T group) { + if (resources.isEmpty() && removedResources.isEmpty()) { + return ResponseState.UNRESPONDED; + } + + DeltaResponse response = DeltaResponse.create( + watch.request(), + resources, + removedResources, + version); + + try { + watch.respond(response); + return ResponseState.RESPONDED; + } catch (WatchCancelledException e) { + LOGGER.error( + "failed to respond for {} from node {} with version {} because watch was already cancelled", + watch.request().getTypeUrl(), + group, + version); + } + + return ResponseState.CANCELLED; + } + + private enum ResponseState { + RESPONDED, + UNRESPONDED, + CANCELLED; + + private boolean isFinished() { + return this.equals(RESPONDED) || this.equals(CANCELLED); + } + } } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt index c44725443..ac431e297 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt @@ -12,6 +12,7 @@ import io.grpc.netty.NettyServerBuilder import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.socket.nio.NioServerSocketChannel import pl.allegro.tech.servicemesh.envoycontrol.groups.Group import pl.allegro.tech.servicemesh.envoycontrol.groups.GroupChangeWatcher import pl.allegro.tech.servicemesh.envoycontrol.groups.MetadataNodeGroup @@ -89,6 +90,7 @@ class ControlPlane private constructor( ) { var grpcServerExecutor: Executor? = null var nioEventLoopExecutor: Executor? = null + var nioBossEventLoopExecutor: Executor? = null var executorGroup: ExecutorGroup? = null var globalSnapshotExecutor: Executor? = null var globalSnapshotAuditExecutor: Executor? = null @@ -112,6 +114,12 @@ class ControlPlane private constructor( nioEventLoopExecutor = newMeteredCachedThreadPool("grpc-worker-event-loop") } + if (nioBossEventLoopExecutor == null) { + // unbounded executor - netty will only use configured number of threads + // (by nioEventLoopThreadCount property or default netty value: * 2) + nioBossEventLoopExecutor = newMeteredCachedThreadPool("grpc-boss-event-loop") + } + if (executorGroup == null) { executorGroup = buildExecutorGroup() } @@ -210,6 +218,11 @@ class ControlPlane private constructor( nioEventLoopExecutor ) ) + .bossEventLoopGroup(NioEventLoopGroup( + properties.server.nioBossEventLoopThreadCount, + nioBossEventLoopExecutor + )) + .channelType(NioServerSocketChannel::class.java) .executor(grpcServerExecutor) .keepAliveTime(properties.server.netty.keepAliveTime.toMillis(), TimeUnit.MILLISECONDS) .permitKeepAliveTime(properties.server.netty.permitKeepAliveTime.toMillis(), TimeUnit.MILLISECONDS) @@ -300,6 +313,11 @@ class ControlPlane private constructor( return this } + fun withNioBossEventLoopExecutor(executor: Executor): ControlPlaneBuilder { + nioBossEventLoopExecutor = executor + return this + } + fun withExecutorGroup(executor: ExecutorGroup): ControlPlaneBuilder { executorGroup = executor return this diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt index 12ba07f11..e95c897b5 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt @@ -1,6 +1,9 @@ package pl.allegro.tech.servicemesh.envoycontrol.groups import io.envoyproxy.controlplane.cache.ConfigWatcher +import io.envoyproxy.controlplane.cache.DeltaResponse +import io.envoyproxy.controlplane.cache.DeltaWatch +import io.envoyproxy.controlplane.cache.DeltaXdsRequest import io.envoyproxy.controlplane.cache.Response import io.envoyproxy.controlplane.cache.Watch import io.envoyproxy.controlplane.cache.XdsRequest @@ -57,6 +60,34 @@ internal class GroupChangeWatcher( return watch } + override fun createDeltaWatch( + request: DeltaXdsRequest?, + requesterVersion: String?, + resourceVersions: MutableMap?, + pendingResources: MutableSet?, + isWildcard: Boolean, + responseConsumer: Consumer?, + hasClusterChanged: Boolean + ): DeltaWatch { + val oldGroups = cache.groups() + + val watch = cache.createDeltaWatch( + request, + requesterVersion, + resourceVersions, + pendingResources, + isWildcard, + responseConsumer, + hasClusterChanged + ) + val groups = cache.groups() + metrics.setCacheGroupsCount(groups.size) + if (oldGroups != groups) { + emitNewGroupsEvent(groups - oldGroups) + } + return watch + } + private fun emitNewGroupsEvent(difference: List) { groupChangeEmitter?.next(difference) } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/NodeMetadataValidator.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/NodeMetadataValidator.kt index cae85956c..feb5e20c2 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/NodeMetadataValidator.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/NodeMetadataValidator.kt @@ -8,6 +8,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.protocol.HttpMethod import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties import io.envoyproxy.envoy.config.core.v3.Node as NodeV3 import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest as DiscoveryRequestV3 +import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest as DeltaDiscoveryRequestV3 class V2NotSupportedException : NodeMetadataValidationException( "Blocked service from receiving updates. V2 resources are not supported by server." @@ -63,6 +64,13 @@ class NodeMetadataValidator( request?.node?.let { validateV3Metadata(it) } } + override fun onV3StreamDeltaRequest( + streamId: Long, + request: DeltaDiscoveryRequestV3? + ) { + request?.node?.let { validateV3Metadata(it) } + } + private fun validateV3Metadata(node: NodeV3) { // Some validation logic is executed when NodeMetadata is created. // This may throw NodeMetadataValidationException diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/CachedProtoResourcesSerializer.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/CachedProtoResourcesSerializer.kt index 46056413c..3503b70a4 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/CachedProtoResourcesSerializer.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/CachedProtoResourcesSerializer.kt @@ -26,10 +26,10 @@ internal class CachedProtoResourcesSerializer( } } - private val cache: Cache, MutableCollection> = createCache("protobuf-cache") + private val cache: Cache = createCache("protobuf-cache") private val timer = createTimer(reportMetrics, meterRegistry, "protobuf-cache.serialize.time") - private fun createCache(cacheName: String): Cache, MutableCollection> { + private fun createCache(cacheName: String): Cache { return if (reportMetrics) { GuavaCacheMetrics .monitor( @@ -37,33 +37,23 @@ internal class CachedProtoResourcesSerializer( CacheBuilder.newBuilder() .recordStats() .weakValues() - .build(), + .build(), cacheName ) } else { CacheBuilder.newBuilder() .weakValues() - .build() + .build() } } - override fun serialize( - resources: MutableCollection, - apiVersion: Resources.ApiVersion - ): MutableCollection { - return timer.record(Supplier { getResources(resources) }) - } - - private fun getResources(resources: MutableCollection): MutableCollection { - return cache.get(resources) { - resources.asSequence() - .map { Any.pack(it) } - .toMutableList() - } - } - - @Suppress("NotImplementedDeclaration") - override fun serialize(resource: Message?, apiVersion: Resources.ApiVersion?): Any { - throw NotImplementedError("Serializing single messages is not supported") + override fun serialize(resource: Message, apiVersion: Resources.ApiVersion): Any { + return timer.record(Supplier { + cache.get(resource) { + Any.pack( + resource + ) + } + }) } } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/ServerProperties.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/ServerProperties.kt index b7ad29c2f..6ad4a9385 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/ServerProperties.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/ServerProperties.kt @@ -7,6 +7,7 @@ import java.time.Duration class ServerProperties { var port = 50000 var nioEventLoopThreadCount = 0 // if set to 0, default Netty value will be used: * 2 + var nioBossEventLoopThreadCount = 0 // if set to 0, default Netty value will be used: * 2 var serverPoolSize = 16 var serverPoolKeepAlive: Duration = Duration.ofMinutes(10) var executorGroup = ExecutorProperties() diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/CompositeDiscoveryServerCallbacks.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/CompositeDiscoveryServerCallbacks.kt index 481389f62..93fab3687 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/CompositeDiscoveryServerCallbacks.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/CompositeDiscoveryServerCallbacks.kt @@ -4,6 +4,7 @@ import io.envoyproxy.controlplane.server.DiscoveryServerCallbacks import io.envoyproxy.controlplane.server.exception.RequestException import io.micrometer.core.instrument.MeterRegistry import pl.allegro.tech.servicemesh.envoycontrol.logger +import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest as v3DeltaDiscoveryRequest import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest as v3DiscoveryRequest import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse as v3DiscoveryResponse @@ -40,6 +41,15 @@ class CompositeDiscoveryServerCallbacks( } } + override fun onV3StreamDeltaRequest( + streamId: Long, + request: v3DeltaDiscoveryRequest? + ) { + runCallbacks { + it.onV3StreamDeltaRequest(streamId, request) + } + } + override fun onV3StreamResponse( streamId: Long, request: v3DiscoveryRequest?, diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/LoggingDiscoveryServerCallbacks.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/LoggingDiscoveryServerCallbacks.kt index a3fd143f7..315e25089 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/LoggingDiscoveryServerCallbacks.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/LoggingDiscoveryServerCallbacks.kt @@ -2,6 +2,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.server.callbacks import io.envoyproxy.controlplane.server.DiscoveryServerCallbacks import org.slf4j.LoggerFactory +import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest as v3DeltaDiscoveryRequest import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest as v3DiscoveryRequest import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse as v3DiscoveryResponse @@ -19,6 +20,13 @@ class LoggingDiscoveryServerCallbacks( logger.debug("onV3StreamRequest streamId: {} request: {}", streamId, requestData(request)) } + override fun onV3StreamDeltaRequest( + streamId: Long, + request: v3DeltaDiscoveryRequest? + ) { + logger.debug("onV3StreamDeltaRequest streamId: {} request: {}", streamId, requestData(request)) + } + override fun onStreamCloseWithError(streamId: Long, typeUrl: String?, error: Throwable?) { logger.debug("onStreamCloseWithError streamId: {}, typeUrl: {}", streamId, typeUrl, error) } @@ -40,6 +48,14 @@ class LoggingDiscoveryServerCallbacks( ) } + private fun requestData(request: v3DeltaDiscoveryRequest?): String { + return if (logFullRequest) { + "$request" + } else { + "id: ${request?.node?.id}, cluster: ${request?.node?.cluster}, " + + "type: ${request?.typeUrl}, responseNonce: ${request?.responseNonce}" + } + } private fun requestData(request: v3DiscoveryRequest?): String { return if (logFullRequest) { "$request" diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt index b23e0ce86..c0f65410b 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt @@ -3,6 +3,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.server.callbacks import io.envoyproxy.controlplane.cache.Resources import io.envoyproxy.controlplane.server.DiscoveryServerCallbacks import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest as V3DiscoveryRequest +import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest as V3DeltaDiscoveryRequest import io.micrometer.core.instrument.MeterRegistry import java.util.concurrent.atomic.AtomicInteger @@ -54,6 +55,14 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) .increment() } + override fun onV3StreamDeltaRequest( + streamId: Long, + request: V3DeltaDiscoveryRequest + ) { + meterRegistry.counter("grpc.requests.${StreamType.fromTypeUrl(request.typeUrl).name.toLowerCase()}.delta") + .increment() + } + override fun onStreamCloseWithError(streamId: Long, typeUrl: String, error: Throwable) { connections.decrementAndGet() connectionsByType(typeUrl).decrementAndGet() diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt index 498db7785..1357b6eac 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt @@ -223,7 +223,7 @@ class EnvoySnapshotFactory( if (rateLimitEndpoints.isNotEmpty()) listOf(properties.rateLimit.serviceName) else emptyList() val allClusters = egressRouteClusters + rateLimitClusters - return allClusters.mapNotNull { name -> globalSnapshot.endpoints.resources()[name] } + return allClusters.mapNotNull { name -> globalSnapshot.endpoints[name] } } private fun newSnapshotForGroup( @@ -271,7 +271,7 @@ class EnvoySnapshotFactory( val endpoints = getServicesEndpointsForGroup(group.proxySettings.incoming.rateLimitEndpoints, globalSnapshot, egressRouteSpecification) - val version = snapshotsVersions.version(group, clusters, endpoints, listeners) + val version = snapshotsVersions.version(group, clusters, endpoints, listeners, routes) return createSnapshot( clusters = clusters, diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/GlobalSnapshot.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/GlobalSnapshot.kt index 296f273b3..ae618cd4f 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/GlobalSnapshot.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/GlobalSnapshot.kt @@ -5,25 +5,25 @@ import io.envoyproxy.envoy.config.cluster.v3.Cluster import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment data class GlobalSnapshot( - val clusters: SnapshotResources, + val clusters: Map, val allServicesNames: Set, - val endpoints: SnapshotResources, + val endpoints: Map, val clusterConfigurations: Map, - val securedClusters: SnapshotResources + val securedClusters: Map ) @Suppress("LongParameterList") fun globalSnapshot( - clusters: Iterable, - endpoints: Iterable, + clusters: Iterable = emptyList(), + endpoints: Iterable = emptyList(), properties: OutgoingPermissionsProperties = OutgoingPermissionsProperties(), - clusterConfigurations: Map, - securedClusters: List + clusterConfigurations: Map = emptyMap(), + securedClusters: List = emptyList() ): GlobalSnapshot { - val clusters = SnapshotResources.create(clusters, "") - val securedClusters = SnapshotResources.create(securedClusters, "") - val allServicesNames = getClustersForAllServicesGroups(clusters.resources(), properties) - val endpoints = SnapshotResources.create(endpoints, "") + val clusters = SnapshotResources.create(clusters, "").resources() + val securedClusters = SnapshotResources.create(securedClusters, "").resources() + val allServicesNames = getClustersForAllServicesGroups(clusters, properties) + val endpoints = SnapshotResources.create(endpoints, "").resources() return GlobalSnapshot( clusters = clusters, securedClusters = securedClusters, diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt index b7ce00ee0..a5221c042 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt @@ -33,6 +33,7 @@ class SnapshotProperties { var jwt = JwtFilterProperties() var requireServiceName = false var rateLimit = RateLimitProperties() + var deltaXdsEnabled = false var retryPolicy = RetryPolicyProperties() var tcpDumpsEnabled: Boolean = true } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotsVersions.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotsVersions.kt index 8889a16d7..d1d9c37d5 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotsVersions.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotsVersions.kt @@ -3,6 +3,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.snapshot import io.envoyproxy.envoy.config.cluster.v3.Cluster import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment import io.envoyproxy.envoy.config.listener.v3.Listener +import io.envoyproxy.envoy.config.route.v3.RouteConfiguration import pl.allegro.tech.servicemesh.envoycontrol.groups.Group import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotsVersions.Companion.newVersion import java.util.UUID @@ -31,7 +32,8 @@ class SnapshotsVersions { group: Group, clusters: List, endpoints: List, - listeners: List = listOf() + listeners: List = listOf(), + routes: List = listOf() ): Version { val versionsWithData = versions.compute(group) { _, previous -> val version = when (previous) { @@ -44,15 +46,16 @@ class SnapshotsVersions { else -> { val clustersChanged = previous.clusters != clusters val listenersChanged = previous.listeners != listeners + val routesChanged = previous.routes != routes Version( clusters = selectClusters(previous, clusters, clustersChanged), endpoints = selectEndpoints(previous, endpoints, clustersChanged), listeners = selectListeners(previous, listenersChanged), - routes = selectRoutes(previous, listenersChanged, clustersChanged) + routes = selectRoutes(previous, listenersChanged, routesChanged) ) } } - VersionsWithData(version, clusters, endpoints, listeners) + VersionsWithData(version, clusters, endpoints, listeners, routes) } return versionsWithData!!.version } @@ -60,9 +63,9 @@ class SnapshotsVersions { private fun selectRoutes( previous: VersionsWithData, listenersChanged: Boolean, - clustersChanged: Boolean + routesChanged: Boolean ): RoutesVersion { - return if (listenersChanged || clustersChanged) RoutesVersion(newVersion()) else previous.version.routes + return if (listenersChanged || routesChanged) RoutesVersion(newVersion()) else previous.version.routes } private fun selectListeners(previous: VersionsWithData, hasChanged: Boolean): ListenersVersion { @@ -101,7 +104,8 @@ class SnapshotsVersions { val version: Version, val clusters: List, val endpoints: List, - val listeners: List + val listeners: List, + val routes: List ) data class Version( diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/clusters/EnvoyClustersFactory.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/clusters/EnvoyClustersFactory.kt index fe68b3c43..1a25e5a7f 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/clusters/EnvoyClustersFactory.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/clusters/EnvoyClustersFactory.kt @@ -167,7 +167,7 @@ class EnvoyClustersFactory( private fun getRateLimitClusterForGroup(group: Group, globalSnapshot: GlobalSnapshot): List { if (group.proxySettings.incoming.rateLimitEndpoints.containsGlobalRateLimits()) { - val cluster = globalSnapshot.clusters.resources()[properties.rateLimit.serviceName] + val cluster = globalSnapshot.clusters[properties.rateLimit.serviceName] if (cluster != null) { return listOf(Cluster.newBuilder(cluster).build()) @@ -185,9 +185,9 @@ class EnvoyClustersFactory( private fun getEdsClustersForGroup(group: Group, globalSnapshot: GlobalSnapshot): List { val clusters: Map = if (enableTlsForGroup(group)) { - globalSnapshot.securedClusters.resources() + globalSnapshot.securedClusters } else { - globalSnapshot.clusters.resources() + globalSnapshot.clusters } val serviceDependencies = group.proxySettings.outgoing.getServiceDependencies().associateBy { it.service } @@ -376,7 +376,13 @@ class EnvoyClustersFactory( .setResourceApiVersion(ApiVersion.V3) .setApiConfigSource( ApiConfigSource.newBuilder() - .setApiType(ApiConfigSource.ApiType.GRPC) + .setApiType( + if (properties.deltaXdsEnabled) { + ApiConfigSource.ApiType.DELTA_GRPC + } else { + ApiConfigSource.ApiType.GRPC + } + ) .setTransportApiVersion(ApiVersion.V3) .addGrpcServices( 0, GrpcService.newBuilder().setEnvoyGrpc( diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/HttpConnectionManagerFactory.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/HttpConnectionManagerFactory.kt index 117b3c0e4..336ecc73d 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/HttpConnectionManagerFactory.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/HttpConnectionManagerFactory.kt @@ -136,7 +136,13 @@ class HttpConnectionManagerFactory( private fun apiConfigSource(): ApiConfigSource { return ApiConfigSource.newBuilder() - .setApiType(ApiConfigSource.ApiType.GRPC) + .setApiType( + if (snapshotProperties.deltaXdsEnabled) { + ApiConfigSource.ApiType.DELTA_GRPC + } else { + ApiConfigSource.ApiType.GRPC + } + ) .setTransportApiVersion(ApiVersion.V3) .addGrpcServices( GrpcService.newBuilder() diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/RBACFilterFactory.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/RBACFilterFactory.kt index 11f92d9b8..4f3bd6b52 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/RBACFilterFactory.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/RBACFilterFactory.kt @@ -507,7 +507,8 @@ class RBACFilterFactory( selectorMatching: SelectorMatching?, snapshot: GlobalSnapshot ): List { - val clusterLoadAssignment = snapshot.endpoints.resources()[client.name] + val resources = snapshot.endpoints + val clusterLoadAssignment = resources[client.name] val sourceIpPrincipal = mapEndpointsToExactPrincipals(clusterLoadAssignment) return if (sourceIpPrincipal == null) { diff --git a/envoy-control-core/src/test/java/pl/allegro/tech/servicemesh/envoycontrol/v3/SimpleCacheTest.java b/envoy-control-core/src/test/java/pl/allegro/tech/servicemesh/envoycontrol/v3/SimpleCacheTest.java index 2de95732b..3c5568f71 100644 --- a/envoy-control-core/src/test/java/pl/allegro/tech/servicemesh/envoycontrol/v3/SimpleCacheTest.java +++ b/envoy-control-core/src/test/java/pl/allegro/tech/servicemesh/envoycontrol/v3/SimpleCacheTest.java @@ -7,6 +7,7 @@ import io.envoyproxy.controlplane.cache.Resources; import io.envoyproxy.controlplane.cache.Response; import io.envoyproxy.controlplane.cache.StatusInfo; +import io.envoyproxy.controlplane.cache.VersionedResource; import io.envoyproxy.controlplane.cache.Watch; import io.envoyproxy.controlplane.cache.XdsRequest; import io.envoyproxy.controlplane.cache.v3.Snapshot; @@ -523,7 +524,8 @@ protected static void assertThatWatchReceivesSnapshot(WatchAndTracker watchAndTr assertThat(response).isNotNull(); assertThat(response.version()).isEqualTo(snapshot.version(watchAndTracker.watch.request().getTypeUrl())); assertThat(response.resources().toArray(new Message[0])) - .containsExactlyElementsOf(snapshot.resources(watchAndTracker.watch.request().getTypeUrl()).values()); + .containsExactlyElementsOf(snapshot.resources(watchAndTracker.watch.request().getTypeUrl()).values() + .stream().map(VersionedResource::resource).collect(Collectors.toList())); } protected static class ResponseTracker implements Consumer { diff --git a/envoy-control-core/src/test/java/pl/allegro/tech/servicemesh/envoycontrol/v3/SimpleCacheWithMissingEndpointsTest.java b/envoy-control-core/src/test/java/pl/allegro/tech/servicemesh/envoycontrol/v3/SimpleCacheWithMissingEndpointsTest.java index 2b82ec6fe..77d9ee749 100644 --- a/envoy-control-core/src/test/java/pl/allegro/tech/servicemesh/envoycontrol/v3/SimpleCacheWithMissingEndpointsTest.java +++ b/envoy-control-core/src/test/java/pl/allegro/tech/servicemesh/envoycontrol/v3/SimpleCacheWithMissingEndpointsTest.java @@ -4,21 +4,20 @@ import com.google.protobuf.Message; import io.envoyproxy.controlplane.cache.Resources; import io.envoyproxy.controlplane.cache.Response; +import io.envoyproxy.controlplane.cache.VersionedResource; import io.envoyproxy.controlplane.cache.Watch; import io.envoyproxy.controlplane.cache.XdsRequest; import io.envoyproxy.controlplane.cache.v3.Snapshot; import io.envoyproxy.envoy.config.core.v3.Node; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; - -import static org.assertj.core.api.Assertions.assertThat; - import org.junit.Ignore; import org.junit.jupiter.api.Test; import java.util.Collections; import static java.util.Collections.emptyList; +import static org.assertj.core.api.Assertions.assertThat; public class SimpleCacheWithMissingEndpointsTest extends SimpleCacheTest { @@ -75,11 +74,11 @@ private static void assertThatWatchReceivesSnapshotWithMissingResources(WatchAnd assertThat(response).isNotNull(); assertThat(response.version()).isEqualTo(snapshot.version(watchAndTracker.watch.request().getTypeUrl())); Message[] responseValues = response.resources().toArray(new Message[0]); - Message[] snapshotValues = snapshot.resources(watchAndTracker.watch.request().getTypeUrl()).values().toArray(new Message[0]); + Message[] snapshotValues = snapshot.resources(watchAndTracker.watch.request().getTypeUrl()).values().stream().map(VersionedResource::resource).toArray(Message[]::new); assertThat(responseValues.length).isEqualTo(2); assertThat(responseValues.length).isEqualTo(snapshotValues.length); - assertThat(responseValues[0]).isEqualToComparingFieldByFieldRecursively(snapshotValues[0]); - assertThat(responseValues[1]).isEqualToComparingFieldByFieldRecursively(snapshotValues[1]); + assertThat(responseValues[0].toString()).isEqualTo(snapshotValues[0].toString()); + assertThat(responseValues[1].toString()).isEqualTo(snapshotValues[1].toString()); } } diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoySnapshotFactoryTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoySnapshotFactoryTest.kt index 8c3aa7a76..41fe92da5 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoySnapshotFactoryTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoySnapshotFactoryTest.kt @@ -273,14 +273,10 @@ class EnvoySnapshotFactoryTest { } private fun GlobalSnapshot.withEndpoint(clusterName: String): GlobalSnapshot = copy( - endpoints = SnapshotResources.create( - listOf( - ClusterLoadAssignment.newBuilder() + endpoints = SnapshotResources.create(listOf(ClusterLoadAssignment.newBuilder() .setClusterName(clusterName) .build() - ), "v1" - ) - ) + ), "v1").resources()) private fun createServicesGroup( mode: CommunicationMode = CommunicationMode.XDS, @@ -374,11 +370,11 @@ class EnvoySnapshotFactoryTest { private fun createGlobalSnapshot(vararg clusters: Cluster): GlobalSnapshot { return GlobalSnapshot( - SnapshotResources.create(clusters.toList(), "pl/allegro/tech/servicemesh/envoycontrol/v3"), + SnapshotResources.create(clusters.toList(), "pl/allegro/tech/servicemesh/envoycontrol/v3").resources(), clusters.map { it.name }.toSet(), - SnapshotResources.create(emptyList(), "v1"), + SnapshotResources.create(emptyList(), "v1").resources(), emptyMap(), - SnapshotResources.create(clusters.toList(), "v3") + SnapshotResources.create(clusters.toList(), "v3").resources() ) } diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt index d7c361ca1..85f5105a2 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt @@ -1,6 +1,9 @@ package pl.allegro.tech.servicemesh.envoycontrol.snapshot import com.google.protobuf.util.Durations +import io.envoyproxy.controlplane.cache.DeltaResponse +import io.envoyproxy.controlplane.cache.DeltaWatch +import io.envoyproxy.controlplane.cache.DeltaXdsRequest import io.envoyproxy.controlplane.cache.Resources import io.envoyproxy.controlplane.cache.Response import io.envoyproxy.controlplane.cache.SnapshotCache @@ -1138,6 +1141,18 @@ class SnapshotUpdaterTest { .isTrue() } } + + override fun createDeltaWatch( + request: DeltaXdsRequest?, + requesterVersion: String?, + resourceVersions: MutableMap?, + pendingResources: MutableSet?, + isWildcard: Boolean, + responseConsumer: Consumer?, + hasClusterChanged: Boolean + ): DeltaWatch { + throw UnsupportedOperationException("not used in testing") + } } private fun hasSnapshot(cache: SnapshotCache, group: Group): Snapshot { @@ -1249,7 +1264,7 @@ class SnapshotUpdaterTest { ) private fun GlobalSnapshot.hasHttp2Cluster(clusterName: String): GlobalSnapshot { - val cluster = this.clusters.resources()[clusterName] + val cluster = this.clusters[clusterName] assertThat(cluster).isNotNull assertThat(cluster!!.hasHttp2ProtocolOptions()).isTrue() return this @@ -1259,7 +1274,7 @@ class SnapshotUpdaterTest { clusterName: String, zones: Set ): GlobalSnapshot { - val endpoints = this.endpoints.resources()[clusterName] + val endpoints = this.endpoints[clusterName] assertThat(endpoints).isNotNull assertThat(endpoints!!.endpointsList.map { it.locality.zone }.toSet()).containsAll(zones) assertThat(endpoints.endpointsList.flatMap { it.lbEndpointsList }).isEmpty() @@ -1267,7 +1282,7 @@ class SnapshotUpdaterTest { } private fun GlobalSnapshot.hasAnEndpoint(clusterName: String, ip: String, port: Int): GlobalSnapshot { - val endpoints = this.endpoints.resources()[clusterName] + val endpoints = this.endpoints[clusterName] assertThat(endpoints).isNotNull assertThat(endpoints!!.endpointsList.flatMap { it.lbEndpointsList }) .anyMatch { it.endpoint.address.socketAddress.let { it.address == ip && it.portValue == port } } @@ -1275,20 +1290,20 @@ class SnapshotUpdaterTest { } private fun GlobalSnapshot.hasTheSameClusters(other: GlobalSnapshot): GlobalSnapshot { - val clusters = this.clusters.resources() - assertThat(clusters).isEqualTo(other.clusters.resources()) + val clusters = this.clusters + assertThat(clusters).isEqualTo(other.clusters) return this } private fun GlobalSnapshot.hasTheSameEndpoints(other: GlobalSnapshot): GlobalSnapshot { - val endpoints = this.endpoints.resources() - assertThat(endpoints).isEqualTo(other.endpoints.resources()) + val endpoints = this.endpoints + assertThat(endpoints).isEqualTo(other.endpoints) return this } private fun GlobalSnapshot.hasTheSameSecuredClusters(other: GlobalSnapshot): GlobalSnapshot { - val securedClusters = this.securedClusters.resources() - assertThat(securedClusters).isEqualTo(other.securedClusters.resources()) + val securedClusters = this.securedClusters + assertThat(securedClusters).isEqualTo(other.securedClusters) return this } diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/config/LocalReplyConfigFactoryTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/config/LocalReplyConfigFactoryTest.kt index 1613da04e..08141d3b2 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/config/LocalReplyConfigFactoryTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/config/LocalReplyConfigFactoryTest.kt @@ -448,9 +448,19 @@ body_format { value { struct_value { fields { - key: "service-tag" + key: "listOfIntegers" value { - string_value: "test" + list_value { + values { + number_value: 1.0 + } + values { + number_value: 2.0 + } + values { + number_value: 3.0 + } + } } } fields { @@ -467,30 +477,14 @@ body_format { } } fields { - key: "listOfIntegers" + key: "service-tag" value { - list_value { - values { - number_value: 1.0 - } - values { - number_value: 2.0 - } - values { - number_value: 3.0 - } - } + string_value: "test" } } } } } - fields { - key: "reason" - value { - number_value: 1.0 - } - } fields { key: "listOfMap" value { @@ -518,6 +512,12 @@ body_format { } } } + fields { + key: "reason" + value { + number_value: 1.0 + } + } } content_type: "application/envoy+json" }""".trimIndent() diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/rbac/RBACFilterFactoryJwtTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/rbac/RBACFilterFactoryJwtTest.kt index d0b0238be..a9e189d06 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/rbac/RBACFilterFactoryJwtTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/rbac/RBACFilterFactoryJwtTest.kt @@ -1,6 +1,8 @@ package pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.rbac import io.envoyproxy.controlplane.cache.SnapshotResources +import io.envoyproxy.envoy.config.cluster.v3.Cluster +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment import org.assertj.core.api.Assertions import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest @@ -39,11 +41,11 @@ internal class RBACFilterFactoryJwtTest : RBACFilterFactoryTestUtils { ) val snapshot = GlobalSnapshot( - SnapshotResources.create(listOf(), ""), + SnapshotResources.create(listOf(), "").resources(), setOf(), - SnapshotResources.create(listOf(), ""), + SnapshotResources.create(listOf(), "").resources(), mapOf(), - SnapshotResources.create(listOf(), "") + SnapshotResources.create(listOf(), "").resources() ) @ParameterizedTest(name = "should generate RBAC rules for {arguments} OAuth Policy") diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/rbac/RBACFilterFactoryTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/rbac/RBACFilterFactoryTest.kt index d3b132e82..3de5edaf9 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/rbac/RBACFilterFactoryTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/filters/rbac/RBACFilterFactoryTest.kt @@ -1,6 +1,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.rbac import io.envoyproxy.controlplane.cache.SnapshotResources +import io.envoyproxy.envoy.config.cluster.v3.Cluster import io.envoyproxy.envoy.config.core.v3.Address import io.envoyproxy.envoy.config.core.v3.SocketAddress import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment @@ -86,11 +87,11 @@ internal class RBACFilterFactoryTest : RBACFilterFactoryTestUtils { ) val snapshot = GlobalSnapshot( - SnapshotResources.create(listOf(), ""), + SnapshotResources.create(listOf(), "").resources(), setOf(), - SnapshotResources.create(listOf(), ""), + SnapshotResources.create(listOf(), "").resources(), mapOf(), - SnapshotResources.create(listOf(), "") + SnapshotResources.create(listOf(), "").resources() ) val clusterLoadAssignment = ClusterLoadAssignment.newBuilder() @@ -108,11 +109,11 @@ internal class RBACFilterFactoryTest : RBACFilterFactoryTestUtils { ).build() val snapshotForSourceIpAuth = GlobalSnapshot( - SnapshotResources.create(listOf(), ""), + SnapshotResources.create(listOf(), "").resources(), setOf(), - SnapshotResources.create(listOf(clusterLoadAssignment), ""), + SnapshotResources.create(listOf(clusterLoadAssignment), "").resources(), mapOf(), - SnapshotResources.create(listOf(), "") + SnapshotResources.create(listOf(), "").resources() ) @Test diff --git a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/debug/SnapshotDebugInfo.kt b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/debug/SnapshotDebugInfo.kt index e626a5342..e4ce5c0b7 100644 --- a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/debug/SnapshotDebugInfo.kt +++ b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/debug/SnapshotDebugInfo.kt @@ -55,8 +55,8 @@ data class SnapshotDebugInfo( constructor(globalSnapshot: GlobalSnapshot) : this( snapshot = Snapshot( - clusters = globalSnapshot.clusters.resources(), - endpoints = globalSnapshot.endpoints.resources() + clusters = globalSnapshot.clusters, + endpoints = globalSnapshot.endpoints ), versions = Versions() ) diff --git a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/debug/SnapshotDebugService.kt b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/debug/SnapshotDebugService.kt index 52891c9c1..346a74245 100644 --- a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/debug/SnapshotDebugService.kt +++ b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/debug/SnapshotDebugService.kt @@ -75,8 +75,7 @@ class SnapshotDebugService( logger.warn("Global snapshot is missing") throw GlobalSnapshotNotFoundException("Global snapshot is missing") } - val endpoints = globalSnapshot.endpoints - .resources()[service] + val endpoints = globalSnapshot.endpoints[service] if (endpoints == null) { logger.warn("Can not find $service in global snapshot") throw GlobalSnapshotNotFoundException("Service $service not found in global snapshot") diff --git a/envoy-control-runner/src/main/resources/application-local.yaml b/envoy-control-runner/src/main/resources/application-local.yaml index f89a8ada0..abfb4498b 100644 --- a/envoy-control-runner/src/main/resources/application-local.yaml +++ b/envoy-control-runner/src/main/resources/application-local.yaml @@ -6,4 +6,4 @@ envoy-control: chaos: username: "user" - password: "pass" \ No newline at end of file + password: "pass" diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoyControlSmokeTest.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoyControlSmokeTest.kt index 40f546b94..cf7410a17 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoyControlSmokeTest.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoyControlSmokeTest.kt @@ -7,6 +7,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.assertions.untilAsserted import pl.allegro.tech.servicemesh.envoycontrol.config.Ads import pl.allegro.tech.servicemesh.envoycontrol.config.AdsWithNoDependencies import pl.allegro.tech.servicemesh.envoycontrol.config.AdsWithStaticListeners +import pl.allegro.tech.servicemesh.envoycontrol.config.DeltaAds import pl.allegro.tech.servicemesh.envoycontrol.config.Xds import pl.allegro.tech.servicemesh.envoycontrol.config.consul.ConsulExtension import pl.allegro.tech.servicemesh.envoycontrol.config.envoy.EnvoyExtension @@ -100,6 +101,35 @@ class AdsEnvoyControlSmokeTest : EnvoyControlSmokeTest { override fun envoy() = envoy } +class DeltaAdsEnvoyControlSmokeTest : EnvoyControlSmokeTest { + companion object { + + @JvmField + @RegisterExtension + val consul = ConsulExtension() + + @JvmField + @RegisterExtension + val envoyControl = EnvoyControlExtension(consul) + + @JvmField + @RegisterExtension + val service = EchoServiceExtension() + + @JvmField + @RegisterExtension + val envoy = EnvoyExtension(envoyControl, service, config = DeltaAds) + } + + override fun consul() = consul + + override fun envoyControl() = envoyControl + + override fun service() = service + + override fun envoy() = envoy +} + class XdsEnvoyControlSmokeTest : EnvoyControlSmokeTest { companion object { diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt index 6f7b70d88..beffcee16 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt @@ -6,6 +6,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension import pl.allegro.tech.servicemesh.envoycontrol.assertions.untilAsserted import pl.allegro.tech.servicemesh.envoycontrol.config.Ads +import pl.allegro.tech.servicemesh.envoycontrol.config.DeltaAds import pl.allegro.tech.servicemesh.envoycontrol.config.Xds import pl.allegro.tech.servicemesh.envoycontrol.config.consul.ConsulExtension import pl.allegro.tech.servicemesh.envoycontrol.config.envoy.EnvoyExtension @@ -57,6 +58,23 @@ class XdsMetricsDiscoveryServerCallbacksTest : MetricsDiscoveryServerCallbacksTe ADS to 0, UNKNOWN to 0 ) + + override fun expectedGrpcRequestsCounterValues() = mapOf( + CDS.name.toLowerCase() to isGreaterThanZero(), + EDS.name.toLowerCase() to isGreaterThanZero(), + LDS.name.toLowerCase() to isGreaterThanZero(), + RDS.name.toLowerCase() to isGreaterThanZero(), + SDS.name.toLowerCase() to isNull(), + ADS.name.toLowerCase() to isNull(), + UNKNOWN.name.toLowerCase() to isNull(), + "${CDS.name.toLowerCase()}.delta" to isNull(), + "${EDS.name.toLowerCase()}.delta" to isNull(), + "${LDS.name.toLowerCase()}.delta" to isNull(), + "${RDS.name.toLowerCase()}.delta" to isNull(), + "${SDS.name.toLowerCase()}.delta" to isNull(), + "${ADS.name.toLowerCase()}.delta" to isNull(), + "${UNKNOWN.name.toLowerCase()}.delta" to isNull() + ) } class AdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbacksTest { @@ -96,6 +114,79 @@ class AdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbacksTes ADS to 1, // all info is exchanged on one stream UNKNOWN to 0 ) + + override fun expectedGrpcRequestsCounterValues() = mapOf( + CDS.name.toLowerCase() to isGreaterThanZero(), + EDS.name.toLowerCase() to isGreaterThanZero(), + LDS.name.toLowerCase() to isGreaterThanZero(), + RDS.name.toLowerCase() to isGreaterThanZero(), + SDS.name.toLowerCase() to isNull(), + ADS.name.toLowerCase() to isNull(), + UNKNOWN.name.toLowerCase() to isNull(), + "${CDS.name.toLowerCase()}.delta" to isNull(), + "${EDS.name.toLowerCase()}.delta" to isNull(), + "${LDS.name.toLowerCase()}.delta" to isNull(), + "${RDS.name.toLowerCase()}.delta" to isNull(), + "${SDS.name.toLowerCase()}.delta" to isNull(), + "${ADS.name.toLowerCase()}.delta" to isNull(), + "${UNKNOWN.name.toLowerCase()}.delta" to isNull() + ) +} + +class DeltaAdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbacksTest { + companion object { + + @JvmField + @RegisterExtension + val consul = ConsulExtension() + + @JvmField + @RegisterExtension + val envoyControl = EnvoyControlExtension(consul) + + @JvmField + @RegisterExtension + val service = EchoServiceExtension() + + @JvmField + @RegisterExtension + val envoy = EnvoyExtension(envoyControl, service, config = DeltaAds) + } + + override fun consul() = consul + + override fun envoyControl() = envoyControl + + override fun service() = service + + override fun envoy() = envoy + + override fun expectedGrpcConnectionsGaugeValues() = mapOf( + CDS to 0, + EDS to 0, + LDS to 0, + RDS to 0, + SDS to 0, + ADS to 1, // all info is exchanged on one stream + UNKNOWN to 0 + ) + + override fun expectedGrpcRequestsCounterValues() = mapOf( + CDS.name.toLowerCase() to isNull(), + EDS.name.toLowerCase() to isNull(), + LDS.name.toLowerCase() to isNull(), + RDS.name.toLowerCase() to isNull(), + SDS.name.toLowerCase() to isNull(), + ADS.name.toLowerCase() to isNull(), + UNKNOWN.name.toLowerCase() to isNull(), + "${CDS.name.toLowerCase()}.delta" to isGreaterThanZero(), + "${EDS.name.toLowerCase()}.delta" to isGreaterThanZero(), + "${LDS.name.toLowerCase()}.delta" to isGreaterThanZero(), + "${RDS.name.toLowerCase()}.delta" to isGreaterThanZero(), + "${SDS.name.toLowerCase()}.delta" to isNull(), + "${ADS.name.toLowerCase()}.delta" to isNull(), + "${UNKNOWN.name.toLowerCase()}.delta" to isNull() + ) } interface MetricsDiscoveryServerCallbacksTest { @@ -108,6 +199,16 @@ interface MetricsDiscoveryServerCallbacksTest { fun envoy(): EnvoyExtension + fun expectedGrpcConnectionsGaugeValues(): Map + + fun expectedGrpcRequestsCounterValues(): Map Boolean> + + fun MeterRegistry.counterValue(name: String) = this.find(name).counter()?.count()?.toInt() + + fun isGreaterThanZero() = { x: Int? -> x!! > 0 } + + fun isNull() = { x: Int? -> x == null } + @Test fun `should measure gRPC connections`() { // given @@ -124,8 +225,6 @@ interface MetricsDiscoveryServerCallbacksTest { } } - fun expectedGrpcConnectionsGaugeValues(): Map - @Test fun `should measure gRPC requests`() { // given @@ -134,14 +233,11 @@ interface MetricsDiscoveryServerCallbacksTest { // expect untilAsserted { - listOf(CDS, EDS, LDS, RDS).forEach { - assertThat(meterRegistry.counterValue("grpc.requests.${it.name.toLowerCase()}")).isGreaterThan(0) - } - listOf(SDS, ADS, UNKNOWN).forEach { - assertThat(meterRegistry.counterValue("grpc.requests.${it.name.toLowerCase()}")).isNull() + expectedGrpcRequestsCounterValues().forEach { (type, condition) -> + val counterValue = meterRegistry.counterValue("grpc.requests.$type") + println("$type $counterValue") + assertThat(counterValue).satisfies { condition(it) } } } } - - fun MeterRegistry.counterValue(name: String) = this.find(name).counter()?.count()?.toInt() } diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/SnapshotDebugTest.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/SnapshotDebugTest.kt index a21194891..2bfd0463b 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/SnapshotDebugTest.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/SnapshotDebugTest.kt @@ -4,15 +4,75 @@ import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension import pl.allegro.tech.servicemesh.envoycontrol.assertions.untilAsserted +import pl.allegro.tech.servicemesh.envoycontrol.config.AdsAllDependencies +import pl.allegro.tech.servicemesh.envoycontrol.config.DeltaAdsAllDependencies import pl.allegro.tech.servicemesh.envoycontrol.config.consul.ConsulExtension import pl.allegro.tech.servicemesh.envoycontrol.config.envoy.EnvoyExtension import pl.allegro.tech.servicemesh.envoycontrol.config.envoycontrol.EnvoyControlExtension import pl.allegro.tech.servicemesh.envoycontrol.config.service.EchoServiceExtension -open class SnapshotDebugTest { +open class WildcardSnapshotDebugTest : SnapshotDebugTest { + companion object { + @JvmField + @RegisterExtension + val consul = ConsulExtension() + + @JvmField + @RegisterExtension + val envoyControl = EnvoyControlExtension(consul, properties = mapOf( + "envoy-control.envoy.snapshot.outgoing-permissions.services-allowed-to-use-wildcard" to setOf("echo2", "test-service") + )) + @JvmField + @RegisterExtension + val service = EchoServiceExtension() + + @JvmField + @RegisterExtension + val envoy = EnvoyExtension(envoyControl, service, config = AdsAllDependencies) + } + + override fun consul() = consul + + override fun envoyControl() = envoyControl + + override fun envoy() = envoy + + override fun service() = service +} + +open class DeltaWildcardSnapshotDebugTest : SnapshotDebugTest { companion object { + @JvmField + @RegisterExtension + val consul = ConsulExtension() + + @JvmField + @RegisterExtension + val envoyControl = EnvoyControlExtension(consul, properties = mapOf( + "envoy-control.envoy.snapshot.outgoing-permissions.services-allowed-to-use-wildcard" to setOf("echo2", "test-service") + )) + @JvmField + @RegisterExtension + val service = EchoServiceExtension() + + @JvmField + @RegisterExtension + val envoy = EnvoyExtension(envoyControl, service, config = DeltaAdsAllDependencies) + } + + override fun consul() = consul + + override fun envoyControl() = envoyControl + + override fun envoy() = envoy + + override fun service() = service +} + +open class RandomSnapshotDebugTest : SnapshotDebugTest { + companion object { @JvmField @RegisterExtension val consul = ConsulExtension() @@ -30,8 +90,19 @@ open class SnapshotDebugTest { val envoy = EnvoyExtension(envoyControl, service) } + override fun consul() = consul + + override fun envoyControl() = envoyControl + + override fun envoy() = envoy + + override fun service() = service +} + +interface SnapshotDebugTest { + @Test - open fun `should return snapshot debug info containing snapshot versions`() { + fun `should return snapshot debug info containing snapshot versions`() { // given consul().server.operations.registerService(service(), name = "echo") val nodeMetadata = envoy().container.admin().nodeInfo() @@ -53,7 +124,7 @@ open class SnapshotDebugTest { } @Test - open fun `should return snapshot debug info containing snapshot contents`() { + fun `should return snapshot debug info containing snapshot contents`() { // given consul().server.operations.registerService(service(), name = "echo") val nodeMetadata = envoy().container.admin().nodeInfo() @@ -70,49 +141,51 @@ open class SnapshotDebugTest { } } - private val missingNodeJson = """{ - "metadata": { - "service_name": "service-mesh-service-first", - "identity": "", - "service_version": "0.1.16-SKYHELIX-839-eds-version-metric-SNAPSHOT", - "proxy_settings": { - "incoming": { - "endpoints": null, - "healthCheck": null, - "roles": null, - "timeoutPolicy": null - }, - "outgoing": { - "dependencies": [ - { - "handleInternalRedirect": null, - "timeoutPolicy": null, - "endpoints": [], - "domain": null, - "service": "*" - } - ] - } - }, - "ads": true - }, - "locality": { - "zone": "dev-dc4" - } + fun missingNodeJson(): String { + return """{ + "metadata": { + "service_name": "service-mesh-service-first", + "identity": "", + "service_version": "0.1.16-SKYHELIX-839-eds-version-metric-SNAPSHOT", + "proxy_settings": { + "incoming": { + "endpoints": null, + "healthCheck": null, + "roles": null, + "timeoutPolicy": null + }, + "outgoing": { + "dependencies": [ + { + "handleInternalRedirect": null, + "timeoutPolicy": null, + "endpoints": [], + "domain": null, + "service": "*" + } + ] + } + }, + "ads": true + }, + "locality": { + "zone": "dev-dc4" + } + } + """.trim() } -""".trim() @Test - open fun `should inform about missing snapshot when given node does not exist`() { + fun `should inform about missing snapshot when given node does not exist`() { // when - val snapshot = envoyControl().app.getSnapshot(missingNodeJson) + val snapshot = envoyControl().app.getSnapshot(missingNodeJson()) // then assertThat(snapshot.found).isFalse() } @Test - open fun `should return global snapshot debug info from xds`() { + fun `should return global snapshot debug info from xds`() { untilAsserted { // when val snapshot = envoyControl().app.getGlobalSnapshot(xds = true) @@ -125,7 +198,7 @@ open class SnapshotDebugTest { } @Test - open fun `should return global snapshot debug info from ads`() { + fun `should return global snapshot debug info from ads`() { untilAsserted { // when val snapshotXdsNull = envoyControl().app.getGlobalSnapshot(xds = null) @@ -145,11 +218,8 @@ open class SnapshotDebugTest { } } - open fun consul() = consul - - open fun envoyControl() = envoyControl - - open fun envoy() = envoy - - open fun service() = service + fun consul(): ConsulExtension + fun envoyControl(): EnvoyControlExtension + fun envoy(): EnvoyExtension + fun service(): EchoServiceExtension } diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/EnvoyControlTestConfiguration.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/EnvoyControlTestConfiguration.kt index 7adffebd2..9a968b2be 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/EnvoyControlTestConfiguration.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/EnvoyControlTestConfiguration.kt @@ -24,7 +24,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.config.service.EchoContainer import pl.allegro.tech.servicemesh.envoycontrol.logger import java.time.Duration import java.util.concurrent.TimeUnit -import kotlin.random.Random data class EnvoyConfig( val filePath: String, @@ -39,6 +38,22 @@ val AdsCustomHealthCheck = EnvoyConfig("envoy/config_ads_custom_health_check.yam val AdsDynamicForwardProxy = EnvoyConfig("envoy/config_ads_dynamic_forward_proxy.yaml") val FaultyConfig = EnvoyConfig("envoy/bad_config.yaml") val Ads = EnvoyConfig("envoy/config_ads.yaml") +val DeltaAds = Ads.copy( + configOverride = """ + dynamic_resources: + ads_config: + api_type: DELTA_GRPC +""".trimIndent() +) + +val DeltaAdsAllDependencies = AdsAllDependencies.copy( + configOverride = """ + dynamic_resources: + ads_config: + api_type: DELTA_GRPC +""".trimIndent() +) + val Echo1EnvoyAuthConfig = EnvoyConfig("envoy/config_auth.yaml") val Echo2EnvoyAuthConfig = Echo1EnvoyAuthConfig.copy( serviceName = "echo2", @@ -54,7 +69,7 @@ val AdsWithDisabledEndpointPermissions = EnvoyConfig("envoy/config_ads_disabled_ val AdsWithStaticListeners = EnvoyConfig("envoy/config_ads_static_listeners.yaml") val AdsWithNoDependencies = EnvoyConfig("envoy/config_ads_no_dependencies.yaml") val Xds = EnvoyConfig("envoy/config_xds.yaml") -val RandomConfigFile = if (Random.nextBoolean()) Ads else Xds +val RandomConfigFile = listOf(Ads, Xds, DeltaAds).random() val OAuthEnvoyConfig = EnvoyConfig("envoy/config_oauth.yaml") @Deprecated("use extension approach instead, e.g. RetryPolicyTest")