diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index fae16a8910..eef6b2a810 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -38,8 +38,6 @@ public abstract class MultiNodePipelineBase extends PipelineBase private final Map connections; private volatile boolean syncing = false; - private final ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); - public MultiNodePipelineBase(CommandObjects commandObjects) { super(commandObjects); pipelinedResponses = new LinkedHashMap<>(); @@ -104,6 +102,8 @@ public final void sync() { } syncing = true; + ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); + CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size()); Iterator>>> pipelinedResponsesIterator = pipelinedResponses.entrySet().iterator(); @@ -136,6 +136,8 @@ public final void sync() { log.error("Thread is interrupted during sync.", e); } + executorService.shutdownNow(); + syncing = false; } diff --git a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java index 05cd56c2a8..268728b8aa 100644 --- a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java +++ b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java @@ -1061,4 +1061,40 @@ public void transaction() { assertThrows(UnsupportedOperationException.class, () -> cluster.multi()); } } + + @Test(timeout = 10_000L) + public void multiple() { + final int maxTotal = 100; + ConnectionPoolConfig poolConfig = new ConnectionPoolConfig(); + poolConfig.setMaxTotal(maxTotal); + try (JedisCluster cluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG, 5, poolConfig)) { + for (int i = 0; i < maxTotal; i++) { + assertThreadsCount(); + String s = Integer.toString(i); + try (ClusterPipeline pipeline = cluster.pipelined()) { + pipeline.set(s, s); + pipeline.sync(); + } + assertThreadsCount(); + } + } + } + + private static void assertThreadsCount() { + // Get the root thread group + final ThreadGroup rootGroup = Thread.currentThread().getThreadGroup().getParent(); + + // Create a buffer to store the thread information + final Thread[] threads = new Thread[rootGroup.activeCount()]; + + // Enumerate all threads into the buffer + rootGroup.enumerate(threads); + + // Assert information about threads + final int count = (int) Arrays.stream(threads) + .filter(thread -> thread != null && thread.getName() != null + && thread.getName().startsWith("pool-")) + .count(); + assertTrue(count < 9); + } }