diff --git a/lib/baseProducer.js b/lib/baseProducer.js index 55854c70..12cb9f7d 100644 --- a/lib/baseProducer.js +++ b/lib/baseProducer.js @@ -82,7 +82,13 @@ BaseProducer.prototype.connect = function () { // emiter... var self = this; this.ready = this.client.ready; - if (this.ready) self.emit('ready'); + if (this.ready) { + // Emit the ready event in next tick to give consumers a chance to set up + // a `ready` listener + setImmediate(function () { + self.emit('ready'); + }); + } this.client.on('ready', function () { if (!self.ready) { self.ready = true; diff --git a/test/test.baseProducer.js b/test/test.baseProducer.js index 5c53d9d4..7b8e7d98 100644 --- a/test/test.baseProducer.js +++ b/test/test.baseProducer.js @@ -10,6 +10,33 @@ const async = require('async'); const should = require('should'); describe('BaseProducer', function () { + describe('ready event', function () { + const KAFKA_HOST = 'localhost:9092'; + let client; + before(function () { + client = new KafkaClient({ + kafkaHost: KAFKA_HOST + }); + }); + + it('can listen on the ready event before the client is connected', function (done) { + const producer = new BaseProducer(client, {}, BaseProducer.PARTITIONER_TYPES.default); + producer.once('ready', function () { + should(producer.ready).be.true; + done(); + }); + }); + + it('can listen on the ready event after the client is connected', function (done) { + should(client.ready).be.true; + const producer = new BaseProducer(client, {}, BaseProducer.PARTITIONER_TYPES.default); + producer.once('ready', function () { + should(producer.ready).be.true; + done(); + }); + }); + }); + describe('encoding and decoding key attribute', function () { const KAFKA_HOST = 'localhost:9092'; let consumerGroup, topic, producer;