"Verteiler" is german and translates to distributor. When you are using the (new) kafka-clients
consumer introduced with v0.9.0, you will face the issue that Consumer.poll(...)
can only be called from one thread. Verteiler leverages the kafka-client consumer implementation to distribute messages from assigned partitions to BlockingQueue
s. Each assigned partition will relay its messages to its own internal queue. These queues have worker threads which process the message leveraging a user defined consumer. Once a message has been processed successfully its offset will be committed asynchronously after poll(...)
has finished. This way only the offset of processed messages is committed.
String topic
: Name of the topic you want to consume messages from.
Properties kafkaConfig
: Regular consumer config. By default verteiler
will disable auto offset commit and set a client id when not given.
int queueSize
: Size of the internal queue, which depends on your needs.
java.util.function.Consumer<V> action
: The function which will handle a message.
Simple message counter:
Properties props = new Properties();
props.setProperty(BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
...
AtomicInteger messageCounter = new AtomicInteger();
Consumer<String> action = (message) -> messageCounter.incrementAndGet();
BlockingQueueConsumer<String, String> consumer = new BlockingQueueConsumer<>("my_topic", props, 42, action);
consumer.start();
For a full example take a look at BlockingQueueConsumerTest.
gradle build
will build the project. The integration tests will start embedded zookeeper & kafka instances.