diff --git a/extensions/lifecycle/objectProcessor/LifecycleObjectTransitionProcessor.js b/extensions/lifecycle/objectProcessor/LifecycleObjectTransitionProcessor.js index 99cc5b3c67..9eee4027fa 100644 --- a/extensions/lifecycle/objectProcessor/LifecycleObjectTransitionProcessor.js +++ b/extensions/lifecycle/objectProcessor/LifecycleObjectTransitionProcessor.js @@ -10,6 +10,7 @@ const { LifecycleResetTransitionInProgressTask } = require('../tasks/LifecycleResetTransitionInProgressTask'); const { updateCircuitBreakerConfigForImplicitOutputQueue } = require('../../../lib/CircuitBreaker'); const { LifecycleRetriggerRestoreTask } = require('../tasks/LifecycleRetriggerRestoreTask'); +const BackbeatProducer = require('../../../lib/BackbeatProducer'); class LifecycleObjectTransitionProcessor extends LifecycleObjectProcessor { @@ -45,6 +46,45 @@ class LifecycleObjectTransitionProcessor extends LifecycleObjectProcessor { super(zkConfig, kafkaConfig, lcConfig, s3Config, transport); } + /** + * Start kafka consumer. Emits a 'ready' event when + * consumer is ready. + * @param {function} done - callback + * @return {undefined} + */ + start(done) { + super.start(err => { + if (err) { + return done(err); + } + return this.setupProducer(done); + }); + } + + /** + * Set up Kafka producer + * @param {function} cb callback called when producer + * startup is complete + * @return {undefined} + */ + setupProducer(cb) { + const producer = new BackbeatProducer({ + kafka: { hosts: this._kafkaConfig.hosts }, + maxRequestSize: this._kafkaConfig.maxRequestSize, + }); + producer.once('error', () => {}); + producer.once('ready', () => { + producer.removeAllListeners('error'); + producer.on('error', err => + this._log.error('error from backbeat producer', { + method: 'LifecycleObjectTransitionProcessor.setupProducer', + error: err, + })); + this._coldProducer = producer; + return cb(); + }); + } + getProcessorType() { return 'transition-processor'; } @@ -153,6 +193,13 @@ class LifecycleObjectTransitionProcessor extends LifecycleObjectProcessor { log: this._log, }, done); } + + getStateVars() { + return { + ...super.getStateVars(), + coldProducer: this._coldProducer, + }; + } } module.exports = LifecycleObjectTransitionProcessor; diff --git a/extensions/lifecycle/tasks/LifecycleColdStatusArchiveTask.js b/extensions/lifecycle/tasks/LifecycleColdStatusArchiveTask.js index c3dbd8a905..1daefdd6be 100644 --- a/extensions/lifecycle/tasks/LifecycleColdStatusArchiveTask.js +++ b/extensions/lifecycle/tasks/LifecycleColdStatusArchiveTask.js @@ -5,6 +5,8 @@ const ActionQueueEntry = require('../../../lib/models/ActionQueueEntry'); const LifecycleUpdateTransitionTask = require('./LifecycleUpdateTransitionTask'); const { LifecycleMetrics } = require('../LifecycleMetrics'); +class SkipMdUpdateError extends Error {} + class LifecycleColdStatusArchiveTask extends LifecycleUpdateTransitionTask { getTargetAttribute(entry) { const { @@ -40,6 +42,38 @@ class LifecycleColdStatusArchiveTask extends LifecycleUpdateTransitionTask { }); } + /** + * Requests the deletion of a cold object by pushing + * a message into the cold GC topic + * @param {string} coldLocation cold location name + * @param {ColdStorageStatusQueueEntry} entry entry received + * from the cold location status topic + * @param {Logger} log logger instance + * @param {function} cb callback + * @return {undefined} + */ + _deleteColdObject(coldLocation, entry, log, cb) { + const coldGcTopic = `${this.lcConfig.coldStorageGCTopicPrefix}${coldLocation}`; + const message = JSON.stringify({ + bucketName: entry.target.bucketName, + objectKey: entry.target.objectKey, + objectVersion: entry.target.objectVersion, + archiveInfo: entry.archiveInfo, + requestId: entry.requestId, + }); + this.coldProducer.sendToTopic(coldGcTopic, message, err => { + if (err) { + log.error('error sending cold object deletion entry', { + error: err, + entry: entry.getLogInfo(), + method: 'LifecycleColdStatusArchiveTask._deleteColdObject', + }); + return cb(err); + } + return cb(new SkipMdUpdateError('cold object deleted')); + }); + } + processEntry(coldLocation, entry, done) { const log = this.logger.newRequestLogger(); let objectMD; @@ -50,6 +84,13 @@ class LifecycleColdStatusArchiveTask extends LifecycleUpdateTransitionTask { next => this._getMetadata(entry, log, (err, res) => { LifecycleMetrics.onS3Request(log, 'getMetadata', 'archive', err); if (err) { + if (err.code === 'ObjNotFound') { + log.info('object metadata not found, cleaning orphan cold object', { + entry: entry.getLogInfo(), + method: 'LifecycleColdStatusArchiveTask.processEntry', + }); + return this._deleteColdObject(coldLocation, entry, log, next); + } return next(err); } @@ -87,7 +128,7 @@ class LifecycleColdStatusArchiveTask extends LifecycleUpdateTransitionTask { return process.nextTick(next); }, ], err => { - if (err) { + if (err && !(err instanceof SkipMdUpdateError)) { // if error occurs, do not commit offset return done(err, { committable: false }); }