Skip to content

Commit

Permalink
refactor: #56 introduce command batching
Browse files Browse the repository at this point in the history
  • Loading branch information
VonDerBeck committed Oct 6, 2023
1 parent 587163e commit 8a7ff6e
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 144 deletions.
27 changes: 24 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ When creating the Redis Client it might as well help to set an appropriate IO-Th

```java
var redisClient = RedisClient.create(
ClientResources.builder().ioThreadPoolSize(8).build(),
ClientResources.builder().ioThreadPoolSize(4).build(),
redisAddress);
```

Expand Down Expand Up @@ -222,8 +222,14 @@ zeebe:
# Redis stream automatic cleanup of acknowledged messages. Default is false.
deleteAfterAcknowledge: false

# Redis Client IO-Thread-Pool-Size. Default is number of available processors but not smaller than 4.
ioThreadPoolSize: 4
# Redis Client IO-Thread-Pool-Size. Default is number of available processors but not smaller than 2.
ioThreadPoolSize: 2

# Redis batch size for flushing commands when sending data to Redis streams. Default is 250. Maximum number of events which will be flushed simultaneously.
batchSize: 250

# Redis batch cycle in milliseconds for sending data to Redis streams. Default is 500. Even if the batch size has not been reached events will be sent after this time.
batchCycleMillis: 500

# record serialization format: [protobuf|json]
format: "protobuf"
Expand Down Expand Up @@ -306,6 +312,21 @@ Cleanup is synchronized between different Zeebe nodes so that the cleanup does n
Version 0.9.2 comes with a single cleanup parameter `ZEEBE_REDIS_TIME_TO_LIVE_IN_SECONDS`
which is equal to the `ZEEBE_REDIS_MAX_TIME_TO_LIVE_IN_SECONDS` since version 0.9.3.
#### Tuning exporter performance
*Since 0.9.8*
In order to tune the exporter performance you have the following options available:
| **Parameter** | **Description** |
|-----------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------|
| `ZEEBE_REDIS_BATCH_SIZE` | Batch size for flushing commands when sending data. Default is `250`. More precisely the maximum number of events which will be flushed simultaneously. |
| `ZEEBE_REDIS_BATCH_CYCLE_MILLIS` | Batch cycle in milliseconds for sending data. Default is `500`. Even if the batch size has not been reached events will be sent after this time. |
| `ZEEBE_REDIS_IO_THREAD_POOL_SIZE` | Redis Client IO-Thread-Pool-Size. Default is number of available processors but not smaller than `2`. |
The exporter queues records and sends them every `ZEEBE_REDIS_BATCH_CYCLE_MILLIS`. Sending data then does not use the default auto-flush / flush-after-write mode of Lettuce but groups multiple
commands in a batch using `ZEEBE_REDIS_BATCH_SIZE` as maximum thus increasing the throughput. According to the Lettuce documentation batches are recommended to have a size between 50 and 1000.
## Build it from Source
The exporter and the Java connector can be built with Maven
Expand Down
6 changes: 6 additions & 0 deletions exporter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@
<artifactId>lettuce-core</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>

<!-- test -->
<dependency>
<groupId>io.camunda</groupId>
Expand Down
18 changes: 18 additions & 0 deletions exporter/src/main/java/io/zeebe/redis/exporter/EventQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.zeebe.redis.exporter;

import org.apache.commons.lang3.tuple.ImmutablePair;

import java.util.concurrent.ConcurrentLinkedQueue;

