Simple java based library to consume/produce messages from an Apache Kafka broker. This tool was strongly based on the original kafkacat implementation. The motivation to recreate the (already existing) tool is because kafkacat needs to have some OS dependencies installed (librdkafka) that in some cases may not be available (my case) and/or because some old OS available packages we wont be able to have new awesome features (avro serialization/deserialization) for example.
Jkafkacat requires basically Maven 3+ and JDK 1.8 to compile:
mvn clean package
This will generate the target\jkafkacat-dist.jar, with that you're good to go.
Command:
java -jar jkafkacat-dist.jar -m consumer -o begin -t mytopic -c config.properties
config.properties
bootstrap.servers=localhost:9092
group.id=jkafkacat
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
Consuming 10 messages from an unsecured broker and then exit, starting at the end of the topic (will consume only new messages)
Command:
java -jar jkafkacat-dist.jar -m consumer -o end -nm 10 -t mytopic -c config.properties
config.properties
bootstrap.servers=localhost:9092
group.id=jkafkacat
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
Given a topic with 5 partitions in an unsecured broker, consume the last message from each partition, consuming an amount of 5 messages and then exit
Command:
java -jar jkafkacat-dist.jar -m consumer -o end -os -1 -t mytopic -c config.properties -nm 5
config.properties
bootstrap.servers=localhost:9092
group.id=jkafkacat
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
Given an ordinary topic in an unsecured broker, consume 100 messages produced after 'Thursday, 29 August 2019 13:00:00 GMT', formatting the output to show the message timestamp and the key#value and then exit
Command:
java -jar jkafkacat-dist.jar -m consumer -o timestamp -ot 1567083600000 -t mytopic -c config.properties -nm 100 -f "%T %k#%v"
config.properties
bootstrap.servers=localhost:9092
group.id=jkafkacat
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
Given an ordinary topic in a secured broker with SASL_SSL, consume 10 AVRO messages, getting dynamically the schema from a secured schema registry with SSL/Certificate authentication - specified through the ks and ksp params, formatting the output to show the message timestamp and the key#value and then exit
Command:
java -jar jkafkacat-dist.jar -ks keystore.jks -ksp changeit -m consumer -o end -nm 10 -t mytopic -c config.properties -f "%k#%v"
config.properties
bootstrap.servers=localhost:9092
group.id=jkafkacat
sasl.mechanism=SCRAM-SHA-512
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=
ssl.truststore.location=truststore.jks
ssl.truststore.password=changeit
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin";
# avro deserialization
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
key.deserializer=org.apache.kafka.common.serialization.LongDeserializer
# schema registry
schema.registry.url=https://localhost:8081
Given the need to reset the offset of a source kafka-connect connector, extract the information from the connect-offsets topic to see the partition and key#value (see https://rmoff.net/2019/08/15/reset-kafka-connect-source-connector-offsets/)
Command:
java -jar jkafkacat-dist.jar -m consumer -o end -os -1 -t connect-offsets -c config.properties -f "\nKey (%K bytes): %k \nValue (%V bytes): %v \nTimestamp: %T \nPartition: %p \nOffset: %o\n"
config.properties
bootstrap.servers=localhost:9092
group.id=jkafkacat
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
Given the need to reset the offset of a source kafka-connect connector, in an unsecured broker, produce a tombstone message to a specific partition of the topic connect-offsets (see https://rmoff.net/2019/08/15/reset-kafka-connect-source-connector-offsets/)
Command:
echo '["source-file-01",{"filename":"/data/testdata.txt"}]#' | java -jar jkafkacat-dist.jar -m producer -t connect-offsets -c config.properties -K # -p 20
config.properties
bootstrap.servers=localhost:9092
value.serializer=org.apache.kafka.common.serialization.StringSerializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
Given the need to copy AVRO messages (key and value) from a topic to another, in an unsecured broker, pipe the consume command to the produce command to produce AVRO messages
Command:
java -jar jkafkacat-dist.jar -m consumer -t source-topic -o begin -c consumer.properties -f "%k#%v" | java -jar jkafkacat-dist.jar -m producer -t target-topic -c producer.properties -K #
consumer.properties
bootstrap.servers=brokerA:9092
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
key.deserializer=org.apache.kafka.common.serialization.LongDeserializer
schema.registry.url=http://localhost:8081
producer.properties
bootstrap.servers=brokerB:9092
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url=http://localhost:8081