Skip to content

Commit

Permalink
Add scuba backend
Browse files Browse the repository at this point in the history
  • Loading branch information
tmacro committed Nov 12, 2024
1 parent 3bf8eca commit b4f49d8
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 24 deletions.
16 changes: 16 additions & 0 deletions service-level-sidecar/bucketd.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ async function* listBuckets(log) {
}
}

async function getRaftSessionIds(log) {
return new Promise((resolve, reject) => {
metadata.client.getAllRafts(log.getSerializedUids(), (err, res) => {
if (err) {
log.error('error getting raft session ids', { err });
return reject(err);
}

const data = JSON.parse(res);

return resolve(data.map(raft => `${raft.id}`));
}, log);
});
}

module.exports = {
listBuckets,
getRaftSessionIds,
};
6 changes: 6 additions & 0 deletions service-level-sidecar/env.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ const defaults = {
host: 'localhost',
port: 8500,
},
enableScuba: false,
scubaBucketd: 'localhost:19000',
scubaBucketdTls: false,
};

module.exports = {
Expand All @@ -123,4 +126,7 @@ module.exports = {
readToken: loadFromEnv('WARP10_READ_TOKEN', defaults.warp10.readToken),
...loadFromEnv('WARP10_NODE', defaults.warp10.node, typeCasts.node),
},
enableScuba: loadFromEnv('ENABLE_SCUBA', defaults.enableScuba, typeCasts.bool),
scubaBucketd: loadFromEnv('SCUBA_BUCKETD_BOOTSTRAP', defaults.scubaBucketd),
scubaBucketdTls: loadFromEnv('SCUBA_BUCKETD_ENABLE_TLS', defaults.scubaBucketdTls, typeCasts.bool),
};
53 changes: 32 additions & 21 deletions service-level-sidecar/report.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
const async = require('async');
const util = require('util');

const arsenal = require('arsenal');

Check failure on line 4 in service-level-sidecar/report.js

View workflow job for this annotation

GitHub Actions / tests

Expected 1 empty line after require statement not followed by another require
const { splitter } = arsenal.constants;

