From 3ea8b812729e4da43eddadf0a6adcbf1648b0f23 Mon Sep 17 00:00:00 2001
From: Taylor McKinnon
Date: Tue, 12 Nov 2024 11:10:50 -0800
Subject: [PATCH] impr(S3UTILS-181): Add scuba backend to service-lvl-sidecar

---
Review notes (this section is ignored by `git am`):
- report.js: `timestamp` is in microseconds, it is now converted to
  milliseconds before building the `Date` handed to the scuba backend.
- report.js: document `bucket` as an object, drop the `else` after `return`.
- scuba.js: replace the leftover `console.dir` debug dump with `log.debug`,
  log `errmsg` like server.js does, document the helpers.
- bucketd.js: reject instead of throwing inside the callback when the raft
  session list can not be parsed.
- README.md: the default bootstrap is `localhost:19000` (see env.js).
- The `index` lines still carry the blob ids of the original revision.

 README.md                        |  32 ++++++++++
 service-level-sidecar/bucketd.js |  26 ++++++++
 service-level-sidecar/env.js     |   6 ++
 service-level-sidecar/report.js  |  61 +++++++++++-------
 service-level-sidecar/scuba.js   | 105 +++++++++++++++++++++++++++++++
 service-level-sidecar/server.js  |   6 +-
 service-level-sidecar/warp10.js  |  31 +++++++++
 7 files changed, 241 insertions(+), 26 deletions(-)
 create mode 100644 service-level-sidecar/scuba.js

diff --git a/README.md b/README.md
index b0f9dee2..ecddd5d7 100644
--- a/README.md
+++ b/README.md
@@ -1972,6 +1972,9 @@ REST API to provide service level reports for UtapiV2
 
 ## Usage
 
+
+**Using warp10 backend**
+
 ```shell
 docker run -d \
     --network=host \
@@ -1979,7 +1982,21 @@ docker run -d \
     -e SIDECAR_SCALE_FACTOR=1.4 \
     -e SIDECAR_WARP10_NODE="md1-cluster1:4802@127.0.0.1:4802" \
     scality/s3utils service-level-sidecar/index.js
+```
+
+**Using Scuba backend**
+```shell
+docker run -d \
+    --network=host \
+    -e SIDECAR_API_KEY=dev_key_change_me \
+    -e SIDECAR_SCALE_FACTOR=1.4 \
+    -e SIDECAR_ENABLE_SCUBA=t \
+    -e SIDECAR_SCUBA_BUCKETD_BOOTSTRAP=127.0.0.1:19000 \
+    scality/s3utils service-level-sidecar/index.js
+```
 
+**Example output**
+```shell
 curl -X POST -H "Authorization: Bearer dev_key_change_me" localhost:24742/api/report | jq
 {
   "account": [
@@ -2133,6 +2150,21 @@ docker run -d \
     scality/s3utils service-level-sidecar/index.js
 ```
 
+#### Scuba
+
+The scuba backend can be enabled by setting `SIDECAR_ENABLE_SCUBA`.
+A bucketd bootstrap list can be provided using `SIDECAR_SCUBA_BUCKETD_BOOTSTRAP`.
+If a bootstrap list is not provided `localhost:19000` will be used.
+Internal TLS support can be enabled using `SIDECAR_SCUBA_BUCKETD_ENABLE_TLS`.
+
+ ```shell
+docker run -d \
+    --network=host \
+    -e SIDECAR_ENABLE_SCUBA=t \
+    -e SIDECAR_SCUBA_BUCKETD_BOOTSTRAP=127.0.0.1:19000 \
+    scality/s3utils service-level-sidecar/index.js
+```
+
 ### Other Settings
 
 - Log level can be set using `SIDECAR_LOG_LEVEL` (defaults to `info`)
diff --git a/service-level-sidecar/bucketd.js b/service-level-sidecar/bucketd.js
index 654949a2..4e8a82d5 100644
--- a/service-level-sidecar/bucketd.js
+++ b/service-level-sidecar/bucketd.js
@@ -60,6 +60,32 @@ async function* listBuckets(log) {
     }
 }
 
