Skip to content

Commit

Permalink
initial subscriptions commit
Browse files Browse the repository at this point in the history
  • Loading branch information
andorsk committed Sep 27, 2023
1 parent 00ed815 commit 402dbb7
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 8 deletions.
4 changes: 4 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ export const config = {

// log level - trace/debug/info/warn/error
logLevel: process.env.DWN_SERVER_LOG_LEVEL || 'INFO',

subscriptionsEnabled:
{ on: true, off: false }[process.env.SUBSCRIPTIONS] ?? true,
// where to store persistant data
};
2 changes: 1 addition & 1 deletion src/dwn-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class DwnServer {
this.httpServerShutdownHandler = new HttpServerShutdownHandler(httpServer);

if (this.config.webSocketServerEnabled) {
const wsServer = new WsApi(httpServer, this.dwn);
const wsServer = new WsApi(httpServer, this.dwn, this.config);
wsServer.listen();
}
}
Expand Down
174 changes: 174 additions & 0 deletions src/subscription-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import type { JsonRpcSuccessResponse } from './lib/json-rpc.js';
import { SubscriptionRequestMessage } from '@tbd54566975/dwn-sdk-js';
import { v4 as uuidv4 } from 'uuid';
import type WebSocket from 'ws';
import { WebSocketServer } from 'ws';

import type { Dwn, SubscriptionFilter } from '@tbd54566975/dwn-sdk-js';
import type { EventMessage, PermissionsGrant } from '@tbd54566975/dwn-sdk-js';
import type {
MessageStore,
SubscriptionRequestReply,
} from '@tbd54566975/dwn-sdk-js';

export class Subscription {
from?: string;
subscriptionId: string;
createdAt: string;
description: string;
filters?: SubscriptionFilter[];
permissionGrant: PermissionsGrant;
connection: WebSocket;
}

export interface SubscriptionController {
clear(): Promise<void>;
close(): Promise<void>;
start(): Promise<void>;
subscribe(
request: RegisterSubscriptionRequest,
): Promise<RegisterSubscriptionReply>;
}

export type RegisterSubscriptionRequest = {
from: string;
socket: WebSocket;
filters?: SubscriptionFilter[];
permissionGrant: PermissionsGrant;
subscriptionRequestMessage: SubscriptionRequestMessage;
};

export type RegisterSubscriptionReply = {
reply: SubscriptionRequestReply;
subscriptionId?: string;
};

export type defaultSubscriptionChannel = 'event';

export type SubscriptionManagerOptions = {
subscriptionChannel: string;
wss: WebSocketServer;
dwn: Dwn;
messageStore: MessageStore;
tenant: string;
};

export class SubscriptionManager {
private wss: WebSocketServer;
private dwn: Dwn;
private connections: Map<string, Subscription>;
private messageStore: MessageStore;
private tenant: string;
options: SubscriptionManagerOptions;
#open: boolean;

constructor(options?: SubscriptionManagerOptions) {
this.wss = options?.wss || new WebSocketServer();
this.connections = new Map();
this.messageStore = options?.messageStore;
this.tenant = options?.tenant;
this.dwn = options?.dwn;
this.options = options;

this.wss.on('connection', (socket: WebSocket) => {
socket.on('subscribe', async (data) => {
await this.handleSubscribe(socket, data);
});
});
}

async clear(): Promise<void> {
this.wss.removeAllListeners();
this.connections.clear();
}

async close(): Promise<void> {
this.#open = false;
this.connections.clear();
this.wss.close();
}

async open(): Promise<void> {
this.#open = true;
}

async start(): Promise<void> {
this.open();
}

private async createSubscription(
from: string,
request: RegisterSubscriptionRequest,
): Promise<Subscription> {
return {
from,
subscriptionId: uuidv4(),
createdAt: new Date().toISOString(),
description: 'subscription',
filters: request.filters,
permissionGrant: request.permissionGrant,
connection: request.socket,
};
}

async handleSubscribe(
socket: WebSocket,
data: any,
): Promise<RegisterSubscriptionReply> {
// parse message
const req = SubscriptionRequestMessage.parse(data);
return await this.subscribe(req, socket);
}

createJSONRPCEvent(e: EventMessage): JsonRpcSuccessResponse {
return {
id: uuidv4(),
jsonrpc: '2.0',
result: e,
};
}

async subscribe(
req: RegisterSubscriptionRequest,
socket: WebSocket,
): Promise<RegisterSubscriptionReply> {
const subscriptionReply = await this.dwn.handleSubscriptionRequest(
this.tenant,
req.subscriptionRequestMessage,
);
if (subscriptionReply.status.code !== 200) {
return { reply: subscriptionReply };
}
const subscription = await this.createSubscription(req.from, req);
this.registerSubscription(subscription);
// set up forwarding.
subscriptionReply.subscription.emitter.on(
async (e: EventMessage): Promise<void> => {
const jsonRpcResponse = this.createJSONRPCEvent(e);
const str = JSON.stringify(jsonRpcResponse);
return socket.send(Buffer.from(str));
},
);
}

private async registerSubscription(
subscription: Subscription,
): Promise<void> {
if (!this.#open) {
throw new Error("Can't register subscription. It's not opened.");
}
if (this.connections.has(subscription.subscriptionId)) {
throw new Error(
'Failed to add connection to controller. ID already exists.',
);
}
this.connections.set(subscription.subscriptionId, subscription);
subscription.connection.on('close', () => {
this.deleteSubscription(subscription.subscriptionId);
});
}

private async deleteSubscription(id: string): Promise<void> {
this.connections.delete(id);
}
}
21 changes: 14 additions & 7 deletions src/ws-api.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import type { Dwn } from '@tbd54566975/dwn-sdk-js';
import type { JsonRpcResponse } from './lib/json-rpc.js';
import type { RequestContext } from './lib/json-rpc-router.js';
import type { Server } from 'http';
import type { AddressInfo, WebSocket } from 'ws';

