diff --git a/src/connection/socket-connection.ts b/src/connection/socket-connection.ts index 6613ff1..de51d56 100644 --- a/src/connection/socket-connection.ts +++ b/src/connection/socket-connection.ts @@ -16,26 +16,27 @@ import { DwnServerError, DwnServerErrorCode } from "../dwn-error.js"; const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive'); const HEARTBEAT_INTERVAL = 30_000; -export interface Subscription { +export interface JsonRPCSubscription { + /** JSON RPC Id of the Subscription Request */ id: JsonRpcId; close: () => Promise; } /** - * SocketConnection class sets up a socket connection along with a `ping/pong` heartbeat. + * SocketConnection handles a WebSocket connection to a DWN using JSON RPC. */ export class SocketConnection { private heartbeatInterval: NodeJS.Timer; - private subscriptions: Map = new Map(); + private subscriptions: Map = new Map(); constructor( private socket: WebSocket, private dwn: Dwn ){ + socket.on('message', this.message.bind(this)); socket.on('close', this.close.bind(this)); - socket.on('pong', this.pong.bind(this)); socket.on('error', this.error.bind(this)); - socket.on('message', this.message.bind(this)); + socket.on('pong', this.pong.bind(this)); // Sometimes connections between client <-> server can get borked in such a way that // leaves both unaware of the borkage. ping messages can be used as a means to verify @@ -52,7 +53,11 @@ export class SocketConnection { }, HEARTBEAT_INTERVAL); } - async subscribe(subscription: Subscription): Promise { + /** + * Adds a reference for the JSON RPC Subscription to this connection. + * Used for cleanup if the connection is closed. + */ + async subscribe(subscription: JsonRPCSubscription): Promise { if (this.subscriptions.has(subscription.id)) { throw new DwnServerError( DwnServerErrorCode.ConnectionSubscriptionJsonRPCIdExists, @@ -63,6 +68,11 @@ export class SocketConnection { this.subscriptions.set(subscription.id, subscription); } + /** + * Closes and removes the reference for a given subscription from this connection. + * + * @param id the `JsonRpcId` of the JSON RPC subscription request. + */ async closeSubscription(id: JsonRpcId): Promise { if (!this.subscriptions.has(id)) { throw new DwnServerError( @@ -71,6 +81,8 @@ export class SocketConnection { ) } + const connection = this.subscriptions.get(id); + await connection.close(); this.subscriptions.delete(id); } @@ -83,8 +95,9 @@ export class SocketConnection { this.socket.removeAllListeners(); const closePromises = []; - for (const [_target, subscription] of this.subscriptions) { + for (const [id, subscription] of this.subscriptions) { closePromises.push(subscription.close()); + this.subscriptions.delete(id); } // close all of the associated subscriptions @@ -158,7 +171,7 @@ export class SocketConnection { /** * Subscription Handler used to build the context for a `JSON RPC` API call. - * Wraps the incoming `message` in a `JSON RPC Success Response` using the origin subscription`JSON RPC Id` to send through the WebSocket. + * Wraps the incoming `message` in a `JSON RPC Success Response` using the original subscription`JSON RPC Id` to send through the WebSocket. */ private subscriptionHandler(id: JsonRpcId): (message: GenericMessage) => void { return (message) => { diff --git a/src/dwn-server.ts b/src/dwn-server.ts index b2502e9..90de633 100644 --- a/src/dwn-server.ts +++ b/src/dwn-server.ts @@ -65,7 +65,7 @@ export class DwnServer { let eventStream: EventStream | undefined; if (this.config.webSocketServerEnabled) { // setting `EventEmitterStream` as default the default `EventStream - // if an alternate implementation is needed instantiate a `Dwn` with a custom `EventStream` and add it to server options. + // if an alternate implementation is needed, instantiate a `Dwn` with a custom `EventStream` and add it to server options. eventStream = new EventEmitterStream(); } diff --git a/src/json-rpc-api.ts b/src/json-rpc-api.ts index 4bcedff..76e971e 100644 --- a/src/json-rpc-api.ts +++ b/src/json-rpc-api.ts @@ -6,4 +6,4 @@ import { handleSubscriptionsClose } from './json-rpc-handlers/subscriptions/inde export const jsonRpcApi = new JsonRpcRouter(); jsonRpcApi.on('dwn.processMessage', handleDwnProcessMessage); -jsonRpcApi.on('subscriptions.close', handleSubscriptionsClose); +jsonRpcApi.on('subscription.close', handleSubscriptionsClose); diff --git a/src/json-rpc-handlers/dwn/process-message.ts b/src/json-rpc-handlers/dwn/process-message.ts index 7ea296e..8380800 100644 --- a/src/json-rpc-handlers/dwn/process-message.ts +++ b/src/json-rpc-handlers/dwn/process-message.ts @@ -4,9 +4,6 @@ import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js'; import type { Readable as IsomorphicReadable } from 'readable-stream'; import { v4 as uuidv4 } from 'uuid'; -import type { - JsonRpcErrorResponse, -} from '../../lib/json-rpc.js'; import type { HandlerResponse, JsonRpcHandler, @@ -28,8 +25,8 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( const requestId = dwnRequest.id ?? uuidv4(); try { - - // RecordsWrite is only supported on 'http' + // RecordsWrite is only supported on 'http' to support data stream for large data + // TODO: https://github.com/TBD54566975/dwn-server/issues/108 if ( transport !== 'http' && message.descriptor.interface === DwnInterfaceName.Records && @@ -71,30 +68,26 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( if (subscription !== undefined) { const { close } = subscription; try { - await socketConnection.subscribe({ - id: requestId, - close, - }) + // adding a reference to the close function for this subscription request to the connection. + // this will facilitate closing the subscription later. + await socketConnection.subscribe({ id: requestId, close }); + delete reply.subscription.close // not serializable via JSON } catch(error) { - let errorResponse: JsonRpcErrorResponse; + // close the subscription upon receiving an error here + await close(); if (error.code === DwnServerErrorCode.ConnectionSubscriptionJsonRPCIdExists) { // a subscription with this request id already exists - errorResponse = createJsonRpcErrorResponse( + const errorResponse = createJsonRpcErrorResponse( requestId, JsonRpcErrorCodes.BadRequest, `the request id ${requestId} already has an active subscription` ); + return { jsonRpcResponse: errorResponse }; } else { // will catch as an unknown error below throw new Error('unknown error adding subscription'); } - - // close the subscription that was just opened and return an error - await close(); - return { jsonRpcResponse: errorResponse }; } - - delete reply.subscription.close // not serializable via JSON } const jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { reply }); diff --git a/src/json-rpc-handlers/subscriptions/close.ts b/src/json-rpc-handlers/subscriptions/close.ts index b926447..0f8f889 100644 --- a/src/json-rpc-handlers/subscriptions/close.ts +++ b/src/json-rpc-handlers/subscriptions/close.ts @@ -14,9 +14,9 @@ import { } from '../../lib/json-rpc.js'; /** - * Closes a subscription for a given `id` for a given `SocketConnection` + * Closes a subscription tied to a specific `SocketConnection`. * - * @param dwnRequest must include the `id` of the subscription to close within the `params`. + * @param dwnRequest must include JsonRpcId of the subscription request within the `params`. * @param context must include the associated `SocketConnection`. * */ @@ -30,6 +30,7 @@ export const handleSubscriptionsClose: JsonRpcHandler = async ( let jsonRpcResponse:JsonRpcResponse; try { + // closing the subscription and cleaning up the reference within the given connection. await socketConnection.closeSubscription(id); jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { reply: { status: 200, detail: 'Accepted' } }); } catch(error) { diff --git a/tests/utils.ts b/tests/utils.ts index 0df40ff..60b586f 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -235,7 +235,7 @@ export async function subscriptionRequest( const closeSubscription = async (id: JsonRpcId, connection: JSONRPCSocket): Promise => { const requestId = uuidv4(); - const request = createJsonRpcRequest(requestId, 'subscriptions.close', { id }); + const request = createJsonRpcRequest(requestId, 'subscription.close', { id }); return await connection.request(request); }