From 4a8b9e7bd0a6e3c3bc1b23ce7ed0133b229dbf93 Mon Sep 17 00:00:00 2001 From: ren ran <35724820+stillerrr@users.noreply.github.com> Date: Wed, 22 Mar 2023 21:47:38 +0800 Subject: [PATCH] Improve safety of MultiNodePipelineBase under multi-thread (#3330) * Make sure MultiNodePipelineBase is safe under multi-thread * Edit log message --------- Co-authored-by: Stiller Co-authored-by: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> --- .../clients/jedis/MultiNodePipelineBase.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index dfada93da0..8f9f90fae9 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -73,10 +73,16 @@ protected final Response appendCommand(CommandObject commandObject) { queue = pipelinedResponses.get(nodeKey); connection = connections.get(nodeKey); } else { - queue = new LinkedList<>(); - connection = getConnection(nodeKey); - pipelinedResponses.put(nodeKey, queue); - connections.put(nodeKey, connection); + pipelinedResponses.putIfAbsent(nodeKey, new LinkedList<>()); + queue = pipelinedResponses.get(nodeKey); + + Connection newOne = getConnection(nodeKey); + connections.putIfAbsent(nodeKey, newOne); + connection = connections.get(nodeKey); + if (connection != newOne) { + log.debug("Duplicate connection to {}, closing it.", nodeKey); + IOUtils.closeQuietly(newOne); + } } connection.sendCommand(commandObject.getArguments()); @@ -1005,7 +1011,7 @@ public Response zscore(String key, String member) { } @Override - public Response> zmscore(String key, String... members) { + public Response> zmscore(String key, String... members) { return appendCommand(commandObjects.zmscore(key, members)); }