Skip to content

Commit

Permalink
dwn-server test passing
Browse files Browse the repository at this point in the history
  • Loading branch information
andorsk committed Oct 1, 2023
1 parent 2c16e0c commit f3c0b62
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 31 deletions.
45 changes: 26 additions & 19 deletions src/json-rpc-handlers/dwn/process-message.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js';
import { v4 as uuidv4 } from 'uuid';
import type { Readable as IsomorphicReadable } from 'readable-stream';
import type {
RecordsReadReply,
SubscriptionRequestReply,

Check failure on line 5 in src/json-rpc-handlers/dwn/process-message.ts

View workflow job for this annotation

GitHub Actions / test

Module '"@tbd54566975/dwn-sdk-js"' has no exported member 'SubscriptionRequestReply'.
} from '@tbd54566975/dwn-sdk-js';
import {
DwnInterfaceName,
DwnMethodName,
SubscriptionRequest,

Check failure on line 10 in src/json-rpc-handlers/dwn/process-message.ts

View workflow job for this annotation

GitHub Actions / test

Module '"@tbd54566975/dwn-sdk-js"' has no exported member 'SubscriptionRequest'.
} from '@tbd54566975/dwn-sdk-js';
import type {
HandlerResponse,
JsonRpcHandler,
Expand All @@ -9,28 +19,19 @@ import {
createJsonRpcSuccessResponse,
} from '../../lib/json-rpc.js';

import type { Readable as IsomorphicReadable } from 'readable-stream';
import type { RecordsReadReply } from '@tbd54566975/dwn-sdk-js';
import type { SubscriptionRequestReply } from '@tbd54566975/dwn-sdk-js';
import { v4 as uuidv4 } from 'uuid';

export const handleDwnProcessMessage: JsonRpcHandler = async (
dwnRequest,
context,
) => {
const { dwn, dataStream } = context;
const { target, message } = dwnRequest.params;
const requestId = dwnRequest.id ?? uuidv4();

try {
let reply;
let reply: any;

const messageType =
message?.descriptor?.interface + message?.descriptor?.method;

// When a record is deleted via `RecordsDelete`, the initial RecordsWrite is kept as a tombstone _in addition_
// to the RecordsDelete message. the data associated to that initial RecordsWrite is deleted. If a record was written
// _and_ deleted before it ever got to dwn-server, we end up in a situation where we still need to process the tombstone
// so that we can process the RecordsDelete.
if (
messageType === DwnInterfaceName.Records + DwnMethodName.Write &&
!dataStream
Expand All @@ -44,26 +45,33 @@ export const handleDwnProcessMessage: JsonRpcHandler = async (
target,
message,
)) as SubscriptionRequestReply;
if (!context.subscriptionManager || context.socket) {
if (!context.subscriptionManager || !context.socket) {
throw new Error(
'setup failure. improper context provided for subscription',
);
}

// FIXME: How to handle subscription requests?
const request = await SubscriptionRequest.create({});
const req = {
socket: context.socket,
from: dwnRequest.params?.descriptor,
request: {},
from: message.descriptor.author,
request: request,
};
const subscription = await context.subscriptionManager.subscribe(req);
console.log(subscription);
reply = await context.subscriptionManager.subscribe(req);
const jsonRpcResponse = createJsonRpcSuccessResponse(requestId, {
reply,
});
const responsePayload: HandlerResponse = { jsonRpcResponse };
return responsePayload;
} else {
reply = (await dwn.processMessage(
target,
message,
dataStream as IsomorphicReadable,
)) as RecordsReadReply;
}
// RecordsRead messages return record data as a stream to for accommodate large amounts of data

let recordDataStream;
if (reply?.record?.data !== undefined) {
recordDataStream = reply.record.data;
Expand All @@ -83,7 +91,6 @@ export const handleDwnProcessMessage: JsonRpcHandler = async (
JsonRpcErrorCodes.InternalError,
e.message,
);

return { jsonRpcResponse } as HandlerResponse;
}
};
22 changes: 11 additions & 11 deletions src/subscription-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ export interface SubscriptionController {
}

export type RegisterSubscriptionRequest = {
from: string;
socket: WebSocket;
filters?: SubscriptionFilter[];
permissionGrant?: PermissionsGrant;
request: SubscriptionRequest;
from: string; // from connection
socket: WebSocket; // socket connection
filters?: SubscriptionFilter[]; // filters, if applicable
permissionGrant?: PermissionsGrant; //permission grant, if applicable
request: SubscriptionRequest; // subscription request
};

export type RegisterSubscriptionReply = {
reply: SubscriptionRequestReply;
reply?: SubscriptionRequestReply;
subscriptionId?: string;
};

Expand All @@ -52,21 +52,17 @@ export class SubscriptionManager {
private wss: WebSocketServer;
private dwn: Dwn;
private connections: Map<string, Subscription>;
private tenant: string;
options: SubscriptionManagerOptions;
#open: boolean;

constructor(options?: SubscriptionManagerOptions) {
this.wss = options?.wss || new WebSocketServer();
this.connections = new Map();
this.tenant = options?.tenant;
this.dwn = options?.dwn;
this.options = options;

this.wss.on('connection', (socket: WebSocket) => {
console.log('connected');
socket.on('message', async (data) => {
console.log('got message...');
await this.handleSubscribe(socket, data);
});
});
Expand Down Expand Up @@ -132,7 +128,7 @@ export class SubscriptionManager {
req: RegisterSubscriptionRequest,
): Promise<RegisterSubscriptionReply> {
const subscriptionReply = await this.dwn.handleSubscriptionRequest(

Check failure on line 130 in src/subscription-manager.ts

View workflow job for this annotation

GitHub Actions / test

Property 'handleSubscriptionRequest' does not exist on type 'Dwn'.
this.tenant,
req.from,
req.request.message,
);
if (subscriptionReply.status.code !== 200) {
Expand All @@ -148,6 +144,10 @@ export class SubscriptionManager {
return req.socket.send(Buffer.from(str));
},
);
return {
reply: subscriptionReply,
subscriptionId: subscription?.subscriptionId,
} as RegisterSubscriptionReply;
}

private async registerSubscription(
Expand Down
3 changes: 3 additions & 0 deletions src/ws-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export class WsApi {
*/
#handleConnection(socket: WebSocket, _request: IncomingMessage): void {
const dwn = this.dwn;
const subscriptionManager = this.#subscriptionManager;

socket[SOCKET_ISALIVE_SYMBOL] = true;

Expand Down Expand Up @@ -107,6 +108,8 @@ export class WsApi {
dwn,
transport: 'ws',
dataStream: requestDataStream,
subscriptionManager: subscriptionManager,
socket: socket,
};

const { jsonRpcResponse } = await jsonRpcApi.handle(
Expand Down
7 changes: 6 additions & 1 deletion tests/subscription-manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ describe('Subscription Manager Test', async () => {
// set up lisetner...
socket.onmessage = (event): Promise<void> => {
try {
console.log('got message');
const resp = JSON.parse(event.data.toString());
if (resp.error) {
throw new Error(resp.error.message);
}
resolve(event);
return;
} catch (error) {
Expand All @@ -92,6 +95,7 @@ describe('Subscription Manager Test', async () => {
};

socket.onopen = async (): Promise<void> => {
// on open
const requestId = uuidv4();
const dwnRequest = createJsonRpcRequest(
requestId,
Expand All @@ -101,6 +105,7 @@ describe('Subscription Manager Test', async () => {
target: alice.did,
},
);

try {
if (socket.readyState !== WebSocket.OPEN) {
reject(new Error('socket not open'));
Expand Down

0 comments on commit f3c0b62

Please sign in to comment.