Skip to content

Commit

Permalink
refactor into separate files for easier review
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed Feb 12, 2024
1 parent 8d20cd6 commit 0580bec
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 246 deletions.
35 changes: 35 additions & 0 deletions src/connection/connection-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import type { Dwn } from "@tbd54566975/dwn-sdk-js";

import type { WebSocket } from 'ws';

import { SocketConnection } from "./socket-connection.js";
import { InMemorySubscriptionManager } from "../subscription-manager.js";

export interface ConnectionManager {
connect(socket: WebSocket): Promise<void>;
close(): Promise<void>
}

export class InMemoryConnectionManager implements ConnectionManager {
constructor(private dwn: Dwn, private connections: Map<WebSocket, SocketConnection> = new Map()) {}

/**
* Handler for opening websocket event - `connection`.
* Sets listeners for `message`, `pong`, `close`, and `error` events.
*/
async connect(socket: WebSocket): Promise<void> {
const connection = new SocketConnection(socket, this.dwn, new InMemorySubscriptionManager());
this.connections.set(socket, connection);
// attach to the socket's close handler to clean up this connection.
socket.on('close', () => {
// the connection internally already cleans itself up upon a socket close event, we just ned to remove it from our set.
this.connections.delete(socket);
});
}

async close(): Promise<void> {
const closePromises = [];
this.connections.forEach(connection => closePromises.push(connection.close()));
await Promise.all(closePromises);
}
}
145 changes: 145 additions & 0 deletions src/connection/socket-connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import type { Dwn, GenericMessage } from "@tbd54566975/dwn-sdk-js";
import { DwnMethodName } from "@tbd54566975/dwn-sdk-js";

import log from 'loglevel';
import { v4 as uuidv4 } from 'uuid';
import type { WebSocket } from "ws";

import type { RequestContext } from "../lib/json-rpc-router.js";
import type { SubscriptionManager } from "../subscription-manager.js";
import type { JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcResponse } from "../lib/json-rpc.js";

import { requestCounter } from "../metrics.js";
import { jsonRpcApi } from "../json-rpc-api.js";
import { JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcSuccessResponse } from "../lib/json-rpc.js";

const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive');
const HEARTBEAT_INTERVAL = 30_000;

