feat: enabled basic usage of RedisClusterClient
VonDerBeck committed Feb 9, 2024
1 parent 27873c0 commit ce380e4
Showing 11 changed files with 541 additions and 281 deletions.
36 changes: 35 additions & 1 deletion README.md
@@ -178,6 +178,7 @@ Setting the Redis remote address is mandatory.
In the Zeebe configuration, you can furthermore change
* whether to use a cluster client
* the value and record types which are exported
* the name resulting in a stream prefix
* the cleanup cycle
@@ -200,6 +201,9 @@ zeebe:
# Redis connection url (redis://...)
# remoteAddress:

# whether to use the Redis cluster client
useClusterClient: false
# comma separated list of io.zeebe.protocol.record.ValueType to export or empty to export all types
enabledValueTypes: ""

@@ -234,7 +238,7 @@ zeebe:
format: "protobuf"
```
The values can be overridden by environment variables with the same name and a `ZEEBE_REDIS_` prefix (e.g. `ZEEBE_REDIS_MAX_TIME_TO_LIVE_IN_SECONDS`).
The values can be overridden by environment variables with the same name and a `ZEEBE_REDIS_` prefix (e.g. `ZEEBE_REDIS_MAX_TIME_TO_LIVE_IN_SECONDS`, `ZEEBE_REDIS_USE_CLUSTER_CLIENT`, ...).
Especially for `ZEEBE_REDIS_REMOTE_ADDRESS` it is recommended to define it as an environment variable
rather than in the more internal `application.yaml` configuration.
@@ -319,6 +323,36 @@ In order to tune the exporter performance you have the following options available:
The exporter queues records and sends them every `ZEEBE_REDIS_BATCH_CYCLE_MILLIS` milliseconds. Sending data then does not use Lettuce's default auto-flush (flush-after-write) mode but groups multiple
commands into a batch, using `ZEEBE_REDIS_BATCH_SIZE` as the maximum batch size, thus increasing throughput. According to the Lettuce documentation, batches should have a size between 50 and 1000.
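For illustration, here is a minimal, self-contained sketch of the manual-flushing pattern Lettuce provides for command batching. It is not the exporter's code; the address, stream key, and payloads are placeholders, and the exporter applies the same idea internally with `ZEEBE_REDIS_BATCH_SIZE` and `ZEEBE_REDIS_BATCH_CYCLE_MILLIS` controlling the batch size and cycle.

```java
import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class BatchingSketch {
  public static void main(String[] args) {
    // Placeholder address - replace with your Redis instance
    RedisClient client = RedisClient.create("redis://localhost:6379");
    StatefulRedisConnection<String, String> connection = client.connect();
    RedisAsyncCommands<String, String> async = connection.async();

    // Disable auto-flush so commands are buffered locally instead of written immediately
    connection.setAutoFlushCommands(false);

    List<RedisFuture<String>> futures = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
      futures.add(async.xadd("zeebe:JOB", "record", "payload-" + i));
    }

    // Write the whole batch to Redis in one go and wait for the results
    connection.flushCommands();
    LettuceFutures.awaitAll(Duration.ofSeconds(5), futures.toArray(new RedisFuture[0]));

    connection.close();
    client.shutdown();
  }
}
```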
### Using Redis clusters
*Since 0.9.10*
When connecting to Redis clusters, the underlying Lettuce client must be the `RedisClusterClient`, which differs from the standard `RedisClient`.
To activate cluster client usage in the exporter, set `ZEEBE_REDIS_USE_CLUSTER_CLIENT=true` in your environment.
On the connector side, it is your responsibility to create the `RedisClusterClient` and pass it to the builder:
```java
final RedisClusterClient redisClusterClient = RedisClusterClient.create(...);

final ZeebeRedis zeebeRedis = ZeebeRedis.newBuilder(redisClusterClient)
        .consumerGroup("MyApplication").consumerId("consumer-1")
        .addIncidentListener(incident -> { ... })
        .addJobListener(job -> { ... })
        .build();
```
**Sharding in Redis clusters**

Until now, the connector uses a multi-key operation to receive events from Redis.
Unfortunately, certain multi-key operations - including the stream commands - are not possible in a cluster. You will
experience errors saying `CROSSSLOT Keys in request don't hash to the same slot` on the connector side.

The temporary workaround for this error is to wrap the default prefix `zeebe` in curly braces: configure `ZEEBE_REDIS_NAME={zeebe}` for the exporter and set the prefix `{zeebe}` on the connector side as well.
The consequence is that all records then end up in the same shard and can therefore be retrieved collectively, as before, by using a multi-key operation. Even if this does not necessarily correspond to the purpose of a cluster, it at least enables its usage.
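For illustration, a minimal connector-side sketch of this workaround; the cluster address is a placeholder, and the builder methods shown (`prefix`, `consumerGroup`, `consumerId`, `addIncidentListener`) are those provided by `ZeebeRedis.newBuilder`:

```java
final RedisClusterClient redisClusterClient =
        RedisClusterClient.create("redis://localhost:7000"); // placeholder cluster address

final ZeebeRedis zeebeRedis = ZeebeRedis.newBuilder(redisClusterClient)
        .prefix("{zeebe}") // must match ZEEBE_REDIS_NAME={zeebe} on the exporter side
        .consumerGroup("MyApplication").consumerId("consumer-1")
        .addIncidentListener(incident -> { /* handle incident */ })
        .build();
```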
This section will disappear as soon as the connector is optionally available without multi-key operations.
## Build it from Source
The exporter and the Java connector can be built with Maven
250 changes: 250 additions & 0 deletions RedisConnectionBuilder.java
@@ -0,0 +1,250 @@
package io.zeebe.redis.connect.java;

import io.camunda.zeebe.protocol.record.ValueType;
import io.lettuce.core.RedisBusyException;
import io.lettuce.core.RedisClient;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.cluster.RedisClusterClient;
import io.zeebe.exporter.proto.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;
import java.util.function.Consumer;

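/**
 * Builder for {@link ZeebeRedis} instances. Accepts either a standard {@link RedisClient}
 * or a {@link RedisClusterClient} and configures the consumer group, consumer id, stream prefix,
 * offsets and record listeners before starting the background reader.
 */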
public class RedisConnectionBuilder {

private static final Logger LOGGER = LoggerFactory.getLogger(RedisConnectionBuilder.class);

private static final int XREAD_BLOCK_MILLISECONDS = 2000;
private static final int XREAD_COUNT = 500;
private final UniversalRedisClient redisClient;

private boolean reconnectUsesNewConnection = false;
private Duration reconnectInterval = Duration.ofSeconds(1);

private final Map<String, List<Consumer<?>>> listeners = new HashMap<>();

private String consumerGroup = UUID.randomUUID().toString();

private boolean shouldDestroyConsumerGroupOnClose = true;

private String consumerId = UUID.randomUUID().toString();
private String prefix = "zeebe:";

private String offset = "0-0";

private int xreadBlockMillis = XREAD_BLOCK_MILLISECONDS;

private int xreadCount = XREAD_COUNT;

private boolean deleteMessages = false;

RedisConnectionBuilder(RedisClient redisClient) {
this.redisClient = new UniversalRedisClient(redisClient);
}

RedisConnectionBuilder(RedisClusterClient redisClient) {
this.redisClient = new UniversalRedisClient(redisClient);
}

public RedisConnectionBuilder withReconnectUsingNewConnection() {
this.reconnectUsesNewConnection = true;
return this;
}

public RedisConnectionBuilder reconnectInterval(Duration duration) {
this.reconnectInterval = duration;
return this;
}

/**
* Sets the XREAD [BLOCK milliseconds] parameter. Default is 2000.
*/
public RedisConnectionBuilder xreadBlockMillis(int xreadBlockMillis) {
this.xreadBlockMillis = xreadBlockMillis;
return this;
}

/**
* Sets the XREAD [COUNT count] parameter. Default is 500.
*/
public RedisConnectionBuilder xreadCount(int xreadCount) {
this.xreadCount = xreadCount;
return this;
}

/**
* Set the consumer group, e.g. the application name.
*/
public RedisConnectionBuilder consumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
this.shouldDestroyConsumerGroupOnClose = false;
return this;
}

/**
* Set the unique consumer ID.
*/
public RedisConnectionBuilder consumerId(String consumerId) {
this.consumerId = consumerId;
return this;
}

/**
* Set the prefix for the Streams to read from.
*/
public RedisConnectionBuilder prefix(String name) {
this.prefix = name + ":";
return this;
}

/**
* Start reading from a given offset.
*/
public RedisConnectionBuilder offset(String offset) {
this.offset = offset;
return this;
}

public RedisConnectionBuilder deleteMessagesAfterSuccessfulHandling(boolean deleteMessages) {
this.deleteMessages = deleteMessages;
return this;
}

private <T extends com.google.protobuf.Message> void addListener(
String valueType, Consumer<T> listener) {
final var recordListeners = listeners.getOrDefault(prefix + valueType, new ArrayList<>());
recordListeners.add(listener);
listeners.put(prefix + valueType, recordListeners);
}

public RedisConnectionBuilder addDeploymentListener(Consumer<Schema.DeploymentRecord> listener) {
addListener(ValueType.DEPLOYMENT.name(), listener);
return this;
}

public RedisConnectionBuilder addDeploymentDistributionListener(
Consumer<Schema.DeploymentDistributionRecord> listener) {
addListener(ValueType.DEPLOYMENT_DISTRIBUTION.name(), listener);
return this;
}

public RedisConnectionBuilder addProcessListener(Consumer<Schema.ProcessRecord> listener) {
addListener(ValueType.PROCESS.name(), listener);
return this;
}

public RedisConnectionBuilder addProcessInstanceListener(Consumer<Schema.ProcessInstanceRecord> listener) {
addListener(ValueType.PROCESS_INSTANCE.name(), listener);
return this;
}

public RedisConnectionBuilder addProcessEventListener(Consumer<Schema.ProcessEventRecord> listener) {
addListener(ValueType.PROCESS_EVENT.name(), listener);
return this;
}

public RedisConnectionBuilder addVariableListener(Consumer<Schema.VariableRecord> listener) {
addListener(ValueType.VARIABLE.name(), listener);
return this;
}

public RedisConnectionBuilder addVariableDocumentListener(Consumer<Schema.VariableDocumentRecord> listener) {
addListener(ValueType.VARIABLE_DOCUMENT.name(), listener);
return this;
}

public RedisConnectionBuilder addJobListener(Consumer<Schema.JobRecord> listener) {
addListener(ValueType.JOB.name(), listener);
return this;
}

public RedisConnectionBuilder addJobBatchListener(Consumer<Schema.JobBatchRecord> listener) {
addListener(ValueType.JOB_BATCH.name(), listener);
return this;
}

public RedisConnectionBuilder addIncidentListener(Consumer<Schema.IncidentRecord> listener) {
addListener(ValueType.INCIDENT.name(), listener);
return this;
}

public RedisConnectionBuilder addTimerListener(Consumer<Schema.TimerRecord> listener) {
addListener(ValueType.TIMER.name(), listener);
return this;
}

public RedisConnectionBuilder addMessageListener(Consumer<Schema.MessageRecord> listener) {
addListener(ValueType.MESSAGE.name(), listener);
return this;
}

public RedisConnectionBuilder addMessageSubscriptionListener(
Consumer<Schema.MessageSubscriptionRecord> listener) {
addListener(ValueType.MESSAGE_SUBSCRIPTION.name(), listener);
return this;
}

public RedisConnectionBuilder addMessageStartEventSubscriptionListener(
Consumer<Schema.MessageStartEventSubscriptionRecord> listener) {
addListener(ValueType.MESSAGE_START_EVENT_SUBSCRIPTION.name(), listener);
return this;
}

public RedisConnectionBuilder addProcessMessageSubscriptionListener(
Consumer<Schema.ProcessMessageSubscriptionRecord> listener) {
addListener(ValueType.PROCESS_MESSAGE_SUBSCRIPTION.name(), listener);
return this;
}

public RedisConnectionBuilder addProcessInstanceCreationListener(
Consumer<Schema.ProcessInstanceCreationRecord> listener) {
addListener(ValueType.PROCESS_INSTANCE_CREATION.name(), listener);
return this;
}

public RedisConnectionBuilder addErrorListener(Consumer<Schema.ErrorRecord> listener) {
addListener(ValueType.ERROR.name(), listener);
return this;
}

/**
* Start a background task that reads from Zeebe Streams.
* <br>
* Call {@link #close()} to stop reading.
*/
public ZeebeRedis build() {
if (listeners.isEmpty()) {
throw new IllegalArgumentException("Must register at least one listener, but none found.");
}


final var connection = redisClient.connect(new ProtobufCodec());

LOGGER.info("Read from Redis streams '{}*' with offset '{}'", prefix, offset);

// Prepare
var syncStreamCommands = connection.syncStreamCommands();
List<XReadArgs.StreamOffset<String>> offsets = new ArrayList<>();
listeners.keySet().stream().forEach(stream -> {
offsets.add(XReadArgs.StreamOffset.lastConsumed(stream));
try {
syncStreamCommands.xgroupCreate(XReadArgs.StreamOffset.from(stream, offset), consumerGroup,
XGroupCreateArgs.Builder.mkstream());
} catch (RedisBusyException ex) {
// NOOP: consumer group already exists
}
});

final var zeebeRedis = new ZeebeRedis(redisClient, connection, reconnectUsesNewConnection, reconnectInterval,
xreadBlockMillis, xreadCount, consumerGroup, consumerId, prefix,
offsets.toArray(new XReadArgs.StreamOffset[0]), listeners, deleteMessages,
shouldDestroyConsumerGroupOnClose);
zeebeRedis.start();

return zeebeRedis;
}
}
24 changes: 24 additions & 0 deletions UniversalRedisClient.java
@@ -0,0 +1,24 @@
package io.zeebe.redis.connect.java;

import io.lettuce.core.RedisClient;
import io.lettuce.core.cluster.RedisClusterClient;

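/**
 * Thin wrapper that holds either a standard {@link RedisClient} or a {@link RedisClusterClient}
 * and creates the matching connection type on {@link #connect(ProtobufCodec)}.
 */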
public class UniversalRedisClient {

private RedisClient redisClient = null;
private RedisClusterClient redisClusterClient = null;

public UniversalRedisClient(RedisClient redisClient) {
this.redisClient = redisClient;
}

public UniversalRedisClient(RedisClusterClient redisClient) {
this.redisClusterClient = redisClient;
}

public UniversalRedisConnection<String, byte[]> connect(ProtobufCodec protobufCodec) {
if (redisClient != null) return new UniversalRedisConnection<>(redisClient.connect(protobufCodec));
return new UniversalRedisConnection<>(redisClusterClient.connect(protobufCodec));
}

}
43 changes: 43 additions & 0 deletions UniversalRedisConnection.java
@@ -0,0 +1,43 @@
package io.zeebe.redis.connect.java;

import io.lettuce.core.RedisConnectionStateListener;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisStreamAsyncCommands;
import io.lettuce.core.api.sync.RedisStreamCommands;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;

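/**
 * Wraps either a {@link StatefulRedisConnection} or a {@link StatefulRedisClusterConnection}
 * and exposes the stream command interfaces common to both.
 */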
public class UniversalRedisConnection<K, V> {

private StatefulRedisConnection<K, V> redisConnection = null;

private StatefulRedisClusterConnection<K, V> redisClusterConnection = null;

private StatefulConnection<K, V> theConnection;

public UniversalRedisConnection(StatefulConnection<K,V> redisConnection) {
this.theConnection = redisConnection;
if (redisConnection instanceof StatefulRedisConnection) {
this.redisConnection = (StatefulRedisConnection<K, V>) redisConnection;
} else {
this.redisClusterConnection = (StatefulRedisClusterConnection<K, V>) redisConnection;
}
}

public void addListener(RedisConnectionStateListener listener) {
theConnection.addListener(listener);
}

public RedisStreamCommands<K, V> syncStreamCommands() {
if (redisConnection != null) return redisConnection.sync();
return redisClusterConnection.sync();
}

public RedisStreamAsyncCommands<K, V> asyncStreamCommands() {
if (redisConnection != null) return redisConnection.async();
return redisClusterConnection.async();
}

public void close() {
theConnection.close();
}
}