Skip to content

Commit

Permalink
added rpc.subscribe.close method and handling
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed Feb 23, 2024
1 parent 16694f2 commit fcdf960
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 38 deletions.
11 changes: 5 additions & 6 deletions src/connection/socket-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,7 @@ export class SocketConnection {
* Adds a `subscriptionHandler` for `Subscribe` messages.
*/
private async buildRequestContext(request: JsonRpcRequest): Promise<RequestContext> {
const { params, method } = request;
const { subscribe } = params.rpc || {};
const { params, method, subscribe } = request;

const requestContext: RequestContext = {
transport : 'ws',
Expand All @@ -206,11 +205,11 @@ export class SocketConnection {
}

if (method.startsWith('rpc.subscribe.') && subscribe) {
const { message } = params as { message: GenericMessage };
if (message.descriptor.method === DwnMethodName.Subscribe) {
const handlerFunc = this.createSubscriptionHandler(subscribe);
const { message } = params as { message?: GenericMessage };
if (message?.descriptor.method === DwnMethodName.Subscribe) {
const handlerFunc = this.createSubscriptionHandler(subscribe.id);
requestContext.subscriptionRequest = {
id: subscribe,
id: subscribe.id,
subscriptionHandler: (message): void => handlerFunc(message),
}
}
Expand Down
1 change: 1 addition & 0 deletions src/json-rpc-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ export const jsonRpcApi = new JsonRpcRouter();

jsonRpcApi.on('dwn.processMessage', handleDwnProcessMessage);
jsonRpcApi.on('rpc.subscribe.dwn.processMessage', handleDwnProcessMessage);

jsonRpcApi.on('rpc.subscribe.close', handleSubscriptionsClose);
2 changes: 1 addition & 1 deletion src/json-rpc-handlers/subscription/close.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export const handleSubscriptionsClose: JsonRpcHandler = async (
) => {
const requestId = dwnRequest.id ?? uuidv4();
const { socketConnection } = context;
const { id } = dwnRequest.params as { id: JsonRpcId};
const { id } = dwnRequest.subscribe as { id: JsonRpcId };

let jsonRpcResponse:JsonRpcResponse;
try {
Expand Down
22 changes: 7 additions & 15 deletions src/json-rpc-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { v4 as uuidv4 } from 'uuid';
import WebSocket from 'ws';

import type { JsonRpcRequest, JsonRpcResponse } from "./lib/json-rpc.js";
import { createJsonRpcRequest } from "./lib/json-rpc.js";
import { createJsonRpcSubscribeRequest } from "./lib/json-rpc.js";

// These were arbitrarily chosen, but can be modified via connect options
const CONNECT_TIMEOUT = 3_000;
Expand Down Expand Up @@ -73,7 +73,6 @@ export class JsonRpcSocket {
return resolve(jsonRpsResponse);
}
};

// subscribe to the listener before sending the request
this.socket.addEventListener('message', handleResponse);
this.send(request);
Expand All @@ -96,21 +95,14 @@ export class JsonRpcSocket {
}> {

if (!request.method.startsWith('rpc.subscribe.')) {
throw new Error('subscribe rpc messages must include the `rpc.subscribe` prefix');
throw new Error('subscribe rpc requests must include the `rpc.subscribe` prefix');
}

// extract optional `rpc.subscribe` param
const { rpc } = request.params;
const { subscribe } = rpc || {};
const subscriptionId = subscribe || uuidv4();

// When subscribing to a JSON RPC Message, we want to generate the subscription update Json PRC Id ahead of time and create a listener.
// We then set the subscription Id within a special rpc.subscribe params namespace preserving any other properties
request.params.rpc = {
...rpc,
subscribe: subscriptionId,
};
if (!request.subscribe) {
throw new Error('subscribe rpc requests must include subscribe options');
}

const subscriptionId = request.subscribe.id;
const messageHandler = (event: { data: any }):void => {
const jsonRpcResponse = JSON.parse(event.data.toString()) as JsonRpcResponse;
if (jsonRpcResponse.id === subscriptionId) {
Expand All @@ -134,7 +126,7 @@ export class JsonRpcSocket {
const close = async (): Promise<void> => {
this.socket.removeEventListener('message', messageHandler);
const requestId = uuidv4();
const request = createJsonRpcRequest(requestId, 'rpc.subscribe.close', { id: subscriptionId });
const request = createJsonRpcSubscribeRequest(requestId, 'rpc.subscribe.close', {}, subscriptionId)
const response = await this.request(request);
if (response.error) {
throw response.error;
Expand Down
23 changes: 23 additions & 0 deletions src/lib/json-rpc.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { v4 as uuidv4 } from 'uuid';

export type JsonRpcId = string | number | null;
export type JsonRpcParams = any;
export type JsonRpcVersion = '2.0';
Expand All @@ -7,6 +9,10 @@ export interface JsonRpcRequest {
id?: JsonRpcId;
method: string;
params?: JsonRpcParams;
/** JSON RPC Subscribe Extension Parameters */
subscribe?: {
id: JsonRpcId
};
}

export interface JsonRpcError {
Expand Down Expand Up @@ -81,6 +87,23 @@ export const createJsonRpcNotification = (
};
};

export const createJsonRpcSubscribeRequest = (
id: JsonRpcId,
method: string,
params?: JsonRpcParams,
subscriptionId?: JsonRpcId
): JsonRpcRequest => {
return {
jsonrpc: '2.0',
id,
method,
params,
subscribe: {
id: subscriptionId ?? uuidv4(),
}
}
}

export const createJsonRpcRequest = (
id: JsonRpcId,
method: string,
Expand Down
17 changes: 10 additions & 7 deletions tests/json-rpc-socket.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { WebSocketServer } from 'ws';
import type { JsonRpcId, JsonRpcRequest, JsonRpcSuccessResponse } from '../src/lib/json-rpc.js';

import { JsonRpcSocket } from '../src/json-rpc-socket.js';
import { createJsonRpcRequest, createJsonRpcSuccessResponse } from '../src/lib/json-rpc.js';
import { createJsonRpcRequest, createJsonRpcSubscribeRequest, createJsonRpcSuccessResponse } from '../src/lib/json-rpc.js';

chai.use(chaiAsPromised);

Expand Down Expand Up @@ -72,20 +72,23 @@ describe('JsonRpcSocket', () => {
// initial response
const response = createJsonRpcSuccessResponse(request.id, { reply: {} })
socket.send(Buffer.from(JSON.stringify(response)));

const { params } = request;
const { subscribe } = params.rpc || {};
const { subscribe } = request;
// send 3 messages
for (let i = 0; i < 3; i++) {
const response = createJsonRpcSuccessResponse(subscribe, { count: i });
const response = createJsonRpcSuccessResponse(subscribe.id, { count: i });
socket.send(Buffer.from(JSON.stringify(response)));
}
});
});
const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003', { responseTimeout: 5 });
const requestId = uuidv4();
const subscribeId = uuidv4();
const request = createJsonRpcRequest(requestId, 'rpc.subscribe.test.method', { param1: 'test-param1', param2: 'test-param2', rpc: { subscribe: subscribeId } });
const request = createJsonRpcSubscribeRequest(
requestId,
'rpc.subscribe.test.method',
{ param1: 'test-param1', param2: 'test-param2' },
subscribeId,
);

let responseCounter = 0;
const responseListener = (response: JsonRpcSuccessResponse): void => {
Expand All @@ -101,7 +104,7 @@ describe('JsonRpcSocket', () => {
await new Promise((resolve) => setTimeout(resolve, 5));
// the original response
expect(responseCounter).to.equal(3);
subscription.close();
await subscription.close();
});

it('sends message', async () => {
Expand Down
3 changes: 3 additions & 0 deletions tests/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ export async function subscriptionRequest(options: {
}): Promise<{ close?: () => Promise<void>, response: JsonRpcResponse, connection?: JsonRpcSocket }> {
const { url, connection: incomingConnection, request, messageHandler, responseTimeout } = options;
const connection = incomingConnection ?? await JsonRpcSocket.connect(url, { responseTimeout });
request.subscribe ??= {
id: uuidv4(),
};

const { close, response } = await connection.subscribe(request, (response) => {
const { event } = response.result.reply;
Expand Down
17 changes: 8 additions & 9 deletions tests/ws-api.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { v4 as uuidv4 } from 'uuid';

import {
createJsonRpcRequest,
createJsonRpcSubscribeRequest,
JsonRpcErrorCodes,
} from '../src/lib/json-rpc.js';
import { config } from '../src/config.js';
Expand Down Expand Up @@ -111,7 +112,7 @@ describe('websocket api', function () {
};

const requestId = uuidv4();
const dwnRequest = createJsonRpcRequest(requestId, 'rpc.subscribe.dwn.processMessage', {
const dwnRequest = createJsonRpcSubscribeRequest(requestId, 'rpc.subscribe.dwn.processMessage', {
message: message,
target: alice.did,
});
Expand Down Expand Up @@ -262,11 +263,10 @@ describe('websocket api', function () {

const requestId = uuidv4();
const subscribeId = uuidv4();
const dwnRequest = createJsonRpcRequest(requestId, 'rpc.subscribe.dwn.processMessage', {
const dwnRequest = createJsonRpcSubscribeRequest(requestId, 'rpc.subscribe.dwn.processMessage', {
message: message,
target: alice.did,
rpc: { subscribe: subscribeId }
});
target: alice.did
}, subscribeId);

const { response, close, connection } = await subscriptionRequest({
url : 'ws://127.0.0.1:9002',
Expand All @@ -282,11 +282,10 @@ describe('websocket api', function () {

// We are checking for the subscription Id not the request Id
const request2Id = uuidv4();
const dwnRequest2 = createJsonRpcRequest(request2Id, 'rpc.subscribe.dwn.processMessage', {
const dwnRequest2 = createJsonRpcSubscribeRequest(request2Id, 'rpc.subscribe.dwn.processMessage', {
message: message2,
target: alice.did,
rpc: { subscribe: subscribeId }
});
target: alice.did
}, subscribeId);

const { response: response2 } = await subscriptionRequest({
connection,
Expand Down

0 comments on commit fcdf960

Please sign in to comment.