Skip to content

Commit

Permalink
* Add JOIN_NOTIFICATION message type
Browse files Browse the repository at this point in the history
 * Add `join()` method to hamok to make auto discovery easier
 * remove 'hello-notification' event from Hamok as it become internal
 * add encode / decode for JoinNotification
 * add `JoinNotification` message class
  • Loading branch information
balazskreith committed Aug 9, 2024
1 parent d2d62a8 commit 99119fd
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 43 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ yarn add hamok
- [HamokEmitter](#hamokemitter)
- [HamokRecord](#hamokrecord)
- [User Manual](#user-manual)
- [Important Notes](#important-notes)
- [Contributing](#contributing)
- [License](#license)
## Quick Start
Expand Down
6 changes: 6 additions & 0 deletions schema/hamokMessage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ message HamokMessage {
*/
ONGOING_REQUESTS_NOTIFICATION = 3;

/**
* Join notification is sent by a new endpoint to every other endpoint
* in order to join the grid
*/
JOIN_NOTIFICATION = 4;

/**
* Raft Vote request is sent by a raccoon made itself a candidate
* in order to be a leader of the cluster
Expand Down
10 changes: 10 additions & 0 deletions schema/hamokMessage_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,15 @@ export enum HamokMessage_MessageType {
*/
ONGOING_REQUESTS_NOTIFICATION = 3,

/**
* *
* Join notification is sent by a new endpoint to every other endpoint
* in order to join the grid
*
* @generated from enum value: JOIN_NOTIFICATION = 4;
*/
JOIN_NOTIFICATION = 4,

/**
* *
* Raft Vote request is sent by a raccoon made itself a candidate
Expand Down Expand Up @@ -467,6 +476,7 @@ proto2.util.setEnumType(HamokMessage_MessageType, "io.github.hamok.dev.schema.Ha
{ no: 1, name: "HELLO_NOTIFICATION" },
{ no: 2, name: "ENDPOINT_STATES_NOTIFICATION" },
{ no: 3, name: "ONGOING_REQUESTS_NOTIFICATION" },
{ no: 4, name: "JOIN_NOTIFICATION" },
{ no: 12, name: "RAFT_VOTE_REQUEST" },
{ no: 13, name: "RAFT_VOTE_RESPONSE" },
{ no: 16, name: "RAFT_APPEND_ENTRIES_REQUEST_CHUNK" },
Expand Down
183 changes: 140 additions & 43 deletions src/Hamok.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,38 @@ import { RaftLogs } from './raft/RaftLogs';
import { HamokRecord, HamokRecordObject } from './collections/HamokRecord';
import { HelloNotification } from './messages/messagetypes/HelloNotification';
import { EndpointStatesNotification } from './messages/messagetypes/EndpointNotification';
import { JoinNotification } from './messages/messagetypes/JoinNotification';

const logger = createLogger('Hamok');

type HamokHelloNotificationCustomRequestType = 'snapshot';

export type HamokJoinProcessParams = {

/**
* Timeout in milliseconds for fetching the remote peers.
*/
fetchRemotePeerTimeoutInMs: number,

/**
* Indicate if the remote peers automatically should be removed if no heartbeat is received.
*/
removeRemotePeersOnNoHeartbeat: boolean,

/**
* indicates if the snapshot should be requested from the remote peers,
* and if it is provided then it is used in local
*/
requestSnapshot?: boolean,

/**
* indicates if the start() method should be called automatically after the join process is completed
*/
startAfterJoin?: boolean,
}

export type HamokConfig = {
// empty
raftLogs?: RaftLogs,
}

/**
Expand Down Expand Up @@ -57,6 +83,11 @@ export type HamokConstructorConfig = RaftEngineConfig & HamokConfig & {
* automatically stopping notifications for explicitly postponed requests.
*/
ongoingRequestsSendingPeriodInMs: number;

/**
* Optional. A custom implementation of RaftLogs to store log entries.
*/
raftLogs?: RaftLogs,
}

/**
Expand Down Expand Up @@ -265,10 +296,10 @@ export type HamokEventMap = {
commit: [commitIndex: number, message: HamokMessage],
heartbeat: [],
error: [error: Error],
'hello-notification': [remotePeerId: string, request: {
customData: string,
callback: (response: string) => void,
} | undefined],
// 'hello-notification': [remotePeerId: string, request: {
// customData: string,
// callback: (response: string) => void,
// } | undefined],
'no-heartbeat-from': [remotePeerId: string],
}

Expand Down Expand Up @@ -812,7 +843,7 @@ export class Hamok extends EventEmitter<HamokEventMap> {
switch (message.type) {
case HamokMessageType.RAFT_APPEND_ENTRIES_REQUEST_CHUNK:
case HamokMessageType.RAFT_APPEND_ENTRIES_RESPONSE:
this._acceptAppendRequestResponse(message);
this._acceptKeepAliveHamokMessage(message);
break;
}
this.raft.transport.receive(message);
Expand Down Expand Up @@ -842,8 +873,67 @@ export class Hamok extends EventEmitter<HamokEventMap> {
}
}

public async fetchRemotePeers(options?: { customRequest?: string, timeoutInMs?: number }): Promise<HamokFetchRemotePeersResponse> {
const helloMsg = this._codec.encodeHelloNotification(new HelloNotification(this.localPeerId, this.raft.leaderId));
public async join(params: HamokJoinProcessParams): Promise<void> {
const { remotePeers, customResponses } = await this.fetchRemotePeers(
params.fetchRemotePeerTimeoutInMs,
params?.requestSnapshot ? 'snapshot' : undefined
);
let bestSnapshot: HamokSnapshot | undefined;

if (params.requestSnapshot) {
for (const serializedSnapshot of customResponses ?? []) {
try {
const snapshot = JSON.parse(serializedSnapshot) as HamokSnapshot;

if (!bestSnapshot) bestSnapshot = snapshot;

if (bestSnapshot.term < snapshot.term || bestSnapshot.commitIndex < snapshot.commitIndex) {
bestSnapshot = snapshot;
}
} catch (err) {
logger.error('Failed to parse snapshot %o', err);
}
}
}

if (bestSnapshot) {
try {
this.import(bestSnapshot);
} catch (err) {
logger.error('Failed to import snapshot %o', err);
}
}

if (params.removeRemotePeersOnNoHeartbeat) {
const noHeartbeatListener = (remotePeerId: string) => this.removeRemotePeerId(remotePeerId);

this.once('stopped', () => {
this.off('no-heartbeat-from', noHeartbeatListener);
});
this.on('no-heartbeat-from', noHeartbeatListener);
}

for (const remotePeerId of remotePeers) {
// this._remoteHeartbeats
this._addNoHeartbeatTimer(remotePeerId);
this.addRemotePeerId(remotePeerId);

const message = this._codec.encodeJoinNotification(new JoinNotification(this.localPeerId, remotePeerId));

// this will trigger the remote endpoint to add this endpoint
this._emitMessage(message, remotePeerId);
}
if (params.startAfterJoin) {
this.start();
}
}

public async fetchRemotePeers(timeout?: number, customRequest?: HamokHelloNotificationCustomRequestType): Promise<HamokFetchRemotePeersResponse> {
const helloMsg = this._codec.encodeHelloNotification(new HelloNotification(
this.localPeerId,
this.raft.leaderId,
customRequest
));

return new Promise((resolve) => {
const remotePeerIds = new Set<string>();
Expand All @@ -865,7 +955,7 @@ export class Hamok extends EventEmitter<HamokEventMap> {
remotePeers: [ ...remotePeerIds ],
customResponses: 0 < customResponses.length ? customResponses : undefined,
});
}, options?.timeoutInMs ?? 3000);
}, timeout ?? 5000);

this._remoteStateRequest = {
timer,
Expand All @@ -880,36 +970,37 @@ export class Hamok extends EventEmitter<HamokEventMap> {
switch (message.type) {
case HamokMessageType.HELLO_NOTIFICATION: {
const hello = this._codec.decodeHelloNotification(message);
const customRequest = hello.customData;
let replying: Promise<string | undefined> | undefined;

if (customRequest) {
replying = new Promise((resolve) => {
if (!this.emit('hello-notification', hello.sourcePeerId, {
customData: customRequest,
callback: (response) => resolve(response),
})) {
logger.warn('%s Received hello notification with custom data but no listener is registered %o', this.localPeerId, hello);
resolve(undefined);
}
});
} else this.emit('hello-notification', hello.sourcePeerId, undefined);

(replying ?? Promise.resolve(undefined)).then((customResponse) => {
const notification = this._codec.encodeEndpointStateNotification(new EndpointStatesNotification(
this.localPeerId,
hello.sourcePeerId,
this.raft.props.currentTerm,
this.raft.logs.commitIndex,
this.leader ? this.raft.logs.nextIndex : -1,
this.raft.logs.size,
this.raft.remotePeers,
customResponse
));

this.emit('message', notification);
});

let customResponse: string | undefined;

switch (hello.customData as HamokHelloNotificationCustomRequestType) {
case 'snapshot': {
customResponse = JSON.stringify(this.export());
}
}

const notification = this._codec.encodeEndpointStateNotification(new EndpointStatesNotification(
this.localPeerId,
hello.sourcePeerId,
this.raft.props.currentTerm,
this.raft.logs.commitIndex,
this.leader ? this.raft.logs.nextIndex : -1,
this.raft.logs.size,
this.raft.remotePeers,
customResponse
));

this.emit('message', notification);
break;
}
case HamokMessageType.JOIN_NOTIFICATION: {
const notification = this._codec.decodeJoinNotification(message);

if (notification.sourcePeerId === this.localPeerId) {
logger.warn('%s Received join notification from itself %o', this.localPeerId, notification);
break;
}

this.addRemotePeerId(notification.sourcePeerId);
break;
}
case HamokMessageType.ENDPOINT_STATES_NOTIFICATION: {
Expand Down Expand Up @@ -1043,15 +1134,21 @@ export class Hamok extends EventEmitter<HamokEventMap> {
}
}

private _acceptAppendRequestResponse(message: HamokMessage) {
private _acceptKeepAliveHamokMessage(message: HamokMessage) {
if (!message.sourceId || message.sourceId === this.localPeerId) return;
const remotePeerId = message.sourceId;

clearTimeout(this._remoteHeartbeats.get(message.sourceId));
this._addNoHeartbeatTimer(remotePeerId);
}

private _addNoHeartbeatTimer(remotePeerId: string) {
clearTimeout(this._remoteHeartbeats.get(remotePeerId));

this._remoteHeartbeats.set(remotePeerId, setTimeout(() => {
const timer = setTimeout(() => {
this._remoteHeartbeats.delete(remotePeerId);
this.emit('no-heartbeat-from', remotePeerId);
}, this.raft.config.followerMaxIdleInMs));
}, this.raft.config.followerMaxIdleInMs);

this._remoteHeartbeats.set(remotePeerId, timer);
}
}
27 changes: 27 additions & 0 deletions src/messages/HamokGridCodec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import { OngoingRequestsNotification } from './messagetypes/OngoingRequests';
import * as Collections from '../common/Collections';
import { EndpointStatesNotification } from './messagetypes/EndpointNotification';
import { HelloNotification } from './messagetypes/HelloNotification';
import { JoinNotification } from './messagetypes/JoinNotification';

const logger = createLogger('GridCodec');

type Input =
HelloNotification |
JoinNotification |
EndpointStatesNotification |
OngoingRequestsNotification |
StorageSyncRequest |
Expand Down Expand Up @@ -41,6 +43,8 @@ export class HamokGridCodec implements HamokCodec<Input, HamokMessage> {
switch (input.constructor) {
case HelloNotification:
return this.encodeHelloNotification(input as HelloNotification);
case JoinNotification:
return this.encodeJoinNotification(input as JoinNotification);
case EndpointStatesNotification:
return this.encodeEndpointStateNotification(input as EndpointStatesNotification);
case OngoingRequestsNotification:
Expand All @@ -58,6 +62,8 @@ export class HamokGridCodec implements HamokCodec<Input, HamokMessage> {
switch (message.type) {
case MessageType.HELLO_NOTIFICATION:
return this.decodeHelloNotification(message);
case MessageType.JOIN_NOTIFICATION:
return this.decodeJoinNotification(message);
case MessageType.ENDPOINT_STATES_NOTIFICATION:
return this.decodeEndpointStateNotification(message);
case MessageType.ONGOING_REQUESTS_NOTIFICATION:
Expand Down Expand Up @@ -97,6 +103,27 @@ export class HamokGridCodec implements HamokCodec<Input, HamokMessage> {
);
}

public encodeJoinNotification(notification: JoinNotification): HamokMessage {
return new HamokMessage({
// eslint-disable-next-line camelcase
protocol: HamokMessageProtocol.GRID_COMMUNICATION_PROTOCOL,
type: MessageType.JOIN_NOTIFICATION,
sourceId: notification.sourcePeerId,
destinationId: notification.destinationPeerId,
});
}

public decodeJoinNotification(message: HamokMessage): JoinNotification {
if (message.type !== MessageType.JOIN_NOTIFICATION) {
throw new Error('decodeJoinNotification(): Message type must be JOIN_NOTIFICATION');
}

return new JoinNotification(
message.sourceId!,
message.destinationId!,
);
}

public encodeEndpointStateNotification(notification: EndpointStatesNotification): HamokMessage {
const activeEndpointIds = setToArray<string>(notification.activeEndpointIds);

Expand Down
10 changes: 10 additions & 0 deletions src/messages/HamokMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,15 @@ export enum HamokMessage_MessageType {
*/
ONGOING_REQUESTS_NOTIFICATION = 3,

/**
* *
* Join notification is sent by a new endpoint to every other endpoint
* in order to join the grid
*
* @generated from enum value: JOIN_NOTIFICATION = 4;
*/
JOIN_NOTIFICATION = 4,

/**
* *
* Raft Vote request is sent by a raccoon made itself a candidate
Expand Down Expand Up @@ -467,6 +476,7 @@ proto2.util.setEnumType(HamokMessage_MessageType, "io.github.hamok.dev.schema.Ha
{ no: 1, name: "HELLO_NOTIFICATION" },
{ no: 2, name: "ENDPOINT_STATES_NOTIFICATION" },
{ no: 3, name: "ONGOING_REQUESTS_NOTIFICATION" },
{ no: 4, name: "JOIN_NOTIFICATION" },
{ no: 12, name: "RAFT_VOTE_REQUEST" },
{ no: 13, name: "RAFT_VOTE_RESPONSE" },
{ no: 16, name: "RAFT_APPEND_ENTRIES_REQUEST_CHUNK" },
Expand Down
8 changes: 8 additions & 0 deletions src/messages/messagetypes/JoinNotification.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export class JoinNotification {

public constructor(
public readonly sourcePeerId: string,
public readonly destinationPeerId: string,
) {
}
}

0 comments on commit 99119fd

Please sign in to comment.