Skip to content

Commit

Permalink
feat: use multiple cluster-connections to read from each stream indiv…
Browse files Browse the repository at this point in the history
…idually
  • Loading branch information
VonDerBeck committed Feb 14, 2024
1 parent f83e129 commit ca36ba6
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 36 deletions.
11 changes: 4 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -343,15 +343,12 @@ final ZeebeRedis zeebeRedis = ZeebeRedis.newBuilder(redisClusterClient)
**Sharding in Redis clusters**
Till now the connector uses a Multi-key operation to receive events from Redis.
Sadly certain Multi-key operations - including the stream commands - are not possible in a cluster. You will
When using the standard `RedisClient` the connector uses a Multi-key operation to receive events from Redis.
Some Multi-key operations - including stream commands - are not possible in a cluster. We would
experience errors saying `CROSSSLOT Keys in request don't hash to the same slot` on the connector side.
The temporary workaround for this error is to wrap the default prefix `zeebe` with `{}` ending up
in configuring `ZEEBE_REDIS_NAME={zeebe}` for the exporter and setting the prefix `{zeebe}` on the connector side as well.
The consequence is that all records then end up in the same shard and can therefore all be retrieved collectively as before by using a Multi-key Operation. Even if this does not necessarily correspond to the purpose of a cluster it would at least enable its usage.
This section will disappear as soon as the connector is optionally available without multi-key operations. The feature is currently under development.
In the case of Redis clusters each stream (which corresponds to a Zeebe `ValueType`) will end up in its own shard.
The connector - if initialized correctly with the `RedisClusterClient` - then uses multiple connections to read from each stream individually based on the asynchronous connection pool support of Lettuce.
## Build it from Source
Expand Down
2 changes: 1 addition & 1 deletion connector-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<artifactId>log4j-slf4j2-impl</artifactId>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,12 @@ public UniversalRedisConnection<String, byte[]> connect(ProtobufCodec protobufCo
return new UniversalRedisConnection<>(redisClusterClient.connect(protobufCodec));
}

public boolean isCluster() {
return redisClusterClient != null;
}

