From c196d7afad326c2c411faadc27b4e4ed7b84fddb Mon Sep 17 00:00:00 2001 From: voidmain Date: Fri, 17 Jan 2025 13:28:42 -0800 Subject: [PATCH 1/2] Adds ExecutorService to ClusterPipeline for improved thread management This allows passing an ExecutorService when creating a ClusterPipeline. The previous parallelization approach for pipeline syncing/closing would create a new executor service for each sync operation, resulting in excessive thread creation and termination. On an EC2 m5.12xlarge instance with ~100k single writes/sec, this thread creation consumed 40% CPU and increased operation latency. The change also optimizes thread usage when no ExecutorService is provided. Previously, even a single pipeline within a multipipeline would create 3 threads for syncing. This improvement removes that overhead, though callers are encouraged to provide their own ExecutorService for optimal CPU usage and latency. --- .../redis/clients/jedis/ClusterPipeline.java | 8 ++ .../redis/clients/jedis/JedisCluster.java | 5 ++ .../clients/jedis/MultiNodePipelineBase.java | 79 +++++++++++-------- .../clients/jedis/ClusterPipeliningTest.java | 24 ++++++ 4 files changed, 82 insertions(+), 34 deletions(-) diff --git a/src/main/java/redis/clients/jedis/ClusterPipeline.java b/src/main/java/redis/clients/jedis/ClusterPipeline.java index a122b41688..74bcae17b6 100644 --- a/src/main/java/redis/clients/jedis/ClusterPipeline.java +++ b/src/main/java/redis/clients/jedis/ClusterPipeline.java @@ -2,6 +2,8 @@ import java.time.Duration; import java.util.Set; +import java.util.concurrent.ExecutorService; + import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.providers.ClusterConnectionProvider; import redis.clients.jedis.util.IOUtils; @@ -40,6 +42,12 @@ public ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects this.provider = provider; } + public ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects commandObjects, + ExecutorService executorService) { + super(commandObjects, executorService); + this.provider = provider; + } + private static ClusterCommandObjects createClusterCommandObjects(RedisProtocol protocol) { ClusterCommandObjects cco = new ClusterCommandObjects(); if (protocol == RedisProtocol.RESP3) cco.setProtocol(protocol); diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java index d4c555230c..f518303a01 100644 --- a/src/main/java/redis/clients/jedis/JedisCluster.java +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -4,6 +4,7 @@ import java.util.Collections; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; @@ -379,6 +380,10 @@ public ClusterPipeline pipelined() { return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects); } + public ClusterPipeline pipelined(ExecutorService executorService) { + return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects, executorService); + } + /** * @param doMulti param * @return nothing diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index 13f2730ab4..83f426478d 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -1,12 +1,12 @@ package redis.clients.jedis; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -31,6 +31,7 @@ public abstract class MultiNodePipelineBase extends PipelineBase { private final Map>> pipelinedResponses; private final Map connections; + private ExecutorService executorService; private volatile boolean syncing = false; public MultiNodePipelineBase(CommandObjects commandObjects) { @@ -39,6 +40,13 @@ public MultiNodePipelineBase(CommandObjects commandObjects) { connections = new LinkedHashMap<>(); } + public MultiNodePipelineBase(CommandObjects commandObjects, ExecutorService executorService) { + super(commandObjects); + this.executorService = executorService; + pipelinedResponses = new LinkedHashMap<>(); + connections = new LinkedHashMap<>(); + } + /** * Sub-classes must call this method, if graph commands are going to be used. * @param connectionProvider connection provider @@ -96,44 +104,47 @@ public final void sync() { return; } syncing = true; - - ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); - - CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size()); - Iterator>>> pipelinedResponsesIterator - = pipelinedResponses.entrySet().iterator(); - while (pipelinedResponsesIterator.hasNext()) { - Map.Entry>> entry = pipelinedResponsesIterator.next(); - HostAndPort nodeKey = entry.getKey(); - Queue> queue = entry.getValue(); - Connection connection = connections.get(nodeKey); - executorService.submit(() -> { - try { - List unformatted = connection.getMany(queue.size()); - for (Object o : unformatted) { - queue.poll().set(o); - } - } catch (JedisConnectionException jce) { - log.error("Error with connection to " + nodeKey, jce); - // cleanup the connection - pipelinedResponsesIterator.remove(); - connections.remove(nodeKey); - IOUtils.closeQuietly(connection); - } finally { - countDownLatch.countDown(); - } - }); - } - + ExecutorService executorService = getExecutorService(); + CompletableFuture[] futures + = pipelinedResponses.entrySet().stream() + .map(e -> CompletableFuture.runAsync(() -> closeConnection(e), executorService)) + .toArray(CompletableFuture[]::new); + CompletableFuture awaitAllCompleted = CompletableFuture.allOf(futures); try { - countDownLatch.await(); + awaitAllCompleted.get(); + if (executorService != this.executorService) { + executorService.shutdown(); + } + } catch (ExecutionException e) { + log.error("Failed execution.", e); } catch (InterruptedException e) { log.error("Thread is interrupted during sync.", e); + Thread.currentThread().interrupt(); } + syncing = false; + } - executorService.shutdownNow(); + private ExecutorService getExecutorService() { + if (executorService == null) { + return Executors.newFixedThreadPool(Math.min(this.pipelinedResponses.size(), MULTI_NODE_PIPELINE_SYNC_WORKERS)); + } + return executorService; + } - syncing = false; + private void closeConnection(Map.Entry>> entry) { + HostAndPort nodeKey = entry.getKey(); + Queue> queue = entry.getValue(); + Connection connection = connections.get(nodeKey); + try { + List unformatted = connection.getMany(queue.size()); + for (Object o : unformatted) { + queue.poll().set(o); + } + } catch (JedisConnectionException jce) { + log.error("Error with connection to " + nodeKey, jce); + connections.remove(nodeKey); + IOUtils.closeQuietly(connection); + } } @Deprecated diff --git a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java index e186702e81..5e328342f5 100644 --- a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java +++ b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java @@ -6,6 +6,9 @@ import static redis.clients.jedis.Protocol.CLUSTER_HASHSLOTS; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; @@ -1081,6 +1084,27 @@ public void transaction() { } } + @Test(timeout = 10_000L) + public void pipelineMergingWithExecutorService() { + final int maxTotal = 100; + ConnectionPoolConfig poolConfig = new ConnectionPoolConfig(); + poolConfig.setMaxTotal(maxTotal); + ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); + try (JedisCluster cluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG, 5, poolConfig)) { + ClusterPipeline pipeline = cluster.pipelined(executorService); + for (int i = 0; i < maxTotal; i++) { + String s = Integer.toString(i); + pipeline.set(s, s); + } + pipeline.close(); + // The sync results in one pipeline per node needing closing. + assertEquals(nodes.size(), executorService.getTaskCount()); + assertFalse(executorService.isShutdown()); + } finally { + executorService.shutdown(); + } + } + @Test(timeout = 10_000L) public void multiple() { final int maxTotal = 100; From 5d3398f336e3120cde494a76cf7836c2f3242c9a Mon Sep 17 00:00:00 2001 From: Dominique Lavoie Date: Sat, 22 Feb 2025 15:15:30 -0800 Subject: [PATCH 2/2] Moves MutltiNodePipeline Executor to JedisClientConfig This allow the configuration of ClusterPipelineExecutor to sync pipeline in parallel. The default implementation remain problematic. This new approach will allow clients to address the performance issue fo the default approach. --- .../redis/clients/jedis/ClusterPipeline.java | 3 +- .../jedis/ClusterPipelineExecutor.java | 43 ++++++++++++++++++ .../jedis/DefaultJedisClientConfig.java | 16 +++++++ .../clients/jedis/JedisClientConfig.java | 11 +++++ .../redis/clients/jedis/JedisCluster.java | 29 +++++++++--- .../clients/jedis/MultiNodePipelineBase.java | 41 ++++++++--------- .../jedis/PipelineExecutorProvider.java | 44 +++++++++++++++++++ .../clients/jedis/ClusterPipeliningTest.java | 13 ++++-- 8 files changed, 166 insertions(+), 34 deletions(-) create mode 100644 src/main/java/redis/clients/jedis/ClusterPipelineExecutor.java create mode 100644 src/main/java/redis/clients/jedis/PipelineExecutorProvider.java diff --git a/src/main/java/redis/clients/jedis/ClusterPipeline.java b/src/main/java/redis/clients/jedis/ClusterPipeline.java index e525ad98eb..404f09ce9b 100644 --- a/src/main/java/redis/clients/jedis/ClusterPipeline.java +++ b/src/main/java/redis/clients/jedis/ClusterPipeline.java @@ -2,7 +2,6 @@ import java.time.Duration; import java.util.Set; -import java.util.concurrent.ExecutorService; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.providers.ClusterConnectionProvider; @@ -43,7 +42,7 @@ public ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects } public ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects commandObjects, - ExecutorService executorService) { + ClusterPipelineExecutor executorService) { super(commandObjects, executorService); this.provider = provider; } diff --git a/src/main/java/redis/clients/jedis/ClusterPipelineExecutor.java b/src/main/java/redis/clients/jedis/ClusterPipelineExecutor.java new file mode 100644 index 0000000000..d38fb1d30d --- /dev/null +++ b/src/main/java/redis/clients/jedis/ClusterPipelineExecutor.java @@ -0,0 +1,43 @@ +package redis.clients.jedis; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; + +/** + * Executor used for parallel syncing of multinode pipeline when provided in + * {@link DefaultJedisClientConfig.Builder#pipelineExecutorProvider(PipelineExecutorProvider)} + */ +public interface ClusterPipelineExecutor extends Executor, AutoCloseable { + + /** + * To avoid following hte {@link JedisCluster} client lifecycle in shutting down the executor service + * provide your own implementation of this interface to {@link PipelineExecutorProvider} + */ + default void shutdown() {} + + default void close() { + shutdown(); + } + + /** + * Wrap an executor service into a {@link ClusterPipelineExecutor} to allow clients to provide their + * desired implementation of the {@link ExecutorService} to support parallel syncing of {@link MultiNodePipelineBase}. + * + * @param executorService + * @return ClusterPipelineExecutor that will be shutdown alongside the {@link JedisCluster} client. + */ + static ClusterPipelineExecutor from(ExecutorService executorService) { + return new ClusterPipelineExecutor() { + @Override + public void execute(Runnable command) { + executorService.execute(command); + } + + @Override + public void shutdown() { + executorService.shutdown(); + } + }; + } + +} diff --git a/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java b/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java index 25a4737ec0..ac0e18bdb2 100644 --- a/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java +++ b/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java @@ -33,6 +33,8 @@ public final class DefaultJedisClientConfig implements JedisClientConfig { private final AuthXManager authXManager; + private final PipelineExecutorProvider pipelineExecutorProvider; + private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) { this.redisProtocol = builder.redisProtocol; this.connectionTimeoutMillis = builder.connectionTimeoutMillis; @@ -50,6 +52,7 @@ private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) { this.clientSetInfoConfig = builder.clientSetInfoConfig; this.readOnlyForRedisClusterReplicas = builder.readOnlyForRedisClusterReplicas; this.authXManager = builder.authXManager; + this.pipelineExecutorProvider = builder.pipelineExecutorProvider; } @Override @@ -143,6 +146,11 @@ public boolean isReadOnlyForRedisClusterReplicas() { return readOnlyForRedisClusterReplicas; } + @Override + public PipelineExecutorProvider getPipelineExecutorProvider() { + return pipelineExecutorProvider; + } + public static Builder builder() { return new Builder(); } @@ -175,6 +183,8 @@ public static class Builder { private AuthXManager authXManager = null; + private PipelineExecutorProvider pipelineExecutorProvider = PipelineExecutorProvider.DEFAULT; + private Builder() { } @@ -297,6 +307,11 @@ public Builder authXManager(AuthXManager authXManager) { return this; } + public Builder pipelineExecutorProvider(PipelineExecutorProvider pipelineExecutorProvider) { + this.pipelineExecutorProvider = pipelineExecutorProvider; + return this; + } + public Builder from(JedisClientConfig instance) { this.redisProtocol = instance.getRedisProtocol(); this.connectionTimeoutMillis = instance.getConnectionTimeoutMillis(); @@ -314,6 +329,7 @@ public Builder from(JedisClientConfig instance) { this.clientSetInfoConfig = instance.getClientSetInfoConfig(); this.readOnlyForRedisClusterReplicas = instance.isReadOnlyForRedisClusterReplicas(); this.authXManager = instance.getAuthXManager(); + this.pipelineExecutorProvider = instance.getPipelineExecutorProvider(); return this; } } diff --git a/src/main/java/redis/clients/jedis/JedisClientConfig.java b/src/main/java/redis/clients/jedis/JedisClientConfig.java index ce7fd82de4..1046086de8 100644 --- a/src/main/java/redis/clients/jedis/JedisClientConfig.java +++ b/src/main/java/redis/clients/jedis/JedisClientConfig.java @@ -115,4 +115,15 @@ default boolean isReadOnlyForRedisClusterReplicas() { default ClientSetInfoConfig getClientSetInfoConfig() { return ClientSetInfoConfig.DEFAULT; } + + /** + * If different then DEFAULT this will provide an Executor implementation that will sync/close multi node pipelines + * in parallel. This replaces the deprecated internal usage of new Executor Services for every pipeline, resulting in + * high thread creation rates and impact on latency. + * + * @return PipelineExecutorProvider + */ + default PipelineExecutorProvider getPipelineExecutorProvider() { + return PipelineExecutorProvider.DEFAULT; + } } diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java index f518303a01..4f74ab260c 100644 --- a/src/main/java/redis/clients/jedis/JedisCluster.java +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -4,7 +4,6 @@ import java.util.Collections; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutorService; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; @@ -14,6 +13,7 @@ import redis.clients.jedis.csc.Cache; import redis.clients.jedis.csc.CacheConfig; import redis.clients.jedis.csc.CacheFactory; +import redis.clients.jedis.util.IOUtils; import redis.clients.jedis.util.JedisClusterCRC16; public class JedisCluster extends UnifiedJedis { @@ -30,6 +30,12 @@ public class JedisCluster extends UnifiedJedis { */ public static final int DEFAULT_MAX_ATTEMPTS = 5; + /** + * Executor used to close MultiNodePipeline in parallel. See {@link JedisClientConfig#getPipelineExecutorProvider()} + * for mor details on configuration. + */ + private ClusterPipelineExecutor clusterPipelineExecutor; + /** * Creates a JedisCluster instance. The provided node is used to make the first contact with the cluster. *

@@ -252,6 +258,8 @@ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfi Duration maxTotalRetriesDuration) { this(new ClusterConnectionProvider(clusterNodes, clientConfig), maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol()); + clientConfig.getPipelineExecutorProvider() + .getClusteredPipelineExecutor().ifPresent((executor) -> this.clusterPipelineExecutor = executor); } public JedisCluster(Set clusterNodes, JedisClientConfig clientConfig, @@ -269,6 +277,8 @@ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfi Duration maxTotalRetriesDuration, GenericObjectPoolConfig poolConfig) { this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig), maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol()); + clientConfig.getPipelineExecutorProvider() + .getClusteredPipelineExecutor().ifPresent((executor) -> this.clusterPipelineExecutor = executor); } public JedisCluster(Set clusterNodes, JedisClientConfig clientConfig, @@ -276,6 +286,8 @@ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfi Duration maxTotalRetriesDuration) { this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod), maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol()); + clientConfig.getPipelineExecutorProvider() + .getClusteredPipelineExecutor().ifPresent((executor) -> this.clusterPipelineExecutor = executor); } // Uses a fetched connection to process protocol. Should be avoided if possible. @@ -335,6 +347,12 @@ private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Durati super(provider, maxAttempts, maxTotalRetriesDuration, protocol, clientSideCache); } + @Override + public void close() { + super.close(); + IOUtils.closeQuietly(this.clusterPipelineExecutor); + } + /** * Returns all nodes that were configured to connect to in key-value pairs ({@link Map}).
* Key is the HOST:PORT and the value is the connection pool. @@ -377,12 +395,13 @@ public void ssubscribe(BinaryJedisShardedPubSub jedisPubSub, final byte[]... cha @Override public ClusterPipeline pipelined() { - return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects); + if (clusterPipelineExecutor == null) { + return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects); + } + return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects, + clusterPipelineExecutor); } - public ClusterPipeline pipelined(ExecutorService executorService) { - return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects, executorService); - } /** * @param doMulti param diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index 18e002ab4d..86044a240d 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -7,7 +7,6 @@ import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.slf4j.Logger; @@ -24,12 +23,17 @@ public abstract class MultiNodePipelineBase extends PipelineBase { * The number of processes for {@code sync()}. If you have enough cores for client (and you have * more than 3 cluster nodes), you may increase this number of workers. * Suggestion: ≤ cluster nodes. + * + * @deprecated Client using this approach are paying the thread creation cost for every pipeline sync. Clients + * should use refer to {@link JedisClientConfig#getPipelineExecutorProvider()} to provide a single Executor for + * gain in performance. */ public static volatile int MULTI_NODE_PIPELINE_SYNC_WORKERS = 3; private final Map>> pipelinedResponses; private final Map connections; - private ExecutorService executorService; + private ClusterPipelineExecutor clusterPipelineExecutor; + private boolean useSharedExecutor = false; private volatile boolean syncing = false; public MultiNodePipelineBase(CommandObjects commandObjects) { @@ -38,24 +42,14 @@ public MultiNodePipelineBase(CommandObjects commandObjects) { connections = new LinkedHashMap<>(); } - - public MultiNodePipelineBase(CommandObjects commandObjects, ExecutorService executorService) { + public MultiNodePipelineBase(CommandObjects commandObjects, ClusterPipelineExecutor executorService) { super(commandObjects); - this.executorService = executorService; + clusterPipelineExecutor = executorService; + useSharedExecutor = clusterPipelineExecutor != null; pipelinedResponses = new LinkedHashMap<>(); connections = new LinkedHashMap<>(); } - /** - * Sub-classes must call this method, if graph commands are going to be used. - * @param connectionProvider connection provider - */ - protected final void prepareGraphCommands(ConnectionProvider connectionProvider) { - GraphCommandObjects graphCommandObjects = new GraphCommandObjects(connectionProvider); - graphCommandObjects.setBaseCommandArgumentsCreator((comm) -> this.commandObjects.commandArguments(comm)); - super.setGraphCommands(graphCommandObjects); - } - protected abstract HostAndPort getNodeKey(CommandArguments args); protected abstract Connection getConnection(HostAndPort nodeKey); @@ -103,15 +97,15 @@ public final void sync() { return; } syncing = true; - ExecutorService executorService = getExecutorService(); + ClusterPipelineExecutor executorService = getExecutorService(); CompletableFuture[] futures = pipelinedResponses.entrySet().stream() - .map(e -> CompletableFuture.runAsync(() -> closeConnection(e), executorService)) + .map(response -> CompletableFuture.runAsync(() -> readCommandResponse(response), executorService)) .toArray(CompletableFuture[]::new); CompletableFuture awaitAllCompleted = CompletableFuture.allOf(futures); try { awaitAllCompleted.get(); - if (executorService != this.executorService) { + if (!useSharedExecutor) { executorService.shutdown(); } } catch (ExecutionException e) { @@ -123,14 +117,15 @@ public final void sync() { syncing = false; } - private ExecutorService getExecutorService() { - if (executorService == null) { - return Executors.newFixedThreadPool(Math.min(this.pipelinedResponses.size(), MULTI_NODE_PIPELINE_SYNC_WORKERS)); + private ClusterPipelineExecutor getExecutorService() { + if (useSharedExecutor) { + return clusterPipelineExecutor; } - return executorService; + return ClusterPipelineExecutor.from( + Executors.newFixedThreadPool(Math.min(this.pipelinedResponses.size(), MULTI_NODE_PIPELINE_SYNC_WORKERS))); } - private void closeConnection(Map.Entry>> entry) { + private void readCommandResponse(Map.Entry>> entry) { HostAndPort nodeKey = entry.getKey(); Queue> queue = entry.getValue(); Connection connection = connections.get(nodeKey); diff --git a/src/main/java/redis/clients/jedis/PipelineExecutorProvider.java b/src/main/java/redis/clients/jedis/PipelineExecutorProvider.java new file mode 100644 index 0000000000..2ee4733e23 --- /dev/null +++ b/src/main/java/redis/clients/jedis/PipelineExecutorProvider.java @@ -0,0 +1,44 @@ +package redis.clients.jedis; + +import java.util.Optional; +import java.util.concurrent.Executors; + +/** + * This provides a {@link ClusterPipelineExecutor} used for parallel syncing of {@link MultiNodePipelineBase} + */ +public class PipelineExecutorProvider { + + static final PipelineExecutorProvider DEFAULT = new PipelineExecutorProvider(); + + private ClusterPipelineExecutor clusterPipelineExecutor; + + /** + * Default constructor providing an empty {@link Optional} of {@link ClusterPipelineExecutor} + */ + private PipelineExecutorProvider() {} + + /** + * Will provide a {@link ClusterPipelineExecutor} with the specified number of thread. The number of thread + * should be equal or higher than the number of master nodes in the cluster. + * + * @param threadCount + */ + public PipelineExecutorProvider(int threadCount) { + this.clusterPipelineExecutor = ClusterPipelineExecutor.from(Executors.newFixedThreadPool(threadCount));; + } + + /** + * Allow clients to provide their own implementation of {@link ClusterPipelineExecutor} + * @param clusterPipelineExecutor + */ + public PipelineExecutorProvider(ClusterPipelineExecutor clusterPipelineExecutor) { + this.clusterPipelineExecutor = clusterPipelineExecutor; + } + + /** + * @return an empty option by default, otherwise will return the configured value. + */ + Optional getClusteredPipelineExecutor() { + return Optional.ofNullable(clusterPipelineExecutor); + } +} diff --git a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java index 5e328342f5..c4273f3f2f 100644 --- a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java +++ b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java @@ -1087,18 +1087,23 @@ public void transaction() { @Test(timeout = 10_000L) public void pipelineMergingWithExecutorService() { final int maxTotal = 100; + ExecutorService executorService = Executors.newFixedThreadPool(10); + PipelineExecutorProvider pipelineExecutorProvider + = new PipelineExecutorProvider(ClusterPipelineExecutor.from(executorService)); + JedisClientConfig jedisClientConfig = DefaultJedisClientConfig.builder() + .pipelineExecutorProvider(pipelineExecutorProvider) + .password("cluster").build(); ConnectionPoolConfig poolConfig = new ConnectionPoolConfig(); poolConfig.setMaxTotal(maxTotal); - ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); - try (JedisCluster cluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG, 5, poolConfig)) { - ClusterPipeline pipeline = cluster.pipelined(executorService); + try (JedisCluster cluster = new JedisCluster(nodes, jedisClientConfig, 5, poolConfig)) { + ClusterPipeline pipeline = cluster.pipelined(); for (int i = 0; i < maxTotal; i++) { String s = Integer.toString(i); pipeline.set(s, s); } pipeline.close(); // The sync results in one pipeline per node needing closing. - assertEquals(nodes.size(), executorService.getTaskCount()); + assertEquals(nodes.size(), ((ThreadPoolExecutor) executorService).getTaskCount()); assertFalse(executorService.isShutdown()); } finally { executorService.shutdown();