Skip to content

Commit

Permalink
updated message processing and added handlers to ws-api
Browse files Browse the repository at this point in the history
  • Loading branch information
andorsk committed Sep 28, 2023
1 parent 3547c74 commit 2c16e0c
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 87 deletions.
35 changes: 27 additions & 8 deletions src/json-rpc-handlers/dwn/process-message.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import type { Readable as IsomorphicReadable } from 'readable-stream';
import type { RecordsReadReply } from '@tbd54566975/dwn-sdk-js';
import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js';
import type {
HandlerResponse,
JsonRpcHandler,
} from '../../lib/json-rpc-router.js';

import { v4 as uuidv4 } from 'uuid';
import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js';

import {
JsonRpcErrorCodes,
createJsonRpcErrorResponse,
createJsonRpcSuccessResponse,
JsonRpcErrorCodes,
} from '../../lib/json-rpc.js';

import type { Readable as IsomorphicReadable } from 'readable-stream';
import type { RecordsReadReply } from '@tbd54566975/dwn-sdk-js';
import type { SubscriptionRequestReply } from '@tbd54566975/dwn-sdk-js';

Check failure on line 14 in src/json-rpc-handlers/dwn/process-message.ts

View workflow job for this annotation

GitHub Actions / test

Module '"@tbd54566975/dwn-sdk-js"' has no exported member 'SubscriptionRequestReply'.
import { v4 as uuidv4 } from 'uuid';