import { base64url } from 'multiformats/bases/base64';
import type { Config } from './config.js';
import { DataStream } from '@tbd54566975/dwn-sdk-js';
import type { Dwn } from '@tbd54566975/dwn-sdk-js';
import { jsonRpcApi } from './json-rpc-api.js';
import type { JsonRpcResponse } from './lib/json-rpc.js';
import type { RequestContext } from './lib/json-rpc-router.js';
import { requestCounter } from './metrics.js';
import type { Server } from 'http';
import { SubscriptionManager } from './subscription-manager.js';
import { v4 as uuidv4 } from 'uuid';
import { WebSocketServer } from 'ws';

import type { AddressInfo, WebSocket } from 'ws';
import {
createJsonRpcErrorResponse,
JsonRpcErrorCodes,
Expand All @@ -20,10 +22,14 @@ const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive');
export class WsApi {
wsServer: WebSocketServer;
dwn: Dwn;
config?: Config;
subscriptionManager: SubscriptionManager;

constructor(server: Server, dwn: Dwn) {
constructor(server: Server, dwn: Dwn, config: Config) {
this.dwn = dwn;
this.wsServer = new WebSocketServer({ server: server });
this.config = config;
this.subscriptionManager = new SubscriptionManager();
}

// TODO: github.com/TBD54566975/dwn-server/issues/49 Add code coverage tracker, similar to either dwn-sdk-js or to web5-js
Expand Down Expand Up @@ -98,6 +104,7 @@ export class WsApi {
transport: 'ws',
dataStream: requestDataStream,
};

const { jsonRpcResponse } = await jsonRpcApi.handle(
dwnRequest,
requestContext,
Expand Down
14 changes: 14 additions & 0 deletions tests/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
DataStream,
DidKeyResolver,
RecordsWrite,
SubscriptionRequest,
} from '@tbd54566975/dwn-sdk-js';

// __filename and __dirname are not defined in ES module scope
Expand Down Expand Up @@ -69,6 +70,19 @@ export type GenerateProtocolsConfigureOutput = {
dataStream: Readable | undefined;
};

export type CreateSubscriptionRequestOverride = {};

export async function createSubscriptionRequest(
signer: Profile,
overrides: CreateSubscriptionRequestOverride,
): Promise<SubscriptionRequest> {
console.log(overrides);
const subscriptionRequest = await SubscriptionRequest.create({
authorizationSignatureInput: signer.signatureInput,
});
return subscriptionRequest;
}

export async function createRecordsWriteMessage(
signer: Profile,
overrides: CreateRecordsWriteOverrides = {},
Expand Down

0 comments on commit 402dbb7

Please sign in to comment.