Skip to content

Commit

Permalink
Updated promise-poller usage to not silently fail
Browse files Browse the repository at this point in the history
  • Loading branch information
doughtnerd committed Feb 12, 2024
1 parent 8d5741e commit 53fa427
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 11 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"test:docker": "docker-compose -f docker-compose.test.yml up --build --exit-code-from unit unit",
"test-coverage:docker": "docker-compose -f docker-compose.test.yml up --build --exit-code-from coverage coverage"
},
"version": "1.8.3",
"version": "1.9.3",
"main": "dist/index.js",
"author": "Chris Carlson",
"license": "MIT",
Expand Down
68 changes: 68 additions & 0 deletions src/__tests__/message-store-connector.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,74 @@ describe("Message Store Connector", () => {
expect(message[0].id).toEqual(messageId);
});

test('Does not fail silently when closing a subscription due to a handler rejection', async () => {
const streamId = uuid();
const messageId = uuid();
const uniqueCategory = uuid().replace(/\-/g, '')

const spy = jest.spyOn(NoopLogger, 'error');

await messageStore.writeMessage(`testCategorySub${uniqueCategory}:command-${streamId}`, {
id: messageId,
type: "TestEvent",
data: {},
metadata: {},
})

messageStore.subscribeToCategory(
uuid(),
`testCategorySub${uniqueCategory}:command`,
{
TestEvent: (message: Message, context: any) => {
throw new Error('Test error')
}
},
{
pollingInterval: 100,
positionUpdateInterval: 100,
retries: 1
}
)
await wait(200)

expect(spy).toHaveBeenCalledWith(expect.any(String), expect.arrayContaining([expect.any(Error)]));

spy.mockRestore();
});

test('Retries the handler when it fails', async () => {
const streamId = uuid();
const messageId = uuid();
const uniqueCategory = uuid().replace(/\-/g, '')
const fakeFunc = jest.fn()

await messageStore.writeMessage(`testCategorySub${uniqueCategory}:command-${streamId}`, {
id: messageId,
type: "TestEvent",
data: {},
metadata: {},
})

messageStore.subscribeToCategory(
uuid(),
`testCategorySub${uniqueCategory}:command`,
{
TestEvent: (message: Message, context: any) => {
fakeFunc()
throw new Error('Test error')
}
},
{
pollingInterval: 100,
positionUpdateInterval: 100,
retries: 2
}
)
await wait(200);

expect(fakeFunc).toHaveBeenCalledTimes(2);
});

test("Can write several message and retrieve them", async () => {
const streamId = uuid();
await messageStore.writeMessage(`testStream-${streamId}`, {
Expand Down
14 changes: 4 additions & 10 deletions src/message-store/message-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,13 @@ export class MessageStore implements IMessageStore {
interval: pollingInterval,
name: `${subscriberId} Poll to ${streamName}`,
retries,
shouldContinue: (_: any, resolvedValue: unknown) => {
if (shouldUnsubscribe) return false;
return resolvedValue ? true : false;
shouldContinue: (_rejectionReason: any, _resolvedValue: unknown) => {
return shouldUnsubscribe === false;
},
});

// This is kinda weird check logic that needs to happen for promisePoller library on cancelled subscriptions
poller.then().catch((e) => {
if (e instanceof Array) {
this.logger.debug("Subscription Closed");
} else {
throw e;
}
poller.catch((e) => {
this.logger.error(`Subscription closing: ${subscriberId} to ${streamName}.\r\nSubscription encountered the following errors:`, e);
});

return { unsubscribe };
Expand Down

0 comments on commit 53fa427

Please sign in to comment.