From 04886d426d10626d8f4b3070a7930feabb38cde8 Mon Sep 17 00:00:00 2001 From: Francois Ferrand Date: Wed, 4 Dec 2024 17:50:19 +0100 Subject: [PATCH] Properly handle archive Issue: BB-590 --- .../mongoProcessor/MongoQueueProcessor.js | 215 +++++++++--------- .../ingestion/MongoQueueProcessor.js | 179 +++++++++++---- 2 files changed, 244 insertions(+), 150 deletions(-) diff --git a/extensions/mongoProcessor/MongoQueueProcessor.js b/extensions/mongoProcessor/MongoQueueProcessor.js index 36424eaba..cbe4cfdcb 100644 --- a/extensions/mongoProcessor/MongoQueueProcessor.js +++ b/extensions/mongoProcessor/MongoQueueProcessor.js @@ -195,33 +195,23 @@ class MongoQueueProcessor { ], done); } - _getZenkoObjectMetadata(log, entry, bucketInfo, done) { - // NOTE: This is used for updating replication info, as well as validating the - // `x-amz-meta-scal-version-id` header. If the Zenko bucket does not have repInfo set and - // the header is not set, then we can ignore fetching - const bucketRepInfo = bucketInfo.getReplicationConfiguration(); - // KO for DeleteOpQueueEntry : no such field (and no metadata...) - const scalVersionId = entry.getValue ? entry.getValue()['x-amz-meta-scal-version-id'] : undefined; - if (!(entry instanceof DeleteOpQueueEntry) && - !scalVersionId && - (!bucketRepInfo || !bucketRepInfo.rules || !bucketRepInfo.rules[0].enabled)) { - return done(); - } - + /** + * Retrieve Zenko object metadata from MongoDB + * @param {Logger} log The logger object + * @param {ObjectQueueEntry|DeleteOpQueueEntry} entry The entry being processed + * @param {string} versionId The version id of the object + * @param {function} done The callback function + * @returns {undefined} + */ + _getZenkoObjectMetadata(log, entry, versionId, done) { const bucket = entry.getBucket(); const key = entry.getObjectKey(); const params = {}; - // Use x-amz-meta-scal-version-id if provided, instead of the actual versionId of the object. 
- // This should happen only for restored objects : in all other situations, both the source - // and ingested objects should have the same version id (and not x-amz-meta-scal-version-id - // metadata). - const versionId = VersionID.decode(scalVersionId) || entry.getVersionId(); - // master keys with a 'null' version id comming from // a versioning suspended bucket are considered a version // we should not specify the version id in this case - if (versionId && !entry.getIsNull()) { + if (versionId && !(entry.getIsNull && entry.getIsNull())) { params.versionId = versionId; } @@ -411,35 +401,59 @@ class MongoQueueProcessor { const key = sourceEntry.getObjectKey(); const versionId = extractVersionId(sourceEntry.getObjectVersionedKey()); - const options = versionId ? { versionId } : undefined; - - // Calling deleteObject with undefined options to use deleteObjectNoVer which is used for - // deleting non versioned objects that only have master keys. - // When deleting a versioned object however we supply the version id in the options, which - // causes the function to call the deleteObjectVer function that is used to handle objects that - // have both a master and version keys. This handles the deletion of both the version and the master - // keys in the case where no other version is available, or deleting the version and updating the - // master key otherwise. 
- return this._mongoClient.deleteObject(bucket, key, options, log, - err => { - if (err) { - this._normalizePendingMetric(location); - log.end().error('error deleting object metadata ' + - 'from mongo', { - bucket, - key, - error: err.message, + this.logger.debug('processing object delete', { bucket, key, versionId }); + + async.waterfall([ + cb => this._getZenkoObjectMetadata(log, sourceEntry, versionId, cb), + (zenkoObjMd, cb) => { + if (zenkoObjMd.dataStoreName !== location) { + log.end().info('ignore delete entry, transitioned to another location', { + entry: sourceEntry.getLogInfo(), location, }); - return done(err); + return done(); } - this._produceMetricCompletionEntry(location); - log.end().info('object metadata deleted from mongo', { + + return cb(); + }, + cb => { + // Calling deleteObject with undefined options to use deleteObjectNoVer which is used for + // deleting non versioned objects that only have master keys. + // When deleting a versioned object however we supply the version id in the options, which + // causes the function to call the deleteObjectVer function that is used to handle objects that + // have both a master and version keys. This handles the deletion of both the version and the master + // keys in the case where no other version is available, or deleting the version and updating the + // master key otherwise. + const options = versionId ? 
{ versionId } : undefined; + + return this._mongoClient.deleteObject(bucket, key, options, log, cb); + }, + ], err => { + if (err?.is.NoSuchKey) { + log.end().info('skipping delete entry', { entry: sourceEntry.getLogInfo(), location, }); return done(); + } + if (err) { + this._normalizePendingMetric(location); + log.end().error('error deleting object metadata ' + + 'from mongo', { + bucket, + key, + error: err.message, + location, + }); + return done(err); + } + this._produceMetricCompletionEntry(location); + log.end().info('object metadata deleted from mongo', { + entry: sourceEntry.getLogInfo(), + location, }); + return done(); + }); } /** @@ -454,14 +468,29 @@ class MongoQueueProcessor { _processObjectQueueEntry(log, sourceEntry, location, bucketInfo, done) { const bucket = sourceEntry.getBucket(); const key = sourceEntry.getObjectKey(); + const scalVersionId = sourceEntry.getValue()['x-amz-meta-scal-version-id']; - this.logger.info('processing object metadata', { - bucket, key, scalVersionId: sourceEntry.getValue()['x-amz-meta-scal-version-id'], - }); + this.logger.debug('processing object metadata', { bucket, key, scalVersionId }); - this._getZenkoObjectMetadata(log, sourceEntry, bucketInfo, - (err, zenkoObjMd) => { - if (err) { + const maybeGetZenkoObjectMetadata = cb => { + // NOTE: ZenkoObjMD is used for updating replication info, as well as validating the + // `x-amz-meta-scal-version-id` header of restored objects. If the Zenko bucket does + // not have repInfo set and the header is not set, then we can skip fetching. + const bucketRepInfo = bucketInfo.getReplicationConfiguration(); + if (!scalVersionId && !bucketRepInfo?.rules?.some(r => r.enabled)) { + return cb(); + } + + // Use x-amz-meta-scal-version-id if provided, instead of the actual versionId of the object. 
+ // This should happen only for restored objects : in all other situations, both the source + // and ingested objects should have the same version id (and not x-amz-meta-scal-version-id + // metadata). + const versionId = scalVersionId ? VersionID.decode(scalVersionId) : sourceEntry.getVersionId(); + return this._getZenkoObjectMetadata(log, sourceEntry, versionId, cb); + }; + + maybeGetZenkoObjectMetadata((err, zenkoObjMd) => { + if (err && !err.NoSuchKey) { this._normalizePendingMetric(location); log.end().error('error processing object queue entry', { method: 'MongoQueueProcessor._processObjectQueueEntry', @@ -471,6 +500,35 @@ class MongoQueueProcessor { return done(err); } + // If the object has `x-amz-meta-scal-version-id`, we need to use it instead of the id. + // This should only happen for objects restored onto the OOB location, and the location + // should match in that case + if (scalVersionId) { + if (!zenkoObjMd) { + this.logger.warn('missing source entry, ignoring x-amz-meta-scal-version-id', { + method: 'MongoQueueProcessor._processObjectQueueEntry', + location, + }); + } else if (zenkoObjMd.location[0]?.dataStoreVersionId !== sourceEntry.getVersionId()) { + this.logger.warn('mismatched source entry, ignoring x-amz-meta-scal-version-id', { + method: 'MongoQueueProcessor._processObjectQueueEntry', + location, + }); + } else { + this.logger.info('restored oob object', { + bucket, key, scalVersionId, zenkoObjMd, sourceEntry + }); + + sourceEntry.setVersionId(scalVersionId); + + // TODO: do we need to update the (mongo) metadata in that case??? + // - This may happen if object is re-tagged while restored? 
+ // - Need to cleanup scal version id: delete objVal['x-amz-meta-scal-version-id']; + // - Need to keep the archive & restore fields in the metadata + return done(); + } + } + const content = getContentType(sourceEntry, zenkoObjMd); if (content.length === 0) { this._normalizePendingMetric(location); @@ -498,33 +556,6 @@ class MongoQueueProcessor { const objVal = sourceEntry.getValue(); const params = {}; - // If the object has `x-amz-meta-scal-version-id`, we need to use it instead of the id. - // This should only happen for objects restored onto the OOB location, and the location - // should match in that case - const scalVersionId = objVal['x-amz-meta-scal-version-id']; - if (scalVersionId) { - - this.logger.info('restored oob object', { - bucket, key, scalVersionId, zenkoObjMd, sourceEntry - }); - - if (!zenkoObjMd) { - this.logger.warn('missing source entry, ignoring x-amz-meta-scal-version-id', { - method: 'MongoQueueProcessor._processObjectQueueEntry', - location, - }); - } else if (zenkoObjMd.location[0]?.dataStoreVersionId !== sourceEntry.getVersionId()) { - this.logger.warn('mismatched source entry, ignoring x-amz-meta-scal-version-id', { - method: 'MongoQueueProcessor._processObjectQueueEntry', - location, - }); - } else { - sourceEntry.setVersionId(zenkoObjMd.versionId); - delete objVal['x-amz-meta-scal-version-id']; - delete objVal['x-amz-meta-scal-restore-info']; - } - } - // Versioning suspended entries will have a version id but also a isNull tag. 
// These master keys are considered a version and do not have a duplicate version, // we don't specify the version id and repairMaster in this case @@ -654,7 +685,7 @@ class MongoQueueProcessor { const log = this.logger.newRequestLogger(); const sourceEntry = QueueEntry.createFromKafkaEntry(kafkaEntry); - this.logger.info('processing kafka entry', { sourceEntry }); + this.logger.trace('processing kafka entry', { sourceEntry }); if (sourceEntry.error) { log.end().error('error processing source entry', @@ -671,37 +702,9 @@ class MongoQueueProcessor { const location = bucketInfo.getLocationConstraint(); if (sourceEntry instanceof DeleteOpQueueEntry) { - return this._getZenkoObjectMetadata(log, sourceEntry, bucketInfo, (err, zenkoObjMd) => { - if (err) { - this._normalizePendingMetric(location); - return done(err); - } - - // TODO: use getReducedLocation() ? or x-amz-storage-class ? or dataStoreName ? - // * x-amz-storage-class --> will always indicate the "cold" location - // * dataStoreName will be cold when archived, and "oob" when restored - // ==> we update the data store name (and x-amz-storage-class) before actually - // removing the data so we should look at this: so we can ignore when there is - // a match (e.g. lifecycle gc) and process when the event when user deletes - // the (restored) object - // - // TODO: two cases for restored object to test - // - expiring restored object from Sorbet --> delete from Zenko, should work fine... - // - delete made on the ring (user deletes the object) --> need to be propagated to Zenko! 
- if (zenkoObjMd?.dataStoreName !== location) { - log.end().info('skipping delete entry with location mismatch', { - entry: sourceEntry.getLogInfo(), - location, - zenkoLocation: zenkoObjMd.location, - }); - this._normalizePendingMetric(location); - return done(); - } - - return this._processDeleteOpQueueEntry(log, sourceEntry, location, err => { - this._handleMetrics(sourceEntry, !!err); - return done(err); - }); + return this._processDeleteOpQueueEntry(log, sourceEntry, location, err => { + this._handleMetrics(sourceEntry, !!err); + return done(err); }); } diff --git a/tests/functional/ingestion/MongoQueueProcessor.js b/tests/functional/ingestion/MongoQueueProcessor.js index e37abea75..6dc0630b6 100644 --- a/tests/functional/ingestion/MongoQueueProcessor.js +++ b/tests/functional/ingestion/MongoQueueProcessor.js @@ -2,6 +2,7 @@ const assert = require('assert'); const async = require('async'); +const sinon = require('sinon'); const { ObjectMD, BucketInfo } = require('arsenal').models; const { decode, encode } = require('arsenal').versioning.VersionID; const errors = require('arsenal').errors; @@ -13,6 +14,7 @@ const MongoQueueProcessor = require('../../../extensions/mongoProcessor/MongoQueueProcessor'); const authdata = require('../../../conf/authdata.json'); const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry'); +const DeleteOpQueueEntry = require('../../../lib/models/DeleteOpQueueEntry'); const fakeLogger = require('../../utils/fakeLogger'); const kafkaConfig = config.kafka; @@ -119,8 +121,9 @@ class MongoClientMock { // we get object from mongo to determine replicationInfo.Content types. // use "tags" and "versionId" for determining this. 
const obj = new ObjectMD() - .setVersionId(VERSION_ID) - .setTags({ mytag: 'mytags-value' }); + .setVersionId(VERSION_ID) + .setTags({ mytag: 'mytags-value' }) + .setDataStoreName(LOCATION); return cb(null, obj._data); } @@ -204,74 +207,86 @@ describe('MongoQueueProcessor', function mqp() { afterEach(() => { mqp.reset(); + sinon.restore(); }); describe('::_getZenkoObjectMetadata', () => { - function testGetZenkoObjectMetadata(entry, cb) { - mongoClient.getBucketAttributes(BUCKET, fakeLogger, - (error, bucketInfo) => { - assert.ifError(error); - - mqp._getZenkoObjectMetadata(fakeLogger, entry, bucketInfo, cb); - }); - } - - it('should return empty if key does not exist in mongo', done => { + it('should return an error if key does not exist in mongo', done => { const key = 'nonexistant'; const objmd = new ObjectMD().setKey(key); const entry = new ObjectQueueEntry(BUCKET, key, objmd); - testGetZenkoObjectMetadata(entry, (err, res) => { - assert.ifError(err); + mqp._getZenkoObjectMetadata(fakeLogger, entry, VERSION_ID, (err, res) => { + assert.ok(err?.is?.NoSuchKey); assert.strictEqual(res, undefined); return done(); }); }); - it('should return empty if version id of object does not exist in ' + - 'mongo', done => { + it('should return an error if version id of object does not exist in mongo', done => { const versionKey = `${KEY}${VID_SEP}${NEW_VERSION_ID}`; const objmd = new ObjectMD() - .setKey(KEY) - .setVersionId(NEW_VERSION_ID); + .setKey(KEY) + .setVersionId(NEW_VERSION_ID); const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd); - testGetZenkoObjectMetadata(entry, (err, res) => { - assert.ifError(err); + mqp._getZenkoObjectMetadata(fakeLogger, entry, NEW_VERSION_ID, (err, res) => { + assert.ok(err?.is?.NoSuchKey); assert.strictEqual(res, undefined); return done(); }); }); - it('should return empty if bucket replication info is disabled', - done => { - const disabledRepInfo = Object.assign({}, mockReplicationInfo, { - rules: [{ enabled: false }], - }); - 
const disabledMockBucketInfo = { - getReplicationConfiguration: () => disabledRepInfo, - }; - const versionKey = `${KEY}${VID_SEP}${NEW_VERSION_ID}`; + it('should return object metadata for existing version', done => { + const versionKey = `${KEY}${VID_SEP}${VERSION_ID}`; const objmd = new ObjectMD() - .setKey(KEY) - .setVersionId(NEW_VERSION_ID); + .setKey(KEY) + .setVersionId(VERSION_ID); const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd); - mqp._getZenkoObjectMetadata(fakeLogger, entry, - disabledMockBucketInfo, (err, res) => { - assert.ifError(err); + mqp._getZenkoObjectMetadata(fakeLogger, entry, VERSION_ID, (err, res) => { + assert.ifError(err); - assert.strictEqual(res, undefined); - return done(); - }); + assert(res); + assert.strictEqual(res.versionId, VERSION_ID); + return done(); + }); }); - it('should return object metadata for existing version', done => { + it('should return object metadata for existing version from DeleteOpQueueEntry', done => { const versionKey = `${KEY}${VID_SEP}${VERSION_ID}`; + const entry = new DeleteOpQueueEntry(BUCKET, versionKey); + mqp._getZenkoObjectMetadata(fakeLogger, entry, VERSION_ID, (err, res) => { + assert.ifError(err); + + assert(res); + assert.strictEqual(res.versionId, VERSION_ID); + return done(); + }); + }); + + it('should return object metadata for null "master" version', done => { + const versionKey = `${KEY}`; const objmd = new ObjectMD() - .setKey(KEY) - .setVersionId(VERSION_ID); + .setKey(KEY); const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd); - testGetZenkoObjectMetadata(entry, (err, res) => { + mqp._getZenkoObjectMetadata(fakeLogger, entry, null, (err, res) => { + assert.ifError(err); + + assert(res); + assert.strictEqual(res.versionId, VERSION_ID); + return done(); + }); + }); + + it('should return object metadata for null "suspended" version', done => { + const versionKey = `${KEY}${VID_SEP}${NEW_VERSION_ID}`; + const objmd = new ObjectMD() + .setKey(KEY) + 
.setVersionId(NEW_VERSION_ID) + .setNullVersionId(NEW_VERSION_ID) + .setIsNull(true); + const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd); + mqp._getZenkoObjectMetadata(fakeLogger, entry, NEW_VERSION_ID, (err, res) => { assert.ifError(err); assert(res); @@ -551,8 +566,9 @@ describe('MongoQueueProcessor', function mqp() { // use existing version id const versionKey = `${KEY}${VID_SEP}${VERSION_ID}`; const objmd = new ObjectMD() - .setKey(KEY) - .setVersionId(VERSION_ID); + .setKey(KEY) + .setVersionId(VERSION_ID) + .setDataStoreName(LOCATION); const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd); async.waterfall([ next => mongoClient.getBucketAttributes(BUCKET, fakeLogger, @@ -571,7 +587,9 @@ describe('MongoQueueProcessor', function mqp() { }); it('should delete an existing non versioned object from mongo', done => { - const objmd = new ObjectMD().setKey(KEY); + const objmd = new ObjectMD() + .setKey(KEY) + .setDataStoreName(LOCATION); const entry = new ObjectQueueEntry(BUCKET, KEY, objmd); async.waterfall([ next => mongoClient.getBucketAttributes(BUCKET, fakeLogger, @@ -588,6 +606,79 @@ describe('MongoQueueProcessor', function mqp() { done(); }); }); + + it('should not delete object from mongo when object is in another location', done => { + // use existing version id + const versionKey = `${KEY}${VID_SEP}${VERSION_ID}`; + const objmd = new ObjectMD() + .setKey(KEY) + .setVersionId(VERSION_ID) + .setDataStoreName('cold'); + sinon.stub(mqp._mongoClient, 'getObject').yields(null, objmd); + const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd); + async.waterfall([ + next => mongoClient.getBucketAttributes(BUCKET, fakeLogger, + next), + (bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger, + entry, LOCATION, next), + ], err => { + assert.ifError(err); + + const deleted = mqp.getDeleted(); + assert.strictEqual(deleted.length, 0); + done(); + }); + }); + + it('should not fail if object to delete does not exist anymore in mongo', 
done => { + // use existing version id + const versionKey = `${KEY}${VID_SEP}${VERSION_ID}`; + const objmd = new ObjectMD() + .setKey(KEY) + .setVersionId(VERSION_ID) + .setDataStoreName(LOCATION); + const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd); + const deleteObject = sinon.stub(mongoClient, 'deleteObject').yields(errors.NoSuchKey); + async.waterfall([ + next => mongoClient.getBucketAttributes(BUCKET, fakeLogger, next), + (bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger, + entry, LOCATION, next), + ], err => { + assert.ifError(err); + + assert.ok(deleteObject.calledOnce); + assert.strictEqual(deleteObject.getCall(0).args[0], BUCKET); + assert.strictEqual(deleteObject.getCall(0).args[1], KEY); + assert.strictEqual(deleteObject.getCall(0).args[2].versionId, VERSION_ID); + + done(); + }); + }); + + it('should fail if deleteObject fails', done => { + // use existing version id + const versionKey = `${KEY}${VID_SEP}${VERSION_ID}`; + const objmd = new ObjectMD() + .setKey(KEY) + .setVersionId(VERSION_ID) + .setDataStoreName(LOCATION); + const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd); + const deleteObject = sinon.stub(mongoClient, 'deleteObject').yields(errors.InternalError); + async.waterfall([ + next => mongoClient.getBucketAttributes(BUCKET, fakeLogger, next), + (bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger, + entry, LOCATION, next), + ], err => { + assert.ok(err?.is?.InternalError); + + assert.ok(deleteObject.calledOnce); + assert.strictEqual(deleteObject.getCall(0).args[0], BUCKET); + assert.strictEqual(deleteObject.getCall(0).args[1], KEY); + assert.strictEqual(deleteObject.getCall(0).args[2].versionId, VERSION_ID); + + done(); + }); + }); }); describe('::_getBucketInfo', () => {