Skip to content

Commit 64321c3

Browse files
authored
Obsolete resource handling for read-cache-after-write (#3207)
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 6396b48 commit 64321c3

File tree

17 files changed

+462
-80
lines changed

17 files changed

+462
-80
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.ExecutorService;
2424
import java.util.concurrent.Executors;
2525
import java.util.concurrent.Future;
26+
import java.util.concurrent.ScheduledExecutorService;
2627
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.TimeoutException;
2829
import java.util.function.Function;
@@ -40,6 +41,7 @@ public class ExecutorServiceManager {
4041
private ExecutorService executor;
4142
private ExecutorService workflowExecutor;
4243
private ExecutorService cachingExecutorService;
44+
private ScheduledExecutorService scheduledExecutorService;
4345
private boolean started;
4446
private ConfigurationService configurationService;
4547

@@ -126,10 +128,15 @@ public ExecutorService cachingExecutorService() {
126128
return cachingExecutorService;
127129
}
128130

131+
public ScheduledExecutorService scheduledExecutorService() {
132+
return scheduledExecutorService;
133+
}
134+
129135
public void start(ConfigurationService configurationService) {
130136
if (!started) {
131137
this.configurationService = configurationService; // used to lazy init workflow executor
132138
this.cachingExecutorService = Executors.newCachedThreadPool();
139+
this.scheduledExecutorService = Executors.newScheduledThreadPool(0);
133140
this.executor = new InstrumentedExecutorService(configurationService.getExecutorService());
134141
started = true;
135142
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/Informer.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@
2727
import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter;
2828
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
2929
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
30+
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache;
3031

3132
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
3233
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_FOLLOW_CONTROLLER_NAMESPACE_CHANGES;
34+
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL_MILLIS;
3335
import static io.javaoperatorsdk.operator.api.reconciler.Constants.NO_LONG_VALUE_SET;
3436
import static io.javaoperatorsdk.operator.api.reconciler.Constants.NO_VALUE_SET;
3537

@@ -139,4 +141,13 @@
139141
* @since 5.3.0
140142
*/
141143
boolean comparableResourceVersions() default DEFAULT_COMPARABLE_RESOURCE_VERSION;
144+
145+
/**
146+
* For read-cache-after-write consistency there are some corner cases where we need to check the
147+
* caches see {@link TemporaryResourceCache} periodically. This is the period in milliseconds.
148+
* Applicable only if {@link #comparableResourceVersions()} is true.
149+
*
150+
* @since 5.3.0
151+
*/
152+
long ghostResourceCacheCheckInterval() default DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL_MILLIS;
142153
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.javaoperatorsdk.operator.api.config.informer;
1717

18+
import java.time.Duration;
1819
import java.util.Arrays;
1920
import java.util.Collection;
2021
import java.util.Collections;
@@ -53,7 +54,8 @@ public class InformerConfiguration<R extends HasMetadata> {
5354
private ItemStore<R> itemStore;
5455
private Long informerListLimit;
5556
private FieldSelector fieldSelector;
56-
private boolean comparableResourceVersions;
57+
private Boolean comparableResourceVersions;
58+
private Duration ghostResourceCacheCheckInterval;
5759

5860
protected InformerConfiguration(
5961
Class<R> resourceClass,
@@ -68,7 +70,8 @@ protected InformerConfiguration(
6870
ItemStore<R> itemStore,
6971
Long informerListLimit,
7072
FieldSelector fieldSelector,
71-
boolean comparableResourceVersions) {
73+
Boolean comparableResourceVersions,
74+
Duration ghostResourceCacheCheckInterval) {
7275
this(resourceClass);
7376
this.name = name;
7477
this.namespaces = namespaces;
@@ -82,6 +85,7 @@ protected InformerConfiguration(
8285
this.informerListLimit = informerListLimit;
8386
this.fieldSelector = fieldSelector;
8487
this.comparableResourceVersions = comparableResourceVersions;
88+
this.ghostResourceCacheCheckInterval = ghostResourceCacheCheckInterval;
8589
}
8690

8791
private InformerConfiguration(Class<R> resourceClass) {
@@ -117,7 +121,8 @@ public static <R extends HasMetadata> InformerConfiguration<R>.Builder builder(
117121
original.itemStore,
118122
original.informerListLimit,
119123
original.fieldSelector,
120-
original.comparableResourceVersions)
124+
original.comparableResourceVersions,
125+
original.ghostResourceCacheCheckInterval)
121126
.builder;
122127
}
123128

@@ -296,6 +301,10 @@ public boolean isComparableResourceVersions() {
296301
return comparableResourceVersions;
297302
}
298303

304+
public Duration getGhostResourceCacheCheckInterval() {
305+
return ghostResourceCacheCheckInterval;
306+
}
307+
299308
@SuppressWarnings("UnusedReturnValue")
300309
public class Builder {
301310

@@ -310,6 +319,13 @@ public InformerConfiguration<R> buildForController() {
310319
}
311320
// to avoid potential NPE
312321
followControllerNamespaceChanges = false;
322+
if (comparableResourceVersions == null) {
323+
comparableResourceVersions = DEFAULT_COMPARABLE_RESOURCE_VERSION;
324+
}
325+
326+
if (ghostResourceCacheCheckInterval == null) {
327+
ghostResourceCacheCheckInterval = DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL;
328+
}
313329
return InformerConfiguration.this;
314330
}
315331

@@ -321,6 +337,14 @@ public InformerConfiguration<R> build() {
321337
if (followControllerNamespaceChanges == null) {
322338
followControllerNamespaceChanges = DEFAULT_FOLLOW_CONTROLLER_NAMESPACE_CHANGES;
323339
}
340+
if (comparableResourceVersions == null) {
341+
comparableResourceVersions = DEFAULT_COMPARABLE_RESOURCE_VERSION;
342+
}
343+
344+
if (ghostResourceCacheCheckInterval == null) {
345+
ghostResourceCacheCheckInterval = DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL;
346+
}
347+
324348
return InformerConfiguration.this;
325349
}
326350

@@ -368,6 +392,8 @@ public InformerConfiguration<R>.Builder initFromAnnotation(
368392
.map(f -> new FieldSelector.Field(f.path(), f.value(), f.negated()))
369393
.toList()));
370394
withComparableResourceVersions(informerConfig.comparableResourceVersions());
395+
withGhostResourceCacheCheckInterval(
396+
Duration.ofMillis(informerConfig.ghostResourceCacheCheckInterval()));
371397
}
372398
return this;
373399
}
@@ -473,5 +499,10 @@ public Builder withComparableResourceVersions(boolean comparableResourceVersions
473499
InformerConfiguration.this.comparableResourceVersions = comparableResourceVersions;
474500
return this;
475501
}
502+
503+
public Builder withGhostResourceCacheCheckInterval(Duration ghostResourceCacheCheckInterval) {
504+
InformerConfiguration.this.ghostResourceCacheCheckInterval = ghostResourceCacheCheckInterval;
505+
return this;
506+
}
476507
}
477508
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.javaoperatorsdk.operator.api.config.informer;
1717

18+
import java.time.Duration;
1819
import java.util.Objects;
1920
import java.util.Optional;
2021
import java.util.Set;
@@ -33,7 +34,6 @@
3334
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
3435
import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
3536

36-
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
3737
import static io.javaoperatorsdk.operator.api.reconciler.Constants.SAME_AS_CONTROLLER_NAMESPACES_SET;
3838
import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_ALL_NAMESPACE_SET;
3939
import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_CURRENT_NAMESPACE_SET;
@@ -97,21 +97,18 @@ class DefaultInformerEventSourceConfiguration<R extends HasMetadata>
9797
private final GroupVersionKind groupVersionKind;
9898
private final InformerConfiguration<R> informerConfig;
9999
private final KubernetesClient kubernetesClient;
100-
private final boolean comparableResourceVersion;
101100

102101
protected DefaultInformerEventSourceConfiguration(
103102
GroupVersionKind groupVersionKind,
104103
PrimaryToSecondaryMapper<?> primaryToSecondaryMapper,
105104
SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper,
106105
InformerConfiguration<R> informerConfig,
107-
KubernetesClient kubernetesClient,
108-
boolean comparableResourceVersion) {
106+
KubernetesClient kubernetesClient) {
109107
this.informerConfig = Objects.requireNonNull(informerConfig);
110108
this.groupVersionKind = groupVersionKind;
111109
this.primaryToSecondaryMapper = primaryToSecondaryMapper;
112110
this.secondaryToPrimaryMapper = secondaryToPrimaryMapper;
113111
this.kubernetesClient = kubernetesClient;
114-
this.comparableResourceVersion = comparableResourceVersion;
115112
}
116113

117114
@Override
@@ -139,11 +136,6 @@ public Optional<GroupVersionKind> getGroupVersionKind() {
139136
public Optional<KubernetesClient> getKubernetesClient() {
140137
return Optional.ofNullable(kubernetesClient);
141138
}
142-
143-
@Override
144-
public boolean comparableResourceVersion() {
145-
return this.comparableResourceVersion;
146-
}
147139
}
148140

149141
@SuppressWarnings({"unused", "UnusedReturnValue"})
@@ -157,7 +149,6 @@ class Builder<R extends HasMetadata> {
157149
private PrimaryToSecondaryMapper<?> primaryToSecondaryMapper;
158150
private SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper;
159151
private KubernetesClient kubernetesClient;
160-
private boolean comparableResourceVersion = DEFAULT_COMPARABLE_RESOURCE_VERSION;
161152

162153
private Builder(Class<R> resourceClass, Class<? extends HasMetadata> primaryResourceClass) {
163154
this(resourceClass, primaryResourceClass, null);
@@ -296,7 +287,13 @@ public Builder<R> withFieldSelector(FieldSelector fieldSelector) {
296287
}
297288

298289
public Builder<R> withComparableResourceVersion(boolean comparableResourceVersion) {
299-
this.comparableResourceVersion = comparableResourceVersion;
290+
config.withComparableResourceVersions(comparableResourceVersion);
291+
return this;
292+
}
293+
294+
public Builder<R> withGhostResourceCacheCheckInterval(
295+
Duration ghostResourceCacheCheckInterval) {
296+
config.withGhostResourceCacheCheckInterval(ghostResourceCacheCheckInterval);
300297
return this;
301298
}
302299

@@ -339,10 +336,7 @@ public InformerEventSourceConfiguration<R> build() {
339336
HasMetadata.getKind(primaryResourceClass),
340337
false)),
341338
config.build(),
342-
kubernetesClient,
343-
comparableResourceVersion);
339+
kubernetesClient);
344340
}
345341
}
346-
347-
boolean comparableResourceVersion();
348342
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Constants.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.javaoperatorsdk.operator.api.reconciler;
1717

