test #2604

Closed
Wants to merge 10 commits.
2 changes: 1 addition & 1 deletion .github/actions/ft-test/action.yaml
@@ -27,7 +27,7 @@ runs:
env:
BACKBEAT_CONFIG_FILE: ${{ inputs.config }}

- uses: codecov/codecov-action@v4
- uses: codecov/codecov-action@v5
with:
token: ${{ inputs.token }}
directory: ./coverage/ft_test:${{ inputs.testsuite }}
4 changes: 2 additions & 2 deletions .github/workflows/tests.yaml
@@ -80,7 +80,7 @@ jobs:
-nodes 1 -stream -timeout 5m -slowSpecThreshold 60
working-directory: bucket-scanner

- uses: codecov/codecov-action@v4
- uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
directory: bucket-scanner
@@ -133,7 +133,7 @@ jobs:
run: yarn run cover:test
env:
BACKBEAT_CONFIG_FILE: tests/config.json
- uses: codecov/codecov-action@v4
- uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
directory: ./coverage/test
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
@@ -1,6 +1,6 @@
ARG NODE_VERSION=16.20-bullseye-slim

FROM node:${NODE_VERSION} as builder
FROM node:${NODE_VERSION} AS builder

WORKDIR /usr/src/app

@@ -22,7 +22,7 @@ RUN apt-get update \
libffi-dev \
libzstd-dev

ENV DOCKERIZE_VERSION v0.6.1
ENV DOCKERIZE_VERSION=v0.6.1

RUN wget https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz \
&& tar -C /usr/local/bin -xzvf dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz \
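The two Dockerfile tweaks above (uppercase AS in the multi-stage FROM, and ENV key=value in place of the legacy space-separated form) do not change behavior; recent BuildKit releases emit FromAsCasing and LegacyKeyValueFormat lint warnings for the old spellings, which is presumably what this change addresses.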
189 changes: 138 additions & 51 deletions extensions/mongoProcessor/MongoQueueProcessor.js
@@ -7,7 +7,8 @@
const { replicationBackends, emptyFileMd5 } = require('arsenal').constants;
const MongoClient = require('arsenal').storage
.metadata.mongoclient.MongoClientInterface;
const ObjectMD = require('arsenal').models.ObjectMD;
const { ObjectMD } = require('arsenal').models;
const { VersionID } = require('arsenal').versioning;
const { extractVersionId } = require('../../lib/util/versioning');

const Config = require('../../lib/Config');
@@ -194,30 +195,29 @@
], done);
}

_getZenkoObjectMetadata(log, entry, bucketInfo, done) {
// NOTE: This is only used for updating replication info. If the Zenko
// bucket does not have repInfo set, then we can ignore fetching
const bucketRepInfo = bucketInfo.getReplicationConfiguration();
if (!bucketRepInfo || !bucketRepInfo.rules ||
!bucketRepInfo.rules[0].enabled) {
return done();
}

/**
* Retrieve Zenko object metadata from MongoDB
* @param {Logger} log The logger object
* @param {ObjectQueueEntry|DeleteOpQueueEntry} entry The entry being processed
* @param {string} versionId The version id of the object
* @param {function} done The callback function
* @returns {undefined}
*/
_getZenkoObjectMetadata(log, entry, versionId, done) {
const bucket = entry.getBucket();
const key = entry.getObjectKey();
const params = {};

// Master keys with a 'null' version id coming from a
// versioning-suspended bucket are considered a version;
// we should not specify the version id in this case.
if (entry.getVersionId() && !entry.getIsNull()) {
params.versionId = entry.getVersionId();
if (versionId && !(entry.getIsNull && entry.getIsNull())) {
params.versionId = versionId;
}

return this._mongoClient.getObject(bucket, key, params, log,
(err, data) => {
if (err && err.NoSuchKey) {
return done();
}
log.debug('getting zenko object metadata', { bucket, key, versionId, params });

return this._mongoClient.getObject(bucket, key, params, log, (err, data) => {
if (err) {
log.error('error getting zenko object metadata', {
method: 'MongoQueueProcessor._getZenkoObjectMetadata',
@@ -226,13 +226,14 @@
});
return done(err);
}

return done(null, data);
});
}
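As a reading aid for the 'null'-version rule in the method above, a minimal sketch of the two lookup shapes it produces (the entry objects are hypothetical stand-ins and the ids are illustrative):

// Hypothetical stand-ins for ObjectQueueEntry
const regularVersion = { getIsNull: () => false };
const suspendedNullMaster = { getIsNull: () => true };

// Same rule as _getZenkoObjectMetadata: a 'null' master key from a
// versioning-suspended bucket must be read through the master key.
function buildParams(entry, versionId) {
    const params = {};
    if (versionId && !(entry.getIsNull && entry.getIsNull())) {
        params.versionId = versionId;
    }
    return params;
}

console.log(buildParams(regularVersion, '1234RG001')); // { versionId: '1234RG001' }
console.log(buildParams(suspendedNullMaster, '1234RG001')); // {} -> master-key lookup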

/**
* get dataStoreVersionId, if exists
* @param {Object} objMd - object md fetched from mongo
* @param {ObjectMDData} objMd - object md fetched from mongo
* @param {String} site - storage location name
* @return {String} dataStoreVersionId
*/
@@ -390,43 +391,95 @@
* @param {Logger.newRequestLogger} log - request logger object
* @param {DeleteOpQueueEntry} sourceEntry - delete object entry
* @param {string} location - zenko storage location name
* @param {BucketInfo} bucketInfo - bucket info object
* @param {function} done - callback(error)
* @return {undefined}
*/
_processDeleteOpQueueEntry(log, sourceEntry, location, done) {
_processDeleteOpQueueEntry(log, sourceEntry, location, bucketInfo, done) {
const bucket = sourceEntry.getBucket();
const key = sourceEntry.getObjectKey();
const versionId = extractVersionId(sourceEntry.getObjectVersionedKey());

const options = versionId ? { versionId } : undefined;

// Calling deleteObject with undefined options uses deleteObjectNoVer, which is meant for
// deleting non-versioned objects that only have master keys.
// When deleting a versioned object, however, we supply the version id in the options, which
// makes the function call deleteObjectVer, used to handle objects that have both master and
// version keys. That path deletes both the version and the master key when no other version
// remains, or deletes the version and updates the master key otherwise.
return this._mongoClient.deleteObject(bucket, key, options, log,
err => {
if (err) {
this._normalizePendingMetric(location);
log.end().error('error deleting object metadata ' +
'from mongo', {
bucket,
key,
error: err.message,
const entryVersionId = extractVersionId(sourceEntry.getObjectVersionedKey());

// Use x-amz-meta-scal-version-id if provided, instead of the actual versionId of the object.
// This should happen only for restored objects: in all other situations, both the source
// and ingested objects should have the same version id (and no x-amz-meta-scal-version-id
// metadata).
const scalVersionId = sourceEntry.getOverheadField('x-amz-meta-scal-version-id');
const versionId = scalVersionId ? VersionID.decode(scalVersionId) : entryVersionId;

this.logger.debug('processing object delete', { bucket, key, versionId });

async.waterfall([
cb => this._getZenkoObjectMetadata(log, sourceEntry, versionId, cb),
(zenkoObjMd, cb) => {
// Skip if the object is in a different location, i.e. when the delete was caused
// by restored-object expiration or transition. It works because the dataStoreName
// is updated before actually sending the object to GC to effectively delete the
// data.
const encode = versionId => (versionId ? VersionID.encode(versionId) : 'null');
if (zenkoObjMd.dataStoreName !== location ||
zenkoObjMd.location?.length !== 1 ||
zenkoObjMd.location[0].dataStoreName !== location ||
zenkoObjMd.location[0].key !== key ||
(zenkoObjMd.location[0].dataStoreVersionId || 'null') !== encode(entryVersionId)
) {
log.end().info('ignore delete entry, transitioned to another location', {
entry: sourceEntry.getLogInfo(),
location,
});
return done(err);
return done();
}

return cb(null, zenkoObjMd);
},
(zenkoObjMd, cb) => {
const options = {};

// Calling deleteObject with undefined options uses deleteObjectNoVer, which is meant for
// deleting non-versioned objects that only have master keys.
// When deleting a versioned object, however, we supply the version id in the options, which
// makes the function call deleteObjectVer, used to handle objects that have both master and
// version keys. That path deletes both the version and the master key when no other version
// remains, or deletes the version and updates the master key otherwise.
if (versionId) {
options.versionId = versionId;
}

// If the bucket has no notification configuration and the object is not archived, we
// don't need the oplog update, and can skip it to lower the load on mongo
if (!zenkoObjMd.archive && !bucketInfo.notificationConfiguration) {
options.doesNotNeedOpogUpdate = true;
}
this._produceMetricCompletionEntry(location);
log.end().info('object metadata deleted from mongo', {

return this._mongoClient.deleteObject(bucket, key, options, log, cb);
},
], err => {
if (err?.is?.NoSuchKey) {
log.end().info('skipping delete entry', {
entry: sourceEntry.getLogInfo(),
location,
});
return done();
}
if (err) {
this._normalizePendingMetric(location);
log.end().error('error deleting object metadata ' +
'from mongo', {
bucket,
key,
error: err.message,
location,
});
return done(err);
}
this._produceMetricCompletionEntry(location);
log.end().info('object metadata deleted from mongo', {
entry: sourceEntry.getLogInfo(),
location,
});
return done();
});
}
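For reference, a compact model of the deleteObject dispatch described in the comment above; the method names follow that comment, the bodies are stubs, and this is a sketch rather than arsenal's real implementation:

class MongoClientInterfaceSketch {
    deleteObject(bucket, key, options, log, cb) {
        // a versionId in options selects the versioned path
        if (options && options.versionId !== undefined) {
            return this.deleteObjectVer(bucket, key, options, log, cb);
        }
        return this.deleteObjectNoVer(bucket, key, options, log, cb);
    }

    deleteObjectVer(bucket, key, options, log, cb) {
        // delete the version key, then either remove the master key (no
        // other version left) or repoint it to the newest remaining version
        return process.nextTick(cb);
    }

    deleteObjectNoVer(bucket, key, options, log, cb) {
        // non-versioned object: only a master key to delete
        return process.nextTick(cb);
    }
}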

/**
@@ -441,10 +494,29 @@
_processObjectQueueEntry(log, sourceEntry, location, bucketInfo, done) {
const bucket = sourceEntry.getBucket();
const key = sourceEntry.getObjectKey();
const scalVersionId = sourceEntry.getValue()['x-amz-meta-scal-version-id'];

this._getZenkoObjectMetadata(log, sourceEntry, bucketInfo,
(err, zenkoObjMd) => {
if (err) {
this.logger.debug('processing object metadata', { bucket, key, scalVersionId });

const maybeGetZenkoObjectMetadata = cb => {
// NOTE: ZenkoObjMD is used for updating replication info, as well as validating the
// `x-amz-meta-scal-version-id` header of restored objects. If the Zenko bucket does
// not have repInfo set and the header is not set, then we can skip fetching.
const bucketRepInfo = bucketInfo.getReplicationConfiguration();
if (!scalVersionId && !bucketRepInfo?.rules?.some(r => r.enabled)) {
return cb();
}

// Use x-amz-meta-scal-version-id if provided, instead of the actual versionId of the object.
// This should happen only for restored objects: in all other situations, both the source
// and ingested objects should have the same version id (and no x-amz-meta-scal-version-id
// metadata).
const versionId = scalVersionId ? VersionID.decode(scalVersionId) : sourceEntry.getVersionId();
return this._getZenkoObjectMetadata(log, sourceEntry, versionId, cb);
};

maybeGetZenkoObjectMetadata((err, zenkoObjMd) => {
if (err && !err.NoSuchKey) {
this._normalizePendingMetric(location);
log.end().error('error processing object queue entry', {
method: 'MongoQueueProcessor._processObjectQueueEntry',
@@ -466,11 +538,20 @@
return done();
}

// update necessary metadata fields before saving to Zenko MongoDB
this._updateOwnerMD(sourceEntry, bucketInfo);
this._updateObjectDataStoreName(sourceEntry, location);
this._updateLocations(sourceEntry, location);
this._updateAcl(sourceEntry);
if (zenkoObjMd) {
// Keep existing metadata fields, only need to update the tags
const tags = sourceEntry.getTags();
sourceEntry._data = { ...zenkoObjMd }; // eslint-disable-line no-param-reassign
sourceEntry.setTags(tags);
} else {
// Update necessary metadata fields before saving to Zenko MongoDB
this._updateOwnerMD(sourceEntry, bucketInfo);
this._updateObjectDataStoreName(sourceEntry, location);
this._updateLocations(sourceEntry, location);
this._updateAcl(sourceEntry);
}

// Try to update replication info, if applicable
this._updateReplicationInfo(sourceEntry, bucketInfo, content,
zenkoObjMd);

Expand All @@ -480,6 +561,7 @@

const objVal = sourceEntry.getValue();
const params = {};

// Versioning-suspended entries will have a version id but also an isNull tag.
// These master keys are considered a version and do not have a duplicate version;
// we don't specify the version id or repairMaster in this case.
@@ -608,6 +690,9 @@
MongoProcessorMetrics.onProcessKafkaEntry();
const log = this.logger.newRequestLogger();
const sourceEntry = QueueEntry.createFromKafkaEntry(kafkaEntry);

this.logger.trace('processing kafka entry', { sourceEntry });

Check warning (Codecov / codecov/patch/Backbeat) on line 694 in extensions/mongoProcessor/MongoQueueProcessor.js: added line #L694 was not covered by tests.

if (sourceEntry.error) {
log.end().error('error processing source entry',
{ error: sourceEntry.error });
@@ -624,18 +709,20 @@

if (sourceEntry instanceof DeleteOpQueueEntry) {
return this._processDeleteOpQueueEntry(log, sourceEntry,
location, err => {
location, bucketInfo, err => {
this._handleMetrics(sourceEntry, !!err);
return done(err);
});
}

if (sourceEntry instanceof ObjectQueueEntry) {
return this._processObjectQueueEntry(log, sourceEntry, location,
bucketInfo, err => {
this._handleMetrics(sourceEntry, !!err);
return done(err);
});
}

log.end().warn('skipping unknown source entry', {
entry: sourceEntry.getLogInfo(),
entryType: sourceEntry.constructor.name,
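The new delete path leans on arsenal's VersionID codec; a hedged round-trip sketch (the raw id below is illustrative, and the exact string format is arsenal's concern):

const { VersionID } = require('arsenal').versioning;

// x-amz-meta-scal-version-id carries the encoded (compact) form, while
// MongoDB version keys use the raw form: hence VersionID.decode() on the
// ingested value and VersionID.encode() when comparing against
// dataStoreVersionId in _processDeleteOpQueueEntry.
const raw = '98451712418425999999RG001  '; // illustrative raw version id
const encoded = VersionID.encode(raw);
const decoded = VersionID.decode(encoded);
// For well-formed ids the round trip is expected to be lossless:
console.log(decoded === raw);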
12 changes: 11 additions & 1 deletion lib/models/DeleteOpQueueEntry.js
@@ -12,13 +12,16 @@
* @param {string} bucket - entry bucket
* @param {string} key - entry key (a versioned key if the entry
* was for a versioned object or a regular key if no versioning)
* @param {object} overheadFields - overhead fields associated with
* the operation (e.g. previous object's metadata)
*/
constructor(bucket, key) {
constructor(bucket, key, overheadFields) {
this._bucket = bucket;
this._key = key;
this._objectVersionedKey = key;
this._objectKey = _extractVersionedBaseKey(key);
this._startProcessing = Date.now();
this._overheadFields = overheadFields || {};
}

getStartProcessing() {
@@ -32,6 +35,9 @@
if (typeof this.getObjectKey() !== 'string') {
return { message: 'missing key name' };
}
if (typeof this._overheadFields !== 'object') {
return { message: 'invalid overhead fields' };
}
Check warning (Codecov / codecov/patch/Backbeat) on lines 38-39 in lib/models/DeleteOpQueueEntry.js: added lines #L38 - L39 were not covered by tests.
return undefined;
}

@@ -51,6 +57,10 @@
return this._objectVersionedKey;
}

getOverheadField(field) {
return this._overheadFields[field];
}

isVersion() {
return this.getObjectKey() === this.getObjectVersionedKey();
}
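A hypothetical end-to-end use of the new constructor argument; the '\u0000' separator matches how versioned keys are built, while the require path and metadata value are illustrative:

const DeleteOpQueueEntry = require('./lib/models/DeleteOpQueueEntry');

const versionedKey = 'photo.jpg\u000098451712418425999999RG001';
const entry = new DeleteOpQueueEntry('my-bucket', versionedKey, {
    'x-amz-meta-scal-version-id': 'aJ4uLjEw', // illustrative encoded id
});

console.log(entry.getObjectKey()); // 'photo.jpg'
console.log(entry.getObjectVersionedKey()); // full key incl. version id
console.log(entry.getOverheadField('x-amz-meta-scal-version-id')); // 'aJ4uLjEw'
console.log(entry.getOverheadField('missing')); // undefined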
3 changes: 2 additions & 1 deletion lib/models/QueueEntry.js
@@ -27,7 +27,8 @@
}
let entry;
if (record.type === 'del') {
entry = new DeleteOpQueueEntry(record.bucket, record.key);
const overheadFields = JSON.parse(record.overheadFields);
entry = new DeleteOpQueueEntry(record.bucket, record.key, overheadFields);

Check warning (Codecov / codecov/patch/Backbeat) on lines 30-31 in lib/models/QueueEntry.js: added lines #L30 - L31 were not covered by tests.
} else if (record.bucket === usersBucket) {
// BucketQueueEntry class just handles puts of keys
// to usersBucket
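One caveat worth noting: JSON.parse(undefined) throws a SyntaxError, so the new 'del' branch assumes overheadFields is always present on delete records. A defensive variant could look like this (a sketch, not the PR's code):

if (record.type === 'del') {
    const overheadFields = record.overheadFields
        ? JSON.parse(record.overheadFields)
        : undefined;
    entry = new DeleteOpQueueEntry(record.bucket, record.key, overheadFields);
}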
2 changes: 1 addition & 1 deletion lib/queuePopulator/LogReader.js
@@ -497,9 +497,9 @@ class LogReader {
return next();
}
const overheadFields = {
...entry.overhead,
commitTimestamp: record.timestamp,
opTimestamp: entry.timestamp,
versionId: entry.overhead?.versionId,
};
const entryToFilter = {
type: entry.type,
Expand Down
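Taken together, the LogReader, QueueEntry, and MongoQueueProcessor changes thread the oplog overhead fields end to end. A condensed sketch of the flow, with the kafka transport elided and names taken from the diffs above:

// 1. Populator (LogReader): spread the raw oplog overhead rather than
//    cherry-picking versionId, so x-amz-meta-scal-version-id survives.
function buildOverheadFields(record, entry) {
    return {
        ...entry.overhead,
        commitTimestamp: record.timestamp,
        opTimestamp: entry.timestamp,
    };
}

// 2. Consumer (QueueEntry.createFromKafkaEntry) rebuilds the typed entry:
//      new DeleteOpQueueEntry(record.bucket, record.key,
//          JSON.parse(record.overheadFields));
//
// 3. Processor (_processDeleteOpQueueEntry) prefers the restored object's
//    original version id over the one embedded in the versioned key:
//      const scalVersionId =
//          sourceEntry.getOverheadField('x-amz-meta-scal-version-id');
//      const versionId = scalVersionId
//          ? VersionID.decode(scalVersionId)
//          : extractVersionId(sourceEntry.getObjectVersionedKey());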