diff --git a/docs/emitter.md b/docs/emitter.md index 9840465..b480db4 100644 --- a/docs/emitter.md +++ b/docs/emitter.md @@ -229,6 +229,13 @@ The `notify` method notifies all subscribed listeners of an event without waitin In short use notify for fire and forget messages, and use publish for messages that need to be checked if it is delivered to the target remote peers. +### How many subscribers can a HamokEmitter have for one event? + +A `HamokEmitter` can have an unlimited number of subscribers for each event on any peer. +The underlying `EventEmitter` implementation used in `HamokEmitter` has no limit on the number of listeners for an event. +To distribute the event to the remote peers subscribed to the event, the `HamokEmitter` uses the Raft consensus algorithm +to ensure that the subscription is consistent across all peers. + ### What is the payloadsCodec for? The `payloadsCodec` is a map of payload codecs for encoding and decoding event payloads. The key is an event type, and the value is a codec for that event type. This is useful for customizing the encoding and decoding of event payloads, if you are for example unsatisfied with the default JSON encoding/decoding. diff --git a/package.json b/package.json index cfbf700..3891f19 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "hamok", - "version": "2.3.1", + "version": "2.3.2", "description": "Lightweight Distributed Object Storage on RAFT consensus algorithm", "main": "lib/index.js", "types": "lib/index.d.ts", diff --git a/src/Hamok.ts b/src/Hamok.ts index cb49dd5..2dd0a05 100644 --- a/src/Hamok.ts +++ b/src/Hamok.ts @@ -423,6 +423,7 @@ export class Hamok = Record this._emitMessage( diff --git a/src/HamokGrid.ts b/src/HamokGrid.ts index a020895..abcaf2b 100644 --- a/src/HamokGrid.ts +++ b/src/HamokGrid.ts @@ -13,6 +13,7 @@ export class HamokGrid { public constructor( public readonly sendMessage: (message: HamokMessage, targetPeerIds?: ReadonlySet | string[] | string) => void, public readonly submit: (message: HamokMessage) => Promise, + public readonly waitUntilCommitHead: () => Promise, public readonly ongoingRequestsNotifier: OngoingRequestsNotifier, public readonly remotePeerIds: ReadonlySet, private _getLocalPeerId: () => string, diff --git a/src/collections/HamokEmitter.ts b/src/collections/HamokEmitter.ts index ea0907c..396f73f 100644 --- a/src/collections/HamokEmitter.ts +++ b/src/collections/HamokEmitter.ts @@ -57,9 +57,13 @@ export class HamokEmitter { .on('DeleteEntriesRequest', (request) => { const removedPeerIds = [ ...request.keys ]; - for (const subscribedPeerIds of this._subscriptions.values()) { + for (const subscribedPeerIds of [ ...this._subscriptions.values() ]) { for (const removedPeerId of removedPeerIds) { subscribedPeerIds.delete(removedPeerId); + + if (subscribedPeerIds.size < 1) { + this._subscriptions.delete(removedPeerId); + } } } logger.info('DeleteEntriesRequest is received, %o is removed from the subscription list for %s', removedPeerIds, this.id); @@ -225,8 +229,25 @@ export class HamokEmitter { this._emitter.removeAllListeners(); } + public async hasSubscribers(event: K, filterByLocalNode = false): Promise { + if (this._closed) throw new Error('Cannot check subscribers on a closed emitter'); + + await this.connection.grid.waitUntilCommitHead(); + + const remotePeerIds = this._subscriptions.get(event); + + if (!remotePeerIds) return false; + else if (!filterByLocalNode) return true; + else return remotePeerIds.has(this.connection.grid.localPeerId); + } + public async subscribe(event: K, listener: (...args: T[K]) => void): Promise { if (this._closed) throw new Error('Cannot subscribe on a closed emitter'); + + // if we already have a listener, we don't need to subscribe in the raft + if (this._emitter.listenerCount(event as string)) { + return (this._emitter.on(event as string, listener), void 0); + } await this.connection.requestInsertEntries(new Map([ [ event as string, 'empty' ] ])); this._emitter.on(event as string, listener); @@ -234,11 +255,15 @@ export class HamokEmitter { public async unsubscribe(event: K, listener: (...args: T[K]) => void): Promise { if (this._closed) throw new Error('Cannot unsubscribe on a closed emitter'); + + this._emitter.off(event as string, listener); + // if we still have a listener, we don't need to unsubscribe in the raft + if (this._emitter.listenerCount(event as string)) return; + await this.connection.requestRemoveEntries( Collections.setOf(event as string) ); - this._emitter.off(event as string, listener); } public clear() {