Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3UTILS-149 Prevent deleting versions with pending replication #301

Merged
merged 2 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,9 @@ authentify the S3 endpoint

* **HTTPS_NO_VERIFY**: set to 1 to disable S3 endpoint certificate check

* **EXCLUDE_REPLICATING_VERSIONS**: if set to '1', 'true', or 'yes',
prevent the deletion of replicating versions

## Example

```
Expand Down
75 changes: 72 additions & 3 deletions cleanupNoncurrentVersions.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
const fs = require('fs');
const { http, https } = require('httpagent');
const { ObjectMD } = require('arsenal').models;

const AWS = require('aws-sdk');
const { doWhilst, eachSeries } = require('async');
const { doWhilst, eachSeries, filterLimit } = require('async');

const { Logger } = require('werelogs');

const BackbeatClient = require('./BackbeatClient');
const parseOlderThan = require('./utils/parseOlderThan');

const log = new Logger('s3utils::cleanupNoncurrentVersions');
Expand All @@ -31,6 +33,7 @@ const DELETED_BEFORE = (process.env.DELETED_BEFORE
const ONLY_DELETED = _parseBoolean(process.env.ONLY_DELETED) || DELETED_BEFORE !== null;
const { HTTPS_CA_PATH } = process.env;
const { HTTPS_NO_VERIFY } = process.env;
const EXCLUDE_REPLICATING_VERSIONS = _parseBoolean(process.env.EXCLUDE_REPLICATING_VERSIONS);

const LISTING_LIMIT = 1000;
const LOG_PROGRESS_INTERVAL_MS = 10000;
Expand Down Expand Up @@ -80,6 +83,8 @@ Optional environment variables:
HTTPS_CA_PATH: path to a CA certificate bundle used to authentify
the S3 endpoint
HTTPS_NO_VERIFY: set to 1 to disable S3 endpoint certificate check
EXCLUDE_REPLICATING_VERSIONS: if is set to '1,' 'true,' or 'yes,'
prevent the deletion of replicating versions
`;

// We accept console statements for usage purpose
Expand Down Expand Up @@ -153,6 +158,7 @@ log.info('Start deleting noncurrent versions and delete markers', {
olderThan: (OLDER_THAN ? OLDER_THAN.toString() : 'N/A'),
deletedBefore: (DELETED_BEFORE ? DELETED_BEFORE.toString() : 'N/A'),
onlyDeleted: ONLY_DELETED,
excludeReplicatingVersions: EXCLUDE_REPLICATING_VERSIONS,
});

let agent;
Expand Down Expand Up @@ -197,14 +203,19 @@ const s3Options = {
* (0.9 + Math.random() * 0.2);
},
};
const s3 = new AWS.S3(Object.assign(options, s3Options));

// Object.assign copies the properties of s3Options onto `options` and returns
// `options` itself, so `opt` is the same (mutated) object, not a fresh copy.
const opt = Object.assign(options, s3Options);

// Both clients are constructed from the same merged options object.
const s3 = new AWS.S3(opt);
const bb = new BackbeatClient(opt);

// Module-level progress state. The skipped* counters, nErrors, bucketInProgress
// and KeyMarker are reported by _logProgress; nErrors and nSkippedReplicating
// are incremented by _filterEligibleVersions.
// NOTE(review): the remaining counters are presumably updated by the listing
// and delete code paths not shown here - confirm against the full file.
let nListed = 0;
let nDeletesTriggered = 0;
let nDeleted = 0;
let nSkippedCurrent = 0;
let nSkippedTooRecent = 0;
let nSkippedNotDeleted = 0;
let nSkippedReplicating = 0;
let nErrors = 0;
let bucketInProgress = null;
let KeyMarker = null;
Expand All @@ -218,6 +229,7 @@ function _logProgress(message) {
skippedCurrent: nSkippedCurrent,
skippedTooRecent: nSkippedTooRecent,
skippedNotDeleted: nSkippedNotDeleted,
skippedReplicating: EXCLUDE_REPLICATING_VERSIONS ? nSkippedReplicating : 'N/A',
errors: nErrors,
bucket: bucketInProgress || null,
keyMarker: KeyMarker || null,
Expand All @@ -240,6 +252,25 @@ function _listObjectVersions(bucket, VersionIdMarker, KeyMarker, cb) {
}, cb);
}

