Skip to content

Commit

Permalink
Add active detection in sentinel mode,Reconstruct sentinel mode Listener
Browse files Browse the repository at this point in the history
Signed-off-by: c00603587 <[email protected]>
  • Loading branch information
c00603587 committed Sep 19, 2023
1 parent 1526ede commit fd996e2
Show file tree
Hide file tree
Showing 8 changed files with 537 additions and 243 deletions.
179 changes: 52 additions & 127 deletions src/main/java/redis/clients/jedis/JedisSentinelPool.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package redis.clients.jedis;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
Expand All @@ -24,7 +22,7 @@ public class JedisSentinelPool extends Pool<Jedis> {

private final JedisClientConfig sentinelClientConfig;

protected final Collection<MasterListener> masterListeners = new ArrayList<>();
protected final Collection<SentinelMasterListener> masterListeners = new ArrayList<>();

private volatile HostAndPort currentHostMaster;

Expand Down Expand Up @@ -181,8 +179,11 @@ public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,

HostAndPort master = initSentinels(sentinels, masterName);
initMaster(master);
initMasterListeners(sentinels,masterName);
}



public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
final GenericObjectPoolConfig<Jedis> poolConfig, final JedisFactory factory,
final JedisClientConfig sentinelClientConfig) {
Expand All @@ -193,6 +194,52 @@ public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,

HostAndPort master = initSentinels(sentinels, masterName);
initMaster(master);
initMasterListeners(sentinels,masterName,poolConfig);
}

private void initMasterListeners(Set<HostAndPort> sentinels, String masterName) {
initMasterListeners(sentinels,masterName,null);
}

private void initMasterListeners(Set<HostAndPort> sentinels, String masterName, GenericObjectPoolConfig<Jedis> poolConfig) {

LOG.info("Init master node listener {}", masterName);
SentinelPoolConfig jedisSentinelPoolConfig = null;
if (poolConfig instanceof SentinelPoolConfig) {
jedisSentinelPoolConfig = ((SentinelPoolConfig) poolConfig);
} else {
jedisSentinelPoolConfig = new SentinelPoolConfig();
}

for (HostAndPort sentinel : sentinels) {
if (jedisSentinelPoolConfig.isEnableActiveDetectListener()) {
masterListeners.add(new SentinelMasterActiveDetectListener(currentHostMaster,
sentinel,
sentinelClientConfig,
masterName,
jedisSentinelPoolConfig.getActiveDetectIntervalTimeMillis()
) {
@Override
public void onChange(HostAndPort hostAndPort) {
initMaster(hostAndPort);
}
});
}

if (jedisSentinelPoolConfig.isEnableDefaultSubscribeListener()) {
masterListeners.add(new SentinelMasterSubscribeListener(masterName,
sentinel,
sentinelClientConfig,
jedisSentinelPoolConfig.getSubscribeRetryWaitTimeMillis()) {
@Override
public void onChange(HostAndPort hostAndPort) {
initMaster(hostAndPort);
}
});
}
}

masterListeners.forEach(SentinelMasterListener::start);
}

private static Set<HostAndPort> parseHostAndPorts(Set<String> strings) {
Expand All @@ -201,10 +248,7 @@ private static Set<HostAndPort> parseHostAndPorts(Set<String> strings) {

@Override
public void destroy() {
for (MasterListener m : masterListeners) {
m.shutdown();
}

masterListeners.forEach(SentinelMasterListener::shutdown);
super.destroy();
}

Expand Down Expand Up @@ -271,17 +315,7 @@ private HostAndPort initSentinels(Set<HostAndPort> sentinels, final String maste
}
}

LOG.info("Redis master running at {}, starting Sentinel listeners...", master);

for (HostAndPort sentinel : sentinels) {

MasterListener masterListener = new MasterListener(masterName, sentinel.getHost(), sentinel.getPort());
// whether MasterListener threads are alive or not, process can be stopped
masterListener.setDaemon(true);
masterListeners.add(masterListener);
masterListener.start();
}

LOG.info("Redis master running at {}", master);
return master;
}

Expand Down Expand Up @@ -323,113 +357,4 @@ public void returnResource(final Jedis resource) {
}
}
}

protected class MasterListener extends Thread {

protected String masterName;
protected String host;
protected int port;
protected long subscribeRetryWaitTimeMillis = 5000;
protected volatile Jedis j;
protected AtomicBoolean running = new AtomicBoolean(false);

protected MasterListener() {
}

public MasterListener(String masterName, String host, int port) {
super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port));
this.masterName = masterName;
this.host = host;
this.port = port;
}

public MasterListener(String masterName, String host, int port,
long subscribeRetryWaitTimeMillis) {
this(masterName, host, port);
this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
}

