From c3f596fcb4ae40d79e40804012a15a2f49ebfd80 Mon Sep 17 00:00:00 2001 From: Balazs Kreith Date: Sat, 10 Aug 2024 16:41:47 +0300 Subject: [PATCH] * refine `Join` method * add more condition for `no-heartbeat` event * change examples * `JoinNotification` is sent without destination, hence broadcast * some additional condition for adding and removing remote peers * change readme for describing `join()` method --- docs/index.md | 81 +++++++++- examples/package.json | 3 +- examples/src/common-discovery-example-2.ts | 106 ------------- examples/src/common-discovery-example.ts | 93 ------------ examples/src/common-join-example.ts | 50 ++++++ examples/src/common-waiting-example-2.ts | 50 ++++++ examples/src/run-all.ts | 2 +- src/Hamok.ts | 142 ++++++++++++++---- src/index.ts | 5 +- src/messages/HamokGridCodec.ts | 2 +- src/messages/messagetypes/JoinNotification.ts | 2 +- src/raft/RaftEngine.ts | 14 ++ src/raft/RaftLeaderState.ts | 2 +- 13 files changed, 318 insertions(+), 234 deletions(-) delete mode 100644 examples/src/common-discovery-example-2.ts delete mode 100644 examples/src/common-discovery-example.ts create mode 100644 examples/src/common-join-example.ts create mode 100644 examples/src/common-waiting-example-2.ts diff --git a/docs/index.md b/docs/index.md index 8eb96f7..9d5b42b 100644 --- a/docs/index.md +++ b/docs/index.md @@ -11,6 +11,7 @@ - [Events](#events) - [Methods](#methods) 5. [Use Cases](#use-cases) + - [Join to the grid by using the `join()` method](#join-to-the-grid-by-using-the-join-method) - [Executing Tasks on the leader](#executing-tasks-on-the-leader) - [Creating and Managing Maps](#creating-and-managing-maps) - [Creating and Managing Records](#creating-and-managing-records) @@ -59,6 +60,12 @@ Hamok can be configured using the `HamokConstructorConfig` type. Here is an exam import { Hamok } from 'hamok'; const config = { + /** + * Indicate if the Hamok should stop automatically when there are no remote peers. + * + * DEFAULT: false + */ + autoStopOnNoRemotePeers: false, /** * The unique identifier for the peer in the Raft cluster. @@ -148,7 +155,6 @@ Hamok emits various events that can be listened to for handling specific actions - `commit`: Emitted when a commit occurs. - `heartbeat`: Emitted during heartbeats. - `error`: Emitted when an error occurs. -- `hello-notification`: Emitted when a hello notification is received. - `no-heartbeat-from`: Emitted when no heartbeat is received from a peer. @@ -196,10 +202,81 @@ Hamok emits various events that can be listened to for handling specific actions - **accept**(`message: HamokMessage`): `void` - Accepts a message and processes it according to its type and protocol. -- **fetchRemotePeers**(`options?: { customRequest?: string, timeoutInMs?: number }`): `Promise` +- **fetchRemotePeers**(`timeout?: number, customRequest?: HamokHelloNotificationCustomRequestType`): `Promise` - Fetches remote peers with optional custom requests and timeout. +- **join**(`params: HamokJoinProcessParams`): `Promise` + - Runs a join process with the provided parameters. See [here](#use-the-join-method) for more details. + ## Use cases +Here is a revised version of your text with improvements for clarity, grammar, and consistency: + +--- + +### Joining the Grid Using the `join()` Method + +Hamok provides an automated process to join a network of instances by connecting to remote peers. This feature simplifies integrating a new Hamok instance into an existing network. + +The automated join process consists of two phases: + +1. **Discover Remote Endpoints**: Add these endpoints to the local Hamok instance's list of remote peers. +2. **Notify Remote Peers**: Inform them about the local peer so they can add it to their lists. + +The first phase is executed by the `fetchRemotePeers` method, which is called by the `join` method. This method sends a `HelloNotification` message to remote peers. Each remote peer responds with an `EndpointStateNotification` message, which includes all the peers known to them. The local peer waits for these notifications within a specified timeout and then evaluates the responses. If no remote peers are received and the local instance does not have a remote peer, the process is either retried or an exception is raised. Additionally, the `HelloNotification` message can include a custom request, such as requesting a snapshot from the remote peers, which can be applied to the local instance if provided. + +In the second phase, a `JoinNotification` message is sent to remote peers, instructing them to add the local peer to their remote peer lists. + +Below is an example of using the `join` method: + +```typescript +await hamok.join({ + /** + * Timeout in milliseconds for fetching remote peers. + * + * DEFAULT: 5000 + */ + fetchRemotePeerTimeoutInMs: 3000, + + /** + * The maximum number of retries for fetching remote peers. + * -1 - means infinite retries + * 0 - means no retries + * + * DEFAULT: 3 + */ + maxRetry: 3, + + /** + * Indicates if remote peers should be automatically removed if no heartbeat is received. + * + * DEFAULT: true + */ + removeRemotePeersOnNoHeartbeat: true, + + /** + * Indicates if a snapshot should be requested from the remote peers. + * If provided, it is used locally. + * + * DEFAULT: false + */ + requestSnapshot: true, + + /** + * Indicates if the start() method should be called automatically after the join process is completed. + * + * DEFAULT: false + */ + startAfterJoin: true, +}); +``` + +In the above example, the method attempts to fetch remote peers three times, each with a timeout of 3000 milliseconds. If remote peers are not fetched within the given timeout, the process is retried. If the maximum number of retries is reached and the remote peers are still not fetched, an error is raised, indicating that joining is not possible. + +Once remote peers are fetched, the local peer selects the best snapshot from the remote peers (based on the highest raft terms and commit index) and applies it to the local instance. + +After the snapshot is applied and the remote peers are added to the local instance, the local peer sends a `JoinNotification` message to remote peers to add the local peer to their remote peer lists. + +If `startAfterJoin` is set to true, the `start` method is automatically called once the join process is completed. ### Executing Tasks on the leader diff --git a/examples/package.json b/examples/package.json index dbe462a..e5fedc6 100644 --- a/examples/package.json +++ b/examples/package.json @@ -18,8 +18,9 @@ "dev:queue:2": "nodemon -x ts-node src/queue-push-pop-example.ts | pino-pretty", "dev:common:1": "nodemon -x ts-node src/common-waiting-example.ts | pino-pretty", "dev:common:2": "nodemon -x ts-node src/common-import-export-example.ts | pino-pretty", - "dev:common:3": "nodemon -x ts-node src/common-discovery-example.ts | pino-pretty", + "dev:common:3": "nodemon -x ts-node src/common-join-example.ts | pino-pretty", "dev:common:4": "nodemon -x ts-node src/common-reelection-example.ts | pino-pretty", + "dev:common:5": "nodemon -x ts-node src/common-waiting-example-2.ts | pino-pretty", "build": "tsc", "test": "jest --config jest.config.js", "lint": "eslint --ext .ts src" diff --git a/examples/src/common-discovery-example-2.ts b/examples/src/common-discovery-example-2.ts deleted file mode 100644 index 3281d32..0000000 --- a/examples/src/common-discovery-example-2.ts +++ /dev/null @@ -1,106 +0,0 @@ -import { Hamok, HamokEventMap, HamokFetchRemotePeersResponse, HamokMessage, setHamokLogLevel } from 'hamok'; -import EventEmitter from 'events'; -import * as pino from 'pino'; - -const logger = pino.pino({ - name: 'common-waiting-example', - level: 'debug', -}); - -const pubSubServer = new class extends EventEmitter<{ message: [HamokMessage] }> { - private _servers = new Map(); - public add(server: Hamok) { - server.on('message', (message) => this.emit('message', message)); - this.on('message', (message) => server.accept(message)); - this._servers.set(server.localPeerId, server); - } - public remove(server: Hamok) { - server.removeAllListeners('message'); - this.off('message', (message) => server.accept(message)); - this._servers.delete(server.localPeerId); - } -}; - -export async function run() { - - const server_1 = new Hamok(); - const server_2 = new Hamok(); - - // by having the communication channel we assume we can inquery remote endpoints - pubSubServer.add(server_1); - pubSubServer.add(server_2); - - server_1.on('hello-notification', createHelloListener(server_1)); - server_1.on('no-heartbeat-from', createNoHeartbeatFromListener(server_1)); - server_2.on('hello-notification', createHelloListener(server_2)); - server_2.on('no-heartbeat-from', createNoHeartbeatFromListener(server_2)); - - // we fetch the remote endpoints - await server_1.fetchRemotePeers().then((response) => fetchRemoteEndpointHandler(server_1, response)); - await server_2.fetchRemotePeers().then((response) => fetchRemoteEndpointHandler(server_2, response)); - - server_1.start(); - server_2.start(); - - await Promise.all([ - new Promise(resolve => server_1.once('leader-changed', resolve)), - new Promise(resolve => server_2.once('leader-changed', resolve)), - ]); - - logger.info('Leader changed'); - - // add new Hamok to the grid - const server_3 = new Hamok(); - pubSubServer.add(server_3); - - server_3.on('hello-notification', createHelloListener(server_3)); - server_3.on('no-heartbeat-from', createNoHeartbeatFromListener(server_3)); - await server_3.fetchRemotePeers().then((response) => fetchRemoteEndpointHandler(server_3, response)); - - await new Promise(resolve => { - server_3.once('leader-changed', resolve) - server_3.start(); - }); - - logger.info('Leader changed'); - - await Promise.all([ - new Promise(resolve => server_2.once('no-heartbeat-from', resolve)), - new Promise(resolve => server_3.once('no-heartbeat-from', resolve)), - Promise.resolve(server_1.stop()), - Promise.resolve(pubSubServer.remove(server_1)), - ]); - - logger.info('Server_1 stopped'); - - server_2.stop(); - server_3.stop(); -} - -function createHelloListener(server: Hamok): (...args: HamokEventMap['hello-notification']) => void { - return (remotePeerId, request) => { - logger.info('%s received hello from %s, customRequest: %s', server.localPeerId, remotePeerId, request?.customData); - server.addRemotePeerId(remotePeerId); - - // IMPORTANT! if the notification holds a request, we must call the callback - if (request) request.callback('Hello from server'); - }; -} - -function createNoHeartbeatFromListener(server: Hamok): (...args: HamokEventMap['no-heartbeat-from']) => void { - return (remotePeerId) => { - logger.info('%s received no heartbeat from %s', server.localPeerId, remotePeerId); - server.removeRemotePeerId(remotePeerId); - }; -} - -function fetchRemoteEndpointHandler(server: Hamok, response: HamokFetchRemotePeersResponse): void { - logger.info('Adding remote peers to %s: %o', server.localPeerId, response.remotePeers); - response.remotePeers.forEach(remotePeerId => server.addRemotePeerId(remotePeerId)); -} - -if (require.main === module) { - logger.info('Running from module file'); - setHamokLogLevel('info'); - run(); -} diff --git a/examples/src/common-discovery-example.ts b/examples/src/common-discovery-example.ts deleted file mode 100644 index 6fbb52e..0000000 --- a/examples/src/common-discovery-example.ts +++ /dev/null @@ -1,93 +0,0 @@ -import { Hamok, HamokEventMap, HamokFetchRemotePeersResponse, setHamokLogLevel } from 'hamok'; -import * as pino from 'pino'; - -const logger = pino.pino({ - name: 'common-waiting-example', - level: 'debug', -}); - -export async function run() { - - const server_1 = new Hamok(); - const server_2 = new Hamok(); - - // by having the communication channel we assume we can inquery remote endpoints - server_1.on('message', server_2.accept.bind(server_2)); - server_2.on('message', server_1.accept.bind(server_1)); - - server_1.on('hello-notification', createHelloListener(server_1)); - server_1.on('no-heartbeat-from', createNoHeartbeatFromListener(server_1)); - server_2.on('hello-notification', createHelloListener(server_2)); - server_2.on('no-heartbeat-from', createNoHeartbeatFromListener(server_2)); - - // we fetch the remote endpoints - await server_1.fetchRemotePeers().then((response) => fetchRemoteEndpointHandler(server_1, response)); - await server_2.fetchRemotePeers().then((response) => fetchRemoteEndpointHandler(server_2, response)); - - server_1.start(); - server_2.start(); - - await Promise.all([ - new Promise(resolve => server_1.once('leader-changed', resolve)), - new Promise(resolve => server_2.once('leader-changed', resolve)), - ]); - - logger.info('Leader changed'); - - // add new Hamok to the grid - const server_3 = new Hamok(); - server_3.on('message', server_1.accept.bind(server_1)); - server_3.on('message', server_2.accept.bind(server_2)); - server_1.on('message', server_3.accept.bind(server_3)); - server_2.on('message', server_3.accept.bind(server_3)); - - server_3.on('hello-notification', createHelloListener(server_3)); - server_3.on('no-heartbeat-from', createNoHeartbeatFromListener(server_3)); - await server_3.fetchRemotePeers().then((response) => fetchRemoteEndpointHandler(server_3, response)); - - await new Promise(resolve => { - server_3.once('leader-changed', resolve) - server_3.start(); - }); - - logger.info('Leader changed'); - - await Promise.all([ - new Promise(resolve => server_2.once('no-heartbeat-from', resolve)), - new Promise(resolve => server_3.once('no-heartbeat-from', resolve)), - Promise.resolve(server_1.stop()) - ]); - - logger.info('Server_1 stopped'); - - server_2.stop(); - server_3.stop(); -} - -function createHelloListener(server: Hamok): (...args: HamokEventMap['hello-notification']) => void { - return (remotePeerId, request) => { - logger.info('%s received hello from %s, customRequest: %s', server.localPeerId, remotePeerId, request?.customData); - server.addRemotePeerId(remotePeerId); - - // IMPORTANT! if the notification holds a request, we must call the callback - if (request) request.callback('Hello from server'); - }; -} - -function createNoHeartbeatFromListener(server: Hamok): (...args: HamokEventMap['no-heartbeat-from']) => void { - return (remotePeerId) => { - logger.info('%s received no heartbeat from %s', server.localPeerId, remotePeerId); - server.removeRemotePeerId(remotePeerId); - }; -} - -function fetchRemoteEndpointHandler(server: Hamok, response: HamokFetchRemotePeersResponse): void { - logger.info('Adding remote peers to %s: %o', server.localPeerId, response.remotePeers); - response.remotePeers.forEach(remotePeerId => server.addRemotePeerId(remotePeerId)); -} - -if (require.main === module) { - logger.info('Running from module file'); - setHamokLogLevel('info'); - run(); -} diff --git a/examples/src/common-join-example.ts b/examples/src/common-join-example.ts new file mode 100644 index 0000000..0fdc4c1 --- /dev/null +++ b/examples/src/common-join-example.ts @@ -0,0 +1,50 @@ +import { Hamok, HamokEventMap, HamokFetchRemotePeersResponse, setHamokLogLevel } from 'hamok'; +import * as pino from 'pino'; + +const logger = pino.pino({ + name: 'common-join-example', + level: 'debug', +}); + +export async function run() { + + const server_1 = new Hamok(); + const server_2 = new Hamok(); + + // by having the communication channel we assume we can inquery remote endpoints + server_1.on('message', server_2.accept.bind(server_2)); + server_2.on('message', server_1.accept.bind(server_1)); + + await Promise.all([ + server_1.join(), + server_2.join(), + ]); + + logger.info('Server 1 and Server 2 joined'); + + // add new Hamok to the grid + const server_3 = new Hamok(); + server_3.on('message', server_1.accept.bind(server_1)); + server_3.on('message', server_2.accept.bind(server_2)); + server_1.on('message', server_3.accept.bind(server_3)); + server_2.on('message', server_3.accept.bind(server_3)); + + await server_3.join(); + + logger.info('Server 3 joined, let\'s stop server_1 %s', server_1.localPeerId); + + await Promise.all([ + new Promise(resolve => server_2.once('no-heartbeat-from', peerId => (logger.info('Server_2 no-heartbeat-from %s', peerId), resolve()))), + new Promise(resolve => server_3.once('no-heartbeat-from', peerId => (logger.info('Server_3 no-heartbeat-from %s', peerId), resolve()))), + Promise.resolve(server_1.stop()) + ]); + + server_2.stop(); + server_3.stop(); +} + +if (require.main === module) { + logger.info('Running from module file'); + setHamokLogLevel('info'); + run(); +} diff --git a/examples/src/common-waiting-example-2.ts b/examples/src/common-waiting-example-2.ts new file mode 100644 index 0000000..48ba138 --- /dev/null +++ b/examples/src/common-waiting-example-2.ts @@ -0,0 +1,50 @@ +import { Hamok, setHamokLogLevel } from 'hamok'; +import * as pino from 'pino'; + +const logger = pino.pino({ + name: 'common-waiting-example-2', + level: 'debug', +}); + +type ExampleEventMap = { + 'event-1': [number, string, boolean], + 'event-2': [number, string], +} + +export async function run() { + + const server_1 = new Hamok(); + const server_2 = new Hamok(); + + const storage_1 = server_1.createMap({ + mapId: 'my-replicated-storage', + maxMessageWaitingTimeInMs: 20000, + }); + const storage_2 = server_2.createMap({ + mapId: 'my-replicated-storage', + maxMessageWaitingTimeInMs: 20000, + }); + + server_1.on('message', server_2.accept.bind(server_2)); + server_2.on('message', server_1.accept.bind(server_1)); + + server_1.addRemotePeerId(server_2.localPeerId); + server_2.addRemotePeerId(server_1.localPeerId); + + server_1.start(); + server_2.start(); + + await storage_1.insert('key', 1); + await server_2.waitUntilCommitHead(); + + logger.info('Value from server_2: %s', storage_1.get('key')); + + server_1.stop(); + server_2.stop(); +} + +if (require.main === module) { + logger.info('Running from module file'); + setHamokLogLevel('info'); + run(); +} diff --git a/examples/src/run-all.ts b/examples/src/run-all.ts index edd5003..54645b3 100644 --- a/examples/src/run-all.ts +++ b/examples/src/run-all.ts @@ -1,6 +1,6 @@ import { setHamokLogLevel } from 'hamok'; import { run as reelection } from './common-reelection-example'; -import { run as discovery } from './common-discovery-example'; +import { run as discovery } from './common-join-example'; import { run as mapUpdateIf } from './map-update-if-example'; import { run as mapInsert } from './map-insert-get-example' import { run as queuePushPop } from './queue-push-pop-example'; diff --git a/src/Hamok.ts b/src/Hamok.ts index 958617d..2d2b217 100644 --- a/src/Hamok.ts +++ b/src/Hamok.ts @@ -34,28 +34,49 @@ export type HamokJoinProcessParams = { /** * Timeout in milliseconds for fetching the remote peers. + * + * DEFAULT: 5000 */ - fetchRemotePeerTimeoutInMs: number, + fetchRemotePeerTimeoutInMs?: number, + + /** + * The maximum number of retries for fetching the remote peers. + * -1 - means infinite retries + * 0 - means no retries + * + * DEFAULT: 3 + */ + maxRetry?: number; /** * Indicate if the remote peers automatically should be removed if no heartbeat is received. + * + * DEFAULT: true */ - removeRemotePeersOnNoHeartbeat: boolean, + removeRemotePeersOnNoHeartbeat?: boolean, /** * indicates if the snapshot should be requested from the remote peers, * and if it is provided then it is used in local + * + * DEFAULT: false */ requestSnapshot?: boolean, /** * indicates if the start() method should be called automatically after the join process is completed + * + * DEFAULT: false */ startAfterJoin?: boolean, } export type HamokConfig = { - // empty + + /** + * Indicate if the Hamok should stop automatically when there are no remote peers. + */ + autoStopOnNoRemotePeers?: boolean, } /** @@ -287,6 +308,7 @@ export type HamokEventMap = { started: [], stopped: [], follower: [], + candidate: [], leader: [], message: [message: HamokMessage] 'remote-peer-joined': [peerId: string], @@ -315,6 +337,7 @@ export class Hamok extends EventEmitter { // eslint-disable-next-line @typescript-eslint/no-explicit-any public readonly emitters = new Map>(); + private _joining?: Promise; private _raftTimer?: ReturnType; private _remoteStateRequest?: { timer: ReturnType, responses: EndpointStatesNotification[] }; private readonly _remoteHeartbeats = new Map>(); @@ -387,7 +410,7 @@ export class Hamok extends EventEmitter { public start(): void { if (this._raftTimer) { - return; + return logger.debug('Hamok is already running'); } const raftEngine = this.raft; @@ -407,7 +430,7 @@ export class Hamok extends EventEmitter { this.emit('started'); } - + public stop() { if (!this._raftTimer) { return; @@ -446,6 +469,9 @@ export class Hamok extends EventEmitter { } public addRemotePeerId(remoteEndpointId: string): void { + if (remoteEndpointId === this.localPeerId) return; + if (this.raft.remotePeers.has(remoteEndpointId)) return; + this.raft.remotePeers.add(remoteEndpointId); logger.debug('%s added remote peer %s', this.localPeerId, remoteEndpointId); @@ -454,11 +480,17 @@ export class Hamok extends EventEmitter { } public removeRemotePeerId(remoteEndpointId: string): void { - this.raft.remotePeers.delete(remoteEndpointId); + if (!this.raft.remotePeers.delete(remoteEndpointId)) return; logger.debug('%s removed remote peer %s', this.localPeerId, remoteEndpointId); this.emit('remote-peer-left', remoteEndpointId); + + if (this.remotePeerIds.size === 0) { + if (this.config.autoStopOnNoRemotePeers) { + this.stop(); + } + } } public export(): HamokSnapshot { @@ -843,6 +875,8 @@ export class Hamok extends EventEmitter { switch (message.type) { case HamokMessageType.RAFT_APPEND_ENTRIES_REQUEST_CHUNK: case HamokMessageType.RAFT_APPEND_ENTRIES_RESPONSE: + case HamokMessageType.RAFT_VOTE_REQUEST: + case HamokMessageType.RAFT_VOTE_RESPONSE: this._acceptKeepAliveHamokMessage(message); break; } @@ -873,14 +907,53 @@ export class Hamok extends EventEmitter { } } - public async join(params: HamokJoinProcessParams): Promise { + public async join(params?: HamokJoinProcessParams): Promise { + if (this._joining) return this._joining; + try { + this._joining = this._join({ + startAfterJoin: params?.startAfterJoin ?? true, + fetchRemotePeerTimeoutInMs: params?.fetchRemotePeerTimeoutInMs ?? 5000, + requestSnapshot: params?.requestSnapshot ?? false, + maxRetry: params?.maxRetry ?? 3, + removeRemotePeersOnNoHeartbeat: params?.removeRemotePeersOnNoHeartbeat ?? true, + }); + + await this._joining; + } finally { + this._joining = undefined; + } + } + + public async _join(params: Required, retried = 0): Promise { + const { + startAfterJoin, + fetchRemotePeerTimeoutInMs, + requestSnapshot, + maxRetry, + removeRemotePeersOnNoHeartbeat, + } = params ?? {}; + + logger.debug('Joining the network. startAfterJoin: %s, fetchRemotePeerTimeoutInMs: %s, requestSnapshot: %s, maxRetry: %s, removeRemotePeersOnNoHeartbeat: %s', + startAfterJoin, fetchRemotePeerTimeoutInMs, requestSnapshot, maxRetry, removeRemotePeersOnNoHeartbeat + ); + const { remotePeers, customResponses } = await this.fetchRemotePeers( - params.fetchRemotePeerTimeoutInMs, - params?.requestSnapshot ? 'snapshot' : undefined + fetchRemotePeerTimeoutInMs, + requestSnapshot ? 'snapshot' : undefined ); let bestSnapshot: HamokSnapshot | undefined; - if (params.requestSnapshot) { + if (remotePeers.length < 1) { + if (0 <= maxRetry && maxRetry <= retried) throw new Error('No remote peers found'); + + logger.warn('No remote peers found, retrying %s/%s', retried, maxRetry < 0 ? '∞' : maxRetry); + + return this._join(params, retried + 1); + } + + logger.debug('Remote peers found %o', remotePeers); + + if (requestSnapshot) { for (const serializedSnapshot of customResponses ?? []) { try { const snapshot = JSON.parse(serializedSnapshot) as HamokSnapshot; @@ -896,6 +969,8 @@ export class Hamok extends EventEmitter { } } + logger.debug('Best snapshot %o', bestSnapshot); + if (bestSnapshot) { try { this.import(bestSnapshot); @@ -904,7 +979,7 @@ export class Hamok extends EventEmitter { } } - if (params.removeRemotePeersOnNoHeartbeat) { + if (removeRemotePeersOnNoHeartbeat) { const noHeartbeatListener = (remotePeerId: string) => this.removeRemotePeerId(remotePeerId); this.once('stopped', () => { @@ -913,18 +988,27 @@ export class Hamok extends EventEmitter { this.on('no-heartbeat-from', noHeartbeatListener); } - for (const remotePeerId of remotePeers) { - // this._remoteHeartbeats - this._addNoHeartbeatTimer(remotePeerId); - this.addRemotePeerId(remotePeerId); + const joinMsg = this._codec.encodeJoinNotification(new JoinNotification(this.localPeerId)); - const message = this._codec.encodeJoinNotification(new JoinNotification(this.localPeerId, remotePeerId)); + // this will trigger the remote endpoint to add this endpoint + this._emitMessage(joinMsg); - // this will trigger the remote endpoint to add this endpoint - this._emitMessage(message, remotePeerId); - } - if (params.startAfterJoin) { - this.start(); + if (startAfterJoin) { + let leaderElected: () => void | undefined; + let noMoreRemotePeers: () => void | undefined; + + return new Promise((resolve, reject) => { + leaderElected = () => (this.raft.leaderId !== undefined ? resolve() : void 0); + noMoreRemotePeers = () => (this.remotePeerIds.size === 0 ? reject(new Error('No remote peers')) : void 0); + + this.on('leader-changed', leaderElected); + this.on('remote-peer-left', noMoreRemotePeers); + this.start(); + + }).finally(() => { + this.off('leader-changed', leaderElected); + this.off('remote-peer-left', noMoreRemotePeers); + }); } } @@ -995,12 +1079,12 @@ export class Hamok extends EventEmitter { case HamokMessageType.JOIN_NOTIFICATION: { const notification = this._codec.decodeJoinNotification(message); - if (notification.sourcePeerId === this.localPeerId) { + if (notification.sourcePeerId !== this.localPeerId) { + this.addRemotePeerId(notification.sourcePeerId); + } else { logger.warn('%s Received join notification from itself %o', this.localPeerId, notification); - break; } - - this.addRemotePeerId(notification.sourcePeerId); + break; } case HamokMessageType.ENDPOINT_STATES_NOTIFICATION: { @@ -1143,11 +1227,17 @@ export class Hamok extends EventEmitter { private _addNoHeartbeatTimer(remotePeerId: string) { clearTimeout(this._remoteHeartbeats.get(remotePeerId)); + + logger.trace('%s Add no heartbeat timeout for %s', this.localPeerId, remotePeerId); const timer = setTimeout(() => { this._remoteHeartbeats.delete(remotePeerId); + + if (this._joining) { + return this._addNoHeartbeatTimer(remotePeerId); + } this.emit('no-heartbeat-from', remotePeerId); - }, this.raft.config.followerMaxIdleInMs); + }, this.raft.config.electionTimeoutInMs); this._remoteHeartbeats.set(remotePeerId, timer); } diff --git a/src/index.ts b/src/index.ts index 659fc92..8497417 100644 --- a/src/index.ts +++ b/src/index.ts @@ -8,9 +8,10 @@ export { HamokFetchRemotePeersResponse, HamokEventMap, HamokMapBuilderConfig, + HamokJoinProcessParams, } from './Hamok'; export { - HamokMap as HamokStorage + HamokMap, } from './collections/HamokMap'; export { HamokQueue @@ -25,7 +26,7 @@ export { HamokSnapshot, HamokEmitterSnapshot, HamokQueueSnapshot, - HamokMapSnapshot as HamokStorageSnapshot, + HamokMapSnapshot, } from './HamokSnapshot'; export { LogEntry diff --git a/src/messages/HamokGridCodec.ts b/src/messages/HamokGridCodec.ts index 7b56255..21e3620 100644 --- a/src/messages/HamokGridCodec.ts +++ b/src/messages/HamokGridCodec.ts @@ -120,7 +120,7 @@ export class HamokGridCodec implements HamokCodec { return new JoinNotification( message.sourceId!, - message.destinationId!, + message.destinationId, ); } diff --git a/src/messages/messagetypes/JoinNotification.ts b/src/messages/messagetypes/JoinNotification.ts index 94aae42..76adc40 100644 --- a/src/messages/messagetypes/JoinNotification.ts +++ b/src/messages/messagetypes/JoinNotification.ts @@ -2,7 +2,7 @@ export class JoinNotification { public constructor( public readonly sourcePeerId: string, - public readonly destinationPeerId: string, + public readonly destinationPeerId?: string, ) { } } \ No newline at end of file diff --git a/src/raft/RaftEngine.ts b/src/raft/RaftEngine.ts index b62ad7b..834122e 100644 --- a/src/raft/RaftEngine.ts +++ b/src/raft/RaftEngine.ts @@ -55,6 +55,7 @@ export class RaftEngine { private _leaderId?: string; public readonly remotePeers = new Set(); public readonly transport = new RaftMessageEmitter(); + private _failedElections = 0; public constructor( public readonly config: RaftEngineConfig, @@ -75,6 +76,10 @@ export class RaftEngine { return this._leaderId; } + public get failedElections(): number { + return this._failedElections; + } + public set leaderId(newLeaderId: string | undefined) { if (this._leaderId === newLeaderId) return; const prevLeaderId = this._leaderId; @@ -83,6 +88,10 @@ export class RaftEngine { logger.info(`%s Leader changed from ${prevLeaderId} to ${newLeaderId}`, this.localPeerId); + if (newLeaderId !== undefined) { + this._failedElections = 0; + } + this.events.emit('leader-changed', newLeaderId, prevLeaderId); } @@ -99,6 +108,10 @@ export class RaftEngine { logger.debug(`%s State changed from ${prevState.stateName} to ${newState.stateName}`, this.localPeerId); + if (prevState.stateName === 'candidate' && newState.stateName === 'follower') { + ++this._failedElections; + } + newState.init?.(); this.events.emit('state-changed', newState.stateName); @@ -106,6 +119,7 @@ export class RaftEngine { switch (newState.stateName) { case 'leader': case 'follower': + case 'candidate': this.events.emit(newState.stateName); break; } diff --git a/src/raft/RaftLeaderState.ts b/src/raft/RaftLeaderState.ts index 7662e98..21acb09 100644 --- a/src/raft/RaftLeaderState.ts +++ b/src/raft/RaftLeaderState.ts @@ -75,7 +75,7 @@ export function createRaftLeaderState(context: RaftLeaderStateContext): RaftStat return follow(); } // now we are talking in my term... - logger.debug('Received RaftAppendEntriesResponse %o', response); + logger.trace('Received RaftAppendEntriesResponse %o', response); // if (localPeerId !== response.sourcePeerId) { // remotePeers.touch(response.sourcePeerId); // }