From e1396cba8e8ed89694f8819974f3c3f94f6b5d66 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 27 Feb 2024 16:01:49 -0500 Subject: [PATCH] WebSocket Subscriptions via JRPC (#107) This PR enhances `WebSocket` support via `JSON RPC 2.0`. This now supports long running subscriptions via `JSON RPC`. The new `JsonRpcSocket` abstracts this by providing two methods,`request` and `subscribe`. `request` sends a JSON RPC message over the socket, setting up a message handler to listen to the response, once the response is received it will clean up the listener, and resolve the response to the promise. If a response is not emitted within a given timeout, it will reject the promise. `subscribe` does something similar, however after receiving the response to the subscribe message, it will keep the handler open and look for any other messages that match the JSON RPC Id provided in the subscription request, and emit those to a message handler. The subscribe method returns a close method in order to clean up these listeners. Some things to note: - `RecordsWrite` are currently not supported via sockets, this is due to a poor handling of the data payload in the prior implementation. Would rather add this as a separate effort. TODO is tagged in code with the issue listed below. - `Subscribe` methods are currently only supported via sockets and hot `http`. - A `rpc.subscription.close` JSON RPC Method was added to close a subscription that is active for a connection. I went back and forth between making this a DWN Message vs some other signal. I landed on this for the time being and am open to discussion. More notes below. - As a part of `getDWNConfig`, `tenantGate` and `eventStream` were both made optional, as well as the `registrationManager` for `HttpApi`. I did this mostly out of convenience, but it also seems plausible that someone might run this without any registration manager. Open to discuss/change. - the `sendHttpMessage` method within `tests/utils.ts` will also be replaced by full-fledged client, listed in a separate PR below. - Current implementation allows anyone to connect, this will be addressed in a subsequent PR, issue listed below. ### Usage of `subscription.close` JSON RPC method. Q: Why not use a specific `Unsubscribe` DWN message such as `RecordsUnsubscribe`? A: This would be the first message of it's kind that would need to specifically target the DWN and potentially transport of the DWN which holds the subscription. Instead the DWN `RecordsSubscribe` message returns a close method which the transport can keep a reference to given a specific `JSON RPC Id`. This JSON RPC Id represents a specific request to a transport that was created. Later a user can issue a `rpc.subscription.close` JSON RPC Method to tell the server to close that subscription. Ultimately the owner of the JRPC Socket connection is in charge of closing the subscription, they can close all subscriptions by simply disconnecting. So it makes sense to give them the ability to close a specific JRPC Subscription. ## Initial Effort Subsequent Issues/PRs: - https://github.com/TBD54566975/dwn-server/issues/111 - https://github.com/TBD54566975/dwn-server/issues/109 - https://github.com/TBD54566975/dwn-server/issues/110 ### Separate effort: - https://github.com/TBD54566975/dwn-server/issues/108 --- package-lock.json | 48 ++- package.json | 4 +- src/connection/connection-manager.ts | 39 ++ src/connection/socket-connection.ts | 221 ++++++++++ src/dwn-error.ts | 2 + src/dwn-server.ts | 25 +- src/http-api.ts | 6 +- src/index.ts | 2 +- src/json-rpc-api.ts | 8 +- src/json-rpc-handlers/dwn/process-message.ts | 89 +++- src/json-rpc-handlers/subscription/close.ts | 59 +++ src/json-rpc-handlers/subscription/index.ts | 1 + src/json-rpc-socket.ts | 150 +++++++ src/lib/json-rpc-router.ts | 16 +- src/lib/json-rpc.ts | 54 ++- src/storage.ts | 11 +- src/ws-api.ts | 159 +------- tests/connection/connection-manager.spec.ts | 60 +++ tests/connection/socket-connection.spec.ts | 159 ++++++++ tests/dwn-process-message.spec.ts | 75 +++- tests/dwn-server.spec.ts | 43 +- tests/http-api.spec.ts | 2 +- tests/json-rpc-socket.spec.ts | 234 +++++++++++ .../proof-of-work-manager.spec.ts | 4 +- tests/rpc-subscribe-close.spec.ts | 126 ++++++ tests/test-dwn.ts | 21 +- tests/utils.ts | 66 ++- tests/ws-api.spec.ts | 380 ++++++++++++++++-- 28 files changed, 1823 insertions(+), 241 deletions(-) create mode 100644 src/connection/connection-manager.ts create mode 100644 src/connection/socket-connection.ts create mode 100644 src/json-rpc-handlers/subscription/close.ts create mode 100644 src/json-rpc-handlers/subscription/index.ts create mode 100644 src/json-rpc-socket.ts create mode 100644 tests/connection/connection-manager.spec.ts create mode 100644 tests/connection/socket-connection.spec.ts create mode 100644 tests/json-rpc-socket.spec.ts create mode 100644 tests/rpc-subscribe-close.spec.ts diff --git a/package-lock.json b/package-lock.json index 45359d5..f111f15 100644 --- a/package-lock.json +++ b/package-lock.json @@ -35,10 +35,12 @@ "devDependencies": { "@types/bytes": "3.1.1", "@types/chai": "4.3.4", + "@types/chai-as-promised": "7.1.5", "@types/express": "4.17.17", "@types/mocha": "10.0.1", "@types/node": "18.11.18", "@types/readable-stream": "4.0.6", + "@types/sinon": "17.0.3", "@types/supertest": "2.0.12", "@types/ws": "8.5.4", "@typescript-eslint/eslint-plugin": "5.59.0", @@ -62,7 +64,7 @@ "lint-staged": "^14.0.1", "mocha": "^10.2.0", "puppeteer": "^21.4.0", - "sinon": "16.1.0", + "sinon": "17.0.1", "stream-browserify": "^3.0.0", "supertest": "6.3.3", "typescript": "^5.1.6" @@ -743,6 +745,15 @@ "integrity": "sha512-KnRanxnpfpjUTqTCXslZSEdLfXExwgNxYPdiO2WGUj8+HDjFi8R3k5RVKPeSCzLjCcshCAtVO2QBbVuAV4kTnw==", "dev": true }, + "node_modules/@types/chai-as-promised": { + "version": "7.1.5", + "resolved": "https://registry.npmjs.org/@types/chai-as-promised/-/chai-as-promised-7.1.5.tgz", + "integrity": "sha512-jStwss93SITGBwt/niYrkf2C+/1KTeZCZl1LaeezTlqppAKeoQC7jxyqYuP72sxBGKCIbw7oHgbYssIRzT5FCQ==", + "dev": true, + "dependencies": { + "@types/chai": "*" + } + }, "node_modules/@types/connect": { "version": "3.4.38", "resolved": "https://registry.npmjs.org/@types/connect/-/connect-3.4.38.tgz", @@ -888,6 +899,21 @@ "@types/node": "*" } }, + "node_modules/@types/sinon": { + "version": "17.0.3", + "resolved": "https://registry.npmjs.org/@types/sinon/-/sinon-17.0.3.tgz", + "integrity": "sha512-j3uovdn8ewky9kRBG19bOwaZbexJu/XjtkHyjvUgt4xfPFz18dcORIMqnYh66Fx3Powhcr85NT5+er3+oViapw==", + "dev": true, + "dependencies": { + "@types/sinonjs__fake-timers": "*" + } + }, + "node_modules/@types/sinonjs__fake-timers": { + "version": "8.1.5", + "resolved": "https://registry.npmjs.org/@types/sinonjs__fake-timers/-/sinonjs__fake-timers-8.1.5.tgz", + "integrity": "sha512-mQkU2jY8jJEF7YHjHvsQO8+3ughTL1mcnn96igfhONmR+fUPSKIkefQYpSe8bsly2Ep7oQbn/6VG5/9/0qcArQ==", + "dev": true + }, "node_modules/@types/superagent": { "version": "4.1.21", "resolved": "https://registry.npmjs.org/@types/superagent/-/superagent-4.1.21.tgz", @@ -8222,17 +8248,16 @@ } }, "node_modules/sinon": { - "version": "16.1.0", - "resolved": "https://registry.npmjs.org/sinon/-/sinon-16.1.0.tgz", - "integrity": "sha512-ZSgzF0vwmoa8pq0GEynqfdnpEDyP1PkYmEChnkjW0Vyh8IDlyFEJ+fkMhCP0il6d5cJjPl2PUsnUSAuP5sttOQ==", - "deprecated": "16.1.1", + "version": "17.0.1", + "resolved": "https://registry.npmjs.org/sinon/-/sinon-17.0.1.tgz", + "integrity": "sha512-wmwE19Lie0MLT+ZYNpDymasPHUKTaZHUH/pKEubRXIzySv9Atnlw+BUMGCzWgV7b7wO+Hw6f1TEOr0IUnmU8/g==", "dev": true, "dependencies": { "@sinonjs/commons": "^3.0.0", - "@sinonjs/fake-timers": "^10.3.0", + "@sinonjs/fake-timers": "^11.2.2", "@sinonjs/samsam": "^8.0.0", "diff": "^5.1.0", - "nise": "^5.1.4", + "nise": "^5.1.5", "supports-color": "^7.2.0" }, "funding": { @@ -8240,6 +8265,15 @@ "url": "https://opencollective.com/sinon" } }, + "node_modules/sinon/node_modules/@sinonjs/fake-timers": { + "version": "11.2.2", + "resolved": "https://registry.npmjs.org/@sinonjs/fake-timers/-/fake-timers-11.2.2.tgz", + "integrity": "sha512-G2piCSxQ7oWOxwGSAyFHfPIsyeJGXYtc6mFbnFA+kRXkiEnTl8c/8jul2S329iFBnDI9HGoeWWAZvuvOkZccgw==", + "dev": true, + "dependencies": { + "@sinonjs/commons": "^3.0.0" + } + }, "node_modules/sinon/node_modules/diff": { "version": "5.1.0", "resolved": "https://registry.npmjs.org/diff/-/diff-5.1.0.tgz", diff --git a/package.json b/package.json index 9c5365d..fd60e4c 100644 --- a/package.json +++ b/package.json @@ -50,10 +50,12 @@ "devDependencies": { "@types/bytes": "3.1.1", "@types/chai": "4.3.4", + "@types/chai-as-promised": "7.1.5", "@types/express": "4.17.17", "@types/mocha": "10.0.1", "@types/node": "18.11.18", "@types/readable-stream": "4.0.6", + "@types/sinon": "17.0.3", "@types/supertest": "2.0.12", "@types/ws": "8.5.4", "@typescript-eslint/eslint-plugin": "5.59.0", @@ -77,7 +79,7 @@ "lint-staged": "^14.0.1", "mocha": "^10.2.0", "puppeteer": "^21.4.0", - "sinon": "16.1.0", + "sinon": "17.0.1", "stream-browserify": "^3.0.0", "supertest": "6.3.3", "typescript": "^5.1.6" diff --git a/src/connection/connection-manager.ts b/src/connection/connection-manager.ts new file mode 100644 index 0000000..b98aae8 --- /dev/null +++ b/src/connection/connection-manager.ts @@ -0,0 +1,39 @@ +import type { Dwn } from "@tbd54566975/dwn-sdk-js"; + +import type { IncomingMessage } from "http"; +import type { WebSocket } from 'ws'; + +import { SocketConnection } from "./socket-connection.js"; + +/** + * Interface for managing `WebSocket` connections as they arrive. + */ +export interface ConnectionManager { + /** connect handler used for the `WebSockets` `'connection'` event. */ + connect(socket: WebSocket, request?: IncomingMessage): Promise; + /** closes all of the connections */ + closeAll(): Promise +} + +/** + * A Simple In Memory ConnectionManager implementation. + * It uses a `Map` to manage connections. + */ +export class InMemoryConnectionManager implements ConnectionManager { + constructor(private dwn: Dwn, private connections: Map = new Map()) {} + + async connect(socket: WebSocket): Promise { + const connection = new SocketConnection(socket, this.dwn, () => { + // this is the onClose handler to clean up any closed connections. + this.connections.delete(socket); + }); + + this.connections.set(socket, connection); + } + + async closeAll(): Promise { + const closePromises = []; + this.connections.forEach(connection => closePromises.push(connection.close())); + await Promise.all(closePromises); + } +} \ No newline at end of file diff --git a/src/connection/socket-connection.ts b/src/connection/socket-connection.ts new file mode 100644 index 0000000..49625c1 --- /dev/null +++ b/src/connection/socket-connection.ts @@ -0,0 +1,221 @@ +import type { Dwn, GenericMessage, MessageEvent } from "@tbd54566975/dwn-sdk-js"; +import { DwnMethodName } from "@tbd54566975/dwn-sdk-js"; + +import type { WebSocket } from "ws"; +import log from 'loglevel'; +import { v4 as uuidv4 } from 'uuid'; + +import type { RequestContext } from "../lib/json-rpc-router.js"; +import type { JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcResponse, JsonRpcSubscription } from "../lib/json-rpc.js"; + +import { requestCounter } from "../metrics.js"; +import { jsonRpcRouter } from "../json-rpc-api.js"; +import { JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcSuccessResponse } from "../lib/json-rpc.js"; +import { DwnServerError, DwnServerErrorCode } from "../dwn-error.js"; + +const HEARTBEAT_INTERVAL = 30_000; + +/** + * SocketConnection handles a WebSocket connection to a DWN using JSON RPC. + * It also manages references to the long running RPC subscriptions for the connection. + */ +export class SocketConnection { + private heartbeatInterval: NodeJS.Timer; + private subscriptions: Map = new Map(); + private isAlive: boolean; + + constructor( + private socket: WebSocket, + private dwn: Dwn, + private onClose?: () => void + ){ + socket.on('message', this.message.bind(this)); + socket.on('close', this.close.bind(this)); + socket.on('error', this.error.bind(this)); + socket.on('pong', this.pong.bind(this)); + + // Sometimes connections between client <-> server can get borked in such a way that + // leaves both unaware of the borkage. ping messages can be used as a means to verify + // that the remote endpoint is still responsive. Server will ping each socket every 30s + // if a pong hasn't received from a socket by the next ping, the server will terminate + // the socket connection + this.isAlive = true; + this.heartbeatInterval = setInterval(() => { + if (this.isAlive === false) { + this.close(); + } + this.isAlive = false; + this.socket.ping(); + }, HEARTBEAT_INTERVAL); + } + + /** + * Checks to see if the incoming `JsonRpcId` is already in use for a subscription. + */ + hasSubscription(id: JsonRpcId): boolean { + return this.subscriptions.has(id); + } + + /** + * Adds a reference for the JSON RPC Subscription to this connection. + * Used for cleanup if the connection is closed. + */ + async addSubscription(subscription: JsonRpcSubscription): Promise { + if (this.subscriptions.has(subscription.id)) { + throw new DwnServerError( + DwnServerErrorCode.ConnectionSubscriptionJsonRpcIdExists, + `the subscription with id ${subscription.id} already exists` + ) + } + + this.subscriptions.set(subscription.id, subscription); + } + + /** + * Closes and removes the reference for a given subscription from this connection. + * + * @param id the `JsonRpcId` of the JSON RPC subscription request. + */ + async closeSubscription(id: JsonRpcId): Promise { + if (!this.subscriptions.has(id)) { + throw new DwnServerError( + DwnServerErrorCode.ConnectionSubscriptionJsonRpcIdNotFound, + `the subscription with id ${id} was not found` + ) + } + + const connection = this.subscriptions.get(id); + await connection.close(); + this.subscriptions.delete(id); + } + + /** + * Closes the existing connection and cleans up any listeners or subscriptions. + */ + async close(): Promise { + clearInterval(this.heartbeatInterval); + + // clean up all socket event listeners + this.socket.removeAllListeners(); + + const closePromises = []; + for (const [id, subscription] of this.subscriptions) { + closePromises.push(subscription.close()); + this.subscriptions.delete(id); + } + + // close all of the associated subscriptions + await Promise.all(closePromises); + + // close the socket. + this.socket.close(); + + // if there was a close handler passed call it after the connection has been closed + if (this.onClose !== undefined) { + this.onClose(); + } + } + + /** + * Pong messages are automatically sent in response to ping messages as required by + * the websocket spec. So, no need to send explicit pongs. + */ + private pong(): void { + this.isAlive = true; + } + + /** + * Log the error and close the connection. + */ + private async error(error:Error): Promise{ + log.error(`SocketConnection error, terminating connection`, error); + this.socket.terminate(); + await this.close(); + } + + /** + * Handles a `JSON RPC 2.0` encoded message. + */ + private async message(dataBuffer: Buffer): Promise { + const requestData = dataBuffer.toString(); + if (!requestData) { + return this.send(createJsonRpcErrorResponse( + uuidv4(), + JsonRpcErrorCodes.BadRequest, + 'request payload required.' + )) + } + + let jsonRequest: JsonRpcRequest; + try { + jsonRequest = JSON.parse(requestData); + } catch(error) { + const errorResponse = createJsonRpcErrorResponse( + uuidv4(), + JsonRpcErrorCodes.BadRequest, + (error as Error).message + ); + return this.send(errorResponse); + }; + + const requestContext = await this.buildRequestContext(jsonRequest); + const { jsonRpcResponse } = await jsonRpcRouter.handle(jsonRequest, requestContext); + if (jsonRpcResponse.error) { + requestCounter.inc({ method: jsonRequest.method, error: 1 }); + } else { + requestCounter.inc({ + method: jsonRequest.method, + status: jsonRpcResponse?.result?.reply?.status?.code || 0, + }); + } + this.send(jsonRpcResponse); + } + + /** + * Sends a JSON encoded Buffer through the Websocket. + */ + private send(response: JsonRpcResponse | JsonRpcErrorResponse): void { + this.socket.send(Buffer.from(JSON.stringify(response))); + } + + /** + * Creates a subscription handler to send messages matching the subscription requested. + * + * Wraps the incoming `message` in a `JSON RPC Success Response` using the original subscription`JSON RPC Id` to send through the WebSocket. + */ + private createSubscriptionHandler(id: JsonRpcId): (message: MessageEvent) => void { + return (event) => { + const response = createJsonRpcSuccessResponse(id, { event }); + this.send(response); + } + } + + /** + * Builds a `RequestContext` object to use with the `JSON RPC API`. + * + * Adds a `subscriptionHandler` for `Subscribe` messages. + */ + private async buildRequestContext(request: JsonRpcRequest): Promise { + const { params, method, subscription } = request; + + const requestContext: RequestContext = { + transport : 'ws', + dwn : this.dwn, + socketConnection : this, + } + + // methods that expect a long-running subscription begin with `rpc.subscribe.` + if (method.startsWith('rpc.subscribe.') && subscription) { + const { message } = params as { message?: GenericMessage }; + if (message?.descriptor.method === DwnMethodName.Subscribe) { + const handlerFunc = this.createSubscriptionHandler(subscription.id); + requestContext.subscriptionRequest = { + id: subscription.id, + subscriptionHandler: (message): void => handlerFunc(message), + } + } + } + + return requestContext; + } +} diff --git a/src/dwn-error.ts b/src/dwn-error.ts index fa19159..e872524 100644 --- a/src/dwn-error.ts +++ b/src/dwn-error.ts @@ -26,6 +26,8 @@ export class DwnServerError extends Error { * DWN Server error codes. */ export enum DwnServerErrorCode { + ConnectionSubscriptionJsonRpcIdExists = 'ConnectionSubscriptionJsonRpcIdExists', + ConnectionSubscriptionJsonRpcIdNotFound = 'ConnectionSubscriptionJsonRpcIdNotFound', ProofOfWorkInsufficientSolutionNonce = 'ProofOfWorkInsufficientSolutionNonce', ProofOfWorkInvalidOrExpiredChallenge = 'ProofOfWorkInvalidOrExpiredChallenge', ProofOfWorkManagerInvalidChallengeNonce = 'ProofOfWorkManagerInvalidChallengeNonce', diff --git a/src/dwn-server.ts b/src/dwn-server.ts index 15878ef..7a9e23f 100644 --- a/src/dwn-server.ts +++ b/src/dwn-server.ts @@ -1,4 +1,5 @@ -import { Dwn } from '@tbd54566975/dwn-sdk-js'; +import type { EventStream } from '@tbd54566975/dwn-sdk-js'; +import { Dwn, EventEmitterStream } from '@tbd54566975/dwn-sdk-js'; import type { Server } from 'http'; import log from 'loglevel'; @@ -39,10 +40,9 @@ export class DwnServer { prefix.apply(log); } - async start(callback?: () => void): Promise { + async start(): Promise { await this.#setupServer(); setProcessHandlers(this); - callback?.(); } /** @@ -61,7 +61,17 @@ export class DwnServer { proofOfWorkInitialMaximumAllowedHash: this.config.registrationProofOfWorkInitialMaxHash, }); - this.dwn = await Dwn.create(getDWNConfig(this.config, registrationManager)); + let eventStream: EventStream | undefined; + if (this.config.webSocketServerEnabled) { + // setting `EventEmitterStream` as default the default `EventStream + // if an alternate implementation is needed, instantiate a `Dwn` with a custom `EventStream` and add it to server options. + eventStream = new EventEmitterStream(); + } + + this.dwn = await Dwn.create(getDWNConfig(this.config, { + tenantGate: registrationManager, + eventStream, + })); } this.#httpApi = new HttpApi(this.config, this.dwn, registrationManager); @@ -76,7 +86,8 @@ export class DwnServer { if (this.config.webSocketServerEnabled) { this.#wsApi = new WsApi(this.#httpApi.server, this.dwn); - this.#wsApi.start(() => log.info(`WebSocketServer ready...`)); + this.#wsApi.start(); + log.info('WebSocketServer ready...'); } } @@ -88,8 +99,8 @@ export class DwnServer { return this.#httpApi.server; } - get wsServer(): WebSocketServer { - return this.#wsApi.server; + get wsServer(): WebSocketServer | undefined { + return this.#wsApi?.server; } /** diff --git a/src/http-api.ts b/src/http-api.ts index 30972ae..e5bbb24 100644 --- a/src/http-api.ts +++ b/src/http-api.ts @@ -17,7 +17,7 @@ import { createJsonRpcErrorResponse, JsonRpcErrorCodes } from './lib/json-rpc.js import type { DwnServerConfig } from './config.js'; import { config } from './config.js'; import { type DwnServerError } from './dwn-error.js'; -import { jsonRpcApi } from './json-rpc-api.js'; +import { jsonRpcRouter } from './json-rpc-api.js'; import { requestCounter, responseHistogram } from './metrics.js'; import type { RegistrationManager } from './registration/registration-manager.js'; @@ -30,7 +30,7 @@ export class HttpApi { registrationManager: RegistrationManager; dwn: Dwn; - constructor(config: DwnServerConfig, dwn: Dwn, registrationManager: RegistrationManager) { + constructor(config: DwnServerConfig, dwn: Dwn, registrationManager?: RegistrationManager) { console.log(config); this.#config = config; @@ -149,7 +149,7 @@ export class HttpApi { transport : 'http', dataStream : requestDataStream, }; - const { jsonRpcResponse, dataStream: responseDataStream } = await jsonRpcApi.handle(dwnRpcRequest, requestContext as RequestContext); + const { jsonRpcResponse, dataStream: responseDataStream } = await jsonRpcRouter.handle(dwnRpcRequest, requestContext as RequestContext); // If the handler catches a thrown exception and returns a JSON RPC InternalError, return the equivalent // HTTP 500 Internal Server Error with the response. diff --git a/src/index.ts b/src/index.ts index e77275c..77b43b5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,6 @@ export { DwnServerConfig } from './config.js'; export { DwnServer, DwnServerOptions } from './dwn-server.js'; export { HttpApi } from './http-api.js'; -export { jsonRpcApi } from './json-rpc-api.js'; +export { jsonRpcRouter } from './json-rpc-api.js'; export { EStoreType, BackendTypes, StoreType } from './storage.js'; export { WsApi } from './ws-api.js'; diff --git a/src/json-rpc-api.ts b/src/json-rpc-api.ts index 41661e5..d3e4d39 100644 --- a/src/json-rpc-api.ts +++ b/src/json-rpc-api.ts @@ -1,7 +1,11 @@ import { JsonRpcRouter } from './lib/json-rpc-router.js'; import { handleDwnProcessMessage } from './json-rpc-handlers/dwn/index.js'; +import { handleSubscriptionsClose } from './json-rpc-handlers/subscription/index.js'; -export const jsonRpcApi = new JsonRpcRouter(); +export const jsonRpcRouter = new JsonRpcRouter(); -jsonRpcApi.on('dwn.processMessage', handleDwnProcessMessage); +jsonRpcRouter.on('dwn.processMessage', handleDwnProcessMessage); +jsonRpcRouter.on('rpc.subscribe.dwn.processMessage', handleDwnProcessMessage); + +jsonRpcRouter.on('rpc.subscribe.close', handleSubscriptionsClose); diff --git a/src/json-rpc-handlers/dwn/process-message.ts b/src/json-rpc-handlers/dwn/process-message.ts index 09be116..bed6db3 100644 --- a/src/json-rpc-handlers/dwn/process-message.ts +++ b/src/json-rpc-handlers/dwn/process-message.ts @@ -1,38 +1,103 @@ -import type { RecordsReadReply } from '@tbd54566975/dwn-sdk-js'; +import type { GenericMessage } from '@tbd54566975/dwn-sdk-js'; +import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js'; import type { Readable as IsomorphicReadable } from 'readable-stream'; +import log from 'loglevel'; import { v4 as uuidv4 } from 'uuid'; +import type { JsonRpcSubscription } from '../../lib/json-rpc.js'; import type { HandlerResponse, JsonRpcHandler, } from '../../lib/json-rpc-router.js'; + import { createJsonRpcErrorResponse, createJsonRpcSuccessResponse, JsonRpcErrorCodes, } from '../../lib/json-rpc.js'; + export const handleDwnProcessMessage: JsonRpcHandler = async ( dwnRequest, context, ) => { - const { dwn, dataStream } = context; - const { target, message } = dwnRequest.params; + const { dwn, dataStream, subscriptionRequest, socketConnection, transport } = context; + const { target, message } = dwnRequest.params as { target: string, message: GenericMessage }; const requestId = dwnRequest.id ?? uuidv4(); try { - const reply = (await dwn.processMessage( - target, - message, - { dataStream: dataStream as IsomorphicReadable }, - )) as RecordsReadReply; + // RecordsWrite is only supported on 'http' to support data stream for large data + // TODO: https://github.com/TBD54566975/dwn-server/issues/108 + if ( + transport !== 'http' && + message.descriptor.interface === DwnInterfaceName.Records && + message.descriptor.method === DwnMethodName.Write + ) { + const jsonRpcResponse = createJsonRpcErrorResponse( + requestId, + JsonRpcErrorCodes.InvalidParams, + `RecordsWrite is not supported via ${context.transport}` + ) + return { jsonRpcResponse }; + } + + // subscribe methods must come with a subscriptionRequest context + if (message.descriptor.method === DwnMethodName.Subscribe && subscriptionRequest === undefined) { + const jsonRpcResponse = createJsonRpcErrorResponse( + requestId, + JsonRpcErrorCodes.InvalidRequest, + `subscribe methods must contain a subscriptionRequest context` + ); + return { jsonRpcResponse }; + } + + // Subscribe methods are only supported on 'ws' (WebSockets) + if (transport !== 'ws' && subscriptionRequest !== undefined) { + const jsonRpcResponse = createJsonRpcErrorResponse( + requestId, + JsonRpcErrorCodes.InvalidParams, + `subscriptions are not supported via ${context.transport}` + ) + return { jsonRpcResponse }; + } + + // if this is a subscription request, we first check if the connection has a subscription with this Id + // we do this ahead of time to prevent opening a subscription on the dwn only to close it after attempting to add it to the subscription manager + // otherwise the subscription manager would throw an error that the Id is already in use and we would close the open subscription on the DWN. + if (subscriptionRequest !== undefined && socketConnection?.hasSubscription(subscriptionRequest.id)) { + const jsonRpcResponse = createJsonRpcErrorResponse( + requestId, + JsonRpcErrorCodes.InvalidParams, + `the subscribe id: ${subscriptionRequest.id} is in use by an active subscription` + ) + return { jsonRpcResponse }; + } + const reply = await dwn.processMessage(target, message, { + dataStream: dataStream as IsomorphicReadable, + subscriptionHandler: subscriptionRequest?.subscriptionHandler, + }); + + const { record } = reply; // RecordsRead messages return record data as a stream to for accommodate large amounts of data - let recordDataStream; - if (reply?.record?.data !== undefined) { + let recordDataStream: IsomorphicReadable; + if (record !== undefined && record.data !== undefined) { recordDataStream = reply.record.data; - delete reply.record.data; + delete reply.record.data; // not serializable via JSON + } + + if (subscriptionRequest && reply.subscription) { + const { close } = reply.subscription; + // Subscribe messages return a close function to facilitate closing the subscription + // we add a reference to the close function for this subscription request to the socket connection. + // this will facilitate closing the subscription later. + const subscriptionReply: JsonRpcSubscription = { + id: subscriptionRequest.id, + close, + } + await socketConnection.addSubscription(subscriptionReply); + delete reply.subscription.close // delete the close method from the reply as it's not JSON serializable and has a held reference. } const jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { reply }); @@ -49,6 +114,8 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( e.message, ); + // log the unhandled error response + log.error('handleDwnProcessMessage error', jsonRpcResponse, e); return { jsonRpcResponse } as HandlerResponse; } }; diff --git a/src/json-rpc-handlers/subscription/close.ts b/src/json-rpc-handlers/subscription/close.ts new file mode 100644 index 0000000..fda3761 --- /dev/null +++ b/src/json-rpc-handlers/subscription/close.ts @@ -0,0 +1,59 @@ +import { v4 as uuidv4 } from 'uuid'; + +import { DwnServerErrorCode } from '../../dwn-error.js'; +import type { + HandlerResponse, + JsonRpcHandler, +} from '../../lib/json-rpc-router.js'; + +import type { JsonRpcId, JsonRpcResponse } from '../../lib/json-rpc.js'; +import { + createJsonRpcErrorResponse, + createJsonRpcSuccessResponse, + JsonRpcErrorCodes, +} from '../../lib/json-rpc.js'; + +/** + * Closes a subscription tied to a specific `SocketConnection`. + * + * @param jsonRpcRequest must include JsonRpcId of the subscription request within a `subscription object`. + * @param context must include the associated `SocketConnection`. + * + */ +export const handleSubscriptionsClose: JsonRpcHandler = async ( + jsonRpcRequest, + context, +) => { + const requestId = jsonRpcRequest.id ?? uuidv4(); + if (context.socketConnection === undefined) { + const jsonRpcResponse = createJsonRpcErrorResponse(requestId, JsonRpcErrorCodes.InvalidRequest, 'socket connection does not exist'); + return { jsonRpcResponse }; + } + + if (jsonRpcRequest.subscription === undefined) { + const jsonRpcResponse = createJsonRpcErrorResponse(requestId, JsonRpcErrorCodes.InvalidRequest, 'subscribe options do not exist'); + return { jsonRpcResponse }; + } + + const { socketConnection } = context; + const { id } = jsonRpcRequest.subscription as { id: JsonRpcId }; + + let jsonRpcResponse:JsonRpcResponse; + try { + // closing the subscription and cleaning up the reference within the given connection. + await socketConnection.closeSubscription(id); + jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { reply: { status: 200, detail: 'Accepted' } }); + } catch(error) { + if (error.code === DwnServerErrorCode.ConnectionSubscriptionJsonRpcIdNotFound) { + jsonRpcResponse = createJsonRpcErrorResponse(requestId, JsonRpcErrorCodes.InvalidParams, `subscription ${id} does not exist.`); + } else { + jsonRpcResponse = createJsonRpcErrorResponse( + requestId, + JsonRpcErrorCodes.InternalError, + `unknown subscription close error for ${id}: ${error.message}` + ); + } + } + + return { jsonRpcResponse } as HandlerResponse; +} \ No newline at end of file diff --git a/src/json-rpc-handlers/subscription/index.ts b/src/json-rpc-handlers/subscription/index.ts new file mode 100644 index 0000000..1225a0f --- /dev/null +++ b/src/json-rpc-handlers/subscription/index.ts @@ -0,0 +1 @@ +export * from './close.js'; diff --git a/src/json-rpc-socket.ts b/src/json-rpc-socket.ts new file mode 100644 index 0000000..3b3cc7b --- /dev/null +++ b/src/json-rpc-socket.ts @@ -0,0 +1,150 @@ +import log from 'loglevel'; +import { v4 as uuidv4 } from 'uuid'; +import WebSocket from 'ws'; + +import type { JsonRpcId, JsonRpcRequest, JsonRpcResponse } from "./lib/json-rpc.js"; +import { createJsonRpcSubscriptionRequest } from "./lib/json-rpc.js"; + +// These were arbitrarily chosen, but can be modified via connect options +const CONNECT_TIMEOUT = 3_000; +const RESPONSE_TIMEOUT = 30_000; + +export interface JsonRpcSocketOptions { + /** socket connection timeout in milliseconds */ + connectTimeout?: number; + /** response timeout for rpc requests in milliseconds */ + responseTimeout?: number; + /** optional connection close handler */ + onclose?: () => void; + /** optional socket error handler */ + onerror?: (error?: any) => void; +} + +/** + * JSON RPC Socket Client for WebSocket request/response and long-running subscriptions. + */ +export class JsonRpcSocket { + private constructor(private socket: WebSocket, private responseTimeout: number) {} + + static async connect(url: string, options: JsonRpcSocketOptions = {}): Promise { + const { connectTimeout = CONNECT_TIMEOUT, responseTimeout = RESPONSE_TIMEOUT, onclose, onerror } = options; + + const socket = new WebSocket(url, { timeout: connectTimeout }); + + socket.onclose = onclose; + socket.onerror = onerror; + + if (!socket.onclose) { + socket.onclose = ():void => { + log.info(`JSON RPC Socket close ${url}`); + } + } + + if (!socket.onerror) { + socket.onerror = (error?: any):void => { + log.error(`JSON RPC Socket error ${url}`, error); + } + } + + return new Promise((resolve, reject) => { + socket.on('open', () => { + resolve(new JsonRpcSocket(socket, responseTimeout)); + }); + + setTimeout(() => reject, connectTimeout); + }); + } + + close(): void { + this.socket.close(); + } + + /** + * Sends a JSON-RPC request through the socket and waits for a single response. + */ + async request(request: JsonRpcRequest): Promise { + return new Promise((resolve, reject) => { + request.id ??= uuidv4(); + + const handleResponse = (event: { data: any }):void => { + const jsonRpsResponse = JSON.parse(event.data.toString()) as JsonRpcResponse; + if (jsonRpsResponse.id === request.id) { + // if the incoming response id matches the request id, we will remove the listener and resolve the response + this.socket.removeEventListener('message', handleResponse); + return resolve(jsonRpsResponse); + } + }; + // subscribe to the listener before sending the request + this.socket.addEventListener('message', handleResponse); + this.send(request); + + // reject this promise if we don't receive any response back within the timeout period + setTimeout(() => { + this.socket.removeEventListener('message', handleResponse); + reject(new Error('request timed out')); + }, this.responseTimeout); + }); + } + + /** + * Sends a JSON-RPC request through the socket and keeps a listener open to read associated responses as they arrive. + * Returns a close method to clean up the listener. + */ + async subscribe(request: JsonRpcRequest, listener: (response: JsonRpcResponse) => void): Promise<{ + response: JsonRpcResponse; + close?: () => Promise; + }> { + + if (!request.method.startsWith('rpc.subscribe.')) { + throw new Error('subscribe rpc requests must include the `rpc.subscribe` prefix'); + } + + if (!request.subscription) { + throw new Error('subscribe rpc requests must include subscribe options'); + } + + const subscriptionId = request.subscription.id; + const socketEventListener = (event: { data: any }):void => { + const jsonRpcResponse = JSON.parse(event.data.toString()) as JsonRpcResponse; + if (jsonRpcResponse.id === subscriptionId) { + if (jsonRpcResponse.error !== undefined) { + // remove the event listener upon receipt of a JSON RPC Error. + this.socket.removeEventListener('message', socketEventListener); + this.closeSubscription(subscriptionId); + } + listener(jsonRpcResponse); + } + }; + this.socket.addEventListener('message', socketEventListener); + + const response = await this.request(request); + if (response.error) { + this.socket.removeEventListener('message', socketEventListener); + return { response } + } + + // clean up listener and create a `rpc.subscribe.close` message to use when closing this JSON RPC subscription + const close = async (): Promise => { + this.socket.removeEventListener('message', socketEventListener); + await this.closeSubscription(subscriptionId); + } + + return { + response, + close + } + } + + private closeSubscription(id: JsonRpcId): Promise { + const requestId = uuidv4(); + const request = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.close', {}, id); + return this.request(request); + } + + /** + * Sends a JSON-RPC request through the socket. You must subscribe to a message listener separately to capture the response. + */ + send(request: JsonRpcRequest):void { + this.socket.send(Buffer.from(JSON.stringify(request))); + } +} \ No newline at end of file diff --git a/src/lib/json-rpc-router.ts b/src/lib/json-rpc-router.ts index 78036c7..8fd1408 100644 --- a/src/lib/json-rpc-router.ts +++ b/src/lib/json-rpc-router.ts @@ -1,12 +1,22 @@ -import type { Dwn } from '@tbd54566975/dwn-sdk-js'; +import type { Dwn, EventSubscriptionHandler } from '@tbd54566975/dwn-sdk-js'; import type { Readable } from 'node:stream'; -import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js'; +import type { JsonRpcId, JsonRpcRequest, JsonRpcResponse } from './json-rpc.js'; +import type { SocketConnection } from '../connection/socket-connection.js'; export type RequestContext = { - dwn: Dwn; transport: 'http' | 'ws'; + dwn: Dwn; + /** the socket connection associated with this request if over sockets */ + socketConnection?: SocketConnection; + subscriptionRequest?: { + /** The JsonRpcId of the subscription handler */ + id: JsonRpcId; + /** The `MessageEvent` handler associated with a subscription request, only used in `ws` requests */ + subscriptionHandler: EventSubscriptionHandler; + } + /** The `Readable` stream associated with a `RecordsWrite` request only used in `http` requests */ dataStream?: Readable; }; diff --git a/src/lib/json-rpc.ts b/src/lib/json-rpc.ts index dfd638a..d54a9cf 100644 --- a/src/lib/json-rpc.ts +++ b/src/lib/json-rpc.ts @@ -1,12 +1,15 @@ export type JsonRpcId = string | number | null; -export type JsonRpcParams = any; export type JsonRpcVersion = '2.0'; export interface JsonRpcRequest { jsonrpc: JsonRpcVersion; id?: JsonRpcId; method: string; - params?: JsonRpcParams; + params?: any; + /** JSON RPC Subscription Extension Parameters */ + subscription?: { + id: JsonRpcId + }; } export interface JsonRpcError { @@ -15,6 +18,12 @@ export interface JsonRpcError { data?: any; } +export interface JsonRpcSubscription { + /** JSON RPC Id of the Subscription Request */ + id: JsonRpcId; + close: () => Promise; +} + export enum JsonRpcErrorCodes { // JSON-RPC 2.0 pre-defined errors InvalidRequest = -32600, @@ -23,10 +32,12 @@ export enum JsonRpcErrorCodes { InternalError = -32603, ParseError = -32700, - // App defined errors - BadRequest = -50400, // equivalent to HTTP Status 400 - Unauthorized = -50401, // equivalent to HTTP Status 401 - Forbidden = -50403, // equivalent to HTTP Status 403 + /** App defined error equivalent to HTTP Status 400 */ + BadRequest = -50400, + /** App defined error equivalent to HTTP Status 401 */ + Unauthorized = -50401, + /** App defined error equivalent to HTTP Status 403 */ + Forbidden = -50403, } export type JsonRpcResponse = JsonRpcSuccessResponse | JsonRpcErrorResponse; @@ -35,7 +46,7 @@ export interface JsonRpcSuccessResponse { jsonrpc: JsonRpcVersion; id: JsonRpcId; result: any; - error?: undefined; + error?: never; } export interface JsonRpcErrorResponse { @@ -64,7 +75,7 @@ export const createJsonRpcErrorResponse = ( export const createJsonRpcNotification = ( method: string, - params?: JsonRpcParams, + params?: any, ): JsonRpcRequest => { return { jsonrpc: '2.0', @@ -73,10 +84,27 @@ export const createJsonRpcNotification = ( }; }; +export const createJsonRpcSubscriptionRequest = ( + id: JsonRpcId, + method: string, + params?: any, + subscriptionId?: JsonRpcId +): JsonRpcRequest => { + return { + jsonrpc: '2.0', + id, + method, + params, + subscription: { + id: subscriptionId, + } + } +} + export const createJsonRpcRequest = ( id: JsonRpcId, method: string, - params?: JsonRpcParams, + params?: any, ): JsonRpcRequest => { return { jsonrpc: '2.0', @@ -96,11 +124,3 @@ export const createJsonRpcSuccessResponse = ( result: result ?? null, }; }; - -export function parseJson(text: string): object | null { - try { - return JSON.parse(text); - } catch { - return null; - } -} diff --git a/src/storage.ts b/src/storage.ts index 1988349..a4eea8c 100644 --- a/src/storage.ts +++ b/src/storage.ts @@ -9,6 +9,7 @@ import type { DataStore, DwnConfig, EventLog, + EventStream, MessageStore, TenantGate, } from '@tbd54566975/dwn-sdk-js'; @@ -45,9 +46,13 @@ export enum BackendTypes { export type StoreType = DataStore | EventLog | MessageStore; export function getDWNConfig( - config: DwnServerConfig, - tenantGate: TenantGate, + config : DwnServerConfig, + options : { + tenantGate? : TenantGate, + eventStream? : EventStream, + } ): DwnConfig { + const { tenantGate, eventStream } = options; const dataStore: DataStore = getStore(config.dataStore, EStoreType.DataStore); const eventLog: EventLog = getStore(config.eventLog, EStoreType.EventLog); const messageStore: MessageStore = getStore( @@ -55,7 +60,7 @@ export function getDWNConfig( EStoreType.MessageStore, ); - return { eventLog, dataStore, messageStore, tenantGate }; + return { eventStream, eventLog, dataStore, messageStore, tenantGate }; } function getLevelStore( diff --git a/src/ws-api.ts b/src/ws-api.ts index cac1cea..426aadc 100644 --- a/src/ws-api.ts +++ b/src/ws-api.ts @@ -1,175 +1,46 @@ -import { DataStream, type Dwn } from '@tbd54566975/dwn-sdk-js'; -import type { IncomingMessage, Server } from 'http'; -import { base64url } from 'multiformats/bases/base64'; -import { v4 as uuidv4 } from 'uuid'; -import { type AddressInfo, type WebSocket, WebSocketServer } from 'ws'; +import type { + Dwn, +} from '@tbd54566975/dwn-sdk-js'; -import type { RequestContext } from './lib/json-rpc-router.js'; -import { - createJsonRpcErrorResponse, - JsonRpcErrorCodes, - type JsonRpcResponse, -} from './lib/json-rpc.js'; +import type { Server } from 'http'; -import { jsonRpcApi } from './json-rpc-api.js'; -import { requestCounter } from './metrics.js'; +import { WebSocketServer } from 'ws'; -const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive'); -const HEARTBEAT_INTERVAL = 30_000; +import type { ConnectionManager } from './connection/connection-manager.js'; +import { InMemoryConnectionManager } from './connection/connection-manager.js'; export class WsApi { #wsServer: WebSocketServer; dwn: Dwn; + #connectionManager: ConnectionManager - constructor(server: Server, dwn: Dwn) { + constructor(server: Server, dwn: Dwn, connectionManager?: ConnectionManager) { this.dwn = dwn; + this.#connectionManager = connectionManager || new InMemoryConnectionManager(dwn); this.#wsServer = new WebSocketServer({ server }); } - get address(): AddressInfo | string { - return this.#wsServer.address(); - } - get server(): WebSocketServer { return this.#wsServer; } - /** - * Handler for opening websocket event - `connection`. - * Sets listeners for `message`, `pong`, `close`, and `error` events. - */ - #handleConnection(socket: WebSocket, _request: IncomingMessage): void { - const dwn = this.dwn; - - socket[SOCKET_ISALIVE_SYMBOL] = true; - - // Pong messages are automatically sent in response to ping messages as required by - // the websocket spec. So, no need to send explicit pongs from browser - socket.on('pong', function () { - this[SOCKET_ISALIVE_SYMBOL] = true; - }); - - socket.on('close', function () { - // Clean up event listeners - socket.removeAllListeners(); - }); - - socket.on('error', function (error) { - console.error('WebSocket error:', error); - // Close the socket and remove all event listeners - socket.terminate(); - socket.removeAllListeners(); - }); - - socket.on('message', async function (dataBuffer) { - let dwnRequest; - - try { - // deserialize bytes into JSON object - dwnRequest = dataBuffer.toString(); - if (!dwnRequest) { - const jsonRpcResponse = createJsonRpcErrorResponse( - uuidv4(), - JsonRpcErrorCodes.BadRequest, - 'request payload required.', - ); - - const responseBuffer = WsApi.jsonRpcResponseToBuffer(jsonRpcResponse); - return socket.send(responseBuffer); - } - - dwnRequest = JSON.parse(dwnRequest); - } catch (e) { - const jsonRpcResponse = createJsonRpcErrorResponse( - uuidv4(), - JsonRpcErrorCodes.BadRequest, - e.message, - ); - - const responseBuffer = WsApi.jsonRpcResponseToBuffer(jsonRpcResponse); - return socket.send(responseBuffer); - } - - // Check whether data was provided in the request - const { encodedData } = dwnRequest.params; - const requestDataStream = encodedData - ? DataStream.fromBytes(base64url.baseDecode(encodedData)) - : undefined; - - const requestContext: RequestContext = { - dwn, - transport: 'ws', - dataStream: requestDataStream, - }; - const { jsonRpcResponse } = await jsonRpcApi.handle( - dwnRequest, - requestContext, - ); - - if (jsonRpcResponse.error) { - requestCounter.inc({ method: dwnRequest.method, error: 1 }); - } else { - requestCounter.inc({ - method: dwnRequest.method, - status: jsonRpcResponse?.result?.reply?.status?.code || 0, - }); - } - - const responseBuffer = WsApi.jsonRpcResponseToBuffer(jsonRpcResponse); - return socket.send(responseBuffer); - }); - } - - /** - * This handler returns an interval to ping clients' socket every 30s - * if a pong hasn't received from a socket by the next ping, the server will terminate the socket connection. - */ - #setupHeartbeat(): NodeJS.Timer { - // Sometimes connections between client <-> server can get borked in such a way that - // leaves both unaware of the borkage. ping messages can be used as a means to verify - // that the remote endpoint is still responsive. Server will ping each socket every 30s - // if a pong hasn't received from a socket by the next ping, the server will terminate - // the socket connection - return setInterval(() => { - this.#wsServer.clients.forEach(function (socket) { - if (socket[SOCKET_ISALIVE_SYMBOL] === false) { - return socket.terminate(); - } - - socket[SOCKET_ISALIVE_SYMBOL] = false; - socket.ping(); - }); - }, HEARTBEAT_INTERVAL); - } - /** * Handler for starting a WebSocket. * Sets listeners for `connection`, `close` events. - * It clears `heartbeatInterval` when a `close` event is made. */ #setupWebSocket(): void { - this.#wsServer.on('connection', this.#handleConnection.bind(this)); - - const heartbeatInterval = this.#setupHeartbeat(); - - this.#wsServer.on('close', function close() { - clearInterval(heartbeatInterval); - }); + this.#wsServer.on('connection', (socket, request) => this.#connectionManager.connect(socket, request)); + this.#wsServer.on('close', () => this.#connectionManager.closeAll()); } - start(callback?: () => void): WebSocketServer { + start(): WebSocketServer { this.#setupWebSocket(); - callback?.(); return this.#wsServer; } - close(): void { + async close(): Promise { this.#wsServer.close(); - } - - static jsonRpcResponseToBuffer(jsonRpcResponse: JsonRpcResponse): Buffer { - const str = JSON.stringify(jsonRpcResponse); - return Buffer.from(str); + await this.#connectionManager.closeAll(); } } diff --git a/tests/connection/connection-manager.spec.ts b/tests/connection/connection-manager.spec.ts new file mode 100644 index 0000000..ba6850b --- /dev/null +++ b/tests/connection/connection-manager.spec.ts @@ -0,0 +1,60 @@ +import type { Dwn } from '@tbd54566975/dwn-sdk-js'; + +import chaiAsPromised from 'chai-as-promised'; +import chai, { expect } from 'chai'; + +import sinon from 'sinon'; +import { getTestDwn } from '../test-dwn.js'; +import { InMemoryConnectionManager } from '../../src/connection/connection-manager.js'; +import { config } from '../../src/config.js'; +import { WsApi } from '../../src/ws-api.js'; +import type { Server } from 'http'; +import { HttpApi } from '../../src/http-api.js'; +import { JsonRpcSocket } from '../../src/json-rpc-socket.js'; + +chai.use(chaiAsPromised); + +describe('InMemoryConnectionManager', () => { + let dwn: Dwn; + let connectionManager: InMemoryConnectionManager; + let server: Server + let wsApi: WsApi; + + beforeEach(async () => { + dwn = await getTestDwn({ withEvents: true }); + connectionManager = new InMemoryConnectionManager(dwn); + const httpApi = new HttpApi(config, dwn); + server = await httpApi.start(9002); + wsApi = new WsApi(server, dwn, connectionManager); + wsApi.start(); + }); + + afterEach(async () => { + await connectionManager.closeAll(); + await dwn.close(); + await wsApi.close(); + server.close(); + server.closeAllConnections(); + sinon.restore(); + }); + + it('adds connection to the connections and removes it if that connection is closed', async () => { + const connection = await JsonRpcSocket.connect('ws://127.0.0.1:9002'); + expect((connectionManager as any).connections.size).to.equal(1); + connection.close(); + + await new Promise((resolve) => setTimeout(resolve, 5)); // wait for close event to be fired + expect((connectionManager as any).connections.size).to.equal(0); + }); + + it('closes all connections on `closeAll`', async () => { + await JsonRpcSocket.connect('ws://127.0.0.1:9002'); + expect((connectionManager as any).connections.size).to.equal(1); + + await JsonRpcSocket.connect('ws://127.0.0.1:9002'); + expect((connectionManager as any).connections.size).to.equal(2); + + await connectionManager.closeAll(); + expect((connectionManager as any).connections.size).to.equal(0); + }); +}); \ No newline at end of file diff --git a/tests/connection/socket-connection.spec.ts b/tests/connection/socket-connection.spec.ts new file mode 100644 index 0000000..3f033a5 --- /dev/null +++ b/tests/connection/socket-connection.spec.ts @@ -0,0 +1,159 @@ +import type { Dwn } from '@tbd54566975/dwn-sdk-js'; + +import chaiAsPromised from 'chai-as-promised'; +import chai, { expect } from 'chai'; + +import sinon from 'sinon'; +import { WebSocket } from 'ws'; +import { SocketConnection } from '../../src/connection/socket-connection.js'; +import { getTestDwn } from '../test-dwn.js'; +import log from 'loglevel'; + +chai.use(chaiAsPromised); + +describe('SocketConnection', () => { + let dwn: Dwn; + + before(async () => { + dwn = await getTestDwn(); + }); + + after(async () => { + await dwn.close(); + sinon.restore(); + }); + + it('should assign socket handlers', async () => { + const socket = sinon.createStubInstance(WebSocket); + const connection = new SocketConnection(socket, dwn); + expect(socket.on.callCount).to.equal(4); + expect(socket.on.args.map(arg => arg[0])).to.have.members(['message', 'close', 'error', 'pong']); + await connection.close(); + }); + + it('should add a subscription to the subscription manager map', async () => { + const socket = sinon.createStubInstance(WebSocket); + const connection = new SocketConnection(socket, dwn); + const subscriptionRequest = { + id: 'id', + method: 'method', + params: { param1: 'param' }, + close: async ():Promise => {} + } + + await connection.addSubscription(subscriptionRequest); + expect((connection as any).subscriptions.size).to.equal(1); + await connection.close(); + expect((connection as any).subscriptions.size).to.equal(0); + }); + + it('should reject a subscription with an Id of an existing subscription', async () => { + const socket = sinon.createStubInstance(WebSocket); + const connection = new SocketConnection(socket, dwn); + + const id = 'some-id'; + + const subscriptionRequest = { + id, + method: 'method', + params: { param1: 'param' }, + close: async ():Promise => {} + } + + await connection.addSubscription(subscriptionRequest); + expect((connection as any).subscriptions.size).to.equal(1); + + const addDuplicatePromise = connection.addSubscription(subscriptionRequest); + await expect(addDuplicatePromise).to.eventually.be.rejectedWith(`the subscription with id ${id} already exists`); + expect((connection as any).subscriptions.size).to.equal(1); + await connection.close(); + expect((connection as any).subscriptions.size).to.equal(0); + }); + + it('should close a subscription and remove it from the connection manager map', async () => { + const socket = sinon.createStubInstance(WebSocket); + const connection = new SocketConnection(socket, dwn); + + const id = 'some-id'; + + const subscriptionRequest = { + id, + method: 'method', + params: { param1: 'param' }, + close: async ():Promise => {} + } + + await connection.addSubscription(subscriptionRequest); + expect((connection as any).subscriptions.size).to.equal(1); + + await connection.closeSubscription(id); + expect((connection as any).subscriptions.size).to.equal(0); + + const closeAgainPromise = connection.closeSubscription(id); + await expect(closeAgainPromise).to.eventually.be.rejectedWith(`the subscription with id ${id} was not found`); + await connection.close(); + }); + + it('hasSubscription returns whether a subscription with the id already exists', async () => { + const socket = sinon.createStubInstance(WebSocket); + const connection = new SocketConnection(socket, dwn); + const subscriptionRequest = { + id: 'id', + method: 'method', + params: { param1: 'param' }, + close: async ():Promise => {} + } + + await connection.addSubscription(subscriptionRequest); + expect((connection as any).subscriptions.size).to.equal(1); + expect(connection.hasSubscription(subscriptionRequest.id)).to.be.true; + expect(connection.hasSubscription('does-not-exist')).to.be.false; + + await connection.closeSubscription(subscriptionRequest.id); + expect(connection.hasSubscription(subscriptionRequest.id)).to.be.false; + await connection.close(); + }); + + it('should close if pong is not triggered between heartbeat intervals', async () => { + const socket = sinon.createStubInstance(WebSocket); + const clock = sinon.useFakeTimers(); + const connection = new SocketConnection(socket, dwn); + const closeSpy = sinon.spy(connection, 'close'); + + clock.tick(60_100); // interval has to run twice + clock.restore(); + + expect(closeSpy.callCount).to.equal(1); + }); + + it('should not close if pong is called within the heartbeat interval', async () => { + const socket = sinon.createStubInstance(WebSocket); + const clock = sinon.useFakeTimers(); + const connection = new SocketConnection(socket, dwn); + const closeSpy = sinon.spy(connection, 'close'); + + (connection as any).pong(); // trigger a pong + clock.tick(30_100); // first interval + + (connection as any).pong(); // trigger a pong + clock.tick(30_100); // second interval + + expect(closeSpy.callCount).to.equal(0); + + clock.tick(30_100); // another interval without a ping + clock.restore(); + expect(closeSpy.callCount).to.equal(1); + }); + + it('logs an error and closes connection if error is triggered', async () => { + const socket = sinon.createStubInstance(WebSocket); + const connection = new SocketConnection(socket, dwn); + const logSpy = sinon.stub(log, 'error'); + const closeSpy = sinon.spy(connection, 'close'); + + (connection as any).error(new Error('some error')); + + expect(logSpy.callCount).to.equal(1); + expect(closeSpy.callCount).to.equal(1); + }); +}); \ No newline at end of file diff --git a/tests/dwn-process-message.spec.ts b/tests/dwn-process-message.spec.ts index 1447eb8..fe4f3d1 100644 --- a/tests/dwn-process-message.spec.ts +++ b/tests/dwn-process-message.spec.ts @@ -1,9 +1,10 @@ import { expect } from 'chai'; +import sinon from 'sinon'; import { v4 as uuidv4 } from 'uuid'; import { handleDwnProcessMessage } from '../src/json-rpc-handlers/dwn/process-message.js'; import type { RequestContext } from '../src/lib/json-rpc-router.js'; -import { createJsonRpcRequest } from '../src/lib/json-rpc.js'; +import { JsonRpcErrorCodes, createJsonRpcRequest } from '../src/lib/json-rpc.js'; import { getTestDwn } from './test-dwn.js'; import { createRecordsWriteMessage } from './utils.js'; import { TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; @@ -32,6 +33,7 @@ describe('handleDwnProcessMessage', function () { const { reply } = jsonRpcResponse.result; expect(reply.status.code).to.equal(202); expect(reply.status.detail).to.equal('Accepted'); + await dwn.close(); }); it('returns a JSON RPC Success Response when DWN returns a 4XX/5XX status code', async function () { @@ -59,5 +61,76 @@ describe('handleDwnProcessMessage', function () { expect(reply.status.detail).to.exist; expect(reply.data).to.be.undefined; expect(reply.entries).to.be.undefined; + await dwn.close(); + }); + + it('should fail if no subscriptionRequest context exists for a `Subscribe` message', async function () { + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { + message: { + descriptor: { interface: 'Records', method: 'Subscribe' }, + }, + target: 'did:key:abc1234', + }); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws' }; + + const { jsonRpcResponse } = await handleDwnProcessMessage( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InvalidRequest); + expect(jsonRpcResponse.error.message).to.equal('subscribe methods must contain a subscriptionRequest context'); + await dwn.close(); + }); + + it('should fail on http requests for a `Subscribe` message', async function () { + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { + message: { + descriptor: { interface: 'Records', method: 'Subscribe' }, + }, + target: 'did:key:abc1234', + }); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'http', subscriptionRequest: { id: 'test', subscriptionHandler: () => {}} }; + + const { jsonRpcResponse } = await handleDwnProcessMessage( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InvalidParams); + expect(jsonRpcResponse.error.message).to.equal('subscriptions are not supported via http'); + await dwn.close(); + }); + + it('should return a JsonRpc Internal Error for an unexpected thrown error within the handler', async function () { + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { + message: { + descriptor: { interface: 'Records' }, + }, + target: 'did:key:abc1234', + }); + + const dwn = await getTestDwn(); + sinon.stub(dwn, 'processMessage').throws(new Error('unexpected error')); + const context: RequestContext = { dwn, transport: 'http' }; + + const { jsonRpcResponse } = await handleDwnProcessMessage( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InternalError); + expect(jsonRpcResponse.error.message).to.equal('unexpected error'); + await dwn.close(); }); }); diff --git a/tests/dwn-server.spec.ts b/tests/dwn-server.spec.ts index 0cc2888..9c79327 100644 --- a/tests/dwn-server.spec.ts +++ b/tests/dwn-server.spec.ts @@ -1,3 +1,5 @@ +import type { Dwn } from '@tbd54566975/dwn-sdk-js'; + import { expect } from 'chai'; import { config } from '../src/config.js'; @@ -6,15 +8,50 @@ import { getTestDwn } from './test-dwn.js'; describe('DwnServer', function () { const dwnServerConfig = { ...config }; + let dwn: Dwn; - it('starts with injected dwn', async function () { - const testDwn = await getTestDwn(); + dwn = await getTestDwn(); - const dwnServer = new DwnServer({ config: dwnServerConfig, dwn: testDwn }); + const dwnServer = new DwnServer({ config: dwnServerConfig, dwn }); await dwnServer.start(); dwnServer.stop(() => console.log('server Stop')); expect(dwnServer.httpServer.listening).to.be.false; }); + + describe('webSocketServerEnabled config', function() { + it('should not return a websocket server if disabled', async function() { + dwn = await getTestDwn({ withEvents: true }); + const withoutSocketServer = new DwnServer({ + dwn, + config: { + ...dwnServerConfig, + webSocketServerEnabled: false, + } + }); + + await withoutSocketServer.start(); + expect(withoutSocketServer.httpServer.listening).to.be.true; + expect(withoutSocketServer.wsServer).to.be.undefined; + withoutSocketServer.stop(() => console.log('server Stop')); + expect(withoutSocketServer.httpServer.listening).to.be.false; + }); + + it('should return a websocket server if enabled', async function() { + dwn = await getTestDwn({ withEvents: true }); + const withSocketServer = new DwnServer({ + dwn, + config: { + ...dwnServerConfig, + webSocketServerEnabled: true, + } + }); + + await withSocketServer.start(); + expect(withSocketServer.wsServer).to.not.be.undefined; + withSocketServer.stop(() => console.log('server Stop')); + expect(withSocketServer.httpServer.listening).to.be.false; + }); + }); }); diff --git a/tests/http-api.spec.ts b/tests/http-api.spec.ts index bb9a22b..a64121e 100644 --- a/tests/http-api.spec.ts +++ b/tests/http-api.spec.ts @@ -62,7 +62,7 @@ describe('http api', function () { const proofOfWorkInitialMaximumAllowedHash = config.registrationProofOfWorkInitialMaxHash; registrationManager = await RegistrationManager.create({ registrationStoreUrl, termsOfServiceFilePath, proofOfWorkInitialMaximumAllowedHash }); - dwn = await getTestDwn(registrationManager); + dwn = await getTestDwn({ tenantGate: registrationManager }); httpApi = new HttpApi(config, dwn, registrationManager); diff --git a/tests/json-rpc-socket.spec.ts b/tests/json-rpc-socket.spec.ts new file mode 100644 index 0000000..6f1c4c4 --- /dev/null +++ b/tests/json-rpc-socket.spec.ts @@ -0,0 +1,234 @@ +import chaiAsPromised from 'chai-as-promised'; +import chai, { expect } from 'chai'; + +import { v4 as uuidv4 } from 'uuid'; +import sinon from 'sinon'; +import { WebSocketServer } from 'ws'; + +import type { JsonRpcId, JsonRpcRequest, JsonRpcSuccessResponse } from '../src/lib/json-rpc.js'; + +import { JsonRpcSocket } from '../src/json-rpc-socket.js'; +import { JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcRequest, createJsonRpcSubscriptionRequest, createJsonRpcSuccessResponse } from '../src/lib/json-rpc.js'; +import log from 'loglevel'; + +chai.use(chaiAsPromised); + +describe('JsonRpcSocket', () => { + let wsServer: WebSocketServer; + + before(async () => { + wsServer = new WebSocketServer({ + port: 9003, + }); + }); + + beforeEach(() => { + wsServer.removeAllListeners(); + }); + + it('connects to a url', async () => { + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003'); + expect(wsServer.clients.size).to.equal(1); + client.close(); + + // give time for the connection to close on the server. + await new Promise((resolve) => setTimeout(resolve, 5)); + expect(wsServer.clients.size).to.equal(0); + }); + + it('resolves a request with given params', async () => { + wsServer.addListener('connection', (socket) => { + socket.on('message', (dataBuffer: Buffer) => { + const request = JSON.parse(dataBuffer.toString()) as JsonRpcRequest; + const { param1, param2 } = request.params; + expect(param1).to.equal('test-param1'); + expect(param2).to.equal('test-param2'); + + // send response passed tests + const response = createJsonRpcSuccessResponse(request.id, {}); + socket.send(Buffer.from(JSON.stringify(response))); + }); + }); + + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003'); + const requestId = uuidv4(); + const request = createJsonRpcRequest(requestId, 'test.method', { param1: 'test-param1', param2: 'test-param2' }); + const response = await client.request(request); + expect(response.id).to.equal(request.id); + }); + + it('request times out', async () => { + // time out after 1 ms + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003', { responseTimeout: 1 }); + const requestId = uuidv4(); + const request = createJsonRpcRequest(requestId, 'test.method', { param1: 'test-param1', param2: 'test-param2' }); + const requestPromise = client.request(request); + + await expect(requestPromise).to.eventually.be.rejectedWith('timed out'); + }); + + it('opens a subscription', async () => { + wsServer.addListener('connection', (socket) => { + socket.on('message', (dataBuffer: Buffer) => { + const request = JSON.parse(dataBuffer.toString()) as JsonRpcRequest; + // initial response + const response = createJsonRpcSuccessResponse(request.id, { reply: {} }) + socket.send(Buffer.from(JSON.stringify(response))); + const { subscription } = request; + // send 3 messages + for (let i = 0; i < 3; i++) { + const response = createJsonRpcSuccessResponse(subscription.id, { count: i }); + socket.send(Buffer.from(JSON.stringify(response))); + } + }); + }); + + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003', { responseTimeout: 5 }); + const requestId = uuidv4(); + const subscribeId = uuidv4(); + const request = createJsonRpcSubscriptionRequest( + requestId, + 'rpc.subscribe.test.method', + { param1: 'test-param1', param2: 'test-param2' }, + subscribeId, + ); + + let responseCounter = 0; + const responseListener = (response: JsonRpcSuccessResponse): void => { + expect(response.id).to.equal(subscribeId); + const { count } = response.result; + expect(count).to.equal(responseCounter); + responseCounter++; + } + + const subscription = await client.subscribe(request, responseListener); + expect(subscription.response.error).to.be.undefined; + // wait for the messages to arrive + await new Promise((resolve) => setTimeout(resolve, 5)); + // the original response + expect(responseCounter).to.equal(3); + await subscription.close(); + }); + + it('sends message', async () => { + const receivedPromise = new Promise<{ reply: { id?: JsonRpcId }}>((resolve) => { + wsServer.addListener('connection', (socket) => { + socket.on('message', (dataBuffer: Buffer) => { + const request = JSON.parse(dataBuffer.toString()) as JsonRpcRequest; + const { param1, param2 } = request.params; + expect(param1).to.equal('test-param1'); + expect(param2).to.equal('test-param2'); + resolve({ reply: { id: request.id }}); + }); + }); + }); + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003'); + const requestId = uuidv4(); + const request = createJsonRpcRequest(requestId, 'test.method', { param1: 'test-param1', param2: 'test-param2' }); + client.send(request); + await expect(receivedPromise).to.eventually.eql({ reply: { id: request.id }}); + }); + + it('closes subscription upon receiving a JsonRpc Error for a long running subscription', async () => { + let closed = true; + wsServer.addListener('connection', (socket) => { + closed = false; + socket.on('message', (dataBuffer: Buffer) => { + const request = JSON.parse(dataBuffer.toString()) as JsonRpcRequest; + if (request.method.startsWith('rpc.subscribe') && request.method !== 'rpc.subscribe.close') { + // initial response + const response = createJsonRpcSuccessResponse(request.id, { reply: {} }) + socket.send(Buffer.from(JSON.stringify(response))); + const { subscription } = request; + + // send 1 valid message + const message1 = createJsonRpcSuccessResponse(subscription.id, { message: 1 }); + socket.send(Buffer.from(JSON.stringify(message1))); + + // send a json rpc error + const jsonRpcError = createJsonRpcErrorResponse(subscription.id, JsonRpcErrorCodes.InternalError, 'some error'); + socket.send(Buffer.from(JSON.stringify(jsonRpcError))); + + // send a 2nd message that shouldn't be handled + const message2 = createJsonRpcSuccessResponse(subscription.id, { message: 2 }); + socket.send(Buffer.from(JSON.stringify(message2))); + } else if (request.method === 'rpc.subscribe.close') { + closed = true; + } + }); + }); + + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003', { responseTimeout: 5 }); + const requestId = uuidv4(); + const subscribeId = uuidv4(); + const request = createJsonRpcSubscriptionRequest( + requestId, + 'rpc.subscribe.test.method', + { param1: 'test-param1', param2: 'test-param2' }, + subscribeId, + ); + + let responseCounter = 0; + let errorCounter = 0; + const responseListener = (response: JsonRpcSuccessResponse): void => { + expect(response.id).to.equal(subscribeId); + if (response.error) { + errorCounter++; + } + + if (response.result) { + responseCounter++; + } + } + + const subscription = await client.subscribe(request, responseListener); + expect(subscription.response.error).to.be.undefined; + // wait for the messages to arrive + await new Promise((resolve) => setTimeout(resolve, 5)); + // the original response + expect(responseCounter).to.equal(1); + expect(errorCounter).to.equal(1); + expect(closed).to.equal(true); + }); + + it('only JSON RPC Methods prefixed with `rpc.subscribe.` are accepted for a subscription', async () => { + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003'); + const requestId = uuidv4(); + const request = createJsonRpcRequest(requestId, 'test.method', { param1: 'test-param1', param2: 'test-param2' }); + const subscribePromise = client.subscribe(request, () => {}); + await expect(subscribePromise).to.eventually.be.rejectedWith('subscribe rpc requests must include the `rpc.subscribe` prefix'); + }); + + it('subscribe methods must contain a subscribe object within the request which contains the subscription JsonRpcId', async () => { + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003'); + const requestId = uuidv4(); + const request = createJsonRpcRequest(requestId, 'rpc.subscribe.test.method', { param1: 'test-param1', param2: 'test-param2' }); + const subscribePromise = client.subscribe(request, () => {}); + await expect(subscribePromise).to.eventually.be.rejectedWith('subscribe rpc requests must include subscribe options'); + }); + + it('calls onclose handler', async () => { + // test injected handler + const onCloseHandler = { onclose: ():void => {} }; + const onCloseSpy = sinon.spy(onCloseHandler, 'onclose'); + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003', { onclose: onCloseHandler.onclose }); + client.close(); + + await new Promise((resolve) => setTimeout(resolve, 5)); // wait for close event to arrive + expect(onCloseSpy.callCount).to.equal(1); + + // test default logger + const logInfoSpy = sinon.spy(log, 'info'); + const defaultClient = await JsonRpcSocket.connect('ws://127.0.0.1:9003'); + defaultClient.close(); + + await new Promise((resolve) => setTimeout(resolve, 5)); // wait for close event to arrive + expect(logInfoSpy.callCount).to.equal(1); + + // extract log message from argument + const logMessage:string = logInfoSpy.args[0][0]!; + expect(logMessage).to.equal('JSON RPC Socket close ws://127.0.0.1:9003'); + }); + + xit('calls onerror handler', async () => {}); +}); diff --git a/tests/registration/proof-of-work-manager.spec.ts b/tests/registration/proof-of-work-manager.spec.ts index 06354a2..986b6df 100644 --- a/tests/registration/proof-of-work-manager.spec.ts +++ b/tests/registration/proof-of-work-manager.spec.ts @@ -46,8 +46,8 @@ describe('ProofOfWorkManager', function () { } }; - const challengeNonceRefreshSpy = sinon.stub(proofOfWorkManager, 'refreshChallengeNonce').callsFake(stub); - const maximumAllowedHashValueRefreshSpy = sinon.stub(proofOfWorkManager, 'refreshMaximumAllowedHashValue').callsFake(stub); + const challengeNonceRefreshSpy = sinon.stub(proofOfWorkManager as any, 'refreshChallengeNonce').callsFake(stub); + const maximumAllowedHashValueRefreshSpy = sinon.stub(proofOfWorkManager as any, 'refreshMaximumAllowedHashValue').callsFake(stub); clock.tick(60 * 60 * 1000); diff --git a/tests/rpc-subscribe-close.spec.ts b/tests/rpc-subscribe-close.spec.ts new file mode 100644 index 0000000..5877387 --- /dev/null +++ b/tests/rpc-subscribe-close.spec.ts @@ -0,0 +1,126 @@ +import { expect } from 'chai'; +import sinon from 'sinon'; +import { v4 as uuidv4 } from 'uuid'; + +import type { RequestContext } from '../src/lib/json-rpc-router.js'; +import { JsonRpcErrorCodes, createJsonRpcRequest, createJsonRpcSubscriptionRequest } from '../src/lib/json-rpc.js'; +import { getTestDwn } from './test-dwn.js'; +import { handleSubscriptionsClose } from '../src/json-rpc-handlers/subscription/close.js'; +import { SocketConnection } from '../src/connection/socket-connection.js'; +import { DwnServerError, DwnServerErrorCode } from '../src/dwn-error.js'; + +describe('handleDwnProcessMessage', function () { + it('should return an error if no socket connection exists', async function () { + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest(requestId, 'rpc.subscribe.close', { }); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws' }; + + const { jsonRpcResponse } = await handleSubscriptionsClose( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InvalidRequest); + expect(jsonRpcResponse.error.message).to.equal('socket connection does not exist'); + }); + + it('should return an error if no subscribe options exist', async function () { + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest(requestId, 'rpc.subscribe.close', { }); + const socketConnection = sinon.createStubInstance(SocketConnection); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws', socketConnection }; + + const { jsonRpcResponse } = await handleSubscriptionsClose( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InvalidRequest); + expect(jsonRpcResponse.error.message).to.equal('subscribe options do not exist'); + }); + + it('should return an error if close subscription throws ConnectionSubscriptionJsonRpcIdNotFound', async function () { + const requestId = uuidv4(); + const id = 'some-id'; + const dwnRequest = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.close', {}, id); + const socketConnection = sinon.createStubInstance(SocketConnection); + socketConnection.closeSubscription.throws(new DwnServerError( + DwnServerErrorCode.ConnectionSubscriptionJsonRpcIdNotFound, + '' + )); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws', socketConnection }; + + const { jsonRpcResponse } = await handleSubscriptionsClose( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InvalidParams); + expect(jsonRpcResponse.error.message).to.equal(`subscription ${id} does not exist.`); + }); + + it('should return an error if close subscription throws ConnectionSubscriptionJsonRpcIdNotFound', async function () { + const requestId = uuidv4(); + const id = 'some-id'; + const dwnRequest = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.close', {}, id); + const socketConnection = sinon.createStubInstance(SocketConnection); + socketConnection.closeSubscription.throws(new Error('unknown error')); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws', socketConnection }; + + const { jsonRpcResponse } = await handleSubscriptionsClose( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InternalError); + expect(jsonRpcResponse.error.message).to.equal(`unknown subscription close error for ${id}: unknown error`); + }); + + it('should return a success', async function () { + const requestId = uuidv4(); + const id = 'some-id'; + const dwnRequest = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.close', {}, id); + const socketConnection = sinon.createStubInstance(SocketConnection); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws', socketConnection }; + + const { jsonRpcResponse } = await handleSubscriptionsClose( + dwnRequest, + context, + ); + expect(jsonRpcResponse.error).to.not.exist; + }); + + it('handler should generate a request Id if one is not provided with the request', async function () { + const requestId = uuidv4(); + const id = 'some-id'; + const dwnRequest = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.close', {}, id); + delete dwnRequest.id; // delete request id + + const socketConnection = sinon.createStubInstance(SocketConnection); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws', socketConnection }; + + const { jsonRpcResponse } = await handleSubscriptionsClose( + dwnRequest, + context, + ); + expect(jsonRpcResponse.error).to.not.exist; + expect(jsonRpcResponse.id).to.exist; + expect(jsonRpcResponse.id).to.not.equal(id); + }); +}); diff --git a/tests/test-dwn.ts b/tests/test-dwn.ts index b286c68..713fa55 100644 --- a/tests/test-dwn.ts +++ b/tests/test-dwn.ts @@ -1,5 +1,5 @@ import type { TenantGate } from '@tbd54566975/dwn-sdk-js'; -import { Dwn } from '@tbd54566975/dwn-sdk-js'; +import { Dwn, EventEmitterStream } from '@tbd54566975/dwn-sdk-js'; import { DataStoreSql, EventLogSql, @@ -9,26 +9,29 @@ import { import { getDialectFromURI } from '../src/storage.js'; import { DidDht, DidIon, DidKey, DidResolver } from '@web5/dids'; -export async function getTestDwn( - tenantGate?: TenantGate -): Promise { +export async function getTestDwn(options: { + tenantGate?: TenantGate, + withEvents?: boolean, +} = {}): Promise { + const { tenantGate, withEvents = false } = options; + const db = getDialectFromURI(new URL('sqlite://')); + const dataStore = new DataStoreSql(db); + const eventLog = new EventLogSql(db); + const messageStore = new MessageStoreSql(db); + const eventStream = withEvents ? new EventEmitterStream() : undefined; // NOTE: no resolver cache used here to avoid locking LevelDB const didResolver = new DidResolver({ didResolvers : [DidDht, DidIon, DidKey], }); - const db = getDialectFromURI(new URL('sqlite://')); - const dataStore = new DataStoreSql(db); - const eventLog = new EventLogSql(db); - const messageStore = new MessageStoreSql(db); - let dwn: Dwn; try { dwn = await Dwn.create({ eventLog, dataStore, messageStore, + eventStream, tenantGate, didResolver }); diff --git a/tests/utils.ts b/tests/utils.ts index 3ded3c4..af10b0a 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -1,14 +1,19 @@ -import type { Persona } from '@tbd54566975/dwn-sdk-js'; +import type { GenericMessage, Persona, UnionMessageReply } from '@tbd54566975/dwn-sdk-js'; import { Cid, DataStream, RecordsWrite } from '@tbd54566975/dwn-sdk-js'; import type { ReadStream } from 'node:fs'; import fs from 'node:fs'; import http from 'node:http'; import path from 'path'; +import { v4 as uuidv4 } from 'uuid'; +import fetch from 'node-fetch'; import type { Readable } from 'readable-stream'; import { fileURLToPath } from 'url'; import { WebSocket } from 'ws'; +import type { JsonRpcResponse } from '../src/lib/json-rpc.js'; +import { createJsonRpcRequest } from '../src/lib/json-rpc.js'; + // __filename and __dirname are not defined in ES module scope const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); @@ -139,6 +144,65 @@ export function streamHttpRequest( }); } +export async function sendHttpMessage(options: { + url: string, + target: string, + message: GenericMessage, + data?: any, +}): Promise { + const { url, target, message, data } = options; + // First RecordsWrite that creates the record. + const requestId = uuidv4(); + const jsonRpcRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { + target, + message, + }); + + const fetchOpts = { + method : 'POST', + headers : { + 'dwn-request': JSON.stringify(jsonRpcRequest) + } + }; + + if (data !== undefined) { + fetchOpts.headers['content-type'] = 'application/octet-stream'; + fetchOpts['body'] = data; + } + + const resp = await fetch(url, fetchOpts); + let dwnRpcResponse: JsonRpcResponse; + + // check to see if response is in header first. if it is, that means the response is a ReadableStream + let dataStream; + const { headers } = resp; + if (headers.has('dwn-response')) { + const jsonRpcResponse = JSON.parse(headers.get('dwn-response')) as JsonRpcResponse; + + if (jsonRpcResponse == null) { + throw new Error(`failed to parse json rpc response. dwn url: ${url}`); + } + + dataStream = resp.body; + dwnRpcResponse = jsonRpcResponse; + } else { + const responseBody = await resp.text(); + dwnRpcResponse = JSON.parse(responseBody); + } + + if (dwnRpcResponse.error) { + const { code, message } = dwnRpcResponse.error; + throw new Error(`(${code}) - ${message}`); + } + + const { reply } = dwnRpcResponse.result; + if (dataStream) { + reply['record']['data'] = dataStream; + } + + return reply as UnionMessageReply; +} + export async function sendWsMessage( address: string, message: any, diff --git a/tests/ws-api.spec.ts b/tests/ws-api.spec.ts index e2b338f..34a2150 100644 --- a/tests/ws-api.spec.ts +++ b/tests/ws-api.spec.ts @@ -1,36 +1,56 @@ -import { DataStream, TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; + +import type { Dwn, MessageEvent } from '@tbd54566975/dwn-sdk-js'; +import { DataStream, Message, TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; + +import type { Server } from 'http'; import { expect } from 'chai'; import { base64url } from 'multiformats/bases/base64'; -import http from 'node:http'; +import type { SinonFakeTimers } from 'sinon'; +import { useFakeTimers } from 'sinon'; import { v4 as uuidv4 } from 'uuid'; -import { type WebSocketServer } from 'ws'; import { createJsonRpcRequest, + createJsonRpcSubscriptionRequest, JsonRpcErrorCodes, } from '../src/lib/json-rpc.js'; +import { config } from '../src/config.js'; import { WsApi } from '../src/ws-api.js'; import { getTestDwn } from './test-dwn.js'; -import { createRecordsWriteMessage, sendWsMessage } from './utils.js'; +import { createRecordsWriteMessage, sendWsMessage, sendHttpMessage } from './utils.js'; +import { HttpApi } from '../src/http-api.js'; +import { JsonRpcSocket } from '../src/json-rpc-socket.js'; -let server: http.Server; -let wsServer: WebSocketServer; describe('websocket api', function () { - before(async function () { - server = http.createServer(); - server.listen(9002, '127.0.0.1'); + let server: Server; + let httpApi: HttpApi; + let wsApi: WsApi; + let dwn: Dwn; + let clock: SinonFakeTimers; + + before(() => { + clock = useFakeTimers({ shouldAdvanceTime: true }); + }); - const testDwn = await getTestDwn(); - const wsApi = new WsApi(server, testDwn); - wsServer = wsApi.start(); + after(() => { + clock.restore(); }); - after(function () { - wsServer.close(); + beforeEach(async function () { + dwn = await getTestDwn({ withEvents: true }); + httpApi = new HttpApi(config, dwn); + server = await httpApi.start(9002); + wsApi = new WsApi(server, dwn); + wsApi.start(); + }); + + afterEach(async function () { + await wsApi.close(); server.close(); server.closeAllConnections(); + await dwn.close(); }); it('returns an error response if no request payload is provided', async function () { @@ -52,7 +72,7 @@ describe('websocket api', function () { expect(resp.error.message).to.include('JSON'); }); - it('handles RecordsWrite messages', async function () { + it('RecordsWrite messages are not supported', async function () { const alice = await TestDataGenerator.generateDidKeyPersona(); const { recordsWrite, dataStream } = await createRecordsWriteMessage(alice); @@ -66,16 +86,326 @@ describe('websocket api', function () { encodedData, }); - const data = await sendWsMessage( - 'ws://127.0.0.1:9002', - JSON.stringify(dwnRequest), - ); - const resp = JSON.parse(data.toString()); - expect(resp.id).to.equal(requestId); - console.log(resp.error); - expect(resp.error).to.not.exist; + const connection = await JsonRpcSocket.connect('ws://127.0.0.1:9002'); + const response = await connection.request(dwnRequest); + + expect(response.id).to.equal(requestId); + expect(response.error).to.not.be.undefined; + expect(response.error.code).to.equal(JsonRpcErrorCodes.InvalidParams); + expect(response.error.message).to.include('RecordsWrite is not supported via ws'); + }); + + it('subscribes to records and receives updates', async () => { + const alice = await TestDataGenerator.generateDidKeyPersona(); + + const { message } = await TestDataGenerator.generateRecordsSubscribe({ + author: alice, + filter: { + schema: 'foo/bar' + } + }); + + const records: string[] = []; + const subscriptionHandler = async (event: MessageEvent): Promise => { + const { message } = event + records.push(await Message.getCid(message)); + }; + + const requestId = uuidv4(); + const dwnRequest = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.dwn.processMessage', { + message: message, + target: alice.did, + }); + + const connection = await JsonRpcSocket.connect('ws://127.0.0.1:9002'); + const { response, close } = await connection.subscribe(dwnRequest, (response) => { + const { event } = response.result; + subscriptionHandler(event); + }); + + expect(response.error).to.be.undefined; + expect(response.result.reply.status.code).to.equal(200); + expect(close).to.not.be.undefined; + + const write1Message = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar', + dataFormat : 'text/plain' + }); + + const writeResult1 = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : write1Message.message, + data : write1Message.dataBytes, + }); + expect(writeResult1.status.code).to.equal(202); + + const write2Message = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar', + dataFormat : 'text/plain' + }); + + const writeResult2 = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : write2Message.message, + data : write2Message.dataBytes, + }) + expect(writeResult2.status.code).to.equal(202); + + // close the subscription + await close(); + + await new Promise(resolve => setTimeout(resolve, 5)); // wait for records to be processed + expect(records).to.have.members([ + await Message.getCid(write1Message.message), + await Message.getCid(write2Message.message) + ]); + }); + + it('stops receiving updates when subscription is closed', async () => { + const alice = await TestDataGenerator.generateDidKeyPersona(); + + const { message } = await TestDataGenerator.generateRecordsSubscribe({ + author: alice, + filter: { + schema: 'foo/bar' + } + }); + + const records: string[] = []; + const subscriptionHandler = async (event: MessageEvent): Promise => { + const { message } = event; + records.push(await Message.getCid(message)); + }; + + const requestId = uuidv4(); + const subscribeId = uuidv4(); + const dwnRequest = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.dwn.processMessage', { + message: message, + target: alice.did, + }, subscribeId); + + const connection = await JsonRpcSocket.connect('ws://127.0.0.1:9002'); + const { response, close } = await connection.subscribe(dwnRequest, (response) => { + const { event } = response.result; + subscriptionHandler(event); + }); + + expect(response.error).to.be.undefined; + expect(response.result.reply.status.code).to.equal(200); + expect(close).to.not.be.undefined; + + const write1Message = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar', + dataFormat : 'text/plain' + }); + + const writeResult1 = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : write1Message.message, + data : write1Message.dataBytes, + }); + expect(writeResult1.status.code).to.equal(202); + + // close the subscription after only 1 message + await close(); + + // write more messages that won't show up in the subscription + const write2Message = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar', + dataFormat : 'text/plain' + }); + + const writeResult2 = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : write2Message.message, + data : write2Message.dataBytes, + }) + expect(writeResult2.status.code).to.equal(202); + + const write3Message = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar', + dataFormat : 'text/plain' + }); + + const writeResult3 = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : write3Message.message, + data : write3Message.dataBytes, + }) + expect(writeResult3.status.code).to.equal(202); + + await new Promise(resolve => setTimeout(resolve, 5)); // wait for records to be processed + expect(records).to.have.members([ await Message.getCid(write1Message.message) ]); + }); + + it('should fail to add subscription using a `JsonRpcId` that already exists for a subscription in that socket', async () => { + const alice = await TestDataGenerator.generateDidKeyPersona(); + + const { message } = await TestDataGenerator.generateRecordsSubscribe({ + author: alice, + filter: { + schema: 'foo/bar' + } + }); + + const records: string[] = []; + const subscriptionHandler = async (event: MessageEvent): Promise => { + const { message } = event + records.push(await Message.getCid(message)); + }; + + const requestId = uuidv4(); + const subscribeId = uuidv4(); + const dwnRequest = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.dwn.processMessage', { + message: message, + target: alice.did + }, subscribeId); + + const connection = await JsonRpcSocket.connect('ws://127.0.0.1:9002'); + const { close } = await connection.subscribe(dwnRequest, (response) => { + const { event } = response.result; + subscriptionHandler(event); + }); + + const { message: message2 } = await TestDataGenerator.generateRecordsSubscribe({ filter: { schema: 'bar/baz' }, author: alice }); + + // We are checking for the subscription Id not the request Id + const request2Id = uuidv4(); + const dwnRequest2 = createJsonRpcSubscriptionRequest(request2Id, 'rpc.subscribe.dwn.processMessage', { + message: message2, + target: alice.did + }, subscribeId); + + const { response: response2 } = await connection.subscribe(dwnRequest2, (response) => { + const { event } = response.result; + subscriptionHandler(event); + }); + + expect(response2.error.code).to.equal(JsonRpcErrorCodes.InvalidParams); + expect(response2.error.message).to.contain(`${subscribeId} is in use by an active subscription`); + + const write1Message = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar', + dataFormat : 'text/plain' + }); + + const writeResult1 = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : write1Message.message, + data : write1Message.dataBytes, + }); + expect(writeResult1.status.code).to.equal(202); + + const write2Message = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar', + dataFormat : 'text/plain' + }); + + const writeResult2 = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : write2Message.message, + data : write2Message.dataBytes, + }) + expect(writeResult2.status.code).to.equal(202); + + // close the subscription + await close(); + + await new Promise(resolve => setTimeout(resolve, 5)); // wait for records to be processed + expect(records).to.have.members([ + await Message.getCid(write1Message.message), + await Message.getCid(write2Message.message) + ]); + }); + + it('should receive an updated message as well as the initial write when subscribing to a record', async () => { + const alice = await TestDataGenerator.generateDidKeyPersona(); + + // write an initial message + const initialWrite = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar', + dataFormat : 'text/plain' + }); + + const writeResult1 = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : initialWrite.message, + data : initialWrite.dataBytes, + }); + expect(writeResult1.status.code).to.equal(202); + + // subscribe to 'foo/bar' messages + const { message } = await TestDataGenerator.generateRecordsSubscribe({ + author: alice, + filter: { + schema: 'foo/bar' + } + }); + + const records: string[] = []; + const subscriptionHandler = async (event: MessageEvent): Promise => { + const { message, initialWrite } = event + if (initialWrite) { + records.push(await Message.getCid(initialWrite)); + } + records.push(await Message.getCid(message)); + }; + + const requestId = uuidv4(); + const subscribeId = uuidv4(); + const dwnRequest = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.dwn.processMessage', { + message: message, + target: alice.did + }, subscribeId); + + const connection = await JsonRpcSocket.connect('ws://127.0.0.1:9002'); + const { close } = await connection.subscribe(dwnRequest, (response) => { + const { event } = response.result; + subscriptionHandler(event); + }); + + // wait for potential records to process and confirm that initial write has not been processed + await new Promise(resolve => setTimeout(resolve, 5)); + expect(records.length).length.to.equal(0); + + // update the initial message + const updatedMessage = await TestDataGenerator.generateFromRecordsWrite({ + author : alice, + existingWrite : initialWrite.recordsWrite, + }); + + const updateResult = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : updatedMessage.message, + data : updatedMessage.dataBytes, + }); + expect(updateResult.status.code).to.equal(202); + + await close(); + + await new Promise(resolve => setTimeout(resolve, 5)); // wait for records to be processed - const { reply } = resp.result; - expect(reply.status.code).to.equal(202); + // both initial and update should exist now + expect(records).to.have.members([ + await Message.getCid(initialWrite.message), + await Message.getCid(updatedMessage.message) + ]); }); });