From 6768fb4f717ab09cd338917344387133a73d1301 Mon Sep 17 00:00:00 2001
From: Francois Ferrand
Date: Wed, 11 Dec 2024 18:39:31 +0100
Subject: [PATCH 1/6] gha: bump codecov@v5

Issue: BB-590
---
 .github/actions/ft-test/action.yaml | 2 +-
 .github/workflows/tests.yaml        | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/.github/actions/ft-test/action.yaml b/.github/actions/ft-test/action.yaml
index aab6897d6..40d1aefbc 100644
--- a/.github/actions/ft-test/action.yaml
+++ b/.github/actions/ft-test/action.yaml
@@ -27,7 +27,7 @@ runs:
       env:
         BACKBEAT_CONFIG_FILE: ${{ inputs.config }}

-    - uses: codecov/codecov-action@v4
+    - uses: codecov/codecov-action@v5
      with:
        token: ${{ inputs.token }}
        directory: ./coverage/ft_test:${{ inputs.testsuite }}
diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index 0aaff96c9..58196c033 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -80,7 +80,7 @@ jobs:
           -nodes 1 -stream -timeout 5m -slowSpecThreshold 60
         working-directory: bucket-scanner

-      - uses: codecov/codecov-action@v4
+      - uses: codecov/codecov-action@v5
        with:
          token: ${{ secrets.CODECOV_TOKEN }}
          directory: bucket-scanner
@@ -133,7 +133,7 @@ jobs:
        run: yarn run cover:test
        env:
          BACKBEAT_CONFIG_FILE: tests/config.json
-      - uses: codecov/codecov-action@v4
+      - uses: codecov/codecov-action@v5
        with:
          token: ${{ secrets.CODECOV_TOKEN }}
          directory: ./coverage/test

From c91c1dde962be5f45006e8bf54da94f9569be249 Mon Sep 17 00:00:00 2001
From: Francois Ferrand
Date: Wed, 11 Dec 2024 18:39:52 +0100
Subject: [PATCH 2/6] Fix dockerfile lint issues

Issue: BB-590
---
 Dockerfile | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 79bf75fba..7aa30db21 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,6 +1,6 @@
 ARG NODE_VERSION=16.20-bullseye-slim

-FROM node:${NODE_VERSION} as builder
+FROM node:${NODE_VERSION} AS builder

 WORKDIR /usr/src/app
@@ -22,7 +22,7 @@ RUN apt-get update \
         libffi-dev \
         libzstd-dev

-ENV DOCKERIZE_VERSION v0.6.1
+ENV DOCKERIZE_VERSION=v0.6.1

 RUN wget https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz \
     && tar -C /usr/local/bin -xzvf dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz \

From c47058b75a8c4fa85bda9810cc8b781cdbecaed3 Mon Sep 17 00:00:00 2001
From: Francois Ferrand
Date: Fri, 13 Sep 2024 15:51:21 +0200
Subject: [PATCH 3/6] Do not delete Zenko object when not in OOB

Make OOB location-aware, and thus do not delete the Zenko object when it
gets removed from S3C by gc at the end of the transition.
Issue: BB-590
---
 .../mongoProcessor/MongoQueueProcessor.js | 31 ++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git a/extensions/mongoProcessor/MongoQueueProcessor.js b/extensions/mongoProcessor/MongoQueueProcessor.js
index 30339f788..13e42f5a7 100644
--- a/extensions/mongoProcessor/MongoQueueProcessor.js
+++ b/extensions/mongoProcessor/MongoQueueProcessor.js
@@ -623,19 +623,36 @@ class MongoQueueProcessor {
         const location = bucketInfo.getLocationConstraint();

         if (sourceEntry instanceof DeleteOpQueueEntry) {
-            return this._processDeleteOpQueueEntry(log, sourceEntry,
-                location, err => {
+            return this._getZenkoObjectMetadata(log, sourceEntry, bucketInfo, (err, zenkoObjMd) => {
+                if (err) {
+                    this._normalizePendingMetric(location);
+                    return done(err);
+                }
+
+                if (zenkoObjMd.dataStoreName !== location) {
+                    log.end().info('skipping delete entry with location mismatch', {
+                        entry: sourceEntry.getLogInfo(),
+                        location,
+                        zenkoLocation: zenkoObjMd.location,
+                    });
+                    this._normalizePendingMetric(location);
+                    return done();
+                }
+
+                return this._processDeleteOpQueueEntry(log, sourceEntry, location, err => {
                     this._handleMetrics(sourceEntry, !!err);
                     return done(err);
                 });
+            });
         }
+
         if (sourceEntry instanceof ObjectQueueEntry) {
-            return this._processObjectQueueEntry(log, sourceEntry, location,
-                bucketInfo, err => {
-                    this._handleMetrics(sourceEntry, !!err);
-                    return done(err);
-                });
+            return this._processObjectQueueEntry(log, sourceEntry, location, bucketInfo, err => {
+                this._handleMetrics(sourceEntry, !!err);
+                return done(err);
+            });
         }
+
         log.end().warn('skipping unknown source entry', {
             entry: sourceEntry.getLogInfo(),
             entryType: sourceEntry.constructor.name,

From 40e7814f43e3353cebcf2c30a11ee802076090f3 Mon Sep 17 00:00:00 2001
From: Francois Ferrand
Date: Tue, 15 Oct 2024 16:24:24 +0200
Subject: [PATCH 4/6] Use x-amz-meta-scal-version-id when ingesting object

This custom (but reserved) user metadata field is set by Cloudserver
when restoring a cold object in an OOB bucket.

Issue: BB-590
---
 .../mongoProcessor/MongoQueueProcessor.js | 99 +++++++++++++++++--
 1 file changed, 89 insertions(+), 10 deletions(-)

diff --git a/extensions/mongoProcessor/MongoQueueProcessor.js b/extensions/mongoProcessor/MongoQueueProcessor.js
index 13e42f5a7..384c49f7b 100644
--- a/extensions/mongoProcessor/MongoQueueProcessor.js
+++ b/extensions/mongoProcessor/MongoQueueProcessor.js
@@ -7,7 +7,8 @@
 const errors = require('arsenal').errors;
 const { replicationBackends, emptyFileMd5 } = require('arsenal').constants;
 const MongoClient = require('arsenal').storage
     .metadata.mongoclient.MongoClientInterface;
-const ObjectMD = require('arsenal').models.ObjectMD;
+const { ObjectMD } = require('arsenal').models;
+const { VersionID } = require('arsenal').versioning;

 const { extractVersionId } = require('../../lib/util/versioning');
 const Config = require('../../lib/Config');
@@ -195,27 +196,43 @@ class MongoQueueProcessor {
     }

     _getZenkoObjectMetadata(log, entry, bucketInfo, done) {
-        // NOTE: This is only used for updating replication info. If the Zenko
-        // bucket does not have repInfo set, then we can ignore fetching
+        // NOTE: This is used for updating replication info, as well as validating the
+        // `x-amz-meta-scal-version-id` header. If the Zenko bucket does not have repInfo set and
+        // the header is not set, then we can ignore fetching
         const bucketRepInfo = bucketInfo.getReplicationConfiguration();
-        if (!bucketRepInfo || !bucketRepInfo.rules ||
-            !bucketRepInfo.rules[0].enabled) {
+        // KO for DeleteOpQueueEntry: no such field (and no metadata...)
+        const scalVersionId = entry.getValue ? entry.getValue()['x-amz-meta-scal-version-id'] : undefined;
+        if (!(entry instanceof DeleteOpQueueEntry) &&
+            !scalVersionId &&
+            (!bucketRepInfo || !bucketRepInfo.rules || !bucketRepInfo.rules[0].enabled)) {
             return done();
         }

         const bucket = entry.getBucket();
         const key = entry.getObjectKey();
         const params = {};
+
+        // Use x-amz-meta-scal-version-id if provided, instead of the actual versionId of the object.
+        // This should happen only for restored objects: in all other situations, both the source
+        // and ingested objects should have the same version id (and not x-amz-meta-scal-version-id
+        // metadata).
+        const versionId = VersionID.decode(scalVersionId) || entry.getVersionId();
+
         // master keys with a 'null' version id coming from
         // a versioning suspended bucket are considered a version
         // we should not specify the version id in this case
-        if (entry.getVersionId() && !entry.getIsNull()) {
-            params.versionId = entry.getVersionId();
+        if (versionId && !entry.getIsNull()) {
+            params.versionId = versionId;
         }
-        return this._mongoClient.getObject(bucket, key, params, log,
-            (err, data) => {
+        return this._mongoClient.getObject(bucket, key, params, log, (err, data) => {
             if (err && err.NoSuchKey) {
+                // TODO: this may happen if the object was created from artesca side (e.g. on restore)
+                // --> in that case, the versionId does not match, and we cannot find the object
+                // --> yet we have the 'ring' versionId in the (zenko) entry's `location`
+                //     field, so we may try to look it up (get all versions, check the one where
+                //     location is indeed in the RING and with the 'target' versionId)
+                // ....or we introduce a way to store the RING object with the target metadata
                 return done();
             }
             if (err) {
                 log.error('error getting zenko object metadata', {
                     method: 'MongoQueueProcessor._getZenkoObjectMetadata',
                     err,
                     entry: entry.getLogInfo(),
                 });
                 return done(err);
             }
+
+            // Sanity check (esp. for restored objects case): verify that the object in the data
+            // location matches the object we ingested
+            // if (data.location[0].dataStoreVersionId !== entry.getVersionId()) {
+            //     const err = new Error('version id mismatch');
+            //     log.error('error getting zenko object metadata', {
+            //         method: 'MongoQueueProcessor._getZenkoObjectMetadata',
+            //         err,
+            //         entry: entry.getLogInfo(),
+            //     });
+            //     return done(err); // TODO: should we return an error here, or just consider a mismatch?
+            // }
+
             return done(null, data);
         });
     }
@@ -332,6 +362,8 @@ class MongoQueueProcessor {
     _updateReplicationInfo(entry, bucketInfo, content, zenkoObjMd) {
         const bucketRepInfo = bucketInfo.getReplicationConfiguration();

+        // TODO: adjust `_updateReplicationInfo` for restored objects
+
         // reset first before attempting any other updates
         const objectMDModel = new ObjectMD();
         entry.setReplicationInfo(objectMDModel.getReplicationInfo());
@@ -442,6 +474,10 @@ class MongoQueueProcessor {
         const bucket = sourceEntry.getBucket();
         const key = sourceEntry.getObjectKey();

+        this.logger.info('processing object metadata', {
+            bucket, key, scalVersionId: sourceEntry.getValue()['x-amz-meta-scal-version-id'],
+        });
+
         this._getZenkoObjectMetadata(log, sourceEntry, bucketInfo,
             (err, zenkoObjMd) => {
                 if (err) {
@@ -480,6 +516,34 @@ class MongoQueueProcessor {
         const objVal = sourceEntry.getValue();
         const params = {};
+
+        // If the object has `x-amz-meta-scal-version-id`, we need to use it instead of the id.
+        // This should only happen for objects restored onto the OOB location, and the location
+        // should match in that case
+        const scalVersionId = objVal['x-amz-meta-scal-version-id'];
+        if (scalVersionId) {
+
+            this.logger.info('restored oob object', {
+                bucket, key, scalVersionId, zenkoObjMd, sourceEntry
+            });
+
+            if (!zenkoObjMd) {
+                this.logger.warn('missing source entry, ignoring x-amz-meta-scal-version-id', {
+                    method: 'MongoQueueProcessor._processObjectQueueEntry',
+                    location,
+                });
+            } else if (zenkoObjMd.location[0]?.dataStoreVersionId !== sourceEntry.getVersionId()) {
+                this.logger.warn('mismatched source entry, ignoring x-amz-meta-scal-version-id', {
+                    method: 'MongoQueueProcessor._processObjectQueueEntry',
+                    location,
+                });
+            } else {
+                sourceEntry.setVersionId(zenkoObjMd.versionId);
+                delete objVal['x-amz-meta-scal-version-id'];
+                delete objVal['x-amz-meta-scal-restore-info'];
+            }
+        }
+
         // Versioning suspended entries will have a version id but also an isNull tag.
         // These master keys are considered a version and do not have a duplicate version,
         // we don't specify the version id and repairMaster in this case
@@ -629,7 +693,18 @@ class MongoQueueProcessor {
                     return done(err);
                 }

-                if (zenkoObjMd.dataStoreName !== location) {
+                // TODO: use getReducedLocation() ? or x-amz-storage-class ? or dataStoreName ?
+                // * x-amz-storage-class --> will always indicate the "cold" location
+                // * dataStoreName will be cold when archived, and "oob" when restored
+                // ==> we update the data store name (and x-amz-storage-class) before actually
+                //     removing the data so we should look at this: so we can ignore when there is
+                //     a match (e.g. lifecycle gc) and process the event when the user deletes
+                //     the (restored) object
+                //
+                // TODO: two cases for restored object to test
+                // - expiring restored object from Sorbet --> delete from Zenko, should work fine...
+                // - delete made on the ring (user deletes the object) --> need to be propagated to Zenko!
+                if (zenkoObjMd?.dataStoreName !== location) {
                     log.end().info('skipping delete entry with location mismatch', {
                         entry: sourceEntry.getLogInfo(),
                         location,
                         zenkoLocation: zenkoObjMd.location,
                     });
                     this._normalizePendingMetric(location);
                     return done();
                 }

         if (sourceEntry instanceof ObjectQueueEntry) {
+            // TODO: need to handle "replacement" case, e.g. if new object (version) is created on the RING
+            // while the object is either restored or archived
+            // - this is versioned: so it will not depend on the "state" of the object
+            // - probably works fine, may simply not have the same versions on both sides...
             return this._processObjectQueueEntry(log, sourceEntry, location, bucketInfo, err => {
                 this._handleMetrics(sourceEntry, !!err);
                 return done(err);
             });
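A quick illustration of the version-id selection that PATCH 4/6 introduces: a restored OOB object carries the Zenko version id, encoded, in the reserved `x-amz-meta-scal-version-id` user-metadata field, while every other entry's own version id already matches the ingested object. The sketch below is hypothetical (the helper name and standalone form are not part of the patch); it assumes only arsenal's VersionID codec, which the patch itself imports:

    // Hypothetical helper, for illustration only -- not a patch excerpt.
    // Assumes arsenal's VersionID codec, as imported by the patch itself.
    const { VersionID } = require('arsenal').versioning;

    function selectZenkoVersionId(entry) {
        // Delete entries have no metadata value, hence the getValue guard.
        const md = entry.getValue ? entry.getValue() : undefined;
        const scalVersionId = md && md['x-amz-meta-scal-version-id'];
        if (scalVersionId) {
            // The header stores the encoded form; decode it before looking
            // the version up in MongoDB.
            return VersionID.decode(scalVersionId);
        }
        // In all other situations the source entry's version id is the one
        // recorded on the ingested object.
        return entry.getVersionId();
    }

Note that guarding the decode on the header's presence, rather than writing `VersionID.decode(scalVersionId) || entry.getVersionId()` as this patch does, avoids calling the codec on `undefined`; PATCH 6/6 below ends up with exactly that guarded (ternary) form.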
From 769a387a0bb705816794852ed037b1b1c16fba3f Mon Sep 17 00:00:00 2001
From: Francois Ferrand
Date: Mon, 25 Nov 2024 10:06:33 +0100
Subject: [PATCH 5/6] debug

---
 .../mongoProcessor/MongoQueueProcessor.js | 26 ++++---------------
 1 file changed, 5 insertions(+), 21 deletions(-)

diff --git a/extensions/mongoProcessor/MongoQueueProcessor.js b/extensions/mongoProcessor/MongoQueueProcessor.js
index 384c49f7b..36424eaba 100644
--- a/extensions/mongoProcessor/MongoQueueProcessor.js
+++ b/extensions/mongoProcessor/MongoQueueProcessor.js
@@ -225,16 +225,9 @@ class MongoQueueProcessor {
             params.versionId = versionId;
         }

+        log.debug('getting zenko object metadata', { bucket, key, versionId, params });
+
         return this._mongoClient.getObject(bucket, key, params, log, (err, data) => {
-            if (err && err.NoSuchKey) {
-                // TODO: this may happen if the object was created from artesca side (e.g. on restore)
-                // --> in that case, the versionId does not match, and we cannot find the object
-                // --> yet we have the 'ring' versionId in the (zenko) entry's `location`
-                //     field, so we may try to look it up (get all versions, check the one where
-                //     location is indeed in the RING and with the 'target' versionId)
-                // ....or we introduce a way to store the RING object with the target metadata
-                return done();
-            }
             if (err) {
                 log.error('error getting zenko object metadata', {
                     method: 'MongoQueueProcessor._getZenkoObjectMetadata',
                     err,
                     entry: entry.getLogInfo(),
                 });
                 return done(err);
             }

-            // Sanity check (esp. for restored objects case): verify that the object in the data
-            // location matches the object we ingested
-            // if (data.location[0].dataStoreVersionId !== entry.getVersionId()) {
-            //     const err = new Error('version id mismatch');
-            //     log.error('error getting zenko object metadata', {
-            //         method: 'MongoQueueProcessor._getZenkoObjectMetadata',
-            //         err,
-            //         entry: entry.getLogInfo(),
-            //     });
-            //     return done(err); // TODO: should we return an error here, or just consider a mismatch?
-            // }
-
             return done(null, data);
         });
     }
@@ -672,6 +653,9 @@ class MongoQueueProcessor {
         MongoProcessorMetrics.onProcessKafkaEntry();
         const log = this.logger.newRequestLogger();
         const sourceEntry = QueueEntry.createFromKafkaEntry(kafkaEntry);
+
+        this.logger.info('processing kafka entry', { sourceEntry });
+
         if (sourceEntry.error) {
             log.end().error('error processing source entry',
                 { error: sourceEntry.error });

From 04886d426d10626d8f4b3070a7930feabb38cde8 Mon Sep 17 00:00:00 2001
From: Francois Ferrand
Date: Wed, 4 Dec 2024 17:50:19 +0100
Subject: [PATCH 6/6] Properly handle archive

Issue: BB-590
---
 .../mongoProcessor/MongoQueueProcessor.js  | 215 +++++++++---------
 .../ingestion/MongoQueueProcessor.js       | 179 +++++++++++----
 2 files changed, 244 insertions(+), 150 deletions(-)

diff --git a/extensions/mongoProcessor/MongoQueueProcessor.js b/extensions/mongoProcessor/MongoQueueProcessor.js
index 36424eaba..cbe4cfdcb 100644
--- a/extensions/mongoProcessor/MongoQueueProcessor.js
+++ b/extensions/mongoProcessor/MongoQueueProcessor.js
@@ -195,33 +195,23 @@ class MongoQueueProcessor {
         ], done);
     }

-    _getZenkoObjectMetadata(log, entry, bucketInfo, done) {
-        // NOTE: This is used for updating replication info, as well as validating the
-        // `x-amz-meta-scal-version-id` header. If the Zenko bucket does not have repInfo set and
-        // the header is not set, then we can ignore fetching
-        const bucketRepInfo = bucketInfo.getReplicationConfiguration();
-        // KO for DeleteOpQueueEntry: no such field (and no metadata...)
-        const scalVersionId = entry.getValue ? entry.getValue()['x-amz-meta-scal-version-id'] : undefined;
-        if (!(entry instanceof DeleteOpQueueEntry) &&
-            !scalVersionId &&
-            (!bucketRepInfo || !bucketRepInfo.rules || !bucketRepInfo.rules[0].enabled)) {
-            return done();
-        }
-
+    /**
+     * Retrieve Zenko object metadata from MongoDB
+     * @param {Logger} log The logger object
+     * @param {ObjectQueueEntry|DeleteOpQueueEntry} entry The entry being processed
+     * @param {string} versionId The version id of the object
+     * @param {function} done The callback function
+     * @returns {undefined}
+     */
+    _getZenkoObjectMetadata(log, entry, versionId, done) {
         const bucket = entry.getBucket();
         const key = entry.getObjectKey();
         const params = {};

-        // Use x-amz-meta-scal-version-id if provided, instead of the actual versionId of the object.
-        // This should happen only for restored objects: in all other situations, both the source
-        // and ingested objects should have the same version id (and not x-amz-meta-scal-version-id
-        // metadata).
-        const versionId = VersionID.decode(scalVersionId) || entry.getVersionId();
-
         // master keys with a 'null' version id coming from
         // a versioning suspended bucket are considered a version
         // we should not specify the version id in this case
-        if (versionId && !entry.getIsNull()) {
+        if (versionId && !(entry.getIsNull && entry.getIsNull())) {
             params.versionId = versionId;
         }
@@ -411,35 +401,59 @@ class MongoQueueProcessor {
         const key = sourceEntry.getObjectKey();
         const versionId = extractVersionId(sourceEntry.getObjectVersionedKey());

-        const options = versionId ? { versionId } : undefined;
-
-        // Calling deleteObject with undefined options to use deleteObjectNoVer which is used for
-        // deleting non versioned objects that only have master keys.
-        // When deleting a versioned object however we supply the version id in the options, which
-        // causes the function to call the deleteObjectVer function that is used to handle objects that
-        // have both a master and version keys. This handles the deletion of both the version and the master
-        // keys in the case where no other version is available, or deleting the version and updating the
-        // master key otherwise.
-        return this._mongoClient.deleteObject(bucket, key, options, log,
-            err => {
-                if (err) {
-                    this._normalizePendingMetric(location);
-                    log.end().error('error deleting object metadata ' +
-                    'from mongo', {
-                        bucket,
-                        key,
-                        error: err.message,
-                        location,
-                    });
-                    return done(err);
-                }
-                this._produceMetricCompletionEntry(location);
-                log.end().info('object metadata deleted from mongo', {
-                    entry: sourceEntry.getLogInfo(),
-                    location,
-                });
-                return done();
-            });
+        this.logger.debug('processing object delete', { bucket, key, versionId });
+
+        async.waterfall([
+            cb => this._getZenkoObjectMetadata(log, sourceEntry, versionId, cb),
+            (zenkoObjMd, cb) => {
+                if (zenkoObjMd.dataStoreName !== location) {
+                    log.end().info('ignore delete entry, transitioned to another location', {
+                        entry: sourceEntry.getLogInfo(),
+                        location,
+                    });
+                    return done();
+                }
+
+                return cb();
+            },
+            cb => {
+                // Calling deleteObject with undefined options to use deleteObjectNoVer which is used for
+                // deleting non versioned objects that only have master keys.
+                // When deleting a versioned object however we supply the version id in the options, which
+                // causes the function to call the deleteObjectVer function that is used to handle objects that
+                // have both a master and version keys. This handles the deletion of both the version and the master
+                // keys in the case where no other version is available, or deleting the version and updating the
+                // master key otherwise.
+                const options = versionId ? { versionId } : undefined;
+
+                return this._mongoClient.deleteObject(bucket, key, options, log, cb);
+            },
+        ], err => {
+            if (err?.is?.NoSuchKey) {
+                log.end().info('skipping delete entry', {
+                    entry: sourceEntry.getLogInfo(),
+                    location,
+                });
+                return done();
+            }
+            if (err) {
+                this._normalizePendingMetric(location);
+                log.end().error('error deleting object metadata ' +
+                'from mongo', {
+                    bucket,
+                    key,
+                    error: err.message,
+                    location,
+                });
+                return done(err);
+            }
+            this._produceMetricCompletionEntry(location);
+            log.end().info('object metadata deleted from mongo', {
+                entry: sourceEntry.getLogInfo(),
+                location,
+            });
+            return done();
+        });
     }
@@ -454,14 +468,29 @@ class MongoQueueProcessor {
     _processObjectQueueEntry(log, sourceEntry, location, bucketInfo, done) {
         const bucket = sourceEntry.getBucket();
         const key = sourceEntry.getObjectKey();
+        const scalVersionId = sourceEntry.getValue()['x-amz-meta-scal-version-id'];

-        this.logger.info('processing object metadata', {
-            bucket, key, scalVersionId: sourceEntry.getValue()['x-amz-meta-scal-version-id'],
-        });
+        this.logger.debug('processing object metadata', { bucket, key, scalVersionId });

-        this._getZenkoObjectMetadata(log, sourceEntry, bucketInfo,
-            (err, zenkoObjMd) => {
-                if (err) {
+        const maybeGetZenkoObjectMetadata = cb => {
+            // NOTE: ZenkoObjMD is used for updating replication info, as well as validating the
+            // `x-amz-meta-scal-version-id` header of restored objects. If the Zenko bucket does
+            // not have repInfo set and the header is not set, then we can skip fetching.
+            const bucketRepInfo = bucketInfo.getReplicationConfiguration();
+            if (!scalVersionId && !bucketRepInfo?.rules?.some(r => r.enabled)) {
+                return cb();
+            }
+
+            // Use x-amz-meta-scal-version-id if provided, instead of the actual versionId of the object.
+            // This should happen only for restored objects: in all other situations, both the source
+            // and ingested objects should have the same version id (and not x-amz-meta-scal-version-id
+            // metadata).
+            const versionId = scalVersionId ? VersionID.decode(scalVersionId) : sourceEntry.getVersionId();
+            return this._getZenkoObjectMetadata(log, sourceEntry, versionId, cb);
+        };
+
+        maybeGetZenkoObjectMetadata((err, zenkoObjMd) => {
+            if (err && !err.NoSuchKey) {
                 this._normalizePendingMetric(location);
                 log.end().error('error processing object queue entry', {
                     method: 'MongoQueueProcessor._processObjectQueueEntry',
                     entry: sourceEntry.getLogInfo(),
                 });
                 return done(err);
             }
+
+            // If the object has `x-amz-meta-scal-version-id`, we need to use it instead of the id.
+            // This should only happen for objects restored onto the OOB location, and the location
+            // should match in that case
+            if (scalVersionId) {
+                if (!zenkoObjMd) {
+                    this.logger.warn('missing source entry, ignoring x-amz-meta-scal-version-id', {
+                        method: 'MongoQueueProcessor._processObjectQueueEntry',
+                        location,
+                    });
+                } else if (zenkoObjMd.location[0]?.dataStoreVersionId !== sourceEntry.getVersionId()) {
+                    this.logger.warn('mismatched source entry, ignoring x-amz-meta-scal-version-id', {
+                        method: 'MongoQueueProcessor._processObjectQueueEntry',
+                        location,
+                    });
+                } else {
+                    this.logger.info('restored oob object', {
+                        bucket, key, scalVersionId, zenkoObjMd, sourceEntry
+                    });
+
+                    sourceEntry.setVersionId(scalVersionId);
+
+                    // TODO: do we need to update the (mongo) metadata in that case???
+                    // - This may happen if object is re-tagged while restored?
+                    // - Need to cleanup scal version id: delete objVal['x-amz-meta-scal-version-id'];
+                    // - Need to keep the archive & restore fields in the metadata
+                    return done();
+                }
+            }
+
             const content = getContentType(sourceEntry, zenkoObjMd);
             if (content.length === 0) {
                 this._normalizePendingMetric(location);
@@ -498,33 +556,6 @@ class MongoQueueProcessor {
         const objVal = sourceEntry.getValue();
         const params = {};

-        // If the object has `x-amz-meta-scal-version-id`, we need to use it instead of the id.
-        // This should only happen for objects restored onto the OOB location, and the location
-        // should match in that case
-        const scalVersionId = objVal['x-amz-meta-scal-version-id'];
-        if (scalVersionId) {
-
-            this.logger.info('restored oob object', {
-                bucket, key, scalVersionId, zenkoObjMd, sourceEntry
-            });
-
-            if (!zenkoObjMd) {
-                this.logger.warn('missing source entry, ignoring x-amz-meta-scal-version-id', {
-                    method: 'MongoQueueProcessor._processObjectQueueEntry',
-                    location,
-                });
-            } else if (zenkoObjMd.location[0]?.dataStoreVersionId !== sourceEntry.getVersionId()) {
-                this.logger.warn('mismatched source entry, ignoring x-amz-meta-scal-version-id', {
-                    method: 'MongoQueueProcessor._processObjectQueueEntry',
-                    location,
-                });
-            } else {
-                sourceEntry.setVersionId(zenkoObjMd.versionId);
-                delete objVal['x-amz-meta-scal-version-id'];
-                delete objVal['x-amz-meta-scal-restore-info'];
-            }
-        }
-
         // Versioning suspended entries will have a version id but also an isNull tag.
         // These master keys are considered a version and do not have a duplicate version,
         // we don't specify the version id and repairMaster in this case
@@ -654,7 +685,7 @@ class MongoQueueProcessor {
         const log = this.logger.newRequestLogger();
         const sourceEntry = QueueEntry.createFromKafkaEntry(kafkaEntry);

-        this.logger.info('processing kafka entry', { sourceEntry });
+        this.logger.trace('processing kafka entry', { sourceEntry });

         if (sourceEntry.error) {
             log.end().error('error processing source entry',
                 { error: sourceEntry.error });
@@ -671,37 +702,9 @@ class MongoQueueProcessor {
         const location = bucketInfo.getLocationConstraint();

         if (sourceEntry instanceof DeleteOpQueueEntry) {
-            return this._getZenkoObjectMetadata(log, sourceEntry, bucketInfo, (err, zenkoObjMd) => {
-                if (err) {
-                    this._normalizePendingMetric(location);
-                    return done(err);
-                }
-
-                // TODO: use getReducedLocation() ? or x-amz-storage-class ? or dataStoreName ?
-                // * x-amz-storage-class --> will always indicate the "cold" location
-                // * dataStoreName will be cold when archived, and "oob" when restored
-                // ==> we update the data store name (and x-amz-storage-class) before actually
-                //     removing the data so we should look at this: so we can ignore when there is
-                //     a match (e.g. lifecycle gc) and process the event when the user deletes
-                //     the (restored) object
-                //
-                // TODO: two cases for restored object to test
-                // - expiring restored object from Sorbet --> delete from Zenko, should work fine...
-                // - delete made on the ring (user deletes the object) --> need to be propagated to Zenko!
-                if (zenkoObjMd?.dataStoreName !== location) {
-                    log.end().info('skipping delete entry with location mismatch', {
-                        entry: sourceEntry.getLogInfo(),
-                        location,
-                        zenkoLocation: zenkoObjMd.location,
-                    });
-                    this._normalizePendingMetric(location);
-                    return done();
-                }
-
-                return this._processDeleteOpQueueEntry(log, sourceEntry, location, err => {
-                    this._handleMetrics(sourceEntry, !!err);
-                    return done(err);
-                });
+            return this._processDeleteOpQueueEntry(log, sourceEntry, location, err => {
+                this._handleMetrics(sourceEntry, !!err);
+                return done(err);
             });
         }

         if (sourceEntry instanceof ObjectQueueEntry) {
-            // TODO: need to handle "replacement" case, e.g. if new object (version) is created on the RING
-            // while the object is either restored or archived
-            // - this is versioned: so it will not depend on the "state" of the object
-            // - probably works fine, may simply not have the same versions on both sides...
             return this._processObjectQueueEntry(log, sourceEntry, location, bucketInfo, err => {
                 this._handleMetrics(sourceEntry, !!err);
                 return done(err);
             });
diff --git a/tests/functional/ingestion/MongoQueueProcessor.js b/tests/functional/ingestion/MongoQueueProcessor.js
index e37abea75..6dc0630b6 100644
--- a/tests/functional/ingestion/MongoQueueProcessor.js
+++ b/tests/functional/ingestion/MongoQueueProcessor.js
@@ -2,6 +2,7 @@

 const assert = require('assert');
 const async = require('async');
+const sinon = require('sinon');
 const { ObjectMD, BucketInfo } = require('arsenal').models;
 const { decode, encode } = require('arsenal').versioning.VersionID;
 const errors = require('arsenal').errors;
@@ -13,6 +14,7 @@ const MongoQueueProcessor =
     require('../../../extensions/mongoProcessor/MongoQueueProcessor');
 const authdata = require('../../../conf/authdata.json');
 const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');
+const DeleteOpQueueEntry = require('../../../lib/models/DeleteOpQueueEntry');
 const fakeLogger = require('../../utils/fakeLogger');

 const kafkaConfig = config.kafka;
@@ -119,8 +121,9 @@ class MongoClientMock {
         // we get object from mongo to determine replicationInfo.Content types.
         // use "tags" and "versionId" for determining this.
         const obj = new ObjectMD()
-           .setVersionId(VERSION_ID)
-           .setTags({ mytag: 'mytags-value' });
+            .setVersionId(VERSION_ID)
+            .setTags({ mytag: 'mytags-value' })
+            .setDataStoreName(LOCATION);
         return cb(null, obj._data);
     }
@@ -204,74 +207,86 @@ describe('MongoQueueProcessor', function mqp() {

     afterEach(() => {
         mqp.reset();
+        sinon.restore();
     });

     describe('::_getZenkoObjectMetadata', () => {
-        function testGetZenkoObjectMetadata(entry, cb) {
-            mongoClient.getBucketAttributes(BUCKET, fakeLogger,
-                (error, bucketInfo) => {
-                    assert.ifError(error);
-
-                    mqp._getZenkoObjectMetadata(fakeLogger, entry, bucketInfo, cb);
-                });
-        }
-
-        it('should return empty if key does not exist in mongo', done => {
+        it('should return an error if key does not exist in mongo', done => {
             const key = 'nonexistant';
             const objmd = new ObjectMD().setKey(key);
             const entry = new ObjectQueueEntry(BUCKET, key, objmd);
-            testGetZenkoObjectMetadata(entry, (err, res) => {
-                assert.ifError(err);
+            mqp._getZenkoObjectMetadata(fakeLogger, entry, VERSION_ID, (err, res) => {
+                assert.ok(err?.is?.NoSuchKey);

                 assert.strictEqual(res, undefined);
                 return done();
             });
         });

-        it('should return empty if version id of object does not exist in ' +
-        'mongo', done => {
+        it('should return an error if version id of object does not exist in mongo', done => {
             const versionKey = `${KEY}${VID_SEP}${NEW_VERSION_ID}`;
             const objmd = new ObjectMD()
-               .setKey(KEY)
-               .setVersionId(NEW_VERSION_ID);
+                .setKey(KEY)
+                .setVersionId(NEW_VERSION_ID);
             const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd);
-            testGetZenkoObjectMetadata(entry, (err, res) => {
-                assert.ifError(err);
+            mqp._getZenkoObjectMetadata(fakeLogger, entry, NEW_VERSION_ID, (err, res) => {
+                assert.ok(err?.is?.NoSuchKey);

                 assert.strictEqual(res, undefined);
                 return done();
             });
         });

-        it('should return empty if bucket replication info is disabled',
-            done => {
-                const disabledRepInfo = Object.assign({}, mockReplicationInfo, {
-                    rules: [{ enabled: false }],
-                });
-                const disabledMockBucketInfo = {
-                    getReplicationConfiguration: () => disabledRepInfo,
-                };
-                const versionKey = `${KEY}${VID_SEP}${NEW_VERSION_ID}`;
-                const objmd = new ObjectMD()
-                    .setKey(KEY)
-                    .setVersionId(NEW_VERSION_ID);
-                const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd);
-                mqp._getZenkoObjectMetadata(fakeLogger, entry,
-                    disabledMockBucketInfo, (err, res) => {
-                        assert.ifError(err);
-
-                        assert.strictEqual(res, undefined);
-                        return done();
-                    });
-            });
-
         it('should return object metadata for existing version', done => {
             const versionKey = `${KEY}${VID_SEP}${VERSION_ID}`;
             const objmd = new ObjectMD()
-               .setKey(KEY)
-               .setVersionId(VERSION_ID);
+                .setKey(KEY)
+                .setVersionId(VERSION_ID);
             const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd);
+            mqp._getZenkoObjectMetadata(fakeLogger, entry, VERSION_ID, (err, res) => {
+                assert.ifError(err);
+
+                assert(res);
+                assert.strictEqual(res.versionId, VERSION_ID);
+                return done();
+            });
+        });
+
+        it('should return object metadata for existing version from DeleteOpQueueEntry', done => {
+            const versionKey = `${KEY}${VID_SEP}${VERSION_ID}`;
+            const entry = new DeleteOpQueueEntry(BUCKET, versionKey);
+            mqp._getZenkoObjectMetadata(fakeLogger, entry, VERSION_ID, (err, res) => {
+                assert.ifError(err);
+
+                assert(res);
+                assert.strictEqual(res.versionId, VERSION_ID);
+                return done();
+            });
+        });
+
+        it('should return object metadata for null "master" version', done => {
+            const versionKey = `${KEY}`;
+            const objmd = new ObjectMD()
+                .setKey(KEY);
+            const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd);
+            mqp._getZenkoObjectMetadata(fakeLogger, entry, null, (err, res) => {
+                assert.ifError(err);
+
+                assert(res);
+                assert.strictEqual(res.versionId, VERSION_ID);
+                return done();
+            });
+        });
+
+        it('should return object metadata for null "suspended" version', done => {
+            const versionKey = `${KEY}${VID_SEP}${NEW_VERSION_ID}`;
+            const objmd = new ObjectMD()
+                .setKey(KEY)
+                .setVersionId(NEW_VERSION_ID)
+                .setNullVersionId(NEW_VERSION_ID)
+                .setIsNull(true);
+            const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd);
-            testGetZenkoObjectMetadata(entry, (err, res) => {
+            mqp._getZenkoObjectMetadata(fakeLogger, entry, NEW_VERSION_ID, (err, res) => {
                 assert.ifError(err);

                 assert(res);
                 assert.strictEqual(res.versionId, VERSION_ID);
                 return done();
             });
         });
@@ -551,8 +566,9 @@ describe('MongoQueueProcessor', function mqp() {
             // use existing version id
             const versionKey = `${KEY}${VID_SEP}${VERSION_ID}`;
             const objmd = new ObjectMD()
-               .setKey(KEY)
-               .setVersionId(VERSION_ID);
+                .setKey(KEY)
+                .setVersionId(VERSION_ID)
+                .setDataStoreName(LOCATION);
             const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd);
             async.waterfall([
                 next => mongoClient.getBucketAttributes(BUCKET, fakeLogger,
                     next),
                 (bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger,
                     entry, LOCATION, next),
             ], err => {
                 assert.ifError(err);
@@ -571,7 +587,9 @@ describe('MongoQueueProcessor', function mqp() {
         it('should delete an existing non versioned object from mongo', done => {
-            const objmd = new ObjectMD().setKey(KEY);
+            const objmd = new ObjectMD()
+                .setKey(KEY)
+                .setDataStoreName(LOCATION);
             const entry = new ObjectQueueEntry(BUCKET, KEY, objmd);
             async.waterfall([
                 next => mongoClient.getBucketAttributes(BUCKET, fakeLogger,
                     next),
                 (bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger,
                     entry, LOCATION, next),
             ], err => {
                 assert.ifError(err);
                 done();
             });
         });
@@ -588,6 +606,79 @@ describe('MongoQueueProcessor', function mqp() {
+
+        it('should not delete object from mongo when object is in another location', done => {
+            // use existing version id
+            const versionKey = `${KEY}${VID_SEP}${VERSION_ID}`;
+            const objmd = new ObjectMD()
+                .setKey(KEY)
+                .setVersionId(VERSION_ID)
+                .setDataStoreName('cold');
+            sinon.stub(mqp._mongoClient, 'getObject').yields(null, objmd);
+            const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd);
+            async.waterfall([
+                next => mongoClient.getBucketAttributes(BUCKET, fakeLogger,
+                    next),
+                (bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger,
+                    entry, LOCATION, next),
+            ], err => {
+                assert.ifError(err);
+
+                const deleted = mqp.getDeleted();
+                assert.strictEqual(deleted.length, 0);
+                done();
+            });
+        });
+
+        it('should not fail if object to delete does not exist anymore in mongo', done => {
+            // use existing version id
+            const versionKey = `${KEY}${VID_SEP}${VERSION_ID}`;
+            const objmd = new ObjectMD()
+                .setKey(KEY)
+                .setVersionId(VERSION_ID)
+                .setDataStoreName(LOCATION);
+            const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd);
+            const deleteObject = sinon.stub(mongoClient, 'deleteObject').yields(errors.NoSuchKey);
+            async.waterfall([
+                next => mongoClient.getBucketAttributes(BUCKET, fakeLogger, next),
+                (bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger,
+                    entry, LOCATION, next),
+            ], err => {
+                assert.ifError(err);
+
+                assert.ok(deleteObject.calledOnce);
+                assert.strictEqual(deleteObject.getCall(0).args[0], BUCKET);
+                assert.strictEqual(deleteObject.getCall(0).args[1], KEY);
+                assert.strictEqual(deleteObject.getCall(0).args[2].versionId, VERSION_ID);
+
+                done();
+            });
+        });
+
+        it('should fail if deleteObject fails', done => {
+            // use existing version id
+            const versionKey = `${KEY}${VID_SEP}${VERSION_ID}`;
+            const objmd = new ObjectMD()
+                .setKey(KEY)
+                .setVersionId(VERSION_ID)
+                .setDataStoreName(LOCATION);
+            const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd);
+            const deleteObject = sinon.stub(mongoClient, 'deleteObject').yields(errors.InternalError);
+            async.waterfall([
+                next => mongoClient.getBucketAttributes(BUCKET, fakeLogger, next),
+                (bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger,
+                    entry, LOCATION, next),
+            ], err => {
+                assert.ok(err?.is?.InternalError);
+
+                assert.ok(deleteObject.calledOnce);
+                assert.strictEqual(deleteObject.getCall(0).args[0], BUCKET);
+                assert.strictEqual(deleteObject.getCall(0).args[1], KEY);
+                assert.strictEqual(deleteObject.getCall(0).args[2].versionId, VERSION_ID);
+
+                done();
+            });
+        });
     });

     describe('::_getBucketInfo', () => {
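Taken as a whole, the delete path that PATCH 6/6 converges on can be summarized by the condensed sketch below. It is hypothetical (the standalone function name and argument list are not from the series); it mirrors the patch's async.waterfall structure and arsenal-style errors, as an illustration rather than a drop-in excerpt:

    // Condensed, hypothetical summary of the delete flow after PATCH 6/6.
    const async = require('async');

    function processDelete(mqp, log, sourceEntry, location, versionId, done) {
        async.waterfall([
            // 1. Fetch the Zenko-side metadata for this exact version.
            cb => mqp._getZenkoObjectMetadata(log, sourceEntry, versionId, cb),
            // 2. Ignore the delete when the object now lives in another
            //    location (e.g. archived to cold): the S3C-side removal is
            //    just garbage collection at the end of a transition.
            (zenkoObjMd, cb) => {
                if (zenkoObjMd.dataStoreName !== location) {
                    return done();
                }
                return cb();
            },
            // 3. Delete the metadata; versioned objects pass a versionId so
            //    deleteObjectVer handles master/version key cleanup.
            cb => mqp._mongoClient.deleteObject(
                sourceEntry.getBucket(), sourceEntry.getObjectKey(),
                versionId ? { versionId } : undefined, log, cb),
        ], err => {
            // A missing key is tolerated: the object may already be gone.
            if (err && err.is && err.is.NoSuchKey) {
                return done();
            }
            return done(err);
        });
    }

This ordering (location check before delete) is what lets the processor distinguish a lifecycle GC event, which must be ignored, from a genuine user delete on the RING, which must propagate to Zenko.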