Skip to content

Commit

Permalink
Use overhead field to handle delete while restored
Browse files Browse the repository at this point in the history
Issue: BB-590
  • Loading branch information
francoisferrand committed Dec 27, 2024
1 parent bba7122 commit d0b184e
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 5 deletions.
11 changes: 9 additions & 2 deletions extensions/mongoProcessor/MongoQueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,14 @@ class MongoQueueProcessor {
_processDeleteOpQueueEntry(log, sourceEntry, location, bucketInfo, done) {
const bucket = sourceEntry.getBucket();
const key = sourceEntry.getObjectKey();
const versionId = extractVersionId(sourceEntry.getObjectVersionedKey());
const entryVersionId = extractVersionId(sourceEntry.getObjectVersionedKey());

// Use x-amz-meta-scal-version-id if provided, instead of the actual versionId of the object.
// This should happen only for restored objects : in all other situations, both the source
// and ingested objects should have the same version id (and no x-amz-meta-scal-version-id
// metadata).
const scalVersionId = sourceEntry.getOverheadField('x-amz-meta-scal-version-id');
const versionId = scalVersionId ? VersionID.decode(scalVersionId) : entryVersionId;

this.logger.debug('processing object delete', { bucket, key, versionId });

Expand All @@ -416,7 +423,7 @@ class MongoQueueProcessor {
zenkoObjMd.location?.length !== 1 ||
zenkoObjMd.location[0].dataStoreName !== location ||
zenkoObjMd.location[0].key !== key ||
(zenkoObjMd.location[0].dataStoreVersionId || 'null') !== encode(versionId)
(zenkoObjMd.location[0].dataStoreVersionId || 'null') !== encode(entryVersionId)
) {
log.end().info('ignore delete entry, transitioned to another location', {
entry: sourceEntry.getLogInfo(),
Expand Down
12 changes: 11 additions & 1 deletion lib/models/DeleteOpQueueEntry.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ class DeleteOpQueueEntry {
* @param {string} bucket - entry bucket
* @param {string} key - entry key (a versioned key if the entry
* was for a versioned object or a regular key if no versioning)
* @param {object} overheadFields - overhead fields associated with
* the operation (e.g. previous object's metadata)
*/
constructor(bucket, key) {
constructor(bucket, key, overheadFields) {
this._bucket = bucket;
this._key = key;
this._objectVersionedKey = key;
this._objectKey = _extractVersionedBaseKey(key);
this._startProcessing = Date.now();
this._overheadFields = overheadFields || {};
}

getStartProcessing() {
Expand All @@ -32,6 +35,9 @@ class DeleteOpQueueEntry {
if (typeof this.getObjectKey() !== 'string') {
return { message: 'missing key name' };
}
if (typeof this._overheadFields !== 'object') {
return { message: 'invalid overhead fields' };
}
return undefined;
}

Expand All @@ -51,6 +57,10 @@ class DeleteOpQueueEntry {
return this._objectVersionedKey;
}

getOverheadField(field) {
return this._overheadFields[field];
}

isVersion() {
return this.getObjectKey() === this.getObjectVersionedKey();
}
Expand Down
3 changes: 2 additions & 1 deletion lib/models/QueueEntry.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class QueueEntry {
}
let entry;
if (record.type === 'del') {
entry = new DeleteOpQueueEntry(record.bucket, record.key);
const overheadFields = JSON.parse(record.overheadFields);
entry = new DeleteOpQueueEntry(record.bucket, record.key, overheadFields);
} else if (record.bucket === usersBucket) {
// BucketQueueEntry class just handles puts of keys
// to usersBucket
Expand Down
2 changes: 1 addition & 1 deletion lib/queuePopulator/LogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -516,9 +516,9 @@ class LogReader {
}
async.eachSeries(this._extensions, (ext, next) => {
const overheadFields = {
...entry.overhead,
commitTimestamp: record.timestamp,
opTimestamp: entry.timestamp,
versionId: entry.overhead?.versionId,
};
const entryToFilter = {
type: entry.type,
Expand Down
44 changes: 44 additions & 0 deletions tests/functional/ingestion/MongoQueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,50 @@ describe('MongoQueueProcessor', function mqp() {
});
});

it('should use scal-version-id overhead field', done => {
// use existing version id
const versionKey = `${KEY}${VID_SEP}${NEW_VERSION_ID}`;
const entry = new DeleteOpQueueEntry(BUCKET, versionKey, {
'x-amz-meta-scal-version-id': encode(VERSION_ID),
});
const getObject = sinon.stub(mongoClient, 'getObject').yields(null, new ObjectMD()
.setKey(KEY)
.setVersionId(VERSION_ID)
.setDataStoreName(LOCATION)
.setLocation([{
key: KEY,
dataStoreVersionId: encode(NEW_VERSION_ID),
dataStoreName: LOCATION,
}])
.getValue());
const deleteObject = sinon.stub(mongoClient, 'deleteObject').callThrough();
async.waterfall([
next => mongoClient.getBucketAttributes(BUCKET, fakeLogger,
next),
(bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger,
entry, LOCATION, bucketInfo, next),
], err => {
assert.ifError(err);

sinon.assert.calledOnce(getObject);
assert.deepStrictEqual(deleteObject.getCall(0).args[0], BUCKET);
assert.deepStrictEqual(deleteObject.getCall(0).args[1], KEY);
assert.deepStrictEqual(deleteObject.getCall(0).args[2], {
versionId: VERSION_ID
});

sinon.assert.calledOnce(deleteObject);
assert.deepStrictEqual(deleteObject.getCall(0).args[0], BUCKET);
assert.deepStrictEqual(deleteObject.getCall(0).args[1], KEY);
assert.deepStrictEqual(deleteObject.getCall(0).args[2], {
doesNotNeedOpogUpdate: true,
versionId: VERSION_ID
});

done();
});
});

it('should not delete object from mongo when object is in another location', done => {
// use existing version id
const versionKey = `${KEY}${VID_SEP}${VERSION_ID}`;
Expand Down

0 comments on commit d0b184e

Please sign in to comment.