diff --git a/src/config.ts b/src/config.ts index 4e52380..fe09007 100644 --- a/src/config.ts +++ b/src/config.ts @@ -22,4 +22,8 @@ export const config = { // log level - trace/debug/info/warn/error logLevel: process.env.DWN_SERVER_LOG_LEVEL || 'INFO', + + subscriptionsEnabled: + { on: true, off: false }[process.env.SUBSCRIPTIONS] ?? true, + // where to store persistant data }; diff --git a/src/json-rpc-handlers/dwn/process-message.ts b/src/json-rpc-handlers/dwn/process-message.ts index 461f86a..a9a1c63 100644 --- a/src/json-rpc-handlers/dwn/process-message.ts +++ b/src/json-rpc-handlers/dwn/process-message.ts @@ -1,17 +1,22 @@ +import { v4 as uuidv4 } from 'uuid'; import type { Readable as IsomorphicReadable } from 'readable-stream'; -import type { RecordsReadReply } from '@tbd54566975/dwn-sdk-js'; +import type { + RecordsReadReply, + SubscriptionRequestReply, +} from '@tbd54566975/dwn-sdk-js'; +import { + DwnInterfaceName, + DwnMethodName, + SubscriptionRequest, +} from '@tbd54566975/dwn-sdk-js'; import type { HandlerResponse, JsonRpcHandler, } from '../../lib/json-rpc-router.js'; - -import { v4 as uuidv4 } from 'uuid'; -import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js'; - import { + JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcSuccessResponse, - JsonRpcErrorCodes, } from '../../lib/json-rpc.js'; export const handleDwnProcessMessage: JsonRpcHandler = async ( @@ -21,21 +26,44 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( const { dwn, dataStream } = context; const { target, message } = dwnRequest.params; const requestId = dwnRequest.id ?? uuidv4(); - try { - let reply; + let reply: any; + const messageType = message?.descriptor?.interface + message?.descriptor?.method; - // When a record is deleted via `RecordsDelete`, the initial RecordsWrite is kept as a tombstone _in addition_ - // to the RecordsDelete message. the data associated to that initial RecordsWrite is deleted. If a record was written - // _and_ deleted before it ever got to dwn-server, we end up in a situation where we still need to process the tombstone - // so that we can process the RecordsDelete. if ( messageType === DwnInterfaceName.Records + DwnMethodName.Write && !dataStream ) { reply = await dwn.synchronizePrunedInitialRecordsWrite(target, message); + } else if ( + messageType === + DwnInterfaceName.Subscriptions + DwnMethodName.Request + ) { + reply = (await dwn.processMessage( + target, + message, + )) as SubscriptionRequestReply; + if (!context.subscriptionManager || !context.socket) { + throw new Error( + 'setup failure. improper context provided for subscription', + ); + } + + // FIXME: How to handle subscription requests? + const request = await SubscriptionRequest.create({}); + const req = { + socket: context.socket, + from: message.descriptor.author, + request: request, + }; + reply = await context.subscriptionManager.subscribe(req); + const jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { + reply, + }); + const responsePayload: HandlerResponse = { jsonRpcResponse }; + return responsePayload; } else { reply = (await dwn.processMessage( target, @@ -44,7 +72,6 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( )) as RecordsReadReply; } - // RecordsRead messages return record data as a stream to for accommodate large amounts of data let recordDataStream; if (reply?.record?.data !== undefined) { recordDataStream = reply.record.data; @@ -64,7 +91,6 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( JsonRpcErrorCodes.InternalError, e.message, ); - return { jsonRpcResponse } as HandlerResponse; } }; diff --git a/src/lib/json-rpc-router.ts b/src/lib/json-rpc-router.ts index 33f7314..eb4d451 100644 --- a/src/lib/json-rpc-router.ts +++ b/src/lib/json-rpc-router.ts @@ -1,11 +1,16 @@ +import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js'; + import type { Dwn } from '@tbd54566975/dwn-sdk-js'; import type { Readable } from 'node:stream'; -import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js'; +import type { SubscriptionController } from '../subscription-manager.js'; +import type { WebSocket } from 'ws'; export type RequestContext = { dwn: Dwn; transport: 'http' | 'ws'; dataStream?: Readable; + socket?: WebSocket; + subscriptionManager?: SubscriptionController; }; export type HandlerResponse = { diff --git a/src/subscription-manager.ts b/src/subscription-manager.ts new file mode 100644 index 0000000..bb8c64d --- /dev/null +++ b/src/subscription-manager.ts @@ -0,0 +1,153 @@ +import type { Dwn, SubscriptionFilter } from '@tbd54566975/dwn-sdk-js'; +import type { EventMessage, PermissionsGrant } from '@tbd54566975/dwn-sdk-js'; + +import type { JsonRpcSuccessResponse } from './lib/json-rpc.js'; +import type { SubscriptionRequest } from '@tbd54566975/dwn-sdk-js'; +import type { SubscriptionRequestReply } from '@tbd54566975/dwn-sdk-js'; +import type WebSocket from 'ws'; +import { WebSocketServer } from 'ws'; +import { v4 as uuidv4 } from 'uuid'; + +export class Subscription { + from?: string; + subscriptionId: string; + createdAt: string; + description: string; + filters?: SubscriptionFilter[]; + permissionGrant: PermissionsGrant; + connection: WebSocket; +} + +export interface SubscriptionController { + clear(): Promise; + close(): Promise; + start(): Promise; + subscribe( + request: RegisterSubscriptionRequest, + ): Promise; +} + +export type RegisterSubscriptionRequest = { + from: string; // from connection + socket: WebSocket; // socket connection + filters?: SubscriptionFilter[]; // filters, if applicable + permissionGrant?: PermissionsGrant; //permission grant, if applicable + request: SubscriptionRequest; // subscription request +}; + +export type RegisterSubscriptionReply = { + reply?: SubscriptionRequestReply; + subscriptionId?: string; +}; + +export type defaultSubscriptionChannel = 'event'; + +export type SubscriptionManagerOptions = { + wss?: WebSocketServer; + dwn: Dwn; + tenant: string; +}; + +export class SubscriptionManager { + private wss: WebSocketServer; + private dwn: Dwn; + private connections: Map; + options: SubscriptionManagerOptions; + #open: boolean; + + constructor(options?: SubscriptionManagerOptions) { + this.wss = options?.wss || new WebSocketServer(); + this.connections = new Map(); + this.dwn = options?.dwn; + this.options = options; + } + + async clear(): Promise { + this.wss.removeAllListeners(); + this.connections.clear(); + } + + async close(): Promise { + this.#open = false; + this.connections.clear(); + this.wss.close(); + } + + async open(): Promise { + this.#open = true; + } + + async start(): Promise { + this.open(); + } + + private async createSubscription( + from: string, + request: RegisterSubscriptionRequest, + ): Promise { + return { + from, + subscriptionId: uuidv4(), + createdAt: new Date().toISOString(), + description: 'subscription', + filters: request.filters, + permissionGrant: request.permissionGrant, + connection: request.socket, + }; + } + + createJSONRPCEvent(e: EventMessage): JsonRpcSuccessResponse { + return { + id: uuidv4(), + jsonrpc: '2.0', + result: e, + }; + } + + async subscribe( + req: RegisterSubscriptionRequest, + ): Promise { + const subscriptionReply = await this.dwn.handleSubscriptionRequest( + req.from, + req.request.message, + ); + if (subscriptionReply.status.code !== 200) { + return { reply: subscriptionReply }; + } + const subscription = await this.createSubscription(req.from, req); + this.registerSubscription(subscription); + // set up forwarding. + subscriptionReply.subscription.emitter.on( + async (e: EventMessage): Promise => { + const jsonRpcResponse = this.createJSONRPCEvent(e); + const str = JSON.stringify(jsonRpcResponse); + return req.socket.send(Buffer.from(str)); + }, + ); + return { + reply: subscriptionReply, + subscriptionId: subscription?.subscriptionId, + } as RegisterSubscriptionReply; + } + + private async registerSubscription( + subscription: Subscription, + ): Promise { + if (!this.#open) { + throw new Error("Can't register subscription. It's not opened."); + } + if (this.connections.has(subscription.subscriptionId)) { + throw new Error( + 'Failed to add connection to controller. ID already exists.', + ); + } + this.connections.set(subscription.subscriptionId, subscription); + subscription.connection.on('close', () => { + this.deleteSubscription(subscription.subscriptionId); + }); + } + + private async deleteSubscription(id: string): Promise { + this.connections.delete(id); + } +} diff --git a/src/ws-api.ts b/src/ws-api.ts index cfe846b..3b67b04 100644 --- a/src/ws-api.ts +++ b/src/ws-api.ts @@ -1,7 +1,7 @@ import { base64url } from 'multiformats/bases/base64'; import { v4 as uuidv4 } from 'uuid'; import { DataStream, type Dwn } from '@tbd54566975/dwn-sdk-js'; -import type { IncomingMessage, Server } from 'http'; +import { type IncomingMessage, type Server } from 'http'; import { type AddressInfo, type WebSocket, WebSocketServer } from 'ws'; import { jsonRpcApi } from './json-rpc-api.js'; @@ -12,6 +12,10 @@ import { JsonRpcErrorCodes, type JsonRpcResponse, } from './lib/json-rpc.js'; +import { + SubscriptionManager, + type SubscriptionController, +} from './subscription-manager.js'; const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive'); const HEARTBEAT_INTERVAL = 30_000; @@ -19,10 +23,16 @@ const HEARTBEAT_INTERVAL = 30_000; export class WsApi { #wsServer: WebSocketServer; dwn: Dwn; + #subscriptionManager: SubscriptionController; constructor(server: Server, dwn: Dwn) { this.dwn = dwn; this.#wsServer = new WebSocketServer({ server }); + this.#subscriptionManager = new SubscriptionManager({ + dwn: dwn, + tenant: 'asdf', + wss: this.#wsServer, + }); } // TODO: github.com/TBD54566975/dwn-server/issues/49 Add code coverage tracker, similar to either dwn-sdk-js or to web5-js @@ -40,6 +50,7 @@ export class WsApi { */ #handleConnection(socket: WebSocket, _request: IncomingMessage): void { const dwn = this.dwn; + const subscriptionManager = this.#subscriptionManager; socket[SOCKET_ISALIVE_SYMBOL] = true; @@ -63,7 +74,6 @@ export class WsApi { socket.on('message', async function (dataBuffer) { let dwnRequest; - try { // deserialize bytes into JSON object dwnRequest = dataBuffer.toString(); @@ -77,7 +87,6 @@ export class WsApi { const responseBuffer = WsApi.jsonRpcResponseToBuffer(jsonRpcResponse); return socket.send(responseBuffer); } - dwnRequest = JSON.parse(dwnRequest); } catch (e) { const jsonRpcResponse = createJsonRpcErrorResponse( @@ -85,7 +94,6 @@ export class WsApi { JsonRpcErrorCodes.BadRequest, e.message, ); - const responseBuffer = WsApi.jsonRpcResponseToBuffer(jsonRpcResponse); return socket.send(responseBuffer); } @@ -100,7 +108,10 @@ export class WsApi { dwn, transport: 'ws', dataStream: requestDataStream, + subscriptionManager: subscriptionManager, + socket: socket, }; + const { jsonRpcResponse } = await jsonRpcApi.handle( dwnRequest, requestContext, @@ -140,9 +151,7 @@ export class WsApi { #setupWebSocket(): void { this.#wsServer.on('connection', this.#handleConnection.bind(this)); - const heartbeatInterval = this.#setupHeartbeat(); - this.#wsServer.on('close', function close() { clearInterval(heartbeatInterval); }); diff --git a/tests/http-api.spec.ts b/tests/http-api.spec.ts index d0d59ca..72c5f4f 100644 --- a/tests/http-api.spec.ts +++ b/tests/http-api.spec.ts @@ -1,34 +1,18 @@ -// node.js 18 and earlier, needs globalThis.crypto polyfill -import { webcrypto } from 'node:crypto'; - -if (!globalThis.crypto) { - // @ts-ignore - globalThis.crypto = webcrypto; -} - -import type { Server } from 'http'; -import type { - JsonRpcErrorResponse, - JsonRpcResponse, -} from '../src/lib/json-rpc.js'; - -import fetch from 'node-fetch'; -import request from 'supertest'; - -import { expect } from 'chai'; -import { HttpApi } from '../src/http-api.js'; -import { v4 as uuidv4 } from 'uuid'; import { Cid, DataStream, RecordsQuery, RecordsRead, } from '@tbd54566975/dwn-sdk-js'; -import { clear as clearDwn, dwn } from './test-dwn.js'; import { - createJsonRpcRequest, JsonRpcErrorCodes, + createJsonRpcRequest, +} from '../src/lib/json-rpc.js'; +import type { + JsonRpcErrorResponse, + JsonRpcResponse, } from '../src/lib/json-rpc.js'; +import { clear as clearDwn, dwn } from './test-dwn.js'; import { createProfile, createRecordsWriteMessage, @@ -36,6 +20,20 @@ import { streamHttpRequest, } from './utils.js'; +import { HttpApi } from '../src/http-api.js'; +import type { Server } from 'http'; +import { expect } from 'chai'; +import fetch from 'node-fetch'; +import request from 'supertest'; +import { v4 as uuidv4 } from 'uuid'; +// node.js 18 and earlier, needs globalThis.crypto polyfill +import { webcrypto } from 'node:crypto'; + +if (!globalThis.crypto) { + // @ts-ignore + globalThis.crypto = webcrypto; +} + describe('http api', function () { let httpApi: HttpApi; let server: Server; diff --git a/tests/subscription-manager.spec.ts b/tests/subscription-manager.spec.ts new file mode 100644 index 0000000..11fa11f --- /dev/null +++ b/tests/subscription-manager.spec.ts @@ -0,0 +1,167 @@ +import http from 'node:http'; +import type { AddressInfo } from 'ws'; +import { WebSocket, type WebSocketServer } from 'ws'; +import { v4 as uuidv4 } from 'uuid'; + +import { + DataStream, + DidKeyResolver, + SubscriptionRequest, +} from '@tbd54566975/dwn-sdk-js'; + +import { Jws } from '@tbd54566975/dwn-sdk-js'; +import { assert } from 'chai'; +import { createProfile, createRecordsWriteMessage } from './utils.js'; +import type { Profile } from './utils.js'; +import { WsApi } from '../src/ws-api.js'; +import { createJsonRpcRequest } from '../src/lib/json-rpc.js'; +import { clear as clearDwn, dwn } from './test-dwn.js'; +import { base64url } from 'multiformats/bases/base64'; +import { EventType } from '@tbd54566975/dwn-sdk-js'; +import { DwnInterfaceName } from '@tbd54566975/dwn-sdk-js'; +import { DwnMethodName } from '@tbd54566975/dwn-sdk-js'; + +describe('Subscription Manager Test', async () => { + let server: http.Server; + let wsServer: WebSocketServer; + let alice: Profile; + let socket: WebSocket; + + before(async () => { + // create listeners... + server = http.createServer(); + server.listen(9002, '127.0.0.1'); + + const wsApi = new WsApi(server, dwn); + wsServer = wsApi.start(); + alice = await createProfile(); + // starts the ws server + // create subscription manager... + return; + }); + + // before each, clear the subscriptions + beforeEach(async () => { + // subscriptionManager.clear(); + }); + + afterEach(async () => { + await clearDwn(); + }); + + // close at the end + after(async () => { + //await subscriptionManager.close(); + wsServer.close(); + server.close(); + server.closeAllConnections(); + if (socket) { + socket.close(); + } + }); + + it('test subscription manager registration', async () => { + try { + const signer = await DidKeyResolver.generate(); + const req = await SubscriptionRequest.create({ + signer: Jws.createSigner(signer), + filter: { + eventType: EventType.Operation, + }, + }); + + const port = (wsServer.address() as AddressInfo).port; + const ip = (wsServer.address() as AddressInfo).address; + const addr = `ws://${ip}:${port}`; + const socket = new WebSocket(addr); + let receivedCount = 0; + + const socketPromise = new Promise((resolve, reject) => { + // set up lisetner... + socket.onmessage = (event): Promise => { + try { + const resp = JSON.parse(event.data.toString()); + if (resp.error) { + throw new Error(resp.error.message); + } + receivedCount += 1; + if ( + resp.result?.descriptor?.eventDescriptor?.interface === + DwnInterfaceName.Records && + resp.result?.descriptor?.eventDescriptor?.method === + DwnMethodName.Write + ) { + resolve(event); + socket.close(); + } + return; + } catch (error) { + reject(error); + } + }; + + socket.onerror = (error): void => { + reject(error); // Reject the promise if there's an error with the socket + }; + + socket.onclose = (event): void => { + if (event.wasClean) { + console.log( + `Connection closed cleanly, code=${event.code}, reason=${event.reason}`, + ); + } else { + console.error(`Connection abruptly closed`); + } + reject(new Error(`Connection closed: ${event.reason}`)); // Reject the promise on socket close + }; + + socket.onopen = async (): Promise => { + // on open + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest( + requestId, + 'dwn.processMessage', + { + message: req.toJSON(), + target: alice.did, + }, + ); + + try { + if (socket.readyState !== WebSocket.OPEN) { + reject(new Error('socket not open')); + } + socket.send(JSON.stringify(dwnRequest)); + } catch (error) { + reject(error); + } + try { + const { recordsWrite, dataStream } = + await createRecordsWriteMessage(alice); + const dataBytes = await DataStream.toBytes(dataStream); + const encodedData = base64url.baseEncode(dataBytes); + + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest( + requestId, + 'dwn.processMessage', + { + message: recordsWrite.toJSON(), + target: alice.did, + encodedData, + }, + ); + socket.send(JSON.stringify(dwnRequest)); + } catch (error) { + reject(error); + } + return; + }; + }); + await socketPromise; + assert.equal(receivedCount, 2, 'received count'); + } catch (error) { + assert.fail(error, undefined, 'failed to register subscription' + error); + } + }); +}); diff --git a/tests/test-dwn.ts b/tests/test-dwn.ts index 515ece5..3354982 100644 --- a/tests/test-dwn.ts +++ b/tests/test-dwn.ts @@ -1,13 +1,15 @@ import { - Dwn, DataStoreLevel, + Dwn, EventLogLevel, MessageStoreLevel, } from '@tbd54566975/dwn-sdk-js'; -const dataStore = new DataStoreLevel({ blockstoreLocation: 'data/DATASTORE' }); +export const dataStore = new DataStoreLevel({ + blockstoreLocation: 'data/DATASTORE', +}); const eventLog = new EventLogLevel({ location: 'data/EVENTLOG' }); -const messageStore = new MessageStoreLevel({ +export const messageStore = new MessageStoreLevel({ blockstoreLocation: 'data/MESSAGESTORE', indexLocation: 'data/INDEX', }); diff --git a/tests/utils.ts b/tests/utils.ts index f6750e4..3ec7872 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -12,9 +12,10 @@ import { Cid, DataStream, DidKeyResolver, - PrivateKeySigner, RecordsWrite, + SubscriptionRequest, } from '@tbd54566975/dwn-sdk-js'; +import { Jws } from '@tbd54566975/dwn-sdk-js'; // __filename and __dirname are not defined in ES module scope const __filename = fileURLToPath(import.meta.url); @@ -31,18 +32,11 @@ export type Profile = { export async function createProfile(): Promise { const { did, keyPair, keyId } = await DidKeyResolver.generate(); - - // signer is required by all dwn message classes. it's used to sign messages - const signer = new PrivateKeySigner({ - privateJwk: keyPair.privateJwk, - algorithm: keyPair.privateJwk.alg, - keyId: `${did}#${keyId}`, - }); - + const signer = Jws.createSigner({ keyPair, keyId }); return { - did, - keyPair, - signer, + did: did, + keyPair: keyPair, + signer: signer, }; } @@ -67,6 +61,19 @@ export type GenerateProtocolsConfigureOutput = { dataStream: Readable | undefined; }; +export type CreateSubscriptionRequestOverride = {}; + +export async function createSubscriptionRequest( + signer: Profile, + overrides: CreateSubscriptionRequestOverride, +): Promise { + console.log(overrides); + const subscriptionRequest = await SubscriptionRequest.create({ + signer: signer.signer, + }); + return subscriptionRequest; +} + export async function createRecordsWriteMessage( signer: Profile, overrides: CreateRecordsWriteOverrides = {},