Skip to content

Commit

Permalink
Incorrect MessageOptions for RecordsRead requests in sync manager. (#…
Browse files Browse the repository at this point in the history
…297)

* add filter property for RecordsRead within sync manager
* Fix response type for RecordsRead messages and improve type safety

---------


Co-authored-by: Frank Hinek <[email protected]>
  • Loading branch information
LiranCohen and frankhinek authored Nov 18, 2023
1 parent 50c27ec commit 9655934
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 14 deletions.
5 changes: 2 additions & 3 deletions packages/agent/src/dwn-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import {
Signer,
GenericMessage,
MessagesGetReply,
RecordsReadReply,
UnionMessageReply,
RecordsWriteMessage,
RecordsWriteOptions,
Expand Down Expand Up @@ -281,7 +280,7 @@ export class DwnManager {
signer: dwnSigner
});

return { message: dwnMessage.toJSON(), dataStream: readableStream };
return { message: dwnMessage.message, dataStream: readableStream };
}

private async getAuthorSigningKeyId(options: {
Expand Down Expand Up @@ -380,7 +379,7 @@ export class DwnManager {
signer: dwnSigner
});

const reply = await this._dwn.processMessage(author, recordsRead.toJSON()) as RecordsReadReply;
const reply = await this._dwn.processMessage(author, recordsRead.message);

if (reply.status.code >= 400) {
const { status: { code, detail } } = reply;
Expand Down
11 changes: 8 additions & 3 deletions packages/agent/src/rpc-client.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import { utils as cryptoUtils } from '@web5/crypto';
import { UnionMessageReply } from '@tbd54566975/dwn-sdk-js';
import { RecordsReadReply, UnionMessageReply } from '@tbd54566975/dwn-sdk-js';

import type { JsonRpcResponse } from './json-rpc.js';
import type { SerializableDwnMessage } from './types/agent.js';

import { createJsonRpcRequest, parseJson } from './json-rpc.js';

/**
* Interface that can be implemented to communicate with {@link Web5Agent | Web5 Agent}
* implementations via JSON-RPC.
*/
export interface DidRpc {
get transportProtocols(): string[]
sendDidRequest(request: DidRpcRequest): Promise<DidRpcResponse>
Expand All @@ -29,7 +33,8 @@ export type DidRpcResponse = {
}

/**
* interface that can be implemented to communicate with Dwn Servers
* Interface that can be implemented to communicate with
* {@link https://github.com/TBD54566975/dwn-server | DWN Servers} via JSON-RPC.
*/
export interface DwnRpc {
/**
Expand Down Expand Up @@ -57,7 +62,7 @@ export type DwnRpcRequest = {
/**
* TODO: add jsdoc
*/
export type DwnRpcResponse = UnionMessageReply;
export type DwnRpcResponse = UnionMessageReply & RecordsReadReply;

export type RpcStatus = {
code: number;
Expand Down
15 changes: 9 additions & 6 deletions packages/agent/src/sync-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import type {
EventsGetReply,
GenericMessage,
MessagesGetReply,
RecordsReadReply,
RecordsWriteMessage,
} from '@tbd54566975/dwn-sdk-js';

Expand Down Expand Up @@ -171,15 +170,17 @@ export class SyncManagerLevel implements SyncManager {
author : did,
messageType : 'RecordsRead',
messageOptions : {
recordId: message['recordId']
filter: {
recordId: message.recordId
}
}
});

const recordsReadReply = await this.agent.rpcClient.sendDwnRequest({
dwnUrl,
targetDid : did,
message : recordsRead
}) as RecordsReadReply;
message : recordsRead.message
});

const { record, status: readStatus } = recordsReadReply;

Expand Down Expand Up @@ -455,10 +456,12 @@ export class SyncManagerLevel implements SyncManager {
target : author,
messageType : 'RecordsRead',
messageOptions : {
recordId: writeMessage.recordId
filter: {
recordId: writeMessage.recordId
}
}
});
const reply = readResponse.reply as RecordsReadReply;
const reply = readResponse.reply;

if (is2xx(reply.status.code) && reply.record) {
// If status code is 200-299, return the data.
Expand Down
126 changes: 124 additions & 2 deletions packages/agent/tests/sync-manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { PortableDid } from '@web5/dids';

import { expect } from 'chai';
import * as sinon from 'sinon';
import { RecordsQueryReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js';

import type { ManagedIdentity } from '../src/identity-manager.js';

Expand All @@ -10,8 +11,6 @@ import { TestAgent } from './utils/test-agent.js';
import { SyncManagerLevel } from '../src/sync-manager.js';
import { TestManagedAgent } from '../src/test-managed-agent.js';

import { RecordsQueryReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js';

let testDwnUrls: string[] = [testDwnUrl];

describe('SyncManagerLevel', () => {
Expand Down Expand Up @@ -135,6 +134,68 @@ describe('SyncManagerLevel', () => {
expect(localDwnQueryReply.entries).to.have.length(1); // Record does exist on local DWN.
});

it('synchronizes records with data larger than the `encodedData` limit within the `RecordsQuery` response', async () => {
// larger than the size of data returned in a RecordsQuery
const LARGE_DATA_SIZE = 70_000;

//register alice
await testAgent.agent.syncManager.registerIdentity({
did: alice.did
});

// create a remote record
const record = await testAgent.agent.dwnManager.sendRequest({
store : false,
author : alice.did,
target : alice.did,
messageType : 'RecordsWrite',
messageOptions : {
dataFormat: 'text/plain'
},
dataStream: new Blob(Array(LARGE_DATA_SIZE).fill('a')) //large data
});

// check that the record doesn't exist locally
const { reply: localReply } = await testAgent.agent.dwnManager.processRequest({
author : alice.did,
target : alice.did,
messageType : 'RecordsQuery',
messageOptions : { filter: { recordId: (record.message as RecordsWriteMessage).recordId }}
});

expect(localReply.status.code).to.equal(200);
expect(localReply.entries?.length).to.equal(0);

// initiate sync
await testAgent.agent.syncManager.pull();

// query that the local record exists
const { reply: localReply2 } = await testAgent.agent.dwnManager.processRequest({
author : alice.did,
target : alice.did,
messageType : 'RecordsQuery',
messageOptions : { filter: { recordId: (record.message as RecordsWriteMessage).recordId }}
});

expect(localReply2.status.code).to.equal(200);
expect(localReply2.entries?.length).to.equal(1);
const entry = localReply2.entries![0];
expect(entry.encodedData).to.be.undefined; // encodedData is undefined

// check for response encodedData if it doesn't exist issue a RecordsRead
const recordId = (entry as RecordsWriteMessage).recordId;
// get individual records without encodedData to check that data exists
const readRecord = await testAgent.agent.dwnManager.processRequest({
author : alice.did,
target : alice.did,
messageType : 'RecordsRead',
messageOptions : { filter: { recordId } }
});
const reply = readRecord.reply;
expect(reply.status.code).to.equal(200);
expect(reply.record).to.not.be.undefined;
expect(reply.record!.data).to.not.be.undefined; // record data exists
});

it('synchronizes records for multiple identities from remote DWN to local DWN', async () => {
// Create a second Identity to author the DWN messages.
Expand Down Expand Up @@ -271,6 +332,67 @@ describe('SyncManagerLevel', () => {
expect(remoteDwnQueryReply.entries).to.have.length(1); // Record does exist on remote DWN.
});

it('synchronizes records with data larger than the `encodedData` limit within the `RecordsQuery` response', async () => {
// larger than the size of data returned in a RecordsQuery
const LARGE_DATA_SIZE = 70_000;

//register alice
await testAgent.agent.syncManager.registerIdentity({
did: alice.did
});

// create a local record
const record = await testAgent.agent.dwnManager.processRequest({
author : alice.did,
target : alice.did,
messageType : 'RecordsWrite',
messageOptions : {
dataFormat: 'text/plain'
},
dataStream: new Blob(Array(LARGE_DATA_SIZE).fill('a')) //large data
});

// check that record doesn't exist remotely
const { reply: remoteReply } = await testAgent.agent.dwnManager.sendRequest({
author : alice.did,
target : alice.did,
messageType : 'RecordsQuery',
messageOptions : { filter: { recordId: (record.message as RecordsWriteMessage).recordId }}
});

expect(remoteReply.status.code).to.equal(200);
expect(remoteReply.entries?.length).to.equal(0);

// initiate sync
await testAgent.agent.syncManager.push();

// query for remote REcords
const { reply: remoteReply2 } = await testAgent.agent.dwnManager.sendRequest({
author : alice.did,
target : alice.did,
messageType : 'RecordsQuery',
messageOptions : { filter: { recordId: (record.message as RecordsWriteMessage).recordId }}
});

expect(remoteReply2.status.code).to.equal(200);
expect(remoteReply2.entries?.length).to.equal(1);
const entry = remoteReply2.entries![0];
expect(entry.encodedData).to.be.undefined;
// check for response encodedData if it doesn't exist issue a RecordsRead
const recordId = (entry as RecordsWriteMessage).recordId;
// get individual records without encodedData to check that data exists
const readRecord = await testAgent.agent.dwnManager.processRequest({
author : alice.did,
target : alice.did,
messageType : 'RecordsRead',
messageOptions : { filter: { recordId } }
});
const reply = readRecord.reply;
expect(reply.status.code).to.equal(200);
expect(reply.record).to.not.be.undefined;
expect(reply.record!.data).to.not.be.undefined;
});

it('synchronizes records for multiple identities from local DWN to remote DWN', async () => {
// Create a second Identity to author the DWN messages.
const { did: bobDid } = await testAgent.createIdentity({ testDwnUrls });
Expand Down

0 comments on commit 9655934

Please sign in to comment.