diff --git a/src/config.ts b/src/config.ts index adc5de3..be83277 100644 --- a/src/config.ts +++ b/src/config.ts @@ -8,7 +8,7 @@ export const config = { // port that server listens on port: parseInt(process.env.DS_PORT || '3000'), // whether to enable 'ws:' - webSocketServerEnabled: { on: true, off: false }[process.env.DS_WEBSOCKET_SERVER] ?? true, + webSocketServerEnabled: { on: true, off: false }[process.env.DWN_WEBSOCKET_SERVER] ?? true, // where to store persistent data messageStore: process.env.DWN_STORAGE_MESSAGES || process.env.DWN_STORAGE || 'level://data', dataStore: process.env.DWN_STORAGE_DATA || process.env.DWN_STORAGE || 'level://data', diff --git a/src/connection/connection-manager.ts b/src/connection/connection-manager.ts new file mode 100644 index 0000000..6f3388a --- /dev/null +++ b/src/connection/connection-manager.ts @@ -0,0 +1,35 @@ +import type { Dwn } from "@tbd54566975/dwn-sdk-js"; + +import type { WebSocket } from 'ws'; + +import { SocketConnection } from "./socket-connection.js"; +import { InMemorySubscriptionManager } from "../subscription-manager.js"; + +export interface ConnectionManager { + connect(socket: WebSocket): Promise; + close(): Promise +} + +export class InMemoryConnectionManager implements ConnectionManager { + constructor(private dwn: Dwn, private connections: Map = new Map()) {} + + /** + * Handler for opening websocket event - `connection`. + * Sets listeners for `message`, `pong`, `close`, and `error` events. + */ + async connect(socket: WebSocket): Promise { + const connection = new SocketConnection(socket, this.dwn, new InMemorySubscriptionManager()); + this.connections.set(socket, connection); + // attach to the socket's close handler to clean up this connection. + socket.on('close', () => { + // the connection internally already cleans itself up upon a socket close event, we just ned to remove it from our set. + this.connections.delete(socket); + }); + } + + async close(): Promise { + const closePromises = []; + this.connections.forEach(connection => closePromises.push(connection.close())); + await Promise.all(closePromises); + } +} \ No newline at end of file diff --git a/src/connection/socket-connection.ts b/src/connection/socket-connection.ts new file mode 100644 index 0000000..fabcab1 --- /dev/null +++ b/src/connection/socket-connection.ts @@ -0,0 +1,143 @@ +import type { Dwn, GenericMessage } from "@tbd54566975/dwn-sdk-js"; +import { DwnMethodName } from "@tbd54566975/dwn-sdk-js"; + +import log from 'loglevel'; +import { v4 as uuidv4 } from 'uuid'; +import type { WebSocket } from "ws"; + +import type { RequestContext } from "../lib/json-rpc-router.js"; +import type { SubscriptionManager } from "../subscription-manager.js"; +import type { JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcResponse } from "../lib/json-rpc.js"; + +import { requestCounter } from "../metrics.js"; +import { jsonRpcApi } from "../json-rpc-api.js"; +import { JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcSuccessResponse } from "../lib/json-rpc.js"; + +const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive'); +const HEARTBEAT_INTERVAL = 30_000; + +export class SocketConnection { + private heartbeatInterval: NodeJS.Timer; + constructor( + private socket: WebSocket, + private dwn: Dwn, + private subscriptions: SubscriptionManager + ){ + socket.on('close', this.close.bind(this)); + socket.on('pong', this.pong.bind(this)); + socket.on('error', this.error.bind(this)); + socket.on('message', this.message.bind(this)); + + // Sometimes connections between client <-> server can get borked in such a way that + // leaves both unaware of the borkage. ping messages can be used as a means to verify + // that the remote endpoint is still responsive. Server will ping each socket every 30s + // if a pong hasn't received from a socket by the next ping, the server will terminate + // the socket connection + socket[SOCKET_ISALIVE_SYMBOL] = true; + this.heartbeatInterval = setInterval(() => { + if (this.socket[SOCKET_ISALIVE_SYMBOL] === false) { + this.close(); + } + this.socket[SOCKET_ISALIVE_SYMBOL] = false; + this.socket.ping(); + }, HEARTBEAT_INTERVAL); + } + + /** + * Closes the existing connection and cleans up any listeners or subscriptions. + */ + async close(): Promise { + clearInterval(this.heartbeatInterval); + // clean up all socket event listeners + this.socket.removeAllListeners(); + + // close all of the associated subscriptions + await this.subscriptions.closeAll(); + + // close the socket. + this.socket.close(); + } + + /** + * Pong messages are automatically sent in response to ping messages as required by + * the websocket spec. So, no need to send explicit pongs. + */ + private pong(): void { + this.socket[SOCKET_ISALIVE_SYMBOL] = true; + } + + private async error(error?:Error): Promise{ + if (error) { + log.error(`SocketConnection error, terminating connection`, error); + this.socket.terminate(); + await this.close() + } + } + + private async message(dataBuffer: Buffer): Promise { + const requestData = dataBuffer.toString(); + if (!requestData) { + return this.send(createJsonRpcErrorResponse( + uuidv4(), + JsonRpcErrorCodes.BadRequest, + 'request payload required.' + )) + } + + let jsonRequest: JsonRpcRequest; + try { + jsonRequest = JSON.parse(requestData); + } catch(error) { + const errorResponse = createJsonRpcErrorResponse( + uuidv4(), + JsonRpcErrorCodes.BadRequest, + (error as Error).message + ); + + return this.send(errorResponse); + }; + + const requestContext = await this.buildRequestContext(jsonRequest); + const { jsonRpcResponse } = await jsonRpcApi.handle(jsonRequest, requestContext); + if (jsonRpcResponse.error) { + requestCounter.inc({ method: jsonRequest.method, error: 1 }); + } else { + requestCounter.inc({ + method: jsonRequest.method, + status: jsonRpcResponse?.result?.reply?.status?.code || 0, + }); + } + this.send(jsonRpcResponse); + } + + private send(response: JsonRpcResponse | JsonRpcErrorResponse): void { + this.socket.send(Buffer.from(JSON.stringify(response)), this.error.bind(this)); + } + + private subscriptionHandler(id: JsonRpcId): (message: GenericMessage) => void { + return (message) => { + const response = createJsonRpcSuccessResponse(id, { reply: { + record : message + } }); + this.send(response); + } + } + + private async buildRequestContext(request: JsonRpcRequest): Promise { + const { id, params, method} = request; + const requestContext: RequestContext = { + transport : 'ws', + dwn : this.dwn, + subscriptionManager : this.subscriptions, + } + + if (method === 'dwn.processMessage') { + const { message } = params as { message: GenericMessage }; + if (message.descriptor.method === DwnMethodName.Subscribe) { + requestContext.subscriptionHandler = this.subscriptionHandler(id).bind(this); + } + } + + return requestContext; + } +} diff --git a/src/json-rpc-socket.ts b/src/json-rpc-socket.ts index c96f40d..0622b37 100644 --- a/src/json-rpc-socket.ts +++ b/src/json-rpc-socket.ts @@ -1,3 +1,4 @@ +import log from 'loglevel'; import { v4 as uuidv4 } from 'uuid'; import WebSocket from 'ws'; @@ -22,11 +23,11 @@ export class JSONRPCSocket { const { connectTimeout = CONNECT_TIMEOUT, responseTimeout = RESPONSE_TIMEOUT } = options; const onclose = ():void => { - console.log('json rpc close'); + log.info(`JSON RPC Socket close ${url}`); }; const onerror = (event: any):void => { - console.log('json rpc error', event); + log.error(`JSON RPC Socket error ${url}`, event); }; const socket = new WebSocket(url); diff --git a/src/lib/json-rpc-router.ts b/src/lib/json-rpc-router.ts index 3838786..f92368c 100644 --- a/src/lib/json-rpc-router.ts +++ b/src/lib/json-rpc-router.ts @@ -2,8 +2,8 @@ import type { Dwn, MessageSubscriptionHandler } from '@tbd54566975/dwn-sdk-js'; import type { Readable } from 'node:stream'; +import type { SubscriptionManager } from '../subscription-manager.js'; import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js'; -import type { SubscriptionManager } from '../ws-api.js'; export type RequestContext = { transport: 'http' | 'ws'; diff --git a/src/subscription-manager.ts b/src/subscription-manager.ts new file mode 100644 index 0000000..de87dec --- /dev/null +++ b/src/subscription-manager.ts @@ -0,0 +1,55 @@ +import type { MessageSubscription } from "@tbd54566975/dwn-sdk-js"; + +import { DwnServerError, DwnServerErrorCode } from "./dwn-error.js"; + +/** + * SubscriptionManager manages the subscriptions related to a `SocketConnection` + */ +export interface SubscriptionManager { + subscribe: (target: string, subscription: MessageSubscription) => Promise; + close: (target: string, id: string) => Promise; + closeAll: () => Promise; +} + +export class InMemorySubscriptionManager implements SubscriptionManager { + constructor(private subscriptions: Map> = new Map()){}; + async subscribe(target: string, subscription: MessageSubscription): Promise { + let targetSubscriptions = this.subscriptions.get(target); + if (targetSubscriptions === undefined) { + targetSubscriptions = new Map(); + this.subscriptions.set(target, targetSubscriptions); + } + targetSubscriptions.set(subscription.id, subscription); + } + + async close(target: string, id: string): Promise { + const targetSubscriptions = this.subscriptions.get(target); + if (targetSubscriptions !== undefined) { + const subscription = targetSubscriptions.get(id); + if (subscription !== undefined) { + targetSubscriptions.delete(id); + await subscription.close(); + return; + } + } + + // if it reached here no subscription to close + throw new DwnServerError( + DwnServerErrorCode.SubscriptionManagerSubscriptionNotFound, + `subscription ${id} was not found` + ) + } + + async closeAll(): Promise { + const closePromises = []; + for (const [target, subscriptions] of this.subscriptions) { + this.subscriptions.delete(target); + for (const [id, subscription] of subscriptions) { + subscriptions.delete(id); + closePromises.push(subscription.close()); + } + } + + await Promise.all(closePromises); + } +} \ No newline at end of file diff --git a/src/ws-api.ts b/src/ws-api.ts index ab0cef1..25bcc8b 100644 --- a/src/ws-api.ts +++ b/src/ws-api.ts @@ -1,210 +1,24 @@ -import type { AddressInfo, WebSocket } from 'ws'; -import type { IncomingMessage, Server } from 'http'; - -import log from 'loglevel'; -import { WebSocketServer } from 'ws'; -import { v4 as uuidv4 } from 'uuid'; import type { Dwn, - GenericMessage, - MessageSubscription, -} from '@tbd54566975/dwn-sdk-js' - -import { DwnMethodName } from '@tbd54566975/dwn-sdk-js'; - -import type { RequestContext } from './lib/json-rpc-router.js'; -import type { JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcResponse } from './lib/json-rpc.js'; - -import { jsonRpcApi } from './json-rpc-api.js'; -import { - createJsonRpcErrorResponse, - createJsonRpcSuccessResponse, - JsonRpcErrorCodes -} from './lib/json-rpc.js'; -import { requestCounter } from './metrics.js'; -import { DwnServerError, DwnServerErrorCode } from './dwn-error.js'; - - -const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive'); -const HEARTBEAT_INTERVAL = 30_000; - -export interface SubscriptionManager { - subscribe: (target: string, subscription: MessageSubscription) => Promise; - close: (target: string, id: string) => Promise; - closeAll: () => Promise; -} - -class Manager { - constructor(private subscriptions: Map> = new Map()){}; - async subscribe(target: string, subscription: MessageSubscription): Promise { - let targetSubscriptions = this.subscriptions.get(target); - if (targetSubscriptions === undefined) { - targetSubscriptions = new Map(); - this.subscriptions.set(target, targetSubscriptions); - } - targetSubscriptions.set(subscription.id, subscription); - } - - async close(target: string, id: string): Promise { - const targetSubscriptions = this.subscriptions.get(target); - if (targetSubscriptions !== undefined) { - const subscription = targetSubscriptions.get(id); - if (subscription !== undefined) { - targetSubscriptions.delete(id); - await subscription.close(); - return; - } - } - - // if it reached here no subscription to close - throw new DwnServerError( - DwnServerErrorCode.SubscriptionManagerSubscriptionNotFound, - `subscription ${id} was not found` - ) - } - - async closeAll(): Promise { - const closePromises = []; - for (const [target, subscriptions] of this.subscriptions) { - this.subscriptions.delete(target); - for (const [id, subscription] of subscriptions) { - subscriptions.delete(id); - closePromises.push(subscription.close()); - } - } - - await Promise.all(closePromises); - } -} - -export class SocketConnection { - constructor( - private socket: WebSocket, - private dwn: Dwn, - private subscriptions: SubscriptionManager = new Manager(), - ){ - socket.on('close', this.close.bind(this)); - socket.on('pong', this.pong.bind(this)); - socket.on('error', this.error.bind(this)); - socket.on('message', this.message.bind(this)); - socket[SOCKET_ISALIVE_SYMBOL] = true; - } - - get isAlive(): boolean { - return this.socket[SOCKET_ISALIVE_SYMBOL]; - } - - /** - * Closes the existing connection and cleans up any listeners or subscriptions. - */ - async close(): Promise { - // clean up all socket event listeners - this.socket.removeAllListeners(); - - // close all of the associated subscriptions - await this.subscriptions.closeAll(); - } - - ping(): void { - this.socket[SOCKET_ISALIVE_SYMBOL] = false; - this.socket.ping(); - } +} from '@tbd54566975/dwn-sdk-js'; - /** - * Pong messages are automatically sent in response to ping messages as required by - * the websocket spec. So, no need to send explicit pongs from browser - */ - private pong(): void { - this.socket[SOCKET_ISALIVE_SYMBOL] = true; - } - - private async error(error?:Error): Promise{ - if (error !== undefined) { - log.error('WebSocket', this.socket.url, error); - this.socket.terminate(); - await this.close() - } - } - - private async message(dataBuffer: Buffer): Promise { - const requestData = dataBuffer.toString(); - if (!requestData) { - return this.send(createJsonRpcErrorResponse( - uuidv4(), - JsonRpcErrorCodes.BadRequest, - 'request payload required.' - )) - } - - let jsonRequest: JsonRpcRequest; - try { - jsonRequest = JSON.parse(requestData); - } catch(error) { - const errorResponse = createJsonRpcErrorResponse( - uuidv4(), - JsonRpcErrorCodes.BadRequest, - (error as Error).message - ); - - return this.send(errorResponse); - }; - - const requestContext = await this.buildRequestContext(jsonRequest); - const { jsonRpcResponse } = await jsonRpcApi.handle(jsonRequest, requestContext); - if (jsonRpcResponse.error) { - requestCounter.inc({ method: jsonRequest.method, error: 1 }); - } else { - requestCounter.inc({ - method: jsonRequest.method, - status: jsonRpcResponse?.result?.reply?.status?.code || 0, - }); - } - - this.send(jsonRpcResponse); - } - - private send(response: JsonRpcResponse | JsonRpcErrorResponse): void { - this.socket.send(Buffer.from(JSON.stringify(response)), this.error.bind(this)); - } - - private subscriptionHandler(id: JsonRpcId): (message: GenericMessage) => void { - return (message) => { - const response = createJsonRpcSuccessResponse(id, { reply: { - record : message - } }); - this.send(response); - } - } +import type { AddressInfo } from 'ws'; +import type { Server } from 'http'; - private async buildRequestContext(request: JsonRpcRequest): Promise { - const { id, params, method} = request; - const requestContext: RequestContext = { - transport : 'ws', - dwn : this.dwn, - subscriptionManager : this.subscriptions, - } - - if (method === 'dwn.processMessage') { - const { message } = params as { message: GenericMessage }; - if (message.descriptor.method === DwnMethodName.Subscribe) { - requestContext.subscriptionHandler = this.subscriptionHandler(id).bind(this); - } - } +import { WebSocketServer } from 'ws'; - return requestContext; - } -} +import type { ConnectionManager } from './connection/connection-manager.js'; +import { InMemoryConnectionManager } from './connection/connection-manager.js'; export class WsApi { #wsServer: WebSocketServer; dwn: Dwn; + #connections: ConnectionManager - #heartbeatInterval: NodeJS.Timer | undefined; - #connections: Map = new Map(); - - constructor(server: Server, dwn: Dwn) { + constructor(server: Server, dwn: Dwn, connectionManager?: ConnectionManager) { this.dwn = dwn; + this.#connections = connectionManager || new InMemoryConnectionManager(dwn); this.#wsServer = new WebSocketServer({ server }); } @@ -216,56 +30,14 @@ export class WsApi { return this.#wsServer; } - /** - * Handler for opening websocket event - `connection`. - * Sets listeners for `message`, `pong`, `close`, and `error` events. - */ - #handleConnection(socket: WebSocket, _request: IncomingMessage): void { - const connection = new SocketConnection(socket, this.dwn); - this.#connections.set(socket, connection); - // attach to the socket's close handler to clean up this connection. - socket.on('close', () => { - // the connection internally already cleans itself up upon a socket close event, we just ned to remove it from our set. - this.#connections.delete(socket); - }); - } - /** - * This handler returns an interval to ping clients' socket every 30s - * if a pong hasn't received from a socket by the next ping, the server will terminate the socket connection. - */ - #setupHeartbeat(): NodeJS.Timer { - if (this.#heartbeatInterval) { - return this.#heartbeatInterval; - } - // Sometimes connections between client <-> server can get borked in such a way that - // leaves both unaware of the borkage. ping messages can be used as a means to verify - // that the remote endpoint is still responsive. Server will ping each socket every 30s - // if a pong hasn't received from a socket by the next ping, the server will terminate - // the socket connection - this.#heartbeatInterval = setInterval(() => { - this.#connections.forEach(async (connection) => { - if (connection.isAlive === false) { - return await connection.close(); - } - - connection.ping(); - }); - }, HEARTBEAT_INTERVAL); - } - /** * Handler for starting a WebSocket. * Sets listeners for `connection`, `close` events. * It clears `heartbeatInterval` when a `close` event is made. */ #setupWebSocket(): void { - this.#wsServer.on('connection', this.#handleConnection.bind(this)); - - const heartbeatInterval = this.#setupHeartbeat(); - - this.#wsServer.on('close', function close() { - clearInterval(heartbeatInterval); - }); + this.#wsServer.on('connection', this.#connections.connect.bind(this)); + this.#wsServer.on('close', this.#connections.close.bind(this)); } start(callback?: () => void): WebSocketServer { @@ -276,9 +48,6 @@ export class WsApi { async close(): Promise { this.#wsServer.close(); - for (const [socket, connection] of this.#connections) { - this.#connections.delete(socket); - await connection.close() - } + await this.#connections.close(); } } diff --git a/tests/utils.ts b/tests/utils.ts index 9bd625a..c05021d 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -222,7 +222,7 @@ export async function sendWsMessage( }); } -const MAX_RESPONSE_TIMEOUT = 3_000; +const MAX_RESPONSE_TIMEOUT = 1_500; export async function subscriptionRequest( url: string, @@ -255,10 +255,8 @@ export async function subscriptionRequest( messageHandler(record); return; } - if (subscription) { resolved = true; - resolve({ status, subscription: { diff --git a/tests/ws-api.spec.ts b/tests/ws-api.spec.ts index b905c4b..74661f2 100644 --- a/tests/ws-api.spec.ts +++ b/tests/ws-api.spec.ts @@ -139,7 +139,7 @@ describe('websocket api', function () { // close the subscription await response.subscription.close(); - await new Promise(resolve => setTimeout(resolve, 500)); // wait for records to be processed + await new Promise(resolve => setTimeout(resolve, 5)); // wait for records to be processed expect(records).to.have.members([ await Message.getCid(write1Message.message), await Message.getCid(write2Message.message) @@ -216,7 +216,7 @@ describe('websocket api', function () { }) expect(writeResult3.status.code).to.equal(202); - await new Promise(resolve => setTimeout(resolve, 500)); // wait for records to be processed + await new Promise(resolve => setTimeout(resolve, 5)); // wait for records to be processed expect(records).to.have.members([ await Message.getCid(write1Message.message) ]); }); });