diff --git a/src/main/java/redis/clients/jedis/ClusterPipeline.java b/src/main/java/redis/clients/jedis/ClusterPipeline.java index 9564bd167b..404f09ce9b 100644 --- a/src/main/java/redis/clients/jedis/ClusterPipeline.java +++ b/src/main/java/redis/clients/jedis/ClusterPipeline.java @@ -2,6 +2,7 @@ import java.time.Duration; import java.util.Set; + import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.providers.ClusterConnectionProvider; import redis.clients.jedis.util.IOUtils; @@ -40,6 +41,12 @@ public ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects this.provider = provider; } + public ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects commandObjects, + ClusterPipelineExecutor executorService) { + super(commandObjects, executorService); + this.provider = provider; + } + private static ClusterCommandObjects createClusterCommandObjects(RedisProtocol protocol) { ClusterCommandObjects cco = new ClusterCommandObjects(); if (protocol == RedisProtocol.RESP3) cco.setProtocol(protocol); diff --git a/src/main/java/redis/clients/jedis/ClusterPipelineExecutor.java b/src/main/java/redis/clients/jedis/ClusterPipelineExecutor.java new file mode 100644 index 0000000000..d38fb1d30d --- /dev/null +++ b/src/main/java/redis/clients/jedis/ClusterPipelineExecutor.java @@ -0,0 +1,43 @@ +package redis.clients.jedis; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; + +/** + * Executor used for parallel syncing of multinode pipeline when provided in + * {@link DefaultJedisClientConfig.Builder#pipelineExecutorProvider(PipelineExecutorProvider)} + */ +public interface ClusterPipelineExecutor extends Executor, AutoCloseable { + + /** + * To avoid following hte {@link JedisCluster} client lifecycle in shutting down the executor service + * provide your own implementation of this interface to {@link PipelineExecutorProvider} + */ + default void shutdown() {} + + default void close() { + shutdown(); + } + + /** + * Wrap an executor service into a {@link ClusterPipelineExecutor} to allow clients to provide their + * desired implementation of the {@link ExecutorService} to support parallel syncing of {@link MultiNodePipelineBase}. + * + * @param executorService + * @return ClusterPipelineExecutor that will be shutdown alongside the {@link JedisCluster} client. + */ + static ClusterPipelineExecutor from(ExecutorService executorService) { + return new ClusterPipelineExecutor() { + @Override + public void execute(Runnable command) { + executorService.execute(command); + } + + @Override + public void shutdown() { + executorService.shutdown(); + } + }; + } + +} diff --git a/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java b/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java index 25a4737ec0..ac0e18bdb2 100644 --- a/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java +++ b/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java @@ -33,6 +33,8 @@ public final class DefaultJedisClientConfig implements JedisClientConfig { private final AuthXManager authXManager; + private final PipelineExecutorProvider pipelineExecutorProvider; + private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) { this.redisProtocol = builder.redisProtocol; this.connectionTimeoutMillis = builder.connectionTimeoutMillis; @@ -50,6 +52,7 @@ private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) { this.clientSetInfoConfig = builder.clientSetInfoConfig; this.readOnlyForRedisClusterReplicas = builder.readOnlyForRedisClusterReplicas; this.authXManager = builder.authXManager; + this.pipelineExecutorProvider = builder.pipelineExecutorProvider; } @Override @@ -143,6 +146,11 @@ public boolean isReadOnlyForRedisClusterReplicas() { return readOnlyForRedisClusterReplicas; } + @Override + public PipelineExecutorProvider getPipelineExecutorProvider() { + return pipelineExecutorProvider; + } + public static Builder builder() { return new Builder(); } @@ -175,6 +183,8 @@ public static class Builder { private AuthXManager authXManager = null; + private PipelineExecutorProvider pipelineExecutorProvider = PipelineExecutorProvider.DEFAULT; + private Builder() { } @@ -297,6 +307,11 @@ public Builder authXManager(AuthXManager authXManager) { return this; } + public Builder pipelineExecutorProvider(PipelineExecutorProvider pipelineExecutorProvider) { + this.pipelineExecutorProvider = pipelineExecutorProvider; + return this; + } + public Builder from(JedisClientConfig instance) { this.redisProtocol = instance.getRedisProtocol(); this.connectionTimeoutMillis = instance.getConnectionTimeoutMillis(); @@ -314,6 +329,7 @@ public Builder from(JedisClientConfig instance) { this.clientSetInfoConfig = instance.getClientSetInfoConfig(); this.readOnlyForRedisClusterReplicas = instance.isReadOnlyForRedisClusterReplicas(); this.authXManager = instance.getAuthXManager(); + this.pipelineExecutorProvider = instance.getPipelineExecutorProvider(); return this; } } diff --git a/src/main/java/redis/clients/jedis/JedisClientConfig.java b/src/main/java/redis/clients/jedis/JedisClientConfig.java index ce7fd82de4..1046086de8 100644 --- a/src/main/java/redis/clients/jedis/JedisClientConfig.java +++ b/src/main/java/redis/clients/jedis/JedisClientConfig.java @@ -115,4 +115,15 @@ default boolean isReadOnlyForRedisClusterReplicas() { default ClientSetInfoConfig getClientSetInfoConfig() { return ClientSetInfoConfig.DEFAULT; } + + /** + * If different then DEFAULT this will provide an Executor implementation that will sync/close multi node pipelines + * in parallel. This replaces the deprecated internal usage of new Executor Services for every pipeline, resulting in + * high thread creation rates and impact on latency. + * + * @return PipelineExecutorProvider + */ + default PipelineExecutorProvider getPipelineExecutorProvider() { + return PipelineExecutorProvider.DEFAULT; + } } diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java index d4c555230c..4f74ab260c 100644 --- a/src/main/java/redis/clients/jedis/JedisCluster.java +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -13,6 +13,7 @@ import redis.clients.jedis.csc.Cache; import redis.clients.jedis.csc.CacheConfig; import redis.clients.jedis.csc.CacheFactory; +import redis.clients.jedis.util.IOUtils; import redis.clients.jedis.util.JedisClusterCRC16; public class JedisCluster extends UnifiedJedis { @@ -29,6 +30,12 @@ public class JedisCluster extends UnifiedJedis { */ public static final int DEFAULT_MAX_ATTEMPTS = 5; + /** + * Executor used to close MultiNodePipeline in parallel. See {@link JedisClientConfig#getPipelineExecutorProvider()} + * for mor details on configuration. + */ + private ClusterPipelineExecutor clusterPipelineExecutor; + /** * Creates a JedisCluster instance. The provided node is used to make the first contact with the cluster. *

@@ -251,6 +258,8 @@ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfi Duration maxTotalRetriesDuration) { this(new ClusterConnectionProvider(clusterNodes, clientConfig), maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol()); + clientConfig.getPipelineExecutorProvider() + .getClusteredPipelineExecutor().ifPresent((executor) -> this.clusterPipelineExecutor = executor); } public JedisCluster(Set clusterNodes, JedisClientConfig clientConfig, @@ -268,6 +277,8 @@ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfi Duration maxTotalRetriesDuration, GenericObjectPoolConfig poolConfig) { this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig), maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol()); + clientConfig.getPipelineExecutorProvider() + .getClusteredPipelineExecutor().ifPresent((executor) -> this.clusterPipelineExecutor = executor); } public JedisCluster(Set clusterNodes, JedisClientConfig clientConfig, @@ -275,6 +286,8 @@ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfi Duration maxTotalRetriesDuration) { this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod), maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol()); + clientConfig.getPipelineExecutorProvider() + .getClusteredPipelineExecutor().ifPresent((executor) -> this.clusterPipelineExecutor = executor); } // Uses a fetched connection to process protocol. Should be avoided if possible. @@ -334,6 +347,12 @@ private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Durati super(provider, maxAttempts, maxTotalRetriesDuration, protocol, clientSideCache); } + @Override + public void close() { + super.close(); + IOUtils.closeQuietly(this.clusterPipelineExecutor); + } + /** * Returns all nodes that were configured to connect to in key-value pairs ({@link Map}).
* Key is the HOST:PORT and the value is the connection pool. @@ -376,9 +395,14 @@ public void ssubscribe(BinaryJedisShardedPubSub jedisPubSub, final byte[]... cha @Override public ClusterPipeline pipelined() { - return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects); + if (clusterPipelineExecutor == null) { + return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects); + } + return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects, + clusterPipelineExecutor); } + /** * @param doMulti param * @return nothing diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index 247069410a..86044a240d 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -1,13 +1,12 @@ package redis.clients.jedis; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import org.slf4j.Logger; @@ -24,11 +23,17 @@ public abstract class MultiNodePipelineBase extends PipelineBase { * The number of processes for {@code sync()}. If you have enough cores for client (and you have * more than 3 cluster nodes), you may increase this number of workers. * Suggestion: ≤ cluster nodes. + * + * @deprecated Client using this approach are paying the thread creation cost for every pipeline sync. Clients + * should use refer to {@link JedisClientConfig#getPipelineExecutorProvider()} to provide a single Executor for + * gain in performance. */ public static volatile int MULTI_NODE_PIPELINE_SYNC_WORKERS = 3; private final Map>> pipelinedResponses; private final Map connections; + private ClusterPipelineExecutor clusterPipelineExecutor; + private boolean useSharedExecutor = false; private volatile boolean syncing = false; public MultiNodePipelineBase(CommandObjects commandObjects) { @@ -37,6 +42,14 @@ public MultiNodePipelineBase(CommandObjects commandObjects) { connections = new LinkedHashMap<>(); } + public MultiNodePipelineBase(CommandObjects commandObjects, ClusterPipelineExecutor executorService) { + super(commandObjects); + clusterPipelineExecutor = executorService; + useSharedExecutor = clusterPipelineExecutor != null; + pipelinedResponses = new LinkedHashMap<>(); + connections = new LinkedHashMap<>(); + } + protected abstract HostAndPort getNodeKey(CommandArguments args); protected abstract Connection getConnection(HostAndPort nodeKey); @@ -84,44 +97,48 @@ public final void sync() { return; } syncing = true; - - ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); - - CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size()); - Iterator>>> pipelinedResponsesIterator - = pipelinedResponses.entrySet().iterator(); - while (pipelinedResponsesIterator.hasNext()) { - Map.Entry>> entry = pipelinedResponsesIterator.next(); - HostAndPort nodeKey = entry.getKey(); - Queue> queue = entry.getValue(); - Connection connection = connections.get(nodeKey); - executorService.submit(() -> { - try { - List unformatted = connection.getMany(queue.size()); - for (Object o : unformatted) { - queue.poll().set(o); - } - } catch (JedisConnectionException jce) { - log.error("Error with connection to " + nodeKey, jce); - // cleanup the connection - pipelinedResponsesIterator.remove(); - connections.remove(nodeKey); - IOUtils.closeQuietly(connection); - } finally { - countDownLatch.countDown(); - } - }); - } - + ClusterPipelineExecutor executorService = getExecutorService(); + CompletableFuture[] futures + = pipelinedResponses.entrySet().stream() + .map(response -> CompletableFuture.runAsync(() -> readCommandResponse(response), executorService)) + .toArray(CompletableFuture[]::new); + CompletableFuture awaitAllCompleted = CompletableFuture.allOf(futures); try { - countDownLatch.await(); + awaitAllCompleted.get(); + if (!useSharedExecutor) { + executorService.shutdown(); + } + } catch (ExecutionException e) { + log.error("Failed execution.", e); } catch (InterruptedException e) { log.error("Thread is interrupted during sync.", e); + Thread.currentThread().interrupt(); } + syncing = false; + } - executorService.shutdownNow(); + private ClusterPipelineExecutor getExecutorService() { + if (useSharedExecutor) { + return clusterPipelineExecutor; + } + return ClusterPipelineExecutor.from( + Executors.newFixedThreadPool(Math.min(this.pipelinedResponses.size(), MULTI_NODE_PIPELINE_SYNC_WORKERS))); + } - syncing = false; + private void readCommandResponse(Map.Entry>> entry) { + HostAndPort nodeKey = entry.getKey(); + Queue> queue = entry.getValue(); + Connection connection = connections.get(nodeKey); + try { + List unformatted = connection.getMany(queue.size()); + for (Object o : unformatted) { + queue.poll().set(o); + } + } catch (JedisConnectionException jce) { + log.error("Error with connection to " + nodeKey, jce); + connections.remove(nodeKey); + IOUtils.closeQuietly(connection); + } } @Deprecated diff --git a/src/main/java/redis/clients/jedis/PipelineExecutorProvider.java b/src/main/java/redis/clients/jedis/PipelineExecutorProvider.java new file mode 100644 index 0000000000..2ee4733e23 --- /dev/null +++ b/src/main/java/redis/clients/jedis/PipelineExecutorProvider.java @@ -0,0 +1,44 @@ +package redis.clients.jedis; + +import java.util.Optional; +import java.util.concurrent.Executors; + +/** + * This provides a {@link ClusterPipelineExecutor} used for parallel syncing of {@link MultiNodePipelineBase} + */ +public class PipelineExecutorProvider { + + static final PipelineExecutorProvider DEFAULT = new PipelineExecutorProvider(); + + private ClusterPipelineExecutor clusterPipelineExecutor; + + /** + * Default constructor providing an empty {@link Optional} of {@link ClusterPipelineExecutor} + */ + private PipelineExecutorProvider() {} + + /** + * Will provide a {@link ClusterPipelineExecutor} with the specified number of thread. The number of thread + * should be equal or higher than the number of master nodes in the cluster. + * + * @param threadCount + */ + public PipelineExecutorProvider(int threadCount) { + this.clusterPipelineExecutor = ClusterPipelineExecutor.from(Executors.newFixedThreadPool(threadCount));; + } + + /** + * Allow clients to provide their own implementation of {@link ClusterPipelineExecutor} + * @param clusterPipelineExecutor + */ + public PipelineExecutorProvider(ClusterPipelineExecutor clusterPipelineExecutor) { + this.clusterPipelineExecutor = clusterPipelineExecutor; + } + + /** + * @return an empty option by default, otherwise will return the configured value. + */ + Optional getClusteredPipelineExecutor() { + return Optional.ofNullable(clusterPipelineExecutor); + } +} diff --git a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java index e186702e81..c4273f3f2f 100644 --- a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java +++ b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java @@ -6,6 +6,9 @@ import static redis.clients.jedis.Protocol.CLUSTER_HASHSLOTS; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; @@ -1081,6 +1084,32 @@ public void transaction() { } } + @Test(timeout = 10_000L) + public void pipelineMergingWithExecutorService() { + final int maxTotal = 100; + ExecutorService executorService = Executors.newFixedThreadPool(10); + PipelineExecutorProvider pipelineExecutorProvider + = new PipelineExecutorProvider(ClusterPipelineExecutor.from(executorService)); + JedisClientConfig jedisClientConfig = DefaultJedisClientConfig.builder() + .pipelineExecutorProvider(pipelineExecutorProvider) + .password("cluster").build(); + ConnectionPoolConfig poolConfig = new ConnectionPoolConfig(); + poolConfig.setMaxTotal(maxTotal); + try (JedisCluster cluster = new JedisCluster(nodes, jedisClientConfig, 5, poolConfig)) { + ClusterPipeline pipeline = cluster.pipelined(); + for (int i = 0; i < maxTotal; i++) { + String s = Integer.toString(i); + pipeline.set(s, s); + } + pipeline.close(); + // The sync results in one pipeline per node needing closing. + assertEquals(nodes.size(), ((ThreadPoolExecutor) executorService).getTaskCount()); + assertFalse(executorService.isShutdown()); + } finally { + executorService.shutdown(); + } + } + @Test(timeout = 10_000L) public void multiple() { final int maxTotal = 100;