Skip to content

Commit

Permalink
event dispatch pattern, connection status (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackyzha0 authored Dec 12, 2023
1 parent 37c58d0 commit 12760d6
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 39 deletions.
4 changes: 2 additions & 2 deletions __tests__/fixtures/cleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ export async function ensureTransportIsClean(t: Transport<Connection>) {
`transport ${t.clientId} should not have open connections after the test`,
).toStrictEqual(new Map());
expect(
t.messageHandlers,
t.eventDispatcher.numberOfListeners('message'),
`transport ${t.clientId} should not have open message handlers after the test`,
).toStrictEqual(new Set());
).equal(0);
}

export async function waitUntil<T>(
Expand Down
42 changes: 30 additions & 12 deletions __tests__/invariants.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,22 @@ describe('procedures should leave no trace after finishing', async () => {
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

let serverListeners = serverTransport.messageHandlers.size;
let clientListeners = clientTransport.messageHandlers.size;
let serverListeners =
serverTransport.eventDispatcher.numberOfListeners('message');
let clientListeners =
clientTransport.eventDispatcher.numberOfListeners('message');

// start procedure
await client.test.add.rpc({ n: 3 });
// end procedure

// number of message handlers shouldn't increase after rpc
expect(serverTransport.messageHandlers.size).toEqual(serverListeners);
expect(clientTransport.messageHandlers.size).toEqual(clientListeners);
expect(
serverTransport.eventDispatcher.numberOfListeners('message'),
).toEqual(serverListeners);
expect(
clientTransport.eventDispatcher.numberOfListeners('message'),
).toEqual(clientListeners);

// check number of connections
expect(serverTransport.connections.size).toEqual(1);
Expand All @@ -113,8 +119,10 @@ describe('procedures should leave no trace after finishing', async () => {
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

let serverListeners = serverTransport.messageHandlers.size;
let clientListeners = clientTransport.messageHandlers.size;
let serverListeners =
serverTransport.eventDispatcher.numberOfListeners('message');
let clientListeners =
clientTransport.eventDispatcher.numberOfListeners('message');

// start procedure
const [input, output, close] = await client.test.echo.stream();
Expand All @@ -138,8 +146,12 @@ describe('procedures should leave no trace after finishing', async () => {
// end procedure

// number of message handlers shouldn't increase after stream ends
expect(serverTransport.messageHandlers.size).toEqual(serverListeners);
expect(clientTransport.messageHandlers.size).toEqual(clientListeners);
expect(
serverTransport.eventDispatcher.numberOfListeners('message'),
).toEqual(serverListeners);
expect(
clientTransport.eventDispatcher.numberOfListeners('message'),
).toEqual(clientListeners);

// check number of connections
expect(serverTransport.connections.size).toEqual(1);
Expand All @@ -157,8 +169,10 @@ describe('procedures should leave no trace after finishing', async () => {
const server = await createServer(serverTransport, serviceDefs);
const client = createClient<typeof server>(clientTransport);

let serverListeners = serverTransport.messageHandlers.size;
let clientListeners = clientTransport.messageHandlers.size;
let serverListeners =
serverTransport.eventDispatcher.numberOfListeners('message');
let clientListeners =
clientTransport.eventDispatcher.numberOfListeners('message');

// start procedure
const [subscription, close] = await client.test.value.subscribe({});
Expand All @@ -177,8 +191,12 @@ describe('procedures should leave no trace after finishing', async () => {
// end procedure

// number of message handlers shouldn't increase after stream ends
expect(serverTransport.messageHandlers.size).toEqual(serverListeners);
expect(clientTransport.messageHandlers.size).toEqual(clientListeners);
expect(
serverTransport.eventDispatcher.numberOfListeners('message'),
).toEqual(serverListeners);
expect(
clientTransport.eventDispatcher.numberOfListeners('message'),
).toEqual(clientListeners);

// check number of connections
expect(serverTransport.connections.size).toEqual(1);
Expand Down
8 changes: 4 additions & 4 deletions router/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(
}
};

transport.addMessageListener(listener);
transport.addEventListener('message', listener);
const closeHandler = () => {
inputStream.end();
outputStream.end();
Expand All @@ -200,7 +200,7 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(
streamId,
),
);
transport.removeMessageListener(listener);
transport.removeEventListener('message', listener);
};

return [inputStream, outputStream, closeHandler];
Expand Down Expand Up @@ -243,7 +243,7 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(
}
};

transport.addMessageListener(listener);
transport.addEventListener('message', listener);
const closeHandler = () => {
outputStream.end();
transport.send(
Expand All @@ -255,7 +255,7 @@ export const createClient = <Srv extends Server<Record<string, AnyService>>>(
streamId,
),
);
transport.removeMessageListener(listener);
transport.removeEventListener('message', listener);
};

return [outputStream, closeHandler];
Expand Down
4 changes: 2 additions & 2 deletions router/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,12 @@ export async function createServer<Services extends Record<string, AnyService>>(
}
};

transport.addMessageListener(handler);
transport.addEventListener('message', handler);
return {
services,
streams: streamMap,
async close() {
transport.removeMessageListener(handler);
transport.removeEventListener('message', handler);
for (const streamIdx of streamMap.keys()) {
await cleanupStream(streamIdx);
}
Expand Down
45 changes: 45 additions & 0 deletions transport/events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { OpaqueTransportMessage } from './message';
import { Connection } from './transport';

export interface EventMap {
message: OpaqueTransportMessage;
connectionStatus: {
status: 'connect' | 'disconnect';
conn: Connection;
};
}

export type EventTypes = keyof EventMap;
export type EventHandler<K extends EventTypes> = (event: EventMap[K]) => void;

export class EventDispatcher<T extends EventTypes> {
private eventListeners: { [K in T]?: Set<EventHandler<K>> } = {};

numberOfListeners<K extends T>(eventType: K) {
return this.eventListeners[eventType]?.size ?? 0;
}

addEventListener<K extends T>(eventType: K, handler: EventHandler<K>) {
if (!this.eventListeners[eventType]) {
this.eventListeners[eventType] = new Set();
}

this.eventListeners[eventType]?.add(handler);
}

removeEventListener<K extends T>(eventType: K, handler: EventHandler<K>) {
const handlers = this.eventListeners[eventType];
if (handlers) {
this.eventListeners[eventType]?.delete(handler);
}
}

dispatchEvent<K extends T>(eventType: K, event: EventMap[K]) {
const handlers = this.eventListeners[eventType];
if (handlers) {
for (const handler of handlers) {
handler(event);
}
}
}
}
4 changes: 2 additions & 2 deletions transport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ export async function waitForMessage(
function onMessage(msg: OpaqueTransportMessage) {
if (!filter || filter?.(msg)) {
resolve(msg.payload);
t.removeMessageListener(onMessage);
t.removeEventListener('message', onMessage);
} else if (rejectMismatch) {
reject(new Error('message didnt match the filter'));
}
}

t.addMessageListener(onMessage);
t.addEventListener('message', onMessage);
});
}
46 changes: 31 additions & 15 deletions transport/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
reply,
} from './message';
import { log } from '../logging';
import { EventDispatcher, EventHandler, EventTypes } from './events';

/**
* A 1:1 connection between two transports. Once this is created,
Expand Down Expand Up @@ -88,11 +89,6 @@ export abstract class Transport<ConnType extends Connection> {
*/
clientId: TransportClientId;

/**
* The set of message handlers registered with this transport.
*/
messageHandlers: Set<(msg: OpaqueTransportMessage) => void>;

/**
* An array of message IDs that are waiting to be sent over the WebSocket connection.
* This builds up if the WebSocket is down for a period of time.
Expand All @@ -109,13 +105,18 @@ export abstract class Transport<ConnType extends Connection> {
*/
connections: Map<TransportClientId, ConnType>;

/**
* The event dispatcher for handling events of type EventTypes.
*/
eventDispatcher: EventDispatcher<EventTypes>;

/**
* Creates a new Transport instance.
* @param codec The codec used to encode and decode messages.
* @param clientId The client ID of this transport.
*/
constructor(codec: Codec, clientId: TransportClientId) {
this.messageHandlers = new Set();
this.eventDispatcher = new EventDispatcher();
this.sendBuffer = new Map();
this.sendQueue = new Map();
this.connections = new Map();
Expand Down Expand Up @@ -146,6 +147,11 @@ export abstract class Transport<ConnType extends Connection> {
log?.info(`${this.clientId} -- new connection to ${conn.connectedTo}`);
this.connections.set(conn.connectedTo, conn);

this.eventDispatcher.dispatchEvent('connectionStatus', {
status: 'connect',
conn,
});

// send outstanding
const outstanding = this.sendQueue.get(conn.connectedTo);
if (!outstanding) {
Expand Down Expand Up @@ -175,6 +181,10 @@ export abstract class Transport<ConnType extends Connection> {
log?.info(`${this.clientId} -- disconnect from ${conn.connectedTo}`);
conn.close();
this.connections.delete(conn.connectedTo);
this.eventDispatcher.dispatchEvent('connectionStatus', {
status: 'disconnect',
conn,
});
}

/**
Expand Down Expand Up @@ -232,9 +242,7 @@ export abstract class Transport<ConnType extends Connection> {
return;
}

for (const handler of this.messageHandlers) {
handler(msg);
}
this.eventDispatcher.dispatchEvent('message', msg);

if (!isAck(msg.controlFlags)) {
const ackMsg = reply(msg, { ack: msg.id });
Expand All @@ -247,19 +255,27 @@ export abstract class Transport<ConnType extends Connection> {
}

/**
* Adds a message listener to this transport.
* Adds a listener to this transport.
* @param the type of event to listen for
* @param handler The message handler to add.
*/
addMessageListener(handler: (msg: OpaqueTransportMessage) => void): void {
this.messageHandlers.add(handler);
addEventListener<K extends EventTypes, T extends EventHandler<K>>(
type: K,
handler: T,
): void {
this.eventDispatcher.addEventListener(type, handler);
}

/**
* Removes a message listener from this transport.
* Removes a listener from this transport.
* @param the type of event to unlisten on
* @param handler The message handler to remove.
*/
removeMessageListener(handler: (msg: OpaqueTransportMessage) => void): void {
this.messageHandlers.delete(handler);
removeEventListener<K extends EventTypes, T extends EventHandler<K>>(
type: K,
handler: T,
): void {
this.eventDispatcher.removeEventListener(type, handler);
}

/**
Expand Down
4 changes: 2 additions & 2 deletions tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
"noImplicitAny": true /* Raise error on expressions and declarations with an implied 'any' type. */,
"strictNullChecks": true /* Enable strict null checks. */,
"strictFunctionTypes": true /* Enable strict checking of function types. */,
// "strictBindCallApply": true, /* Enable strict 'bind', 'call', and 'apply' methods on functions. */
// "strictPropertyInitialization": true, /* Enable strict checking of property initialization in classes. */
"strictBindCallApply": true /* Enable strict 'bind', 'call', and 'apply' methods on functions. */,
"strictPropertyInitialization": true /* Enable strict checking of property initialization in classes. */,
"noImplicitThis": true /* Raise error on 'this' expressions with an implied 'any' type. */,
"alwaysStrict": true /* Parse in strict mode and emit "use strict" for each source file. */,
/* Additional Checks */
Expand Down

0 comments on commit 12760d6

Please sign in to comment.