Skip to content

Commit

Permalink
add HamokEmitterStats
Browse files Browse the repository at this point in the history
  • Loading branch information
balazskreith committed Oct 11, 2024
1 parent 1e20ec8 commit 093b627
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
27 changes: 26 additions & 1 deletion src/collections/HamokEmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,24 @@ type UpdatedMetaData<M extends Record<string, unknown>> = {
newMetaData: M;
}

export type HamokEmitterStats = {
numberOfSubscriptions: number;
numberOfReceivedEventInvocations: number;
numberOfSentEventInvocations: number;
}

export class HamokEmitter<T extends HamokEmitterEventMap, M extends Record<string, unknown> = Record<string, unknown>> {
// private readonly _subscriptions = new Map<keyof T, Set<string>>();
public readonly subscriptions = new HamokEmitterSubscriptions<T, M>();
private readonly _emitter = new EventEmitter();
private _initializing?: Promise<this>;
private _closed = false;

public stats: HamokEmitterStats = {
numberOfSubscriptions: 0,
numberOfReceivedEventInvocations: 0,
numberOfSentEventInvocations: 0,
};

public constructor(
public readonly connection: HamokConnection<string, string>,
Expand Down Expand Up @@ -88,7 +100,8 @@ export class HamokEmitter<T extends HamokEmitterEventMap, M extends Record<strin
const removedPeerIds = [ ...request.keys ];

removedPeerIds.forEach((peerId) => this.subscriptions.removePeerFromAllEvent(peerId));
logger.info('DeleteEntriesRequest is received, %o is removed from the subscription list for %s', removedPeerIds, this.id);

logger.debug('DeleteEntriesRequest is received, %o is removed from the subscription list for %s', removedPeerIds, this.id);

if (request.sourceEndpointId === this.connection.grid.localPeerId) {
this.connection.respond(
Expand Down Expand Up @@ -126,6 +139,9 @@ export class HamokEmitter<T extends HamokEmitterEventMap, M extends Record<strin
const payloads = this.payloadsCodec?.get(event)?.decode(serializedPayload) ?? JSON.parse(serializedPayload);

this._emitter.emit(event, ...payloads);

++this.stats.numberOfReceivedEventInvocations;

} catch (err) {
logger.error('Error while decoding the payload for %s, %s, %o', this.id, event, `${err}`);
}
Expand All @@ -145,6 +161,9 @@ export class HamokEmitter<T extends HamokEmitterEventMap, M extends Record<strin
const payloads = this.payloadsCodec?.get(event)?.decode(serializedPayload) ?? JSON.parse(serializedPayload);

this._emitter.emit(event, ...payloads);

++this.stats.numberOfReceivedEventInvocations;

} catch (err) {
logger.error('Error while decoding the payload for %s, %s, %o', this.id, event, `${err}`);
}
Expand Down Expand Up @@ -223,6 +242,11 @@ export class HamokEmitter<T extends HamokEmitterEventMap, M extends Record<strin
.once('close', () => this.close())
;

this.subscriptions
.on('added', () => (this.stats.numberOfSubscriptions = this.subscriptions.size))
.on('removed', () => (this.stats.numberOfSubscriptions = this.subscriptions.size))
;

logger.trace('Emitter %s is created', this.id);

process.nextTick(() => (this._initializing = this._startInitializing()));
Expand Down Expand Up @@ -250,6 +274,7 @@ export class HamokEmitter<T extends HamokEmitterEventMap, M extends Record<strin

this.connection.close();
this._emitter.removeAllListeners();
this.subscriptions.removeAllListeners();
}

public async hasSubscribers<K extends keyof T>(event: K, filterByLocalNode = false): Promise<boolean> {
Expand Down
3 changes: 2 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ export {
HamokQueue
} from './collections/HamokQueue';
export {
HamokEmitter
HamokEmitter,
HamokEmitterStats,
} from './collections/HamokEmitter';
export {
HamokConnection
Expand Down

0 comments on commit 093b627

Please sign in to comment.