diff --git a/extensions/oplogPopulator/OplogPopulator.js b/extensions/oplogPopulator/OplogPopulator.js
index 22694f0aac..09a1cbe291 100644
--- a/extensions/oplogPopulator/OplogPopulator.js
+++ b/extensions/oplogPopulator/OplogPopulator.js
@@ -172,6 +172,7 @@ class OplogPopulator {
         // no fullDocument field in delete events
         const isBackbeatEnabled = change.fullDocument ?
             this._isBucketBackbeatEnabled(change.fullDocument.value) : null;
+        const eventDate = new Date(change.clusterTime);
         switch (change.operationType) {
         case 'delete':
             if (isListeningToBucket) {
@@ -186,7 +187,7 @@
                 await this._allocator.stopListeningToBucket(change.documentKey._id);
             // add bucket if it became backbeat enabled
             } else if (!isListeningToBucket && isBackbeatEnabled) {
-                await this._allocator.listenToBucket(change.documentKey._id);
+                await this._allocator.listenToBucket(change.documentKey._id, eventDate);
             }
             break;
         default:
@@ -197,7 +198,7 @@
             });
             break;
         }
-        const delta = (Date.now() - new Date(change.clusterTime).getTime()) / 1000;
+        const delta = (Date.now() - eventDate.getTime()) / 1000;
         this._metricsHandler.onOplogEventProcessed(change.operationType, delta);
         this._logger.info('Change stream event processed', {
             method: 'OplogPopulator._handleChangeStreamChange',
diff --git a/extensions/oplogPopulator/constants.js b/extensions/oplogPopulator/constants.js
index 079f9bbd95..269f11de1d 100644
--- a/extensions/oplogPopulator/constants.js
+++ b/extensions/oplogPopulator/constants.js
@@ -5,6 +5,9 @@ const constants = {
         'connector.class': 'com.mongodb.kafka.connect.MongoSourceConnector',
         'pipeline': '[]',
         'collection': '',
+        // If no timestamp is provided, the startup mode will be equivalent
+        // to 'latest' which will pick up from the latest event in the oplog
+        'startup.mode': 'timestamp',
         // JSON output converter config
         // Using a string converter to avoid getting an over-stringified
         // JSON that is returned by default
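For reference, this is roughly the effective MongoSourceConnector config between the moment a resume point is pinned and the connector spawn. The keys are the ones visible in the constants.js hunk above plus the key set by `updateResumeDate` (introduced in the Connector.js hunk below); the timestamp value is purely illustrative:

```js
// Sketch of the effective connector config once a resume point is set.
// Keys come from the connector config defaults in constants.js; the
// timestamp value is an illustrative example, not a real oplog event date.
const effectiveConfig = {
    'connector.class': 'com.mongodb.kafka.connect.MongoSourceConnector',
    'pipeline': '[]',
    'collection': '',
    'startup.mode': 'timestamp',
    // set by Connector.updateResumeDate(eventDate) from the change
    // stream event's clusterTime; deleted again on spawn/destroy
    'startup.mode.timestamp.start.at.operation.time': '2024-01-01T00:00:00.000Z',
};
```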
diff --git a/extensions/oplogPopulator/modules/Allocator.js b/extensions/oplogPopulator/modules/Allocator.js
index 29991a4798..328c0bb472 100644
--- a/extensions/oplogPopulator/modules/Allocator.js
+++ b/extensions/oplogPopulator/modules/Allocator.js
@@ -67,14 +67,21 @@ class Allocator {
      * Starts listening to bucket by
      * adding and assigning a connector to it
      * @param {string} bucket bucket name
+     * @param {Date|null} eventDate oplog event date
      * @returns {Promise|undefined} undefined
      * @throws {InternalError}
      */
-    async listenToBucket(bucket) {
+    async listenToBucket(bucket, eventDate = null) {
         try {
             if (!this._bucketsToConnectors.has(bucket)) {
                 const connectors = this._connectorsManager.connectors;
                 const connector = this._allocationStrategy.getConnector(connectors);
+                // In the initial startup of the oplog populator
+                // we fetch the buckets directly from mongo.
+                // We don't have an event date in this case.
+                if (eventDate) {
+                    connector.updateResumeDate(eventDate);
+                }
                 await connector.addBucket(bucket);
                 this._bucketsToConnectors.set(bucket, connector);
                 this._metricsHandler.onConnectorConfigured(connector, 'add');
diff --git a/extensions/oplogPopulator/modules/Connector.js b/extensions/oplogPopulator/modules/Connector.js
index f1bd46f16f..70611b4cc8 100644
--- a/extensions/oplogPopulator/modules/Connector.js
+++ b/extensions/oplogPopulator/modules/Connector.js
@@ -118,6 +118,26 @@ class Connector {
         this._config['offset.partition.name'] = `partition-${uuid.v4()}`;
     }
 
+    /**
+     * Updates the resume date of the change stream
+     * to the date of the first event of the first
+     * bucket added, if the connector is not running
+     * @param {Date} eventDate oplog event date
+     * @returns {undefined}
+     */
+    updateResumeDate(eventDate) {
+        if (this._isRunning) {
+            return;
+        }
+
+        // resume date is reset when the connector is stopped
+        if (this._config['startup.mode.timestamp.start.at.operation.time']) {
+            return;
+        }
+
+        this._config['startup.mode.timestamp.start.at.operation.time'] = eventDate.toISOString();
+    }
+
     /**
      * Creates the Kafka-connect mongo connector
      * @returns {Promise|undefined} undefined
@@ -139,6 +159,7 @@
                 config: this._config,
             });
             this._isRunning = true;
+            delete this._config['startup.mode.timestamp.start.at.operation.time'];
         } catch (err) {
             this._logger.error('Error while spawning connector', {
                 method: 'Connector.spawn',
@@ -165,6 +186,7 @@
         try {
             await this._kafkaConnect.deleteConnector(this._name);
             this._isRunning = false;
+            delete this._config['startup.mode.timestamp.start.at.operation.time'];
         } catch (err) {
             this._logger.error('Error while destroying connector', {
                 method: 'Connector.destroy',
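To make the resume-date lifecycle concrete, here is a minimal, self-contained sketch. It is not the real Connector class (which also talks to Kafka Connect, tracks buckets, etc.); the stub mirrors only the logic added in this diff, showing that the first event pins the resume point and that spawning clears it:

```js
'use strict';

const TIMESTAMP_KEY = 'startup.mode.timestamp.start.at.operation.time';

// Stub mirroring only the resume-date logic added in this diff.
class ConnectorStub {
    constructor() {
        this._isRunning = false;
        this._config = { 'startup.mode': 'timestamp' };
    }

    updateResumeDate(eventDate) {
        // a running connector already has its resume point
        if (this._isRunning) {
            return;
        }
        // first event wins: never move an already-pinned resume point
        if (this._config[TIMESTAMP_KEY]) {
            return;
        }
        this._config[TIMESTAMP_KEY] = eventDate.toISOString();
    }

    spawn() {
        // the real method sends this._config to Kafka Connect first
        this._isRunning = true;
        // drop the timestamp so a later respawn defaults to 'latest'
        delete this._config[TIMESTAMP_KEY];
    }
}

const connector = new ConnectorStub();
connector.updateResumeDate(new Date('2024-01-01T00:00:00Z')); // pins resume point
connector.updateResumeDate(new Date('2024-01-01T00:05:00Z')); // no-op: already pinned
console.log(connector._config[TIMESTAMP_KEY]); // 2024-01-01T00:00:00.000Z
connector.spawn();
console.log(connector._config[TIMESTAMP_KEY]); // undefined
```

Note that this is also why the guard in Allocator.listenToBucket only calls updateResumeDate when an event date exists: buckets discovered at initial startup carry no event date, so the connector spawns with 'startup.mode' of 'timestamp' and no timestamp, which behaves like 'latest'.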