diff --git a/lib/BackbeatConsumer.js b/lib/BackbeatConsumer.js index f5a35f1cb9..a6538c6f1b 100644 --- a/lib/BackbeatConsumer.js +++ b/lib/BackbeatConsumer.js @@ -181,7 +181,7 @@ class BackbeatConsumer extends EventEmitter { // automatically create topic 'allow.auto.create.topics': true, 'statistics.interval.ms': 1000, - 'rebalance_cb': true, + 'rebalance_cb': this._onRebalance.bind(this), }; const topicParams = {}; if (this._fromOffset !== undefined) { @@ -209,15 +209,6 @@ class BackbeatConsumer extends EventEmitter { this._consumer.on('event.error', err => this._log.error('rdkafka.error', { err })); this._consumer.on('event.throttle', throttle => this._log.info('rdkafka.throttle', { throttle })); this._consumer.on('event.stats', observeKafkaStats); - this._consumer.on('rebalance', (err, assignment) => { - if (err.code === kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) { - this._log.debug('rdkafka.assign', { err, assignment }); - } else if (err.code === kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) { - this._log.debug('rdkafka.revoke', { err }); - } else { - this._log.error('rdkafka.rebalance', { err, assignment }); - } - }); this._connect(); } @@ -495,6 +486,50 @@ class BackbeatConsumer extends EventEmitter { return undefined; } + /** + * @param {kafka.KafkaError} err Rebalance event + * @param {TopicPartition[]} assignment List of (un)assigned partitions + * @returns {void} + */ + _onRebalance(err, assignment) { + if (err.code === kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) { + this._log.debug('rdkafka.assign', { err, assignment }); + + try { + this._consumer.assign(assignment); + } catch (e) { + // Ignore exceptions if we are not connected + const logger = this._consumer.isConnected() ? this._log.error : this._log.debug; + logger.bind(this._log)('rdkafka.assign failed', { err, e, assignment }); + } + } else if (err.code === kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) { + this._log.debug('rdkafka.revoke', { + err, + queueLen: this._processingQueue.length(), + running: this._processingQueue.running(), + }); + + if (!this._processingQueue || this._processingQueue.length() + this._processingQueue.running() === 0) { + this._consumer.unassign(); + return; + } + + this._processingQueue.drain(() => { + this._log.debug('processing queue drained, un-assigning'); + + try { + this._consumer.unassign(); + } catch (e) { + // Ignore exceptions if we are not connected + const logger = this._consumer.isConnected() ? this._log.error : this._log.debug; + logger.bind(this._log)('rdkafka.unassign failed', { err, e, assignment }); + } + }); + } else { + this._log.error('rdkafka.rebalance', { err, assignment }); + } + } + /** * Function to be called when safe to commit the consumer offset * of the given entry