diff --git a/src/connection/connection-manager.ts b/src/connection/connection-manager.ts index 931315d..d8a1947 100644 --- a/src/connection/connection-manager.ts +++ b/src/connection/connection-manager.ts @@ -4,7 +4,6 @@ import type { IncomingMessage } from "http"; import type { WebSocket } from 'ws'; import { SocketConnection } from "./socket-connection.js"; -import { InMemorySubscriptionManager } from "../subscription-manager.js"; /** * Interface for managing `WebSocket` connections as they arrive. @@ -21,7 +20,6 @@ export interface ConnectionManager { /** * A Simple In Memory ConnectionManager implementation. * It uses a `Map` to manage connections. - * It uses am `InMemorySubscriptionManager` for individual subscription management within the connection. */ export class InMemoryConnectionManager implements ConnectionManager { constructor(private dwn: Dwn, private connections: Map = new Map()) {} @@ -31,7 +29,7 @@ export class InMemoryConnectionManager implements ConnectionManager { * Sets listeners for `message`, `pong`, `close`, and `error` events. */ async connect(socket: WebSocket): Promise { - const connection = new SocketConnection(socket, this.dwn, new InMemorySubscriptionManager()); + const connection = new SocketConnection(socket, this.dwn); this.connections.set(socket, connection); // attach to the socket's close handler to clean up this connection. socket.on('close', () => { diff --git a/src/connection/socket-connection.ts b/src/connection/socket-connection.ts index d45d554..6613ff1 100644 --- a/src/connection/socket-connection.ts +++ b/src/connection/socket-connection.ts @@ -6,25 +6,31 @@ import log from 'loglevel'; import { v4 as uuidv4 } from 'uuid'; import type { RequestContext } from "../lib/json-rpc-router.js"; -import type { SubscriptionManager } from "../subscription-manager.js"; import type { JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcResponse } from "../lib/json-rpc.js"; import { requestCounter } from "../metrics.js"; import { jsonRpcApi } from "../json-rpc-api.js"; import { JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcSuccessResponse } from "../lib/json-rpc.js"; +import { DwnServerError, DwnServerErrorCode } from "../dwn-error.js"; const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive'); const HEARTBEAT_INTERVAL = 30_000; +export interface Subscription { + id: JsonRpcId; + close: () => Promise; +} + /** * SocketConnection class sets up a socket connection along with a `ping/pong` heartbeat. */ export class SocketConnection { private heartbeatInterval: NodeJS.Timer; + private subscriptions: Map = new Map(); + constructor( private socket: WebSocket, - private dwn: Dwn, - private subscriptions: SubscriptionManager + private dwn: Dwn ){ socket.on('close', this.close.bind(this)); socket.on('pong', this.pong.bind(this)); @@ -46,6 +52,28 @@ export class SocketConnection { }, HEARTBEAT_INTERVAL); } + async subscribe(subscription: Subscription): Promise { + if (this.subscriptions.has(subscription.id)) { + throw new DwnServerError( + DwnServerErrorCode.ConnectionSubscriptionJsonRPCIdExists, + `the subscription with id ${subscription.id} already exists` + ) + } + + this.subscriptions.set(subscription.id, subscription); + } + + async closeSubscription(id: JsonRpcId): Promise { + if (!this.subscriptions.has(id)) { + throw new DwnServerError( + DwnServerErrorCode.ConnectionSubscriptionJsonRPCIdNotFound, + `the subscription with id ${id} was not found` + ) + } + + this.subscriptions.delete(id); + } + /** * Closes the existing connection and cleans up any listeners or subscriptions. */ @@ -54,8 +82,13 @@ export class SocketConnection { // clean up all socket event listeners this.socket.removeAllListeners(); + const closePromises = []; + for (const [_target, subscription] of this.subscriptions) { + closePromises.push(subscription.close()); + } + // close all of the associated subscriptions - await this.subscriptions.closeAll(); + await Promise.all(closePromises); // close the socket. this.socket.close(); @@ -144,9 +177,9 @@ export class SocketConnection { private async buildRequestContext(request: JsonRpcRequest): Promise { const { id, params, method} = request; const requestContext: RequestContext = { - transport : 'ws', - dwn : this.dwn, - subscriptionManager : this.subscriptions, + transport : 'ws', + dwn : this.dwn, + socketConnection : this, } if (method === 'dwn.processMessage') { diff --git a/src/dwn-error.ts b/src/dwn-error.ts index 0163a28..196a5e2 100644 --- a/src/dwn-error.ts +++ b/src/dwn-error.ts @@ -26,12 +26,13 @@ export class DwnServerError extends Error { * DWN Server error codes. */ export enum DwnServerErrorCode { + ConnectionSubscriptionJsonRPCIdExists = 'ConnectionSubscriptionJsonRPCIdExists', + ConnectionSubscriptionJsonRPCIdNotFound = 'ConnectionSubscriptionJsonRPCIdNotFound', ProofOfWorkInsufficientSolutionNonce = 'ProofOfWorkInsufficientSolutionNonce', ProofOfWorkInvalidOrExpiredChallenge = 'ProofOfWorkInvalidOrExpiredChallenge', ProofOfWorkManagerInvalidChallengeNonce = 'ProofOfWorkManagerInvalidChallengeNonce', ProofOfWorkManagerInvalidResponseNonceFormat = 'ProofOfWorkManagerInvalidResponseNonceFormat', ProofOfWorkManagerResponseNonceReused = 'ProofOfWorkManagerResponseNonceReused', RegistrationManagerInvalidOrOutdatedTermsOfServiceHash = 'RegistrationManagerInvalidOrOutdatedTermsOfServiceHash', - SubscriptionManagerSubscriptionNotFound = 'SubscriptionManagerSubscriptionNotFound', TenantRegistrationOutdatedTermsOfService = 'TenantRegistrationOutdatedTermsOfService', } diff --git a/src/json-rpc-handlers/dwn/process-message.ts b/src/json-rpc-handlers/dwn/process-message.ts index d46cbb4..7ea296e 100644 --- a/src/json-rpc-handlers/dwn/process-message.ts +++ b/src/json-rpc-handlers/dwn/process-message.ts @@ -4,11 +4,15 @@ import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js'; import type { Readable as IsomorphicReadable } from 'readable-stream'; import { v4 as uuidv4 } from 'uuid'; +import type { + JsonRpcErrorResponse, +} from '../../lib/json-rpc.js'; import type { HandlerResponse, JsonRpcHandler, } from '../../lib/json-rpc-router.js'; +import { DwnServerErrorCode } from '../../dwn-error.js'; import { createJsonRpcErrorResponse, createJsonRpcSuccessResponse, @@ -19,7 +23,7 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( dwnRequest, context, ) => { - const { dwn, dataStream, subscriptionHandler, subscriptionManager, transport } = context; + const { dwn, dataStream, subscriptionHandler, socketConnection, transport } = context; const { target, message } = dwnRequest.params as { target: string, message: GenericMessage }; const requestId = dwnRequest.id ?? uuidv4(); @@ -65,8 +69,31 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( // Subscribe messages return a close function to facilitate closing the subscription if (subscription !== undefined) { - const { id, close } = subscription; - subscriptionManager.subscribe(target, { id, close }); + const { close } = subscription; + try { + await socketConnection.subscribe({ + id: requestId, + close, + }) + } catch(error) { + let errorResponse: JsonRpcErrorResponse; + if (error.code === DwnServerErrorCode.ConnectionSubscriptionJsonRPCIdExists) { + // a subscription with this request id already exists + errorResponse = createJsonRpcErrorResponse( + requestId, + JsonRpcErrorCodes.BadRequest, + `the request id ${requestId} already has an active subscription` + ); + } else { + // will catch as an unknown error below + throw new Error('unknown error adding subscription'); + } + + // close the subscription that was just opened and return an error + await close(); + return { jsonRpcResponse: errorResponse }; + } + delete reply.subscription.close // not serializable via JSON } diff --git a/src/json-rpc-handlers/subscriptions/close.ts b/src/json-rpc-handlers/subscriptions/close.ts index c38c386..b926447 100644 --- a/src/json-rpc-handlers/subscriptions/close.ts +++ b/src/json-rpc-handlers/subscriptions/close.ts @@ -6,7 +6,7 @@ import type { JsonRpcHandler, } from '../../lib/json-rpc-router.js'; -import type { JsonRpcResponse } from '../../lib/json-rpc.js'; +import type { JsonRpcId, JsonRpcResponse } from '../../lib/json-rpc.js'; import { createJsonRpcErrorResponse, createJsonRpcSuccessResponse, @@ -14,9 +14,10 @@ import { } from '../../lib/json-rpc.js'; /** - * Closes a subscription for a given `target` and `subscriptionId` within a given connection's `SubscriptionManager`. - * @param dwnRequest must include the `target` and `subscriptionId` within the `params`. - * @param context must include the `subscriptionManager` for the associated connection. + * Closes a subscription for a given `id` for a given `SocketConnection` + * + * @param dwnRequest must include the `id` of the subscription to close within the `params`. + * @param context must include the associated `SocketConnection`. * */ export const handleSubscriptionsClose: JsonRpcHandler = async ( @@ -24,21 +25,21 @@ export const handleSubscriptionsClose: JsonRpcHandler = async ( context, ) => { const requestId = dwnRequest.id ?? uuidv4(); - const { subscriptionManager } = context; - const { target, subscriptionId } = dwnRequest.params as { target: string, subscriptionId: string }; + const { socketConnection } = context; + const { id } = dwnRequest.params as { id: JsonRpcId}; let jsonRpcResponse:JsonRpcResponse; try { - await subscriptionManager.close(target, subscriptionId); + await socketConnection.closeSubscription(id); jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { reply: { status: 200, detail: 'Accepted' } }); } catch(error) { - if (error.code === DwnServerErrorCode.SubscriptionManagerSubscriptionNotFound) { - jsonRpcResponse = createJsonRpcErrorResponse(requestId, JsonRpcErrorCodes.InvalidParams, `subscription ${subscriptionId} does not exist.`); + if (error.code === DwnServerErrorCode.ConnectionSubscriptionJsonRPCIdNotFound) { + jsonRpcResponse = createJsonRpcErrorResponse(requestId, JsonRpcErrorCodes.InvalidParams, `subscription ${id} does not exist.`); } else { jsonRpcResponse = createJsonRpcErrorResponse( requestId, JsonRpcErrorCodes.InternalError, - `unknown subscription close error for ${subscriptionId}: ${error.message}` + `unknown subscription close error for ${id}: ${error.message}` ); } } diff --git a/src/lib/json-rpc-router.ts b/src/lib/json-rpc-router.ts index 8065a15..18ba03e 100644 --- a/src/lib/json-rpc-router.ts +++ b/src/lib/json-rpc-router.ts @@ -2,14 +2,13 @@ import type { Dwn, MessageSubscriptionHandler } from '@tbd54566975/dwn-sdk-js'; import type { Readable } from 'node:stream'; -import type { SubscriptionManager } from '../subscription-manager.js'; import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js'; +import type { SocketConnection } from '../connection/socket-connection.js'; export type RequestContext = { transport: 'http' | 'ws'; dwn: Dwn; - /** The `SubscriptionManager` associated with a subscription request, only used in `ws` requests */ - subscriptionManager?: SubscriptionManager; + socketConnection?: SocketConnection; /** The `MessageSubscriptionHandler` associated with a subscription request, only used in `ws` requests */ subscriptionHandler?: MessageSubscriptionHandler; /** The `Readable` stream associated with a `RecordsWrite` request only used in `ws` requests */ diff --git a/src/subscription-manager.ts b/src/subscription-manager.ts deleted file mode 100644 index ac18d2b..0000000 --- a/src/subscription-manager.ts +++ /dev/null @@ -1,60 +0,0 @@ -import type { MessageSubscription } from "@tbd54566975/dwn-sdk-js"; - -import { DwnServerError, DwnServerErrorCode } from "./dwn-error.js"; - -/** - * SubscriptionManager manages the subscriptions related to a `SocketConnection` - */ -export interface SubscriptionManager { - subscribe: (target: string, subscription: MessageSubscription) => Promise; - close: (target: string, id: string) => Promise; - closeAll: () => Promise; -} - -/** - * Simple InMemory implementation of a `SubscriptionManager`. - * Uses `Map` to manage internal state. - */ -export class InMemorySubscriptionManager implements SubscriptionManager { - constructor(private subscriptions: Map> = new Map()){}; - - async subscribe(target: string, subscription: MessageSubscription): Promise { - let targetSubscriptions = this.subscriptions.get(target); - if (targetSubscriptions === undefined) { - targetSubscriptions = new Map(); - this.subscriptions.set(target, targetSubscriptions); - } - targetSubscriptions.set(subscription.id, subscription); - } - - async close(target: string, id: string): Promise { - const targetSubscriptions = this.subscriptions.get(target); - if (targetSubscriptions !== undefined) { - const subscription = targetSubscriptions.get(id); - if (subscription !== undefined) { - targetSubscriptions.delete(id); - await subscription.close(); - return; - } - } - - // if it reached here no subscription to close - throw new DwnServerError( - DwnServerErrorCode.SubscriptionManagerSubscriptionNotFound, - `subscription ${id} was not found` - ) - } - - async closeAll(): Promise { - const closePromises = []; - for (const [target, subscriptions] of this.subscriptions) { - this.subscriptions.delete(target); - for (const [id, subscription] of subscriptions) { - subscriptions.delete(id); - closePromises.push(subscription.close()); - } - } - - await Promise.all(closePromises); - } -} \ No newline at end of file diff --git a/tests/utils.ts b/tests/utils.ts index c05021d..0df40ff 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -11,7 +11,7 @@ import type { Readable } from 'readable-stream'; import { fileURLToPath } from 'url'; import { WebSocket } from 'ws'; -import type { JsonRpcResponse, JsonRpcRequest } from '../src/lib/json-rpc.js'; +import type { JsonRpcResponse, JsonRpcRequest, JsonRpcId } from '../src/lib/json-rpc.js'; import { createJsonRpcRequest } from '../src/lib/json-rpc.js'; import { JSONRPCSocket } from '../src/json-rpc-socket.js'; @@ -230,12 +230,12 @@ export async function subscriptionRequest( messageHandler: MessageSubscriptionHandler ): Promise<{ status: any, subscription?: { id: string, close: () => Promise } }> { let resolved: boolean = false; - const { params: { target } } = request; + const { id: requestId } = request; const connection = await JSONRPCSocket.connect(url); - const closeSubscription = async (id: string, target: string, connection: JSONRPCSocket): Promise => { + const closeSubscription = async (id: JsonRpcId, connection: JSONRPCSocket): Promise => { const requestId = uuidv4(); - const request = createJsonRpcRequest(requestId, 'subscriptions.close', { subscriptionId: id, target }); + const request = createJsonRpcRequest(requestId, 'subscriptions.close', { id }); return await connection.request(request); } @@ -263,7 +263,7 @@ export async function subscriptionRequest( ...subscription, close: async (): Promise => { subscriptionClose(); - const closeResponse = await closeSubscription(subscription.id, target, connection); + const closeResponse = await closeSubscription(requestId, connection); if (closeResponse.error?.message !== undefined) { throw new Error(`unable to close subscription: ${closeResponse.error.message}`); }