From ac1e6f1eca57026b24bc22d89ac1785a804caed5 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Wed, 1 May 2024 10:21:17 -0400 Subject: [PATCH] `@web5/agent` DWN + Web5 RPC Clients (#433) - organize rpc clients into `agent/prototyping/clients` until they have a permanent home - copy `JsonRpcSocket` class from `@web5/dwn-server`, using `isomorphic-ws` to allow for isomorphic socket client. - create `WebSocketDwnRpcClient` and `WebSocketWeb5RpcClient` to support web5 requests over sockets. --- .changeset/proud-bottles-develop.md | 14 + packages/agent/package.json | 1 + packages/agent/src/index.ts | 3 +- .../src/prototyping/clients/dwn-rpc-types.ts | 55 +++ .../clients/http-dwn-rpc-client.ts | 68 ++++ .../prototyping/clients/json-rpc-socket.ts | 169 +++++++++ .../src/{ => prototyping/clients}/json-rpc.ts | 29 +- .../prototyping/clients/web-socket-clients.ts | 100 +++++ packages/agent/src/rpc-client.ts | 131 +------ .../clients/http-dwn-rpc-client.spec.ts | 130 +++++++ .../clients/json-rpc-socket.spec.ts | 279 ++++++++++++++ .../clients/ws-dwn-rpc-client.spec.ts | 351 ++++++++++++++++++ packages/agent/tests/rpc-client.spec.ts | 256 +++++++++++++ packages/agent/tests/utils/runtimes.ts | 3 +- pnpm-lock.yaml | 12 +- 15 files changed, 1467 insertions(+), 134 deletions(-) create mode 100644 .changeset/proud-bottles-develop.md create mode 100644 packages/agent/src/prototyping/clients/dwn-rpc-types.ts create mode 100644 packages/agent/src/prototyping/clients/http-dwn-rpc-client.ts create mode 100644 packages/agent/src/prototyping/clients/json-rpc-socket.ts rename packages/agent/src/{ => prototyping/clients}/json-rpc.ts (81%) create mode 100644 packages/agent/src/prototyping/clients/web-socket-clients.ts create mode 100644 packages/agent/tests/prototyping/clients/http-dwn-rpc-client.spec.ts create mode 100644 packages/agent/tests/prototyping/clients/json-rpc-socket.spec.ts create mode 100644 packages/agent/tests/prototyping/clients/ws-dwn-rpc-client.spec.ts create mode 100644 packages/agent/tests/rpc-client.spec.ts diff --git a/.changeset/proud-bottles-develop.md b/.changeset/proud-bottles-develop.md new file mode 100644 index 000000000..8c6d3476c --- /dev/null +++ b/.changeset/proud-bottles-develop.md @@ -0,0 +1,14 @@ +--- +"@web5/agent": patch +"@web5/identity-agent": patch +"@web5/proxy-agent": patch +"@web5/user-agent": patch +--- + +Extend and Test RPC DWN/Web5 Clients to support `http` and `ws` +- move `HttpDwnRpcClient` to `/prototyping` folder +- move `JSON RPC` related files to `/prototyping` folder +- create `WebSocketDwnRpcClient` in `/prototyping` folder +- create `WebSocketWeb5RpcClient` wrapper in `rpc-client` + - does not support `sendDidRequest` via sockets + diff --git a/packages/agent/package.json b/packages/agent/package.json index 46f20414f..245a8bc33 100644 --- a/packages/agent/package.json +++ b/packages/agent/package.json @@ -77,6 +77,7 @@ "@web5/dids": "1.0.1", "abstract-level": "1.0.4", "ed25519-keygen": "0.4.11", + "isomorphic-ws": "^5.0.0", "level": "8.0.0", "ms": "2.1.3", "readable-web-to-node-stream": "3.0.2", diff --git a/packages/agent/src/index.ts b/packages/agent/src/index.ts index 433c13cc4..8f63aee6c 100644 --- a/packages/agent/src/index.ts +++ b/packages/agent/src/index.ts @@ -12,7 +12,6 @@ export * from './did-api.js'; export * from './dwn-api.js'; export * from './hd-identity-vault.js'; export * from './identity-api.js'; -export * from './json-rpc.js'; export * from './local-key-manager.js'; export * from './rpc-client.js'; export * from './store-data.js'; @@ -22,4 +21,4 @@ export * from './store-key.js'; export * from './sync-api.js'; export * from './sync-engine-level.js'; export * from './test-harness.js'; -export * from './utils.js'; \ No newline at end of file +export * from './utils.js'; diff --git a/packages/agent/src/prototyping/clients/dwn-rpc-types.ts b/packages/agent/src/prototyping/clients/dwn-rpc-types.ts new file mode 100644 index 000000000..4eff6fb9b --- /dev/null +++ b/packages/agent/src/prototyping/clients/dwn-rpc-types.ts @@ -0,0 +1,55 @@ +import type { MessageEvent, RecordsReadReply, UnionMessageReply } from '@tbd54566975/dwn-sdk-js'; + +export interface SerializableDwnMessage { + toJSON(): string; +} + +export type DwnEventSubscriptionHandler = (event: MessageEvent) => void; + +/** + * Interface for communicating with {@link https://github.com/TBD54566975/dwn-server | DWN Servers} + * via JSON-RPC, supporting operations like sending DWN requests. + */ +export interface DwnRpc { + /** + * Lists the transport protocols supported by the DWN RPC client, such as HTTP or HTTPS. + * @returns An array of strings representing the supported transport protocols. + */ + get transportProtocols(): string[] + + /** + * Sends a request to a DWN Server using the specified DWN RPC request parameters. + * + * @param request - The DWN RPC request containing the URL, target DID, message, and optional data. + * @returns A promise that resolves to the response from the DWN server. + */ + sendDwnRequest(request: DwnRpcRequest): Promise +} + + +/** + * Represents a JSON RPC request to a DWN server, including the URL, target DID, the message to be + * processed, and optional data. + */ +export type DwnRpcRequest = { + /** Optional data to be sent with the request. */ + data?: any; + + /** The URL of the DWN server to which the request is sent. */ + dwnUrl: string; + + /** The message to be processed by the DWN server, which can be a serializable DWN message. */ + message: SerializableDwnMessage | any; + + /** The DID of the target to which the message is addressed. */ + targetDid: string; + + /** Optional subscription handler for DWN events. */ + subscriptionHandler?: DwnEventSubscriptionHandler; +} + +/** + * Represents the JSON RPC response from a DWN server to a request, combining the results of various + * DWN operations. + */ +export type DwnRpcResponse = UnionMessageReply & RecordsReadReply; \ No newline at end of file diff --git a/packages/agent/src/prototyping/clients/http-dwn-rpc-client.ts b/packages/agent/src/prototyping/clients/http-dwn-rpc-client.ts new file mode 100644 index 000000000..b4b36a955 --- /dev/null +++ b/packages/agent/src/prototyping/clients/http-dwn-rpc-client.ts @@ -0,0 +1,68 @@ +import type { JsonRpcResponse } from './json-rpc.js'; +import type { DwnRpc, DwnRpcRequest, DwnRpcResponse } from './dwn-rpc-types.js'; + +import { createJsonRpcRequest, parseJson } from './json-rpc.js'; +import { utils as cryptoUtils } from '@web5/crypto'; + +/** + * HTTP client that can be used to communicate with Dwn Servers + */ +export class HttpDwnRpcClient implements DwnRpc { + get transportProtocols() { return ['http:', 'https:']; } + + async sendDwnRequest(request: DwnRpcRequest): Promise { + const requestId = cryptoUtils.randomUuid(); + const jsonRpcRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { + target : request.targetDid, + message : request.message + }); + + const fetchOpts = { + method : 'POST', + headers : { + 'dwn-request': JSON.stringify(jsonRpcRequest) + } + }; + + if (request.data) { + // @ts-expect-error TODO: REMOVE + fetchOpts.headers['content-type'] = 'application/octet-stream'; + // @ts-expect-error TODO: REMOVE + fetchOpts['body'] = request.data; + } + + const resp = await fetch(request.dwnUrl, fetchOpts); + let dwnRpcResponse: JsonRpcResponse; + + // check to see if response is in header first. if it is, that means the response is a ReadableStream + let dataStream; + const { headers } = resp; + if (headers.has('dwn-response')) { + // @ts-expect-error TODO: REMOVE + const jsonRpcResponse = parseJson(headers.get('dwn-response')) as JsonRpcResponse; + + if (jsonRpcResponse == null) { + throw new Error(`failed to parse json rpc response. dwn url: ${request.dwnUrl}`); + } + + dataStream = resp.body; + dwnRpcResponse = jsonRpcResponse; + } else { + // TODO: wonder if i need to try/catch this? + const responseBody = await resp.text(); + dwnRpcResponse = JSON.parse(responseBody); + } + + if (dwnRpcResponse.error) { + const { code, message } = dwnRpcResponse.error; + throw new Error(`(${code}) - ${message}`); + } + + const { reply } = dwnRpcResponse.result; + if (dataStream) { + reply['record']['data'] = dataStream; + } + + return reply as DwnRpcResponse; + } +} diff --git a/packages/agent/src/prototyping/clients/json-rpc-socket.ts b/packages/agent/src/prototyping/clients/json-rpc-socket.ts new file mode 100644 index 000000000..9486543d7 --- /dev/null +++ b/packages/agent/src/prototyping/clients/json-rpc-socket.ts @@ -0,0 +1,169 @@ +import { utils as cryptoUtils } from '@web5/crypto'; +import IsomorphicWebSocket from 'isomorphic-ws'; +import { JsonRpcId, JsonRpcRequest, JsonRpcResponse, createJsonRpcSubscriptionRequest, parseJson } from './json-rpc.js'; + +// These were arbitrarily chosen, but can be modified via connect options +const CONNECT_TIMEOUT = 3_000; +const RESPONSE_TIMEOUT = 30_000; + +export interface JsonRpcSocketOptions { + /** socket connection timeout in milliseconds */ + connectTimeout?: number; + /** response timeout for rpc requests in milliseconds */ + responseTimeout?: number; + /** optional connection close handler */ + onclose?: () => void; + /** optional socket error handler */ + onerror?: (error?: any) => void; +} + +/** + * JSON RPC Socket Client for WebSocket request/response and long-running subscriptions. + * + * NOTE: This is temporarily copied over from https://github.com/TBD54566975/dwn-server/blob/main/src/json-rpc-socket.ts + * This was done in order to avoid taking a dependency on the `dwn-server`, until a future time when there will be a `clients` package. + */ +export class JsonRpcSocket { + private messageHandlers: Map void> = new Map(); + + private constructor(private socket: IsomorphicWebSocket, private responseTimeout: number) {} + + static async connect(url: string, options: JsonRpcSocketOptions = {}): Promise { + const { connectTimeout = CONNECT_TIMEOUT, responseTimeout = RESPONSE_TIMEOUT, onclose, onerror } = options; + + const socket = new IsomorphicWebSocket(url); + + if (!onclose) { + socket.onclose = ():void => { + console.info(`JSON RPC Socket close ${url}`); + }; + } else { + socket.onclose = onclose; + } + + if (!onerror) { + socket.onerror = (error?: any):void => { + console.error(`JSON RPC Socket error ${url}`, error); + }; + } else { + socket.onerror = onerror; + } + + return new Promise((resolve, reject) => { + socket.addEventListener('open', () => { + const jsonRpcSocket = new JsonRpcSocket(socket, responseTimeout); + + socket.addEventListener('message', (event: { data: any }) => { + const jsonRpcResponse = parseJson(event.data) as JsonRpcResponse; + const handler = jsonRpcSocket.messageHandlers.get(jsonRpcResponse.id); + if (handler) { + handler(event); + } + }); + + resolve(jsonRpcSocket); + }); + + socket.addEventListener('error', (error: any) => { + reject(error); + }); + + setTimeout(() => reject, connectTimeout); + }); + } + + close(): void { + this.socket.close(); + } + + /** + * Sends a JSON-RPC request through the socket and waits for a single response. + */ + async request(request: JsonRpcRequest): Promise { + return new Promise((resolve, reject) => { + request.id ??= cryptoUtils.randomUuid(); + + const handleResponse = (event: { data: any }):void => { + const jsonRpsResponse = parseJson(event.data) as JsonRpcResponse; + if (jsonRpsResponse.id === request.id) { + // if the incoming response id matches the request id, we will remove the listener and resolve the response + this.messageHandlers.delete(request.id); + return resolve(jsonRpsResponse); + } + }; + + // add the listener to the map of message handlers + this.messageHandlers.set(request.id, handleResponse); + this.send(request); + + // reject this promise if we don't receive any response back within the timeout period + setTimeout(() => { + this.messageHandlers.delete(request.id!); + reject(new Error('request timed out')); + }, this.responseTimeout); + }); + } + + /** + * Sends a JSON-RPC request through the socket and keeps a listener open to read associated responses as they arrive. + * Returns a close method to clean up the listener. + */ + async subscribe(request: JsonRpcRequest, listener: (response: JsonRpcResponse) => void): Promise<{ + response: JsonRpcResponse; + close?: () => Promise; + }> { + + if (!request.method.startsWith('rpc.subscribe.')) { + throw new Error('subscribe rpc requests must include the `rpc.subscribe` prefix'); + } + + if (!request.subscription) { + throw new Error('subscribe rpc requests must include subscribe options'); + } + + const subscriptionId = request.subscription.id; + const socketEventListener = (event: { data: any }):void => { + const jsonRpcResponse = parseJson(event.data.toString()) as JsonRpcResponse; + if (jsonRpcResponse.id === subscriptionId) { + if (jsonRpcResponse.error !== undefined) { + // remove the event listener upon receipt of a JSON RPC Error. + this.messageHandlers.delete(subscriptionId); + this.closeSubscription(subscriptionId); + } + listener(jsonRpcResponse); + } + }; + + this.messageHandlers.set(subscriptionId, socketEventListener); + + const response = await this.request(request); + if (response.error) { + this.messageHandlers.delete(subscriptionId); + return { response }; + } + + // clean up listener and create a `rpc.subscribe.close` message to use when closing this JSON RPC subscription + const close = async (): Promise => { + this.messageHandlers.delete(subscriptionId); + await this.closeSubscription(subscriptionId); + }; + + return { + response, + close + }; + } + + private closeSubscription(id: JsonRpcId): Promise { + const requestId = cryptoUtils.randomUuid(); + const request = createJsonRpcSubscriptionRequest(requestId, 'close', id, {}); + return this.request(request); + } + + /** + * Sends a JSON-RPC request through the socket. You must subscribe to a message listener separately to capture the response. + */ + send(request: JsonRpcRequest):void { + this.socket.send(JSON.stringify(request)); + } +} \ No newline at end of file diff --git a/packages/agent/src/json-rpc.ts b/packages/agent/src/prototyping/clients/json-rpc.ts similarity index 81% rename from packages/agent/src/json-rpc.ts rename to packages/agent/src/prototyping/clients/json-rpc.ts index 14623fa57..73327d743 100644 --- a/packages/agent/src/json-rpc.ts +++ b/packages/agent/src/prototyping/clients/json-rpc.ts @@ -5,6 +5,8 @@ export type JsonRpcVersion = '2.0'; export interface JsonRpcRequest { jsonrpc: JsonRpcVersion; id?: JsonRpcId; + /** JSON RPC Subscription Extension Parameters */ + subscription?: { id: JsonRpcId } method: string; params?: any; } @@ -52,10 +54,7 @@ export const createJsonRpcErrorResponse = ( message: string, data?: any, ): JsonRpcErrorResponse => { - const error: JsonRpcError = { code, message }; - if (data != undefined) { - error.data = data; - } + const error: JsonRpcError = { code, message, data }; return { jsonrpc: '2.0', id, @@ -63,38 +62,44 @@ export const createJsonRpcErrorResponse = ( }; }; -export const createJsonRpcNotification = ( +export const createJsonRpcRequest = ( + id: JsonRpcId, method: string, params?: JsonRpcParams, ): JsonRpcRequest => { return { jsonrpc: '2.0', + id, method, params, }; }; -export const createJsonRpcRequest = ( +export const createJsonRpcSubscriptionRequest = ( id: JsonRpcId, method: string, - params?: JsonRpcParams, + subscriptionId: JsonRpcId, + params?: any ): JsonRpcRequest => { return { - jsonrpc: '2.0', + jsonrpc : '2.0', id, - method, + method : `rpc.subscribe.${method}`, params, + subscription : { + id: subscriptionId, + } }; }; export const createJsonRpcSuccessResponse = ( id: JsonRpcId, - result?: any, + result: any, ): JsonRpcSuccessResponse => { return { - jsonrpc : '2.0', + jsonrpc: '2.0', id, - result : result ?? null, + result, }; }; diff --git a/packages/agent/src/prototyping/clients/web-socket-clients.ts b/packages/agent/src/prototyping/clients/web-socket-clients.ts new file mode 100644 index 000000000..f53b357ec --- /dev/null +++ b/packages/agent/src/prototyping/clients/web-socket-clients.ts @@ -0,0 +1,100 @@ +import type { DwnEventSubscriptionHandler, DwnRpc, DwnRpcRequest, DwnRpcResponse } from './dwn-rpc-types.js'; +import type { GenericMessage, MessageSubscription, UnionMessageReply } from '@tbd54566975/dwn-sdk-js'; + +import { utils as cryptoUtils } from '@web5/crypto'; +import { createJsonRpcRequest, createJsonRpcSubscriptionRequest } from './json-rpc.js'; +import { JsonRpcSocket, JsonRpcSocketOptions } from './json-rpc-socket.js'; + +interface SocketConnection { + socket: JsonRpcSocket; + subscriptions: Map; +} + +export class WebSocketDwnRpcClient implements DwnRpc { + public get transportProtocols() { return ['ws:', 'wss:']; } + // a map of dwn host to WebSocket connection + private static connections = new Map(); + + async sendDwnRequest(request: DwnRpcRequest, jsonRpcSocketOptions?: JsonRpcSocketOptions): Promise { + + // validate that the dwn URL provided is a valid WebSocket URL + const url = new URL(request.dwnUrl); + if (url.protocol !== 'ws:' && url.protocol !== 'wss:') { + throw new Error(`Invalid websocket protocol ${url.protocol}`); + } + + // check if there is already a connection to this host, if it does not exist, initiate a new connection + const hasConnection = WebSocketDwnRpcClient.connections.has(url.host); + if (!hasConnection) { + try { + const socket = await JsonRpcSocket.connect(url.toString(), jsonRpcSocketOptions); + const subscriptions = new Map(); + WebSocketDwnRpcClient.connections.set(url.host, { socket, subscriptions }); + } catch(error) { + throw new Error(`Error connecting to ${url.host}: ${(error as Error).message}`); + } + } + + const connection = WebSocketDwnRpcClient.connections.get(url.host)!; + const { targetDid, message, subscriptionHandler } = request; + + if (subscriptionHandler) { + return WebSocketDwnRpcClient.subscriptionRequest(connection, targetDid, message, subscriptionHandler); + } + + return WebSocketDwnRpcClient.processMessage(connection, targetDid, message); + } + + private static async processMessage(connection: SocketConnection, target: string, message: GenericMessage): Promise { + const requestId = cryptoUtils.randomUuid(); + const request = createJsonRpcRequest(requestId, 'dwn.processMessage', { target, message }); + + const { socket } = connection; + const response = await socket.request(request); + + const { error, result } = response; + if (error !== undefined) { + throw new Error(`error sending DWN request: ${error.message}`); + } + + return result.reply as DwnRpcResponse; + } + + private static async subscriptionRequest(connection: SocketConnection, target:string, message: GenericMessage, messageHandler: DwnEventSubscriptionHandler): Promise { + const requestId = cryptoUtils.randomUuid(); + const subscriptionId = cryptoUtils.randomUuid(); + const request = createJsonRpcSubscriptionRequest(requestId, 'dwn.processMessage', subscriptionId, { target, message }); + + const { socket, subscriptions } = connection; + const { response, close } = await socket.subscribe(request, (response) => { + const { result, error } = response; + if (error) { + + // if there is an error, close the subscription and delete it from the connection + const subscription = subscriptions.get(subscriptionId); + if (subscription) { + subscription.close(); + } + + subscriptions.delete(subscriptionId); + return; + } + + const { event } = result; + messageHandler(event); + }); + + const { error, result } = response; + if (error) { + throw new Error(`could not subscribe via jsonrpc socket: ${error.message}`); + } + + const { reply } = result as { reply: UnionMessageReply }; + if (reply.subscription && close) { + subscriptions.set(subscriptionId, { ...reply.subscription, close }); + reply.subscription.close = close; + } + + return reply; + } +} \ No newline at end of file diff --git a/packages/agent/src/rpc-client.ts b/packages/agent/src/rpc-client.ts index 08765c5b9..25c479c1c 100644 --- a/packages/agent/src/rpc-client.ts +++ b/packages/agent/src/rpc-client.ts @@ -1,9 +1,12 @@ import { utils as cryptoUtils } from '@web5/crypto'; -import { RecordsReadReply, UnionMessageReply } from '@tbd54566975/dwn-sdk-js'; -import type { JsonRpcResponse } from './json-rpc.js'; -import { createJsonRpcRequest, parseJson } from './json-rpc.js'; +import type { DwnRpc, DwnRpcRequest, DwnRpcResponse } from './prototyping/clients/dwn-rpc-types.js'; +import type { JsonRpcResponse } from './prototyping/clients/json-rpc.js'; + +import { createJsonRpcRequest } from './prototyping/clients/json-rpc.js'; +import { HttpDwnRpcClient } from './prototyping/clients/http-dwn-rpc-client.js'; +import { WebSocketDwnRpcClient } from './prototyping/clients/web-socket-clients.js'; /** * Interface that can be implemented to communicate with {@link Web5Agent | Web5 Agent} @@ -31,60 +34,11 @@ export type DidRpcResponse = { status: RpcStatus; } -/** - * Interface for communicating with {@link https://github.com/TBD54566975/dwn-server | DWN Servers} - * via JSON-RPC, supporting operations like sending DWN requests. - */ -export interface DwnRpc { - /** - * Lists the transport protocols supported by the DWN RPC client, such as HTTP or HTTPS. - * @returns An array of strings representing the supported transport protocols. - */ - get transportProtocols(): string[] - - /** - * Sends a request to a DWN Server using the specified DWN RPC request parameters. - * - * @param request - The DWN RPC request containing the URL, target DID, message, and optional data. - * @returns A promise that resolves to the response from the DWN server. - */ - sendDwnRequest(request: DwnRpcRequest): Promise -} - - -/** - * Represents a JSON RPC request to a DWN server, including the URL, target DID, the message to be - * processed, and optional data. - */ -export type DwnRpcRequest = { - /** Optional data to be sent with the request. */ - data?: any; - - /** The URL of the DWN server to which the request is sent. */ - dwnUrl: string; - - /** The message to be processed by the DWN server, which can be a serializable DWN message. */ - message: SerializableDwnMessage | any; - - /** The DID of the target to which the message is addressed. */ - targetDid: string; -} - -/** - * Represents the JSON RPC response from a DWN server to a request, combining the results of various - * DWN operations. - */ -export type DwnRpcResponse = UnionMessageReply & RecordsReadReply; - export type RpcStatus = { code: number; message: string; }; -export interface SerializableDwnMessage { - toJSON(): string; -} - export interface Web5Rpc extends DwnRpc, DidRpc {} /** @@ -142,72 +96,7 @@ export class Web5RpcClient implements Web5Rpc { } } -// TODO: move to dwn-server repo. i wrote this here for expediency - -/** - * HTTP client that can be used to communicate with Dwn Servers - */ -class HttpDwnRpcClient implements DwnRpc { - get transportProtocols() { return ['http:', 'https:']; } - - async sendDwnRequest(request: DwnRpcRequest): Promise { - const requestId = cryptoUtils.randomUuid(); - const jsonRpcRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { - target : request.targetDid, - message : request.message - }); - - const fetchOpts = { - method : 'POST', - headers : { - 'dwn-request': JSON.stringify(jsonRpcRequest) - } - }; - - if (request.data) { - // @ts-expect-error TODO: REMOVE - fetchOpts.headers['content-type'] = 'application/octet-stream'; - // @ts-expect-error TODO: REMOVE - fetchOpts['body'] = request.data; - } - - const resp = await fetch(request.dwnUrl, fetchOpts); - let dwnRpcResponse: JsonRpcResponse; - - // check to see if response is in header first. if it is, that means the response is a ReadableStream - let dataStream; - const { headers } = resp; - if (headers.has('dwn-response')) { - // @ts-expect-error TODO: REMOVE - const jsonRpcResponse = parseJson(headers.get('dwn-response')) as JsonRpcResponse; - - if (jsonRpcResponse == null) { - throw new Error(`failed to parse json rpc response. dwn url: ${request.dwnUrl}`); - } - - dataStream = resp.body; - dwnRpcResponse = jsonRpcResponse; - } else { - // TODO: wonder if i need to try/catch this? - const responseBody = await resp.text(); - dwnRpcResponse = JSON.parse(responseBody); - } - - if (dwnRpcResponse.error) { - const { code, message } = dwnRpcResponse.error; - throw new Error(`(${code}) - ${message}`); - } - - const { reply } = dwnRpcResponse.result; - if (dataStream) { - reply['record']['data'] = dataStream; - } - - return reply as DwnRpcResponse; - } -} - -class HttpWeb5RpcClient extends HttpDwnRpcClient implements Web5Rpc { +export class HttpWeb5RpcClient extends HttpDwnRpcClient implements Web5Rpc { async sendDidRequest(request: DidRpcRequest): Promise { const requestId = cryptoUtils.randomUuid(); const jsonRpcRequest = createJsonRpcRequest(requestId, request.method, { @@ -244,4 +133,10 @@ class HttpWeb5RpcClient extends HttpDwnRpcClient implements Web5Rpc { return jsonRpcResponse.result as DidRpcResponse; } +} + +export class WebSocketWeb5RpcClient extends WebSocketDwnRpcClient implements Web5Rpc { + async sendDidRequest(_request: DidRpcRequest): Promise { + throw new Error(`not implemented for transports [${this.transportProtocols.join(', ')}]`); + } } \ No newline at end of file diff --git a/packages/agent/tests/prototyping/clients/http-dwn-rpc-client.spec.ts b/packages/agent/tests/prototyping/clients/http-dwn-rpc-client.spec.ts new file mode 100644 index 000000000..b860d041f --- /dev/null +++ b/packages/agent/tests/prototyping/clients/http-dwn-rpc-client.spec.ts @@ -0,0 +1,130 @@ +import type { Persona } from '@tbd54566975/dwn-sdk-js'; + +import sinon from 'sinon'; +import { expect } from 'chai'; + +import { RecordsRead, TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; +import { HttpDwnRpcClient } from '../../../src/prototyping/clients/http-dwn-rpc-client.js'; + +import { testDwnUrl } from '../../utils/test-config.js'; + +describe('HttpDwnRpcClient', () => { + const client = new HttpDwnRpcClient(); + let alice: Persona; + + beforeEach(async () => { + sinon.restore(); + alice = await TestDataGenerator.generateDidKeyPersona(); + }); + + after(() => { + sinon.restore(); + }); + + describe('sendDwnRequest', () => { + it('sends request', async () => { + // create a generic records query + const { message } = await TestDataGenerator.generateRecordsQuery({ + author : alice, + filter : { + schema: 'foo/bar' + } + }); + + const response = await client.sendDwnRequest({ + dwnUrl : testDwnUrl, + targetDid : alice.did, + message, + }); + + // should return success but without any records as none exist yet + expect(response.status.code).to.equal(200); + expect(response.entries).to.exist; + expect(response.entries?.length).to.equal(0); + }); + + it('send RecordsWrite message', async () => { + // create a generic record with schema `foo/bar` + const { message: writeMessage, dataBytes } = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar' + }); + + const writeResponse = await client.sendDwnRequest({ + dwnUrl : testDwnUrl, + targetDid : alice.did, + message : writeMessage, + data : dataBytes, + }); + expect(writeResponse.status.code).to.equal(202); + + // query for records matching the schema of the record we inserted + const { message: readMessage } = await RecordsRead.create({ + signer : alice.signer, + filter : { + recordId: writeMessage.recordId, + } + }); + + const readResponse = await client.sendDwnRequest({ + dwnUrl : testDwnUrl, + targetDid : alice.did, + message : readMessage, + }); + + // should return success, and the record we inserted + expect(readResponse.status.code).to.equal(200); + expect(readResponse.record).to.exist; + expect(readResponse.record?.recordId).to.equal(writeMessage.recordId); + }); + + it('throws error if invalid response exists in the header', async () => { + const headers = sinon.createStubInstance(Headers, { has: true }); + sinon.stub(globalThis, 'fetch').resolves({ headers } as any); + + // create a generic record with schema `foo/bar` + const { message: writeMessage, dataBytes } = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar' + }); + + + try { + await client.sendDwnRequest({ + dwnUrl : testDwnUrl, + targetDid : alice.did, + message : writeMessage, + data : dataBytes, + }); + expect.fail('Expected an error to be thrown'); + } catch(error:any) { + expect(error.message).to.include('failed to parse json rpc response.'); + } + }); + + it('throws error if rpc responds with an error', async () => { + const headers = sinon.createStubInstance(Headers, { + has : true, + get : '{ "error": { "message": "message", "code":"code" } }' + }); + sinon.stub(globalThis, 'fetch').resolves({ headers } as any); + + // create a generic record with schema `foo/bar` + const { message: writeMessage, dataBytes } = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar' + }); + try { + await client.sendDwnRequest({ + dwnUrl : testDwnUrl, + targetDid : alice.did, + message : writeMessage, + data : dataBytes, + }); + expect.fail('Expected an error to be thrown'); + } catch(error: any) { + expect(error.message).to.include('(code) - message'); + } + }); + }); +}); \ No newline at end of file diff --git a/packages/agent/tests/prototyping/clients/json-rpc-socket.spec.ts b/packages/agent/tests/prototyping/clients/json-rpc-socket.spec.ts new file mode 100644 index 000000000..f61f98168 --- /dev/null +++ b/packages/agent/tests/prototyping/clients/json-rpc-socket.spec.ts @@ -0,0 +1,279 @@ +import { expect } from 'chai'; + +import sinon from 'sinon'; + +import { JsonRpcSocket } from '../../../src/prototyping/clients/json-rpc-socket.js'; +import { utils as cryptoUtils } from '@web5/crypto'; +import { JsonRpcErrorCodes, JsonRpcResponse, createJsonRpcErrorResponse, createJsonRpcRequest, createJsonRpcSubscriptionRequest, createJsonRpcSuccessResponse } from '../../../src/prototyping/clients/json-rpc.js'; +import { testDwnUrl } from '../../utils/test-config.js'; +import { Persona, TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; + +/** helper method to sleep while waiting for events to process/arrive */ +async function sleepWhileWaitingForEvents(override?: number):Promise { + await new Promise((resolve) => setTimeout(resolve, override || 10)); +} + +describe('JsonRpcSocket', () => { + let alice: Persona; + // we set the client to a websocket url + const dwnUrl = new URL(testDwnUrl); + dwnUrl.protocol = dwnUrl.protocol === 'http:' ? 'ws:' : 'wss:'; + const socketDwnUrl = dwnUrl.toString(); + + after(() => { + sinon.restore(); + }); + + beforeEach(async () => { + sinon.restore(); + + alice = await TestDataGenerator.generateDidKeyPersona(); + }); + + it('connects to a url', async () => { + const client = await JsonRpcSocket.connect(socketDwnUrl); + client.close(); + }); + + it('generates a request id if one is not provided', async () => { + const client = await JsonRpcSocket.connect(socketDwnUrl); + const requestId = cryptoUtils.randomUuid(); + const request = createJsonRpcRequest(requestId, 'dwn.processMessage', { param1: 'test-param1', param2: 'test-param2' }); + delete request.id; + + const response = await client.request(request); + expect(response.id).to.not.equal(requestId); + }); + + it('resolves a request with given params', async () => { + const client = await JsonRpcSocket.connect(socketDwnUrl); + const requestId = cryptoUtils.randomUuid(); + const request = createJsonRpcRequest(requestId, 'dwn.processMessage', { param1: 'test-param1', param2: 'test-param2' }); + const response = await client.request(request); + expect(response.id).to.equal(request.id); + }); + + it('request times out', async () => { + // time out after 1 ms + const client = await JsonRpcSocket.connect(socketDwnUrl, { responseTimeout: 1 }); + const requestId = cryptoUtils.randomUuid(); + const request = createJsonRpcRequest(requestId, 'down.processMessage', { param1: 'test-param1', param2: 'test-param2' }); + try { + await client.request(request); + expect.fail('Expected an error to be thrown'); + } catch (error: any) { + expect(error.message).to.contain('timed out'); + } + }); + + it('adds a handler to the messageHandlers map when listening for a response to a request', async () => { + const client = await JsonRpcSocket.connect(socketDwnUrl); + const { message } = await TestDataGenerator.generateRecordsSubscribe({ author: alice }); + const requestId = cryptoUtils.randomUuid(); + const request = createJsonRpcRequest(requestId, 'dwn.processMessage', { target: alice.did, message }); + const response = client.request(request); + expect(client['messageHandlers'].has(requestId)).to.be.true; + + await response; + + // removes the handler after the response is received + expect(client['messageHandlers'].has(requestId)).to.be.false; + }); + + it('adds a handler to the messageHandlers map when listening for a response to a subscription', async () => { + const client = await JsonRpcSocket.connect(socketDwnUrl); + const { message } = await TestDataGenerator.generateRecordsSubscribe({ author: alice }); + + const requestId = cryptoUtils.randomUuid(); + const subscriptionId = cryptoUtils.randomUuid(); + const request = createJsonRpcSubscriptionRequest( + requestId, + 'dwn.processMessage', + subscriptionId, + { target: alice.did, message } + ); + + const responseListener = (_response: JsonRpcResponse): void => {}; + const subscription = await client.subscribe(request, responseListener); + expect(client['messageHandlers'].has(subscriptionId)).to.be.true; + + // removes the handler after the subscription is closed + await subscription.close!(); + expect(client['messageHandlers'].has(subscriptionId)).to.be.false; + }); + + it('removes listener if subscription json rpc is rejected ', async () => { + const client = await JsonRpcSocket.connect(socketDwnUrl); + const requestId = cryptoUtils.randomUuid(); + const subscribeId = cryptoUtils.randomUuid(); + + const request = createJsonRpcSubscriptionRequest( + requestId, + 'dwn.processMessage', + subscribeId, + { }, + ); + + const responseListener = (_response: JsonRpcResponse): void => {}; + + const subscription = await client.subscribe(request, responseListener); + expect(subscription.response.error).to.not.be.undefined; + expect(client['messageHandlers'].has(subscribeId)).to.be.false; + }); + + it('opens a subscription', async () => { + + const client = await JsonRpcSocket.connect(socketDwnUrl); + const { message } = await TestDataGenerator.generateRecordsSubscribe({ author: alice }); + + const requestId = cryptoUtils.randomUuid(); + const subscriptionId = cryptoUtils.randomUuid(); + const request = createJsonRpcSubscriptionRequest( + requestId, + 'dwn.processMessage', + subscriptionId, + { target: alice.did, message } + ); + + const responseListener = (_response: JsonRpcResponse): void => {}; + + const subscription = await client.subscribe(request, responseListener); + expect(subscription.response.error).to.be.undefined; + // wait for the messages to arrive + await sleepWhileWaitingForEvents(); + // the original response + if (subscription.close) { + await subscription.close(); + } + }); + + it('only JSON RPC Methods prefixed with `rpc.subscribe.` are accepted for a subscription', async () => { + const client = await JsonRpcSocket.connect(socketDwnUrl); + const requestId = cryptoUtils.randomUuid(); + const request = createJsonRpcRequest(requestId, 'test.method', { param1: 'test-param1', param2: 'test-param2' }); + try { + await client.subscribe(request, () => {}); + expect.fail('Expected an error to be thrown'); + } catch(error: any) { + expect(error.message).to.contain('subscribe rpc requests must include the `rpc.subscribe` prefix'); + } + }); + + it('subscribe methods must contain a subscribe object within the request which contains the subscription JsonRpcId', async () => { + const client = await JsonRpcSocket.connect(socketDwnUrl); + const requestId = cryptoUtils.randomUuid(); + const request = createJsonRpcRequest(requestId, 'rpc.subscribe.test.method', { param1: 'test-param1', param2: 'test-param2' }); + try { + await client.subscribe(request, () => {}); + expect.fail('Expected an error to be thrown'); + } catch(error: any) { + expect(error.message).to.contain('subscribe rpc requests must include subscribe options'); + } + }); + + it('calls onclose handler', async () => { + // test injected handler + const onCloseHandler = { onclose: ():void => {} }; + const onCloseSpy = sinon.spy(onCloseHandler, 'onclose'); + const client = await JsonRpcSocket.connect(socketDwnUrl, { onclose: onCloseHandler.onclose }); + client.close(); + + await sleepWhileWaitingForEvents(); + expect(onCloseSpy.callCount).to.equal(1); + + // test default logger + const logInfoSpy = sinon.stub(console, 'info'); + const defaultClient = await JsonRpcSocket.connect(socketDwnUrl); + defaultClient.close(); + + await sleepWhileWaitingForEvents(); + expect(logInfoSpy.callCount).to.equal(1); + + // extract log message from argument + const logMessage:string = logInfoSpy.args[0][0]!; + expect(logMessage).to.equal(`JSON RPC Socket close ${socketDwnUrl}`); + }); + + // NOTE: Temporary in lieu of a better mock of isomorphic-ws + // tests reply on Node's use of event listeners to emit an error or message over the socket. + describe('browser', () => { + if (typeof window !== 'undefined') { + xit('calls onerror handler', async () => { + }); + xit('closes subscription upon receiving a JsonRpc Error for a long running subscription', async () => { + }); + } + }); + + describe('NodeJS', function () { + if (typeof process !== 'undefined' && (process as any).browser !== true) { + it('calls onerror handler', async () => { + // test injected handler + const onErrorHandler = { onerror: ():void => {} }; + const onErrorSpy = sinon.spy(onErrorHandler, 'onerror'); + const client = await JsonRpcSocket.connect(socketDwnUrl, { onerror: onErrorHandler.onerror }); + client['socket'].emit('error', 'some error'); + + await sleepWhileWaitingForEvents(); + expect(onErrorSpy.callCount).to.equal(1, 'error'); + + // test default logger + const logInfoSpy = sinon.stub(console, 'error'); + const defaultClient = await JsonRpcSocket.connect(socketDwnUrl); + defaultClient['socket'].emit('error', 'some error'); + + await sleepWhileWaitingForEvents(); + expect(logInfoSpy.callCount).to.equal(1, 'log'); + + // extract log message from argument + const logMessage:string = logInfoSpy.args[0][0]!; + expect(logMessage).to.equal(`JSON RPC Socket error ${socketDwnUrl}`); + }); + + it('closes subscription upon receiving a JsonRpc Error for a long running subscription', async () => { + + const client = await JsonRpcSocket.connect(socketDwnUrl); + const { message } = await TestDataGenerator.generateRecordsSubscribe({ author: alice }); + + const requestId = cryptoUtils.randomUuid(); + const subscriptionId = cryptoUtils.randomUuid(); + const request = createJsonRpcSubscriptionRequest( + requestId, + 'dwn.processMessage', + subscriptionId, + { target: alice.did, message } + ); + + let errorCounter = 0; + let responseCounter = 0; + const responseListener = (response: JsonRpcResponse): void => { + expect(response.id).to.equal(subscriptionId); + if (response.error) { + errorCounter++; + } + + if (response.result) { + responseCounter++; + } + }; + + const subscription = await client.subscribe(request, responseListener); + expect(subscription.response.error).to.be.undefined; + // wait for the messages to arrive + + // induce positive result + const jsonResponse = createJsonRpcSuccessResponse(subscriptionId, { reply: {} }); + client['socket'].emit('message', JSON.stringify(jsonResponse)); + + // induce error message + const errorResponse = createJsonRpcErrorResponse(subscriptionId, JsonRpcErrorCodes.InternalError, 'message'); + client['socket'].emit('message', JSON.stringify(errorResponse)); + + await sleepWhileWaitingForEvents(); + // the original response + expect(responseCounter).to.equal(1, 'response'); + expect(errorCounter).to.equal(1, 'error'); + }); + } + }); +}); diff --git a/packages/agent/tests/prototyping/clients/ws-dwn-rpc-client.spec.ts b/packages/agent/tests/prototyping/clients/ws-dwn-rpc-client.spec.ts new file mode 100644 index 000000000..592d47883 --- /dev/null +++ b/packages/agent/tests/prototyping/clients/ws-dwn-rpc-client.spec.ts @@ -0,0 +1,351 @@ +import type { Persona, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; + +import sinon from 'sinon'; + +import { expect } from 'chai'; + +import { DwnInterfaceName, DwnMethodName, RecordsRead, TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; +import { WebSocketDwnRpcClient } from '../../../src/prototyping/clients/web-socket-clients.js'; + +import { testDwnUrl } from '../../utils/test-config.js'; +import { JsonRpcSocket } from '../../../src/prototyping/clients/json-rpc-socket.js'; +import { JsonRpcErrorCodes, createJsonRpcErrorResponse } from '../../../src/prototyping/clients/json-rpc.js'; +import { HttpDwnRpcClient } from '../../../src/prototyping/clients/http-dwn-rpc-client.js'; +import { DwnEventSubscriptionHandler } from '../../../src/prototyping/clients/dwn-rpc-types.js'; + +/** helper method to sleep while waiting for events to process/arrive */ +async function sleepWhileWaitingForEvents(override?: number):Promise { + await new Promise((resolve) => setTimeout(resolve, override || 10)); +} + +describe('WebSocketDwnRpcClient', () => { + const client = new WebSocketDwnRpcClient(); + const httpClient = new HttpDwnRpcClient(); + let alice: Persona; + let socketDwnUrl: string; + + + beforeEach(async () => { + // we set the client to a websocket url + const dwnUrl = new URL(testDwnUrl); + dwnUrl.protocol = dwnUrl.protocol === 'http:' ? 'ws:' : 'wss:'; + socketDwnUrl = dwnUrl.toString(); + + sinon.restore(); + alice = await TestDataGenerator.generateDidKeyPersona(); + }); + + after(() => { + sinon.restore(); + }); + + describe('sendDwnRequest', () => { + it('sends request', async () => { + // create a generic records query + const { message } = await TestDataGenerator.generateRecordsQuery({ + author : alice, + filter : { + schema: 'foo/bar' + } + }); + + const response = await client.sendDwnRequest({ + dwnUrl : socketDwnUrl, + targetDid : alice.did, + message, + }); + + // should return success but without any records as none exist yet + expect(response.status.code).to.equal(200); + expect(response.entries).to.exist; + expect(response.entries?.length).to.equal(0); + }); + + it('only supports WebSocket and Secure WebSocket protocols', async () => { + // deliberately set 'http' as the protocol + const dwnUrl = new URL(testDwnUrl); + dwnUrl.protocol = 'http:'; + const httpDwnUrl = dwnUrl.toString(); + + // create a generic records query + const { message } = await TestDataGenerator.generateRecordsQuery({ + author : alice, + filter : { + schema: 'foo/bar' + } + }); + + try { + await client.sendDwnRequest({ + dwnUrl : httpDwnUrl, + targetDid : alice.did, + message, + }); + expect.fail('Expected an error to be thrown'); + } catch (error: any) { + expect(error.message).to.equal('Invalid websocket protocol http:'); + } + }); + + it('rejects invalid connection', async () => { + + // create a generic records query + const { message } = await TestDataGenerator.generateRecordsQuery({ + author : alice, + filter : { + schema: 'foo/bar' + } + }); + + // avoid print default error logging + sinon.stub(console, 'error'); + + try { + await client.sendDwnRequest({ + dwnUrl : 'ws://127.0.0.1:10', // invalid host + targetDid : alice.did, + message, + }, { connectTimeout: 5 }); // set a short connect timeout + expect.fail('Expected an error to be thrown'); + } catch (error: any) { + expect(error.message).to.include('Error connecting to 127.0.0.1:10'); + } + }); + + it('responds to a RecordsRead message', async () => { + // create a generic record with schema `foo/bar` + const { message: writeMessage, dataBytes } = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar' + }); + + // write the message using the http client as we currently do not support `RecordsWrite` via sockets. + const writeResponse = await httpClient.sendDwnRequest({ + dwnUrl : testDwnUrl, + targetDid : alice.did, + message : writeMessage, + data : dataBytes, + }); + expect(writeResponse.status.code).to.equal(202); + + // query for records matching the schema of the record we inserted + const { message: readMessage } = await RecordsRead.create({ + signer : alice.signer, + filter : { + recordId: writeMessage.recordId, + } + }); + + // now we send a `RecordsRead` request using the socket client + const readResponse = await client.sendDwnRequest({ + dwnUrl : socketDwnUrl, + targetDid : alice.did, + message : readMessage, + }); + + // should return success, and the record we inserted + expect(readResponse.status.code).to.equal(200); + expect(readResponse.record).to.exist; + expect(readResponse.record?.recordId).to.equal(writeMessage.recordId); + }); + + it('subscribes to updates to a record', async () => { + // create an initial record, we will subscribe to updates of this record + const { message: writeMessage, dataBytes, recordsWrite } = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar' + }); + + // write the message using the http client as we currently do not support `RecordsWrite` via sockets. + const writeResponse = await httpClient.sendDwnRequest({ + dwnUrl : testDwnUrl, + targetDid : alice.did, + message : writeMessage, + data : dataBytes, + }); + expect(writeResponse.status.code).to.equal(202); + + // create a subscription + const { message: subscribeMessage } = await TestDataGenerator.generateRecordsSubscribe({ + author : alice, + filter : { + recordId: writeMessage.recordId, + } + }); + + const dataCids:string[] = []; + const subscriptionHandler: DwnEventSubscriptionHandler = (event) => { + const { message, initialWrite } = event; + expect(initialWrite!.recordId).to.equal(writeMessage.recordId); + expect(initialWrite!.descriptor.dataCid).to.equal(writeMessage.descriptor.dataCid); + if (message.descriptor.interface + message.descriptor.method === DwnInterfaceName.Records + DwnMethodName.Write) { + dataCids.push((message as RecordsWriteMessage).descriptor.dataCid); + } + }; + + const subscribeResponse = await client.sendDwnRequest({ + dwnUrl : socketDwnUrl, + targetDid : alice.did, + message : subscribeMessage, + subscriptionHandler + }); + expect(subscribeResponse.status.code).to.equal(200); + expect(subscribeResponse.subscription).to.exist; + + // update the record + const { message: update1, recordsWrite: updateWrite, dataBytes: update1Data } = await TestDataGenerator.generateFromRecordsWrite({ + existingWrite : recordsWrite, + author : alice, + }); + + let updateReply = await httpClient.sendDwnRequest({ + dwnUrl : testDwnUrl, + targetDid : alice.did, + message : update1, + data : update1Data, + }); + expect(updateReply.status.code).to.equal(202); + + // make another update + const { message: update2, dataBytes: update2Data } = await TestDataGenerator.generateFromRecordsWrite({ + existingWrite : updateWrite, + author : alice, + }); + updateReply = await httpClient.sendDwnRequest({ + dwnUrl : testDwnUrl, + targetDid : alice.did, + message : update2, + data : update2Data, + }); + expect(updateReply.status.code).to.equal(202); + + // wait for events to emit + await sleepWhileWaitingForEvents(); + await subscribeResponse.subscription!.close(); + + expect(dataCids).to.have.members([ + update1.descriptor.dataCid, + update2.descriptor.dataCid + ]); + }); + + describe('processMessage', () => { + it('throws when json rpc response errors are returned', async () => { + const { message } = await TestDataGenerator.generateRecordsQuery({ + author : alice, + filter : { + schema: 'foo/bar' + } + }); + + const socket = await JsonRpcSocket.connect(socketDwnUrl); + const connection = { + subscriptions: new Map(), + socket, + }; + + sinon.stub(socket, 'request').resolves({ + jsonrpc : '2.0', + id : 'id', + error : { message: 'some error',code: JsonRpcErrorCodes.BadRequest } + }); + + try { + await WebSocketDwnRpcClient['processMessage'](connection, alice.did, message); + expect.fail('Expected an error to be thrown'); + } catch(error: any) { + expect(error.message).to.equal('error sending DWN request: some error'); + } + }); + }); + + describe('subscriptionRequest', () => { + it('throws when json rpc response errors are returned', async () => { + const { message } = await TestDataGenerator.generateRecordsQuery({ + author : alice, + filter : { + schema: 'foo/bar' + } + }); + + const socket = await JsonRpcSocket.connect(socketDwnUrl); + const connection = { + subscriptions: new Map(), + socket, + }; + + sinon.stub(socket, 'subscribe').resolves({ + response: { + jsonrpc : '2.0', + id : 'id', + error : { message: 'some error',code: JsonRpcErrorCodes.BadRequest } + } + }); + + try { + await WebSocketDwnRpcClient['subscriptionRequest'](connection, alice.did, message, () => {}); + expect.fail('Expected an error to be thrown'); + } catch (error: any) { + expect(error.message).to.equal('could not subscribe via jsonrpc socket: some error'); + } + }); + + it('close and clean up subscription when emitted an json rpc error response in the handler', async () => { + const { message } = await TestDataGenerator.generateRecordsQuery({ + author : alice, + filter : { + schema: 'foo/bar' + } + }); + + const socket = await JsonRpcSocket.connect(socketDwnUrl); + const subscriptions = new Map(); + const connection = { + subscriptions, + socket, + }; + + const subscribeStub = sinon.stub(socket, 'subscribe').resolves({ + response: { + jsonrpc : '2.0', + id : 'id', + result : { + reply: { + status : { code: 200, detail: 'Ok' }, + subscription : { + id : 'sub-id', + close : () => {} + } + } + } + } + }); + + const processMessage = await WebSocketDwnRpcClient['subscriptionRequest'](connection, alice.did, message, () => {}); + expect(processMessage.status.code).to.equal(200); + const subscriptionCallArgs = [...subscribeStub.args][0]; + const subRequest = subscriptionCallArgs[0]; + const subHandler = subscriptionCallArgs[1]; + + // get the subscription Id from the request, and add a mock subscription to the subscriptions map + const subscriptionId = subRequest.subscription!.id; + const subscription = { + id : subscriptionId, + close : () => {} + }; + // spy on the close function + const closeSpy = sinon.spy(subscription, 'close'); + + // add to the subscriptions map + subscriptions.set(subscriptionId, subscription); + + const jsonError = createJsonRpcErrorResponse('id', JsonRpcErrorCodes.BadRequest, 'some error'); + subHandler(jsonError); + + // confirm close was called and subscription was removed + expect(closeSpy.callCount).to.equal(1); + expect(subscriptions.size).to.equal(0); + }); + }); + }); +}); \ No newline at end of file diff --git a/packages/agent/tests/rpc-client.spec.ts b/packages/agent/tests/rpc-client.spec.ts new file mode 100644 index 000000000..11cc6c809 --- /dev/null +++ b/packages/agent/tests/rpc-client.spec.ts @@ -0,0 +1,256 @@ + +import sinon from 'sinon'; + +import { expect } from 'chai'; + +import { utils as cryptoUtils } from '@web5/crypto'; + +import { testDwnUrl } from './utils/test-config.js'; + +import { DidRpcMethod, HttpWeb5RpcClient, Web5RpcClient, WebSocketWeb5RpcClient } from '../src/rpc-client.js'; +import { Persona, TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; +import { JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcSuccessResponse } from '../src/prototyping/clients/json-rpc.js'; + +describe('RPC Clients', () => { + describe('Web5RpcClient', () => { + let alice: Persona; + + beforeEach(async () => { + sinon.restore(); + + alice = await TestDataGenerator.generateDidKeyPersona(); + }); + + it('returns available transports', async () => { + const httpOnlyClient = new Web5RpcClient(); + expect(httpOnlyClient.transportProtocols).to.have.members(['http:', 'https:']); + + const wsAndHttpClients = new Web5RpcClient([ + new WebSocketWeb5RpcClient() + ]); + + expect(wsAndHttpClients.transportProtocols).to.have.members([ + 'http:', + 'https:', + 'ws:', + 'wss:' + ]); + }); + + describe('sendDidRequest', () => { + it('should send to the client depending on transport', async () => { + const stubHttpClient = sinon.createStubInstance(HttpWeb5RpcClient); + const httpOnlyClient = new Web5RpcClient([ stubHttpClient ]); + + // request with http + const request = { method: DidRpcMethod.Resolve, url: 'http://127.0.0.1', data: 'some-data' }; + httpOnlyClient.sendDidRequest(request); + + expect(stubHttpClient.sendDidRequest.callCount).to.equal(1); + }); + + it('should throw if transport client is not found', async () => { + const stubHttpClient = sinon.createStubInstance(HttpWeb5RpcClient); + const httpOnlyClient = new Web5RpcClient([ stubHttpClient ]); + + // request with http + const request = { method: DidRpcMethod.Resolve, url: 'ws://127.0.0.1', data: 'some-data' }; + try { + await httpOnlyClient.sendDidRequest(request); + expect.fail('Expected error to be thrown'); + } catch (error: any) { + expect(error.message).to.equal('no ws: transport client available'); + } + + // confirm http transport was not called + expect(stubHttpClient.sendDidRequest.callCount).to.equal(0); + }); + }); + + describe('sendDwnRequest', () => { + it('should send to the client depending on transport', async () => { + const stubHttpClient = sinon.createStubInstance(HttpWeb5RpcClient); + const httpOnlyClient = new Web5RpcClient([ stubHttpClient ]); + const { message } = await TestDataGenerator.generateRecordsQuery({ + author : alice, + filter : { + schema: 'foo/bar' + } + }); + + // request with http + await httpOnlyClient.sendDwnRequest({ + dwnUrl : 'http://127.0.0.1', + targetDid : alice.did, + message, + }); + + // confirm http transport was called + expect(stubHttpClient.sendDwnRequest.callCount).to.equal(1); + }); + + it('should throw if transport client is not found', async () => { + const stubHttpClient = sinon.createStubInstance(HttpWeb5RpcClient); + const httpOnlyClient = new Web5RpcClient([ stubHttpClient ]); + const { message } = await TestDataGenerator.generateRecordsQuery({ + author : alice, + filter : { + schema: 'foo/bar' + } + }); + + try { + // request with ws + await httpOnlyClient.sendDwnRequest({ + dwnUrl : 'ws://127.0.0.1', + targetDid : alice.did, + message, + }); + expect.fail('Expected error to be thrown'); + } catch(error: any) { + expect(error.message).to.equal('no ws: transport client available'); + } + + // confirm http transport was not called + expect(stubHttpClient.sendDwnRequest.callCount).to.equal(0); + }); + }); + }); + + describe('HttpWeb5RpcClient', () => { + let alice: Persona; + let client: HttpWeb5RpcClient; + + beforeEach(async () => { + sinon.restore(); + + client = new HttpWeb5RpcClient(); + alice = await TestDataGenerator.generateDidKeyPersona(); + }); + + describe('sendDwnRequest', () => { + it('supports sending dwn requests', async () => { + // create a generic records query + const { message } = await TestDataGenerator.generateRecordsQuery({ + author : alice, + filter : { + schema: 'foo/bar' + } + }); + + const response = await client.sendDwnRequest({ + dwnUrl : testDwnUrl, + targetDid : alice.did, + message, + }); + + // should return success but without any records as none exist yet + expect(response.status.code).to.equal(200); + expect(response.entries).to.exist; + expect(response.entries?.length).to.equal(0); + }); + }); + + describe('sendDidRequest', () => { + it('should throw if json rpc server responds with an error', async () => { + const request = { method: DidRpcMethod.Resolve, url: testDwnUrl, data: 'some-data' }; + + const requestId = cryptoUtils.randomUuid(); + const jsonRpcResponse = createJsonRpcErrorResponse( + requestId, + JsonRpcErrorCodes.InternalError, + 'some error' + ); + const mockResponse = new Response(JSON.stringify(jsonRpcResponse)); + sinon.stub(globalThis, 'fetch').resolves(mockResponse); + + try { + await client.sendDidRequest(request); + expect.fail('Expected an error to be thrown'); + } catch(error: any) { + expect(error.message).to.contain(`Error encountered while processing response from ${testDwnUrl}`); + } + }); + + it('should throw if http response does not return ok', async () => { + const request = { method: DidRpcMethod.Resolve, url: testDwnUrl, data: 'some-data' }; + + const mockResponse = new Response(JSON.stringify({}), { status: 500 }); + sinon.stub(globalThis, 'fetch').resolves(mockResponse); + + try { + await client.sendDidRequest(request); + expect.fail('Expected an error to be thrown'); + } catch(error: any) { + expect(error.message).to.contain(`Error encountered while processing response from ${testDwnUrl}`); + } + }); + + it('should return json rpc result', async () => { + const request = { method: DidRpcMethod.Resolve, url: testDwnUrl, data: 'some-data' }; + + const requestId = cryptoUtils.randomUuid(); + const jsonRpcResponse = createJsonRpcSuccessResponse( + requestId, + { status: { code: 200 }, data: 'data' } + ); + const mockResponse = new Response(JSON.stringify(jsonRpcResponse)); + sinon.stub(globalThis, 'fetch').resolves(mockResponse); + + const response = await client.sendDidRequest(request); + expect(response.status.code).to.equal(200); + expect(response.data).to.equal('data'); + }); + }); + }); + + describe('WebSocketWeb5RpcClient', () => { + let alice: Persona; + const client = new WebSocketWeb5RpcClient(); + // we set the client to a websocket url + const dwnUrl = new URL(testDwnUrl); + dwnUrl.protocol = dwnUrl.protocol === 'http:' ? 'ws:' : 'wss:'; + const socketDwnUrl = dwnUrl.toString(); + + beforeEach(async () => { + sinon.restore(); + + alice = await TestDataGenerator.generateDidKeyPersona(); + }); + + describe('sendDwnRequest', () => { + it('supports sending dwn requests', async () => { + // create a generic records query + const { message } = await TestDataGenerator.generateRecordsQuery({ + author : alice, + filter : { + schema: 'foo/bar' + } + }); + + const response = await client.sendDwnRequest({ + dwnUrl : socketDwnUrl, + targetDid : alice.did, + message, + }); + + // should return success but without any records as none exist yet + expect(response.status.code).to.equal(200); + expect(response.entries).to.exist; + expect(response.entries?.length).to.equal(0); + }); + }); + + describe('sendDidRequest', () => { + it('did requests are not supported over sockets', async () => { + const request = { method: DidRpcMethod.Resolve, url: socketDwnUrl, data: 'some-data' }; + try { + await client.sendDidRequest(request); + expect.fail('Expected error to be thrown'); + } catch (error: any) { + expect(error.message).to.equal('not implemented for transports [ws:, wss:]'); + } + }); + }); + }); +}); \ No newline at end of file diff --git a/packages/agent/tests/utils/runtimes.ts b/packages/agent/tests/utils/runtimes.ts index 237e3ef5c..dde1f2a7e 100644 --- a/packages/agent/tests/utils/runtimes.ts +++ b/packages/agent/tests/utils/runtimes.ts @@ -1 +1,2 @@ -export const isChrome = typeof navigator !== 'undefined' && /Chrome/.test(navigator.userAgent); \ No newline at end of file +export const isChrome = typeof navigator !== 'undefined' && /Chrome/.test(navigator.userAgent); +export const isNode = typeof navigator === 'undefined' && typeof process !== 'undefined' && process.versions != null && process.versions.node != null; \ No newline at end of file diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 603ab5e0e..cc2868d75 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -59,6 +59,9 @@ importers: ed25519-keygen: specifier: 0.4.11 version: 0.4.11 + isomorphic-ws: + specifier: ^5.0.0 + version: 5.0.0(ws@8.13.0) level: specifier: 8.0.0 version: 8.0.0 @@ -6316,6 +6319,14 @@ packages: engines: {node: '>=10'} dev: true + /isomorphic-ws@5.0.0(ws@8.13.0): + resolution: {integrity: sha512-muId7Zzn9ywDsyXgTIafTry2sV3nySZeUDe6YedVd1Hvuuep5AsIlqK+XefWpYTyJG5e503F2xIuT2lcU6rCSw==} + peerDependencies: + ws: '*' + dependencies: + ws: 8.13.0 + dev: false + /istanbul-lib-coverage@3.2.2: resolution: {integrity: sha512-O8dpsF+r0WV/8MNRKfnmrtCWhuKjxrq2w+jpzBL5UZKTi2LeVWnWOmWRxFlesJONmc+wLAGvKQZEOanko0LFTg==} engines: {node: '>=8'} @@ -9440,7 +9451,6 @@ packages: optional: true utf-8-validate: optional: true - dev: true /xml@1.0.1: resolution: {integrity: sha512-huCv9IH9Tcf95zuYCsQraZtWnJvBtLVE0QHMOs8bWyZAFZNDcYjsPq1nEx8jKA9y+Beo9v+7OBPRisQTjinQMw==}