Skip to content

Commit 566e062

Browse files
authored
Merge pull request #407 from lburgazzoli/github-368
Enhance event handling
2 parents 0843339 + d7871f0 commit 566e062

File tree

8 files changed

+180
-49
lines changed

8 files changed

+180
-49
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,22 @@
1515
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
1616
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
1717
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
18+
import java.io.Closeable;
19+
import java.io.IOException;
20+
import java.util.ArrayList;
1821
import java.util.Arrays;
22+
import java.util.List;
1923
import org.slf4j.Logger;
2024
import org.slf4j.LoggerFactory;
2125

2226
@SuppressWarnings("rawtypes")
23-
public class Operator {
27+
public class Operator implements AutoCloseable {
2428

2529
private static final Logger log = LoggerFactory.getLogger(Operator.class);
2630
private final KubernetesClient k8sClient;
2731
private final ConfigurationService configurationService;
2832
private final ObjectMapper objectMapper;
33+
private final List<Closeable> closeables;
2934

3035
public Operator(KubernetesClient k8sClient, ConfigurationService configurationService) {
3136
this(k8sClient, configurationService, new ObjectMapper());
@@ -38,6 +43,7 @@ public Operator(
3843
this.k8sClient = k8sClient;
3944
this.configurationService = configurationService;
4045
this.objectMapper = objectMapper;
46+
this.closeables = new ArrayList<>();
4147
}
4248

4349
/**
@@ -64,6 +70,21 @@ public void start() {
6470
}
6571
}
6672

73+
/** Stop the operator. */
74+
@Override
75+
public void close() {
76+
log.info("Operator {} is shutting down...", configurationService.getVersion().getSdkVersion());
77+
78+
for (Closeable closeable : this.closeables) {
79+
try {
80+
log.debug("closing {}", closeable);
81+
closeable.close();
82+
} catch (IOException e) {
83+
log.warn("Error closing {}", closeable, e);
84+
}
85+
}
86+
}
87+
6788
/**
6889
* Registers the specified controller with this operator.
6990
*
@@ -160,10 +181,15 @@ public <R extends CustomResource> void register(
160181
customResourceCache,
161182
watchAllNamespaces,
162183
targetNamespaces,
163-
defaultEventHandler,
164184
configuration.isGenerationAware(),
165-
finalizer);
166-
eventSourceManager.registerCustomResourceEventSource(customResourceEventSource);
185+
finalizer,
186+
resClass);
187+
188+
closeables.add(customResourceEventSource);
189+
closeables.add(eventSourceManager);
190+
191+
customResourceEventSource.setEventHandler(defaultEventHandler);
192+
customResourceEventSource.start();
167193

168194
log.info(
169195
"Registered Controller: '{}' for CRD: '{}' for namespace(s): {}",
@@ -178,18 +204,14 @@ private CustomResourceEventSource createCustomResourceEventSource(
178204
CustomResourceCache customResourceCache,
179205
boolean watchAllNamespaces,
180206
String[] targetNamespaces,
181-
DefaultEventHandler defaultEventHandler,
182207
boolean generationAware,
183-
String finalizer) {
184-
CustomResourceEventSource customResourceEventSource =
185-
watchAllNamespaces
186-
? CustomResourceEventSource.customResourceEventSourceForAllNamespaces(
187-
customResourceCache, client, generationAware, finalizer)
188-
: CustomResourceEventSource.customResourceEventSourceForTargetNamespaces(
189-
customResourceCache, client, targetNamespaces, generationAware, finalizer);
190-
191-
customResourceEventSource.setEventHandler(defaultEventHandler);
192-
193-
return customResourceEventSource;
208+
String finalizer,
209+
Class<?> resClass) {
210+
211+
return watchAllNamespaces
212+
? CustomResourceEventSource.customResourceEventSourceForAllNamespaces(
213+
customResourceCache, client, generationAware, finalizer, resClass)
214+
: CustomResourceEventSource.customResourceEventSourceForTargetNamespaces(
215+
customResourceCache, client, targetNamespaces, generationAware, finalizer, resClass);
194216
}
195217
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class DefaultEventHandler implements EventHandler {
3737
private final EventDispatcher eventDispatcher;
3838
private final Retry retry;
3939
private final Map<String, RetryExecution> retryState = new HashMap<>();
40+
private final String controllerName;
4041
private DefaultEventSourceManager eventSourceManager;
4142

4243
private final ReentrantLock lock = new ReentrantLock();
@@ -50,6 +51,7 @@ public DefaultEventHandler(
5051
this.customResourceCache = customResourceCache;
5152
this.eventDispatcher = eventDispatcher;
5253
this.retry = retry;
54+
this.controllerName = relatedControllerName;
5355
eventBuffer = new EventBuffer();
5456
executor =
5557
new ScheduledThreadPoolExecutor(
@@ -70,6 +72,16 @@ public DefaultEventHandler(
7072
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER);
7173
}
7274

75+
@Override
76+
public void close() {
77+
if (eventSourceManager != null) {
78+
log.debug("Closing EventSourceManager {} -> {}", controllerName, eventSourceManager);
79+
eventSourceManager.close();
80+
}
81+
82+
executor.shutdownNow();
83+
}
84+
7385
public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) {
7486
this.eventSourceManager = eventSourceManager;
7587
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

33
import io.javaoperatorsdk.operator.processing.DefaultEventHandler;
4-
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
54
import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource;
65
import java.util.Collections;
76
import java.util.Map;
7+
import java.util.Objects;
88
import java.util.Optional;
99
import java.util.concurrent.ConcurrentHashMap;
1010
import java.util.concurrent.locks.ReentrantLock;
@@ -17,9 +17,8 @@ public class DefaultEventSourceManager implements EventSourceManager {
1717
private static final Logger log = LoggerFactory.getLogger(DefaultEventSourceManager.class);
1818

1919
private final ReentrantLock lock = new ReentrantLock();
20-
private Map<String, EventSource> eventSources = new ConcurrentHashMap<>();
21-
private CustomResourceEventSource customResourceEventSource;
22-
private DefaultEventHandler defaultEventHandler;
20+
private final Map<String, EventSource> eventSources = new ConcurrentHashMap<>();
21+
private final DefaultEventHandler defaultEventHandler;
2322
private TimerEventSource retryTimerEventSource;
2423

2524
public DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolean supportRetry) {
@@ -30,23 +29,53 @@ public DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolea
3029
}
3130
}
3231

33-
public void registerCustomResourceEventSource(
34-
CustomResourceEventSource customResourceEventSource) {
35-
this.customResourceEventSource = customResourceEventSource;
36-
this.customResourceEventSource.addedToEventManager();
32+
@Override
33+
public void close() {
34+
try {
35+
lock.lock();
36+
for (var entry : eventSources.entrySet()) {
37+
try {
38+
log.debug("Closing {} -> {}", entry.getKey(), entry.getValue());
39+
entry.getValue().close();
40+
} catch (Exception e) {
41+
log.warn("Error closing {} -> {}", entry.getKey(), entry.getValue(), e);
42+
}
43+
}
44+
45+
eventSources.clear();
46+
} finally {
47+
lock.unlock();
48+
}
3749
}
3850

3951
@Override
40-
public <T extends EventSource> void registerEventSource(String name, T eventSource) {
52+
public final void registerEventSource(String name, EventSource eventSource) {
53+
Objects.requireNonNull(eventSource, "EventSource must not be null");
54+
4155
try {
4256
lock.lock();
43-
EventSource currentEventSource = eventSources.get(name);
44-
if (currentEventSource != null) {
57+
if (eventSources.containsKey(name)) {
4558
throw new IllegalStateException(
4659
"Event source with name already registered. Event source name: " + name);
4760
}
4861
eventSources.put(name, eventSource);
4962
eventSource.setEventHandler(defaultEventHandler);
63+
eventSource.start();
64+
} finally {
65+
lock.unlock();
66+
}
67+
}
68+
69+
@Override
70+
public Optional<EventSource> deRegisterEventSource(String name) {
71+
try {
72+
lock.lock();
73+
EventSource currentEventSource = eventSources.remove(name);
74+
if (currentEventSource != null) {
75+
currentEventSource.close();
76+
}
77+
78+
return Optional.ofNullable(currentEventSource);
5079
} finally {
5180
lock.unlock();
5281
}
Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3-
public interface EventHandler {
3+
import java.io.Closeable;
4+
5+
public interface EventHandler extends Closeable {
46

57
void handleEvent(Event event);
8+
9+
@Override
10+
default void close() {}
611
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,21 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3-
public interface EventSource {
3+
import java.io.Closeable;
4+
5+
public interface EventSource extends Closeable {
6+
7+
/**
8+
* This method is invoked when this {@link EventSource} instance is properly registered to a
9+
* {@link EventSourceManager}.
10+
*/
11+
default void start() {}
12+
13+
/**
14+
* This method is invoked when this {@link EventSource} instance is de-registered from a {@link
15+
* EventSourceManager}.
16+
*/
17+
@Override
18+
default void close() {}
419

520
void setEventHandler(EventHandler eventHandler);
621

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,36 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3+
import java.io.Closeable;
34
import java.util.Map;
45
import java.util.Optional;
56

6-
public interface EventSourceManager {
7+
public interface EventSourceManager extends Closeable {
78

8-
<T extends EventSource> void registerEventSource(String name, T eventSource);
9+
/**
10+
* Add the {@link EventSource} identified by the given <code>name</code> to the event manager.
11+
*
12+
* @param name the name of the {@link EventSource} to add
13+
* @param eventSource the {@link EventSource} to register
14+
* @thorw IllegalStateException if an {@link EventSource} with the same name is already
15+
* registered.
16+
*/
17+
void registerEventSource(String name, EventSource eventSource);
18+
19+
/**
20+
* Remove the {@link EventSource} identified by the given <code>name</code> from the event
21+
* manager.
22+
*
23+
* @param name the name of the {@link EventSource} to remove
24+
* @return an optional {@link EventSource} which would be empty if no {@link EventSource} have
25+
* been registered with the given name.
26+
*/
27+
Optional<EventSource> deRegisterEventSource(String name);
928

1029
Optional<EventSource> deRegisterCustomResourceFromEventSource(
1130
String name, String customResourceUid);
1231

1332
Map<String, EventSource> getRegisteredEventSources();
33+
34+
@Override
35+
default void close() {}
1436
}

0 commit comments

Comments
 (0)