From f00734a793cfed45f97fec744f26b60ba84976cb Mon Sep 17 00:00:00 2001 From: Francois Ferrand Date: Fri, 6 Oct 2023 14:45:51 +0200 Subject: [PATCH] Use cooperative rebalance This should minimize disruptions on rebalance. Issue: BB-441 --- lib/BackbeatConsumer.js | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/BackbeatConsumer.js b/lib/BackbeatConsumer.js index cf8cf5292..3d50ce895 100644 --- a/lib/BackbeatConsumer.js +++ b/lib/BackbeatConsumer.js @@ -182,6 +182,7 @@ class BackbeatConsumer extends EventEmitter { 'allow.auto.create.topics': true, 'statistics.interval.ms': 1000, 'rebalance_cb': this._onRebalance.bind(this), + 'partition.assignment.strategy': 'cooperative-sticky', }; const topicParams = {}; if (this._fromOffset !== undefined) { @@ -496,7 +497,7 @@ class BackbeatConsumer extends EventEmitter { this._log.debug('rdkafka.assign', { err, assignment }); try { - this._consumer.assign(assignment); + this._consumer.incrementalAssign(assignment); } catch (e) { // Ignore exceptions if we are not connected const logger = this._consumer.isConnected() ? this._log.error : this._log.debug; @@ -510,7 +511,7 @@ class BackbeatConsumer extends EventEmitter { }); if (!this._processingQueue || this._processingQueue.length() + this._processingQueue.running() === 0) { - this._consumer.unassign(); + this._consumer.incrementalUnassign(assignment); return; } @@ -521,7 +522,7 @@ class BackbeatConsumer extends EventEmitter { this._log.debug('processing queue drained, un-assigning'); try { - this._consumer.unassign(); + this._consumer.incrementalUnassign(assignment); } catch (e) { // Ignore exceptions if we are not connected const logger = this._consumer.isConnected() ? this._log.error : this._log.debug;