From a2808bce7c166dc0248af2a0b72bbcf3186113c6 Mon Sep 17 00:00:00 2001 From: williamlardier Date: Mon, 23 Dec 2024 16:09:38 +0100 Subject: [PATCH] Support counting mpus in count items - The MPU parts size are part of the current size of the bucket - Only the count of overview keys are part of the object count - The metrics are detailed in a field - The getObjectMDStats function is updated: the logic to process each cursor's entry is shared, and mpu entries are processed in the same way as the regular objects, with some specifics. Issue: S3UTILS-186 --- CountItems/utils/utils.js | 10 +- utils/S3UtilsMongoClient.js | 321 +++++++++++++++++++++++------------- 2 files changed, 212 insertions(+), 119 deletions(-) diff --git a/CountItems/utils/utils.js b/CountItems/utils/utils.js index e4c24e2a..5a404ab6 100644 --- a/CountItems/utils/utils.js +++ b/CountItems/utils/utils.js @@ -18,6 +18,7 @@ function consolidateDataMetrics(target, source) { _nonCurrentRestored: 0, _nonCurrentRestoring: 0, _inflightsPreScan: 0, + _incompleteMPUParts: 0, }, }); } @@ -32,6 +33,7 @@ function consolidateDataMetrics(target, source) { _currentRestoring: 0, _nonCurrentRestored: 0, _nonCurrentRestoring: 0, + _incompleteMPUParts: 0, deleteMarker: 0, }, }); @@ -48,6 +50,7 @@ function consolidateDataMetrics(target, source) { resTarget.usedCapacity._currentRestored += usedCapacity && usedCapacity._currentRestored ? usedCapacity._currentRestored : 0; resTarget.usedCapacity._nonCurrentRestoring += usedCapacity && usedCapacity._nonCurrentRestoring ? usedCapacity._nonCurrentRestoring : 0; resTarget.usedCapacity._nonCurrentRestored += usedCapacity && usedCapacity._nonCurrentRestored ? usedCapacity._nonCurrentRestored : 0; + resTarget.usedCapacity._incompleteMPUParts += usedCapacity && usedCapacity._incompleteMPUParts ? usedCapacity._incompleteMPUParts : 0; resTarget.objectCount.current += objectCount && objectCount.current ? objectCount.current : 0; resTarget.objectCount.nonCurrent += objectCount && objectCount.nonCurrent ? objectCount.nonCurrent : 0; @@ -58,6 +61,7 @@ function consolidateDataMetrics(target, source) { resTarget.objectCount._currentRestored += objectCount && objectCount._currentRestored ? objectCount._currentRestored : 0; resTarget.objectCount._nonCurrentRestoring += objectCount && objectCount._nonCurrentRestoring ? objectCount._nonCurrentRestoring : 0; resTarget.objectCount._nonCurrentRestored += objectCount && objectCount._nonCurrentRestored ? objectCount._nonCurrentRestored : 0; + resTarget.objectCount._incompleteMPUParts += objectCount && objectCount._incompleteMPUParts ? objectCount._incompleteMPUParts : 0; resTarget.usedCapacity._inflightsPreScan += usedCapacity && usedCapacity._inflightsPreScan ? usedCapacity._inflightsPreScan : 0; if (accountOwnerID) { @@ -65,11 +69,13 @@ function consolidateDataMetrics(target, source) { } resTarget.usedCapacity.current += usedCapacity - ? usedCapacity._currentCold + usedCapacity._currentRestored + usedCapacity._currentRestoring : 0; + ? usedCapacity._currentCold + usedCapacity._currentRestored + usedCapacity._currentRestoring + + usedCapacity._incompleteMPUParts : 0; resTarget.usedCapacity.nonCurrent += usedCapacity ? usedCapacity._nonCurrentCold + usedCapacity._nonCurrentRestored + usedCapacity._nonCurrentRestoring : 0; resTarget.objectCount.current += objectCount - ? objectCount._currentCold + objectCount._currentRestored + objectCount._currentRestoring : 0; + ? objectCount._currentCold + objectCount._currentRestored + objectCount._currentRestoring + + objectCount._incompleteMPUParts : 0; resTarget.objectCount.nonCurrent += objectCount ? objectCount._nonCurrentCold + objectCount._nonCurrentRestored + objectCount._nonCurrentRestoring : 0; diff --git a/utils/S3UtilsMongoClient.js b/utils/S3UtilsMongoClient.js index b91e4d01..48a8d843 100644 --- a/utils/S3UtilsMongoClient.js +++ b/utils/S3UtilsMongoClient.js @@ -42,6 +42,8 @@ const baseMetricsObject = { versionCountRestored: 0, versionDataRestored: 0, deleteMarkerCountRestored: 0, + mpuPartCount: 0, + mpuPartsData: 0, }; class S3UtilsMongoClient extends MongoClientInterface { @@ -159,6 +161,7 @@ class S3UtilsMongoClient extends MongoClientInterface { async getObjectMDStats(bucketName, bucketInfo, isTransient, log, callback) { let cursor; + let cursorMpuBucket; try { const c = this.getCollection(bucketName); cursor = c.find({}, { @@ -193,6 +196,10 @@ class S3UtilsMongoClient extends MongoClientInterface { const usersBucketCreationDatesMap = await this._getUsersBucketCreationDates(log); + const bucketStatus = bucketInfo.getVersioningConfiguration(); + const isVer = (bucketStatus && (bucketStatus.Status === 'Enabled' + || bucketStatus.Status === 'Suspended')); + if (!usersBucketCreationDatesMap) { return callback(errors.InternalError); } @@ -207,132 +214,145 @@ class S3UtilsMongoClient extends MongoClientInterface { let startCursorDate = new Date(); let processed = 0; - await cursor.forEach( - res => { - // Periodically display information about the cursor - // if more than 30s elapsed - const currentDate = Date.now(); - if (currentDate - startCursorDate > 30000) { - startCursorDate = currentDate; - log.info('Processing cursor', { - method: 'getObjectMDStats', - bucketName, - processed, - }); - } - - const isObjectCold = this._isObjectCold(res); - const isObjectRestoring = this._isObjectRestoring(res); - const isObjectRestored = this._isObjectRestored(res); - const { data, error } = this._processEntryData( + const processCursorEntry = (entry, isMPUPart = false, isOverviewKey = false) => { + // Periodically display information about the cursor + // if more than 30s elapsed + const currentDate = Date.now(); + if (currentDate - startCursorDate > 30000) { + startCursorDate = currentDate; + log.info('Processing cursor', { + method: 'getObjectMDStats', bucketName, - bucketInfo, - res, - usersBucketCreationDatesMap[`${res.value['owner-id']}${constants.splitter}${bucketName}`], - isTransient, - locationConfig, - { - isCold: isObjectCold, - isRestoring: isObjectRestoring, - isRestored: isObjectRestored, - }, - ); + processed, + }); + } - if (error) { - log.error('Failed to process entry data', { - method: 'getObjectMDStats', - entry: res, - error, - }); - monitoring.objectsCount.inc({ status: 'error' }); - return; - } + const isObjectCold = this._isObjectCold(entry); + const isObjectRestoring = this._isObjectRestoring(entry); + const isObjectRestored = this._isObjectRestored(entry); - if (!data) { - // Skipping entry, esp. in case of PHD - log.info('Skipping entry', { - method: 'getObjectMDStats', - entry: res, - }); - monitoring.objectsCount.inc({ status: 'skipped' }); - return; - } + const { data, error } = this._processEntryData( + bucketName, + bucketInfo, + entry, + usersBucketCreationDatesMap[`${entry.value['owner-id']}${constants.splitter}${bucketName}`], + isTransient, + locationConfig, + { + isCold: isObjectCold, + isRestoring: isObjectRestoring, + isRestored: isObjectRestored, + }, + ); - let targetCount; - let targetData; - if (res._id.indexOf('\0') !== -1) { - // versioned item - targetCount = 'versionCount'; - targetData = 'versionData'; + if (error) { + log.error('Failed to process entry data', { + method: 'getObjectMDStats', + entry, + error, + }); + monitoring.objectsCount.inc({ status: 'error' }); + return; + } - if (res.value.replicationInfo.backends.length > 0 - && this._isReplicationEntryStalled(res, cmpDate)) { - stalledCount++; - } - } else if (!!res.value.versionId && !res.value.isNull) { - // master version - // includes current objects in versioned bucket and - // objects uploaded before bucket suspended - targetCount = 'masterCount'; - targetData = 'masterData'; - } else { - // null version - // include current objects in nonversioned bucket and - // objects uploaded after bucket suspended - targetCount = 'nullCount'; - targetData = 'nullData'; - } + if (!data) { + // Skipping entry, esp. in case of PHD + log.info('Skipping entry', { + method: 'getObjectMDStats', + entry, + }); + monitoring.objectsCount.inc({ status: 'skipped' }); + return; + } - // Dynamically get the metrics based on the object state - if (isObjectCold) { - targetCount += 'Cold'; - targetData += 'Cold'; - } else if (isObjectRestoring) { - targetCount += 'Restoring'; - targetData += 'Restoring'; - } else if (isObjectRestored) { - targetCount += 'Restored'; - targetData += 'Restored'; + let targetCount; + let targetData; + if (entry._id.indexOf('\0') !== -1) { + // versioned item + targetCount = 'versionCount'; + targetData = 'versionData'; + + if (entry.value.replicationInfo.backends.length > 0 + && this._isReplicationEntryStalled(entry, cmpDate)) { + stalledCount++; } + } else if (!!entry.value.versionId && !entry.value.isNull) { + // master version + // includes current objects in versioned bucket and + // objects uploaded before bucket suspended + targetCount = 'masterCount'; + targetData = 'masterData'; + } else { + // null version + // include current objects in nonversioned bucket and + // objects uploaded after bucket suspended + targetCount = 'nullCount'; + targetData = 'nullData'; + } - Object.keys(data).forEach(metricLevel => { - // metricLevel can only be 'bucket', 'location' or 'account' - if (validStorageMetricLevels.has(metricLevel)) { - Object.keys(data[metricLevel]).forEach(resourceName => { - // resourceName can be the name of bucket, location or account - if (!collRes[metricLevel][resourceName]) { - collRes[metricLevel][resourceName] = { - ...baseMetricsObject, - }; - } - collRes[metricLevel][resourceName][targetData] += data[metricLevel][resourceName]; - collRes[metricLevel][resourceName][targetCount]++; - collRes[metricLevel][resourceName].deleteMarkerCount += res.value.isDeleteMarker ? 1 : 0; - }); - } - }); - Object.keys(data.account).forEach(account => { - if (!collRes.account[account].locations) { - collRes.account[account].locations = {}; - } + // Dynamically get the metrics based on the object state + if (isObjectCold) { + targetCount += 'Cold'; + targetData += 'Cold'; + } else if (isObjectRestoring) { + targetCount += 'Restoring'; + targetData += 'Restoring'; + } else if (isObjectRestored) { + targetCount += 'Restored'; + targetData += 'Restored'; + } + + if (isMPUPart || isOverviewKey) { + targetCount = 'mpuPartCount'; + targetData = 'mpuPartsData'; + } - Object.keys(data.location).forEach(location => { - if (!collRes.account[account].locations[location]) { - collRes.account[account].locations[location] = { + Object.keys(data).forEach(metricLevel => { + // metricLevel can only be 'bucket', 'location' or 'account' + if (validStorageMetricLevels.has(metricLevel)) { + Object.keys(data[metricLevel]).forEach(resourceName => { + // resourceName can be the name of bucket, location or account + if (!collRes[metricLevel][resourceName]) { + collRes[metricLevel][resourceName] = { ...baseMetricsObject, }; } - collRes.account[account].locations[location][targetData] += data.location[location]; - collRes.account[account].locations[location][targetCount]++; - collRes.account[account].locations[location].deleteMarkerCount += res.value.isDeleteMarker ? 1 : 0; + collRes[metricLevel][resourceName][targetData] += data[metricLevel][resourceName]; + // Do not count the MPU parts as objects + if (!isMPUPart) { + collRes[metricLevel][resourceName][targetCount]++; + } + collRes[metricLevel][resourceName].deleteMarkerCount += entry.value.isDeleteMarker ? 1 : 0; }); + } + }); + Object.keys(data.account).forEach(account => { + if (!collRes.account[account].locations) { + collRes.account[account].locations = {}; + } + + Object.keys(data.location).forEach(location => { + if (!collRes.account[account].locations[location]) { + collRes.account[account].locations[location] = { + ...baseMetricsObject, + }; + } + collRes.account[account].locations[location][targetData] += data.location[location]; + if (!isMPUPart) { + collRes.account[account].locations[location][targetCount]++; + } + collRes.account[account].locations[location].deleteMarkerCount += entry.value.isDeleteMarker ? 1 : 0; }); - // one bucket has only one account - [accountBucket] = Object.keys(collRes.account); - monitoring.objectsCount.inc({ status: 'success' }); - processed++; - }, + }); + // one bucket has only one account + [accountBucket] = Object.keys(collRes.account); + monitoring.objectsCount.inc({ status: 'success' }); + processed++; + }; + + await cursor.forEach( + res => processCursorEntry(res), err => { if (err) { log.error('Error when processing mongo entries', { @@ -342,18 +362,75 @@ class S3UtilsMongoClient extends MongoClientInterface { }); return callback(err); } - const bucketStatus = bucketInfo.getVersioningConfiguration(); - const isVer = (bucketStatus && (bucketStatus.Status === 'Enabled' - || bucketStatus.Status === 'Suspended')); const retResult = this._handleResults(collRes, isVer); retResult.stalled = stalledCount; return callback(null, retResult); }, ); - const bucketStatus = bucketInfo.getVersioningConfiguration(); - const isVer = (bucketStatus && (bucketStatus.Status === 'Enabled' - || bucketStatus.Status === 'Suspended')); + const mpuBucket = `${constants.mpuBucketPrefix}${bucketName}`; + const collectionMpu = this.getCollection(mpuBucket); + cursorMpuBucket = collectionMpu.find({}); + + // MPU entries from the mpu shadow bucket must be considered + // as part of the current metrics. + await cursorMpuBucket.forEach( + res => { + if (res._id.startsWith(`overview${constants.splitter}`)) { + // For overview keys, only consider the number of object + return processCursorEntry({ + _id: res._id, + value: { + 'replicationInfo': { + status: '', + backends: [], + }, + 'dataStoreName': res.value.dataStoreName, + 'content-length': 0, + 'versionId': null, + 'owner-id': res.value['owner-id'], + 'isDeleteMarker': false, + 'isNull': false, + 'archive': null, + 'x-amz-storage-class': 'STANDARD', + 'isPHD': false, + }, + }, false, true); + } + return processCursorEntry({ + _id: res._id, + value: { + 'replicationInfo': { + status: '', + backends: [], + }, + 'dataStoreName': res.value.partLocations[0].dataStoreName, + 'content-length': res.value['content-length'], + 'versionId': null, + 'owner-id': res.value['owner-id'], + 'isDeleteMarker': false, + 'isNull': false, + 'archive': null, + 'x-amz-storage-class': 'STANDARD', + 'isPHD': false, + }, + }, true); + }, + err => { + if (err) { + log.error('Error when processing mongo mpu entries', { + method: 'getObjectMDStats', + errDetails: { ...err }, + errorString: err.toString(), + }); + return callback(err); + } + const retResult = this._handleResults(collRes, isVer); + retResult.stalled = stalledCount; + return callback(null, retResult); + }, + ); + const retResult = this._handleResults(collRes, isVer); retResult.stalled = stalledCount; @@ -516,6 +593,7 @@ class S3UtilsMongoClient extends MongoClientInterface { _currentRestoring: 0, _nonCurrentRestored: 0, _nonCurrentRestoring: 0, + _incompleteMPUParts: 0, }, objectCount: { current: 0, @@ -526,6 +604,7 @@ class S3UtilsMongoClient extends MongoClientInterface { _currentRestoring: 0, _nonCurrentRestored: 0, _nonCurrentRestoring: 0, + _incompleteMPUParts: 0, deleteMarker: 0, }, }; @@ -559,16 +638,20 @@ class S3UtilsMongoClient extends MongoClientInterface { versionCountRestored = 0, versionDataRestored = 0, deleteMarkerCountRestored = 0, + mpuPartCount = 0, + mpuPartsData = 0, } = res[metricLevel][resourceName]; dataMetrics[metricLevel][resourceName].usedCapacity.current += nullData + masterData; dataMetrics[metricLevel][resourceName].usedCapacity._currentCold += nullDataCold + masterDataCold; dataMetrics[metricLevel][resourceName].usedCapacity._currentRestoring += nullDataRestoring + masterDataRestoring; dataMetrics[metricLevel][resourceName].usedCapacity._currentRestored += nullDataRestored + masterDataRestored; + dataMetrics[metricLevel][resourceName].usedCapacity._incompleteMPUParts += mpuPartsData; dataMetrics[metricLevel][resourceName].objectCount.current += nullCount + masterCount; dataMetrics[metricLevel][resourceName].objectCount._currentCold += nullCountCold + masterCountCold; dataMetrics[metricLevel][resourceName].objectCount._currentRestoring += nullCountRestoring + masterCountRestoring; dataMetrics[metricLevel][resourceName].objectCount._currentRestored += nullCountRestored + masterCountRestored; + dataMetrics[metricLevel][resourceName].objectCount._incompleteMPUParts += mpuPartCount; if (isVersioned) { dataMetrics[metricLevel][resourceName].usedCapacity.nonCurrent @@ -655,6 +738,7 @@ class S3UtilsMongoClient extends MongoClientInterface { _currentRestoring: 0, _nonCurrentRestored: 0, _nonCurrentRestoring: 0, + _incompleteMPUParts: 0, }; } if (!accountLocation.objectCount) { @@ -667,6 +751,7 @@ class S3UtilsMongoClient extends MongoClientInterface { _currentRestoring: 0, _nonCurrentRestored: 0, _nonCurrentRestoring: 0, + _incompleteMPUParts: 0, deleteMarker: 0, }; } @@ -678,6 +763,7 @@ class S3UtilsMongoClient extends MongoClientInterface { accountLocation.usedCapacity._nonCurrentRestoring += dataMetrics.location[location].usedCapacity._nonCurrentRestoring; accountLocation.usedCapacity._currentRestored += dataMetrics.location[location].usedCapacity._currentRestored; accountLocation.usedCapacity._nonCurrentRestored += dataMetrics.location[location].usedCapacity._nonCurrentRestored; + accountLocation.usedCapacity._incompleteMPUParts += dataMetrics.location[location].usedCapacity._incompleteMPUParts; accountLocation.objectCount.current += dataMetrics.location[location].objectCount.current; accountLocation.objectCount.nonCurrent += dataMetrics.location[location].objectCount.nonCurrent; @@ -687,6 +773,7 @@ class S3UtilsMongoClient extends MongoClientInterface { accountLocation.objectCount._nonCurrentRestoring += dataMetrics.location[location].objectCount._nonCurrentRestoring; accountLocation.objectCount._currentRestored += dataMetrics.location[location].objectCount._currentRestored; accountLocation.objectCount._nonCurrentRestored += dataMetrics.location[location].objectCount._nonCurrentRestored; + accountLocation.objectCount._incompleteMPUParts += dataMetrics.location[location].objectCount._incompleteMPUParts; accountLocation.objectCount.deleteMarker += dataMetrics.location[location].objectCount.deleteMarker; });