From 7d4f0811189e84a6f5e3df116f168c26133933de Mon Sep 17 00:00:00 2001 From: franz1981 Date: Tue, 30 May 2023 09:30:33 +0200 Subject: [PATCH 1/2] Replace ThreadLocals --- .../await/impl/VirtualThreadContext.java | 61 +++++++++---------- 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/vertx-async-await-incubator/src/main/java/io/vertx/await/impl/VirtualThreadContext.java b/vertx-async-await-incubator/src/main/java/io/vertx/await/impl/VirtualThreadContext.java index 2f0894c..df7ed0d 100644 --- a/vertx-async-await-incubator/src/main/java/io/vertx/await/impl/VirtualThreadContext.java +++ b/vertx-async-await-incubator/src/main/java/io/vertx/await/impl/VirtualThreadContext.java @@ -4,20 +4,10 @@ import io.vertx.core.Context; import io.vertx.core.Handler; import io.vertx.core.Vertx; -import io.vertx.core.impl.CloseFuture; -import io.vertx.core.impl.ContextBase; -import io.vertx.core.impl.ContextInternal; -import io.vertx.core.impl.Deployment; -import io.vertx.core.impl.VertxImpl; -import io.vertx.core.impl.VertxInternal; -import io.vertx.core.impl.WorkerPool; +import io.vertx.core.impl.*; import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.*; import java.util.concurrent.locks.Lock; import java.util.function.Consumer; @@ -32,7 +22,11 @@ public static VirtualThreadContext create(Vertx vertx, EventLoop nettyEventLoop, } private final Scheduler scheduler; - private final ThreadLocal inThread = new ThreadLocal<>(); + + // Use this instead of a ThreadLocal because must friendly with Virtual Threads! + // ideally we should use https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/maps/NonBlockingHashMap.java + // which doesn't use any synchronized op! + private final ConcurrentHashSet inThread = new ConcurrentHashSet<>(); VirtualThreadContext(VertxInternal vertx, EventLoop eventLoop, @@ -97,38 +91,39 @@ public boolean isWorkerContext() { private void run(ContextInternal ctx, T value, Handler task) { Objects.requireNonNull(task, "Task handler must not be null"); scheduler.execute(() -> { - inThread.set(true); + var current = Thread.currentThread(); + inThread.add(current); try { ctx.dispatch(value, task); } finally { - inThread.remove(); + inThread.remove(current); } }); } private void execute2(T argument, Handler task) { if (Context.isOnWorkerThread()) { - inThread.set(true); - try { - task.handle(argument); - } finally { - inThread.remove(); - } + handle(argument, task); } else { scheduler.execute(() -> { - inThread.set(true); - try { - task.handle(argument); - } finally { - inThread.remove(); - } + handle(argument, task); }); } } + private void handle(T argument, Handler task) { + var current = Thread.currentThread(); + inThread.add(current); + try { + task.handle(argument); + } finally { + inThread.remove(current); + } + } + @Override public boolean inThread() { - return inThread.get() == Boolean.TRUE; + return inThread.contains(Thread.currentThread()); } @Override @@ -138,7 +133,8 @@ public ContextInternal duplicate() { } public void lock(Lock lock) { - inThread.remove(); + var current = Thread.currentThread(); + inThread.remove(current); Consumer cont = scheduler.unschedule(); CompletableFuture latch = new CompletableFuture<>(); try { @@ -154,12 +150,13 @@ public void lock(Lock lock) { } catch (ExecutionException e) { throwAsUnchecked(e); } finally { - inThread.set(true); + inThread.add(current); } } public T await(CompletionStage fut) { - inThread.remove(); + var current = Thread.currentThread(); + inThread.remove(current); Consumer cont = scheduler.unschedule(); CompletableFuture latch = new CompletableFuture<>(); fut.whenComplete((v, err) -> { @@ -175,7 +172,7 @@ public T await(CompletionStage fut) { throwAsUnchecked(e.getCause()); return null; } finally { - inThread.set(true); + inThread.add(current); } } From 20ed13d8e67dfdd580121ac0a20895fa5f9ab3eb Mon Sep 17 00:00:00 2001 From: franz1981 Date: Tue, 30 May 2023 09:47:26 +0200 Subject: [PATCH 2/2] Add few comments and add pooling of v thread ctx in the new freshly created ctx --- .../src/main/java/io/vertx/await/Async.java | 21 +++++++++------ .../vertx/await/impl/EventLoopScheduler.java | 26 ++++++++++++------- .../EventLoopVirtualThreadContextTest.java | 19 ++++++++++++++ 3 files changed, 48 insertions(+), 18 deletions(-) diff --git a/vertx-async-await-incubator/src/main/java/io/vertx/await/Async.java b/vertx-async-await-incubator/src/main/java/io/vertx/await/Async.java index 5ff720d..aeda07b 100644 --- a/vertx-async-await-incubator/src/main/java/io/vertx/await/Async.java +++ b/vertx-async-await-incubator/src/main/java/io/vertx/await/Async.java @@ -18,6 +18,7 @@ public class Async { private final Vertx vertx; private final boolean useVirtualEventLoopThreads; + private static final String VTHREAD_CTX = "VTHREAD_CTX"; public Async(Vertx vertx) { this(vertx, false); @@ -32,17 +33,21 @@ public Async(Vertx vertx, boolean useVirtualEventLoopThreads) { * Run a task on a virtual thread */ public void run(Handler task) { + assert !Thread.currentThread().isVirtual(); Context ctx = vertx.getOrCreateContext(); - EventLoop eventLoop; - if (ctx.isEventLoopContext()) { - eventLoop = ((ContextInternal)ctx).nettyEventLoop(); - } else { + if (!ctx.isEventLoopContext()) { throw new IllegalStateException(); } - // Scheduler scheduler = useVirtualEventLoopThreads ? new SchedulerImpl(LoomaniaScheduler2.threadFactory(eventLoop)): new SchedulerImpl(SchedulerImpl.DEFAULT_THREAD_FACTORY); - Scheduler scheduler = useVirtualEventLoopThreads ? new EventLoopScheduler(eventLoop) : new DefaultScheduler(DefaultScheduler.DEFAULT_THREAD_FACTORY); - VirtualThreadContext context = VirtualThreadContext.create(vertx, eventLoop, scheduler); - context.runOnContext(task); + var unsafeCtx = (ContextInternal) ctx; + VirtualThreadContext virtualCtx = unsafeCtx.getLocal(VTHREAD_CTX); + if (virtualCtx == null) { + EventLoop eventLoop = unsafeCtx.nettyEventLoop(); + // Scheduler scheduler = useVirtualEventLoopThreads ? new SchedulerImpl(LoomaniaScheduler2.threadFactory(eventLoop)): new SchedulerImpl(SchedulerImpl.DEFAULT_THREAD_FACTORY); + Scheduler scheduler = useVirtualEventLoopThreads ? new EventLoopScheduler(eventLoop) : new DefaultScheduler(DefaultScheduler.DEFAULT_THREAD_FACTORY); + virtualCtx = VirtualThreadContext.create(vertx, eventLoop, scheduler); + unsafeCtx.putLocal(VTHREAD_CTX, virtualCtx); + } + virtualCtx.runOnContext(task); } private static VirtualThreadContext virtualThreadContext() { diff --git a/vertx-async-await-incubator/src/main/java/io/vertx/await/impl/EventLoopScheduler.java b/vertx-async-await-incubator/src/main/java/io/vertx/await/impl/EventLoopScheduler.java index 0843662..ef73663 100644 --- a/vertx-async-await-incubator/src/main/java/io/vertx/await/impl/EventLoopScheduler.java +++ b/vertx-async-await-incubator/src/main/java/io/vertx/await/impl/EventLoopScheduler.java @@ -52,23 +52,29 @@ private static ThreadFactory threadFactory(Executor carrier) { private final ThreadFactory threadFactory; private final LinkedList tasks = new LinkedList<>(); - private boolean flag; + private boolean runOnContext; public EventLoopScheduler(EventLoop carrier) { this(command -> { - if (carrier.inEventLoop()) { - command.run(); - } else { - carrier.execute(command); - } + execute(carrier, command); }); } - public EventLoopScheduler(Executor carrier) { + private static void execute(EventLoop carrier, Runnable command) { + if (carrier.inEventLoop()) { + command.run(); + } else { + carrier.execute(command); + } + } + + private EventLoopScheduler(Executor carrier) { threadFactory = threadFactory(command -> { - if (flag) { + if (runOnContext) { tasks.addLast(command); } else { + // "external" continuations are prioritized and placed + // upfront, to be consumed first tasks.addFirst(command); } carrier.execute(() -> { @@ -84,11 +90,11 @@ public Consumer unschedule() { public void execute(Runnable runnable) { Thread thread = threadFactory.newThread(runnable); - flag = true; + runOnContext = true; try { thread.start(); } finally { - flag = false; + runOnContext = false; } } } diff --git a/vertx-async-await-incubator/src/test/java/io/vertx/await/EventLoopVirtualThreadContextTest.java b/vertx-async-await-incubator/src/test/java/io/vertx/await/EventLoopVirtualThreadContextTest.java index 79eac8b..100067d 100644 --- a/vertx-async-await-incubator/src/test/java/io/vertx/await/EventLoopVirtualThreadContextTest.java +++ b/vertx-async-await-incubator/src/test/java/io/vertx/await/EventLoopVirtualThreadContextTest.java @@ -30,4 +30,23 @@ public void testSuspend() { await(); } + @Test + public void testSuspendFromEventLoop() { + vertx.runOnContext(v0 -> { + async.run(v1 -> { + CompletableFuture cf = new CompletableFuture<>(); + vertx.runOnContext(v2 -> { + cf.complete(null); + }); + try { + cf.get(10, TimeUnit.SECONDS); + } catch (Exception e) { + fail(e); + } + testComplete(); + }); + }); + await(); + } + }