Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebSocket Subscriptions via JRPC #107

Merged
merged 40 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
dec147b
initial pass at server supporting long running subscriptions via sockets
LiranCohen Feb 9, 2024
aaa6aa2
json rpc cleanup
LiranCohen Feb 10, 2024
6f46c01
brought back requestCounter functionality for jsonrpc response/error
LiranCohen Feb 11, 2024
523558d
add JSONRPCSocket class to handle requests and long-running subscript…
LiranCohen Feb 12, 2024
5b79176
handle connection close not found error explicitly, general clean up
LiranCohen Feb 12, 2024
81f3ccd
replace crypto.randomUUID()
LiranCohen Feb 12, 2024
1dd2943
clean up JSON RPC Socket client
LiranCohen Feb 12, 2024
919df3b
optional tenant gate and event stream in options
LiranCohen Feb 12, 2024
eac8d3b
refactor into separate files, account for null errors, use loglevel
LiranCohen Feb 12, 2024
0669f04
uncesseary chagnge
LiranCohen Feb 12, 2024
f2b71d3
update comments
LiranCohen Feb 13, 2024
47279cd
refactor SocketConnection
LiranCohen Feb 13, 2024
d9b731a
clearer comments related to connetion reference handling and close
LiranCohen Feb 13, 2024
c14ab4f
increase coverage in ws-api and dwn-server
LiranCohen Feb 13, 2024
9451a26
clean up listener on reject
LiranCohen Feb 14, 2024
5ed80e0
additional JSON RPC Socket client coverage
LiranCohen Feb 14, 2024
0f0a223
remove only test decorator
LiranCohen Feb 14, 2024
be4a981
addressed PR suggestions
LiranCohen Feb 14, 2024
50dbb89
remove close connection
LiranCohen Feb 14, 2024
18e4a5c
optional socket send logger for SocketConnection.send()
LiranCohen Feb 14, 2024
7bb207a
rename onError to callback
LiranCohen Feb 14, 2024
c837cd7
rename some classes for consistency, update coments as per PR review
LiranCohen Feb 15, 2024
b2eacec
update chai, added types for chai and sinon
LiranCohen Feb 15, 2024
e12028f
address review suggestions
LiranCohen Feb 15, 2024
97a1ef9
review comments
LiranCohen Feb 15, 2024
1c9adee
match class name in test
LiranCohen Feb 15, 2024
d699035
fix tests
LiranCohen Feb 16, 2024
16694f2
update jrpc subscription support - more error proof and concise
LiranCohen Feb 21, 2024
fcdf960
added `rpc.subscribe.close` method and handling
LiranCohen Feb 22, 2024
7de0ad1
increase testing across the board
LiranCohen Feb 23, 2024
f54fac2
remove utils that were not very useful
LiranCohen Feb 23, 2024
a1caaae
increase test covrage for connection manager and add comments
LiranCohen Feb 27, 2024
7c9983f
review comments
LiranCohen Feb 27, 2024
6bb0ede
review suggestions
LiranCohen Feb 27, 2024
db403d6
add coverage to json-rpc-socet
LiranCohen Feb 27, 2024
72c8a1d
removed test code
LiranCohen Feb 27, 2024
fdc0ee5
unecessary socket injection
LiranCohen Feb 27, 2024
077db21
remove unecessary JsonRpcParams = any export
LiranCohen Feb 27, 2024
ad90dd2
update comment
LiranCohen Feb 27, 2024
95bb520
remove unused test internals
LiranCohen Feb 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 41 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@
"devDependencies": {
"@types/bytes": "3.1.1",
"@types/chai": "4.3.4",
"@types/chai-as-promised": "7.1.5",
"@types/express": "4.17.17",
"@types/mocha": "10.0.1",
"@types/node": "18.11.18",
"@types/readable-stream": "4.0.6",
"@types/sinon": "17.0.3",
"@types/supertest": "2.0.12",
"@types/ws": "8.5.4",
"@typescript-eslint/eslint-plugin": "5.59.0",
Expand All @@ -77,7 +79,7 @@
"lint-staged": "^14.0.1",
"mocha": "^10.2.0",
"puppeteer": "^21.4.0",
"sinon": "16.1.0",
"sinon": "17.0.1",
"stream-browserify": "^3.0.0",
"supertest": "6.3.3",
"typescript": "^5.1.6"
Expand Down
39 changes: 39 additions & 0 deletions src/connection/connection-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import type { Dwn } from "@tbd54566975/dwn-sdk-js";

import type { IncomingMessage } from "http";
import type { WebSocket } from 'ws';

import { SocketConnection } from "./socket-connection.js";

/**
* Interface for managing `WebSocket` connections as they arrive.
*/
export interface ConnectionManager {
/** connect handler used for the `WebSockets` `'connection'` event. */
connect(socket: WebSocket, request?: IncomingMessage): Promise<void>;
/** closes all of the connections */
closeAll(): Promise<void>
}

/**
* A Simple In Memory ConnectionManager implementation.
* It uses a `Map<WebSocket, SocketConnection>` to manage connections.
*/
export class InMemoryConnectionManager implements ConnectionManager {
constructor(private dwn: Dwn, private connections: Map<WebSocket, SocketConnection> = new Map()) {}

async connect(socket: WebSocket): Promise<void> {
const connection = new SocketConnection(socket, this.dwn, () => {
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved
// this is the onClose handler to clean up any closed connections.
this.connections.delete(socket);
});

this.connections.set(socket, connection);
}

async closeAll(): Promise<void> {
const closePromises = [];
this.connections.forEach(connection => closePromises.push(connection.close()));
await Promise.all(closePromises);
}
}
221 changes: 221 additions & 0 deletions src/connection/socket-connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
import type { Dwn, GenericMessage, MessageEvent } from "@tbd54566975/dwn-sdk-js";
import { DwnMethodName } from "@tbd54566975/dwn-sdk-js";

import type { WebSocket } from "ws";
import log from 'loglevel';
import { v4 as uuidv4 } from 'uuid';

import type { RequestContext } from "../lib/json-rpc-router.js";
import type { JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcResponse, JsonRpcSubscription } from "../lib/json-rpc.js";

import { requestCounter } from "../metrics.js";
import { jsonRpcRouter } from "../json-rpc-api.js";
import { JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcSuccessResponse } from "../lib/json-rpc.js";
import { DwnServerError, DwnServerErrorCode } from "../dwn-error.js";

const HEARTBEAT_INTERVAL = 30_000;

/**
* SocketConnection handles a WebSocket connection to a DWN using JSON RPC.
* It also manages references to the long running RPC subscriptions for the connection.
*/
export class SocketConnection {
private heartbeatInterval: NodeJS.Timer;
private subscriptions: Map<JsonRpcId, JsonRpcSubscription> = new Map();
private isAlive: boolean;

constructor(
private socket: WebSocket,
private dwn: Dwn,
private onClose?: () => void
){
socket.on('message', this.message.bind(this));
socket.on('close', this.close.bind(this));
socket.on('error', this.error.bind(this));
socket.on('pong', this.pong.bind(this));

// Sometimes connections between client <-> server can get borked in such a way that
// leaves both unaware of the borkage. ping messages can be used as a means to verify
// that the remote endpoint is still responsive. Server will ping each socket every 30s
// if a pong hasn't received from a socket by the next ping, the server will terminate
// the socket connection
this.isAlive = true;
this.heartbeatInterval = setInterval(() => {
if (this.isAlive === false) {
this.close();
}
this.isAlive = false;
this.socket.ping();
}, HEARTBEAT_INTERVAL);
}

/**
* Checks to see if the incoming `JsonRpcId` is already in use for a subscription.
*/
hasSubscription(id: JsonRpcId): boolean {
return this.subscriptions.has(id);
}

/**
* Adds a reference for the JSON RPC Subscription to this connection.
* Used for cleanup if the connection is closed.
*/
async addSubscription(subscription: JsonRpcSubscription): Promise<void> {
if (this.subscriptions.has(subscription.id)) {
throw new DwnServerError(
DwnServerErrorCode.ConnectionSubscriptionJsonRpcIdExists,
`the subscription with id ${subscription.id} already exists`
)
}

this.subscriptions.set(subscription.id, subscription);
}

/**
* Closes and removes the reference for a given subscription from this connection.
*
* @param id the `JsonRpcId` of the JSON RPC subscription request.
*/
async closeSubscription(id: JsonRpcId): Promise<void> {
if (!this.subscriptions.has(id)) {
throw new DwnServerError(
DwnServerErrorCode.ConnectionSubscriptionJsonRpcIdNotFound,
`the subscription with id ${id} was not found`
)
}

const connection = this.subscriptions.get(id);
await connection.close();
this.subscriptions.delete(id);
}

/**
* Closes the existing connection and cleans up any listeners or subscriptions.
*/
async close(): Promise<void> {
clearInterval(this.heartbeatInterval);

// clean up all socket event listeners
this.socket.removeAllListeners();

const closePromises = [];
for (const [id, subscription] of this.subscriptions) {
closePromises.push(subscription.close());
this.subscriptions.delete(id);
}

// close all of the associated subscriptions
await Promise.all(closePromises);

// close the socket.
this.socket.close();

// if there was a close handler passed call it after the connection has been closed
if (this.onClose !== undefined) {
this.onClose();
}
}

/**
* Pong messages are automatically sent in response to ping messages as required by
* the websocket spec. So, no need to send explicit pongs.
*/
private pong(): void {
this.isAlive = true;
}

/**
* Log the error and close the connection.
*/
private async error(error:Error): Promise<void>{
log.error(`SocketConnection error, terminating connection`, error);
this.socket.terminate();
await this.close();
}

/**
* Handles a `JSON RPC 2.0` encoded message.
*/
private async message(dataBuffer: Buffer): Promise<void> {
const requestData = dataBuffer.toString();
if (!requestData) {
return this.send(createJsonRpcErrorResponse(
uuidv4(),
JsonRpcErrorCodes.BadRequest,
'request payload required.'
))
}

let jsonRequest: JsonRpcRequest;
try {
jsonRequest = JSON.parse(requestData);
} catch(error) {
const errorResponse = createJsonRpcErrorResponse(
uuidv4(),
JsonRpcErrorCodes.BadRequest,
(error as Error).message
);
return this.send(errorResponse);
};

const requestContext = await this.buildRequestContext(jsonRequest);
const { jsonRpcResponse } = await jsonRpcRouter.handle(jsonRequest, requestContext);
if (jsonRpcResponse.error) {
requestCounter.inc({ method: jsonRequest.method, error: 1 });
} else {
requestCounter.inc({
method: jsonRequest.method,
status: jsonRpcResponse?.result?.reply?.status?.code || 0,
});
}
this.send(jsonRpcResponse);
}

/**
* Sends a JSON encoded Buffer through the Websocket.
*/
private send(response: JsonRpcResponse | JsonRpcErrorResponse): void {
this.socket.send(Buffer.from(JSON.stringify(response)));
}

/**
* Creates a subscription handler to send messages matching the subscription requested.
*
* Wraps the incoming `message` in a `JSON RPC Success Response` using the original subscription`JSON RPC Id` to send through the WebSocket.
*/
private createSubscriptionHandler(id: JsonRpcId): (message: MessageEvent) => void {
return (event) => {
const response = createJsonRpcSuccessResponse(id, { event });
this.send(response);
}
}

/**
* Builds a `RequestContext` object to use with the `JSON RPC API`.
*
* Adds a `subscriptionHandler` for `Subscribe` messages.
*/
private async buildRequestContext(request: JsonRpcRequest): Promise<RequestContext> {
const { params, method, subscription } = request;

const requestContext: RequestContext = {
transport : 'ws',
dwn : this.dwn,
socketConnection : this,
}

// methods that expect a long-running subscription begin with `rpc.subscribe.`
if (method.startsWith('rpc.subscribe.') && subscription) {
const { message } = params as { message?: GenericMessage };
if (message?.descriptor.method === DwnMethodName.Subscribe) {
const handlerFunc = this.createSubscriptionHandler(subscription.id);
requestContext.subscriptionRequest = {
id: subscription.id,
subscriptionHandler: (message): void => handlerFunc(message),
}
}
}

return requestContext;
}
}
2 changes: 2 additions & 0 deletions src/dwn-error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ export class DwnServerError extends Error {
* DWN Server error codes.
*/
export enum DwnServerErrorCode {
ConnectionSubscriptionJsonRpcIdExists = 'ConnectionSubscriptionJsonRpcIdExists',
ConnectionSubscriptionJsonRpcIdNotFound = 'ConnectionSubscriptionJsonRpcIdNotFound',
ProofOfWorkInsufficientSolutionNonce = 'ProofOfWorkInsufficientSolutionNonce',
ProofOfWorkInvalidOrExpiredChallenge = 'ProofOfWorkInvalidOrExpiredChallenge',
ProofOfWorkManagerInvalidChallengeNonce = 'ProofOfWorkManagerInvalidChallengeNonce',
Expand Down
Loading
Loading