Skip to content

Commit

Permalink
S3UTILS-149 Prevent deleting versions with pending replication
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolas2bert committed Oct 13, 2023
1 parent 8ebc522 commit 32b64fa
Show file tree
Hide file tree
Showing 3 changed files with 266 additions and 2 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,9 @@ authentify the S3 endpoint
* **HTTPS_NO_VERIFY**: set to 1 to disable S3 endpoint certificate check
* **EXCLUDE_REPLICATING_VERSIONS**: if is set to '1,' 'true,' or 'yes,'
prevent the deletion of replicating versions
## Example
```
Expand Down
89 changes: 87 additions & 2 deletions cleanupNoncurrentVersions.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
const fs = require('fs');
const { http, https } = require('httpagent');
const { ObjectMD } = require('arsenal').models;

const AWS = require('aws-sdk');
const { doWhilst, eachSeries } = require('async');
const { doWhilst, eachSeries, filterLimit } = require('async');

const { Logger } = require('werelogs');

const parseOlderThan = require('./utils/parseOlderThan');
const { makeBackbeatRequest } = require('./utils/makeRequest');

const log = new Logger('s3utils::cleanupNoncurrentVersions');

Expand All @@ -31,6 +33,7 @@ const DELETED_BEFORE = (process.env.DELETED_BEFORE
const ONLY_DELETED = _parseBoolean(process.env.ONLY_DELETED) || DELETED_BEFORE !== null;
const { HTTPS_CA_PATH } = process.env;
const { HTTPS_NO_VERIFY } = process.env;
const EXCLUDE_REPLICATING_VERSIONS = _parseBoolean(process.env.EXCLUDE_REPLICATING_VERSIONS);

const LISTING_LIMIT = 1000;
const LOG_PROGRESS_INTERVAL_MS = 10000;
Expand Down Expand Up @@ -80,6 +83,8 @@ Optional environment variables:
HTTPS_CA_PATH: path to a CA certificate bundle used to authentify
the S3 endpoint
HTTPS_NO_VERIFY: set to 1 to disable S3 endpoint certificate check
EXCLUDE_REPLICATING_VERSIONS: if is set to '1,' 'true,' or 'yes,'
prevent the deletion of replicating versions
`;

// We accept console statements for usage purpose
Expand All @@ -95,7 +100,9 @@ if (!S3_ENDPOINT) {
console.error(USAGE);
process.exit(1);
}
const { hostname: s3Hostname, port: s3Port } = new URL(S3_ENDPOINT);
const s3EndpointIsHttps = S3_ENDPOINT.startsWith('https:');

if (!ACCESS_KEY) {
console.error('ACCESS_KEY not defined');
console.error(USAGE);
Expand Down Expand Up @@ -230,6 +237,44 @@ const logProgressInterval = setInterval(
LOG_PROGRESS_INTERVAL_MS,
);

function _getMetadata(bucketName, keyName, versionId, cb) {
return makeBackbeatRequest({
hostname: s3Hostname,
port: s3Port,
method: 'GET',
resourceType: 'metadata',
bucket: bucketName,
objectKey: keyName,
queryObj: {
versionId,
},
authCredentials: {
accessKey: ACCESS_KEY,
secretKey: SECRET_KEY,
},
isHttps: s3EndpointIsHttps,
agent,
}, (err, data) => {
if (err) {
return cb(err);
}

let parsedBody;
try {
parsedBody = JSON.parse(data.body);
} catch (parseError) {
return cb(parseError);
}

const { result, error } = ObjectMD.createFromBlob(parsedBody.Body);
if (error) {
return cb(error);
}

return cb(null, result);
});
}

function _listObjectVersions(bucket, VersionIdMarker, KeyMarker, cb) {
return s3.listObjectVersions({
Bucket: bucket,
Expand Down Expand Up @@ -444,10 +489,50 @@ function _triggerDeletesOnEligibleObjects(
}
}
});
_triggerDeletes(bucket, versionsToDelete, cb);
_filterEligibleVersions(bucket, versionsToDelete, (eligibleVersions) => {

Check failure on line 492 in cleanupNoncurrentVersions.js

View workflow job for this annotation

GitHub Actions / tests

'_filterEligibleVersions' was used before it was defined

Check warning on line 492 in cleanupNoncurrentVersions.js

View workflow job for this annotation

GitHub Actions / tests

Unexpected parentheses around single function argument
_triggerDeletes(bucket, eligibleVersions, cb);
});
return ret;
}

function _filterEligibleVersions(bucket, versionsToDelete, cb) {
if (!EXCLUDE_REPLICATING_VERSIONS) {
return process.nextTick(cb, versionsToDelete);
}

return filterLimit(versionsToDelete, 10, (v, next) => {

Check warning on line 503 in cleanupNoncurrentVersions.js

View workflow job for this annotation

GitHub Actions / tests

Unexpected block statement surrounding arrow body; move the returned value immediately after the `=>`
return _getMetadata(bucket, v.Key, v.VersionId, (err, objMD) => {
if (err) {
log.error('version not deleted because get metadata failed', {
bucketName: bucket,
key: v.Key,
versionId: v.VersionId,
error: err,
});

return next(null, false);
}

const replicationStatus = objMD.getReplicationStatus();

if (replicationStatus && replicationStatus !== 'COMPLETED') {
log.info('version not deleted because being replicated', {
bucketName: bucket,
key: v.Key,
versionId: v.VersionId,
error: err,
});

return next(null, false);
}

return next(null, true);
});
}, (err, eligibleVersions) => {

Check warning on line 531 in cleanupNoncurrentVersions.js

View workflow job for this annotation

GitHub Actions / tests

Unexpected block statement surrounding arrow body; move the returned value immediately after the `=>`
return cb(eligibleVersions);
});
}

