diff --git a/src/connection/socket-connection.ts b/src/connection/socket-connection.ts index 6613ff1..903c70a 100644 --- a/src/connection/socket-connection.ts +++ b/src/connection/socket-connection.ts @@ -16,26 +16,28 @@ import { DwnServerError, DwnServerErrorCode } from "../dwn-error.js"; const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive'); const HEARTBEAT_INTERVAL = 30_000; -export interface Subscription { +export interface JsonRPCSubscription { + /** JSON RPC Id of the Subscription Request */ id: JsonRpcId; close: () => Promise; } /** - * SocketConnection class sets up a socket connection along with a `ping/pong` heartbeat. + * SocketConnection handles a WebSocket connection to a DWN using JSON RPC. + * It also manages references to the long running RPC subscriptions for the connection. */ export class SocketConnection { private heartbeatInterval: NodeJS.Timer; - private subscriptions: Map = new Map(); + private subscriptions: Map = new Map(); constructor( private socket: WebSocket, private dwn: Dwn ){ + socket.on('message', this.message.bind(this)); socket.on('close', this.close.bind(this)); - socket.on('pong', this.pong.bind(this)); socket.on('error', this.error.bind(this)); - socket.on('message', this.message.bind(this)); + socket.on('pong', this.pong.bind(this)); // Sometimes connections between client <-> server can get borked in such a way that // leaves both unaware of the borkage. ping messages can be used as a means to verify @@ -52,7 +54,11 @@ export class SocketConnection { }, HEARTBEAT_INTERVAL); } - async subscribe(subscription: Subscription): Promise { + /** + * Adds a reference for the JSON RPC Subscription to this connection. + * Used for cleanup if the connection is closed. + */ + async subscribe(subscription: JsonRPCSubscription): Promise { if (this.subscriptions.has(subscription.id)) { throw new DwnServerError( DwnServerErrorCode.ConnectionSubscriptionJsonRPCIdExists, @@ -63,6 +69,11 @@ export class SocketConnection { this.subscriptions.set(subscription.id, subscription); } + /** + * Closes and removes the reference for a given subscription from this connection. + * + * @param id the `JsonRpcId` of the JSON RPC subscription request. + */ async closeSubscription(id: JsonRpcId): Promise { if (!this.subscriptions.has(id)) { throw new DwnServerError( @@ -71,6 +82,8 @@ export class SocketConnection { ) } + const connection = this.subscriptions.get(id); + await connection.close(); this.subscriptions.delete(id); } @@ -83,8 +96,9 @@ export class SocketConnection { this.socket.removeAllListeners(); const closePromises = []; - for (const [_target, subscription] of this.subscriptions) { + for (const [id, subscription] of this.subscriptions) { closePromises.push(subscription.close()); + this.subscriptions.delete(id); } // close all of the associated subscriptions @@ -158,7 +172,7 @@ export class SocketConnection { /** * Subscription Handler used to build the context for a `JSON RPC` API call. - * Wraps the incoming `message` in a `JSON RPC Success Response` using the origin subscription`JSON RPC Id` to send through the WebSocket. + * Wraps the incoming `message` in a `JSON RPC Success Response` using the original subscription`JSON RPC Id` to send through the WebSocket. */ private subscriptionHandler(id: JsonRpcId): (message: GenericMessage) => void { return (message) => { diff --git a/src/dwn-server.ts b/src/dwn-server.ts index b2502e9..90de633 100644 --- a/src/dwn-server.ts +++ b/src/dwn-server.ts @@ -65,7 +65,7 @@ export class DwnServer { let eventStream: EventStream | undefined; if (this.config.webSocketServerEnabled) { // setting `EventEmitterStream` as default the default `EventStream - // if an alternate implementation is needed instantiate a `Dwn` with a custom `EventStream` and add it to server options. + // if an alternate implementation is needed, instantiate a `Dwn` with a custom `EventStream` and add it to server options. eventStream = new EventEmitterStream(); } diff --git a/src/json-rpc-api.ts b/src/json-rpc-api.ts index 4bcedff..76e971e 100644 --- a/src/json-rpc-api.ts +++ b/src/json-rpc-api.ts @@ -6,4 +6,4 @@ import { handleSubscriptionsClose } from './json-rpc-handlers/subscriptions/inde export const jsonRpcApi = new JsonRpcRouter(); jsonRpcApi.on('dwn.processMessage', handleDwnProcessMessage); -jsonRpcApi.on('subscriptions.close', handleSubscriptionsClose); +jsonRpcApi.on('subscription.close', handleSubscriptionsClose); diff --git a/src/json-rpc-handlers/dwn/process-message.ts b/src/json-rpc-handlers/dwn/process-message.ts index 7ea296e..8380800 100644 --- a/src/json-rpc-handlers/dwn/process-message.ts +++ b/src/json-rpc-handlers/dwn/process-message.ts @@ -4,9 +4,6 @@ import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js'; import type { Readable as IsomorphicReadable } from 'readable-stream'; import { v4 as uuidv4 } from 'uuid'; -import type { - JsonRpcErrorResponse, -} from '../../lib/json-rpc.js'; import type { HandlerResponse, JsonRpcHandler, @@ -28,8 +25,8 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( const requestId = dwnRequest.id ?? uuidv4(); try { - - // RecordsWrite is only supported on 'http' + // RecordsWrite is only supported on 'http' to support data stream for large data + // TODO: https://github.com/TBD54566975/dwn-server/issues/108 if ( transport !== 'http' && message.descriptor.interface === DwnInterfaceName.Records && @@ -71,30 +68,26 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( if (subscription !== undefined) { const { close } = subscription; try { - await socketConnection.subscribe({ - id: requestId, - close, - }) + // adding a reference to the close function for this subscription request to the connection. + // this will facilitate closing the subscription later. + await socketConnection.subscribe({ id: requestId, close }); + delete reply.subscription.close // not serializable via JSON } catch(error) { - let errorResponse: JsonRpcErrorResponse; + // close the subscription upon receiving an error here + await close(); if (error.code === DwnServerErrorCode.ConnectionSubscriptionJsonRPCIdExists) { // a subscription with this request id already exists - errorResponse = createJsonRpcErrorResponse( + const errorResponse = createJsonRpcErrorResponse( requestId, JsonRpcErrorCodes.BadRequest, `the request id ${requestId} already has an active subscription` ); + return { jsonRpcResponse: errorResponse }; } else { // will catch as an unknown error below throw new Error('unknown error adding subscription'); } - - // close the subscription that was just opened and return an error - await close(); - return { jsonRpcResponse: errorResponse }; } - - delete reply.subscription.close // not serializable via JSON } const jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { reply }); diff --git a/src/json-rpc-handlers/subscriptions/close.ts b/src/json-rpc-handlers/subscriptions/close.ts index b926447..0f8f889 100644 --- a/src/json-rpc-handlers/subscriptions/close.ts +++ b/src/json-rpc-handlers/subscriptions/close.ts @@ -14,9 +14,9 @@ import { } from '../../lib/json-rpc.js'; /** - * Closes a subscription for a given `id` for a given `SocketConnection` + * Closes a subscription tied to a specific `SocketConnection`. * - * @param dwnRequest must include the `id` of the subscription to close within the `params`. + * @param dwnRequest must include JsonRpcId of the subscription request within the `params`. * @param context must include the associated `SocketConnection`. * */ @@ -30,6 +30,7 @@ export const handleSubscriptionsClose: JsonRpcHandler = async ( let jsonRpcResponse:JsonRpcResponse; try { + // closing the subscription and cleaning up the reference within the given connection. await socketConnection.closeSubscription(id); jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { reply: { status: 200, detail: 'Accepted' } }); } catch(error) { diff --git a/src/json-rpc-socket.ts b/src/json-rpc-socket.ts index 0622b37..df4cebe 100644 --- a/src/json-rpc-socket.ts +++ b/src/json-rpc-socket.ts @@ -8,9 +8,11 @@ import type { JsonRpcRequest, JsonRpcResponse } from "./lib/json-rpc.js"; const CONNECT_TIMEOUT = 3_000; const RESPONSE_TIMEOUT = 30_000; -export type JSONRPCSocketOptions = { +export interface JSONRPCSocketOptions { connectTimeout?: number; responseTimeout?: number; + onclose?: () => void; + onerror?: (error?: any) => void; } /** @@ -20,17 +22,21 @@ export class JSONRPCSocket { private constructor(private socket: WebSocket, private responseTimeout: number) {} static async connect(url: string, options: JSONRPCSocketOptions = {}): Promise { - const { connectTimeout = CONNECT_TIMEOUT, responseTimeout = RESPONSE_TIMEOUT } = options; + const { connectTimeout = CONNECT_TIMEOUT, responseTimeout = RESPONSE_TIMEOUT, onclose, onerror } = options; - const onclose = ():void => { - log.info(`JSON RPC Socket close ${url}`); - }; + const socket = new WebSocket(url); + if (onclose === undefined) { + socket.onclose = ():void => { + log.info(`JSON RPC Socket close ${url}`); + } + } - const onerror = (event: any):void => { - log.error(`JSON RPC Socket error ${url}`, event); - }; + if (onerror === undefined) { + socket.onerror = (error?: any):void => { + log.error(`JSON RPC Socket error ${url}`, error); + } + } - const socket = new WebSocket(url); socket.onclose = onclose; socket.onerror = onerror; diff --git a/tests/utils.ts b/tests/utils.ts index 0df40ff..60b586f 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -235,7 +235,7 @@ export async function subscriptionRequest( const closeSubscription = async (id: JsonRpcId, connection: JSONRPCSocket): Promise => { const requestId = uuidv4(); - const request = createJsonRpcRequest(requestId, 'subscriptions.close', { id }); + const request = createJsonRpcRequest(requestId, 'subscription.close', { id }); return await connection.request(request); }