Skip to content

Commit

Permalink
Ensure lifecycle tasks wait for messages to be pushed
Browse files Browse the repository at this point in the history
Issue: BB-641
  • Loading branch information
francoisferrand committed Dec 18, 2024
1 parent af16d44 commit ca50af2
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 53 deletions.
25 changes: 14 additions & 11 deletions extensions/lifecycle/tasks/LifecycleTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ class LifecycleTask extends BackbeatTask {
});
}

const promises = [];

// sending bucket entry - only once - for checking next listing
if (data.IsTruncated && allVersions.length > 0 && nbRetries === 0) {
// Uses last version whether Version or DeleteMarker
Expand All @@ -414,31 +416,30 @@ class LifecycleTask extends BackbeatTask {
prevDate: last.LastModified,
},
});
this._sendBucketEntry(entry, err => {
promises.push(new Promise(resolve => this._sendBucketEntry(entry, err => {
if (!err) {
log.debug('sent kafka entry for bucket ' +
'consumption', {
method: 'LifecycleTask._getObjectVersions',
});
}
});
resolve(); // safe to ignore the error, we will retry lifecycle eventually
})));
}

// if no versions to process, skip further processing for this
// batch
// if no versions to process, skip further processing for this batch
if (allVersionsWithStaleDate.length === 0) {
return done(null);
return Promise.allSettled(promises).then(() => done(), done);

Check warning on line 432 in extensions/lifecycle/tasks/LifecycleTask.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

extensions/lifecycle/tasks/LifecycleTask.js#L432

Added line #L432 was not covered by tests
}

// for each version, get their relative rules, compare with
// bucket rules, match with `staleDate` to
// NoncurrentVersionExpiration Days and send expiration if
// rules all apply
return this._compareRulesToList(bucketData, bucketLCRules,
allVersionsWithStaleDate, log, versioningStatus,
err => {
promises.push(new Promise((resolve, reject) => this._compareRulesToList(bucketData,
bucketLCRules, allVersionsWithStaleDate, log, versioningStatus, err => {
if (err) {
return done(err);
return reject(err);

Check warning on line 442 in extensions/lifecycle/tasks/LifecycleTask.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

extensions/lifecycle/tasks/LifecycleTask.js#L442

Added line #L442 was not covered by tests
}

if (!data.IsTruncated) {
Expand All @@ -453,8 +454,10 @@ class LifecycleTask extends BackbeatTask {
);
}

return done();
});
return resolve();
})));

return Promise.allSettled(promises).then(() => done(), done);
});
}

Expand Down
102 changes: 60 additions & 42 deletions extensions/lifecycle/tasks/LifecycleTaskV2.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'; // eslint-disable-line

const async = require('async');
const util = require('util');
const { errors } = require('arsenal');

