Resume connector change stream using the timestamp of the first event
To avoid losing initial events, we start the change stream from the
first event that triggered the creation of the connector.

Issue: BB-471
Kerkesni committed Nov 16, 2023
1 parent c8564d2 commit fec6d95
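
As background, here is a minimal sketch of what a source connector configuration looks like once a resume point has been set. The keys are the ones touched by this commit; the timestamp value is purely illustrative.

// Sketch only: keys come from this commit, the values are hypothetical.
const exampleConnectorConfig = {
    'connector.class': 'com.mongodb.kafka.connect.MongoSourceConnector',
    // added in constants.js below; without a start timestamp this behaves like 'latest'
    'startup.mode': 'timestamp',
    // written by Connector.setResumePoint() from the clusterTime of the first event
    'startup.mode.timestamp.start.at.operation.time': '2023-11-16T16:18:53.000Z',
};

The resume point is set before the connector's first bucket is added, so the change stream starts from that event rather than from the tip of the oplog.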
Showing 7 changed files with 69 additions and 4 deletions.
extensions/oplogPopulator/OplogPopulator.js (5 changes: 3 additions & 2 deletions)
@@ -172,6 +172,7 @@ class OplogPopulator {
 // no fullDocument field in delete events
 const isBackbeatEnabled = change.fullDocument ?
 this._isBucketBackbeatEnabled(change.fullDocument.value) : null;
+const eventDate = new Date(change.clusterTime);
 switch (change.operationType) {
 case 'delete':
 if (isListeningToBucket) {
@@ -186,7 +187,7 @@
 await this._allocator.stopListeningToBucket(change.documentKey._id);
 // add bucket if it became backbeat enabled
 } else if (!isListeningToBucket && isBackbeatEnabled) {
-await this._allocator.listenToBucket(change.documentKey._id);
+await this._allocator.listenToBucket(change.documentKey._id, eventDate);
 }
 break;
 default:
@@ -197,7 +198,7 @@
 });
 break;
 }
-const delta = (Date.now() - new Date(change.clusterTime).getTime()) / 1000;
+const delta = (Date.now() - eventDate.getTime()) / 1000;
 this._metricsHandler.onOplogEventProcessed(change.operationType, delta);
 this._logger.info('Change stream event processed', {
 method: 'OplogPopulator._handleChangeStreamChange',
extensions/oplogPopulator/constants.js (3 changes: 3 additions & 0 deletions)
@@ -5,6 +5,9 @@ const constants = {
 'connector.class': 'com.mongodb.kafka.connect.MongoSourceConnector',
 'pipeline': '[]',
 'collection': '',
+// If no timestamp is provided, the startup mode will be equivalent
+// to 'latest' which will pick up from the latest event in the oplog
+'startup.mode': 'timestamp',
 // JSON output converter config
 // Using a string converter to avoid getting an over-stringified
 // JSON that is returned by default
extensions/oplogPopulator/modules/Allocator.js (9 changes: 8 additions & 1 deletion)
@@ -67,14 +67,21 @@ class Allocator {
 * Starts listening to bucket by
 * adding and assigning a connector to it
 * @param {string} bucket bucket name
+* @param {Date|null} eventDate oplog event date
 * @returns {Promise|undefined} undefined
 * @throws {InternalError}
 */
-async listenToBucket(bucket) {
+async listenToBucket(bucket, eventDate = null) {
 try {
 if (!this._bucketsToConnectors.has(bucket)) {
 const connectors = this._connectorsManager.connectors;
 const connector = this._allocationStrategy.getConnector(connectors);
+// In the initial startup of the oplog populator
+// we fetch the buckets directly from mongo.
+// We don't have an event date in this case.
+if (eventDate) {
+connector.setResumePoint(eventDate);
+}
 await connector.addBucket(bucket);
 this._bucketsToConnectors.set(bucket, connector);
 this._metricsHandler.onConnectorConfigured(connector, 'add');
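For illustration, a rough sketch of the two ways listenToBucket is now invoked. The first path mirrors the change-stream handler in OplogPopulator.js above; the wrapper function names and the allocator and bucketName parameters are placeholders, not identifiers from this commit.

// Rough sketch of the two call paths (names other than listenToBucket are placeholders).
async function handleChangeStreamEvent(allocator, change) {
    // Change-stream path: forward the event's cluster time so a resume point can be set.
    await allocator.listenToBucket(change.documentKey._id, new Date(change.clusterTime));
}

async function handleInitialStartup(allocator, bucketName) {
    // Initial-startup path: buckets are read directly from MongoDB, so no event date
    // is available; eventDate defaults to null and no resume point is written.
    await allocator.listenToBucket(bucketName);
}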
extensions/oplogPopulator/modules/Connector.js (23 changes: 23 additions & 0 deletions)
@@ -118,6 +118,27 @@ class Connector {
 this._config['offset.partition.name'] = `partition-${uuid.v4()}`;
 }
 
+/**
+* Sets the resume point of the change stream
+* to the first event of the first bucket added
+* to the connector.
+* @param {Date} eventDate oplog event date
+* @returns {undefined}
+*/
+setResumePoint(eventDate) {
+if (this._config['startup.mode.timestamp.start.at.operation.time']) {
+return;
+}
+
+this._config['startup.mode.timestamp.start.at.operation.time'] = eventDate.toISOString();
+
+this._logger.info('Updating resume date', {
+method: 'Connector.updateResumeDate',
+date: eventDate.toISOString(),
+connector: this._name,
+});
+}
+
 /**
 * Creates the Kafka-connect mongo connector
 * @returns {Promise|undefined} undefined
@@ -165,6 +186,8 @@
 try {
 await this._kafkaConnect.deleteConnector(this._name);
 this._isRunning = false;
+// resetting the resume point to set a new one on creation of the connector
+delete this._config['startup.mode.timestamp.start.at.operation.time'];
 } catch (err) {
 this._logger.error('Error while destroying connector', {
 method: 'Connector.destroy',
extensions/oplogPopulator/modules/ConnectorsManager.js (2 changes: 1 addition & 1 deletion)
@@ -233,7 +233,7 @@ class ConnectorsManager {
 if (connector.isRunning && connector.bucketCount === 0) {
 await connector.destroy();
 this._metricsHandler.onConnectorDestroyed();
-this._logger.info('Successfully destroyed a connector', {
+this._logger.info('Successfully spawned a connector', {
 method: 'ConnectorsManager._spawnOrDestroyConnector',
 connector: connector.name
 });
tests/unit/oplogPopulator/Connector.js (30 changes: 30 additions & 0 deletions)
@@ -84,6 +84,13 @@ describe('Connector', () => {
 await connector.destroy();
 assert(deleteStub.notCalled);
 });
+it('Should reset resume point', async () => {
+sinon.stub(connector._kafkaConnect, 'deleteConnector')
+.resolves();
+connector._isRunning = true;
+await connector.destroy();
+assert.strictEqual(connector.config['startup.mode.timestamp.start.at.operation.time'], undefined);
+});
 });
 
 describe('addBucket', () => {
@@ -256,4 +263,27 @@
 assert.notStrictEqual(connector._config['offset.partition.name'], 'partition-name');
 });
 });
+
+describe('setResumePoint', () => {
+it('Should not set the resume point when resume point already set', () => {
+connector._isRunning = false;
+connector._config['startup.mode.timestamp.start.at.operation.time'] = '2023-11-15T16:18:53.000Z';
+connector.setResumePoint(new Date('2023-11-16T16:18:53.000Z'));
+assert.strictEqual(
+connector.config['startup.mode.timestamp.start.at.operation.time'],
+'2023-11-15T16:18:53.000Z',
+);
+});
+
+it('Should set the resume point when not present and connector is stopped', () => {
+connector._isRunning = false;
+delete connector._config['startup.mode.timestamp.start.at.operation.time'];
+
+connector.setResumePoint(new Date('2023-11-16T16:18:53.000Z'));
+assert.strictEqual(
+connector.config['startup.mode.timestamp.start.at.operation.time'],
+'2023-11-16T16:18:53.000Z',
+);
+});
+});
 });
tests/unit/oplogPopulator/ConnectorsManager.js (1 change: 1 addition & 0 deletions)
@@ -19,6 +19,7 @@ const connectorConfig = {
 'connector.class': 'com.mongodb.kafka.connect.MongoSourceConnector',
 'pipeline': '[]',
 'collection': '',
+'startup.mode': 'timestamp',
 'output.format.value': 'json',
 'value.converter.schemas.enable': false,
 'value.converter': 'org.apache.kafka.connect.storage.StringConverter',
