diff --git a/src/json-rpc-handlers/dwn/process-message.ts b/src/json-rpc-handlers/dwn/process-message.ts index 461f86a..183e83c 100644 --- a/src/json-rpc-handlers/dwn/process-message.ts +++ b/src/json-rpc-handlers/dwn/process-message.ts @@ -1,19 +1,19 @@ -import type { Readable as IsomorphicReadable } from 'readable-stream'; -import type { RecordsReadReply } from '@tbd54566975/dwn-sdk-js'; +import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js'; import type { HandlerResponse, JsonRpcHandler, } from '../../lib/json-rpc-router.js'; - -import { v4 as uuidv4 } from 'uuid'; -import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js'; - import { + JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcSuccessResponse, - JsonRpcErrorCodes, } from '../../lib/json-rpc.js'; +import type { Readable as IsomorphicReadable } from 'readable-stream'; +import type { RecordsReadReply } from '@tbd54566975/dwn-sdk-js'; +import type { SubscriptionRequestReply } from '@tbd54566975/dwn-sdk-js'; +import { v4 as uuidv4 } from 'uuid'; + export const handleDwnProcessMessage: JsonRpcHandler = async ( dwnRequest, context, @@ -36,6 +36,26 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( !dataStream ) { reply = await dwn.synchronizePrunedInitialRecordsWrite(target, message); + } else if ( + messageType === + DwnInterfaceName.Subscriptions + DwnMethodName.Request + ) { + reply = (await dwn.processMessage( + target, + message, + )) as SubscriptionRequestReply; + if (!context.subscriptionManager || context.socket) { + throw new Error( + 'setup failure. improper context provided for subscription', + ); + } + const req = { + socket: context.socket, + from: dwnRequest.params?.descriptor, + request: {}, + }; + const subscription = await context.subscriptionManager.subscribe(req); + console.log(subscription); } else { reply = (await dwn.processMessage( target, @@ -43,7 +63,6 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( dataStream as IsomorphicReadable, )) as RecordsReadReply; } - // RecordsRead messages return record data as a stream to for accommodate large amounts of data let recordDataStream; if (reply?.record?.data !== undefined) { diff --git a/src/lib/json-rpc-router.ts b/src/lib/json-rpc-router.ts index 33f7314..eb4d451 100644 --- a/src/lib/json-rpc-router.ts +++ b/src/lib/json-rpc-router.ts @@ -1,11 +1,16 @@ +import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js'; + import type { Dwn } from '@tbd54566975/dwn-sdk-js'; import type { Readable } from 'node:stream'; -import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js'; +import type { SubscriptionController } from '../subscription-manager.js'; +import type { WebSocket } from 'ws'; export type RequestContext = { dwn: Dwn; transport: 'http' | 'ws'; dataStream?: Readable; + socket?: WebSocket; + subscriptionManager?: SubscriptionController; }; export type HandlerResponse = { diff --git a/src/subscription-manager.ts b/src/subscription-manager.ts index 2318237..58e02c9 100644 --- a/src/subscription-manager.ts +++ b/src/subscription-manager.ts @@ -2,7 +2,6 @@ import type { Dwn, SubscriptionFilter } from '@tbd54566975/dwn-sdk-js'; import type { EventMessage, PermissionsGrant } from '@tbd54566975/dwn-sdk-js'; import type { JsonRpcSuccessResponse } from './lib/json-rpc.js'; -import type { MessageStore } from '@tbd54566975/dwn-sdk-js'; import { SubscriptionRequest } from '@tbd54566975/dwn-sdk-js'; import type { SubscriptionRequestReply } from '@tbd54566975/dwn-sdk-js'; import type WebSocket from 'ws'; @@ -46,7 +45,6 @@ export type defaultSubscriptionChannel = 'event'; export type SubscriptionManagerOptions = { wss?: WebSocketServer; dwn: Dwn; - messageStore: MessageStore; tenant: string; }; @@ -54,7 +52,6 @@ export class SubscriptionManager { private wss: WebSocketServer; private dwn: Dwn; private connections: Map; - private messageStore: MessageStore; private tenant: string; options: SubscriptionManagerOptions; #open: boolean; @@ -62,13 +59,14 @@ export class SubscriptionManager { constructor(options?: SubscriptionManagerOptions) { this.wss = options?.wss || new WebSocketServer(); this.connections = new Map(); - this.messageStore = options?.messageStore; this.tenant = options?.tenant; this.dwn = options?.dwn; this.options = options; this.wss.on('connection', (socket: WebSocket) => { - socket.on('subscribe', async (data) => { + console.log('connected'); + socket.on('message', async (data) => { + console.log('got message...'); await this.handleSubscribe(socket, data); }); }); diff --git a/src/ws-api.ts b/src/ws-api.ts index cfe846b..6809f71 100644 --- a/src/ws-api.ts +++ b/src/ws-api.ts @@ -1,7 +1,7 @@ import { base64url } from 'multiformats/bases/base64'; import { v4 as uuidv4 } from 'uuid'; import { DataStream, type Dwn } from '@tbd54566975/dwn-sdk-js'; -import type { IncomingMessage, Server } from 'http'; +import { type IncomingMessage, type Server } from 'http'; import { type AddressInfo, type WebSocket, WebSocketServer } from 'ws'; import { jsonRpcApi } from './json-rpc-api.js'; @@ -12,6 +12,10 @@ import { JsonRpcErrorCodes, type JsonRpcResponse, } from './lib/json-rpc.js'; +import { + SubscriptionManager, + type SubscriptionController, +} from './subscription-manager.js'; const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive'); const HEARTBEAT_INTERVAL = 30_000; @@ -19,10 +23,16 @@ const HEARTBEAT_INTERVAL = 30_000; export class WsApi { #wsServer: WebSocketServer; dwn: Dwn; + #subscriptionManager: SubscriptionController; constructor(server: Server, dwn: Dwn) { this.dwn = dwn; this.#wsServer = new WebSocketServer({ server }); + this.#subscriptionManager = new SubscriptionManager({ + dwn: dwn, + tenant: 'asdf', + wss: this.#wsServer, + }); } // TODO: github.com/TBD54566975/dwn-server/issues/49 Add code coverage tracker, similar to either dwn-sdk-js or to web5-js @@ -63,7 +73,6 @@ export class WsApi { socket.on('message', async function (dataBuffer) { let dwnRequest; - try { // deserialize bytes into JSON object dwnRequest = dataBuffer.toString(); @@ -77,7 +86,6 @@ export class WsApi { const responseBuffer = WsApi.jsonRpcResponseToBuffer(jsonRpcResponse); return socket.send(responseBuffer); } - dwnRequest = JSON.parse(dwnRequest); } catch (e) { const jsonRpcResponse = createJsonRpcErrorResponse( @@ -85,7 +93,6 @@ export class WsApi { JsonRpcErrorCodes.BadRequest, e.message, ); - const responseBuffer = WsApi.jsonRpcResponseToBuffer(jsonRpcResponse); return socket.send(responseBuffer); } @@ -101,6 +108,7 @@ export class WsApi { transport: 'ws', dataStream: requestDataStream, }; + const { jsonRpcResponse } = await jsonRpcApi.handle( dwnRequest, requestContext, @@ -140,9 +148,7 @@ export class WsApi { #setupWebSocket(): void { this.#wsServer.on('connection', this.#handleConnection.bind(this)); - const heartbeatInterval = this.#setupHeartbeat(); - this.#wsServer.on('close', function close() { clearInterval(heartbeatInterval); }); diff --git a/tests/subscription-manager.spec.ts b/tests/subscription-manager.spec.ts index 2e92a06..9f84fd9 100644 --- a/tests/subscription-manager.spec.ts +++ b/tests/subscription-manager.spec.ts @@ -1,112 +1,120 @@ import http from 'node:http'; +import type { AddressInfo } from 'ws'; import { WebSocket, type WebSocketServer } from 'ws'; +import { v4 as uuidv4 } from 'uuid'; -import { - DataStoreLevel, - DidKeyResolver, - Dwn, - EventLogLevel, - MessageStoreLevel, - SubscriptionRequest, -} from '@tbd54566975/dwn-sdk-js'; +import { DidKeyResolver, SubscriptionRequest } from '@tbd54566975/dwn-sdk-js'; import { Jws } from '@tbd54566975/dwn-sdk-js'; -import type { SubscriptionController } from '../src/subscription-manager.js'; -import { SubscriptionManager } from '../src/subscription-manager.js'; import { assert } from 'chai'; import { createProfile } from './utils.js'; import type { Profile } from './utils.js'; import { WsApi } from '../src/ws-api.js'; +import { createJsonRpcRequest } from '../src/lib/json-rpc.js'; +import { clear as clearDwn, dwn } from './test-dwn.js'; describe('Subscription Manager Test', async () => { - let subscriptionManager: SubscriptionController; - let wsServer: WebSocketServer; let server: http.Server; - let dataStore: DataStoreLevel; - let eventLog: EventLogLevel; - let messageStore: MessageStoreLevel; + let wsServer: WebSocketServer; let alice: Profile; - let dwn: Dwn; let socket: WebSocket; before(async () => { - // Setup data stores... - dataStore = new DataStoreLevel({ - blockstoreLocation: 'data/DATASTORE', - }); - eventLog = new EventLogLevel({ location: 'data/EVENTLOG' }); - messageStore = new MessageStoreLevel({ - blockstoreLocation: 'data/MESSAGESTORE', - indexLocation: 'data/INDEX', - }); - - // create profile - alice = await createProfile(); - // create Dwn - dwn = await Dwn.create({ eventLog, dataStore, messageStore }); - // create listeners... server = http.createServer(); server.listen(9002, '127.0.0.1'); + const wsApi = new WsApi(server, dwn); wsServer = wsApi.start(); - + alice = await createProfile(); + // starts the ws server // create subscription manager... - subscriptionManager = new SubscriptionManager({ - dwn: dwn, - messageStore: messageStore, - tenant: alice.did, - wss: wsServer, - }); return; }); // before each, clear the subscriptions beforeEach(async () => { - subscriptionManager.clear(); - await dataStore.clear(); - await eventLog.clear(); - await messageStore.clear(); + // subscriptionManager.clear(); + }); + + afterEach(async () => { + await clearDwn(); }); // close at the end after(async () => { - await subscriptionManager.close(); + //await subscriptionManager.close(); wsServer.close(); server.close(); server.closeAllConnections(); - socket.close(); + if (socket) { + socket.close(); + } }); it('test subscription manager registration', async () => { try { const signer = await DidKeyResolver.generate(); - - // create a subscription request const req = await SubscriptionRequest.create({ signer: Jws.createSigner(signer), }); - // setup a socket connection to wsServer - const socket = new WebSocket(wsServer.address.toString()); - socket.onopen = async (): Promise => { - console.log('sending req', req); - // send a subscription request - // const subscription = await subscriptionManager.subscribe({ - // from: alice.did, - // subscriptionRequestMessage: req, - // permissionGrant: 'asdf', - // }); - socket.send('subscription request'); - return; - }; + const port = (wsServer.address() as AddressInfo).port; + const ip = (wsServer.address() as AddressInfo).address; + const addr = `ws://${ip}:${port}`; + const socket = new WebSocket(addr); - socket.onmessage = (event): Promise => { - console.log('got message', event); - return; - }; + const socketPromise = new Promise((resolve, reject) => { + // set up lisetner... + socket.onmessage = (event): Promise => { + try { + console.log('got message'); + resolve(event); + return; + } catch (error) { + reject(error); + } + }; + + socket.onerror = (error): void => { + reject(error); // Reject the promise if there's an error with the socket + }; + + socket.onclose = (event): void => { + if (event.wasClean) { + console.log( + `Connection closed cleanly, code=${event.code}, reason=${event.reason}`, + ); + } else { + console.error(`Connection abruptly closed`); + } + reject(new Error(`Connection closed: ${event.reason}`)); // Reject the promise on socket close + }; + + socket.onopen = async (): Promise => { + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest( + requestId, + 'dwn.processMessage', + { + message: req.toJSON(), + target: alice.did, + }, + ); + try { + if (socket.readyState !== WebSocket.OPEN) { + reject(new Error('socket not open')); + } + socket.send(JSON.stringify(dwnRequest)); + } catch (error) { + reject(error); + } + return; + }; + }); + await socketPromise; } catch (error) { - assert.fail(error, undefined, 'failed to register subscription'); + assert.fail(error, undefined, 'failed to register subscription' + error); } }); }); diff --git a/tests/test-dwn.ts b/tests/test-dwn.ts index 515ece5..3354982 100644 --- a/tests/test-dwn.ts +++ b/tests/test-dwn.ts @@ -1,13 +1,15 @@ import { - Dwn, DataStoreLevel, + Dwn, EventLogLevel, MessageStoreLevel, } from '@tbd54566975/dwn-sdk-js'; -const dataStore = new DataStoreLevel({ blockstoreLocation: 'data/DATASTORE' }); +export const dataStore = new DataStoreLevel({ + blockstoreLocation: 'data/DATASTORE', +}); const eventLog = new EventLogLevel({ location: 'data/EVENTLOG' }); -const messageStore = new MessageStoreLevel({ +export const messageStore = new MessageStoreLevel({ blockstoreLocation: 'data/MESSAGESTORE', indexLocation: 'data/INDEX', });