From 4f659d3115a8d5094486b577e65b66dbc537d216 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 13 Feb 2024 11:12:18 -0500 Subject: [PATCH] update comments --- src/connection/connection-manager.ts | 31 ++++++++++++++++++-- src/connection/socket-connection.ts | 20 ++++++++++++- src/http-api.ts | 2 ++ src/json-rpc-handlers/subscriptions/close.ts | 6 ++++ src/lib/json-rpc-router.ts | 5 +++- src/subscription-manager.ts | 5 ++++ src/ws-api.ts | 2 +- 7 files changed, 65 insertions(+), 6 deletions(-) diff --git a/src/connection/connection-manager.ts b/src/connection/connection-manager.ts index 6f3388a..931315d 100644 --- a/src/connection/connection-manager.ts +++ b/src/connection/connection-manager.ts @@ -1,15 +1,28 @@ import type { Dwn } from "@tbd54566975/dwn-sdk-js"; +import type { IncomingMessage } from "http"; import type { WebSocket } from 'ws'; import { SocketConnection } from "./socket-connection.js"; import { InMemorySubscriptionManager } from "../subscription-manager.js"; +/** + * Interface for managing `WebSocket` connections as they arrive. + */ export interface ConnectionManager { - connect(socket: WebSocket): Promise; - close(): Promise + /** connect handler used for the `WebSockets` `'connection'` event. */ + connect(socket: WebSocket, request?: IncomingMessage): Promise; + /** cleans up handlers associated with the `WebSocket` connection */ + close(socket:WebSocket): Promise; + /** closes all of the connections */ + closeAll(): Promise } +/** + * A Simple In Memory ConnectionManager implementation. + * It uses a `Map` to manage connections. + * It uses am `InMemorySubscriptionManager` for individual subscription management within the connection. + */ export class InMemoryConnectionManager implements ConnectionManager { constructor(private dwn: Dwn, private connections: Map = new Map()) {} @@ -27,7 +40,19 @@ export class InMemoryConnectionManager implements ConnectionManager { }); } - async close(): Promise { + /** + * Handler for closing websocket event - `close`. + */ + async close(socket: WebSocket): Promise { + const connection = this.connections.get(socket); + this.connections.delete(socket); + await connection.close(); + } + + /** + * Closes all associated connections. + */ + async closeAll(): Promise { const closePromises = []; this.connections.forEach(connection => closePromises.push(connection.close())); await Promise.all(closePromises); diff --git a/src/connection/socket-connection.ts b/src/connection/socket-connection.ts index fabcab1..d45d554 100644 --- a/src/connection/socket-connection.ts +++ b/src/connection/socket-connection.ts @@ -1,9 +1,9 @@ import type { Dwn, GenericMessage } from "@tbd54566975/dwn-sdk-js"; import { DwnMethodName } from "@tbd54566975/dwn-sdk-js"; +import type { WebSocket } from "ws"; import log from 'loglevel'; import { v4 as uuidv4 } from 'uuid'; -import type { WebSocket } from "ws"; import type { RequestContext } from "../lib/json-rpc-router.js"; import type { SubscriptionManager } from "../subscription-manager.js"; @@ -16,6 +16,9 @@ import { JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcSuccessResp const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive'); const HEARTBEAT_INTERVAL = 30_000; +/** + * SocketConnection class sets up a socket connection along with a `ping/pong` heartbeat. + */ export class SocketConnection { private heartbeatInterval: NodeJS.Timer; constructor( @@ -74,6 +77,9 @@ export class SocketConnection { } } + /** + * Handles a `JSON RPC 2.0` encoded message. + */ private async message(dataBuffer: Buffer): Promise { const requestData = dataBuffer.toString(); if (!requestData) { @@ -110,10 +116,17 @@ export class SocketConnection { this.send(jsonRpcResponse); } + /** + * Sends a JSON encoded Buffer through the Websocket. + */ private send(response: JsonRpcResponse | JsonRpcErrorResponse): void { this.socket.send(Buffer.from(JSON.stringify(response)), this.error.bind(this)); } + /** + * Subscription Handler used to build the context for a `JSON RPC` API call. + * Wraps the incoming `message` in a `JSON RPC Success Response` using the origin subscription`JSON RPC Id` to send through the WebSocket. + */ private subscriptionHandler(id: JsonRpcId): (message: GenericMessage) => void { return (message) => { const response = createJsonRpcSuccessResponse(id, { reply: { @@ -123,6 +136,11 @@ export class SocketConnection { } } + /** + * Builds a `RequestContext` object to use with the `JSON RPC API`. + * + * Adds a `subscriptionHandler` for `Subscribe` messages. + */ private async buildRequestContext(request: JsonRpcRequest): Promise { const { id, params, method} = request; const requestContext: RequestContext = { diff --git a/src/http-api.ts b/src/http-api.ts index 93128be..ff0c1a2 100644 --- a/src/http-api.ts +++ b/src/http-api.ts @@ -31,6 +31,8 @@ export class HttpApi { dwn: Dwn; constructor(config: DwnServerConfig, dwn: Dwn, registrationManager?: RegistrationManager) { + console.log(config); + this.#config = config; this.#api = express(); this.#server = http.createServer(this.#api); diff --git a/src/json-rpc-handlers/subscriptions/close.ts b/src/json-rpc-handlers/subscriptions/close.ts index 76862f8..c38c386 100644 --- a/src/json-rpc-handlers/subscriptions/close.ts +++ b/src/json-rpc-handlers/subscriptions/close.ts @@ -13,6 +13,12 @@ import { JsonRpcErrorCodes, } from '../../lib/json-rpc.js'; +/** + * Closes a subscription for a given `target` and `subscriptionId` within a given connection's `SubscriptionManager`. + * @param dwnRequest must include the `target` and `subscriptionId` within the `params`. + * @param context must include the `subscriptionManager` for the associated connection. + * + */ export const handleSubscriptionsClose: JsonRpcHandler = async ( dwnRequest, context, diff --git a/src/lib/json-rpc-router.ts b/src/lib/json-rpc-router.ts index f92368c..8065a15 100644 --- a/src/lib/json-rpc-router.ts +++ b/src/lib/json-rpc-router.ts @@ -8,9 +8,12 @@ import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js'; export type RequestContext = { transport: 'http' | 'ws'; dwn: Dwn; + /** The `SubscriptionManager` associated with a subscription request, only used in `ws` requests */ subscriptionManager?: SubscriptionManager; - dataStream?: Readable; + /** The `MessageSubscriptionHandler` associated with a subscription request, only used in `ws` requests */ subscriptionHandler?: MessageSubscriptionHandler; + /** The `Readable` stream associated with a `RecordsWrite` request only used in `ws` requests */ + dataStream?: Readable; }; export type HandlerResponse = { diff --git a/src/subscription-manager.ts b/src/subscription-manager.ts index de87dec..ac18d2b 100644 --- a/src/subscription-manager.ts +++ b/src/subscription-manager.ts @@ -11,8 +11,13 @@ export interface SubscriptionManager { closeAll: () => Promise; } +/** + * Simple InMemory implementation of a `SubscriptionManager`. + * Uses `Map` to manage internal state. + */ export class InMemorySubscriptionManager implements SubscriptionManager { constructor(private subscriptions: Map> = new Map()){}; + async subscribe(target: string, subscription: MessageSubscription): Promise { let targetSubscriptions = this.subscriptions.get(target); if (targetSubscriptions === undefined) { diff --git a/src/ws-api.ts b/src/ws-api.ts index 25bcc8b..57e807c 100644 --- a/src/ws-api.ts +++ b/src/ws-api.ts @@ -48,6 +48,6 @@ export class WsApi { async close(): Promise { this.#wsServer.close(); - await this.#connections.close(); + await this.#connections.closeAll(); } }