Skip to content

Commit

Permalink
Enhance EventEmitterStream with options (#665)
Browse files Browse the repository at this point in the history
Set maxListeners option on EventEmitter to 0, which represents infinity.
This is a soft limit of when to issue warnings into the stderr of the system but does not have an effect on actual event listeners.
https://nodejs.org/api/events.html#eventsdefaultmaxlisteners

Exposed a way to capture errors, with the default being sent to console.error as before.
  • Loading branch information
LiranCohen authored Jan 20, 2024
1 parent 908d5f0 commit 4e89df6
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 20 deletions.
1 change: 1 addition & 0 deletions src/core/dwn-error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export enum DwnErrorCode {
DidNotValid = 'DidNotValid',
DidResolutionFailed = 'DidResolutionFailed',
Ed25519InvalidJwk = 'Ed25519InvalidJwk',
EventEmitterStreamNotOpenError = 'EventEmitterStreamNotOpenError',
EventsSubscribeEventStreamUnimplemented = 'EventsSubscribeEventStreamUnimplemented',
GeneralJwsVerifierGetPublicKeyNotFound = 'GeneralJwsVerifierGetPublicKeyNotFound',
GeneralJwsVerifierInvalidSignature = 'GeneralJwsVerifierInvalidSignature',
Expand Down
37 changes: 29 additions & 8 deletions src/event-log/event-emitter-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,42 @@ import type { KeyValues } from '../types/query-types.js';
import type { EventListener, EventStream, EventSubscription } from '../types/subscriptions.js';

import { EventEmitter } from 'events';
import { DwnError, DwnErrorCode } from '../core/dwn-error.js';

const EVENTS_LISTENER_CHANNEL = 'events';

export interface EventEmitterStreamConfig {
/**
* An optional error handler in order to be able to react to any errors or warnings triggers by `EventEmitter`.
* By default we log errors with `console.error`.
*/
errorHandler?: (error: any) => void;
};

export class EventEmitterStream implements EventStream {
private eventEmitter: EventEmitter;
private isOpen: boolean = false;

constructor() {
constructor(config: EventEmitterStreamConfig = {}) {
// we capture the rejections and currently just log the errors that are produced
this.eventEmitter = new EventEmitter({ captureRejections: true });
this.eventEmitter.on('error', this.eventError);

// number of listeners per particular eventName before a warning is emitted
// we set to 0 which represents infinity.
// https://nodejs.org/api/events.html#emittersetmaxlistenersn
this.eventEmitter.setMaxListeners(0);

if (config.errorHandler) {
this.errorHandler = config.errorHandler;
}

this.eventEmitter.on('error', this.errorHandler);
}

// we subscribe to the general `EventEmitter` error events with this handler.
// this handler is also called when there is a caught error upon emitting an event from a handler.
private eventError(error: any): void {
console.error('event emitter error', error);
};
/**
* we subscribe to the `EventEmitter` error handler with a provided handler or set one which logs the errors.
*/
private errorHandler: (error:any) => void = (error) => { console.error('event emitter error', error); };

async subscribe(id: string, listener: EventListener): Promise<EventSubscription> {
this.eventEmitter.on(EVENTS_LISTENER_CHANNEL, listener);
Expand All @@ -41,7 +59,10 @@ export class EventEmitterStream implements EventStream {

emit(tenant: string, message: GenericMessage, indexes: KeyValues): void {
if (!this.isOpen) {
console.error('message emitted when EventEmitterStream is closed', tenant, message, indexes);
this.errorHandler(new DwnError(
DwnErrorCode.EventEmitterStreamNotOpenError,
'a message emitted when EventEmitterStream is closed'
));
return;
}
this.eventEmitter.emit(EVENTS_LISTENER_CHANNEL, tenant, message, indexes);
Expand Down
59 changes: 47 additions & 12 deletions tests/event-log/event-emitter-stream.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import type { MessageStore } from '../../src/index.js';
import type { KeyValues } from '../../src/types/query-types.js';
import type { GenericMessage, MessageStore } from '../../src/index.js';

import { EventEmitterStream } from '../../src/event-log/event-emitter-stream.js';
import { TestStores } from '../test-stores.js';
import { Message, TestDataGenerator, Time } from '../../src/index.js';

import sinon from 'sinon';

Expand All @@ -11,31 +13,24 @@ import chai, { expect } from 'chai';
chai.use(chaiAsPromised);

describe('EventEmitterStream', () => {
// saving the original `console.error` function to re-assign after tests complete
const originalConsoleErrorFunction = console.error;
let eventStream: EventEmitterStream;
let messageStore: MessageStore;

before(() => {
({ messageStore } = TestStores.get());

// do not print the console error statements from the emitter error
console.error = (_):void => { };
});

beforeEach(async () => {
messageStore.clear();
});

after(async () => {
console.error = originalConsoleErrorFunction;
// Clean up after each test by closing and clearing the event stream
await messageStore.close();
await eventStream.close();
sinon.restore();
});

it('should remove listeners when `close` method is used', async () => {
eventStream = new EventEmitterStream();
const eventStream = new EventEmitterStream();
const emitter = eventStream['eventEmitter'];

// count the `events` listeners, which represents all listeners
Expand All @@ -50,10 +45,50 @@ describe('EventEmitterStream', () => {
});

it('logs message when the emitter experiences an error', async () => {
const eventErrorSpy = sinon.spy(EventEmitterStream.prototype as any, 'eventError');
eventStream = new EventEmitterStream();
const testHandler = {
errorHandler: (_:any):void => {},
};
const eventErrorSpy = sinon.spy(testHandler, 'errorHandler');

const eventStream = new EventEmitterStream({ errorHandler: testHandler.errorHandler });
const emitter = eventStream['eventEmitter'];
emitter.emit('error', new Error('random error'));
expect(eventErrorSpy.callCount).to.equal(1);
});

it('does not emit messages if event stream is closed', async () => {
const testHandler = {
errorHandler: (_:any):void => {},
};
const eventErrorSpy = sinon.spy(testHandler, 'errorHandler');

const eventStream = new EventEmitterStream({ errorHandler: testHandler.errorHandler });

const messageCids: string[] = [];
const handler = async (_tenant: string, message: GenericMessage, _indexes: KeyValues): Promise<void> => {
const messageCid = await Message.getCid(message);
messageCids.push(messageCid);
};
const subscription = await eventStream.subscribe('sub-1', handler);

// close eventStream
await eventStream.close();

const message1 = await TestDataGenerator.generateRecordsWrite({});
eventStream.emit('did:alice', message1.message, {});
const message2 = await TestDataGenerator.generateRecordsWrite({});
eventStream.emit('did:alice', message2.message, {});

expect(eventErrorSpy.callCount).to.equal(2);
await subscription.close();

await Time.minimalSleep();
expect(messageCids).to.have.length(0);
});

it('sets max listeners to 0 which represents infinity', async () => {
const eventStreamOne = new EventEmitterStream();
const emitterOne = eventStreamOne['eventEmitter'];
expect(emitterOne.getMaxListeners()).to.equal(0);
});
});

0 comments on commit 4e89df6

Please sign in to comment.