From 58c028af15e5d1532d722f313da6454a4ee49317 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Wed, 24 Apr 2024 08:57:33 -0400 Subject: [PATCH] create a map of message mandlers instead of listening to the socket event `message` many times --- .../prototyping/clients/json-rpc-socket.ts | 34 +++++++++++++------ 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/packages/agent/src/prototyping/clients/json-rpc-socket.ts b/packages/agent/src/prototyping/clients/json-rpc-socket.ts index 8b93998d2..9486543d7 100644 --- a/packages/agent/src/prototyping/clients/json-rpc-socket.ts +++ b/packages/agent/src/prototyping/clients/json-rpc-socket.ts @@ -24,6 +24,8 @@ export interface JsonRpcSocketOptions { * This was done in order to avoid taking a dependency on the `dwn-server`, until a future time when there will be a `clients` package. */ export class JsonRpcSocket { + private messageHandlers: Map void> = new Map(); + private constructor(private socket: IsomorphicWebSocket, private responseTimeout: number) {} static async connect(url: string, options: JsonRpcSocketOptions = {}): Promise { @@ -49,10 +51,20 @@ export class JsonRpcSocket { return new Promise((resolve, reject) => { socket.addEventListener('open', () => { - resolve(new JsonRpcSocket(socket, responseTimeout)); + const jsonRpcSocket = new JsonRpcSocket(socket, responseTimeout); + + socket.addEventListener('message', (event: { data: any }) => { + const jsonRpcResponse = parseJson(event.data) as JsonRpcResponse; + const handler = jsonRpcSocket.messageHandlers.get(jsonRpcResponse.id); + if (handler) { + handler(event); + } + }); + + resolve(jsonRpcSocket); }); - socket.addEventListener('error', (error) => { + socket.addEventListener('error', (error: any) => { reject(error); }); @@ -75,17 +87,18 @@ export class JsonRpcSocket { const jsonRpsResponse = parseJson(event.data) as JsonRpcResponse; if (jsonRpsResponse.id === request.id) { // if the incoming response id matches the request id, we will remove the listener and resolve the response - this.socket.removeEventListener('message', handleResponse); + this.messageHandlers.delete(request.id); return resolve(jsonRpsResponse); } }; - // subscribe to the listener before sending the request - this.socket.addEventListener('message', handleResponse); + + // add the listener to the map of message handlers + this.messageHandlers.set(request.id, handleResponse); this.send(request); // reject this promise if we don't receive any response back within the timeout period setTimeout(() => { - this.socket.removeEventListener('message', handleResponse); + this.messageHandlers.delete(request.id!); reject(new Error('request timed out')); }, this.responseTimeout); }); @@ -114,23 +127,24 @@ export class JsonRpcSocket { if (jsonRpcResponse.id === subscriptionId) { if (jsonRpcResponse.error !== undefined) { // remove the event listener upon receipt of a JSON RPC Error. - this.socket.removeEventListener('message', socketEventListener); + this.messageHandlers.delete(subscriptionId); this.closeSubscription(subscriptionId); } listener(jsonRpcResponse); } }; - this.socket.addEventListener('message', socketEventListener); + + this.messageHandlers.set(subscriptionId, socketEventListener); const response = await this.request(request); if (response.error) { - this.socket.removeEventListener('message', socketEventListener); + this.messageHandlers.delete(subscriptionId); return { response }; } // clean up listener and create a `rpc.subscribe.close` message to use when closing this JSON RPC subscription const close = async (): Promise => { - this.socket.removeEventListener('message', socketEventListener); + this.messageHandlers.delete(subscriptionId); await this.closeSubscription(subscriptionId); };