export const handleDwnProcessMessage: JsonRpcHandler = async (
dwnRequest,
context,
Expand All @@ -36,14 +36,33 @@ export const handleDwnProcessMessage: JsonRpcHandler = async (
!dataStream
) {
reply = await dwn.synchronizePrunedInitialRecordsWrite(target, message);
} else if (
messageType ===
DwnInterfaceName.Subscriptions + DwnMethodName.Request

Check failure on line 41 in src/json-rpc-handlers/dwn/process-message.ts

View workflow job for this annotation

GitHub Actions / test

Property 'Subscriptions' does not exist on type 'typeof DwnInterfaceName'.
) {
reply = (await dwn.processMessage(
target,
message,
)) as SubscriptionRequestReply;
if (!context.subscriptionManager || context.socket) {
throw new Error(
'setup failure. improper context provided for subscription',
);
}
const req = {
socket: context.socket,
from: dwnRequest.params?.descriptor,
request: {},
};
const subscription = await context.subscriptionManager.subscribe(req);
console.log(subscription);
} else {
reply = (await dwn.processMessage(
target,
message,
dataStream as IsomorphicReadable,
)) as RecordsReadReply;
}

// RecordsRead messages return record data as a stream to for accommodate large amounts of data
let recordDataStream;
if (reply?.record?.data !== undefined) {
Expand Down
7 changes: 6 additions & 1 deletion src/lib/json-rpc-router.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js';

import type { Dwn } from '@tbd54566975/dwn-sdk-js';
import type { Readable } from 'node:stream';
import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js';
import type { SubscriptionController } from '../subscription-manager.js';
import type { WebSocket } from 'ws';

export type RequestContext = {
dwn: Dwn;
transport: 'http' | 'ws';
dataStream?: Readable;
socket?: WebSocket;
subscriptionManager?: SubscriptionController;
};

export type HandlerResponse = {
Expand Down
8 changes: 3 additions & 5 deletions src/subscription-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import type { Dwn, SubscriptionFilter } from '@tbd54566975/dwn-sdk-js';
import type { EventMessage, PermissionsGrant } from '@tbd54566975/dwn-sdk-js';

Check failure on line 2 in src/subscription-manager.ts

View workflow job for this annotation

GitHub Actions / test

'"@tbd54566975/dwn-sdk-js"' has no exported member named 'EventMessage'. Did you mean 'EventsGetMessage'?

import type { JsonRpcSuccessResponse } from './lib/json-rpc.js';
import type { MessageStore } from '@tbd54566975/dwn-sdk-js';
import { SubscriptionRequest } from '@tbd54566975/dwn-sdk-js';

Check failure on line 5 in src/subscription-manager.ts

View workflow job for this annotation

GitHub Actions / test

Module '"@tbd54566975/dwn-sdk-js"' has no exported member 'SubscriptionRequest'.
import type { SubscriptionRequestReply } from '@tbd54566975/dwn-sdk-js';

Check failure on line 6 in src/subscription-manager.ts

View workflow job for this annotation

GitHub Actions / test

Module '"@tbd54566975/dwn-sdk-js"' has no exported member 'SubscriptionRequestReply'.
import type WebSocket from 'ws';
Expand Down Expand Up @@ -46,29 +45,28 @@ export type defaultSubscriptionChannel = 'event';
export type SubscriptionManagerOptions = {
wss?: WebSocketServer;
dwn: Dwn;
messageStore: MessageStore;
tenant: string;
};

export class SubscriptionManager {
private wss: WebSocketServer;
private dwn: Dwn;
private connections: Map<string, Subscription>;
private messageStore: MessageStore;
private tenant: string;
options: SubscriptionManagerOptions;
#open: boolean;

constructor(options?: SubscriptionManagerOptions) {
this.wss = options?.wss || new WebSocketServer();
this.connections = new Map();
this.messageStore = options?.messageStore;
this.tenant = options?.tenant;
this.dwn = options?.dwn;
this.options = options;

this.wss.on('connection', (socket: WebSocket) => {
socket.on('subscribe', async (data) => {
console.log('connected');
socket.on('message', async (data) => {
console.log('got message...');
await this.handleSubscribe(socket, data);
});
});
Expand Down
18 changes: 12 additions & 6 deletions src/ws-api.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { base64url } from 'multiformats/bases/base64';
import { v4 as uuidv4 } from 'uuid';
import { DataStream, type Dwn } from '@tbd54566975/dwn-sdk-js';
import type { IncomingMessage, Server } from 'http';
import { type IncomingMessage, type Server } from 'http';
import { type AddressInfo, type WebSocket, WebSocketServer } from 'ws';

import { jsonRpcApi } from './json-rpc-api.js';
Expand All @@ -12,17 +12,27 @@ import {
JsonRpcErrorCodes,
type JsonRpcResponse,
} from './lib/json-rpc.js';
import {
SubscriptionManager,
type SubscriptionController,
} from './subscription-manager.js';

const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive');
const HEARTBEAT_INTERVAL = 30_000;

export class WsApi {
#wsServer: WebSocketServer;
dwn: Dwn;
#subscriptionManager: SubscriptionController;

constructor(server: Server, dwn: Dwn) {
this.dwn = dwn;
this.#wsServer = new WebSocketServer({ server });
this.#subscriptionManager = new SubscriptionManager({
dwn: dwn,
tenant: 'asdf',
wss: this.#wsServer,
});
}

// TODO: github.com/TBD54566975/dwn-server/issues/49 Add code coverage tracker, similar to either dwn-sdk-js or to web5-js
Expand Down Expand Up @@ -63,7 +73,6 @@ export class WsApi {

socket.on('message', async function (dataBuffer) {
let dwnRequest;

try {
// deserialize bytes into JSON object
dwnRequest = dataBuffer.toString();
Expand All @@ -77,15 +86,13 @@ export class WsApi {
const responseBuffer = WsApi.jsonRpcResponseToBuffer(jsonRpcResponse);
return socket.send(responseBuffer);
}

dwnRequest = JSON.parse(dwnRequest);
} catch (e) {
const jsonRpcResponse = createJsonRpcErrorResponse(
uuidv4(),
JsonRpcErrorCodes.BadRequest,
e.message,
);

const responseBuffer = WsApi.jsonRpcResponseToBuffer(jsonRpcResponse);
return socket.send(responseBuffer);
}
Expand All @@ -101,6 +108,7 @@ export class WsApi {
transport: 'ws',
dataStream: requestDataStream,
};

const { jsonRpcResponse } = await jsonRpcApi.handle(
dwnRequest,
requestContext,
Expand Down Expand Up @@ -140,9 +148,7 @@ export class WsApi {

#setupWebSocket(): void {
this.#wsServer.on('connection', this.#handleConnection.bind(this));

const heartbeatInterval = this.#setupHeartbeat();

this.#wsServer.on('close', function close() {
clearInterval(heartbeatInterval);
});
Expand Down
136 changes: 72 additions & 64 deletions tests/subscription-manager.spec.ts
Original file line number Diff line number Diff line change
@@ -1,112 +1,120 @@
import http from 'node:http';
import type { AddressInfo } from 'ws';
import { WebSocket, type WebSocketServer } from 'ws';
import { v4 as uuidv4 } from 'uuid';

import {
DataStoreLevel,
DidKeyResolver,
Dwn,
EventLogLevel,
MessageStoreLevel,
SubscriptionRequest,
} from '@tbd54566975/dwn-sdk-js';
import { DidKeyResolver, SubscriptionRequest } from '@tbd54566975/dwn-sdk-js';

Check failure on line 6 in tests/subscription-manager.spec.ts

View workflow job for this annotation

GitHub Actions / test

Module '"@tbd54566975/dwn-sdk-js"' has no exported member 'SubscriptionRequest'.

import { Jws } from '@tbd54566975/dwn-sdk-js';
import type { SubscriptionController } from '../src/subscription-manager.js';
import { SubscriptionManager } from '../src/subscription-manager.js';
import { assert } from 'chai';
import { createProfile } from './utils.js';
import type { Profile } from './utils.js';
import { WsApi } from '../src/ws-api.js';
import { createJsonRpcRequest } from '../src/lib/json-rpc.js';
import { clear as clearDwn, dwn } from './test-dwn.js';

describe('Subscription Manager Test', async () => {
let subscriptionManager: SubscriptionController;
let wsServer: WebSocketServer;
let server: http.Server;
let dataStore: DataStoreLevel;
let eventLog: EventLogLevel;
let messageStore: MessageStoreLevel;
let wsServer: WebSocketServer;
let alice: Profile;
let dwn: Dwn;
let socket: WebSocket;

before(async () => {
// Setup data stores...
dataStore = new DataStoreLevel({
blockstoreLocation: 'data/DATASTORE',
});
eventLog = new EventLogLevel({ location: 'data/EVENTLOG' });
messageStore = new MessageStoreLevel({
blockstoreLocation: 'data/MESSAGESTORE',
indexLocation: 'data/INDEX',
});

// create profile
alice = await createProfile();
// create Dwn
dwn = await Dwn.create({ eventLog, dataStore, messageStore });

// create listeners...
server = http.createServer();
server.listen(9002, '127.0.0.1');

const wsApi = new WsApi(server, dwn);
wsServer = wsApi.start();

alice = await createProfile();
// starts the ws server
// create subscription manager...
subscriptionManager = new SubscriptionManager({
dwn: dwn,
messageStore: messageStore,
tenant: alice.did,
wss: wsServer,
});
return;
});

// before each, clear the subscriptions
beforeEach(async () => {
subscriptionManager.clear();
await dataStore.clear();
await eventLog.clear();
await messageStore.clear();
// subscriptionManager.clear();
});

afterEach(async () => {
await clearDwn();
});

// close at the end
after(async () => {
await subscriptionManager.close();
//await subscriptionManager.close();
wsServer.close();
server.close();
server.closeAllConnections();
socket.close();
if (socket) {
socket.close();
}
});

it('test subscription manager registration', async () => {
try {
const signer = await DidKeyResolver.generate();

// create a subscription request
const req = await SubscriptionRequest.create({
signer: Jws.createSigner(signer),
});

// setup a socket connection to wsServer
const socket = new WebSocket(wsServer.address.toString());
socket.onopen = async (): Promise<void> => {
console.log('sending req', req);
// send a subscription request
// const subscription = await subscriptionManager.subscribe({
// from: alice.did,
// subscriptionRequestMessage: req,
// permissionGrant: 'asdf',
// });
socket.send('subscription request');
return;
};
const port = (wsServer.address() as AddressInfo).port;
const ip = (wsServer.address() as AddressInfo).address;
const addr = `ws://${ip}:${port}`;
const socket = new WebSocket(addr);

socket.onmessage = (event): Promise<void> => {
console.log('got message', event);
return;
};
const socketPromise = new Promise<any>((resolve, reject) => {
// set up lisetner...
socket.onmessage = (event): Promise<void> => {
try {
console.log('got message');
resolve(event);
return;
} catch (error) {
reject(error);
}
};

socket.onerror = (error): void => {
reject(error); // Reject the promise if there's an error with the socket
};

socket.onclose = (event): void => {
if (event.wasClean) {
console.log(
`Connection closed cleanly, code=${event.code}, reason=${event.reason}`,
);
} else {
console.error(`Connection abruptly closed`);
}
reject(new Error(`Connection closed: ${event.reason}`)); // Reject the promise on socket close
};

socket.onopen = async (): Promise<void> => {
const requestId = uuidv4();
const dwnRequest = createJsonRpcRequest(
requestId,
'dwn.processMessage',
{
message: req.toJSON(),
target: alice.did,
},
);
try {
if (socket.readyState !== WebSocket.OPEN) {
reject(new Error('socket not open'));
}
socket.send(JSON.stringify(dwnRequest));
} catch (error) {
reject(error);
}
return;
};
});
await socketPromise;
} catch (error) {
assert.fail(error, undefined, 'failed to register subscription');
assert.fail(error, undefined, 'failed to register subscription' + error);
}
});
});
Loading

0 comments on commit 2c16e0c

Please sign in to comment.