From ca36ba63d9bdde555dfa972c088ddf883ebd4fff Mon Sep 17 00:00:00 2001 From: Gunnar von der Beck Date: Wed, 14 Feb 2024 12:49:43 +0100 Subject: [PATCH] feat: use multiple cluster-connections to read from each stream individually --- README.md | 11 +-- connector-java/pom.xml | 2 +- .../connect/java/UniversalRedisClient.java | 8 ++ .../java/UniversalRedisConnection.java | 7 ++ .../zeebe/redis/connect/java/ZeebeRedis.java | 95 ++++++++++++++----- .../src/test/resources/log4j2-test.xml | 4 +- exporter/pom.xml | 2 +- pom.xml | 2 +- 8 files changed, 95 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index 2fa2c99..8bf1db9 100644 --- a/README.md +++ b/README.md @@ -343,15 +343,12 @@ final ZeebeRedis zeebeRedis = ZeebeRedis.newBuilder(redisClusterClient) **Sharding in Redis clusters** -Till now the connector uses a Multi-key operation to receive events from Redis. -Sadly certain Multi-key operations - including the stream commands - are not possible in a cluster. You will +When using the standard `RedisClient` the connector uses a Multi-key operation to receive events from Redis. +Some Multi-key operations - including stream commands - are not possible in a cluster. We would experience errors saying `CROSSSLOT Keys in request don't hash to the same slot` on the connector side. -The temporary workaround for this error is to wrap the default prefix `zeebe` with `{}` ending up -in configuring `ZEEBE_REDIS_NAME={zeebe}` for the exporter and setting the prefix `{zeebe}` on the connector side as well. -The consequence is that all records then end up in the same shard and can therefore all be retrieved collectively as before by using a Multi-key Operation. Even if this does not necessarily correspond to the purpose of a cluster it would at least enable its usage. - -This section will disappear as soon as the connector is optionally available without multi-key operations. The feature is currently under development. +In the case of Redis clusters each stream (which corresponds to a Zeebe `ValueType`) will end up in its own shard. +The connector - if initialized correctly with the `RedisClusterClient` - then uses multiple connections to read from each stream individually based on the asynchronous connection pool support of Lettuce. ## Build it from Source diff --git a/connector-java/pom.xml b/connector-java/pom.xml index f959857..d80d100 100644 --- a/connector-java/pom.xml +++ b/connector-java/pom.xml @@ -53,7 +53,7 @@ org.apache.logging.log4j - log4j-slf4j-impl + log4j-slf4j2-impl test diff --git a/connector-java/src/main/java/io/zeebe/redis/connect/java/UniversalRedisClient.java b/connector-java/src/main/java/io/zeebe/redis/connect/java/UniversalRedisClient.java index 3e35f9c..c4dee78 100644 --- a/connector-java/src/main/java/io/zeebe/redis/connect/java/UniversalRedisClient.java +++ b/connector-java/src/main/java/io/zeebe/redis/connect/java/UniversalRedisClient.java @@ -21,4 +21,12 @@ public UniversalRedisConnection connect(ProtobufCodec protobufCo return new UniversalRedisConnection<>(redisClusterClient.connect(protobufCodec)); } + public boolean isCluster() { + return redisClusterClient != null; + } + + public RedisClusterClient getRedisClusterClient() { + return redisClusterClient; + } + } diff --git a/connector-java/src/main/java/io/zeebe/redis/connect/java/UniversalRedisConnection.java b/connector-java/src/main/java/io/zeebe/redis/connect/java/UniversalRedisConnection.java index c61f7b6..604f0aa 100644 --- a/connector-java/src/main/java/io/zeebe/redis/connect/java/UniversalRedisConnection.java +++ b/connector-java/src/main/java/io/zeebe/redis/connect/java/UniversalRedisConnection.java @@ -6,6 +6,7 @@ import io.lettuce.core.api.async.RedisStreamAsyncCommands; import io.lettuce.core.api.sync.RedisStreamCommands; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.support.BoundedAsyncPool; public class UniversalRedisConnection { @@ -40,4 +41,10 @@ public RedisStreamAsyncCommands asyncStreamCommands() { public void close() { theConnection.close(); } + + public void releaseFromPool(BoundedAsyncPool> redisPool) { + if (redisPool != null && redisClusterConnection != null) { + redisPool.release(redisClusterConnection); + } + } } diff --git a/connector-java/src/main/java/io/zeebe/redis/connect/java/ZeebeRedis.java b/connector-java/src/main/java/io/zeebe/redis/connect/java/ZeebeRedis.java index 88fdf1b..950df2d 100644 --- a/connector-java/src/main/java/io/zeebe/redis/connect/java/ZeebeRedis.java +++ b/connector-java/src/main/java/io/zeebe/redis/connect/java/ZeebeRedis.java @@ -4,6 +4,10 @@ import io.camunda.zeebe.protocol.record.ValueType; import io.lettuce.core.*; import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.support.AsyncConnectionPoolSupport; +import io.lettuce.core.support.BoundedAsyncPool; +import io.lettuce.core.support.BoundedPoolConfig; import io.zeebe.exporter.proto.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,9 +18,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.*; import java.util.function.Consumer; public class ZeebeRedis implements AutoCloseable { @@ -52,6 +54,7 @@ private static AbstractMap.SimpleEntry redisConnection; + private BoundedAsyncPool> redisPool; private int xreadBlockMillis; @@ -118,20 +121,44 @@ public static RedisConnectionBuilder newBuilder(RedisClusterClient redisClient) } protected void start() { - redisConnection.addListener(new RedisConnectionStateAdapter() { - public void onRedisConnected(RedisChannelHandler connection, SocketAddress socketAddress) { - LOGGER.info("Redis reconnected."); + if (redisClient.isCluster()) { + CompletionStage>> poolFuture = + AsyncConnectionPoolSupport.createBoundedObjectPoolAsync(() -> + redisClient.getRedisClusterClient().connectAsync(new ProtobufCodec()), + BoundedPoolConfig.builder().maxTotal(offsets.length).build()); + try { + redisPool = poolFuture.toCompletableFuture().get(); + } catch (InterruptedException e) { + LOGGER.error("Error creating Redis cluster connection pool", e); + throw new RuntimeException(e); + } catch (ExecutionException e) { + LOGGER.error("Error creating Redis cluster connection pool", e); + throw new RuntimeException(e); } - public void onRedisDisconnected(RedisChannelHandler connection) { - if (forcedClose) return; - LOGGER.warn("Redis connection lost."); - if (reconnectUsesNewConnection) { - doClose(); - reconnectExecutorService = Executors.newSingleThreadExecutor(); - reconnectFuture = reconnectExecutorService.submit(ZeebeRedis.this::reconnect); - } + // the connection has been used to create consumer groups and can now be closed + // because reading from streams is done by the connection pool + redisConnection.close(); + if (reconnectUsesNewConnection) { + LOGGER.warn("Parameter 'reconnectUsesNewConnection' has no effect when using RedisClusterClient."); } - }); + } else { + // if we're not connected to a cluster we eventually handle reconnects ourselves + redisConnection.addListener(new RedisConnectionStateAdapter() { + public void onRedisConnected(RedisChannelHandler connection, SocketAddress socketAddress) { + LOGGER.info("Redis reconnected."); + } + + public void onRedisDisconnected(RedisChannelHandler connection) { + if (forcedClose) return; + LOGGER.warn("Redis connection lost."); + if (reconnectUsesNewConnection) { + doClose(); + reconnectExecutorService = Executors.newSingleThreadExecutor(); + reconnectFuture = reconnectExecutorService.submit(ZeebeRedis.this::reconnect); + } + } + }); + } forcedClose = false; isClosed = false; executorService = Executors.newSingleThreadExecutor(); @@ -196,6 +223,10 @@ private void doClose() { isClosed = true; + if (redisPool != null) { + redisPool.closeAsync(); + redisPool = null; + } if (future != null) { future.cancel(true); future = null; @@ -224,20 +255,36 @@ private void readFromStream() { reconnectExecutorService.shutdown(); reconnectExecutorService = null; } - while (!isClosed) { - readNext(); + if (!redisClient.isCluster()) { + while (!isClosed) { + readNext(redisConnection, offsets); + } + } else { + for (XReadArgs.StreamOffset offset : offsets) { + redisPool.acquire().thenAcceptAsync(connection -> { + var universalRedisConnection = new UniversalRedisConnection(connection); + while (!isClosed) { + readNext(universalRedisConnection, offset); + } + redisPool.release(connection); + }); + } } } - private void readNext() { - LOGGER.trace("Consumer[id={}] reads from streams '{}*'", consumerId, prefix); + private void readNext(UniversalRedisConnection redisConnection, XReadArgs.StreamOffset... offsets) { + if (offsets.length == 1) { + LOGGER.trace("Consumer[id={}] reads from stream '{}'", consumerId, offsets[0].getName()); + } else { + LOGGER.trace("Consumer[id={}] reads from streams '{}*'", consumerId, prefix); + } try { - List> messages = redisConnection.syncStreamCommands() + var asyncStreamCommands = redisConnection.asyncStreamCommands(); + List> messages = (List>) asyncStreamCommands .xreadgroup(io.lettuce.core.Consumer.from(consumerGroup, consumerId), - XReadArgs.Builder.block(xreadBlockMillis).count(xreadCount), offsets); + XReadArgs.Builder.block(xreadBlockMillis).count(xreadCount), offsets).get(); - var asyncStreamCommands = redisConnection.asyncStreamCommands(); for (StreamMessage message : messages) { LOGGER.trace("Consumer[id={}] received message {} from {}", consumerId, message.getId(), message.getStream()); var success = handleRecord(message); @@ -248,6 +295,7 @@ private void readNext() { } } catch (IllegalArgumentException ex) { // should not happen with a correct configuration + redisConnection.releaseFromPool(redisPool); LOGGER.error("Illegal arguments for xreadgroup: {}. Closing Redis client.", ex.getMessage()); try { forceClose(); @@ -260,6 +308,7 @@ private void readNext() { } } catch (RedisCommandExecutionException e) { // should not happen, but we want to recover anyway + redisConnection.releaseFromPool(redisPool); if (!isClosed) { LOGGER.error("Consumer[group={}, id={}] failed to read from streams '{}*': {}. Initiating reconnect.", consumerGroup, consumerId, prefix, e.getMessage()); try { @@ -296,9 +345,7 @@ private boolean handleRecord(StreamMessage message) throws Inval private void handleRecord(String stream, Schema.Record genericRecord, Class t) throws InvalidProtocolBufferException { - final var record = genericRecord.getRecord().unpack(t); - LOGGER.trace("Consumer[id={}] handling record {}", consumerId, record); listeners diff --git a/connector-java/src/test/resources/log4j2-test.xml b/connector-java/src/test/resources/log4j2-test.xml index d833d8e..3a9a4bd 100644 --- a/connector-java/src/test/resources/log4j2-test.xml +++ b/connector-java/src/test/resources/log4j2-test.xml @@ -9,8 +9,8 @@ - - + + diff --git a/exporter/pom.xml b/exporter/pom.xml index da5a08d..6ca5727 100644 --- a/exporter/pom.xml +++ b/exporter/pom.xml @@ -66,7 +66,7 @@ org.apache.logging.log4j - log4j-slf4j-impl + log4j-slf4j2-impl test diff --git a/pom.xml b/pom.xml index 25c3a3c..3599c99 100644 --- a/pom.xml +++ b/pom.xml @@ -101,7 +101,7 @@ org.apache.logging.log4j - log4j-slf4j-impl + log4j-slf4j2-impl ${version.log4j}