/**
 * Fetch the metadata of one object version through the Backbeat client and
 * decode it with ObjectMD.createFromBlob.
 *
 * @param {string} bucket - bucket name
 * @param {string} key - object key
 * @param {string} versionId - version ID of the object
 * @param {function} cb - called as cb(err) when the request or the decoding
 *   fails, or as cb(null, result) with the decoded metadata otherwise
 * @returns {*} whatever bb.getMetadata returns
 */
function _getMetadata(bucket, key, versionId, cb) {
    const params = { Bucket: bucket, Key: key, VersionId: versionId };

    const onResponse = (err, data) => {
        if (err) {
            return cb(err);
        }

        // createFromBlob reports decoding problems through `error`
        // rather than by throwing.
        const { result: objMD, error: parseError } = ObjectMD.createFromBlob(data.Body);
        return parseError ? cb(parseError) : cb(null, objMD);
    };

    return bb.getMetadata(params, onResponse);
}

/**
 * Tell whether a version's last-modified date passes the OLDER_THAN filter.
 *
 * @param {string} lastModifiedString - last-modified date, in any format
 *   accepted by the Date constructor
 * @returns {boolean} true when OLDER_THAN is not set, or when the date is
 *   strictly earlier than OLDER_THAN
 */
function _lastModifiedIsEligible(lastModifiedString) {
    if (!OLDER_THAN) {
        return true;
    }
    const lastModified = new Date(lastModifiedString);
    return lastModified < OLDER_THAN;
}
Expand Down Expand Up @@ -333,6 +364,44 @@ function decVersionId(versionId) {
+ String.fromCharCode(versionId.charCodeAt(versionId.length - 1) - 1);
}

/**
 * Remove from a batch the versions that must not be deleted because their
 * replication is not finished.
 *
 * When EXCLUDE_REPLICATING_VERSIONS is not set, the batch is passed through
 * untouched on the next tick. Otherwise the metadata of each version is
 * fetched (at most 10 lookups in flight) and a version is kept only if it has
 * no replication status or its replication status is 'COMPLETED'.
 *
 * A version whose metadata cannot be fetched is kept out of the result and
 * counted in nErrors; a version still replicating is kept out of the result
 * and counted in nSkippedReplicating.
 *
 * @param {string} bucket - bucket name
 * @param {object[]} versionsToDelete - candidates, each with Key and VersionId
 * @param {function} cb - called as cb(eligibleVersions), always with an
 *   array; this callback takes no error argument
 * @returns {undefined}
 */
function _filterEligibleVersions(bucket, versionsToDelete, cb) {
    if (!EXCLUDE_REPLICATING_VERSIONS) {
        return process.nextTick(cb, versionsToDelete);
    }

    return filterLimit(versionsToDelete, 10, (v, next) => {
        _getMetadata(bucket, v.Key, v.VersionId, (err, objMD) => {
            if (err) {
                nErrors += 1;
                log.error('version not deleted because get metadata failed', {
                    bucketName: bucket,
                    key: v.Key,
                    versionId: v.VersionId,
                    error: err,
                });

                // Report "not eligible" instead of an error so that one
                // failed lookup does not abort the rest of the batch.
                return next(null, false);
            }

            const replicationStatus = objMD.getReplicationStatus();

            if (replicationStatus && replicationStatus !== 'COMPLETED') {
                nSkippedReplicating += 1;
                // Log the status that caused the skip; there is no error
                // on this path.
                log.info('version not deleted because being replicated', {
                    bucketName: bucket,
                    key: v.Key,
                    versionId: v.VersionId,
                    replicationStatus,
                });

                return next(null, false);
            }

            return next(null, true);
        });
    }, (err, eligibleVersions) => {
        if (err) {
            // The iteratee never reports an error, so this is unexpected.
            // Delete nothing rather than hand an undefined list to the caller.
            nErrors += 1;
            log.error('versions not deleted because filtering failed', {
                bucketName: bucket,
                error: err,
            });
            return cb([]);
        }

        return cb(eligibleVersions || []);
    });
}

function _triggerDeletesOnEligibleObjects(
bucket,
versions,
Expand Down Expand Up @@ -444,7 +513,7 @@ function _triggerDeletesOnEligibleObjects(
}
}
});
_triggerDeletes(bucket, versionsToDelete, cb);
_filterEligibleVersions(bucket, versionsToDelete, eligibleVersions => _triggerDeletes(bucket, eligibleVersions, cb));
return ret;
}

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "s3utils",
"version": "1.14.0",
"version": "1.14.1",
"engines": {
"node": ">= 16"
},
Expand Down