Skip to content

Commit ec10a35

Browse files
authored
MINOR: Simplify timeout handling in CoordinatorRuntime (#21334)
Remove the timeout parameter from all CoordinatorRuntime schedule methods and use the writeTimeout field (configured via the builder) instead. This simplifies the API since the timeout is now configured once at runtime construction time. - Rename defaultWriteTimeout to writeTimeout - Rename withDefaultWriteTimeOut to withWriteTimeout - Remove timeout parameter from scheduleWriteOperation, scheduleWriteAllOperation, scheduleTransactionalWriteOperation, and scheduleTransactionCompletion methods - Update GroupCoordinator.completeTransaction to remove timeout parameter - Update all callers and tests accordingly Reviewers: Sean Quah <squah@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
1 parent 9175b88 commit ec10a35

File tree

11 files changed

+223
-451
lines changed

11 files changed

+223
-451
lines changed

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImpl.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
import org.slf4j.Logger;
2525

26-
import java.time.Duration;
2726
import java.util.Map;
2827
import java.util.concurrent.ConcurrentHashMap;
2928
import java.util.concurrent.ExecutorService;
@@ -36,21 +35,18 @@ private record TaskResult<R>(R result, Throwable exception) { }
3635
private final TopicPartition shard;
3736
private final CoordinatorRuntime<S, U> runtime;
3837
private final ExecutorService executor;
39-
private final Duration writeTimeout;
4038
private final Map<String, TaskRunnable<?>> tasks = new ConcurrentHashMap<>();
4139

4240
public CoordinatorExecutorImpl(
4341
LogContext logContext,
4442
TopicPartition shard,
4543
CoordinatorRuntime<S, U> runtime,
46-
ExecutorService executor,
47-
Duration writeTimeout
44+
ExecutorService executor
4845
) {
4946
this.log = logContext.logger(CoordinatorExecutorImpl.class);
5047
this.shard = shard;
5148
this.runtime = runtime;
5249
this.executor = executor;
53-
this.writeTimeout = writeTimeout;
5450
}
5551

5652
private <R> TaskResult<R> executeTask(TaskRunnable<R> task) {
@@ -83,7 +79,6 @@ public <R> boolean schedule(
8379
runtime.scheduleWriteOperation(
8480
key,
8581
shard,
86-
writeTimeout,
8782
coordinator -> {
8883
// If the task associated with the key is not us, it means
8984
// that the task was either replaced or cancelled. We stop.

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public static class Builder<S extends CoordinatorShard<U>, U> {
113113
private CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier;
114114
private Time time = Time.SYSTEM;
115115
private Timer timer;
116-
private Duration defaultWriteTimeout;
116+
private Duration writeTimeout;
117117
private CoordinatorRuntimeMetrics runtimeMetrics;
118118
private CoordinatorMetrics coordinatorMetrics;
119119
private Serializer<U> serializer;
@@ -162,8 +162,8 @@ public Builder<S, U> withTimer(Timer timer) {
162162
return this;
163163
}
164164

165-
public Builder<S, U> withDefaultWriteTimeOut(Duration defaultWriteTimeout) {
166-
this.defaultWriteTimeout = defaultWriteTimeout;
165+
public Builder<S, U> withWriteTimeout(Duration writeTimeout) {
166+
this.writeTimeout = writeTimeout;
167167
return this;
168168
}
169169

@@ -246,7 +246,7 @@ public CoordinatorRuntime<S, U> build() {
246246
coordinatorShardBuilderSupplier,
247247
time,
248248
timer,
249-
defaultWriteTimeout,
249+
writeTimeout,
250250
runtimeMetrics,
251251
coordinatorMetrics,
252252
serializer,
@@ -378,7 +378,7 @@ public void schedule(
378378
@Override
379379
public void run() {
380380
String eventName = "Timeout(tp=" + tp + ", key=" + key + ")";
381-
CoordinatorWriteEvent<Void> event = new CoordinatorWriteEvent<>(eventName, tp, defaultWriteTimeout, coordinator -> {
381+
CoordinatorWriteEvent<Void> event = new CoordinatorWriteEvent<>(eventName, tp, writeTimeout, coordinator -> {
382382
log.debug("Executing write event {} for timer {}.", eventName, key);
383383

384384
// If the task is different, it means that the timer has been
@@ -644,8 +644,7 @@ private CoordinatorContext(
644644
logContext,
645645
tp,
646646
CoordinatorRuntime.this,
647-
executorService,
648-
defaultWriteTimeout
647+
executorService
649648
);
650649
this.bufferSupplier = new BufferSupplier.GrowableBufferSupplier();
651650
this.cachedBufferSize = new AtomicLong(0);
@@ -1964,7 +1963,7 @@ public void onHighWatermarkUpdated(
19641963
/**
19651964
* The write operation timeout
19661965
*/
1967-
private final Duration defaultWriteTimeout;
1966+
private final Duration writeTimeout;
19681967

19691968
/**
19701969
* The coordinators keyed by topic partition.
@@ -2051,7 +2050,7 @@ public void onHighWatermarkUpdated(
20512050
* @param coordinatorShardBuilderSupplier The coordinator builder.
20522051
* @param time The system time.
20532052
* @param timer The system timer.
2054-
* @param defaultWriteTimeout The write operation timeout.
2053+
* @param writeTimeout The write operation timeout.
20552054
* @param runtimeMetrics The runtime metrics.
20562055
* @param coordinatorMetrics The coordinator metrics.
20572056
* @param serializer The serializer.
@@ -2070,7 +2069,7 @@ private CoordinatorRuntime(
20702069
CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier,
20712070
Time time,
20722071
Timer timer,
2073-
Duration defaultWriteTimeout,
2072+
Duration writeTimeout,
20742073
CoordinatorRuntimeMetrics runtimeMetrics,
20752074
CoordinatorMetrics coordinatorMetrics,
20762075
Serializer<U> serializer,
@@ -2083,7 +2082,7 @@ private CoordinatorRuntime(
20832082
this.log = logContext.logger(CoordinatorRuntime.class);
20842083
this.time = time;
20852084
this.timer = timer;
2086-
this.defaultWriteTimeout = defaultWriteTimeout;
2085+
this.writeTimeout = writeTimeout;
20872086
this.coordinators = new ConcurrentHashMap<>();
20882087
this.processor = processor;
20892088
this.partitionWriter = partitionWriter;
@@ -2198,7 +2197,6 @@ private void withActiveContextOrThrow(
21982197
*
21992198
* @param name The name of the write operation.
22002199
* @param tp The address of the coordinator (aka its topic-partitions).
2201-
* @param timeout The write operation timeout.
22022200
* @param op The write operation.
22032201
*
22042202
* @return A future that will be completed with the result of the write operation
@@ -2209,12 +2207,11 @@ private void withActiveContextOrThrow(
22092207
public <T> CompletableFuture<T> scheduleWriteOperation(
22102208
String name,
22112209
TopicPartition tp,
2212-
Duration timeout,
22132210
CoordinatorWriteOperation<S, T, U> op
22142211
) {
22152212
throwIfNotRunning();
22162213
log.debug("Scheduled execution of write operation {}.", name);
2217-
CoordinatorWriteEvent<T> event = new CoordinatorWriteEvent<>(name, tp, timeout, op);
2214+
CoordinatorWriteEvent<T> event = new CoordinatorWriteEvent<>(name, tp, writeTimeout, op);
22182215
enqueueLast(event);
22192216
return event.future;
22202217
}
@@ -2223,7 +2220,6 @@ public <T> CompletableFuture<T> scheduleWriteOperation(
22232220
* Schedule a write operation for each coordinator.
22242221
*
22252222
* @param name The name of the write operation.
2226-
* @param timeout The write operation timeout.
22272223
* @param op The write operation.
22282224
*
22292225
* @return A list of futures where each future will be completed with the result of the write operation
@@ -2233,15 +2229,14 @@ public <T> CompletableFuture<T> scheduleWriteOperation(
22332229
*/
22342230
public <T> List<CompletableFuture<T>> scheduleWriteAllOperation(
22352231
String name,
2236-
Duration timeout,
22372232
CoordinatorWriteOperation<S, T, U> op
22382233
) {
22392234
throwIfNotRunning();
22402235
log.debug("Scheduled execution of write all operation {}.", name);
22412236
return coordinators
22422237
.keySet()
22432238
.stream()
2244-
.map(tp -> scheduleWriteOperation(name, tp, timeout, op))
2239+
.map(tp -> scheduleWriteOperation(name, tp, op))
22452240
.collect(Collectors.toList());
22462241
}
22472242

@@ -2253,7 +2248,6 @@ public <T> List<CompletableFuture<T>> scheduleWriteAllOperation(
22532248
* @param transactionalId The transactional id.
22542249
* @param producerId The producer id.
22552250
* @param producerEpoch The producer epoch.
2256-
* @param timeout The write operation timeout.
22572251
* @param op The write operation.
22582252
* @param apiVersion The Version of the Txn_Offset_Commit request
22592253
*
@@ -2268,7 +2262,6 @@ public <T> CompletableFuture<T> scheduleTransactionalWriteOperation(
22682262
String transactionalId,
22692263
long producerId,
22702264
short producerEpoch,
2271-
Duration timeout,
22722265
CoordinatorWriteOperation<S, T, U> op,
22732266
int apiVersion
22742267
) {
@@ -2288,7 +2281,7 @@ public <T> CompletableFuture<T> scheduleTransactionalWriteOperation(
22882281
producerId,
22892282
producerEpoch,
22902283
verificationGuard,
2291-
timeout,
2284+
writeTimeout,
22922285
op
22932286
);
22942287
enqueueLast(event);
@@ -2317,8 +2310,7 @@ public CompletableFuture<Void> scheduleTransactionCompletion(
23172310
short producerEpoch,
23182311
int coordinatorEpoch,
23192312
TransactionResult result,
2320-
short transactionVersion,
2321-
Duration timeout
2313+
short transactionVersion
23222314
) {
23232315
throwIfNotRunning();
23242316
log.debug("Scheduled execution of transaction completion for {} with producer id={}, producer epoch={}, " +
@@ -2331,7 +2323,7 @@ public CompletableFuture<Void> scheduleTransactionCompletion(
23312323
coordinatorEpoch,
23322324
result,
23332325
transactionVersion,
2334-
timeout
2326+
writeTimeout
23352327
);
23362328
enqueueLast(event);
23372329
return event.future;

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -60,19 +60,17 @@ public void testTaskSuccessfulLifecycle() {
6060
LOG_CONTEXT,
6161
SHARD_PARTITION,
6262
runtime,
63-
executorService,
64-
WRITE_TIMEOUT
63+
executorService
6564
);
6665

6766
when(runtime.scheduleWriteOperation(
6867
eq(TASK_KEY),
6968
eq(SHARD_PARTITION),
70-
eq(WRITE_TIMEOUT),
7169
any()
7270
)).thenAnswer(args -> {
7371
assertTrue(executor.isScheduled(TASK_KEY));
7472
CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void, String> op =
75-
args.getArgument(3);
73+
args.getArgument(2);
7674
assertEquals(
7775
new CoordinatorResult<>(List.of("record"), null),
7876
op.generateRecordsAndResult(coordinatorShard)
@@ -120,18 +118,16 @@ public void testTaskFailedLifecycle() {
120118
LOG_CONTEXT,
121119
SHARD_PARTITION,
122120
runtime,
123-
executorService,
124-
WRITE_TIMEOUT
121+
executorService
125122
);
126123

127124
when(runtime.scheduleWriteOperation(
128125
eq(TASK_KEY),
129126
eq(SHARD_PARTITION),
130-
eq(WRITE_TIMEOUT),
131127
any()
132128
)).thenAnswer(args -> {
133129
CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void, String> op =
134-
args.getArgument(3);
130+
args.getArgument(2);
135131
assertEquals(
136132
new CoordinatorResult<>(List.of(), null),
137133
op.generateRecordsAndResult(coordinatorShard)
@@ -178,8 +174,7 @@ public void testTaskCancelledBeforeBeingExecuted() {
178174
LOG_CONTEXT,
179175
SHARD_PARTITION,
180176
runtime,
181-
executorService,
182-
WRITE_TIMEOUT
177+
executorService
183178
);
184179

185180
when(executorService.submit(any(Runnable.class))).thenAnswer(args -> {
@@ -223,21 +218,19 @@ public void testTaskCancelledAfterBeingExecutedButBeforeWriteOperationIsExecuted
223218
LOG_CONTEXT,
224219
SHARD_PARTITION,
225220
runtime,
226-
executorService,
227-
WRITE_TIMEOUT
221+
executorService
228222
);
229223

230224
when(runtime.scheduleWriteOperation(
231225
eq(TASK_KEY),
232226
eq(SHARD_PARTITION),
233-
eq(WRITE_TIMEOUT),
234227
any()
235228
)).thenAnswer(args -> {
236229
// Cancel the task before running the write operation.
237230
executor.cancel(TASK_KEY);
238231

239232
CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void, String> op =
240-
args.getArgument(3);
233+
args.getArgument(2);
241234
Throwable ex = assertThrows(RejectedExecutionException.class, () -> op.generateRecordsAndResult(coordinatorShard));
242235
return FutureUtils.failedFuture(ex);
243236
});
@@ -278,14 +271,12 @@ public void testTaskSchedulingWriteOperationFailed() {
278271
LOG_CONTEXT,
279272
SHARD_PARTITION,
280273
runtime,
281-
executorService,
282-
WRITE_TIMEOUT
274+
executorService
283275
);
284276

285277
when(runtime.scheduleWriteOperation(
286278
eq(TASK_KEY),
287279
eq(SHARD_PARTITION),
288-
eq(WRITE_TIMEOUT),
289280
any()
290281
)).thenReturn(FutureUtils.failedFuture(new Throwable("Oh no!")));
291282

@@ -327,19 +318,17 @@ public void testCancelAllTasks() {
327318
LOG_CONTEXT,
328319
SHARD_PARTITION,
329320
runtime,
330-
executorService,
331-
WRITE_TIMEOUT
321+
executorService
332322
);
333323

334324
List<CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void, String>> writeOperations = new ArrayList<>();
335325
List<CompletableFuture<Void>> writeFutures = new ArrayList<>();
336326
when(runtime.scheduleWriteOperation(
337327
anyString(),
338328
eq(SHARD_PARTITION),
339-
eq(WRITE_TIMEOUT),
340329
any()
341330
)).thenAnswer(args -> {
342-
writeOperations.add(args.getArgument(3));
331+
writeOperations.add(args.getArgument(2));
343332
CompletableFuture<Void> writeFuture = new CompletableFuture<>();
344333
writeFutures.add(writeFuture);
345334
return writeFuture;

0 commit comments

Comments
 (0)