Skip to content

Commit

Permalink
fixups post reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
benzekrimaha committed May 15, 2024
1 parent 8ebe3b3 commit be0f3e1
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 100 deletions.
24 changes: 2 additions & 22 deletions CountItems/CountManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ class CountManager {
account: {},
};
this.workerList = [];
this.uniqueAccounts = new Set();
this.uniqueLocations = new Set();
this.uniqueBuckets = new Set();
this._setupQueue();
}

Expand All @@ -42,9 +39,7 @@ class CountManager {
const processingStartTime = process.hrtime.bigint();
return this.workers[id].count(bucketInfo, (err, res) => {
const processingDuration = Number(process.hrtime.bigint() - processingStartTime) / 1e9;
monitoring.bucketProcessingDuration.labels({
service: 'countItems',
}).observe(processingDuration);
monitoring.bucketProcessingDuration.observe(processingDuration);
this.log.info('processing a bucket', {
method: 'CountManager::_setupQueue',
workInQueue: this.q.length(),
Expand All @@ -53,7 +48,6 @@ class CountManager {
if (err) {
return done(err);
}
monitoring.bucketCount.inc();
this._consolidateData(res);
this.workerList.push(id);
return done();
Expand Down Expand Up @@ -95,18 +89,6 @@ class CountManager {
// metricLevel can only be 'bucket', 'location' or 'account'
if (validStorageMetricLevels.has(metricLevel)) {
Object.keys(results.dataMetrics[metricLevel]).forEach(resourceName => {
if (metricLevel === 'account' && !this.uniqueAccounts.has(resourceName)) {
this.uniqueAccounts.add(resourceName);
monitoring.metricsCount.inc({ metricLevel: 'account' });
}
if (metricLevel === 'location' && !this.uniqueLocations.has(resourceName)) {
this.uniqueLocations.add(resourceName);
monitoring.metricsCount.inc({ metricLevel: 'location' });
}
if (metricLevel === 'bucket' && !this.uniqueBuckets.has(resourceName)) {
this.uniqueBuckets.add(resourceName);
monitoring.metricsCount.inc({ metricLevel: 'bucket' });
}
// resourceName can be the name of bucket, location or account
this.dataMetrics[metricLevel][resourceName] = consolidateDataMetrics(
this.dataMetrics[metricLevel][resourceName],
Expand Down Expand Up @@ -135,9 +117,7 @@ class CountManager {
this.dataMetrics = results.dataMetrics;
}
const consolidationDurationInS = Number(process.hrtime.bigint() - startTime) / 1e9;
monitoring.consolidationDuration.labels({
service: 'countItems',
}).observe(consolidationDurationInS);
monitoring.consolidationDuration.observe(consolidationDurationInS);
}

setup(callback) {
Expand Down
11 changes: 1 addition & 10 deletions CountItems/CountMaster.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
const async = require('async');
const { reshapeExceptionError } = require('arsenal').errorUtils;
const { threshold, frequency } = require('../utils/constants');

class CountMaster {
constructor(params) {
this.log = params.log;
this.manager = params.manager;
this.client = params.client;
this.metrics = params.metrics;
CountMaster.waitingForPromScraping = false;
}

Expand Down Expand Up @@ -37,13 +35,6 @@ class CountMaster {
}
return next();
}),
next => {
this.metrics.start();
this.log.info('metrics server started', {
port: 8003,
});
return next();
},
next => this.manager.setup(next),
next => this.client.getBucketInfos(this.log, (err, bucketList) => {
if (err) {
Expand All @@ -69,7 +60,7 @@ class CountMaster {
return this.stop(null, () => callback(err));
}
CountMaster.waitingForPromScraping = true;
return setTimeout(() => this.stop(null, () => callback()), threshold * frequency * 1000 * 4);
return callback();
});
}
}
Expand Down
22 changes: 16 additions & 6 deletions CountItems/masterProcess.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ const loggerConfig = {
werelogs.configure(loggerConfig);
const log = new werelogs.Logger('S3Utils::CountItems::Master');

const frequency = process.env.PROMETHEUS_POLLING_FREQUENCY && !Number.isNaN(process.env.PROMETHEUS_POLLING_FREQUENCY)
? Number.parseInt(process.env.PROMETHEUS_POLLING_FREQUENCY, 10)
: 30;

const threshold = process.env.PROMETHEUS_POLLING_THRESHOLD && !Number.isNaN(process.env.PROMETHEUS_POLLING_THRESHOLD)
? Number.parseInt(process.env.PROMETHEUS_POLLING_THRESHOLD, 10)
: 5;

const numWorkers = process.env.NUM_WORKERS && !Number.isNaN(process.env.NUM_WORKERS)
? Number.parseInt(process.env.NUM_WORKERS, 10)
: 4;
Expand All @@ -31,7 +39,6 @@ const concurrentCursors = (process.env.CONCURRENT_CURSORS
? Number.parseInt(process.env.CONCURRENT_CURSORS, 10)
: 5;

const metricServer = new WebServer(8003, log);
const countMaster = new CountMaster({
log,
manager: new CountManager({
Expand All @@ -40,17 +47,18 @@ const countMaster = new CountMaster({
maxConcurrent: concurrentCursors,
}),
client: new S3UtilsMongoClient(createMongoParams(log)),
metrics: metricServer,
});

metricServer.onRequest((req, res) => monitoring.metricsHandler(
countMaster,
const metricServer = new WebServer(8003, log).onRequest((req, res) => monitoring.metricsHandler(
() => {
process.exit(1);
if (CountMaster.waitingForPromScraping === true) {
countMaster.stop(null, () => process.exit(1));
}
},
req,
res,
));
metricServer.start();

const handleSignal = sig => countMaster.stop(sig, () => process.exit(0));
process.on('SIGINT', handleSignal);
Expand All @@ -68,5 +76,7 @@ countMaster.start(err => {
if (err) {
process.exit(1);
}
process.exit(0);
setTimeout(() => {
countMaster.stop(null, () => process.exit(0));
}, threshold * frequency * 1000 * 4);
});
5 changes: 0 additions & 5 deletions tests/functional/countItems.js
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,11 @@ jest.setTimeout(120000);
describe('CountItems', () => {
const oldEnv = process.env;
let client;
let metricsStub;
let setTimeoutSpy;

beforeAll(done => {
process.env = oldEnv;
process.env.MONGODB_DATABASE = dbName;
metricsStub = {
start: sinon.stub(),
};

const opts = {
replicaSetHosts: MONGODB_REPLICASET,
Expand Down Expand Up @@ -241,7 +237,6 @@ describe('CountItems', () => {
maxConcurrent: 5,
}),
client: new S3UtilsMongoClient(createMongoParams(logger)),
metrics: metricsStub,
});

async.series([
Expand Down
6 changes: 3 additions & 3 deletions utils/S3UtilsMongoClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class S3UtilsMongoClient extends MongoClientInterface {
entry: res,
error,
});
monitoring.objectsCount.inc({ state: 'error' });
monitoring.objectsCount.inc({ status: 'error' });
return;
}

Expand All @@ -161,7 +161,7 @@ class S3UtilsMongoClient extends MongoClientInterface {
method: 'getObjectMDStats',
entry: res,
});
monitoring.objectsCount.inc({ state: 'skipped' });
monitoring.objectsCount.inc({ status: 'skipped' });
return;
}

Expand Down Expand Up @@ -234,7 +234,7 @@ class S3UtilsMongoClient extends MongoClientInterface {
collRes.account[account].locations[location].deleteMarkerCount += res.value.isDeleteMarker ? 1 : 0;
});
});
monitoring.objectsCount.inc({ state: 'success' });
monitoring.objectsCount.inc({ status: 'success' });
processed++;
},
err => {
Expand Down
2 changes: 0 additions & 2 deletions utils/constants.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
module.exports = {
MaxParallelLimit: 10,
frequency: 30,
threshold: 5,
};
79 changes: 27 additions & 52 deletions utils/monitoring.js
Original file line number Diff line number Diff line change
@@ -1,82 +1,59 @@
const { errors } = require('arsenal');
const promClient = require('prom-client');
const { Registry } = require('prom-client');
const { http } = require('httpagent');
const CountMaster = require('../CountItems/CountMaster');

const aggregatorRegistry = new promClient.AggregatorRegistry();
const { collectDefaultMetrics } = promClient;

const bucketCount = (promClient.register.getSingleMetric('bucketCount_metrics'))
|| new promClient.Gauge({
name: 'bucketCount_metrics',
help: 'Total number of buckets processed',
labelNames: ['bucketName'],
});

// Histogram of the bucket processing duration, by the utilization service.
const bucketProcessingDuration = new promClient.Histogram({
name: 's3_countitems_bucket_listing_duration_seconds',
help: 'Bucket processing duration',
buckets: [1, 10, 60, 600, 3600, 18000, 36000],
});

const bucketProcessingDuration = (promClient.register.getSingleMetric('count_items_bucketProcessingDuration'))
|| new promClient.Histogram({
name: 'count_items_bucketProcessingDuration',
help: 'Bucket processing duration',
labelNames: ['service'],
buckets: [1, 10, 60, 600, 3600, 18000, 36000],
});
const consolidationDuration = new promClient.Histogram({
name: 's3_countitems_bucket_merge_duration_seconds',
help: 'Duration of metrics consolidation in seconds',
buckets: [0.01, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10],
});

const consolidationDuration = (promClient.register.getSingleMetric('count_items_consolidationDuration'))
|| new promClient.Histogram({
name: 'count_items_consolidationDuration',
help: 'Duration of metrics consolidation in seconds',
labelNames: ['service'],
buckets: [0.01, 0.05, 0.1, 0.2, 0.5, 1, 1.5, 2],
});
const workersCount = new promClient.Counter({
name: 's3_countitems_worker_errors_count',
help: 'Number of errors in counting workers',
labelNames: ['state'],
});

const workersCount = (promClient.register.getSingleMetric('count_items_workersCountErrors'))
|| new promClient.Counter({
name: 'count_items_workersCountErrors',
help: 'Number of errors in counting workers',
labelNames: ['state'],
});
const objectsCount = new promClient.Counter({
name: 's3_countitems_total_objects_count',
help: 'Number of objects processed',
labelNames: ['status'],
});

const objectsCount = (promClient.register.getSingleMetric('count_items_objectsCount'))
|| new promClient.Counter({
name: 'count_items_objectsCount',
help: 'Number of objects processed',
labelNames: ['state'],
});

const metricsCount = (promClient.register.getSingleMetric('count_items_metricsCount'))
|| new promClient.Counter({
name: 'count_items_metricsCount',
help: 'Number of entities with data usage',
labelNames: ['metricLevel'],
});
/**
* @param {http.ServerResponse} res - http response object
* @param {Error | errors.ArsenalError} error - Error
* @return {void}
*/
function _writeResponse(res, error) {
let statusCode = 200;
if (error) {
if (error instanceof errors.ArsenalError && Number.isInteger(error.code)) {
statusCode = error.code;
} else {
statusCode = 500;
}
let statusCode = 500;
if (error instanceof errors.ArsenalError && Number.isInteger(error.code)) {
statusCode = error.code;
}
res.writeHead(statusCode, { 'Content-Type': 'application/json' });
res.end();
}

/**
* @param {number} countMasterInstance - countMaster instance
* @param {function} onScraped - callback to call when metrics are scraped
* @param {http.IncomingMessage} req - http request object
* @param {http.ServerResponse} res - http response object
* @return {void}
*/
async function metricsHandler(countMasterInstance, onScraped, req, res) {
async function metricsHandler(onScraped, req, res) {
if (req.method !== 'GET' || req.url !== '/metrics') {
return _writeResponse(res, errors.MethodNotAllowed);
}
Expand All @@ -94,8 +71,8 @@ async function metricsHandler(countMasterInstance, onScraped, req, res) {
} catch (ex) {
return _writeResponse(res, ex);
} finally {
if (CountMaster.waitingForPromScraping === true) {
countMasterInstance.stop(null, onScraped);
if (onScraped) {
onScraped();
}
}
}
Expand All @@ -104,10 +81,8 @@ module.exports = {
client: promClient,
collectDefaultMetrics,
metricsHandler,
bucketCount,
bucketProcessingDuration,
consolidationDuration,
workersCount,
objectsCount,
metricsCount,
};

0 comments on commit be0f3e1

Please sign in to comment.