diff --git a/src/core/dwn-error.ts b/src/core/dwn-error.ts index bbe2e9440..5fe1ae2cc 100644 --- a/src/core/dwn-error.ts +++ b/src/core/dwn-error.ts @@ -25,6 +25,7 @@ export enum DwnErrorCode { DidNotValid = 'DidNotValid', DidResolutionFailed = 'DidResolutionFailed', Ed25519InvalidJwk = 'Ed25519InvalidJwk', + EventEmitterStreamNotOpenError = 'EventEmitterStreamNotOpenError', EventsSubscribeEventStreamUnimplemented = 'EventsSubscribeEventStreamUnimplemented', GeneralJwsVerifierGetPublicKeyNotFound = 'GeneralJwsVerifierGetPublicKeyNotFound', GeneralJwsVerifierInvalidSignature = 'GeneralJwsVerifierInvalidSignature', diff --git a/src/event-log/event-emitter-stream.ts b/src/event-log/event-emitter-stream.ts index 787e38cca..10b1dd79e 100644 --- a/src/event-log/event-emitter-stream.ts +++ b/src/event-log/event-emitter-stream.ts @@ -3,24 +3,42 @@ import type { KeyValues } from '../types/query-types.js'; import type { EventListener, EventStream, EventSubscription } from '../types/subscriptions.js'; import { EventEmitter } from 'events'; +import { DwnError, DwnErrorCode } from '../core/dwn-error.js'; const EVENTS_LISTENER_CHANNEL = 'events'; +export interface EventEmitterStreamConfig { + /** + * An optional error handler in order to be able to react to any errors or warnings triggers by `EventEmitter`. + * By default we log errors with `console.error`. + */ + errorHandler?: (error: any) => void; +}; + export class EventEmitterStream implements EventStream { private eventEmitter: EventEmitter; private isOpen: boolean = false; - constructor() { + constructor(config: EventEmitterStreamConfig = {}) { // we capture the rejections and currently just log the errors that are produced this.eventEmitter = new EventEmitter({ captureRejections: true }); - this.eventEmitter.on('error', this.eventError); + + // number of listeners per particular eventName before a warning is emitted + // we set to 0 which represents infinity. + // https://nodejs.org/api/events.html#emittersetmaxlistenersn + this.eventEmitter.setMaxListeners(0); + + if (config.errorHandler) { + this.errorHandler = config.errorHandler; + } + + this.eventEmitter.on('error', this.errorHandler); } - // we subscribe to the general `EventEmitter` error events with this handler. - // this handler is also called when there is a caught error upon emitting an event from a handler. - private eventError(error: any): void { - console.error('event emitter error', error); - }; + /** + * we subscribe to the `EventEmitter` error handler with a provided handler or set one which logs the errors. + */ + private errorHandler: (error:any) => void = (error) => { console.error('event emitter error', error); }; async subscribe(id: string, listener: EventListener): Promise { this.eventEmitter.on(EVENTS_LISTENER_CHANNEL, listener); @@ -41,7 +59,10 @@ export class EventEmitterStream implements EventStream { emit(tenant: string, message: GenericMessage, indexes: KeyValues): void { if (!this.isOpen) { - console.error('message emitted when EventEmitterStream is closed', tenant, message, indexes); + this.errorHandler(new DwnError( + DwnErrorCode.EventEmitterStreamNotOpenError, + 'a message emitted when EventEmitterStream is closed' + )); return; } this.eventEmitter.emit(EVENTS_LISTENER_CHANNEL, tenant, message, indexes); diff --git a/tests/event-log/event-emitter-stream.spec.ts b/tests/event-log/event-emitter-stream.spec.ts index 947729cc8..dbd2673ae 100644 --- a/tests/event-log/event-emitter-stream.spec.ts +++ b/tests/event-log/event-emitter-stream.spec.ts @@ -1,7 +1,9 @@ -import type { MessageStore } from '../../src/index.js'; +import type { KeyValues } from '../../src/types/query-types.js'; +import type { GenericMessage, MessageStore } from '../../src/index.js'; import { EventEmitterStream } from '../../src/event-log/event-emitter-stream.js'; import { TestStores } from '../test-stores.js'; +import { Message, TestDataGenerator, Time } from '../../src/index.js'; import sinon from 'sinon'; @@ -11,16 +13,10 @@ import chai, { expect } from 'chai'; chai.use(chaiAsPromised); describe('EventEmitterStream', () => { - // saving the original `console.error` function to re-assign after tests complete - const originalConsoleErrorFunction = console.error; - let eventStream: EventEmitterStream; let messageStore: MessageStore; before(() => { ({ messageStore } = TestStores.get()); - - // do not print the console error statements from the emitter error - console.error = (_):void => { }; }); beforeEach(async () => { @@ -28,14 +24,13 @@ describe('EventEmitterStream', () => { }); after(async () => { - console.error = originalConsoleErrorFunction; // Clean up after each test by closing and clearing the event stream await messageStore.close(); - await eventStream.close(); + sinon.restore(); }); it('should remove listeners when `close` method is used', async () => { - eventStream = new EventEmitterStream(); + const eventStream = new EventEmitterStream(); const emitter = eventStream['eventEmitter']; // count the `events` listeners, which represents all listeners @@ -50,10 +45,50 @@ describe('EventEmitterStream', () => { }); it('logs message when the emitter experiences an error', async () => { - const eventErrorSpy = sinon.spy(EventEmitterStream.prototype as any, 'eventError'); - eventStream = new EventEmitterStream(); + const testHandler = { + errorHandler: (_:any):void => {}, + }; + const eventErrorSpy = sinon.spy(testHandler, 'errorHandler'); + + const eventStream = new EventEmitterStream({ errorHandler: testHandler.errorHandler }); const emitter = eventStream['eventEmitter']; emitter.emit('error', new Error('random error')); expect(eventErrorSpy.callCount).to.equal(1); }); + + it('does not emit messages if event stream is closed', async () => { + const testHandler = { + errorHandler: (_:any):void => {}, + }; + const eventErrorSpy = sinon.spy(testHandler, 'errorHandler'); + + const eventStream = new EventEmitterStream({ errorHandler: testHandler.errorHandler }); + + const messageCids: string[] = []; + const handler = async (_tenant: string, message: GenericMessage, _indexes: KeyValues): Promise => { + const messageCid = await Message.getCid(message); + messageCids.push(messageCid); + }; + const subscription = await eventStream.subscribe('sub-1', handler); + + // close eventStream + await eventStream.close(); + + const message1 = await TestDataGenerator.generateRecordsWrite({}); + eventStream.emit('did:alice', message1.message, {}); + const message2 = await TestDataGenerator.generateRecordsWrite({}); + eventStream.emit('did:alice', message2.message, {}); + + expect(eventErrorSpy.callCount).to.equal(2); + await subscription.close(); + + await Time.minimalSleep(); + expect(messageCids).to.have.length(0); + }); + + it('sets max listeners to 0 which represents infinity', async () => { + const eventStreamOne = new EventEmitterStream(); + const emitterOne = eventStreamOne['eventEmitter']; + expect(emitterOne.getMaxListeners()).to.equal(0); + }); });