From 435ad50478ef8d33a1b07ef535e6c1ebc3fcf83f Mon Sep 17 00:00:00 2001 From: scottf Date: Sun, 23 Nov 2025 15:02:27 -0500 Subject: [PATCH 1/2] Adding flexibility for executor options --- src/main/java/io/nats/client/Options.java | 95 ++++++++++++++++++----- 1 file changed, 75 insertions(+), 20 deletions(-) diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index 0aa0a0127..bdf666ed7 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -533,6 +533,16 @@ public class Options { * {@link Builder#executor(ExecutorService) executor}. */ public static final String PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS = "scheduled.executor.service.class"; + /** + * Property used to set class name for the Connect Executor Service (executor) class + * {@link Builder#connectExecutor(ExecutorService) connectExecutor}. + */ + public static final String PROP_CONNECT_EXECUTOR_SERVICE_CLASS = "connect.executor.service.class"; + /** + * Property used to set class name for the Callback Executor Service (executor) class + * {@link Builder#callbackExecutor(ExecutorService) callbackExecutor}. + */ + public static final String PROP_CALLBACK_EXECUTOR_SERVICE_CLASS = "callback.executor.service.class"; /** * Property used to set class name for the Connect Thread Factory * {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory}. @@ -706,6 +716,8 @@ public class Options { private final ExecutorService executor; private final ScheduledExecutorService scheduledExecutor; + private final ExecutorService connectExecutor; + private final ExecutorService callbackExecutor; private final ThreadFactory connectThreadFactory; private final ThreadFactory callbackThreadFactory; private final ServerPool serverPool; @@ -716,17 +728,20 @@ public class Options { private final boolean enableFastFallback; static class DefaultThreadFactory implements ThreadFactory { + final ThreadGroup group; final String name; - final AtomicInteger threadNo; + final AtomicInteger threadNumber; public DefaultThreadFactory (String name){ this.name = name; - threadNo = new AtomicInteger(0); + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + threadNumber = new AtomicInteger(0); } public Thread newThread(@NonNull Runnable r) { - String threadName = name+":"+threadNo.incrementAndGet(); - Thread t = new Thread(r,threadName); + String threadName = name + ":"+ threadNumber.incrementAndGet(); + Thread t = new Thread(group, r, threadName); if (t.isDaemon()) { t.setDaemon(false); } @@ -851,6 +866,8 @@ public static class Builder { private String dataPortType = DEFAULT_DATA_PORT_TYPE; private ExecutorService executor; private ScheduledExecutorService scheduledExecutor; + private ExecutorService connectExecutor; + private ExecutorService callbackExecutor; private ThreadFactory connectThreadFactory; private ThreadFactory callbackThreadFactory; private List> httpRequestInterceptors; @@ -989,6 +1006,8 @@ public Builder properties(Properties props) { classnameProperty(props, PROP_DISPATCHER_FACTORY_CLASS, o -> this.dispatcherFactory = (DispatcherFactory) o); classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.executor = (ExecutorService) o); classnameProperty(props, PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS, o -> this.scheduledExecutor = (ScheduledExecutorService) o); + classnameProperty(props, PROP_CONNECT_EXECUTOR_SERVICE_CLASS, o -> this.connectExecutor = (ExecutorService) o); + classnameProperty(props, PROP_CALLBACK_EXECUTOR_SERVICE_CLASS, o -> this.callbackExecutor = (ExecutorService) o); classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.connectThreadFactory = (ThreadFactory) o); classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.callbackThreadFactory = (ThreadFactory) o); return this; @@ -1692,7 +1711,7 @@ public Builder statisticsCollector(StatisticsCollector collector) { } /** - * Set the {@link ExecutorService ExecutorService} used to run threaded tasks. The default is a + * Set the {@link ExecutorService} used to run threaded tasks. The default is a * cached thread pool that names threads after the connection name (or a default). This executor * is used for reading and writing the underlying sockets as well as for each Dispatcher. * The default executor uses a short keepalive time, 500ms, to insure quick shutdowns. This is reasonable @@ -1709,7 +1728,7 @@ public Builder executor(ExecutorService executor) { } /** - * Set the {@link ScheduledExecutorService ScheduledExecutorService} used to run scheduled task like + * Set the {@link ScheduledExecutorService} used to run scheduled task like * heartbeat timers * The default is a ScheduledThreadPoolExecutor that does not * execute delayed tasks after shutdown and removes tasks on cancel; @@ -1721,6 +1740,28 @@ public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) { return this; } + /** + * Set the {@link ExecutorService} used to make connections. + * The default is a Single Thread Executor + * @param connectExecutor The ExecutorService to make connections with. + * @return the Builder for chaining + */ + public Builder connectExecutor(ExecutorService connectExecutor) { + this.connectExecutor = connectExecutor; + return this; + } + + /** + * Set the {@link ExecutorService} used to make event callbacks with. + * The default is a Single Thread Executor + * @param callbackExecutor The ExecutorService to make event callbacks with. + * @return the Builder for chaining + */ + public Builder callbackExecutor(ExecutorService callbackExecutor) { + this.callbackExecutor = callbackExecutor; + return this; + } + /** * Sets custom thread factory for the executor service * @@ -2108,6 +2149,8 @@ public Builder(Options o) { this.trackAdvancedStats = o.trackAdvancedStats; this.executor = o.executor; this.scheduledExecutor = o.scheduledExecutor; + this.connectExecutor = o.connectExecutor; + this.callbackExecutor = o.callbackExecutor; this.callbackThreadFactory = o.callbackThreadFactory; this.connectThreadFactory = o.connectThreadFactory; this.httpRequestInterceptors = o.httpRequestInterceptors; @@ -2180,6 +2223,8 @@ private Options(Builder b) { this.trackAdvancedStats = b.trackAdvancedStats; this.executor = b.executor; this.scheduledExecutor = b.scheduledExecutor; + this.connectExecutor = b.connectExecutor; + this.callbackExecutor = b.callbackExecutor; this.callbackThreadFactory = b.callbackThreadFactory; this.connectThreadFactory = b.connectThreadFactory; this.httpRequestInterceptors = b.httpRequestInterceptors; @@ -2235,21 +2280,31 @@ private ScheduledExecutorService _getInternalScheduledExecutor() { } /** - * the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc + * the connect executor, see {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory()} in the builder doc * @return the executor */ - public ExecutorService getCallbackExecutor() { - return this.callbackThreadFactory == null ? - DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.callbackThreadFactory); + public ExecutorService getConnectExecutor() { + if (connectExecutor != null) { + return connectExecutor; + } + if (connectThreadFactory != null) { + return Executors.newSingleThreadExecutor(this.connectThreadFactory); + } + return DEFAULT_SINGLE_THREAD_EXECUTOR.get(); } /** - * the connect executor, see {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory()} in the builder doc + * the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc * @return the executor */ - public ExecutorService getConnectExecutor() { - return this.connectThreadFactory == null ? - DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.connectThreadFactory); + public ExecutorService getCallbackExecutor() { + if (callbackExecutor != null) { + return callbackExecutor; + } + if (callbackThreadFactory != null) { + return Executors.newSingleThreadExecutor(this.callbackThreadFactory); + } + return DEFAULT_SINGLE_THREAD_EXECUTOR.get(); } /** @@ -2269,19 +2324,19 @@ public boolean scheduledExecutorIsInternal() { } /** - * whether the callback executor is the internal one versus a user supplied one + * whether the connect executor is the internal one versus a user supplied one * @return true if the executor is internal */ - public boolean callbackExecutorIsInternal() { - return this.callbackThreadFactory == null; + public boolean connectExecutorIsInternal() { + return this.connectThreadFactory == null && connectExecutor == null; } /** - * whether the connect executor is the internal one versus a user supplied one + * whether the callback executor is the internal one versus a user supplied one * @return true if the executor is internal */ - public boolean connectExecutorIsInternal() { - return this.connectThreadFactory == null; + public boolean callbackExecutorIsInternal() { + return this.callbackThreadFactory == null && callbackExecutor == null; } /** From b993c0b8fb9e502c272af53701f2acd508231089 Mon Sep 17 00:00:00 2001 From: scottf Date: Sun, 23 Nov 2025 15:34:29 -0500 Subject: [PATCH 2/2] not worrying about group --- src/main/java/io/nats/client/Options.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index bdf666ed7..aa64f2355 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -728,20 +728,17 @@ public class Options { private final boolean enableFastFallback; static class DefaultThreadFactory implements ThreadFactory { - final ThreadGroup group; final String name; final AtomicInteger threadNumber; public DefaultThreadFactory (String name){ this.name = name; - SecurityManager s = System.getSecurityManager(); - group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); threadNumber = new AtomicInteger(0); } public Thread newThread(@NonNull Runnable r) { String threadName = name + ":"+ threadNumber.incrementAndGet(); - Thread t = new Thread(group, r, threadName); + Thread t = new Thread(r, threadName); if (t.isDaemon()) { t.setDaemon(false); }