Skip to content

Commit

Permalink
update comments
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed Feb 13, 2024
1 parent 4f0ce6f commit 4f659d3
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 6 deletions.
31 changes: 28 additions & 3 deletions src/connection/connection-manager.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
import type { Dwn } from "@tbd54566975/dwn-sdk-js";

import type { IncomingMessage } from "http";
import type { WebSocket } from 'ws';

import { SocketConnection } from "./socket-connection.js";
import { InMemorySubscriptionManager } from "../subscription-manager.js";

/**
* Interface for managing `WebSocket` connections as they arrive.
*/
export interface ConnectionManager {
connect(socket: WebSocket): Promise<void>;
close(): Promise<void>
/** connect handler used for the `WebSockets` `'connection'` event. */
connect(socket: WebSocket, request?: IncomingMessage): Promise<void>;
/** cleans up handlers associated with the `WebSocket` connection */
close(socket:WebSocket): Promise<void>;
/** closes all of the connections */
closeAll(): Promise<void>
}

/**
* A Simple In Memory ConnectionManager implementation.
* It uses a `Map<WebSocket, SocketConnection>` to manage connections.
* It uses am `InMemorySubscriptionManager` for individual subscription management within the connection.
*/
export class InMemoryConnectionManager implements ConnectionManager {
constructor(private dwn: Dwn, private connections: Map<WebSocket, SocketConnection> = new Map()) {}

Expand All @@ -27,7 +40,19 @@ export class InMemoryConnectionManager implements ConnectionManager {
});
}

async close(): Promise<void> {
/**
* Handler for closing websocket event - `close`.
*/
async close(socket: WebSocket): Promise<void> {
const connection = this.connections.get(socket);
this.connections.delete(socket);
await connection.close();
}

/**
* Closes all associated connections.
*/
async closeAll(): Promise<void> {
const closePromises = [];
this.connections.forEach(connection => closePromises.push(connection.close()));
await Promise.all(closePromises);
Expand Down
20 changes: 19 additions & 1 deletion src/connection/socket-connection.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import type { Dwn, GenericMessage } from "@tbd54566975/dwn-sdk-js";
import { DwnMethodName } from "@tbd54566975/dwn-sdk-js";

import type { WebSocket } from "ws";
import log from 'loglevel';
import { v4 as uuidv4 } from 'uuid';
import type { WebSocket } from "ws";

import type { RequestContext } from "../lib/json-rpc-router.js";
import type { SubscriptionManager } from "../subscription-manager.js";
Expand All @@ -16,6 +16,9 @@ import { JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcSuccessResp
const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive');
const HEARTBEAT_INTERVAL = 30_000;

/**
* SocketConnection class sets up a socket connection along with a `ping/pong` heartbeat.
*/
export class SocketConnection {
private heartbeatInterval: NodeJS.Timer;
constructor(
Expand Down Expand Up @@ -74,6 +77,9 @@ export class SocketConnection {
}
}

/**
* Handles a `JSON RPC 2.0` encoded message.
*/
private async message(dataBuffer: Buffer): Promise<void> {
const requestData = dataBuffer.toString();
if (!requestData) {
Expand Down Expand Up @@ -110,10 +116,17 @@ export class SocketConnection {
this.send(jsonRpcResponse);
}

/**
* Sends a JSON encoded Buffer through the Websocket.
*/
private send(response: JsonRpcResponse | JsonRpcErrorResponse): void {
this.socket.send(Buffer.from(JSON.stringify(response)), this.error.bind(this));
}

/**
* Subscription Handler used to build the context for a `JSON RPC` API call.
* Wraps the incoming `message` in a `JSON RPC Success Response` using the origin subscription`JSON RPC Id` to send through the WebSocket.
*/
private subscriptionHandler(id: JsonRpcId): (message: GenericMessage) => void {
return (message) => {
const response = createJsonRpcSuccessResponse(id, { reply: {
Expand All @@ -123,6 +136,11 @@ export class SocketConnection {
}
}

/**
* Builds a `RequestContext` object to use with the `JSON RPC API`.
*
* Adds a `subscriptionHandler` for `Subscribe` messages.
*/
private async buildRequestContext(request: JsonRpcRequest): Promise<RequestContext> {
const { id, params, method} = request;
const requestContext: RequestContext = {
Expand Down
2 changes: 2 additions & 0 deletions src/http-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ export class HttpApi {
dwn: Dwn;

constructor(config: DwnServerConfig, dwn: Dwn, registrationManager?: RegistrationManager) {
console.log(config);

this.#config = config;
this.#api = express();
this.#server = http.createServer(this.#api);
Expand Down
6 changes: 6 additions & 0 deletions src/json-rpc-handlers/subscriptions/close.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ import {
JsonRpcErrorCodes,
} from '../../lib/json-rpc.js';

/**
* Closes a subscription for a given `target` and `subscriptionId` within a given connection's `SubscriptionManager`.
* @param dwnRequest must include the `target` and `subscriptionId` within the `params`.
* @param context must include the `subscriptionManager` for the associated connection.
*
*/
export const handleSubscriptionsClose: JsonRpcHandler = async (
dwnRequest,
context,
Expand Down
5 changes: 4 additions & 1 deletion src/lib/json-rpc-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js';
export type RequestContext = {
transport: 'http' | 'ws';
dwn: Dwn;
/** The `SubscriptionManager` associated with a subscription request, only used in `ws` requests */
subscriptionManager?: SubscriptionManager;
dataStream?: Readable;
/** The `MessageSubscriptionHandler` associated with a subscription request, only used in `ws` requests */
subscriptionHandler?: MessageSubscriptionHandler;
/** The `Readable` stream associated with a `RecordsWrite` request only used in `ws` requests */
dataStream?: Readable;
};

export type HandlerResponse = {
Expand Down
5 changes: 5 additions & 0 deletions src/subscription-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@ export interface SubscriptionManager {
closeAll: () => Promise<void>;
}

/**
* Simple InMemory implementation of a `SubscriptionManager`.
* Uses `Map<string, MessageSubscription>` to manage internal state.
*/
export class InMemorySubscriptionManager implements SubscriptionManager {
constructor(private subscriptions: Map<string, Map<string, MessageSubscription>> = new Map()){};

async subscribe(target: string, subscription: MessageSubscription): Promise<void> {
let targetSubscriptions = this.subscriptions.get(target);
if (targetSubscriptions === undefined) {
Expand Down
2 changes: 1 addition & 1 deletion src/ws-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@ export class WsApi {

async close(): Promise<void> {
this.#wsServer.close();
await this.#connections.close();
await this.#connections.closeAll();
}
}

0 comments on commit 4f659d3

Please sign in to comment.