Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
balazskreith committed Nov 17, 2024
1 parent efb13ba commit 33bd77e
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 5 deletions.
3 changes: 2 additions & 1 deletion examples/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ purgatory/
node_modules
lib/
package-lock.json
# yarn.lock
# yarn.lock
_mything.ts
4 changes: 3 additions & 1 deletion examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"main": "main.js",
"scripts": {
"format": "prettier --write \"src/**/*.ts\"",
"dev:my": "nodemon -x ts-node src/_mything.ts | pino-pretty",
"dev:run-all": "nodemon -x ts-node src/run-all.ts | pino-pretty",
"dev:readme": "nodemon -x ts-node src/common-readme-example.ts | pino-pretty",
"dev:map:1": "nodemon -x ts-node src/map-insert-get-example.ts | pino-pretty",
Expand All @@ -17,6 +18,7 @@
"dev:record:4": "nodemon -x ts-node src/record-update-if.ts | pino-pretty",
"dev:emitter:1": "nodemon -x ts-node src/emitter-example.ts | pino-pretty",
"dev:emitter:2": "nodemon -x ts-node src/emitter-catchup-example.ts | pino-pretty",
"dev:emitter:3": "nodemon -x ts-node src/emitter-catchup-2-example.ts | pino-pretty",
"dev:election:1": "nodemon -x ts-node src/reelection-example.ts | pino-pretty",
"dev:queue:1": "nodemon -x ts-node src/queue-events-example.ts | pino-pretty",
"dev:queue:2": "nodemon -x ts-node src/queue-push-pop-example.ts | pino-pretty",
Expand Down Expand Up @@ -58,7 +60,7 @@
"dependencies": {
"pino": "^9.3.2",
"ioredis": "^5.4.1",
"hamok": "file:../"
"hamok": "2.6.1-76ef244.0"
},
"devDependencies": {
"@types/events": "^3.0.0",
Expand Down
182 changes: 182 additions & 0 deletions examples/src/emitter-catchup-2-example.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
import { Hamok, HamokConfig, HamokEmitter, setHamokLogLevel } from 'hamok';
import * as pino from 'pino';
import { HamokMessageHub } from './utils/HamokMessageHub';

const logger = pino.pino({
name: 'emitter-example',
level: 'debug',
});

type ExampleEventMap = {
'simple-request': [requestId: string, {
'param-1': number,
}],
'complex-request': [requestId: string, {
'param-1': number,
'param-2': string,
}],
'response': [requestId: string, payload?: unknown, error?: string],
}

type PendingRequest<T> = {
requestId: string,
timer: ReturnType<typeof setTimeout>,
resolve: (value: T) => void,
reject: (reason: string) => void,
}

function createResponseHandler(hamok: Hamok<{requests: Map<string, PendingRequest<any>>}>) {
return (requestId: string, payload?: unknown, error?: string) => {
const pendingRequest = hamok.appData.requests.get(requestId);

logger.debug('Response received by server (%s). requestId: %s, payload: %o, error: %o. do we have this request?: %s', hamok.localPeerId, requestId, payload, error, Boolean(pendingRequest));

if (pendingRequest) {
clearTimeout(pendingRequest.timer);
hamok.appData.requests.delete(requestId);
if (error) {
pendingRequest.reject(error);
} else {
pendingRequest.resolve(payload);
}
}
}
}

function createRequest<K extends keyof ExampleEventMap>(requests: Map<string, PendingRequest<any>>, emitter: HamokEmitter<ExampleEventMap>, event: K, payload: ExampleEventMap[K][1]): Promise<number> {
const requestId = Math.random().toString(36).substring(7);

return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
reject('Request timeout. event: ' + event + ', payload: ' + JSON.stringify(payload));
}, 5000);
requests.set(requestId, { requestId, timer, resolve, reject });

logger.debug('Request sent by server (%s). requestId: %s, event: %s, payload: %o', emitter.connection.localPeerId, requestId, event, payload);

emitter.notify(event, requestId as any, payload as any);
});
}



