diff --git a/src/json-rpc-handlers/dwn/process-message.ts b/src/json-rpc-handlers/dwn/process-message.ts index 183e83c..a9a1c63 100644 --- a/src/json-rpc-handlers/dwn/process-message.ts +++ b/src/json-rpc-handlers/dwn/process-message.ts @@ -1,4 +1,14 @@ -import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js'; +import { v4 as uuidv4 } from 'uuid'; +import type { Readable as IsomorphicReadable } from 'readable-stream'; +import type { + RecordsReadReply, + SubscriptionRequestReply, +} from '@tbd54566975/dwn-sdk-js'; +import { + DwnInterfaceName, + DwnMethodName, + SubscriptionRequest, +} from '@tbd54566975/dwn-sdk-js'; import type { HandlerResponse, JsonRpcHandler, @@ -9,11 +19,6 @@ import { createJsonRpcSuccessResponse, } from '../../lib/json-rpc.js'; -import type { Readable as IsomorphicReadable } from 'readable-stream'; -import type { RecordsReadReply } from '@tbd54566975/dwn-sdk-js'; -import type { SubscriptionRequestReply } from '@tbd54566975/dwn-sdk-js'; -import { v4 as uuidv4 } from 'uuid'; - export const handleDwnProcessMessage: JsonRpcHandler = async ( dwnRequest, context, @@ -21,16 +26,12 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( const { dwn, dataStream } = context; const { target, message } = dwnRequest.params; const requestId = dwnRequest.id ?? uuidv4(); - try { - let reply; + let reply: any; + const messageType = message?.descriptor?.interface + message?.descriptor?.method; - // When a record is deleted via `RecordsDelete`, the initial RecordsWrite is kept as a tombstone _in addition_ - // to the RecordsDelete message. the data associated to that initial RecordsWrite is deleted. If a record was written - // _and_ deleted before it ever got to dwn-server, we end up in a situation where we still need to process the tombstone - // so that we can process the RecordsDelete. if ( messageType === DwnInterfaceName.Records + DwnMethodName.Write && !dataStream @@ -44,18 +45,25 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( target, message, )) as SubscriptionRequestReply; - if (!context.subscriptionManager || context.socket) { + if (!context.subscriptionManager || !context.socket) { throw new Error( 'setup failure. improper context provided for subscription', ); } + + // FIXME: How to handle subscription requests? + const request = await SubscriptionRequest.create({}); const req = { socket: context.socket, - from: dwnRequest.params?.descriptor, - request: {}, + from: message.descriptor.author, + request: request, }; - const subscription = await context.subscriptionManager.subscribe(req); - console.log(subscription); + reply = await context.subscriptionManager.subscribe(req); + const jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { + reply, + }); + const responsePayload: HandlerResponse = { jsonRpcResponse }; + return responsePayload; } else { reply = (await dwn.processMessage( target, @@ -63,7 +71,7 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( dataStream as IsomorphicReadable, )) as RecordsReadReply; } - // RecordsRead messages return record data as a stream to for accommodate large amounts of data + let recordDataStream; if (reply?.record?.data !== undefined) { recordDataStream = reply.record.data; @@ -83,7 +91,6 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( JsonRpcErrorCodes.InternalError, e.message, ); - return { jsonRpcResponse } as HandlerResponse; } }; diff --git a/src/subscription-manager.ts b/src/subscription-manager.ts index 58e02c9..0a93828 100644 --- a/src/subscription-manager.ts +++ b/src/subscription-manager.ts @@ -28,15 +28,15 @@ export interface SubscriptionController { } export type RegisterSubscriptionRequest = { - from: string; - socket: WebSocket; - filters?: SubscriptionFilter[]; - permissionGrant?: PermissionsGrant; - request: SubscriptionRequest; + from: string; // from connection + socket: WebSocket; // socket connection + filters?: SubscriptionFilter[]; // filters, if applicable + permissionGrant?: PermissionsGrant; //permission grant, if applicable + request: SubscriptionRequest; // subscription request }; export type RegisterSubscriptionReply = { - reply: SubscriptionRequestReply; + reply?: SubscriptionRequestReply; subscriptionId?: string; }; @@ -52,21 +52,17 @@ export class SubscriptionManager { private wss: WebSocketServer; private dwn: Dwn; private connections: Map; - private tenant: string; options: SubscriptionManagerOptions; #open: boolean; constructor(options?: SubscriptionManagerOptions) { this.wss = options?.wss || new WebSocketServer(); this.connections = new Map(); - this.tenant = options?.tenant; this.dwn = options?.dwn; this.options = options; this.wss.on('connection', (socket: WebSocket) => { - console.log('connected'); socket.on('message', async (data) => { - console.log('got message...'); await this.handleSubscribe(socket, data); }); }); @@ -132,7 +128,7 @@ export class SubscriptionManager { req: RegisterSubscriptionRequest, ): Promise { const subscriptionReply = await this.dwn.handleSubscriptionRequest( - this.tenant, + req.from, req.request.message, ); if (subscriptionReply.status.code !== 200) { @@ -148,6 +144,10 @@ export class SubscriptionManager { return req.socket.send(Buffer.from(str)); }, ); + return { + reply: subscriptionReply, + subscriptionId: subscription?.subscriptionId, + } as RegisterSubscriptionReply; } private async registerSubscription( diff --git a/src/ws-api.ts b/src/ws-api.ts index 6809f71..3b67b04 100644 --- a/src/ws-api.ts +++ b/src/ws-api.ts @@ -50,6 +50,7 @@ export class WsApi { */ #handleConnection(socket: WebSocket, _request: IncomingMessage): void { const dwn = this.dwn; + const subscriptionManager = this.#subscriptionManager; socket[SOCKET_ISALIVE_SYMBOL] = true; @@ -107,6 +108,8 @@ export class WsApi { dwn, transport: 'ws', dataStream: requestDataStream, + subscriptionManager: subscriptionManager, + socket: socket, }; const { jsonRpcResponse } = await jsonRpcApi.handle( diff --git a/tests/subscription-manager.spec.ts b/tests/subscription-manager.spec.ts index 9f84fd9..cd3d78e 100644 --- a/tests/subscription-manager.spec.ts +++ b/tests/subscription-manager.spec.ts @@ -68,7 +68,10 @@ describe('Subscription Manager Test', async () => { // set up lisetner... socket.onmessage = (event): Promise => { try { - console.log('got message'); + const resp = JSON.parse(event.data.toString()); + if (resp.error) { + throw new Error(resp.error.message); + } resolve(event); return; } catch (error) { @@ -92,6 +95,7 @@ describe('Subscription Manager Test', async () => { }; socket.onopen = async (): Promise => { + // on open const requestId = uuidv4(); const dwnRequest = createJsonRpcRequest( requestId, @@ -101,6 +105,7 @@ describe('Subscription Manager Test', async () => { target: alice.did, }, ); + try { if (socket.readyState !== WebSocket.OPEN) { reject(new Error('socket not open'));