From 13f91535673d4884b66b00f72ef49a8ed00e9cbe Mon Sep 17 00:00:00 2001 From: Nicolas Humbert Date: Tue, 26 Dec 2023 10:58:15 +0100 Subject: [PATCH] S3UTILS-54 crrExistingObjects is compatible with both S3C and Artesca The Pull Request introduces these key changes: Switch to S3 and Backbeat API: crrExistingObjects now uses S3 and Backbeat APIs instead of direct metadata calls, enhancing security and maintainability but potentially slowing down performance due to added abstraction. It's expected to be minor, but will undergo testing. Refactoring for testability: The script has been refactored to make it more modular and easier to test, improving code clarity and allowing for better unit/functional testing. Adding unit/functional tests with mocks: Unit/functional tests using mocked S3 and Backbeat APIs have been added, providing a controlled environment for more comprehensive and reliable testing of the script's functionality. --- CRR/ReplicationStatusUpdater.js | 409 +++++++++++++++++++++ CRR/clients.js | 74 ++++ CrrExistingObjects/listingParser.js | 24 -- CrrExistingObjects/metadataClient.js | 20 - CrrExistingObjects/metadataUtils.js | 211 ----------- crrExistingObjects.js | 346 +++-------------- tests/unit/CRR/ReplicationStatusUpdater.js | 278 ++++++++++++++ tests/unit/CRR/crrExistingObjects.js | 187 ++++++++++ tests/utils/crr.js | 195 ++++++++++ 9 files changed, 1189 insertions(+), 555 deletions(-) create mode 100644 CRR/ReplicationStatusUpdater.js create mode 100644 CRR/clients.js delete mode 100644 CrrExistingObjects/listingParser.js delete mode 100644 CrrExistingObjects/metadataClient.js delete mode 100644 CrrExistingObjects/metadataUtils.js create mode 100644 tests/unit/CRR/ReplicationStatusUpdater.js create mode 100644 tests/unit/CRR/crrExistingObjects.js create mode 100644 tests/utils/crr.js diff --git a/CRR/ReplicationStatusUpdater.js b/CRR/ReplicationStatusUpdater.js new file mode 100644 index 00000000..a8c07027 --- /dev/null +++ b/CRR/ReplicationStatusUpdater.js @@ -0,0 +1,409 @@ +const { + doWhilst, eachSeries, eachLimit, waterfall, series, +} = require('async'); +const werelogs = require('werelogs'); +const { ObjectMD } = require('arsenal').models; + +const { setupClients } = require('./clients'); + +const LOG_PROGRESS_INTERVAL_MS = 10000; +const AWS_SDK_REQUEST_RETRIES = 100; +const AWS_SDK_REQUEST_DELAY_MS = 30; + +class ReplicationStatusUpdater { + /** + * @param {Object} params - An object containing the configuration parameters for the instance. + * @param {Array} params.buckets - An array of bucket names to process. + * @param {Array} params.replicationStatusToProcess - Replication status to be processed. + * @param {number} params.workers - Number of worker threads for processing. + * @param {string} params.accessKey - Access key for AWS SDK authentication. + * @param {string} params.secretKey - Secret key for AWS SDK authentication. + * @param {string} params.endpoint - Endpoint URL for the S3 service. + * @param {Object} log - The logging object used for logging purposes within the instance. + * + * @param {string} [params.siteName] - (Optional) Name of the destination site. + * @param {string} [params.storageType] - (Optional) Type of the destination site (aws_s3, azure...). + * @param {string} [params.targetPrefix] - (Optional) Prefix to target for replication. + * @param {number} [params.listingLimit] - (Optional) Limit for listing objects. + * @param {number} [params.maxUpdates] - (Optional) Maximum number of updates to perform. + * @param {number} [params.maxScanned] - (Optional) Maximum number of items to scan. + * @param {string} [params.keyMarker] - (Optional) Key marker for resuming object listing. + * @param {string} [params.versionIdMarker] - (Optional) Version ID marker for resuming object listing. + */ + constructor(params, log) { + const { + buckets, + replicationStatusToProcess, + workers, + accessKey, + secretKey, + endpoint, + siteName, + storageType, + targetPrefix, + listingLimit, + maxUpdates, + maxScanned, + keyMarker, + versionIdMarker, + } = params; + + // inputs + this.buckets = buckets; + this.replicationStatusToProcess = replicationStatusToProcess; + this.workers = workers; + this.accessKey = accessKey; + this.secretKey = secretKey; + this.endpoint = endpoint; + this.siteName = siteName; + this.storageType = storageType; + this.targetPrefix = targetPrefix; + this.listingLimit = listingLimit; + this.maxUpdates = maxUpdates; + this.maxScanned = maxScanned; + this.inputKeyMarker = keyMarker; + this.inputVersionIdMarker = versionIdMarker; + this.log = log; + + this._setupClients(); + + this.logProgressInterval = setInterval(this._logProgress.bind(this), LOG_PROGRESS_INTERVAL_MS); + + // intenal state + this._nProcessed = 0; + this._nSkipped = 0; + this._nUpdated = 0; + this._nErrors = 0; + this._bucketInProgress = null; + this._VersionIdMarker = null; + this._KeyMarker = null; + } + + /** + * Sets up and initializes the S3 and Backbeat client instances. + * + * @returns {void} This method does not return a value; instead, it sets the S3 and Backbeat clients. + */ + _setupClients() { + const { s3, bb } = setupClients({ + accessKey: this.accessKey, + secretKey: this.secretKey, + endpoint: this.endpoint, + }, this.log); + + this.s3 = s3; + this.bb = bb; + } + + /** + * Logs the progress of the CRR process at regular intervals. + * @private + * @returns {void} + */ + _logProgress() { + this.log.info('progress update', { + updated: this._nUpdated, + skipped: this._nSkipped, + errors: this._nErrors, + bucket: this._bucketInProgress || null, + keyMarker: this._KeyMarker || null, + versionIdMarker: this._VersionIdMarker || null, + }); + } + + + /** + * Determines if an object should be updated based on its replication metadata properties. + * @private + * @param {ObjectMD} objMD - The metadata of the object. + * @returns {boolean} True if the object should be updated. + */ + _objectShouldBeUpdated(objMD) { + return this.replicationStatusToProcess.some(filter => { + if (filter === 'NEW') { + return (!objMD.getReplicationInfo() + || objMD.getReplicationInfo().status === ''); + } + return (objMD.getReplicationInfo() + && objMD.getReplicationInfo().status === filter); + }); + } + + /** + * Marks an object as pending for replication. + * @private + * @param {string} bucket - The bucket name. + * @param {string} key - The object key. + * @param {string} versionId - The object version ID. + * @param {string} storageClass - The storage class for replication. + * @param {Object} repConfig - The replication configuration. + * @param {Function} cb - Callback function. + * @returns {void} + */ + _markObjectPending( + bucket, + key, + versionId, + storageClass, + repConfig, + cb, + ) { + let objMD; + let skip = false; + return waterfall([ + // get object blob + next => this.bb.getMetadata({ + Bucket: bucket, + Key: key, + VersionId: versionId, + }, next), + (mdRes, next) => { + // NOTE: The Arsenal Object Metadata schema version 8.1 is being used for both Ring S3C and Artesca, + // it is acceptable because the 8.1 schema only adds extra properties to the 7.10 schema. + // This is beneficial because: + // - Forward compatibility: Having the 8.1 properties in place now ensures that + // S3C is compatible with the 8.1 schema, which could be useful if we plan to upgrade + // from 7.10 to 8.1 in the future. + // - No impact on current functionality: The extra properties from the 8.1 + // schema do not interfere with the current functionalities of the 7.10 environment, + // so there is no harm in keeping them. S3C should ignore them without causing any issues. + // - Simple codebase: Not having to remove these properties simplifies the codebase of s3utils. + // Less complexity and potential errors linked with conditionally removing metadata properties + // based on the version. + // - Single schema approach: Keeping a single, unified schema approach in s3utils can make the + // codebase easier to maintain and upgrade, as opposed to having multiple branches or versions of + // the code for different schema versions. + objMD = new ObjectMD(JSON.parse(mdRes.Body)); + if (!this._objectShouldBeUpdated(objMD)) { + skip = true; + return process.nextTick(next); + } + // Initialize replication info, if missing + // This is particularly important if the object was created before + // enabling replication on the bucket. + if (!objMD.getReplicationInfo() + || !objMD.getReplicationSiteStatus(storageClass)) { + const { Rules, Role } = repConfig; + const destination = Rules[0].Destination.Bucket; + // set replication properties + const ops = objMD.getContentLength() === 0 ? ['METADATA'] + : ['METADATA', 'DATA']; + const backends = [{ + site: storageClass, + status: 'PENDING', + dataStoreVersionId: '', + }]; + const replicationInfo = { + status: 'PENDING', + backends, + content: ops, + destination, + storageClass, + role: Role, + storageType: this.storageType, + }; + objMD.setReplicationInfo(replicationInfo); + } + + objMD.setReplicationSiteStatus(storageClass, 'PENDING'); + objMD.setReplicationStatus('PENDING'); + objMD.updateMicroVersionId(); + const md = objMD.getSerialized(); + return this.bb.putMetadata({ + Bucket: bucket, + Key: key, + VersionId: versionId, + ContentLength: Buffer.byteLength(md), + Body: md, + }, next); + }, + ], err => { + ++this._nProcessed; + if (err) { + ++this._nErrors; + this.log.error('error updating object', { + bucket, key, versionId, error: err.message, + }); + cb(); + return; + } + if (skip) { + ++this._nSkipped; + } else { + ++this._nUpdated; + } + cb(); + }); + } + + /** + * Lists object versions for a bucket. + * @private + * @param {string} bucket - The bucket name. + * @param {string} VersionIdMarker - The version ID marker for pagination. + * @param {string} KeyMarker - The key marker for pagination. + * @param {Function} cb - Callback function. + * @returns {void} + */ + _listObjectVersions(bucket, VersionIdMarker, KeyMarker, cb) { + return this.s3.listObjectVersions({ + Bucket: bucket, + MaxKeys: this.listingLimit, + Prefix: this.targetPrefix, + VersionIdMarker, + KeyMarker, + }, cb); + } + + /** + * Marks pending replication for listed object versions. + * @private + * @param {string} bucket - The bucket name. + * @param {Array} versions - Array of object versions. + * @param {Function} cb - Callback function. + * @returns {void} + */ + _markPending(bucket, versions, cb) { + const options = { Bucket: bucket }; + waterfall([ + next => this.s3.getBucketReplication(options, (err, res) => { + if (err) { + this.log.error('error getting bucket replication', { error: err }); + return next(err); + } + return next(null, res.ReplicationConfiguration); + }), + (repConfig, next) => { + const { Rules } = repConfig; + const storageClass = Rules[0].Destination.StorageClass || this.siteName; + if (!storageClass) { + const errMsg = 'missing SITE_NAME environment variable, must be set to' + + ' the value of "site" property in the CRR configuration'; + this.log.error(errMsg); + return next(new Error(errMsg)); + } + return eachLimit(versions, this.workers, (i, apply) => { + const { Key, VersionId } = i; + this._markObjectPending(bucket, Key, VersionId, storageClass, repConfig, apply); + }, next); + }, + ], cb); + } + + /** + * Triggers CRR process on a specific bucket. + * @private + * @param {string} bucketName - The name of the bucket. + * @param {Function} cb - Callback function. + * @returns {void} + */ + _triggerCRROnBucket(bucketName, cb) { + const bucket = bucketName.trim(); + this._bucketInProgress = bucket; + this.log.info(`starting task for bucket: ${bucket}`); + if (this.inputKeyMarker || this.inputVersionIdMarker) { + // resume from where we left off in previous script launch + this._KeyMarker = this.inputKeyMarker; + this._VersionIdMarker = this.inputVersionIdMarker; + this.inputKeyMarker = undefined; + this.inputVersionIdMarker = undefined; + this.log.info(`resuming bucket: ${bucket} at: KeyMarker=${this._KeyMarker} ` + + `VersionIdMarker=${this._VersionIdMarker}`); + } + return doWhilst( + done => this._listObjectVersions( + bucket, + this._VersionIdMarker, + this._KeyMarker, + (err, data) => { + if (err) { + this.log.error('error listing object versions', { error: err }); + return done(err); + } + return this._markPending(bucket, data.Versions.concat(data.DeleteMarkers), err => { + if (err) { + return done(err); + } + this._VersionIdMarker = data.NextVersionIdMarker; + this._KeyMarker = data.NextKeyMarker; + return done(); + }); + }, + ), + () => { + if (this._nUpdated >= this.maxUpdates || this._nProcessed >= this.maxScanned) { + this._logProgress(); + let remainingBuckets; + if (this._VersionIdMarker || this._KeyMarker) { + // next bucket to process is still the current one + remainingBuckets = this.buckets.slice( + this.buckets.findIndex(bucket => bucket === bucketName), + ); + } else { + // next bucket to process is the next in bucket list + remainingBuckets = this.buckets.slice( + this.buckets.findIndex(bucket => bucket === bucketName) + 1, + ); + } + let message = 'reached ' + + `${this._nUpdated >= this.maxUpdates ? 'update' : 'scanned'} ` + + 'count limit, resuming from this ' + + 'point can be achieved by re-running the script with ' + + `the bucket list "${remainingBuckets.join(',')}"`; + if (this._VersionIdMarker || this._KeyMarker) { + message += ' and the following environment variables set: ' + + `KEY_MARKER=${this._KeyMarker} ` + + `VERSION_ID_MARKER=${this._VersionIdMarker}`; + } + this.log.info(message); + return false; + } + if (this._VersionIdMarker || this._KeyMarker) { + return true; + } + return false; + }, + err => { + this._bucketInProgress = null; + if (err) { + this.log.error('error marking objects for crr', { bucket }); + cb(err); + return; + } + this._logProgress(); + this.log.info(`completed task for bucket: ${bucket}`); + cb(); + }, + ); + } + + /** + * Runs the CRR process on all configured buckets. + * @param {Function} cb - Callback function. + * @returns {void} + */ + run(cb) { + return eachSeries(this.buckets, this._triggerCRROnBucket.bind(this), err => { + clearInterval(this.logProgressInterval); + if (err) { + cb(err); + return; + } + cb(); + }); + } + + /** + * Stops the execution of the CRR process. + * NOTE: This method terminates the node.js process, and hence it does not return a value. + * @returns {void} + */ + stop() { + this.log.warn('stopping execution'); + this._logProgress(); + clearInterval(this.logProgressInterval); + process.exit(1); + } +} + +module.exports = ReplicationStatusUpdater; diff --git a/CRR/clients.js b/CRR/clients.js new file mode 100644 index 00000000..988de987 --- /dev/null +++ b/CRR/clients.js @@ -0,0 +1,74 @@ +const AWS = require('aws-sdk'); +const http = require('http'); +const BackbeatClient = require('../BackbeatClient'); + +const AWS_SDK_REQUEST_DELAY_MS = 30; +const AWS_SDK_REQUEST_RETRIES = 100; +const LOG_PROGRESS_INTERVAL_MS = 10000; + +/** + * Sets up and configures AWS S3 and Backbeat clients. + * + * This function initializes and configures clients for S3 and Backbeat services, + * using provided access credentials and endpoint configurations. It includes a custom + * backoff strategy for AWS SDK requests to handle retries in case of errors. + * The clients are set with specific options, such as maximum retries and custom + * backoff strategies for S3 requests. + * + * @param {Object} config - The configuration object for the clients. + * @param {string} config.accessKey - The access key for AWS services. + * @param {string} config.secretKey - The secret key for AWS services. + * @param {string} config.endpoint - The endpoint URL for the AWS services. + * @param {Function} log - The logging function for error logging. + * @returns {Object} An object containing initialized S3 and Backbeat clients. + */ +function setupClients({ + accessKey, + secretKey, + endpoint, +}, log) { + const awsConfig = { + accessKeyId: accessKey, + secretAccessKey: secretKey, + endpoint, + region: 'us-east-1', + sslEnabled: false, + s3ForcePathStyle: true, + apiVersions: { s3: '2006-03-01' }, + signatureVersion: 'v4', + signatureCache: false, + httpOptions: { + timeout: 0, + agent: new http.Agent({ keepAlive: true }), + }, + }; + + /** + * Custom backoff strategy for AWS SDK requests. + * @param {number} retryCount - The current retry attempt. + * @param {Error} error - The error that caused the retry. + * @returns {number} The delay in milliseconds before the next retry. + */ + function customBackoffStrategy(retryCount, error) { + this.log.error('aws sdk request error', { error, retryCount }); + // The delay is not truly exponential; it resets to the minimum after every 10 calls, + // with a maximum delay of 15 seconds. + return AWS_SDK_REQUEST_DELAY_MS * (2 ** (retryCount % 10)); + } + + // Specific options for S3 requests + const s3SpecificOptions = { + maxRetries: AWS_SDK_REQUEST_RETRIES, + customBackoff: customBackoffStrategy, + }; + + // Create an S3 client instance + const s3 = new AWS.S3({ ...awsConfig, ...s3SpecificOptions }); + + // Create a BackbeatClient instance + const bb = new BackbeatClient(awsConfig); + + return { s3, bb }; +} + +module.exports = { setupClients }; diff --git a/CrrExistingObjects/listingParser.js b/CrrExistingObjects/listingParser.js deleted file mode 100644 index 5d47b9bd..00000000 --- a/CrrExistingObjects/listingParser.js +++ /dev/null @@ -1,24 +0,0 @@ -function listingParser(entries) { - if (!entries) { - return entries; - } - return entries.map(entry => { - const tmp = JSON.parse(entry.value); - return { - Key: entry.key, - Size: tmp['content-length'], - ETag: tmp['content-md5'], - VersionId: tmp.versionId, - IsNull: tmp.isNull, - IsDeleteMarker: tmp.isDeleteMarker, - LastModified: tmp['last-modified'], - Owner: { - DisplayName: tmp['owner-display-name'], - ID: tmp['owner-id'], - }, - StorageClass: tmp['x-amz-storage-class'], - }; - }); -} - -module.exports = listingParser; diff --git a/CrrExistingObjects/metadataClient.js b/CrrExistingObjects/metadataClient.js deleted file mode 100644 index 58d20c86..00000000 --- a/CrrExistingObjects/metadataClient.js +++ /dev/null @@ -1,20 +0,0 @@ -const { MetadataWrapper } = require('arsenal').storage.metadata; -const werelogs = require('werelogs'); -const createMongoParams = require('../utils/createMongoParams'); -const listingParser = require('./listingParser'); - -const loggerConfig = { - level: 'info', - dump: 'error', -}; -werelogs.configure(loggerConfig); - -const log = new werelogs.Logger('s3utils::crrExistingObjects'); -const implName = 'mongodb'; -const params = { - customListingParser: listingParser, - mongodb: createMongoParams(log, { readPreference: 'primary' }), -}; -const metadata = new MetadataWrapper(implName, params, null, log); - -module.exports = metadata; diff --git a/CrrExistingObjects/metadataUtils.js b/CrrExistingObjects/metadataUtils.js deleted file mode 100644 index 8c197baa..00000000 --- a/CrrExistingObjects/metadataUtils.js +++ /dev/null @@ -1,211 +0,0 @@ -const { errors, versioning } = require('arsenal'); -const metadataClient = require('./metadataClient'); - -const versionIdUtils = versioning.VersionID; - -const { GENERATE_INTERNAL_VERSION_ID } = process.env; -const REPLICATION_GROUP_ID = process.env.REPLICATION_GROUP_ID || 'RG001'; -// Use Arsenal function to generate a version ID used internally by metadata -// for null versions that are created before bucket versioning is configured -const nonVersionedObjId = versionIdUtils.getInfVid(REPLICATION_GROUP_ID); - -function _processVersions(list) { - /* eslint-disable no-param-reassign */ - list.NextVersionIdMarker = list.NextVersionIdMarker - ? versionIdUtils.encode(list.NextVersionIdMarker) - : list.NextVersionIdMarker; - - list.Versions.forEach(v => { - v.VersionId = v.VersionId - ? versionIdUtils.encode(v.VersionId) : v.VersionId; - }); - /* eslint-enable no-param-reassign */ - return list; -} - -function listObjectVersions(params, log, cb) { - const bucketName = params.Bucket; - const listingParams = { - listingType: 'DelimiterVersions', - maxKeys: params.MaxKeys, - prefix: params.Prefix, - keyMarker: params.KeyMarker, - versionIdMarker: params.VersionIdMarker, - }; - log.debug('listing object versions', { - method: 'metadataUtils.listObjectVersions', - listingParams, - }); - return metadataClient.listObject( - bucketName, - listingParams, - log, - (err, list) => { - if (err) { - return cb(err); - } - return cb(null, _processVersions(list)); - }, - ); -} - -function _formatConfig(config) { - const { role, destination, rules } = config; - const Rules = rules.map(rule => { - const { - prefix, enabled, storageClass, id, - } = rule; - return { - ID: id, - Prefix: prefix, - Status: enabled ? 'Enabled' : 'Disabled', - Destination: { - Bucket: destination, - StorageClass: (storageClass || ''), - }, - }; - }); - return { - ReplicationConfiguration: { - Role: role, - Rules, - }, - }; -} - -function getBucketReplication(options, log, cb) { - const bucketName = options.Bucket; - log.debug('getting bucket replication', { - method: 'metadataUtils.getBucketReplication', - bucket: bucketName, - }); - return metadataClient.getBucket(bucketName, log, (err, data) => { - if (err) { - return cb(err); - } - const replConf = _formatConfig(data._replicationConfiguration); - return cb(null, replConf); - }); -} - -function _getNullVersion(objMD, bucketName, objectKey, log, cb) { - const options = {}; - if (objMD.isNull || !objMD.versionId) { - log.debug('found null version'); - return process.nextTick(() => cb(null, objMD)); - } - if (objMD.nullVersionId) { - log.debug('null version exists, get the null version'); - options.versionId = objMD.nullVersionId; - return metadataClient.getObjectMD( - bucketName, - objectKey, - options, - log, - cb, - ); - } - return process.nextTick(() => cb()); -} - -function getMetadata(params, log, cb) { - const { Bucket, Key } = params; - let versionId = params.VersionId; - log.debug('getting object metadata', { - method: 'metadataUtils.getMetadata', - bucket: Bucket, - objectKey: Key, - versionId, - }); - if (versionId && versionId !== 'null') { - versionId = versionIdUtils.decode(versionId); - } - if (versionId instanceof Error) { - const errMsg = 'Invalid version id specified'; - return cb(errors.InvalidArgument.customizeDescription(errMsg)); - } - const mdParams = { - versionId, - }; - return metadataClient.getObjectMD( - Bucket, - Key, - mdParams, - log, - (err, data) => { - if (err) { - return cb(err); - } - if (data && versionId === 'null') { - return _getNullVersion( - data, - Bucket, - Key, - log, - (err, nullVer) => { - if (err) { - return cb(err); - } - return cb(null, nullVer); - }, - ); - } - return cb(null, data); - }, - ); -} - -function getOptions(objMD) { - const options = {}; - - if (objMD.versionId === undefined) { - if (!GENERATE_INTERNAL_VERSION_ID) { - return options; - } - - objMD.setIsNull(true); - objMD.setVersionId(nonVersionedObjId); - - options.nullVersionId = objMD.versionId; - // non-versioned (non-null) MPU objects don't have a - // replay ID, so don't reference their uploadId - if (objMD.uploadId) { - options.nullUploadId = objMD.uploadId; - } - } - - // specify both 'versioning' and 'versionId' to create a "new" - // version (updating master as well) but with specified versionId - options.versioning = true; - options.versionId = objMD.versionId; - return options; -} - -function putMetadata(params, log, cb) { - const { Bucket, Key, Body: objMD } = params; - const options = getOptions(objMD); - - log.debug('updating object metadata', { - method: 'metadataUtils.putMetadata', - bucket: Bucket, - objectKey: Key, - versionId: objMD.versionId, - }); - // If the object is from a source bucket without versioning (i.e. NFS), - // then we want to create a version for the replica object even though - // none was provided in the object metadata value. - if (objMD.replicationInfo.isNFS) { - const isReplica = objMD.replicationInfo.status === 'REPLICA'; - options.versioning = isReplica; - objMD.replicationInfo.isNFS = !isReplica; - } - return metadataClient.putObjectMD(Bucket, Key, objMD, options, log, cb); -} - -module.exports = { - metadataClient, - listObjectVersions, - getBucketReplication, - getMetadata, - putMetadata, -}; diff --git a/crrExistingObjects.js b/crrExistingObjects.js index 06312574..21f07500 100644 --- a/crrExistingObjects.js +++ b/crrExistingObjects.js @@ -1,9 +1,5 @@ -const { - doWhilst, eachSeries, eachLimit, waterfall, series, -} = require('async'); const werelogs = require('werelogs'); -const { ObjectMD } = require('arsenal').models; -const metadataUtil = require('./CrrExistingObjects/metadataUtils'); +const ReplicationStatusUpdater = require('./CRR/ReplicationStatusUpdater'); const logLevel = Number.parseInt(process.env.DEBUG, 10) === 1 ? 'debug' : 'info'; @@ -25,18 +21,33 @@ const MAX_UPDATES = (process.env.MAX_UPDATES && Number.parseInt(process.env.MAX_UPDATES, 10)); const MAX_SCANNED = (process.env.MAX_SCANNED && Number.parseInt(process.env.MAX_SCANNED, 10)); -let { KEY_MARKER } = process.env; -let { VERSION_ID_MARKER } = process.env; -const { GENERATE_INTERNAL_VERSION_ID } = process.env; +const { KEY_MARKER } = process.env; +const { VERSION_ID_MARKER } = process.env; + +const { + ACCESS_KEY, + SECRET_KEY, + ENDPOINT, +} = process.env; const LISTING_LIMIT = (process.env.LISTING_LIMIT && Number.parseInt(process.env.LISTING_LIMIT, 10)) || 1000; -const LOG_PROGRESS_INTERVAL_MS = 10000; - if (!BUCKETS || BUCKETS.length === 0) { log.fatal('No buckets given as input! Please provide ' - + 'a comma-separated list of buckets'); + + 'a comma-separated list of buckets'); + process.exit(1); +} +if (!ENDPOINT) { + log.fatal('ENDPOINT not defined!'); + process.exit(1); +} +if (!ACCESS_KEY) { + log.fatal('ACCESS_KEY not defined'); + process.exit(1); +} +if (!SECRET_KEY) { + log.fatal('SECRET_KEY not defined'); process.exit(1); } if (!STORAGE_TYPE) { @@ -51,7 +62,7 @@ replicationStatusToProcess.forEach(state => { if (!['NEW', 'PENDING', 'COMPLETED', 'FAILED', 'REPLICA'].includes(state)) { log.fatal('invalid TARGET_REPLICATION_STATUS environment: must be a ' + 'comma-separated list of replication statuses to requeue, ' - + 'as NEW, PENDING, COMPLETED, FAILED or REPLICA.'); + + 'as NEW,PENDING,COMPLETED,FAILED,REPLICA.'); process.exit(1); } }); @@ -59,297 +70,32 @@ log.info('Objects with replication status ' + `${replicationStatusToProcess.join(' or ')} ` + 'will be reset to PENDING to trigger CRR'); -let nProcessed = 0; -let nSkipped = 0; -let nUpdated = 0; -let nErrors = 0; -let bucketInProgress = null; -let VersionIdMarker = null; -let KeyMarker = null; - -function _logProgress() { - log.info('progress update', { - updated: nUpdated, - skipped: nSkipped, - errors: nErrors, - bucket: bucketInProgress || null, - keyMarker: KeyMarker || null, - versionIdMarker: VersionIdMarker || null, - }); -} - -const logProgressInterval = setInterval(_logProgress, LOG_PROGRESS_INTERVAL_MS); - -function _objectShouldBeUpdated(objMD) { - return replicationStatusToProcess.some(filter => { - if (filter === 'NEW') { - return (!objMD.getReplicationInfo() - || objMD.getReplicationInfo().status === ''); - } - return (objMD.getReplicationInfo() - && objMD.getReplicationInfo().status === filter); - }); -} - -function _markObjectPending( - bucket, - key, - versionId, - storageClass, - repConfig, - cb, -) { - let objMD; - let skip = false; - return waterfall([ - // get object blob - next => metadataUtil.getMetadata({ - Bucket: bucket, - Key: key, - VersionId: versionId, - }, log, next), - (mdRes, next) => { - objMD = new ObjectMD(mdRes); - const md = objMD.getValue(); - if (!_objectShouldBeUpdated(objMD)) { - skip = true; - return next(); - } - if (objMD.getVersionId()) { - // The object already has an *internal* versionId, - // which exists when the object has been put on - // versioned or versioning-suspended bucket. Even if - // the listed version is "null", the object may have - // an actual internal versionId, only if the bucket - // was versioning-suspended when the object was put. - return next(); - } - if (!GENERATE_INTERNAL_VERSION_ID) { - // When the GENERATE_INTERNAL_VERSION_ID env variable is set, - // matching objects with no *internal* versionId will get - // "updated" to get an internal versionId. The external versionId - // will still be "null". - return next(); - } - // The object does not have an *internal* versionId, as it - // was put on a nonversioned bucket: do a first metadata - // update to generate one, just passing on the existing metadata - // blob. Note that the resulting key will still be nonversioned, - // but the following update will be able to create a versioned key - // for this object, so that replication can happen. The externally - // visible version will stay "null". - return metadataUtil.putMetadata({ - Bucket: bucket, - Key: key, - Body: md, - }, log, (err, putRes) => { - if (err) { - return next(err); - } - // No need to fetch the whole metadata again, simply - // update the one we have with the generated versionId. - objMD.setVersionId(putRes.versionId); - return next(); - }); - }, - // update replication info and put back object blob - next => { - if (skip) { - return next(); - } - - // Initialize replication info, if missing - if (!objMD.getReplicationInfo() - || !objMD.getReplicationSiteStatus(storageClass)) { - const { Rules, Role } = repConfig; - const destination = Rules[0].Destination.Bucket; - // set replication properties - const ops = objMD.getContentLength() === 0 ? ['METADATA'] - : ['METADATA', 'DATA']; - const backends = [{ - site: storageClass, - status: 'PENDING', - dataStoreVersionId: '', - }]; - const replicationInfo = { - status: 'PENDING', - backends, - content: ops, - destination, - storageClass, - role: Role, - storageType: STORAGE_TYPE, - }; - objMD.setReplicationInfo(replicationInfo); - } - - objMD.setReplicationSiteStatus(storageClass, 'PENDING'); - objMD.setReplicationStatus('PENDING'); - objMD.updateMicroVersionId(); - const md = objMD.getValue(); - return metadataUtil.putMetadata({ - Bucket: bucket, - Key: key, - Body: md, - }, log, next); - }, - ], err => { - ++nProcessed; - if (err) { - ++nErrors; - log.error('error updating object', { - bucket, key, versionId, error: err.message, - }); - return cb(); - } - if (skip) { - ++nSkipped; - } else { - ++nUpdated; - } - return cb(); - }); -} - -// list object versions -function _listObjectVersions(bucket, VersionIdMarker, KeyMarker, cb) { - return metadataUtil.listObjectVersions({ - Bucket: bucket, - MaxKeys: LISTING_LIMIT, - Prefix: TARGET_PREFIX, - VersionIdMarker, - KeyMarker, - }, log, cb); -} - -function _markPending(bucket, versions, cb) { - const options = { Bucket: bucket }; - waterfall([ - next => metadataUtil.getBucketReplication(options, log, (err, res) => { - if (err) { - log.error('error getting bucket replication', { error: err }); - return next(err); - } - return next(null, res.ReplicationConfiguration); - }), - (repConfig, next) => { - const { Rules } = repConfig; - const storageClass = Rules[0].Destination.StorageClass || SITE_NAME; - if (!storageClass) { - const errMsg = 'missing SITE_NAME environment variable, must be set to' - + ' the value of "site" property in the CRR configuration'; - log.error(errMsg); - return next(new Error(errMsg)); - } - return eachLimit(versions, WORKERS, (i, apply) => { - const { Key, VersionId } = i; - _markObjectPending(bucket, Key, VersionId, storageClass, repConfig, apply); - }, next); - }, - ], cb); -} - -function triggerCRROnBucket(bucketName, cb) { - const bucket = bucketName.trim(); - bucketInProgress = bucket; - log.info(`starting task for bucket: ${bucket}`); - if (KEY_MARKER || VERSION_ID_MARKER) { - // resume from where we left off in previous script launch - KeyMarker = KEY_MARKER; - VersionIdMarker = VERSION_ID_MARKER; - KEY_MARKER = undefined; - VERSION_ID_MARKER = undefined; - log.info(`resuming at: KeyMarker=${KeyMarker} ` - + `VersionIdMarker=${VersionIdMarker}`); - } - doWhilst( - done => _listObjectVersions( - bucket, - VersionIdMarker, - KeyMarker, - (err, data) => { - if (err) { - log.error('error listing object versions', { error: err }); - return done(err); - } - const versions = data.DeleteMarkers - ? data.Versions.concat(data.DeleteMarkers) : data.Versions; - return _markPending(bucket, versions, err => { - if (err) { - return done(err); - } - VersionIdMarker = data.NextVersionIdMarker; - KeyMarker = data.NextKeyMarker; - return done(); - }); - }, - ), - () => { - if (nUpdated >= MAX_UPDATES || nProcessed >= MAX_SCANNED) { - _logProgress(); - let remainingBuckets; - if (VersionIdMarker || KeyMarker) { - // next bucket to process is still the current one - remainingBuckets = BUCKETS.slice( - BUCKETS.findIndex(bucket => bucket === bucketName), - ); - } else { - // next bucket to process is the next in bucket list - remainingBuckets = BUCKETS.slice( - BUCKETS.findIndex(bucket => bucket === bucketName) + 1, - ); - } - let message = 'reached ' - + `${nUpdated >= MAX_UPDATES ? 'update' : 'scanned'} ` - + 'count limit, resuming from this ' - + 'point can be achieved by re-running the script with ' - + `the bucket list "${remainingBuckets.join(',')}"`; - if (VersionIdMarker || KeyMarker) { - message += ' and the following environment variables set: ' - + `KEY_MARKER=${KeyMarker} ` - + `VERSION_ID_MARKER=${VersionIdMarker}`; - } - log.info(message); - process.exit(0); - } - if (VersionIdMarker || KeyMarker) { - return true; - } - return false; - }, - err => { - bucketInProgress = null; - if (err) { - log.error('error marking objects for crr', { bucket }); - return cb(err); - } - _logProgress(); - log.info(`completed task for bucket: ${bucket}`); - return cb(); - }, - ); -} - -// trigger the calls to list objects and mark them for crr -series([ - next => metadataUtil.metadataClient.setup(next), - next => eachSeries(BUCKETS, triggerCRROnBucket, next), - next => metadataUtil.metadataClient.close(next), -], err => { - clearInterval(logProgressInterval); +const replicationStatusUpdater = new ReplicationStatusUpdater({ + buckets: BUCKETS, + replicationStatusToProcess, + workers: WORKERS, + accessKey: ACCESS_KEY, + secretKey: SECRET_KEY, + endpoint: ENDPOINT, + siteName: SITE_NAME, + storageType: STORAGE_TYPE, + targetPrefix: TARGET_PREFIX, + listingLimit: LISTING_LIMIT, + maxUpdates: MAX_UPDATES, + maxScanned: MAX_SCANNED, + keyMarker: KEY_MARKER, + versionIdMarker: VERSION_ID_MARKER, +}, log); + +replicationStatusUpdater.run(err => { if (err) { return log.error('error during task execution', { error: err }); } return log.info('completed task for all buckets'); }); -function stop() { - log.warn('stopping execution'); - _logProgress(); - process.exit(1); -} - -process.on('SIGINT', stop); -process.on('SIGHUP', stop); -process.on('SIGQUIT', stop); -process.on('SIGTERM', stop); +const stopCrr = replicationStatusUpdater.stop; +process.on('SIGINT', stopCrr); +process.on('SIGHUP', stopCrr); +process.on('SIGQUIT', stopCrr); +process.on('SIGTERM', stopCrr); diff --git a/tests/unit/CRR/ReplicationStatusUpdater.js b/tests/unit/CRR/ReplicationStatusUpdater.js new file mode 100644 index 00000000..8f83c9e5 --- /dev/null +++ b/tests/unit/CRR/ReplicationStatusUpdater.js @@ -0,0 +1,278 @@ +const AWS = require('aws-sdk'); +const werelogs = require('werelogs'); +const assert = require('assert'); + +const BackbeatClient = require('../../../BackbeatClient'); +const ReplicationStatusUpdater = require('../../../CRR/ReplicationStatusUpdater'); +const { + initializeCrrWithMocks, + listVersionRes, + listVersionsRes, + listVersionWithMarkerRes, + getBucketReplicationRes, + getMetadataRes, + putMetadataRes, +} = require('../../utils/crr'); + +const logger = new werelogs.Logger('ReplicationStatusUpdater::tests', 'debug', 'debug'); + +describe('ReplicationStatusUpdater', () => { + let crr; + + beforeEach(() => { + crr = initializeCrrWithMocks({ + buckets: ['bucket0'], + workers: 10, + replicationStatusToProcess: ['NEW'], + targetPrefix: 'toto', + listingLimit: 10, + }, logger); + }); + + it('should process bucket for CRR', done => { + crr.run(err => { + assert.ifError(err); + + expect(crr.s3.listObjectVersions).toHaveBeenCalledTimes(1); + expect(crr.s3.listObjectVersions).toHaveBeenCalledWith({ + Bucket: 'bucket0', + KeyMarker: null, + MaxKeys: 10, + Prefix: 'toto', + VersionIdMarker: null, + }, expect.any(Function)); + + expect(crr.s3.getBucketReplication).toHaveBeenCalledTimes(1); + expect(crr.s3.getBucketReplication).toHaveBeenCalledWith({ + Bucket: 'bucket0', + }, expect.any(Function)); + + expect(crr.bb.getMetadata).toHaveBeenCalledTimes(1); + expect(crr.bb.getMetadata).toHaveBeenCalledWith({ + Bucket: 'bucket0', + Key: listVersionRes.Versions[0].Key, + VersionId: listVersionRes.Versions[0].VersionId, + }, expect.any(Function)); + + expect(crr.bb.putMetadata).toHaveBeenCalledTimes(1); + const expectedReplicationInfo = { + status: 'PENDING', + backends: [ + { + site: 'aws-location', + status: 'PENDING', + dataStoreVersionId: '', + }, + ], + content: ['METADATA', 'DATA'], + destination: 'arn:aws:s3:::sourcebucket', + storageClass: 'aws-location', + role: 'arn:aws:iam::root:role/s3-replication-role', + storageType: '', + dataStoreVersionId: '', + isNFS: null, + }; + expect(crr.bb.putMetadata).toHaveBeenCalledWith( + expect.objectContaining({ + Body: expect.stringContaining(JSON.stringify(expectedReplicationInfo)), + }), + expect.any(Function), + ); + + assert.strictEqual(crr._nProcessed, 1); + assert.strictEqual(crr._nSkipped, 0); + assert.strictEqual(crr._nUpdated, 1); + assert.strictEqual(crr._nErrors, 0); + return done(); + }); + }); + + it('should process bucket for CRR with multiple objects', done => { + crr.s3.listObjectVersions = jest.fn((params, cb) => cb(null, listVersionsRes)); + + crr.run(err => { + assert.ifError(err); + + expect(crr.s3.listObjectVersions).toHaveBeenCalledTimes(1); + expect(crr.s3.listObjectVersions).toHaveBeenCalledWith({ + Bucket: 'bucket0', + KeyMarker: null, + MaxKeys: 10, + Prefix: 'toto', + VersionIdMarker: null, + }, expect.any(Function)); + + expect(crr.s3.getBucketReplication).toHaveBeenCalledTimes(1); + expect(crr.s3.getBucketReplication).toHaveBeenCalledWith({ + Bucket: 'bucket0', + }, expect.any(Function)); + + expect(crr.bb.getMetadata).toHaveBeenCalledTimes(2); + expect(crr.bb.getMetadata).toHaveBeenNthCalledWith(1, { + Bucket: 'bucket0', + Key: listVersionsRes.Versions[0].Key, + VersionId: listVersionsRes.Versions[0].VersionId, + }, expect.any(Function)); + + expect(crr.bb.getMetadata).toHaveBeenNthCalledWith(2, { + Bucket: 'bucket0', + Key: listVersionsRes.Versions[1].Key, + VersionId: listVersionsRes.Versions[1].VersionId, + }, expect.any(Function)); + + expect(crr.bb.putMetadata).toHaveBeenCalledTimes(2); + + assert.strictEqual(crr._nProcessed, 2); + assert.strictEqual(crr._nSkipped, 0); + assert.strictEqual(crr._nUpdated, 2); + assert.strictEqual(crr._nErrors, 0); + return done(); + }); + }); +}); + +describe('ReplicationStatusUpdater with specifics', () => { + it('maxUpdates set to 1', done => { + const crr = initializeCrrWithMocks({ + buckets: ['bucket0'], + workers: 10, + replicationStatusToProcess: ['NEW'], + maxUpdates: 1, + }, logger); + + crr.s3.listObjectVersions = jest.fn((params, cb) => cb(null, listVersionWithMarkerRes)); + + crr.run(err => { + assert.ifError(err); + + expect(crr.s3.listObjectVersions).toHaveBeenCalledTimes(1); + expect(crr.s3.getBucketReplication).toHaveBeenCalledTimes(1); + expect(crr.bb.getMetadata).toHaveBeenCalledTimes(1); + expect(crr.bb.putMetadata).toHaveBeenCalledTimes(1); + + assert.strictEqual(crr._nProcessed, 1); + assert.strictEqual(crr._nSkipped, 0); + assert.strictEqual(crr._nUpdated, 1); + assert.strictEqual(crr._nErrors, 0); + done(); + }); + }); + + it('maxUpdates set to 2', done => { + const crr = initializeCrrWithMocks({ + buckets: ['bucket0'], + workers: 10, + replicationStatusToProcess: ['NEW'], + maxUpdates: 2, + }, logger); + + crr.s3.listObjectVersions = jest.fn((params, cb) => cb(null, listVersionWithMarkerRes)); + + crr.run(err => { + assert.ifError(err); + + expect(crr.s3.listObjectVersions).toHaveBeenCalledTimes(2); + + expect(crr.s3.listObjectVersions).toHaveBeenNthCalledWith(1, { + Bucket: 'bucket0', + Prefix: undefined, + MaxKeys: undefined, + KeyMarker: null, + VersionIdMarker: null, + }, expect.any(Function)); + + expect(crr.s3.listObjectVersions).toHaveBeenNthCalledWith(2, { + Bucket: 'bucket0', + Prefix: undefined, + MaxKeys: undefined, + KeyMarker: 'key0', + VersionIdMarker: 'aJdO148N3LjN00000000001I4j3QKItW', + }, expect.any(Function)); + + expect(crr.s3.getBucketReplication).toHaveBeenCalledTimes(2); + expect(crr.bb.getMetadata).toHaveBeenCalledTimes(2); + expect(crr.bb.putMetadata).toHaveBeenCalledTimes(2); + + assert.strictEqual(crr._nProcessed, 2); + assert.strictEqual(crr._nSkipped, 0); + assert.strictEqual(crr._nUpdated, 2); + assert.strictEqual(crr._nErrors, 0); + done(); + }); + }); + + it('maxScanned set to 1', done => { + const crr = initializeCrrWithMocks({ + buckets: ['bucket0'], + workers: 10, + replicationStatusToProcess: ['NEW'], + maxScanned: 1, + }, logger); + + crr.s3.listObjectVersions = jest.fn((params, cb) => cb(null, listVersionWithMarkerRes)); + + crr.run(err => { + assert.ifError(err); + + expect(crr.s3.listObjectVersions).toHaveBeenCalledTimes(1); + expect(crr.s3.getBucketReplication).toHaveBeenCalledTimes(1); + expect(crr.bb.getMetadata).toHaveBeenCalledTimes(1); + expect(crr.bb.putMetadata).toHaveBeenCalledTimes(1); + + assert.strictEqual(crr._nProcessed, 1); + assert.strictEqual(crr._nSkipped, 0); + assert.strictEqual(crr._nUpdated, 1); + assert.strictEqual(crr._nErrors, 0); + done(); + }); + }); + + it('set inputKeyMarker', done => { + const crr = initializeCrrWithMocks({ + buckets: ['bucket0'], + workers: 10, + replicationStatusToProcess: ['NEW'], + keyMarker: 'key1', + }, logger); + + crr.run(err => { + assert.ifError(err); + + expect(crr.s3.listObjectVersions).toHaveBeenCalledTimes(1); + expect(crr.s3.listObjectVersions).toHaveBeenNthCalledWith(1, { + Bucket: 'bucket0', + Prefix: undefined, + MaxKeys: undefined, + KeyMarker: 'key1', + VersionIdMarker: undefined, + }, expect.any(Function)); + + done(); + }); + }); + + it('set inputKeyMarker and inputVersionIdMarker', done => { + const crr = initializeCrrWithMocks({ + buckets: ['bucket0'], + workers: 10, + replicationStatusToProcess: ['NEW'], + keyMarker: 'key1', + versionIdMarker: 'vid1', + }, logger); + + crr.run(err => { + assert.ifError(err); + + expect(crr.s3.listObjectVersions).toHaveBeenCalledTimes(1); + expect(crr.s3.listObjectVersions).toHaveBeenNthCalledWith(1, { + Bucket: 'bucket0', + Prefix: undefined, + MaxKeys: undefined, + KeyMarker: 'key1', + VersionIdMarker: 'vid1', + }, expect.any(Function)); + + done(); + }); + }); +}); diff --git a/tests/unit/CRR/crrExistingObjects.js b/tests/unit/CRR/crrExistingObjects.js new file mode 100644 index 00000000..85fe481a --- /dev/null +++ b/tests/unit/CRR/crrExistingObjects.js @@ -0,0 +1,187 @@ +const werelogs = require('werelogs'); +// Mock the entire werelogs module +const mockFatal = jest.fn(); +const mockInfo = jest.fn(); +const mockError = jest.fn(); + +jest.mock('werelogs', () => ({ + Logger: jest.fn().mockImplementation(() => ({ + fatal: mockFatal, + info: mockInfo, + error: mockError, + })), + configure: jest.fn(), +})); + +const mockCrrRun = jest.fn(cb => cb(null)); // Simulate successful run +const mockCrrStop = jest.fn(); + +jest.mock('../../../CRR/ReplicationStatusUpdater', () => jest.fn().mockImplementation(() => ({ + run: mockCrrRun, + stop: mockCrrStop, +}))); + +describe('crrExistingObjects', () => { + let originalEnv; + + beforeAll(() => { + process.exit = jest.fn(); + }); + + beforeEach(() => { + // Save the original process.env + originalEnv = { ...process.env }; + + // Reset the argv array to its default state + process.argv = ['node', 'yourscript.js']; + // Reset the mock to clear previous call history + process.exit.mockReset(); + + // Clear the mocks before each test + // ReplicationStatusUpdater.mockClear(); + mockFatal.mockClear(); + mockInfo.mockClear(); + mockError.mockClear(); + mockCrrRun.mockClear(); + mockCrrStop.mockClear(); + + // Clear the module cache before each test + jest.resetModules(); + }); + + afterEach(() => { + // Restore the original process.env after each test + process.env = originalEnv; + }); + + afterAll(() => { + // Restore the original process.exit + process.exit = jest.requireActual('process').exit; + }); + + test('should run successfully', () => { + process.argv[2] = 'bucket1,bucket2'; + + process.env.TARGET_REPLICATION_STATUS = 'NEW,PENDING'; + process.env.WORKERS = '10'; + process.env.ACCESS_KEY = 'testAccessKey'; + process.env.SECRET_KEY = 'testSecretKey'; + process.env.ENDPOINT = 'http://fake.endpoint'; + process.env.SITE_NAME = 'testSite'; + process.env.STORAGE_TYPE = 'testStorage'; + process.env.TARGET_PREFIX = 'testPrefix'; + process.env.LISTING_LIMIT = '2000'; + process.env.MAX_UPDATES = '100'; + process.env.MAX_SCANNED = '1000'; + process.env.KEY_MARKER = 'testKeyMarker'; + process.env.VERSION_ID_MARKER = 'testVersionIdMarker'; + process.env.DEBUG = '0'; + + require('../../../crrExistingObjects'); + + const ReplicationStatusUpdater = require('../../../CRR/ReplicationStatusUpdater'); + + expect(ReplicationStatusUpdater).toHaveBeenCalledTimes(1); + expect(ReplicationStatusUpdater).toHaveBeenCalledWith({ + buckets: ['bucket1', 'bucket2'], + replicationStatusToProcess: ['NEW', 'PENDING'], + workers: 10, + accessKey: 'testAccessKey', + secretKey: 'testSecretKey', + endpoint: 'http://fake.endpoint', + siteName: 'testSite', + storageType: 'testStorage', + targetPrefix: 'testPrefix', + listingLimit: 2000, + maxUpdates: 100, // or a more specific expectation + maxScanned: 1000, + keyMarker: 'testKeyMarker', + versionIdMarker: 'testVersionIdMarker', + }, expect.anything()); + + expect(mockFatal).not.toHaveBeenCalled(); + expect(mockError).not.toHaveBeenCalled(); + expect(process.exit).not.toHaveBeenCalled(); + expect(mockCrrRun).toHaveBeenCalled(); + }); + + test('should set the default parameter when unspecified', () => { + process.argv[2] = 'bucket1,bucket2'; + + process.env.WORKERS = '10'; + process.env.ACCESS_KEY = 'testAccessKey'; + process.env.SECRET_KEY = 'testSecretKey'; + process.env.ENDPOINT = 'http://fake.endpoint'; + + require('../../../crrExistingObjects'); + + const ReplicationStatusUpdater = require('../../../CRR/ReplicationStatusUpdater'); + + expect(ReplicationStatusUpdater).toHaveBeenCalledTimes(1); + expect(ReplicationStatusUpdater).toHaveBeenCalledWith({ + buckets: ['bucket1', 'bucket2'], + replicationStatusToProcess: ['NEW'], + workers: 10, + accessKey: 'testAccessKey', + secretKey: 'testSecretKey', + endpoint: 'http://fake.endpoint', + storageType: '', + targetPrefix: undefined, + listingLimit: 1000, + maxUpdates: undefined, + maxScanned: undefined, + keyMarker: undefined, + versionIdMarker: undefined, + }, expect.anything()); + + expect(mockFatal).not.toHaveBeenCalled(); + expect(mockError).not.toHaveBeenCalled(); + expect(process.exit).not.toHaveBeenCalled(); + expect(mockCrrRun).toHaveBeenCalled(); + }); + + test('should exit if no bucket is provided', () => { + process.argv[2] = ''; + process.env.ENDPOINT = 'http://example.com'; + process.env.ACCESS_KEY = 'accesskey'; + process.env.SECRET_KEY = 'secretkey'; + + require('../../../crrExistingObjects'); + + expect(mockFatal).toHaveBeenCalledWith('No buckets given as input! Please provide a comma-separated list of buckets'); + expect(process.exit).toHaveBeenCalledWith(1); + }); + + test('should exit if no endpoint is provided', () => { + process.argv[2] = 'bucket0'; + process.env.ACCESS_KEY = 'accesskey'; + process.env.SECRET_KEY = 'secretkey'; + + require('../../../crrExistingObjects'); + + expect(mockFatal).toHaveBeenCalledWith('ENDPOINT not defined!'); + expect(process.exit).toHaveBeenCalledWith(1); + }); + + test('should exit if no access key is provided', () => { + process.argv[2] = 'bucket0'; + process.env.ENDPOINT = 'http://example.com'; + process.env.SECRET_KEY = 'secretkey'; + + require('../../../crrExistingObjects'); + + expect(mockFatal).toHaveBeenCalledWith('ACCESS_KEY not defined'); + expect(process.exit).toHaveBeenCalledWith(1); + }); + + test('should exit if no secret key is provided', () => { + process.argv[2] = 'bucket0'; + process.env.ENDPOINT = 'http://example.com'; + process.env.ACCESS_KEY = 'accesskey'; + + require('../../../crrExistingObjects'); + + expect(mockFatal).toHaveBeenCalledWith('SECRET_KEY not defined'); + expect(process.exit).toHaveBeenCalledWith(1); + }); +}); diff --git a/tests/utils/crr.js b/tests/utils/crr.js new file mode 100644 index 00000000..6554dbcd --- /dev/null +++ b/tests/utils/crr.js @@ -0,0 +1,195 @@ +const ReplicationStatusUpdater = require('../../CRR/ReplicationStatusUpdater'); + +const listVersionRes = { + IsTruncated: false, + Versions: [ + { + ETag: '"dabcc341ecab339daf766e1cddd5d1bb"', + ChecksumAlgorithm: [], + Size: 3263, + StorageClass: 'STANDARD', + Key: 'key0', + VersionId: 'aJdO148N3LjN00000000001I4j3QKItW', + IsLatest: true, + LastModified: '2024-01-05T13:11:31.861Z', + Owner: { + DisplayName: 'bart', + ID: '0', + }, + }, + ], + DeleteMarkers: [], + Name: 'bucket0', + MaxKeys: 1000, + CommonPrefixes: [], +}; + +const listVersionWithMarkerRes = { + IsTruncated: true, + Versions: [ + { + ETag: '"dabcc341ecab339daf766e1cddd5d1bb"', + ChecksumAlgorithm: [], + Size: 3263, + StorageClass: 'STANDARD', + Key: 'key0', + VersionId: 'aJdO148N3LjN00000000001I4j3QKItW', + IsLatest: true, + LastModified: '2024-01-05T13:11:31.861Z', + Owner: { + DisplayName: 'bart', + ID: '0', + }, + }, + ], + DeleteMarkers: [], + Name: 'bucket0', + MaxKeys: 1, + CommonPrefixes: [], + NextVersionIdMarker: 'aJdO148N3LjN00000000001I4j3QKItW', + NextKeyMarker: 'key0', +}; + +const listVersionsRes = { + IsTruncated: false, + Versions: [ + { + ETag: '"dabcc341ecab339daf766e1cddd5d1bb"', + ChecksumAlgorithm: [], + Size: 3263, + StorageClass: 'STANDARD', + Key: 'key0', + VersionId: 'aJdO148N3LjN00000000001I4j3QKItW', + IsLatest: true, + LastModified: '2024-01-05T13:11:31.861Z', + Owner: { + DisplayName: 'bart', + ID: '0', + }, + }, + { + ETag: '"dabcc341ecab339daf766e1cddd5d1bb"', + ChecksumAlgorithm: [], + Size: 3263, + StorageClass: 'STANDARD', + Key: 'key0', + VersionId: 'aJdO148N3LjN00000000001I4j3QKItV', + IsLatest: true, + LastModified: '2024-01-05T13:11:32.861Z', + Owner: { + DisplayName: 'bart', + ID: '0', + }, + }, + ], + DeleteMarkers: [], + Name: 'bucket0', + MaxKeys: 1000, + CommonPrefixes: [], +}; + +const getBucketReplicationRes = { + ReplicationConfiguration: { + Role: 'arn:aws:iam::root:role/s3-replication-role', + Rules: [ + { + ID: 'r0', + Prefix: '', + Status: 'Enabled', + Destination: { + Bucket: 'arn:aws:s3:::sourcebucket', + StorageClass: 'aws-location', + }, + }, + ], + }, +}; + +const objectMd = { + 'owner-display-name': 'bart', + 'owner-id': 'f2ae8ca93fb44fe7ef409dbfdc0e0873b921fe4183364ede136be8c44756acda', + 'content-length': 3263, + 'content-md5': 'dabcc341ecab339daf766e1cddd5d1bb', + 'x-amz-version-id': 'null', + 'x-amz-server-version-id': '', + 'x-amz-storage-class': 'STANDARD', + 'x-amz-server-side-encryption': '', + 'x-amz-server-side-encryption-aws-kms-key-id': '', + 'x-amz-server-side-encryption-customer-algorithm': '', + 'x-amz-website-redirect-location': '', + 'acl': { + Canned: 'private', FULL_CONTROL: [], WRITE_ACP: [], READ: [], READ_ACP: [], + }, + 'key': '', + 'location': [{ + key: '7751DDF49AA3B289C2D261ED1FD09A596D5D3F20', + size: 3263, + start: 0, + dataStoreName: 'us-east-1', + dataStoreType: 'scality', + dataStoreETag: '1:dabcc341ecab339daf766e1cddd5d1bb', + }], + 'isDeleteMarker': false, + 'tags': {}, + 'replicationInfo': { + status: '', + backends: [], + content: [], + destination: '', + storageClass: '', + role: '', + storageType: '', + dataStoreVersionId: '', + }, + 'dataStoreName': 'us-east-1', + 'originOp': 's3:ObjectCreated:Put', + 'last-modified': '2024-01-05T13:11:31.861Z', + 'md-model-version': 3, + 'versionId': '98295539708053999999RG001 ', +}; + +const getMetadataRes = { + Body: JSON.stringify(objectMd), +}; + +const putMetadataRes = { versionId: '98295539708053999999RG001 ' }; + +/** + * Initializes the ReplicationStatusUpdater class with mock methods for testing. + * + * This function creates an instance of the ReplicationStatusUpdater class using the provided configuration. + * It then replaces certain methods of this instance with Jest mock functions. These mocked methods include + * `listObjectVersions`, `getBucketReplication` for the S3 client, and `getMetadata`, `putMetadata` for the + * Backbeat client. These mock functions are designed to simulate the behavior of the actual AWS S3 and Backbeat + * clients without making real API calls, which is useful for isolated testing of the ReplicationStatusUpdater + * functionality. + * + * @param {Object} config - The configuration object used to initialize the ReplicationStatusUpdater instance. + * @param {Logger} log - The logging object to be used by the ReplicationStatusUpdater. + * @returns {ReplicationStatusUpdater} An instance of ReplicationStatusUpdater with mocked methods. + */ +function initializeCrrWithMocks(config, log) { + const crr = new ReplicationStatusUpdater(config, log); + + const listObjectVersionsMock = jest.fn((params, cb) => cb(null, listVersionRes)); + const getBucketReplicationMock = jest.fn((params, cb) => cb(null, getBucketReplicationRes)); + const getMetadataMock = jest.fn((params, cb) => cb(null, getMetadataRes)); + const putMetadataMock = jest.fn((params, cb) => cb(null, putMetadataRes)); + + crr.s3.listObjectVersions = listObjectVersionsMock; + crr.s3.getBucketReplication = getBucketReplicationMock; + crr.bb.getMetadata = getMetadataMock; + crr.bb.putMetadata = putMetadataMock; + + return crr; +} + +module.exports = { + initializeCrrWithMocks, + listVersionRes, + listVersionsRes, + listVersionWithMarkerRes, + getBucketReplicationRes, + getMetadataRes, + putMetadataRes, +};