Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3UTILS-181: Scuba backend for service-level-sidecar #328

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1972,14 +1972,31 @@ REST API to provide service level reports for UtapiV2

## Usage


**Using Warp 10 backend**

```shell
docker run -d \
--network=host \
-e SIDECAR_API_KEY=dev_key_change_me \
-e SIDECAR_SCALE_FACTOR=1.4 \
-e SIDECAR_WARP10_NODE="md1-cluster1:4802@127.0.0.1:4802" \
scality/s3utils service-level-sidecar/index.js
```

**Using Scuba backend**
```shell
docker run -d \
--network=host \
-e SIDECAR_API_KEY=dev_key_change_me \
-e SIDECAR_SCALE_FACTOR=1.4 \
-e SIDECAR_ENABLE_SCUBA=true \
-e SIDECAR_SCUBA_BUCKETD_BOOTSTRAP=127.0.0.1:19000 \
scality/s3utils service-level-sidecar/index.js
```

**Example output**
```shell
curl -X POST -H "Authorization: Bearer dev_key_change_me" localhost:24742/api/report | jq
{
"account": [
Expand Down Expand Up @@ -2120,7 +2137,7 @@ docker run -d \
scality/s3utils service-level-sidecar/index.js
```

#### Warp10
#### Warp 10

The Warp 10 address is configured using `SIDECAR_WARP10_NODE`.
The Warp 10 `nodeId` must be included; it normally matches the Ansible inventory name plus the port, e.g. `md1-cluster1:4802`.
Expand All @@ -2133,6 +2150,21 @@ docker run -d \
scality/s3utils service-level-sidecar/index.js
```

#### Scuba

The Scuba backend can be enabled by setting `SIDECAR_ENABLE_SCUBA` (for example `SIDECAR_ENABLE_SCUBA=true`).
A bucketd address can be provided using `SIDECAR_SCUBA_BUCKETD_BOOTSTRAP`.
If a bucketd address is not provided, `localhost:19000` will be used.
Internal TLS support can be enabled using `SIDECAR_SCUBA_BUCKETD_ENABLE_TLS`.

```shell
docker run -d \
--network=host \
-e SIDECAR_ENABLE_SCUBA=true \
-e SIDECAR_SCUBA_BUCKETD_BOOTSTRAP=127.0.0.1:19000 \
scality/s3utils service-level-sidecar/index.js
```

### Other Settings

- Log level can be set using `SIDECAR_LOG_LEVEL` (defaults to `info`)
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "s3utils",
"version": "1.15.4",
"version": "1.15.5",
"engines": {
"node": ">= 16"
},
Expand Down
27 changes: 26 additions & 1 deletion service-level-sidecar/bucketd.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ const params = {

const metadata = new BucketClientInterface(params, bucketclient, rootLogger);

const listObjects = utils.retryable(metadata.listObject.bind(metadata));
const listObjects = utils.retryable((bucket, params, log, cb) => metadata.listObject(bucket, params, log, (err, res) => {
if (err && err.NoSuchBucket) {
return cb(null, []);
}
return cb(err, res);
}));

/**
* List all s3 buckets implemented as a async generator
Expand All @@ -36,6 +41,10 @@ async function* listBuckets(log) {
// eslint-disable-next-line no-await-in-loop
res = await listObjects(usersBucket, { ...listingParams, gt }, log);
} catch (error) {
if (error.NoSuchBucket) {
log.info('no buckets found');
return;
}
log.error('Error during listing', { error });
throw error;
}
Expand All @@ -60,6 +69,22 @@ async function* listBuckets(log) {
}
}

/**
 * Retrieve the ids of all raft sessions through the bucket client's `getAllRafts` call.
 *
 * @param {object} log - werelogs logger instance
 * @returns {Promise<string[]>} - raft session ids, each converted to a string
 * @throws rejects with the client error, or with the parsing error when the
 * response is not a JSON array
 */
async function getRaftSessionIds(log) {
    return new Promise((resolve, reject) => {
        metadata.client.getAllRafts(log.getSerializedUids(), (error, res) => {
            if (error) {
                log.error('error getting raft session ids', { error });
                return reject(error);
            }

            // This callback runs after the promise executor has returned, so an exception
            // thrown here would be uncaught instead of rejecting the promise. Parse and
            // convert defensively and turn any failure into a rejection.
            let sessionIds;
            try {
                sessionIds = JSON.parse(res).map(raft => `${raft.id}`);
            } catch (parseError) {
                log.error('error parsing raft session ids', { error: parseError.message });
                return reject(parseError);
            }

            return resolve(sessionIds);
        }, log);
    });
}

module.exports = {
listBuckets,
getRaftSessionIds,
};
6 changes: 6 additions & 0 deletions service-level-sidecar/env.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ const defaults = {
host: 'localhost',
port: 8500,
},
enableScuba: false,
scubaBucketd: 'localhost:19000',
scubaBucketdTls: false,
};

module.exports = {
Expand All @@ -123,4 +126,7 @@ module.exports = {
readToken: loadFromEnv('WARP10_READ_TOKEN', defaults.warp10.readToken),
...loadFromEnv('WARP10_NODE', defaults.warp10.node, typeCasts.node),
},
enableScuba: loadFromEnv('ENABLE_SCUBA', defaults.enableScuba, typeCasts.bool),
scubaBucketd: loadFromEnv('SCUBA_BUCKETD_BOOTSTRAP', defaults.scubaBucketd),
scubaBucketdTls: loadFromEnv('SCUBA_BUCKETD_ENABLE_TLS', defaults.scubaBucketdTls, typeCasts.bool),
};
12 changes: 11 additions & 1 deletion service-level-sidecar/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,23 @@ function setupSignalHandlers(cleanUpFunc) {
startServer(server => {
log.info(`server listening on ${env.host}:${env.port}`);

const sockets = new Set();
server.on('connection', socket => {
sockets.add(socket);
socket.once('close', () => sockets.delete(socket));
});

if (env.tls.enabled) {
log.info('tls enabled', { apiServer: true, vault: env.vaultTls, bucketd: env.bucketdTls });
}

// Shutdown routine handed to the signal handlers.
// NOTE(review): `jsutil.once` presumably makes repeated invocations a no-op - confirm.
const cleanup = jsutil.once(() => {
    log.info('server exiting');
    // `server.close()` stops accepting new connections, but its callback only fires once
    // every existing connection has ended.
    server.close(() => {
        log.info('server closed');
        process.exit(0);
    });
    // Destroy the tracked sockets right away: doing it inside the close callback would be
    // too late, because an idle keep-alive connection would keep that callback (and the
    // process exit) from ever running.
    sockets.forEach(socket => socket.destroy());
});

setupSignalHandlers(cleanup);
Expand Down
59 changes: 36 additions & 23 deletions service-level-sidecar/report.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
const async = require('async');
const util = require('util');

const arsenal = require('arsenal');

const { splitter } = arsenal.constants;

const bucketd = require('./bucketd');
const scuba = require('./scuba');
const env = require('./env');
const warp10 = require('./warp10');
const { getAccountIdForCanonicalId } = require('./vault');
Expand Down Expand Up @@ -43,34 +48,36 @@ class MetricReport {


/**
*
* @param {Array<Number>} sessionIds - raft session ids to retrieve metrics for (only contains info if scuba backend is enabled)
* @param {integer} timestamp - timestamp to retrieve metrics in microseconds
* @param {string} bucket - bucket name to retrieve metrics for
* @param {object} bucket - bucket to retrieve metrics for
* @param {string} bucket.name - bucket name
* @param {string} bucket.account - bucket owner account canonical id
* @param {object} log - werelogs logger instance
* @returns {object} - object count and bytes stored for bucket
*/
async function getMetricsForBucket(timestamp, bucket, log) {
log.debug('getting metrics for bucket', { bucket, timestamp });
const params = {
params: {
end: timestamp,
labels: { bck: bucket },
node: env.warp10.nodeId,
},
macro: 'utapi/getMetricsAt',
};

const resp = await warp10.exec(params);
async function getMetricsForBucket(sessionIds, timestamp, bucket, log) {
log.debug('getting metrics for bucket', { bucket: bucket.name, timestamp });

if (env.enableScuba) {
const resourceName = `${bucket.account}${splitter}${bucket.name}`;
const logResults = await util.promisify(async.mapLimit)(sessionIds, env.concurrencyLimit, async logId => {
try {
return await scuba.getMetrics('bucket', resourceName, logId, new Date(timestamp), log);
} catch (err) {
log.error('error getting metrics for bucket', { bucket: bucket.name, logId, error: err.message });
throw err;
}
});

if (resp.result.length === 0) {
log.error('unable to retrieve metrics', { bucket });
throw new Error('Error retrieving metrics');
return logResults
.filter(result => result !== null)
.reduce((acc, result) => ({
count: acc.count + result.value.metrics.objectsTotal,
bytes: acc.bytes + result.value.metrics.bytesTotal,
}), { count: 0, bytes: 0 });
}

return {
count: resp.result[0].objD,
bytes: resp.result[0].sizeD,
};
return warp10.getMetricsForBucket(timestamp, bucket.name, log);
}

/**
Expand All @@ -92,10 +99,16 @@ async function getServiceReport(timestamp, log) {
const bucketReports = {};
const accountInfoCache = {};

let sessionIds = [];
if (env.enableScuba) {
sessionIds = await bucketd.getRaftSessionIds(log);
sessionIds = sessionIds.filter(id => id !== '0');
}

for await (const buckets of bucketd.listBuckets(log)) {
log.debug('got response from bucketd', { numBuckets: buckets.length });
await util.promisify(async.eachLimit)(buckets, env.concurrencyLimit, async bucket => {
const metrics = await getMetricsForBucket(timestamp, bucket.name, log);
const metrics = await getMetricsForBucket(sessionIds, timestamp, bucket, log);

log.debug('fetched metrics for bucket', { bucket: bucket.name, accCanonicalId: bucket.account });

Expand Down
68 changes: 68 additions & 0 deletions service-level-sidecar/scuba.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
const arsenal = require('arsenal');
const bucketclient = require('bucketclient');

const { BucketClientInterface } = arsenal.storage.metadata.bucketclient;
const { splitter } = arsenal.constants;

const rootLogger = require('./log');
const env = require('./env');
const utils = require('./utils');

// Connection settings for the bucketd instance queried for Scuba metrics.
const params = {
    // Single bootstrap address, taken from the sidecar environment configuration.
    bucketdBootstrap: [env.scubaBucketd],
    // Only hand over the TLS certificates when TLS towards this bucketd is enabled.
    https: env.scubaBucketdTls ? env.tls.certs : undefined,
};

// Bucket client used for every Scuba metrics lookup in this module.
const metadata = new BucketClientInterface(params, bucketclient, rootLogger);

/**
 * Retry-wrapped object listing.
 * A `NoSuchBucket` error is reported to the caller as an empty listing; any
 * other error, or a successful result, is passed through untouched.
 */
const listObjects = utils.retryable((bucket, listingParams, log, done) => {
    const onListing = (err, entries) => {
        const bucketMissing = Boolean(err && err.NoSuchBucket);
        return bucketMissing ? done(null, []) : done(err, entries);
    };
    return metadata.listObject(bucket, listingParams, log, onListing);
});

/**
 * Move a date to the last millisecond (23:59:59.999) of its UTC day.
 *
 * @param {Date} timestamp - date to round; it is not modified
 * @returns {Date} - new Date set to the end of the same UTC day
 */
function roundToDay(timestamp) {
    const year = timestamp.getUTCFullYear();
    const month = timestamp.getUTCMonth();
    const dayOfMonth = timestamp.getUTCDate();
    const endOfDayMs = Date.UTC(year, month, dayOfMonth, 23, 59, 59, 999);
    return new Date(endOfDayMs);
}

// Width of the timestamp suffix in a metrics key.
const LENGTH_TS = 14;
// Largest value that fits in LENGTH_TS digits; timestamps are subtracted from it so that
// more recent days produce smaller (earlier sorting) suffixes.
const MAX_TS = Number.parseInt('9'.repeat(LENGTH_TS), 10);

/**
 * Build the metrics key for a resource at the end of the UTC day of `timestamp`.
 *
 * @param {string} resourceName - resource the metrics belong to
 * @param {Date} timestamp - point in time to build the key for
 * @returns {string} - `<resourceName>/<zero padded reversed timestamp>`
 */
function formatMetricsKey(resourceName, timestamp) {
    // A date beyond MAX_TS milliseconds (for instance one built from a microsecond epoch
    // value) would give a negative, variable width suffix. Clamp at zero so the suffix
    // always keeps its fixed width; it still sorts before every in-range key.
    const reversedTs = Math.max(0, MAX_TS - roundToDay(timestamp).getTime());
    const ts = reversedTs.toString().padStart(LENGTH_TS, '0');
    return `${resourceName}/${ts}`;
}

/**
 * Look up a single metrics entry for a resource.
 *
 * Lists at most one key from the `<classType><splitter><sessionId>` bucket, in the range
 * starting at the key built for `timestamp` and ending at the resource's maximum key.
 *
 * @param {string} classType - first part of the bucket name to list from
 * @param {string} resourceName - resource the metrics belong to
 * @param {string} sessionId - second part of the bucket name to list from
 * @param {Date} timestamp - point in time used to build the start key
 * @param {object} log - werelogs logger instance
 * @returns {Promise<object|null>} - `{ key, value }` with the JSON-parsed value,
 * or null when the listing is empty
 */
async function getMetrics(classType, resourceName, sessionId, timestamp, log) {
    const startKey = formatMetricsKey(resourceName, timestamp);
    const endKey = `${resourceName}/${MAX_TS.toString()}`;
    const bucket = `${classType}${splitter}${sessionId}`;

    try {
        const entries = await listObjects(bucket, {
            maxKeys: 1,
            listingType: 'Basic',
            gte: startKey,
            lte: endKey,
        }, log);

        if (entries.length === 0) {
            return null;
        }

        const first = entries[0];
        const key = first.key;
        const rawValue = first.value;
        return { key, value: JSON.parse(rawValue) };
    } catch (error) {
        // Log with the bucket for context, then let the caller deal with the failure.
        log.error('error during metric listing', { bucket, error: error.message });
        throw error;
    }
}

module.exports = {
getMetrics,
};
6 changes: 3 additions & 3 deletions service-level-sidecar/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ function finishMiddleware(req, res) {
}

// Hash our api key once for reuse during the request
// We prepend `Bearer ` to avoid having to strip it from the header value runtime for comparison
// We prepend `Bearer ` to avoid having to strip it from the header value at runtime for comparison
const actualKeyHash = crypto.createHash('sha512').copy().update(`Bearer ${env.apiKey}`).digest();

// Any handler mounted under `/api/` requires an Authorization header
Expand Down Expand Up @@ -101,7 +101,7 @@ app.post('/api/report', (req, res, next) => {
const timestamp = Date.now() * 1000;
getServiceReportCb(timestamp, req.log, (err, report) => {
if (err) {
req.log.error('error generating metrics report', { error: err });
req.log.error('error generating metrics report', { error: err.message });
next(makeError(500, 'Internal Server Error'));
return;
}
Expand All @@ -119,7 +119,7 @@ app.use((req, res, next) => {

// Catch all error handler
app.use((err, req, res, next) => {
req.log.error('error during request', { error: err });
req.log.error('error during request', { error: err.message });
// Default errors to `500 Internal Server Error` in case something unexpected slips through
const data = {
code: err.status || 500,
Expand Down
24 changes: 24 additions & 0 deletions service-level-sidecar/warp10.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,30 @@ class Warp10Client {
log.debug('warpscript executed', { ...params, stats: resp.meta });
return resp;
}

async getMetricsForBucket(timestamp, bucket, log) {
log.debug('getting metrics for bucket', { bucket, timestamp });
const params = {
params: {
end: timestamp,
labels: { bck: bucket },
node: this.nodeId,
},
macro: 'utapi/getMetricsAt',
};

const resp = await this.exec(params);

if (resp.result.length === 0) {
log.error('unable to retrieve metrics', { bucket });
throw new Error('Error retrieving metrics');
}

return {
count: resp.result[0].objD,
bytes: resp.result[0].sizeD,
};
}
}


Expand Down