@Override
public void run() {

running.set(true);

while (running.get()) {

try {
// double check that it is not being shutdown
if (!running.get()) {
break;
}

final HostAndPort hostPort = new HostAndPort(host, port);
j = new Jedis(hostPort, sentinelClientConfig);

// code for active refresh
List<String> masterAddr = j.sentinelGetMasterAddrByName(masterName);
if (masterAddr == null || masterAddr.size() != 2) {
LOG.warn("Can not get master addr, master name: {}. Sentinel: {}.", masterName,
hostPort);
} else {
initMaster(toHostAndPort(masterAddr));
}

j.subscribe(new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
LOG.debug("Sentinel {} published: {}.", hostPort, message);

String[] switchMasterMsg = message.split(" ");

if (switchMasterMsg.length > 3) {

if (masterName.equals(switchMasterMsg[0])) {
initMaster(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4])));
} else {
LOG.debug(
"Ignoring message on +switch-master for master name {}, our master name is {}",
switchMasterMsg[0], masterName);
}

} else {
LOG.error("Invalid message received on Sentinel {} on channel +switch-master: {}",
hostPort, message);
}
}
}, "+switch-master");

} catch (JedisException e) {

if (running.get()) {
LOG.error("Lost connection to Sentinel at {}:{}. Sleeping 5000ms and retrying.", host,
port, e);
try {
Thread.sleep(subscribeRetryWaitTimeMillis);
} catch (InterruptedException e1) {
LOG.error("Sleep interrupted: ", e1);
}
} else {
LOG.debug("Unsubscribing from Sentinel at {}:{}", host, port);
}
} finally {
if (j != null) {
j.close();
}
}
}
}

public void shutdown() {
try {
LOG.debug("Shutting down listener on {}:{}", host, port);
running.set(false);
// This isn't good, the Jedis object is not thread safe
if (j != null) {
j.close();
}
} catch (RuntimeException e) {
LOG.error("Caught exception while shutting down: ", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package redis.clients.jedis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* active detect master node .in case of the subscribe message lost
* @see SentinelMasterSubscribeListener subscribe failover message from "+switch-master" channel
*
*/
public abstract class SentinelMasterActiveDetectListener extends Thread implements SentinelMasterListener {

private static final Logger LOG = LoggerFactory.getLogger(SentinelMasterActiveDetectListener.class);

private List<String> currentHostMaster;
private HostAndPort sentinel;
private JedisClientConfig jedisClientConfig;
private String masterName;
private long activeDetectIntervalTimeMillis = 5 * 1000;

private AtomicBoolean running = new AtomicBoolean(false);
private volatile Jedis j;

public SentinelMasterActiveDetectListener(HostAndPort currentHostMaster, HostAndPort sentinel,
JedisClientConfig jedisClientConfig, String masterName,
long activeDetectIntervalTimeMillis) {
super(String.format("SentinelMasterActiveDetectListener-%s-[%s:%d]", masterName, sentinel.getHost(), sentinel.getPort()));
this.currentHostMaster = Arrays.asList(currentHostMaster.getHost(), String.valueOf(currentHostMaster.getPort()));
this.sentinel = sentinel;
this.jedisClientConfig = jedisClientConfig;
this.masterName = masterName;
this.activeDetectIntervalTimeMillis = activeDetectIntervalTimeMillis;
}

@Override
public void shutdown() {
LOG.info("Shutting down active detect listener on {}", sentinel);
running.set(false);
if (j != null) {
j.close();
}
}

@Override
public void run() {
LOG.info("Start active detect listener on {},interval {} ms", sentinel, activeDetectIntervalTimeMillis);
running.set(true);
j = new Jedis(sentinel, jedisClientConfig);
while (running.get()) {
try {
Thread.sleep(activeDetectIntervalTimeMillis);

if (j == null || j.isBroken() || !j.isConnected()) {
j = new Jedis(sentinel, jedisClientConfig);
}

List<String> masterAddr = j.sentinelGetMasterAddrByName(masterName);
if (masterAddr == null || masterAddr.size() != 2) {
LOG.warn("Can not get master addr, master name: {}. Sentinel: {}", masterName, sentinel);
continue;
}

if (!currentHostMaster.equals(masterAddr)) {
LOG.info("Found master node change from {} to{} ", currentHostMaster, masterAddr);
onChange(new HostAndPort(masterAddr.get(0), Integer.parseInt(masterAddr.get(1))));
this.currentHostMaster = masterAddr;
}
} catch (Exception e) {
// TO ensure the thread running, catch all exception
LOG.error("Active detect listener failed ", e);
}
}
}

public abstract void onChange(HostAndPort hostAndPort);
}
15 changes: 15 additions & 0 deletions src/main/java/redis/clients/jedis/SentinelMasterListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package redis.clients.jedis;

/**
* interface for monitor the master failover under sentinel mode
* We offer two implementation options
* @see SentinelMasterSubscribeListener Passive subscription
* @see SentinelMasterActiveDetectListener Active detection
*/
public interface SentinelMasterListener {
void start();

void shutdown();

void onChange(HostAndPort hostAndPort);
}
Loading

0 comments on commit fd996e2

Please sign in to comment.