public class EventQueue {

private final ConcurrentLinkedQueue<ImmutablePair<Long, RedisEvent>> queue = new ConcurrentLinkedQueue<>();

public void addEvent(ImmutablePair<Long, RedisEvent> event) {
queue.add(event);
}

public ImmutablePair<Long, RedisEvent> getNextEvent() {
return queue.poll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ public class ExporterConfiguration {

private boolean deleteAfterAcknowledge = false;

private int ioThreadPoolSize = Math.max(4, Runtime.getRuntime().availableProcessors());
private int ioThreadPoolSize = Math.max(2, Runtime.getRuntime().availableProcessors());

private int batchSize = 250;

private int batchCycleMillis = 500;

public long getCleanupCycleInSeconds() {
return getEnv("CLEANUP_CYCLE_IN_SECONDS").map(Long::parseLong).orElse(cleanupCycleInSeconds);
Expand All @@ -48,6 +52,14 @@ public int getIoThreadPoolSize() {
return getEnv("IO_THREAD_POOL_SIZE").map(Integer::parseInt).orElse(ioThreadPoolSize);
}

public int getBatchSize() {
return getEnv("BATCH_SIZE").map(Integer::parseInt).orElse(batchSize);
}

public int getBatchCycleMillis() {
return getEnv("BATCH_CYCLE_MILLIS").map(Integer::parseInt).orElse(batchCycleMillis);
}

public String getFormat() {
return getEnv("FORMAT").orElse(format);
}
Expand Down Expand Up @@ -88,6 +100,8 @@ public String toString() {
", maxTimeToLiveInSeconds=" + getMaxTimeToLiveInSeconds() +
", deleteAfterAcknowledge=" + isDeleteAfterAcknowledge() +
", ioThreadPoolSize=" + getIoThreadPoolSize() +
", batchSize=" + getBatchSize() +
", batchCycleMillis=" + getBatchCycleMillis() +
']';
}
}
147 changes: 147 additions & 0 deletions exporter/src/main/java/io/zeebe/redis/exporter/RedisCleaner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package io.zeebe.redis.exporter;

import io.lettuce.core.*;
import io.lettuce.core.api.StatefulRedisConnection;
import org.slf4j.Logger;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class RedisCleaner {

private final static String CLEANUP_LOCK = "zeebe:cleanup-lock";
private final static String CLEANUP_TIMESTAMP = "zeebe:cleanup-time";
private final Logger logger;
private StatefulRedisConnection<String, ?> redisConnection;

private Map<String, Boolean> streams = new ConcurrentHashMap<>();

private boolean useProtoBuf;
private long maxTtlInMillisConfig;
private long minTtlInMillisConfig;
private boolean deleteAfterAcknowledge;

private Duration trimScheduleDelay;

public RedisCleaner(StatefulRedisConnection<String, ?> redisConnection, boolean useProtoBuf, ExporterConfiguration config, Logger logger) {
this.logger = logger;
this.redisConnection = redisConnection;
this.useProtoBuf = useProtoBuf;
minTtlInMillisConfig = config.getMinTimeToLiveInSeconds() * 1000l;
if (minTtlInMillisConfig < 0) minTtlInMillisConfig = 0;
maxTtlInMillisConfig = config.getMaxTimeToLiveInSeconds() * 1000l;
if (maxTtlInMillisConfig < 0) maxTtlInMillisConfig = 0;
deleteAfterAcknowledge = config.isDeleteAfterAcknowledge();
trimScheduleDelay = Duration.ofSeconds(config.getCleanupCycleInSeconds());
}

public void considerStream(String stream) {
streams.put(stream, Boolean.TRUE);
}

public void trimStreamValues() {
if (redisConnection != null && streams.size() > 0 && acquireCleanupLock()) {
try {
// get ID according to max time to live
final long maxTTLMillis = System.currentTimeMillis() - maxTtlInMillisConfig;
final String maxTTLId = String.valueOf(maxTTLMillis);
// get ID according to min time to live
final long minTTLMillis = System.currentTimeMillis() - minTtlInMillisConfig;
final String minTTLId = String.valueOf(minTTLMillis);
logger.debug("trim streams {}", streams);
// trim all streams
List<String> keys = new ArrayList(streams.keySet());
keys.forEach(stream -> {
Optional<Long> minDelivered = !deleteAfterAcknowledge ? Optional.empty() :
redisConnection.sync().xinfoGroups(stream)
.stream().map(o -> XInfoGroup.fromXInfo(o, useProtoBuf))
.map(xi -> {
if (xi.getPending() > 0) {
xi.considerPendingMessageId(redisConnection.sync()
.xpending(stream, xi.getName()).getMessageIds().getLower().getValue());
}
return xi.getLastDeliveredId();
})
.min(Comparator.comparing(Long::longValue));
if (minDelivered.isPresent()) {
long minDeliveredMillis = minDelivered.get();
String xtrimMinId = String.valueOf(minDeliveredMillis);
if (maxTtlInMillisConfig > 0 && maxTTLMillis > minDeliveredMillis) {
xtrimMinId = maxTTLId;
} else if (minTtlInMillisConfig > 0 && minTTLMillis < minDeliveredMillis) {
xtrimMinId = minTTLId;
}
long numTrimmed = redisConnection.sync().xtrim(stream, new XTrimArgs().minId(xtrimMinId));
if (numTrimmed > 0) {
logger.debug("{}: {} cleaned records", stream, numTrimmed);
}
} else if (maxTtlInMillisConfig > 0) {
long numTrimmed = redisConnection.sync().xtrim(stream, new XTrimArgs().minId(maxTTLId));
if (numTrimmed > 0) {
logger.debug("{}: {} cleaned records", stream, numTrimmed);
}
}
});
} catch (RedisCommandTimeoutException | RedisConnectionException ex) {
logger.error("Error during cleanup due to possible Redis unavailability: {}", ex.getMessage());
} catch (Exception ex) {
logger.error("Error during cleanup", ex);
} finally {
releaseCleanupLock();
}
}
}

private boolean acquireCleanupLock() {
try {
String id = UUID.randomUUID().toString();
Long now = System.currentTimeMillis();
// ProtoBuf format
if (useProtoBuf) {
// try to get lock
StatefulRedisConnection<String, byte[]> con = (StatefulRedisConnection<String, byte[]>) redisConnection;
con.sync().set(CLEANUP_LOCK, id.getBytes(StandardCharsets.UTF_8), SetArgs.Builder.nx().px(trimScheduleDelay));
byte[] getResult = con.sync().get(CLEANUP_LOCK);
if (getResult != null && getResult.length > 0 && id.equals(new String(getResult, StandardCharsets.UTF_8))) {
// lock successful: check last cleanup timestamp (autoscaled new Zeebe instances etc.)
byte[] lastCleanup = con.sync().get(CLEANUP_TIMESTAMP);
if (lastCleanup == null || lastCleanup.length == 0 ||
Long.parseLong(new String(lastCleanup, StandardCharsets.UTF_8)) < now - trimScheduleDelay.toMillis()) {
con.sync().set(CLEANUP_TIMESTAMP, Long.toString(now).getBytes(StandardCharsets.UTF_8));
return true;
}
}
// JSON format
} else {
// try to get lock
StatefulRedisConnection<String, String> con = (StatefulRedisConnection<String, String>) redisConnection;
con.sync().set(CLEANUP_LOCK, id, SetArgs.Builder.nx().px(trimScheduleDelay));
if (id.equals(con.sync().get(CLEANUP_LOCK))) {
// lock successful: check last cleanup timestamp (autoscaled new Zeebe instances etc.)
String lastCleanup = con.sync().get(CLEANUP_TIMESTAMP);
if (lastCleanup == null || lastCleanup.isEmpty() || Long.parseLong(lastCleanup) < now - trimScheduleDelay.toMillis()) {
con.sync().set(CLEANUP_TIMESTAMP, Long.toString(now));
return true;
}
}
}
} catch (RedisCommandTimeoutException | RedisConnectionException ex) {
logger.error("Error acquiring cleanup lock due to possible Redis unavailability: {}", ex.getMessage());
} catch (Exception ex) {
logger.error("Error acquiring cleanup lock", ex);
}
return false;
}

private void releaseCleanupLock() {
try {
redisConnection.sync().del(CLEANUP_LOCK);
} catch (RedisCommandTimeoutException | RedisConnectionException ex) {
logger.error("Error releasing cleanup lock due to possible Redis unavailability: {}", ex.getMessage());
} catch (Exception ex) {
logger.error("Error releasing cleanup lock", ex);
}
}
}
14 changes: 14 additions & 0 deletions exporter/src/main/java/io/zeebe/redis/exporter/RedisEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.zeebe.redis.exporter;

public class RedisEvent {

String stream;
long key;
Object value;

public RedisEvent(String stream, long key, Object value) {
this.stream = stream;
this.key = key;
this.value = value;
}
}
Loading

0 comments on commit 8a7ff6e

Please sign in to comment.