export class SocketConnection {
private heartbeatInterval: NodeJS.Timer;
constructor(
private socket: WebSocket,
private dwn: Dwn,
private subscriptions: SubscriptionManager
){
socket.on('close', this.close.bind(this));
socket.on('pong', this.pong.bind(this));
socket.on('error', this.error.bind(this));
socket.on('message', this.message.bind(this));

// Sometimes connections between client <-> server can get borked in such a way that
// leaves both unaware of the borkage. ping messages can be used as a means to verify
// that the remote endpoint is still responsive. Server will ping each socket every 30s
// if a pong hasn't received from a socket by the next ping, the server will terminate
// the socket connection
socket[SOCKET_ISALIVE_SYMBOL] = true;
this.heartbeatInterval = setInterval(() => {
if (this.socket[SOCKET_ISALIVE_SYMBOL] === false) {
this.close();
}
this.ping();
}, HEARTBEAT_INTERVAL);
}

/**
* Closes the existing connection and cleans up any listeners or subscriptions.
*/
async close(): Promise<void> {
clearInterval(this.heartbeatInterval);
// clean up all socket event listeners
this.socket.removeAllListeners();

// close all of the associated subscriptions
await this.subscriptions.closeAll();
}

private ping(): void {
this.socket[SOCKET_ISALIVE_SYMBOL] = false;
this.socket.ping();
}

/**
* Pong messages are automatically sent in response to ping messages as required by
* the websocket spec. So, no need to send explicit pongs from browser
*/
private pong(): void {
this.socket[SOCKET_ISALIVE_SYMBOL] = true;
}

private async error(error?:Error): Promise<void>{
if (error !== undefined) {
log.error('WebSocket', this.socket.url, error);
this.socket.terminate();
await this.close()
}
}

private async message(dataBuffer: Buffer): Promise<void> {
const requestData = dataBuffer.toString();
if (!requestData) {
return this.send(createJsonRpcErrorResponse(
uuidv4(),
JsonRpcErrorCodes.BadRequest,
'request payload required.'
))
}

let jsonRequest: JsonRpcRequest;
try {
jsonRequest = JSON.parse(requestData);
} catch(error) {
const errorResponse = createJsonRpcErrorResponse(
uuidv4(),
JsonRpcErrorCodes.BadRequest,
(error as Error).message
);

return this.send(errorResponse);
};

const requestContext = await this.buildRequestContext(jsonRequest);
const { jsonRpcResponse } = await jsonRpcApi.handle(jsonRequest, requestContext);
if (jsonRpcResponse.error) {
requestCounter.inc({ method: jsonRequest.method, error: 1 });
} else {
requestCounter.inc({
method: jsonRequest.method,
status: jsonRpcResponse?.result?.reply?.status?.code || 0,
});
}

this.send(jsonRpcResponse);
}

private send(response: JsonRpcResponse | JsonRpcErrorResponse): void {
this.socket.send(Buffer.from(JSON.stringify(response)), this.error.bind(this));
}

private subscriptionHandler(id: JsonRpcId): (message: GenericMessage) => void {
return (message) => {
const response = createJsonRpcSuccessResponse(id, { reply: {
record : message
} });
this.send(response);
}
}

private async buildRequestContext(request: JsonRpcRequest): Promise<RequestContext> {
const { id, params, method} = request;
const requestContext: RequestContext = {
transport : 'ws',
dwn : this.dwn,
subscriptionManager : this.subscriptions,
}

if (method === 'dwn.processMessage') {
const { message } = params as { message: GenericMessage };
if (message.descriptor.method === DwnMethodName.Subscribe) {
requestContext.subscriptionHandler = this.subscriptionHandler(id).bind(this);
}
}

return requestContext;
}
}
2 changes: 1 addition & 1 deletion src/lib/json-rpc-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import type { Dwn, MessageSubscriptionHandler } from '@tbd54566975/dwn-sdk-js';

import type { Readable } from 'node:stream';

import type { SubscriptionManager } from '../subscription-manager.js';
import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js';
import type { SubscriptionManager } from '../ws-api.js';

export type RequestContext = {
transport: 'http' | 'ws';
Expand Down
52 changes: 52 additions & 0 deletions src/subscription-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import type { MessageSubscription } from "@tbd54566975/dwn-sdk-js";

import { DwnServerError, DwnServerErrorCode } from "./dwn-error.js";

export interface SubscriptionManager {
subscribe: (target: string, subscription: MessageSubscription) => Promise<void>;
close: (target: string, id: string) => Promise<void>;
closeAll: () => Promise<void>;
}

export class InMemorySubscriptionManager implements SubscriptionManager {
constructor(private subscriptions: Map<string, Map<string, MessageSubscription>> = new Map()){};
async subscribe(target: string, subscription: MessageSubscription): Promise<void> {
let targetSubscriptions = this.subscriptions.get(target);
if (targetSubscriptions === undefined) {
targetSubscriptions = new Map();
this.subscriptions.set(target, targetSubscriptions);
}
targetSubscriptions.set(subscription.id, subscription);
}

async close(target: string, id: string): Promise<void> {
const targetSubscriptions = this.subscriptions.get(target);
if (targetSubscriptions !== undefined) {
const subscription = targetSubscriptions.get(id);
if (subscription !== undefined) {
targetSubscriptions.delete(id);
await subscription.close();
return;
}
}

// if it reached here no subscription to close
throw new DwnServerError(
DwnServerErrorCode.SubscriptionManagerSubscriptionNotFound,
`subscription ${id} was not found`
)
}

async closeAll(): Promise<void> {
const closePromises = [];
for (const [target, subscriptions] of this.subscriptions) {
this.subscriptions.delete(target);
for (const [id, subscription] of subscriptions) {
subscriptions.delete(id);
closePromises.push(subscription.close());
}
}

await Promise.all(closePromises);
}
}
Loading

0 comments on commit 0580bec

Please sign in to comment.