+/**
+ * Retrieve the ids of all raft sessions from bucketd
+ * @param {object} log - werelogs logger instance
+ * @returns {Promise<string[]>} - raft session ids, as strings
+ */
+async function getRaftSessionIds(log) {
+    return new Promise((resolve, reject) => {
+        metadata.client.getAllRafts(log.getSerializedUids(), (err, res) => {
+            if (err) {
+                log.error('error getting raft session ids', { err });
+                return reject(err);
+            }
+
+            // A throw from inside this callback would never settle the promise
+            try {
+                const data = JSON.parse(res);
+                return resolve(data.map(raft => `${raft.id}`));
+            } catch (parseErr) {
+                log.error('error parsing raft session ids', { errmsg: parseErr.message });
+                return reject(parseErr);
+            }
+        }, log);
+    });
+}
+
 module.exports = {
     listBuckets,
+    getRaftSessionIds,
 };
diff --git a/service-level-sidecar/env.js b/service-level-sidecar/env.js
index 793e9a4a..8e26251f 100644
--- a/service-level-sidecar/env.js
+++ b/service-level-sidecar/env.js
@@ -99,6 +99,9 @@ const defaults = {
         host: 'localhost',
         port: 8500,
     },
+    enableScuba: false,
+    scubaBucketd: 'localhost:19000',
+    scubaBucketdTls: false,
 };
 
 module.exports = {
@@ -123,4 +126,7 @@ module.exports = {
         readToken: loadFromEnv('WARP10_READ_TOKEN', defaults.warp10.readToken),
         ...loadFromEnv('WARP10_NODE', defaults.warp10.node, typeCasts.node),
     },
+    enableScuba: loadFromEnv('ENABLE_SCUBA', defaults.enableScuba, typeCasts.bool),
+    scubaBucketd: loadFromEnv('SCUBA_BUCKETD_BOOTSTRAP', defaults.scubaBucketd),
+    scubaBucketdTls: loadFromEnv('SCUBA_BUCKETD_ENABLE_TLS', defaults.scubaBucketdTls, typeCasts.bool),
 };
diff --git a/service-level-sidecar/report.js b/service-level-sidecar/report.js
index 3753e364..359fe347 100644
--- a/service-level-sidecar/report.js
+++ b/service-level-sidecar/report.js
@@ -1,7 +1,11 @@
 const async = require('async');
 const util = require('util');
 
+const arsenal = require('arsenal');
+const { splitter } = arsenal.constants;
+
 const bucketd = require('./bucketd');
+const scuba = require('./scuba');
 const env = require('./env');
 const warp10 = require('./warp10');
 const { getAccountIdForCanonicalId } = require('./vault');
