Skip to content

Commit

Permalink
Delete orphaned cold object
Browse files Browse the repository at this point in the history
When an object is deleted (from S3) while it was archived, a message
is sent to the cold storage backend to notify the removal.

When this happens during the transition (archival) process, the object
is not yet archived, and we cannot send such a message to the backend. In most
cases, this should be fine, as the archiving will fail with an error related
to the absence of the (removed) object.

However, there is a race condition: the archival can still succeed in the
backend if the object is removed at the end of this process, or in between the
result from the backend and the update of metadata in backbeat. In that case the
metadata update will fail (as expected), but the object will still be in cold
storage and remain there forever (not referenced from anywhere, and thus orphaned).

Issue: BB-469
  • Loading branch information
Kerkesni committed Nov 23, 2023
1 parent c8564d2 commit 520e2ea
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const { LifecycleResetTransitionInProgressTask } =
require('../tasks/LifecycleResetTransitionInProgressTask');
const { updateCircuitBreakerConfigForImplicitOutputQueue } = require('../../../lib/CircuitBreaker');
const { LifecycleRetriggerRestoreTask } = require('../tasks/LifecycleRetriggerRestoreTask');
const BackbeatProducer = require('../../../lib/BackbeatProducer');

class LifecycleObjectTransitionProcessor extends LifecycleObjectProcessor {

Expand Down Expand Up @@ -45,6 +46,45 @@ class LifecycleObjectTransitionProcessor extends LifecycleObjectProcessor {
super(zkConfig, kafkaConfig, lcConfig, s3Config, transport);
}

/**
* Start kafka consumer. Emits a 'ready' event when
* consumer is ready.
* @param {function} done - callback
* @return {undefined}
*/
start(done) {
super.start(err => {
if (err) {
return done(err);
}
return this.setupProducer(done);
});
}

/**
* Set up Kafka producer
* @param {function} cb callback called when producer
* startup is complete
* @return {undefined}
*/
setupProducer(cb) {
const producer = new BackbeatProducer({
kafka: { hosts: this._kafkaConfig.hosts },
maxRequestSize: this._kafkaConfig.maxRequestSize,
});
producer.once('error', () => {});
producer.once('ready', () => {
producer.removeAllListeners('error');
producer.on('error', err =>
this._log.error('error from backbeat producer', {
method: 'LifecycleObjectTransitionProcessor.setupProducer',
error: err,
}));
this._coldProducer = producer;
return cb();
});
}

getProcessorType() {
return 'transition-processor';
}
Expand Down Expand Up @@ -153,6 +193,13 @@ class LifecycleObjectTransitionProcessor extends LifecycleObjectProcessor {
log: this._log,
}, done);
}

getStateVars() {
return {
...super.getStateVars(),
coldProducer: this._coldProducer,
};
}
}

module.exports = LifecycleObjectTransitionProcessor;
43 changes: 42 additions & 1 deletion extensions/lifecycle/tasks/LifecycleColdStatusArchiveTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ const ActionQueueEntry = require('../../../lib/models/ActionQueueEntry');
const LifecycleUpdateTransitionTask = require('./LifecycleUpdateTransitionTask');
const { LifecycleMetrics } = require('../LifecycleMetrics');

/**
 * Error passed to the callback by _deleteColdObject() once the cold
 * object deletion message has been sent. processEntry() checks for
 * it with `instanceof` and does not treat it as a failure.
 */
class SkipMdUpdateError extends Error {
    constructor(...args) {
        super(...args);
        // make the error identifiable in logs and serialized output,
        // where it would otherwise show up as a generic 'Error'
        this.name = 'SkipMdUpdateError';
    }
}

class LifecycleColdStatusArchiveTask extends LifecycleUpdateTransitionTask {
getTargetAttribute(entry) {
const {
Expand Down Expand Up @@ -40,6 +42,38 @@ class LifecycleColdStatusArchiveTask extends LifecycleUpdateTransitionTask {
});
}

/**
* Requests the deletion of a cold object by pushing
* a message into the cold GC topic
* @param {string} coldLocation cold location name
* @param {ColdStorageStatusQueueEntry} entry entry received
* from the cold location status topic
* @param {Logger} log logger instance
* @param {function} cb callback
* @return {undefined}
*/
_deleteColdObject(coldLocation, entry, log, cb) {
const coldGcTopic = `${this.lcConfig.coldStorageGCTopicPrefix}${coldLocation}`;
const message = JSON.stringify({
bucketName: entry.target.bucketName,
objectKey: entry.target.objectKey,
objectVersion: entry.target.objectVersion,
archiveInfo: entry.archiveInfo,
requestId: entry.requestId,
});
this.coldProducer.sendToTopic(coldGcTopic, message, err => {
if (err) {
log.error('error sending cold object deletion entry', {
error: err,
entry: entry.getLogInfo(),
method: 'LifecycleColdStatusArchiveTask._deleteColdObject',
});
return cb(err);
}
return cb(new SkipMdUpdateError('cold object deleted'));
});
}

processEntry(coldLocation, entry, done) {
const log = this.logger.newRequestLogger();
let objectMD;
Expand All @@ -50,6 +84,13 @@ class LifecycleColdStatusArchiveTask extends LifecycleUpdateTransitionTask {
next => this._getMetadata(entry, log, (err, res) => {
LifecycleMetrics.onS3Request(log, 'getMetadata', 'archive', err);
if (err) {
if (err.code === 'ObjNotFound') {
log.info('object metadata not found, cleaning orphan cold object', {
entry: entry.getLogInfo(),
method: 'LifecycleColdStatusArchiveTask.processEntry',
});
return this._deleteColdObject(coldLocation, entry, log, next);
}
return next(err);
}

Expand Down Expand Up @@ -87,7 +128,7 @@ class LifecycleColdStatusArchiveTask extends LifecycleUpdateTransitionTask {
return process.nextTick(next);
},
], err => {
if (err) {
if (err && !(err instanceof SkipMdUpdateError)) {
// if error occurs, do not commit offset
return done(err, { committable: false });
}
Expand Down

0 comments on commit 520e2ea

Please sign in to comment.