diff --git a/README.md b/README.md index 03bc87b..1c8e614 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ yarn add hamok - [HamokEmitter](#hamokemitter) - [HamokRecord](#hamokrecord) - [User Manual](#user-manual) +- [Important Notes](#important-notes) - [Contributing](#contributing) - [License](#license) ## Quick Start diff --git a/schema/hamokMessage.proto b/schema/hamokMessage.proto index ba2aaca..cd0e299 100644 --- a/schema/hamokMessage.proto +++ b/schema/hamokMessage.proto @@ -22,6 +22,12 @@ message HamokMessage { */ ONGOING_REQUESTS_NOTIFICATION = 3; + /** + * Join notification is sent by a new endpoint to every other endpoint + * in order to join the grid + */ + JOIN_NOTIFICATION = 4; + /** * Raft Vote request is sent by a raccoon made itself a candidate * in order to be a leader of the cluster diff --git a/schema/hamokMessage_pb.ts b/schema/hamokMessage_pb.ts index f0df892..b2390a7 100644 --- a/schema/hamokMessage_pb.ts +++ b/schema/hamokMessage_pb.ts @@ -231,6 +231,15 @@ export enum HamokMessage_MessageType { */ ONGOING_REQUESTS_NOTIFICATION = 3, + /** + * * + * Join notification is sent by a new endpoint to every other endpoint + * in order to join the grid + * + * @generated from enum value: JOIN_NOTIFICATION = 4; + */ + JOIN_NOTIFICATION = 4, + /** * * * Raft Vote request is sent by a raccoon made itself a candidate @@ -467,6 +476,7 @@ proto2.util.setEnumType(HamokMessage_MessageType, "io.github.hamok.dev.schema.Ha { no: 1, name: "HELLO_NOTIFICATION" }, { no: 2, name: "ENDPOINT_STATES_NOTIFICATION" }, { no: 3, name: "ONGOING_REQUESTS_NOTIFICATION" }, + { no: 4, name: "JOIN_NOTIFICATION" }, { no: 12, name: "RAFT_VOTE_REQUEST" }, { no: 13, name: "RAFT_VOTE_RESPONSE" }, { no: 16, name: "RAFT_APPEND_ENTRIES_REQUEST_CHUNK" }, diff --git a/src/Hamok.ts b/src/Hamok.ts index 4d4ecbc..958617d 100644 --- a/src/Hamok.ts +++ b/src/Hamok.ts @@ -24,12 +24,38 @@ import { RaftLogs } from './raft/RaftLogs'; import { HamokRecord, HamokRecordObject } from './collections/HamokRecord'; import { HelloNotification } from './messages/messagetypes/HelloNotification'; import { EndpointStatesNotification } from './messages/messagetypes/EndpointNotification'; +import { JoinNotification } from './messages/messagetypes/JoinNotification'; const logger = createLogger('Hamok'); +type HamokHelloNotificationCustomRequestType = 'snapshot'; + +export type HamokJoinProcessParams = { + + /** + * Timeout in milliseconds for fetching the remote peers. + */ + fetchRemotePeerTimeoutInMs: number, + + /** + * Indicate if the remote peers automatically should be removed if no heartbeat is received. + */ + removeRemotePeersOnNoHeartbeat: boolean, + + /** + * indicates if the snapshot should be requested from the remote peers, + * and if it is provided then it is used in local + */ + requestSnapshot?: boolean, + + /** + * indicates if the start() method should be called automatically after the join process is completed + */ + startAfterJoin?: boolean, +} + export type HamokConfig = { // empty - raftLogs?: RaftLogs, } /** @@ -57,6 +83,11 @@ export type HamokConstructorConfig = RaftEngineConfig & HamokConfig & { * automatically stopping notifications for explicitly postponed requests. */ ongoingRequestsSendingPeriodInMs: number; + + /** + * Optional. A custom implementation of RaftLogs to store log entries. + */ + raftLogs?: RaftLogs, } /** @@ -265,10 +296,10 @@ export type HamokEventMap = { commit: [commitIndex: number, message: HamokMessage], heartbeat: [], error: [error: Error], - 'hello-notification': [remotePeerId: string, request: { - customData: string, - callback: (response: string) => void, - } | undefined], + // 'hello-notification': [remotePeerId: string, request: { + // customData: string, + // callback: (response: string) => void, + // } | undefined], 'no-heartbeat-from': [remotePeerId: string], } @@ -812,7 +843,7 @@ export class Hamok extends EventEmitter { switch (message.type) { case HamokMessageType.RAFT_APPEND_ENTRIES_REQUEST_CHUNK: case HamokMessageType.RAFT_APPEND_ENTRIES_RESPONSE: - this._acceptAppendRequestResponse(message); + this._acceptKeepAliveHamokMessage(message); break; } this.raft.transport.receive(message); @@ -842,8 +873,67 @@ export class Hamok extends EventEmitter { } } - public async fetchRemotePeers(options?: { customRequest?: string, timeoutInMs?: number }): Promise { - const helloMsg = this._codec.encodeHelloNotification(new HelloNotification(this.localPeerId, this.raft.leaderId)); + public async join(params: HamokJoinProcessParams): Promise { + const { remotePeers, customResponses } = await this.fetchRemotePeers( + params.fetchRemotePeerTimeoutInMs, + params?.requestSnapshot ? 'snapshot' : undefined + ); + let bestSnapshot: HamokSnapshot | undefined; + + if (params.requestSnapshot) { + for (const serializedSnapshot of customResponses ?? []) { + try { + const snapshot = JSON.parse(serializedSnapshot) as HamokSnapshot; + + if (!bestSnapshot) bestSnapshot = snapshot; + + if (bestSnapshot.term < snapshot.term || bestSnapshot.commitIndex < snapshot.commitIndex) { + bestSnapshot = snapshot; + } + } catch (err) { + logger.error('Failed to parse snapshot %o', err); + } + } + } + + if (bestSnapshot) { + try { + this.import(bestSnapshot); + } catch (err) { + logger.error('Failed to import snapshot %o', err); + } + } + + if (params.removeRemotePeersOnNoHeartbeat) { + const noHeartbeatListener = (remotePeerId: string) => this.removeRemotePeerId(remotePeerId); + + this.once('stopped', () => { + this.off('no-heartbeat-from', noHeartbeatListener); + }); + this.on('no-heartbeat-from', noHeartbeatListener); + } + + for (const remotePeerId of remotePeers) { + // this._remoteHeartbeats + this._addNoHeartbeatTimer(remotePeerId); + this.addRemotePeerId(remotePeerId); + + const message = this._codec.encodeJoinNotification(new JoinNotification(this.localPeerId, remotePeerId)); + + // this will trigger the remote endpoint to add this endpoint + this._emitMessage(message, remotePeerId); + } + if (params.startAfterJoin) { + this.start(); + } + } + + public async fetchRemotePeers(timeout?: number, customRequest?: HamokHelloNotificationCustomRequestType): Promise { + const helloMsg = this._codec.encodeHelloNotification(new HelloNotification( + this.localPeerId, + this.raft.leaderId, + customRequest + )); return new Promise((resolve) => { const remotePeerIds = new Set(); @@ -865,7 +955,7 @@ export class Hamok extends EventEmitter { remotePeers: [ ...remotePeerIds ], customResponses: 0 < customResponses.length ? customResponses : undefined, }); - }, options?.timeoutInMs ?? 3000); + }, timeout ?? 5000); this._remoteStateRequest = { timer, @@ -880,36 +970,37 @@ export class Hamok extends EventEmitter { switch (message.type) { case HamokMessageType.HELLO_NOTIFICATION: { const hello = this._codec.decodeHelloNotification(message); - const customRequest = hello.customData; - let replying: Promise | undefined; - - if (customRequest) { - replying = new Promise((resolve) => { - if (!this.emit('hello-notification', hello.sourcePeerId, { - customData: customRequest, - callback: (response) => resolve(response), - })) { - logger.warn('%s Received hello notification with custom data but no listener is registered %o', this.localPeerId, hello); - resolve(undefined); - } - }); - } else this.emit('hello-notification', hello.sourcePeerId, undefined); - - (replying ?? Promise.resolve(undefined)).then((customResponse) => { - const notification = this._codec.encodeEndpointStateNotification(new EndpointStatesNotification( - this.localPeerId, - hello.sourcePeerId, - this.raft.props.currentTerm, - this.raft.logs.commitIndex, - this.leader ? this.raft.logs.nextIndex : -1, - this.raft.logs.size, - this.raft.remotePeers, - customResponse - )); - - this.emit('message', notification); - }); - + let customResponse: string | undefined; + + switch (hello.customData as HamokHelloNotificationCustomRequestType) { + case 'snapshot': { + customResponse = JSON.stringify(this.export()); + } + } + + const notification = this._codec.encodeEndpointStateNotification(new EndpointStatesNotification( + this.localPeerId, + hello.sourcePeerId, + this.raft.props.currentTerm, + this.raft.logs.commitIndex, + this.leader ? this.raft.logs.nextIndex : -1, + this.raft.logs.size, + this.raft.remotePeers, + customResponse + )); + + this.emit('message', notification); + break; + } + case HamokMessageType.JOIN_NOTIFICATION: { + const notification = this._codec.decodeJoinNotification(message); + + if (notification.sourcePeerId === this.localPeerId) { + logger.warn('%s Received join notification from itself %o', this.localPeerId, notification); + break; + } + + this.addRemotePeerId(notification.sourcePeerId); break; } case HamokMessageType.ENDPOINT_STATES_NOTIFICATION: { @@ -1043,15 +1134,21 @@ export class Hamok extends EventEmitter { } } - private _acceptAppendRequestResponse(message: HamokMessage) { + private _acceptKeepAliveHamokMessage(message: HamokMessage) { if (!message.sourceId || message.sourceId === this.localPeerId) return; const remotePeerId = message.sourceId; - clearTimeout(this._remoteHeartbeats.get(message.sourceId)); + this._addNoHeartbeatTimer(remotePeerId); + } + + private _addNoHeartbeatTimer(remotePeerId: string) { + clearTimeout(this._remoteHeartbeats.get(remotePeerId)); - this._remoteHeartbeats.set(remotePeerId, setTimeout(() => { + const timer = setTimeout(() => { this._remoteHeartbeats.delete(remotePeerId); this.emit('no-heartbeat-from', remotePeerId); - }, this.raft.config.followerMaxIdleInMs)); + }, this.raft.config.followerMaxIdleInMs); + + this._remoteHeartbeats.set(remotePeerId, timer); } } \ No newline at end of file diff --git a/src/messages/HamokGridCodec.ts b/src/messages/HamokGridCodec.ts index a938505..7b56255 100644 --- a/src/messages/HamokGridCodec.ts +++ b/src/messages/HamokGridCodec.ts @@ -7,11 +7,13 @@ import { OngoingRequestsNotification } from './messagetypes/OngoingRequests'; import * as Collections from '../common/Collections'; import { EndpointStatesNotification } from './messagetypes/EndpointNotification'; import { HelloNotification } from './messagetypes/HelloNotification'; +import { JoinNotification } from './messagetypes/JoinNotification'; const logger = createLogger('GridCodec'); type Input = HelloNotification | +JoinNotification | EndpointStatesNotification | OngoingRequestsNotification | StorageSyncRequest | @@ -41,6 +43,8 @@ export class HamokGridCodec implements HamokCodec { switch (input.constructor) { case HelloNotification: return this.encodeHelloNotification(input as HelloNotification); + case JoinNotification: + return this.encodeJoinNotification(input as JoinNotification); case EndpointStatesNotification: return this.encodeEndpointStateNotification(input as EndpointStatesNotification); case OngoingRequestsNotification: @@ -58,6 +62,8 @@ export class HamokGridCodec implements HamokCodec { switch (message.type) { case MessageType.HELLO_NOTIFICATION: return this.decodeHelloNotification(message); + case MessageType.JOIN_NOTIFICATION: + return this.decodeJoinNotification(message); case MessageType.ENDPOINT_STATES_NOTIFICATION: return this.decodeEndpointStateNotification(message); case MessageType.ONGOING_REQUESTS_NOTIFICATION: @@ -97,6 +103,27 @@ export class HamokGridCodec implements HamokCodec { ); } + public encodeJoinNotification(notification: JoinNotification): HamokMessage { + return new HamokMessage({ + // eslint-disable-next-line camelcase + protocol: HamokMessageProtocol.GRID_COMMUNICATION_PROTOCOL, + type: MessageType.JOIN_NOTIFICATION, + sourceId: notification.sourcePeerId, + destinationId: notification.destinationPeerId, + }); + } + + public decodeJoinNotification(message: HamokMessage): JoinNotification { + if (message.type !== MessageType.JOIN_NOTIFICATION) { + throw new Error('decodeJoinNotification(): Message type must be JOIN_NOTIFICATION'); + } + + return new JoinNotification( + message.sourceId!, + message.destinationId!, + ); + } + public encodeEndpointStateNotification(notification: EndpointStatesNotification): HamokMessage { const activeEndpointIds = setToArray(notification.activeEndpointIds); diff --git a/src/messages/HamokMessage.ts b/src/messages/HamokMessage.ts index f0df892..b2390a7 100644 --- a/src/messages/HamokMessage.ts +++ b/src/messages/HamokMessage.ts @@ -231,6 +231,15 @@ export enum HamokMessage_MessageType { */ ONGOING_REQUESTS_NOTIFICATION = 3, + /** + * * + * Join notification is sent by a new endpoint to every other endpoint + * in order to join the grid + * + * @generated from enum value: JOIN_NOTIFICATION = 4; + */ + JOIN_NOTIFICATION = 4, + /** * * * Raft Vote request is sent by a raccoon made itself a candidate @@ -467,6 +476,7 @@ proto2.util.setEnumType(HamokMessage_MessageType, "io.github.hamok.dev.schema.Ha { no: 1, name: "HELLO_NOTIFICATION" }, { no: 2, name: "ENDPOINT_STATES_NOTIFICATION" }, { no: 3, name: "ONGOING_REQUESTS_NOTIFICATION" }, + { no: 4, name: "JOIN_NOTIFICATION" }, { no: 12, name: "RAFT_VOTE_REQUEST" }, { no: 13, name: "RAFT_VOTE_RESPONSE" }, { no: 16, name: "RAFT_APPEND_ENTRIES_REQUEST_CHUNK" }, diff --git a/src/messages/messagetypes/JoinNotification.ts b/src/messages/messagetypes/JoinNotification.ts new file mode 100644 index 0000000..94aae42 --- /dev/null +++ b/src/messages/messagetypes/JoinNotification.ts @@ -0,0 +1,8 @@ +export class JoinNotification { + + public constructor( + public readonly sourcePeerId: string, + public readonly destinationPeerId: string, + ) { + } +} \ No newline at end of file