function _waitForDeletesCompletion(cb) {
if (batchDeleteInProgress) {
batchDeleteOnFullDrain = cb;
Expand Down
176 changes: 176 additions & 0 deletions utils/makeRequest.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
const { auth } = require('arsenal');

const http = require('http');
const https = require('https');
const querystring = require('querystring');
const fs = require('fs');

const ipAddress = process.env.IP ? process.env.IP : '127.0.0.1';

function _parseError(responseBody, statusCode, jsonResponse) {
if (jsonResponse && statusCode !== 200) {
return JSON.parse(responseBody);
}
if (responseBody.indexOf('<Error>') > -1) {
const error = {};
const codeStartIndex = responseBody.indexOf('<Code>') + 6;
const codeEndIndex = responseBody.indexOf('</Code>');
error.code = responseBody.slice(codeStartIndex, codeEndIndex);
const msgStartIndex = responseBody.indexOf('<Message>') + 9;
const msgEndIndex = responseBody.indexOf('</Message>');
error.message = responseBody.slice(msgStartIndex, msgEndIndex);
return error;
}
return null;
}

function _decodeURI(uri) {
// do the same decoding than in S3 server
return decodeURIComponent(uri.replace(/\+/g, ' '));
}

/** _makeRequest - utility function to generate a request
* @param {object} params - params for making request
* @param {string} params.hostname - request hostname
* @param {number} [params.port] - request port
* @param {string} params.method - request method
* @param {string} [params.path] - URL-encoded request path
* @param {boolean} [params.jsonResponse] - if true, response is
* expected to be received in JSON format (including errors)
* @param {object} [params.queryObj] - query fields and their string values
* @param {object} [params.authCredentials] - authentication credentials
* @param {object} params.authCredentials.accessKey - access key
* @param {object} params.authCredentials.secretKey - secret key
* @param {boolean} [params.isHttps] - whether the request is made using the HTTPS protocol
* @param {object} [params.agent] - request agent
* @param {object} [params.headers] - headers and their string values
* @param {string} [params.requestBody] - request body contents
* @param {function} callback - with error and response parameters
* @return {undefined} - and call callback
*/
function _makeRequest(params, callback) {
const {
hostname,
port,
method,
path,
jsonResponse,
queryObj,
authCredentials,
isHttps,
agent,
headers,
requestBody,
} = params;

const options = {
hostname,
port,
method,
headers,
path: path || '/',
rejectUnauthorized: false,
agent,
};

const qs = querystring.stringify(queryObj);

const transport = isHttps ? https : http;

const req = transport.request(options, res => {
const body = [];
res.on('data', chunk => {
body.push(chunk);
});
res.on('error', callback);
res.on('end', () => {
const total = body.join('');
const data = {
headers: res.headers,
statusCode: res.statusCode,
body: total,
};
const err = _parseError(total, res.statusCode, jsonResponse);
if (err) {
err.statusCode = res.statusCode;
}
return callback(err, data);
});
});

req.on('error', callback);
// generate v4 headers if authentication credentials are provided
const encodedPath = req.path;
// decode path because signing code re-encodes it
req.path = _decodeURI(encodedPath);
if (authCredentials) {
if (queryObj) {
auth.client.generateV4Headers(req, queryObj, authCredentials.accessKey, authCredentials.secretKey, 's3');
// may update later if request may contain POST body
} else {
auth.client.generateV4Headers(req, '', authCredentials.accessKey, authCredentials.secretKey, 's3');
}
}
// restore original URL-encoded path
req.path = encodedPath;
req.path = queryObj ? `${options.path}?${qs}` : req.path;
if (requestBody) {
req.write(requestBody);
}
req.end();
}


/** _makeBackbeatRequest - utility function to generate a request going
* through backbeat route
* @param {object} params - params for making request
* @param {string} params.hostname - request hostname
* @param {string} params.port - request port
* @param {string} params.method - request method
* @param {string} params.resourceType - request source type ("data", "metadata", "multiplebackenddata", ...)
* @param {string} params.bucket - bucket name
* @param {string} params.objectKey - object key
* @param {object} [params.queryObj] - query params
* @param {object} [params.authCredentials] - authentication credentials
* @param {object} params.authCredentials.accessKey - access key
* @param {object} params.authCredentials.secretKey - secret key
* @param {string} params.isHttps - whether the request is made using the HTTPS protocol
* @param {string} params.agent - request agent
* @param {object} [params.headers] - headers and their string values
* @param {string} [params.requestBody] - request body content
* @param {function} callback - with error and response parameters
* @return {undefined} - and call callback
*/
function makeBackbeatRequest(params, callback) {
const {
hostname,
port,
method,
resourceType,
bucket,
objectKey,
queryObj,
authCredentials,
isHttps,
agent,
headers,
requestBody,
} = params;

const options = {
hostname,
port,
method,
path: `/_/backbeat/${resourceType}/${bucket}/${objectKey}`,
jsonResponse: true,
queryObj,
authCredentials,
isHttps,
agent,
headers,
requestBody,
};
_makeRequest(options, callback);
}

module.exports = { makeBackbeatRequest };

0 comments on commit 32b64fa

Please sign in to comment.