Redis Streams Java is a Java library that provides a topic abstraction over Redis streams. Each topic is backed by one or more Redis streams. As you publish messages to a topic, a new stream is automatically created as soon as the current stream grows beyond a predefined size. This allows streams to scale linearly across the shards of your Redis cluster.
The library provides core functionality for writing to and reading from a distributed Redis stream. It works in all of the following Redis deployment scenarios:
- A pair of Redis Enterprise or Redis Cloud Active-Active clusters
- A Redis Enterprise or Redis Cloud instance
- A single instance of Redis Stack
In fact, the library takes special care to ensure that reading from a consumer group of a single topic across two Active-Active clusters consistently honors the ordering and the pending entries list of the consumer group.
Redis Streams Java requires a JedisPooled connection object. You may want to tune your JedisPooled instance for your own needs.
To run Redis Streams Java in production, you'll need a Redis deployment. For a self-managed deployment, or for testing locally, install Redis Stack or spin up a free Redis Cloud instance. If you need a fully managed, cloud-based deployment of Redis on AWS, GCP, or Azure, see all the Redis Cloud offerings. For deployment in your own private cloud or data center, consider Redis Enterprise.
Next, you'll need to install the Redis Streams Java library and configure it.
To install the library, add the following dependency to your pom.xml file:
<dependency>
    <groupId>com.redis</groupId>
    <artifactId>redis-streams-java</artifactId>
    <version>0.3.5</version>
</dependency>
The streams library has three primary components:
- TopicManager - Oversees the topics, tracks the status of consumers within the library.
- ConsumerGroup - Manages and coordinates consumption from topics.
- TopicProducer - Produces messages for the topic.
To connect any of these key components, you will need a JedisPooled connection to Redis.
To initialize the TopicManager, pass your Jedis connection along with a topic manager configuration to createTopic:
JedisPooled jedis = new JedisPooled("redis://localhost:6379");
SerialTopicConfig config = new SerialTopicConfig("my-topic");
TopicManager topicManager = TopicManager.createTopic(jedis, config);
This creates the topic manager, along with the items in Redis required to support the topic.
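As noted in the introduction, a topic rolls over to a new stream once the current stream grows beyond a predefined size. The following is a toy in-memory model of that idea, offered only as an illustration. The class name `RollingTopicSketch`, the `maxStreamSize` parameter, and the exact rollover rule are assumptions made for this sketch; it does not use Redis and is not the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a topic backed by several streams (hypothetical, not the library's code). */
class RollingTopicSketch {
    private final int maxStreamSize; // assumed per-stream size limit
    private final List<List<Map<String, String>>> streams = new ArrayList<>();

    RollingTopicSketch(int maxStreamSize) {
        if (maxStreamSize < 1) {
            throw new IllegalArgumentException("maxStreamSize must be >= 1");
        }
        this.maxStreamSize = maxStreamSize;
        streams.add(new ArrayList<>()); // the topic starts with a single stream
    }

    /** Appends a message, opening a new stream first if the current one is full. Returns the stream index used. */
    int produce(Map<String, String> message) {
        List<Map<String, String>> current = streams.get(streams.size() - 1);
        if (current.size() >= maxStreamSize) {
            current = new ArrayList<>();
            streams.add(current);
        }
        current.add(message);
        return streams.size() - 1;
    }

    /** Number of streams currently backing the topic. */
    int streamCount() {
        return streams.size();
    }
}
```

With a limit of two entries per stream, the third message produced lands in a second stream. In a real cluster, each such stream can live on a different shard, which is what lets a topic grow beyond a single shard.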
The next step is to create a TopicProducer. To create one, pass your Jedis connection along with the topic name to the TopicProducer constructor:
Producer producer = new TopicProducer(jedis, "my-topic");
The final item to create is a ConsumerGroup, which is responsible for coordinating consumption of the topic. To create a ConsumerGroup, pass your Jedis connection along with the topic name and your consumer group name into the ConsumerGroup constructor:
ConsumerGroup consumerGroup = new ConsumerGroup(jedis, "my-topic", "test-group");
To add a message to the topic, pass a Map<String,String> into the TopicProducer's produce method:
Map<String, String> msg = Map.of("temp", "81", "humidity", "0.92", "city", "Satellite Beach");
producer.produce(msg);
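Because the payload is a map of strings, typed application data has to be flattened to strings before it is produced and parsed again after it is consumed. The sketch below shows one way to do that with plain Java. `WeatherReading` and its methods are hypothetical names invented for this example; only the Map<String,String> payload shape comes from the text above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical domain type used to illustrate building a string-to-string payload. */
final class WeatherReading {
    final int temp;
    final double humidity;
    final String city;

    WeatherReading(int temp, double humidity, String city) {
        this.temp = temp;
        this.humidity = humidity;
        this.city = city;
    }

    /** Flattens the reading into a map whose keys and values are all strings. */
    Map<String, String> toMessage() {
        Map<String, String> msg = new LinkedHashMap<>(); // keeps insertion order of fields
        msg.put("temp", Integer.toString(temp));
        msg.put("humidity", Double.toString(humidity));
        msg.put("city", city);
        return msg;
    }

    /** Rebuilds a reading from a map with the same keys. */
    static WeatherReading fromMessage(Map<String, String> msg) {
        return new WeatherReading(
                Integer.parseInt(msg.get("temp")),
                Double.parseDouble(msg.get("humidity")),
                msg.get("city"));
    }
}
```

The map returned by `toMessage()` has the same shape as the `msg` map shown above, so it could be handed to `producer.produce(...)` in the same way.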
To consume messages, call consume on your consumer group, passing in your consumer name:
TopicEntry entry = consumerGroup.consume("my-consumer");
The message contains:
1. The message ID (the monotonic ID created by Redis when the message was produced)
2. The stream the message was read from
3. The message itself
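Redis generates stream entry IDs in the form `<millisecondsTime>-<sequenceNumber>`, which is what makes them monotonic within a stream. The sketch below parses and compares two such IDs given as strings. `StreamIdSketch` is a hypothetical helper written for this example and is not part of the library; how the ID string is obtained from a TopicEntry is not shown here. Comparing IDs only orders entries within one stream, so it should not be assumed to reflect ordering across the several streams of a topic.

```java
/** Hypothetical helper that parses and orders Redis stream entry IDs ("<milliseconds>-<sequence>"). */
final class StreamIdSketch implements Comparable<StreamIdSketch> {
    final long millis;   // Unix time in milliseconds at which the entry was added
    final long sequence; // distinguishes entries added within the same millisecond

    private StreamIdSketch(long millis, long sequence) {
        this.millis = millis;
        this.sequence = sequence;
    }

    /** Parses an ID such as "1700000000000-0". */
    static StreamIdSketch parse(String id) {
        int dash = id.indexOf('-');
        if (dash <= 0 || dash == id.length() - 1) {
            throw new IllegalArgumentException("Not a stream entry ID: " + id);
        }
        // Both parts are unsigned 64-bit integers in Redis.
        long millis = Long.parseUnsignedLong(id.substring(0, dash));
        long sequence = Long.parseUnsignedLong(id.substring(dash + 1));
        return new StreamIdSketch(millis, sequence);
    }

    /** Orders by time first, then by sequence number. */
    @Override
    public int compareTo(StreamIdSketch other) {
        int byTime = Long.compareUnsigned(millis, other.millis);
        return byTime != 0 ? byTime : Long.compareUnsigned(sequence, other.sequence);
    }
}
```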
After you have consumed a message, you must acknowledge it. To do so, call acknowledge, passing in an AckMessage constructed from the TopicEntry you received when consuming the message:
TopicEntry entry = consumerGroup.consume("test-consumer");
// Some extra processing
// ...
consumerGroup.acknowledge(new AckMessage(entry));
If your application is unable to acknowledge a message (for example, if the process died during processing), the message remains in a pending state. You can retrieve any pending messages using the TopicManager, and then acknowledge them using the consumer group:
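PendingEntryQuery query = new PendingEntryQuery();
query.setCount(1); // fetch at most one pending entry
List<PendingEntry> pendingEntryList = topicManager.getPendingEntries("my-group", query);
consumerGroup.acknowledge(new AckMessage(pendingEntryList.get(0)));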
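The life cycle of a pending entry can be pictured with a small in-memory model: consuming an entry records it as pending, and acknowledging it removes it again. The sketch below is only an illustration of that bookkeeping. `PendingListSketch` and its methods are hypothetical names; the library keeps this state in Redis, not in application memory.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of consume/acknowledge with a pending entries list (hypothetical, not the library's code). */
class PendingListSketch {
    private final Deque<String> undelivered = new ArrayDeque<>();      // entry IDs not yet handed out
    private final Map<String, String> pending = new LinkedHashMap<>(); // entry ID -> consumer holding it

    /** Adds an entry ID to the toy stream. */
    void add(String entryId) {
        undelivered.add(entryId);
    }

    /** Hands out the next entry and records it as pending for the consumer; returns null when nothing is left. */
    String consume(String consumerName) {
        String entryId = undelivered.poll();
        if (entryId != null) {
            pending.put(entryId, consumerName);
        }
        return entryId;
    }

    /** Removes the entry from the pending list. Returns false if it was not pending. */
    boolean acknowledge(String entryId) {
        return pending.remove(entryId) != null;
    }

    /** Entries that were delivered but never acknowledged, oldest first. */
    List<String> pendingEntries() {
        return new ArrayList<>(pending.keySet());
    }
}
```

If a consumer reads two entries and acknowledges only the first, the second stays in the pending list until something retrieves and acknowledges it, which is the situation the pending-entry query above is meant to handle.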
If you want to keep an eye on what is going on with your topic and the consumer groups within it, you can use the TopicManager's getConsumerGroupStatus method:
ConsumerGroupStatus stats = topicManager.getConsumerGroupStatus("my-group");
System.out.printf("Consumer Group Name: %s%n", stats.getGroupName());
System.out.printf("Consumer Group Topic Size: %d%n", stats.getTopicEntryCount());
System.out.printf("Consumer Group Pending Entries: %d%n", stats.getPendingEntryCount());
System.out.printf("Consumer Group Lag: %d%n", stats.getConsumerLag());
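One way to act on these numbers is a simple threshold check. The sketch below is a hypothetical helper, not part of the library, and its thresholds and names are invented for illustration. It assumes that pending entries are messages that were delivered but not yet acknowledged, and that lag counts messages not yet delivered to the group, which is how Redis itself defines those terms.

```java
/** Hypothetical health check over consumer group numbers; names and thresholds are illustrative only. */
class BacklogCheckSketch {
    enum Health { OK, CONSUMERS_BEHIND, UNACKNOWLEDGED_BACKLOG }

    /**
     * Flags a group whose pending count or lag exceeds the given limits.
     * A large pending count is reported first, since it suggests consumers are failing to acknowledge.
     */
    static Health evaluate(long pendingEntryCount, long consumerLag, long maxPending, long maxLag) {
        if (pendingEntryCount > maxPending) {
            return Health.UNACKNOWLEDGED_BACKLOG;
        }
        if (consumerLag > maxLag) {
            return Health.CONSUMERS_BEHIND;
        }
        return Health.OK;
    }
}
```

The first two arguments could be fed from the `getPendingEntryCount()` and `getConsumerLag()` values printed above, with limits chosen to suit your workload.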
A NoAckConsumerGroup implementation exists that allows you to read from a stream in the context of a consumer group without needing to acknowledge any messages you retrieve from the stream. This is useful when you want "at most once" delivery semantics and are comfortable losing a message if something happens after the entry is delivered. To use it, initialize the NoAckConsumerGroup and consume as you would with the normal ConsumerGroup. The key difference is that there is no need to acknowledge any messages:
NoAckConsumerGroup noack = new NoAckConsumerGroup(jedis, "my-topic", "no-ack-group");
TopicEntry entry = noack.consume("my-consumer");
// your app's processing
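The trade-off of skipping acknowledgment can be illustrated with a toy simulation: without a pending list, a message whose processing fails is simply gone, while with one it can be delivered again. The sketch below uses plain Java collections only. `DeliverySemanticsSketch`, `run`, and `failsOnce` are hypothetical names, and the single retry pass is a simplification made for this example, not the library's behavior.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/** Toy comparison of consuming with and without acknowledgment tracking (hypothetical). */
class DeliverySemanticsSketch {

    /**
     * Delivers every message once. When trackPending is true, messages whose processing
     * failed stay pending and are delivered one more time; otherwise they are lost.
     * Returns the messages that were processed successfully, in completion order.
     */
    static List<String> run(List<String> messages, boolean trackPending, Predicate<String> processor) {
        Deque<String> stream = new ArrayDeque<>(messages);
        List<String> pending = new ArrayList<>();
        List<String> processed = new ArrayList<>();
        while (!stream.isEmpty()) {
            String message = stream.poll();
            if (processor.test(message)) {
                processed.add(message);  // processed and, conceptually, acknowledged
            } else if (trackPending) {
                pending.add(message);    // never acknowledged, so it stays pending
            }
            // Without tracking, a failed message is not recorded anywhere.
        }
        for (String message : pending) { // single redelivery pass over pending entries
            if (processor.test(message)) {
                processed.add(message);
            }
        }
        return processed;
    }

    /** Builds a processor that fails the first time it sees the target message and succeeds afterwards. */
    static Predicate<String> failsOnce(String target) {
        Set<String> alreadyFailed = new HashSet<>();
        return message -> !message.equals(target) || !alreadyFailed.add(message);
    }
}
```

Running the messages a, b, c through a processor that fails once on b yields only a and c without tracking, and a, c, b with tracking.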
There is also a "Single Cluster PEL" topic manager and consumer group. This implementation does not replicate the pending entries list (PEL) across clusters in an Active-Active configuration, making it more performant than its standard counterpart for Active-Active deployments. The caveat is that your consumer group PEL will not be synchronized across clusters, so you will not be able to claim entries dropped outside of the original region of consumption.
To read without replicating the PEL, initialize the SingleClusterPelConsumerGroup and use it as you would any other consumer group:
SingleClusterPelConsumerGroup singleClusterPel = new SingleClusterPelConsumerGroup(jedis, "my-topic", "pel-group");
TopicEntry entry = singleClusterPel.consume("my-consumer");
// your app's processing
singleClusterPel.acknowledge(new AckMessage(entry));
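The caveat about claiming entries from another region can be pictured with a toy model in which each region keeps its own pending entries list. The sketch below is purely illustrative. `RegionalPelSketch`, the region names, and the `replicatePel` flag are assumptions made for this example and say nothing about how the library or Active-Active replication is actually implemented.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of per-region pending entries lists (hypothetical, not the library's code). */
class RegionalPelSketch {
    private final boolean replicatePel;
    private final Map<String, Set<String>> pelByRegion = new HashMap<>();

    RegionalPelSketch(boolean replicatePel, String... regions) {
        this.replicatePel = replicatePel;
        for (String region : regions) {
            pelByRegion.put(region, new HashSet<>());
        }
    }

    /** Records an entry as pending after it was consumed in the given region. */
    void markPending(String region, String entryId) {
        pelByRegion.computeIfAbsent(region, r -> new HashSet<>());
        if (replicatePel) {
            pelByRegion.values().forEach(pel -> pel.add(entryId)); // every region sees the entry
        } else {
            pelByRegion.get(region).add(entryId);                  // only the local region sees it
        }
    }

    /** An entry can only be claimed in a region whose own pending list contains it. */
    boolean canClaim(String region, String entryId) {
        return pelByRegion.getOrDefault(region, Set.of()).contains(entryId);
    }
}
```

In the non-replicated mode, an entry left pending in one region is invisible to the other, so a consumer there has nothing to claim.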
The method for gathering consumer group stats and getting pending entries is naturally different with the Single Cluster PEL implementation. You must therefore use a specialized SingleClusterPelTopicManager to retrieve these, e.g.:
SingleClusterPelTopicManager singleClusterPelTopicManager = new SingleClusterPelTopicManager(jedis, config);
PendingEntryQuery query = new PendingEntryQuery();
query.setCount(1);
List<PendingEntry> pendingEntriesSingleCluster = singleClusterPelTopicManager.getPendingEntries("pel-group", query);
ConsumerGroupStatus consumerGroupStatsSingleCluster = singleClusterPelTopicManager.getConsumerGroupStatus("pel-group");
Redis Streams Java is supported by Redis, Inc. for enterprise-tier customers as a 'Developer Tool' under the Redis Software Support Policy. For non enterprise-tier customers we supply support for Redis Streams Java on a good-faith basis. To report bugs, request features, or receive assistance, please file an issue.
Redis Streams Java is licensed under the Business Source License 1.1. Copyright © 2024 Redis, Inc. See LICENSE for details.