diff --git a/package-lock.json b/package-lock.json index f111f15..bd5ee4a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@web5/dwn-server", - "version": "0.1.11", + "version": "0.1.12", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@web5/dwn-server", - "version": "0.1.11", + "version": "0.1.12", "dependencies": { "@tbd54566975/dwn-sdk-js": "0.2.18", "@tbd54566975/dwn-sql-store": "0.2.10", diff --git a/package.json b/package.json index fd60e4c..20bad08 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@web5/dwn-server", "type": "module", - "version": "0.1.11", + "version": "0.1.12", "files": [ "dist", "src" diff --git a/src/config.ts b/src/config.ts index adc5de3..4d9c3f1 100644 --- a/src/config.ts +++ b/src/config.ts @@ -8,7 +8,7 @@ export const config = { // port that server listens on port: parseInt(process.env.DS_PORT || '3000'), // whether to enable 'ws:' - webSocketServerEnabled: { on: true, off: false }[process.env.DS_WEBSOCKET_SERVER] ?? true, + webSocketSupport: { on: true, off: false }[process.env.DS_WEBSOCKET_SERVER] ?? true, // where to store persistent data messageStore: process.env.DWN_STORAGE_MESSAGES || process.env.DWN_STORAGE || 'level://data', dataStore: process.env.DWN_STORAGE_DATA || process.env.DWN_STORAGE || 'level://data', diff --git a/src/connection/socket-connection.ts b/src/connection/socket-connection.ts index 49625c1..bf920db 100644 --- a/src/connection/socket-connection.ts +++ b/src/connection/socket-connection.ts @@ -175,7 +175,7 @@ export class SocketConnection { * Sends a JSON encoded Buffer through the Websocket. */ private send(response: JsonRpcResponse | JsonRpcErrorResponse): void { - this.socket.send(Buffer.from(JSON.stringify(response))); + this.socket.send(JSON.stringify(response)); } /** diff --git a/src/dwn-server.ts b/src/dwn-server.ts index 7a9e23f..cd606ce 100644 --- a/src/dwn-server.ts +++ b/src/dwn-server.ts @@ -62,7 +62,7 @@ export class DwnServer { }); let eventStream: EventStream | undefined; - if (this.config.webSocketServerEnabled) { + if (this.config.webSocketSupport) { // setting `EventEmitterStream` as default the default `EventStream // if an alternate implementation is needed, instantiate a `Dwn` with a custom `EventStream` and add it to server options. eventStream = new EventEmitterStream(); @@ -84,7 +84,7 @@ export class DwnServer { this.#httpApi.server, ); - if (this.config.webSocketServerEnabled) { + if (this.config.webSocketSupport) { this.#wsApi = new WsApi(this.#httpApi.server, this.dwn); this.#wsApi.start(); log.info('WebSocketServer ready...'); diff --git a/src/http-api.ts b/src/http-api.ts index e5bbb24..19b4680 100644 --- a/src/http-api.ts +++ b/src/http-api.ts @@ -190,6 +190,7 @@ export class HttpApi { registrationRequirements : registrationRequirements, version : packageJson.version, sdkVersion : packageJson.dependencies['@tbd54566975/dwn-sdk-js'], + webSocketSupport : config.webSocketSupport, }); }); } diff --git a/src/json-rpc-socket.ts b/src/json-rpc-socket.ts index 3b3cc7b..3bbb2e3 100644 --- a/src/json-rpc-socket.ts +++ b/src/json-rpc-socket.ts @@ -29,28 +29,33 @@ export class JsonRpcSocket { static async connect(url: string, options: JsonRpcSocketOptions = {}): Promise { const { connectTimeout = CONNECT_TIMEOUT, responseTimeout = RESPONSE_TIMEOUT, onclose, onerror } = options; - const socket = new WebSocket(url, { timeout: connectTimeout }); + const socket = new WebSocket(url); - socket.onclose = onclose; - socket.onerror = onerror; - - if (!socket.onclose) { + if (!onclose) { socket.onclose = ():void => { log.info(`JSON RPC Socket close ${url}`); - } + }; + } else { + socket.onclose = onclose; } - if (!socket.onerror) { + if (!onerror) { socket.onerror = (error?: any):void => { log.error(`JSON RPC Socket error ${url}`, error); - } + }; + } else { + socket.onerror = onerror; } return new Promise((resolve, reject) => { - socket.on('open', () => { + socket.addEventListener('open', () => { resolve(new JsonRpcSocket(socket, responseTimeout)); }); + socket.addEventListener('error', (error) => { + reject(error); + }); + setTimeout(() => reject, connectTimeout); }); } @@ -67,7 +72,7 @@ export class JsonRpcSocket { request.id ??= uuidv4(); const handleResponse = (event: { data: any }):void => { - const jsonRpsResponse = JSON.parse(event.data.toString()) as JsonRpcResponse; + const jsonRpsResponse = JSON.parse(event.data) as JsonRpcResponse; if (jsonRpsResponse.id === request.id) { // if the incoming response id matches the request id, we will remove the listener and resolve the response this.socket.removeEventListener('message', handleResponse); @@ -120,19 +125,19 @@ export class JsonRpcSocket { const response = await this.request(request); if (response.error) { this.socket.removeEventListener('message', socketEventListener); - return { response } + return { response }; } // clean up listener and create a `rpc.subscribe.close` message to use when closing this JSON RPC subscription const close = async (): Promise => { this.socket.removeEventListener('message', socketEventListener); await this.closeSubscription(subscriptionId); - } + }; return { response, close - } + }; } private closeSubscription(id: JsonRpcId): Promise { @@ -145,6 +150,6 @@ export class JsonRpcSocket { * Sends a JSON-RPC request through the socket. You must subscribe to a message listener separately to capture the response. */ send(request: JsonRpcRequest):void { - this.socket.send(Buffer.from(JSON.stringify(request))); + this.socket.send(JSON.stringify(request)); } } \ No newline at end of file diff --git a/tests/dwn-server.spec.ts b/tests/dwn-server.spec.ts index 9c79327..1f33890 100644 --- a/tests/dwn-server.spec.ts +++ b/tests/dwn-server.spec.ts @@ -20,14 +20,14 @@ describe('DwnServer', function () { expect(dwnServer.httpServer.listening).to.be.false; }); - describe('webSocketServerEnabled config', function() { + describe('webSocketSupport config', function() { it('should not return a websocket server if disabled', async function() { dwn = await getTestDwn({ withEvents: true }); const withoutSocketServer = new DwnServer({ dwn, config: { ...dwnServerConfig, - webSocketServerEnabled: false, + webSocketSupport: false, } }); @@ -44,7 +44,7 @@ describe('DwnServer', function () { dwn, config: { ...dwnServerConfig, - webSocketServerEnabled: true, + webSocketSupport: true, } }); diff --git a/tests/http-api.spec.ts b/tests/http-api.spec.ts index a64121e..621660d 100644 --- a/tests/http-api.spec.ts +++ b/tests/http-api.spec.ts @@ -537,5 +537,30 @@ describe('http api', function () { 'proof-of-work-sha256-v0', ); }); + + it('verify /info signals websocket support', async function() { + let resp = await fetch(`http://localhost:3000/info`); + expect(resp.status).to.equal(200); + + let info = await resp.json(); + expect(info['server']).to.equal('@web5/dwn-server'); + expect(info['webSocketSupport']).to.equal(true); + + + // start server without websocket support enabled + server.close(); + server.closeAllConnections(); + + config.webSocketSupport = false; + httpApi = new HttpApi(config, dwn, registrationManager); + server = await httpApi.start(3000); + + resp = await fetch(`http://localhost:3000/info`); + expect(resp.status).to.equal(200); + + info = await resp.json(); + expect(info['server']).to.equal('@web5/dwn-server'); + expect(info['webSocketSupport']).to.equal(false); + }); }); }); diff --git a/tests/json-rpc-socket.spec.ts b/tests/json-rpc-socket.spec.ts index 6f1c4c4..fd3ea74 100644 --- a/tests/json-rpc-socket.spec.ts +++ b/tests/json-rpc-socket.spec.ts @@ -67,6 +67,33 @@ describe('JsonRpcSocket', () => { await expect(requestPromise).to.eventually.be.rejectedWith('timed out'); }); + it('removes listener if subscription json rpc is rejected ', async () => { + wsServer.addListener('connection', (socket) => { + socket.on('message', (dataBuffer: Buffer) => { + const request = JSON.parse(dataBuffer.toString()) as JsonRpcRequest; + // initial response + const response = createJsonRpcErrorResponse(request.id, JsonRpcErrorCodes.BadRequest, 'bad request'); + socket.send(Buffer.from(JSON.stringify(response))); + }); + }); + + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003', { responseTimeout: 5 }); + const requestId = uuidv4(); + const subscribeId = uuidv4(); + const request = createJsonRpcSubscriptionRequest( + requestId, + 'rpc.subscribe.test.method', + { param1: 'test-param1', param2: 'test-param2' }, + subscribeId, + ); + + const responseListener = (_response: JsonRpcSuccessResponse): void => {} + + const subscription = await client.subscribe(request, responseListener); + expect(subscription.response.error).to.not.be.undefined; + expect(client['socket'].listenerCount('message')).to.equal(0); + }); + it('opens a subscription', async () => { wsServer.addListener('connection', (socket) => { socket.on('message', (dataBuffer: Buffer) => { @@ -230,5 +257,26 @@ describe('JsonRpcSocket', () => { expect(logMessage).to.equal('JSON RPC Socket close ws://127.0.0.1:9003'); }); - xit('calls onerror handler', async () => {}); + it('calls onerror handler', async () => { + // test injected handler + const onErrorHandler = { onerror: ():void => {} }; + const onErrorSpy = sinon.spy(onErrorHandler, 'onerror'); + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003', { onerror: onErrorHandler.onerror }); + client['socket'].emit('error', 'some error'); + + await new Promise((resolve) => setTimeout(resolve, 5)); // wait for close event to arrive + expect(onErrorSpy.callCount).to.equal(1, 'error'); + + // test default logger + const logInfoSpy = sinon.spy(log, 'error'); + const defaultClient = await JsonRpcSocket.connect('ws://127.0.0.1:9003'); + defaultClient['socket'].emit('error', 'some error'); + + await new Promise((resolve) => setTimeout(resolve, 5)); // wait for close event to arrive + expect(logInfoSpy.callCount).to.equal(1, 'log'); + + // extract log message from argument + const logMessage:string = logInfoSpy.args[0][0]!; + expect(logMessage).to.equal('JSON RPC Socket error ws://127.0.0.1:9003'); + }); });