Skip to content

Commit

Permalink
impr(S3UTILS-181): Add scuba backend to service-lvl-sidecar
Browse files Browse the repository at this point in the history
  • Loading branch information
tmacro committed Nov 13, 2024
1 parent 3bf8eca commit 8a5858d
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 25 deletions.
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1972,14 +1972,31 @@ REST API to provide service level reports for UtapiV2

## Usage


**Using warp10 backend**

```shell
docker run -d \
--network=host \
-e SIDECAR_API_KEY=dev_key_change_me \
-e SIDECAR_SCALE_FACTOR=1.4 \
-e SIDECAR_WARP10_NODE="md1-cluster1:[email protected]:4802" \
scality/s3utils service-level-sidecar/index.js
```

**Using Scuba backend**
```shell
docker run -d \
--network=host \
-e SIDECAR_API_KEY=dev_key_change_me \
-e SIDECAR_SCALE_FACTOR=1.4 \
-e SIDECAR_ENABLE_SCUBA=t \
-e SIDECAR_SCUBA_BUCKETD_BOOTSTRAP=127.0.0.1:19000 \
scality/s3utils service-level-sidecar/index.js
```

**Example output**
```shell
curl -X POST -H "Authorization: Bearer dev_key_change_me" localhost:24742/api/report | jq
{
"account": [
Expand Down Expand Up @@ -2133,6 +2150,21 @@ docker run -d \
scality/s3utils service-level-sidecar/index.js
```

#### Scuba

The scuba backend can be enabled by setting `SIDECAR_ENABLE_SCUBA`.
A bucketd bootstrap list can be provided using `SIDECAR_SCUBA_BUCKETD_BOOTSTRAP`.
If a bootstrap list is not provided `127.0.0.1:19000` will be used.
Internal TLS support can be enabled using `SIDECAR_SCUBA_BUCKETD_ENABLE_TLS`.

```shell
docker run -d \
--network=host \
-e SIDECAR_ENABLE_SCUBA=t \
-e SIDECAR_SCUBA_BUCKETD_BOOTSTRAP=127.0.0.1:19000 \
scality/s3utils service-level-sidecar/index.js
```

### Other Settings

- Log level can be set using `SIDECAR_LOG_LEVEL` (defaults to `info`)
Expand Down
16 changes: 16 additions & 0 deletions service-level-sidecar/bucketd.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ async function* listBuckets(log) {
}
}

async function getRaftSessionIds(log) {
return new Promise((resolve, reject) => {
metadata.client.getAllRafts(log.getSerializedUids(), (error, res) => {
if (error) {
log.error('error getting raft session ids', { error });
return reject(error);
}

const data = JSON.parse(res);

return resolve(data.map(raft => `${raft.id}`));
}, log);
});
}

module.exports = {
listBuckets,
getRaftSessionIds,
};
6 changes: 6 additions & 0 deletions service-level-sidecar/env.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ const defaults = {
host: 'localhost',
port: 8500,
},
enableScuba: false,
scubaBucketd: 'localhost:19000',
scubaBucketdTls: false,
};

module.exports = {
Expand All @@ -123,4 +126,7 @@ module.exports = {
readToken: loadFromEnv('WARP10_READ_TOKEN', defaults.warp10.readToken),
...loadFromEnv('WARP10_NODE', defaults.warp10.node, typeCasts.node),
},
enableScuba: loadFromEnv('ENABLE_SCUBA', defaults.enableScuba, typeCasts.bool),
scubaBucketd: loadFromEnv('SCUBA_BUCKETD_BOOTSTRAP', defaults.scubaBucketd),
scubaBucketdTls: loadFromEnv('SCUBA_BUCKETD_ENABLE_TLS', defaults.scubaBucketdTls, typeCasts.bool),
};
55 changes: 33 additions & 22 deletions service-level-sidecar/report.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
const async = require('async');
const util = require('util');

const arsenal = require('arsenal');

const { splitter } = arsenal.constants;

