From 402dbb7241fab764b1b0e0efd3672e26b336dd89 Mon Sep 17 00:00:00 2001 From: Andor Kesselman Date: Wed, 27 Sep 2023 09:47:13 +0530 Subject: [PATCH] initial subscriptions commit --- src/config.ts | 4 + src/dwn-server.ts | 2 +- src/subscription-manager.ts | 174 ++++++++++++++++++++++++++++++++++++ src/ws-api.ts | 21 +++-- tests/utils.ts | 14 +++ 5 files changed, 207 insertions(+), 8 deletions(-) create mode 100644 src/subscription-manager.ts diff --git a/src/config.ts b/src/config.ts index 4e52380..fe09007 100644 --- a/src/config.ts +++ b/src/config.ts @@ -22,4 +22,8 @@ export const config = { // log level - trace/debug/info/warn/error logLevel: process.env.DWN_SERVER_LOG_LEVEL || 'INFO', + + subscriptionsEnabled: + { on: true, off: false }[process.env.SUBSCRIPTIONS] ?? true, + // where to store persistant data }; diff --git a/src/dwn-server.ts b/src/dwn-server.ts index 0ba4cda..2aa2b46 100644 --- a/src/dwn-server.ts +++ b/src/dwn-server.ts @@ -46,7 +46,7 @@ export class DwnServer { this.httpServerShutdownHandler = new HttpServerShutdownHandler(httpServer); if (this.config.webSocketServerEnabled) { - const wsServer = new WsApi(httpServer, this.dwn); + const wsServer = new WsApi(httpServer, this.dwn, this.config); wsServer.listen(); } } diff --git a/src/subscription-manager.ts b/src/subscription-manager.ts new file mode 100644 index 0000000..4857493 --- /dev/null +++ b/src/subscription-manager.ts @@ -0,0 +1,174 @@ +import type { JsonRpcSuccessResponse } from './lib/json-rpc.js'; +import { SubscriptionRequestMessage } from '@tbd54566975/dwn-sdk-js'; +import { v4 as uuidv4 } from 'uuid'; +import type WebSocket from 'ws'; +import { WebSocketServer } from 'ws'; + +import type { Dwn, SubscriptionFilter } from '@tbd54566975/dwn-sdk-js'; +import type { EventMessage, PermissionsGrant } from '@tbd54566975/dwn-sdk-js'; +import type { + MessageStore, + SubscriptionRequestReply, +} from '@tbd54566975/dwn-sdk-js'; + +export class Subscription { + from?: string; + subscriptionId: string; + createdAt: string; + description: string; + filters?: SubscriptionFilter[]; + permissionGrant: PermissionsGrant; + connection: WebSocket; +} + +export interface SubscriptionController { + clear(): Promise; + close(): Promise; + start(): Promise; + subscribe( + request: RegisterSubscriptionRequest, + ): Promise; +} + +export type RegisterSubscriptionRequest = { + from: string; + socket: WebSocket; + filters?: SubscriptionFilter[]; + permissionGrant: PermissionsGrant; + subscriptionRequestMessage: SubscriptionRequestMessage; +}; + +export type RegisterSubscriptionReply = { + reply: SubscriptionRequestReply; + subscriptionId?: string; +}; + +export type defaultSubscriptionChannel = 'event'; + +export type SubscriptionManagerOptions = { + subscriptionChannel: string; + wss: WebSocketServer; + dwn: Dwn; + messageStore: MessageStore; + tenant: string; +}; + +export class SubscriptionManager { + private wss: WebSocketServer; + private dwn: Dwn; + private connections: Map; + private messageStore: MessageStore; + private tenant: string; + options: SubscriptionManagerOptions; + #open: boolean; + + constructor(options?: SubscriptionManagerOptions) { + this.wss = options?.wss || new WebSocketServer(); + this.connections = new Map(); + this.messageStore = options?.messageStore; + this.tenant = options?.tenant; + this.dwn = options?.dwn; + this.options = options; + + this.wss.on('connection', (socket: WebSocket) => { + socket.on('subscribe', async (data) => { + await this.handleSubscribe(socket, data); + }); + }); + } + + async clear(): Promise { + this.wss.removeAllListeners(); + this.connections.clear(); + } + + async close(): Promise { + this.#open = false; + this.connections.clear(); + this.wss.close(); + } + + async open(): Promise { + this.#open = true; + } + + async start(): Promise { + this.open(); + } + + private async createSubscription( + from: string, + request: RegisterSubscriptionRequest, + ): Promise { + return { + from, + subscriptionId: uuidv4(), + createdAt: new Date().toISOString(), + description: 'subscription', + filters: request.filters, + permissionGrant: request.permissionGrant, + connection: request.socket, + }; + } + + async handleSubscribe( + socket: WebSocket, + data: any, + ): Promise { + // parse message + const req = SubscriptionRequestMessage.parse(data); + return await this.subscribe(req, socket); + } + + createJSONRPCEvent(e: EventMessage): JsonRpcSuccessResponse { + return { + id: uuidv4(), + jsonrpc: '2.0', + result: e, + }; + } + + async subscribe( + req: RegisterSubscriptionRequest, + socket: WebSocket, + ): Promise { + const subscriptionReply = await this.dwn.handleSubscriptionRequest( + this.tenant, + req.subscriptionRequestMessage, + ); + if (subscriptionReply.status.code !== 200) { + return { reply: subscriptionReply }; + } + const subscription = await this.createSubscription(req.from, req); + this.registerSubscription(subscription); + // set up forwarding. + subscriptionReply.subscription.emitter.on( + async (e: EventMessage): Promise => { + const jsonRpcResponse = this.createJSONRPCEvent(e); + const str = JSON.stringify(jsonRpcResponse); + return socket.send(Buffer.from(str)); + }, + ); + } + + private async registerSubscription( + subscription: Subscription, + ): Promise { + if (!this.#open) { + throw new Error("Can't register subscription. It's not opened."); + } + if (this.connections.has(subscription.subscriptionId)) { + throw new Error( + 'Failed to add connection to controller. ID already exists.', + ); + } + this.connections.set(subscription.subscriptionId, subscription); + subscription.connection.on('close', () => { + this.deleteSubscription(subscription.subscriptionId); + }); + } + + private async deleteSubscription(id: string): Promise { + this.connections.delete(id); + } +} diff --git a/src/ws-api.ts b/src/ws-api.ts index 460eac7..6c558d2 100644 --- a/src/ws-api.ts +++ b/src/ws-api.ts @@ -1,15 +1,17 @@ -import type { Dwn } from '@tbd54566975/dwn-sdk-js'; -import type { JsonRpcResponse } from './lib/json-rpc.js'; -import type { RequestContext } from './lib/json-rpc-router.js'; -import type { Server } from 'http'; -import type { AddressInfo, WebSocket } from 'ws'; - import { base64url } from 'multiformats/bases/base64'; +import type { Config } from './config.js'; import { DataStream } from '@tbd54566975/dwn-sdk-js'; +import type { Dwn } from '@tbd54566975/dwn-sdk-js'; import { jsonRpcApi } from './json-rpc-api.js'; +import type { JsonRpcResponse } from './lib/json-rpc.js'; +import type { RequestContext } from './lib/json-rpc-router.js'; import { requestCounter } from './metrics.js'; +import type { Server } from 'http'; +import { SubscriptionManager } from './subscription-manager.js'; import { v4 as uuidv4 } from 'uuid'; import { WebSocketServer } from 'ws'; + +import type { AddressInfo, WebSocket } from 'ws'; import { createJsonRpcErrorResponse, JsonRpcErrorCodes, @@ -20,10 +22,14 @@ const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive'); export class WsApi { wsServer: WebSocketServer; dwn: Dwn; + config?: Config; + subscriptionManager: SubscriptionManager; - constructor(server: Server, dwn: Dwn) { + constructor(server: Server, dwn: Dwn, config: Config) { this.dwn = dwn; this.wsServer = new WebSocketServer({ server: server }); + this.config = config; + this.subscriptionManager = new SubscriptionManager(); } // TODO: github.com/TBD54566975/dwn-server/issues/49 Add code coverage tracker, similar to either dwn-sdk-js or to web5-js @@ -98,6 +104,7 @@ export class WsApi { transport: 'ws', dataStream: requestDataStream, }; + const { jsonRpcResponse } = await jsonRpcApi.handle( dwnRequest, requestContext, diff --git a/tests/utils.ts b/tests/utils.ts index 7fc0f1a..c91ecbd 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -17,6 +17,7 @@ import { DataStream, DidKeyResolver, RecordsWrite, + SubscriptionRequest, } from '@tbd54566975/dwn-sdk-js'; // __filename and __dirname are not defined in ES module scope @@ -69,6 +70,19 @@ export type GenerateProtocolsConfigureOutput = { dataStream: Readable | undefined; }; +export type CreateSubscriptionRequestOverride = {}; + +export async function createSubscriptionRequest( + signer: Profile, + overrides: CreateSubscriptionRequestOverride, +): Promise { + console.log(overrides); + const subscriptionRequest = await SubscriptionRequest.create({ + authorizationSignatureInput: signer.signatureInput, + }); + return subscriptionRequest; +} + export async function createRecordsWriteMessage( signer: Profile, overrides: CreateRecordsWriteOverrides = {},