Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebSocket Subscriptions via JRPC #107

Merged
merged 40 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
dec147b
initial pass at server supporting long running subscriptions via sockets
LiranCohen Feb 9, 2024
aaa6aa2
json rpc cleanup
LiranCohen Feb 10, 2024
6f46c01
brought back requestCounter functionality for jsonrpc response/error
LiranCohen Feb 11, 2024
523558d
add JSONRPCSocket class to handle requests and long-running subscript…
LiranCohen Feb 12, 2024
5b79176
handle connection close not found error explicitly, general clean up
LiranCohen Feb 12, 2024
81f3ccd
replace crypto.randomUUID()
LiranCohen Feb 12, 2024
1dd2943
clean up JSON RPC Socket client
LiranCohen Feb 12, 2024
919df3b
optional tenant gate and event stream in options
LiranCohen Feb 12, 2024
eac8d3b
refactor into separate files, account for null errors, use loglevel
LiranCohen Feb 12, 2024
0669f04
uncesseary chagnge
LiranCohen Feb 12, 2024
f2b71d3
update comments
LiranCohen Feb 13, 2024
47279cd
refactor SocketConnection
LiranCohen Feb 13, 2024
d9b731a
clearer comments related to connetion reference handling and close
LiranCohen Feb 13, 2024
c14ab4f
increase coverage in ws-api and dwn-server
LiranCohen Feb 13, 2024
9451a26
clean up listener on reject
LiranCohen Feb 14, 2024
5ed80e0
additional JSON RPC Socket client coverage
LiranCohen Feb 14, 2024
0f0a223
remove only test decorator
LiranCohen Feb 14, 2024
be4a981
addressed PR suggestions
LiranCohen Feb 14, 2024
50dbb89
remove close connection
LiranCohen Feb 14, 2024
18e4a5c
optional socket send logger for SocketConnection.send()
LiranCohen Feb 14, 2024
7bb207a
rename onError to callback
LiranCohen Feb 14, 2024
c837cd7
rename some classes for consistency, update coments as per PR review
LiranCohen Feb 15, 2024
b2eacec
update chai, added types for chai and sinon
LiranCohen Feb 15, 2024
e12028f
address review suggestions
LiranCohen Feb 15, 2024
97a1ef9
review comments
LiranCohen Feb 15, 2024
1c9adee
match class name in test
LiranCohen Feb 15, 2024
d699035
fix tests
LiranCohen Feb 16, 2024
16694f2
update jrpc subscription support - more error proof and concise
LiranCohen Feb 21, 2024
fcdf960
added `rpc.subscribe.close` method and handling
LiranCohen Feb 22, 2024
7de0ad1
increase testing across the board
LiranCohen Feb 23, 2024
f54fac2
remove utils that were not very useful
LiranCohen Feb 23, 2024
a1caaae
increase test covrage for connection manager and add comments
LiranCohen Feb 27, 2024
7c9983f
review comments
LiranCohen Feb 27, 2024
6bb0ede
review suggestions
LiranCohen Feb 27, 2024
db403d6
add coverage to json-rpc-socet
LiranCohen Feb 27, 2024
72c8a1d
removed test code
LiranCohen Feb 27, 2024
fdc0ee5
unecessary socket injection
LiranCohen Feb 27, 2024
077db21
remove unecessary JsonRpcParams = any export
LiranCohen Feb 27, 2024
ad90dd2
update comment
LiranCohen Feb 27, 2024
95bb520
remove unused test internals
LiranCohen Feb 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions src/connection/connection-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ export class InMemoryConnectionManager implements ConnectionManager {
constructor(private dwn: Dwn, private connections: Map<WebSocket, SocketConnection> = new Map()) {}

async connect(socket: WebSocket): Promise<void> {
const connection = new SocketConnection(socket, this.dwn);
this.connections.set(socket, connection);
// attach to the socket's close handler to clean up this connection.
socket.on('close', () => {
// the connection internally already cleans itself up upon a socket close event, we just ned to remove it from our set.
const connection = new SocketConnection(socket, this.dwn, () => {
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved
// this is the onClose handler to clean up any closed connections.
this.connections.delete(socket);
});

this.connections.set(socket, connection);
}

async closeAll(): Promise<void> {
Expand Down
22 changes: 15 additions & 7 deletions src/connection/socket-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type { RequestContext } from "../lib/json-rpc-router.js";
import type { JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcResponse, JsonRpcSubscription } from "../lib/json-rpc.js";

import { requestCounter } from "../metrics.js";
import { jsonRpcApi } from "../json-rpc-api.js";
import { jsonRpcRouter } from "../json-rpc-api.js";
import { JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcSuccessResponse } from "../lib/json-rpc.js";
import { DwnServerError, DwnServerErrorCode } from "../dwn-error.js";

Expand All @@ -26,7 +26,8 @@ export class SocketConnection {

constructor(
private socket: WebSocket,
private dwn: Dwn
private dwn: Dwn,
private onClose?: () => void
){
socket.on('message', this.message.bind(this));
socket.on('close', this.close.bind(this));
Expand Down Expand Up @@ -93,6 +94,7 @@ export class SocketConnection {
*/
async close(): Promise<void> {
clearInterval(this.heartbeatInterval);

// clean up all socket event listeners
this.socket.removeAllListeners();

Expand All @@ -107,6 +109,11 @@ export class SocketConnection {

// close the socket.
this.socket.close();

// if there was a close handler passed call it after the connection has been closed
if (this.onClose !== undefined) {
this.onClose();
}
}

/**
Expand Down Expand Up @@ -152,7 +159,7 @@ export class SocketConnection {
};

const requestContext = await this.buildRequestContext(jsonRequest);
const { jsonRpcResponse } = await jsonRpcApi.handle(jsonRequest, requestContext);
const { jsonRpcResponse } = await jsonRpcRouter.handle(jsonRequest, requestContext);
if (jsonRpcResponse.error) {
requestCounter.inc({ method: jsonRequest.method, error: 1 });
} else {
Expand Down Expand Up @@ -189,20 +196,21 @@ export class SocketConnection {
* Adds a `subscriptionHandler` for `Subscribe` messages.
*/
private async buildRequestContext(request: JsonRpcRequest): Promise<RequestContext> {
const { params, method, subscribe } = request;
const { params, method, subscription } = request;

const requestContext: RequestContext = {
transport : 'ws',
dwn : this.dwn,
socketConnection : this,
}

if (method.startsWith('rpc.subscribe.') && subscribe) {
// methods that expect a long-running subscription begin with `rpc.subscribe.`
if (method.startsWith('rpc.subscribe.') && subscription) {
const { message } = params as { message?: GenericMessage };
if (message?.descriptor.method === DwnMethodName.Subscribe) {
const handlerFunc = this.createSubscriptionHandler(subscribe.id);
const handlerFunc = this.createSubscriptionHandler(subscription.id);
requestContext.subscriptionRequest = {
id: subscribe.id,
id: subscription.id,
subscriptionHandler: (message): void => handlerFunc(message),
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/http-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { createJsonRpcErrorResponse, JsonRpcErrorCodes } from './lib/json-rpc.js
import type { DwnServerConfig } from './config.js';
import { config } from './config.js';
import { type DwnServerError } from './dwn-error.js';
import { jsonRpcApi } from './json-rpc-api.js';
import { jsonRpcRouter } from './json-rpc-api.js';
import { requestCounter, responseHistogram } from './metrics.js';
import type { RegistrationManager } from './registration/registration-manager.js';

Expand Down Expand Up @@ -149,7 +149,7 @@ export class HttpApi {
transport : 'http',
dataStream : requestDataStream,
};
const { jsonRpcResponse, dataStream: responseDataStream } = await jsonRpcApi.handle(dwnRpcRequest, requestContext as RequestContext);
const { jsonRpcResponse, dataStream: responseDataStream } = await jsonRpcRouter.handle(dwnRpcRequest, requestContext as RequestContext);

// If the handler catches a thrown exception and returns a JSON RPC InternalError, return the equivalent
// HTTP 500 Internal Server Error with the response.
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export { DwnServerConfig } from './config.js';
export { DwnServer, DwnServerOptions } from './dwn-server.js';
export { HttpApi } from './http-api.js';
export { jsonRpcApi } from './json-rpc-api.js';
export { jsonRpcRouter } from './json-rpc-api.js';
export { EStoreType, BackendTypes, StoreType } from './storage.js';
export { WsApi } from './ws-api.js';
8 changes: 4 additions & 4 deletions src/json-rpc-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import { JsonRpcRouter } from './lib/json-rpc-router.js';
import { handleDwnProcessMessage } from './json-rpc-handlers/dwn/index.js';
import { handleSubscriptionsClose } from './json-rpc-handlers/subscription/index.js';

export const jsonRpcApi = new JsonRpcRouter();
export const jsonRpcRouter = new JsonRpcRouter();

jsonRpcApi.on('dwn.processMessage', handleDwnProcessMessage);
jsonRpcApi.on('rpc.subscribe.dwn.processMessage', handleDwnProcessMessage);
jsonRpcRouter.on('dwn.processMessage', handleDwnProcessMessage);
jsonRpcRouter.on('rpc.subscribe.dwn.processMessage', handleDwnProcessMessage);

jsonRpcApi.on('rpc.subscribe.close', handleSubscriptionsClose);
jsonRpcRouter.on('rpc.subscribe.close', handleSubscriptionsClose);
8 changes: 5 additions & 3 deletions src/json-rpc-handlers/dwn/process-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ export const handleDwnProcessMessage: JsonRpcHandler = async (
return { jsonRpcResponse };
}

// if this is a subscription request, we first check if the connection has a subscription with this Id
// we do this ahead of time to prevent opening a subscription on the dwn only to close it after attempting to add it to the subscription manager
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved
if (subscriptionRequest !== undefined && socketConnection?.hasSubscription(subscriptionRequest.id)) {
const jsonRpcResponse = createJsonRpcErrorResponse(
requestId,
Expand All @@ -84,9 +86,9 @@ export const handleDwnProcessMessage: JsonRpcHandler = async (
delete reply.record.data; // not serializable via JSON
}

// Subscribe messages return a close function to facilitate closing the subscription
if (subscriptionRequest && reply.subscription) {
const { close } = reply.subscription;
// Subscribe messages return a close function to facilitate closing the subscription
// we add a reference to the close function for this subscription request to the socket connection.
// this will facilitate closing the subscription later.
const subscriptionReply: JsonRpcSubscription = {
Expand All @@ -111,8 +113,8 @@ export const handleDwnProcessMessage: JsonRpcHandler = async (
e.message,
);

// log the error response
log.error('handleDwnProcessMessage error', jsonRpcResponse);
// log the unhandled error response
log.error('handleDwnProcessMessage error', jsonRpcResponse, e);
return { jsonRpcResponse } as HandlerResponse;
}
};
10 changes: 5 additions & 5 deletions src/json-rpc-handlers/subscription/close.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,27 @@ import {
/**
* Closes a subscription tied to a specific `SocketConnection`.
*
* @param dwnRequest must include JsonRpcId of the subscription request within the `params`.
* @param jsonRpcRequest must include JsonRpcId of the subscription request within a `subscription object`.
* @param context must include the associated `SocketConnection`.
*
*/
export const handleSubscriptionsClose: JsonRpcHandler = async (
dwnRequest,
jsonRpcRequest,
context,
) => {
const requestId = dwnRequest.id ?? uuidv4();
const requestId = jsonRpcRequest.id ?? uuidv4();
if (context.socketConnection === undefined) {
const jsonRpcResponse = createJsonRpcErrorResponse(requestId, JsonRpcErrorCodes.InvalidRequest, 'socket connection does not exist');
return { jsonRpcResponse };
}

if (dwnRequest.subscribe === undefined) {
if (jsonRpcRequest.subscription === undefined) {
const jsonRpcResponse = createJsonRpcErrorResponse(requestId, JsonRpcErrorCodes.InvalidRequest, 'subscribe options do not exist');
return { jsonRpcResponse };
}

const { socketConnection } = context;
const { id } = dwnRequest.subscribe as { id: JsonRpcId };
const { id } = jsonRpcRequest.subscription as { id: JsonRpcId };

let jsonRpcResponse:JsonRpcResponse;
try {
Expand Down
31 changes: 17 additions & 14 deletions src/json-rpc-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { v4 as uuidv4 } from 'uuid';
import WebSocket from 'ws';

import type { JsonRpcId, JsonRpcRequest, JsonRpcResponse } from "./lib/json-rpc.js";
import { createJsonRpcSubscribeRequest } from "./lib/json-rpc.js";
import { createJsonRpcSubscriptionRequest } from "./lib/json-rpc.js";

// These were arbitrarily chosen, but can be modified via connect options
const CONNECT_TIMEOUT = 3_000;
Expand All @@ -18,6 +18,8 @@ export interface JsonRpcSocketOptions {
onclose?: () => void;
/** optional socket error handler */
onerror?: (error?: any) => void;
/** optional already connected socket to inject */
socket?: WebSocket;
}

/**
Expand All @@ -30,21 +32,22 @@ export class JsonRpcSocket {
const { connectTimeout = CONNECT_TIMEOUT, responseTimeout = RESPONSE_TIMEOUT, onclose, onerror } = options;

const socket = new WebSocket(url);
if (onclose === undefined) {

socket.onclose = onclose;
socket.onerror = onerror;

if (!socket.onclose) {
socket.onclose = ():void => {
log.info(`JSON RPC Socket close ${url}`);
}
}

if (onerror === undefined) {
if (!socket.onerror) {
socket.onerror = (error?: any):void => {
log.error(`JSON RPC Socket error ${url}`, error);
}
}

socket.onclose = onclose;
socket.onerror = onerror;

return new Promise<JsonRpcSocket>((resolve, reject) => {
socket.on('open', () => {
resolve(new JsonRpcSocket(socket, responseTimeout));
Expand Down Expand Up @@ -98,33 +101,33 @@ export class JsonRpcSocket {
throw new Error('subscribe rpc requests must include the `rpc.subscribe` prefix');
}

if (!request.subscribe) {
if (!request.subscription) {
throw new Error('subscribe rpc requests must include subscribe options');
}

const subscriptionId = request.subscribe.id;
const messageHandler = (event: { data: any }):void => {
const subscriptionId = request.subscription.id;
const socketEventListener = (event: { data: any }):void => {
const jsonRpcResponse = JSON.parse(event.data.toString()) as JsonRpcResponse;
if (jsonRpcResponse.id === subscriptionId) {
if (jsonRpcResponse.error !== undefined) {
// remove the event listener upon receipt of a JSON RPC Error.
this.socket.removeEventListener('message', messageHandler);
this.socket.removeEventListener('message', socketEventListener);
this.closeSubscription(subscriptionId);
}
listener(jsonRpcResponse);
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved
}
};
this.socket.addEventListener('message', messageHandler);
this.socket.addEventListener('message', socketEventListener);

const response = await this.request(request);
if (response.error) {
this.socket.removeEventListener('message', messageHandler);
this.socket.removeEventListener('message', socketEventListener);
return { response }
}

// clean up listener and create a `rpc.subscribe.close` message to use when closing this JSON RPC subscription
const close = async (): Promise<void> => {
this.socket.removeEventListener('message', messageHandler);
this.socket.removeEventListener('message', socketEventListener);
await this.closeSubscription(subscriptionId);
}

Expand All @@ -136,7 +139,7 @@ export class JsonRpcSocket {

private closeSubscription(id: JsonRpcId): Promise<JsonRpcResponse> {
const requestId = uuidv4();
const request = createJsonRpcSubscribeRequest(requestId, 'rpc.subscribe.close', {}, id);
const request = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.close', {}, id);
return this.request(request);
}

Expand Down
1 change: 1 addition & 0 deletions src/lib/json-rpc-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { SocketConnection } from '../connection/socket-connection.js';
export type RequestContext = {
transport: 'http' | 'ws';
dwn: Dwn;
/** the socket connection associated with this request if over sockets */
socketConnection?: SocketConnection;
subscriptionRequest?: {
/** The JsonRpcId of the subscription handler */
Expand Down
12 changes: 5 additions & 7 deletions src/lib/json-rpc.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { v4 as uuidv4 } from 'uuid';

export type JsonRpcId = string | number | null;
export type JsonRpcParams = any;
export type JsonRpcVersion = '2.0';
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -9,8 +7,8 @@ export interface JsonRpcRequest {
id?: JsonRpcId;
method: string;
params?: JsonRpcParams;
/** JSON RPC Subscribe Extension Parameters */
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved
subscribe?: {
/** JSON RPC Subscription Extension Parameters */
subscription?: {
id: JsonRpcId
};
}
Expand Down Expand Up @@ -87,7 +85,7 @@ export const createJsonRpcNotification = (
};
};

export const createJsonRpcSubscribeRequest = (
export const createJsonRpcSubscriptionRequest = (
id: JsonRpcId,
method: string,
params?: JsonRpcParams,
Expand All @@ -98,8 +96,8 @@ export const createJsonRpcSubscribeRequest = (
id,
method,
params,
subscribe: {
id: subscriptionId ?? uuidv4(),
subscription: {
id: subscriptionId,
}
}
}
Expand Down
Loading
Loading