Skip to content

Commit

Permalink
feat: added default cluster client options for refreshing topology
Browse files Browse the repository at this point in the history
  • Loading branch information
VonDerBeck committed Feb 14, 2024
1 parent ca36ba6 commit 18255a5
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 13 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,17 @@ On the connector side it's your own responsibility to create the `RedisClusterCl
final RedisClusterClient redisClient = RedisClusterClient.create(...);

final ZeebeRedis zeebeRedis = ZeebeRedis.newBuilder(redisClusterClient)
.withStandardClusterOptions()
.consumerGroup("MyApplication").consumerId("consumer-1")
.addIncidentListener(incident -> { ... })
.addJobListener(job -> { ... })
.build();
```
The optional method `withStandardClusterOptions()` activates several cluster topology refresh options
and filters out failed nodes from the topology. It sets / overrides the cluster client options and
might be a good option if you do not have any specific own requirements.
**Sharding in Redis clusters**
When using the standard `RedisClient` the connector uses a Multi-key operation to receive events from Redis.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public RedisConnectionBuilder withReconnectUsingNewConnection() {
return this;
}

public RedisConnectionBuilder withStandardClusterOptions() {
this.redisClient.setStandardClusterOptions();
return this;
}

public RedisConnectionBuilder reconnectInterval(Duration duration) {
this.reconnectInterval = duration;
return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package io.zeebe.redis.connect.java;

import io.lettuce.core.RedisClient;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;

public class UniversalRedisClient {

Expand Down Expand Up @@ -29,4 +32,32 @@ public RedisClusterClient getRedisClusterClient() {
return redisClusterClient;
}

public void setStandardClusterOptions() {
if (redisClusterClient != null) {
redisClusterClient.setOptions(createStandardOptions());
}
}

private static ClusterClientOptions createStandardOptions() {
return ClusterClientOptions.builder()
.autoReconnect(true)
// dynamic adaptive refresh
.topologyRefreshOptions(dynamicRefreshOptions())
// filter out failed nodes from the topology
.nodeFilter(it ->
! (it.is(RedisClusterNode.NodeFlag.FAIL)
|| it.is(RedisClusterNode.NodeFlag.EVENTUAL_FAIL)
|| it.is(RedisClusterNode.NodeFlag.HANDSHAKE)
|| it.is(RedisClusterNode.NodeFlag.NOADDR)))
.validateClusterNodeMembership(true)
.build();
}

private static ClusterTopologyRefreshOptions dynamicRefreshOptions() {
return ClusterTopologyRefreshOptions.builder()
.enableAllAdaptiveRefreshTriggers()
.enablePeriodicRefresh()
.dynamicRefreshSources(true)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private static AbstractMap.SimpleEntry<String, Class<? extends com.google.protob

private boolean deleteMessages;

private Future<?> future;
private Future<?> readFuture;
private ExecutorService executorService;

private boolean reconnectUsesNewConnection;
Expand Down Expand Up @@ -135,9 +135,11 @@ protected void start() {
LOGGER.error("Error creating Redis cluster connection pool", e);
throw new RuntimeException(e);
}
// the connection has been used to create consumer groups and can now be closed
// because reading from streams is done by the connection pool
redisConnection.close();
if (!shouldDestroyConsumerGroupOnClose) {
// the connection has been used to create consumer groups and can now be closed
// because reading from streams is done by the connection pool.
redisConnection.close();
}
if (reconnectUsesNewConnection) {
LOGGER.warn("Parameter 'reconnectUsesNewConnection' has no effect when using RedisClusterClient.");
}
Expand All @@ -162,7 +164,7 @@ public void onRedisDisconnected(RedisChannelHandler<?, ?> connection) {
forcedClose = false;
isClosed = false;
executorService = Executors.newSingleThreadExecutor();
future = executorService.submit(this::readFromStream);
readFuture = executorService.submit(this::readFromStream);
}

public void reconnect() {
Expand Down Expand Up @@ -195,14 +197,15 @@ public boolean isClosed() {
return isClosed;
}

/** Stop reading from the Redis Strams. */
/** Stop reading from the Redis Streams. */
@Override
public void close() {
isClosed = true;
if (shouldDestroyConsumerGroupOnClose) {
var syncStreamCommands = redisConnection.syncStreamCommands();
Arrays.stream(offsets).forEach(o -> {
String stream = String.valueOf(o.getName());
LOGGER.debug("Destroying consumer group {} of stream {}", consumerGroup, stream);
LOGGER.trace("Destroying consumer group {} of stream {}", consumerGroup, stream);
try {
syncStreamCommands.xgroupDestroy(stream, consumerGroup);
} catch (Exception ex) {
Expand All @@ -227,9 +230,9 @@ private void doClose() {
redisPool.closeAsync();
redisPool = null;
}
if (future != null) {
future.cancel(true);
future = null;
if (readFuture != null) {
readFuture.cancel(true);
readFuture = null;
}
if (executorService != null) {
executorService.shutdown();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.zeebe.redis.exporter;

import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.resource.ClientResources;

public class ClusterClientSettings {

public static ClientResources createResourcesFromConfig(ExporterConfiguration config) {
return ClientResources.builder()
.ioThreadPoolSize(config.getIoThreadPoolSize())
.build();
}

public static ClusterClientOptions createStandardOptions() {
return ClusterClientOptions.builder()
.autoReconnect(true)
// dynamic adaptive refresh
.topologyRefreshOptions(dynamicRefreshOptions())
// filter out failed nodes from the topology
.nodeFilter(it ->
! (it.is(RedisClusterNode.NodeFlag.FAIL)
|| it.is(RedisClusterNode.NodeFlag.EVENTUAL_FAIL)
|| it.is(RedisClusterNode.NodeFlag.HANDSHAKE)
|| it.is(RedisClusterNode.NodeFlag.NOADDR)))
.validateClusterNodeMembership(true)
.build();
}

private static ClusterTopologyRefreshOptions dynamicRefreshOptions() {
return ClusterTopologyRefreshOptions.builder()
.enableAllAdaptiveRefreshTriggers()
.enablePeriodicRefresh()
.dynamicRefreshSources(true)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,10 @@ public void open(Controller controller) {
this.controller = controller;

if (config.isUseClusterClient()) {
redisClient = new UniversalRedisClient(RedisClusterClient.create(
ClientResources.builder().ioThreadPoolSize(config.getIoThreadPoolSize()).build(),
config.getRemoteAddress().get()));
var clusterClient = RedisClusterClient.create(
ClusterClientSettings.createResourcesFromConfig(config), config.getRemoteAddress().get());
clusterClient.setOptions(ClusterClientSettings.createStandardOptions());
redisClient = new UniversalRedisClient(clusterClient);
} else {
redisClient = new UniversalRedisClient(RedisClient.create(
ClientResources.builder().ioThreadPoolSize(config.getIoThreadPoolSize()).build(),
Expand Down

0 comments on commit 18255a5

Please sign in to comment.