export async function run() {

const server_1 = new Hamok<{ requests: Map<string, PendingRequest<any>> }>({
peerId: 'server_1',
appData: {
requests: new Map(),
},
onlyFollower: true,
});
const server_2 = new Hamok<{ requests: Map<string, PendingRequest<any>> }>({
peerId: 'server_2',
appData: {
requests: new Map(),
},
});
const server_3 = new Hamok<{ requests: Map<string, PendingRequest<any>> }>({
peerId: 'server_3',
appData: {
requests: new Map(),
},
onlyFollower: true,
});
const messageHub = new HamokMessageHub();
messageHub.add(server_1, server_2, server_3);

await Promise.all([
server_1.join(),
server_2.join(),
server_3.join(),
]);

logger.info('Servers are joined');

const emitter_1 = server_1.createEmitter<ExampleEventMap>({
emitterId: 'my-distributed-emitter',
});
const emitter_2 = server_2.createEmitter<ExampleEventMap>({
emitterId: 'my-distributed-emitter',
});
const emitter_3 = server_3.createEmitter<ExampleEventMap>({
emitterId: 'my-distributed-emitter',
});

emitter_1.subscriptions.on('added', (event, peerId, metaData) => {
logger.debug('On server_1 (%s) peer %s subscribed to event %s, metaData: %o. peers on event: %o', server_1.localPeerId, peerId, event, metaData, [...(emitter_1.subscriptions.getEventPeersMap(event) ?? [])]);
});
emitter_2.subscriptions.on('added', (event, peerId, metaData) => {
logger.debug('On server_2 (%s) peer %s subscribed to event %s, metaData: %o. peers on event: %o', server_2.localPeerId, peerId, event, metaData, [...(emitter_2.subscriptions.getEventPeersMap(event) ?? [])]);
});
emitter_3.subscriptions.on('added', (event, peerId, metaData) => {
logger.debug('On server_3 (%s) peer %s subscribed to event %s, metaData: %o. peers on event: %o', server_3.localPeerId, peerId, event, metaData, [...(emitter_3.subscriptions.getEventPeersMap(event) ?? [])]);
});
emitter_1.subscriptions.on('removed', (event, peerId, metaData) => {
logger.debug('On server_1 (%s) peer %s unsubscribed from event %s, metaData: %o, peers on event: %o', server_1.localPeerId, peerId, event, metaData, [...(emitter_1.subscriptions.getEventPeersMap(event) ?? [])]);
});
emitter_2.subscriptions.on('removed', (event, peerId, metaData) => {
logger.debug('On server_2 (%s) peer %s unsubscribed from event %s, metaData: %o, peers on event: %o', server_2.localPeerId, peerId, event, metaData, [...(emitter_2.subscriptions.getEventPeersMap(event) ?? [])]);
});
emitter_3.subscriptions.on('removed', (event, peerId, metaData) => {
logger.debug('On server_3 (%s) peer %s unsubscribed from event %s, metaData: %o, peers on event: %o', server_3.localPeerId, peerId, event, metaData, [...(emitter_3.subscriptions.getEventPeersMap(event) ?? [])]);
});


await emitter_1.subscribe('simple-request', (requestId, { 'param-1': param1 }) => {
logger.debug('Simple request received by server_1: %s, %s, %s', requestId, param1);

emitter_1.notify('response', requestId, 10);
});

await emitter_2.subscribe('complex-request', (requestId, { 'param-1': param1, 'param-2': param2 }) => {
logger.debug('Complex request received by server_2: %s, %s, %s', requestId, param1, param2);

if (10 < param1) {
emitter_2.notify('response', requestId, 20);
}
});

await emitter_1.subscribe('complex-request', (requestId, { 'param-1': param1, 'param-2': param2 }) => {
logger.debug('Complex request received by server_1: %s, %s, %s', requestId, param1, param2);

if (param1 < 10) {
emitter_1.notify('response', requestId, 10);
}
});

await emitter_1.subscribe('response', createResponseHandler(server_1));
await emitter_2.subscribe('response', createResponseHandler(server_2));
await emitter_3.subscribe('response', createResponseHandler(server_3));

await emitter_1.ready;
await emitter_2.ready;
await emitter_3.ready;

const request_1 = createRequest(server_1.appData.requests, emitter_1, 'simple-request', { 'param-1': 5 })
const request_2 = createRequest(server_2.appData.requests, emitter_2, 'complex-request', { 'param-1': 9, 'param-2': 'test' })
const request_3 = createRequest(server_2.appData.requests, emitter_2, 'complex-request', { 'param-1': 20, 'param-2': 'test' })

logger.info('Response for server : %d', await request_1);
logger.info('Response for server : %d', await request_2);
logger.info('Response for server : %d', await request_3);

messageHub.remove(server_1);

logger.info('We wait 10s')
await new Promise(resolve => setTimeout(resolve, 10000));

logger.info('We wait 10s')
await new Promise(resolve => setTimeout(resolve, 10000));

logger.info('we also add some listeners to server_2 to trigger the expiration of logs');

server_1.close();
server_2.close();
}