18+
import java.time.Duration;
1819
import java.util.Collections;
1920
import java.util.Set;
2021

@@ -43,5 +44,9 @@ public final class Constants {
4344
public static final boolean DEFAULT_FOLLOW_CONTROLLER_NAMESPACE_CHANGES = true;
4445
public static final boolean DEFAULT_COMPARABLE_RESOURCE_VERSION = true;
4546

47+
public static final long DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL_MILLIS = 3 * 60 * 1000;
48+
public static final Duration DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL =
49+
Duration.ofMillis(DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL_MILLIS);
50+
4651
private Constants() {}
4752
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,7 @@ public class ControllerEventSource<T extends HasMetadata>
4848

4949
@SuppressWarnings({"unchecked", "rawtypes"})
5050
public ControllerEventSource(Controller<T> controller) {
51-
super(
52-
NAME,
53-
controller.getCRClient(),
54-
controller.getConfiguration(),
55-
controller.getConfiguration().getInformerConfig().isComparableResourceVersions());
51+
super(NAME, controller.getCRClient(), controller.getConfiguration());
5652
this.controller = controller;
5753

5854
final var config = controller.getConfiguration();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
3636
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;
3737

38-
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
39-
4038
/**
4139
* Wraps informer(s) so they are connected to the eventing system of the framework. Note that since
4240
* this is built on top of Fabric8 client Informers, it also supports caching resources using
@@ -58,29 +56,18 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
5856

5957
public InformerEventSource(
6058
InformerEventSourceConfiguration<R> configuration, EventSourceContext<P> context) {
61-
this(
62-
configuration,
63-
configuration.getKubernetesClient().orElse(context.getClient()),
64-
configuration.comparableResourceVersion());
65-
}
66-
67-
InformerEventSource(InformerEventSourceConfiguration<R> configuration, KubernetesClient client) {
68-
this(configuration, client, DEFAULT_COMPARABLE_RESOURCE_VERSION);
59+
this(configuration, configuration.getKubernetesClient().orElse(context.getClient()));
6960
}
7061

7162
@SuppressWarnings({"unchecked", "rawtypes"})
72-
private InformerEventSource(
73-
InformerEventSourceConfiguration<R> configuration,
74-
KubernetesClient client,
75-
boolean comparableResourceVersions) {
63+
InformerEventSource(InformerEventSourceConfiguration<R> configuration, KubernetesClient client) {
7664
super(
7765
configuration.name(),
7866
configuration
7967
.getGroupVersionKind()
8068
.map(gvk -> client.genericKubernetesResources(gvk.apiVersion(), gvk.getKind()))
8169
.orElseGet(() -> (MixedOperation) client.resources(configuration.getResourceClass())),
82-
configuration,
83-
comparableResourceVersions);
70+
configuration);
8471
// If there is a primary to secondary mapper there is no need for primary to secondary index.
8572
primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper();
8673
if (useSecondaryToPrimaryIndex()) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,11 +230,27 @@ private boolean isWatchingAllNamespaces() {
230230
return sources.containsKey(WATCH_ALL_NAMESPACES);
231231
}
232232

233+
public boolean isWatchingNamespace(String namespace) {
234+
// for cluster scoped resources we can assume
235+
// that we watch the whole cluster
236+
if (namespace == null) {
237+
return true;
238+
}
239+
if (isWatchingAllNamespaces()) {
240+
return true;
241+
}
242+
return sources.containsKey(namespace);
243+
}
244+
233245
private Optional<InformerWrapper<R>> getSource(String namespace) {
234246
namespace = isWatchingAllNamespaces() || namespace == null ? WATCH_ALL_NAMESPACES : namespace;
235247
return Optional.ofNullable(sources.get(namespace));
236248
}
237249

250+
String lastSyncResourceVersion(String namespace) {
251+
return getSource(namespace).orElseThrow().getLastSyncResourceVersion();
252+
}
253+
238254
@Override
239255
public void addIndexers(Map<String, Function<R, List<String>>> indexers) {
240256
this.indexers.putAll(indexers);

0 commit comments

Comments
 (0)