Skip to content

Commit 9175b88

Browse files
authored
MINOR: Move DeferredEventCollection out from CoordinatorRuntime (#21348)
Extract DeferredEventCollection from being an inner class of CoordinatorRuntime into its own standalone file. This improves code organization and adds unit tests for the class. Reviewers: Sean Quah <squah@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
1 parent aad33e4 commit 9175b88

File tree

3 files changed

+190
-52
lines changed

3 files changed

+190
-52
lines changed

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java

Lines changed: 0 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1230,58 +1230,6 @@ public void run() {
12301230
}
12311231
}
12321232

1233-
/**
1234-
* A collection of {@link DeferredEvent}. When completed, completes all the events in the collection
1235-
* and logs any exceptions thrown.
1236-
*/
1237-
static class DeferredEventCollection implements DeferredEvent {
1238-
/**
1239-
* The logger.
1240-
*/
1241-
private final Logger log;
1242-
1243-
/**
1244-
* The list of events.
1245-
*/
1246-
private final List<DeferredEvent> events = new ArrayList<>();
1247-
1248-
public DeferredEventCollection(Logger log) {
1249-
this.log = log;
1250-
}
1251-
1252-
@Override
1253-
public void complete(Throwable t) {
1254-
for (DeferredEvent event : events) {
1255-
try {
1256-
event.complete(t);
1257-
} catch (Throwable e) {
1258-
log.error("Completion of event {} failed due to {}.", event, e.getMessage(), e);
1259-
}
1260-
}
1261-
}
1262-
1263-
public boolean add(DeferredEvent event) {
1264-
return events.add(event);
1265-
}
1266-
1267-
public int size() {
1268-
return events.size();
1269-
}
1270-
1271-
@Override
1272-
public String toString() {
1273-
return "DeferredEventCollection(events=" + events + ")";
1274-
}
1275-
1276-
public static DeferredEventCollection of(Logger log, DeferredEvent... deferredEvents) {
1277-
DeferredEventCollection collection = new DeferredEventCollection(log);
1278-
for (DeferredEvent deferredEvent : deferredEvents) {
1279-
collection.add(deferredEvent);
1280-
}
1281-
return collection;
1282-
}
1283-
}
1284-
12851233
/**
12861234
* A coordinator write operation.
12871235
*
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.coordinator.common.runtime;
18+
19+
import org.apache.kafka.deferred.DeferredEvent;
20+
21+
import org.slf4j.Logger;
22+
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
26+
/**
27+
* A collection of {@link DeferredEvent}. When completed, completes all the events in the collection
28+
* and logs any exceptions thrown.
29+
*/
30+
public class DeferredEventCollection implements DeferredEvent {
31+
/**
32+
* The logger.
33+
*/
34+
private final Logger log;
35+
36+
/**
37+
* The list of events.
38+
*/
39+
private final List<DeferredEvent> events = new ArrayList<>();
40+
41+
public DeferredEventCollection(Logger log) {
42+
this.log = log;
43+
}
44+
45+
@Override
46+
public void complete(Throwable t) {
47+
for (DeferredEvent event : events) {
48+
try {
49+
event.complete(t);
50+
} catch (Throwable e) {
51+
log.error("Completion of event {} failed due to {}.", event, e.getMessage(), e);
52+
}
53+
}
54+
}
55+
56+
public boolean add(DeferredEvent event) {
57+
return events.add(event);
58+
}
59+
60+
public int size() {
61+
return events.size();
62+
}
63+
64+
@Override
65+
public String toString() {
66+
return "DeferredEventCollection(events=" + events + ")";
67+
}
68+
69+
public static DeferredEventCollection of(Logger log, DeferredEvent... deferredEvents) {
70+
DeferredEventCollection collection = new DeferredEventCollection(log);
71+
for (DeferredEvent deferredEvent : deferredEvents) {
72+
collection.add(deferredEvent);
73+
}
74+
return collection;
75+
}
76+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.coordinator.common.runtime;
18+
19+
import org.apache.kafka.common.utils.LogContext;
20+
import org.apache.kafka.deferred.DeferredEvent;
21+
22+
import org.junit.jupiter.api.Test;
23+
import org.slf4j.Logger;
24+
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
28+
import static org.junit.jupiter.api.Assertions.assertEquals;
29+
import static org.junit.jupiter.api.Assertions.assertTrue;
30+
31+
public class DeferredEventCollectionTest {
32+
33+
private static final Logger LOG = new LogContext().logger(DeferredEventCollectionTest.class);
34+
35+
@Test
36+
public void testAddAndSize() {
37+
DeferredEventCollection collection = new DeferredEventCollection(LOG);
38+
assertEquals(0, collection.size());
39+
40+
assertTrue(collection.add(t -> { }));
41+
assertEquals(1, collection.size());
42+
43+
assertTrue(collection.add(t -> { }));
44+
assertEquals(2, collection.size());
45+
}
46+
47+
@Test
48+
public void testCompleteCallsAllEvents() {
49+
List<Throwable> completedWith = new ArrayList<>();
50+
51+
DeferredEventCollection collection = new DeferredEventCollection(LOG);
52+
collection.add(completedWith::add);
53+
collection.add(completedWith::add);
54+
collection.add(completedWith::add);
55+
56+
collection.complete(null);
57+
58+
assertEquals(3, completedWith.size());
59+
for (Throwable t : completedWith) {
60+
assertEquals(null, t);
61+
}
62+
}
63+
64+
@Test
65+
public void testCompleteWithException() {
66+
List<Throwable> completedWith = new ArrayList<>();
67+
RuntimeException exception = new RuntimeException("test exception");
68+
69+
DeferredEventCollection collection = new DeferredEventCollection(LOG);
70+
collection.add(completedWith::add);
71+
collection.add(completedWith::add);
72+
73+
collection.complete(exception);
74+
75+
assertEquals(2, completedWith.size());
76+
for (Throwable t : completedWith) {
77+
assertEquals(exception, t);
78+
}
79+
}
80+
81+
@Test
82+
public void testCompleteContinuesOnEventFailure() {
83+
List<Throwable> completedWith = new ArrayList<>();
84+
85+
DeferredEventCollection collection = new DeferredEventCollection(LOG);
86+
collection.add(completedWith::add);
87+
collection.add(t -> {
88+
throw new RuntimeException("event failure");
89+
});
90+
collection.add(completedWith::add);
91+
92+
// Should not throw, and should complete all events
93+
collection.complete(null);
94+
95+
// The first and third events should have been completed
96+
assertEquals(2, completedWith.size());
97+
}
98+
99+
@Test
100+
public void testOfFactoryMethod() {
101+
DeferredEvent event1 = t -> { };
102+
DeferredEvent event2 = t -> { };
103+
104+
DeferredEventCollection collection = DeferredEventCollection.of(LOG, event1, event2);
105+
106+
assertEquals(2, collection.size());
107+
}
108+
109+
@Test
110+
public void testOfFactoryMethodEmpty() {
111+
DeferredEventCollection collection = DeferredEventCollection.of(LOG);
112+
assertEquals(0, collection.size());
113+
}
114+
}

0 commit comments

Comments
 (0)