diff --git a/README.md b/README.md
index b0f9dee2..7880687e 100644
--- a/README.md
+++ b/README.md
@@ -1972,6 +1972,9 @@ REST API to provide service level reports for UtapiV2
 
 ## Usage
 
+
+**Using Warp 10 backend**
+
 ```shell
 docker run -d \
   --network=host \
@@ -1979,7 +1982,21 @@ docker run -d \
   -e SIDECAR_API_KEY=dev_key_change_me \
   -e SIDECAR_SCALE_FACTOR=1.4 \
   -e SIDECAR_WARP10_NODE="md1-cluster1:4802@127.0.0.1:4802" \
   scality/s3utils service-level-sidecar/index.js
+```
+
+**Using Scuba backend**
+```shell
+docker run -d \
+  --network=host \
+  -e SIDECAR_API_KEY=dev_key_change_me \
+  -e SIDECAR_SCALE_FACTOR=1.4 \
+  -e SIDECAR_ENABLE_SCUBA=true \
+  -e SIDECAR_SCUBA_BUCKETD_BOOTSTRAP=127.0.0.1:19000 \
+  scality/s3utils service-level-sidecar/index.js
+```
+**Example output**
+```shell
 curl -X POST -H "Authorization: Bearer dev_key_change_me" localhost:24742/api/report | jq
 {
   "account": [
@@ -2120,7 +2137,7 @@ docker run -d \
   scality/s3utils service-level-sidecar/index.js
 ```
 
-#### Warp10
+#### Warp 10
 
 The Warp 10 address is configured using `SIDECAR_WARP10_NODE`.
 The Warp 10 `nodeId` must be included (normally matches ansible inventory name plus port ie `md1-cluster1:4802`).
@@ -2133,6 +2150,21 @@ docker run -d \
   scality/s3utils service-level-sidecar/index.js
 ```
 
+#### Scuba
+
+The Scuba backend can be enabled by setting `SIDECAR_ENABLE_SCUBA`.
+A bucketd address can be provided using `SIDECAR_SCUBA_BUCKETD_BOOTSTRAP`.
+If a bucketd address is not provided, `127.0.0.1:19000` will be used.
+Internal TLS support can be enabled using `SIDECAR_SCUBA_BUCKETD_ENABLE_TLS`.
+
+```shell
+docker run -d \
+  --network=host \
+  -e SIDECAR_ENABLE_SCUBA=true \
+  -e SIDECAR_SCUBA_BUCKETD_BOOTSTRAP=127.0.0.1:19000 \
+  scality/s3utils service-level-sidecar/index.js
+```
+
 ### Other Settings
 
 - Log level can be set using `SIDECAR_LOG_LEVEL` (defaults to `info`)
diff --git a/package.json b/package.json
index 598186c8..eccc2e5d 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "s3utils",
-  "version": "1.15.4",
+  "version": "1.15.5",
   "engines": {
     "node": ">= 16"
   },
diff --git a/service-level-sidecar/bucketd.js b/service-level-sidecar/bucketd.js
index 654949a2..6e634d46 100644
--- a/service-level-sidecar/bucketd.js
+++ b/service-level-sidecar/bucketd.js
@@ -17,7 +17,12 @@ const params = {
 
 const metadata = new BucketClientInterface(params, bucketclient, rootLogger);
 
-const listObjects = utils.retryable(metadata.listObject.bind(metadata));
+const listObjects = utils.retryable((bucket, params, log, cb) => metadata.listObject(bucket, params, log, (err, res) => {
+    if (err && err.NoSuchBucket) {
+        return cb(null, []);
+    }
+    return cb(err, res);
+}));
 
 /**
  * List all s3 buckets implemented as a async generator
@@ -36,6 +41,10 @@ async function* listBuckets(log) {
             // eslint-disable-next-line no-await-in-loop
             res = await listObjects(usersBucket, { ...listingParams, gt }, log);
         } catch (error) {
+            if (error.NoSuchBucket) {
+                log.info('no buckets found');
+                return;
+            }
             log.error('Error during listing', { error });
             throw error;
         }
@@ -60,6 +69,22 @@ async function* listBuckets(log) {
     }
 }
 
+async function getRaftSessionIds(log) {
+    return new Promise((resolve, reject) => {
+        metadata.client.getAllRafts(log.getSerializedUids(), (error, res) => {
+            if (error) {
+                log.error('error getting raft session ids', { error });
+                return reject(error);
+            }
+
+            const data = JSON.parse(res);
+
+            return resolve(data.map(raft => `${raft.id}`));
+        }, log);
+    });
+}
+
 module.exports = {
     listBuckets,
+    getRaftSessionIds,
 };
diff --git a/service-level-sidecar/env.js b/service-level-sidecar/env.js
index 793e9a4a..8e26251f 100644
--- a/service-level-sidecar/env.js
+++ b/service-level-sidecar/env.js
@@ -99,6 +99,9 @@ const defaults = {
         host: 'localhost',
         port: 8500,
     },
+    enableScuba: false,
+    scubaBucketd: 'localhost:19000',
+    scubaBucketdTls: false,
 };
 
 module.exports = {
@@ -123,4 +126,7 @@ module.exports = {
         readToken: loadFromEnv('WARP10_READ_TOKEN', defaults.warp10.readToken),
         ...loadFromEnv('WARP10_NODE', defaults.warp10.node, typeCasts.node),
     },
+    enableScuba: loadFromEnv('ENABLE_SCUBA', defaults.enableScuba, typeCasts.bool),
+    scubaBucketd: loadFromEnv('SCUBA_BUCKETD_BOOTSTRAP', defaults.scubaBucketd),
+    scubaBucketdTls: loadFromEnv('SCUBA_BUCKETD_ENABLE_TLS', defaults.scubaBucketdTls, typeCasts.bool),
 };
diff --git a/service-level-sidecar/index.js b/service-level-sidecar/index.js
index 4d4c6372..fd3c2436 100644
--- a/service-level-sidecar/index.js
+++ b/service-level-sidecar/index.js
@@ -30,13 +30,23 @@ function setupSignalHandlers(cleanUpFunc) {
 startServer(server => {
     log.info(`server listening on ${env.host}:${env.port}`);
 
+    const sockets = new Set();
+    server.on('connection', socket => {
+        sockets.add(socket);
+        socket.once('close', () => sockets.delete(socket));
+    });
+
     if (env.tls.enabled) {
         log.info('tls enabled', { apiServer: true, vault: env.vaultTls, bucketd: env.bucketdTls });
     }
 
     const cleanup = jsutil.once(() => {
         log.info('server exiting');
-        server.close();
+        server.close(() => {
+            sockets.forEach(socket => socket.destroy());
+            log.info('server closed');
+            process.exit(0);
+        });
     });
 
     setupSignalHandlers(cleanup);
diff --git a/service-level-sidecar/report.js b/service-level-sidecar/report.js
index 3753e364..9093030b 100644
--- a/service-level-sidecar/report.js
+++ b/service-level-sidecar/report.js
@@ -1,7 +1,12 @@
 const async = require('async');
 const util = require('util');
 
+const arsenal = require('arsenal');
+
+const { splitter } = arsenal.constants;
+
 const bucketd = require('./bucketd');
+const scuba = require('./scuba');
 const env = require('./env');
 const warp10 = require('./warp10');
 const { getAccountIdForCanonicalId } = require('./vault');
@@ -43,34 +48,36 @@ class MetricReport {
 }
 
 /**
- *
+ * @param {Array} sessionIds - raft session ids to retrieve metrics for (only contains info if scuba backend is enabled)
  * @param {integer} timestamp - timestamp to retrieve metrics in microseconds
- * @param {string} bucket - bucket name to retrieve metrics for
+ * @param {object} bucket - bucket to retrieve metrics for
+ * @param {string} bucket.name - bucket name
+ * @param {string} bucket.account - bucket owner account canonical id
  * @param {object} log - werelogs logger instance
  * @returns {object} - object count and bytes stored for bucket
  */
-async function getMetricsForBucket(timestamp, bucket, log) {
-    log.debug('getting metrics for bucket', { bucket, timestamp });
-    const params = {
-        params: {
-            end: timestamp,
-            labels: { bck: bucket },
-            node: env.warp10.nodeId,
-        },
-        macro: 'utapi/getMetricsAt',
-    };
-
-    const resp = await warp10.exec(params);
+async function getMetricsForBucket(sessionIds, timestamp, bucket, log) {
+    log.debug('getting metrics for bucket', { bucket: bucket.name, timestamp });
+
+    if (env.enableScuba) {
+        const resourceName = `${bucket.account}${splitter}${bucket.name}`;
+        const logResults = await util.promisify(async.mapLimit)(sessionIds, env.concurrencyLimit, async logId => {
+            try {
+                return await scuba.getMetrics('bucket', resourceName, logId, new Date(timestamp), log);
+            } catch (err) {
+                log.error('error getting metrics for bucket', { bucket: bucket.name, logId, error: err.message });
+                throw err;
+            }
+        });
 
-    if (resp.result.length === 0) {
-        log.error('unable to retrieve metrics', { bucket });
-        throw new Error('Error retrieving metrics');
+        return logResults
+            .filter(result => result !== null)
+            .reduce((acc, result) => ({
+                count: acc.count + result.value.metrics.objectsTotal,
+                bytes: acc.bytes + result.value.metrics.bytesTotal,
+            }), { count: 0, bytes: 0 });
     }
-
-    return {
-        count: resp.result[0].objD,
-        bytes: resp.result[0].sizeD,
-    };
+    return warp10.getMetricsForBucket(timestamp, bucket.name, log);
 }
 
 /**
@@ -92,10 +99,16 @@ async function getServiceReport(timestamp, log) {
     const bucketReports = {};
     const accountInfoCache = {};
 
+    let sessionIds = [];
+    if (env.enableScuba) {
+        sessionIds = await bucketd.getRaftSessionIds(log);
+        sessionIds = sessionIds.filter(id => id !== '0');
+    }
+
     for await (const buckets of bucketd.listBuckets(log)) {
         log.debug('got response from bucketd', { numBuckets: buckets.length });
         await util.promisify(async.eachLimit)(buckets, env.concurrencyLimit, async bucket => {
-            const metrics = await getMetricsForBucket(timestamp, bucket.name, log);
+            const metrics = await getMetricsForBucket(sessionIds, timestamp, bucket, log);
 
             log.debug('fetched metrics for bucket', { bucket: bucket.name, accCanonicalId: bucket.account });
diff --git a/service-level-sidecar/scuba.js b/service-level-sidecar/scuba.js
new file mode 100644
index 00000000..3770616f
--- /dev/null
+++ b/service-level-sidecar/scuba.js
@@ -0,0 +1,68 @@
+const arsenal = require('arsenal');
+const bucketclient = require('bucketclient');
+
+const { BucketClientInterface } = arsenal.storage.metadata.bucketclient;
+const { splitter } = arsenal.constants;
+
+const rootLogger = require('./log');
+const env = require('./env');
+const utils = require('./utils');
+
+const params = {
+    bucketdBootstrap: [env.scubaBucketd],
+    https: env.scubaBucketdTls ? env.tls.certs : undefined,
+};
+
+const metadata = new BucketClientInterface(params, bucketclient, rootLogger);
+
+const listObjects = utils.retryable((bucket, params, log, cb) => metadata.listObject(bucket, params, log, (err, res) => {
+    if (err && err.NoSuchBucket) {
+        return cb(null, []);
+    }
+    return cb(err, res);
+}));
+
+function roundToDay(timestamp) {
+    return new Date(
+        Date.UTC(timestamp.getUTCFullYear(), timestamp.getUTCMonth(), timestamp.getUTCDate(), 23, 59, 59, 999),
+    );
+}
+
+const LENGTH_TS = 14;
+const MAX_TS = parseInt(('9'.repeat(LENGTH_TS)), 10);
+
+function formatMetricsKey(resourceName, timestamp) {
+    const ts = (MAX_TS - roundToDay(timestamp).getTime()).toString().padStart(LENGTH_TS, '0');
+    return `${resourceName}/${ts}`;
+}
+
+async function getMetrics(classType, resourceName, sessionId, timestamp, log) {
+    const listingParams = {
+        maxKeys: 1,
+        listingType: 'Basic',
+        gte: formatMetricsKey(resourceName, timestamp),
+        lte: `${resourceName}/${MAX_TS.toString()}`,
+    };
+
+    const bucket = `${classType}${splitter}${sessionId}`;
+
+    try {
+        const resp = await listObjects(bucket, listingParams, log);
+        if (resp.length === 0) {
+            return null;
+        }
+
+        const { key, value } = resp[0];
+        return {
+            key,
+            value: JSON.parse(value),
+        };
+    } catch (error) {
+        log.error('error during metric listing', { bucket, error: error.message });
+        throw error;
+    }
+}
+
+module.exports = {
+    getMetrics,
+};
diff --git a/service-level-sidecar/server.js b/service-level-sidecar/server.js
index 1e4e068a..b4dc8468 100644
--- a/service-level-sidecar/server.js
+++ b/service-level-sidecar/server.js
@@ -63,7 +63,7 @@ function finishMiddleware(req, res) {
 }
 
 // Hash our api key once for reuse during the request
-// We prepend `Bearer ` to avoid having to strip it from the header value runtime for comparison
+// We prepend `Bearer ` to avoid having to strip it from the header value at runtime for comparison
 const actualKeyHash = crypto.createHash('sha512').copy().update(`Bearer ${env.apiKey}`).digest();
 
 // Any handler mounted under `/api/` requires an Authorization header
@@ -101,7 +101,7 @@ app.post('/api/report', (req, res, next) => {
     const timestamp = Date.now() * 1000;
     getServiceReportCb(timestamp, req.log, (err, report) => {
         if (err) {
-            req.log.error('error generating metrics report', { error: err });
+            req.log.error('error generating metrics report', { error: err.message });
             next(makeError(500, 'Internal Server Error'));
             return;
         }
@@ -119,7 +119,7 @@ app.use((req, res, next) => {
 
 // Catch all error handler
 app.use((err, req, res, next) => {
-    req.log.error('error during request', { error: err });
+    req.log.error('error during request', { error: err.message });
     // Default errors to `500 Internal Server Error` in case something unexpected slips through
     const data = {
         code: err.status || 500,
diff --git a/service-level-sidecar/warp10.js b/service-level-sidecar/warp10.js
index be8048af..f6cf41f3 100644
--- a/service-level-sidecar/warp10.js
+++ b/service-level-sidecar/warp10.js
@@ -41,6 +41,30 @@ class Warp10Client {
         log.debug('warpscript executed', { ...params, stats: resp.meta });
         return resp;
     }
+
+    async getMetricsForBucket(timestamp, bucket, log) {
+        log.debug('getting metrics for bucket', { bucket, timestamp });
+        const params = {
+            params: {
+                end: timestamp,
+                labels: { bck: bucket },
+                node: this.nodeId,
+            },
+            macro: 'utapi/getMetricsAt',
+        };
+
+        const resp = await this.exec(params);
+
+        if (resp.result.length === 0) {
+            log.error('unable to retrieve metrics', { bucket });
+            throw new Error('Error retrieving metrics');
+        }
+
+        return {
+            count: resp.result[0].objD,
+            bytes: resp.result[0].sizeD,
+        };
+    }
 }
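
For context on the listing trick used by the new `service-level-sidecar/scuba.js`: metric reports are keyed as `<resourceName>/<inverted, zero-padded day timestamp>`, so later days sort lexicographically first and a `gte` listing with `maxKeys: 1` returns the most recent daily report at or before the requested date. Below is a minimal standalone sketch of that key scheme; it is illustrative only (not part of the patch), and the resource name and dates are made-up placeholders.

```js
// Illustrative sketch of the key encoding used by service-level-sidecar/scuba.js.
// Not part of the patch; the resource name and dates are placeholders.
const LENGTH_TS = 14;
const MAX_TS = parseInt('9'.repeat(LENGTH_TS), 10);

// End of the UTC day containing `timestamp`, mirroring roundToDay() above.
function roundToDay(timestamp) {
    return new Date(
        Date.UTC(timestamp.getUTCFullYear(), timestamp.getUTCMonth(), timestamp.getUTCDate(), 23, 59, 59, 999),
    );
}

// Inverted, zero-padded day timestamp: later days produce smaller keys, so they list first.
function formatMetricsKey(resourceName, timestamp) {
    const ts = (MAX_TS - roundToDay(timestamp).getTime()).toString().padStart(LENGTH_TS, '0');
    return `${resourceName}/${ts}`;
}

const older = formatMetricsKey('example-resource', new Date('2023-01-01T12:00:00Z'));
const newer = formatMetricsKey('example-resource', new Date('2023-06-01T12:00:00Z'));
console.log(newer < older); // true: the newest report is the first listing result
```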