From 24af00a281dbf1e142f3116ef7a84e0fd7a319ef Mon Sep 17 00:00:00 2001 From: - <> Date: Mon, 18 Nov 2024 18:41:14 +0200 Subject: [PATCH 1/4] Fix the resumeOnRestart function to handle correctly also the recurring jobs + add unit tests --- src/pulse/resume-on-restart.ts | 46 +++++++++++++++++++++++++++++++++- test/unit/pulse.spec.ts | 42 ++++++++++++++++++++++++++++--- 2 files changed, 83 insertions(+), 5 deletions(-) diff --git a/src/pulse/resume-on-restart.ts b/src/pulse/resume-on-restart.ts index 8b77a3b..8ddadcd 100644 --- a/src/pulse/resume-on-restart.ts +++ b/src/pulse/resume-on-restart.ts @@ -1,5 +1,6 @@ import createDebugger from 'debug'; import { Pulse } from '.'; +import { Job } from '../job'; const debug = createDebugger('pulse:resumeOnRestart'); @@ -18,6 +19,8 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res if (this._collection && this._resumeOnRestart) { const now = new Date(); + + // Non-recurring jobs this._collection .updateMany( { @@ -41,7 +44,48 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res ) .then((result) => { if (result.modifiedCount > 0) { - debug('resuming unfinished %d jobs(%s)', result.modifiedCount, now.toISOString()); + debug('Resumed %d unfinished standard jobs (%s)', result.modifiedCount, now.toISOString()); + } + }); + + // Handling for recurring jobs using repeatInterval or repeatAt + this._collection + .find({ + $or: [{ repeatInterval: { $exists: true } }, { repeatAt: { $exists: true } }], + nextRunAt: { $exists: false }, + }) + .toArray() + .then((jobs) => { + const updates = jobs.map((jobData) => { + const job = new Job({ + pulse: this, + name: jobData.name || '', + data: jobData.data || {}, + type: jobData.type || 'normal', + priority: jobData.priority || 'normal', + shouldSaveResult: jobData.shouldSaveResult || false, + attempts: jobData.attempts || 0, + backoff: jobData.backoff, + ...jobData, + }); + + job.computeNextRunAt(); + + return this._collection.updateOne( + { _id: job.attrs._id }, + { + $set: { nextRunAt: job.attrs.nextRunAt }, + $unset: { lockedAt: undefined, lastModifiedBy: undefined, lastRunAt: undefined }, + } + ); + }); + + return Promise.all(updates); + }) + .then((results) => { + const modifiedCount = results.filter((res) => res.modifiedCount > 0).length; + if (modifiedCount > 0) { + debug('Resumed %d recurring jobs (%s)', modifiedCount, now.toISOString()); } }); } diff --git a/test/unit/pulse.spec.ts b/test/unit/pulse.spec.ts index 172b424..46bfb65 100644 --- a/test/unit/pulse.spec.ts +++ b/test/unit/pulse.spec.ts @@ -206,17 +206,51 @@ describe('Test Pulse', () => { }); describe('Test resumeOnRestart', () => { - test('sets the default resumeOnRestart', () => { + test('should enable resumeOnRestart by default', () => { expect(globalPulseInstance._resumeOnRestart).toBeTruthy(); }); - test('sets the custom resumeOnRestart', () => { + test('should disable resumeOnRestart when set to false', () => { globalPulseInstance.resumeOnRestart(false); expect(globalPulseInstance._resumeOnRestart).toBeFalsy(); }); - test('returns itself', () => { - expect(globalPulseInstance.resumeOnRestart(false)).toEqual(globalPulseInstance); + test('should resume non-recurring jobs on restart', async () => { + const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); + job.attrs.nextRunAt = new Date(Date.now() - 1000); // Set nextRunAt in the past + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; + const now = Date.now(); + expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(now - 100); // Allow a 100ms buffer + }); + + test('should resume recurring jobs on restart', async () => { + const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); + job.attrs.repeatInterval = '5 minutes'; + job.attrs.nextRunAt = null; + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; + const now = Date.now(); + expect(updatedJob.attrs.nextRunAt).not.toBeNull(); + expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(now - 100); // Allow a 100ms buffer + }); + + test('should not modify jobs with existing nextRunAt', async () => { + const futureDate = new Date(Date.now() + 60 * 60 * 1000); + const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); + job.attrs.nextRunAt = futureDate; + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; + expect(updatedJob.attrs.nextRunAt?.getTime()).toEqual(futureDate.getTime()); }); }); }); From 8d6686194888e67ddd7478bf67f1f7a4beb8d85f Mon Sep 17 00:00:00 2001 From: b0dea <> Date: Tue, 19 Nov 2024 08:21:24 +0200 Subject: [PATCH 2/4] Do better checks when searching for jobs to resume on restart + add more unit tests --- src/job/index.ts | 7 ++- src/pulse/resume-on-restart.ts | 13 +++-- test/unit/pulse.spec.ts | 100 +++++++++++++++++++++++++++++++-- 3 files changed, 108 insertions(+), 12 deletions(-) diff --git a/src/job/index.ts b/src/job/index.ts index 33b4a38..3f768a7 100644 --- a/src/job/index.ts +++ b/src/job/index.ts @@ -186,7 +186,7 @@ class Job { attrs: JobAttributes; constructor(options: Modify, { _id?: mongodb.ObjectId }>) { - const { pulse, type, nextRunAt, ...args } = options ?? {}; + const { pulse, type, nextRunAt, repeatAt, repeatInterval, lastFinishedAt, ...args } = options ?? {}; // Save Pulse instance this.pulse = pulse; @@ -213,7 +213,10 @@ class Job { name: attrs.name || '', priority: attrs.priority, type: type || 'once', - nextRunAt: nextRunAt || new Date(), + // if a job that's non-recurring has a lastFinishedAt (finished the job), do not default nextRunAt to now + // only if it will be defaulted either by explicitly setting it or by computing it computeNextRunAt + nextRunAt: + repeatAt || repeatInterval ? nextRunAt || new Date() : !lastFinishedAt ? nextRunAt || new Date() : nextRunAt, }; } diff --git a/src/pulse/resume-on-restart.ts b/src/pulse/resume-on-restart.ts index 8ddadcd..dbe776f 100644 --- a/src/pulse/resume-on-restart.ts +++ b/src/pulse/resume-on-restart.ts @@ -28,11 +28,14 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res { lockedAt: { $exists: true }, nextRunAt: { $ne: null }, - $or: [{ $expr: { $eq: ['$runCount', '$finishedCount'] } }, { lastFinishedAt: { $exists: false } }], + $or: [ + { $expr: { $eq: ['$runCount', '$finishedCount'] } }, + { $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] }, + ], }, { lockedAt: { $exists: false }, - lastFinishedAt: { $exists: false }, + $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }], nextRunAt: { $lte: now, $ne: null }, }, ], @@ -51,8 +54,10 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res // Handling for recurring jobs using repeatInterval or repeatAt this._collection .find({ - $or: [{ repeatInterval: { $exists: true } }, { repeatAt: { $exists: true } }], - nextRunAt: { $exists: false }, + $and: [ + { $or: [{ repeatInterval: { $exists: true } }, { repeatAt: { $exists: true } }] }, + { $or: [{ nextRunAt: { $lte: now } }, { nextRunAt: { $exists: false } }, { nextRunAt: null }] }, + ], }) .toArray() .then((jobs) => { diff --git a/test/unit/pulse.spec.ts b/test/unit/pulse.spec.ts index 46bfb65..e2ec1b2 100644 --- a/test/unit/pulse.spec.ts +++ b/test/unit/pulse.spec.ts @@ -215,19 +215,30 @@ describe('Test Pulse', () => { expect(globalPulseInstance._resumeOnRestart).toBeFalsy(); }); + test('should not reschedule successfully finished non-recurring jobs', async () => { + const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); + job.attrs.lastFinishedAt = new Date(); + job.attrs.nextRunAt = null; + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; + expect(updatedJob.attrs.nextRunAt).toBeNull(); + }); + test('should resume non-recurring jobs on restart', async () => { const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); - job.attrs.nextRunAt = new Date(Date.now() - 1000); // Set nextRunAt in the past + job.attrs.nextRunAt = new Date(Date.now() - 1000); await job.save(); await globalPulseInstance.resumeOnRestart(); const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; - const now = Date.now(); - expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(now - 100); // Allow a 100ms buffer + expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(Date.now() - 100); }); - test('should resume recurring jobs on restart', async () => { + test('should resume recurring jobs on restart - interval', async () => { const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); job.attrs.repeatInterval = '5 minutes'; job.attrs.nextRunAt = null; @@ -236,9 +247,31 @@ describe('Test Pulse', () => { await globalPulseInstance.resumeOnRestart(); const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; - const now = Date.now(); expect(updatedJob.attrs.nextRunAt).not.toBeNull(); - expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(now - 100); // Allow a 100ms buffer + }); + + test('should resume recurring jobs on restart - cron', async () => { + const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); + job.attrs.repeatInterval = '*/5 * * * *'; + job.attrs.nextRunAt = null; + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; + expect(updatedJob.attrs.nextRunAt).not.toBeNull(); + }); + + test('should resume recurring jobs on restart - repeatAt', async () => { + const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); + job.attrs.repeatAt = '1:00 am'; + job.attrs.nextRunAt = null; + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; + expect(updatedJob.attrs.nextRunAt).not.toBeNull(); }); test('should not modify jobs with existing nextRunAt', async () => { @@ -252,6 +285,61 @@ describe('Test Pulse', () => { const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; expect(updatedJob.attrs.nextRunAt?.getTime()).toEqual(futureDate.getTime()); }); + + test('should handle jobs that started but have not finished (non-recurring)', async () => { + const job = globalPulseInstance.create('processData', { data: 'sample' }); + job.attrs.nextRunAt = null; + job.attrs.lockedAt = new Date(); + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0]; + + const now = Date.now(); + expect(updatedJob.attrs.nextRunAt).not.toBeNull(); + expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(now - 100); + }); + + test('should handle recurring jobs that started but have not finished', async () => { + const job = globalPulseInstance.create('processData', { data: 'sample' }); + job.attrs.repeatInterval = '10 minutes'; + job.attrs.lockedAt = new Date(); + job.attrs.nextRunAt = new Date(Date.now() + 10000); // Next run in 10 seconds + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0]; + expect(updatedJob.attrs.lockedAt).not.toBeNull(); // Job remains locked + expect(updatedJob.attrs.nextRunAt).not.toBeNull(); // Scheduling intact + }); + + test('should handle interrupted recurring jobs after server recovery', async () => { + const job = globalPulseInstance.create('processData', { data: 'sample' }); + job.attrs.repeatInterval = '5 minutes'; + job.attrs.lastModifiedBy = 'server_crash'; + job.attrs.nextRunAt = null; + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0]; + expect(updatedJob.attrs.nextRunAt).not.toBeNull(); + expect(updatedJob.attrs.lastModifiedBy).not.toEqual('server_crash'); + }); + + test('should not modify non-recurring jobs with lastFinishedAt in the past', async () => { + const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); + job.attrs.lastFinishedAt = new Date(Date.now() - 10000); // Finished 10 seconds ago + job.attrs.nextRunAt = null; + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; + expect(updatedJob.attrs.nextRunAt).toBeNull(); // Job remains finished + }); }); }); From a1fb95bc9456003e6a14447f9126a3e6586c9882 Mon Sep 17 00:00:00 2001 From: b0dea <> Date: Tue, 19 Nov 2024 08:39:16 +0200 Subject: [PATCH 3/4] Recover missing test case + remove unnecessary renaming and comments. --- test/unit/pulse.spec.ts | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/test/unit/pulse.spec.ts b/test/unit/pulse.spec.ts index e2ec1b2..bfb566b 100644 --- a/test/unit/pulse.spec.ts +++ b/test/unit/pulse.spec.ts @@ -206,15 +206,19 @@ describe('Test Pulse', () => { }); describe('Test resumeOnRestart', () => { - test('should enable resumeOnRestart by default', () => { + test('sets the default resumeOnRestart', () => { expect(globalPulseInstance._resumeOnRestart).toBeTruthy(); }); - test('should disable resumeOnRestart when set to false', () => { + test('sets the custom resumeOnRestart', () => { globalPulseInstance.resumeOnRestart(false); expect(globalPulseInstance._resumeOnRestart).toBeFalsy(); }); + test('returns itself', () => { + expect(globalPulseInstance.resumeOnRestart(false)).toEqual(globalPulseInstance); + }); + test('should not reschedule successfully finished non-recurring jobs', async () => { const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); job.attrs.lastFinishedAt = new Date(); @@ -305,14 +309,14 @@ describe('Test Pulse', () => { const job = globalPulseInstance.create('processData', { data: 'sample' }); job.attrs.repeatInterval = '10 minutes'; job.attrs.lockedAt = new Date(); - job.attrs.nextRunAt = new Date(Date.now() + 10000); // Next run in 10 seconds + job.attrs.nextRunAt = new Date(Date.now() + 10000); await job.save(); await globalPulseInstance.resumeOnRestart(); const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0]; - expect(updatedJob.attrs.lockedAt).not.toBeNull(); // Job remains locked - expect(updatedJob.attrs.nextRunAt).not.toBeNull(); // Scheduling intact + expect(updatedJob.attrs.lockedAt).not.toBeNull(); + expect(updatedJob.attrs.nextRunAt).not.toBeNull(); }); test('should handle interrupted recurring jobs after server recovery', async () => { @@ -331,14 +335,14 @@ describe('Test Pulse', () => { test('should not modify non-recurring jobs with lastFinishedAt in the past', async () => { const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); - job.attrs.lastFinishedAt = new Date(Date.now() - 10000); // Finished 10 seconds ago + job.attrs.lastFinishedAt = new Date(Date.now() - 10000); job.attrs.nextRunAt = null; await job.save(); await globalPulseInstance.resumeOnRestart(); const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; - expect(updatedJob.attrs.nextRunAt).toBeNull(); // Job remains finished + expect(updatedJob.attrs.nextRunAt).toBeNull(); }); }); }); From 5a3e1fa849d36edf73d14572a0985374a481413c Mon Sep 17 00:00:00 2001 From: b0dea <> Date: Tue, 19 Nov 2024 08:40:09 +0200 Subject: [PATCH 4/4] comment --- src/pulse/resume-on-restart.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pulse/resume-on-restart.ts b/src/pulse/resume-on-restart.ts index dbe776f..baf6afb 100644 --- a/src/pulse/resume-on-restart.ts +++ b/src/pulse/resume-on-restart.ts @@ -51,7 +51,7 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res } }); - // Handling for recurring jobs using repeatInterval or repeatAt + // Recurring jobs this._collection .find({ $and: [