From 33a33904767b686f21f65eec49d5a9fae7c1c21d Mon Sep 17 00:00:00 2001 From: Rached Ben Mustapha Date: Tue, 17 Oct 2023 02:15:31 +0000 Subject: [PATCH 1/2] BB-452: add version-id to delete notifications Includes backported logic from 8.x branch, adapted to match available log entries. --- .../NotificationQueuePopulator.js | 79 ++++++++-- lib/queuePopulator/LogReader.js | 6 +- .../unit/lib/queuePopulator/LogReader.spec.js | 57 ++++++- .../NotificationQueuePopulator.js | 149 ++++++++++++++---- 4 files changed, 247 insertions(+), 44 deletions(-) diff --git a/extensions/notification/NotificationQueuePopulator.js b/extensions/notification/NotificationQueuePopulator.js index 979711af6..0391ff46e 100644 --- a/extensions/notification/NotificationQueuePopulator.js +++ b/extensions/notification/NotificationQueuePopulator.js @@ -119,6 +119,59 @@ class NotificationQueuePopulator extends QueuePopulatorExtension { return this.bnConfigManager.removeConfig(bucketName); } + /** + * Returns the correct versionId + * to display according to the + * versioning state of the object + * @param {Object} value log entry object + * @param {Object} overheadFields - extra fields missing from the log entry + * @return {String|null} versionId, or null for null-versioned or non-versioned objects + */ + _getVersionId(value, overheadFields) { + const versionId = value.versionId || (overheadFields && overheadFields.versionId); + const isNullVersion = value.isNull; + const isVersioned = !!versionId; + // Versioning suspended objects have + // a versionId, however it is internal + // and should not be used to get the object + if (isNullVersion || !isVersioned) { + return null; + } + return versionId; + } + + /** + * Decides if we should process the entry. + * Since we get both master and version events, + * we need to avoid pushing two notifications for + * the same event. + * - For non-versioned buckets, we process the master + * objects' events. + * - For versioned buckets, we process version events + * and ignore all master events. 
+ * - For versioning suspended buckets, we need to process + * both master and version events, as the master is considered + * a separate version. + * @param {String} key object key + * @param {Object} value object metadata + * @return {boolean} - true if entry is valid + */ + _shouldProcessEntry(key, value) { + const isMaster = isMasterKey(key); + const hasVersionId = !!value.versionId; + + if (!isMaster) { + // versioned keys do generate notifications. FIXME: in some cases + // they may duplicate the notification with a master update of the + // same null version. + return true; + } + // generate a notification for non-versioned or null-versioned master + // keys, but not for regular versions as then the versioned key triggers + // the notification. + return !hasVersionId || value.isNull; + } + /** * Process object entry from the log * @@ -126,15 +179,21 @@ class NotificationQueuePopulator extends QueuePopulatorExtension { * @param {String} key - object key * @param {Object} value - log entry object * @param {String} type - entry type - * @param {String} commitTimestamp - when the entry was written, used as a fallback + * @param {Object} overheadFields - extra context fields missing from the log entry + * @param {String} overheadFields.commitTimestamp - when the entry was written, used as a fallback * if no last-modified MD attribute available + * @param {String} overheadFields.versionId - version id involved in this operation, if the log entry + * does not contain it * @return {undefined} */ - _processObjectEntry(bucket, key, value, type, commitTimestamp) { - const versionId = value.versionId || null; - if (!isMasterKey(key)) { + _processObjectEntry(bucket, key, value, type, overheadFields) { + if (!this._shouldProcessEntry(key, value)) { return undefined; } + + const versionId = this._getVersionId(value, overheadFields); + const baseKey = this._extractVersionedBaseKey(key); + const config = this.bnConfigManager.getConfig(bucket); if (config && 
Object.keys(config).length > 0) { const { eventMessageProperty } @@ -145,11 +204,13 @@ class NotificationQueuePopulator extends QueuePopulatorExtension { = value[eventMessageProperty.dateTime]; if (eventType === undefined && type === 'del') { eventType = notifConstants.deleteEvent; - dateTime = commitTimestamp; + if (!dateTime) { + dateTime = (overheadFields && overheadFields.commitTimestamp) || null; + } } const ent = { bucket, - key, + key: baseKey, eventType, versionId, dateTime, @@ -171,7 +232,7 @@ class NotificationQueuePopulator extends QueuePopulatorExtension { eventTime: message.dateTime, }); this.publish(this.notificationConfig.topic, - `${bucket}/${key}`, + `${bucket}/${baseKey}`, JSON.stringify(message)); } return undefined; @@ -187,7 +248,7 @@ class NotificationQueuePopulator extends QueuePopulatorExtension { * @return {undefined} */ filter(entry) { - const { bucket, key, type, timestamp } = entry; + const { bucket, key, type, overheadFields } = entry; const value = entry.value || '{}'; const { error, result } = safeJsonParse(value); // ignore if entry's value is not valid @@ -206,7 +267,7 @@ class NotificationQueuePopulator extends QueuePopulatorExtension { } // object entry processing - filter and publish if (key && result) { - return this._processObjectEntry(bucket, key, result, type, timestamp); + return this._processObjectEntry(bucket, key, result, type, overheadFields); } return undefined; } diff --git a/lib/queuePopulator/LogReader.js b/lib/queuePopulator/LogReader.js index 3fab58479..3891e4d34 100644 --- a/lib/queuePopulator/LogReader.js +++ b/lib/queuePopulator/LogReader.js @@ -379,13 +379,17 @@ class LogReader { } async.eachSeries(this._extensions, (ext, next) => { + const overheadFields = { + commitTimestamp: record.timestamp, + versionId: entry.overhead && entry.overhead.versionId, + }; const entryToFilter = { type: entry.type, bucket: record.db, key: _transformKey(entry.key), value: entry.value, logReader: this, - timestamp: 
record.timestamp, + overheadFields, }; return _executeFilter(ext, entryToFilter, next); }, cb); diff --git a/tests/unit/lib/queuePopulator/LogReader.spec.js b/tests/unit/lib/queuePopulator/LogReader.spec.js index b898b7d02..f2e003d4b 100644 --- a/tests/unit/lib/queuePopulator/LogReader.spec.js +++ b/tests/unit/lib/queuePopulator/LogReader.spec.js @@ -76,11 +76,66 @@ describe('LogReader', () => { key: testCase.processedKey, value: '{}', logReader, - timestamp: undefined, + overheadFields: { + versionId: undefined, + commitTimestamp: undefined, + }, }); }); }); + it('should pass through overhead.versionId if available in record', () => { + const record = { + db: 'db', + }; + const versionId = 'v1'; + const entry = { + type: 'put', + key: 'k1', + value: '{}', + overhead: { + versionId, + }, + }; + logReader._processLogEntry(null, record, entry); + assert.deepStrictEqual(filteredEntry, { + type: 'put', + bucket: 'db', + key: 'k1', + value: '{}', + logReader, + overheadFields: { + versionId, + commitTimestamp: undefined, + }, + }); + }); + + it('should pass through timestamp if available in record', () => { + const timestamp = 't1'; + const record = { + db: 'db', + timestamp, + }; + const entry = { + type: 'put', + key: 'k1', + value: '{}', + }; + logReader._processLogEntry(null, record, entry); + assert.deepStrictEqual(filteredEntry, { + type: 'put', + bucket: 'db', + key: 'k1', + value: '{}', + logReader, + overheadFields: { + versionId: undefined, + commitTimestamp: timestamp, + }, + }); + }); + it('should start from latest log cseq plus one if no zookeeper log offset', done => { logReader.setup(err => { assert.ifError(err); diff --git a/tests/unit/notification/NotificationQueuePopulator.js b/tests/unit/notification/NotificationQueuePopulator.js index 926fef6f1..64edb1a2e 100644 --- a/tests/unit/notification/NotificationQueuePopulator.js +++ b/tests/unit/notification/NotificationQueuePopulator.js @@ -16,12 +16,24 @@ const value = { }; const fallbackCommitTimestamp = 
'2022-10-12T00:01:02.003Z'; const mdTimestamp = '2023-10-12T00:01:02.003Z'; +const fallbackVersionId = 'vid001'; const configTopic = 'topic1'; +const bnConfigManager = { + getConfig: () => ({ + bucket, + notificationConfiguration: { + queueConfig: [ + { + events: ['*'], + }, + ], + }, + }), +}; describe('NotificationQueuePopulator', () => { describe('_processObjectEntry', () => { - it('should use the fallback event timestamp for deletes', () => { - const type = 'del'; + it('any op: should use the event timestamp from MD if available', () => { let published = false; const qp = new NotificationQueuePopulator({ @@ -29,18 +41,70 @@ describe('NotificationQueuePopulator', () => { config: { topic: configTopic, }, - bnConfigManager: { - getConfig: () => ({ - bucket, - notificationConfiguration: { - queueConfig: [ - { - events: ['*'], - }, - ], - }, - }), + bnConfigManager, + }); + + qp.publish = (topic, key, data) => { + const parsed = JSON.parse(data); + + assert.deepStrictEqual(topic, configTopic); + assert.deepStrictEqual(key, `${bucket}/${objectKey}`); + assert.deepStrictEqual(parsed.dateTime, mdTimestamp); + + published = true; + }; + + const valueWithMD = { + 'originOp': 's3:ObjectCreated:Put', + 'last-modified': mdTimestamp, + ...value, + }; + + const overheadFields = { + commitTimestamp: fallbackCommitTimestamp, + }; + qp._processObjectEntry(bucket, objectKey, valueWithMD, null, overheadFields); + + assert(published); + }); + + it('any op: should accept null version with non-empty version ids with isNull md', () => { + let published = false; + + const qp = new NotificationQueuePopulator({ + logger: log, + config: { + topic: configTopic, }, + bnConfigManager, + }); + + qp.publish = () => { + published = true; + }; + + const valueWithMD = { + originOp: 's3:ObjectCreated:Put', + versionId: fallbackVersionId, + isNull: true, + ...value, + }; + + qp._processObjectEntry(bucket, `${objectKey}\0`, valueWithMD); + + assert(published); + }); + + it('delete: should use the 
fallback event timestamp', () => { + const type = 'del'; + let published = false; + + const qp = new NotificationQueuePopulator({ + logger: log, + config: { + topic: configTopic, + }, + bnConfigManager, }); qp.publish = (topic, key, data) => { @@ -53,12 +117,16 @@ describe('NotificationQueuePopulator', () => { published = true; }; - qp._processObjectEntry(bucket, objectKey, value, type, fallbackCommitTimestamp); + const overheadFields = { + commitTimestamp: fallbackCommitTimestamp, + }; + qp._processObjectEntry(bucket, objectKey, value, type, overheadFields); assert(published); }); - it('should use the event timestamp from MD if available', () => { + it('delete: should use version id from overhead fields', () => { + const type = 'del'; let published = false; const qp = new NotificationQueuePopulator({ @@ -66,18 +134,7 @@ describe('NotificationQueuePopulator', () => { config: { topic: configTopic, }, - bnConfigManager: { - getConfig: () => ({ - bucket, - notificationConfiguration: { - queueConfig: [ - { - events: ['*'], - }, - ], - }, - }), - }, + bnConfigManager, }); qp.publish = (topic, key, data) => { @@ -85,18 +142,44 @@ describe('NotificationQueuePopulator', () => { assert.deepStrictEqual(topic, configTopic); assert.deepStrictEqual(key, `${bucket}/${objectKey}`); - assert.deepStrictEqual(parsed.dateTime, mdTimestamp); + assert.deepStrictEqual(parsed.versionId, fallbackVersionId); published = true; }; - const valueWithMD = { - 'originOp': 's3:ObjectCreated:Put', - 'last-modified': mdTimestamp, - ...value, + const overheadFields = { + commitTimestamp: fallbackCommitTimestamp, + versionId: fallbackVersionId, + }; + qp._processObjectEntry(bucket, objectKey, value, type, overheadFields); + + assert(published); + }); + + it('delete: should handle missing overhead fields', () => { + const type = 'del'; + let published = false; + + const qp = new NotificationQueuePopulator({ + logger: log, + config: { + topic: configTopic, + }, + bnConfigManager, + }); + + qp.publish = 
(topic, key, data) => { + const parsed = JSON.parse(data); + + assert.deepStrictEqual(topic, configTopic); + assert.deepStrictEqual(key, `${bucket}/${objectKey}`); + assert.deepStrictEqual(parsed.versionId, null); + assert.deepStrictEqual(parsed.dateTime, null); + + published = true; }; - qp._processObjectEntry(bucket, objectKey, valueWithMD, null, fallbackCommitTimestamp); + qp._processObjectEntry(bucket, objectKey, value, type); assert(published); }); From a49105ecbf455fa637baa8c7a9e4d17617482646 Mon Sep 17 00:00:00 2001 From: Rached Ben Mustapha Date: Thu, 26 Oct 2023 22:55:59 +0000 Subject: [PATCH 2/2] BB-452: bump version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 2c1833a5e..b0170273b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "backbeat", - "version": "7.70.8", + "version": "7.70.9", "description": "Asynchronous queue and job manager", "main": "index.js", "scripts": {