Skip to content

Commit

Permalink
2.3.2 (#8)
Browse files Browse the repository at this point in the history
* 2.4.0

* * add `waitUntilCommitHead` to the grid
 * fix `HamokEmitter` to have more than one subscriber on one peer
  • Loading branch information
balazskreith authored Aug 20, 2024
1 parent 2819ffd commit 3616252
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 3 deletions.
7 changes: 7 additions & 0 deletions docs/emitter.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@ The `notify` method notifies all subscribed listeners of an event without waitin

In short use notify for fire and forget messages, and use publish for messages that need to be checked if it is delivered to the target remote peers.

### How many subscribers can a HamokEmitter have for one event?

A `HamokEmitter` can have an unlimited number of subscribers for each event on any peer.
The underlying `EventEmitter` implementation used in `HamokEmitter` has no limit on the number of listeners for an event.
To distribute the event to the remote peers subscribed to the event, the `HamokEmitter` uses the Raft consensus algorithm
to ensure that the subscription is consistent across all peers.

### What is the payloadsCodec for?

The `payloadsCodec` is a map of payload codecs for encoding and decoding event payloads. The key is an event type, and the value is a codec for that event type. This is useful for customizing the encoding and decoding of event payloads, if you are for example unsatisfied with the default JSON encoding/decoding.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "hamok",
"version": "2.3.1",
"version": "2.3.2",
"description": "Lightweight Distributed Object Storage on RAFT consensus algorithm",
"main": "lib/index.js",
"types": "lib/index.d.ts",
Expand Down
1 change: 1 addition & 0 deletions src/Hamok.ts
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ export class Hamok<AppData extends Record<string, unknown> = Record<string, unkn
this.grid = new HamokGrid(
this._emitMessage.bind(this),
this.submit.bind(this),
this.waitUntilCommitHead.bind(this),
new OngoingRequestsNotifier(
providedConfig?.ongoingRequestsSendingPeriodInMs ?? 1000,
(msg) => this._emitMessage(
Expand Down
1 change: 1 addition & 0 deletions src/HamokGrid.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export class HamokGrid {
public constructor(
public readonly sendMessage: (message: HamokMessage, targetPeerIds?: ReadonlySet<string> | string[] | string) => void,
public readonly submit: (message: HamokMessage) => Promise<void>,
public readonly waitUntilCommitHead: () => Promise<void>,
public readonly ongoingRequestsNotifier: OngoingRequestsNotifier,
public readonly remotePeerIds: ReadonlySet<string>,
private _getLocalPeerId: () => string,
Expand Down
29 changes: 27 additions & 2 deletions src/collections/HamokEmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,13 @@ export class HamokEmitter<T extends HamokEmitterEventMap> {
.on('DeleteEntriesRequest', (request) => {
const removedPeerIds = [ ...request.keys ];

for (const subscribedPeerIds of this._subscriptions.values()) {
for (const subscribedPeerIds of [ ...this._subscriptions.values() ]) {
for (const removedPeerId of removedPeerIds) {
subscribedPeerIds.delete(removedPeerId);

if (subscribedPeerIds.size < 1) {
this._subscriptions.delete(removedPeerId);
}
}
}
logger.info('DeleteEntriesRequest is received, %o is removed from the subscription list for %s', removedPeerIds, this.id);
Expand Down Expand Up @@ -225,20 +229,41 @@ export class HamokEmitter<T extends HamokEmitterEventMap> {
this._emitter.removeAllListeners();
}

public async hasSubscribers<K extends keyof T>(event: K, filterByLocalNode = false): Promise<boolean> {
if (this._closed) throw new Error('Cannot check subscribers on a closed emitter');

await this.connection.grid.waitUntilCommitHead();

const remotePeerIds = this._subscriptions.get(event);

if (!remotePeerIds) return false;
else if (!filterByLocalNode) return true;
else return remotePeerIds.has(this.connection.grid.localPeerId);
}

public async subscribe<K extends keyof T>(event: K, listener: (...args: T[K]) => void): Promise<void> {
if (this._closed) throw new Error('Cannot subscribe on a closed emitter');

// if we already have a listener, we don't need to subscribe in the raft
if (this._emitter.listenerCount(event as string)) {
return (this._emitter.on(event as string, listener), void 0);
}

await this.connection.requestInsertEntries(new Map([ [ event as string, 'empty' ] ]));
this._emitter.on(event as string, listener);
}

public async unsubscribe<K extends keyof T>(event: K, listener: (...args: T[K]) => void): Promise<void> {
if (this._closed) throw new Error('Cannot unsubscribe on a closed emitter');

this._emitter.off(event as string, listener);

// if we still have a listener, we don't need to unsubscribe in the raft
if (this._emitter.listenerCount(event as string)) return;

await this.connection.requestRemoveEntries(
Collections.setOf(event as string)
);
this._emitter.off(event as string, listener);
}

public clear() {
Expand Down

0 comments on commit 3616252

Please sign in to comment.