public RedisClusterClient getRedisClusterClient() {
return redisClusterClient;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.lettuce.core.api.async.RedisStreamAsyncCommands;
import io.lettuce.core.api.sync.RedisStreamCommands;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.support.BoundedAsyncPool;

public class UniversalRedisConnection<K, V> {

Expand Down Expand Up @@ -40,4 +41,10 @@ public RedisStreamAsyncCommands<K, V> asyncStreamCommands() {
public void close() {
theConnection.close();
}

public void releaseFromPool(BoundedAsyncPool<StatefulRedisClusterConnection<K, V>> redisPool) {
if (redisPool != null && redisClusterConnection != null) {
redisPool.release(redisClusterConnection);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import io.camunda.zeebe.protocol.record.ValueType;
import io.lettuce.core.*;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.support.AsyncConnectionPoolSupport;
import io.lettuce.core.support.BoundedAsyncPool;
import io.lettuce.core.support.BoundedPoolConfig;
import io.zeebe.exporter.proto.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -14,9 +18,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.*;
import java.util.function.Consumer;

public class ZeebeRedis implements AutoCloseable {
Expand Down Expand Up @@ -52,6 +54,7 @@ private static AbstractMap.SimpleEntry<String, Class<? extends com.google.protob

private UniversalRedisClient redisClient;
private UniversalRedisConnection<String, byte[]> redisConnection;
private BoundedAsyncPool<StatefulRedisClusterConnection<String, byte[]>> redisPool;

private int xreadBlockMillis;

Expand Down Expand Up @@ -118,20 +121,44 @@ public static RedisConnectionBuilder newBuilder(RedisClusterClient redisClient)
}

protected void start() {
redisConnection.addListener(new RedisConnectionStateAdapter() {
public void onRedisConnected(RedisChannelHandler<?, ?> connection, SocketAddress socketAddress) {
LOGGER.info("Redis reconnected.");
if (redisClient.isCluster()) {
CompletionStage<BoundedAsyncPool<StatefulRedisClusterConnection<String, byte[]>>> poolFuture =
AsyncConnectionPoolSupport.createBoundedObjectPoolAsync(() ->
redisClient.getRedisClusterClient().connectAsync(new ProtobufCodec()),
BoundedPoolConfig.builder().maxTotal(offsets.length).build());
try {
redisPool = poolFuture.toCompletableFuture().get();
} catch (InterruptedException e) {
LOGGER.error("Error creating Redis cluster connection pool", e);
throw new RuntimeException(e);
} catch (ExecutionException e) {
LOGGER.error("Error creating Redis cluster connection pool", e);
throw new RuntimeException(e);
}
public void onRedisDisconnected(RedisChannelHandler<?, ?> connection) {
if (forcedClose) return;
LOGGER.warn("Redis connection lost.");
if (reconnectUsesNewConnection) {
doClose();
reconnectExecutorService = Executors.newSingleThreadExecutor();
reconnectFuture = reconnectExecutorService.submit(ZeebeRedis.this::reconnect);
}
// the connection has been used to create consumer groups and can now be closed
// because reading from streams is done by the connection pool
redisConnection.close();
if (reconnectUsesNewConnection) {
LOGGER.warn("Parameter 'reconnectUsesNewConnection' has no effect when using RedisClusterClient.");
}
});
} else {
// if we're not connected to a cluster we eventually handle reconnects ourselves
redisConnection.addListener(new RedisConnectionStateAdapter() {
public void onRedisConnected(RedisChannelHandler<?, ?> connection, SocketAddress socketAddress) {
LOGGER.info("Redis reconnected.");
}

public void onRedisDisconnected(RedisChannelHandler<?, ?> connection) {
if (forcedClose) return;
LOGGER.warn("Redis connection lost.");
if (reconnectUsesNewConnection) {
doClose();
reconnectExecutorService = Executors.newSingleThreadExecutor();
reconnectFuture = reconnectExecutorService.submit(ZeebeRedis.this::reconnect);
}
}
});
}
forcedClose = false;
isClosed = false;
executorService = Executors.newSingleThreadExecutor();
Expand Down Expand Up @@ -196,6 +223,10 @@ private void doClose() {

isClosed = true;

if (redisPool != null) {
redisPool.closeAsync();
redisPool = null;
}
if (future != null) {
future.cancel(true);
future = null;
Expand Down Expand Up @@ -224,20 +255,36 @@ private void readFromStream() {
reconnectExecutorService.shutdown();
reconnectExecutorService = null;
}
while (!isClosed) {
readNext();
if (!redisClient.isCluster()) {
while (!isClosed) {
readNext(redisConnection, offsets);
}
} else {
for (XReadArgs.StreamOffset offset : offsets) {
redisPool.acquire().thenAcceptAsync(connection -> {
var universalRedisConnection = new UniversalRedisConnection(connection);
while (!isClosed) {
readNext(universalRedisConnection, offset);
}
redisPool.release(connection);
});
}
}
}

private void readNext() {
LOGGER.trace("Consumer[id={}] reads from streams '{}*'", consumerId, prefix);
private void readNext(UniversalRedisConnection redisConnection, XReadArgs.StreamOffset... offsets) {
if (offsets.length == 1) {
LOGGER.trace("Consumer[id={}] reads from stream '{}'", consumerId, offsets[0].getName());
} else {
LOGGER.trace("Consumer[id={}] reads from streams '{}*'", consumerId, prefix);
}

try {
List<StreamMessage<String, byte[]>> messages = redisConnection.syncStreamCommands()
var asyncStreamCommands = redisConnection.asyncStreamCommands();
List<StreamMessage<String, byte[]>> messages = (List<StreamMessage<String, byte[]>>) asyncStreamCommands
.xreadgroup(io.lettuce.core.Consumer.from(consumerGroup, consumerId),
XReadArgs.Builder.block(xreadBlockMillis).count(xreadCount), offsets);
XReadArgs.Builder.block(xreadBlockMillis).count(xreadCount), offsets).get();

var asyncStreamCommands = redisConnection.asyncStreamCommands();
for (StreamMessage<String, byte[]> message : messages) {
LOGGER.trace("Consumer[id={}] received message {} from {}", consumerId, message.getId(), message.getStream());
var success = handleRecord(message);
Expand All @@ -248,6 +295,7 @@ private void readNext() {
}
} catch (IllegalArgumentException ex) {
// should not happen with a correct configuration
redisConnection.releaseFromPool(redisPool);
LOGGER.error("Illegal arguments for xreadgroup: {}. Closing Redis client.", ex.getMessage());
try {
forceClose();
Expand All @@ -260,6 +308,7 @@ private void readNext() {
}
} catch (RedisCommandExecutionException e) {
// should not happen, but we want to recover anyway
redisConnection.releaseFromPool(redisPool);
if (!isClosed) {
LOGGER.error("Consumer[group={}, id={}] failed to read from streams '{}*': {}. Initiating reconnect.", consumerGroup, consumerId, prefix, e.getMessage());
try {
Expand Down Expand Up @@ -296,9 +345,7 @@ private boolean handleRecord(StreamMessage<String, byte[]> message) throws Inval

private <T extends com.google.protobuf.Message> void handleRecord(String stream,
Schema.Record genericRecord, Class<T> t) throws InvalidProtocolBufferException {

final var record = genericRecord.getRecord().unpack(t);

LOGGER.trace("Consumer[id={}] handling record {}", consumerId, record);

listeners
Expand Down
4 changes: 2 additions & 2 deletions connector-java/src/test/resources/log4j2-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

<Loggers>
<Logger name="io.zeebe" level="info"/>
<Logger name="io.zeebe.broker.exporter" level="trace"/>
<Logger name="io.zeebe.redis" level="trace"/>
<Logger name="io.zeebe.broker.exporter" level="debug"/>
<Logger name="io.zeebe.redis" level="debug"/>

<Root level="info">
<AppenderRef ref="Console"/>
Expand Down
2 changes: 1 addition & 1 deletion exporter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<artifactId>log4j-slf4j2-impl</artifactId>
<scope>test</scope>
</dependency>

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<artifactId>log4j-slf4j2-impl</artifactId>
<version>${version.log4j}</version>
</dependency>

Expand Down

0 comments on commit ca36ba6

Please sign in to comment.