const bucketd = require('./bucketd');
const scuba = require('./scuba');
const env = require('./env');
const warp10 = require('./warp10');
const { getAccountIdForCanonicalId } = require('./vault');
Expand Down Expand Up @@ -43,34 +48,34 @@ class MetricReport {


/**
*
* @param {Array<Number>} logIds - raft session ids to retrieve metrics for (only contains info if scuba backend is enabled)
* @param {integer} timestamp - timestamp to retrieve metrics in microseconds
* @param {string} bucket - bucket name to retrieve metrics for
* @param {object} log - werelogs logger instance
* @returns {object} - object count and bytes stored for bucket
*/
async function getMetricsForBucket(timestamp, bucket, log) {
log.debug('getting metrics for bucket', { bucket, timestamp });
const params = {
params: {
end: timestamp,
labels: { bck: bucket },
node: env.warp10.nodeId,
},
macro: 'utapi/getMetricsAt',
};

const resp = await warp10.exec(params);
async function getMetricsForBucket(logIds, timestamp, bucket, log) {
log.debug('getting metrics for bucket', { bucket: bucket.name, timestamp });

if (env.enableScuba) {
const resourceName = `${bucket.account}${splitter}${bucket.name}`;
const logResults = await util.promisify(async.mapLimit)(logIds, env.concurrencyLimit, async logId => {
try {
return await scuba.getMetrics('bucket', resourceName, logId, new Date(timestamp), log);
} catch (err) {
log.error('error getting metrics for bucket', { bucket: bucket.name, logId, error: err.message });
throw err;
}
});

if (resp.result.length === 0) {
log.error('unable to retrieve metrics', { bucket });
throw new Error('Error retrieving metrics');
return logResults
.filter(result => result !== null)
.reduce((acc, result) => ({
count: acc.count + result.value.metrics.objectsTotal,
bytes: acc.bytes + result.value.metrics.bytesTotal,
}), { count: 0, bytes: 0 });
}

return {
count: resp.result[0].objD,
bytes: resp.result[0].sizeD,
};
return warp10.getMetricsForBucket(timestamp, bucket.name, log);
}

/**
Expand All @@ -92,10 +97,16 @@ async function getServiceReport(timestamp, log) {
const bucketReports = {};
const accountInfoCache = {};

let logIds = [];
if (env.enableScuba) {
logIds = await bucketd.getRaftSessionIds(log);
logIds = logIds.filter(id => id !== '0');
}

for await (const buckets of bucketd.listBuckets(log)) {
log.debug('got response from bucketd', { numBuckets: buckets.length });
await util.promisify(async.eachLimit)(buckets, env.concurrencyLimit, async bucket => {
const metrics = await getMetricsForBucket(timestamp, bucket.name, log);
const metrics = await getMetricsForBucket(logIds, timestamp, bucket, log);

log.debug('fetched metrics for bucket', { bucket: bucket.name, accCanonicalId: bucket.account });

Expand Down
80 changes: 80 additions & 0 deletions service-level-sidecar/scuba.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
const arsenal = require('arsenal');
const bucketclient = require('bucketclient');

const { BucketClientInterface } = arsenal.storage.metadata.bucketclient;
const { splitter } = arsenal.constants;

const rootLogger = require('./log');
const env = require('./env');
const utils = require('./utils');

const params = {
bucketdBootstrap: [env.scubaBucketd],
https: env.scubaBucketdTls ? env.tls.certs : undefined,
};

const metadata = new BucketClientInterface(params, bucketclient, rootLogger);

const listObjects = utils.retryable(metadata.listObject.bind(metadata));


/**
* Left-pad a string representation of a value with a given template.
* For example: pad('foo', '00000') gives '00foo'.
*
* @param {any} value - value to pad
* @param {string} template - padding template
* @returns {string} - padded string
*/
function padLeft(value, template) {
return `${template}${value}`.slice(-template.length);
}

function roundToDay(timestamp) {
return new Date(
Date.UTC(timestamp.getUTCFullYear(), timestamp.getUTCMonth(), timestamp.getUTCDate(), 23, 59, 59, 999),
);
}

const LENGTH_TS = 14;
const MAX_TS = 10 ** LENGTH_TS - 1; // good until 16 Nov 5138
const TEMPLATE_TS = new Array(LENGTH_TS + 1).join('0');

function formatMetricsKey(resourceName, timestamp) {
const ts = padLeft(MAX_TS - roundToDay(timestamp).getTime(), TEMPLATE_TS);
return `${resourceName}/${ts}`;
}

async function getMetrics(classType, resourceName, logId, timestamp, log) {
const listingParams = {
maxKeys: 1,
listingType: 'Basic',
gte: formatMetricsKey(resourceName, timestamp),
lte: `${resourceName}/${padLeft(MAX_TS, TEMPLATE_TS)}`,
};

const bucket = `${classType}${splitter}${logId}`;

try {
const resp = await listObjects(bucket, listingParams, log);
if (resp.length === 0) {
return null;
}

const { key, value } = resp[0];
return {
key,
value: JSON.parse(value),
};
} catch (error) {
if (error.NoSuchBucket) {
return null;
}
log.error('error during metric listing', { error: error.message });
throw error;
}
}

module.exports = {
getMetrics,
};
6 changes: 3 additions & 3 deletions service-level-sidecar/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ function finishMiddleware(req, res) {
}

// Hash our api key once for reuse during the request
// We prepend `Bearer ` to avoid having to strip it from the header value runtime for comparison
// We prepend `Bearer ` to avoid having to strip it from the header value at runtime for comparison
const actualKeyHash = crypto.createHash('sha512').copy().update(`Bearer ${env.apiKey}`).digest();

// Any handler mounted under `/api/` requires an Authorization header
Expand Down Expand Up @@ -101,7 +101,7 @@ app.post('/api/report', (req, res, next) => {
const timestamp = Date.now() * 1000;
getServiceReportCb(timestamp, req.log, (err, report) => {
if (err) {
req.log.error('error generating metrics report', { error: err });
req.log.error('error generating metrics report', { error: err.message });
next(makeError(500, 'Internal Server Error'));
return;
}
Expand All @@ -119,7 +119,7 @@ app.use((req, res, next) => {

// Catch all error handler
app.use((err, req, res, next) => {
req.log.error('error during request', { error: err });
req.log.error('error during request', { error: err.message });
// Default errors to `500 Internal Server Error` in case something unexpected slips through
const data = {
code: err.status || 500,
Expand Down
24 changes: 24 additions & 0 deletions service-level-sidecar/warp10.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,30 @@ class Warp10Client {
log.debug('warpscript executed', { ...params, stats: resp.meta });
return resp;
}

async getMetricsForBucket(timestamp, bucket, log) {
log.debug('getting metrics for bucket', { bucket, timestamp });
const params = {
params: {
end: timestamp,
labels: { bck: bucket },
node: this.nodeId,
},
macro: 'utapi/getMetricsAt',
};

const resp = await this.exec(params);

if (resp.result.length === 0) {
log.error('unable to retrieve metrics', { bucket });
throw new Error('Error retrieving metrics');
}

return {
count: resp.result[0].objD,
bytes: resp.result[0].sizeD,
};
}
}


Expand Down

0 comments on commit 8a5858d

Please sign in to comment.