Skip to content

Commit

Permalink
upgrade dwn-sdk-js to latest v0.4.0, extract the MeessagesRead data i…
Browse files Browse the repository at this point in the history
…nto a separate stream for the JSON response
  • Loading branch information
LiranCohen committed Jul 2, 2024
1 parent d93d543 commit 26e86b0
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 20 deletions.
22 changes: 11 additions & 11 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@web5/dwn-server",
"type": "module",
"version": "0.3.1",
"version": "0.4.0",
"files": [
"dist",
"src"
Expand All @@ -26,8 +26,8 @@
"url": "https://github.com/TBD54566975/dwn-server/issues"
},
"dependencies": {
"@tbd54566975/dwn-sdk-js": "0.3.10",
"@tbd54566975/dwn-sql-store": "0.5.2",
"@tbd54566975/dwn-sdk-js": "0.4.0",
"@tbd54566975/dwn-sql-store": "0.6.0",
"better-sqlite3": "^8.5.0",
"body-parser": "^1.20.2",
"bytes": "3.1.2",
Expand Down
15 changes: 10 additions & 5 deletions src/json-rpc-handlers/dwn/process-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,17 @@ export const handleDwnProcessMessage: JsonRpcHandler = async (
subscriptionHandler: subscriptionRequest?.subscriptionHandler,
});

const { record } = reply;
// RecordsRead messages return record data as a stream to for accommodate large amounts of data

const { record, entry } = reply;
// RecordsRead or MessagesRead messages optionally return data as a stream to accommodate large amounts of data
// we remove the data stream from the reply that will be serialized and return it as a separate property in the response payload.
let recordDataStream: IsomorphicReadable;
if (record !== undefined && record.data !== undefined) {
recordDataStream = reply.record.data;
delete reply.record.data; // not serializable via JSON
} else if (entry !== undefined && entry.data !== undefined) {
recordDataStream = entry.data;
delete reply.entry.data; // not serializable via JSON
}

if (subscriptionRequest && reply.subscription) {
Expand All @@ -107,15 +112,15 @@ export const handleDwnProcessMessage: JsonRpcHandler = async (
}

return responsePayload;
} catch (e) {
} catch (error) {
const jsonRpcResponse = createJsonRpcErrorResponse(
requestId,
JsonRpcErrorCodes.InternalError,
e.message,
error.message,
);

// log the unhandled error response
log.error('handleDwnProcessMessage error', jsonRpcResponse, dwnRequest, e);
log.error('handleDwnProcessMessage error', jsonRpcResponse, dwnRequest, error);
return { jsonRpcResponse } as HandlerResponse;
}
};
102 changes: 101 additions & 1 deletion tests/dwn-process-message.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type { RequestContext } from '../src/lib/json-rpc-router.js';
import { JsonRpcErrorCodes, createJsonRpcRequest } from '../src/lib/json-rpc.js';
import { getTestDwn } from './test-dwn.js';
import { createRecordsWriteMessage } from './utils.js';
import { TestDataGenerator } from '@tbd54566975/dwn-sdk-js';
import { DataStream, Jws, Message, MessagesRead, RecordsRead, TestDataGenerator } from '@tbd54566975/dwn-sdk-js';

describe('handleDwnProcessMessage', function () {
it('returns a JSON RPC Success Response when DWN returns a 2XX status code', async function () {
Expand Down Expand Up @@ -64,6 +64,106 @@ describe('handleDwnProcessMessage', function () {
await dwn.close();
});

it('should extract data stream from DWN response and return it as a separate property in the JSON RPC response for RecordsRead', async function () {
// scenario: Write a record with some data, and then read the record to get the data back
const alice = await TestDataGenerator.generateDidKeyPersona();

// Write a record to later read
const { recordsWrite, dataStream, dataBytes } = await TestDataGenerator.generateRecordsWrite({ author: alice });
const requestId = uuidv4();
const dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', {
message: recordsWrite.toJSON(),
target: alice.did,
});

const dwn = await getTestDwn();
const context: RequestContext = { dwn, transport: 'http', dataStream };

const { jsonRpcResponse } = await handleDwnProcessMessage(
dwnRequest,
context,
);

expect(jsonRpcResponse.error).to.not.exist;
const { reply } = jsonRpcResponse.result;
expect(reply.status.code).to.equal(202);


// Read the record to get the data back
const readRequestId = uuidv4();
const recordsRead = await RecordsRead.create({
signer: Jws.createSigner(alice),
filter: { recordId: recordsWrite.message.recordId },
});

const readRequest = createJsonRpcRequest(readRequestId, 'dwn.processMessage', {
message: recordsRead.toJSON(),
target: alice.did,
});

const { jsonRpcResponse: recordsReadResponse, dataStream: responseDataStream } = await handleDwnProcessMessage(readRequest, { dwn, transport: 'http' });
expect(recordsReadResponse.error).to.not.exist;
const { reply: readReply } = recordsReadResponse.result;
expect(readReply.status.code).to.equal(200);
expect(responseDataStream).to.not.be.undefined;

// Compare the data stream bytes to ensure they are the same
const responseDataBytes = await DataStream.toBytes(responseDataStream as any)
expect(responseDataBytes).to.deep.equal(dataBytes);
await dwn.close();
});

it('should extract data stream from DWN response and return it as a separate property in the JSON RPC response for MessagesRead', async function () {
// scenario: Write a record with some data, and then read the message to get the data back

const alice = await TestDataGenerator.generateDidKeyPersona();

// Create a record to read
const { recordsWrite, dataStream, dataBytes } = await TestDataGenerator.generateRecordsWrite({ author: alice });
const requestId = uuidv4();
const dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', {
message: recordsWrite.toJSON(),
target: alice.did,
});

const dwn = await getTestDwn();
const context: RequestContext = { dwn, transport: 'http', dataStream };

const { jsonRpcResponse } = await handleDwnProcessMessage(
dwnRequest,
context,
);

expect(jsonRpcResponse.error).to.not.exist;
const { reply } = jsonRpcResponse.result;
expect(reply.status.code).to.equal(202);

const messageCid = await Message.getCid(recordsWrite.message);

// read the message
const readRequestId = uuidv4();
const messageRead = await MessagesRead.create({
signer: Jws.createSigner(alice),
messageCid,
});

const readRequest = createJsonRpcRequest(readRequestId, 'dwn.processMessage', {
message: messageRead.toJSON(),
target: alice.did,
});

const { jsonRpcResponse: recordsReadResponse, dataStream: responseDataStream } = await handleDwnProcessMessage(readRequest, { dwn, transport: 'http' });
expect(recordsReadResponse.error).to.not.exist;
const { reply: readReply } = recordsReadResponse.result;
expect(readReply.status.code).to.equal(200);
expect(responseDataStream).to.not.be.undefined;

// Compare the data stream bytes to ensure they are the same
const responseDataBytes = await DataStream.toBytes(responseDataStream as any)
expect(responseDataBytes).to.deep.equal(dataBytes);
await dwn.close();
});

it('should fail if no subscriptionRequest context exists for a `Subscribe` message', async function () {
const requestId = uuidv4();
const dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', {
Expand Down

0 comments on commit 26e86b0

Please sign in to comment.