diff --git a/package.json b/package.json index da77e2e..80400a1 100644 --- a/package.json +++ b/package.json @@ -18,7 +18,7 @@ "test:docker": "docker-compose -f docker-compose.test.yml up --build --exit-code-from unit unit", "test-coverage:docker": "docker-compose -f docker-compose.test.yml up --build --exit-code-from coverage coverage" }, - "version": "1.8.3", + "version": "1.9.3", "main": "dist/index.js", "author": "Chris Carlson", "license": "MIT", diff --git a/src/__tests__/message-store-connector.test.ts b/src/__tests__/message-store-connector.test.ts index f4507f6..177027b 100644 --- a/src/__tests__/message-store-connector.test.ts +++ b/src/__tests__/message-store-connector.test.ts @@ -49,6 +49,74 @@ describe("Message Store Connector", () => { expect(message[0].id).toEqual(messageId); }); + test('Does not fail silently when closing a subscription due to a handler rejection', async () => { + const streamId = uuid(); + const messageId = uuid(); + const uniqueCategory = uuid().replace(/\-/g, '') + + const spy = jest.spyOn(NoopLogger, 'error'); + + await messageStore.writeMessage(`testCategorySub${uniqueCategory}:command-${streamId}`, { + id: messageId, + type: "TestEvent", + data: {}, + metadata: {}, + }) + + messageStore.subscribeToCategory( + uuid(), + `testCategorySub${uniqueCategory}:command`, + { + TestEvent: (message: Message, context: any) => { + throw new Error('Test error') + } + }, + { + pollingInterval: 100, + positionUpdateInterval: 100, + retries: 1 + } + ) + await wait(200) + + expect(spy).toHaveBeenCalledWith(expect.any(String), expect.arrayContaining([expect.any(Error)])); + + spy.mockRestore(); + }); + + test('Retries the handler when it fails', async () => { + const streamId = uuid(); + const messageId = uuid(); + const uniqueCategory = uuid().replace(/\-/g, '') + const fakeFunc = jest.fn() + + await messageStore.writeMessage(`testCategorySub${uniqueCategory}:command-${streamId}`, { + id: messageId, + type: "TestEvent", + data: {}, + metadata: {}, + }) + + messageStore.subscribeToCategory( + uuid(), + `testCategorySub${uniqueCategory}:command`, + { + TestEvent: (message: Message, context: any) => { + fakeFunc() + throw new Error('Test error') + } + }, + { + pollingInterval: 100, + positionUpdateInterval: 100, + retries: 2 + } + ) + await wait(200); + + expect(fakeFunc).toHaveBeenCalledTimes(2); + }); + test("Can write several message and retrieve them", async () => { const streamId = uuid(); await messageStore.writeMessage(`testStream-${streamId}`, { diff --git a/src/message-store/message-store.ts b/src/message-store/message-store.ts index 2187155..b4af426 100644 --- a/src/message-store/message-store.ts +++ b/src/message-store/message-store.ts @@ -57,19 +57,13 @@ export class MessageStore implements IMessageStore { interval: pollingInterval, name: `${subscriberId} Poll to ${streamName}`, retries, - shouldContinue: (_: any, resolvedValue: unknown) => { - if (shouldUnsubscribe) return false; - return resolvedValue ? true : false; + shouldContinue: (_rejectionReason: any, _resolvedValue: unknown) => { + return shouldUnsubscribe === false; }, }); - // This is kinda weird check logic that needs to happen for promisePoller library on cancelled subscriptions - poller.then().catch((e) => { - if (e instanceof Array) { - this.logger.debug("Subscription Closed"); - } else { - throw e; - } + poller.catch((e) => { + this.logger.error(`Subscription closing: ${subscriberId} to ${streamName}.\r\nSubscription encountered the following errors:`, e); }); return { unsubscribe };