Kafka test helper is a Node.js library that helps you writing integration tests that interacts with Apache Kafka.
This library can:
- read events published on a topic
- publish events on a channel or a specific partition
- create/delete topic on the fly
Kafka test helper is able to work both with independent messages and messages managed in the context of a transaction (commited or aborted). You can use Kafka test helper in conjunction with your preferred testing library, because it is compatible with jest, mocha, tap and so on..., by the way it's agnostic by test runners.
npm install kafka-test-helper
Full source code available here: ProducerExample.test.js and ProducerExample.js
import { createKafkaTestHelper } from 'kafka-test-helper'
test('ProducerExample', async () => {
// init Kafka Test helper
const kafka = getKafka() // see https://kafka.js.org/docs/configuration
const topicPrefix = Date.now() // this avoids cross test interference
const topicHelper = await createKafkaTestHelper(kafka, topicPrefix + '_something_happened')
await topicHelper.ensureTopicExists()
// init the module that has to be tested
const controller = new ProducerExample()
await controller.setup(kafka, topicPrefix)
const record = {
name: 'Tony',
surname: 'Stark'
}
await controller.doSomething(record)
// Kafka Test Helper retrieves published messages
const messages = await topicHelper.messages()
expect(messages).toHaveLength(1)
expect(messages[0].json).toEqual({
operation: 'doSomething',
record
})
// OR
expect(messages[0].string).toEqual(JSON.stringify({
operation: 'doSomething',
record
}))
// OR
expect(messages[0].buffer).toEqual(Buffer.from(JSON.stringify({
operation: 'doSomething',
record
})))
// OR
expect(messages).toEqual([
expect.objectContaining({
json: {
operation: 'doSomething',
record
}
})
])
// destroy
await controller.destroy()
await topicHelper.ensureTopicDeleted()
})
Full source code available here: ConsumerExample.test.js and ConsumerExample.js
import { createKafkaTestHelper } from 'kafka-test-helper'
test('ConsumerExample', async () => {
// init Kafka Test helper
const kafka = getKafka() // see https://kafka.js.org/docs/configuration
const topicPrefix = Date.now() // this avoids cross test interference
const topicHelper = await createKafkaTestHelper(kafka, topicPrefix + 'test-topic')
await topicHelper.ensureTopicExists()
// init the module that has to be tested
const controller = new ConsumerExample()
await controller.setup(kafka, topicPrefix)
// a way to intercepts when the controller has done (there could be other ways...)
const waitMessage = () => new Promise(resolve => {
controller.handleMessage = jest.fn()
.mockImplementation(message => {
resolve(message)
})
})
// Kafka Test Helper publishes a message, serialized as JSON string
await topicHelper.publishMessages([
{
json: {
hello: 'world'
}
}
])
// wait for post elaboration and validates output
const message = await waitMessage()
expect(message).toBe('{"hello":"world"}')
await controller.destroy()
await topicHelper.ensureTopicDeleted()
})
- 0.x - Beta version
- 1.0 - First official version
- 1.1 - Adds typescript definition [#1]; Upgrades deps
- 1.2 - Migrates to Kafka.JS 2; Upgrades deps; Upgrades referenced images on docker-compose
- 1.3 - Fixes
js/insecure-randomness
security issue; Deprecates Node.js 14 and 16; Upgrades deps, Kafka docker images and workflows
- chrvadala (author)
- RodolfoSilva