Skip to content

Commit

Permalink
adding subscription manager
Browse files Browse the repository at this point in the history
  • Loading branch information
andorsk committed Sep 27, 2023
1 parent 06fc2bc commit d1126c8
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ export const config = {

// log level - trace/debug/info/warn/error
logLevel: process.env.DWN_SERVER_LOG_LEVEL || 'INFO',

subscriptionsEnabled:
{ on: true, off: false }[process.env.SUBSCRIPTIONS] ?? true,
// where to store persistant data
};
174 changes: 174 additions & 0 deletions src/subscription-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import type { Dwn, SubscriptionFilter } from '@tbd54566975/dwn-sdk-js';
import type { EventMessage, PermissionsGrant } from '@tbd54566975/dwn-sdk-js';
import type {
MessageStore,
SubscriptionRequestReply,
} from '@tbd54566975/dwn-sdk-js';

import type { JsonRpcSuccessResponse } from './lib/json-rpc.js';
import { SubscriptionRequest } from '@tbd54566975/dwn-sdk-js';
import type WebSocket from 'ws';
import { WebSocketServer } from 'ws';
import { v4 as uuidv4 } from 'uuid';

export class Subscription {
from?: string;
subscriptionId: string;
createdAt: string;
description: string;
filters?: SubscriptionFilter[];
permissionGrant: PermissionsGrant;
connection: WebSocket;
}

export interface SubscriptionController {
clear(): Promise<void>;
close(): Promise<void>;
start(): Promise<void>;
subscribe(
request: RegisterSubscriptionRequest,
): Promise<RegisterSubscriptionReply>;
}

export type RegisterSubscriptionRequest = {
from: string;
socket: WebSocket;
filters?: SubscriptionFilter[];
permissionGrant: PermissionsGrant;
subscriptionRequestMessage: SubscriptionRequest;
};

export type RegisterSubscriptionReply = {
reply: SubscriptionRequestReply;
subscriptionId?: string;
};

export type defaultSubscriptionChannel = 'event';

export type SubscriptionManagerOptions = {
subscriptionChannel: string;
wss: WebSocketServer;
dwn: Dwn;
messageStore: MessageStore;
tenant: string;
};

export class SubscriptionManager {
private wss: WebSocketServer;
private dwn: Dwn;
private connections: Map<string, Subscription>;
private messageStore: MessageStore;
private tenant: string;
options: SubscriptionManagerOptions;
#open: boolean;

constructor(options?: SubscriptionManagerOptions) {
this.wss = options?.wss || new WebSocketServer();
this.connections = new Map();
this.messageStore = options?.messageStore;
this.tenant = options?.tenant;
this.dwn = options?.dwn;
this.options = options;

this.wss.on('connection', (socket: WebSocket) => {
socket.on('subscribe', async (data) => {
await this.handleSubscribe(socket, data);
});
});
}

async clear(): Promise<void> {
this.wss.removeAllListeners();
this.connections.clear();
}

async close(): Promise<void> {
this.#open = false;
this.connections.clear();
this.wss.close();
}

async open(): Promise<void> {
this.#open = true;
}

async start(): Promise<void> {
this.open();
}

private async createSubscription(
from: string,
request: RegisterSubscriptionRequest,
): Promise<Subscription> {
return {
from,
subscriptionId: uuidv4(),
createdAt: new Date().toISOString(),
description: 'subscription',
filters: request.filters,
permissionGrant: request.permissionGrant,
connection: request.socket,
};
}

async handleSubscribe(
socket: WebSocket,
data: any,
): Promise<RegisterSubscriptionReply> {
// parse message
const req = SubscriptionRequest.parse(data);
return await this.subscribe(req, socket);
}

createJSONRPCEvent(e: EventMessage): JsonRpcSuccessResponse {
return {
id: uuidv4(),
jsonrpc: '2.0',
result: e,
};
}

async subscribe(
req: RegisterSubscriptionRequest,
socket: WebSocket,
): Promise<RegisterSubscriptionReply> {
const subscriptionReply = await this.dwn.handleSubscriptionRequest(
this.tenant,
req.subscriptionRequestMessage,
);
if (subscriptionReply.status.code !== 200) {
return { reply: subscriptionReply };
}
const subscription = await this.createSubscription(req.from, req);
this.registerSubscription(subscription);
// set up forwarding.
subscriptionReply.subscription.emitter.on(
async (e: EventMessage): Promise<void> => {
const jsonRpcResponse = this.createJSONRPCEvent(e);
const str = JSON.stringify(jsonRpcResponse);
return socket.send(Buffer.from(str));
},
);
}

private async registerSubscription(
subscription: Subscription,
): Promise<void> {
if (!this.#open) {
throw new Error("Can't register subscription. It's not opened.");
}
if (this.connections.has(subscription.subscriptionId)) {
throw new Error(
'Failed to add connection to controller. ID already exists.',
);
}
this.connections.set(subscription.subscriptionId, subscription);
subscription.connection.on('close', () => {
this.deleteSubscription(subscription.subscriptionId);
});
}

private async deleteSubscription(id: string): Promise<void> {
this.connections.delete(id);
}
}
46 changes: 46 additions & 0 deletions tests/subscription-manager.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { assert } from 'chai';
import { createProfile } from './utils.js';
import { Jws } from '@tbd54566975/dwn-sdk-js';
import type { SubscriptionController } from '../src/subscription-manager.js';
import { SubscriptionManager } from '../src/subscription-manager.js';

describe('Subscription Manager Test', () => {
let subscriptionManager: SubscriptionController;

// important to follow the `before` and `after` pattern to initialize and clean the stores in tests
// so that different test suites can reuse the same backend store for testing
before(async () => {
subscriptionManager = new SubscriptionManager({});
});

// before each, clear the subscriptions
beforeEach(async () => {
subscriptionManager.clear();
});

// close at the end
after(async () => {
await subscriptionManager.close();
});

it('test subscription manager registration', async () => {
try {
const alice = await createProfile();
const req = await SubscriptionRequest.create({
filter: {
eventType: EventType.Operation,
},
authorizationSignatureInput: Jws.createSignatureInput(alice),
});
const subscription = await subscriptionManager.subscribe({
from: alice.did,
subscriptionRequestMessage: req,
permissionGrant: 'asdf',
});
assert.isDefined(subscription.reply);
assert.isDefined(subscription.subscriptionId);
} catch (error) {
assert.fail(error, undefined, 'failed to register subscription');
}
});
});
14 changes: 14 additions & 0 deletions tests/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
DidKeyResolver,
PrivateKeySigner,
RecordsWrite,
SubscriptionRequest,
} from '@tbd54566975/dwn-sdk-js';

// __filename and __dirname are not defined in ES module scope
Expand Down Expand Up @@ -67,6 +68,19 @@ export type GenerateProtocolsConfigureOutput = {
dataStream: Readable | undefined;
};

export type CreateSubscriptionRequestOverride = {};

export async function createSubscriptionRequest(
signer: Profile,
overrides: CreateSubscriptionRequestOverride,
): Promise<SubscriptionRequest> {
console.log(overrides);
const subscriptionRequest = await SubscriptionRequest.create({
authorizationSignatureInput: signer.signatureInput,
});
return subscriptionRequest;
}

export async function createRecordsWriteMessage(
signer: Profile,
overrides: CreateRecordsWriteOverrides = {},
Expand Down

0 comments on commit d1126c8

Please sign in to comment.