From b42df1334b23b51810d69825652079a3f7c520bd Mon Sep 17 00:00:00 2001 From: Nicolas Humbert Date: Tue, 26 Dec 2023 10:58:15 +0100 Subject: [PATCH] S3UTILS-54 crrExistingObjects is compatible with both S3C and Artesca --- CRR/ReplicationStatusUpdater.js | 410 +++++++++++++++++++++ CRR/clients.js | 74 ++++ CrrExistingObjects/listingParser.js | 24 -- CrrExistingObjects/metadataClient.js | 20 - CrrExistingObjects/metadataUtils.js | 211 ----------- crrExistingObjects.js | 346 +++-------------- tests/unit/CRR/ReplicationStatusUpdater.js | 278 ++++++++++++++ tests/unit/CRR/crrExistingObjects.js | 187 ++++++++++ tests/utils/crr.js | 195 ++++++++++ 9 files changed, 1190 insertions(+), 555 deletions(-) create mode 100644 CRR/ReplicationStatusUpdater.js create mode 100644 CRR/clients.js delete mode 100644 CrrExistingObjects/listingParser.js delete mode 100644 CrrExistingObjects/metadataClient.js delete mode 100644 CrrExistingObjects/metadataUtils.js create mode 100644 tests/unit/CRR/ReplicationStatusUpdater.js create mode 100644 tests/unit/CRR/crrExistingObjects.js create mode 100644 tests/utils/crr.js diff --git a/CRR/ReplicationStatusUpdater.js b/CRR/ReplicationStatusUpdater.js new file mode 100644 index 00000000..6abc73cf --- /dev/null +++ b/CRR/ReplicationStatusUpdater.js @@ -0,0 +1,410 @@ +const { + doWhilst, eachSeries, eachLimit, waterfall, series, +} = require('async'); +const werelogs = require('werelogs'); +const { ObjectMD } = require('arsenal').models; + +const { setupClients } = require('./clients'); + +const LOG_PROGRESS_INTERVAL_MS = 10000; +const AWS_SDK_REQUEST_RETRIES = 100; +const AWS_SDK_REQUEST_DELAY_MS = 30; + +class ReplicationStatusUpdater { + /** + * @param {Object} params - An object containing the configuration parameters for the instance. + * @param {Array} params.buckets - An array of bucket names to process. + * @param {Array} params.replicationStatusToProcess - Replication status to be processed. + * @param {number} params.workers - Number of worker threads for processing. + * @param {string} params.accessKey - Access key for AWS SDK authentication. + * @param {string} params.secretKey - Secret key for AWS SDK authentication. + * @param {string} params.endpoint - Endpoint URL for the S3 service. + * @param {Object} log - The logging object used for logging purposes within the instance. + * + * @param {string} [params.siteName] - (Optional) Name of the destination site. + * @param {string} [params.storageType] - (Optional) Type of the destination site (aws_s3, azure...). + * @param {string} [params.targetPrefix] - (Optional) Prefix to target for replication. + * @param {number} [params.listingLimit] - (Optional) Limit for listing objects. + * @param {number} [params.maxUpdates] - (Optional) Maximum number of updates to perform. + * @param {number} [params.maxScanned] - (Optional) Maximum number of items to scan. + * @param {string} [params.keyMarker] - (Optional) Key marker for resuming object listing. + * @param {string} [params.versionIdMarker] - (Optional) Version ID marker for resuming object listing. + */ + constructor(params, log) { + const { + buckets, + replicationStatusToProcess, + workers, + accessKey, + secretKey, + endpoint, + siteName, + storageType, + targetPrefix, + listingLimit, + maxUpdates, + maxScanned, + keyMarker, + versionIdMarker, + } = params; + + // inputs + this.buckets = buckets; + this.replicationStatusToProcess = replicationStatusToProcess; + this.workers = workers; + this.accessKey = accessKey; + this.secretKey = secretKey; + this.endpoint = endpoint; + this.siteName = siteName; + this.storageType = storageType; + this.targetPrefix = targetPrefix; + this.listingLimit = listingLimit; + this.maxUpdates = maxUpdates; + this.maxScanned = maxScanned; + this.inputKeyMarker = keyMarker; + this.inputVersionIdMarker = versionIdMarker; + this.log = log; + + this._setupClients(); + + this.logProgressInterval = setInterval(this._logProgress.bind(this), LOG_PROGRESS_INTERVAL_MS); + + // intenal state + this._nProcessed = 0; + this._nSkipped = 0; + this._nUpdated = 0; + this._nErrors = 0; + this._bucketInProgress = null; + this._VersionIdMarker = null; + this._KeyMarker = null; + } + + /** + * Sets up and initializes the S3 and Backbeat client instances. + * + * @returns {void} This method does not return a value; instead, it sets the S3 and Backbeat clients. + */ + _setupClients() { + const { s3, bb } = setupClients({ + accessKey: this.accessKey, + secretKey: this.secretKey, + endpoint: this.endpoint, + }, this.log); + + this.s3 = s3; + this.bb = bb; + } + + /** + * Logs the progress of the CRR process at regular intervals. + * @private + * @returns {void} + */ + _logProgress() { + this.log.info('progress update', { + updated: this._nUpdated, + skipped: this._nSkipped, + errors: this._nErrors, + bucket: this._bucketInProgress || null, + keyMarker: this._KeyMarker || null, + versionIdMarker: this._VersionIdMarker || null, + }); + } + + + /** + * Determines if an object should be updated based on its replication metadata properties. + * @private + * @param {ObjectMD} objMD - The metadata of the object. + * @returns {boolean} True if the object should be updated. + */ + _objectShouldBeUpdated(objMD) { + return this.replicationStatusToProcess.some(filter => { + if (filter === 'NEW') { + return (!objMD.getReplicationInfo() + || objMD.getReplicationInfo().status === ''); + } + return (objMD.getReplicationInfo() + && objMD.getReplicationInfo().status === filter); + }); + } + + /** + * Marks an object as pending for replication. + * @private + * @param {string} bucket - The bucket name. + * @param {string} key - The object key. + * @param {string} versionId - The object version ID. + * @param {string} storageClass - The storage class for replication. + * @param {Object} repConfig - The replication configuration. + * @param {Function} cb - Callback function. + * @returns {void} + */ + _markObjectPending( + bucket, + key, + versionId, + storageClass, + repConfig, + cb, + ) { + let objMD; + let skip = false; + return waterfall([ + // get object blob + next => this.bb.getMetadata({ + Bucket: bucket, + Key: key, + VersionId: versionId, + }, next), + (mdRes, next) => { + // TODO: Check if object gets deleted after listing and before GET metadata. + // NOTE: The Arsenal Object Metadata schema version 8.1 is being used for both Ring S3C and Artesca, + // and this is acceptable because the 8.1 schema only adds extra properties to the 7.10 schema. + // This is beneficial because: + // - Forward compatibility: Having the 8.1 properties in place now ensures that + // S3C is compatible with the 8.1 schema, which could be useful if we plan to upgrade + // from 7.10 to 8.1 in the future. + // - Minimal impact on current functionality: The extra properties from the 8.1 + // schema do not interfere with the current functionalities of the 7.10 environment, + // so there is no harm in keeping them. S3C should ignore them without causing any issues. + // - Simplified codebase: Not having to remove these properties simplifies the codebase of s3utils. + // This avoids the added complexity and potential errors associated with conditionally removing + // or altering metadata properties based on the version. + // - Single schema approach: Maintaining a single, unified schema approach in s3utils can make the + // codebase easier to maintain and upgrade, as opposed to having multiple branches or versions of + // the code for different schema versions. + objMD = new ObjectMD(JSON.parse(mdRes.Body)); + if (!this._objectShouldBeUpdated(objMD)) { + skip = true; + return process.nextTick(next); + } + // Initialize replication info, if missing + // This is particularly important if the object was created before + // enabling replication on the bucket. + if (!objMD.getReplicationInfo() + || !objMD.getReplicationSiteStatus(storageClass)) { + const { Rules, Role } = repConfig; + const destination = Rules[0].Destination.Bucket; + // set replication properties + const ops = objMD.getContentLength() === 0 ? ['METADATA'] + : ['METADATA', 'DATA']; + const backends = [{ + site: storageClass, + status: 'PENDING', + dataStoreVersionId: '', + }]; + const replicationInfo = { + status: 'PENDING', + backends, + content: ops, + destination, + storageClass, + role: Role, + storageType: this.storageType, + }; + objMD.setReplicationInfo(replicationInfo); + } + + objMD.setReplicationSiteStatus(storageClass, 'PENDING'); + objMD.setReplicationStatus('PENDING'); + objMD.updateMicroVersionId(); + const md = objMD.getSerialized(); + return this.bb.putMetadata({ + Bucket: bucket, + Key: key, + VersionId: versionId, + ContentLength: Buffer.byteLength(md), + Body: md, + }, next); + }, + ], err => { + ++this._nProcessed; + if (err) { + ++this._nErrors; + this.log.error('error updating object', { + bucket, key, versionId, error: err.message, + }); + cb(); + return; + } + if (skip) { + ++this._nSkipped; + } else { + ++this._nUpdated; + } + cb(); + }); + } + + /** + * Lists object versions for a bucket. + * @private + * @param {string} bucket - The bucket name. + * @param {string} VersionIdMarker - The version ID marker for pagination. + * @param {string} KeyMarker - The key marker for pagination. + * @param {Function} cb - Callback function. + * @returns {void} + */ + _listObjectVersions(bucket, VersionIdMarker, KeyMarker, cb) { + return this.s3.listObjectVersions({ + Bucket: bucket, + MaxKeys: this.listingLimit, + Prefix: this.targetPrefix, + VersionIdMarker, + KeyMarker, + }, cb); + } + + /** + * Marks pending replication for listed object versions. + * @private + * @param {string} bucket - The bucket name. + * @param {Array} versions - Array of object versions. + * @param {Function} cb - Callback function. + * @returns {void} + */ + _markPending(bucket, versions, cb) { + const options = { Bucket: bucket }; + waterfall([ + next => this.s3.getBucketReplication(options, (err, res) => { + if (err) { + this.log.error('error getting bucket replication', { error: err }); + return next(err); + } + return next(null, res.ReplicationConfiguration); + }), + (repConfig, next) => { + const { Rules } = repConfig; + const storageClass = Rules[0].Destination.StorageClass || this.siteName; + if (!storageClass) { + const errMsg = 'missing SITE_NAME environment variable, must be set to' + + ' the value of "site" property in the CRR configuration'; + this.log.error(errMsg); + return next(new Error(errMsg)); + } + return eachLimit(versions, this.workers, (i, apply) => { + const { Key, VersionId } = i; + this._markObjectPending(bucket, Key, VersionId, storageClass, repConfig, apply); + }, next); + }, + ], cb); + } + + /** + * Triggers CRR process on a specific bucket. + * @private + * @param {string} bucketName - The name of the bucket. + * @param {Function} cb - Callback function. + * @returns {void} + */ + _triggerCRROnBucket(bucketName, cb) { + const bucket = bucketName.trim(); + this._bucketInProgress = bucket; + this.log.info(`starting task for bucket: ${bucket}`); + if (this.inputKeyMarker || this.inputVersionIdMarker) { + // resume from where we left off in previous script launch + this._KeyMarker = this.inputKeyMarker; + this._VersionIdMarker = this.inputVersionIdMarker; + this.inputKeyMarker = undefined; + this.inputVersionIdMarker = undefined; + this.log.info(`resuming bucket: ${bucket} at: KeyMarker=${this._KeyMarker} ` + + `VersionIdMarker=${this._VersionIdMarker}`); + } + return doWhilst( + done => this._listObjectVersions( + bucket, + this._VersionIdMarker, + this._KeyMarker, + (err, data) => { + if (err) { + this.log.error('error listing object versions', { error: err }); + return done(err); + } + return this._markPending(bucket, data.Versions.concat(data.DeleteMarkers), err => { + if (err) { + return done(err); + } + this._VersionIdMarker = data.NextVersionIdMarker; + this._KeyMarker = data.NextKeyMarker; + return done(); + }); + }, + ), + () => { + if (this._nUpdated >= this.maxUpdates || this._nProcessed >= this.maxScanned) { + this._logProgress(); + let remainingBuckets; + if (this._VersionIdMarker || this._KeyMarker) { + // next bucket to process is still the current one + remainingBuckets = this.buckets.slice( + this.buckets.findIndex(bucket => bucket === bucketName), + ); + } else { + // next bucket to process is the next in bucket list + remainingBuckets = this.buckets.slice( + this.buckets.findIndex(bucket => bucket === bucketName) + 1, + ); + } + let message = 'reached ' + + `${this._nUpdated >= this.maxUpdates ? 'update' : 'scanned'} ` + + 'count limit, resuming from this ' + + 'point can be achieved by re-running the script with ' + + `the bucket list "${remainingBuckets.join(',')}"`; + if (this._VersionIdMarker || this._KeyMarker) { + message += ' and the following environment variables set: ' + + `KEY_MARKER=${this._KeyMarker} ` + + `VERSION_ID_MARKER=${this._VersionIdMarker}`; + } + this.log.info(message); + return false; + } + if (this._VersionIdMarker || this._KeyMarker) { + return true; + } + return false; + }, + err => { + this._bucketInProgress = null; + if (err) { + this.log.error('error marking objects for crr', { bucket }); + cb(err); + return; + } + this._logProgress(); + this.log.info(`completed task for bucket: ${bucket}`); + cb(); + }, + ); + } + + /** + * Runs the CRR process on all configured buckets. + * @param {Function} cb - Callback function. + * @returns {void} + */ + run(cb) { + return eachSeries(this.buckets, this._triggerCRROnBucket.bind(this), err => { + clearInterval(this.logProgressInterval); + if (err) { + cb(err); + return; + } + cb(); + }); + } + + /** + * Stops the execution of the CRR process. + * NOTE: This method terminates the node.js process, and hence it does not return a value. + * @returns {void} + */ + stop() { + this.log.warn('stopping execution'); + this._logProgress(); + clearInterval(this.logProgressInterval); + process.exit(1); + } +} + +module.exports = ReplicationStatusUpdater; diff --git a/CRR/clients.js b/CRR/clients.js new file mode 100644 index 00000000..988de987 --- /dev/null +++ b/CRR/clients.js @@ -0,0 +1,74 @@ +const AWS = require('aws-sdk'); +const http = require('http'); +const BackbeatClient = require('../BackbeatClient'); + +const AWS_SDK_REQUEST_DELAY_MS = 30; +const AWS_SDK_REQUEST_RETRIES = 100; +const LOG_PROGRESS_INTERVAL_MS = 10000; + +/** + * Sets up and configures AWS S3 and Backbeat clients. + * + * This function initializes and configures clients for S3 and Backbeat services, + * using provided access credentials and endpoint configurations. It includes a custom + * backoff strategy for AWS SDK requests to handle retries in case of errors. + * The clients are set with specific options, such as maximum retries and custom + * backoff strategies for S3 requests. + * + * @param {Object} config - The configuration object for the clients. + * @param {string} config.accessKey - The access key for AWS services. + * @param {string} config.secretKey - The secret key for AWS services. + * @param {string} config.endpoint - The endpoint URL for the AWS services. + * @param {Function} log - The logging function for error logging. + * @returns {Object} An object containing initialized S3 and Backbeat clients. + */ +function setupClients({ + accessKey, + secretKey, + endpoint, +}, log) { + const awsConfig = { + accessKeyId: accessKey, + secretAccessKey: secretKey, + endpoint, + region: 'us-east-1', + sslEnabled: false, + s3ForcePathStyle: true, + apiVersions: { s3: '2006-03-01' }, + signatureVersion: 'v4', + signatureCache: false, + httpOptions: { + timeout: 0, + agent: new http.Agent({ keepAlive: true }), + }, + }; + + /** + * Custom backoff strategy for AWS SDK requests. + * @param {number} retryCount - The current retry attempt. + * @param {Error} error - The error that caused the retry. + * @returns {number} The delay in milliseconds before the next retry. + */ + function customBackoffStrategy(retryCount, error) { + this.log.error('aws sdk request error', { error, retryCount }); + // The delay is not truly exponential; it resets to the minimum after every 10 calls, + // with a maximum delay of 15 seconds. + return AWS_SDK_REQUEST_DELAY_MS * (2 ** (retryCount % 10)); + } + + // Specific options for S3 requests + const s3SpecificOptions = { + maxRetries: AWS_SDK_REQUEST_RETRIES, + customBackoff: customBackoffStrategy, + }; + + // Create an S3 client instance + const s3 = new AWS.S3({ ...awsConfig, ...s3SpecificOptions }); + + // Create a BackbeatClient instance + const bb = new BackbeatClient(awsConfig); + + return { s3, bb }; +} + +module.exports = { setupClients }; diff --git a/CrrExistingObjects/listingParser.js b/CrrExistingObjects/listingParser.js deleted file mode 100644 index 5d47b9bd..00000000 --- a/CrrExistingObjects/listingParser.js +++ /dev/null @@ -1,24 +0,0 @@ -function listingParser(entries) { - if (!entries) { - return entries; - } - return entries.map(entry => { - const tmp = JSON.parse(entry.value); - return { - Key: entry.key, - Size: tmp['content-length'], - ETag: tmp['content-md5'], - VersionId: tmp.versionId, - IsNull: tmp.isNull, - IsDeleteMarker: tmp.isDeleteMarker, - LastModified: tmp['last-modified'], - Owner: { - DisplayName: tmp['owner-display-name'], - ID: tmp['owner-id'], - }, - StorageClass: tmp['x-amz-storage-class'], - }; - }); -} - -module.exports = listingParser; diff --git a/CrrExistingObjects/metadataClient.js b/CrrExistingObjects/metadataClient.js deleted file mode 100644 index 58d20c86..00000000 --- a/CrrExistingObjects/metadataClient.js +++ /dev/null @@ -1,20 +0,0 @@ -const { MetadataWrapper } = require('arsenal').storage.metadata; -const werelogs = require('werelogs'); -const createMongoParams = require('../utils/createMongoParams'); -const listingParser = require('./listingParser'); - -const loggerConfig = { - level: 'info', - dump: 'error', -}; -werelogs.configure(loggerConfig); - -const log = new werelogs.Logger('s3utils::crrExistingObjects'); -const implName = 'mongodb'; -const params = { - customListingParser: listingParser, - mongodb: createMongoParams(log, { readPreference: 'primary' }), -}; -const metadata = new MetadataWrapper(implName, params, null, log); - -module.exports = metadata; diff --git a/CrrExistingObjects/metadataUtils.js b/CrrExistingObjects/metadataUtils.js deleted file mode 100644 index 8c197baa..00000000 --- a/CrrExistingObjects/metadataUtils.js +++ /dev/null @@ -1,211 +0,0 @@ -const { errors, versioning } = require('arsenal'); -const metadataClient = require('./metadataClient'); - -const versionIdUtils = versioning.VersionID; - -const { GENERATE_INTERNAL_VERSION_ID } = process.env; -const REPLICATION_GROUP_ID = process.env.REPLICATION_GROUP_ID || 'RG001'; -// Use Arsenal function to generate a version ID used internally by metadata -// for null versions that are created before bucket versioning is configured -const nonVersionedObjId = versionIdUtils.getInfVid(REPLICATION_GROUP_ID); - -function _processVersions(list) { - /* eslint-disable no-param-reassign */ - list.NextVersionIdMarker = list.NextVersionIdMarker - ? versionIdUtils.encode(list.NextVersionIdMarker) - : list.NextVersionIdMarker; - - list.Versions.forEach(v => { - v.VersionId = v.VersionId - ? versionIdUtils.encode(v.VersionId) : v.VersionId; - }); - /* eslint-enable no-param-reassign */ - return list; -} - -function listObjectVersions(params, log, cb) { - const bucketName = params.Bucket; - const listingParams = { - listingType: 'DelimiterVersions', - maxKeys: params.MaxKeys, - prefix: params.Prefix, - keyMarker: params.KeyMarker, - versionIdMarker: params.VersionIdMarker, - }; - log.debug('listing object versions', { - method: 'metadataUtils.listObjectVersions', - listingParams, - }); - return metadataClient.listObject( - bucketName, - listingParams, - log, - (err, list) => { - if (err) { - return cb(err); - } - return cb(null, _processVersions(list)); - }, - ); -} - -function _formatConfig(config) { - const { role, destination, rules } = config; - const Rules = rules.map(rule => { - const { - prefix, enabled, storageClass, id, - } = rule; - return { - ID: id, - Prefix: prefix, - Status: enabled ? 'Enabled' : 'Disabled', - Destination: { - Bucket: destination, - StorageClass: (storageClass || ''), - }, - }; - }); - return { - ReplicationConfiguration: { - Role: role, - Rules, - }, - }; -} - -function getBucketReplication(options, log, cb) { - const bucketName = options.Bucket; - log.debug('getting bucket replication', { - method: 'metadataUtils.getBucketReplication', - bucket: bucketName, - }); - return metadataClient.getBucket(bucketName, log, (err, data) => { - if (err) { - return cb(err); - } - const replConf = _formatConfig(data._replicationConfiguration); - return cb(null, replConf); - }); -} - -function _getNullVersion(objMD, bucketName, objectKey, log, cb) { - const options = {}; - if (objMD.isNull || !objMD.versionId) { - log.debug('found null version'); - return process.nextTick(() => cb(null, objMD)); - } - if (objMD.nullVersionId) { - log.debug('null version exists, get the null version'); - options.versionId = objMD.nullVersionId; - return metadataClient.getObjectMD( - bucketName, - objectKey, - options, - log, - cb, - ); - } - return process.nextTick(() => cb()); -} - -function getMetadata(params, log, cb) { - const { Bucket, Key } = params; - let versionId = params.VersionId; - log.debug('getting object metadata', { - method: 'metadataUtils.getMetadata', - bucket: Bucket, - objectKey: Key, - versionId, - }); - if (versionId && versionId !== 'null') { - versionId = versionIdUtils.decode(versionId); - } - if (versionId instanceof Error) { - const errMsg = 'Invalid version id specified'; - return cb(errors.InvalidArgument.customizeDescription(errMsg)); - } - const mdParams = { - versionId, - }; - return metadataClient.getObjectMD( - Bucket, - Key, - mdParams, - log, - (err, data) => { - if (err) { - return cb(err); - } - if (data && versionId === 'null') { - return _getNullVersion( - data, - Bucket, - Key, - log, - (err, nullVer) => { - if (err) { - return cb(err); - } - return cb(null, nullVer); - }, - ); - } - return cb(null, data); - }, - ); -} - -function getOptions(objMD) { - const options = {}; - - if (objMD.versionId === undefined) { - if (!GENERATE_INTERNAL_VERSION_ID) { - return options; - } - - objMD.setIsNull(true); - objMD.setVersionId(nonVersionedObjId); - - options.nullVersionId = objMD.versionId; - // non-versioned (non-null) MPU objects don't have a - // replay ID, so don't reference their uploadId - if (objMD.uploadId) { - options.nullUploadId = objMD.uploadId; - } - } - - // specify both 'versioning' and 'versionId' to create a "new" - // version (updating master as well) but with specified versionId - options.versioning = true; - options.versionId = objMD.versionId; - return options; -} - -function putMetadata(params, log, cb) { - const { Bucket, Key, Body: objMD } = params; - const options = getOptions(objMD); - - log.debug('updating object metadata', { - method: 'metadataUtils.putMetadata', - bucket: Bucket, - objectKey: Key, - versionId: objMD.versionId, - }); - // If the object is from a source bucket without versioning (i.e. NFS), - // then we want to create a version for the replica object even though - // none was provided in the object metadata value. - if (objMD.replicationInfo.isNFS) { - const isReplica = objMD.replicationInfo.status === 'REPLICA'; - options.versioning = isReplica; - objMD.replicationInfo.isNFS = !isReplica; - } - return metadataClient.putObjectMD(Bucket, Key, objMD, options, log, cb); -} - -module.exports = { - metadataClient, - listObjectVersions, - getBucketReplication, - getMetadata, - putMetadata, -}; diff --git a/crrExistingObjects.js b/crrExistingObjects.js index 06312574..21f07500 100644 --- a/crrExistingObjects.js +++ b/crrExistingObjects.js @@ -1,9 +1,5 @@ -const { - doWhilst, eachSeries, eachLimit, waterfall, series, -} = require('async'); const werelogs = require('werelogs'); -const { ObjectMD } = require('arsenal').models; -const metadataUtil = require('./CrrExistingObjects/metadataUtils'); +const ReplicationStatusUpdater = require('./CRR/ReplicationStatusUpdater'); const logLevel = Number.parseInt(process.env.DEBUG, 10) === 1 ? 'debug' : 'info'; @@ -25,18 +21,33 @@ const MAX_UPDATES = (process.env.MAX_UPDATES && Number.parseInt(process.env.MAX_UPDATES, 10)); const MAX_SCANNED = (process.env.MAX_SCANNED && Number.parseInt(process.env.MAX_SCANNED, 10)); -let { KEY_MARKER } = process.env; -let { VERSION_ID_MARKER } = process.env; -const { GENERATE_INTERNAL_VERSION_ID } = process.env; +const { KEY_MARKER } = process.env; +const { VERSION_ID_MARKER } = process.env; + +const { + ACCESS_KEY, + SECRET_KEY, + ENDPOINT, +} = process.env; const LISTING_LIMIT = (process.env.LISTING_LIMIT && Number.parseInt(process.env.LISTING_LIMIT, 10)) || 1000; -const LOG_PROGRESS_INTERVAL_MS = 10000; - if (!BUCKETS || BUCKETS.length === 0) { log.fatal('No buckets given as input! Please provide ' - + 'a comma-separated list of buckets'); + + 'a comma-separated list of buckets'); + process.exit(1); +} +if (!ENDPOINT) { + log.fatal('ENDPOINT not defined!'); + process.exit(1); +} +if (!ACCESS_KEY) { + log.fatal('ACCESS_KEY not defined'); + process.exit(1); +} +if (!SECRET_KEY) { + log.fatal('SECRET_KEY not defined'); process.exit(1); } if (!STORAGE_TYPE) { @@ -51,7 +62,7 @@ replicationStatusToProcess.forEach(state => { if (!['NEW', 'PENDING', 'COMPLETED', 'FAILED', 'REPLICA'].includes(state)) { log.fatal('invalid TARGET_REPLICATION_STATUS environment: must be a ' + 'comma-separated list of replication statuses to requeue, ' - + 'as NEW, PENDING, COMPLETED, FAILED or REPLICA.'); + + 'as NEW,PENDING,COMPLETED,FAILED,REPLICA.'); process.exit(1); } }); @@ -59,297 +70,32 @@ log.info('Objects with replication status ' + `${replicationStatusToProcess.join(' or ')} ` + 'will be reset to PENDING to trigger CRR'); -let nProcessed = 0; -let nSkipped = 0; -let nUpdated = 0; -let nErrors = 0; -let bucketInProgress = null; -let VersionIdMarker = null; -let KeyMarker = null; - -function _logProgress() { - log.info('progress update', { - updated: nUpdated, - skipped: nSkipped, - errors: nErrors, - bucket: bucketInProgress || null, - keyMarker: KeyMarker || null, - versionIdMarker: VersionIdMarker || null, - }); -} - -const logProgressInterval = setInterval(_logProgress, LOG_PROGRESS_INTERVAL_MS); - -function _objectShouldBeUpdated(objMD) { - return replicationStatusToProcess.some(filter => { - if (filter === 'NEW') { - return (!objMD.getReplicationInfo() - || objMD.getReplicationInfo().status === ''); - } - return (objMD.getReplicationInfo() - && objMD.getReplicationInfo().status === filter); - }); -} - -function _markObjectPending( - bucket, - key, - versionId, - storageClass, - repConfig, - cb, -) { - let objMD; - let skip = false; - return waterfall([ - // get object blob - next => metadataUtil.getMetadata({ - Bucket: bucket, - Key: key, - VersionId: versionId, - }, log, next), - (mdRes, next) => { - objMD = new ObjectMD(mdRes); - const md = objMD.getValue(); - if (!_objectShouldBeUpdated(objMD)) { - skip = true; - return next(); - } - if (objMD.getVersionId()) { - // The object already has an *internal* versionId, - // which exists when the object has been put on - // versioned or versioning-suspended bucket. Even if - // the listed version is "null", the object may have - // an actual internal versionId, only if the bucket - // was versioning-suspended when the object was put. - return next(); - } - if (!GENERATE_INTERNAL_VERSION_ID) { - // When the GENERATE_INTERNAL_VERSION_ID env variable is set, - // matching objects with no *internal* versionId will get - // "updated" to get an internal versionId. The external versionId - // will still be "null". - return next(); - } - // The object does not have an *internal* versionId, as it - // was put on a nonversioned bucket: do a first metadata - // update to generate one, just passing on the existing metadata - // blob. Note that the resulting key will still be nonversioned, - // but the following update will be able to create a versioned key - // for this object, so that replication can happen. The externally - // visible version will stay "null". - return metadataUtil.putMetadata({ - Bucket: bucket, - Key: key, - Body: md, - }, log, (err, putRes) => { - if (err) { - return next(err); - } - // No need to fetch the whole metadata again, simply - // update the one we have with the generated versionId. - objMD.setVersionId(putRes.versionId); - return next(); - }); - }, - // update replication info and put back object blob - next => { - if (skip) { - return next(); - } - - // Initialize replication info, if missing - if (!objMD.getReplicationInfo() - || !objMD.getReplicationSiteStatus(storageClass)) { - const { Rules, Role } = repConfig; - const destination = Rules[0].Destination.Bucket; - // set replication properties - const ops = objMD.getContentLength() === 0 ? ['METADATA'] - : ['METADATA', 'DATA']; - const backends = [{ - site: storageClass, - status: 'PENDING', - dataStoreVersionId: '', - }]; - const replicationInfo = { - status: 'PENDING', - backends, - content: ops, - destination, - storageClass, - role: Role, - storageType: STORAGE_TYPE, - }; - objMD.setReplicationInfo(replicationInfo); - } - - objMD.setReplicationSiteStatus(storageClass, 'PENDING'); - objMD.setReplicationStatus('PENDING'); - objMD.updateMicroVersionId(); - const md = objMD.getValue(); - return metadataUtil.putMetadata({ - Bucket: bucket, - Key: key, - Body: md, - }, log, next); - }, - ], err => { - ++nProcessed; - if (err) { - ++nErrors; - log.error('error updating object', { - bucket, key, versionId, error: err.message, - }); - return cb(); - } - if (skip) { - ++nSkipped; - } else { - ++nUpdated; - } - return cb(); - }); -} - -// list object versions -function _listObjectVersions(bucket, VersionIdMarker, KeyMarker, cb) { - return metadataUtil.listObjectVersions({ - Bucket: bucket, - MaxKeys: LISTING_LIMIT, - Prefix: TARGET_PREFIX, - VersionIdMarker, - KeyMarker, - }, log, cb); -} - -function _markPending(bucket, versions, cb) { - const options = { Bucket: bucket }; - waterfall([ - next => metadataUtil.getBucketReplication(options, log, (err, res) => { - if (err) { - log.error('error getting bucket replication', { error: err }); - return next(err); - } - return next(null, res.ReplicationConfiguration); - }), - (repConfig, next) => { - const { Rules } = repConfig; - const storageClass = Rules[0].Destination.StorageClass || SITE_NAME; - if (!storageClass) { - const errMsg = 'missing SITE_NAME environment variable, must be set to' - + ' the value of "site" property in the CRR configuration'; - log.error(errMsg); - return next(new Error(errMsg)); - } - return eachLimit(versions, WORKERS, (i, apply) => { - const { Key, VersionId } = i; - _markObjectPending(bucket, Key, VersionId, storageClass, repConfig, apply); - }, next); - }, - ], cb); -} - -function triggerCRROnBucket(bucketName, cb) { - const bucket = bucketName.trim(); - bucketInProgress = bucket; - log.info(`starting task for bucket: ${bucket}`); - if (KEY_MARKER || VERSION_ID_MARKER) { - // resume from where we left off in previous script launch - KeyMarker = KEY_MARKER; - VersionIdMarker = VERSION_ID_MARKER; - KEY_MARKER = undefined; - VERSION_ID_MARKER = undefined; - log.info(`resuming at: KeyMarker=${KeyMarker} ` - + `VersionIdMarker=${VersionIdMarker}`); - } - doWhilst( - done => _listObjectVersions( - bucket, - VersionIdMarker, - KeyMarker, - (err, data) => { - if (err) { - log.error('error listing object versions', { error: err }); - return done(err); - } - const versions = data.DeleteMarkers - ? data.Versions.concat(data.DeleteMarkers) : data.Versions; - return _markPending(bucket, versions, err => { - if (err) { - return done(err); - } - VersionIdMarker = data.NextVersionIdMarker; - KeyMarker = data.NextKeyMarker; - return done(); - }); - }, - ), - () => { - if (nUpdated >= MAX_UPDATES || nProcessed >= MAX_SCANNED) { - _logProgress(); - let remainingBuckets; - if (VersionIdMarker || KeyMarker) { - // next bucket to process is still the current one - remainingBuckets = BUCKETS.slice( - BUCKETS.findIndex(bucket => bucket === bucketName), - ); - } else { - // next bucket to process is the next in bucket list - remainingBuckets = BUCKETS.slice( - BUCKETS.findIndex(bucket => bucket === bucketName) + 1, - ); - } - let message = 'reached ' - + `${nUpdated >= MAX_UPDATES ? 'update' : 'scanned'} ` - + 'count limit, resuming from this ' - + 'point can be achieved by re-running the script with ' - + `the bucket list "${remainingBuckets.join(',')}"`; - if (VersionIdMarker || KeyMarker) { - message += ' and the following environment variables set: ' - + `KEY_MARKER=${KeyMarker} ` - + `VERSION_ID_MARKER=${VersionIdMarker}`; - } - log.info(message); - process.exit(0); - } - if (VersionIdMarker || KeyMarker) { - return true; - } - return false; - }, - err => { - bucketInProgress = null; - if (err) { - log.error('error marking objects for crr', { bucket }); - return cb(err); - } - _logProgress(); - log.info(`completed task for bucket: ${bucket}`); - return cb(); - }, - ); -} - -// trigger the calls to list objects and mark them for crr -series([ - next => metadataUtil.metadataClient.setup(next), - next => eachSeries(BUCKETS, triggerCRROnBucket, next), - next => metadataUtil.metadataClient.close(next), -], err => { - clearInterval(logProgressInterval); +const replicationStatusUpdater = new ReplicationStatusUpdater({ + buckets: BUCKETS, + replicationStatusToProcess, + workers: WORKERS, + accessKey: ACCESS_KEY, + secretKey: SECRET_KEY, + endpoint: ENDPOINT, + siteName: SITE_NAME, + storageType: STORAGE_TYPE, + targetPrefix: TARGET_PREFIX, + listingLimit: LISTING_LIMIT, + maxUpdates: MAX_UPDATES, + maxScanned: MAX_SCANNED, + keyMarker: KEY_MARKER, + versionIdMarker: VERSION_ID_MARKER, +}, log); + +replicationStatusUpdater.run(err => { if (err) { return log.error('error during task execution', { error: err }); } return log.info('completed task for all buckets'); }); -function stop() { - log.warn('stopping execution'); - _logProgress(); - process.exit(1); -} - -process.on('SIGINT', stop); -process.on('SIGHUP', stop); -process.on('SIGQUIT', stop); -process.on('SIGTERM', stop); +const stopCrr = replicationStatusUpdater.stop; +process.on('SIGINT', stopCrr); +process.on('SIGHUP', stopCrr); +process.on('SIGQUIT', stopCrr); +process.on('SIGTERM', stopCrr); diff --git a/tests/unit/CRR/ReplicationStatusUpdater.js b/tests/unit/CRR/ReplicationStatusUpdater.js new file mode 100644 index 00000000..8f83c9e5 --- /dev/null +++ b/tests/unit/CRR/ReplicationStatusUpdater.js @@ -0,0 +1,278 @@ +const AWS = require('aws-sdk'); +const werelogs = require('werelogs'); +const assert = require('assert'); + +const BackbeatClient = require('../../../BackbeatClient'); +const ReplicationStatusUpdater = require('../../../CRR/ReplicationStatusUpdater'); +const { + initializeCrrWithMocks, + listVersionRes, + listVersionsRes, + listVersionWithMarkerRes, + getBucketReplicationRes, + getMetadataRes, + putMetadataRes, +} = require('../../utils/crr'); + +const logger = new werelogs.Logger('ReplicationStatusUpdater::tests', 'debug', 'debug'); + +describe('ReplicationStatusUpdater', () => { + let crr; + + beforeEach(() => { + crr = initializeCrrWithMocks({ + buckets: ['bucket0'], + workers: 10, + replicationStatusToProcess: ['NEW'], + targetPrefix: 'toto', + listingLimit: 10, + }, logger); + }); + + it('should process bucket for CRR', done => { + crr.run(err => { + assert.ifError(err); + + expect(crr.s3.listObjectVersions).toHaveBeenCalledTimes(1); + expect(crr.s3.listObjectVersions).toHaveBeenCalledWith({ + Bucket: 'bucket0', + KeyMarker: null, + MaxKeys: 10, + Prefix: 'toto', + VersionIdMarker: null, + }, expect.any(Function)); + + expect(crr.s3.getBucketReplication).toHaveBeenCalledTimes(1); + expect(crr.s3.getBucketReplication).toHaveBeenCalledWith({ + Bucket: 'bucket0', + }, expect.any(Function)); + + expect(crr.bb.getMetadata).toHaveBeenCalledTimes(1); + expect(crr.bb.getMetadata).toHaveBeenCalledWith({ + Bucket: 'bucket0', + Key: listVersionRes.Versions[0].Key, + VersionId: listVersionRes.Versions[0].VersionId, + }, expect.any(Function)); + + expect(crr.bb.putMetadata).toHaveBeenCalledTimes(1); + const expectedReplicationInfo = { + status: 'PENDING', + backends: [ + { + site: 'aws-location', + status: 'PENDING', + dataStoreVersionId: '', + }, + ], + content: ['METADATA', 'DATA'], + destination: 'arn:aws:s3:::sourcebucket', + storageClass: 'aws-location', + role: 'arn:aws:iam::root:role/s3-replication-role', + storageType: '', + dataStoreVersionId: '', + isNFS: null, + }; + expect(crr.bb.putMetadata).toHaveBeenCalledWith( + expect.objectContaining({ + Body: expect.stringContaining(JSON.stringify(expectedReplicationInfo)), + }), + expect.any(Function), + ); + + assert.strictEqual(crr._nProcessed, 1); + assert.strictEqual(crr._nSkipped, 0); + assert.strictEqual(crr._nUpdated, 1); + assert.strictEqual(crr._nErrors, 0); + return done(); + }); + }); + + it('should process bucket for CRR with multiple objects', done => { + crr.s3.listObjectVersions = jest.fn((params, cb) => cb(null, listVersionsRes)); + + crr.run(err => { + assert.ifError(err); + + expect(crr.s3.listObjectVersions).toHaveBeenCalledTimes(1); + expect(crr.s3.listObjectVersions).toHaveBeenCalledWith({ + Bucket: 'bucket0', + KeyMarker: null, + MaxKeys: 10, + Prefix: 'toto', + VersionIdMarker: null, + }, expect.any(Function)); + + expect(crr.s3.getBucketReplication).toHaveBeenCalledTimes(1); + expect(crr.s3.getBucketReplication).toHaveBeenCalledWith({ + Bucket: 'bucket0', + }, expect.any(Function)); + + expect(crr.bb.getMetadata).toHaveBeenCalledTimes(2); + expect(crr.bb.getMetadata).toHaveBeenNthCalledWith(1, { + Bucket: 'bucket0', + Key: listVersionsRes.Versions[0].Key, + VersionId: listVersionsRes.Versions[0].VersionId, + }, expect.any(Function)); + + expect(crr.bb.getMetadata).toHaveBeenNthCalledWith(2, { + Bucket: 'bucket0', + Key: listVersionsRes.Versions[1].Key, + VersionId: listVersionsRes.Versions[1].VersionId, + }, expect.any(Function)); + + expect(crr.bb.putMetadata).toHaveBeenCalledTimes(2); + + assert.strictEqual(crr._nProcessed, 2); + assert.strictEqual(crr._nSkipped, 0); + assert.strictEqual(crr._nUpdated, 2); + assert.strictEqual(crr._nErrors, 0); + return done(); + }); + }); +}); + +describe('ReplicationStatusUpdater with specifics', () => { + it('maxUpdates set to 1', done => { + const crr = initializeCrrWithMocks({ + buckets: ['bucket0'], + workers: 10, + replicationStatusToProcess: ['NEW'], + maxUpdates: 1, + }, logger); + + crr.s3.listObjectVersions = jest.fn((params, cb) => cb(null, listVersionWithMarkerRes)); + + crr.run(err => { + assert.ifError(err); + + expect(crr.s3.listObjectVersions).toHaveBeenCalledTimes(1); + expect(crr.s3.getBucketReplication).toHaveBeenCalledTimes(1); + expect(crr.bb.getMetadata).toHaveBeenCalledTimes(1); + expect(crr.bb.putMetadata).toHaveBeenCalledTimes(1); + + assert.strictEqual(crr._nProcessed, 1); + assert.strictEqual(crr._nSkipped, 0); + assert.strictEqual(crr._nUpdated, 1); + assert.strictEqual(crr._nErrors, 0); + done(); + }); + }); + + it('maxUpdates set to 2', done => { + const crr = initializeCrrWithMocks({ + buckets: ['bucket0'], + workers: 10, + replicationStatusToProcess: ['NEW'], + maxUpdates: 2, + }, logger); + + crr.s3.listObjectVersions = jest.fn((params, cb) => cb(null, listVersionWithMarkerRes)); + + crr.run(err => { + assert.ifError(err); + + expect(crr.s3.listObjectVersions).toHaveBeenCalledTimes(2); + + expect(crr.s3.listObjectVersions).toHaveBeenNthCalledWith(1, { + Bucket: 'bucket0', + Prefix: undefined, + MaxKeys: undefined, + KeyMarker: null, + VersionIdMarker: null, + }, expect.any(Function)); + + expect(crr.s3.listObjectVersions).toHaveBeenNthCalledWith(2, { + Bucket: 'bucket0', + Prefix: undefined, + MaxKeys: undefined, + KeyMarker: 'key0', + VersionIdMarker: 'aJdO148N3LjN00000000001I4j3QKItW', + }, expect.any(Function)); + + expect(crr.s3.getBucketReplication).toHaveBeenCalledTimes(2); + expect(crr.bb.getMetadata).toHaveBeenCalledTimes(2); + expect(crr.bb.putMetadata).toHaveBeenCalledTimes(2); + + assert.strictEqual(crr._nProcessed, 2); + assert.strictEqual(crr._nSkipped, 0); + assert.strictEqual(crr._nUpdated, 2); + assert.strictEqual(crr._nErrors, 0); + done(); + }); + }); + + it('maxScanned set to 1', done => { + const crr = initializeCrrWithMocks({ + buckets: ['bucket0'], + workers: 10, + replicationStatusToProcess: ['NEW'], + maxScanned: 1, + }, logger); + + crr.s3.listObjectVersions = jest.fn((params, cb) => cb(null, listVersionWithMarkerRes)); + + crr.run(err => { + assert.ifError(err); + + expect(crr.s3.listObjectVersions).toHaveBeenCalledTimes(1); + expect(crr.s3.getBucketReplication).toHaveBeenCalledTimes(1); + expect(crr.bb.getMetadata).toHaveBeenCalledTimes(1); + expect(crr.bb.putMetadata).toHaveBeenCalledTimes(1); + + assert.strictEqual(crr._nProcessed, 1); + assert.strictEqual(crr._nSkipped, 0); + assert.strictEqual(crr._nUpdated, 1); + assert.strictEqual(crr._nErrors, 0); + done(); + }); + }); + + it('set inputKeyMarker', done => { + const crr = initializeCrrWithMocks({ + buckets: ['bucket0'], + workers: 10, + replicationStatusToProcess: ['NEW'], + keyMarker: 'key1', + }, logger); + + crr.run(err => { + assert.ifError(err); + + expect(crr.s3.listObjectVersions).toHaveBeenCalledTimes(1); + expect(crr.s3.listObjectVersions).toHaveBeenNthCalledWith(1, { + Bucket: 'bucket0', + Prefix: undefined, + MaxKeys: undefined, + KeyMarker: 'key1', + VersionIdMarker: undefined, + }, expect.any(Function)); + + done(); + }); + }); + + it('set inputKeyMarker and inputVersionIdMarker', done => { + const crr = initializeCrrWithMocks({ + buckets: ['bucket0'], + workers: 10, + replicationStatusToProcess: ['NEW'], + keyMarker: 'key1', + versionIdMarker: 'vid1', + }, logger); + + crr.run(err => { + assert.ifError(err); + + expect(crr.s3.listObjectVersions).toHaveBeenCalledTimes(1); + expect(crr.s3.listObjectVersions).toHaveBeenNthCalledWith(1, { + Bucket: 'bucket0', + Prefix: undefined, + MaxKeys: undefined, + KeyMarker: 'key1', + VersionIdMarker: 'vid1', + }, expect.any(Function)); + + done(); + }); + }); +}); diff --git a/tests/unit/CRR/crrExistingObjects.js b/tests/unit/CRR/crrExistingObjects.js new file mode 100644 index 00000000..85fe481a --- /dev/null +++ b/tests/unit/CRR/crrExistingObjects.js @@ -0,0 +1,187 @@ +const werelogs = require('werelogs'); +// Mock the entire werelogs module +const mockFatal = jest.fn(); +const mockInfo = jest.fn(); +const mockError = jest.fn(); + +jest.mock('werelogs', () => ({ + Logger: jest.fn().mockImplementation(() => ({ + fatal: mockFatal, + info: mockInfo, + error: mockError, + })), + configure: jest.fn(), +})); + +const mockCrrRun = jest.fn(cb => cb(null)); // Simulate successful run +const mockCrrStop = jest.fn(); + +jest.mock('../../../CRR/ReplicationStatusUpdater', () => jest.fn().mockImplementation(() => ({ + run: mockCrrRun, + stop: mockCrrStop, +}))); + +describe('crrExistingObjects', () => { + let originalEnv; + + beforeAll(() => { + process.exit = jest.fn(); + }); + + beforeEach(() => { + // Save the original process.env + originalEnv = { ...process.env }; + + // Reset the argv array to its default state + process.argv = ['node', 'yourscript.js']; + // Reset the mock to clear previous call history + process.exit.mockReset(); + + // Clear the mocks before each test + // ReplicationStatusUpdater.mockClear(); + mockFatal.mockClear(); + mockInfo.mockClear(); + mockError.mockClear(); + mockCrrRun.mockClear(); + mockCrrStop.mockClear(); + + // Clear the module cache before each test + jest.resetModules(); + }); + + afterEach(() => { + // Restore the original process.env after each test + process.env = originalEnv; + }); + + afterAll(() => { + // Restore the original process.exit + process.exit = jest.requireActual('process').exit; + }); + + test('should run successfully', () => { + process.argv[2] = 'bucket1,bucket2'; + + process.env.TARGET_REPLICATION_STATUS = 'NEW,PENDING'; + process.env.WORKERS = '10'; + process.env.ACCESS_KEY = 'testAccessKey'; + process.env.SECRET_KEY = 'testSecretKey'; + process.env.ENDPOINT = 'http://fake.endpoint'; + process.env.SITE_NAME = 'testSite'; + process.env.STORAGE_TYPE = 'testStorage'; + process.env.TARGET_PREFIX = 'testPrefix'; + process.env.LISTING_LIMIT = '2000'; + process.env.MAX_UPDATES = '100'; + process.env.MAX_SCANNED = '1000'; + process.env.KEY_MARKER = 'testKeyMarker'; + process.env.VERSION_ID_MARKER = 'testVersionIdMarker'; + process.env.DEBUG = '0'; + + require('../../../crrExistingObjects'); + + const ReplicationStatusUpdater = require('../../../CRR/ReplicationStatusUpdater'); + + expect(ReplicationStatusUpdater).toHaveBeenCalledTimes(1); + expect(ReplicationStatusUpdater).toHaveBeenCalledWith({ + buckets: ['bucket1', 'bucket2'], + replicationStatusToProcess: ['NEW', 'PENDING'], + workers: 10, + accessKey: 'testAccessKey', + secretKey: 'testSecretKey', + endpoint: 'http://fake.endpoint', + siteName: 'testSite', + storageType: 'testStorage', + targetPrefix: 'testPrefix', + listingLimit: 2000, + maxUpdates: 100, // or a more specific expectation + maxScanned: 1000, + keyMarker: 'testKeyMarker', + versionIdMarker: 'testVersionIdMarker', + }, expect.anything()); + + expect(mockFatal).not.toHaveBeenCalled(); + expect(mockError).not.toHaveBeenCalled(); + expect(process.exit).not.toHaveBeenCalled(); + expect(mockCrrRun).toHaveBeenCalled(); + }); + + test('should set the default parameter when unspecified', () => { + process.argv[2] = 'bucket1,bucket2'; + + process.env.WORKERS = '10'; + process.env.ACCESS_KEY = 'testAccessKey'; + process.env.SECRET_KEY = 'testSecretKey'; + process.env.ENDPOINT = 'http://fake.endpoint'; + + require('../../../crrExistingObjects'); + + const ReplicationStatusUpdater = require('../../../CRR/ReplicationStatusUpdater'); + + expect(ReplicationStatusUpdater).toHaveBeenCalledTimes(1); + expect(ReplicationStatusUpdater).toHaveBeenCalledWith({ + buckets: ['bucket1', 'bucket2'], + replicationStatusToProcess: ['NEW'], + workers: 10, + accessKey: 'testAccessKey', + secretKey: 'testSecretKey', + endpoint: 'http://fake.endpoint', + storageType: '', + targetPrefix: undefined, + listingLimit: 1000, + maxUpdates: undefined, + maxScanned: undefined, + keyMarker: undefined, + versionIdMarker: undefined, + }, expect.anything()); + + expect(mockFatal).not.toHaveBeenCalled(); + expect(mockError).not.toHaveBeenCalled(); + expect(process.exit).not.toHaveBeenCalled(); + expect(mockCrrRun).toHaveBeenCalled(); + }); + + test('should exit if no bucket is provided', () => { + process.argv[2] = ''; + process.env.ENDPOINT = 'http://example.com'; + process.env.ACCESS_KEY = 'accesskey'; + process.env.SECRET_KEY = 'secretkey'; + + require('../../../crrExistingObjects'); + + expect(mockFatal).toHaveBeenCalledWith('No buckets given as input! Please provide a comma-separated list of buckets'); + expect(process.exit).toHaveBeenCalledWith(1); + }); + + test('should exit if no endpoint is provided', () => { + process.argv[2] = 'bucket0'; + process.env.ACCESS_KEY = 'accesskey'; + process.env.SECRET_KEY = 'secretkey'; + + require('../../../crrExistingObjects'); + + expect(mockFatal).toHaveBeenCalledWith('ENDPOINT not defined!'); + expect(process.exit).toHaveBeenCalledWith(1); + }); + + test('should exit if no access key is provided', () => { + process.argv[2] = 'bucket0'; + process.env.ENDPOINT = 'http://example.com'; + process.env.SECRET_KEY = 'secretkey'; + + require('../../../crrExistingObjects'); + + expect(mockFatal).toHaveBeenCalledWith('ACCESS_KEY not defined'); + expect(process.exit).toHaveBeenCalledWith(1); + }); + + test('should exit if no secret key is provided', () => { + process.argv[2] = 'bucket0'; + process.env.ENDPOINT = 'http://example.com'; + process.env.ACCESS_KEY = 'accesskey'; + + require('../../../crrExistingObjects'); + + expect(mockFatal).toHaveBeenCalledWith('SECRET_KEY not defined'); + expect(process.exit).toHaveBeenCalledWith(1); + }); +}); diff --git a/tests/utils/crr.js b/tests/utils/crr.js new file mode 100644 index 00000000..6554dbcd --- /dev/null +++ b/tests/utils/crr.js @@ -0,0 +1,195 @@ +const ReplicationStatusUpdater = require('../../CRR/ReplicationStatusUpdater'); + +const listVersionRes = { + IsTruncated: false, + Versions: [ + { + ETag: '"dabcc341ecab339daf766e1cddd5d1bb"', + ChecksumAlgorithm: [], + Size: 3263, + StorageClass: 'STANDARD', + Key: 'key0', + VersionId: 'aJdO148N3LjN00000000001I4j3QKItW', + IsLatest: true, + LastModified: '2024-01-05T13:11:31.861Z', + Owner: { + DisplayName: 'bart', + ID: '0', + }, + }, + ], + DeleteMarkers: [], + Name: 'bucket0', + MaxKeys: 1000, + CommonPrefixes: [], +}; + +const listVersionWithMarkerRes = { + IsTruncated: true, + Versions: [ + { + ETag: '"dabcc341ecab339daf766e1cddd5d1bb"', + ChecksumAlgorithm: [], + Size: 3263, + StorageClass: 'STANDARD', + Key: 'key0', + VersionId: 'aJdO148N3LjN00000000001I4j3QKItW', + IsLatest: true, + LastModified: '2024-01-05T13:11:31.861Z', + Owner: { + DisplayName: 'bart', + ID: '0', + }, + }, + ], + DeleteMarkers: [], + Name: 'bucket0', + MaxKeys: 1, + CommonPrefixes: [], + NextVersionIdMarker: 'aJdO148N3LjN00000000001I4j3QKItW', + NextKeyMarker: 'key0', +}; + +const listVersionsRes = { + IsTruncated: false, + Versions: [ + { + ETag: '"dabcc341ecab339daf766e1cddd5d1bb"', + ChecksumAlgorithm: [], + Size: 3263, + StorageClass: 'STANDARD', + Key: 'key0', + VersionId: 'aJdO148N3LjN00000000001I4j3QKItW', + IsLatest: true, + LastModified: '2024-01-05T13:11:31.861Z', + Owner: { + DisplayName: 'bart', + ID: '0', + }, + }, + { + ETag: '"dabcc341ecab339daf766e1cddd5d1bb"', + ChecksumAlgorithm: [], + Size: 3263, + StorageClass: 'STANDARD', + Key: 'key0', + VersionId: 'aJdO148N3LjN00000000001I4j3QKItV', + IsLatest: true, + LastModified: '2024-01-05T13:11:32.861Z', + Owner: { + DisplayName: 'bart', + ID: '0', + }, + }, + ], + DeleteMarkers: [], + Name: 'bucket0', + MaxKeys: 1000, + CommonPrefixes: [], +}; + +const getBucketReplicationRes = { + ReplicationConfiguration: { + Role: 'arn:aws:iam::root:role/s3-replication-role', + Rules: [ + { + ID: 'r0', + Prefix: '', + Status: 'Enabled', + Destination: { + Bucket: 'arn:aws:s3:::sourcebucket', + StorageClass: 'aws-location', + }, + }, + ], + }, +}; + +const objectMd = { + 'owner-display-name': 'bart', + 'owner-id': 'f2ae8ca93fb44fe7ef409dbfdc0e0873b921fe4183364ede136be8c44756acda', + 'content-length': 3263, + 'content-md5': 'dabcc341ecab339daf766e1cddd5d1bb', + 'x-amz-version-id': 'null', + 'x-amz-server-version-id': '', + 'x-amz-storage-class': 'STANDARD', + 'x-amz-server-side-encryption': '', + 'x-amz-server-side-encryption-aws-kms-key-id': '', + 'x-amz-server-side-encryption-customer-algorithm': '', + 'x-amz-website-redirect-location': '', + 'acl': { + Canned: 'private', FULL_CONTROL: [], WRITE_ACP: [], READ: [], READ_ACP: [], + }, + 'key': '', + 'location': [{ + key: '7751DDF49AA3B289C2D261ED1FD09A596D5D3F20', + size: 3263, + start: 0, + dataStoreName: 'us-east-1', + dataStoreType: 'scality', + dataStoreETag: '1:dabcc341ecab339daf766e1cddd5d1bb', + }], + 'isDeleteMarker': false, + 'tags': {}, + 'replicationInfo': { + status: '', + backends: [], + content: [], + destination: '', + storageClass: '', + role: '', + storageType: '', + dataStoreVersionId: '', + }, + 'dataStoreName': 'us-east-1', + 'originOp': 's3:ObjectCreated:Put', + 'last-modified': '2024-01-05T13:11:31.861Z', + 'md-model-version': 3, + 'versionId': '98295539708053999999RG001 ', +}; + +const getMetadataRes = { + Body: JSON.stringify(objectMd), +}; + +const putMetadataRes = { versionId: '98295539708053999999RG001 ' }; + +/** + * Initializes the ReplicationStatusUpdater class with mock methods for testing. + * + * This function creates an instance of the ReplicationStatusUpdater class using the provided configuration. + * It then replaces certain methods of this instance with Jest mock functions. These mocked methods include + * `listObjectVersions`, `getBucketReplication` for the S3 client, and `getMetadata`, `putMetadata` for the + * Backbeat client. These mock functions are designed to simulate the behavior of the actual AWS S3 and Backbeat + * clients without making real API calls, which is useful for isolated testing of the ReplicationStatusUpdater + * functionality. + * + * @param {Object} config - The configuration object used to initialize the ReplicationStatusUpdater instance. + * @param {Logger} log - The logging object to be used by the ReplicationStatusUpdater. + * @returns {ReplicationStatusUpdater} An instance of ReplicationStatusUpdater with mocked methods. + */ +function initializeCrrWithMocks(config, log) { + const crr = new ReplicationStatusUpdater(config, log); + + const listObjectVersionsMock = jest.fn((params, cb) => cb(null, listVersionRes)); + const getBucketReplicationMock = jest.fn((params, cb) => cb(null, getBucketReplicationRes)); + const getMetadataMock = jest.fn((params, cb) => cb(null, getMetadataRes)); + const putMetadataMock = jest.fn((params, cb) => cb(null, putMetadataRes)); + + crr.s3.listObjectVersions = listObjectVersionsMock; + crr.s3.getBucketReplication = getBucketReplicationMock; + crr.bb.getMetadata = getMetadataMock; + crr.bb.putMetadata = putMetadataMock; + + return crr; +} + +module.exports = { + initializeCrrWithMocks, + listVersionRes, + listVersionsRes, + listVersionWithMarkerRes, + getBucketReplicationRes, + getMetadataRes, + putMetadataRes, +};