diff --git a/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java index eb443bca1e..097bf636e0 100644 --- a/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java @@ -24,6 +24,7 @@ import redis.clients.jedis.*; import redis.clients.jedis.MultiClusterClientConfig.ClusterConfig; import redis.clients.jedis.annots.Experimental; +import redis.clients.jedis.annots.VisibleForTesting; import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisValidationException; import redis.clients.jedis.util.Pool; @@ -299,6 +300,11 @@ public Cluster getCluster() { return multiClusterMap.get(activeMultiClusterIndex); } + @VisibleForTesting + public Cluster getCluster(int multiClusterIndex) { + return multiClusterMap.get(multiClusterIndex); + } + public CircuitBreaker getClusterCircuitBreaker() { return multiClusterMap.get(activeMultiClusterIndex).getCircuitBreaker(); } diff --git a/src/test/java/redis/clients/jedis/EndpointConfig.java b/src/test/java/redis/clients/jedis/EndpointConfig.java index 42a44a3c47..68f927be0e 100644 --- a/src/test/java/redis/clients/jedis/EndpointConfig.java +++ b/src/test/java/redis/clients/jedis/EndpointConfig.java @@ -31,6 +31,10 @@ public HostAndPort getHostAndPort() { return JedisURIHelper.getHostAndPort(endpoints.get(0)); } + public HostAndPort getHostAndPort(int index) { + return JedisURIHelper.getHostAndPort(endpoints.get(index)); + } + public String getPassword() { return password; } diff --git a/src/test/java/redis/clients/jedis/scenario/ActiveActiveFailoverTest.java b/src/test/java/redis/clients/jedis/scenario/ActiveActiveFailoverTest.java new file mode 100644 index 0000000000..587988b447 --- /dev/null +++ b/src/test/java/redis/clients/jedis/scenario/ActiveActiveFailoverTest.java @@ -0,0 +1,187 @@ +package redis.clients.jedis.scenario; + +import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.*; +import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; +import redis.clients.jedis.exceptions.JedisConnectionException; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.junit.Assert.*; + +public class ActiveActiveFailoverTest { + private static final Logger log = LoggerFactory.getLogger(ActiveActiveFailoverTest.class); + + private static EndpointConfig endpoint; + + private final FaultInjectionClient faultClient = new FaultInjectionClient(); + + @BeforeClass + public static void beforeClass() { + try { + ActiveActiveFailoverTest.endpoint = HostAndPorts.getRedisEndpoint("re-active-active"); + } catch (IllegalArgumentException e) { + log.warn("Skipping test because no Redis endpoint is configured"); + org.junit.Assume.assumeTrue(false); + } + } + + @Test + public void testFailover() { + + MultiClusterClientConfig.ClusterConfig[] clusterConfig = new MultiClusterClientConfig.ClusterConfig[2]; + + JedisClientConfig config = endpoint.getClientConfigBuilder() + .socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS) + .connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build(); + + clusterConfig[0] = new MultiClusterClientConfig.ClusterConfig(endpoint.getHostAndPort(0), + config, RecommendedSettings.poolConfig); + clusterConfig[1] = new MultiClusterClientConfig.ClusterConfig(endpoint.getHostAndPort(1), + config, RecommendedSettings.poolConfig); + + MultiClusterClientConfig.Builder builder = new MultiClusterClientConfig.Builder(clusterConfig); + + builder.circuitBreakerSlidingWindowType(CircuitBreakerConfig.SlidingWindowType.TIME_BASED); + builder.circuitBreakerSlidingWindowSize(1); // SLIDING WINDOW SIZE IN SECONDS + builder.circuitBreakerSlidingWindowMinCalls(1); + builder.circuitBreakerFailureRateThreshold(10.0f); // percentage of failures to trigger circuit breaker + + builder.retryWaitDuration(10); + builder.retryMaxAttempts(1); + builder.retryWaitDurationExponentialBackoffMultiplier(1); + + class FailoverReporter implements Consumer { + + String currentClusterName = "not set"; + + boolean failoverHappened = false; + + Instant failoverAt = null; + + public String getCurrentClusterName() { + return currentClusterName; + } + + @Override + public void accept(String clusterName) { + this.currentClusterName = clusterName; + log.info( + "\n\n====FailoverEvent=== \nJedis failover to cluster: {}\n====FailoverEvent===\n\n", + clusterName); + + failoverHappened = true; + failoverAt = Instant.now(); + } + } + + MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider( + builder.build()); + FailoverReporter reporter = new FailoverReporter(); + provider.setClusterFailoverPostProcessor(reporter); + provider.setActiveMultiClusterIndex(1); + + UnifiedJedis client = new UnifiedJedis(provider); + + AtomicLong retryingThreadsCounter = new AtomicLong(0); + AtomicLong failedCommandsAfterFailover = new AtomicLong(0); + AtomicReference lastFailedCommandAt = new AtomicReference<>(); + + // Start thread that imitates an application that uses the client + MultiThreadedFakeApp fakeApp = new MultiThreadedFakeApp(client, (UnifiedJedis c) -> { + + long threadId = Thread.currentThread().getId(); + + int attempt = 0; + int maxTries = 500; + int retryingDelay = 5; + while (true) { + try { + Map executionInfo = new HashMap() {{ + put("threadId", String.valueOf(threadId)); + put("cluster", reporter.getCurrentClusterName()); + }}; + client.xadd("execution_log", StreamEntryID.NEW_ENTRY, executionInfo); + + if (attempt > 0) { + log.info("Thread {} recovered after {} ms. Threads still not recovered: {}", threadId, + attempt * retryingDelay, retryingThreadsCounter.decrementAndGet()); + } + + break; + } catch (JedisConnectionException e) { + + if (reporter.failoverHappened) { + long failedCommands = failedCommandsAfterFailover.incrementAndGet(); + lastFailedCommandAt.set(Instant.now()); + log.warn( + "Thread {} failed to execute command after failover. Failed commands after failover: {}", + threadId, failedCommands); + } + + if (attempt == 0) { + long failedThreads = retryingThreadsCounter.incrementAndGet(); + log.warn("Thread {} failed to execute command. Failed threads: {}", threadId, + failedThreads); + } + try { + Thread.sleep(retryingDelay); + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } + if (++attempt == maxTries) throw e; + } + } + return true; + }, 18); + fakeApp.setKeepExecutingForSeconds(30); + Thread t = new Thread(fakeApp); + t.start(); + + HashMap params = new HashMap<>(); + params.put("bdb_id", endpoint.getBdbId()); + params.put("rlutil_command", "pause_bdb"); + + FaultInjectionClient.TriggerActionResponse actionResponse = null; + + try { + log.info("Triggering bdb_pause"); + actionResponse = faultClient.triggerAction("execute_rlutil_command", params); + } catch (IOException e) { + fail("Fault Injection Server error:" + e.getMessage()); + } + + log.info("Action id: {}", actionResponse.getActionId()); + fakeApp.setAction(actionResponse); + + try { + t.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + ConnectionPool pool = provider.getCluster(1).getConnectionPool(); + + log.info("First connection pool state: active: {}, idle: {}", pool.getNumActive(), + pool.getNumIdle()); + log.info("Full failover time: {} s", + Duration.between(reporter.failoverAt, lastFailedCommandAt.get()).getSeconds()); + + assertEquals(0, pool.getNumActive()); + assertTrue(fakeApp.capturedExceptions().isEmpty()); + + client.close(); + } + +} diff --git a/src/test/java/redis/clients/jedis/scenario/FakeApp.java b/src/test/java/redis/clients/jedis/scenario/FakeApp.java index 7e505862a2..4ea6ffc1f9 100644 --- a/src/test/java/redis/clients/jedis/scenario/FakeApp.java +++ b/src/test/java/redis/clients/jedis/scenario/FakeApp.java @@ -12,18 +12,18 @@ public class FakeApp implements Runnable { - private static final Logger log = LoggerFactory.getLogger(FakeApp.class); + protected static final Logger log = LoggerFactory.getLogger(FakeApp.class); public void setKeepExecutingForSeconds(int keepExecutingForSeconds) { this.keepExecutingForSeconds = keepExecutingForSeconds; } - private int keepExecutingForSeconds = 60; + protected int keepExecutingForSeconds = 60; - private FaultInjectionClient.TriggerActionResponse actionResponse = null; - private final UnifiedJedis client; - private final ExecutedAction action; - private List exceptions = new ArrayList<>(); + protected FaultInjectionClient.TriggerActionResponse actionResponse = null; + protected final UnifiedJedis client; + protected final ExecutedAction action; + protected List exceptions = new ArrayList<>(); @FunctionalInterface public interface ExecutedAction { diff --git a/src/test/java/redis/clients/jedis/scenario/MultiThreadedFakeApp.java b/src/test/java/redis/clients/jedis/scenario/MultiThreadedFakeApp.java new file mode 100644 index 0000000000..4c03fc2cf2 --- /dev/null +++ b/src/test/java/redis/clients/jedis/scenario/MultiThreadedFakeApp.java @@ -0,0 +1,48 @@ +package redis.clients.jedis.scenario; + +import redis.clients.jedis.UnifiedJedis; +import redis.clients.jedis.exceptions.JedisConnectionException; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class MultiThreadedFakeApp extends FakeApp { + + private final ExecutorService executorService; + + public MultiThreadedFakeApp(UnifiedJedis client, FakeApp.ExecutedAction action, int numThreads) { + super(client, action); + this.executorService = Executors.newFixedThreadPool(numThreads); + } + + @Override + public void run() { + log.info("Starting FakeApp"); + + int checkEachSeconds = 5; + int timeoutSeconds = 120; + + while (actionResponse == null || !actionResponse.isCompleted( + Duration.ofSeconds(checkEachSeconds), Duration.ofSeconds(keepExecutingForSeconds), + Duration.ofSeconds(timeoutSeconds))) { + try { + executorService.submit(() -> action.run(client)); + } catch (JedisConnectionException e) { + log.error("Error executing action", e); + exceptions.add(e); + } + } + + executorService.shutdown(); + + try { + if (!executorService.awaitTermination(keepExecutingForSeconds, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + log.error("Error waiting for executor service to terminate", e); + } + } +}