diff --git a/extensions/mongoProcessor/MongoQueueProcessor.js b/extensions/mongoProcessor/MongoQueueProcessor.js index 4e455582d..e06fcc425 100644 --- a/extensions/mongoProcessor/MongoQueueProcessor.js +++ b/extensions/mongoProcessor/MongoQueueProcessor.js @@ -400,7 +400,14 @@ class MongoQueueProcessor { _processDeleteOpQueueEntry(log, sourceEntry, location, bucketInfo, done) { const bucket = sourceEntry.getBucket(); const key = sourceEntry.getObjectKey(); - const versionId = extractVersionId(sourceEntry.getObjectVersionedKey()); + const entryVersionId = extractVersionId(sourceEntry.getObjectVersionedKey()); + + // Use x-amz-meta-scal-version-id if provided, instead of the actual versionId of the object. + // This should happen only for restored objects : in all other situations, both the source + // and ingested objects should have the same version id (and no x-amz-meta-scal-version-id + // metadata). + const scalVersionId = sourceEntry.getOverheadField('x-amz-meta-scal-version-id'); + const versionId = scalVersionId ? VersionID.decode(scalVersionId) : entryVersionId; this.logger.debug('processing object delete', { bucket, key, versionId }); @@ -416,7 +423,7 @@ class MongoQueueProcessor { zenkoObjMd.location?.length !== 1 || zenkoObjMd.location[0].dataStoreName !== location || zenkoObjMd.location[0].key !== key || - (zenkoObjMd.location[0].dataStoreVersionId || 'null') !== encode(versionId) + (zenkoObjMd.location[0].dataStoreVersionId || 'null') !== encode(entryVersionId) ) { log.end().info('ignore delete entry, transitioned to another location', { entry: sourceEntry.getLogInfo(), diff --git a/lib/models/DeleteOpQueueEntry.js b/lib/models/DeleteOpQueueEntry.js index 790605482..a4aadeac4 100644 --- a/lib/models/DeleteOpQueueEntry.js +++ b/lib/models/DeleteOpQueueEntry.js @@ -12,13 +12,16 @@ class DeleteOpQueueEntry { * @param {string} bucket - entry bucket * @param {string} key - entry key (a versioned key if the entry * was for a versioned object or a regular key if no versioning) + * @param {object} overheadFields - overhead fields associated with + * the operation (e.g. previous object's metadata) */ - constructor(bucket, key) { + constructor(bucket, key, overheadFields) { this._bucket = bucket; this._key = key; this._objectVersionedKey = key; this._objectKey = _extractVersionedBaseKey(key); this._startProcessing = Date.now(); + this._overheadFields = overheadFields || {}; } getStartProcessing() { @@ -32,6 +35,9 @@ class DeleteOpQueueEntry { if (typeof this.getObjectKey() !== 'string') { return { message: 'missing key name' }; } + if (typeof this._overheadFields !== 'object') { + return { message: 'invalid overhead fields' }; + } return undefined; } @@ -51,6 +57,10 @@ class DeleteOpQueueEntry { return this._objectVersionedKey; } + getOverheadField(field) { + return this._overheadFields[field]; + } + isVersion() { return this.getObjectKey() === this.getObjectVersionedKey(); } diff --git a/lib/models/QueueEntry.js b/lib/models/QueueEntry.js index 03ca2acda..49e29dc99 100644 --- a/lib/models/QueueEntry.js +++ b/lib/models/QueueEntry.js @@ -27,7 +27,8 @@ class QueueEntry { } let entry; if (record.type === 'del') { - entry = new DeleteOpQueueEntry(record.bucket, record.key); + const overheadFields = JSON.parse(record.overheadFields); + entry = new DeleteOpQueueEntry(record.bucket, record.key, overheadFields); } else if (record.bucket === usersBucket) { // BucketQueueEntry class just handles puts of keys // to usersBucket diff --git a/lib/queuePopulator/LogReader.js b/lib/queuePopulator/LogReader.js index 1c894d77f..9fac824b8 100644 --- a/lib/queuePopulator/LogReader.js +++ b/lib/queuePopulator/LogReader.js @@ -516,9 +516,9 @@ class LogReader { } async.eachSeries(this._extensions, (ext, next) => { const overheadFields = { + ...entry.overhead, commitTimestamp: record.timestamp, opTimestamp: entry.timestamp, - versionId: entry.overhead?.versionId, }; const entryToFilter = { type: entry.type, diff --git a/tests/functional/ingestion/MongoQueueProcessor.js b/tests/functional/ingestion/MongoQueueProcessor.js index f7aef46c3..949691f2b 100644 --- a/tests/functional/ingestion/MongoQueueProcessor.js +++ b/tests/functional/ingestion/MongoQueueProcessor.js @@ -907,6 +907,50 @@ describe('MongoQueueProcessor', function mqp() { }); }); + it('should use scal-version-id overhead field', done => { + // use existing version id + const versionKey = `${KEY}${VID_SEP}${NEW_VERSION_ID}`; + const entry = new DeleteOpQueueEntry(BUCKET, versionKey, { + 'x-amz-meta-scal-version-id': encode(VERSION_ID), + }); + const getObject = sinon.stub(mongoClient, 'getObject').yields(null, new ObjectMD() + .setKey(KEY) + .setVersionId(VERSION_ID) + .setDataStoreName(LOCATION) + .setLocation([{ + key: KEY, + dataStoreVersionId: encode(NEW_VERSION_ID), + dataStoreName: LOCATION, + }]) + .getValue()); + const deleteObject = sinon.stub(mongoClient, 'deleteObject').callThrough(); + async.waterfall([ + next => mongoClient.getBucketAttributes(BUCKET, fakeLogger, + next), + (bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger, + entry, LOCATION, bucketInfo, next), + ], err => { + assert.ifError(err); + + sinon.assert.calledOnce(getObject); + assert.deepStrictEqual(deleteObject.getCall(0).args[0], BUCKET); + assert.deepStrictEqual(deleteObject.getCall(0).args[1], KEY); + assert.deepStrictEqual(deleteObject.getCall(0).args[2], { + versionId: VERSION_ID + }); + + sinon.assert.calledOnce(deleteObject); + assert.deepStrictEqual(deleteObject.getCall(0).args[0], BUCKET); + assert.deepStrictEqual(deleteObject.getCall(0).args[1], KEY); + assert.deepStrictEqual(deleteObject.getCall(0).args[2], { + doesNotNeedOpogUpdate: true, + versionId: VERSION_ID + }); + + done(); + }); + }); + it('should not delete object from mongo when object is in another location', done => { // use existing version id const versionKey = `${KEY}${VID_SEP}${VERSION_ID}`;