diff --git a/README.md b/README.md index 1c8e614..ac9cd9f 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,7 @@ yarn add hamok - [Important Notes](#important-notes) - [Contributing](#contributing) - [License](#license) + ## Quick Start ```javascript @@ -39,28 +40,22 @@ import { Hamok } from 'hamok'; (async () => { const server_1 = new Hamok(); const server_2 = new Hamok(); - + server_1.on('message', server_2.accept.bind(server_2)); server_2.on('message', server_1.accept.bind(server_1)); - - server_1.addRemotePeerId(server_2.localPeerId); - server_2.addRemotePeerId(server_1.localPeerId); - - server_1.start(); - server_2.start(); - + await Promise.all([ - new Promise(resolve => server_1.once('leader-changed', resolve)), - new Promise(resolve => server_2.once('leader-changed', resolve)), + server_1.join(), + server_2.join() ]); - + const storage_1 = server_1.createMap({ mapId: 'my-replicated-storage', }); const storage_2 = server_2.createMap({ mapId: 'my-replicated-storage', }); - + console.log('Setting value in storage on server_1 for key-1 to 1'); console.log('Setting value in storage on server_2 for key-2 to 2'); @@ -72,7 +67,7 @@ import { Hamok } from 'hamok'; server_1.waitUntilCommitHead(), server_2.waitUntilCommitHead(), ]) - + console.log('value for key-2 by server_1:', storage_1.get('key-2')); console.log('value for key-1 by server_2:', storage_1.get('key-1')); @@ -89,11 +84,11 @@ Hamok is a lightweight, distributed object storage library developed using the [ [Raft](https://raft.github.io/) is a consensus algorithm designed to manage a replicated log across a distributed system. Its primary goal is to ensure that multiple servers agree on a sequence of state transitions, providing consistency and fault tolerance in distributed systems. RAFT breaks down the consensus problem into three subproblems: - - **Leader Election:** Ensures that one server acts as the leader, which is responsible for managing the log replication. +- **Leader Election:** Ensures that one server acts as the leader, which is responsible for managing the log replication. - - **Log Replication:** The leader receives log entries from clients and replicates them to follower servers. The leader waits for a majority of followers to acknowledge the entries before considering them committed. +- **Log Replication:** The leader receives log entries from clients and replicates them to follower servers. The leader waits for a majority of followers to acknowledge the entries before considering them committed. - - **Safety:** RAFT guarantees that committed log entries are durable and will not be lost, even in the presence of server failures. It ensures that no two leaders can be elected for the same term and that logs are consistent across servers. +- **Safety:** RAFT guarantees that committed log entries are durable and will not be lost, even in the presence of server failures. It ensures that no two leaders can be elected for the same term and that logs are consistent across servers. Overall, RAFT is designed to be understandable and easy to implement while providing strong consistency and reliability in distributed systems. @@ -105,7 +100,6 @@ Hamok uses Raft to manage the shared storage across multiple instances. - **Distributed Data Structures:** Provides maps, queues, records, and emitters. - **Event-driven Architecture:** Emits events for state changes, errors, and communication. - ## Collections ### HamokMap @@ -124,12 +118,10 @@ HamokEmitter is an event emitter designed for distributed systems. It allows ser HamokRecord is a feature that provides distributed storage for individual record objects. Each record can be accessed and updated by multiple service instances, with RAFT ensuring that all updates are consistently applied and persisted across the system. - ## User Manual You can find detailed user manuals [here](https://balazskreith.github.io/hamok-ts/) - ## Contributing Contributions are welcome! Please feel free to submit issues or pull requests to improve the library. diff --git a/docs/emitter.md b/docs/emitter.md index 01ed6b7..8233cfe 100644 --- a/docs/emitter.md +++ b/docs/emitter.md @@ -1,16 +1,17 @@ ## User Manual -[Hamok](./index.md) | HamokEmitter | [HamokMap](./map.md) | [HamokQueue](./queue.md) | [HamokRecord](./record.md) + +[Hamok](./index.md) / HamokEmitter / [HamokMap](./map.md) / [HamokQueue](./record.md) / [HamokRecord](./remoteMap.md) / [HamokRemoteMap](./remoteMap.md) ## Table of Contents -* [Overview](#overview) -* [Configuration](#configuration) -* [API Reference](#api-reference) - * [Properties](#properties) - * [Events](#events) - * [Methods](#methods) -* [Examples](#examples) -* [FAQ](#faq) +- [Overview](#overview) +- [Configuration](#configuration) +- [API Reference](#api-reference) + - [Properties](#properties) + - [Events](#events) + - [Methods](#methods) +- [Examples](#examples) +- [FAQ](#faq) ## Overview @@ -22,7 +23,7 @@ To create a `HamokEmitter` instance, you need a `Hamok` instance. Here is how yo ```typescript const emitter = hamok.createEmitter({ - emitterId: 'exampleEmitter', + emitterId: "exampleEmitter", }); ``` @@ -30,51 +31,56 @@ const emitter = hamok.createEmitter({ ```typescript type MyEventMap = { - myEvent: [string, number]; + myEvent: [string, number]; }; const emitter = hamok.createEmitter({ - - /** - * The unique identifier for the emitter. - */ - emitterId: 'exampleEmitter', - - /** - * Optional. The timeout duration in milliseconds for requests. - * - * DEFAULT: 5000 - */ - requestTimeoutInMs: 5000, - - /** - * Optional. The maximum waiting time in milliseconds for a message to be sent. - * The storage holds back the message sending if Hamok is not connected to a grid or not part of a network. - * - * DEFAULT: 10x requestTimeoutInMs - */ - maxMessageWaitingTimeInMs: 50000, - - /** - * Optional. The maximum number of keys allowed in request or response messages. - * - * DEFAULT: 0 means infinity - */ - maxOutboundMessageKeys: 1000, - - /** - * Optional. The maximum number of values allowed in request or response messages. - * - * DEFAULT: 0 means infinity - */ - maxOutboundMessageValues: 100, - - /** - * Optional. A map of payload codecs for encoding and decoding event payloads. - * The key is an event type, and the value is a codec for that event type. - * - * DEFAULT: JSON codec - */ - payloadsCodec?: Map string, decode: (data: string) => unknown[] }>, + /** + * The unique identifier for the emitter. + */ + emitterId: "exampleEmitter", + + /** + * Optional. The timeout duration in milliseconds for requests. + * + * DEFAULT: 5000 + */ + requestTimeoutInMs: 5000, + + /** + * Optional. The maximum waiting time in milliseconds for a message to be sent. + * The storage holds back the message sending if Hamok is not connected to a grid or not part of a network. + * + * DEFAULT: 10x requestTimeoutInMs + */ + maxMessageWaitingTimeInMs: 50000, + + /** + * Optional. The maximum number of keys allowed in request or response messages. + * + * DEFAULT: 0 means infinity + */ + maxOutboundMessageKeys: 1000, + + /** + * Optional. The maximum number of values allowed in request or response messages. + * + * DEFAULT: 0 means infinity + */ + maxOutboundMessageValues: 100, + + /** + * Optional. A map of payload codecs for encoding and decoding event payloads. + * The key is an event type, and the value is a codec for that event type. + * + * DEFAULT: JSON codec + */ + payloadsCodec: Map< + keyof MyEventMap, + { + encode: (...args: unknown[]) => string; + decode: (data: string) => unknown[]; + } + >, }); ``` @@ -117,15 +123,15 @@ A class for managing events and subscriptions in a distributed system. ```typescript const emitter = new HamokEmitter(connection, payloadsCodec); -emitter.subscribe('event', (data) => { +emitter.subscribe("event", (data) => { console.log(`Received data: ${data}`); }); -emitter.publish('event', 'sample data').then((peerIds) => { +emitter.publish("event", "sample data").then((peerIds) => { console.log(`Event published to peers: ${peerIds}`); }); -emitter.unsubscribe('event', (data) => { +emitter.unsubscribe("event", (data) => { console.log(`Unsubscribed from event`); }); @@ -134,7 +140,7 @@ emitter.close(); ## Examples - - [simple distributed emitter](https://github.com/balazskreith/hamok-ts/blob/main/examples/src/emitter-example.ts) +- [simple distributed emitter](https://github.com/balazskreith/hamok-ts/blob/main/examples/src/emitter-example.ts) ## FAQ @@ -143,7 +149,7 @@ emitter.close(); To create a `HamokEmitter` instance, you need a `HamokConnection`. Here is an example: ```typescript -const connection = new HamokConnection('my-storage-id'); +const connection = new HamokConnection("my-storage-id"); const emitter = new HamokEmitter(connection); ``` @@ -160,7 +166,7 @@ When creating a `HamokEmitter` instance, you can optionally pass a `payloadsCode You can listen to `HamokEmitter` events using the `on` method. Here is an example: ```typescript -emitter.on('myEvent', (message, count) => { +emitter.on("myEvent", (message, count) => { console.log(`Received: ${message} - ${count}`); }); ``` @@ -191,7 +197,7 @@ emitter.clear(); To subscribe to an event, use the `subscribe` method: ```typescript -await emitter.subscribe('myEvent', (message, count) => { +await emitter.subscribe("myEvent", (message, count) => { console.log(`Received: ${message} - ${count}`); }); ``` @@ -201,7 +207,7 @@ await emitter.subscribe('myEvent', (message, count) => { To unsubscribe from an event, use the `unsubscribe` method: ```typescript -await emitter.unsubscribe('myEvent', (message, count) => { +await emitter.unsubscribe("myEvent", (message, count) => { console.log(`Received: ${message} - ${count}`); }); ``` @@ -211,7 +217,7 @@ await emitter.unsubscribe('myEvent', (message, count) => { To publish an event, use the `publish` method: ```typescript -emitter.publish('myEvent', 'Hello, world!', 42); +emitter.publish("myEvent", "Hello, world!", 42); ``` ### What is the difference between the `publish` and `notify` methods? @@ -225,4 +231,4 @@ In short use notify for fire and forget messages, and use publish for messages t ### What is the payloadsCodec for? -The `payloadsCodec` is a map of payload codecs for encoding and decoding event payloads. The key is an event type, and the value is a codec for that event type. This is useful for customizing the encoding and decoding of event payloads, if you are for example unsatisfied with the default JSON encoding/decoding. \ No newline at end of file +The `payloadsCodec` is a map of payload codecs for encoding and decoding event payloads. The key is an event type, and the value is a codec for that event type. This is useful for customizing the encoding and decoding of event payloads, if you are for example unsatisfied with the default JSON encoding/decoding. diff --git a/docs/index.md b/docs/index.md index 6986ad5..6920638 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,6 +1,6 @@ ## User Manual -[Hamok](./index.md) | [HamokEmitter](./emitter.md) | [HamokMap](./map.md) | HamokQueue | [HamokRecord](./record.md) | [HamokRemoteMap](./remoteMap.md) +Hamok / [HamokEmitter](./emitter.md) / [HamokMap](./map.md) / [HamokQueue](./record.md) / [HamokRecord](./remoteMap.md) / [HamokRemoteMap](./remoteMap.md) ## Table of Contents diff --git a/docs/map.md b/docs/map.md index d0c9ad2..0335c06 100644 --- a/docs/map.md +++ b/docs/map.md @@ -1,6 +1,6 @@ ## User Manual -[Hamok](./index.md) | [HamokEmitter](./emitter.md) | [HamokMap](./map.md) | HamokQueue | [HamokRecord](./record.md) | [HamokRemoteMap](./remoteMap.md) +[Hamok](./index.md) / [HamokEmitter](./emitter.md) / HamokMap / [HamokQueue](./record.md) / [HamokRecord](./remoteMap.md) / [HamokRemoteMap](./remoteMap.md) ## Table of Contents diff --git a/docs/queue.md b/docs/queue.md index 5543ea9..a99668d 100644 --- a/docs/queue.md +++ b/docs/queue.md @@ -1,6 +1,6 @@ ## User Manual -[Hamok](./index.md) | [HamokEmitter](./emitter.md) | [HamokMap](./map.md) | HamokQueue | [HamokRecord](./record.md) | [HamokRemoteMap](./remoteMap.md) +[Hamok](./index.md) / [HamokEmitter](./emitter.md) / [HamokMap](./map.md) / HamokQueue / [HamokRecord](./remoteMap.md) / [HamokRemoteMap](./remoteMap.md) ## Table of Contents diff --git a/docs/record.md b/docs/record.md index 71f6a1e..d19eb6e 100644 --- a/docs/record.md +++ b/docs/record.md @@ -1,6 +1,6 @@ ## User Manual -[Hamok](./index.md) | [HamokEmitter](./emitter.md) | [HamokMap](./map.md) | HamokQueue | [HamokRecord](./record.md) | [HamokRemoteMap](./remoteMap.md) +[Hamok](./index.md) / [HamokEmitter](./emitter.md) / [HamokMap](./map.md) / [HamokQueue](./record.md) / HamokRecord / [HamokRemoteMap](./remoteMap.md) ## Table of Contents diff --git a/docs/remoteMap.md b/docs/remoteMap.md index 79fa71a..4c65c6b 100644 --- a/docs/remoteMap.md +++ b/docs/remoteMap.md @@ -1,6 +1,6 @@ ## User Manual -[Hamok](./index.md) | [HamokEmitter](./emitter.md) | [HamokMap](./map.md) | HamokQueue | [HamokRecord](./record.md) | [HamokRemoteMap](./remoteMap.md) +[Hamok](./index.md) / [HamokEmitter](./emitter.md) / [HamokMap](./map.md) / [HamokQueue](./record.md) / [HamokRecord](./remoteMap.md) / HamokRemoteMap ## Table of Contents diff --git a/package.json b/package.json index e9c8ec2..cfbf700 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "hamok", - "version": "2.3.0", + "version": "2.3.1", "description": "Lightweight Distributed Object Storage on RAFT consensus algorithm", "main": "lib/index.js", "types": "lib/index.d.ts", diff --git a/src/Hamok.ts b/src/Hamok.ts index 30183b9..cb49dd5 100644 --- a/src/Hamok.ts +++ b/src/Hamok.ts @@ -83,7 +83,7 @@ export type HamokConfig = Record = Record = Record { export class HamokEmitter { private readonly _subscriptions = new Map>(); private readonly _emitter = new EventEmitter(); + private _removedPeerIdsBuffer: string[] = []; private _closed = false; public constructor( @@ -53,6 +54,16 @@ export class HamokEmitter { ); } }) + .on('DeleteEntriesRequest', (request) => { + const removedPeerIds = [ ...request.keys ]; + + for (const subscribedPeerIds of this._subscriptions.values()) { + for (const removedPeerId of removedPeerIds) { + subscribedPeerIds.delete(removedPeerId); + } + } + logger.info('DeleteEntriesRequest is received, %o is removed from the subscription list for %s', removedPeerIds, this.id); + }) .on('RemoveEntriesRequest', (request) => { // this is for the subscription to manage, and to remove the source endpoint from the list if (request.sourceEndpointId === undefined) { @@ -150,10 +161,49 @@ export class HamokEmitter { } }) .on('remote-peer-removed', (remotePeerId) => { - for (const subscribedPeerIds of this._subscriptions.values()) { - subscribedPeerIds.delete(remotePeerId); + if (this.connection.grid.leaderId !== this.connection.localPeerId) { + if (this.connection.grid.leaderId === undefined) { + this._removedPeerIdsBuffer.push(remotePeerId); + } + + return; + } + let retried = 0; + const process = async (): Promise => { + if (this.connection.grid.leaderId === undefined) { + return Promise.resolve(this._removedPeerIdsBuffer.push(remotePeerId)); + } else if (this.connection.grid.leaderId !== this.connection.localPeerId) { + // not our problem. + return Promise.resolve(); + } + try { + return this.connection.requestDeleteEntries(new Set([ remotePeerId ])); + } catch (err) { + logger.warn('Error while requesting to remove endpoint %s, from subscriptions in emitter %s, error: %o', remotePeerId, this.id, err); + + if (++retried < 10) { + return process(); + } + } + }; + + process().catch(() => void 0); + }) + .on('leader-changed', (leaderId) => { + if (leaderId !== this.connection.grid.localPeerId) { + if (leaderId !== undefined) { + this._removedPeerIdsBuffer = []; + } + + return; + } + if (0 < this._removedPeerIdsBuffer.length) { + this.connection.requestDeleteEntries(new Set(this._removedPeerIdsBuffer)) + .then(() => (this._removedPeerIdsBuffer = [])) + .catch(() => { + logger.warn('Error while requesting to remove endpoints %o, from subscriptions in emitter %s', this._removedPeerIdsBuffer, this.id); + }); } - logger.debug('%s remote-peer-removed is received, %s is removed from the subscription list for all events in emitter %s', this.connection.localPeerId, remotePeerId, this.id); }) .once('close', () => this.close()) ; @@ -200,32 +250,52 @@ export class HamokEmitter { if (this._closed) throw new Error('Cannot publish on a closed emitter'); const remotePeerIds = this._subscriptions.get(event); - const entry = [ event as string, this.payloadsCodec?.get(event)?.encode(...args) ?? JSON.stringify(args) ] as [string, string]; - if (!remotePeerIds || remotePeerIds.size < 1) return []; + if (!remotePeerIds || remotePeerIds.size < 1) { + return []; + } else if (remotePeerIds.size === 1 && remotePeerIds.has(this.connection.grid.localPeerId)) { + return (this._emitter.emit(event as string, ...args), [ this.connection.grid.localPeerId ]); + } - const respondedPeerIds = await this.connection.requestUpdateEntries( - new Map([ entry ]), - [ ...remotePeerIds ].filter((peerId) => peerId !== this.connection.grid.localPeerId) - ); - const result = [ ...respondedPeerIds.keys() ]; + const entry = [ event as string, this.payloadsCodec?.get(event)?.encode(...args) ?? JSON.stringify(args) ] as [string, string]; + const [ + respondedRemotePeerIds, + isLocalPeerSubscribed + ] = await Promise.all([ + this.connection.requestUpdateEntries( + new Map([ entry ]), + [ ...remotePeerIds ].filter((peerId) => peerId !== this.connection.grid.localPeerId) + ), + Promise.resolve(remotePeerIds.has(this.connection.grid.localPeerId) ? this._emitter.emit(event as string, ...args) : false) + ]); + const result = [ ...respondedRemotePeerIds.keys() ]; - if (remotePeerIds?.has(this.connection.grid.localPeerId)) { - this._emitter.emit(event as string, ...args); + if (isLocalPeerSubscribed) { result.push(this.connection.grid.localPeerId); } return result; } - public notify(event: K, ...args: T[K]): void { + public notify(event: K, ...args: T[K]): boolean { if (this._closed) throw new Error('Cannot publish on a closed emitter'); const remotePeerIds = this._subscriptions.get(event); + + if (!remotePeerIds || remotePeerIds.size < 1) { + return false; + } else if (remotePeerIds.size === 1 && remotePeerIds.has(this.connection.grid.localPeerId)) { + return this._emitter.emit(event as string, ...args); + } + const entry = [ event as string, this.payloadsCodec?.get(event)?.encode(...args) ?? JSON.stringify(args) ] as [string, string]; for (const remotePeerId of remotePeerIds ?? []) { - if (remotePeerId === this.connection.grid.localPeerId) continue; + if (remotePeerId === this.connection.grid.localPeerId) { + this._emitter.emit(event as string, ...args); + + continue; + } this.connection.notifyUpdateEntries( new Map([ entry ]), @@ -233,9 +303,7 @@ export class HamokEmitter { ); } - if (remotePeerIds?.has(this.connection.grid.localPeerId)) { - this._emitter.emit(event as string, ...args); - } + return true; } public export(): HamokEmitterSnapshot {