Skip to content

Commit

Permalink
fix(#8947): filter out valid outbound tasks with empty configs becaus…
Browse files Browse the repository at this point in the history
…e they are not due
  • Loading branch information
njuguna-n committed Sep 26, 2024
1 parent 806b590 commit a5dc0e4
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 4 deletions.
9 changes: 5 additions & 4 deletions sentinel/src/schedule/outbound.js
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ const attachInfoDocs = tasks => {
});
};

const batch = (configuredPushes, startKey) => {
const batch = (dueConfiguredPushes, startKey) => {
let nextKey;
return queuedTasks(startKey)
.then(({ validTasks = [], invalidTasks = [], lastDocId } = {}) => {
Expand All @@ -183,8 +183,9 @@ const batch = (configuredPushes, startKey) => {
.then(validTasks => {
const pushes = validTasks.reduce((acc, {task, doc, info}) => {
const pushesForDoc =
getConfigurationsToPush(configuredPushes, task)
.map(([key, config]) => ({task, doc, info, config, key}));
getConfigurationsToPush(dueConfiguredPushes, task)
.map(([key, config]) => ({task, doc, info, config, key}))
.filter(({config}) => config && Object.keys(config).length > 0); // Filter out tasks without configs

return acc.concat(pushesForDoc);
}, []);
Expand All @@ -200,7 +201,7 @@ const batch = (configuredPushes, startKey) => {
Promise.resolve()
);
})
.then(() => nextKey && batch(configuredPushes, nextKey));
.then(() => nextKey && batch(dueConfiguredPushes, nextKey));
};

// Coordinates the attempted pushing of documents that need it
Expand Down
76 changes: 76 additions & 0 deletions sentinel/tests/unit/schedule/outbound.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -784,5 +784,81 @@ describe('outbound schedule', () => {
assert.isUndefined(dueConfigs[INVALID_CRON]);
});
});

it('should not process tasks if they do not have due configurations', () => {
const attachInfoDocs = sinon.stub();
const queuedTasks = sinon.stub();
const removeInvalidTasks = sinon.stub();
const singlePush = sinon.stub();
restores.push(outbound.__set__('attachInfoDocs', attachInfoDocs));
restores.push(outbound.__set__('queuedTasks', queuedTasks));
restores.push(outbound.__set__('removeInvalidTasks', removeInvalidTasks));
restores.push(outbound.__set__('singlePush', singlePush));

const VALID_CRON = 'outbound-with-due-cron';
const VALID_CRON_2 = 'outbound-with-due-cron-2';


const configs = {
[VALID_CRON]: {
cron: '* * * * *'
},
[VALID_CRON_2]: {
cron: '15 * * * *'
}
};
const task1 = {
_id: 'task:outbound:test-doc-1',
doc_id: 'test-doc-1',
queue: [VALID_CRON]
};
const task2 = {
_id: 'task:outbound:test-doc-2',
doc_id: 'test-doc-2',
queue: ['test-push-2']
};
const doc1 = {
_id: 'test-doc-1', some: 'data-1'
};
const doc2 = {
_id: 'test-doc-2', some: 'data-2'
};
const doc1Info = {
_id: 'test-doc-1-info'
};
const doc2Info = {
_id: 'test-doc-2-info'
};

configGet.returns(configs);
singlePush.resolvesArg(0);
queuedTasks.onCall(0).resolves({
validTasks: [{ task: task1, doc: doc1 }, { task: task2, doc: doc2 }],
invalidTasks: [],
lastDocId: task2._id,
});
queuedTasks.onCall(1).resolves();
removeInvalidTasks.resolves();
attachInfoDocs
.onCall(0).resolves([{ task: task1, doc: doc1, info: doc1Info }, { task: task2, doc: doc2, info: doc2Info }])
.onCall(1).resolves([]);

singlePush.resolves();

clock = sinon.useFakeTimers(new Date('2024-09-10T03:05:00+0000').getTime());

return outbound.execute().then(() => {
assert.equal(singlePush.callCount, 1);
assert.equal(singlePush.args[0][0], task1);
assert.equal(singlePush.args[0][1], doc1);
assert.equal(singlePush.args[0][2], doc1Info);
assert.equal(singlePush.args[0][3], configs[VALID_CRON]);
assert.equal(singlePush.args[0][4], VALID_CRON);
assert.equal(singlePush.args.find(arg => arg[0] === task2), undefined);
assert.equal(attachInfoDocs.args[0][0][0].task, task1);
assert.equal(attachInfoDocs.args[0][0][1].task, task2);
assert.equal(removeInvalidTasks.callCount, 0);
});
});
});
});

0 comments on commit a5dc0e4

Please sign in to comment.