Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 72 additions & 20 deletions src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,16 @@ public class Options {
* {@link Builder#executor(ExecutorService) executor}.
*/
public static final String PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS = "scheduled.executor.service.class";
/**
* Property used to set class name for the Connect Executor Service (executor) class
* {@link Builder#connectExecutor(ExecutorService) connectExecutor}.
*/
public static final String PROP_CONNECT_EXECUTOR_SERVICE_CLASS = "connect.executor.service.class";
/**
* Property used to set class name for the Callback Executor Service (executor) class
* {@link Builder#callbackExecutor(ExecutorService) callbackExecutor}.
*/
public static final String PROP_CALLBACK_EXECUTOR_SERVICE_CLASS = "callback.executor.service.class";
/**
* Property used to set class name for the Connect Thread Factory
* {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory}.
Expand Down Expand Up @@ -706,6 +716,8 @@ public class Options {

private final ExecutorService executor;
private final ScheduledExecutorService scheduledExecutor;
private final ExecutorService connectExecutor;
private final ExecutorService callbackExecutor;
private final ThreadFactory connectThreadFactory;
private final ThreadFactory callbackThreadFactory;
private final ServerPool serverPool;
Expand All @@ -717,16 +729,16 @@ public class Options {

static class DefaultThreadFactory implements ThreadFactory {
final String name;
final AtomicInteger threadNo;
final AtomicInteger threadNumber;

public DefaultThreadFactory (String name){
this.name = name;
threadNo = new AtomicInteger(0);
threadNumber = new AtomicInteger(0);
}

public Thread newThread(@NonNull Runnable r) {
String threadName = name+":"+threadNo.incrementAndGet();
Thread t = new Thread(r,threadName);
String threadName = name + ":"+ threadNumber.incrementAndGet();
Thread t = new Thread(r, threadName);
if (t.isDaemon()) {
t.setDaemon(false);
}
Expand Down Expand Up @@ -851,6 +863,8 @@ public static class Builder {
private String dataPortType = DEFAULT_DATA_PORT_TYPE;
private ExecutorService executor;
private ScheduledExecutorService scheduledExecutor;
private ExecutorService connectExecutor;
private ExecutorService callbackExecutor;
private ThreadFactory connectThreadFactory;
private ThreadFactory callbackThreadFactory;
private List<java.util.function.Consumer<HttpRequest>> httpRequestInterceptors;
Expand Down Expand Up @@ -989,6 +1003,8 @@ public Builder properties(Properties props) {
classnameProperty(props, PROP_DISPATCHER_FACTORY_CLASS, o -> this.dispatcherFactory = (DispatcherFactory) o);
classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.executor = (ExecutorService) o);
classnameProperty(props, PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS, o -> this.scheduledExecutor = (ScheduledExecutorService) o);
classnameProperty(props, PROP_CONNECT_EXECUTOR_SERVICE_CLASS, o -> this.connectExecutor = (ExecutorService) o);
classnameProperty(props, PROP_CALLBACK_EXECUTOR_SERVICE_CLASS, o -> this.callbackExecutor = (ExecutorService) o);
classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.connectThreadFactory = (ThreadFactory) o);
classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.callbackThreadFactory = (ThreadFactory) o);
return this;
Expand Down Expand Up @@ -1692,7 +1708,7 @@ public Builder statisticsCollector(StatisticsCollector collector) {
}

/**
* Set the {@link ExecutorService ExecutorService} used to run threaded tasks. The default is a
* Set the {@link ExecutorService} used to run threaded tasks. The default is a
* cached thread pool that names threads after the connection name (or a default). This executor
* is used for reading and writing the underlying sockets as well as for each Dispatcher.
* The default executor uses a short keepalive time, 500ms, to insure quick shutdowns. This is reasonable
Expand All @@ -1709,7 +1725,7 @@ public Builder executor(ExecutorService executor) {
}

/**
* Set the {@link ScheduledExecutorService ScheduledExecutorService} used to run scheduled task like
* Set the {@link ScheduledExecutorService} used to run scheduled task like
* heartbeat timers
* The default is a ScheduledThreadPoolExecutor that does not
* execute delayed tasks after shutdown and removes tasks on cancel;
Expand All @@ -1721,6 +1737,28 @@ public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
return this;
}

/**
* Set the {@link ExecutorService} used to make connections.
* The default is a Single Thread Executor
* @param connectExecutor The ExecutorService to make connections with.
* @return the Builder for chaining
*/
public Builder connectExecutor(ExecutorService connectExecutor) {
this.connectExecutor = connectExecutor;
return this;
}

/**
* Set the {@link ExecutorService} used to make event callbacks with.
* The default is a Single Thread Executor
* @param callbackExecutor The ExecutorService to make event callbacks with.
* @return the Builder for chaining
*/
public Builder callbackExecutor(ExecutorService callbackExecutor) {
this.callbackExecutor = callbackExecutor;
return this;
}

/**
* Sets custom thread factory for the executor service
*
Expand Down Expand Up @@ -2108,6 +2146,8 @@ public Builder(Options o) {
this.trackAdvancedStats = o.trackAdvancedStats;
this.executor = o.executor;
this.scheduledExecutor = o.scheduledExecutor;
this.connectExecutor = o.connectExecutor;
this.callbackExecutor = o.callbackExecutor;
this.callbackThreadFactory = o.callbackThreadFactory;
this.connectThreadFactory = o.connectThreadFactory;
this.httpRequestInterceptors = o.httpRequestInterceptors;
Expand Down Expand Up @@ -2180,6 +2220,8 @@ private Options(Builder b) {
this.trackAdvancedStats = b.trackAdvancedStats;
this.executor = b.executor;
this.scheduledExecutor = b.scheduledExecutor;
this.connectExecutor = b.connectExecutor;
this.callbackExecutor = b.callbackExecutor;
this.callbackThreadFactory = b.callbackThreadFactory;
this.connectThreadFactory = b.connectThreadFactory;
this.httpRequestInterceptors = b.httpRequestInterceptors;
Expand Down Expand Up @@ -2235,21 +2277,31 @@ private ScheduledExecutorService _getInternalScheduledExecutor() {
}

/**
* the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc
* the connect executor, see {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory()} in the builder doc
* @return the executor
*/
public ExecutorService getCallbackExecutor() {
return this.callbackThreadFactory == null ?
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.callbackThreadFactory);
public ExecutorService getConnectExecutor() {
if (connectExecutor != null) {
return connectExecutor;
}
if (connectThreadFactory != null) {
return Executors.newSingleThreadExecutor(this.connectThreadFactory);
}
return DEFAULT_SINGLE_THREAD_EXECUTOR.get();
}

/**
* the connect executor, see {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory()} in the builder doc
* the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc
* @return the executor
*/
public ExecutorService getConnectExecutor() {
return this.connectThreadFactory == null ?
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.connectThreadFactory);
public ExecutorService getCallbackExecutor() {
if (callbackExecutor != null) {
return callbackExecutor;
}
if (callbackThreadFactory != null) {
return Executors.newSingleThreadExecutor(this.callbackThreadFactory);
}
return DEFAULT_SINGLE_THREAD_EXECUTOR.get();
}

/**
Expand All @@ -2269,19 +2321,19 @@ public boolean scheduledExecutorIsInternal() {
}

/**
* whether the callback executor is the internal one versus a user supplied one
* whether the connect executor is the internal one versus a user supplied one
* @return true if the executor is internal
*/
public boolean callbackExecutorIsInternal() {
return this.callbackThreadFactory == null;
public boolean connectExecutorIsInternal() {
return this.connectThreadFactory == null && connectExecutor == null;
}

/**
* whether the connect executor is the internal one versus a user supplied one
* whether the callback executor is the internal one versus a user supplied one
* @return true if the executor is internal
*/
public boolean connectExecutorIsInternal() {
return this.connectThreadFactory == null;
public boolean callbackExecutorIsInternal() {
return this.callbackThreadFactory == null && callbackExecutor == null;
}

/**
Expand Down
Loading