From 7de0ad1de5b243cd628fb28e20e9341a1b4310d8 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Thu, 22 Feb 2024 22:48:58 -0500 Subject: [PATCH] increase testing across the board --- src/connection/socket-connection.ts | 15 +- src/json-rpc-handlers/subscription/close.ts | 10 ++ src/json-rpc-socket.ts | 20 +-- src/lib/json-rpc-router.ts | 4 +- src/lib/json-rpc.ts | 8 - tests/connection/connection-manager.spec.ts | 45 ++++++ tests/connection/socket-connection.spec.ts | 159 ++++++++++++++++++++ tests/dwn-process-message.spec.ts | 73 ++++++++- tests/json-rpc-socket.spec.ts | 83 +++++++++- tests/rpc-subscribe-close.spec.ts | 124 +++++++++++++++ tests/utils.ts | 24 ++- tests/ws-api.spec.ts | 10 +- 12 files changed, 530 insertions(+), 45 deletions(-) create mode 100644 tests/connection/connection-manager.spec.ts create mode 100644 tests/connection/socket-connection.spec.ts create mode 100644 tests/rpc-subscribe-close.spec.ts diff --git a/src/connection/socket-connection.ts b/src/connection/socket-connection.ts index 1178506..82c18c9 100644 --- a/src/connection/socket-connection.ts +++ b/src/connection/socket-connection.ts @@ -148,7 +148,6 @@ export class SocketConnection { JsonRpcErrorCodes.BadRequest, (error as Error).message ); - return this.send(errorResponse); }; @@ -166,16 +165,10 @@ export class SocketConnection { } /** - * Sends a JSON encoded Buffer through the Websocket. Accepts a callback, if none is provided an error logger is used. + * Sends a JSON encoded Buffer through the Websocket. */ - private send(response: JsonRpcResponse | JsonRpcErrorResponse, cb?: (error?: Error) => void): void { - if (!cb) { - cb = (error):void => { - if(error) { log.error('socket send error', error, response); } - } - } - - this.socket.send(Buffer.from(JSON.stringify(response)), cb); + private send(response: JsonRpcResponse | JsonRpcErrorResponse): void { + this.socket.send(Buffer.from(JSON.stringify(response))); } /** @@ -185,7 +178,7 @@ export class SocketConnection { */ private createSubscriptionHandler(id: JsonRpcId): (message: MessageEvent) => void { return (event) => { - const response = createJsonRpcSuccessResponse(id, { reply: { event } }); + const response = createJsonRpcSuccessResponse(id, { event }); this.send(response); } } diff --git a/src/json-rpc-handlers/subscription/close.ts b/src/json-rpc-handlers/subscription/close.ts index fa15853..55de23a 100644 --- a/src/json-rpc-handlers/subscription/close.ts +++ b/src/json-rpc-handlers/subscription/close.ts @@ -25,6 +25,16 @@ export const handleSubscriptionsClose: JsonRpcHandler = async ( context, ) => { const requestId = dwnRequest.id ?? uuidv4(); + if (context.socketConnection === undefined) { + const jsonRpcResponse = createJsonRpcErrorResponse(requestId, JsonRpcErrorCodes.InvalidRequest, 'socket connection does not exist'); + return { jsonRpcResponse }; + } + + if (dwnRequest.subscribe === undefined) { + const jsonRpcResponse = createJsonRpcErrorResponse(requestId, JsonRpcErrorCodes.InvalidRequest, 'subscribe options do not exist'); + return { jsonRpcResponse }; + } + const { socketConnection } = context; const { id } = dwnRequest.subscribe as { id: JsonRpcId }; diff --git a/src/json-rpc-socket.ts b/src/json-rpc-socket.ts index ce327c8..941f366 100644 --- a/src/json-rpc-socket.ts +++ b/src/json-rpc-socket.ts @@ -2,7 +2,7 @@ import log from 'loglevel'; import { v4 as uuidv4 } from 'uuid'; import WebSocket from 'ws'; -import type { JsonRpcRequest, JsonRpcResponse } from "./lib/json-rpc.js"; +import type { JsonRpcId, JsonRpcRequest, JsonRpcResponse } from "./lib/json-rpc.js"; import { createJsonRpcSubscribeRequest } from "./lib/json-rpc.js"; // These were arbitrarily chosen, but can be modified via connect options @@ -89,7 +89,7 @@ export class JsonRpcSocket { * Sends a JSON-RPC request through the socket and keeps a listener open to read associated responses as they arrive. * Returns a close method to clean up the listener. */ - async subscribe(request: JsonRpcRequest, listener: (response: JsonRpcResponse) => void): Promise<{ + async subscribe(request: JsonRpcRequest, listener: (response: JsonRpcResponse) => void): Promise<{ response: JsonRpcResponse; close?: () => Promise; }> { @@ -109,8 +109,8 @@ export class JsonRpcSocket { if (jsonRpcResponse.error !== undefined) { // remove the event listener upon receipt of a JSON RPC Error. this.socket.removeEventListener('message', messageHandler); + this.closeSubscription(subscriptionId); } - listener(jsonRpcResponse); } }; @@ -125,12 +125,7 @@ export class JsonRpcSocket { // clean up listener and create a `rpc.subscribe.close` message to use when closing this JSON RPC subscription const close = async (): Promise => { this.socket.removeEventListener('message', messageHandler); - const requestId = uuidv4(); - const request = createJsonRpcSubscribeRequest(requestId, 'rpc.subscribe.close', {}, subscriptionId) - const response = await this.request(request); - if (response.error) { - throw response.error; - } + await this.closeSubscription(subscriptionId); } return { @@ -139,11 +134,16 @@ export class JsonRpcSocket { } } + private closeSubscription(id: JsonRpcId): Promise { + const requestId = uuidv4(); + const request = createJsonRpcSubscribeRequest(requestId, 'rpc.subscribe.close', {}, id); + return this.request(request); + } + /** * Sends a JSON-RPC request through the socket. You must subscribe to a message listener separately to capture the response. */ send(request: JsonRpcRequest):void { this.socket.send(Buffer.from(JSON.stringify(request))); - return; } } \ No newline at end of file diff --git a/src/lib/json-rpc-router.ts b/src/lib/json-rpc-router.ts index 580d375..75e9adf 100644 --- a/src/lib/json-rpc-router.ts +++ b/src/lib/json-rpc-router.ts @@ -1,4 +1,4 @@ -import type { Dwn, MessageEvent } from '@tbd54566975/dwn-sdk-js'; +import type { Dwn, EventSubscriptionHandler } from '@tbd54566975/dwn-sdk-js'; import type { Readable } from 'node:stream'; @@ -13,7 +13,7 @@ export type RequestContext = { /** The JsonRpcId of the subscription handler */ id: JsonRpcId; /** The `MessageEvent` handler associated with a subscription request, only used in `ws` requests */ - subscriptionHandler: (message: MessageEvent) => void; + subscriptionHandler: EventSubscriptionHandler; } /** The `Readable` stream associated with a `RecordsWrite` request only used in `http` requests */ dataStream?: Readable; diff --git a/src/lib/json-rpc.ts b/src/lib/json-rpc.ts index bb4021d..30ee5d5 100644 --- a/src/lib/json-rpc.ts +++ b/src/lib/json-rpc.ts @@ -127,11 +127,3 @@ export const createJsonRpcSuccessResponse = ( result: result ?? null, }; }; - -export function parseJson(text: string): object | null { - try { - return JSON.parse(text); - } catch { - return null; - } -} diff --git a/tests/connection/connection-manager.spec.ts b/tests/connection/connection-manager.spec.ts new file mode 100644 index 0000000..657c859 --- /dev/null +++ b/tests/connection/connection-manager.spec.ts @@ -0,0 +1,45 @@ +import type { Dwn } from '@tbd54566975/dwn-sdk-js'; + +import chaiAsPromised from 'chai-as-promised'; +import chai, { expect } from 'chai'; + +import sinon from 'sinon'; +import { WebSocket } from 'ws'; +import { getTestDwn } from '../test-dwn.js'; +import { InMemoryConnectionManager } from '../../src/connection/connection-manager.js'; + +chai.use(chaiAsPromised); + +describe('InMemoryConnectionManager', () => { + let dwn: Dwn; + let connectionManager: InMemoryConnectionManager; + + beforeEach(async () => { + dwn = await getTestDwn({ withEvents: true }); + connectionManager = new InMemoryConnectionManager(dwn); + }); + + afterEach(async () => { + await connectionManager.closeAll(); + await dwn.close(); + sinon.restore(); + }); + + it('adds connection to the connections map and closes all', async () => { + const socket1 = sinon.createStubInstance(WebSocket); + await connectionManager.connect(socket1); + expect((connectionManager as any).connections.size).to.equal(1); + + const socket2 = sinon.createStubInstance(WebSocket); + await connectionManager.connect(socket2); + expect((connectionManager as any).connections.size).to.equal(2); + }); + + xit('closes all connections', async () => { + const socket = sinon.createStubInstance(WebSocket); + await connectionManager.connect(socket); + expect((connectionManager as any).connections.size).to.equal(1); + await connectionManager.closeAll(); + expect((connectionManager as any).connections.size).to.equal(0); + }); +}); \ No newline at end of file diff --git a/tests/connection/socket-connection.spec.ts b/tests/connection/socket-connection.spec.ts new file mode 100644 index 0000000..3f033a5 --- /dev/null +++ b/tests/connection/socket-connection.spec.ts @@ -0,0 +1,159 @@ +import type { Dwn } from '@tbd54566975/dwn-sdk-js'; + +import chaiAsPromised from 'chai-as-promised'; +import chai, { expect } from 'chai'; + +import sinon from 'sinon'; +import { WebSocket } from 'ws'; +import { SocketConnection } from '../../src/connection/socket-connection.js'; +import { getTestDwn } from '../test-dwn.js'; +import log from 'loglevel'; + +chai.use(chaiAsPromised); + +describe('SocketConnection', () => { + let dwn: Dwn; + + before(async () => { + dwn = await getTestDwn(); + }); + + after(async () => { + await dwn.close(); + sinon.restore(); + }); + + it('should assign socket handlers', async () => { + const socket = sinon.createStubInstance(WebSocket); + const connection = new SocketConnection(socket, dwn); + expect(socket.on.callCount).to.equal(4); + expect(socket.on.args.map(arg => arg[0])).to.have.members(['message', 'close', 'error', 'pong']); + await connection.close(); + }); + + it('should add a subscription to the subscription manager map', async () => { + const socket = sinon.createStubInstance(WebSocket); + const connection = new SocketConnection(socket, dwn); + const subscriptionRequest = { + id: 'id', + method: 'method', + params: { param1: 'param' }, + close: async ():Promise => {} + } + + await connection.addSubscription(subscriptionRequest); + expect((connection as any).subscriptions.size).to.equal(1); + await connection.close(); + expect((connection as any).subscriptions.size).to.equal(0); + }); + + it('should reject a subscription with an Id of an existing subscription', async () => { + const socket = sinon.createStubInstance(WebSocket); + const connection = new SocketConnection(socket, dwn); + + const id = 'some-id'; + + const subscriptionRequest = { + id, + method: 'method', + params: { param1: 'param' }, + close: async ():Promise => {} + } + + await connection.addSubscription(subscriptionRequest); + expect((connection as any).subscriptions.size).to.equal(1); + + const addDuplicatePromise = connection.addSubscription(subscriptionRequest); + await expect(addDuplicatePromise).to.eventually.be.rejectedWith(`the subscription with id ${id} already exists`); + expect((connection as any).subscriptions.size).to.equal(1); + await connection.close(); + expect((connection as any).subscriptions.size).to.equal(0); + }); + + it('should close a subscription and remove it from the connection manager map', async () => { + const socket = sinon.createStubInstance(WebSocket); + const connection = new SocketConnection(socket, dwn); + + const id = 'some-id'; + + const subscriptionRequest = { + id, + method: 'method', + params: { param1: 'param' }, + close: async ():Promise => {} + } + + await connection.addSubscription(subscriptionRequest); + expect((connection as any).subscriptions.size).to.equal(1); + + await connection.closeSubscription(id); + expect((connection as any).subscriptions.size).to.equal(0); + + const closeAgainPromise = connection.closeSubscription(id); + await expect(closeAgainPromise).to.eventually.be.rejectedWith(`the subscription with id ${id} was not found`); + await connection.close(); + }); + + it('hasSubscription returns whether a subscription with the id already exists', async () => { + const socket = sinon.createStubInstance(WebSocket); + const connection = new SocketConnection(socket, dwn); + const subscriptionRequest = { + id: 'id', + method: 'method', + params: { param1: 'param' }, + close: async ():Promise => {} + } + + await connection.addSubscription(subscriptionRequest); + expect((connection as any).subscriptions.size).to.equal(1); + expect(connection.hasSubscription(subscriptionRequest.id)).to.be.true; + expect(connection.hasSubscription('does-not-exist')).to.be.false; + + await connection.closeSubscription(subscriptionRequest.id); + expect(connection.hasSubscription(subscriptionRequest.id)).to.be.false; + await connection.close(); + }); + + it('should close if pong is not triggered between heartbeat intervals', async () => { + const socket = sinon.createStubInstance(WebSocket); + const clock = sinon.useFakeTimers(); + const connection = new SocketConnection(socket, dwn); + const closeSpy = sinon.spy(connection, 'close'); + + clock.tick(60_100); // interval has to run twice + clock.restore(); + + expect(closeSpy.callCount).to.equal(1); + }); + + it('should not close if pong is called within the heartbeat interval', async () => { + const socket = sinon.createStubInstance(WebSocket); + const clock = sinon.useFakeTimers(); + const connection = new SocketConnection(socket, dwn); + const closeSpy = sinon.spy(connection, 'close'); + + (connection as any).pong(); // trigger a pong + clock.tick(30_100); // first interval + + (connection as any).pong(); // trigger a pong + clock.tick(30_100); // second interval + + expect(closeSpy.callCount).to.equal(0); + + clock.tick(30_100); // another interval without a ping + clock.restore(); + expect(closeSpy.callCount).to.equal(1); + }); + + it('logs an error and closes connection if error is triggered', async () => { + const socket = sinon.createStubInstance(WebSocket); + const connection = new SocketConnection(socket, dwn); + const logSpy = sinon.stub(log, 'error'); + const closeSpy = sinon.spy(connection, 'close'); + + (connection as any).error(new Error('some error')); + + expect(logSpy.callCount).to.equal(1); + expect(closeSpy.callCount).to.equal(1); + }); +}); \ No newline at end of file diff --git a/tests/dwn-process-message.spec.ts b/tests/dwn-process-message.spec.ts index 5249da2..fe4f3d1 100644 --- a/tests/dwn-process-message.spec.ts +++ b/tests/dwn-process-message.spec.ts @@ -1,9 +1,10 @@ import { expect } from 'chai'; +import sinon from 'sinon'; import { v4 as uuidv4 } from 'uuid'; import { handleDwnProcessMessage } from '../src/json-rpc-handlers/dwn/process-message.js'; import type { RequestContext } from '../src/lib/json-rpc-router.js'; -import { createJsonRpcRequest } from '../src/lib/json-rpc.js'; +import { JsonRpcErrorCodes, createJsonRpcRequest } from '../src/lib/json-rpc.js'; import { getTestDwn } from './test-dwn.js'; import { createRecordsWriteMessage } from './utils.js'; import { TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; @@ -62,4 +63,74 @@ describe('handleDwnProcessMessage', function () { expect(reply.entries).to.be.undefined; await dwn.close(); }); + + it('should fail if no subscriptionRequest context exists for a `Subscribe` message', async function () { + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { + message: { + descriptor: { interface: 'Records', method: 'Subscribe' }, + }, + target: 'did:key:abc1234', + }); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws' }; + + const { jsonRpcResponse } = await handleDwnProcessMessage( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InvalidRequest); + expect(jsonRpcResponse.error.message).to.equal('subscribe methods must contain a subscriptionRequest context'); + await dwn.close(); + }); + + it('should fail on http requests for a `Subscribe` message', async function () { + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { + message: { + descriptor: { interface: 'Records', method: 'Subscribe' }, + }, + target: 'did:key:abc1234', + }); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'http', subscriptionRequest: { id: 'test', subscriptionHandler: () => {}} }; + + const { jsonRpcResponse } = await handleDwnProcessMessage( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InvalidParams); + expect(jsonRpcResponse.error.message).to.equal('subscriptions are not supported via http'); + await dwn.close(); + }); + + it('should return a JsonRpc Internal Error for an unexpected thrown error within the handler', async function () { + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { + message: { + descriptor: { interface: 'Records' }, + }, + target: 'did:key:abc1234', + }); + + const dwn = await getTestDwn(); + sinon.stub(dwn, 'processMessage').throws(new Error('unexpected error')); + const context: RequestContext = { dwn, transport: 'http' }; + + const { jsonRpcResponse } = await handleDwnProcessMessage( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InternalError); + expect(jsonRpcResponse.error.message).to.equal('unexpected error'); + await dwn.close(); + }); }); diff --git a/tests/json-rpc-socket.spec.ts b/tests/json-rpc-socket.spec.ts index 20f2daf..7897438 100644 --- a/tests/json-rpc-socket.spec.ts +++ b/tests/json-rpc-socket.spec.ts @@ -7,7 +7,7 @@ import { WebSocketServer } from 'ws'; import type { JsonRpcId, JsonRpcRequest, JsonRpcSuccessResponse } from '../src/lib/json-rpc.js'; import { JsonRpcSocket } from '../src/json-rpc-socket.js'; -import { createJsonRpcRequest, createJsonRpcSubscribeRequest, createJsonRpcSuccessResponse } from '../src/lib/json-rpc.js'; +import { JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcRequest, createJsonRpcSubscribeRequest, createJsonRpcSuccessResponse } from '../src/lib/json-rpc.js'; chai.use(chaiAsPromised); @@ -80,6 +80,7 @@ describe('JsonRpcSocket', () => { } }); }); + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003', { responseTimeout: 5 }); const requestId = uuidv4(); const subscribeId = uuidv4(); @@ -126,9 +127,87 @@ describe('JsonRpcSocket', () => { await expect(receivedPromise).to.eventually.eql({ reply: { id: request.id }}); }); + it('closes subscription upon receiving a JsonRpc Error for a long running subscription', async () => { + let closed = true; + wsServer.addListener('connection', (socket) => { + closed = false; + socket.on('message', (dataBuffer: Buffer) => { + const request = JSON.parse(dataBuffer.toString()) as JsonRpcRequest; + if (request.method.startsWith('rpc.subscribe') && request.method !== 'rpc.subscribe.close') { + // initial response + const response = createJsonRpcSuccessResponse(request.id, { reply: {} }) + socket.send(Buffer.from(JSON.stringify(response))); + const { subscribe } = request; + + // send 1 valid message + const message1 = createJsonRpcSuccessResponse(subscribe.id, { message: 1 }); + socket.send(Buffer.from(JSON.stringify(message1))); + + // send a json rpc error + const jsonRpcError = createJsonRpcErrorResponse(subscribe.id, JsonRpcErrorCodes.InternalError, 'some error'); + socket.send(Buffer.from(JSON.stringify(jsonRpcError))); + + // send a 2nd message that shouldn't be handled + const message2 = createJsonRpcSuccessResponse(subscribe.id, { message: 2 }); + socket.send(Buffer.from(JSON.stringify(message2))); + } else if (request.method === 'rpc.subscribe.close') { + closed = true; + } + }); + }); + + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003', { responseTimeout: 5 }); + const requestId = uuidv4(); + const subscribeId = uuidv4(); + const request = createJsonRpcSubscribeRequest( + requestId, + 'rpc.subscribe.test.method', + { param1: 'test-param1', param2: 'test-param2' }, + subscribeId, + ); + + let responseCounter = 0; + let errorCounter = 0; + const responseListener = (response: JsonRpcSuccessResponse): void => { + expect(response.id).to.equal(subscribeId); + if (response.error) { + errorCounter++; + } + + if (response.result) { + responseCounter++; + } + } + + const subscription = await client.subscribe(request, responseListener); + expect(subscription.response.error).to.be.undefined; + // wait for the messages to arrive + await new Promise((resolve) => setTimeout(resolve, 5)); + // the original response + expect(responseCounter).to.equal(1); + expect(errorCounter).to.equal(1); + expect(closed).to.equal(true); + }); + + it('only JSON RPC Methods prefixed with `rpc.subscribe.` are accepted for a subscription', async () => { + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003'); + const requestId = uuidv4(); + const request = createJsonRpcRequest(requestId, 'test.method', { param1: 'test-param1', param2: 'test-param2' }); + const subscribePromise = client.subscribe(request, () => {}); + await expect(subscribePromise).to.eventually.be.rejectedWith('subscribe rpc requests must include the `rpc.subscribe` prefix'); + }); + + it('subscribe methods must contain a subscribe object within the request which contains the subscription JsonRpcId', async () => { + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003'); + const requestId = uuidv4(); + const request = createJsonRpcRequest(requestId, 'rpc.subscribe.test.method', { param1: 'test-param1', param2: 'test-param2' }); + const subscribePromise = client.subscribe(request, () => {}); + await expect(subscribePromise).to.eventually.be.rejectedWith('subscribe rpc requests must include subscribe options'); + }); + xit('calls onerror handler', async () => { }); - xit('calls onclose handler', async () => { + xit('calls onclose hanhler', async () => { }); }); diff --git a/tests/rpc-subscribe-close.spec.ts b/tests/rpc-subscribe-close.spec.ts new file mode 100644 index 0000000..06ad6c5 --- /dev/null +++ b/tests/rpc-subscribe-close.spec.ts @@ -0,0 +1,124 @@ +import { expect } from 'chai'; +import sinon from 'sinon'; +import { v4 as uuidv4 } from 'uuid'; + +import type { RequestContext } from '../src/lib/json-rpc-router.js'; +import { JsonRpcErrorCodes, createJsonRpcRequest, createJsonRpcSubscribeRequest } from '../src/lib/json-rpc.js'; +import { getTestDwn } from './test-dwn.js'; +import { handleSubscriptionsClose } from '../src/json-rpc-handlers/subscription/close.js'; +import { SocketConnection } from '../src/connection/socket-connection.js'; +import { DwnServerError, DwnServerErrorCode } from '../src/dwn-error.js'; + +describe('handleDwnProcessMessage', function () { + it('should return an error if no socket connection exists', async function () { + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest(requestId, 'rpc.subscribe.close', { }); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws' }; + + const { jsonRpcResponse } = await handleSubscriptionsClose( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InvalidRequest); + expect(jsonRpcResponse.error.message).to.equal('socket connection does not exist'); + }); + + it('should return an error if no subscribe options exist', async function () { + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest(requestId, 'rpc.subscribe.close', { }); + const socketConnection = sinon.createStubInstance(SocketConnection); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws', socketConnection }; + + const { jsonRpcResponse } = await handleSubscriptionsClose( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InvalidRequest); + expect(jsonRpcResponse.error.message).to.equal('subscribe options do not exist'); + }); + + it('should return an error if close subscription throws ConnectionSubscriptionJsonRpcIdNotFound', async function () { + const requestId = uuidv4(); + const id = 'some-id'; + const dwnRequest = createJsonRpcSubscribeRequest(requestId, 'rpc.subscribe.close', {}, id); + const socketConnection = sinon.createStubInstance(SocketConnection); + socketConnection.closeSubscription.throws(new DwnServerError( + DwnServerErrorCode.ConnectionSubscriptionJsonRpcIdNotFound, + '' + )); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws', socketConnection }; + + const { jsonRpcResponse } = await handleSubscriptionsClose( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InvalidParams); + expect(jsonRpcResponse.error.message).to.equal(`subscription ${id} does not exist.`); + }); + + it('should return an error if close subscription throws ConnectionSubscriptionJsonRpcIdNotFound', async function () { + const requestId = uuidv4(); + const id = 'some-id'; + const dwnRequest = createJsonRpcSubscribeRequest(requestId, 'rpc.subscribe.close', {}, id); + const socketConnection = sinon.createStubInstance(SocketConnection); + socketConnection.closeSubscription.throws(new Error('unknown error')); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws', socketConnection }; + + const { jsonRpcResponse } = await handleSubscriptionsClose( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InternalError); + expect(jsonRpcResponse.error.message).to.equal(`unknown subscription close error for ${id}: unknown error`); + }); + + it('should return a success', async function () { + const requestId = uuidv4(); + const id = 'some-id'; + const dwnRequest = createJsonRpcSubscribeRequest(requestId, 'rpc.subscribe.close', {}, id); + const socketConnection = sinon.createStubInstance(SocketConnection); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws', socketConnection }; + + const { jsonRpcResponse } = await handleSubscriptionsClose( + dwnRequest, + context, + ); + expect(jsonRpcResponse.error).to.not.exist; + }); + + it('handler should generate a recordId if one is not provided with the request', async function () { + const requestId = uuidv4(); + const id = 'some-id'; + const dwnRequest = createJsonRpcSubscribeRequest(requestId, 'rpc.subscribe.close', {}, id); + delete dwnRequest.id; // delete request id + + const socketConnection = sinon.createStubInstance(SocketConnection); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws', socketConnection }; + + const { jsonRpcResponse } = await handleSubscriptionsClose( + dwnRequest, + context, + ); + expect(jsonRpcResponse.error).to.not.exist; + }); +}); diff --git a/tests/utils.ts b/tests/utils.ts index 55761e6..e7b7f8e 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -1,4 +1,4 @@ -import type { GenericMessage, MessageEvent, Persona, UnionMessageReply } from '@tbd54566975/dwn-sdk-js'; +import type { EventSubscriptionHandler, GenericMessage, Persona, UnionMessageReply } from '@tbd54566975/dwn-sdk-js'; import { Cid, DataStream, RecordsWrite } from '@tbd54566975/dwn-sdk-js'; import type { ReadStream } from 'node:fs'; @@ -233,12 +233,24 @@ export async function sendWsRequest(options: { return connection.request(request); } -export async function subscriptionRequest(options: { - url?: string, +/** + * A helper method for testing JSON RPC socket subscription requests to the DWN. + * + * If a connection is not provided, creates a new connection to the url provided. + * If no subscribe options are provided, creates a subscribe id. + * Attempts to subscribe and returns the response, close function and connection. + */ +export async function subscribeToMessageEvents(options: { + /** json rpc socket connection, mutually exclusive with url */ connection?: JsonRpcSocket, + /** url to connect to if no connection is provided */ + url?: string, + /** the request to use for subscription */ request: JsonRpcRequest, - messageHandler: (event: MessageEvent) => void, - responseTimeout?: number; + /** the message handler to use for incoming events */ + messageHandler: EventSubscriptionHandler, + /** optional response timeout for new connections */ + responseTimeout?: number, }): Promise<{ close?: () => Promise, response: JsonRpcResponse, connection?: JsonRpcSocket }> { const { url, connection: incomingConnection, request, messageHandler, responseTimeout } = options; const connection = incomingConnection ?? await JsonRpcSocket.connect(url, { responseTimeout }); @@ -247,7 +259,7 @@ export async function subscriptionRequest(options: { }; const { close, response } = await connection.subscribe(request, (response) => { - const { event } = response.result.reply; + const { event } = response.result; messageHandler(event); }); diff --git a/tests/ws-api.spec.ts b/tests/ws-api.spec.ts index 502ce7c..893a592 100644 --- a/tests/ws-api.spec.ts +++ b/tests/ws-api.spec.ts @@ -18,7 +18,7 @@ import { import { config } from '../src/config.js'; import { WsApi } from '../src/ws-api.js'; import { getTestDwn } from './test-dwn.js'; -import { createRecordsWriteMessage, sendWsMessage, sendHttpMessage, subscriptionRequest, sendWsRequest } from './utils.js'; +import { createRecordsWriteMessage, sendWsMessage, sendHttpMessage, subscribeToMessageEvents, sendWsRequest } from './utils.js'; import { HttpApi } from '../src/http-api.js'; @@ -117,7 +117,7 @@ describe('websocket api', function () { target: alice.did, }); - const { response, close } = await subscriptionRequest({ + const { response, close } = await subscribeToMessageEvents({ url : 'ws://127.0.0.1:9002', request : dwnRequest, messageHandler : subscriptionHandler @@ -186,7 +186,7 @@ describe('websocket api', function () { target: alice.did, }); - const { response, close } = await subscriptionRequest({ + const { response, close } = await subscribeToMessageEvents({ url : 'ws://127.0.0.1:9002', request : dwnRequest, messageHandler : subscriptionHandler @@ -268,7 +268,7 @@ describe('websocket api', function () { target: alice.did }, subscribeId); - const { response, close, connection } = await subscriptionRequest({ + const { response, close, connection } = await subscribeToMessageEvents({ url : 'ws://127.0.0.1:9002', request : dwnRequest, messageHandler : subscriptionHandler @@ -287,7 +287,7 @@ describe('websocket api', function () { target: alice.did }, subscribeId); - const { response: response2 } = await subscriptionRequest({ + const { response: response2 } = await subscribeToMessageEvents({ connection, request: dwnRequest2, messageHandler: subscriptionHandler,