diff --git a/tests/config.json b/tests/config.json index f9534305be..2c2f345f01 100644 --- a/tests/config.json +++ b/tests/config.json @@ -156,6 +156,7 @@ "zookeeperPath": "/lifecycletest", "bucketTasksTopic": "backbeat-test-dummy-bucket-task", "objectTasksTopic": "backbeat-test-dummy-object-task", + "transitionTasksTopic": "backbeat-test-dummy-transition-task", "conductor": { "cronRule": "0 */5 * * * *", "probeServer": { @@ -196,11 +197,29 @@ "port": 8554 } }, + "transitionProcessor": { + "groupId": "backbeat-lifecycle-transition-processor-group", + "retry": { + "maxRetries": 5, + "timeoutS": 300, + "backoff": { + "min": 1000, + "max": 300000, + "jitter": 0.1, + "factor": 1.5 + } + }, + "concurrency": 10, + "probeServer": { + "port": 8554 + } + }, "auth": { "type": "account", "account": "bart" }, - "coldStorageArchiveTopicPrefix": "cold-archive-req-" + "coldStorageArchiveTopicPrefix": "cold-archive-req-", + "coldStorageTopics": [] }, "gc": { "topic": "backbeat-test-gc", diff --git a/tests/functional/lifecycle/LifecycleConductor.spec.js b/tests/functional/lifecycle/LifecycleConductor.spec.js index 95289881c6..a28899a696 100644 --- a/tests/functional/lifecycle/LifecycleConductor.spec.js +++ b/tests/functional/lifecycle/LifecycleConductor.spec.js @@ -95,6 +95,7 @@ const baseLCConfig = { zookeeperPath: '/test/lifecycle', bucketTasksTopic, objectTasksTopic: 'backbeat-lifecycle-object-tasks-spec', + transitionTasksTopic: 'backbeat-lifecycle-transition-tasks-spec', conductor: { cronRule: '*/5 * * * * *', backlogControl: { diff --git a/tests/functional/lifecycle/LifecycleTask.js b/tests/functional/lifecycle/LifecycleTask.js index 4b6377f95a..a2d595b29c 100644 --- a/tests/functional/lifecycle/LifecycleTask.js +++ b/tests/functional/lifecycle/LifecycleTask.js @@ -8,6 +8,12 @@ const LifecycleRule = require('arsenal').models.LifecycleRule; const LifecycleTask = require('../../../extensions/lifecycle/tasks/LifecycleTask'); const testConfig = require('../../config.json'); const { objectMD } = require('../utils/MetadataMock'); +const { + bucketTasksTopic, + objectTasksTopic, + transitionTasksTopic, +} = require('./configObjects'); + const timeOptions = { expireOneDayEarlier: true, transitionOneDayEarlier: true, @@ -328,10 +334,10 @@ class ProducerMock { sendToTopic(topicName, entries, cb) { const entry = JSON.parse(entries[0].message); - if (topicName === 'bucket-tasks') { + if (topicName === bucketTasksTopic) { this.sendCount.bucket++; this.entries.bucket.push(entry); - } else if (topicName === 'object-tasks') { + } else if (topicName === objectTasksTopic) { this.sendCount.object++; this.entries.object.push(entry.target.key); } else if (topicName === 'backbeat-data-mover') { @@ -421,8 +427,9 @@ class LifecycleBucketProcessorMock { bootstrapList: [{ site: 'us-east-2', type: 'aws_s3' }], s3Endpoint: s3config.endpoint, s3Auth: lifecycle.auth, - bucketTasksTopic: 'bucket-tasks', - objectTasksTopic: 'object-tasks', + bucketTasksTopic, + objectTasksTopic, + transitionTasksTopic, kafkaBacklogMetrics: this._kafkaBacklogMetrics, pausedLocations: new Set(), log: this._log, @@ -510,6 +517,7 @@ describe('lifecycle task functional tests', function dF() { entries.transitions.forEach(transition => { assert.strictEqual(transition.target.bucket, 'test-bucket'); assert.strictEqual(transition.toLocation, 'us-east-2'); + assert.strictEqual(transition.resultsTopic, transitionTasksTopic); }); } diff --git a/tests/functional/lifecycle/LifecycleTaskV2-versioned.js b/tests/functional/lifecycle/LifecycleTaskV2-versioned.js index a303850341..f8682c5643 100644 --- a/tests/functional/lifecycle/LifecycleTaskV2-versioned.js +++ b/tests/functional/lifecycle/LifecycleTaskV2-versioned.js @@ -39,9 +39,11 @@ const destinationLocation = 'us-east-2'; const bucketTopic = 'bucket-topic'; const objectTopic = 'object-topic'; +const transitionTopic = 'transition-topic'; const dataMoverTopic = 'backbeat-data-mover'; const testKafkaEntry = new TestKafkaEntry({ objectTopic, + transitionTopic, bucketTopic, dataMoverTopic, ownerId, @@ -72,6 +74,7 @@ describe('LifecycleTaskV2 with bucket versioned', () => { producer, bucketTasksTopic: bucketTopic, objectTasksTopic: objectTopic, + transitionTasksTopic: transitionTopic, kafkaBacklogMetrics: { snapshotTopicOffsets: () => {} }, pausedLocations: new Set(), log, diff --git a/tests/functional/lifecycle/LifecycleTaskV2.js b/tests/functional/lifecycle/LifecycleTaskV2.js index 573d3c1295..b625b3e0da 100644 --- a/tests/functional/lifecycle/LifecycleTaskV2.js +++ b/tests/functional/lifecycle/LifecycleTaskV2.js @@ -40,9 +40,11 @@ const destinationLocation = 'us-east-2'; const bucketTopic = 'bucket-topic'; const objectTopic = 'object-topic'; +const transitionTopic = 'transition-topic'; const dataMoverTopic = 'backbeat-data-mover'; const testKafkaEntry = new TestKafkaEntry({ objectTopic, + transitionTopic, bucketTopic, dataMoverTopic, ownerId, @@ -73,6 +75,7 @@ describe('LifecycleTaskV2 with bucket non-versioned', () => { producer, bucketTasksTopic: bucketTopic, objectTasksTopic: objectTopic, + transitionTasksTopic: transitionTopic, kafkaBacklogMetrics: { snapshotTopicOffsets: () => {} }, pausedLocations: new Set(), log, diff --git a/tests/functional/lifecycle/configObjects.js b/tests/functional/lifecycle/configObjects.js index 5c27d5cf8c..f1e9fe71d0 100644 --- a/tests/functional/lifecycle/configObjects.js +++ b/tests/functional/lifecycle/configObjects.js @@ -1,5 +1,6 @@ const bucketTasksTopic = 'bucket-tasks'; const objectTasksTopic = 'object-tasks'; +const transitionTasksTopic = 'transition-tasks'; const zkConfig = { connectionString: 'localhost:2181', @@ -35,6 +36,7 @@ const lcConfig = { }, bucketTasksTopic, objectTasksTopic, + transitionTasksTopic, rules: { expiration: { enabled: true, @@ -69,6 +71,7 @@ const timeOptions = { module.exports = { bucketTasksTopic, objectTasksTopic, + transitionTasksTopic, zkConfig, kafkaConfig, lcConfig, diff --git a/tests/functional/lifecycle/utils.js b/tests/functional/lifecycle/utils.js index 074a14f7bb..bda5d4992e 100644 --- a/tests/functional/lifecycle/utils.js +++ b/tests/functional/lifecycle/utils.js @@ -6,6 +6,7 @@ class TestKafkaEntry { constructor(state) { const { objectTopic, + transitionTopic, bucketTopic, dataMoverTopic, ownerId, @@ -14,6 +15,7 @@ class TestKafkaEntry { } = state; this.objectTopic = objectTopic; + this.transitionTopic = transitionTopic; this.bucketTopic = bucketTopic; this.dataMoverTopic = dataMoverTopic; this.ownerId = ownerId; @@ -96,7 +98,7 @@ class TestKafkaEntry { assert.strictEqual(metrics.contentLength, contentLength); assert.strictEqual(message.toLocation, destinationLocation); - assert.strictEqual(message.resultsTopic, this.objectTopic); + assert.strictEqual(message.resultsTopic, this.transitionTopic); } expectBucketEntry(e, { diff --git a/tests/unit/lifecycle/LifecycleObjectExpirationProcessor.spec.js b/tests/unit/lifecycle/LifecycleObjectExpirationProcessor.spec.js new file mode 100644 index 0000000000..f0cf6a8d8b --- /dev/null +++ b/tests/unit/lifecycle/LifecycleObjectExpirationProcessor.spec.js @@ -0,0 +1,26 @@ +const assert = require('assert'); +const config = require('../../config.json'); +const LifecycleObjectExpirationProcessor = + require('../../../extensions/lifecycle/objectProcessor/LifecycleObjectExpirationProcessor'); + +describe('LifecycleObjectExpirationProcessor', () => { + let objectProcessor; + + beforeEach(() => { + objectProcessor = new LifecycleObjectExpirationProcessor( + config.zookeeper, + config.kafka, + config.extensions.lifecycle, + config.s3, + ); + }); + + it('consumer params should contain object tasks topic', () => { + const consumerParams = objectProcessor.getConsumerParams(); + assert.deepStrictEqual(Object.keys(consumerParams), [config.extensions.lifecycle.objectTasksTopic]); + assert.strictEqual( + consumerParams[config.extensions.lifecycle.objectTasksTopic].topic, + config.extensions.lifecycle.objectTasksTopic, + ); + }); +}); diff --git a/tests/unit/lifecycle/LifecycleObjectTransitionProcessor.spec.js b/tests/unit/lifecycle/LifecycleObjectTransitionProcessor.spec.js new file mode 100644 index 0000000000..7746de48f3 --- /dev/null +++ b/tests/unit/lifecycle/LifecycleObjectTransitionProcessor.spec.js @@ -0,0 +1,26 @@ +const assert = require('assert'); +const config = require('../../config.json'); +const LifecycleObjectTransitionProcessor = + require('../../../extensions/lifecycle/objectProcessor/LifecycleObjectTransitionProcessor'); + +describe('LifecycleObjectExpirationProcessor', () => { + let objectProcessor; + + beforeEach(() => { + objectProcessor = new LifecycleObjectTransitionProcessor( + config.zookeeper, + config.kafka, + config.extensions.lifecycle, + config.s3, + ); + }); + + it('consumer params should contain transition tasks topic', () => { + const consumerParams = objectProcessor.getConsumerParams(); + assert.deepStrictEqual(Object.keys(consumerParams), [config.extensions.lifecycle.transitionTasksTopic]); + assert.strictEqual( + consumerParams[config.extensions.lifecycle.transitionTasksTopic].topic, + config.extensions.lifecycle.transitionTasksTopic + ); + }); +});