From 0cc2f77bf1daa64e75490b53db24cd4a596df108 Mon Sep 17 00:00:00 2001 From: Francois Ferrand Date: Fri, 13 Dec 2024 19:27:46 +0100 Subject: [PATCH] Ensure lifecycle tasks wait for messages to be pushed Issue: BB-641 --- extensions/lifecycle/tasks/LifecycleTask.js | 23 ++-- extensions/lifecycle/tasks/LifecycleTaskV2.js | 102 ++++++++++-------- 2 files changed, 74 insertions(+), 51 deletions(-) diff --git a/extensions/lifecycle/tasks/LifecycleTask.js b/extensions/lifecycle/tasks/LifecycleTask.js index 4b7e1af4cf..1da1a275f4 100644 --- a/extensions/lifecycle/tasks/LifecycleTask.js +++ b/extensions/lifecycle/tasks/LifecycleTask.js @@ -1,6 +1,7 @@ 'use strict'; // eslint-disable-line const async = require('async'); +const util = require('util'); const { errors, versioning } = require('arsenal'); const { ObjectMD } = require('arsenal').models; const { supportedLifecycleRules } = require('arsenal').constants; @@ -400,6 +401,8 @@ class LifecycleTask extends BackbeatTask { }); } + const promises = []; + // sending bucket entry - only once - for checking next listing if (data.IsTruncated && allVersions.length > 0 && nbRetries === 0) { // Uses last version whether Version or DeleteMarker @@ -414,31 +417,31 @@ class LifecycleTask extends BackbeatTask { prevDate: last.LastModified, }, }); - this._sendBucketEntry(entry, err => { + promises.push(new Promise(resolve => this._sendBucketEntry(entry, err => { if (!err) { log.debug('sent kafka entry for bucket ' + 'consumption', { method: 'LifecycleTask._getObjectVersions', }); } - }); + resolve(); // safe to ignore the error, we will retry lifecycle eventually + }))); } // if no versions to process, skip further processing for this // batch if (allVersionsWithStaleDate.length === 0) { - return done(null); + return Promise.all(promises).then(() => done(null), () => done(null)); } // for each version, get their relative rules, compare with // bucket rules, match with `staleDate` to // NoncurrentVersionExpiration Days and send expiration if // rules all apply - return this._compareRulesToList(bucketData, bucketLCRules, - allVersionsWithStaleDate, log, versioningStatus, - err => { + promises.push(new Promise((resolve, reject) => this._compareRulesToList(bucketData, + bucketLCRules, allVersionsWithStaleDate, log, versioningStatus, err => { if (err) { - return done(err); + return reject(err); } if (!data.IsTruncated) { @@ -453,8 +456,10 @@ class LifecycleTask extends BackbeatTask { ); } - return done(); - }); + return resolve(); + }))); + + return Promise.all(promises).then(done, done); }); } diff --git a/extensions/lifecycle/tasks/LifecycleTaskV2.js b/extensions/lifecycle/tasks/LifecycleTaskV2.js index 6cd2034318..35c0bf7eab 100644 --- a/extensions/lifecycle/tasks/LifecycleTaskV2.js +++ b/extensions/lifecycle/tasks/LifecycleTaskV2.js @@ -1,6 +1,7 @@ 'use strict'; // eslint-disable-line const async = require('async'); +const util = require('util'); const { errors } = require('arsenal'); const LifecycleTask = require('./LifecycleTask'); @@ -39,33 +40,33 @@ class LifecycleTaskV2 extends LifecycleTask { * @param {array} remainings - array of { prefix, listType, beforeDate } * @param {object} bucketData - bucket data * @param {Logger.newRequestLogger} log - logger object + * @param {function} done - callback(error) * @return {undefined} */ - _handleRemainingListings(remainings, bucketData, log) { - if (remainings && remainings.length) { - remainings.forEach(l => { - const { - prefix, - listType, - beforeDate, - storageClass, - } = l; - - const entry = Object.assign({}, bucketData, { - contextInfo: { requestId: log.getSerializedUids() }, - details: { beforeDate, prefix, listType, storageClass }, - }); + _handleRemainingListings(remainings, bucketData, log, done) { + async.forEach(remainings || [], (l, cb) => { + const { + prefix, + listType, + beforeDate, + storageClass, + } = l; + + const entry = Object.assign({}, bucketData, { + contextInfo: { requestId: log.getSerializedUids() }, + details: { beforeDate, prefix, listType, storageClass }, + }); - this._sendBucketEntry(entry, err => { - if (!err) { - log.debug( - 'sent kafka entry for bucket consumption', { - method: 'LifecycleTaskV2._getVersionList', - }); - } - }); + this._sendBucketEntry(entry, err => { + if (!err) { + log.debug( + 'sent kafka entry for bucket consumption', { + method: 'LifecycleTaskV2._getVersionList', + }); + } + cb(); }); - } + }, done); } /** @@ -101,15 +102,19 @@ class LifecycleTaskV2 extends LifecycleTask { return process.nextTick(done); } + const promises = []; + // re-queue remaining listings only once if (nbRetries === 0) { - this._handleRemainingListings(remainings, bucketData, log); + promises.push(util.promisify(this._handleRemainingListings).bind(this)( + remainings, bucketData, log, + )); } return this.backbeatMetadataProxy.listLifecycle(listType, params, log, (err, contents, isTruncated, markerInfo) => { if (err) { - return done(err); + return Promise.all(promises).then(() => done(err), () => done(err)); } // re-queue truncated listing only once. @@ -125,17 +130,22 @@ class LifecycleTaskV2 extends LifecycleTask { }, }); - this._sendBucketEntry(entry, err => { + promises.push(new Promise(resolve => this._sendBucketEntry(entry, err => { if (!err) { log.debug( 'sent kafka entry for bucket consumption', { - method: 'LifecycleTaskV2._getObjectList', - }); + method: 'LifecycleTaskV2._getObjectList', + }); } - }); + resolve(); // safe to ignore the error, we will retry lifecycle eventually + }))); } - return this._compareRulesToList(bucketData, bucketLCRules, - contents, log, done); + + promises.push(util.promisify(this._compareRulesToList).bind(this)( + bucketData, bucketLCRules, contents, log, + )); + + return Promise.all(promises).then(done, done); }); } @@ -173,15 +183,19 @@ class LifecycleTaskV2 extends LifecycleTask { return process.nextTick(done); } + const promises = []; + // re-queue remaining listings only once if (nbRetries === 0) { - this._handleRemainingListings(remainings, bucketData, log); + promises.push(util.promisify(this._handleRemainingListings).bind(this)( + remainings, bucketData, log, + )); } return this.backbeatMetadataProxy.listLifecycle(listType, params, log, (err, contents, isTruncated, markerInfo) => { if (err) { - return done(err); + return Promise.all(promises).then(() => done(err), () => done(err)); } // create Set of unique keys not matching the next marker to @@ -209,19 +223,21 @@ class LifecycleTaskV2 extends LifecycleTask { }, }); - this._sendBucketEntry(entry, err => { + promises.push(new Promise(resolve => this._sendBucketEntry(entry, err => { if (!err) { log.debug( 'sent kafka entry for bucket consumption', { - method: 'LifecycleTaskV2._getObjectList', - }); + method: 'LifecycleTaskV2._getObjectVersions', + }); } - }); + resolve(); // safe to ignore the error, we will retry lifecycle eventually + }))); } - return this._compareRulesToList(bucketData, bucketLCRules, - contents, log, err => { + + promises.push(new Promise((resolve, reject) => this._compareRulesToList( + bucketData, bucketLCRules, contents, log, err => { if (err) { - return done(err); + return reject(err); } if (!isTruncated) { @@ -236,8 +252,10 @@ class LifecycleTaskV2 extends LifecycleTask { ); } - return done(); - }); + return resolve(); + }))); + + return Promise.all(promises).then(done, done); }); }