if (require.main === module) {
logger.info('Running from module file');
setHamokLogLevel('info');
run();
}
6 changes: 4 additions & 2 deletions examples/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1713,8 +1713,10 @@ graphemer@^1.4.0:
resolved "https://registry.npmjs.org/graphemer/-/graphemer-1.4.0.tgz"
integrity sha512-EtKwoO6kxCL9WO5xipiHTZlSzBm7WLT627TqC/uVRd0HKmq8NXyebnNYxDoBi7wt8eTWrUrKXCOVaFq9x1kgag==

"hamok@file:..":
version "2.2.0"
[email protected]:
version "2.6.1-76ef244.0"
resolved "https://registry.yarnpkg.com/hamok/-/hamok-2.6.1-76ef244.0.tgz#f19644f20597ff689f6197d2ed2800e16bf3ed39"
integrity sha512-MxLcrnBQLVDr5P03dbo/vCyTozDOJSDQ4yL6W2YVJuDgFrwRpksiydHQGcxzJoDSlMjXVCwqI41x6bqVnZ4lSg==
dependencies:
"@bufbuild/protobuf" "^1.10.0"
pino "^9.3.2"
Expand Down
2 changes: 1 addition & 1 deletion src/Hamok.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ export class Hamok<AppData extends Record<string, unknown> = Record<string, unkn
return logger.trace('%s Received endpoint state notification from itself %o', this.localPeerId, endpointStateNotification);
}

logger.warn('%s Received endpoint state notification %o, activeEndpointIds: %s', this.localPeerId, endpointStateNotification, [ ...(endpointStateNotification.activeEndpointIds ?? []) ].join(', '));
logger.debug('%s Received endpoint state notification %o, activeEndpointIds: %s', this.localPeerId, endpointStateNotification, [ ...(endpointStateNotification.activeEndpointIds ?? []) ].join(', '));

for (const peerId of this.remotePeerIds) {
logger.trace('%s Remote peer %s is in the active endpoints', this.localPeerId, peerId);
Expand Down
1 change: 1 addition & 0 deletions src/collections/HamokEmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ export class HamokEmitter<T extends HamokEmitterEventMap, M extends Record<strin
const removedPeerIds = this.subscriptions.getAllPeerIds();

for (const remotePeerId of this.connection.grid.remotePeerIds) {
if (remotePeerId === this.connection.grid.localPeerId) continue;
if (removedPeerIds.has(remotePeerId)) removedPeerIds.delete(remotePeerId);
}
if (0 < removedPeerIds.size) {
Expand Down
1 change: 1 addition & 0 deletions src/messages/StorageCodec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,7 @@ export class StorageCodec<K, V> implements HamokCodec<Input<K, V>, Message> {
requestId: request.requestId,
keys,
values,
// prevValue: request.prevValue !== undefined ? this.valueCodec.encode(request.prevValue) : undefined,
prevValue: request.prevValue !== undefined ? this.valueCodec.encode(request.prevValue) : undefined,
});
}
Expand Down

0 comments on commit 33bd77e

Please sign in to comment.