Skip to content

Commit

Permalink
Merge branch 'feature/S3UTILS-158/storage-consumption-metrics' into t…
Browse files Browse the repository at this point in the history
…mp/octopus/w/1.15/feature/S3UTILS-158/storage-consumption-metrics
  • Loading branch information
bert-e committed May 16, 2024
2 parents c172d80 + bc28c67 commit e91f150
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 8 deletions.
7 changes: 7 additions & 0 deletions CountItems/CountManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const async = require('async');
const { once } = require('arsenal').jsutil;
const { validStorageMetricLevels } = require('./utils/constants');
const { consolidateDataMetrics } = require('./utils/utils');
const monitoring = require('../utils/monitoring');

class CountManager {
constructor(params) {
Expand Down Expand Up @@ -35,6 +36,7 @@ class CountManager {
return done(new Error('emptyWorkerList'));
}
const id = this.workerList.shift();
const processingStartTime = process.hrtime.bigint();
return this.workers[id].count(bucketInfo, (err, res) => {
this.log.info('processing a bucket', {
method: 'CountManager::_setupQueue',
Expand All @@ -44,6 +46,8 @@ class CountManager {
if (err) {
return done(err);
}
const processingDuration = Number(process.hrtime.bigint() - processingStartTime) / 1e9;
monitoring.bucketProcessingDuration.observe(processingDuration);
this._consolidateData(res);
this.workerList.push(id);
return done();
Expand All @@ -53,6 +57,7 @@ class CountManager {
}

_consolidateData(results) {
const startTime = process.hrtime.bigint();
if (!results) {
return;
}
Expand Down Expand Up @@ -111,6 +116,8 @@ class CountManager {
} else {
this.dataMetrics = results.dataMetrics;
}
const consolidationDurationInS = Number(process.hrtime.bigint() - startTime) / 1e9;
monitoring.consolidationDuration.observe(consolidationDurationInS);
}

setup(callback) {
Expand Down
2 changes: 1 addition & 1 deletion CountItems/CountMaster.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class CountMaster {
});
return this.stop(null, () => callback(err));
}
return this.stop(null, () => callback());
return callback();
});
}
}
Expand Down
6 changes: 5 additions & 1 deletion CountItems/CountWorker.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const assert = require('assert');
const async = require('async');
const { BucketInfo } = require('arsenal').models;
const monitoring = require('../utils/monitoring');

class CountWorker {
constructor(params) {
Expand Down Expand Up @@ -35,7 +36,10 @@ class CountWorker {
return async.waterfall([
next => this.client._getIsTransient(bucketInfo, this.log, next),
(isTransient, next) => this.client.getObjectMDStats(bucketName, bucketInfo, isTransient, this.log, next),
], callback);
], (err, results) => {
monitoring.bucketsCount.inc({ status: err ? 'error' : 'success' });
callback(err, results);
});
}

clientTeardown(callback) {
Expand Down
31 changes: 30 additions & 1 deletion CountItems/masterProcess.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const werelogs = require('werelogs');
const { network } = require('arsenal');
const { reshapeExceptionError } = require('arsenal').errorUtils;
const S3UtilsMongoClient = require('../utils/S3UtilsMongoClient');

Expand All @@ -7,6 +8,9 @@ const CountManager = require('./CountManager');
const createMongoParams = require('../utils/createMongoParams');
const createWorkers = require('./utils/createWorkers');

const WebServer = network.http.server;
const monitoring = require('../utils/monitoring');

const logLevel = Number.parseInt(process.env.DEBUG, 10) === 1
? 'debug' : 'info';

Expand All @@ -15,9 +19,20 @@ const loggerConfig = {
dump: 'error',
};

let waitingForPromScraping = false;

werelogs.configure(loggerConfig);
const log = new werelogs.Logger('S3Utils::CountItems::Master');

/**
 * Parse a base-10 integer, falling back to a default when the parsed value
 * is not strictly positive (this includes NaN, zero and negative numbers).
 * @param {string} s - raw value to parse
 * @param {number} defaultValue - value returned when parsing does not yield a number > 0
 * @return {number} the parsed integer when it is > 0, otherwise defaultValue
 */
function tryParseInt(s, defaultValue) {
    const parsed = Number.parseInt(s, 10);
    // NaN compares false with everything, so it takes the fallback path too
    if (parsed > 0) {
        return parsed;
    }
    return defaultValue;
}

const prometheusPollingPeriod = tryParseInt(process.env.PROMETHEUS_POLLING_PERIOD, 30);

const prometheusPollingAttempts = tryParseInt(process.env.PROMETHEUS_POLLING_ATTEMPTS, 5);

// Number of count workers to spawn, taken from NUM_WORKERS with a default of 4.
// The previous guard used Number.isNaN() on the raw environment string, which is
// always false, so a non-numeric NUM_WORKERS produced NaN workers. tryParseInt
// falls back to the default for anything that is not a strictly positive integer,
// matching how the Prometheus polling settings are parsed.
const numWorkers = tryParseInt(process.env.NUM_WORKERS, 4);
Expand All @@ -37,6 +52,17 @@ const countMaster = new CountMaster({
client: new S3UtilsMongoClient(createMongoParams(log)),
});

// HTTP server on port 8003 whose requests are delegated to
// monitoring.metricsHandler. The first argument is the onScraped callback:
// once the count has finished (waitingForPromScraping === true), it stops
// the count master and terminates the process.
const metricServer = new WebServer(8003, log).onRequest((req, res) => monitoring.metricsHandler(
() => {
if (waitingForPromScraping === true) {
// NOTE(review): this path exits with code 1 while the timeout path in the
// countMaster.start callback exits with 0 — confirm the non-zero code is intended.
countMaster.stop(null, () => process.exit(1));
}
},
req,
res,
));
metricServer.start();

const handleSignal = sig => countMaster.stop(sig, () => process.exit(0));
process.on('SIGINT', handleSignal);
process.on('SIGHUP', handleSignal);
Expand All @@ -53,5 +79,8 @@ countMaster.start(err => {
if (err) {
process.exit(1);
}
process.exit(0);
waitingForPromScraping = true;
setTimeout(() => {
countMaster.stop(null, () => process.exit(0));
}, prometheusPollingAttempts * prometheusPollingPeriod * 1000 * 4);
});
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "s3utils",
"version": "1.14.6",
"version": "1.14.7",
"engines": {
"node": ">= 16"
},
Expand Down Expand Up @@ -57,6 +57,7 @@
"eslint-plugin-import": "^2.20.1",
"eslint-plugin-jest": "^23.6.0",
"jest": "^23.6.0",
"mongodb-memory-server": "^8.10.2"
"mongodb-memory-server": "^8.10.2",
"sinon": "^17.0.1"
}
}
6 changes: 5 additions & 1 deletion tests/functional/countItems.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ const async = require('async');
const werelogs = require('werelogs');
const { BucketInfo, ObjectMD } = require('arsenal').models;
const { constants } = require('arsenal');
const sinon = require('sinon');
const S3UtilsMongoClient = require('../../utils/S3UtilsMongoClient');

const CountMaster = require('../../CountItems/CountMaster');
const CountManager = require('../../CountItems/CountManager');
const createMongoParams = require('../../utils/createMongoParams');
Expand Down Expand Up @@ -190,6 +190,7 @@ jest.setTimeout(120000);
describe('CountItems', () => {
const oldEnv = process.env;
let client;
let setTimeoutSpy;

beforeAll(done => {
process.env = oldEnv;
Expand All @@ -209,9 +210,12 @@ describe('CountItems', () => {
next => client.setup(next),
next => populateMongo(client, next),
], done);
setTimeoutSpy = jest.spyOn(global, 'setTimeout');
setTimeoutSpy.mockImplementation((callback, delay) => callback());
});

afterAll(done => {
setTimeoutSpy.mockRestore();
async.series([
next => client.db.dropDatabase().then(() => next()).catch(() => next()),
next => client.close(next),
Expand Down
4 changes: 4 additions & 0 deletions utils/S3UtilsMongoClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const { errors, constants } = require('arsenal');
const async = require('async');
const { validStorageMetricLevels } = require('../CountItems/utils/constants');
const getLocationConfig = require('./locationConfig');
const monitoring = require('./monitoring');

const METASTORE = '__metastore';
const INFOSTORE = '__infostore';
Expand Down Expand Up @@ -150,6 +151,7 @@ class S3UtilsMongoClient extends MongoClientInterface {
entry: res,
error,
});
monitoring.objectsCount.inc({ status: 'error' });
return;
}

Expand All @@ -159,6 +161,7 @@ class S3UtilsMongoClient extends MongoClientInterface {
method: 'getObjectMDStats',
entry: res,
});
monitoring.objectsCount.inc({ status: 'skipped' });
return;
}

Expand Down Expand Up @@ -231,6 +234,7 @@ class S3UtilsMongoClient extends MongoClientInterface {
collRes.account[account].locations[location].deleteMarkerCount += res.value.isDeleteMarker ? 1 : 0;
});
});
monitoring.objectsCount.inc({ status: 'success' });
processed++;
},
err => {
Expand Down
88 changes: 88 additions & 0 deletions utils/monitoring.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
const { errors } = require('arsenal');
const promClient = require('prom-client');
const { Registry } = require('prom-client');
const { http } = require('httpagent');
const CountMaster = require('../CountItems/CountMaster');

const aggregatorRegistry = new promClient.AggregatorRegistry();
const { collectDefaultMetrics } = promClient;


// Histogram of the bucket processing duration, by the utilization service.
// Observed in seconds by CountManager once a worker has returned the result
// for a bucket without error.
const bucketProcessingDuration = new promClient.Histogram({
name: 's3_countitems_bucket_listing_duration_seconds',
help: 'Bucket processing duration',
buckets: [1, 10, 60, 600, 3600, 18000, 36000],
});

// Histogram of the time spent merging one worker result into the aggregated
// metrics, observed in seconds at the end of CountManager._consolidateData.
const consolidationDuration = new promClient.Histogram({
name: 's3_countitems_bucket_merge_duration_seconds',
help: 'Duration of metrics consolidation in seconds',
buckets: [0.01, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10],
});

// Counter of buckets handled by CountWorker; the `status` label is set to
// 'error' or 'success' depending on the outcome of the bucket count.
const bucketsCount = new promClient.Counter({
name: 's3_countitems_total_buckets_count',
help: 'Total number of buckets processed',
labelNames: ['status'],
});

// Counter of object entries seen by S3UtilsMongoClient; the `status` label is
// set to 'error', 'skipped' or 'success'.
const objectsCount = new promClient.Counter({
name: 's3_countitems_total_objects_count',
help: 'Total number of objects processed',
labelNames: ['status'],
});

/**
 * Send an empty response whose status code is derived from the given error.
 * The code of an ArsenalError is used when it is an integer; any other
 * error results in a 500.
 * @param {http.ServerResponse} res - http response object
 * @param {Error | errors.ArsenalError} error - Error
 * @return {void}
 */
function _writeResponse(res, error) {
    const hasArsenalCode = error instanceof errors.ArsenalError && Number.isInteger(error.code);
    const statusCode = hasArsenalCode ? error.code : 500;
    res.writeHead(statusCode, { 'Content-Type': 'application/json' });
    res.end();
}

/**
 * Serve the Prometheus metrics of this process and of the cluster workers.
 *
 * Only `GET /metrics` is accepted; any other method or URL is answered
 * through _writeResponse with errors.MethodNotAllowed. When collecting or
 * sending the metrics fails, the error is answered through _writeResponse.
 *
 * @param {function} onScraped - callback to call when metrics are scraped;
 * it is invoked only after a successful response has been written, so a
 * failed scrape is not reported as a completed one
 * @param {http.IncomingMessage} req - http request object
 * @param {http.ServerResponse} res - http response object
 * @return {Promise} resolves once the response has been handled
 */
async function metricsHandler(onScraped, req, res) {
    if (req.method !== 'GET' || req.url !== '/metrics') {
        return _writeResponse(res, errors.MethodNotAllowed);
    }
    let response;
    try {
        const [registerMetrics, clusterMetrics] = await Promise.all([
            promClient.register.metrics(),
            aggregatorRegistry.clusterMetrics(),
        ]);
        const promMetrics = `${registerMetrics}\n${clusterMetrics}`;
        const contentLen = Buffer.byteLength(promMetrics, 'utf8');
        response = res.writeHead(200, {
            'content-length': contentLen,
            'content-type': promClient.register.contentType,
        }).end(promMetrics);
    } catch (ex) {
        // Do not notify onScraped here: nothing was delivered to the scraper,
        // and the caller may use the notification to shut the process down.
        return _writeResponse(res, ex);
    }
    if (onScraped) {
        onScraped();
    }
    return response;
}

module.exports = {
client: promClient,
collectDefaultMetrics,
metricsHandler,
bucketProcessingDuration,
consolidationDuration,
bucketsCount,
objectsCount,
};
Loading

0 comments on commit e91f150

Please sign in to comment.