From 4916bd304e5d2267e715e8cd23a7c1559b0373de Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Wed, 14 Jun 2023 13:47:00 +0600 Subject: [PATCH] Shutdown ExecutorService(s) in multi node pipelines (#3467) * Shutdown ExecutorService(s) in multi node pipelines * Use only shutdownNow() * format import --- .../clients/jedis/MultiNodePipelineBase.java | 6 ++-- .../clients/jedis/ClusterPipeliningTest.java | 36 +++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index 7ed97d8ca9..9dc9a6a689 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -58,8 +58,6 @@ public abstract class MultiNodePipelineBase implements PipelineCommands, Pipelin private final CommandObjects commandObjects; private GraphCommandObjects graphCommandObjects; - private final ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); - public MultiNodePipelineBase(CommandObjects commandObjects) { pipelinedResponses = new LinkedHashMap<>(); connections = new LinkedHashMap<>(); @@ -121,6 +119,8 @@ public final void sync() { } syncing = true; + ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); + CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size()); Iterator>>> pipelinedResponsesIterator = pipelinedResponses.entrySet().iterator(); @@ -153,6 +153,8 @@ public final void sync() { log.error("Thread is interrupted during sync.", e); } + executorService.shutdownNow(); + syncing = false; } diff --git a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java index 21c55ad5d3..5e2b600801 100644 --- a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java +++ b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java @@ -1051,4 +1051,40 @@ public void transaction() { assertThrows(UnsupportedOperationException.class, () -> cluster.multi()); } } + + @Test(timeout = 10_000L) + public void multiple() { + final int maxTotal = 100; + ConnectionPoolConfig poolConfig = new ConnectionPoolConfig(); + poolConfig.setMaxTotal(maxTotal); + try (JedisCluster cluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG, 5, poolConfig)) { + for (int i = 0; i < maxTotal; i++) { + assertThreadsCount(); + String s = Integer.toString(i); + try (ClusterPipeline pipeline = cluster.pipelined()) { + pipeline.set(s, s); + pipeline.sync(); + } + assertThreadsCount(); + } + } + } + + private static void assertThreadsCount() { + // Get the root thread group + final ThreadGroup rootGroup = Thread.currentThread().getThreadGroup().getParent(); + + // Create a buffer to store the thread information + final Thread[] threads = new Thread[rootGroup.activeCount()]; + + // Enumerate all threads into the buffer + rootGroup.enumerate(threads); + + // Assert information about threads + final int count = (int) Arrays.stream(threads) + .filter(thread -> thread != null && thread.getName() != null + && thread.getName().startsWith("pool-")) + .count(); + assertTrue(count < 9); + } }