diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index c5151dafb..c3c2e8480 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -533,6 +533,16 @@ public class Options { * {@link Builder#executor(ExecutorService) executor}. */ public static final String PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS = "scheduled.executor.service.class"; + /** + * Property used to set class name for the Connect Executor Service (executor) class + * {@link Builder#connectExecutor(ExecutorService) connectExecutor}. + */ + public static final String PROP_CONNECT_EXECUTOR_SERVICE_CLASS = "connect.executor.service.class"; + /** + * Property used to set class name for the Callback Executor Service (executor) class + * {@link Builder#callbackExecutor(ExecutorService) callbackExecutor}. + */ + public static final String PROP_CALLBACK_EXECUTOR_SERVICE_CLASS = "callback.executor.service.class"; /** * Property used to set class name for the Connect Thread Factory * {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory}. @@ -706,6 +716,8 @@ public class Options { private final ExecutorService executor; private final ScheduledExecutorService scheduledExecutor; + private final ExecutorService connectExecutor; + private final ExecutorService callbackExecutor; private final ThreadFactory connectThreadFactory; private final ThreadFactory callbackThreadFactory; private final ServerPool serverPool; @@ -717,16 +729,16 @@ public class Options { static class DefaultThreadFactory implements ThreadFactory { final String name; - final AtomicInteger threadNo; + final AtomicInteger threadNumber; public DefaultThreadFactory (String name){ this.name = name; - threadNo = new AtomicInteger(0); + threadNumber = new AtomicInteger(0); } public Thread newThread(@NonNull Runnable r) { - String threadName = name+":"+threadNo.incrementAndGet(); - Thread t = new Thread(r,threadName); + String threadName = name + ":"+ threadNumber.incrementAndGet(); + Thread t = new Thread(r, threadName); if (t.isDaemon()) { t.setDaemon(false); } @@ -851,6 +863,8 @@ public static class Builder { private String dataPortType = DEFAULT_DATA_PORT_TYPE; private ExecutorService executor; private ScheduledExecutorService scheduledExecutor; + private ExecutorService connectExecutor; + private ExecutorService callbackExecutor; private ThreadFactory connectThreadFactory; private ThreadFactory callbackThreadFactory; private List> httpRequestInterceptors; @@ -989,6 +1003,8 @@ public Builder properties(Properties props) { classnameProperty(props, PROP_DISPATCHER_FACTORY_CLASS, o -> this.dispatcherFactory = (DispatcherFactory) o); classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.executor = (ExecutorService) o); classnameProperty(props, PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS, o -> this.scheduledExecutor = (ScheduledExecutorService) o); + classnameProperty(props, PROP_CONNECT_EXECUTOR_SERVICE_CLASS, o -> this.connectExecutor = (ExecutorService) o); + classnameProperty(props, PROP_CALLBACK_EXECUTOR_SERVICE_CLASS, o -> this.callbackExecutor = (ExecutorService) o); classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.connectThreadFactory = (ThreadFactory) o); classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.callbackThreadFactory = (ThreadFactory) o); return this; @@ -1692,7 +1708,7 @@ public Builder statisticsCollector(StatisticsCollector collector) { } /** - * Set the {@link ExecutorService ExecutorService} used to run threaded tasks. The default is a + * Set the {@link ExecutorService} used to run threaded tasks. The default is a * cached thread pool that names threads after the connection name (or a default). This executor * is used for reading and writing the underlying sockets as well as for each Dispatcher. * The default executor uses a short keepalive time, 500ms, to insure quick shutdowns. This is reasonable @@ -1709,7 +1725,7 @@ public Builder executor(ExecutorService executor) { } /** - * Set the {@link ScheduledExecutorService ScheduledExecutorService} used to run scheduled task like + * Set the {@link ScheduledExecutorService} used to run scheduled task like * heartbeat timers * The default is a ScheduledThreadPoolExecutor that does not * execute delayed tasks after shutdown and removes tasks on cancel; @@ -1721,6 +1737,28 @@ public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) { return this; } + /** + * Set the {@link ExecutorService} used to make connections. + * The default is a Single Thread Executor + * @param connectExecutor The ExecutorService to make connections with. + * @return the Builder for chaining + */ + public Builder connectExecutor(ExecutorService connectExecutor) { + this.connectExecutor = connectExecutor; + return this; + } + + /** + * Set the {@link ExecutorService} used to make event callbacks with. + * The default is a Single Thread Executor + * @param callbackExecutor The ExecutorService to make event callbacks with. + * @return the Builder for chaining + */ + public Builder callbackExecutor(ExecutorService callbackExecutor) { + this.callbackExecutor = callbackExecutor; + return this; + } + /** * Sets custom thread factory for the executor service * @@ -2108,6 +2146,8 @@ public Builder(Options o) { this.trackAdvancedStats = o.trackAdvancedStats; this.executor = o.executor; this.scheduledExecutor = o.scheduledExecutor; + this.connectExecutor = o.connectExecutor; + this.callbackExecutor = o.callbackExecutor; this.callbackThreadFactory = o.callbackThreadFactory; this.connectThreadFactory = o.connectThreadFactory; this.httpRequestInterceptors = o.httpRequestInterceptors; @@ -2180,6 +2220,8 @@ private Options(Builder b) { this.trackAdvancedStats = b.trackAdvancedStats; this.executor = b.executor; this.scheduledExecutor = b.scheduledExecutor; + this.connectExecutor = b.connectExecutor; + this.callbackExecutor = b.callbackExecutor; this.callbackThreadFactory = b.callbackThreadFactory; this.connectThreadFactory = b.connectThreadFactory; this.httpRequestInterceptors = b.httpRequestInterceptors; @@ -2235,21 +2277,31 @@ private ScheduledExecutorService _getInternalScheduledExecutor() { } /** - * the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc + * the connect executor, see {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory()} in the builder doc * @return the executor */ - public ExecutorService getCallbackExecutor() { - return this.callbackThreadFactory == null ? - DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.callbackThreadFactory); + public ExecutorService getConnectExecutor() { + if (connectExecutor != null) { + return connectExecutor; + } + if (connectThreadFactory != null) { + return Executors.newSingleThreadExecutor(this.connectThreadFactory); + } + return DEFAULT_SINGLE_THREAD_EXECUTOR.get(); } /** - * the connect executor, see {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory()} in the builder doc + * the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc * @return the executor */ - public ExecutorService getConnectExecutor() { - return this.connectThreadFactory == null ? - DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.connectThreadFactory); + public ExecutorService getCallbackExecutor() { + if (callbackExecutor != null) { + return callbackExecutor; + } + if (callbackThreadFactory != null) { + return Executors.newSingleThreadExecutor(this.callbackThreadFactory); + } + return DEFAULT_SINGLE_THREAD_EXECUTOR.get(); } /** @@ -2269,19 +2321,19 @@ public boolean scheduledExecutorIsInternal() { } /** - * whether the callback executor is the internal one versus a user supplied one + * whether the connect executor is the internal one versus a user supplied one * @return true if the executor is internal */ - public boolean callbackExecutorIsInternal() { - return this.callbackThreadFactory == null; + public boolean connectExecutorIsInternal() { + return this.connectThreadFactory == null && connectExecutor == null; } /** - * whether the connect executor is the internal one versus a user supplied one + * whether the callback executor is the internal one versus a user supplied one * @return true if the executor is internal */ - public boolean connectExecutorIsInternal() { - return this.connectThreadFactory == null; + public boolean callbackExecutorIsInternal() { + return this.callbackThreadFactory == null && callbackExecutor == null; } /**