Skip to content

Commit

Permalink
add JSONRPCSocket class to handle requests and long-running subscript…
Browse files Browse the repository at this point in the history
…ions over sockets, added test client helpers for unit testing, added some initial unit tests
  • Loading branch information
LiranCohen committed Feb 12, 2024
1 parent a98733a commit 56b4976
Show file tree
Hide file tree
Showing 6 changed files with 377 additions and 19 deletions.
4 changes: 1 addition & 3 deletions src/http-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ export class HttpApi {
registrationManager: RegistrationManager;
dwn: Dwn;

constructor(config: DwnServerConfig, dwn: Dwn, registrationManager: RegistrationManager) {
console.log(config);

constructor(config: DwnServerConfig, dwn: Dwn, registrationManager?: RegistrationManager) {
this.#config = config;
this.#api = express();
this.#server = http.createServer(this.#api);
Expand Down
89 changes: 89 additions & 0 deletions src/json-rpc-socket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { v4 as uuidv4 } from 'uuid';
import WebSocket from 'ws';

import type { JsonRpcRequest, JsonRpcResponse } from "./lib/json-rpc.js";

const CONNECT_TIMEOUT = 3_000;

export class JSONRPCSocket {
private isOpen = false;
constructor(private socket: WebSocket) {
socket.onopen = this.open;
}

static async connect(url: string): Promise<JSONRPCSocket> {

const onclose = ():void => {
console.log('json rpc close');
};

const onerror = (event: any):void => {
console.log('json rpc error', event);
};

const socket = new WebSocket(url);
socket.onclose = onclose;
socket.onerror = onerror;

return new Promise<JSONRPCSocket>((resolve, reject) => {
socket.on('open', () => {
resolve(new JSONRPCSocket(socket));
});

setTimeout(() => reject, CONNECT_TIMEOUT);
});
}

open(): void {
this.isOpen = true;
}

close(): void {
this.isOpen = false;
this.socket.close();
}

/**
* Sends a JSON-RPC request through the socket. You must subscribe to a message listener separately to capture the response.
*/
send(request: JsonRpcRequest):void {
return this.socket.send(Buffer.from(JSON.stringify(request)));
}

subscribe(request: JsonRpcRequest, listener: (response: JsonRpcResponse) => void): { close: () => void } {
request.id ??= uuidv4();

const messageHandler = (event: { data: any }):void => {
const jsonRpcResponse = JSON.parse(event.data.toString()) as JsonRpcResponse;
if (jsonRpcResponse.id === request.id) {
return listener(jsonRpcResponse);
}
};

this.socket.addEventListener('message', messageHandler);
this.send(request);

return {
close: ():void => {
this.socket.removeEventListener('message', messageHandler);
}
};
}

async request(request: JsonRpcRequest): Promise<JsonRpcResponse> {
return new Promise((resolve) => {
request.id ??= uuidv4();

const handleResponse = (event: { data: any }):void => {
const jsonRpsResponse = JSON.parse(event.data.toString()) as JsonRpcResponse;
if (jsonRpsResponse.id === request.id) {
this.socket.removeEventListener('message', handleResponse);
return resolve(jsonRpsResponse);
}
};

this.socket.addEventListener('message', handleResponse);
this.send(request);
});
}
}
2 changes: 1 addition & 1 deletion tests/http-api.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ describe('http api', function () {
const proofOfWorkInitialMaximumAllowedHash = config.registrationProofOfWorkInitialMaxHash;
registrationManager = await RegistrationManager.create({ registrationStoreUrl, termsOfServiceFilePath, proofOfWorkInitialMaximumAllowedHash });

dwn = await getTestDwn(registrationManager);
dwn = await getTestDwn({ tenantGate: registrationManager });

httpApi = new HttpApi(config, dwn, registrationManager);

Expand Down
12 changes: 8 additions & 4 deletions tests/test-dwn.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { TenantGate } from '@tbd54566975/dwn-sdk-js';
import { Dwn } from '@tbd54566975/dwn-sdk-js';
import { Dwn, EventEmitterStream } from '@tbd54566975/dwn-sdk-js';
import {
DataStoreSql,
EventLogSql,
Expand All @@ -8,20 +8,24 @@ import {

import { getDialectFromURI } from '../src/storage.js';

export async function getTestDwn(
tenantGate?: TenantGate
): Promise<Dwn> {
export async function getTestDwn(options: {
tenantGate?: TenantGate,
withEvents?: boolean,
} = {}): Promise<Dwn> {
const { tenantGate, withEvents = false } = options;
const db = getDialectFromURI(new URL('sqlite://'));
const dataStore = new DataStoreSql(db);
const eventLog = new EventLogSql(db);
const messageStore = new MessageStoreSql(db);
const eventStream = withEvents ? new EventEmitterStream() : undefined;

let dwn: Dwn;
try {
dwn = await Dwn.create({
eventLog,
dataStore,
messageStore,
eventStream,
tenantGate,
});
} catch (e) {
Expand Down
131 changes: 130 additions & 1 deletion tests/utils.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
import type { Persona } from '@tbd54566975/dwn-sdk-js';
import type { GenericMessage, MessageSubscriptionHandler, Persona, UnionMessageReply } from '@tbd54566975/dwn-sdk-js';
import { Cid, DataStream, RecordsWrite } from '@tbd54566975/dwn-sdk-js';

import type { ReadStream } from 'node:fs';
import fs from 'node:fs';
import http from 'node:http';
import path from 'path';
import { v4 as uuidv4 } from 'uuid';
import fetch from 'node-fetch';
import type { Readable } from 'readable-stream';
import { fileURLToPath } from 'url';
import { WebSocket } from 'ws';

import type { JsonRpcResponse, JsonRpcRequest } from '../src/lib/json-rpc.js';
import { createJsonRpcRequest } from '../src/lib/json-rpc.js';
import { JSONRPCSocket } from '../src/json-rpc-socket.js';

// __filename and __dirname are not defined in ES module scope
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
Expand Down Expand Up @@ -139,6 +145,65 @@ export function streamHttpRequest(
});
}

export async function sendHttpMessage(options: {
url: string,
target: string,
message: GenericMessage,
data?: any,
}): Promise<UnionMessageReply> {
const { url, target, message, data } = options;
// First RecordsWrite that creates the record.
const requestId = uuidv4();
const jsonRpcRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', {
target,
message,
});

const fetchOpts = {
method : 'POST',
headers : {
'dwn-request': JSON.stringify(jsonRpcRequest)
}
};

if (data !== undefined) {
fetchOpts.headers['content-type'] = 'application/octet-stream';
fetchOpts['body'] = data;
}

const resp = await fetch(url, fetchOpts);
let dwnRpcResponse: JsonRpcResponse;

// check to see if response is in header first. if it is, that means the response is a ReadableStream
let dataStream;
const { headers } = resp;
if (headers.has('dwn-response')) {
const jsonRpcResponse = JSON.parse(headers.get('dwn-response')) as JsonRpcResponse;

if (jsonRpcResponse == null) {
throw new Error(`failed to parse json rpc response. dwn url: ${url}`);
}

dataStream = resp.body;
dwnRpcResponse = jsonRpcResponse;
} else {
const responseBody = await resp.text();
dwnRpcResponse = JSON.parse(responseBody);
}

if (dwnRpcResponse.error) {
const { code, message } = dwnRpcResponse.error;
throw new Error(`(${code}) - ${message}`);
}

const { reply } = dwnRpcResponse.result;
if (dataStream) {
reply['record']['data'] = dataStream;
}

return reply as UnionMessageReply;
}

export async function sendWsMessage(
address: string,
message: any,
Expand All @@ -156,3 +221,67 @@ export async function sendWsMessage(
};
});
}

const MAX_RESPONSE_TIMEOUT = 3_000;

export async function subscriptionRequest(
url: string,
request: JsonRpcRequest,
messageHandler: MessageSubscriptionHandler
): Promise<{ status: any, subscription?: { id: string, close: () => Promise<void> } }> {
let resolved: boolean = false;
const { params: { target } } = request;
const connection = await JSONRPCSocket.connect(url);

const closeSubscription = async (id: string, target: string, connection: JSONRPCSocket): Promise<JsonRpcResponse> => {
const requestId = crypto.randomUUID();
const request = createJsonRpcRequest(requestId, 'subscriptions.close', { subscriptionId: id, target });
return await connection.request(request);
}

return new Promise<{ status: any, subscription?: { id: string, close: () => Promise<void> } }>((resolve, reject) => {
const { close: subscriptionClose } = connection.subscribe(request, (response) => {
const { result, error } = response;

// this is an error specific to the `JsonRpcRequest` requesting the subscription
if (error) {
reject(error);
return;
}

// at this point the reply should be DwnRpcResponse
const { status, record, subscription } = result.reply;
if (record) {
messageHandler(record);
return;
}

if (subscription) {
resolved = true;

resolve({
status,
subscription: {
...subscription,
close: async (): Promise<void> => {
subscriptionClose();
const closeResponse = await closeSubscription(subscription.id, target, connection);
if (closeResponse.error?.message !== undefined) {
throw new Error(`unable to close subscription: ${closeResponse.error.message}`);
}
}
}
})
} else {
resolve({ status });
}
});

setTimeout(() => {
if (resolved) {
return;
};
return reject('subscription request timeout');
}, MAX_RESPONSE_TIMEOUT);
});
}
Loading

0 comments on commit 56b4976

Please sign in to comment.