Skip to content

Commit

Permalink
Resume connector change stream using the timestamp of the first event
Browse files Browse the repository at this point in the history
To avoid losing initial events, we start the change stream from the
first event that triggered the creation of the connector.

Issue: BB-471
  • Loading branch information
Kerkesni committed Nov 15, 2023
1 parent c8564d2 commit 6555b06
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 3 deletions.
5 changes: 3 additions & 2 deletions extensions/oplogPopulator/OplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ class OplogPopulator {
// no fullDocument field in delete events
const isBackbeatEnabled = change.fullDocument ?
this._isBucketBackbeatEnabled(change.fullDocument.value) : null;
const eventDate = new Date(change.clusterTime);
switch (change.operationType) {
case 'delete':
if (isListeningToBucket) {
Expand All @@ -186,7 +187,7 @@ class OplogPopulator {
await this._allocator.stopListeningToBucket(change.documentKey._id);
// add bucket if it became backbeat enabled
} else if (!isListeningToBucket && isBackbeatEnabled) {
await this._allocator.listenToBucket(change.documentKey._id);
await this._allocator.listenToBucket(change.documentKey._id, eventDate);
}
break;
default:
Expand All @@ -197,7 +198,7 @@ class OplogPopulator {
});
break;
}
const delta = (Date.now() - new Date(change.clusterTime).getTime()) / 1000;
const delta = (Date.now() - eventDate.getTime()) / 1000;
this._metricsHandler.onOplogEventProcessed(change.operationType, delta);
this._logger.info('Change stream event processed', {
method: 'OplogPopulator._handleChangeStreamChange',
Expand Down
3 changes: 3 additions & 0 deletions extensions/oplogPopulator/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ const constants = {
'connector.class': 'com.mongodb.kafka.connect.MongoSourceConnector',
'pipeline': '[]',
'collection': '',
// If no timestamp is provided, the startup mode will be equivalent
// to 'latest' which will pick up from the latest event in the oplog
'startup.mode': 'timestamp',
// JSON output converter config
// Using a string converter to avoid getting an over-stringified
// JSON that is returned by default
Expand Down
9 changes: 8 additions & 1 deletion extensions/oplogPopulator/modules/Allocator.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,21 @@ class Allocator {
* Starts listening to bucket by
* adding and assigning a connector to it
* @param {string} bucket bucket name
* @param {Date|null} eventDate oplog event date
* @returns {Promise|undefined} undefined
* @throws {InternalError}
*/
async listenToBucket(bucket) {
async listenToBucket(bucket, eventDate = null) {
try {
if (!this._bucketsToConnectors.has(bucket)) {
const connectors = this._connectorsManager.connectors;
const connector = this._allocationStrategy.getConnector(connectors);
// In the initial startup of the oplog populator
// we fetch the buckets directly from mongo.
// We don't have an event date in this case.
if (!eventDate) {
connector.updateResumeDate(eventDate);
}
await connector.addBucket(bucket);
this._bucketsToConnectors.set(bucket, connector);
this._metricsHandler.onConnectorConfigured(connector, 'add');
Expand Down
22 changes: 22 additions & 0 deletions extensions/oplogPopulator/modules/Connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,26 @@ class Connector {
this._config['offset.partition.name'] = `partition-${uuid.v4()}`;
}

/**
* Updates the resume date of the change stream
* if the connector is not running to the first
* event of the first bucket added
* @param {Date} eventDate oplog event date
* @returns {undefined}
*/
updateResumeDate(eventDate) {
if (this._isRunning) {
return;
}

// resume date is reset when the connector is stopped
if (this._config['startup.mode.timestamp.start.at.operation.time']) {
return;
}

this._config['startup.mode.timestamp.start.at.operation.time'] = eventDate.toISOString();
}

/**
* Creates the Kafka-connect mongo connector
* @returns {Promise|undefined} undefined
Expand All @@ -139,6 +159,7 @@ class Connector {
config: this._config,
});
this._isRunning = true;
delete this._config['startup.mode.timestamp.start.at.operation.time'];
} catch (err) {
this._logger.error('Error while spawning connector', {
method: 'Connector.spawn',
Expand All @@ -165,6 +186,7 @@ class Connector {
try {
await this._kafkaConnect.deleteConnector(this._name);
this._isRunning = false;
delete this._config['startup.mode.timestamp.start.at.operation.time'];
} catch (err) {
this._logger.error('Error while destroying connector', {
method: 'Connector.destroy',
Expand Down

0 comments on commit 6555b06

Please sign in to comment.