diff --git a/vertx-async-await-incubator/src/test/java/io/vertx/await/VirtualThreadContextTestBase.java b/vertx-async-await-incubator/src/test/java/io/vertx/await/VirtualThreadContextTestBase.java index 42293cc..627e77c 100644 --- a/vertx-async-await-incubator/src/test/java/io/vertx/await/VirtualThreadContextTestBase.java +++ b/vertx-async-await-incubator/src/test/java/io/vertx/await/VirtualThreadContextTestBase.java @@ -12,7 +12,10 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.function.Function; public abstract class VirtualThreadContextTestBase extends VertxTestBase { @@ -116,7 +119,7 @@ public void testDuplicateUseSameThread() { async.run(v -> { ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); Thread th = Thread.currentThread(); - for (int i = 0;i < num;i++) { + for (int i = 0; i < num; i++) { ContextInternal duplicate = context.duplicate(); duplicate.runOnContext(v2 -> { // assertSame(th, Thread.currentThread()); @@ -135,7 +138,7 @@ public void testDuplicateConcurrentAwait() { ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); Object lock = new Object(); List> list = new ArrayList<>(); - for (int i = 0;i < num;i++) { + for (int i = 0; i < num; i++) { ContextInternal duplicate = context.duplicate(); duplicate.runOnContext(v2 -> { Promise promise = duplicate.promise(); @@ -222,4 +225,76 @@ public void testAcquireLock() throws Exception { }); await(); } + + + protected void testInterruption(Callable sleeper, boolean assertIsInterrupted) { + async.run(v1 -> { + var thisThread = Thread.currentThread(); + vertx.runOnContext(v2 -> thisThread.interrupt()); + assertFalse(Thread.currentThread().isInterrupted()); + try { + sleeper.call(); + } catch (InterruptedException e) { + assertEquals(thisThread, Thread.currentThread()); + assertTrue(Thread.currentThread().isVirtual()); + if (assertIsInterrupted) { + assertTrue("thread should be interrupted", Thread.currentThread().isInterrupted()); + } + testComplete(); + return; + } catch (Exception e) { + fail(e); + return; + } + + fail("should have interrupted"); + }); + + await(); + } + + @Test + public void testInterruptsVertxSleepAssertInterrupted() throws Exception { + testInterruption(() -> { + sleepVertxInterruptibly(1000); + return null; + }, true); + } + + @Test + public void testInterruptsThreadSleepAssertInterrupted() throws Exception { + testInterruption(() -> { + Thread.sleep(1000); + return null; + }, true); + } + + @Test + public void testInterruptsVertxSleepIgnoreInterruptedStatus() throws Exception { + testInterruption(() -> { + sleepVertxInterruptibly(1000); + return null; + }, false); + } + + @Test + public void testInterruptsThreadSleepIgnoreInterruptedStatus() throws Exception { + testInterruption(() -> { + Thread.sleep(1000); + return null; + }, false); + } + + private void sleepVertxInterruptibly(long milliseconds) throws InterruptedException { + var promise = Promise.promise(); + vertx.setTimer(milliseconds, promise::complete); + try { + Async.await(promise.future()); + } catch (Throwable t) { + if (t.getCause() instanceof InterruptedException interruptedException) { + throw interruptedException; + } + throw t; + } + } } diff --git a/vertx-execute-blocking-incubator/src/test/java/io/vertx/await/ExecuteBlockingTest.java b/vertx-execute-blocking-incubator/src/test/java/io/vertx/await/ExecuteBlockingTest.java index c675922..b6c22e5 100644 --- a/vertx-execute-blocking-incubator/src/test/java/io/vertx/await/ExecuteBlockingTest.java +++ b/vertx-execute-blocking-incubator/src/test/java/io/vertx/await/ExecuteBlockingTest.java @@ -1,6 +1,7 @@ package io.vertx.await; import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.executeblocking.ExecuteBlocking; import io.vertx.test.core.VertxTestBase; import org.junit.Test; @@ -25,6 +26,45 @@ public void testCompletion() { await(); } + @Test + public void testInterruptsStatusIgnored() { + testInterrupts(false); + } + + @Test + public void testInterruptsStatusAsserted() { + testInterrupts(true); + } + + private void testInterrupts(boolean assertInterrupted) { + vertx.runOnContext(v -> { + var thisThread = Promise.promise(); + var wasInterrupted = ExecuteBlocking.executeBlocking(() -> { + thisThread.tryComplete(Thread.currentThread()); + + try { + Thread.sleep(10000); + } catch (InterruptedException interrupted) { + // we successfully interrupted + if (assertInterrupted) { + assertTrue(Thread.currentThread().isInterrupted()); + } + return null; + } + + throw new IllegalStateException("should not be reached"); + }); + + thisThread.future() + .compose(thread -> { + thread.interrupt(); + return wasInterrupted; + }) + .onComplete(onSuccess(t -> testComplete())); + }); + await(); + } + @Test public void testFailure() { Exception failure = new Exception(); @@ -46,7 +86,7 @@ public void testManyThreads() { vertx.runOnContext(v -> { CyclicBarrier barrier = new CyclicBarrier(num); AtomicInteger count = new AtomicInteger(); - for (int i = 0;i < num;i++) { + for (int i = 0; i < num; i++) { Future fut = ExecuteBlocking.executeBlocking(() -> { barrier.await(); return "Hello";