Skip to content

Commit

Permalink
2.6.0 (#16)
Browse files Browse the repository at this point in the history
### 2.6.0

 - Update `HamokEmitter` to have metaData property bind to subscriber peers per events.
  * Add Subscriptions to `HamokEmitter` to track the subscribers of an event.
  * Add `HamokEmitterStats` to track the number of events emitted and received by the emitter.
 - Simplifying discovery and joining methods in `Hamok`.
  • Loading branch information
balazskreith authored Dec 3, 2024
1 parent 79e5fc5 commit 1d3aa2f
Show file tree
Hide file tree
Showing 30 changed files with 1,337 additions and 487 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,11 @@ A ready version to use (hopefully)

- Fixed a bug where the leader would not send a `StorageState` notification to a follower when the follower was behind the leader.
- Fixed bug of remaining remote peer not part of the grid due to follower ENDPOINT_STATE_NOTIFICATION contained a wrong endpoint.
- Changing follower behavior when falling out of the grid. Instead of trying to collect endpoints, it periodically sends JOIN_NOTIFICATION until a leader is not elected for the endpoint
- Changing follower behavior when falling out of the grid. Instead of trying to collect endpoints, it periodically sends JOIN_NOTIFICATION until a leader is not elected for the endpoint

### 2.6.0

- Update `HamokEmitter` to have metaData property bind to subscriber peers per events.
* Add Subscriptions to `HamokEmitter` to track the subscribers of an event.
* Add `HamokEmitterStats` to track the number of events emitted and received by the emitter.
- Simplifying discovery and joining methods in `Hamok`.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ yarn add hamok
- [HamokEmitter](#hamokemitter)
- [HamokRecord](#hamokrecord)
- [User Manual](#user-manual)
- [NPM library](#npm-library)
- [Important Notes](#important-notes)
- [Contributing](#contributing)
- [License](#license)
Expand Down Expand Up @@ -129,7 +130,11 @@ HamokRecord is a feature that provides distributed storage for individual record

## User Manual

You can find detailed user manuals [here](https://balazskreith.github.io/hamok-ts/)
You can find detailed user manuals [here](https://balazskreith.github.io/hamok-ts/).

## NPM library

The Hamok library is available on [NPM](https://www.npmjs.com/package/hamok).

## Contributing

Expand Down
70 changes: 70 additions & 0 deletions docs/emitter.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- [Properties](#properties)
- [Events](#events)
- [Methods](#methods)
- [Subscriptions](#subscriptions)
- [Examples](#examples)
- [FAQ](#faq)

Expand Down Expand Up @@ -131,6 +132,75 @@ emitter.unsubscribe("event", (data) => {
emitter.close();
```

### Subscriptions

Peers subscribe to events using the `subscribe` method. When an event is published, all subscribed peers receive the event.
You can observe the subscriptions by accessing the `subscriptions` property of the emitter.

```typescript
// Handle a peer subscribing to an event
for (const [event, peers] of emitter.subscriptions.entries()) {
console.log(`Event: ${event}`, [ ...peers.entries() ]);
}
```

Additionally you can listen to the `add-peer` and `remove-peer` events to observe the subscriptions.

```typescript
// Handle a peer subscribing to an event
emitter.subscriptions.on('add-peer', (event, peerId) => {
console.log(`Peer ${peerId} subscribed to event ${event}.`);
});

// Handle a peer unsubscribing from an event
emitter.subscriptions.on('remove-peer', (event, peerId) => {
console.log(`Peer ${peerId} unsubscribed from event ${event}.`);
});
```

#### Add Metadata to subscription

Peers can subscribe to an event with metadata.

```typescript
type SubscriptionMetaData = {
userId: string;
timestamp: Date; // Time when the peer subscribed
}

// Create the emitter with metadata for subscriptions
const emitter = hamok.createEmitter<MyEventMap, SubscriptionMetaData>({
emitterId: "exampleEmitter",
});

// Handle a peer subscribing to an event
emitter.subscriptions.on('add-peer', (event, peerId, metaData) => {
console.log(`Peer ${peerId} subscribed to event ${event}.`);
console.log(`User ID: ${metaData?.userId}`);
console.log(`Subscription Timestamp: ${metaData.timestamp}`);
});

// Handle a peer unsubscribing from an event
emitter.subscriptions.on('remove-peer', (event, peerId, metaData) => {
console.log(`Peer ${peerId} unsubscribed from event ${event}.`);
console.log(`User ID: ${metaData?.userId}`);
console.log(`Unsubscription Timestamp: ${new Date().toISOString()}`);
});

```

#### Update Metadata for subscription

Peers can update the metadata for an existing subscription.

```typescript
// Update the metadata for a subscription
emitter.subscriptions.updateSubscriptionMetaData('myEvent', 'peerId', {
userId
}, prevMetaData);
```


## Examples

- [simple distributed emitter](https://github.com/balazskreith/hamok-ts/blob/main/examples/src/emitter-example.ts)
Expand Down
3 changes: 2 additions & 1 deletion examples/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ purgatory/
node_modules
lib/
package-lock.json
# yarn.lock
# yarn.lock
_mything.ts
6 changes: 5 additions & 1 deletion examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"main": "main.js",
"scripts": {
"format": "prettier --write \"src/**/*.ts\"",
"dev:my": "nodemon -x ts-node src/_mything.ts | pino-pretty",
"dev:run-all": "nodemon -x ts-node src/run-all.ts | pino-pretty",
"dev:readme": "nodemon -x ts-node src/common-readme-example.ts | pino-pretty",
"dev:map:1": "nodemon -x ts-node src/map-insert-get-example.ts | pino-pretty",
Expand All @@ -14,8 +15,10 @@
"dev:record:1": "nodemon -x ts-node src/record-insert-get-example.ts | pino-pretty",
"dev:record:2": "nodemon -x ts-node src/record-events-example.ts | pino-pretty",
"dev:record:3": "nodemon -x ts-node src/record-dynamic-creating-example.ts | pino-pretty",
"dev:record:4": "nodemon -x ts-node src/record-update-if.ts | pino-pretty",
"dev:emitter:1": "nodemon -x ts-node src/emitter-example.ts | pino-pretty",
"dev:emitter:2": "nodemon -x ts-node src/emitter-catchup-example.ts | pino-pretty",
"dev:emitter:3": "nodemon -x ts-node src/emitter-catchup-2-example.ts | pino-pretty",
"dev:election:1": "nodemon -x ts-node src/reelection-example.ts | pino-pretty",
"dev:queue:1": "nodemon -x ts-node src/queue-events-example.ts | pino-pretty",
"dev:queue:2": "nodemon -x ts-node src/queue-push-pop-example.ts | pino-pretty",
Expand All @@ -25,6 +28,7 @@
"dev:common:3": "nodemon -x ts-node src/common-join-leave-rejoin-example.ts | pino-pretty",
"dev:common:4": "nodemon -x ts-node src/common-waiting-example.ts | pino-pretty",
"dev:common:5": "nodemon -x ts-node src/common-join-leave-rejoin-example-2.ts | pino-pretty",
"dev:common:6": "nodemon -x ts-node src/common-rollout-example.ts | pino-pretty",
"dev:redis:1": "nodemon -x ts-node src/redis-remote-map-example.ts | pino-pretty",
"dev:redis:2": "nodemon -x ts-node src/redis-dynamic-record-example.ts | pino-pretty",
"dev:redis:3": "nodemon -x ts-node src/redis-job-executing-example.ts | pino-pretty",
Expand Down Expand Up @@ -57,7 +61,7 @@
"dependencies": {
"pino": "^9.3.2",
"ioredis": "^5.4.1",
"hamok": "file:../"
"hamok": "2.6.1-7ab70c1.0"
},
"devDependencies": {
"@types/events": "^3.0.0",
Expand Down
78 changes: 78 additions & 0 deletions examples/src/common-3-peers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@


import { Hamok, setHamokLogLevel } from 'hamok';
import * as pino from 'pino';
import { HamokMessageHub } from './utils/HamokMessageHub';

const logger = pino.pino({
name: 'common-join-example-2',
level: 'debug',
});

export async function run() {

const servers = new Map<string, Hamok>();
const messageHub = new HamokMessageHub();
const addServer = (server: Hamok) => {
const leaderChangedListener = () => {
logger.debug('Server %s, State: %s remotePeers: %s', server.localPeerId, server.raft.state.stateName, [...server.remotePeerIds].join(', '));
}
server.once('close', () => {
servers.delete(server.localPeerId);
messageHub.remove(server);
server.off('leader-changed', leaderChangedListener);
})
servers.set(server.localPeerId, server);
messageHub.add(server);
server.on('leader-changed', leaderChangedListener);
}
addServer(new Hamok());
addServer(new Hamok());
addServer(new Hamok());

await Promise.all([...servers.values()].map(server => server.join()));



for (let i = 0; i < 10; ++i) {
const newServer = new Hamok();
const oldServer = servers.values().next().value;
addServer(newServer);
// by having the communication channel we assume we can inquery remote endpoints


const timer = setInterval(() => {
const messages: string[] = [];
for (const server of servers.values()) {
messages.push(`server (${server.localPeerId}, state: ${server.state}) remotePeers are ${[...server.remotePeerIds].join(', ')}`);
}
logger.debug('iteration: %d\n, %s', i, messages.join('\n'));
}, 1000)


await newServer.join();

const messages: string[] = [];
for (const server of servers.values()) {
messages.push(`server (${server.localPeerId}, state: ${server.state}) remotePeers are ${[...server.remotePeerIds].join(', ')}`);
}
logger.debug('iteration: %d\n, %s', i, messages.join('\n'));

oldServer.close();

clearInterval(timer);
}


logger.info('Close');

for (const server of servers.values()) {
server.close();
}
}

if (require.main === module) {
logger.info('Running from module file');
setHamokLogLevel('debug');
run();
}
75 changes: 39 additions & 36 deletions examples/src/common-join-leave-rejoin-example-2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import { Hamok, setHamokLogLevel } from 'hamok';
import * as pino from 'pino';
import { HamokMessageHub } from './utils/HamokMessageHub';

const logger = pino.pino({
name: 'common-join-example-2',
Expand All @@ -39,61 +40,63 @@ const logger = pino.pino({

export async function run() {

const server_1 = new Hamok({
onlyFollower: true,
});
logger.info('server 1 is %s', server_1.localPeerId);
const server1Acceptor = server_1.accept.bind(server_1);
const servers = new Map<string, Hamok>();
const messageHub = new HamokMessageHub();
const addServer = (server: Hamok) => {
const leaderChangedListener = () => {
logger.debug('Server %s, State: %s remotePeers: %s', server.localPeerId, server.raft.state.stateName, [...server.remotePeerIds].join(', '));
}
server.once('close', () => {
servers.delete(server.localPeerId);
messageHub.remove(server);
server.off('leader-changed', leaderChangedListener);
})
servers.set(server.localPeerId, server);
messageHub.add(server);
server.on('leader-changed', leaderChangedListener);
}
addServer(new Hamok());
addServer(new Hamok());

let server_1_joined = false;
await Promise.all([...servers.values()].map(server => server.join()));

for (let i = 0; i < 10; ++i) {
const server_2 = new Hamok();
const newServer = new Hamok();
const oldServer = servers.values().next().value;
addServer(newServer);
// by having the communication channel we assume we can inquery remote endpoints
logger.info('server 2 is %s', server_2.localPeerId);

const server2Acceptor = server_2.accept.bind(server_2);

server_1.on('message', server2Acceptor);
server_2.on('message', server1Acceptor);

const timer = setInterval(() => {
logger.debug('\
\niteration: %d, \
\nserver_1 (%s, state: %s) remotePeers are %s, \
\nserver_2 (%s, state: %s) remotePeers are %s',
i,
server_1.localPeerId,
server_1.state,
[...server_1.remotePeerIds].join(', '),
server_2.localPeerId,
server_2.state,
[...server_2.remotePeerIds].join(', '),
);
const messages: string[] = [];
for (const server of servers.values()) {
messages.push(`server (${server.localPeerId}, state: ${server.state}) remotePeers are ${[...server.remotePeerIds].join(', ')}`);
}
logger.debug('iteration: %d\n, %s', i, messages.join('\n'));
}, 1000)


await Promise.all([
server_1_joined ? Promise.resolve() : server_1.join(),
server_2.join(),
]);

logger.info('Server 1 and Server 2 joined');
await newServer.join();

server_2.close();
const messages: string[] = [];
for (const server of servers.values()) {
messages.push(`server (${server.localPeerId}, state: ${server.state}) remotePeers are ${[...server.remotePeerIds].join(', ')}`);
}
logger.debug('iteration: %d\n, %s', i, messages.join('\n'));

await new Promise(resolve => setTimeout(resolve, 10000));

server_1.off('message', server2Acceptor);
server_2.off('message', server1Acceptor);
oldServer.close();

server_1_joined = true;
server_1.raft.config.onlyFollower = false;
clearInterval(timer);
}


logger.info('Close');

server_1.close();
for (const server of servers.values()) {
server.close();
}
}

if (require.main === module) {
Expand Down
4 changes: 4 additions & 0 deletions examples/src/common-join-leave-rejoin-example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ export async function run() {

logger.info('We remove server1Acceptor from server_2 and server_3 and see if rejoin event is triggered');

logger.info('Waiting for rejoining event');

await Promise.all([
new Promise<void>(resolve => server_1.once('rejoining', () => (logger.info('Server_1 rejoin'), resolve()))),
Promise.resolve(server_2.off('message', server1Acceptor)),
Expand All @@ -115,6 +117,8 @@ export async function run() {

logger.info('We add server1Acceptor to server_2 and server_3 and see if joined event is triggered');

logger.info('Waiting for joined event');

await Promise.all([
new Promise<void>(resolve => server_1.once('joined', () => (logger.info('Server_1 joined'), resolve()))),
Promise.resolve(server_2.on('message', server1Acceptor)),
Expand Down
Loading

0 comments on commit 1d3aa2f

Please sign in to comment.