diff --git a/README.md b/README.md index df9a7fa..6a934a1 100644 --- a/README.md +++ b/README.md @@ -178,6 +178,7 @@ Setting the Redis remote address is mandatory. In the Zeebe configuration, you can furthermore change +* whether to use a cluster client * the value and record types which are exported * the name resulting in a stream prefix * the cleanup cycle @@ -200,6 +201,9 @@ zeebe: # Redis connection url (redis://...) # remoteAddress: + # whether to use the Redis cluster client + useClusterClient: false + # comma separated list of io.zeebe.protocol.record.ValueType to export or empty to export all types enabledValueTypes: "" @@ -234,7 +238,7 @@ zeebe: format: "protobuf" ``` -The values can be overridden by environment variables with the same name and a `ZEEBE_REDIS_` prefix (e.g. `ZEEBE_REDIS_MAX_TIME_TO_LIVE_IN_SECONDS`). +The values can be overridden by environment variables with the same name and a `ZEEBE_REDIS_` prefix (e.g. `ZEEBE_REDIS_MAX_TIME_TO_LIVE_IN_SECONDS`, `ZEEBE_REDIS_USE_CLUSTER_CLIENT`, ...). Especially when it comes to `ZEEBE_REDIS_REMOTE_ADDRESS` it is recommended to define it as environment variable and not within the more internal `application.yaml` configuration. @@ -319,6 +323,36 @@ In order to tune the exporter performance you have the following options availab The exporter queues records and sends them every `ZEEBE_REDIS_BATCH_CYCLE_MILLIS`. Sending data then does not use the default auto-flush / flush-after-write mode of Lettuce but groups multiple commands in a batch using `ZEEBE_REDIS_BATCH_SIZE` as maximum thus increasing the throughput. According to the Lettuce documentation batches are recommended to have a size between 50 and 1000. +### Using Redis clusters +*Since 0.9.10* + +When connecting to Redis clusters the underlying Lettuce client must use the `RedisClusterClient` which differs from the standard `RedisClient`. +In order to activate the cluster client usage of the exporter set `ZEEBE_REDIS_USE_CLUSTER_CLIENT=true` in your environment. + +On the connector side it's your own responsibility to create the `RedisClusterClient` and use it: + +``` +final RedisClusterClient redisClient = RedisClusterClient.create(...); + +final ZeebeRedis zeebeRedis = ZeebeRedis.newBuilder(redisClusterClient) + .consumerGroup("MyApplication").consumerId("consumer-1") + .addIncidentListener(incident -> { ... }) + .addJobListener(job -> { ... }) + .build(); +``` + +**Sharding in Redis clusters** + +Till now the connector uses a Multi-key operation to receive events from Redis. +Sadly certain Multi-key operations - including the stream commands - are not possible in a cluster. You will +experience errors saying `CROSSSLOT Keys in request don't hash to the same slot` on the connector side. + +The temporary workaround for this error is to wrap the default prefix `zeebe` with `{}` ending up +in configuring `ZEEBE_REDIS_NAME={zeebe}` for the exporter and setting the prefix `{zeebe}` on the connector side as well. +The consequence is that all records then end up in the same shard and can therefore all be retrieved collectively as before by using a Multi-key Operation. Even if this does not necessarily correspond to the purpose of a cluster it would at least enable its usage. + +This section will disappear as soon as the connector is optionally available without multi-key operations. + ## Build it from Source The exporter and the Java connector can be built with Maven diff --git a/connector-java/src/main/java/io/zeebe/redis/connect/java/RedisConnectionBuilder.java b/connector-java/src/main/java/io/zeebe/redis/connect/java/RedisConnectionBuilder.java new file mode 100644 index 0000000..bd240e1 --- /dev/null +++ b/connector-java/src/main/java/io/zeebe/redis/connect/java/RedisConnectionBuilder.java @@ -0,0 +1,250 @@ +package io.zeebe.redis.connect.java; + +import io.camunda.zeebe.protocol.record.ValueType; +import io.lettuce.core.RedisBusyException; +import io.lettuce.core.RedisClient; +import io.lettuce.core.XGroupCreateArgs; +import io.lettuce.core.XReadArgs; +import io.lettuce.core.cluster.RedisClusterClient; +import io.zeebe.exporter.proto.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.*; +import java.util.function.Consumer; + +public class RedisConnectionBuilder { + + private static final Logger LOGGER = LoggerFactory.getLogger(RedisConnectionBuilder.class); + + private static final int XREAD_BLOCK_MILLISECONDS = 2000; + private static final int XREAD_COUNT = 500; + private final UniversalRedisClient redisClient; + + private boolean reconnectUsesNewConnection = false; + private Duration reconnectInterval = Duration.ofSeconds(1); + + private final Map>> listeners = new HashMap<>(); + + private String consumerGroup = UUID.randomUUID().toString(); + + private boolean shouldDestroyConsumerGroupOnClose = true; + + private String consumerId = UUID.randomUUID().toString(); + private String prefix = "zeebe:"; + + private String offset = "0-0"; + + private int xreadBlockMillis = XREAD_BLOCK_MILLISECONDS; + + private int xreadCount = XREAD_COUNT; + + private boolean deleteMessages = false; + + RedisConnectionBuilder(RedisClient redisClient) { + this.redisClient = new UniversalRedisClient(redisClient); + } + + RedisConnectionBuilder(RedisClusterClient redisClient) { + this.redisClient = new UniversalRedisClient(redisClient); + } + + public RedisConnectionBuilder withReconnectUsingNewConnection() { + this.reconnectUsesNewConnection = true; + return this; + } + + public RedisConnectionBuilder reconnectInterval(Duration duration) { + this.reconnectInterval = duration; + return this; + } + + /** + * Sets the XREAD [BLOCK milliseconds] parameter. Default is 2000. + */ + public RedisConnectionBuilder xreadBlockMillis(int xreadBlockMillis) { + this.xreadBlockMillis = xreadBlockMillis; + return this; + } + + /** + * Sets the XREAD [COUNT count] parameter. Default is 1000. + */ + public RedisConnectionBuilder xreadCount(int xreadCount) { + this.xreadCount = xreadCount; + return this; + } + + /** + * Set the consumer group, e.g. the application name. + */ + public RedisConnectionBuilder consumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + this.shouldDestroyConsumerGroupOnClose = false; + return this; + } + + /** + * Set the unique consumer ID. + */ + public RedisConnectionBuilder consumerId(String consumerId) { + this.consumerId = consumerId; + return this; + } + + /** + * Set the prefix for the Streams to read from. + */ + public RedisConnectionBuilder prefix(String name) { + this.prefix = name + ":"; + return this; + } + + /** + * Start reading from a given offset. + */ + public RedisConnectionBuilder offset(String offset) { + this.offset = offset; + return this; + } + + public RedisConnectionBuilder deleteMessagesAfterSuccessfulHandling(boolean deleteMessages) { + this.deleteMessages = deleteMessages; + return this; + } + + private void addListener( + String valueType, Consumer listener) { + final var recordListeners = listeners.getOrDefault(valueType, new ArrayList<>()); + recordListeners.add(listener); + listeners.put(prefix + valueType, recordListeners); + } + + public RedisConnectionBuilder addDeploymentListener(Consumer listener) { + addListener(ValueType.DEPLOYMENT.name(), listener); + return this; + } + + public RedisConnectionBuilder addDeploymentDistributionListener( + Consumer listener) { + addListener(ValueType.DEPLOYMENT_DISTRIBUTION.name(), listener); + return this; + } + + public RedisConnectionBuilder addProcessListener(Consumer listener) { + addListener(ValueType.PROCESS.name(), listener); + return this; + } + + public RedisConnectionBuilder addProcessInstanceListener(Consumer listener) { + addListener(ValueType.PROCESS_INSTANCE.name(), listener); + return this; + } + + public RedisConnectionBuilder addProcessEventListener(Consumer listener) { + addListener(ValueType.PROCESS_EVENT.name(), listener); + return this; + } + + public RedisConnectionBuilder addVariableListener(Consumer listener) { + addListener(ValueType.VARIABLE.name(), listener); + return this; + } + + public RedisConnectionBuilder addVariableDocumentListener(Consumer listener) { + addListener(ValueType.VARIABLE_DOCUMENT.name(), listener); + return this; + } + + public RedisConnectionBuilder addJobListener(Consumer listener) { + addListener(ValueType.JOB.name(), listener); + return this; + } + + public RedisConnectionBuilder addJobBatchListener(Consumer listener) { + addListener(ValueType.JOB_BATCH.name(), listener); + return this; + } + + public RedisConnectionBuilder addIncidentListener(Consumer listener) { + addListener(ValueType.INCIDENT.name(), listener); + return this; + } + + public RedisConnectionBuilder addTimerListener(Consumer listener) { + addListener(ValueType.TIMER.name(), listener); + return this; + } + + public RedisConnectionBuilder addMessageListener(Consumer listener) { + addListener(ValueType.MESSAGE.name(), listener); + return this; + } + + public RedisConnectionBuilder addMessageSubscriptionListener( + Consumer listener) { + addListener(ValueType.MESSAGE_SUBSCRIPTION.name(), listener); + return this; + } + + public RedisConnectionBuilder addMessageStartEventSubscriptionListener( + Consumer listener) { + addListener(ValueType.MESSAGE_START_EVENT_SUBSCRIPTION.name(), listener); + return this; + } + + public RedisConnectionBuilder addProcessMessageSubscriptionListener( + Consumer listener) { + addListener(ValueType.PROCESS_MESSAGE_SUBSCRIPTION.name(), listener); + return this; + } + + public RedisConnectionBuilder addProcessInstanceCreationListener( + Consumer listener) { + addListener(ValueType.PROCESS_INSTANCE_CREATION.name(), listener); + return this; + } + + public RedisConnectionBuilder addErrorListener(Consumer listener) { + addListener(ValueType.ERROR.name(), listener); + return this; + } + + /** + * Start a background task that reads from Zeebe Streams. + *
+ * Call {@link #close()} to stop reading. + */ + public ZeebeRedis build() { + if (listeners.size() == 0) { + throw new IllegalArgumentException("Must register a least one listener, but none found."); + } + + + final var connection = redisClient.connect(new ProtobufCodec()); + + LOGGER.info("Read from Redis streams '{}*' with offset '{}'", prefix, offset); + + // Prepare + var syncStreamCommands = connection.syncStreamCommands(); + List> offsets = new ArrayList<>(); + listeners.keySet().stream().forEach(stream -> { + offsets.add(XReadArgs.StreamOffset.lastConsumed(stream)); + try { + syncStreamCommands.xgroupCreate(XReadArgs.StreamOffset.from(stream, offset), consumerGroup, + XGroupCreateArgs.Builder.mkstream()); + } catch (RedisBusyException ex) { + // NOOP: consumer group already exists + } + }); + + final var zeebeRedis = new ZeebeRedis(redisClient, connection, reconnectUsesNewConnection, reconnectInterval, + xreadBlockMillis, xreadCount, consumerGroup, consumerId, prefix, + offsets.toArray(new XReadArgs.StreamOffset[0]), listeners, deleteMessages, + shouldDestroyConsumerGroupOnClose); + zeebeRedis.start(); + + return zeebeRedis; + } +} diff --git a/connector-java/src/main/java/io/zeebe/redis/connect/java/UniversalRedisClient.java b/connector-java/src/main/java/io/zeebe/redis/connect/java/UniversalRedisClient.java new file mode 100644 index 0000000..3e35f9c --- /dev/null +++ b/connector-java/src/main/java/io/zeebe/redis/connect/java/UniversalRedisClient.java @@ -0,0 +1,24 @@ +package io.zeebe.redis.connect.java; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.cluster.RedisClusterClient; + +public class UniversalRedisClient { + + private RedisClient redisClient = null; + private RedisClusterClient redisClusterClient = null; + + public UniversalRedisClient(RedisClient redisClient) { + this.redisClient = redisClient; + } + + public UniversalRedisClient(RedisClusterClient redisClient) { + this.redisClusterClient = redisClient; + } + + public UniversalRedisConnection connect(ProtobufCodec protobufCodec) { + if (redisClient != null) return new UniversalRedisConnection<>(redisClient.connect(protobufCodec)); + return new UniversalRedisConnection<>(redisClusterClient.connect(protobufCodec)); + } + +} diff --git a/connector-java/src/main/java/io/zeebe/redis/connect/java/UniversalRedisConnection.java b/connector-java/src/main/java/io/zeebe/redis/connect/java/UniversalRedisConnection.java new file mode 100644 index 0000000..c61f7b6 --- /dev/null +++ b/connector-java/src/main/java/io/zeebe/redis/connect/java/UniversalRedisConnection.java @@ -0,0 +1,43 @@ +package io.zeebe.redis.connect.java; + +import io.lettuce.core.RedisConnectionStateListener; +import io.lettuce.core.api.StatefulConnection; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisStreamAsyncCommands; +import io.lettuce.core.api.sync.RedisStreamCommands; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; + +public class UniversalRedisConnection { + + private StatefulRedisConnection redisConnection = null; + + private StatefulRedisClusterConnection redisClusterConnection = null; + + private StatefulConnection theConnection; + public UniversalRedisConnection(StatefulConnection redisConnection) { + this.theConnection = redisConnection; + if (redisConnection instanceof StatefulRedisConnection) { + this.redisConnection = (StatefulRedisConnection) redisConnection; + } else { + this.redisClusterConnection = (StatefulRedisClusterConnection) redisConnection; + } + } + + public void addListener(RedisConnectionStateListener listener) { + theConnection.addListener(listener); + } + + public RedisStreamCommands syncStreamCommands() { + if (redisConnection != null) return redisConnection.sync(); + return redisClusterConnection.sync(); + } + + public RedisStreamAsyncCommands asyncStreamCommands() { + if (redisConnection != null) return redisConnection.async(); + return redisClusterConnection.async(); + } + + public void close() { + theConnection.close(); + } +} diff --git a/connector-java/src/main/java/io/zeebe/redis/connect/java/ZeebeRedis.java b/connector-java/src/main/java/io/zeebe/redis/connect/java/ZeebeRedis.java index 7d475c3..88fdf1b 100644 --- a/connector-java/src/main/java/io/zeebe/redis/connect/java/ZeebeRedis.java +++ b/connector-java/src/main/java/io/zeebe/redis/connect/java/ZeebeRedis.java @@ -3,14 +3,17 @@ import com.google.protobuf.InvalidProtocolBufferException; import io.camunda.zeebe.protocol.record.ValueType; import io.lettuce.core.*; -import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.cluster.RedisClusterClient; import io.zeebe.exporter.proto.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.SocketAddress; import java.time.Duration; -import java.util.*; +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -21,8 +24,6 @@ public class ZeebeRedis implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(ZeebeRedis.class); private static final Map > RECORD_MESSAGE_TYPES; - private static final int XREAD_BLOCK_MILLISECONDS = 2000; - private static final int XREAD_COUNT = 500; static { RECORD_MESSAGE_TYPES = Map.ofEntries( @@ -49,13 +50,12 @@ private static AbstractMap.SimpleEntry(valueType, messageClass); } - private RedisClient redisClient; + private UniversalRedisClient redisClient; + private UniversalRedisConnection redisConnection; - private StatefulRedisConnection redisConnection; + private int xreadBlockMillis; - private int xreadBlockMillis = XREAD_BLOCK_MILLISECONDS; - - private int xreadCount = XREAD_COUNT; + private int xreadCount; private String consumerGroup; @@ -67,29 +67,29 @@ private static AbstractMap.SimpleEntry>> listeners; - private boolean deleteMessages = false; + private boolean deleteMessages; private Future future; private ExecutorService executorService; - private boolean reconnectUsesNewConnection = false; + private boolean reconnectUsesNewConnection; private long reconnectIntervalMillis; private Future reconnectFuture; private ExecutorService reconnectExecutorService; private volatile boolean isClosed = false; private volatile boolean forcedClose = false; - private boolean shouldDestroyConsumerGroupOnClose = false; - - private ZeebeRedis(RedisClient redisClient, - StatefulRedisConnection redisConnection, - boolean reconnectUsesNewConnection, Duration reconnectInterval, - int xreadBlockMillis, int xreadCount, - String consumerGroup, String consumerId, - String prefix, XReadArgs.StreamOffset[] offsets, - Map>> listeners, - boolean deleteMessages, - boolean shouldDestroyConsumerGroupOnClose) { + private boolean shouldDestroyConsumerGroupOnClose; + + protected ZeebeRedis(UniversalRedisClient redisClient, + UniversalRedisConnection redisConnection, + boolean reconnectUsesNewConnection, Duration reconnectInterval, + int xreadBlockMillis, int xreadCount, + String consumerGroup, String consumerId, + String prefix, XReadArgs.StreamOffset[] offsets, + Map>> listeners, + boolean deleteMessages, + boolean shouldDestroyConsumerGroupOnClose) { this.redisClient = redisClient; this.redisConnection = redisConnection; this.reconnectUsesNewConnection = reconnectUsesNewConnection; @@ -109,11 +109,15 @@ private ZeebeRedis(RedisClient redisClient, } /** Returns a new builder to read from the Redis Streams. */ - public static Builder newBuilder(RedisClient redisClient) { - return new ZeebeRedis.Builder(redisClient); + public static RedisConnectionBuilder newBuilder(RedisClient redisClient) { + return new RedisConnectionBuilder(redisClient); + } + + public static RedisConnectionBuilder newBuilder(RedisClusterClient redisClient) { + return new RedisConnectionBuilder(redisClient); } - private void start() { + protected void start() { redisConnection.addListener(new RedisConnectionStateAdapter() { public void onRedisConnected(RedisChannelHandler connection, SocketAddress socketAddress) { LOGGER.info("Redis reconnected."); @@ -141,9 +145,10 @@ public void reconnect() { Thread.sleep(reconnectIntervalMillis); redisConnection = redisClient.connect(protobufCodec); LOGGER.info("Redis reconnected."); + var syncStreamCommands = redisConnection.syncStreamCommands(); listeners.keySet().stream().forEach(stream -> { try { - redisConnection.sync().xgroupCreate(XReadArgs.StreamOffset.from(stream, "0-0"), consumerGroup, + syncStreamCommands.xgroupCreate(XReadArgs.StreamOffset.from(stream, "0-0"), consumerGroup, XGroupCreateArgs.Builder.mkstream()); } catch (RedisBusyException ex) { // NOOP: consumer group already exists @@ -167,11 +172,12 @@ public boolean isClosed() { @Override public void close() { if (shouldDestroyConsumerGroupOnClose) { + var syncStreamCommands = redisConnection.syncStreamCommands(); Arrays.stream(offsets).forEach(o -> { String stream = String.valueOf(o.getName()); LOGGER.debug("Destroying consumer group {} of stream {}", consumerGroup, stream); try { - redisConnection.sync().xgroupDestroy(stream, consumerGroup); + syncStreamCommands.xgroupDestroy(stream, consumerGroup); } catch (Exception ex) { LOGGER.error("Error destroying consumer group {} of stream {}", consumerGroup, stream); } @@ -227,16 +233,17 @@ private void readNext() { LOGGER.trace("Consumer[id={}] reads from streams '{}*'", consumerId, prefix); try { - List> messages = redisConnection.sync() + List> messages = redisConnection.syncStreamCommands() .xreadgroup(io.lettuce.core.Consumer.from(consumerGroup, consumerId), XReadArgs.Builder.block(xreadBlockMillis).count(xreadCount), offsets); + var asyncStreamCommands = redisConnection.asyncStreamCommands(); for (StreamMessage message : messages) { LOGGER.trace("Consumer[id={}] received message {} from {}", consumerId, message.getId(), message.getStream()); var success = handleRecord(message); - redisConnection.async().xack(message.getStream(), consumerGroup, message.getId()); + asyncStreamCommands.xack(message.getStream(), consumerGroup, message.getId()); if (deleteMessages && success) { - redisConnection.async().xdel(message.getStream(), message.getId()); + asyncStreamCommands.xdel(message.getStream(), message.getId()); } } } catch (IllegalArgumentException ex) { @@ -270,7 +277,6 @@ private void readNext() { } } } - private boolean handleRecord(StreamMessage message) throws InvalidProtocolBufferException { final var messageValue = message.getBody().values().iterator().next(); final var genericRecord = Schema.Record.parseFrom(messageValue); @@ -298,220 +304,6 @@ final var record = genericRecord.getRecord().unpack(t); listeners .getOrDefault(stream, List.of()) .forEach(listener -> ((Consumer) listener).accept(record)); - } - public static class Builder { - - private final RedisClient redisClient; - - private boolean reconnectUsesNewConnection = false; - private Duration reconnectInterval = Duration.ofSeconds(1); - - private final Map>> listeners = new HashMap<>(); - - private String consumerGroup = UUID.randomUUID().toString(); - - private boolean shouldDestroyConsumerGroupOnClose = true; - - private String consumerId = UUID.randomUUID().toString(); - private String prefix = "zeebe:"; - - private String offset = "0-0"; - - private int xreadBlockMillis = XREAD_BLOCK_MILLISECONDS; - - private int xreadCount = XREAD_COUNT; - - private boolean deleteMessages = false; - - private Builder(RedisClient redisClient) { - this.redisClient = redisClient; - } - - public Builder withReconnectUsingNewConnection() { - this.reconnectUsesNewConnection = true; - return this; - } - - public Builder reconnectInterval(Duration duration) { - this.reconnectInterval = duration; - return this; - } - - /** Sets the XREAD [BLOCK milliseconds] parameter. Default is 2000. */ - public Builder xreadBlockMillis(int xreadBlockMillis) { - this.xreadBlockMillis = xreadBlockMillis; - return this; - } - - /** Sets the XREAD [COUNT count] parameter. Default is 1000. */ - public Builder xreadCount(int xreadCount) { - this.xreadCount = xreadCount; - return this; - } - - /** Set the consumer group, e.g. the application name. */ - public Builder consumerGroup(String consumerGroup) { - this.consumerGroup = consumerGroup; - this.shouldDestroyConsumerGroupOnClose = false; - return this; - } - - /** Set the unique consumer ID. */ - public Builder consumerId(String consumerId) { - this.consumerId = consumerId; - return this; - } - - /** Set the prefix for the Streams to read from. */ - public Builder prefix(String name) { - this.prefix = name + ":"; - return this; - } - - /** Start reading from a given offset. */ - public Builder offset(String offset) { - this.offset = offset; - return this; - } - - public Builder deleteMessagesAfterSuccessfulHandling(boolean deleteMessages) { - this.deleteMessages = deleteMessages; - return this; - } - - private void addListener( - String valueType, Consumer listener) { - final var recordListeners = listeners.getOrDefault(valueType, new ArrayList<>()); - recordListeners.add(listener); - listeners.put(prefix + valueType, recordListeners); - } - - public Builder addDeploymentListener(Consumer listener) { - addListener(ValueType.DEPLOYMENT.name(), listener); - return this; - } - - public Builder addDeploymentDistributionListener( - Consumer listener) { - addListener(ValueType.DEPLOYMENT_DISTRIBUTION.name(), listener); - return this; - } - - public Builder addProcessListener(Consumer listener) { - addListener(ValueType.PROCESS.name(), listener); - return this; - } - - public Builder addProcessInstanceListener(Consumer listener) { - addListener(ValueType.PROCESS_INSTANCE.name(), listener); - return this; - } - - public Builder addProcessEventListener(Consumer listener) { - addListener(ValueType.PROCESS_EVENT.name(), listener); - return this; - } - - public Builder addVariableListener(Consumer listener) { - addListener(ValueType.VARIABLE.name(), listener); - return this; - } - - public Builder addVariableDocumentListener(Consumer listener) { - addListener(ValueType.VARIABLE_DOCUMENT.name(), listener); - return this; - } - - public Builder addJobListener(Consumer listener) { - addListener(ValueType.JOB.name(), listener); - return this; - } - - public Builder addJobBatchListener(Consumer listener) { - addListener(ValueType.JOB_BATCH.name(), listener); - return this; - } - - public Builder addIncidentListener(Consumer listener) { - addListener(ValueType.INCIDENT.name(), listener); - return this; - } - - public Builder addTimerListener(Consumer listener) { - addListener(ValueType.TIMER.name(), listener); - return this; - } - - public Builder addMessageListener(Consumer listener) { - addListener(ValueType.MESSAGE.name(), listener); - return this; - } - - public Builder addMessageSubscriptionListener( - Consumer listener) { - addListener(ValueType.MESSAGE_SUBSCRIPTION.name(), listener); - return this; - } - - public Builder addMessageStartEventSubscriptionListener( - Consumer listener) { - addListener(ValueType.MESSAGE_START_EVENT_SUBSCRIPTION.name(), listener); - return this; - } - - public Builder addProcessMessageSubscriptionListener( - Consumer listener) { - addListener(ValueType.PROCESS_MESSAGE_SUBSCRIPTION.name(), listener); - return this; - } - - public Builder addProcessInstanceCreationListener( - Consumer listener) { - addListener(ValueType.PROCESS_INSTANCE_CREATION.name(), listener); - return this; - } - - public Builder addErrorListener(Consumer listener) { - addListener(ValueType.ERROR.name(), listener); - return this; - } - - /** - * Start a background task that reads from Zeebe Streams. - *
- * Call {@link #close()} to stop reading. - */ - public ZeebeRedis build() { - if (listeners.size() == 0) { - throw new IllegalArgumentException("Must register a least one listener, but none found."); - } - - - final var connection = redisClient.connect(new ProtobufCodec()); - - LOGGER.info("Read from Redis streams '{}*' with offset '{}'", prefix, offset); - - // Prepare - List> offsets = new ArrayList<>(); - listeners.keySet().stream().forEach(stream -> { - offsets.add(XReadArgs.StreamOffset.lastConsumed(stream)); - try { - connection.sync().xgroupCreate(XReadArgs.StreamOffset.from(stream, offset), consumerGroup, - XGroupCreateArgs.Builder.mkstream()); - } catch (RedisBusyException ex) { - // NOOP: consumer group already exists - } - }); - - final var zeebeRedis = new ZeebeRedis(redisClient, connection, reconnectUsesNewConnection, reconnectInterval, - xreadBlockMillis, xreadCount, consumerGroup, consumerId, prefix, - offsets.toArray(new XReadArgs.StreamOffset[0]), listeners, deleteMessages, - shouldDestroyConsumerGroupOnClose); - zeebeRedis.start(); - - return zeebeRedis; - } - } } diff --git a/exporter/src/main/java/io/zeebe/redis/exporter/ExporterConfiguration.java b/exporter/src/main/java/io/zeebe/redis/exporter/ExporterConfiguration.java index fb29479..78cf33e 100644 --- a/exporter/src/main/java/io/zeebe/redis/exporter/ExporterConfiguration.java +++ b/exporter/src/main/java/io/zeebe/redis/exporter/ExporterConfiguration.java @@ -17,6 +17,7 @@ public class ExporterConfiguration { private String name = "zeebe"; private String remoteAddress; + private boolean useClusterClient = false; private long cleanupCycleInSeconds = Duration.ofMinutes(1).toSeconds(); @@ -83,6 +84,10 @@ public Optional getRemoteAddress() { .map(RedisURI::create); } + public boolean isUseClusterClient() { + return getEnv("USE_CLUSTER_CLIENT").map(Boolean::parseBoolean).orElse(useClusterClient); + } + private Optional getEnv(String name) { return Optional.ofNullable(System.getenv(ENV_PREFIX + name)); } @@ -91,6 +96,7 @@ private Optional getEnv(String name) { public String toString() { return "[" + "remoteAddress='" + getRemoteAddress() + '\'' + + ", useClusterClient='" + isUseClusterClient() + '\'' + ", enabledValueTypes='" + getEnabledValueTypes() + '\'' + ", enabledRecordTypes='" + getEnabledRecordTypes() + '\'' + ", format='" + getFormat() + '\'' + diff --git a/exporter/src/main/java/io/zeebe/redis/exporter/RedisCleaner.java b/exporter/src/main/java/io/zeebe/redis/exporter/RedisCleaner.java index b309a1f..0536bd7 100644 --- a/exporter/src/main/java/io/zeebe/redis/exporter/RedisCleaner.java +++ b/exporter/src/main/java/io/zeebe/redis/exporter/RedisCleaner.java @@ -1,7 +1,11 @@ package io.zeebe.redis.exporter; -import io.lettuce.core.*; -import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.RedisCommandTimeoutException; +import io.lettuce.core.RedisConnectionException; +import io.lettuce.core.SetArgs; +import io.lettuce.core.XTrimArgs; +import io.lettuce.core.api.sync.RedisStreamCommands; +import io.lettuce.core.api.sync.RedisStringCommands; import org.slf4j.Logger; import java.nio.charset.StandardCharsets; @@ -14,7 +18,7 @@ public class RedisCleaner { private final static String CLEANUP_LOCK = "zeebe:cleanup-lock"; private final static String CLEANUP_TIMESTAMP = "zeebe:cleanup-time"; private final Logger logger; - private StatefulRedisConnection redisConnection; + private UniversalRedisConnection redisConnection; private Map streams = new ConcurrentHashMap<>(); @@ -25,7 +29,7 @@ public class RedisCleaner { private Duration trimScheduleDelay; - public RedisCleaner(StatefulRedisConnection redisConnection, boolean useProtoBuf, ExporterConfiguration config, Logger logger) { + public RedisCleaner(UniversalRedisConnection redisConnection, boolean useProtoBuf, ExporterConfiguration config, Logger logger) { this.logger = logger; this.redisConnection = redisConnection; this.useProtoBuf = useProtoBuf; @@ -37,7 +41,7 @@ public RedisCleaner(StatefulRedisConnection redisConnection, boolean trimScheduleDelay = Duration.ofSeconds(config.getCleanupCycleInSeconds()); } - public void setRedisConnection(StatefulRedisConnection redisConnection) { + public void setRedisConnection(UniversalRedisConnection redisConnection) { this.redisConnection = redisConnection; } @@ -57,13 +61,14 @@ public void trimStreamValues() { logger.debug("trim streams {}", streams); // trim all streams List keys = new ArrayList(streams.keySet()); + RedisStreamCommands streamCommands = redisConnection.syncStreamCommands(); keys.forEach(stream -> { Optional minDelivered = !deleteAfterAcknowledge ? Optional.empty() : - redisConnection.sync().xinfoGroups(stream) + streamCommands.xinfoGroups(stream) .stream().map(o -> XInfoGroup.fromXInfo(o, useProtoBuf)) .map(xi -> { if (xi.getPending() > 0) { - xi.considerPendingMessageId(redisConnection.sync() + xi.considerPendingMessageId(streamCommands .xpending(stream, xi.getName()).getMessageIds().getLower().getValue()); } return xi.getLastDeliveredId(); @@ -77,12 +82,12 @@ public void trimStreamValues() { } else if (minTtlInMillisConfig > 0 && minTTLMillis < minDeliveredMillis) { xtrimMinId = minTTLId; } - long numTrimmed = redisConnection.sync().xtrim(stream, new XTrimArgs().minId(xtrimMinId)); + long numTrimmed = streamCommands.xtrim(stream, new XTrimArgs().minId(xtrimMinId)); if (numTrimmed > 0) { logger.debug("{}: {} cleaned records", stream, numTrimmed); } } else if (maxTtlInMillisConfig > 0) { - long numTrimmed = redisConnection.sync().xtrim(stream, new XTrimArgs().minId(maxTTLId)); + long numTrimmed = streamCommands.xtrim(stream, new XTrimArgs().minId(maxTTLId)); if (numTrimmed > 0) { logger.debug("{}: {} cleaned records", stream, numTrimmed); } @@ -105,28 +110,28 @@ private boolean acquireCleanupLock() { // ProtoBuf format if (useProtoBuf) { // try to get lock - StatefulRedisConnection con = (StatefulRedisConnection) redisConnection; - con.sync().set(CLEANUP_LOCK, id.getBytes(StandardCharsets.UTF_8), SetArgs.Builder.nx().px(trimScheduleDelay)); - byte[] getResult = con.sync().get(CLEANUP_LOCK); + RedisStringCommands stringCommands = (RedisStringCommands) redisConnection.syncStringCommands(); + stringCommands.set(CLEANUP_LOCK, id.getBytes(StandardCharsets.UTF_8), SetArgs.Builder.nx().px(trimScheduleDelay)); + byte[] getResult = stringCommands.get(CLEANUP_LOCK); if (getResult != null && getResult.length > 0 && id.equals(new String(getResult, StandardCharsets.UTF_8))) { // lock successful: check last cleanup timestamp (autoscaled new Zeebe instances etc.) - byte[] lastCleanup = con.sync().get(CLEANUP_TIMESTAMP); + byte[] lastCleanup = stringCommands.get(CLEANUP_TIMESTAMP); if (lastCleanup == null || lastCleanup.length == 0 || Long.parseLong(new String(lastCleanup, StandardCharsets.UTF_8)) < now - trimScheduleDelay.toMillis()) { - con.sync().set(CLEANUP_TIMESTAMP, Long.toString(now).getBytes(StandardCharsets.UTF_8)); + stringCommands.set(CLEANUP_TIMESTAMP, Long.toString(now).getBytes(StandardCharsets.UTF_8)); return true; } } // JSON format } else { // try to get lock - StatefulRedisConnection con = (StatefulRedisConnection) redisConnection; - con.sync().set(CLEANUP_LOCK, id, SetArgs.Builder.nx().px(trimScheduleDelay)); - if (id.equals(con.sync().get(CLEANUP_LOCK))) { + RedisStringCommands stringCommands = (RedisStringCommands) redisConnection.syncStringCommands(); + stringCommands.set(CLEANUP_LOCK, id, SetArgs.Builder.nx().px(trimScheduleDelay)); + if (id.equals(stringCommands.get(CLEANUP_LOCK))) { // lock successful: check last cleanup timestamp (autoscaled new Zeebe instances etc.) - String lastCleanup = con.sync().get(CLEANUP_TIMESTAMP); + String lastCleanup = stringCommands.get(CLEANUP_TIMESTAMP); if (lastCleanup == null || lastCleanup.isEmpty() || Long.parseLong(lastCleanup) < now - trimScheduleDelay.toMillis()) { - con.sync().set(CLEANUP_TIMESTAMP, Long.toString(now)); + stringCommands.set(CLEANUP_TIMESTAMP, Long.toString(now)); return true; } } @@ -141,7 +146,7 @@ private boolean acquireCleanupLock() { private void releaseCleanupLock() { try { - redisConnection.sync().del(CLEANUP_LOCK); + redisConnection.syncDel(CLEANUP_LOCK); } catch (RedisCommandTimeoutException | RedisConnectionException ex) { logger.error("Error releasing cleanup lock due to possible Redis unavailability: {}", ex.getMessage()); } catch (Exception ex) { diff --git a/exporter/src/main/java/io/zeebe/redis/exporter/RedisExporter.java b/exporter/src/main/java/io/zeebe/redis/exporter/RedisExporter.java index a01e0dc..63bee54 100644 --- a/exporter/src/main/java/io/zeebe/redis/exporter/RedisExporter.java +++ b/exporter/src/main/java/io/zeebe/redis/exporter/RedisExporter.java @@ -4,15 +4,15 @@ import io.camunda.zeebe.exporter.api.context.Context; import io.camunda.zeebe.exporter.api.context.Controller; import io.camunda.zeebe.protocol.record.Record; -import io.lettuce.core.*; -import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisConnectionException; +import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.resource.ClientResources; import io.zeebe.exporter.proto.RecordTransformer; import io.zeebe.exporter.proto.Schema; import org.apache.commons.lang3.tuple.ImmutablePair; import org.slf4j.Logger; -import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; @@ -25,9 +25,9 @@ public class RedisExporter implements Exporter { private ExporterConfiguration config; private Logger logger; - private RedisClient redisClient; - private StatefulRedisConnection cleanupConnection; - private StatefulRedisConnection senderConnection; + private UniversalRedisClient redisClient; + private UniversalRedisConnection cleanupConnection; + private UniversalRedisConnection senderConnection; private Function recordTransformer; private boolean useProtoBuf = false; @@ -91,9 +91,15 @@ public void open(Controller controller) { } this.controller = controller; - redisClient = RedisClient.create( - ClientResources.builder().ioThreadPoolSize(config.getIoThreadPoolSize()).build(), - config.getRemoteAddress().get()); + if (config.isUseClusterClient()) { + redisClient = new UniversalRedisClient(RedisClusterClient.create( + ClientResources.builder().ioThreadPoolSize(config.getIoThreadPoolSize()).build(), + config.getRemoteAddress().get())); + } else { + redisClient = new UniversalRedisClient(RedisClient.create( + ClientResources.builder().ioThreadPoolSize(config.getIoThreadPoolSize()).build(), + config.getRemoteAddress().get())); + } connectToRedis(); } diff --git a/exporter/src/main/java/io/zeebe/redis/exporter/RedisSender.java b/exporter/src/main/java/io/zeebe/redis/exporter/RedisSender.java index 0449e8a..94fcb83 100644 --- a/exporter/src/main/java/io/zeebe/redis/exporter/RedisSender.java +++ b/exporter/src/main/java/io/zeebe/redis/exporter/RedisSender.java @@ -2,8 +2,7 @@ import io.camunda.zeebe.exporter.api.context.Controller; import io.lettuce.core.*; -import io.lettuce.core.api.StatefulRedisConnection; -import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.api.async.RedisStreamAsyncCommands; import org.apache.commons.lang3.tuple.ImmutablePair; import org.slf4j.Logger; @@ -17,7 +16,7 @@ public class RedisSender { private final Logger logger; private final Controller controller; - private final StatefulRedisConnection redisConnection; + private final UniversalRedisConnection redisConnection; private final AtomicBoolean redisConnected = new AtomicBoolean(true); @@ -25,7 +24,7 @@ public class RedisSender { private final List> deQueue = new ArrayList<>(); - public RedisSender(ExporterConfiguration configuration, Controller controller, StatefulRedisConnection redisConnection, Logger logger) { + public RedisSender(ExporterConfiguration configuration, Controller controller, UniversalRedisConnection redisConnection, Logger logger) { this.batchSize = configuration.getBatchSize(); this.controller = controller; this.redisConnection = redisConnection; @@ -50,7 +49,7 @@ void sendFrom(EventQueue eventQueue) { } try { Long positionOfLastRecordInBatch = -1L; - RedisAsyncCommands commands = redisConnection.async(); + RedisStreamAsyncCommands commands = redisConnection.asyncStreamCommands(); List> futures = new ArrayList<>(); ImmutablePair nextEvent = eventQueue.getNextEvent(); while (nextEvent != null) { @@ -91,7 +90,7 @@ private boolean sendDeQueue() { } try { Long positionOfLastRecordInBatch = -1L; - RedisAsyncCommands commands = redisConnection.async(); + RedisStreamAsyncCommands commands = redisConnection.asyncStreamCommands(); List> futures = new ArrayList<>(); for (var nextEvent : deQueue) { var eventValue = nextEvent.getValue(); diff --git a/exporter/src/main/java/io/zeebe/redis/exporter/UniversalRedisClient.java b/exporter/src/main/java/io/zeebe/redis/exporter/UniversalRedisClient.java new file mode 100644 index 0000000..022de3c --- /dev/null +++ b/exporter/src/main/java/io/zeebe/redis/exporter/UniversalRedisClient.java @@ -0,0 +1,36 @@ +package io.zeebe.redis.exporter; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.cluster.RedisClusterClient; + +public class UniversalRedisClient { + + private RedisClient redisClient = null; + private RedisClusterClient redisClusterClient = null; + + public UniversalRedisClient(RedisClient redisClient) { + this.redisClient = redisClient; + } + + public UniversalRedisClient(RedisClusterClient redisClient) { + this.redisClusterClient = redisClient; + } + + public UniversalRedisConnection connect(ProtobufCodec protobufCodec) { + if (redisClient != null) return new UniversalRedisConnection<>(redisClient.connect(protobufCodec)); + return new UniversalRedisConnection<>(redisClusterClient.connect(protobufCodec)); + } + + public UniversalRedisConnection connect() { + if (redisClient != null) return new UniversalRedisConnection<>(redisClient.connect()); + return new UniversalRedisConnection<>(redisClusterClient.connect()); + } + + public void shutdown() { + if (redisClient != null) { + redisClient.shutdown(); + } else { + redisClusterClient.shutdown(); + } + } +} diff --git a/exporter/src/main/java/io/zeebe/redis/exporter/UniversalRedisConnection.java b/exporter/src/main/java/io/zeebe/redis/exporter/UniversalRedisConnection.java new file mode 100644 index 0000000..2ff8feb --- /dev/null +++ b/exporter/src/main/java/io/zeebe/redis/exporter/UniversalRedisConnection.java @@ -0,0 +1,65 @@ +package io.zeebe.redis.exporter; + +import io.lettuce.core.RedisConnectionStateListener; +import io.lettuce.core.api.StatefulConnection; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisStreamAsyncCommands; +import io.lettuce.core.api.sync.RedisStreamCommands; +import io.lettuce.core.api.sync.RedisStringCommands; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; + +public class UniversalRedisConnection { + + private StatefulRedisConnection redisConnection = null; + + private StatefulRedisClusterConnection redisClusterConnection = null; + + private StatefulConnection theConnection; + public UniversalRedisConnection(StatefulConnection redisConnection) { + this.theConnection = redisConnection; + if (redisConnection instanceof StatefulRedisConnection) { + this.redisConnection = (StatefulRedisConnection) redisConnection; + } else { + this.redisClusterConnection = (StatefulRedisClusterConnection) redisConnection; + } + } + + public void addListener(RedisConnectionStateListener listener) { + theConnection.addListener(listener); + } + + public RedisStreamCommands syncStreamCommands() { + if (redisConnection != null) return redisConnection.sync(); + return redisClusterConnection.sync(); + } + + public RedisStringCommands syncStringCommands() { + if (redisConnection != null) return redisConnection.sync(); + return redisClusterConnection.sync(); + } + + public RedisStreamAsyncCommands asyncStreamCommands() { + if (redisConnection != null) return redisConnection.async(); + return redisClusterConnection.async(); + } + + public void setAutoFlushCommands(boolean autoFlush) { + theConnection.setAutoFlushCommands(autoFlush); + } + + public void flushCommands() { + theConnection.flushCommands(); + } + + public void syncDel(K... ks) { + if (redisConnection != null) { + redisConnection.sync().del(ks); + } else { + redisClusterConnection.sync().del(ks); + } + } + + public void close() { + theConnection.close(); + } +}