Skip to content

Commit

Permalink
Better compatibility with browser WebSockets for the JsonRpcSocket
Browse files Browse the repository at this point in the history
…client. (#116)

- compatibility for `JsonRpcSocket` with the native browser `WebSocket`
client.
- enhanced test coverage
- included `webSocketSupport` in the `/info` boolean to signal
socket support.
  • Loading branch information
LiranCohen authored Feb 29, 2024
1 parent e1396cb commit 40e758c
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 25 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@web5/dwn-server",
"type": "module",
"version": "0.1.11",
"version": "0.1.12",
"files": [
"dist",
"src"
Expand Down
2 changes: 1 addition & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export const config = {
// port that server listens on
port: parseInt(process.env.DS_PORT || '3000'),
// whether to enable 'ws:'
webSocketServerEnabled: { on: true, off: false }[process.env.DS_WEBSOCKET_SERVER] ?? true,
webSocketSupport: { on: true, off: false }[process.env.DS_WEBSOCKET_SERVER] ?? true,
// where to store persistent data
messageStore: process.env.DWN_STORAGE_MESSAGES || process.env.DWN_STORAGE || 'level://data',
dataStore: process.env.DWN_STORAGE_DATA || process.env.DWN_STORAGE || 'level://data',
Expand Down
2 changes: 1 addition & 1 deletion src/connection/socket-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ export class SocketConnection {
* Sends a JSON encoded Buffer through the Websocket.
*/
private send(response: JsonRpcResponse | JsonRpcErrorResponse): void {
this.socket.send(Buffer.from(JSON.stringify(response)));
this.socket.send(JSON.stringify(response));
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/dwn-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export class DwnServer {
});

let eventStream: EventStream | undefined;
if (this.config.webSocketServerEnabled) {
if (this.config.webSocketSupport) {
// setting `EventEmitterStream` as default the default `EventStream
// if an alternate implementation is needed, instantiate a `Dwn` with a custom `EventStream` and add it to server options.
eventStream = new EventEmitterStream();
Expand All @@ -84,7 +84,7 @@ export class DwnServer {
this.#httpApi.server,
);

if (this.config.webSocketServerEnabled) {
if (this.config.webSocketSupport) {
this.#wsApi = new WsApi(this.#httpApi.server, this.dwn);
this.#wsApi.start();
log.info('WebSocketServer ready...');
Expand Down
1 change: 1 addition & 0 deletions src/http-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ export class HttpApi {
registrationRequirements : registrationRequirements,
version : packageJson.version,
sdkVersion : packageJson.dependencies['@tbd54566975/dwn-sdk-js'],
webSocketSupport : config.webSocketSupport,
});
});
}
Expand Down
33 changes: 19 additions & 14 deletions src/json-rpc-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,33 @@ export class JsonRpcSocket {
static async connect(url: string, options: JsonRpcSocketOptions = {}): Promise<JsonRpcSocket> {
const { connectTimeout = CONNECT_TIMEOUT, responseTimeout = RESPONSE_TIMEOUT, onclose, onerror } = options;

const socket = new WebSocket(url, { timeout: connectTimeout });
const socket = new WebSocket(url);

socket.onclose = onclose;
socket.onerror = onerror;

if (!socket.onclose) {
if (!onclose) {
socket.onclose = ():void => {
log.info(`JSON RPC Socket close ${url}`);
}
};
} else {
socket.onclose = onclose;
}

if (!socket.onerror) {
if (!onerror) {
socket.onerror = (error?: any):void => {
log.error(`JSON RPC Socket error ${url}`, error);
}
};
} else {
socket.onerror = onerror;
}

return new Promise<JsonRpcSocket>((resolve, reject) => {
socket.on('open', () => {
socket.addEventListener('open', () => {
resolve(new JsonRpcSocket(socket, responseTimeout));
});

socket.addEventListener('error', (error) => {
reject(error);
});

setTimeout(() => reject, connectTimeout);
});
}
Expand All @@ -67,7 +72,7 @@ export class JsonRpcSocket {
request.id ??= uuidv4();

const handleResponse = (event: { data: any }):void => {
const jsonRpsResponse = JSON.parse(event.data.toString()) as JsonRpcResponse;
const jsonRpsResponse = JSON.parse(event.data) as JsonRpcResponse;
if (jsonRpsResponse.id === request.id) {
// if the incoming response id matches the request id, we will remove the listener and resolve the response
this.socket.removeEventListener('message', handleResponse);
Expand Down Expand Up @@ -120,19 +125,19 @@ export class JsonRpcSocket {
const response = await this.request(request);
if (response.error) {
this.socket.removeEventListener('message', socketEventListener);
return { response }
return { response };
}

// clean up listener and create a `rpc.subscribe.close` message to use when closing this JSON RPC subscription
const close = async (): Promise<void> => {
this.socket.removeEventListener('message', socketEventListener);
await this.closeSubscription(subscriptionId);
}
};

return {
response,
close
}
};
}

private closeSubscription(id: JsonRpcId): Promise<JsonRpcResponse> {
Expand All @@ -145,6 +150,6 @@ export class JsonRpcSocket {
* Sends a JSON-RPC request through the socket. You must subscribe to a message listener separately to capture the response.
*/
send(request: JsonRpcRequest):void {
this.socket.send(Buffer.from(JSON.stringify(request)));
this.socket.send(JSON.stringify(request));
}
}
6 changes: 3 additions & 3 deletions tests/dwn-server.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ describe('DwnServer', function () {
expect(dwnServer.httpServer.listening).to.be.false;
});

describe('webSocketServerEnabled config', function() {
describe('webSocketSupport config', function() {
it('should not return a websocket server if disabled', async function() {
dwn = await getTestDwn({ withEvents: true });
const withoutSocketServer = new DwnServer({
dwn,
config: {
...dwnServerConfig,
webSocketServerEnabled: false,
webSocketSupport: false,
}
});

Expand All @@ -44,7 +44,7 @@ describe('DwnServer', function () {
dwn,
config: {
...dwnServerConfig,
webSocketServerEnabled: true,
webSocketSupport: true,
}
});

Expand Down
25 changes: 25 additions & 0 deletions tests/http-api.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -537,5 +537,30 @@ describe('http api', function () {
'proof-of-work-sha256-v0',
);
});

it('verify /info signals websocket support', async function() {
let resp = await fetch(`http://localhost:3000/info`);
expect(resp.status).to.equal(200);

let info = await resp.json();
expect(info['server']).to.equal('@web5/dwn-server');
expect(info['webSocketSupport']).to.equal(true);


// start server without websocket support enabled
server.close();
server.closeAllConnections();

config.webSocketSupport = false;
httpApi = new HttpApi(config, dwn, registrationManager);
server = await httpApi.start(3000);

resp = await fetch(`http://localhost:3000/info`);
expect(resp.status).to.equal(200);

info = await resp.json();
expect(info['server']).to.equal('@web5/dwn-server');
expect(info['webSocketSupport']).to.equal(false);
});
});
});
50 changes: 49 additions & 1 deletion tests/json-rpc-socket.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,33 @@ describe('JsonRpcSocket', () => {
await expect(requestPromise).to.eventually.be.rejectedWith('timed out');
});

it('removes listener if subscription json rpc is rejected ', async () => {
wsServer.addListener('connection', (socket) => {
socket.on('message', (dataBuffer: Buffer) => {
const request = JSON.parse(dataBuffer.toString()) as JsonRpcRequest;
// initial response
const response = createJsonRpcErrorResponse(request.id, JsonRpcErrorCodes.BadRequest, 'bad request');
socket.send(Buffer.from(JSON.stringify(response)));
});
});

const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003', { responseTimeout: 5 });
const requestId = uuidv4();
const subscribeId = uuidv4();
const request = createJsonRpcSubscriptionRequest(
requestId,
'rpc.subscribe.test.method',
{ param1: 'test-param1', param2: 'test-param2' },
subscribeId,
);

const responseListener = (_response: JsonRpcSuccessResponse): void => {}

const subscription = await client.subscribe(request, responseListener);
expect(subscription.response.error).to.not.be.undefined;
expect(client['socket'].listenerCount('message')).to.equal(0);
});

it('opens a subscription', async () => {
wsServer.addListener('connection', (socket) => {
socket.on('message', (dataBuffer: Buffer) => {
Expand Down Expand Up @@ -230,5 +257,26 @@ describe('JsonRpcSocket', () => {
expect(logMessage).to.equal('JSON RPC Socket close ws://127.0.0.1:9003');
});

xit('calls onerror handler', async () => {});
it('calls onerror handler', async () => {
// test injected handler
const onErrorHandler = { onerror: ():void => {} };
const onErrorSpy = sinon.spy(onErrorHandler, 'onerror');
const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003', { onerror: onErrorHandler.onerror });
client['socket'].emit('error', 'some error');

await new Promise((resolve) => setTimeout(resolve, 5)); // wait for close event to arrive
expect(onErrorSpy.callCount).to.equal(1, 'error');

// test default logger
const logInfoSpy = sinon.spy(log, 'error');
const defaultClient = await JsonRpcSocket.connect('ws://127.0.0.1:9003');
defaultClient['socket'].emit('error', 'some error');

await new Promise((resolve) => setTimeout(resolve, 5)); // wait for close event to arrive
expect(logInfoSpy.callCount).to.equal(1, 'log');

// extract log message from argument
const logMessage:string = logInfoSpy.args[0][0]!;
expect(logMessage).to.equal('JSON RPC Socket error ws://127.0.0.1:9003');
});
});

0 comments on commit 40e758c

Please sign in to comment.