const bucketd = require('./bucketd');
const scuba = require('./scuba');
const env = require('./env');
const warp10 = require('./warp10');
const { getAccountIdForCanonicalId } = require('./vault');
Expand Down Expand Up @@ -43,34 +47,35 @@ class MetricReport {


/**
*
* @param {Array<Number>} logIds - raft session ids to retrieve metrics for (only contains info if scuba backend is enabled)
* @param {integer} timestamp - timestamp to retrieve metrics in microseconds
* @param {string} bucket - bucket name to retrieve metrics for
* @param {object} log - werelogs logger instance
* @returns {object} - object count and bytes stored for bucket
*/
async function getMetricsForBucket(timestamp, bucket, log) {
log.debug('getting metrics for bucket', { bucket, timestamp });
const params = {
params: {
end: timestamp,
labels: { bck: bucket },
node: env.warp10.nodeId,
},
macro: 'utapi/getMetricsAt',
};
async function getMetricsForBucket(logIds, timestamp, bucket, log) {
log.debug('getting metrics for bucket', { bucket: bucket.name, timestamp });

if (env.enableScuba) {
const resourceName = `${bucket.account}${splitter}${bucket.name}`;
const logResults = await util.promisify(async.mapLimit)(logIds, env.concurrencyLimit, async logId => {
try {
return await scuba.getMetrics('bucket', resourceName, logId, new Date(timestamp), log);
} catch (err) {
log.error('error getting metrics for bucket', { bucket: bucket.name, logId, errmsg: err.message });
throw err;
}
});

const resp = await warp10.exec(params);
return logResults.filter(result => result !== null).reduce((acc, result) => {
acc.count += result.value.metrics.objectsTotal;

Check warning on line 71 in service-level-sidecar/report.js

View workflow job for this annotation

GitHub Actions / tests

Assignment to property of function parameter 'acc'
acc.bytes += result.value.metrics.bytesTotal;

Check warning on line 72 in service-level-sidecar/report.js

View workflow job for this annotation

GitHub Actions / tests

Assignment to property of function parameter 'acc'
return acc;
}, { count: 0, bytes: 0 });

Check failure on line 74 in service-level-sidecar/report.js

View workflow job for this annotation

GitHub Actions / tests

Block must not be padded by blank lines

if (resp.result.length === 0) {
log.error('unable to retrieve metrics', { bucket });
throw new Error('Error retrieving metrics');
} else {

Check failure on line 76 in service-level-sidecar/report.js

View workflow job for this annotation

GitHub Actions / tests

Unnecessary 'else' after 'return'
return warp10.getMetricsForBucket(timestamp, bucket.name, log);
}

return {
count: resp.result[0].objD,
bytes: resp.result[0].sizeD,
};
}

/**
Expand All @@ -92,10 +97,16 @@ async function getServiceReport(timestamp, log) {
const bucketReports = {};
const accountInfoCache = {};

let logIds = [];
if (env.enableScuba) {
logIds = await bucketd.getRaftSessionIds(log);
logIds = logIds.filter(id => id !== '0');
}

for await (const buckets of bucketd.listBuckets(log)) {
log.debug('got response from bucketd', { numBuckets: buckets.length });
await util.promisify(async.eachLimit)(buckets, env.concurrencyLimit, async bucket => {
const metrics = await getMetricsForBucket(timestamp, bucket.name, log);
const metrics = await getMetricsForBucket(logIds, timestamp, bucket, log);

log.debug('fetched metrics for bucket', { bucket: bucket.name, accCanonicalId: bucket.account });

Expand Down
85 changes: 85 additions & 0 deletions service-level-sidecar/scuba.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
const arsenal = require('arsenal');
const bucketclient = require('bucketclient');

const { BucketClientInterface } = arsenal.storage.metadata.bucketclient;
const { splitter } = arsenal.constants;

const rootLogger = require('./log');
const env = require('./env');
const utils = require('./utils');

const params = {
bucketdBootstrap: [env.scubaBucketd],
https: env.scubaBucketdTls ? env.tls.certs : undefined,
};

const metadata = new BucketClientInterface(params, bucketclient, rootLogger);

const listObjects = utils.retryable(metadata.listObject.bind(metadata));


/**
* Left-pad a string representation of a value with a given template.
* For example: pad('foo', '00000') gives '00foo'.
*
* @param {any} value - value to pad
* @param {string} template - padding template
* @returns {string} - padded string
*/
function padLeft(value, template) {
return `${template}${value}`.slice(-template.length);
}

function roundToDay(timestamp) {
return new Date(
Date.UTC(timestamp.getUTCFullYear(), timestamp.getUTCMonth(), timestamp.getUTCDate(), 23, 59, 59, 999),
);
}

const LENGTH_TS = 14;
const MAX_TS = 10 ** LENGTH_TS - 1; // good until 16 Nov 5138
const TEMPLATE_TS = new Array(LENGTH_TS + 1).join('0');

function formatMetricsKey(resourceName, timestamp) {
const ts = padLeft(MAX_TS - roundToDay(timestamp).getTime(), TEMPLATE_TS);
return `${resourceName}/${ts}`;

Check failure on line 45 in service-level-sidecar/scuba.js

View workflow job for this annotation

GitHub Actions / tests

Block must not be padded by blank lines

}

async function getMetrics(classType, resourceName, logId, timestamp, log) {
const listingParams = {
maxKeys: 1,
listingType: 'Basic',
gte: formatMetricsKey(resourceName, timestamp),
lte: `${resourceName}/${padLeft(MAX_TS, TEMPLATE_TS)}`,
};

const bucket = `${classType}${splitter}${logId}`;
console.dir({

Check failure on line 58 in service-level-sidecar/scuba.js

View workflow job for this annotation

GitHub Actions / tests

Unexpected console statement
listingParams,
bucket,
}, { depth: null });

try {
const resp = await listObjects(bucket, listingParams, log);
if (resp.length === 0) {
return null;
}

const { key, value } = resp[0];
return {
key,
value: JSON.parse(value),
};
} catch (error) {
if (error.NoSuchBucket) {
return null;
}
log.error('Error during listing', { error });
throw error;
}
}

module.exports = {
getMetrics,
};
6 changes: 3 additions & 3 deletions service-level-sidecar/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ function finishMiddleware(req, res) {
}

// Hash our api key once for reuse during the request
// We prepend `Bearer ` to avoid having to strip it from the header value runtime for comparison
// We prepend `Bearer ` to avoid having to strip it from the header value at runtime for comparison
const actualKeyHash = crypto.createHash('sha512').copy().update(`Bearer ${env.apiKey}`).digest();

// Any handler mounted under `/api/` requires an Authorization header
Expand Down Expand Up @@ -101,7 +101,7 @@ app.post('/api/report', (req, res, next) => {
const timestamp = Date.now() * 1000;
getServiceReportCb(timestamp, req.log, (err, report) => {
if (err) {
req.log.error('error generating metrics report', { error: err });
req.log.error('error generating metrics report', { errmsg: err.message });
next(makeError(500, 'Internal Server Error'));
return;
}
Expand All @@ -119,7 +119,7 @@ app.use((req, res, next) => {

// Catch all error handler
app.use((err, req, res, next) => {
req.log.error('error during request', { error: err });
req.log.error('error during request', { errmsg: err.message });
// Default errors to `500 Internal Server Error` in case something unexpected slips through
const data = {
code: err.status || 500,
Expand Down
24 changes: 24 additions & 0 deletions service-level-sidecar/warp10.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,30 @@ class Warp10Client {
log.debug('warpscript executed', { ...params, stats: resp.meta });
return resp;
}

async getMetricsForBucket(timestamp, bucket, log) {
log.debug('getting metrics for bucket', { bucket, timestamp });
const params = {
params: {
end: timestamp,
labels: { bck: bucket },
node: this.nodeId,
},
macro: 'utapi/getMetricsAt',
};

const resp = await this.exec(params);

if (resp.result.length === 0) {
log.error('unable to retrieve metrics', { bucket });
throw new Error('Error retrieving metrics');
}

return {
count: resp.result[0].objD,
bytes: resp.result[0].sizeD,
};
}
}


Expand Down

0 comments on commit b4f49d8

Please sign in to comment.