@@ -43,34 +47,39 @@ class MetricReport {
 
 
 /**
- *
+ * Get the object count and bytes stored for a bucket from the enabled backend
+ * @param {Array} logIds - raft session ids to retrieve metrics for (only contains info if scuba backend is enabled)
  * @param {integer} timestamp - timestamp to retrieve metrics in microseconds
- * @param {string} bucket - bucket name to retrieve metrics for
+ * @param {object} bucket - bucket to retrieve metrics for
+ * @param {string} bucket.name - bucket name
+ * @param {string} bucket.account - canonical id of the account owning the bucket
  * @param {object} log - werelogs logger instance
  * @returns {object} - object count and bytes stored for bucket
  */
-async function getMetricsForBucket(timestamp, bucket, log) {
-    log.debug('getting metrics for bucket', { bucket, timestamp });
-    const params = {
-        params: {
-            end: timestamp,
-            labels: { bck: bucket },
-            node: env.warp10.nodeId,
-        },
-        macro: 'utapi/getMetricsAt',
-    };
+async function getMetricsForBucket(logIds, timestamp, bucket, log) {
+    log.debug('getting metrics for bucket', { bucket: bucket.name, timestamp });
 
-    const resp = await warp10.exec(params);
+    if (!env.enableScuba) {
+        return warp10.getMetricsForBucket(timestamp, bucket.name, log);
+    }
 
-    if (resp.result.length === 0) {
-        log.error('unable to retrieve metrics', { bucket });
-        throw new Error('Error retrieving metrics');
-    }
-
-    return {
-        count: resp.result[0].objD,
-        bytes: resp.result[0].sizeD,
-    };
+    const resourceName = `${bucket.account}${splitter}${bucket.name}`;
+    // `timestamp` is in microseconds while `Date` expects milliseconds
+    const date = new Date(Math.floor(timestamp / 1000));
+    const logResults = await util.promisify(async.mapLimit)(logIds, env.concurrencyLimit, async logId => {
+        try {
+            return await scuba.getMetrics('bucket', resourceName, logId, date, log);
+        } catch (err) {
+            log.error('error getting metrics for bucket', { bucket: bucket.name, logId, errmsg: err.message });
+            throw err;
+        }
+    });
+
+    return logResults.filter(result => result !== null).reduce((acc, result) => {
+        acc.count += result.value.metrics.objectsTotal;
+        acc.bytes += result.value.metrics.bytesTotal;
+        return acc;
+    }, { count: 0, bytes: 0 });
 }
 
 /**
@@ -92,10 +101,16 @@ async function getServiceReport(timestamp, log) {
     const bucketReports = {};
     const accountInfoCache = {};
 
+    let logIds = [];
+    if (env.enableScuba) {
+        logIds = await bucketd.getRaftSessionIds(log);
+        logIds = logIds.filter(id => id !== '0');
+    }
+
     for await (const buckets of bucketd.listBuckets(log)) {
         log.debug('got response from bucketd', { numBuckets: buckets.length });
         await util.promisify(async.eachLimit)(buckets, env.concurrencyLimit, async bucket => {
-            const metrics = await getMetricsForBucket(timestamp, bucket.name, log);
+            const metrics = await getMetricsForBucket(logIds, timestamp, bucket, log);
 
             log.debug('fetched metrics for bucket', { bucket: bucket.name, accCanonicalId: bucket.account });
 
diff --git a/service-level-sidecar/scuba.js b/service-level-sidecar/scuba.js
new file mode 100644
index 00000000..a0ebd711
--- /dev/null
+++ b/service-level-sidecar/scuba.js
@@ -0,0 +1,105 @@
+const arsenal = require('arsenal');
+const bucketclient = require('bucketclient');
+
+const { BucketClientInterface } = arsenal.storage.metadata.bucketclient;
+const { splitter } = arsenal.constants;
+
+const rootLogger = require('./log');
+const env = require('./env');
+const utils = require('./utils');
+
+const params = {
+    bucketdBootstrap: [env.scubaBucketd],
+    https: env.scubaBucketdTls ? env.tls.certs : undefined,
+};
+
+const metadata = new BucketClientInterface(params, bucketclient, rootLogger);
+
+const listObjects = utils.retryable(metadata.listObject.bind(metadata));
+
+
+/**
+ * Left-pad a string representation of a value with a given template.
+ * For example: padLeft('foo', '00000') gives '00foo'.
+ *
+ * @param {any} value - value to pad
+ * @param {string} template - padding template
+ * @returns {string} - padded string
+ */
+function padLeft(value, template) {
+    return `${template}${value}`.slice(-template.length);
+}
+
+/**
+ * Get the last millisecond (23:59:59.999 UTC) of the day containing a date.
+ *
+ * @param {Date} timestamp - date to round
+ * @returns {Date} - end of the UTC day containing `timestamp`
+ */
+function roundToDay(timestamp) {
+    return new Date(
+        Date.UTC(timestamp.getUTCFullYear(), timestamp.getUTCMonth(), timestamp.getUTCDate(), 23, 59, 59, 999),
+    );
+}
+
+const LENGTH_TS = 14;
+const MAX_TS = 10 ** LENGTH_TS - 1; // good until 16 Nov 5138
+const TEMPLATE_TS = new Array(LENGTH_TS + 1).join('0');
+
+/**
+ * Build the metrics key of a resource for the day containing a date.
+ * The end of day is stored as `MAX_TS - ts`, so later days give lexicographically smaller keys.
+ *
+ * @param {string} resourceName - name of the resource
+ * @param {Date} timestamp - date to build the key for
+ * @returns {string} - key in the form `<resourceName>/<zero padded MAX_TS - ts>`
+ */
+function formatMetricsKey(resourceName, timestamp) {
+    const ts = padLeft(MAX_TS - roundToDay(timestamp).getTime(), TEMPLATE_TS);
+    return `${resourceName}/${ts}`;
+}
+
+/**
+ * List at most one metrics entry of a resource, starting from the key for the day containing `timestamp`.
+ *
+ * @param {string} classType - resource class, used as prefix of the listed metadata bucket name
+ * @param {string} resourceName - name of the resource to get metrics for
+ * @param {string} logId - raft session id, used as suffix of the listed metadata bucket name
+ * @param {Date} timestamp - date to get metrics for
+ * @param {object} log - werelogs logger instance
+ * @returns {Promise<object|null>} - `{ key, value }` with the JSON parsed value, null if nothing is found
+ */
+async function getMetrics(classType, resourceName, logId, timestamp, log) {
+    const listingParams = {
+        maxKeys: 1,
+        listingType: 'Basic',
+        gte: formatMetricsKey(resourceName, timestamp),
+        lte: `${resourceName}/${padLeft(MAX_TS, TEMPLATE_TS)}`,
+    };
+
+    const bucket = `${classType}${splitter}${logId}`;
+    log.debug('listing metrics', { bucket, listingParams });
+
+    try {
+        const resp = await listObjects(bucket, listingParams, log);
+        if (resp.length === 0) {
+            return null;
+        }
+
+        const { key, value } = resp[0];
+        return {
+            key,
+            value: JSON.parse(value),
+        };
+    } catch (error) {
+        if (error.NoSuchBucket) {
+            return null;
+        }
+        log.error('Error during listing', { bucket, errmsg: error.message });
+        throw error;
+    }
+}
+
+module.exports = {
+    getMetrics,
+};
diff --git a/service-level-sidecar/server.js b/service-level-sidecar/server.js
index 1e4e068a..a1da4c4e 100644
--- a/service-level-sidecar/server.js
+++ b/service-level-sidecar/server.js
@@ -63,7 +63,7 @@ function finishMiddleware(req, res) {
 }
 
 // Hash our api key once for reuse during the request
-// We prepend `Bearer ` to avoid having to strip it from the header value runtime for comparison
+// We prepend `Bearer ` to avoid having to strip it from the header value at runtime for comparison
 const actualKeyHash = crypto.createHash('sha512').copy().update(`Bearer ${env.apiKey}`).digest();
 
 // Any handler mounted under `/api/` requires an Authorization header
@@ -101,7 +101,7 @@ app.post('/api/report', (req, res, next) => {
     const timestamp = Date.now() * 1000;
     getServiceReportCb(timestamp, req.log, (err, report) => {
         if (err) {
-            req.log.error('error generating metrics report', { error: err });
+            req.log.error('error generating metrics report', { errmsg: err.message });
             next(makeError(500, 'Internal Server Error'));
             return;
         }
@@ -119,7 +119,7 @@ app.use((req, res, next) => {
 
 // Catch all error handler
 app.use((err, req, res, next) => {
-    req.log.error('error during request', { error: err });
+    req.log.error('error during request', { errmsg: err.message });
     // Default errors to `500 Internal Server Error` in case something unexpected slips through
     const data = {
         code: err.status || 500,
diff --git a/service-level-sidecar/warp10.js b/service-level-sidecar/warp10.js
index be8048af..f6cf41f3 100644
--- a/service-level-sidecar/warp10.js
+++ b/service-level-sidecar/warp10.js
@@ -41,6 +41,37 @@ class Warp10Client {
         log.debug('warpscript executed', { ...params, stats: resp.meta });
         return resp;
     }
+
+    /**
+     * Get the object count and bytes stored for a bucket using the `utapi/getMetricsAt` macro
+     * @param {integer} timestamp - timestamp to retrieve metrics in microseconds
+     * @param {string} bucket - bucket name to retrieve metrics for
+     * @param {object} log - werelogs logger instance
+     * @returns {object} - object count and bytes stored for bucket
+     */
+    async getMetricsForBucket(timestamp, bucket, log) {
+        log.debug('getting metrics for bucket', { bucket, timestamp });
+        const params = {
+            params: {
+                end: timestamp,
+                labels: { bck: bucket },
+                node: this.nodeId,
+            },
+            macro: 'utapi/getMetricsAt',
+        };
+
+        const resp = await this.exec(params);
+
+        if (resp.result.length === 0) {
+            log.error('unable to retrieve metrics', { bucket });
+            throw new Error('Error retrieving metrics');
+        }
+
+        return {
+            count: resp.result[0].objD,
+            bytes: resp.result[0].sizeD,
+        };
+    }
 }