From 720389e532967a6f7582230e70f6c18b5ced0a39 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 19 Sep 2023 09:31:23 -0400 Subject: [PATCH 01/20] add test for bi-drectional sync --- packages/agent/tests/sync-manager.spec.ts | 66 +++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index 2a6272534..0ddddf583 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -5,12 +5,14 @@ import * as sinon from 'sinon'; import type { ManagedIdentity } from '../src/identity-manager.js'; +import { randomUuid } from '@web5/crypto/utils'; import { testDwnUrl } from './test-config.js'; import { TestAgent } from './utils/test-agent.js'; import { SyncManagerLevel } from '../src/sync-manager.js'; import { TestManagedAgent } from '../src/test-managed-agent.js'; import { RecordsQueryReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; +import { ProcessDwnRequest } from '../src/index.js'; let testDwnUrls: string[] = [testDwnUrl]; @@ -343,6 +345,70 @@ describe('SyncManagerLevel', () => { expect(remoteDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. expect(remoteDwnQueryReply.entries).to.have.length(1); // Record does exist on remote DWN. }).timeout(5000); + + it('synchronizes data in both directions for a single identity', async () => { + + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + + const testWriteMessage = (id: string): ProcessDwnRequest => { + return { + author : alice.did, + target : alice.did, + messageType : 'RecordsWrite', + messageOptions : { + schema : 'testSchema', + dataFormat : 'text/plain' + }, + dataStream: new Blob([`Hello, ${id}`]) + }; + }; + + const everythingQuery = (): ProcessDwnRequest => { + return { + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { schema: 'testSchema' } } + }; + }; + + const localRecords = new Set( + (await Promise.all(Array(5).fill({}).map(_ => testAgent.agent.dwnManager.processRequest(testWriteMessage(randomUuid()))))) + .map(r => (r.message as RecordsWriteMessage).recordId) + ); + + const remoteRecords = new Set( + (await Promise.all(Array(5).fill({}).map(_ => testAgent.agent.dwnManager.sendRequest(testWriteMessage(randomUuid()))))) + .map(r => (r.message as RecordsWriteMessage).recordId) + ); + + const { reply: localReply } = await testAgent.agent.dwnManager.processRequest(everythingQuery()); + expect(localReply.status.code).to.equal(200); + expect(localReply.entries?.length).to.equal(localRecords.size); + expect(localReply.entries?.every(e => localRecords.has((e as RecordsWriteMessage).recordId))).to.be.true; + + const { reply: remoteReply } = await testAgent.agent.dwnManager.sendRequest(everythingQuery()); + expect(remoteReply.status.code).to.equal(200); + expect(remoteReply.entries?.length).to.equal(remoteRecords.size); + expect(remoteReply.entries?.every(e => remoteRecords.has((e as RecordsWriteMessage).recordId))).to.be.true; + + await testAgent.agent.syncManager.push(); + await testAgent.agent.syncManager.pull(); + + const records = new Set([...remoteRecords, ...localRecords]); + const { reply: allRemoteReply } = await testAgent.agent.dwnManager.sendRequest(everythingQuery()); + expect(allRemoteReply.status.code).to.equal(200); + expect(allRemoteReply.entries?.length).to.equal(records.size); + expect(allRemoteReply.entries?.every(e => records.has((e as RecordsWriteMessage).recordId))).to.be.true; + + const { reply: allLocalReply } = await testAgent.agent.dwnManager.sendRequest(everythingQuery()); + expect(allLocalReply.status.code).to.equal(200); + expect(allLocalReply.entries?.length).to.equal(records.size); + expect(allLocalReply.entries?.every(e => records.has((e as RecordsWriteMessage).recordId))).to.be.true; + + }).timeout(5000); }); }); }); \ No newline at end of file From afd885036ca40e0e3295430b566193f1525d010b Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Sun, 17 Sep 2023 18:10:35 -0400 Subject: [PATCH 02/20] chaos tests --- packages/agent/tests/chaos-monkey.spec.ts | 215 ++++++++++++++++++++++ 1 file changed, 215 insertions(+) create mode 100644 packages/agent/tests/chaos-monkey.spec.ts diff --git a/packages/agent/tests/chaos-monkey.spec.ts b/packages/agent/tests/chaos-monkey.spec.ts new file mode 100644 index 000000000..f3860f9d6 --- /dev/null +++ b/packages/agent/tests/chaos-monkey.spec.ts @@ -0,0 +1,215 @@ +import type { PortableDid } from '@web5/dids'; + +import { expect } from 'chai'; +import * as sinon from 'sinon'; + +import type { ManagedIdentity } from '../src/identity-manager.js'; + +import { testDwnUrl } from './test-config.js'; +import { TestAgent } from './utils/test-agent.js'; +import { TestManagedAgent } from '../src/test-managed-agent.js'; + +import { randomUuid } from '@web5/crypto/utils'; +import { DwnRequest, DwnResponse, ProcessDwnRequest, SendDwnRequest } from '../src/index.js'; +import { DataStream, RecordsDeleteMessage, RecordsWrite, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; +import _Readable from 'readable-stream'; + +const testDwnUrls = [ testDwnUrl ]; + +describe('Chaos Monkey', () => { + describe('Sync Manager', function () { + this.timeout(120_000); + const records:DwnResponse[] = []; + + const lean: string|undefined = process.env.SYNC_LEAN === 'pull' ? 'pull' : + process.env.SYNC_LEAN === 'push' ? 'push' : undefined; + + const DEFAULT_SYNC_ROUNDS = 10; + const DEFAULT_BATCH_ROUNDS = 10; + const DEFAULT_BATCH_COUNT = 5; + const rounds: number = !isNaN(parseInt(process.env.SYNC_ROUNDS || 'not-a-number')) ? parseInt(process.env.SYNC_ROUNDS!) : DEFAULT_SYNC_ROUNDS; + const batchRounds: number = !isNaN(parseInt(process.env.BATCH_ROUNDS || 'not-a-number')) ? parseInt(process.env.BATCH_ROUNDS!) : DEFAULT_BATCH_ROUNDS; + const batchCount: number = !isNaN(parseInt(process.env.BATCH_COUNT || 'not-a-number')) ? parseInt(process.env.BATCH_COUNT!) : DEFAULT_BATCH_COUNT; + + let alice: ManagedIdentity; + let bob: ManagedIdentity; + let carol: ManagedIdentity; + let dave: ManagedIdentity; + + let aliceDid: PortableDid; + let bobDid: PortableDid; + let carolDid: PortableDid; + let daveDid: PortableDid; + let testAgent: TestManagedAgent; + + + const testWriteMessage = (did:string, id: string): ProcessDwnRequest => { + return { + author : did, + target : did, + messageType : 'RecordsWrite', + messageOptions : { + schema : 'schema', + dataFormat : 'application/json' + }, + dataStream: new Blob([ `Hello, ${id}`]) + }; + } + + const testQueryMessage = (did:string): ProcessDwnRequest => { + return { + author : did, + target : did, + messageType : 'RecordsQuery', + messageOptions: { filter: { schema: 'schema', dataFormat: 'application/json' } }, + }; + } + + const testReadMessage = (did:string, recordId: string): SendDwnRequest => { + return { + author : did, + target : did, + messageType : 'RecordsRead', + messageOptions : { recordId } + }; + } + + before(async () => { + testAgent = await TestManagedAgent.create({ + agentClass : TestAgent, + agentStores : 'dwn' + }); + }); + + beforeEach(async () => { + records.splice(0, records.length); + await testAgent.clearStorage(); + await testAgent.createAgentDid(); + // Create a new Identity to author the DWN messages. + ({ did: aliceDid } = await testAgent.createIdentity({ testDwnUrls })); + alice = await testAgent.agent.identityManager.import({ + did : aliceDid, + identity : { name: 'Alice', did: aliceDid.did }, + kms : 'local' + }); + ({ did: bobDid } = await testAgent.createIdentity({ testDwnUrls })); + bob = await testAgent.agent.identityManager.import({ + did : bobDid, + identity : { name: 'Bob', did: bobDid.did }, + kms : 'local' + }); + ({ did: carolDid } = await testAgent.createIdentity({ testDwnUrls })); + carol = await testAgent.agent.identityManager.import({ + did : carolDid, + identity : { name: 'Carol', did: carolDid.did }, + kms : 'local' + }); + ({ did: daveDid} = await testAgent.createIdentity({ testDwnUrls })); + dave = await testAgent.agent.identityManager.import({ + did : daveDid, + identity : { name: 'Dave', did: daveDid.did }, + kms : 'local' + }); + + const { dwnManager } = testAgent.agent; + const startLoadMessages = Date.now(); + + const process = async (message: ProcessDwnRequest, random: number): Promise => { + + let randomMod = 2; + if (lean !== undefined) { + // create an uneven distribution + randomMod = 3; + } + + // throw in a record that both get every 11th record. + if (random % 11 === 0) return processBoth(message); + + const left = (message: ProcessDwnRequest) => { + return lean === undefined || lean === 'pull' ? dwnManager.processRequest(message as ProcessDwnRequest): dwnManager.sendRequest(message as SendDwnRequest); + } + + const right = (message: ProcessDwnRequest) => { + return lean === undefined || lean === 'pull' ? dwnManager.sendRequest(message as SendDwnRequest) : dwnManager.processRequest(message as ProcessDwnRequest); + } + + return random % randomMod === 0 ? left(message) : right(message); + }; + + + const processBoth = async (message: ProcessDwnRequest) => { + const localResponse = await dwnManager.processRequest({...message} as ProcessDwnRequest); + // copy the message, todo use createFrom?? + message = { + ...message, + messageOptions: { + ...message.messageOptions || {}, + ...(localResponse.message as RecordsDeleteMessage).descriptor + } + } + const remoteResponse = await dwnManager.sendRequest({...message} as SendDwnRequest) + expect(localResponse.messageCid).to.equal(remoteResponse.messageCid, `invalid remote and local messages`); + return remoteResponse; + } + + const randomMessage = () => { + const random = getRandomInt(0, 1234567890); + const message = testWriteMessage(alice.did, randomUuid()); + return process(message, random); + } + + const batch = (count: number) => Array(count).fill({}).map(randomMessage) + + for (const _ of Array(batchRounds).fill({})) { + records.push(...(await Promise.all(batch(batchCount)))) + } + + const endLoadMessages = Date.now(); + console.log(`loaded ${records.length} messages in ${endLoadMessages - startLoadMessages}ms`); + expect(records.every(r => r.reply.status.code === 202), `could not load messages successfully`).to.be.true; + }); + + afterEach(async () => { + await testAgent.clearStorage(); + }); + + after(async () => { + await testAgent.clearStorage(); + await testAgent.closeStorage(); + }); + + describe(`startSync() ${rounds} runs`, () => { + for ( const _ of Array(rounds).fill({})) { + it('sync a lot of records', async () => { + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + + // get remote and local before sync; + const testQuery = testQueryMessage(alice.did); + let { reply } = await testAgent.agent.dwnManager.processRequest(testQuery); + let { reply: replyRemote } = await testAgent.agent.dwnManager.sendRequest(testQuery); + + const startSync = Date.now(); + await testAgent.agent.syncManager.push(); + await testAgent.agent.syncManager.pull(); + const endSync = Date.now(); + + const remoteEntries = (replyRemote.entries || []).filter(e => (reply.entries || []).findIndex(le => (le as RecordsWriteMessage).recordId === (e as RecordsWriteMessage).recordId) < 0); + const localEntries = (reply.entries || []).filter(e => (replyRemote.entries || []).findIndex(re => (re as RecordsWriteMessage).recordId === (e as RecordsWriteMessage).recordId) < 0) + const commonItemsLength = (reply.entries!.length + replyRemote.entries!.length) - records.length; + + console.log(`sync time:\t\t${endSync-startSync} for ${records.length} records\nlocal records:\t\t${reply.entries!.length}/${localEntries.length} unique\nremote records:\t\t${replyRemote.entries!.length}/${remoteEntries.length} unique\ncommon records:\t\t${commonItemsLength}\n\n`) + expect(endSync-startSync).to.be.lt(60_000); + ({ reply } = await testAgent.agent.dwnManager.processRequest(testQuery)); + expect(reply.status.code).to.equal(200); + expect(reply.entries!.length).to.equal(records.length); + }).timeout(100_000); + } + }); + }); +}); + +function getRandomInt(min: number, max: number) { + return Math.floor(Math.random() * (Math.ceil(max - min)) + Math.ceil(min)); +} \ No newline at end of file From 481b3918734497e4a38fa7a71b9327fb9be88b91 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Mon, 18 Sep 2023 15:06:33 -0400 Subject: [PATCH 03/20] move chaos tests --- .../tests/{ => chaos}/chaos-monkey.spec.ts | 68 ++++++++++--------- 1 file changed, 37 insertions(+), 31 deletions(-) rename packages/agent/tests/{ => chaos}/chaos-monkey.spec.ts (75%) diff --git a/packages/agent/tests/chaos-monkey.spec.ts b/packages/agent/tests/chaos/chaos-monkey.spec.ts similarity index 75% rename from packages/agent/tests/chaos-monkey.spec.ts rename to packages/agent/tests/chaos/chaos-monkey.spec.ts index f3860f9d6..bbbecd895 100644 --- a/packages/agent/tests/chaos-monkey.spec.ts +++ b/packages/agent/tests/chaos/chaos-monkey.spec.ts @@ -3,19 +3,23 @@ import type { PortableDid } from '@web5/dids'; import { expect } from 'chai'; import * as sinon from 'sinon'; -import type { ManagedIdentity } from '../src/identity-manager.js'; +import type { ManagedIdentity } from '../../src/identity-manager.js' -import { testDwnUrl } from './test-config.js'; -import { TestAgent } from './utils/test-agent.js'; -import { TestManagedAgent } from '../src/test-managed-agent.js'; +import { testDwnUrl } from '../test-config.js' +import { TestAgent } from '../utils/test-agent.js'; +import { TestManagedAgent } from '../../src/test-managed-agent.js'; import { randomUuid } from '@web5/crypto/utils'; -import { DwnRequest, DwnResponse, ProcessDwnRequest, SendDwnRequest } from '../src/index.js'; +import { DwnRequest, DwnResponse, ProcessDwnRequest, SendDwnRequest } from '../../src/index.js'; import { DataStream, RecordsDeleteMessage, RecordsWrite, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; import _Readable from 'readable-stream'; const testDwnUrls = [ testDwnUrl ]; +const checkChaos = (): boolean => { + return process.env.CHAOS_ENV === 'true' +} + describe('Chaos Monkey', () => { describe('Sync Manager', function () { this.timeout(120_000); @@ -179,32 +183,34 @@ describe('Chaos Monkey', () => { }); describe(`startSync() ${rounds} runs`, () => { - for ( const _ of Array(rounds).fill({})) { - it('sync a lot of records', async () => { - await testAgent.agent.syncManager.registerIdentity({ - did: alice.did - }); - - // get remote and local before sync; - const testQuery = testQueryMessage(alice.did); - let { reply } = await testAgent.agent.dwnManager.processRequest(testQuery); - let { reply: replyRemote } = await testAgent.agent.dwnManager.sendRequest(testQuery); - - const startSync = Date.now(); - await testAgent.agent.syncManager.push(); - await testAgent.agent.syncManager.pull(); - const endSync = Date.now(); - - const remoteEntries = (replyRemote.entries || []).filter(e => (reply.entries || []).findIndex(le => (le as RecordsWriteMessage).recordId === (e as RecordsWriteMessage).recordId) < 0); - const localEntries = (reply.entries || []).filter(e => (replyRemote.entries || []).findIndex(re => (re as RecordsWriteMessage).recordId === (e as RecordsWriteMessage).recordId) < 0) - const commonItemsLength = (reply.entries!.length + replyRemote.entries!.length) - records.length; - - console.log(`sync time:\t\t${endSync-startSync} for ${records.length} records\nlocal records:\t\t${reply.entries!.length}/${localEntries.length} unique\nremote records:\t\t${replyRemote.entries!.length}/${remoteEntries.length} unique\ncommon records:\t\t${commonItemsLength}\n\n`) - expect(endSync-startSync).to.be.lt(60_000); - ({ reply } = await testAgent.agent.dwnManager.processRequest(testQuery)); - expect(reply.status.code).to.equal(200); - expect(reply.entries!.length).to.equal(records.length); - }).timeout(100_000); + if (checkChaos()) { + for ( const _ of Array(rounds).fill({})) { + it('sync a lot of records', async () => { + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + + // get remote and local before sync; + const testQuery = testQueryMessage(alice.did); + let { reply } = await testAgent.agent.dwnManager.processRequest(testQuery); + let { reply: replyRemote } = await testAgent.agent.dwnManager.sendRequest(testQuery); + + const startSync = Date.now(); + await testAgent.agent.syncManager.push(); + await testAgent.agent.syncManager.pull(); + const endSync = Date.now(); + + const remoteEntries = (replyRemote.entries || []).filter(e => (reply.entries || []).findIndex(le => (le as RecordsWriteMessage).recordId === (e as RecordsWriteMessage).recordId) < 0); + const localEntries = (reply.entries || []).filter(e => (replyRemote.entries || []).findIndex(re => (re as RecordsWriteMessage).recordId === (e as RecordsWriteMessage).recordId) < 0) + const commonItemsLength = (reply.entries!.length + replyRemote.entries!.length) - records.length; + + console.log(`sync time:\t\t${endSync-startSync} for ${records.length} records\nlocal records:\t\t${reply.entries!.length}/${localEntries.length} unique\nremote records:\t\t${replyRemote.entries!.length}/${remoteEntries.length} unique\ncommon records:\t\t${commonItemsLength}\n\n`) + expect(endSync-startSync).to.be.lt(60_000); + ({ reply } = await testAgent.agent.dwnManager.processRequest(testQuery)); + expect(reply.status.code).to.equal(200); + expect(reply.entries!.length).to.equal(records.length); + }).timeout(100_000); + } } }); }); From 5ab4e4bb368fd43d9c9251cf236f12a9daabbd96 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Thu, 7 Sep 2023 14:24:48 -0400 Subject: [PATCH 04/20] add tests using multiple docker nodes --- .github/workflows/tests-ci.yml | 14 ++++++++++---- packages/agent/karma.conf.cjs | 2 +- packages/agent/tests/chaos/chaos-monkey.spec.ts | 4 +--- packages/agent/tests/dwn-manager.spec.ts | 4 +--- packages/agent/tests/sync-manager.spec.ts | 4 +--- packages/agent/tests/test-config.ts | 16 ++++++++-------- packages/api/karma.conf.cjs | 2 +- packages/api/tests/dwn-api.spec.ts | 4 +--- packages/api/tests/protocol.spec.ts | 4 +--- packages/api/tests/record.spec.ts | 4 +--- packages/api/tests/test-config.ts | 10 +++++----- packages/common/karma.conf.cjs | 2 +- packages/credentials/karma.conf.cjs | 2 +- packages/crypto/karma.conf.cjs | 2 +- packages/dev-env/docker-compose.yaml | 9 +++++++-- packages/dids/karma.conf.cjs | 2 +- packages/identity-agent/karma.conf.cjs | 2 +- packages/proxy-agent/karma.conf.cjs | 2 +- packages/user-agent/karma.conf.cjs | 2 +- 19 files changed, 45 insertions(+), 46 deletions(-) diff --git a/.github/workflows/tests-ci.yml b/.github/workflows/tests-ci.yml index cf98b6b5f..434ed8151 100644 --- a/.github/workflows/tests-ci.yml +++ b/.github/workflows/tests-ci.yml @@ -53,13 +53,16 @@ jobs: - name: Start dwn-server container run: cd packages/dev-env && docker-compose up -d - - name: Wait for dwn-server to be ready + - name: Wait for dwn-server1 to be ready run: until curl -sf http://localhost:3000/health; do echo -n .; sleep .1; done + - name: Wait for dwn-server2 to be ready + run: until curl -sf http://localhost:3001/health; do echo -n .; sleep .1; done + - name: Run tests for all packages run: npm run test:node --ws -- --color env: - TEST_DWN_URL: http://localhost:3000 + TEST_DWN_URLS: http://localhost:3000,http://localhost:3001 - name: Upload test coverage to Codecov uses: codecov/codecov-action@eaaf4bedf32dbdc6b720b63067d99c4d77d6047d # v3.1.4 @@ -100,10 +103,13 @@ jobs: - name: Start dwn-server container run: cd packages/dev-env && docker-compose up -d - - name: Wait for dwn-server to be ready + - name: Wait for dwn-server1 to be ready run: until curl -sf http://localhost:3000/health; do echo -n .; sleep .1; done + - name: Wait for dwn-server2 to be ready + run: until curl -sf http://localhost:3001/health; do echo -n .; sleep .1; done + - name: Run tests for all packages run: npm run test:browser --ws -- --color env: - TEST_DWN_URL: http://localhost:3000 + TEST_DWN_URLS: http://localhost:3000,http://localhost:3001 diff --git a/packages/agent/karma.conf.cjs b/packages/agent/karma.conf.cjs index 6b7407b2c..6d5eeb080 100644 --- a/packages/agent/karma.conf.cjs +++ b/packages/agent/karma.conf.cjs @@ -36,7 +36,7 @@ module.exports = function (config) { timeout: 10000 // 10 seconds }, // If an environment variable is defined, override the default test DWN URL. - testDwnUrl: process.env.TEST_DWN_URL, + testDwnUrls: process.env.TEST_DWN_URLS, }, // list of files / patterns to load in the browser diff --git a/packages/agent/tests/chaos/chaos-monkey.spec.ts b/packages/agent/tests/chaos/chaos-monkey.spec.ts index bbbecd895..6c02de8f4 100644 --- a/packages/agent/tests/chaos/chaos-monkey.spec.ts +++ b/packages/agent/tests/chaos/chaos-monkey.spec.ts @@ -5,7 +5,7 @@ import * as sinon from 'sinon'; import type { ManagedIdentity } from '../../src/identity-manager.js' -import { testDwnUrl } from '../test-config.js' +import { testDwnUrls } from '../test-config.js' import { TestAgent } from '../utils/test-agent.js'; import { TestManagedAgent } from '../../src/test-managed-agent.js'; @@ -14,8 +14,6 @@ import { DwnRequest, DwnResponse, ProcessDwnRequest, SendDwnRequest } from '../. import { DataStream, RecordsDeleteMessage, RecordsWrite, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; import _Readable from 'readable-stream'; -const testDwnUrls = [ testDwnUrl ]; - const checkChaos = (): boolean => { return process.env.CHAOS_ENV === 'true' } diff --git a/packages/agent/tests/dwn-manager.spec.ts b/packages/agent/tests/dwn-manager.spec.ts index 105110058..6d8149710 100644 --- a/packages/agent/tests/dwn-manager.spec.ts +++ b/packages/agent/tests/dwn-manager.spec.ts @@ -18,7 +18,7 @@ import { ProtocolsConfigureMessage, } from '@tbd54566975/dwn-sdk-js'; -import { testDwnUrl } from './test-config.js'; +import { testDwnUrls } from './test-config.js'; import { TestAgent } from './utils/test-agent.js'; import { DwnManager } from '../src/dwn-manager.js'; import { ManagedIdentity } from '../src/identity-manager.js'; @@ -34,8 +34,6 @@ if (!globalThis.crypto) globalThis.crypto = webcrypto; chai.use(chaiAsPromised); -let testDwnUrls: string[] = [testDwnUrl]; - describe('DwnManager', () => { describe('constructor', () => { diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index 0ddddf583..1dd55f236 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -6,7 +6,7 @@ import * as sinon from 'sinon'; import type { ManagedIdentity } from '../src/identity-manager.js'; import { randomUuid } from '@web5/crypto/utils'; -import { testDwnUrl } from './test-config.js'; +import { testDwnUrls } from './test-config.js'; import { TestAgent } from './utils/test-agent.js'; import { SyncManagerLevel } from '../src/sync-manager.js'; import { TestManagedAgent } from '../src/test-managed-agent.js'; @@ -14,8 +14,6 @@ import { TestManagedAgent } from '../src/test-managed-agent.js'; import { RecordsQueryReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; import { ProcessDwnRequest } from '../src/index.js'; -let testDwnUrls: string[] = [testDwnUrl]; - describe('SyncManagerLevel', () => { describe('get agent', () => { it(`returns the 'agent' instance property`, async () => { diff --git a/packages/agent/tests/test-config.ts b/packages/agent/tests/test-config.ts index 213604404..6ce799757 100644 --- a/packages/agent/tests/test-config.ts +++ b/packages/agent/tests/test-config.ts @@ -1,10 +1,10 @@ -declare const __karma__: { config?: { testDwnUrl?: string; } }; +declare const __karma__: { config?: { testDwnUrls?: string; } }; -const DEFAULT_TEST_DWN_URL = 'https://dwn.tbddev.org/dwn0'; +const DEFAULT_TEST_DWN_URLS = 'https://dwn.tbddev.org/dwn0'; -function getTestDwnUrl(): string { +function getTestDwnUrls(): string[] { // Check to see if we're running in a Karma browser test environment. - const browserTestEnvironment = typeof __karma__ !== 'undefined' && __karma__?.config?.testDwnUrl !== undefined; + const browserTestEnvironment = typeof __karma__ !== 'undefined' && __karma__?.config?.testDwnUrls !== undefined; // Check to see if we're running in a Node environment. const nodeTestEnvironment = process && process?.env !== undefined; @@ -12,13 +12,13 @@ function getTestDwnUrl(): string { // Attempt to use DWN URL defined in Karma config, if running a browser test. // Otherwise, attempt to use the Node environment variable. const envTestDwnUrl = (browserTestEnvironment) - ? __karma__.config!.testDwnUrl + ? __karma__.config!.testDwnUrls : (nodeTestEnvironment) - ? process.env.TEST_DWN_URL + ? process.env.TEST_DWN_URLS : undefined; // If defined, return the test environment DWN URL. Otherwise, return the default. - return envTestDwnUrl || DEFAULT_TEST_DWN_URL; + return (envTestDwnUrl || DEFAULT_TEST_DWN_URLS).split(','); } -export const testDwnUrl = getTestDwnUrl(); \ No newline at end of file +export const testDwnUrls = getTestDwnUrls(); \ No newline at end of file diff --git a/packages/api/karma.conf.cjs b/packages/api/karma.conf.cjs index 6b7407b2c..79b45d1b5 100644 --- a/packages/api/karma.conf.cjs +++ b/packages/api/karma.conf.cjs @@ -36,7 +36,7 @@ module.exports = function (config) { timeout: 10000 // 10 seconds }, // If an environment variable is defined, override the default test DWN URL. - testDwnUrl: process.env.TEST_DWN_URL, + testDwnUrls: process.env.TEST_DWN_URLS }, // list of files / patterns to load in the browser diff --git a/packages/api/tests/dwn-api.spec.ts b/packages/api/tests/dwn-api.spec.ts index 2374a7899..09ea469d7 100644 --- a/packages/api/tests/dwn-api.spec.ts +++ b/packages/api/tests/dwn-api.spec.ts @@ -5,12 +5,10 @@ import { expect } from 'chai'; import { TestManagedAgent } from '@web5/agent'; import { DwnApi } from '../src/dwn-api.js'; -import { testDwnUrl } from './test-config.js'; +import { testDwnUrls } from './test-config.js'; import { TestUserAgent } from './utils/test-user-agent.js'; import emailProtocolDefinition from './fixtures/protocol-definitions/email.json' assert { type: 'json' }; -let testDwnUrls: string[] = [testDwnUrl]; - describe('DwnApi', () => { let alice: ManagedIdentity; let aliceDid: PortableDid; diff --git a/packages/api/tests/protocol.spec.ts b/packages/api/tests/protocol.spec.ts index ea1cd1061..da2c8a716 100644 --- a/packages/api/tests/protocol.spec.ts +++ b/packages/api/tests/protocol.spec.ts @@ -6,7 +6,7 @@ import chaiAsPromised from 'chai-as-promised'; import { TestManagedAgent } from '@web5/agent'; import { DwnApi } from '../src/dwn-api.js'; -import { testDwnUrl } from './test-config.js'; +import { testDwnUrls } from './test-config.js'; import { TestUserAgent } from './utils/test-user-agent.js'; import emailProtocolDefinition from './fixtures/protocol-definitions/email.json' assert { type: 'json' }; @@ -21,8 +21,6 @@ import { webcrypto } from 'node:crypto'; if (!globalThis.crypto) globalThis.crypto = webcrypto; // TODO: Come up with a better way of resolving the TS errors. -let testDwnUrls: string[] = [testDwnUrl]; - describe('Protocol', () => { let dwn: DwnApi; let alice: ManagedIdentity; diff --git a/packages/api/tests/record.spec.ts b/packages/api/tests/record.spec.ts index ba3e881d8..f49087204 100644 --- a/packages/api/tests/record.spec.ts +++ b/packages/api/tests/record.spec.ts @@ -22,7 +22,7 @@ import { import { Record } from '../src/record.js'; import { DwnApi } from '../src/dwn-api.js'; import { dataToBlob } from '../src/utils.js'; -import { testDwnUrl } from './test-config.js'; +import { testDwnUrls } from './test-config.js'; import { TestUserAgent } from './utils/test-user-agent.js'; import { TestDataGenerator } from './utils/test-data-generator.js'; import emailProtocolDefinition from './fixtures/protocol-definitions/email.json' assert { type: 'json' }; @@ -41,8 +41,6 @@ if (!globalThis.crypto) globalThis.crypto = webcrypto; // TODO: Come up with a better way of resolving the TS errors. type RecordsWriteTest = RecordsWrite & RecordsWriteMessage; -let testDwnUrls: string[] = [testDwnUrl]; - describe('Record', () => { let dataText: string; let dataBlob: Blob; diff --git a/packages/api/tests/test-config.ts b/packages/api/tests/test-config.ts index 213604404..56374a65e 100644 --- a/packages/api/tests/test-config.ts +++ b/packages/api/tests/test-config.ts @@ -1,8 +1,8 @@ declare const __karma__: { config?: { testDwnUrl?: string; } }; -const DEFAULT_TEST_DWN_URL = 'https://dwn.tbddev.org/dwn0'; +const DEFAULT_TEST_DWN_URLS = 'https://dwn.tbddev.org/dwn0'; -function getTestDwnUrl(): string { +function getTestDwnUrl(): string[] { // Check to see if we're running in a Karma browser test environment. const browserTestEnvironment = typeof __karma__ !== 'undefined' && __karma__?.config?.testDwnUrl !== undefined; @@ -14,11 +14,11 @@ function getTestDwnUrl(): string { const envTestDwnUrl = (browserTestEnvironment) ? __karma__.config!.testDwnUrl : (nodeTestEnvironment) - ? process.env.TEST_DWN_URL + ? process.env.TEST_DWN_URLS : undefined; // If defined, return the test environment DWN URL. Otherwise, return the default. - return envTestDwnUrl || DEFAULT_TEST_DWN_URL; + return (envTestDwnUrl || DEFAULT_TEST_DWN_URLS).split(','); } -export const testDwnUrl = getTestDwnUrl(); \ No newline at end of file +export const testDwnUrls = getTestDwnUrl(); \ No newline at end of file diff --git a/packages/common/karma.conf.cjs b/packages/common/karma.conf.cjs index 6b7407b2c..79b45d1b5 100644 --- a/packages/common/karma.conf.cjs +++ b/packages/common/karma.conf.cjs @@ -36,7 +36,7 @@ module.exports = function (config) { timeout: 10000 // 10 seconds }, // If an environment variable is defined, override the default test DWN URL. - testDwnUrl: process.env.TEST_DWN_URL, + testDwnUrls: process.env.TEST_DWN_URLS }, // list of files / patterns to load in the browser diff --git a/packages/credentials/karma.conf.cjs b/packages/credentials/karma.conf.cjs index 6b7407b2c..79b45d1b5 100644 --- a/packages/credentials/karma.conf.cjs +++ b/packages/credentials/karma.conf.cjs @@ -36,7 +36,7 @@ module.exports = function (config) { timeout: 10000 // 10 seconds }, // If an environment variable is defined, override the default test DWN URL. - testDwnUrl: process.env.TEST_DWN_URL, + testDwnUrls: process.env.TEST_DWN_URLS }, // list of files / patterns to load in the browser diff --git a/packages/crypto/karma.conf.cjs b/packages/crypto/karma.conf.cjs index 6b7407b2c..79b45d1b5 100644 --- a/packages/crypto/karma.conf.cjs +++ b/packages/crypto/karma.conf.cjs @@ -36,7 +36,7 @@ module.exports = function (config) { timeout: 10000 // 10 seconds }, // If an environment variable is defined, override the default test DWN URL. - testDwnUrl: process.env.TEST_DWN_URL, + testDwnUrls: process.env.TEST_DWN_URLS }, // list of files / patterns to load in the browser diff --git a/packages/dev-env/docker-compose.yaml b/packages/dev-env/docker-compose.yaml index 98e08847e..36c1ebfb0 100644 --- a/packages/dev-env/docker-compose.yaml +++ b/packages/dev-env/docker-compose.yaml @@ -1,8 +1,13 @@ version: "3.98" services: - dwn-server: - container_name: dwn-server + dwn-server2: + container_name: dwn-server2 + image: ghcr.io/tbd54566975/dwn-server:dwn-sdk-0.2.3 + ports: + - "3001:3000" + dwn-server1: + container_name: dwn-server1 image: ghcr.io/tbd54566975/dwn-server:dwn-sdk-0.2.3 ports: - "3000:3000" diff --git a/packages/dids/karma.conf.cjs b/packages/dids/karma.conf.cjs index 6b7407b2c..79b45d1b5 100644 --- a/packages/dids/karma.conf.cjs +++ b/packages/dids/karma.conf.cjs @@ -36,7 +36,7 @@ module.exports = function (config) { timeout: 10000 // 10 seconds }, // If an environment variable is defined, override the default test DWN URL. - testDwnUrl: process.env.TEST_DWN_URL, + testDwnUrls: process.env.TEST_DWN_URLS }, // list of files / patterns to load in the browser diff --git a/packages/identity-agent/karma.conf.cjs b/packages/identity-agent/karma.conf.cjs index 6b7407b2c..79b45d1b5 100644 --- a/packages/identity-agent/karma.conf.cjs +++ b/packages/identity-agent/karma.conf.cjs @@ -36,7 +36,7 @@ module.exports = function (config) { timeout: 10000 // 10 seconds }, // If an environment variable is defined, override the default test DWN URL. - testDwnUrl: process.env.TEST_DWN_URL, + testDwnUrls: process.env.TEST_DWN_URLS }, // list of files / patterns to load in the browser diff --git a/packages/proxy-agent/karma.conf.cjs b/packages/proxy-agent/karma.conf.cjs index 6b7407b2c..79b45d1b5 100644 --- a/packages/proxy-agent/karma.conf.cjs +++ b/packages/proxy-agent/karma.conf.cjs @@ -36,7 +36,7 @@ module.exports = function (config) { timeout: 10000 // 10 seconds }, // If an environment variable is defined, override the default test DWN URL. - testDwnUrl: process.env.TEST_DWN_URL, + testDwnUrls: process.env.TEST_DWN_URLS }, // list of files / patterns to load in the browser diff --git a/packages/user-agent/karma.conf.cjs b/packages/user-agent/karma.conf.cjs index 6b7407b2c..79b45d1b5 100644 --- a/packages/user-agent/karma.conf.cjs +++ b/packages/user-agent/karma.conf.cjs @@ -36,7 +36,7 @@ module.exports = function (config) { timeout: 10000 // 10 seconds }, // If an environment variable is defined, override the default test DWN URL. - testDwnUrl: process.env.TEST_DWN_URL, + testDwnUrls: process.env.TEST_DWN_URLS }, // list of files / patterns to load in the browser From c67099bdcdc118076ab7dc260a9fd8e99feaadad Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 12 Sep 2023 19:17:56 -0400 Subject: [PATCH 05/20] bring parts of sync up outside of push/pull --- packages/agent/src/sync-manager.ts | 217 ++++++++++-------- .../agent/tests/chaos/chaos-monkey.spec.ts | 3 +- packages/agent/tests/sync-manager.spec.ts | 21 +- packages/agent/tests/utils/test-agent.ts | 4 + 4 files changed, 136 insertions(+), 109 deletions(-) diff --git a/packages/agent/src/sync-manager.ts b/packages/agent/src/sync-manager.ts index 2f6dc830b..e8e55d270 100644 --- a/packages/agent/src/sync-manager.ts +++ b/packages/agent/src/sync-manager.ts @@ -21,8 +21,6 @@ export interface SyncManager { registerIdentity(options: { did: string }): Promise; startSync(options: { interval: number }): Promise; stopSync(): void; - push(): Promise; - pull(): Promise; } export type SyncManagerOptions = { @@ -31,12 +29,12 @@ export type SyncManagerOptions = { db?: Level; }; -type SyncDirection = 'push' | 'pull'; type SyncState = { did: string; dwnUrl: string; - watermark: string | undefined; + pullWatermark: string | undefined; + pushWatermark: string | undefined; } type DwnMessage = { @@ -94,9 +92,6 @@ export class SyncManagerLevel implements SyncManager { } public async pull(): Promise { - const syncPeerState = await this.getSyncPeerState({ syncDirection: 'pull' }); - await this.enqueueOperations({ syncDirection: 'pull', syncPeerState }); - const pullQueue = this.getPullQueue(); const pullJobs = await pullQueue.iterator().all(); @@ -225,9 +220,6 @@ export class SyncManagerLevel implements SyncManager { } public async push(): Promise { - const syncPeerState = await this.getSyncPeerState({ syncDirection: 'push' }); - await this.enqueueOperations({ syncDirection: 'push', syncPeerState }); - const pushQueue = this.getPushQueue(); const pushJobs = await pushQueue.iterator().all(); @@ -296,7 +288,20 @@ export class SyncManagerLevel implements SyncManager { public startSync(options: { interval: number }): Promise { - const { interval = 120_000 } = options; + const { interval } = options; + + // interval 0 means start instantly and don't repeat. + if (interval === 0) { + return new Promise( async (resolve,reject) => { + try { + await this.sync(); + resolve(); + + } catch(error) { + reject(error); + } + }) + } return new Promise((resolve, reject) => { if (this._syncIntervalId) { @@ -305,8 +310,7 @@ export class SyncManagerLevel implements SyncManager { this._syncIntervalId = setInterval(async () => { try { - await this.push(); - await this.pull(); + await this.sync(); } catch (error) { this.stopSync(); reject(error); @@ -322,90 +326,93 @@ export class SyncManagerLevel implements SyncManager { } } - private async enqueueOperations(options: { - syncDirection: SyncDirection, - syncPeerState: SyncState[] - }) { - const { syncDirection, syncPeerState } = options; - - for (let syncState of syncPeerState) { - // Get the event log from the remote DWN if pull sync, or local DWN if push sync. - const eventLog = await this.getDwnEventLog({ - did : syncState.did, - dwnUrl : syncState.dwnUrl, - syncDirection, - watermark : syncState.watermark - }); + private async sync(): Promise { + await this.enqueueOperations(); + await this.push(); + await this.pull(); + } - const syncOperations: DbBatchOperation[] = []; + private createOperationKey(did: string, dwnUrl: string, watermark: string, messageCid: string): string { + return [did, dwnUrl, watermark, messageCid].join('~'); + } - for (let event of eventLog) { - /** Use "did~dwnUrl~watermark~messageCid" as the key in the sync queue. - * Note: It is critical that `watermark` precedes `messageCid` to - * ensure that when the sync jobs are pulled off the queue, they - * are lexographically sorted oldest to newest. */ - const operationKey = [ - syncState.did, - syncState.dwnUrl, - event.watermark, - event.messageCid - ].join('~'); + private dbBatchOperationPut(did: string, dwnUrl: string, watermark: string, messageCid: string): DbBatchOperation { + const key = this.createOperationKey(did, dwnUrl, watermark, messageCid); + return { type: 'put', key, value: '' } + } - const operation: DbBatchOperation = { type: 'put', key: operationKey, value: '' }; + async enqueueOperations(direction?: 'pull' | 'push') { + const syncPeerState = await this.getSyncPeerState(); + for (let syncState of syncPeerState) { + const localEvents = await this.getLocalDwnEvents({ + did: syncState.did, + watermark: syncState.pushWatermark, + }); + const remoteEvents = await this.getRemoteEvents({ + did: syncState.did, + dwnUrl: syncState.dwnUrl, + watermark: syncState.pullWatermark, + }); - syncOperations.push(operation); + const pullOperations: DbBatchOperation[] = []; + if (direction === undefined || direction === 'pull') { + remoteEvents.forEach(remoteEvent => { + if (localEvents.findIndex(localEvent => localEvent.messageCid === remoteEvent.messageCid) < 0) { + const operation = this.dbBatchOperationPut(syncState.did, syncState.dwnUrl, remoteEvent.watermark, remoteEvent.messageCid); + pullOperations.push(operation); + } + }); } - if (syncOperations.length > 0) { - const syncQueue = (syncDirection === 'pull') - ? this.getPullQueue() - : this.getPushQueue(); - await syncQueue.batch(syncOperations as any); + const pushOperations: DbBatchOperation[] = []; + if (direction === undefined || direction === 'push') { + localEvents.forEach(localEvent => { + if(remoteEvents.findIndex(remoteEvent => remoteEvent.messageCid === localEvent.messageCid) < 0) { + const operation = this.dbBatchOperationPut(syncState.did, syncState.dwnUrl, localEvent.watermark, localEvent.messageCid); + pushOperations.push(operation); + } + }); } + + await this.getPullQueue().batch(pullOperations as any); + await this.getPushQueue().batch(pushOperations as any); } } - private async getDwnEventLog(options: { - did: string, - dwnUrl: string, - syncDirection: SyncDirection, - watermark?: string - }) { - const { did, dwnUrl, syncDirection, watermark } = options; - + private async getLocalDwnEvents(options:{ did: string, watermark?: string }) { + const { did, watermark } = options; let eventsReply = {} as EventsGetReply; + ({ reply: eventsReply } = await this.agent.dwnManager.processRequest({ + author : did, + target : did, + messageType : 'EventsGet', + messageOptions : { watermark } + })); + + return eventsReply.events ?? []; + } - if (syncDirection === 'pull') { - // When sync is a pull, get the event log from the remote DWN. - const eventsGetMessage = await this.agent.dwnManager.createMessage({ - author : did, - messageType : 'EventsGet', - messageOptions : { watermark } - }); + private async getRemoteEvents(options: { did: string, dwnUrl: string, watermark?: string }) { + const { did, dwnUrl, watermark } = options; + let eventsReply = {} as EventsGetReply; - try { - eventsReply = await this.agent.rpcClient.sendDwnRequest({ - dwnUrl : dwnUrl, - targetDid : did, - message : eventsGetMessage - }); - } catch { - // If a particular DWN service endpoint is unreachable, silently ignore. - } + const eventsGetMessage = await this.agent.dwnManager.createMessage({ + author : did, + messageType : 'EventsGet', + messageOptions : { watermark } + }); - } else if (syncDirection === 'push') { - // When sync is a push, get the event log from the local DWN. - ({ reply: eventsReply } = await this.agent.dwnManager.processRequest({ - author : did, - target : did, - messageType : 'EventsGet', - messageOptions : { watermark } - })); + try { + eventsReply = await this.agent.rpcClient.sendDwnRequest({ + dwnUrl : dwnUrl, + targetDid : did, + message : eventsGetMessage + }); + } catch { + // If a particular DWN service endpoint is unreachable, silently ignore. } - const eventLog = eventsReply.events ?? []; - - return eventLog; + return eventsReply.events ?? []; } private async getDwnMessage( @@ -482,11 +489,7 @@ export class SyncManagerLevel implements SyncManager { return dwnMessage; } - private async getSyncPeerState(options: { - syncDirection: SyncDirection - }): Promise { - const { syncDirection } = options; - + private async getSyncPeerState(): Promise { // Get a list of the DIDs of all registered identities. const registeredIdentities = await this._db.sublevel('registeredIdentities').keys().all(); @@ -521,33 +524,57 @@ export class SyncManagerLevel implements SyncManager { /** Get the watermark (or undefined) for each (DID, DWN service endpoint, sync direction) * combination and add it to the sync peer state array. */ for (let dwnUrl of service.serviceEndpoint.nodes) { - const watermark = await this.getWatermark(did, dwnUrl, syncDirection); - syncPeerState.push({ did, dwnUrl, watermark }); + const watermark = await this.getWatermark(did, dwnUrl); + syncPeerState.push({ did, dwnUrl, pullWatermark: watermark.pull, pushWatermark: watermark.push }); } } return syncPeerState; } - private async getWatermark(did: string, dwnUrl: string, direction: SyncDirection) { - const wmKey = `${did}~${dwnUrl}~${direction}`; + private async getWatermark(did: string, dwnUrl: string): Promise<{ pull?:string, push?: string }> { + const wmKey = `${did}~${dwnUrl}`; const watermarkStore = this.getWatermarkStore(); try { - return await watermarkStore.get(wmKey); + const wm = await watermarkStore.get(wmKey); + const split = wm.split('~'); + if (split.length !== 2) { + return {} + } + + let pull; + let push; + if (split[0] !== '0') { + pull = split[0] + } + if (split[1] !== '0') { + push = split[1] + } + + return { pull, push }; } catch(error: any) { // Don't throw when a key wasn't found. if (error.notFound) { - return undefined; + return {}; } + throw new Error('invalid watermark'); } } - private async setWatermark(did: string, dwnUrl: string, direction: SyncDirection, watermark: string) { - const wmKey = `${did}~${dwnUrl}~${direction}`; + private async setWatermark(did: string, dwnUrl: string, pullWatermark?: string, pushWatermark?: string) { + const wmKey = `${did}~${dwnUrl}`; const watermarkStore = this.getWatermarkStore(); - await watermarkStore.put(wmKey, watermark); + if (pullWatermark === undefined) { + pullWatermark = '0' + } + + if (pushWatermark === undefined) { + pushWatermark = '0' + } + + await watermarkStore.put(wmKey, `${pullWatermark}~${pushWatermark}`); } /** diff --git a/packages/agent/tests/chaos/chaos-monkey.spec.ts b/packages/agent/tests/chaos/chaos-monkey.spec.ts index 6c02de8f4..664095e9a 100644 --- a/packages/agent/tests/chaos/chaos-monkey.spec.ts +++ b/packages/agent/tests/chaos/chaos-monkey.spec.ts @@ -194,8 +194,7 @@ describe('Chaos Monkey', () => { let { reply: replyRemote } = await testAgent.agent.dwnManager.sendRequest(testQuery); const startSync = Date.now(); - await testAgent.agent.syncManager.push(); - await testAgent.agent.syncManager.pull(); + await testAgent.agent.syncManager.startSync({ interval: 0 }); const endSync = Date.now(); const remoteEntries = (replyRemote.entries || []).filter(e => (reply.entries || []).findIndex(le => (le as RecordsWriteMessage).recordId === (e as RecordsWriteMessage).recordId) < 0); diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index 1dd55f236..7edd9cd86 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -7,7 +7,7 @@ import type { ManagedIdentity } from '../src/identity-manager.js'; import { randomUuid } from '@web5/crypto/utils'; import { testDwnUrls } from './test-config.js'; -import { TestAgent } from './utils/test-agent.js'; +import { TestAgent, sleep } from './utils/test-agent.js'; import { SyncManagerLevel } from '../src/sync-manager.js'; import { TestManagedAgent } from '../src/test-managed-agent.js'; @@ -74,12 +74,12 @@ describe('SyncManagerLevel', () => { await testAgent.closeStorage(); }); - describe('pull()', () => { + describe('startSync()', () => { it('takes no action if no identities are registered', async () => { const didResolveSpy = sinon.spy(testAgent.agent.didResolver, 'resolve'); const sendDwnRequestSpy = sinon.spy(testAgent.agent.rpcClient, 'sendDwnRequest'); - await testAgent.agent.syncManager.pull(); + await testAgent.agent.syncManager.startSync({ interval: 0 }); // Verify DID resolution and DWN requests did not occur. expect(didResolveSpy.notCalled).to.be.true; @@ -121,7 +121,7 @@ describe('SyncManagerLevel', () => { }); // Execute Sync to pull all records from Alice's remote DWN to Alice's local DWN. - await testAgent.agent.syncManager.pull(); + await testAgent.agent.syncManager.startSync({ interval: 0 }); // Confirm the record now DOES exist on Alice's local DWN. queryResponse = await testAgent.agent.dwnManager.processRequest({ @@ -184,7 +184,7 @@ describe('SyncManagerLevel', () => { }); // Execute Sync to pull all records from Alice's and Bob's remove DWNs to their local DWNs. - await testAgent.agent.syncManager.pull(); + await testAgent.agent.syncManager.startSync({ interval: 0 }); // Confirm the Alice test record exist on Alice's local DWN. let queryResponse = await testAgent.agent.dwnManager.processRequest({ @@ -208,14 +208,12 @@ describe('SyncManagerLevel', () => { expect(localDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. expect(localDwnQueryReply.entries).to.have.length(1); // Record does exist on local DWN. }).timeout(5000); - }); - describe('push()', () => { it('takes no action if no identities are registered', async () => { const didResolveSpy = sinon.spy(testAgent.agent.didResolver, 'resolve'); const processRequestSpy = sinon.spy(testAgent.agent.dwnManager, 'processRequest'); - await testAgent.agent.syncManager.push(); + await testAgent.agent.syncManager.startSync({ interval: 0 }); // Verify DID resolution and DWN requests did not occur. expect(didResolveSpy.notCalled).to.be.true; @@ -257,7 +255,7 @@ describe('SyncManagerLevel', () => { }); // Execute Sync to push all records from Alice's local DWN to Alice's remote DWN. - await testAgent.agent.syncManager.push(); + await testAgent.agent.syncManager.startSync({ interval: 0 }); // Confirm the record now DOES exist on Alice's remote DWN. queryResponse = await testAgent.agent.dwnManager.sendRequest({ @@ -319,7 +317,7 @@ describe('SyncManagerLevel', () => { }); // Execute Sync to push all records from Alice's and Bob's local DWNs to their remote DWNs. - await testAgent.agent.syncManager.push(); + await testAgent.agent.syncManager.startSync({ interval: 0 }); // Confirm the Alice test record exist on Alice's remote DWN. let queryResponse = await testAgent.agent.dwnManager.sendRequest({ @@ -392,8 +390,7 @@ describe('SyncManagerLevel', () => { expect(remoteReply.entries?.length).to.equal(remoteRecords.size); expect(remoteReply.entries?.every(e => remoteRecords.has((e as RecordsWriteMessage).recordId))).to.be.true; - await testAgent.agent.syncManager.push(); - await testAgent.agent.syncManager.pull(); + await testAgent.agent.syncManager.startSync({ interval: 0 }); const records = new Set([...remoteRecords, ...localRecords]); const { reply: allRemoteReply } = await testAgent.agent.dwnManager.sendRequest(everythingQuery()); diff --git a/packages/agent/tests/utils/test-agent.ts b/packages/agent/tests/utils/test-agent.ts index 730c9fa0b..c4cc9cb26 100644 --- a/packages/agent/tests/utils/test-agent.ts +++ b/packages/agent/tests/utils/test-agent.ts @@ -203,4 +203,8 @@ export class TestAgent implements Web5ManagedAgent { async start(_options: { passphrase: string; }): Promise { throw new Error('Not implemented'); } +} + +export async function sleep(duration: number): Promise { + return new Promise((resolve) => setTimeout(resolve, duration)); } \ No newline at end of file From 8c48ef180264788bd0792e95580e3ece7028c4cb Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 19 Sep 2023 11:31:16 -0400 Subject: [PATCH 06/20] clean up sync interface, add new tests, remove chaos script --- .github/workflows/tests-ci.yml | 2 + packages/agent/src/sync-manager.ts | 132 ++-- .../agent/tests/chaos/chaos-monkey.spec.ts | 218 ------- packages/agent/tests/sync-manager.spec.ts | 565 ++++++++++-------- 4 files changed, 379 insertions(+), 538 deletions(-) delete mode 100644 packages/agent/tests/chaos/chaos-monkey.spec.ts diff --git a/.github/workflows/tests-ci.yml b/.github/workflows/tests-ci.yml index 434ed8151..bd203d4ad 100644 --- a/.github/workflows/tests-ci.yml +++ b/.github/workflows/tests-ci.yml @@ -63,6 +63,7 @@ jobs: run: npm run test:node --ws -- --color env: TEST_DWN_URLS: http://localhost:3000,http://localhost:3001 + MIN_SYNC_INTERVAL: 100 - name: Upload test coverage to Codecov uses: codecov/codecov-action@eaaf4bedf32dbdc6b720b63067d99c4d77d6047d # v3.1.4 @@ -113,3 +114,4 @@ jobs: run: npm run test:browser --ws -- --color env: TEST_DWN_URLS: http://localhost:3000,http://localhost:3001 + MIN_SYNC_INTERVAL: 100 diff --git a/packages/agent/src/sync-manager.ts b/packages/agent/src/sync-manager.ts index e8e55d270..28c23f16d 100644 --- a/packages/agent/src/sync-manager.ts +++ b/packages/agent/src/sync-manager.ts @@ -1,5 +1,6 @@ import type { BatchOperation } from 'level'; import type { + Event, EventsGetReply, GenericMessage, MessagesGetReply, @@ -16,10 +17,29 @@ import type { Web5ManagedAgent } from './types/agent.js'; import { webReadableToIsomorphicNodeReadable } from './utils.js'; +const checkNumber = (n?: string) => isNaN(parseInt(n || '')) ? undefined : parseInt(n || ''); +// arbitrary number for now, but we should enforce some sane minimum +// allow for environment to set a minimum +const MIN_SYNC_INTERVAL = checkNumber(process?.env.MIN_SYNC_INTERVAL) ?? 5000; + +type SyncDirection = 'pull' | 'push'; + +interface SyncOptions { + interval?: number + direction?: SyncDirection +} + export interface SyncManager { agent: Web5ManagedAgent; registerIdentity(options: { did: string }): Promise; - startSync(options: { interval: number }): Promise; + + // sync will run the sync operation once. + // if a direction is passed, it will only sync in that direction. + sync(direction?: SyncDirection): Promise; + + // startSync will run sync on an interval + // if a direction is provided, it will only sync in that direction. + startSync(options?: SyncOptions): Promise; stopSync(): void; } @@ -91,7 +111,7 @@ export class SyncManagerLevel implements SyncManager { await this._db.clear(); } - public async pull(): Promise { + private async pull(): Promise { const pullQueue = this.getPullQueue(); const pullJobs = await pullQueue.iterator().all(); @@ -219,7 +239,7 @@ export class SyncManagerLevel implements SyncManager { await pullQueue.batch(deleteOperations as any); } - public async push(): Promise { + private async push(): Promise { const pushQueue = this.getPushQueue(); const pushJobs = await pushQueue.iterator().all(); @@ -285,24 +305,8 @@ export class SyncManagerLevel implements SyncManager { await registeredIdentities.put(did, ''); } - public startSync(options: { - interval: number - }): Promise { - const { interval } = options; - - // interval 0 means start instantly and don't repeat. - if (interval === 0) { - return new Promise( async (resolve,reject) => { - try { - await this.sync(); - resolve(); - - } catch(error) { - reject(error); - } - }) - } - + public startSync(options: SyncOptions = {}): Promise { + const { interval = MIN_SYNC_INTERVAL, direction } = options; return new Promise((resolve, reject) => { if (this._syncIntervalId) { clearInterval(this._syncIntervalId); @@ -310,12 +314,12 @@ export class SyncManagerLevel implements SyncManager { this._syncIntervalId = setInterval(async () => { try { - await this.sync(); + await this.sync(direction); } catch (error) { this.stopSync(); reject(error); } - }, interval); + }, interval >= MIN_SYNC_INTERVAL ? interval : MIN_SYNC_INTERVAL); }); } @@ -326,10 +330,13 @@ export class SyncManagerLevel implements SyncManager { } } - private async sync(): Promise { - await this.enqueueOperations(); - await this.push(); - await this.pull(); + public async sync(direction?: SyncDirection): Promise { + await this.enqueueOperations(direction); + // enqueue operations handles the direction logic. + // we can just run both operations and only enqueued events will sync. + await Promise.all([ + this.push(), this.pull() + ]); } private createOperationKey(did: string, dwnUrl: string, watermark: string, messageCid: string): string { @@ -338,47 +345,47 @@ export class SyncManagerLevel implements SyncManager { private dbBatchOperationPut(did: string, dwnUrl: string, watermark: string, messageCid: string): DbBatchOperation { const key = this.createOperationKey(did, dwnUrl, watermark, messageCid); - return { type: 'put', key, value: '' } + return { type: 'put', key, value: '' }; } - async enqueueOperations(direction?: 'pull' | 'push') { + /** + * Enqueues the operations needed for sync based on the supplied direction. + * + * @param direction the optional direction in which you would like to enqueue sync events for. + * If no direction is supplied it will sync in both directions. + */ + async enqueueOperations(direction?: SyncDirection) { const syncPeerState = await this.getSyncPeerState(); - for (let syncState of syncPeerState) { - const localEvents = await this.getLocalDwnEvents({ - did: syncState.did, - watermark: syncState.pushWatermark, - }); - const remoteEvents = await this.getRemoteEvents({ - did: syncState.did, - dwnUrl: syncState.dwnUrl, - watermark: syncState.pullWatermark, - }); - const pullOperations: DbBatchOperation[] = []; - if (direction === undefined || direction === 'pull') { - remoteEvents.forEach(remoteEvent => { - if (localEvents.findIndex(localEvent => localEvent.messageCid === remoteEvent.messageCid) < 0) { - const operation = this.dbBatchOperationPut(syncState.did, syncState.dwnUrl, remoteEvent.watermark, remoteEvent.messageCid); - pullOperations.push(operation); - } + for (let syncState of syncPeerState) { + const batchPromises = []; + if (direction === undefined || direction === 'push') { + const localEventsPromise = this.getLocalDwnEvents({ + did : syncState.did, + watermark : syncState.pushWatermark, }); + batchPromises.push(this.batchOperations('push', localEventsPromise, syncState)); } - const pushOperations: DbBatchOperation[] = []; - if (direction === undefined || direction === 'push') { - localEvents.forEach(localEvent => { - if(remoteEvents.findIndex(remoteEvent => remoteEvent.messageCid === localEvent.messageCid) < 0) { - const operation = this.dbBatchOperationPut(syncState.did, syncState.dwnUrl, localEvent.watermark, localEvent.messageCid); - pushOperations.push(operation); - } + if(direction === undefined || direction === 'pull') { + const remoteEventsPromise = this.getRemoteEvents({ + did : syncState.did, + dwnUrl : syncState.dwnUrl, + watermark : syncState.pullWatermark, }); + batchPromises.push(this.batchOperations('pull', remoteEventsPromise, syncState)); } - - await this.getPullQueue().batch(pullOperations as any); - await this.getPushQueue().batch(pushOperations as any); + await Promise.all(batchPromises); } } + private async batchOperations(direction: SyncDirection, eventsPromise: Promise, syncState: SyncState): Promise { + const { did, dwnUrl } = syncState; + const operations: DbBatchOperation[] = []; + (await eventsPromise).forEach(e => operations.push(this.dbBatchOperationPut(did, dwnUrl, e.watermark, e.messageCid))); + return direction === 'pull' ? this.getPullQueue().batch(operations as any) : this.getPushQueue().batch(operations as any); + } + private async getLocalDwnEvents(options:{ did: string, watermark?: string }) { const { did, watermark } = options; let eventsReply = {} as EventsGetReply; @@ -394,6 +401,7 @@ export class SyncManagerLevel implements SyncManager { private async getRemoteEvents(options: { did: string, dwnUrl: string, watermark?: string }) { const { did, dwnUrl, watermark } = options; + let eventsReply = {} as EventsGetReply; const eventsGetMessage = await this.agent.dwnManager.createMessage({ @@ -419,7 +427,7 @@ export class SyncManagerLevel implements SyncManager { author: string, messageCid: string ): Promise { - let messagesGetResponse = await this.agent.dwnManager.processRequest({ + const messagesGetResponse = await this.agent.dwnManager.processRequest({ author : author, target : author, messageType : 'MessagesGet', @@ -540,16 +548,16 @@ export class SyncManagerLevel implements SyncManager { const wm = await watermarkStore.get(wmKey); const split = wm.split('~'); if (split.length !== 2) { - return {} + return {}; } let pull; let push; if (split[0] !== '0') { - pull = split[0] + pull = split[0]; } if (split[1] !== '0') { - push = split[1] + push = split[1]; } return { pull, push }; @@ -567,11 +575,11 @@ export class SyncManagerLevel implements SyncManager { const watermarkStore = this.getWatermarkStore(); if (pullWatermark === undefined) { - pullWatermark = '0' + pullWatermark = '0'; } if (pushWatermark === undefined) { - pushWatermark = '0' + pushWatermark = '0'; } await watermarkStore.put(wmKey, `${pullWatermark}~${pushWatermark}`); diff --git a/packages/agent/tests/chaos/chaos-monkey.spec.ts b/packages/agent/tests/chaos/chaos-monkey.spec.ts deleted file mode 100644 index 664095e9a..000000000 --- a/packages/agent/tests/chaos/chaos-monkey.spec.ts +++ /dev/null @@ -1,218 +0,0 @@ -import type { PortableDid } from '@web5/dids'; - -import { expect } from 'chai'; -import * as sinon from 'sinon'; - -import type { ManagedIdentity } from '../../src/identity-manager.js' - -import { testDwnUrls } from '../test-config.js' -import { TestAgent } from '../utils/test-agent.js'; -import { TestManagedAgent } from '../../src/test-managed-agent.js'; - -import { randomUuid } from '@web5/crypto/utils'; -import { DwnRequest, DwnResponse, ProcessDwnRequest, SendDwnRequest } from '../../src/index.js'; -import { DataStream, RecordsDeleteMessage, RecordsWrite, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; -import _Readable from 'readable-stream'; - -const checkChaos = (): boolean => { - return process.env.CHAOS_ENV === 'true' -} - -describe('Chaos Monkey', () => { - describe('Sync Manager', function () { - this.timeout(120_000); - const records:DwnResponse[] = []; - - const lean: string|undefined = process.env.SYNC_LEAN === 'pull' ? 'pull' : - process.env.SYNC_LEAN === 'push' ? 'push' : undefined; - - const DEFAULT_SYNC_ROUNDS = 10; - const DEFAULT_BATCH_ROUNDS = 10; - const DEFAULT_BATCH_COUNT = 5; - const rounds: number = !isNaN(parseInt(process.env.SYNC_ROUNDS || 'not-a-number')) ? parseInt(process.env.SYNC_ROUNDS!) : DEFAULT_SYNC_ROUNDS; - const batchRounds: number = !isNaN(parseInt(process.env.BATCH_ROUNDS || 'not-a-number')) ? parseInt(process.env.BATCH_ROUNDS!) : DEFAULT_BATCH_ROUNDS; - const batchCount: number = !isNaN(parseInt(process.env.BATCH_COUNT || 'not-a-number')) ? parseInt(process.env.BATCH_COUNT!) : DEFAULT_BATCH_COUNT; - - let alice: ManagedIdentity; - let bob: ManagedIdentity; - let carol: ManagedIdentity; - let dave: ManagedIdentity; - - let aliceDid: PortableDid; - let bobDid: PortableDid; - let carolDid: PortableDid; - let daveDid: PortableDid; - let testAgent: TestManagedAgent; - - - const testWriteMessage = (did:string, id: string): ProcessDwnRequest => { - return { - author : did, - target : did, - messageType : 'RecordsWrite', - messageOptions : { - schema : 'schema', - dataFormat : 'application/json' - }, - dataStream: new Blob([ `Hello, ${id}`]) - }; - } - - const testQueryMessage = (did:string): ProcessDwnRequest => { - return { - author : did, - target : did, - messageType : 'RecordsQuery', - messageOptions: { filter: { schema: 'schema', dataFormat: 'application/json' } }, - }; - } - - const testReadMessage = (did:string, recordId: string): SendDwnRequest => { - return { - author : did, - target : did, - messageType : 'RecordsRead', - messageOptions : { recordId } - }; - } - - before(async () => { - testAgent = await TestManagedAgent.create({ - agentClass : TestAgent, - agentStores : 'dwn' - }); - }); - - beforeEach(async () => { - records.splice(0, records.length); - await testAgent.clearStorage(); - await testAgent.createAgentDid(); - // Create a new Identity to author the DWN messages. - ({ did: aliceDid } = await testAgent.createIdentity({ testDwnUrls })); - alice = await testAgent.agent.identityManager.import({ - did : aliceDid, - identity : { name: 'Alice', did: aliceDid.did }, - kms : 'local' - }); - ({ did: bobDid } = await testAgent.createIdentity({ testDwnUrls })); - bob = await testAgent.agent.identityManager.import({ - did : bobDid, - identity : { name: 'Bob', did: bobDid.did }, - kms : 'local' - }); - ({ did: carolDid } = await testAgent.createIdentity({ testDwnUrls })); - carol = await testAgent.agent.identityManager.import({ - did : carolDid, - identity : { name: 'Carol', did: carolDid.did }, - kms : 'local' - }); - ({ did: daveDid} = await testAgent.createIdentity({ testDwnUrls })); - dave = await testAgent.agent.identityManager.import({ - did : daveDid, - identity : { name: 'Dave', did: daveDid.did }, - kms : 'local' - }); - - const { dwnManager } = testAgent.agent; - const startLoadMessages = Date.now(); - - const process = async (message: ProcessDwnRequest, random: number): Promise => { - - let randomMod = 2; - if (lean !== undefined) { - // create an uneven distribution - randomMod = 3; - } - - // throw in a record that both get every 11th record. - if (random % 11 === 0) return processBoth(message); - - const left = (message: ProcessDwnRequest) => { - return lean === undefined || lean === 'pull' ? dwnManager.processRequest(message as ProcessDwnRequest): dwnManager.sendRequest(message as SendDwnRequest); - } - - const right = (message: ProcessDwnRequest) => { - return lean === undefined || lean === 'pull' ? dwnManager.sendRequest(message as SendDwnRequest) : dwnManager.processRequest(message as ProcessDwnRequest); - } - - return random % randomMod === 0 ? left(message) : right(message); - }; - - - const processBoth = async (message: ProcessDwnRequest) => { - const localResponse = await dwnManager.processRequest({...message} as ProcessDwnRequest); - // copy the message, todo use createFrom?? - message = { - ...message, - messageOptions: { - ...message.messageOptions || {}, - ...(localResponse.message as RecordsDeleteMessage).descriptor - } - } - const remoteResponse = await dwnManager.sendRequest({...message} as SendDwnRequest) - expect(localResponse.messageCid).to.equal(remoteResponse.messageCid, `invalid remote and local messages`); - return remoteResponse; - } - - const randomMessage = () => { - const random = getRandomInt(0, 1234567890); - const message = testWriteMessage(alice.did, randomUuid()); - return process(message, random); - } - - const batch = (count: number) => Array(count).fill({}).map(randomMessage) - - for (const _ of Array(batchRounds).fill({})) { - records.push(...(await Promise.all(batch(batchCount)))) - } - - const endLoadMessages = Date.now(); - console.log(`loaded ${records.length} messages in ${endLoadMessages - startLoadMessages}ms`); - expect(records.every(r => r.reply.status.code === 202), `could not load messages successfully`).to.be.true; - }); - - afterEach(async () => { - await testAgent.clearStorage(); - }); - - after(async () => { - await testAgent.clearStorage(); - await testAgent.closeStorage(); - }); - - describe(`startSync() ${rounds} runs`, () => { - if (checkChaos()) { - for ( const _ of Array(rounds).fill({})) { - it('sync a lot of records', async () => { - await testAgent.agent.syncManager.registerIdentity({ - did: alice.did - }); - - // get remote and local before sync; - const testQuery = testQueryMessage(alice.did); - let { reply } = await testAgent.agent.dwnManager.processRequest(testQuery); - let { reply: replyRemote } = await testAgent.agent.dwnManager.sendRequest(testQuery); - - const startSync = Date.now(); - await testAgent.agent.syncManager.startSync({ interval: 0 }); - const endSync = Date.now(); - - const remoteEntries = (replyRemote.entries || []).filter(e => (reply.entries || []).findIndex(le => (le as RecordsWriteMessage).recordId === (e as RecordsWriteMessage).recordId) < 0); - const localEntries = (reply.entries || []).filter(e => (replyRemote.entries || []).findIndex(re => (re as RecordsWriteMessage).recordId === (e as RecordsWriteMessage).recordId) < 0) - const commonItemsLength = (reply.entries!.length + replyRemote.entries!.length) - records.length; - - console.log(`sync time:\t\t${endSync-startSync} for ${records.length} records\nlocal records:\t\t${reply.entries!.length}/${localEntries.length} unique\nremote records:\t\t${replyRemote.entries!.length}/${remoteEntries.length} unique\ncommon records:\t\t${commonItemsLength}\n\n`) - expect(endSync-startSync).to.be.lt(60_000); - ({ reply } = await testAgent.agent.dwnManager.processRequest(testQuery)); - expect(reply.status.code).to.equal(200); - expect(reply.entries!.length).to.equal(records.length); - }).timeout(100_000); - } - } - }); - }); -}); - -function getRandomInt(min: number, max: number) { - return Math.floor(Math.random() * (Math.ceil(max - min)) + Math.ceil(min)); -} \ No newline at end of file diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index 7edd9cd86..59c9dc680 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -11,8 +11,8 @@ import { TestAgent, sleep } from './utils/test-agent.js'; import { SyncManagerLevel } from '../src/sync-manager.js'; import { TestManagedAgent } from '../src/test-managed-agent.js'; -import { RecordsQueryReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; import { ProcessDwnRequest } from '../src/index.js'; +import { RecordsQueryReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; describe('SyncManagerLevel', () => { describe('get agent', () => { @@ -74,12 +74,12 @@ describe('SyncManagerLevel', () => { await testAgent.closeStorage(); }); - describe('startSync()', () => { + describe('sync()', () => { it('takes no action if no identities are registered', async () => { const didResolveSpy = sinon.spy(testAgent.agent.didResolver, 'resolve'); const sendDwnRequestSpy = sinon.spy(testAgent.agent.rpcClient, 'sendDwnRequest'); - await testAgent.agent.syncManager.startSync({ interval: 0 }); + await testAgent.agent.syncManager.sync(); // Verify DID resolution and DWN requests did not occur. expect(didResolveSpy.notCalled).to.be.true; @@ -89,259 +89,6 @@ describe('SyncManagerLevel', () => { sendDwnRequestSpy.restore(); }); - it('synchronizes records for 1 identity from remove DWN to local DWN', async () => { - // Write a test record to Alice's remote DWN. - let writeResponse = await testAgent.agent.dwnManager.sendRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsWrite', - messageOptions : { - dataFormat: 'text/plain' - }, - dataStream: new Blob(['Hello, world!']) - }); - - // Get the record ID of the test record. - const testRecordId = (writeResponse.message as RecordsWriteMessage).recordId; - - // Confirm the record does NOT exist on Alice's local DWN. - let queryResponse = await testAgent.agent.dwnManager.processRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsQuery', - messageOptions : { filter: { recordId: testRecordId } } - }); - let localDwnQueryReply = queryResponse.reply as RecordsQueryReply; - expect(localDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. - expect(localDwnQueryReply.entries).to.have.length(0); // Record doesn't exist on local DWN. - - // Register Alice's DID to be synchronized. - await testAgent.agent.syncManager.registerIdentity({ - did: alice.did - }); - - // Execute Sync to pull all records from Alice's remote DWN to Alice's local DWN. - await testAgent.agent.syncManager.startSync({ interval: 0 }); - - // Confirm the record now DOES exist on Alice's local DWN. - queryResponse = await testAgent.agent.dwnManager.processRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsQuery', - messageOptions : { filter: { recordId: testRecordId } } - }); - localDwnQueryReply = queryResponse.reply as RecordsQueryReply; - expect(localDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. - expect(localDwnQueryReply.entries).to.have.length(1); // Record does exist on local DWN. - }); - - - it('synchronizes records for multiple identities from remote DWN to local DWN', async () => { - // Create a second Identity to author the DWN messages. - const { did: bobDid } = await testAgent.createIdentity({ testDwnUrls }); - const bob = await testAgent.agent.identityManager.import({ - did : bobDid, - identity : { name: 'Bob', did: bobDid.did }, - kms : 'local' - }); - - // Write a test record to Alice's remote DWN. - let writeResponse = await testAgent.agent.dwnManager.sendRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsWrite', - messageOptions : { - dataFormat: 'text/plain' - }, - dataStream: new Blob(['Hello, Bob!']) - }); - - // Get the record ID of Alice's test record. - const testRecordIdAlice = (writeResponse.message as RecordsWriteMessage).recordId; - - // Write a test record to Bob's remote DWN. - writeResponse = await testAgent.agent.dwnManager.sendRequest({ - author : bob.did, - target : bob.did, - messageType : 'RecordsWrite', - messageOptions : { - dataFormat: 'text/plain' - }, - dataStream: new Blob(['Hello, Alice!']) - }); - - // Get the record ID of Bob's test record. - const testRecordIdBob = (writeResponse.message as RecordsWriteMessage).recordId; - - // Register Alice's DID to be synchronized. - await testAgent.agent.syncManager.registerIdentity({ - did: alice.did - }); - - // Register Bob's DID to be synchronized. - await testAgent.agent.syncManager.registerIdentity({ - did: bob.did - }); - - // Execute Sync to pull all records from Alice's and Bob's remove DWNs to their local DWNs. - await testAgent.agent.syncManager.startSync({ interval: 0 }); - - // Confirm the Alice test record exist on Alice's local DWN. - let queryResponse = await testAgent.agent.dwnManager.processRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsQuery', - messageOptions : { filter: { recordId: testRecordIdAlice } } - }); - let localDwnQueryReply = queryResponse.reply as RecordsQueryReply; - expect(localDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. - expect(localDwnQueryReply.entries).to.have.length(1); // Record does exist on local DWN. - - // Confirm the Bob test record exist on Bob's local DWN. - queryResponse = await testAgent.agent.dwnManager.sendRequest({ - author : bob.did, - target : bob.did, - messageType : 'RecordsQuery', - messageOptions : { filter: { recordId: testRecordIdBob } } - }); - localDwnQueryReply = queryResponse.reply as RecordsQueryReply; - expect(localDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. - expect(localDwnQueryReply.entries).to.have.length(1); // Record does exist on local DWN. - }).timeout(5000); - - it('takes no action if no identities are registered', async () => { - const didResolveSpy = sinon.spy(testAgent.agent.didResolver, 'resolve'); - const processRequestSpy = sinon.spy(testAgent.agent.dwnManager, 'processRequest'); - - await testAgent.agent.syncManager.startSync({ interval: 0 }); - - // Verify DID resolution and DWN requests did not occur. - expect(didResolveSpy.notCalled).to.be.true; - expect(processRequestSpy.notCalled).to.be.true; - - didResolveSpy.restore(); - processRequestSpy.restore(); - }); - - it('synchronizes records for 1 identity from local DWN to remote DWN', async () => { - // Write a record that we can use for this test. - let writeResponse = await testAgent.agent.dwnManager.processRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsWrite', - messageOptions : { - dataFormat: 'text/plain' - }, - dataStream: new Blob(['Hello, world!']) - }); - - // Get the record ID of the test record. - const testRecordId = (writeResponse.message as RecordsWriteMessage).recordId; - - // Confirm the record does NOT exist on Alice's remote DWN. - let queryResponse = await testAgent.agent.dwnManager.sendRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsQuery', - messageOptions : { filter: { recordId: testRecordId } } - }); - let remoteDwnQueryReply = queryResponse.reply as RecordsQueryReply; - expect(remoteDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. - expect(remoteDwnQueryReply.entries).to.have.length(0); // Record doesn't exist on remote DWN. - - // Register Alice's DID to be synchronized. - await testAgent.agent.syncManager.registerIdentity({ - did: alice.did - }); - - // Execute Sync to push all records from Alice's local DWN to Alice's remote DWN. - await testAgent.agent.syncManager.startSync({ interval: 0 }); - - // Confirm the record now DOES exist on Alice's remote DWN. - queryResponse = await testAgent.agent.dwnManager.sendRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsQuery', - messageOptions : { filter: { recordId: testRecordId } } - }); - remoteDwnQueryReply = queryResponse.reply as RecordsQueryReply; - expect(remoteDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. - expect(remoteDwnQueryReply.entries).to.have.length(1); // Record does exist on remote DWN. - }); - - it('synchronizes records for multiple identities from local DWN to remote DWN', async () => { - // Create a second Identity to author the DWN messages. - const { did: bobDid } = await testAgent.createIdentity({ testDwnUrls }); - const bob = await testAgent.agent.identityManager.import({ - did : bobDid, - identity : { name: 'Bob', did: bobDid.did }, - kms : 'local' - }); - - // Write a test record to Alice's local DWN. - let writeResponse = await testAgent.agent.dwnManager.processRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsWrite', - messageOptions : { - dataFormat: 'text/plain' - }, - dataStream: new Blob(['Hello, Bob!']) - }); - - // Get the record ID of Alice's test record. - const testRecordIdAlice = (writeResponse.message as RecordsWriteMessage).recordId; - - // Write a test record to Bob's local DWN. - writeResponse = await testAgent.agent.dwnManager.processRequest({ - author : bob.did, - target : bob.did, - messageType : 'RecordsWrite', - messageOptions : { - dataFormat: 'text/plain' - }, - dataStream: new Blob(['Hello, Alice!']) - }); - - // Get the record ID of Bob's test record. - const testRecordIdBob = (writeResponse.message as RecordsWriteMessage).recordId; - - // Register Alice's DID to be synchronized. - await testAgent.agent.syncManager.registerIdentity({ - did: alice.did - }); - - // Register Bob's DID to be synchronized. - await testAgent.agent.syncManager.registerIdentity({ - did: bob.did - }); - - // Execute Sync to push all records from Alice's and Bob's local DWNs to their remote DWNs. - await testAgent.agent.syncManager.startSync({ interval: 0 }); - - // Confirm the Alice test record exist on Alice's remote DWN. - let queryResponse = await testAgent.agent.dwnManager.sendRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsQuery', - messageOptions : { filter: { recordId: testRecordIdAlice } } - }); - let remoteDwnQueryReply = queryResponse.reply as RecordsQueryReply; - expect(remoteDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. - expect(remoteDwnQueryReply.entries).to.have.length(1); // Record does exist on remote DWN. - - // Confirm the Bob test record exist on Bob's remote DWN. - queryResponse = await testAgent.agent.dwnManager.sendRequest({ - author : bob.did, - target : bob.did, - messageType : 'RecordsQuery', - messageOptions : { filter: { recordId: testRecordIdBob } } - }); - remoteDwnQueryReply = queryResponse.reply as RecordsQueryReply; - expect(remoteDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. - expect(remoteDwnQueryReply.entries).to.have.length(1); // Record does exist on remote DWN. - }).timeout(5000); - it('synchronizes data in both directions for a single identity', async () => { await testAgent.agent.syncManager.registerIdentity({ @@ -390,7 +137,7 @@ describe('SyncManagerLevel', () => { expect(remoteReply.entries?.length).to.equal(remoteRecords.size); expect(remoteReply.entries?.every(e => remoteRecords.has((e as RecordsWriteMessage).recordId))).to.be.true; - await testAgent.agent.syncManager.startSync({ interval: 0 }); + await testAgent.agent.syncManager.sync(); const records = new Set([...remoteRecords, ...localRecords]); const { reply: allRemoteReply } = await testAgent.agent.dwnManager.sendRequest(everythingQuery()); @@ -403,7 +150,309 @@ describe('SyncManagerLevel', () => { expect(allLocalReply.entries?.length).to.equal(records.size); expect(allLocalReply.entries?.every(e => records.has((e as RecordsWriteMessage).recordId))).to.be.true; - }).timeout(5000); + }).timeout(10_000); + + // tests must be run with a low MIN_SYNC_INTERVAL + it('check sync interval input', async () => { + const syncSpy = sinon.spy(testAgent.agent.syncManager, 'sync'); + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + testAgent.agent.syncManager.startSync({ interval: 300 }); + await sleep(1000); + expect(syncSpy.callCount).to.equal(3); + syncSpy.restore(); + }); + + // test must be run with MIN_SYNC_INTERVAL=100 + it('check sync default value passed', async () => { + const setIntervalSpy = sinon.spy(global, 'setInterval'); + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + testAgent.agent.syncManager.startSync({ }); + await sleep(500); + expect(setIntervalSpy.calledOnce).to.be.true; + expect(setIntervalSpy.getCall(0).args.at(1)).to.equal(100); + setIntervalSpy.restore(); + }); + + describe('batchOperations()', () => { + it('should only call once per remote DWN if pull direction is passed', async () => { + const batchOperationsSpy = sinon.spy(testAgent.agent.syncManager as any, 'batchOperations'); + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + await testAgent.agent.syncManager.sync('pull'); + expect(batchOperationsSpy.callCount).to.equal(testDwnUrls.length, 'pull direction is passed'); + expect(batchOperationsSpy.args.filter(arg => arg.includes('pull')).length).to.equal(1, `args must include pull ${batchOperationsSpy.args[0]}`); + batchOperationsSpy.restore(); + }); + + it('should only call once if push direction is passed', async () => { + const batchOperationsSpy = sinon.spy(testAgent.agent.syncManager as any, 'batchOperations'); + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + await testAgent.agent.syncManager.sync('push'); + expect(batchOperationsSpy.callCount).to.equal(1, 'push direction is passed'); + expect(batchOperationsSpy.args.filter(arg => arg.includes('push')).length).to.equal(1, `args must include push ${batchOperationsSpy.args[0]}`); + batchOperationsSpy.restore(); + }); + + it('should be called twice if no direction is passed', async () => { + const batchOperationsSpy = sinon.spy(testAgent.agent.syncManager as any, 'batchOperations'); + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + await testAgent.agent.syncManager.sync(); + expect(batchOperationsSpy.callCount).to.equal(2, 'no direction is passed'); + expect(batchOperationsSpy.args.filter(arg => arg.includes('pull')).length).to.equal(1, `args must include one pull ${batchOperationsSpy.args}`); + expect(batchOperationsSpy.args.filter(arg => arg.includes('push')).length).to.equal(1, `args must include one push ${batchOperationsSpy.args}`); + batchOperationsSpy.restore(); + }); + }); + describe('pull', () => { + it('synchronizes records for 1 identity from remote DWN to local DWN', async () => { + // Write a test record to Alice's remote DWN. + let writeResponse = await testAgent.agent.dwnManager.sendRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsWrite', + messageOptions : { + dataFormat: 'text/plain' + }, + dataStream: new Blob(['Hello, world!']) + }); + + // Get the record ID of the test record. + const testRecordId = (writeResponse.message as RecordsWriteMessage).recordId; + + // Confirm the record does NOT exist on Alice's local DWN. + let queryResponse = await testAgent.agent.dwnManager.processRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { recordId: testRecordId } } + }); + let localDwnQueryReply = queryResponse.reply as RecordsQueryReply; + expect(localDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. + expect(localDwnQueryReply.entries).to.have.length(0); // Record doesn't exist on local DWN. + + // Register Alice's DID to be synchronized. + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + + // Execute Sync to pull all records from Alice's remote DWN to Alice's local DWN. + await testAgent.agent.syncManager.sync('pull'); + + // Confirm the record now DOES exist on Alice's local DWN. + queryResponse = await testAgent.agent.dwnManager.processRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { recordId: testRecordId } } + }); + localDwnQueryReply = queryResponse.reply as RecordsQueryReply; + expect(localDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. + expect(localDwnQueryReply.entries).to.have.length(1); // Record does exist on local DWN. + }); + + it('synchronizes records for multiple identities from remote DWN to local DWN', async () => { + // Create a second Identity to author the DWN messages. + const { did: bobDid } = await testAgent.createIdentity({ testDwnUrls }); + const bob = await testAgent.agent.identityManager.import({ + did : bobDid, + identity : { name: 'Bob', did: bobDid.did }, + kms : 'local' + }); + + // Write a test record to Alice's remote DWN. + let writeResponse = await testAgent.agent.dwnManager.sendRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsWrite', + messageOptions : { + dataFormat: 'text/plain' + }, + dataStream: new Blob(['Hello, Bob!']) + }); + + // Get the record ID of Alice's test record. + const testRecordIdAlice = (writeResponse.message as RecordsWriteMessage).recordId; + + // Write a test record to Bob's remote DWN. + writeResponse = await testAgent.agent.dwnManager.sendRequest({ + author : bob.did, + target : bob.did, + messageType : 'RecordsWrite', + messageOptions : { + dataFormat: 'text/plain' + }, + dataStream: new Blob(['Hello, Alice!']) + }); + + // Get the record ID of Bob's test record. + const testRecordIdBob = (writeResponse.message as RecordsWriteMessage).recordId; + + // Register Alice's DID to be synchronized. + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + + // Register Bob's DID to be synchronized. + await testAgent.agent.syncManager.registerIdentity({ + did: bob.did + }); + + // Execute Sync to pull all records from Alice's and Bob's remote DWNs to their local DWNs. + await testAgent.agent.syncManager.sync('pull'); + + // Confirm the Alice test record exist on Alice's local DWN. + let queryResponse = await testAgent.agent.dwnManager.processRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { recordId: testRecordIdAlice } } + }); + let localDwnQueryReply = queryResponse.reply as RecordsQueryReply; + expect(localDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. + expect(localDwnQueryReply.entries).to.have.length(1); // Record does exist on local DWN. + + // Confirm the Bob test record exist on Bob's local DWN. + queryResponse = await testAgent.agent.dwnManager.sendRequest({ + author : bob.did, + target : bob.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { recordId: testRecordIdBob } } + }); + localDwnQueryReply = queryResponse.reply as RecordsQueryReply; + expect(localDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. + expect(localDwnQueryReply.entries).to.have.length(1); // Record does exist on local DWN. + }).timeout(5000); + }); + + describe('push', () => { + it('synchronizes records for 1 identity from local DWN to remote DWN', async () => { + // Write a record that we can use for this test. + let writeResponse = await testAgent.agent.dwnManager.processRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsWrite', + messageOptions : { + dataFormat: 'text/plain' + }, + dataStream: new Blob(['Hello, world!']) + }); + + // Get the record ID of the test record. + const testRecordId = (writeResponse.message as RecordsWriteMessage).recordId; + + // Confirm the record does NOT exist on Alice's remote DWN. + let queryResponse = await testAgent.agent.dwnManager.sendRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { recordId: testRecordId } } + }); + let remoteDwnQueryReply = queryResponse.reply as RecordsQueryReply; + expect(remoteDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. + expect(remoteDwnQueryReply.entries).to.have.length(0); // Record doesn't exist on remote DWN. + + // Register Alice's DID to be synchronized. + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + + // Execute Sync to push all records from Alice's local DWN to Alice's remote DWN. + await testAgent.agent.syncManager.sync('push'); + + // Confirm the record now DOES exist on Alice's remote DWN. + queryResponse = await testAgent.agent.dwnManager.sendRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { recordId: testRecordId } } + }); + remoteDwnQueryReply = queryResponse.reply as RecordsQueryReply; + expect(remoteDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. + expect(remoteDwnQueryReply.entries).to.have.length(1); // Record does exist on remote DWN. + }); + + it('synchronizes records for multiple identities from local DWN to remote DWN', async () => { + // Create a second Identity to author the DWN messages. + const { did: bobDid } = await testAgent.createIdentity({ testDwnUrls }); + const bob = await testAgent.agent.identityManager.import({ + did : bobDid, + identity : { name: 'Bob', did: bobDid.did }, + kms : 'local' + }); + + // Write a test record to Alice's local DWN. + let writeResponse = await testAgent.agent.dwnManager.processRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsWrite', + messageOptions : { + dataFormat: 'text/plain' + }, + dataStream: new Blob(['Hello, Bob!']) + }); + + // Get the record ID of Alice's test record. + const testRecordIdAlice = (writeResponse.message as RecordsWriteMessage).recordId; + + // Write a test record to Bob's local DWN. + writeResponse = await testAgent.agent.dwnManager.processRequest({ + author : bob.did, + target : bob.did, + messageType : 'RecordsWrite', + messageOptions : { + dataFormat: 'text/plain' + }, + dataStream: new Blob(['Hello, Alice!']) + }); + + // Get the record ID of Bob's test record. + const testRecordIdBob = (writeResponse.message as RecordsWriteMessage).recordId; + + // Register Alice's DID to be synchronized. + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + + // Register Bob's DID to be synchronized. + await testAgent.agent.syncManager.registerIdentity({ + did: bob.did + }); + + // Execute Sync to push all records from Alice's and Bob's local DWNs to their remote DWNs. + await testAgent.agent.syncManager.sync('push'); + + // Confirm the Alice test record exist on Alice's remote DWN. + let queryResponse = await testAgent.agent.dwnManager.sendRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { recordId: testRecordIdAlice } } + }); + let remoteDwnQueryReply = queryResponse.reply as RecordsQueryReply; + expect(remoteDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. + expect(remoteDwnQueryReply.entries).to.have.length(1); // Record does exist on remote DWN. + + // Confirm the Bob test record exist on Bob's remote DWN. + queryResponse = await testAgent.agent.dwnManager.sendRequest({ + author : bob.did, + target : bob.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { recordId: testRecordIdBob } } + }); + remoteDwnQueryReply = queryResponse.reply as RecordsQueryReply; + expect(remoteDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. + expect(remoteDwnQueryReply.entries).to.have.length(1); // Record does exist on remote DWN. + }).timeout(5000); + }); }); }); }); \ No newline at end of file From e1b059bb246ae71d393bfb2fbe17bccbe2f91ac6 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 19 Sep 2023 12:00:49 -0400 Subject: [PATCH 07/20] fix tests --- packages/agent/tests/sync-manager.spec.ts | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index 59c9dc680..4671fafac 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -184,32 +184,32 @@ describe('SyncManagerLevel', () => { did: alice.did }); await testAgent.agent.syncManager.sync('pull'); + batchOperationsSpy.restore(); // restore before assertions to avoid failures in other tests expect(batchOperationsSpy.callCount).to.equal(testDwnUrls.length, 'pull direction is passed'); - expect(batchOperationsSpy.args.filter(arg => arg.includes('pull')).length).to.equal(1, `args must include pull ${batchOperationsSpy.args[0]}`); - batchOperationsSpy.restore(); + expect(batchOperationsSpy.args.filter(arg => arg.includes('pull')).length).to.equal(testDwnUrls.length, `args must include pull ${batchOperationsSpy.args[0]}`); }); - it('should only call once if push direction is passed', async () => { + it('should only call once per remote DWN if push direction is passed', async () => { const batchOperationsSpy = sinon.spy(testAgent.agent.syncManager as any, 'batchOperations'); await testAgent.agent.syncManager.registerIdentity({ did: alice.did }); await testAgent.agent.syncManager.sync('push'); - expect(batchOperationsSpy.callCount).to.equal(1, 'push direction is passed'); - expect(batchOperationsSpy.args.filter(arg => arg.includes('push')).length).to.equal(1, `args must include push ${batchOperationsSpy.args[0]}`); - batchOperationsSpy.restore(); + batchOperationsSpy.restore(); // restore before assertions to avoid failures in other tests + expect(batchOperationsSpy.callCount).to.equal(testDwnUrls.length, 'push direction is passed'); + expect(batchOperationsSpy.args.filter(arg => arg.includes('push')).length).to.equal(testDwnUrls.length, `args must include push ${batchOperationsSpy.args[0]}`); }); - it('should be called twice if no direction is passed', async () => { + it('should be called twice per remote DWN if no direction is passed', async () => { const batchOperationsSpy = sinon.spy(testAgent.agent.syncManager as any, 'batchOperations'); await testAgent.agent.syncManager.registerIdentity({ did: alice.did }); await testAgent.agent.syncManager.sync(); - expect(batchOperationsSpy.callCount).to.equal(2, 'no direction is passed'); - expect(batchOperationsSpy.args.filter(arg => arg.includes('pull')).length).to.equal(1, `args must include one pull ${batchOperationsSpy.args}`); - expect(batchOperationsSpy.args.filter(arg => arg.includes('push')).length).to.equal(1, `args must include one push ${batchOperationsSpy.args}`); - batchOperationsSpy.restore(); + batchOperationsSpy.restore(); // restore before assertions to avoid failures in other tests + expect(batchOperationsSpy.callCount).to.equal((2 * testDwnUrls.length), 'no direction is passed'); + expect(batchOperationsSpy.args.filter(arg => arg.includes('pull')).length).to.equal(testDwnUrls.length, `args must include one pull ${batchOperationsSpy.args}`); + expect(batchOperationsSpy.args.filter(arg => arg.includes('push')).length).to.equal(testDwnUrls.length, `args must include one push ${batchOperationsSpy.args}`); }); }); describe('pull', () => { From fdd6d6b7e33c265440f40dea8e07d39b63bc2729 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 19 Sep 2023 14:19:32 -0400 Subject: [PATCH 08/20] fix watermark --- packages/agent/src/sync-manager.ts | 59 +++++++++-------------- packages/agent/tests/sync-manager.spec.ts | 9 ++++ 2 files changed, 33 insertions(+), 35 deletions(-) diff --git a/packages/agent/src/sync-manager.ts b/packages/agent/src/sync-manager.ts index 28c23f16d..c0e2cc96b 100644 --- a/packages/agent/src/sync-manager.ts +++ b/packages/agent/src/sync-manager.ts @@ -49,12 +49,11 @@ export type SyncManagerOptions = { db?: Level; }; - type SyncState = { did: string; dwnUrl: string; - pullWatermark: string | undefined; - pushWatermark: string | undefined; + pullWatermark?: string; + pushWatermark?: string; } type DwnMessage = { @@ -532,57 +531,47 @@ export class SyncManagerLevel implements SyncManager { /** Get the watermark (or undefined) for each (DID, DWN service endpoint, sync direction) * combination and add it to the sync peer state array. */ for (let dwnUrl of service.serviceEndpoint.nodes) { - const watermark = await this.getWatermark(did, dwnUrl); - syncPeerState.push({ did, dwnUrl, pullWatermark: watermark.pull, pushWatermark: watermark.push }); + const syncState = await this.getSyncState(did, dwnUrl); + syncPeerState.push(syncState); } } return syncPeerState; } - private async getWatermark(did: string, dwnUrl: string): Promise<{ pull?:string, push?: string }> { - const wmKey = `${did}~${dwnUrl}`; + private async getWatermark(did: string, dwnUrl: string, direction: SyncDirection): Promise { + const wmKey = `${did}~${dwnUrl}~${direction}`; const watermarkStore = this.getWatermarkStore(); - try { const wm = await watermarkStore.get(wmKey); - const split = wm.split('~'); - if (split.length !== 2) { - return {}; - } - - let pull; - let push; - if (split[0] !== '0') { - pull = split[0]; - } - if (split[1] !== '0') { - push = split[1]; - } - - return { pull, push }; + return wm; } catch(error: any) { // Don't throw when a key wasn't found. if (error.notFound) { - return {}; + return undefined; } - throw new Error('invalid watermark'); + throw new Error('SyncManager: invalid watermark store'); } } - private async setWatermark(did: string, dwnUrl: string, pullWatermark?: string, pushWatermark?: string) { - const wmKey = `${did}~${dwnUrl}`; + private async setWatermark(did: string, dwnUrl: string, direction: SyncDirection, watermark: string) { + const wmKey = `${did}~${dwnUrl}~${direction}`; const watermarkStore = this.getWatermarkStore(); + await watermarkStore.put(wmKey, watermark); + } - if (pullWatermark === undefined) { - pullWatermark = '0'; - } - - if (pushWatermark === undefined) { - pushWatermark = '0'; + private async getSyncState(did: string, dwnUrl: string): Promise { + try { + const pullWatermark = await this.getWatermark(did, dwnUrl, 'pull'); + const pushWatermark = await this.getWatermark(did, dwnUrl, 'push'); + return { did, dwnUrl, pullWatermark, pushWatermark }; + } catch(error: any) { + // Don't throw when a key wasn't found. + if (error.notFound) { + return { did, dwnUrl }; + } + throw new Error('SyncManager: invalid watermark store'); } - - await watermarkStore.put(wmKey, `${pullWatermark}~${pushWatermark}`); } /** diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index 4671fafac..cd51f0e27 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -212,6 +212,15 @@ describe('SyncManagerLevel', () => { expect(batchOperationsSpy.args.filter(arg => arg.includes('push')).length).to.equal(testDwnUrls.length, `args must include one push ${batchOperationsSpy.args}`); }); }); + + describe('watermarks', async () =>{ + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + testAgent.agent.syncManager.startSync({ }); + await sleep(500); + }); + describe('pull', () => { it('synchronizes records for 1 identity from remote DWN to local DWN', async () => { // Write a test record to Alice's remote DWN. From c0729134d089c0808799cdc28148a392e8365335 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 19 Sep 2023 14:37:29 -0400 Subject: [PATCH 09/20] move sync watermark error further up --- packages/agent/src/sync-manager.ts | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/packages/agent/src/sync-manager.ts b/packages/agent/src/sync-manager.ts index c0e2cc96b..94c139fe0 100644 --- a/packages/agent/src/sync-manager.ts +++ b/packages/agent/src/sync-manager.ts @@ -531,8 +531,12 @@ export class SyncManagerLevel implements SyncManager { /** Get the watermark (or undefined) for each (DID, DWN service endpoint, sync direction) * combination and add it to the sync peer state array. */ for (let dwnUrl of service.serviceEndpoint.nodes) { - const syncState = await this.getSyncState(did, dwnUrl); - syncPeerState.push(syncState); + try { + const syncState = await this.getSyncState(did, dwnUrl); + syncPeerState.push(syncState); + } catch(error) { + // go onto next peer if this fails + } } } @@ -543,8 +547,7 @@ export class SyncManagerLevel implements SyncManager { const wmKey = `${did}~${dwnUrl}~${direction}`; const watermarkStore = this.getWatermarkStore(); try { - const wm = await watermarkStore.get(wmKey); - return wm; + return await watermarkStore.get(wmKey); } catch(error: any) { // Don't throw when a key wasn't found. if (error.notFound) { @@ -561,17 +564,9 @@ export class SyncManagerLevel implements SyncManager { } private async getSyncState(did: string, dwnUrl: string): Promise { - try { - const pullWatermark = await this.getWatermark(did, dwnUrl, 'pull'); - const pushWatermark = await this.getWatermark(did, dwnUrl, 'push'); - return { did, dwnUrl, pullWatermark, pushWatermark }; - } catch(error: any) { - // Don't throw when a key wasn't found. - if (error.notFound) { - return { did, dwnUrl }; - } - throw new Error('SyncManager: invalid watermark store'); - } + const pullWatermark = await this.getWatermark(did, dwnUrl, 'pull'); + const pushWatermark = await this.getWatermark(did, dwnUrl, 'push'); + return { did, dwnUrl, pullWatermark, pushWatermark }; } /** From deb334647e5f44aec4336329151f01be7da2ce94 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Fri, 22 Sep 2023 17:58:44 -0400 Subject: [PATCH 10/20] use sinon SinonFakeTimers for sync interval test, remove multiple dwn servers from tests --- .github/workflows/tests-ci.yml | 16 ++--- packages/agent/src/sync-manager.ts | 4 +- packages/agent/tests/sync-manager.spec.ts | 78 ++++++++++++----------- packages/dev-env/docker-compose.yaml | 9 +-- 4 files changed, 49 insertions(+), 58 deletions(-) diff --git a/.github/workflows/tests-ci.yml b/.github/workflows/tests-ci.yml index bd203d4ad..76a999111 100644 --- a/.github/workflows/tests-ci.yml +++ b/.github/workflows/tests-ci.yml @@ -53,17 +53,13 @@ jobs: - name: Start dwn-server container run: cd packages/dev-env && docker-compose up -d - - name: Wait for dwn-server1 to be ready + - name: Wait for dwn-server to be ready run: until curl -sf http://localhost:3000/health; do echo -n .; sleep .1; done - - name: Wait for dwn-server2 to be ready - run: until curl -sf http://localhost:3001/health; do echo -n .; sleep .1; done - - name: Run tests for all packages run: npm run test:node --ws -- --color env: - TEST_DWN_URLS: http://localhost:3000,http://localhost:3001 - MIN_SYNC_INTERVAL: 100 + TEST_DWN_URLS: http://localhost:3000 - name: Upload test coverage to Codecov uses: codecov/codecov-action@eaaf4bedf32dbdc6b720b63067d99c4d77d6047d # v3.1.4 @@ -104,14 +100,10 @@ jobs: - name: Start dwn-server container run: cd packages/dev-env && docker-compose up -d - - name: Wait for dwn-server1 to be ready + - name: Wait for dwn-server to be ready run: until curl -sf http://localhost:3000/health; do echo -n .; sleep .1; done - - name: Wait for dwn-server2 to be ready - run: until curl -sf http://localhost:3001/health; do echo -n .; sleep .1; done - - name: Run tests for all packages run: npm run test:browser --ws -- --color env: - TEST_DWN_URLS: http://localhost:3000,http://localhost:3001 - MIN_SYNC_INTERVAL: 100 + TEST_DWN_URLS: http://localhost:3000 diff --git a/packages/agent/src/sync-manager.ts b/packages/agent/src/sync-manager.ts index 94c139fe0..9100bf64a 100644 --- a/packages/agent/src/sync-manager.ts +++ b/packages/agent/src/sync-manager.ts @@ -17,10 +17,8 @@ import type { Web5ManagedAgent } from './types/agent.js'; import { webReadableToIsomorphicNodeReadable } from './utils.js'; -const checkNumber = (n?: string) => isNaN(parseInt(n || '')) ? undefined : parseInt(n || ''); // arbitrary number for now, but we should enforce some sane minimum -// allow for environment to set a minimum -const MIN_SYNC_INTERVAL = checkNumber(process?.env.MIN_SYNC_INTERVAL) ?? 5000; +export const MIN_SYNC_INTERVAL = 3000; type SyncDirection = 'pull' | 'push'; diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index cd51f0e27..d8f14c31a 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -1,14 +1,14 @@ import type { PortableDid } from '@web5/dids'; import { expect } from 'chai'; -import * as sinon from 'sinon'; +import sinon from 'sinon'; import type { ManagedIdentity } from '../src/identity-manager.js'; import { randomUuid } from '@web5/crypto/utils'; import { testDwnUrls } from './test-config.js'; -import { TestAgent, sleep } from './utils/test-agent.js'; -import { SyncManagerLevel } from '../src/sync-manager.js'; +import { TestAgent } from './utils/test-agent.js'; +import { MIN_SYNC_INTERVAL, SyncManagerLevel } from '../src/sync-manager.js'; import { TestManagedAgent } from '../src/test-managed-agent.js'; import { ProcessDwnRequest } from '../src/index.js'; @@ -74,6 +74,45 @@ describe('SyncManagerLevel', () => { await testAgent.closeStorage(); }); + describe('startSync()', () => { + let clock: sinon.SinonFakeTimers; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + clock.restore(); + }); + + it('check sync interval input', async () => { + const syncSpy = sinon.spy(testAgent.agent.syncManager, 'sync'); + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + testAgent.agent.syncManager.startSync({ interval: 5000 }); + + clock.tick(3 * 5000); + + expect(syncSpy.callCount).to.equal(3); + syncSpy.restore(); + }); + + it('check sync default value passed', async () => { + const setIntervalSpy = sinon.spy(global, 'setInterval'); + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + testAgent.agent.syncManager.startSync(); + + clock.tick( 1 * MIN_SYNC_INTERVAL); + + expect(setIntervalSpy.calledOnce).to.be.true; + expect(setIntervalSpy.getCall(0).args.at(1)).to.equal(MIN_SYNC_INTERVAL); + setIntervalSpy.restore(); + }); + }); + describe('sync()', () => { it('takes no action if no identities are registered', async () => { const didResolveSpy = sinon.spy(testAgent.agent.didResolver, 'resolve'); @@ -152,31 +191,6 @@ describe('SyncManagerLevel', () => { }).timeout(10_000); - // tests must be run with a low MIN_SYNC_INTERVAL - it('check sync interval input', async () => { - const syncSpy = sinon.spy(testAgent.agent.syncManager, 'sync'); - await testAgent.agent.syncManager.registerIdentity({ - did: alice.did - }); - testAgent.agent.syncManager.startSync({ interval: 300 }); - await sleep(1000); - expect(syncSpy.callCount).to.equal(3); - syncSpy.restore(); - }); - - // test must be run with MIN_SYNC_INTERVAL=100 - it('check sync default value passed', async () => { - const setIntervalSpy = sinon.spy(global, 'setInterval'); - await testAgent.agent.syncManager.registerIdentity({ - did: alice.did - }); - testAgent.agent.syncManager.startSync({ }); - await sleep(500); - expect(setIntervalSpy.calledOnce).to.be.true; - expect(setIntervalSpy.getCall(0).args.at(1)).to.equal(100); - setIntervalSpy.restore(); - }); - describe('batchOperations()', () => { it('should only call once per remote DWN if pull direction is passed', async () => { const batchOperationsSpy = sinon.spy(testAgent.agent.syncManager as any, 'batchOperations'); @@ -213,14 +227,6 @@ describe('SyncManagerLevel', () => { }); }); - describe('watermarks', async () =>{ - await testAgent.agent.syncManager.registerIdentity({ - did: alice.did - }); - testAgent.agent.syncManager.startSync({ }); - await sleep(500); - }); - describe('pull', () => { it('synchronizes records for 1 identity from remote DWN to local DWN', async () => { // Write a test record to Alice's remote DWN. diff --git a/packages/dev-env/docker-compose.yaml b/packages/dev-env/docker-compose.yaml index 36c1ebfb0..98e08847e 100644 --- a/packages/dev-env/docker-compose.yaml +++ b/packages/dev-env/docker-compose.yaml @@ -1,13 +1,8 @@ version: "3.98" services: - dwn-server2: - container_name: dwn-server2 - image: ghcr.io/tbd54566975/dwn-server:dwn-sdk-0.2.3 - ports: - - "3001:3000" - dwn-server1: - container_name: dwn-server1 + dwn-server: + container_name: dwn-server image: ghcr.io/tbd54566975/dwn-server:dwn-sdk-0.2.3 ports: - "3000:3000" From 4ea1a15f2e9933627dec1318883036fa430a722e Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 26 Sep 2023 11:18:14 -0400 Subject: [PATCH 11/20] updated test coverage --- packages/agent/tests/sync-manager.spec.ts | 106 ++++++++++++++++++---- packages/agent/tests/utils/test-agent.ts | 31 ++++++- 2 files changed, 115 insertions(+), 22 deletions(-) diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index d8f14c31a..7b9b24f4e 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -7,12 +7,29 @@ import type { ManagedIdentity } from '../src/identity-manager.js'; import { randomUuid } from '@web5/crypto/utils'; import { testDwnUrls } from './test-config.js'; -import { TestAgent } from './utils/test-agent.js'; +import { TestAgent, randomBytes } from './utils/test-agent.js'; import { MIN_SYNC_INTERVAL, SyncManagerLevel } from '../src/sync-manager.js'; import { TestManagedAgent } from '../src/test-managed-agent.js'; import { ProcessDwnRequest } from '../src/index.js'; -import { RecordsQueryReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; +import { DataStream, RecordsQueryReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; +import { Readable } from 'readable-stream'; + +/** + * Generates a `RecordsWrite` ProcessDwnRequest for testing. + */ +export function TestRecordsWriteMessage(target: string, author: string, dataStream: Blob | ReadableStream | Readable ): ProcessDwnRequest { + return { + author : author, + target : target, + messageType : 'RecordsWrite', + messageOptions : { + schema : 'testSchema', + dataFormat : 'text/plain' + }, + dataStream, + }; +}; describe('SyncManagerLevel', () => { describe('get agent', () => { @@ -98,6 +115,32 @@ describe('SyncManagerLevel', () => { syncSpy.restore(); }); + it('subsequent startSync should cancel the old sync and start a new sync interval', async () => { + const syncSpy = sinon.spy(testAgent.agent.syncManager, 'sync'); + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + + // start sync with default timeout + testAgent.agent.syncManager.startSync(); + + // go through 3 intervals + clock.tick(3 * MIN_SYNC_INTERVAL); + + expect(syncSpy.callCount).to.equal(3); + + // start sync with a higher interval. Should cancel the old sync and set a new interval. + testAgent.agent.syncManager.startSync({ interval: 10_000 }); + + // go through 3 intervals with the new timeout + clock.tick( 3 * 10_000); + + // should be called a total of 6 times. + expect(syncSpy.callCount).to.equal(6); + + syncSpy.restore(); + }); + it('check sync default value passed', async () => { const setIntervalSpy = sinon.spy(global, 'setInterval'); await testAgent.agent.syncManager.registerIdentity({ @@ -128,24 +171,43 @@ describe('SyncManagerLevel', () => { sendDwnRequestSpy.restore(); }); - it('synchronizes data in both directions for a single identity', async () => { + it('silently ignore when a particular DWN service endpoint is unreachable', async () => { + // Write a test record to Alice's remote DWN. + let writeResponse = await testAgent.agent.dwnManager.sendRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsWrite', + messageOptions : { + dataFormat: 'text/plain' + }, + dataStream: new Blob(['Hello, world!']) + }); + expect(writeResponse.reply.status.code).to.equal(202); + + const getRemoteEventsSpy = sinon.spy(testAgent.agent.syncManager as any, 'getRemoteEvents'); + const sendDwnRequestStub = sinon.stub(testAgent.agent.rpcClient, 'sendDwnRequest').rejects('some failure'); + // Register Alice's DID to be synchronized. await testAgent.agent.syncManager.registerIdentity({ did: alice.did }); - const testWriteMessage = (id: string): ProcessDwnRequest => { - return { - author : alice.did, - target : alice.did, - messageType : 'RecordsWrite', - messageOptions : { - schema : 'testSchema', - dataFormat : 'text/plain' - }, - dataStream: new Blob([`Hello, ${id}`]) - }; - }; + // Execute Sync to pull all records from Alice's remote DWN to Alice's local DWN. + await testAgent.agent.syncManager.sync(); + + //restore sinon stubs and spys + getRemoteEventsSpy.restore(); + sendDwnRequestStub.restore(); + + expect(getRemoteEventsSpy.called).to.be.true; + expect(getRemoteEventsSpy.threw()).to.be.false; + }); + + it('synchronizes data in both directions for a single identity', async () => { + + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); const everythingQuery = (): ProcessDwnRequest => { return { @@ -157,13 +219,19 @@ describe('SyncManagerLevel', () => { }; const localRecords = new Set( - (await Promise.all(Array(5).fill({}).map(_ => testAgent.agent.dwnManager.processRequest(testWriteMessage(randomUuid()))))) - .map(r => (r.message as RecordsWriteMessage).recordId) + (await Promise.all(Array(5).fill({}).map(_ => testAgent.agent.dwnManager.processRequest(TestRecordsWriteMessage( + alice.did, + alice.did, + new Blob([randomBytes(256)]), + ))))).map(r => (r.message as RecordsWriteMessage).recordId) ); const remoteRecords = new Set( - (await Promise.all(Array(5).fill({}).map(_ => testAgent.agent.dwnManager.sendRequest(testWriteMessage(randomUuid()))))) - .map(r => (r.message as RecordsWriteMessage).recordId) + (await Promise.all(Array(5).fill({}).map(_ => testAgent.agent.dwnManager.sendRequest(TestRecordsWriteMessage( + alice.did, + alice.did, + new Blob([randomBytes(256)]), + ))))).map(r => (r.message as RecordsWriteMessage).recordId) ); const { reply: localReply } = await testAgent.agent.dwnManager.processRequest(everythingQuery()); diff --git a/packages/agent/tests/utils/test-agent.ts b/packages/agent/tests/utils/test-agent.ts index c4cc9cb26..ac8b17526 100644 --- a/packages/agent/tests/utils/test-agent.ts +++ b/packages/agent/tests/utils/test-agent.ts @@ -25,6 +25,7 @@ import { Web5RpcClient } from '../../src/rpc-client.js'; import { AppDataVault } from '../../src/app-data-store.js'; import { IdentityManager } from '../../src/identity-manager.js'; import { SyncManager, SyncManagerLevel } from '../../src/sync-manager.js'; +import { Readable } from 'readable-stream'; type CreateMethodOptions = { testDataLocation?: string; @@ -205,6 +206,30 @@ export class TestAgent implements Web5ManagedAgent { } } -export async function sleep(duration: number): Promise { - return new Promise((resolve) => setTimeout(resolve, duration)); -} \ No newline at end of file +/** + * Generates a random byte array of given length. + */ +export function randomBytes(length: number): Uint8Array { + const randomBytes = new Uint8Array(length); + for (let i = 0; i < length; i++) { + randomBytes[i] = Math.floor(Math.random() * 256); + } + + return randomBytes; +}; + +/** + * Generates a `RecordsWrite` ProcessDwnRequest for testing. + */ +export function TestRecordsWriteMessage(target: string, author: string, dataStream: Blob | ReadableStream | Readable ): ProcessDwnRequest { + return { + author : author, + target : target, + messageType : 'RecordsWrite', + messageOptions : { + schema : 'testSchema', + dataFormat : 'text/plain' + }, + dataStream, + }; +}; \ No newline at end of file From c48ddd2b3534b2ba412f5f80d13fb06f9136493a Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 26 Sep 2023 11:51:22 -0400 Subject: [PATCH 12/20] add more test coverage --- packages/agent/tests/sync-manager.spec.ts | 7 +++---- packages/agent/tests/utils/test-agent.ts | 19 +------------------ 2 files changed, 4 insertions(+), 22 deletions(-) diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index 7b9b24f4e..dc3588e62 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -5,14 +5,13 @@ import sinon from 'sinon'; import type { ManagedIdentity } from '../src/identity-manager.js'; -import { randomUuid } from '@web5/crypto/utils'; import { testDwnUrls } from './test-config.js'; import { TestAgent, randomBytes } from './utils/test-agent.js'; import { MIN_SYNC_INTERVAL, SyncManagerLevel } from '../src/sync-manager.js'; import { TestManagedAgent } from '../src/test-managed-agent.js'; import { ProcessDwnRequest } from '../src/index.js'; -import { DataStream, RecordsQueryReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; +import { RecordsQueryReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; import { Readable } from 'readable-stream'; /** @@ -20,7 +19,7 @@ import { Readable } from 'readable-stream'; */ export function TestRecordsWriteMessage(target: string, author: string, dataStream: Blob | ReadableStream | Readable ): ProcessDwnRequest { return { - author : author, + author : author, target : target, messageType : 'RecordsWrite', messageOptions : { @@ -29,7 +28,7 @@ export function TestRecordsWriteMessage(target: string, author: string, dataStre }, dataStream, }; -}; +} describe('SyncManagerLevel', () => { describe('get agent', () => { diff --git a/packages/agent/tests/utils/test-agent.ts b/packages/agent/tests/utils/test-agent.ts index ac8b17526..72a5d8a2e 100644 --- a/packages/agent/tests/utils/test-agent.ts +++ b/packages/agent/tests/utils/test-agent.ts @@ -25,7 +25,6 @@ import { Web5RpcClient } from '../../src/rpc-client.js'; import { AppDataVault } from '../../src/app-data-store.js'; import { IdentityManager } from '../../src/identity-manager.js'; import { SyncManager, SyncManagerLevel } from '../../src/sync-manager.js'; -import { Readable } from 'readable-stream'; type CreateMethodOptions = { testDataLocation?: string; @@ -216,20 +215,4 @@ export function randomBytes(length: number): Uint8Array { } return randomBytes; -}; - -/** - * Generates a `RecordsWrite` ProcessDwnRequest for testing. - */ -export function TestRecordsWriteMessage(target: string, author: string, dataStream: Blob | ReadableStream | Readable ): ProcessDwnRequest { - return { - author : author, - target : target, - messageType : 'RecordsWrite', - messageOptions : { - schema : 'testSchema', - dataFormat : 'text/plain' - }, - dataStream, - }; -}; \ No newline at end of file +} \ No newline at end of file From 5e626ed3486f273b3833a00856d03a3787a66d41 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 26 Sep 2023 12:25:25 -0400 Subject: [PATCH 13/20] update test coverage --- packages/agent/tests/sync-manager.spec.ts | 29 +++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index dc3588e62..a8269b4a2 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -114,6 +114,19 @@ describe('SyncManagerLevel', () => { syncSpy.restore(); }); + it('sync interval below minimum allowed threshold will sync at the minimum interval', async () => { + const syncSpy = sinon.spy(testAgent.agent.syncManager, 'sync'); + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + testAgent.agent.syncManager.startSync({ interval: 100 }); + + clock.tick(3 * MIN_SYNC_INTERVAL); + + expect(syncSpy.callCount).to.equal(3); + syncSpy.restore(); + }); + it('subsequent startSync should cancel the old sync and start a new sync interval', async () => { const syncSpy = sinon.spy(testAgent.agent.syncManager, 'sync'); await testAgent.agent.syncManager.registerIdentity({ @@ -258,6 +271,22 @@ describe('SyncManagerLevel', () => { }).timeout(10_000); + it('should skip dwn if there a failure getting syncState', async () => { + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + + const getWatermarkStub = sinon.stub(testAgent.agent.syncManager as any, 'getSyncState').rejects('rejected'); + const getSyncPeerState = sinon.spy(testAgent.agent.syncManager as any, 'getSyncPeerState'); + + await testAgent.agent.syncManager.sync(); + getWatermarkStub.restore(); + getSyncPeerState.restore(); + + expect(getSyncPeerState.called).to.be.true; + expect(getWatermarkStub.called).to.be.true; + }); + describe('batchOperations()', () => { it('should only call once per remote DWN if pull direction is passed', async () => { const batchOperationsSpy = sinon.spy(testAgent.agent.syncManager as any, 'batchOperations'); From 6eb6f1988ddf7c94e9ac85a2c1e55271def68f43 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 26 Sep 2023 12:29:40 -0400 Subject: [PATCH 14/20] lint fix --- packages/agent/tests/sync-manager.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index a8269b4a2..769be33bf 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -274,7 +274,7 @@ describe('SyncManagerLevel', () => { it('should skip dwn if there a failure getting syncState', async () => { await testAgent.agent.syncManager.registerIdentity({ did: alice.did - }); + }); const getWatermarkStub = sinon.stub(testAgent.agent.syncManager as any, 'getSyncState').rejects('rejected'); const getSyncPeerState = sinon.spy(testAgent.agent.syncManager as any, 'getSyncPeerState'); From c8a87c9b15bc1a6eacf39cc6e33fd69d3d7437cf Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 26 Sep 2023 17:38:02 -0400 Subject: [PATCH 15/20] data size test --- packages/agent/tests/sync-manager.spec.ts | 124 +++++++++++++++++++++- 1 file changed, 121 insertions(+), 3 deletions(-) diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index 769be33bf..a7aed89b2 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -11,7 +11,7 @@ import { MIN_SYNC_INTERVAL, SyncManagerLevel } from '../src/sync-manager.js'; import { TestManagedAgent } from '../src/test-managed-agent.js'; import { ProcessDwnRequest } from '../src/index.js'; -import { RecordsQueryReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; +import { DataStream, RecordsQueryReply, RecordsReadReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; import { Readable } from 'readable-stream'; /** @@ -441,7 +441,66 @@ describe('SyncManagerLevel', () => { localDwnQueryReply = queryResponse.reply as RecordsQueryReply; expect(localDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. expect(localDwnQueryReply.entries).to.have.length(1); // Record does exist on local DWN. - }).timeout(5000); + }).timeout(5_000); + + it('synchronizes records with data larger than the `encodedData` limit within the `RecordsQuery` response', async () => { + // larger than the size of data returned in a RecordsQuery + const LARGE_DATA_SIZE = 70_000; + const everythingQuery = (): ProcessDwnRequest => { + return { + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { schema: 'testSchema' } } + }; + }; + + //register alice + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + + // create remote records to sync locally + const remoteRecords = new Set( + (await Promise.all(Array(2).fill({}).map(i => testAgent.agent.dwnManager.sendRequest(TestRecordsWriteMessage( + alice.did, + alice.did, + new Blob([randomBytes(i % 2 === 0 ? 256 : LARGE_DATA_SIZE )]), // create some small and large records + )))))); + + // check that records don't exist locally + const { reply: localReply } = await testAgent.agent.dwnManager.processRequest(everythingQuery()); + expect(localReply.status.code).to.equal(200); + expect(localReply.entries?.length).to.equal(0); + + // initiate sync + await testAgent.agent.syncManager.sync(); + + // query for local records + const { reply: localReply2 } = await testAgent.agent.dwnManager.processRequest(everythingQuery()); + expect(localReply2.status.code).to.equal(200); + expect(localReply2.entries?.length).to.equal(remoteRecords.size); + + // check for response encodedData if it doesn't exist issue a RecordsRead + for (const entry of localReply2.entries!) { + if (entry.encodedData === undefined) { + const recordId = (entry as RecordsWriteMessage).recordId; + // get individual records without encodedData to check that data exists + const record = await testAgent.agent.dwnManager.processRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsRead', + messageOptions : { recordId } + }); + const reply = record.reply as RecordsReadReply; + expect(reply.status.code).to.equal(200); + expect(reply.record).to.not.be.undefined; + expect(reply.record!.data).to.not.be.undefined; + const data = await DataStream.toBytes(reply.record!.data); + expect(data.length).to.equal(LARGE_DATA_SIZE); + } + } + }).timeout(5_000); }); describe('push', () => { @@ -562,7 +621,66 @@ describe('SyncManagerLevel', () => { remoteDwnQueryReply = queryResponse.reply as RecordsQueryReply; expect(remoteDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. expect(remoteDwnQueryReply.entries).to.have.length(1); // Record does exist on remote DWN. - }).timeout(5000); + }).timeout(5_000); + + it('synchronizes records with data larger than the `encodedData` limit within the `RecordsQuery` response', async () => { + // larger than the size of data returned in a RecordsQuery + const LARGE_DATA_SIZE = 70_000; + const everythingQuery = (): ProcessDwnRequest => { + return { + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { schema: 'testSchema' } } + }; + }; + + //register alice + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + + // create remote local records to sync to remote + const remoteRecords = new Set( + (await Promise.all(Array(2).fill({}).map(i => testAgent.agent.dwnManager.processRequest(TestRecordsWriteMessage( + alice.did, + alice.did, + new Blob([randomBytes(i % 2 === 0 ? 256 : LARGE_DATA_SIZE )]), // create some small and large records + )))))); + + // check that records don't exist on remote + const { reply: localReply } = await testAgent.agent.dwnManager.sendRequest(everythingQuery()); + expect(localReply.status.code).to.equal(200); + expect(localReply.entries?.length).to.equal(0); + + // initiate sync + await testAgent.agent.syncManager.sync(); + + // query for for remote records that now exist + const { reply: localReply2 } = await testAgent.agent.dwnManager.sendRequest(everythingQuery()); + expect(localReply2.status.code).to.equal(200); + expect(localReply2.entries?.length).to.equal(remoteRecords.size); + + // check for response encodedData if it doesn't exist issue a RecordsRead + for (const entry of localReply2.entries!) { + if (entry.encodedData === undefined) { + const recordId = (entry as RecordsWriteMessage).recordId; + // get individual records without encodedData to check that data exists + const record = await testAgent.agent.dwnManager.sendRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsRead', + messageOptions : { recordId } + }); + const reply = record.reply as RecordsReadReply; + expect(reply.status.code).to.equal(200); + expect(reply.record).to.not.be.undefined; + expect(reply.record!.data).to.not.be.undefined; + const data = await DataStream.toBytes(reply.record!.data); + expect(data.length).to.equal(LARGE_DATA_SIZE); + } + } + }).timeout(5_000); }); }); }); From 0607591c1b1972ce317264b25c02893c09979761 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Thu, 28 Sep 2023 13:15:10 -0400 Subject: [PATCH 16/20] update tests --- packages/agent/tests/sync-manager.spec.ts | 95 +++++++++++++++-------- packages/agent/tests/utils/test-agent.ts | 15 ++++ 2 files changed, 79 insertions(+), 31 deletions(-) diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index a7aed89b2..ef9747ad4 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -6,13 +6,14 @@ import sinon from 'sinon'; import type { ManagedIdentity } from '../src/identity-manager.js'; import { testDwnUrls } from './test-config.js'; -import { TestAgent, randomBytes } from './utils/test-agent.js'; +import { TestAgent, randomString } from './utils/test-agent.js'; import { MIN_SYNC_INTERVAL, SyncManagerLevel } from '../src/sync-manager.js'; import { TestManagedAgent } from '../src/test-managed-agent.js'; import { ProcessDwnRequest } from '../src/index.js'; import { DataStream, RecordsQueryReply, RecordsReadReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; import { Readable } from 'readable-stream'; +import { randomUuid } from '@web5/crypto/utils'; /** * Generates a `RecordsWrite` ProcessDwnRequest for testing. @@ -234,7 +235,7 @@ describe('SyncManagerLevel', () => { (await Promise.all(Array(5).fill({}).map(_ => testAgent.agent.dwnManager.processRequest(TestRecordsWriteMessage( alice.did, alice.did, - new Blob([randomBytes(256)]), + new Blob([randomString(256)]), ))))).map(r => (r.message as RecordsWriteMessage).recordId) ); @@ -242,7 +243,7 @@ describe('SyncManagerLevel', () => { (await Promise.all(Array(5).fill({}).map(_ => testAgent.agent.dwnManager.sendRequest(TestRecordsWriteMessage( alice.did, alice.did, - new Blob([randomBytes(256)]), + new Blob([randomString(256)]), ))))).map(r => (r.message as RecordsWriteMessage).recordId) ); @@ -460,26 +461,41 @@ describe('SyncManagerLevel', () => { did: alice.did }); - // create remote records to sync locally - const remoteRecords = new Set( - (await Promise.all(Array(2).fill({}).map(i => testAgent.agent.dwnManager.sendRequest(TestRecordsWriteMessage( - alice.did, - alice.did, - new Blob([randomBytes(i % 2 === 0 ? 256 : LARGE_DATA_SIZE )]), // create some small and large records - )))))); + const dataObject = { + id: randomUuid(), + randomString: randomString(LARGE_DATA_SIZE) + } + + const dataString = JSON.stringify(dataObject); + + // create remote record + const record = await testAgent.agent.dwnManager.processRequest(TestRecordsWriteMessage( + alice.did, + alice.did, + new Blob([ dataString ]), + )); // check that records don't exist locally - const { reply: localReply } = await testAgent.agent.dwnManager.processRequest(everythingQuery()); + const { reply: localReply } = await testAgent.agent.dwnManager.processRequest({ + author: alice.did, target: alice.did, + messageType: 'RecordsQuery', + messageOptions: { filter: { recordId: (record.message as RecordsWriteMessage).recordId }} + }); + expect(localReply.status.code).to.equal(200); - expect(localReply.entries?.length).to.equal(0); + expect(localReply.entries?.length).to.equal(1); // initiate sync await testAgent.agent.syncManager.sync(); // query for local records - const { reply: localReply2 } = await testAgent.agent.dwnManager.processRequest(everythingQuery()); + const { reply: localReply2 } = await testAgent.agent.dwnManager.processRequest({ + author: alice.did, target: alice.did, + messageType: 'RecordsQuery', + messageOptions: { filter: { recordId: (record.message as RecordsWriteMessage).recordId }} + }); expect(localReply2.status.code).to.equal(200); - expect(localReply2.entries?.length).to.equal(remoteRecords.size); + expect(localReply2.entries?.length).to.equal(1); // check for response encodedData if it doesn't exist issue a RecordsRead for (const entry of localReply2.entries!) { @@ -497,7 +513,10 @@ describe('SyncManagerLevel', () => { expect(reply.record).to.not.be.undefined; expect(reply.record!.data).to.not.be.undefined; const data = await DataStream.toBytes(reply.record!.data); - expect(data.length).to.equal(LARGE_DATA_SIZE); + const newObj = JSON.parse(new TextDecoder().decode(data)); + expect(data.length).to.equal(dataString.length); + expect(newObj.id).to.equal(dataObject.id); + expect(newObj.randomString).to.equal(dataObject.randomString); } } }).timeout(5_000); @@ -626,6 +645,7 @@ describe('SyncManagerLevel', () => { it('synchronizes records with data larger than the `encodedData` limit within the `RecordsQuery` response', async () => { // larger than the size of data returned in a RecordsQuery const LARGE_DATA_SIZE = 70_000; + const everythingQuery = (): ProcessDwnRequest => { return { author : alice.did, @@ -640,29 +660,36 @@ describe('SyncManagerLevel', () => { did: alice.did }); - // create remote local records to sync to remote - const remoteRecords = new Set( - (await Promise.all(Array(2).fill({}).map(i => testAgent.agent.dwnManager.processRequest(TestRecordsWriteMessage( - alice.did, - alice.did, - new Blob([randomBytes(i % 2 === 0 ? 256 : LARGE_DATA_SIZE )]), // create some small and large records - )))))); + const dataObject = { + id: randomUuid(), + randomString: randomString(LARGE_DATA_SIZE) + } + + const dataString = JSON.stringify(dataObject); + + // create remote local record to sync to remote + const remoteRecord = await testAgent.agent.dwnManager.processRequest(TestRecordsWriteMessage( + alice.did, + alice.did, + new Blob([ dataString ]) + )); + expect(remoteRecord.reply.status.code).to.equal(202); // check that records don't exist on remote - const { reply: localReply } = await testAgent.agent.dwnManager.sendRequest(everythingQuery()); - expect(localReply.status.code).to.equal(200); - expect(localReply.entries?.length).to.equal(0); + const { reply: remoteReply } = await testAgent.agent.dwnManager.sendRequest(everythingQuery()); + expect(remoteReply.status.code).to.equal(200); + expect(remoteReply.entries?.length).to.equal(0); // initiate sync await testAgent.agent.syncManager.sync(); // query for for remote records that now exist - const { reply: localReply2 } = await testAgent.agent.dwnManager.sendRequest(everythingQuery()); - expect(localReply2.status.code).to.equal(200); - expect(localReply2.entries?.length).to.equal(remoteRecords.size); + const { reply: remoteReply2 } = await testAgent.agent.dwnManager.sendRequest(everythingQuery()); + expect(remoteReply2.status.code).to.equal(200); + expect(remoteReply2.entries?.length).to.equal(1); // check for response encodedData if it doesn't exist issue a RecordsRead - for (const entry of localReply2.entries!) { + for (const entry of remoteReply2.entries!) { if (entry.encodedData === undefined) { const recordId = (entry as RecordsWriteMessage).recordId; // get individual records without encodedData to check that data exists @@ -676,8 +703,14 @@ describe('SyncManagerLevel', () => { expect(reply.status.code).to.equal(200); expect(reply.record).to.not.be.undefined; expect(reply.record!.data).to.not.be.undefined; - const data = await DataStream.toBytes(reply.record!.data); - expect(data.length).to.equal(LARGE_DATA_SIZE); + + const replyRecord = reply.record as unknown as RecordsWriteMessage & { data: ReadableStream }; + expect(replyRecord.data).to.exist; + expect(replyRecord.data instanceof ReadableStream).to.be.true; + + const { value } = await replyRecord.data.getReader().read(); + const replyDataString = new TextDecoder().decode(value); + expect(replyDataString.length).to.eq(dataString.length); } } }).timeout(5_000); diff --git a/packages/agent/tests/utils/test-agent.ts b/packages/agent/tests/utils/test-agent.ts index 72a5d8a2e..a2be007a4 100644 --- a/packages/agent/tests/utils/test-agent.ts +++ b/packages/agent/tests/utils/test-agent.ts @@ -215,4 +215,19 @@ export function randomBytes(length: number): Uint8Array { } return randomBytes; +} + +/** + * Generates a random alpha-numeric string. + */ +export function randomString(length: number): string { + const charset = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'; + + // pick characters randomly + let randomString = ''; + for (let i = 0; i < length; i++) { + randomString += charset.charAt(Math.floor(Math.random() * charset.length)); + } + + return randomString; } \ No newline at end of file From 29d0c05721aba20283b3b685ab216a8d0d09b741 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Fri, 29 Sep 2023 12:13:36 -0400 Subject: [PATCH 17/20] updated tests --- packages/agent/tests/sync-manager.spec.ts | 170 +++++++++++----------- 1 file changed, 88 insertions(+), 82 deletions(-) diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index ef9747ad4..2fb4b0824 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -13,7 +13,6 @@ import { TestManagedAgent } from '../src/test-managed-agent.js'; import { ProcessDwnRequest } from '../src/index.js'; import { DataStream, RecordsQueryReply, RecordsReadReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; import { Readable } from 'readable-stream'; -import { randomUuid } from '@web5/crypto/utils'; /** * Generates a `RecordsWrite` ProcessDwnRequest for testing. @@ -31,6 +30,31 @@ export function TestRecordsWriteMessage(target: string, author: string, dataStre }; } +async function streamToText(stream: ReadableStream): Promise { + const reader = stream.getReader(); + const decoder = new TextDecoder('utf-8'); // You can specify other encodings if necessary + let result = ''; + + try { + // eslint-disable-next-line + while (true) { + const { done, value } = await reader.read(); + + if (done) { + return result; + } + + result += decoder.decode(value, { stream: true }); + } + } catch (error) { + console.error('Error while reading stream:', error); + } finally { + reader.releaseLock(); + } + + return result; +} + describe('SyncManagerLevel', () => { describe('get agent', () => { it(`returns the 'agent' instance property`, async () => { @@ -447,26 +471,13 @@ describe('SyncManagerLevel', () => { it('synchronizes records with data larger than the `encodedData` limit within the `RecordsQuery` response', async () => { // larger than the size of data returned in a RecordsQuery const LARGE_DATA_SIZE = 70_000; - const everythingQuery = (): ProcessDwnRequest => { - return { - author : alice.did, - target : alice.did, - messageType : 'RecordsQuery', - messageOptions : { filter: { schema: 'testSchema' } } - }; - }; //register alice await testAgent.agent.syncManager.registerIdentity({ did: alice.did }); - const dataObject = { - id: randomUuid(), - randomString: randomString(LARGE_DATA_SIZE) - } - - const dataString = JSON.stringify(dataObject); + const dataString = randomString(LARGE_DATA_SIZE); // create remote record const record = await testAgent.agent.dwnManager.processRequest(TestRecordsWriteMessage( @@ -477,9 +488,10 @@ describe('SyncManagerLevel', () => { // check that records don't exist locally const { reply: localReply } = await testAgent.agent.dwnManager.processRequest({ - author: alice.did, target: alice.did, - messageType: 'RecordsQuery', - messageOptions: { filter: { recordId: (record.message as RecordsWriteMessage).recordId }} + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { recordId: (record.message as RecordsWriteMessage).recordId }} }); expect(localReply.status.code).to.equal(200); @@ -490,34 +502,33 @@ describe('SyncManagerLevel', () => { // query for local records const { reply: localReply2 } = await testAgent.agent.dwnManager.processRequest({ - author: alice.did, target: alice.did, - messageType: 'RecordsQuery', - messageOptions: { filter: { recordId: (record.message as RecordsWriteMessage).recordId }} + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { recordId: (record.message as RecordsWriteMessage).recordId }} }); + expect(localReply2.status.code).to.equal(200); expect(localReply2.entries?.length).to.equal(1); - + const entry = localReply2.entries![0]; // check for response encodedData if it doesn't exist issue a RecordsRead - for (const entry of localReply2.entries!) { - if (entry.encodedData === undefined) { - const recordId = (entry as RecordsWriteMessage).recordId; - // get individual records without encodedData to check that data exists - const record = await testAgent.agent.dwnManager.processRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsRead', - messageOptions : { recordId } - }); - const reply = record.reply as RecordsReadReply; - expect(reply.status.code).to.equal(200); - expect(reply.record).to.not.be.undefined; - expect(reply.record!.data).to.not.be.undefined; - const data = await DataStream.toBytes(reply.record!.data); - const newObj = JSON.parse(new TextDecoder().decode(data)); - expect(data.length).to.equal(dataString.length); - expect(newObj.id).to.equal(dataObject.id); - expect(newObj.randomString).to.equal(dataObject.randomString); - } + if (entry.encodedData === undefined) { + const recordId = (entry as RecordsWriteMessage).recordId; + // get individual records without encodedData to check that data exists + const record = await testAgent.agent.dwnManager.processRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsRead', + messageOptions : { recordId } + }); + const reply = record.reply as RecordsReadReply; + expect(reply.status.code).to.equal(200); + expect(reply.record).to.not.be.undefined; + expect(reply.record!.data).to.not.be.undefined; + const data = await DataStream.toBytes(reply.record!.data); + const replyDataString = new TextDecoder().decode(data); + expect(replyDataString.length).to.equal(dataString.length); + expect(replyDataString).to.equal(dataString); } }).timeout(5_000); }); @@ -646,26 +657,12 @@ describe('SyncManagerLevel', () => { // larger than the size of data returned in a RecordsQuery const LARGE_DATA_SIZE = 70_000; - const everythingQuery = (): ProcessDwnRequest => { - return { - author : alice.did, - target : alice.did, - messageType : 'RecordsQuery', - messageOptions : { filter: { schema: 'testSchema' } } - }; - }; - //register alice await testAgent.agent.syncManager.registerIdentity({ did: alice.did }); - const dataObject = { - id: randomUuid(), - randomString: randomString(LARGE_DATA_SIZE) - } - - const dataString = JSON.stringify(dataObject); + const dataString = randomString(LARGE_DATA_SIZE); // create remote local record to sync to remote const remoteRecord = await testAgent.agent.dwnManager.processRequest(TestRecordsWriteMessage( @@ -676,7 +673,12 @@ describe('SyncManagerLevel', () => { expect(remoteRecord.reply.status.code).to.equal(202); // check that records don't exist on remote - const { reply: remoteReply } = await testAgent.agent.dwnManager.sendRequest(everythingQuery()); + const { reply: remoteReply } = await testAgent.agent.dwnManager.sendRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { schema: 'testSchema' } } + }); expect(remoteReply.status.code).to.equal(200); expect(remoteReply.entries?.length).to.equal(0); @@ -684,34 +686,38 @@ describe('SyncManagerLevel', () => { await testAgent.agent.syncManager.sync(); // query for for remote records that now exist - const { reply: remoteReply2 } = await testAgent.agent.dwnManager.sendRequest(everythingQuery()); + const { reply: remoteReply2 } = await testAgent.agent.dwnManager.sendRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { schema: 'testSchema' } } + }); expect(remoteReply2.status.code).to.equal(200); expect(remoteReply2.entries?.length).to.equal(1); + const entry = remoteReply2.entries![0]; + // check for response encodedData if it doesn't exist issue a RecordsRead - for (const entry of remoteReply2.entries!) { - if (entry.encodedData === undefined) { - const recordId = (entry as RecordsWriteMessage).recordId; - // get individual records without encodedData to check that data exists - const record = await testAgent.agent.dwnManager.sendRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsRead', - messageOptions : { recordId } - }); - const reply = record.reply as RecordsReadReply; - expect(reply.status.code).to.equal(200); - expect(reply.record).to.not.be.undefined; - expect(reply.record!.data).to.not.be.undefined; - - const replyRecord = reply.record as unknown as RecordsWriteMessage & { data: ReadableStream }; - expect(replyRecord.data).to.exist; - expect(replyRecord.data instanceof ReadableStream).to.be.true; - - const { value } = await replyRecord.data.getReader().read(); - const replyDataString = new TextDecoder().decode(value); - expect(replyDataString.length).to.eq(dataString.length); - } + if (entry.encodedData === undefined) { + const recordId = (entry as RecordsWriteMessage).recordId; + // get individual records without encodedData to check that data exists + const record = await testAgent.agent.dwnManager.sendRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsRead', + messageOptions : { recordId } + }); + const reply = record.reply as RecordsReadReply; + expect(reply.status.code).to.equal(200); + expect(reply.record).to.not.be.undefined; + expect(reply.record!.data).to.not.be.undefined; + + const replyRecord = reply.record as unknown as RecordsWriteMessage & { data: ReadableStream }; + expect(replyRecord.data).to.exist; + expect(replyRecord.data instanceof ReadableStream).to.be.true; + const replyDataString = await streamToText(replyRecord.data); + expect(replyDataString.length).to.eq(dataString.length); + expect(replyDataString).to.equal(dataString); } }).timeout(5_000); }); From 49f81d7338d7e2f7bad423f03fcd8b3a1a8cea3e Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 3 Oct 2023 11:06:04 -0400 Subject: [PATCH 18/20] remove uneeded set --- packages/agent/tests/sync-manager.spec.ts | 34 ++++++++++------------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index 2fb4b0824..75442ca18 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -255,31 +255,27 @@ describe('SyncManagerLevel', () => { }; }; - const localRecords = new Set( - (await Promise.all(Array(5).fill({}).map(_ => testAgent.agent.dwnManager.processRequest(TestRecordsWriteMessage( - alice.did, - alice.did, - new Blob([randomString(256)]), - ))))).map(r => (r.message as RecordsWriteMessage).recordId) - ); - - const remoteRecords = new Set( - (await Promise.all(Array(5).fill({}).map(_ => testAgent.agent.dwnManager.sendRequest(TestRecordsWriteMessage( - alice.did, - alice.did, - new Blob([randomString(256)]), - ))))).map(r => (r.message as RecordsWriteMessage).recordId) - ); + const localRecords = (await Promise.all(Array(5).fill({}).map(_ => testAgent.agent.dwnManager.processRequest(TestRecordsWriteMessage( + alice.did, + alice.did, + new Blob([randomString(256)]), + ))))).map(r => (r.message as RecordsWriteMessage).recordId); + + const remoteRecords = (await Promise.all(Array(5).fill({}).map(_ => testAgent.agent.dwnManager.sendRequest(TestRecordsWriteMessage( + alice.did, + alice.did, + new Blob([randomString(256)]), + ))))).map(r => (r.message as RecordsWriteMessage).recordId); const { reply: localReply } = await testAgent.agent.dwnManager.processRequest(everythingQuery()); expect(localReply.status.code).to.equal(200); - expect(localReply.entries?.length).to.equal(localRecords.size); - expect(localReply.entries?.every(e => localRecords.has((e as RecordsWriteMessage).recordId))).to.be.true; + expect(localReply.entries?.length).to.equal(localRecords.length); + expect(localReply.entries?.every(e => localRecords.includes((e as RecordsWriteMessage).recordId))).to.be.true; const { reply: remoteReply } = await testAgent.agent.dwnManager.sendRequest(everythingQuery()); expect(remoteReply.status.code).to.equal(200); - expect(remoteReply.entries?.length).to.equal(remoteRecords.size); - expect(remoteReply.entries?.every(e => remoteRecords.has((e as RecordsWriteMessage).recordId))).to.be.true; + expect(remoteReply.entries?.length).to.equal(remoteRecords.length); + expect(remoteReply.entries?.every(e => remoteRecords.includes((e as RecordsWriteMessage).recordId))).to.be.true; await testAgent.agent.syncManager.sync(); From 0143dfe0770ef61de553f05ea7ce43b996f96f97 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 3 Oct 2023 18:45:50 -0400 Subject: [PATCH 19/20] dwn sdk rebase fixes --- packages/agent/src/sync-manager.ts | 8 ++++++-- packages/agent/tests/sync-manager.spec.ts | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/packages/agent/src/sync-manager.ts b/packages/agent/src/sync-manager.ts index 9100bf64a..b13ad6ae3 100644 --- a/packages/agent/src/sync-manager.ts +++ b/packages/agent/src/sync-manager.ts @@ -183,7 +183,9 @@ export class SyncManagerLevel implements SyncManager { author : did, messageType : 'RecordsRead', messageOptions : { - recordId: message['recordId'] + filter: { + recordId: message['recordId'] + } } }); @@ -467,7 +469,9 @@ export class SyncManagerLevel implements SyncManager { target : author, messageType : 'RecordsRead', messageOptions : { - recordId: writeMessage.recordId + filter: { + recordId: writeMessage.recordId + } } }); const reply = readResponse.reply as RecordsReadReply; diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index 75442ca18..6623b2473 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -515,7 +515,7 @@ describe('SyncManagerLevel', () => { author : alice.did, target : alice.did, messageType : 'RecordsRead', - messageOptions : { recordId } + messageOptions : { filter: { recordId } } }); const reply = record.reply as RecordsReadReply; expect(reply.status.code).to.equal(200); @@ -701,7 +701,7 @@ describe('SyncManagerLevel', () => { author : alice.did, target : alice.did, messageType : 'RecordsRead', - messageOptions : { recordId } + messageOptions : { filter: { recordId } } }); const reply = record.reply as RecordsReadReply; expect(reply.status.code).to.equal(200); From 1fb120f88f4e133c9ebf6484ad77233047defaff Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 3 Oct 2023 19:08:50 -0400 Subject: [PATCH 20/20] update test timeouts --- packages/agent/tests/sync-manager.spec.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index 6623b2473..20e997afc 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -255,13 +255,13 @@ describe('SyncManagerLevel', () => { }; }; - const localRecords = (await Promise.all(Array(5).fill({}).map(_ => testAgent.agent.dwnManager.processRequest(TestRecordsWriteMessage( + const localRecords = (await Promise.all(Array(3).fill({}).map(_ => testAgent.agent.dwnManager.processRequest(TestRecordsWriteMessage( alice.did, alice.did, new Blob([randomString(256)]), ))))).map(r => (r.message as RecordsWriteMessage).recordId); - const remoteRecords = (await Promise.all(Array(5).fill({}).map(_ => testAgent.agent.dwnManager.sendRequest(TestRecordsWriteMessage( + const remoteRecords = (await Promise.all(Array(3).fill({}).map(_ => testAgent.agent.dwnManager.sendRequest(TestRecordsWriteMessage( alice.did, alice.did, new Blob([randomString(256)]), @@ -290,7 +290,7 @@ describe('SyncManagerLevel', () => { expect(allLocalReply.entries?.length).to.equal(records.size); expect(allLocalReply.entries?.every(e => records.has((e as RecordsWriteMessage).recordId))).to.be.true; - }).timeout(10_000); + }).timeout(5_000); it('should skip dwn if there a failure getting syncState', async () => { await testAgent.agent.syncManager.registerIdentity({ @@ -526,7 +526,7 @@ describe('SyncManagerLevel', () => { expect(replyDataString.length).to.equal(dataString.length); expect(replyDataString).to.equal(dataString); } - }).timeout(5_000); + }); }); describe('push', () => { @@ -715,7 +715,7 @@ describe('SyncManagerLevel', () => { expect(replyDataString.length).to.eq(dataString.length); expect(replyDataString).to.equal(dataString); } - }).timeout(5_000); + }); }); }); });