diff --git a/src/json-rpc-handlers/dwn/process-message.ts b/src/json-rpc-handlers/dwn/process-message.ts index a9a1c63..eeef9d1 100644 --- a/src/json-rpc-handlers/dwn/process-message.ts +++ b/src/json-rpc-handlers/dwn/process-message.ts @@ -36,6 +36,7 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( messageType === DwnInterfaceName.Records + DwnMethodName.Write && !dataStream ) { + console.log('sending'); reply = await dwn.synchronizePrunedInitialRecordsWrite(target, message); } else if ( messageType === diff --git a/src/subscription-manager.ts b/src/subscription-manager.ts index 0a93828..d3e1b2d 100644 --- a/src/subscription-manager.ts +++ b/src/subscription-manager.ts @@ -137,8 +137,10 @@ export class SubscriptionManager { const subscription = await this.createSubscription(req.from, req); this.registerSubscription(subscription); // set up forwarding. + // console.log('---------', subscriptionReply.subscription.emitter); subscriptionReply.subscription.emitter.on( async (e: EventMessage): Promise => { + // console.log('got a record', e); const jsonRpcResponse = this.createJSONRPCEvent(e); const str = JSON.stringify(jsonRpcResponse); return req.socket.send(Buffer.from(str)); diff --git a/tests/subscription-manager.spec.ts b/tests/subscription-manager.spec.ts index cd3d78e..11fa11f 100644 --- a/tests/subscription-manager.spec.ts +++ b/tests/subscription-manager.spec.ts @@ -3,15 +3,23 @@ import type { AddressInfo } from 'ws'; import { WebSocket, type WebSocketServer } from 'ws'; import { v4 as uuidv4 } from 'uuid'; -import { DidKeyResolver, SubscriptionRequest } from '@tbd54566975/dwn-sdk-js'; +import { + DataStream, + DidKeyResolver, + SubscriptionRequest, +} from '@tbd54566975/dwn-sdk-js'; import { Jws } from '@tbd54566975/dwn-sdk-js'; import { assert } from 'chai'; -import { createProfile } from './utils.js'; +import { createProfile, createRecordsWriteMessage } from './utils.js'; import type { Profile } from './utils.js'; import { WsApi } from '../src/ws-api.js'; import { createJsonRpcRequest } from '../src/lib/json-rpc.js'; import { clear as clearDwn, dwn } from './test-dwn.js'; +import { base64url } from 'multiformats/bases/base64'; +import { EventType } from '@tbd54566975/dwn-sdk-js'; +import { DwnInterfaceName } from '@tbd54566975/dwn-sdk-js'; +import { DwnMethodName } from '@tbd54566975/dwn-sdk-js'; describe('Subscription Manager Test', async () => { let server: http.Server; @@ -57,12 +65,16 @@ describe('Subscription Manager Test', async () => { const signer = await DidKeyResolver.generate(); const req = await SubscriptionRequest.create({ signer: Jws.createSigner(signer), + filter: { + eventType: EventType.Operation, + }, }); const port = (wsServer.address() as AddressInfo).port; const ip = (wsServer.address() as AddressInfo).address; const addr = `ws://${ip}:${port}`; const socket = new WebSocket(addr); + let receivedCount = 0; const socketPromise = new Promise((resolve, reject) => { // set up lisetner... @@ -72,7 +84,16 @@ describe('Subscription Manager Test', async () => { if (resp.error) { throw new Error(resp.error.message); } - resolve(event); + receivedCount += 1; + if ( + resp.result?.descriptor?.eventDescriptor?.interface === + DwnInterfaceName.Records && + resp.result?.descriptor?.eventDescriptor?.method === + DwnMethodName.Write + ) { + resolve(event); + socket.close(); + } return; } catch (error) { reject(error); @@ -114,10 +135,31 @@ describe('Subscription Manager Test', async () => { } catch (error) { reject(error); } + try { + const { recordsWrite, dataStream } = + await createRecordsWriteMessage(alice); + const dataBytes = await DataStream.toBytes(dataStream); + const encodedData = base64url.baseEncode(dataBytes); + + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest( + requestId, + 'dwn.processMessage', + { + message: recordsWrite.toJSON(), + target: alice.did, + encodedData, + }, + ); + socket.send(JSON.stringify(dwnRequest)); + } catch (error) { + reject(error); + } return; }; }); await socketPromise; + assert.equal(receivedCount, 2, 'received count'); } catch (error) { assert.fail(error, undefined, 'failed to register subscription' + error); }