const LifecycleTask = require('./LifecycleTask');
Expand Down Expand Up @@ -39,33 +40,33 @@ class LifecycleTaskV2 extends LifecycleTask {
* @param {array} remainings - array of { prefix, listType, beforeDate }
* @param {object} bucketData - bucket data
* @param {Logger.newRequestLogger} log - logger object
* @param {function} done - callback(error)
* @return {undefined}
*/
_handleRemainingListings(remainings, bucketData, log) {
if (remainings && remainings.length) {
remainings.forEach(l => {
const {
prefix,
listType,
beforeDate,
storageClass,
} = l;

const entry = Object.assign({}, bucketData, {
contextInfo: { requestId: log.getSerializedUids() },
details: { beforeDate, prefix, listType, storageClass },
});
_handleRemainingListings(remainings, bucketData, log, done) {
async.forEach(remainings || [], (l, cb) => {
const {
prefix,
listType,
beforeDate,
storageClass,
} = l;

const entry = Object.assign({}, bucketData, {
contextInfo: { requestId: log.getSerializedUids() },
details: { beforeDate, prefix, listType, storageClass },
});

this._sendBucketEntry(entry, err => {
if (!err) {
log.debug(
'sent kafka entry for bucket consumption', {
method: 'LifecycleTaskV2._getVersionList',
});
}
});
this._sendBucketEntry(entry, err => {
if (!err) {
log.debug(
'sent kafka entry for bucket consumption', {
method: 'LifecycleTaskV2._getVersionList',
});
}
cb();
});
}
}, done);
}

/**
Expand Down Expand Up @@ -101,15 +102,19 @@ class LifecycleTaskV2 extends LifecycleTask {
return process.nextTick(done);
}

const promises = [];

// re-queue remaining listings only once
if (nbRetries === 0) {
this._handleRemainingListings(remainings, bucketData, log);
promises.push(util.promisify(this._handleRemainingListings).bind(this)(
remainings, bucketData, log,
));
}

return this.backbeatMetadataProxy.listLifecycle(listType, params, log,
(err, contents, isTruncated, markerInfo) => {
if (err) {
return done(err);
return Promise.allSettled(promises).then(() => done(err), () => done(err));

Check warning on line 117 in extensions/lifecycle/tasks/LifecycleTaskV2.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

extensions/lifecycle/tasks/LifecycleTaskV2.js#L117

Added line #L117 was not covered by tests
}

// re-queue truncated listing only once.
Expand All @@ -125,17 +130,22 @@ class LifecycleTaskV2 extends LifecycleTask {
},
});

this._sendBucketEntry(entry, err => {
promises.push(new Promise(resolve => this._sendBucketEntry(entry, err => {
if (!err) {
log.debug(
'sent kafka entry for bucket consumption', {
method: 'LifecycleTaskV2._getObjectList',
});
method: 'LifecycleTaskV2._getObjectList',
});
}
});
resolve(); // safe to ignore the error, we will retry lifecycle eventually
})));
}
return this._compareRulesToList(bucketData, bucketLCRules,
contents, log, done);

promises.push(util.promisify(this._compareRulesToList).bind(this)(
bucketData, bucketLCRules, contents, log,
));

return Promise.allSettled(promises).then(() => done(), done);
});
}

Expand Down Expand Up @@ -173,15 +183,19 @@ class LifecycleTaskV2 extends LifecycleTask {
return process.nextTick(done);
}

const promises = [];

// re-queue remaining listings only once
if (nbRetries === 0) {
this._handleRemainingListings(remainings, bucketData, log);
promises.push(util.promisify(this._handleRemainingListings).bind(this)(
remainings, bucketData, log,
));
}

return this.backbeatMetadataProxy.listLifecycle(listType, params, log,
(err, contents, isTruncated, markerInfo) => {
if (err) {
return done(err);
return Promise.allSettled(promises).then(() => done(err), () => done(err));

Check warning on line 198 in extensions/lifecycle/tasks/LifecycleTaskV2.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

extensions/lifecycle/tasks/LifecycleTaskV2.js#L198

Added line #L198 was not covered by tests
}

// create Set of unique keys not matching the next marker to
Expand Down Expand Up @@ -209,19 +223,21 @@ class LifecycleTaskV2 extends LifecycleTask {
},
});

this._sendBucketEntry(entry, err => {
promises.push(new Promise(resolve => this._sendBucketEntry(entry, err => {
if (!err) {
log.debug(
'sent kafka entry for bucket consumption', {
method: 'LifecycleTaskV2._getObjectList',
});
method: 'LifecycleTaskV2._getObjectVersions',
});
}
});
resolve(); // safe to ignore the error, we will retry lifecycle eventually
})));
}
return this._compareRulesToList(bucketData, bucketLCRules,
contents, log, err => {

promises.push(new Promise((resolve, reject) => this._compareRulesToList(
bucketData, bucketLCRules, contents, log, err => {
if (err) {
return done(err);
return reject(err);

Check warning on line 240 in extensions/lifecycle/tasks/LifecycleTaskV2.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

extensions/lifecycle/tasks/LifecycleTaskV2.js#L240

Added line #L240 was not covered by tests
}

if (!isTruncated) {
Expand All @@ -236,8 +252,10 @@ class LifecycleTaskV2 extends LifecycleTask {
);
}

return done();
});
return resolve();
})));

return Promise.allSettled(promises).then(() => done(), done);
});
}

Expand Down

0 comments on commit ca50af2

Please sign in to comment.