Skip to content

Commit

Permalink
fix: timeoutExpiredCallback with all expired items
Browse files Browse the repository at this point in the history
  • Loading branch information
ilbertt committed Dec 8, 2023
1 parent d304b03 commit bacd12e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 13 deletions.
12 changes: 6 additions & 6 deletions src/queues.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ describe("BaseQueue", () => {

beforeEach(() => {
queue = new BaseQueue({
itemCallback: (message: string) => true,
itemCallback: (_: string) => true,
});
});

Expand Down Expand Up @@ -121,6 +121,7 @@ describe("AckMessagesQueue", () => {
const expirationMs = 1000;

beforeEach(() => {
jest.useFakeTimers();
queue = new AckMessagesQueue({
expirationMs,
timeoutExpiredCallback: jest.fn(),
Expand All @@ -140,7 +141,7 @@ describe("AckMessagesQueue", () => {
});

it("should call the timeoutExpiredCallback for expired items when not receiving any ack", () => {
jest.useFakeTimers().setSystemTime(Date.now() + expirationMs + 1);
jest.setSystemTime(Date.now() + expirationMs + 1);
queue.add(BigInt(1));
jest.advanceTimersByTime(expirationMs + 1);
expect(queue.last()).toBeNull();
Expand Down Expand Up @@ -170,19 +171,18 @@ describe("AckMessagesQueue", () => {

it("should call the timeoutExpiredCallback for expired items when receiving the ack", () => {
queue.add(BigInt(1));
jest.useFakeTimers().setSystemTime(Date.now() + expirationMs + 1);
queue.add(BigInt(2));
queue.add(BigInt(3));
jest.setSystemTime(Date.now() + expirationMs + 1);
queue.ack(BigInt(1));
jest.advanceTimersByTime(expirationMs + 1);
expect(queue.last()).toBeNull();
expect(queue["_timeoutExpiredCallback"]).toHaveBeenCalledWith([BigInt(2)]);
expect(queue["_timeoutExpiredCallback"]).toHaveBeenCalledWith([BigInt(2), BigInt(3)]);
});

it("should call the timeoutExpiredCallback for all expired items after not receiving the ack", () => {
queue.add(BigInt(1));
queue.add(BigInt(2));
queue.add(BigInt(3));
jest.useFakeTimers();
queue.ack(BigInt(1));
jest.advanceTimersByTime(expirationMs);
expect(queue.last()).toBeNull();
Expand Down
11 changes: 4 additions & 7 deletions src/queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,10 @@ export class AckMessagesQueue {
}

// for the remaining items in the queue, check if they have expired
// if yes, call the callback for the first expired item
for (const item of this._queue) {
if (Date.now() - item.addedAt >= this._expirationMs) {
// if it has expired and is still in the queue,
// it means it has not been acked, so we call the callback
return this._onTimeoutExpired([item]);
}
// if yes, call the callback for the expired items
const expiredItems = this._queue.filter((item) => Date.now() - item.addedAt >= this._expirationMs);
if (expiredItems.length > 0) {
return this._onTimeoutExpired(expiredItems);
}

this._restartLastAckTimeout();
Expand Down

0 comments on commit bacd12e

Please sign in to comment.