Skip to content

Commit

Permalink
S3UTILS-163: adapt inflight support
Browse files Browse the repository at this point in the history
We want to compute the diff between the inflight for each bucket
at the beginning of its scan with the inflight at the end of the
count-items job. this ensure that we do not lose any information
between the start of a scan and the end of count-items, that would
cause quotas to be exceeded as inflights are lost.

The limitation of this implementation comes from the fact that
inflights are not created in buckets when there is only quotas on
Accounts. This is something we will improve on Scuba:Cloudserver
side; so the introduced implementation will be future-proof.
  • Loading branch information
williamlardier committed May 17, 2024
1 parent bc28c67 commit 3fa48b9
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 3 deletions.
9 changes: 7 additions & 2 deletions CountItems/utils/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ function consolidateDataMetrics(target, source) {
_currentRestoring: 0,
_nonCurrentRestored: 0,
_nonCurrentRestoring: 0,
_inflightsPreScan: 0,
},
});
}
Expand All @@ -38,7 +39,7 @@ function consolidateDataMetrics(target, source) {
if (!source) {
return resTarget;
}
const { usedCapacity, objectCount } = source;
const { usedCapacity, objectCount, accountOwnerID } = source;
resTarget.usedCapacity.current += usedCapacity && usedCapacity.current ? usedCapacity.current : 0;
resTarget.usedCapacity.nonCurrent += usedCapacity && usedCapacity.nonCurrent ? usedCapacity.nonCurrent : 0;
resTarget.usedCapacity._currentCold += usedCapacity && usedCapacity._currentCold ? usedCapacity._currentCold : 0;
Expand All @@ -58,7 +59,11 @@ function consolidateDataMetrics(target, source) {
resTarget.objectCount._nonCurrentRestoring += objectCount && objectCount._nonCurrentRestoring ? objectCount._nonCurrentRestoring : 0;
resTarget.objectCount._nonCurrentRestored += objectCount && objectCount._nonCurrentRestored ? objectCount._nonCurrentRestored : 0;

// Current and NonCurrent are the total of all other metrics
resTarget.usedCapacity._inflightsPreScan += usedCapacity && usedCapacity._inflightsPreScan ? usedCapacity._inflightsPreScan : 0;
if (accountOwnerID) {
resTarget.accountOwnerID = accountOwnerID;
}

resTarget.usedCapacity.current += usedCapacity
? usedCapacity._currentCold + usedCapacity._currentRestored + usedCapacity._currentRestoring : 0;
resTarget.usedCapacity.nonCurrent += usedCapacity
Expand Down
133 changes: 132 additions & 1 deletion utils/S3UtilsMongoClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,89 @@ class S3UtilsMongoClient extends MongoClientInterface {
}
}

async updateInflightDeltas(allMetrics, log) {
let cursor;
try {
if (!allMetrics || !Array.isArray(allMetrics) || allMetrics.length === 0) {
return allMetrics;
}

cursor = await this.getCollection(INFOSTORE).find({}, {
projection: {
'usedCapacity._inflight': 1,
},
});

const inflights = await cursor.toArray();
// convert inflights to a map with _id: usedCapacity._inflight
const inflightsMap = inflights.reduce((map, obj) => {
const inflightLong = obj.usedCapacity && obj.usedCapacity._inflight ? Long.fromNumber(Number(obj.usedCapacity._inflight)) : Long.fromNumber(0);
return {
...map,
[obj._id]: inflightLong,
};
}, {});

const accountInflights = {};
allMetrics.forEach(entry => {
const id = entry._id;
if (id.startsWith('bucket_')) {
const inflightDocument = inflightsMap[id];
const inflight = inflightDocument && inflightDocument.usedCapacity ? Math.max(0, inflightDocument - entry.usedCapacity._inflightsPreScan) : 0;
if (inflight) {
const inflightLong = Long.fromNumber(Number(inflight));
// Inflights remaining after the scan are part of the "current" bytes,
// and stored in _inflightsDelta
// eslint-disable-next-line no-param-reassign
entry.usedCapacity.current = Long.fromNumber(Number(entry.usedCapacity.current)).add(inflightLong);
// eslint-disable-next-line no-param-reassign
entry.usedCapacity._inflightsDelta = inflightLong;
const accountOwnerId = `account_${entry.accountOwnerID}`;
if (accountInflights[accountOwnerId]) {
accountInflights[accountOwnerId] = Long.fromNumber(Number(accountInflights[accountOwnerId])).add(inflightLong);
} else {
accountInflights[accountOwnerId] = inflightLong;
}
// eslint-disable-next-line no-param-reassign
delete entry.usedCapacity._inflightsPreScan;
// eslint-disable-next-line no-param-reassign
delete entry.accountOwnerID;
}
}
});

allMetrics.forEach(entry => {
const id = entry._id;
if (id.startsWith('account_')) {
if (accountInflights[id]) {
// Inflights remaining after the scan are part of the "current" bytes,
// and stored in _inflightsDelta
// eslint-disable-next-line no-param-reassign
entry.usedCapacity.current = Long.fromNumber(Number(entry.usedCapacity.current)).add(accountInflights[id]);
// eslint-disable-next-line no-param-reassign
entry.usedCapacity._inflightsDelta = accountInflights[id];
}
}
});

return allMetrics;
} catch (err) {
log.error('An error occurred', {
method: 'updateInflightDeltas',
errDetails: { ...err },
errorString: err.toString(),
});
return allMetrics;
} finally {
if (cursor && !cursor.closed) {
log.info('Finished processing cursor', {
method: 'updateInflightDeltas',
});
cursor.close();
}
}
}

async getObjectMDStats(bucketName, bucketInfo, isTransient, log, callback) {
let cursor;
try {
Expand All @@ -100,6 +183,9 @@ class S3UtilsMongoClient extends MongoClientInterface {
account: {}, // account level metrics
};
let stalledCount = 0;
let bucketKey;
let inflightsPreScan = 0;
let accountBucket;
const cmpDate = new Date();
cmpDate.setHours(cmpDate.getHours() - 1);

Expand All @@ -111,6 +197,15 @@ class S3UtilsMongoClient extends MongoClientInterface {
return callback(errors.InternalError);
}

const bucketEntry = usersBucketCreationDatesMap[`${bucketInfo.getOwner()}${constants.splitter}${bucketName}`];
console.log('hihi', `${bucketInfo.getOwner()}${constants.splitter}${bucketName}`)
if (bucketEntry) {
bucketKey = `bucket_${bucketName}_${new Date(usersBucketCreationDatesMap[bucketEntry]).getTime()}`;
if (bucketKey) {
inflightsPreScan = await this.readStorageConsumptionInflights(bucketKey, log);
}
}

let startCursorDate = new Date();
let processed = 0;
await cursor.forEach(
Expand Down Expand Up @@ -234,6 +329,8 @@ class S3UtilsMongoClient extends MongoClientInterface {
collRes.account[account].locations[location].deleteMarkerCount += res.value.isDeleteMarker ? 1 : 0;
});
});
// one bucket has only one account
[accountBucket] = Object.keys(collRes.account);
monitoring.objectsCount.inc({ status: 'success' });
processed++;
},
Expand Down Expand Up @@ -261,6 +358,19 @@ class S3UtilsMongoClient extends MongoClientInterface {
const retResult = this._handleResults(collRes, isVer);
retResult.stalled = stalledCount;

if (inflightsPreScan > 0 && retResult && retResult.dataMetrics) {
Object.keys(retResult.dataMetrics.bucket).forEach(key => {
retResult.dataMetrics.bucket[key] = {
...retResult.dataMetrics.bucket[key],
usedCapacity: {
...retResult.dataMetrics.bucket[key].usedCapacity,
_inflightsPreScan: inflightsPreScan,
},
};
retResult.dataMetrics.bucket[key].accountOwnerID = accountBucket;
});
}

return callback(null, retResult);
} catch (err) {
log.error('An error occurred', {
Expand Down Expand Up @@ -654,7 +764,7 @@ class S3UtilsMongoClient extends MongoClientInterface {

async updateStorageConsumptionMetrics(countItems, dataMetrics, log, cb) {
try {
const updatedStorageMetricsList = [
let updatedStorageMetricsList = [
{ _id: __COUNT_ITEMS, value: countItems },
// iterate every resource through dataMetrics and add to updatedStorageMetricsList
...Object.entries(dataMetrics)
Expand All @@ -668,6 +778,9 @@ class S3UtilsMongoClient extends MongoClientInterface {
];
log.info('updateStorageConsumptionMetrics: updating storage metrics');

// update the inflights
updatedStorageMetricsList = await this.updateInflightDeltas(updatedStorageMetricsList, log);

// Drop the temporary collection if it exists
try {
await this.getCollection(INFOSTORE_TMP).drop();
Expand Down Expand Up @@ -711,6 +824,24 @@ class S3UtilsMongoClient extends MongoClientInterface {
}
}

async readStorageConsumptionInflights(entityName, log) {
try {
const i = this.getCollection(INFOSTORE);
const doc = await i.findOne({ _id: entityName });
if (!doc || !doc.usedCapacity || !doc.usedCapacity._inflight) {
return 0;
}
return doc.usedCapacity._inflight;
} catch (err) {
log.error('readStorageConsumptionInflights: error reading metrics', {
error: err,
errDetails: { ...err },
errorString: err.toString(),
});
return 0;
}
}

/*
* Overwrite the getBucketInfos method to specially handle the cases that
* bucket collection exists but bucket is not in metastore collection.
Expand Down

0 comments on commit 3fa48b9

Please sign in to comment.