diff --git a/src/main/java/redis/clients/jedis/JedisSentinelPool.java b/src/main/java/redis/clients/jedis/JedisSentinelPool.java index 6f921ddd97..39110382c7 100644 --- a/src/main/java/redis/clients/jedis/JedisSentinelPool.java +++ b/src/main/java/redis/clients/jedis/JedisSentinelPool.java @@ -1,11 +1,9 @@ package redis.clients.jedis; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; @@ -24,7 +22,7 @@ public class JedisSentinelPool extends Pool { private final JedisClientConfig sentinelClientConfig; - protected final Collection masterListeners = new ArrayList<>(); + protected final Collection masterListeners = new ArrayList<>(); private volatile HostAndPort currentHostMaster; @@ -181,8 +179,11 @@ public JedisSentinelPool(String masterName, Set sentinels, HostAndPort master = initSentinels(sentinels, masterName); initMaster(master); + initMasterListeners(sentinels,masterName); } + + public JedisSentinelPool(String masterName, Set sentinels, final GenericObjectPoolConfig poolConfig, final JedisFactory factory, final JedisClientConfig sentinelClientConfig) { @@ -193,6 +194,52 @@ public JedisSentinelPool(String masterName, Set sentinels, HostAndPort master = initSentinels(sentinels, masterName); initMaster(master); + initMasterListeners(sentinels,masterName,poolConfig); + } + + private void initMasterListeners(Set sentinels, String masterName) { + initMasterListeners(sentinels,masterName,null); + } + + private void initMasterListeners(Set sentinels, String masterName, GenericObjectPoolConfig poolConfig) { + + LOG.info("Init master node listener {}", masterName); + SentinelPoolConfig jedisSentinelPoolConfig = null; + if (poolConfig instanceof SentinelPoolConfig) { + jedisSentinelPoolConfig = ((SentinelPoolConfig) poolConfig); + } else { + jedisSentinelPoolConfig = new SentinelPoolConfig(); + } + + for (HostAndPort sentinel : sentinels) { + if (jedisSentinelPoolConfig.isEnableActiveDetectListener()) { + masterListeners.add(new SentinelMasterActiveDetectListener(currentHostMaster, + sentinel, + sentinelClientConfig, + masterName, + jedisSentinelPoolConfig.getActiveDetectIntervalTimeMillis() + ) { + @Override + public void onChange(HostAndPort hostAndPort) { + initMaster(hostAndPort); + } + }); + } + + if (jedisSentinelPoolConfig.isEnableDefaultSubscribeListener()) { + masterListeners.add(new SentinelMasterSubscribeListener(masterName, + sentinel, + sentinelClientConfig, + jedisSentinelPoolConfig.getSubscribeRetryWaitTimeMillis()) { + @Override + public void onChange(HostAndPort hostAndPort) { + initMaster(hostAndPort); + } + }); + } + } + + masterListeners.forEach(SentinelMasterListener::start); } private static Set parseHostAndPorts(Set strings) { @@ -201,10 +248,7 @@ private static Set parseHostAndPorts(Set strings) { @Override public void destroy() { - for (MasterListener m : masterListeners) { - m.shutdown(); - } - + masterListeners.forEach(SentinelMasterListener::shutdown); super.destroy(); } @@ -271,17 +315,7 @@ private HostAndPort initSentinels(Set sentinels, final String maste } } - LOG.info("Redis master running at {}, starting Sentinel listeners...", master); - - for (HostAndPort sentinel : sentinels) { - - MasterListener masterListener = new MasterListener(masterName, sentinel.getHost(), sentinel.getPort()); - // whether MasterListener threads are alive or not, process can be stopped - masterListener.setDaemon(true); - masterListeners.add(masterListener); - masterListener.start(); - } - + LOG.info("Redis master running at {}", master); return master; } @@ -323,113 +357,4 @@ public void returnResource(final Jedis resource) { } } } - - protected class MasterListener extends Thread { - - protected String masterName; - protected String host; - protected int port; - protected long subscribeRetryWaitTimeMillis = 5000; - protected volatile Jedis j; - protected AtomicBoolean running = new AtomicBoolean(false); - - protected MasterListener() { - } - - public MasterListener(String masterName, String host, int port) { - super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port)); - this.masterName = masterName; - this.host = host; - this.port = port; - } - - public MasterListener(String masterName, String host, int port, - long subscribeRetryWaitTimeMillis) { - this(masterName, host, port); - this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis; - } - - @Override - public void run() { - - running.set(true); - - while (running.get()) { - - try { - // double check that it is not being shutdown - if (!running.get()) { - break; - } - - final HostAndPort hostPort = new HostAndPort(host, port); - j = new Jedis(hostPort, sentinelClientConfig); - - // code for active refresh - List masterAddr = j.sentinelGetMasterAddrByName(masterName); - if (masterAddr == null || masterAddr.size() != 2) { - LOG.warn("Can not get master addr, master name: {}. Sentinel: {}.", masterName, - hostPort); - } else { - initMaster(toHostAndPort(masterAddr)); - } - - j.subscribe(new JedisPubSub() { - @Override - public void onMessage(String channel, String message) { - LOG.debug("Sentinel {} published: {}.", hostPort, message); - - String[] switchMasterMsg = message.split(" "); - - if (switchMasterMsg.length > 3) { - - if (masterName.equals(switchMasterMsg[0])) { - initMaster(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]))); - } else { - LOG.debug( - "Ignoring message on +switch-master for master name {}, our master name is {}", - switchMasterMsg[0], masterName); - } - - } else { - LOG.error("Invalid message received on Sentinel {} on channel +switch-master: {}", - hostPort, message); - } - } - }, "+switch-master"); - - } catch (JedisException e) { - - if (running.get()) { - LOG.error("Lost connection to Sentinel at {}:{}. Sleeping 5000ms and retrying.", host, - port, e); - try { - Thread.sleep(subscribeRetryWaitTimeMillis); - } catch (InterruptedException e1) { - LOG.error("Sleep interrupted: ", e1); - } - } else { - LOG.debug("Unsubscribing from Sentinel at {}:{}", host, port); - } - } finally { - if (j != null) { - j.close(); - } - } - } - } - - public void shutdown() { - try { - LOG.debug("Shutting down listener on {}:{}", host, port); - running.set(false); - // This isn't good, the Jedis object is not thread safe - if (j != null) { - j.close(); - } - } catch (RuntimeException e) { - LOG.error("Caught exception while shutting down: ", e); - } - } - } } diff --git a/src/main/java/redis/clients/jedis/SentinelMasterActiveDetectListener.java b/src/main/java/redis/clients/jedis/SentinelMasterActiveDetectListener.java new file mode 100644 index 0000000000..e89995ef17 --- /dev/null +++ b/src/main/java/redis/clients/jedis/SentinelMasterActiveDetectListener.java @@ -0,0 +1,80 @@ +package redis.clients.jedis; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * active detect master node .in case of the subscribe message lost + * @see SentinelMasterSubscribeListener subscribe failover message from "+switch-master" channel + * + */ +public abstract class SentinelMasterActiveDetectListener extends Thread implements SentinelMasterListener { + + private static final Logger LOG = LoggerFactory.getLogger(SentinelMasterActiveDetectListener.class); + + private List currentHostMaster; + private HostAndPort sentinel; + private JedisClientConfig jedisClientConfig; + private String masterName; + private long activeDetectIntervalTimeMillis = 5 * 1000; + + private AtomicBoolean running = new AtomicBoolean(false); + private volatile Jedis j; + + public SentinelMasterActiveDetectListener(HostAndPort currentHostMaster, HostAndPort sentinel, + JedisClientConfig jedisClientConfig, String masterName, + long activeDetectIntervalTimeMillis) { + super(String.format("SentinelMasterActiveDetectListener-%s-[%s:%d]", masterName, sentinel.getHost(), sentinel.getPort())); + this.currentHostMaster = Arrays.asList(currentHostMaster.getHost(), String.valueOf(currentHostMaster.getPort())); + this.sentinel = sentinel; + this.jedisClientConfig = jedisClientConfig; + this.masterName = masterName; + this.activeDetectIntervalTimeMillis = activeDetectIntervalTimeMillis; + } + + @Override + public void shutdown() { + LOG.info("Shutting down active detect listener on {}", sentinel); + running.set(false); + if (j != null) { + j.close(); + } + } + + @Override + public void run() { + LOG.info("Start active detect listener on {},interval {} ms", sentinel, activeDetectIntervalTimeMillis); + running.set(true); + j = new Jedis(sentinel, jedisClientConfig); + while (running.get()) { + try { + Thread.sleep(activeDetectIntervalTimeMillis); + + if (j == null || j.isBroken() || !j.isConnected()) { + j = new Jedis(sentinel, jedisClientConfig); + } + + List masterAddr = j.sentinelGetMasterAddrByName(masterName); + if (masterAddr == null || masterAddr.size() != 2) { + LOG.warn("Can not get master addr, master name: {}. Sentinel: {}", masterName, sentinel); + continue; + } + + if (!currentHostMaster.equals(masterAddr)) { + LOG.info("Found master node change from {} to{} ", currentHostMaster, masterAddr); + onChange(new HostAndPort(masterAddr.get(0), Integer.parseInt(masterAddr.get(1)))); + this.currentHostMaster = masterAddr; + } + } catch (Exception e) { + // TO ensure the thread running, catch all exception + LOG.error("Active detect listener failed ", e); + } + } + } + + public abstract void onChange(HostAndPort hostAndPort); +} diff --git a/src/main/java/redis/clients/jedis/SentinelMasterListener.java b/src/main/java/redis/clients/jedis/SentinelMasterListener.java new file mode 100644 index 0000000000..6f3945adf0 --- /dev/null +++ b/src/main/java/redis/clients/jedis/SentinelMasterListener.java @@ -0,0 +1,15 @@ +package redis.clients.jedis; + +/** + * interface for monitor the master failover under sentinel mode + * We offer two implementation options + * @see SentinelMasterSubscribeListener Passive subscription + * @see SentinelMasterActiveDetectListener Active detection + */ +public interface SentinelMasterListener { + void start(); + + void shutdown(); + + void onChange(HostAndPort hostAndPort); +} diff --git a/src/main/java/redis/clients/jedis/SentinelMasterSubscribeListener.java b/src/main/java/redis/clients/jedis/SentinelMasterSubscribeListener.java new file mode 100644 index 0000000000..ce929fac96 --- /dev/null +++ b/src/main/java/redis/clients/jedis/SentinelMasterSubscribeListener.java @@ -0,0 +1,132 @@ +package redis.clients.jedis; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.exceptions.JedisException; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * subscribe failover message from "+switch-master" channel , the default listener mode use this + * @see SentinelMasterActiveDetectListener active detect master node .in case of the subscribe message lost + * + */ +public abstract class SentinelMasterSubscribeListener extends Thread implements SentinelMasterListener { + + private static final Logger LOG = LoggerFactory.getLogger(SentinelMasterSubscribeListener.class); + + private String masterName; + private HostAndPort sentinel; + private JedisClientConfig sentinelClientConfig; + private long subscribeRetryWaitTimeMillis = 5000; + private volatile Jedis j; + private AtomicBoolean running = new AtomicBoolean(false); + + + public SentinelMasterSubscribeListener(String masterName, HostAndPort sentinel, JedisClientConfig sentinelClientConfig, + long subscribeRetryWaitTimeMillis) { + super(String.format("SentinelMaterSubscribeListener-%s-[%s:%d]", masterName, sentinel.getHost(), sentinel.getPort())); + this.masterName = masterName; + this.sentinel = sentinel; + this.sentinelClientConfig = sentinelClientConfig; + this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis; + } + + @Override + public void run() { + + LOG.info("start on:{}", sentinel); + + running.set(true); + + while (running.get()) { + + try { + // double check that it is not being shutdown + if (!running.get()) { + break; + } + + j = new Jedis(sentinel, sentinelClientConfig); + + // code for active refresh + List masterAddr = j.sentinelGetMasterAddrByName(masterName); + if (masterAddr == null || masterAddr.size() != 2) { + LOG.warn("Can not get master addr, master name: {}. Sentinel: {}.", masterName, + sentinel); + } else { + onChange(toHostAndPort(masterAddr)); + } + + j.subscribe(new JedisPubSub() { + @Override + public void onMessage(String channel, String message) { + LOG.debug("Sentinel {} published: {}.", sentinel, message); + + String[] switchMasterMsg = message.split(" "); + + if (switchMasterMsg.length > 3) { + + if (masterName.equals(switchMasterMsg[0])) { + LOG.info("Receive switch-master message:{} from {}.", message, channel); + onChange(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]))); + } else { + LOG.debug( + "Ignoring message on +switch-master for master name {}, our master name is {}", + switchMasterMsg[0], masterName); + } + + } else { + LOG.error("Invalid message received on Sentinel {} on channel +switch-master: {}", + sentinel, message); + } + } + }, "+switch-master"); + + } catch (JedisException e) { + + if (running.get()) { + LOG.error("Lost connection to Sentinel at {}. Sleeping {}ms and retrying.", sentinel, subscribeRetryWaitTimeMillis, e); + try { + Thread.sleep(subscribeRetryWaitTimeMillis); + } catch (InterruptedException e1) { + LOG.error("Sleep interrupted: ", e1); + } + } else { + LOG.debug("Unsubscribing from Sentinel at {}", sentinel); + } + } finally { + if (j != null) { + j.close(); + } + } + } + } + + @Override + public void shutdown() { + try { + LOG.debug("Shutting down subscribe listener on {}", sentinel); + running.set(false); + // This isn't good, the Jedis object is not thread safe + if (j != null) { + j.close(); + } + } catch (RuntimeException e) { + LOG.error("Caught exception while shutting down: ", e); + } + } + + + @Override + public abstract void onChange(HostAndPort hostAndPort); + + private HostAndPort toHostAndPort(List getMasterAddrByNameResult) { + String host = getMasterAddrByNameResult.get(0); + int port = Integer.parseInt(getMasterAddrByNameResult.get(1)); + + return new HostAndPort(host, port); + } +} diff --git a/src/main/java/redis/clients/jedis/SentinelPoolConfig.java b/src/main/java/redis/clients/jedis/SentinelPoolConfig.java new file mode 100644 index 0000000000..158b6a8cbd --- /dev/null +++ b/src/main/java/redis/clients/jedis/SentinelPoolConfig.java @@ -0,0 +1,44 @@ +package redis.clients.jedis; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; + +public class SentinelPoolConfig extends GenericObjectPoolConfig { + + private boolean enableActiveDetectListener = false; + private long activeDetectIntervalTimeMillis = 5 * 1000; + + private boolean enableDefaultSubscribeListener = true; + private long subscribeRetryWaitTimeMillis = 5 * 1000; + + public boolean isEnableActiveDetectListener() { + return enableActiveDetectListener; + } + + public void setEnableActiveDetectListener(boolean enableActiveDetectListener) { + this.enableActiveDetectListener = enableActiveDetectListener; + } + + public long getActiveDetectIntervalTimeMillis() { + return activeDetectIntervalTimeMillis; + } + + public void setActiveDetectIntervalTimeMillis(long activeDetectIntervalTimeMillis) { + this.activeDetectIntervalTimeMillis = activeDetectIntervalTimeMillis; + } + + public boolean isEnableDefaultSubscribeListener() { + return enableDefaultSubscribeListener; + } + + public void setEnableDefaultSubscribeListener(boolean enableDefaultSubscribeListener) { + this.enableDefaultSubscribeListener = enableDefaultSubscribeListener; + } + + public long getSubscribeRetryWaitTimeMillis() { + return subscribeRetryWaitTimeMillis; + } + + public void setSubscribeRetryWaitTimeMillis(long subscribeRetryWaitTimeMillis) { + this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis; + } +} diff --git a/src/main/java/redis/clients/jedis/providers/SentineledConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/SentineledConnectionProvider.java index 5058f07179..ab71587591 100644 --- a/src/main/java/redis/clients/jedis/providers/SentineledConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/SentineledConnectionProvider.java @@ -4,7 +4,6 @@ import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.slf4j.Logger; @@ -16,10 +15,12 @@ import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisClientConfig; -import redis.clients.jedis.JedisPubSub; +import redis.clients.jedis.SentinelPoolConfig; +import redis.clients.jedis.SentinelMasterActiveDetectListener; +import redis.clients.jedis.SentinelMasterListener; +import redis.clients.jedis.SentinelMasterSubscribeListener; import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisException; -import redis.clients.jedis.util.IOUtils; public class SentineledConnectionProvider implements ConnectionProvider { @@ -37,7 +38,7 @@ public class SentineledConnectionProvider implements ConnectionProvider { private final GenericObjectPoolConfig masterPoolConfig; - protected final Collection sentinelListeners = new ArrayList<>(); + protected final Collection sentinelListeners = new ArrayList<>(); private final JedisClientConfig sentinelClientConfig; @@ -71,6 +72,48 @@ public SentineledConnectionProvider(String masterName, final JedisClientConfig m HostAndPort master = initSentinels(sentinels); initMaster(master); + initMasterListeners(sentinels,masterName,poolConfig); + } + + private void initMasterListeners(Set sentinels, String masterName, GenericObjectPoolConfig poolConfig) { + + LOG.info("Init master node listener {}", masterName); + SentinelPoolConfig jedisSentinelPoolConfig = null; + if (poolConfig instanceof SentinelPoolConfig) { + jedisSentinelPoolConfig = ((SentinelPoolConfig) poolConfig); + } else { + jedisSentinelPoolConfig = new SentinelPoolConfig(); + } + + for (HostAndPort sentinel : sentinels) { + if (jedisSentinelPoolConfig.isEnableActiveDetectListener()) { + sentinelListeners.add(new SentinelMasterActiveDetectListener(currentMaster, + sentinel, + sentinelClientConfig, + masterName, + jedisSentinelPoolConfig.getActiveDetectIntervalTimeMillis() + ) { + @Override + public void onChange(HostAndPort hostAndPort) { + initMaster(hostAndPort); + } + }); + } + + if (jedisSentinelPoolConfig.isEnableDefaultSubscribeListener()) { + sentinelListeners.add(new SentinelMasterSubscribeListener(masterName, + sentinel, + sentinelClientConfig, + jedisSentinelPoolConfig.getSubscribeRetryWaitTimeMillis()) { + @Override + public void onChange(HostAndPort hostAndPort) { + initMaster(hostAndPort); + } + }); + } + } + + sentinelListeners.forEach(SentinelMasterListener::start); } @Override @@ -85,7 +128,7 @@ public Connection getConnection(CommandArguments args) { @Override public void close() { - sentinelListeners.forEach(SentinelListener::shutdown); + sentinelListeners.forEach(SentinelMasterListener::shutdown); pool.close(); } @@ -161,16 +204,7 @@ private HostAndPort initSentinels(Set sentinels) { } } - LOG.info("Redis master running at {}. Starting sentinel listeners...", master); - - for (HostAndPort sentinel : sentinels) { - - SentinelListener listener = new SentinelListener(sentinel); - // whether SentinelListener threads are alive or not, process can be stopped - listener.setDaemon(true); - sentinelListeners.add(listener); - listener.start(); - } + LOG.info("Redis master running at {}. ", master); return master; } @@ -185,96 +219,4 @@ private static HostAndPort toHostAndPort(List masterAddr) { private static HostAndPort toHostAndPort(String hostStr, String portStr) { return new HostAndPort(hostStr, Integer.parseInt(portStr)); } - - protected class SentinelListener extends Thread { - - protected final HostAndPort node; - protected volatile Jedis sentinelJedis; - protected AtomicBoolean running = new AtomicBoolean(false); - - public SentinelListener(HostAndPort node) { - super(String.format("%s-SentinelListener-[%s]", masterName, node.toString())); - this.node = node; - } - - @Override - public void run() { - - running.set(true); - - while (running.get()) { - - try { - // double check that it is not being shutdown - if (!running.get()) { - break; - } - - sentinelJedis = new Jedis(node, sentinelClientConfig); - - // code for active refresh - List masterAddr = sentinelJedis.sentinelGetMasterAddrByName(masterName); - if (masterAddr == null || masterAddr.size() != 2) { - LOG.warn("Can not get master {} address. Sentinel: {}.", masterName, node); - } else { - initMaster(toHostAndPort(masterAddr)); - } - - sentinelJedis.subscribe(new JedisPubSub() { - @Override - public void onMessage(String channel, String message) { - LOG.debug("Sentinel {} published: {}.", node, message); - - String[] switchMasterMsg = message.split(" "); - - if (switchMasterMsg.length > 3) { - - if (masterName.equals(switchMasterMsg[0])) { - initMaster(toHostAndPort(switchMasterMsg[3], switchMasterMsg[4])); - } else { - LOG.debug( - "Ignoring message on +switch-master for master {}. Our master is {}.", - switchMasterMsg[0], masterName); - } - - } else { - LOG.error("Invalid message received on sentinel {} on channel +switch-master: {}.", - node, message); - } - } - }, "+switch-master"); - - } catch (JedisException e) { - - if (running.get()) { - LOG.error("Lost connection to sentinel {}. Sleeping {}ms and retrying.", node, - subscribeRetryWaitTimeMillis, e); - try { - Thread.sleep(subscribeRetryWaitTimeMillis); - } catch (InterruptedException se) { - LOG.error("Sleep interrupted.", se); - } - } else { - LOG.debug("Unsubscribing from sentinel {}.", node); - } - } finally { - IOUtils.closeQuietly(sentinelJedis); - } - } - } - - // must not throw exception - public void shutdown() { - try { - LOG.debug("Shutting down listener on {}.", node); - running.set(false); - // This isn't good, the Jedis object is not thread safe - if (sentinelJedis != null) { - sentinelJedis.close(); - } - } catch (RuntimeException e) { - LOG.error("Error while shutting down.", e); - } - } - } } diff --git a/src/test/java/redis/clients/jedis/JedisSentinelPoolTest.java b/src/test/java/redis/clients/jedis/JedisSentinelPoolTest.java index 1d52d14784..66d6532500 100644 --- a/src/test/java/redis/clients/jedis/JedisSentinelPoolTest.java +++ b/src/test/java/redis/clients/jedis/JedisSentinelPoolTest.java @@ -1,10 +1,5 @@ package redis.clients.jedis; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.fail; - import java.util.HashSet; import java.util.Set; @@ -15,6 +10,8 @@ import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisException; +import static org.junit.Assert.*; + public class JedisSentinelPoolTest { private static final String MASTER_NAME = "mymaster"; @@ -183,4 +180,81 @@ public void testResetValidPassword() { } } } + + + @Test + public void testSentinelMasterSubscribeListener() { + // case 1: default : subscribe on ,active off + SentinelPoolConfig config = new SentinelPoolConfig(); + + JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, sentinels, config, 1000, + "foobared", 2); + Jedis jedis = pool.getResource(); + HostAndPort hostPort1 = jedis.connection.getHostAndPort(); + + Jedis sentinel = new Jedis(sentinel1); + sentinel.sendCommand(Protocol.Command.SENTINEL, "failover", MASTER_NAME); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + HostAndPort hostPort2 = jedis.connection.getHostAndPort(); + jedis.close(); + pool.destroy(); + assertNotEquals(hostPort1, hostPort2); + } + + @Test + public void testSentinelMasterActiveDetectListener() { + // case 2: subscribe off ,active on + SentinelPoolConfig config = new SentinelPoolConfig(); + config.setEnableActiveDetectListener(true); + config.setEnableDefaultSubscribeListener(false); + + JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, sentinels, config, 1000, + "foobared", 2); + Jedis jedis = pool.getResource(); + HostAndPort hostPort1 = jedis.connection.getHostAndPort(); + + Jedis sentinel = new Jedis(sentinel1); + sentinel.sendCommand(Protocol.Command.SENTINEL, "failover", MASTER_NAME); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + HostAndPort hostPort2 = jedis.connection.getHostAndPort(); + jedis.close(); + pool.destroy(); + assertNotEquals(hostPort1, hostPort2); + } + + @Test + public void testALlSentinelMasterListener() { + // case 2: subscribe on ,active on + SentinelPoolConfig config = new SentinelPoolConfig(); + config.setEnableActiveDetectListener(true); + config.setEnableDefaultSubscribeListener(true); + + JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, sentinels, config, 1000, + "foobared", 2); + Jedis jedis = pool.getResource(); + HostAndPort hostPort1 = jedis.connection.getHostAndPort(); + + Jedis sentinel = new Jedis(sentinel1); + sentinel.sendCommand(Protocol.Command.SENTINEL, "failover", MASTER_NAME); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + HostAndPort hostPort2 = jedis.connection.getHostAndPort(); + jedis.close(); + pool.destroy(); + assertNotEquals(hostPort1, hostPort2); + } } diff --git a/src/test/java/redis/clients/jedis/SentineledConnectionProviderTest.java b/src/test/java/redis/clients/jedis/SentineledConnectionProviderTest.java index 593875710b..6ea2724c43 100644 --- a/src/test/java/redis/clients/jedis/SentineledConnectionProviderTest.java +++ b/src/test/java/redis/clients/jedis/SentineledConnectionProviderTest.java @@ -1,9 +1,5 @@ package redis.clients.jedis; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.fail; - import java.util.HashSet; import java.util.Set; @@ -15,6 +11,8 @@ import redis.clients.jedis.exceptions.JedisException; import redis.clients.jedis.providers.SentineledConnectionProvider; +import static org.junit.Assert.*; + /** * @see JedisSentinelPoolTest */ @@ -161,4 +159,88 @@ public void testResetValidPassword() { } } } + + @Test + public void testSentinelMasterSubscribeListener() { + SentinelPoolConfig config = new SentinelPoolConfig(); + config.setEnableActiveDetectListener(false); + config.setEnableDefaultSubscribeListener(true); + + try (JedisSentineled jedis = new JedisSentineled(MASTER_NAME, + DefaultJedisClientConfig.builder().timeoutMillis(1000).password("foobared").database(2).build(), + config, sentinels, DefaultJedisClientConfig.builder().build())) { + + HostAndPort master1 = jedis.provider.getConnection().getHostAndPort(); + + Jedis sentinel = new Jedis(sentinel1); + sentinel.sendCommand(Protocol.Command.SENTINEL, "failover", MASTER_NAME); + + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + HostAndPort master2 = jedis.provider.getConnection().getHostAndPort(); + + assertNotEquals(master1, master2); + + } + } + + @Test + public void testSentinelMasterActiveDetectListener() { + SentinelPoolConfig config = new SentinelPoolConfig(); + config.setEnableActiveDetectListener(true); + config.setEnableDefaultSubscribeListener(false); + + try (JedisSentineled jedis = new JedisSentineled(MASTER_NAME, + DefaultJedisClientConfig.builder().timeoutMillis(1000).password("foobared").database(2).build(), + config, sentinels, DefaultJedisClientConfig.builder().build())) { + + HostAndPort master1 = jedis.provider.getConnection().getHostAndPort(); + + Jedis sentinel = new Jedis(sentinel1); + sentinel.sendCommand(Protocol.Command.SENTINEL, "failover", MASTER_NAME); + + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + HostAndPort master2 = jedis.provider.getConnection().getHostAndPort(); + + assertNotEquals(master1, master2); + + } + } + + @Test + public void testALlSentinelMasterListener() { + SentinelPoolConfig config = new SentinelPoolConfig(); + config.setEnableActiveDetectListener(true); + config.setEnableDefaultSubscribeListener(true); + + try (JedisSentineled jedis = new JedisSentineled(MASTER_NAME, + DefaultJedisClientConfig.builder().timeoutMillis(1000).password("foobared").database(2).build(), + config, sentinels, DefaultJedisClientConfig.builder().build())) { + + HostAndPort master1 = jedis.provider.getConnection().getHostAndPort(); + + Jedis sentinel = new Jedis(sentinel1); + sentinel.sendCommand(Protocol.Command.SENTINEL, "failover", MASTER_NAME); + + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + HostAndPort master2 = jedis.provider.getConnection().getHostAndPort(); + + assertNotEquals(master1, master2); + + } + } }