Skip to content

Commit afbb9f6

Browse files
committed
Simplify work-stealing Future implementation
1 parent cf923fc commit afbb9f6

File tree

2 files changed

+34
-36
lines changed

2 files changed

+34
-36
lines changed

junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/BlockingAwareFuture.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,48 +17,40 @@
1717
import java.util.concurrent.Future;
1818
import java.util.concurrent.TimeUnit;
1919
import java.util.concurrent.TimeoutException;
20-
import java.util.function.Supplier;
2120

2221
import org.jspecify.annotations.Nullable;
2322

24-
class BlockingAwareFuture<T extends @Nullable Object> extends DelegatingFuture<T> {
23+
abstract class BlockingAwareFuture<T extends @Nullable Object> extends DelegatingFuture<T> {
2524

26-
private final BlockHandler handler;
27-
28-
BlockingAwareFuture(Future<T> delegate, BlockHandler handler) {
25+
BlockingAwareFuture(Future<T> delegate) {
2926
super(delegate);
30-
this.handler = handler;
3127
}
3228

3329
@Override
3430
public T get() throws InterruptedException, ExecutionException {
3531
if (delegate.isDone()) {
3632
return delegate.get();
3733
}
38-
return handle(delegate::get);
34+
return handleSafely(delegate::get);
3935
}
4036

4137
@Override
4238
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
4339
if (delegate.isDone()) {
4440
return delegate.get();
4541
}
46-
return handle(() -> delegate.get(timeout, unit));
42+
return handleSafely(() -> delegate.get(timeout, unit));
4743
}
4844

49-
private T handle(Callable<T> callable) {
45+
private T handleSafely(Callable<T> callable) {
5046
try {
51-
return handler.handle(delegate::isDone, callable);
47+
return handle(callable);
5248
}
5349
catch (Exception e) {
5450
throw throwAsUncheckedException(e);
5551
}
5652
}
5753

58-
interface BlockHandler {
59-
60-
<T extends @Nullable Object> T handle(Supplier<Boolean> blockingUnnecessary, Callable<T> callable)
61-
throws Exception;
54+
protected abstract T handle(Callable<T> callable) throws Exception;
6255

63-
}
6456
}

junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,7 @@ public void close() {
9696
return completedFuture(null);
9797
}
9898

99-
var entry = enqueue(testTask);
100-
return new BlockingAwareFuture<@Nullable Void>(entry.future(), new WorkerThread.BlockHandler(entry));
99+
return new WorkStealingFuture(enqueue(testTask));
101100
}
102101

103102
@Override
@@ -434,28 +433,36 @@ private interface BlockingAction<T> {
434433
T run() throws InterruptedException;
435434
}
436435

437-
private record BlockHandler(WorkQueue.Entry entry) implements BlockingAwareFuture.BlockHandler {
436+
}
438437

439-
@Override
440-
public <T> T handle(Supplier<Boolean> blockingUnnecessary, Callable<T> callable) throws Exception {
441-
var workerThread = get();
442-
if (workerThread == null || entry.future.isDone()) {
438+
private static class WorkStealingFuture extends BlockingAwareFuture<@Nullable Void> {
439+
440+
private final WorkQueue.Entry entry;
441+
442+
WorkStealingFuture(WorkQueue.Entry entry) {
443+
super(entry.future);
444+
this.entry = entry;
445+
}
446+
447+
@Override
448+
protected @Nullable Void handle(Callable<@Nullable Void> callable) throws Exception {
449+
var workerThread = WorkerThread.get();
450+
if (workerThread == null || entry.future.isDone()) {
451+
return callable.call();
452+
}
453+
workerThread.tryToStealWork(entry);
454+
if (entry.future.isDone()) {
455+
return callable.call();
456+
}
457+
LOGGER.trace(() -> "blocking for child task");
458+
return workerThread.runBlocking(() -> {
459+
try {
443460
return callable.call();
444461
}
445-
workerThread.tryToStealWork(entry);
446-
if (entry.future.isDone()) {
447-
return callable.call();
462+
catch (Exception ex) {
463+
throw throwAsUncheckedException(ex);
448464
}
449-
LOGGER.trace(() -> "blocking for child task");
450-
return workerThread.runBlocking(() -> {
451-
try {
452-
return callable.call();
453-
}
454-
catch (Exception ex) {
455-
throw throwAsUncheckedException(ex);
456-
}
457-
});
458-
}
465+
});
459466
}
460467

461468
}
@@ -612,5 +619,4 @@ void reacquire() throws InterruptedException {
612619
reacquisitionToken = null;
613620
}
614621
}
615-
616622
}

0 commit comments

Comments
 (0)