Skip to content

Commit

Permalink
refactor into separate files, account for null errors, use loglevel
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed Feb 12, 2024
1 parent 8d20cd6 commit 72b8bcb
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 252 deletions.
2 changes: 1 addition & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export const config = {
// port that server listens on
port: parseInt(process.env.DS_PORT || '3000'),
// whether to enable 'ws:'
webSocketServerEnabled: { on: true, off: false }[process.env.DS_WEBSOCKET_SERVER] ?? true,
webSocketServerEnabled: { on: true, off: false }[process.env.DWN_WEBSOCKET_SERVER] ?? true,
// where to store persistent data
messageStore: process.env.DWN_STORAGE_MESSAGES || process.env.DWN_STORAGE || 'level://data',
dataStore: process.env.DWN_STORAGE_DATA || process.env.DWN_STORAGE || 'level://data',
Expand Down
35 changes: 35 additions & 0 deletions src/connection/connection-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import type { Dwn } from "@tbd54566975/dwn-sdk-js";

import type { WebSocket } from 'ws';

import { SocketConnection } from "./socket-connection.js";
import { InMemorySubscriptionManager } from "../subscription-manager.js";

export interface ConnectionManager {
connect(socket: WebSocket): Promise<void>;
close(): Promise<void>
}

export class InMemoryConnectionManager implements ConnectionManager {
constructor(private dwn: Dwn, private connections: Map<WebSocket, SocketConnection> = new Map()) {}

/**
* Handler for opening websocket event - `connection`.
* Sets listeners for `message`, `pong`, `close`, and `error` events.
*/
async connect(socket: WebSocket): Promise<void> {
const connection = new SocketConnection(socket, this.dwn, new InMemorySubscriptionManager());
this.connections.set(socket, connection);
// attach to the socket's close handler to clean up this connection.
socket.on('close', () => {
// the connection internally already cleans itself up upon a socket close event, we just ned to remove it from our set.
this.connections.delete(socket);
});
}

async close(): Promise<void> {
const closePromises = [];
this.connections.forEach(connection => closePromises.push(connection.close()));
await Promise.all(closePromises);
}
}
143 changes: 143 additions & 0 deletions src/connection/socket-connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import type { Dwn, GenericMessage } from "@tbd54566975/dwn-sdk-js";
import { DwnMethodName } from "@tbd54566975/dwn-sdk-js";

import log from 'loglevel';
import { v4 as uuidv4 } from 'uuid';
import type { WebSocket } from "ws";

import type { RequestContext } from "../lib/json-rpc-router.js";
import type { SubscriptionManager } from "../subscription-manager.js";
import type { JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcResponse } from "../lib/json-rpc.js";

import { requestCounter } from "../metrics.js";
import { jsonRpcApi } from "../json-rpc-api.js";
import { JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcSuccessResponse } from "../lib/json-rpc.js";

const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive');
const HEARTBEAT_INTERVAL = 30_000;

export class SocketConnection {
private heartbeatInterval: NodeJS.Timer;
constructor(
private socket: WebSocket,
private dwn: Dwn,
private subscriptions: SubscriptionManager
){
socket.on('close', this.close.bind(this));
socket.on('pong', this.pong.bind(this));
socket.on('error', this.error.bind(this));
socket.on('message', this.message.bind(this));

// Sometimes connections between client <-> server can get borked in such a way that
// leaves both unaware of the borkage. ping messages can be used as a means to verify
// that the remote endpoint is still responsive. Server will ping each socket every 30s
// if a pong hasn't received from a socket by the next ping, the server will terminate
// the socket connection
socket[SOCKET_ISALIVE_SYMBOL] = true;
this.heartbeatInterval = setInterval(() => {
if (this.socket[SOCKET_ISALIVE_SYMBOL] === false) {
this.close();
}
this.socket[SOCKET_ISALIVE_SYMBOL] = false;
this.socket.ping();
}, HEARTBEAT_INTERVAL);
}

/**
* Closes the existing connection and cleans up any listeners or subscriptions.
*/
async close(): Promise<void> {
clearInterval(this.heartbeatInterval);
// clean up all socket event listeners
this.socket.removeAllListeners();

// close all of the associated subscriptions
await this.subscriptions.closeAll();

// close the socket.
this.socket.close();
}

/**
* Pong messages are automatically sent in response to ping messages as required by
* the websocket spec. So, no need to send explicit pongs.
*/
private pong(): void {
this.socket[SOCKET_ISALIVE_SYMBOL] = true;
}

private async error(error?:Error): Promise<void>{
if (error) {
log.error(`SocketConnection error, terminating connection`, error);
this.socket.terminate();
await this.close()
}
}

private async message(dataBuffer: Buffer): Promise<void> {
const requestData = dataBuffer.toString();
if (!requestData) {
return this.send(createJsonRpcErrorResponse(
uuidv4(),
JsonRpcErrorCodes.BadRequest,
'request payload required.'
))
}

let jsonRequest: JsonRpcRequest;
try {
jsonRequest = JSON.parse(requestData);
} catch(error) {
const errorResponse = createJsonRpcErrorResponse(
uuidv4(),
JsonRpcErrorCodes.BadRequest,
(error as Error).message
);

return this.send(errorResponse);
};

const requestContext = await this.buildRequestContext(jsonRequest);
const { jsonRpcResponse } = await jsonRpcApi.handle(jsonRequest, requestContext);
if (jsonRpcResponse.error) {
requestCounter.inc({ method: jsonRequest.method, error: 1 });
} else {
requestCounter.inc({
method: jsonRequest.method,
status: jsonRpcResponse?.result?.reply?.status?.code || 0,
});
}
this.send(jsonRpcResponse);
}

private send(response: JsonRpcResponse | JsonRpcErrorResponse): void {
this.socket.send(Buffer.from(JSON.stringify(response)), this.error.bind(this));
}

private subscriptionHandler(id: JsonRpcId): (message: GenericMessage) => void {
return (message) => {
const response = createJsonRpcSuccessResponse(id, { reply: {
record : message
} });
this.send(response);
}
}

private async buildRequestContext(request: JsonRpcRequest): Promise<RequestContext> {
const { id, params, method} = request;
const requestContext: RequestContext = {
transport : 'ws',
dwn : this.dwn,
subscriptionManager : this.subscriptions,
}

if (method === 'dwn.processMessage') {
const { message } = params as { message: GenericMessage };
if (message.descriptor.method === DwnMethodName.Subscribe) {
requestContext.subscriptionHandler = this.subscriptionHandler(id).bind(this);
}
}

return requestContext;
}
}
5 changes: 3 additions & 2 deletions src/json-rpc-socket.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import log from 'loglevel';
import { v4 as uuidv4 } from 'uuid';
import WebSocket from 'ws';

Expand All @@ -22,11 +23,11 @@ export class JSONRPCSocket {
const { connectTimeout = CONNECT_TIMEOUT, responseTimeout = RESPONSE_TIMEOUT } = options;

const onclose = ():void => {
console.log('json rpc close');
log.info(`JSON RPC Socket close ${url}`);
};

const onerror = (event: any):void => {
console.log('json rpc error', event);
log.error(`JSON RPC Socket error ${url}`, event);
};

const socket = new WebSocket(url);
Expand Down
2 changes: 1 addition & 1 deletion src/lib/json-rpc-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import type { Dwn, MessageSubscriptionHandler } from '@tbd54566975/dwn-sdk-js';

import type { Readable } from 'node:stream';

import type { SubscriptionManager } from '../subscription-manager.js';
import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js';
import type { SubscriptionManager } from '../ws-api.js';

export type RequestContext = {
transport: 'http' | 'ws';
Expand Down
55 changes: 55 additions & 0 deletions src/subscription-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import type { MessageSubscription } from "@tbd54566975/dwn-sdk-js";

import { DwnServerError, DwnServerErrorCode } from "./dwn-error.js";

/**
* SubscriptionManager manages the subscriptions related to a `SocketConnection`
*/
export interface SubscriptionManager {
subscribe: (target: string, subscription: MessageSubscription) => Promise<void>;
close: (target: string, id: string) => Promise<void>;
closeAll: () => Promise<void>;
}

export class InMemorySubscriptionManager implements SubscriptionManager {
constructor(private subscriptions: Map<string, Map<string, MessageSubscription>> = new Map()){};
async subscribe(target: string, subscription: MessageSubscription): Promise<void> {
let targetSubscriptions = this.subscriptions.get(target);
if (targetSubscriptions === undefined) {
targetSubscriptions = new Map();
this.subscriptions.set(target, targetSubscriptions);
}
targetSubscriptions.set(subscription.id, subscription);
}

async close(target: string, id: string): Promise<void> {
const targetSubscriptions = this.subscriptions.get(target);
if (targetSubscriptions !== undefined) {
const subscription = targetSubscriptions.get(id);
if (subscription !== undefined) {
targetSubscriptions.delete(id);
await subscription.close();
return;
}
}

// if it reached here no subscription to close
throw new DwnServerError(
DwnServerErrorCode.SubscriptionManagerSubscriptionNotFound,
`subscription ${id} was not found`
)
}

async closeAll(): Promise<void> {
const closePromises = [];
for (const [target, subscriptions] of this.subscriptions) {
this.subscriptions.delete(target);
for (const [id, subscription] of subscriptions) {
subscriptions.delete(id);
closePromises.push(subscription.close());
}
}

await Promise.all(closePromises);
}
}
Loading

0 comments on commit 72b8bcb

Please sign in to comment.