Skip to content

Commit

Permalink
Updated subscriber and fixed version
Browse files Browse the repository at this point in the history
Updated subscription primitive to emit a closed event when the
subscription terminates gracefully and an error event when the
subscription completely fails out.

Also updated package version to reflect a proper minor version with the
api changes implemented
  • Loading branch information
doughtnerd committed Feb 26, 2024
1 parent 34820f1 commit 17c62de
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 12 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"test:docker": "docker-compose -f docker-compose.test.yml up --build --exit-code-from unit unit",
"test-coverage:docker": "docker-compose -f docker-compose.test.yml up --build --exit-code-from coverage coverage"
},
"version": "1.9.5",
"version": "1.10.0",
"main": "dist/index.js",
"author": "Chris Carlson",
"license": "MIT",
Expand Down
6 changes: 1 addition & 5 deletions src/__tests__/message-store-connector.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ describe("Message Store Connector", () => {
retries: 1
}
)
subscription.on('subscription_closed', (message, errors) =>{
subscription.on('subscription_error', (message, errors) =>{
mockFn(message, errors);
});

Expand Down Expand Up @@ -185,10 +185,6 @@ describe("Message Store Connector", () => {
{ pollingInterval: 100 }
);

subscription.on('subscription_closed', () => {
console.log('CLOSED');
});

await messageStore.writeMessage(`${uniqueCategory}:command-${streamId}`, {
id: uuid(),
type: "TestEvent",
Expand Down
20 changes: 14 additions & 6 deletions src/types/subscription.type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import promisePoller, { PromisePollerOptions } from "promise-poller";
enum Status {
CLOSED,
OPEN,
ERROR,
}

type SubscriptionListener = {
subscription_opened: (message: string) => void;
subscription_closed: (message: string, errors: Error[]) => void;
subscription_closed: (message: string) => void;
subscription_error: (message: string, errors: Error[]) => void;
};

type SubscriptionOptions = {
Expand All @@ -33,8 +35,8 @@ export class Subscription extends EventEmitter {
}

this.status = Status.OPEN;
this.emit('subscription_opened', `Subscription opened: ${this.options.id}`);
const poller = promisePoller({
this.emit('subscription_opened', `Subscription ${this.options.id} opened.`);
promisePoller({
taskFn: this.options.pollFn,
interval: this.options.pollingInterval,
name: this.options.id,
Expand All @@ -43,13 +45,19 @@ export class Subscription extends EventEmitter {
shouldContinue: (_rejectionReason: any, _resolvedValue: unknown) => {
return this.status === Status.OPEN;
},
});
poller.catch((e) => {
}).then(() => {
this.status = Status.CLOSED;
this.emit('subscription_closed', `Subscription closing: ${this.options.id}.`, e);
this.emit('subscription_closed', `Subscription ${this.options.id} closed.`);
}).catch((e) => {
this.status = Status.ERROR;
this.emit('subscription_error', `Subscription ${this.options.id} encountered errors:`, e);
});
}

public once(event: keyof SubscriptionListener, listener: SubscriptionListener[typeof event]): this {
return super.once(event, listener);
}

public on(event: keyof SubscriptionListener, listener: SubscriptionListener[typeof event]): this {
return super.on(event, listener);
}
Expand Down

0 comments on commit 17c62de

Please sign in to comment.