Skip to content

Commit

Permalink
Publish offsets before unassign
Browse files Browse the repository at this point in the history
This is the only place where we can do it (esp. on shutdown), since it
calls `consumer.position()`, with no param, and thus updates the position
of assigned partitions.

This however has an unfortunate effect: it puts Zookeeper in the path,
as the unassign with be delayed until we have retrieved the offsets
and written them to zookeeper.

This should however be fine in the context of a kafka rebalance, which
takes seconds or more.

Issue: BB-455
  • Loading branch information
francoisferrand authored and Kerkesni committed Oct 30, 2023
1 parent e1de8b5 commit 409ea16
Showing 1 changed file with 33 additions and 31 deletions.
64 changes: 33 additions & 31 deletions lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -585,15 +585,24 @@ class BackbeatConsumer extends EventEmitter {
logger.bind(this._log)('rdkafka.commit failed', { e: e.toString(), assignment });
}

try {
this._consumer.unassign();
} catch (e) {
// Ignore exceptions if we are not connected
const logger = this._consumer.isConnected() ? this._log.error : this._log.info;
logger.bind(this._log)('rdkafka.unassign failed', { e: e.toString(), assignment });
}
const doUnassign = () => {
try {
this._consumer.unassign();
} catch (e) {
// Ignore exceptions if we are not connected
const logger = this._consumer.isConnected() ? this._log.error : this._log.info;
logger.bind(this._log)('rdkafka.unassign failed', { e: e.toString(), assignment });
}

this.emit('unassign', status);
this.emit('unassign', status);
};

// publish offsets to zookeeper
if (this._kafkaBacklogMetricsConfig) {
this._publishOffsetsCron(doUnassign);
} else {
doUnassign();
}
});

if (!this._processingQueue || this._processingQueue.idle()) {
Expand Down Expand Up @@ -911,37 +920,30 @@ class BackbeatConsumer extends EventEmitter {
this._circuitBreaker.stop();
return async.waterfall([
next => {
if (this._kafkaBacklogMetricsConfig) {
// publish offsets to zookeeper
return this._publishOffsetsCron(() => next());
if (this._consumer.isConnected()) {
const subscription = this._consumer.subscription() || [];
if (subscription.length > 0) {
this._consumer.unsubscribe();
// Wait for partition unassign to complete before
// disconnecting, the rebalance callback will handle
// waiting for current jobs to complete as well as commit
// the latest offsets
this.once('unassign', () => next());
return;
}
}
return process.nextTick(next);
process.nextTick(next);
},
next => {
if (this._zookeeper) {
this._zookeeper.close();
} else {
process.nextTick(next);
}
},
next => {
const subscription = this._consumer.subscription() || [];
if (subscription.length > 0) {
this._consumer.unsubscribe();
// Wait for partition unassign to complete before
// disconnecting, the rebalance callback will handle
// waiting for current jobs to complete as well as commit
// the latest offsets
this.on('unassign', next);
} else {
next(UNASSIGN_STATUS.IDLE);
}
},
(status, next) => {
if (status !== UNASSIGN_STATUS.TIMEOUT) {
if (this._consumer.isConnected()) {
this._consumer.disconnect();
this._consumer.once('disconnected', () => next());
} else {
process.nextTick(next);
}
this._consumer.once('disconnected', () => next());
},
], () => cb());
}
Expand Down

0 comments on commit 409ea16

Please sign in to comment.