diff --git a/src/main/java/redis/clients/jedis/ClusterPipeline.java b/src/main/java/redis/clients/jedis/ClusterPipeline.java index e70db767f96..e7396b7e46b 100644 --- a/src/main/java/redis/clients/jedis/ClusterPipeline.java +++ b/src/main/java/redis/clients/jedis/ClusterPipeline.java @@ -1,5 +1,6 @@ package redis.clients.jedis; +import java.time.Duration; import java.util.Set; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.providers.ClusterConnectionProvider; @@ -23,6 +24,14 @@ public ClusterPipeline(Set clusterNodes, JedisClientConfig clientCo this.closeable = this.provider; } + public ClusterPipeline(Set clusterNodes, JedisClientConfig clientConfig, + GenericObjectPoolConfig poolConfig, boolean topologyRefreshEnabled, Duration topologyRefreshPeriod) { + this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, + topologyRefreshEnabled, topologyRefreshPeriod), + createClusterCommandObjects(clientConfig.getRedisProtocol())); + this.closeable = this.provider; + } + public ClusterPipeline(ClusterConnectionProvider provider) { this(provider, new ClusterCommandObjects()); } diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java index e9bb606191a..8f79ccb810f 100644 --- a/src/main/java/redis/clients/jedis/JedisCluster.java +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -195,6 +195,13 @@ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfi super(clusterNodes, clientConfig, maxAttempts, maxTotalRetriesDuration); } + public JedisCluster(Set clusterNodes, JedisClientConfig clientConfig, + GenericObjectPoolConfig poolConfig, int maxAttempts, Duration maxTotalRetriesDuration, + boolean topologyRefreshEnabled, Duration topologyRefreshPeriod) { + super(clusterNodes, clientConfig, poolConfig, maxAttempts, maxTotalRetriesDuration, topologyRefreshEnabled, + topologyRefreshPeriod); + } + public JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) { super(provider, maxAttempts, maxTotalRetriesDuration); diff --git a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java index a4cc2d1d63e..1ec498ea32c 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java +++ b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java @@ -1,5 +1,6 @@ package redis.clients.jedis; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -10,16 +11,24 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import redis.clients.jedis.exceptions.JedisClusterOperationException; import redis.clients.jedis.exceptions.JedisException; import redis.clients.jedis.util.SafeEncoder; public class JedisClusterInfoCache { + private static final Logger logger = LoggerFactory.getLogger(JedisClusterInfoCache.class); + private static final boolean DEFAULT_TOPOLOGY_REFRESH_ENABLED = true; + private static final Duration DEFAULT_TOPOLOGY_REFRESH_PERIOD = Duration.ofSeconds(60); private final Map nodes = new HashMap<>(); private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS]; @@ -36,15 +45,47 @@ public class JedisClusterInfoCache { private static final int MASTER_NODE_INDEX = 2; + private final boolean topologyRefreshEnabled; + private final Duration topologyRefreshPeriod; + + /** + * The single thread executor for the topology refresh task. + */ + private ScheduledExecutorService topologyRefreshExecutor = null; + + class TopologyRefreshTask implements Runnable { + @Override + public void run() { + logger.debug("Cluster topology refresh run, old nodes: {}", nodes); + renewClusterSlots(null); + logger.debug("Cluster topology refresh run, new nodes: {}", nodes); + } + } + public JedisClusterInfoCache(final JedisClientConfig clientConfig, final Set startNodes) { - this(clientConfig, null, startNodes); + this(clientConfig, null, startNodes, DEFAULT_TOPOLOGY_REFRESH_ENABLED, DEFAULT_TOPOLOGY_REFRESH_PERIOD); } public JedisClusterInfoCache(final JedisClientConfig clientConfig, final GenericObjectPoolConfig poolConfig, final Set startNodes) { + this(clientConfig, poolConfig, startNodes, DEFAULT_TOPOLOGY_REFRESH_ENABLED, DEFAULT_TOPOLOGY_REFRESH_PERIOD); + } + + public JedisClusterInfoCache(final JedisClientConfig clientConfig, + final GenericObjectPoolConfig poolConfig, final Set startNodes, + final boolean topologyRefreshEnabled, final Duration topologyRefreshPeriod) { this.poolConfig = poolConfig; this.clientConfig = clientConfig; this.startNodes = startNodes; + this.topologyRefreshEnabled = topologyRefreshEnabled; + this.topologyRefreshPeriod = topologyRefreshPeriod; + if (topologyRefreshEnabled) { + logger.info("Cluster topology refresh start, period: {}, startNodes: {}", + topologyRefreshPeriod.toString(), startNodes); + topologyRefreshExecutor = Executors.newSingleThreadScheduledExecutor(); + topologyRefreshExecutor.scheduleAtFixedRate(new TopologyRefreshTask(), topologyRefreshPeriod.toMillis(), + topologyRefreshPeriod.toMillis(), TimeUnit.MILLISECONDS); + } } public void discoverClusterNodesAndSlots(Connection jedis) { @@ -308,6 +349,14 @@ public void reset() { } } + public void close() { + reset(); + if (topologyRefreshEnabled && topologyRefreshExecutor != null) { + logger.info("Cluster topology refresh shutdown, startNodes: {}", startNodes); + topologyRefreshExecutor.shutdownNow(); + } + } + public static String getNodeKey(HostAndPort hnp) { //return hnp.getHost() + ":" + hnp.getPort(); return hnp.toString(); diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index 2184534ca66..f32a6d9973a 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -164,6 +164,15 @@ public UnifiedJedis(Set jedisClusterNodes, JedisClientConfig client if (proto != null) commandObjects.setProtocol(proto); } + public UnifiedJedis(Set jedisClusterNodes, JedisClientConfig clientConfig, + GenericObjectPoolConfig poolConfig, int maxAttempts, Duration maxTotalRetriesDuration, + boolean topologyRefreshEnabled, Duration topologyRefreshPeriod) { + this(new ClusterConnectionProvider(jedisClusterNodes, clientConfig, poolConfig, topologyRefreshEnabled, + topologyRefreshPeriod), maxAttempts, maxTotalRetriesDuration); + RedisProtocol proto = clientConfig.getRedisProtocol(); + if (proto != null) commandObjects.setProtocol(proto); + } + public UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) { this.provider = provider; this.executor = new ClusterCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration); diff --git a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java index 4c47f2094b8..6550a9410ed 100644 --- a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java @@ -1,5 +1,6 @@ package redis.clients.jedis.providers; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -34,6 +35,13 @@ public ClusterConnectionProvider(Set clusterNodes, JedisClientConfi initializeSlotsCache(clusterNodes, clientConfig); } + public ClusterConnectionProvider(Set clusterNodes, JedisClientConfig clientConfig, + GenericObjectPoolConfig poolConfig, boolean topologyRefreshEnabled, Duration topologyRefreshPeriod) { + this.cache = new JedisClusterInfoCache(clientConfig, poolConfig, clusterNodes, topologyRefreshEnabled, + topologyRefreshPeriod); + initializeSlotsCache(clusterNodes, clientConfig); + } + private void initializeSlotsCache(Set startNodes, JedisClientConfig clientConfig) { if (startNodes.isEmpty()) { throw new JedisClusterOperationException("No nodes to initialize cluster slots cache."); @@ -66,7 +74,7 @@ private void initializeSlotsCache(Set startNodes, JedisClientConfig @Override public void close() { - cache.reset(); + cache.close(); } public void renewSlotCache() { diff --git a/src/test/java/redis/clients/jedis/JedisClusterTest.java b/src/test/java/redis/clients/jedis/JedisClusterTest.java index b93fa2409ee..b6ec6698827 100644 --- a/src/test/java/redis/clients/jedis/JedisClusterTest.java +++ b/src/test/java/redis/clients/jedis/JedisClusterTest.java @@ -741,6 +741,66 @@ public void clusterRefreshNodes() throws Exception { } } + @Test + public void clusterPeriodTopologyRefreshTest() throws Exception { + Set jedisClusterNode = new HashSet<>(); + jedisClusterNode.add(nodeInfo1); + jedisClusterNode.add(nodeInfo2); + jedisClusterNode.add(nodeInfo3); + + // we set topologyRefreshPeriod is 5s + boolean topologyRefreshEnabled = true; + Duration topologyRefreshPeriod = Duration.ofSeconds(3); + try (JedisCluster cluster = new JedisCluster(jedisClusterNode, DEFAULT_CLIENT_CONFIG, DEFAULT_POOL_CONFIG, + DEFAULT_REDIRECTIONS, Duration.ofSeconds(1000), topologyRefreshEnabled, topologyRefreshPeriod)) { + assertEquals(3, cluster.getClusterNodes().size()); + cleanUp(); // cleanup and add node4 + + // at first, join node4 to cluster + node1.clusterMeet(LOCAL_IP, nodeInfo2.getPort()); + node1.clusterMeet(LOCAL_IP, nodeInfo3.getPort()); + node1.clusterMeet(LOCAL_IP, nodeInfo4.getPort()); + // split available slots across the three nodes + int slotsPerNode = CLUSTER_HASHSLOTS / 4; + int[] node1Slots = new int[slotsPerNode]; + int[] node2Slots = new int[slotsPerNode]; + int[] node3Slots = new int[slotsPerNode]; + int[] node4Slots = new int[slotsPerNode]; + for (int i = 0, slot1 = 0, slot2 = 0, slot3 = 0, slot4 = 0; i < CLUSTER_HASHSLOTS; i++) { + if (i < slotsPerNode) { + node1Slots[slot1++] = i; + } else if (i >= slotsPerNode && i < slotsPerNode*2) { + node2Slots[slot2++] = i; + } else if (i >= slotsPerNode*2 && i < slotsPerNode*3) { + node3Slots[slot3++] = i; + } else { + node4Slots[slot4++] = i; + } + } + + node1.clusterAddSlots(node1Slots); + node2.clusterAddSlots(node2Slots); + node3.clusterAddSlots(node3Slots); + node4.clusterAddSlots(node4Slots); + JedisClusterTestUtil.waitForClusterReady(node1, node2, node3, node4); + + // Now we just wait topologyRefreshPeriod * 3 (executor will delay) for cluster topology refresh (3 -> 4) + Thread.sleep(topologyRefreshPeriod.toMillis() * 3); + + assertEquals(4, cluster.getClusterNodes().size()); + String nodeKey4 = LOCAL_IP + ":" + nodeInfo4.getPort(); + assertTrue(cluster.getClusterNodes().keySet().contains(nodeKey4)); + + // make 4 nodes to 3 nodes + cleanUp(); + setUp(); + + // Now we just wait topologyRefreshPeriod * 3 (executor will delay) for cluster topology refresh (4 -> 3) + Thread.sleep(topologyRefreshPeriod.toMillis() * 3); + assertEquals(3, cluster.getClusterNodes().size()); + } + } + private static String getNodeServingSlotRange(String infoOutput) { // f4f3dc4befda352a4e0beccf29f5e8828438705d 127.0.0.1:7380 master - 0 // 1394372400827 0 connected 5461-10922