diff --git a/package-lock.json b/package-lock.json index 20c1b9b94..840c4ae04 100644 --- a/package-lock.json +++ b/package-lock.json @@ -21,7 +21,7 @@ "devDependencies": { "@npmcli/package-json": "5.0.0", "@typescript-eslint/eslint-plugin": "6.4.0", - "@web5/dwn-server": "0.1.7", + "@web5/dwn-server": "0.1.9", "eslint-plugin-mocha": "10.1.0" } }, @@ -2474,9 +2474,9 @@ } }, "node_modules/@tbd54566975/dwn-sdk-js": { - "version": "0.2.8", - "resolved": "https://registry.npmjs.org/@tbd54566975/dwn-sdk-js/-/dwn-sdk-js-0.2.8.tgz", - "integrity": "sha512-oiKk+ekAQO94bUkt6yk+xkDY8uCGmNC+rKaYQLhAoTrhYrczeRSuDT04F5/vPBT5K6NfAoRcQsIyBmvgRCUvgA==", + "version": "0.2.10", + "resolved": "https://registry.npmjs.org/@tbd54566975/dwn-sdk-js/-/dwn-sdk-js-0.2.10.tgz", + "integrity": "sha512-CoKO8+NciwWNzD4xRoAAgeElqQCXKM4Fc+zEHsUWD0M3E9v67hRWiTHI6AenUfQv1RSEB2H4GHUeUOHuEV72uw==", "dependencies": { "@ipld/dag-cbor": "9.0.3", "@js-temporal/polyfill": "0.4.4", @@ -2487,7 +2487,6 @@ "blockstore-core": "4.2.0", "cross-fetch": "4.0.0", "eciesjs": "0.4.5", - "flat": "5.0.2", "interface-blockstore": "5.2.3", "interface-store": "5.1.2", "ipfs-unixfs-exporter": "13.1.5", @@ -2562,13 +2561,13 @@ } }, "node_modules/@tbd54566975/dwn-sql-store": { - "version": "0.2.4", - "resolved": "https://registry.npmjs.org/@tbd54566975/dwn-sql-store/-/dwn-sql-store-0.2.4.tgz", - "integrity": "sha512-LwSpQMcKtNEAj8ffn/UGrYwiENjN4S7rIDX5FDJ4+qjnMbF7I051i1bpw3INKnrtnVWAuynD5033EHSKLml0Zw==", + "version": "0.2.6", + "resolved": "https://registry.npmjs.org/@tbd54566975/dwn-sql-store/-/dwn-sql-store-0.2.6.tgz", + "integrity": "sha512-N5SSyKGgHoW7ttWW6xrPq4xK7aYfxDvrmXdYEi+eA3qlT+Wzi5HZfrlcSnFIHee9+s56V6WpJw1AQ6XmjZH2QQ==", "dev": true, "dependencies": { "@ipld/dag-cbor": "^9.0.5", - "@tbd54566975/dwn-sdk-js": "0.2.8", + "@tbd54566975/dwn-sdk-js": "0.2.10", "kysely": "0.26.3", "multiformats": "12.0.1", "readable-stream": "4.4.2" @@ -2598,9 +2597,9 @@ "dev": true }, "node_modules/@tbd54566975/dwn-sql-store/node_modules/cborg": { - "version": "4.0.5", - "resolved": "https://registry.npmjs.org/cborg/-/cborg-4.0.5.tgz", - "integrity": "sha512-q8TAjprr8pn9Fp53rOIGp/UFDdFY6os2Nq62YogPSIzczJD9M6g2b6igxMkpCiZZKJ0kn/KzDLDvG+EqBIEeCg==", + "version": "4.0.6", + "resolved": "https://registry.npmjs.org/cborg/-/cborg-4.0.6.tgz", + "integrity": "sha512-McNIJHMQKQv/WgSE1JqWfqS4kaeN5g9GRA5MqVCt1+66TGsywkpzBUywpZ/HWF3Ik8yudSR+ZPlq6TRBEZXQyA==", "dev": true, "bin": { "cborg": "lib/bin.js" @@ -3627,13 +3626,13 @@ "link": true }, "node_modules/@web5/dwn-server": { - "version": "0.1.7", - "resolved": "https://registry.npmjs.org/@web5/dwn-server/-/dwn-server-0.1.7.tgz", - "integrity": "sha512-SlOL3Fzq/O43c7v+cJS+cMassuHyKQxmoPJQ7U/OjZNw51yHAveQJ3nKbxkUuiHIc+ERMKGE7tW4ignhxieSgQ==", + "version": "0.1.9", + "resolved": "https://registry.npmjs.org/@web5/dwn-server/-/dwn-server-0.1.9.tgz", + "integrity": "sha512-t1xpWGQ+hbIglu0OjZS3DG6Q8pmrxK2TmPo53YaxNpceKNT9tkqJqgssiIJzVWiQS90BmR/JULchKR/aQX1sOg==", "dev": true, "dependencies": { - "@tbd54566975/dwn-sdk-js": "0.2.8", - "@tbd54566975/dwn-sql-store": "0.2.4", + "@tbd54566975/dwn-sdk-js": "0.2.10", + "@tbd54566975/dwn-sql-store": "0.2.6", "better-sqlite3": "^8.5.0", "bytes": "3.1.2", "cors": "2.8.5", @@ -3650,6 +3649,9 @@ "response-time": "2.3.2", "uuid": "9.0.0", "ws": "8.12.0" + }, + "bin": { + "dwn-server": "dist/esm/src/main.js" } }, "node_modules/@web5/dwn-server/node_modules/uuid": { @@ -7044,6 +7046,7 @@ "version": "5.0.2", "resolved": "https://registry.npmjs.org/flat/-/flat-5.0.2.tgz", "integrity": "sha512-b6suED+5/3rTpUBdG1gupIl8MPFCAMA0QXwmljLhvCUKcUvdE4gWky9zpuGCcXHOsz4J9wPGNWq6OKpmIzz3hQ==", + "dev": true, "bin": { "flat": "cli.js" } @@ -14210,13 +14213,14 @@ "version": "0.2.5", "license": "Apache-2.0", "dependencies": { - "@tbd54566975/dwn-sdk-js": "0.2.8", + "@tbd54566975/dwn-sdk-js": "0.2.10", "@web5/common": "0.2.2", "@web5/crypto": "0.2.2", "@web5/dids": "0.2.4", "level": "8.0.0", "readable-stream": "4.4.2", - "readable-web-to-node-stream": "3.0.2" + "readable-web-to-node-stream": "3.0.2", + "ulidx": "2.1.0" }, "devDependencies": { "@playwright/test": "1.40.1", @@ -14569,7 +14573,7 @@ "version": "0.8.3", "license": "Apache-2.0", "dependencies": { - "@tbd54566975/dwn-sdk-js": "0.2.8", + "@tbd54566975/dwn-sdk-js": "0.2.10", "@web5/agent": "0.2.5", "@web5/common": "0.2.2", "@web5/crypto": "0.2.2", diff --git a/package.json b/package.json index 8c137e961..7923ce931 100644 --- a/package.json +++ b/package.json @@ -27,7 +27,7 @@ "devDependencies": { "@npmcli/package-json": "5.0.0", "@typescript-eslint/eslint-plugin": "6.4.0", - "@web5/dwn-server": "0.1.7", + "@web5/dwn-server": "0.1.9", "eslint-plugin-mocha": "10.1.0" } } diff --git a/packages/agent/package.json b/packages/agent/package.json index 11a1c922e..33d59e72d 100644 --- a/packages/agent/package.json +++ b/packages/agent/package.json @@ -68,13 +68,14 @@ "node": ">=18.0.0" }, "dependencies": { - "@tbd54566975/dwn-sdk-js": "0.2.8", + "@tbd54566975/dwn-sdk-js": "0.2.10", "@web5/common": "0.2.2", "@web5/crypto": "0.2.2", "@web5/dids": "0.2.4", "level": "8.0.0", "readable-stream": "4.4.2", - "readable-web-to-node-stream": "3.0.2" + "readable-web-to-node-stream": "3.0.2", + "ulidx": "2.1.0" }, "devDependencies": { "@playwright/test": "1.40.1", diff --git a/packages/agent/src/dwn-manager.ts b/packages/agent/src/dwn-manager.ts index 8f94612f5..44a6be195 100644 --- a/packages/agent/src/dwn-manager.ts +++ b/packages/agent/src/dwn-manager.ts @@ -350,11 +350,11 @@ export class DwnManager { const result: MessagesGetReply = await this._dwn.processMessage(author, messagesGet.message); - if (!(result.messages && result.messages.length === 1)) { + if (!(result.entries && result.entries.length === 1)) { throw new Error('TODO: figure out error message'); } - const [ messageEntry ] = result.messages; + const [ messageEntry ] = result.entries; let { message } = messageEntry; if (!message) { @@ -421,22 +421,6 @@ export class DwnManager { return dwnMessage; } - /** - * Writes a pruned initial `RecordsWrite` to a DWN without needing to supply associated data. - * Note: This method should ONLY be used by a {@link SyncManager} implementation. - * - * @param options.targetDid - DID of the DWN tenant to write the pruned RecordsWrite to. - * @returns DWN reply containing the status of processing request. - */ - public async writePrunedRecord(options: { - targetDid: string, - message: RecordsWriteMessage - }): Promise { - const { targetDid, message } = options; - - return await this._dwn.synchronizePrunedInitialRecordsWrite(targetDid, message); - } - public async processMessage(options: { targetDid: string, message: GenericMessage, @@ -446,13 +430,4 @@ export class DwnManager { return await this._dwn.processMessage(targetDid, message, dataStream); } -} - -type GenericMessageReply = { - status: Status; -}; - -type Status = { - code: number - detail: string -}; \ No newline at end of file +} \ No newline at end of file diff --git a/packages/agent/src/store-managed-did.ts b/packages/agent/src/store-managed-did.ts index 3979191a4..f415dc707 100644 --- a/packages/agent/src/store-managed-did.ts +++ b/packages/agent/src/store-managed-did.ts @@ -1,4 +1,4 @@ -import type { RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; +import type { RecordsQueryReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; import { Convert } from '@web5/common'; @@ -45,7 +45,7 @@ export class DidStoreDwn implements ManagedDidStore { // Loop through all of the entries and try to find a match. let matchingRecordId: string | undefined; - for (const record of queryReply.entries ?? []) { + for (const record of (queryReply as RecordsQueryReply).entries ?? []) { if (record.encodedData) { const storedDid = Convert.base64Url(record.encodedData).toObject() as ManagedDid; if (storedDid && storedDid.did === did) { @@ -94,7 +94,7 @@ export class DidStoreDwn implements ManagedDidStore { }); // Loop through all of the entries and return a match, if found. - for (const record of queryReply.entries ?? []) { + for (const record of (queryReply as RecordsQueryReply).entries ?? []) { if (record.encodedData) { const storedDid = Convert.base64Url(record.encodedData).toObject() as ManagedDid; if (storedDid && storedDid.did === did) return storedDid; @@ -125,7 +125,7 @@ export class DidStoreDwn implements ManagedDidStore { }); // Loop through all of the entries and return a match, if found. - for (const record of queryReply.entries ?? []) { + for (const record of (queryReply as RecordsQueryReply).entries ?? []) { if (record.encodedData) { const storedDid = Convert.base64Url(record.encodedData).toObject() as ManagedDid; if (storedDid && storedDid.did === did) return storedDid; @@ -190,7 +190,7 @@ export class DidStoreDwn implements ManagedDidStore { // Loop through all of the entries and accumulate the DID objects. let storedDids: ManagedDid[] = []; - for (const record of queryReply.entries ?? []) { + for (const record of (queryReply as RecordsQueryReply).entries ?? []) { if (record.encodedData) { const storedDid = Convert.base64Url(record.encodedData).toObject() as ManagedDid; storedDids.push(storedDid); diff --git a/packages/agent/src/store-managed-identity.ts b/packages/agent/src/store-managed-identity.ts index cfc8f9d06..57916c240 100644 --- a/packages/agent/src/store-managed-identity.ts +++ b/packages/agent/src/store-managed-identity.ts @@ -2,7 +2,7 @@ import { Convert } from '@web5/common'; import type { Web5ManagedAgent } from './types/agent.js'; import type { ManagedIdentity } from './identity-manager.js'; -import type { RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; +import type { RecordsQueryReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; export interface ManagedIdentityStore { deleteIdentity(options: { did: string, agent?: Web5ManagedAgent, context?: string }): Promise @@ -42,7 +42,7 @@ export class IdentityStoreDwn implements ManagedIdentityStore { // Loop through all of the entries and try to find a match. let matchingRecordId: string | undefined; - for (const record of queryReply.entries ?? []) { + for (const record of (queryReply as RecordsQueryReply).entries ?? []) { if (record.encodedData) { const storedIdentity = Convert.base64Url(record.encodedData).toObject() as ManagedIdentity; if (storedIdentity && storedIdentity.did === did) { @@ -91,7 +91,7 @@ export class IdentityStoreDwn implements ManagedIdentityStore { }); // Loop through all of the entries and return a match, if found. - for (const record of queryReply.entries ?? []) { + for (const record of (queryReply as RecordsQueryReply).entries ?? []) { if (record.encodedData) { const storedIdentity = Convert.base64Url(record.encodedData).toObject() as ManagedIdentity; if (storedIdentity && storedIdentity.did === did) return storedIdentity; @@ -156,7 +156,7 @@ export class IdentityStoreDwn implements ManagedIdentityStore { // Loop through all of the entries and accumulate the Identity objects. let storedIdentities: ManagedIdentity[] = []; - for (const record of queryReply.entries ?? []) { + for (const record of (queryReply as RecordsQueryReply).entries ?? []) { if (record.encodedData) { const storedIdentity = Convert.base64Url(record.encodedData).toObject() as ManagedIdentity; storedIdentities.push(storedIdentity); diff --git a/packages/agent/src/store-managed-key.ts b/packages/agent/src/store-managed-key.ts index 41a35b596..13c7a2632 100644 --- a/packages/agent/src/store-managed-key.ts +++ b/packages/agent/src/store-managed-key.ts @@ -1,4 +1,4 @@ -import type { RecordsWriteMessage, RecordsWriteOptions } from '@tbd54566975/dwn-sdk-js'; +import type { RecordsQueryReply, RecordsWriteMessage, RecordsWriteOptions } from '@tbd54566975/dwn-sdk-js'; import { utils as cryptoUtils } from '@web5/crypto'; import { Convert, removeEmptyObjects, removeUndefinedProperties } from '@web5/common'; @@ -63,7 +63,7 @@ export class KeyStoreDwn implements ManagedKeyStore; + private _ulidFactory: ULIDFactory; constructor(options?: SyncManagerOptions) { let { agent, dataPath = 'DATA/AGENT/SYNC_STORE', db } = options ?? {}; this._agent = agent; this._db = (db) ? db : new Level(dataPath); + this._ulidFactory = monotonicFactory(); } /** @@ -106,7 +110,7 @@ export class SyncManagerLevel implements SyncManager { for (let job of pullJobs) { const [key] = job; - const [did, dwnUrl, watermark, messageCid] = key.split('~'); + const [did, dwnUrl, _, messageCid] = key.split('~'); // If a particular DWN service endpoint is unreachable, skip subsequent pull operations. if (errored.has(dwnUrl)) { @@ -115,9 +119,7 @@ export class SyncManagerLevel implements SyncManager { const messageExists = await this.messageExists(did, messageCid); if (messageExists) { - await this.setWatermark(did, dwnUrl, 'pull', watermark); deleteOperations.push({ type: 'del', key: key }); - continue; } @@ -147,10 +149,8 @@ export class SyncManagerLevel implements SyncManager { * values to batch network requests for record messages rather than one at a time, as it * is currently implemented. Either the pull() method should be refactored to batch * getting messages OR this loop should be removed. */ - for (let entry of reply.messages ?? []) { + for (let entry of reply.entries ?? []) { if (entry.error || !entry.message) { - - await this.setWatermark(did, dwnUrl, 'pull', watermark); await this.addMessage(did, messageCid); deleteOperations.push({ type: 'del', key: key }); @@ -192,13 +192,13 @@ export class SyncManagerLevel implements SyncManager { dataStream = webReadableToIsomorphicNodeReadable(record.data as any); } else if (readStatus.code >= 400) { - const pruneReply = await this.agent.dwnManager.writePrunedRecord({ + // writes record without data, if this is an initial records write, it will succeed. + const pruneReply = await this.agent.dwnManager.processMessage({ targetDid: did, message }); if (pruneReply.status.code === 202 || pruneReply.status.code === 409) { - await this.setWatermark(did, dwnUrl, 'pull', watermark); await this.addMessage(did, messageCid); deleteOperations.push({ type: 'del', key: key }); @@ -217,7 +217,6 @@ export class SyncManagerLevel implements SyncManager { }); if (pullReply.status.code === 202 || pullReply.status.code === 409) { - await this.setWatermark(did, dwnUrl, 'pull', watermark); await this.addMessage(did, messageCid); deleteOperations.push({ type: 'del', key: key }); } @@ -239,7 +238,7 @@ export class SyncManagerLevel implements SyncManager { for (let job of pushJobs) { const [key] = job; - const [did, dwnUrl, watermark, messageCid] = key.split('~'); + const [did, dwnUrl, _, messageCid] = key.split('~'); // If a particular DWN service endpoint is unreachable, skip subsequent push operations. if (errored.has(dwnUrl)) { @@ -254,7 +253,6 @@ export class SyncManagerLevel implements SyncManager { * message to the local message store, and continue to the next job. */ if (!dwnMessage) { deleteOperations.push({ type: 'del', key: key }); - await this.setWatermark(did, dwnUrl, 'push', watermark); await this.addMessage(did, messageCid); continue; @@ -273,7 +271,6 @@ export class SyncManagerLevel implements SyncManager { * - 409: message was already present on the remote DWN */ if (reply.status.code === 202 || reply.status.code === 409) { - await this.setWatermark(did, dwnUrl, 'push', watermark); await this.addMessage(did, messageCid); deleteOperations.push({ type: 'del', key: key }); } @@ -302,11 +299,12 @@ export class SyncManagerLevel implements SyncManager { const { interval = 120_000 } = options; return new Promise((resolve, reject) => { - if (this._syncIntervalId) { - clearInterval(this._syncIntervalId); - } - this._syncIntervalId = setInterval(async () => { + const intervalSync = async () => { + if (this._syncIntervalId) { + clearInterval(this._syncIntervalId); + } + try { await this.push(); await this.pull(); @@ -314,7 +312,12 @@ export class SyncManagerLevel implements SyncManager { this.stopSync(); reject(error); } - }, interval); + + // then we start sync again + this._syncIntervalId = setInterval(intervalSync, interval); + }; + + this._syncIntervalId = setInterval(intervalSync, interval); }); } @@ -334,15 +337,17 @@ export class SyncManagerLevel implements SyncManager { for (let syncState of syncPeerState) { // Get the event log from the remote DWN if pull sync, or local DWN if push sync. const eventLog = await this.getDwnEventLog({ - did : syncState.did, - dwnUrl : syncState.dwnUrl, - syncDirection, - watermark : syncState.watermark + did : syncState.did, + dwnUrl : syncState.dwnUrl, + cursor : syncState.cursor, + syncDirection }); + const syncOperations: DbBatchOperation[] = []; - for (let event of eventLog) { + for (let messageCid of eventLog) { + const watermark = this._ulidFactory(); /** Use "did~dwnUrl~watermark~messageCid" as the key in the sync queue. * Note: It is critical that `watermark` precedes `messageCid` to * ensure that when the sync jobs are pulled off the queue, they @@ -350,12 +355,11 @@ export class SyncManagerLevel implements SyncManager { const operationKey = [ syncState.did, syncState.dwnUrl, - event.watermark, - event.messageCid + watermark, + messageCid ].join('~'); const operation: DbBatchOperation = { type: 'put', key: operationKey, value: '' }; - syncOperations.push(operation); } @@ -372,9 +376,9 @@ export class SyncManagerLevel implements SyncManager { did: string, dwnUrl: string, syncDirection: SyncDirection, - watermark?: string + cursor?: string }) { - const { did, dwnUrl, syncDirection, watermark } = options; + const { did, dwnUrl, syncDirection, cursor } = options; let eventsReply = {} as EventsGetReply; @@ -383,7 +387,7 @@ export class SyncManagerLevel implements SyncManager { const eventsGetMessage = await this.agent.dwnManager.createMessage({ author : did, messageType : 'EventsGet', - messageOptions : { watermark } + messageOptions : { cursor } }); try { @@ -391,22 +395,27 @@ export class SyncManagerLevel implements SyncManager { dwnUrl : dwnUrl, targetDid : did, message : eventsGetMessage - }); + }) as EventsGetReply; } catch { // If a particular DWN service endpoint is unreachable, silently ignore. } } else if (syncDirection === 'push') { // When sync is a push, get the event log from the local DWN. - ({ reply: eventsReply } = await this.agent.dwnManager.processRequest({ + const eventsGetDwnResponse = await this.agent.dwnManager.processRequest({ author : did, target : did, messageType : 'EventsGet', - messageOptions : { watermark } - })); + messageOptions : { cursor } + }); + eventsReply = eventsGetDwnResponse.reply as EventsGetReply; } - const eventLog = eventsReply.events ?? []; + const eventLog = eventsReply.entries ?? []; + if (eventLog.length > 0) { + const cursorItem = eventLog.at(-1)!; + this.setCursor(did, dwnUrl, syncDirection, cursorItem); + } return eventLog; } @@ -424,17 +433,17 @@ export class SyncManagerLevel implements SyncManager { } }); - const reply: MessagesGetReply = messagesGetResponse.reply; + const reply = messagesGetResponse.reply as MessagesGetReply; /** Absence of a messageEntry or message within messageEntry can happen because updating a * Record creates another RecordsWrite with the same recordId. Only the first and * most recent RecordsWrite messages are kept for a given recordId. Any RecordsWrite messages * that aren't the first or most recent are discarded by the DWN. */ - if (!(reply.messages && reply.messages.length === 1)) { + if (!(reply.entries && reply.entries.length === 1)) { return undefined; } - const [ messageEntry ] = reply.messages; + const [ messageEntry ] = reply.entries; let { message } = messageEntry; if (!message) { @@ -523,23 +532,23 @@ export class SyncManagerLevel implements SyncManager { throw new Error(`SyncManager: Malformed '#dwn' service endpoint. Expected array of node addresses.`); } - /** Get the watermark (or undefined) for each (DID, DWN service endpoint, sync direction) + /** Get the cursor (or undefined) for each (DID, DWN service endpoint, sync direction) * combination and add it to the sync peer state array. */ for (let dwnUrl of service.serviceEndpoint.nodes) { - const watermark = await this.getWatermark(did, dwnUrl, syncDirection); - syncPeerState.push({ did, dwnUrl, watermark }); + const cursor = await this.getCursor(did, dwnUrl, syncDirection); + syncPeerState.push({ did, dwnUrl, cursor}); } } return syncPeerState; } - private async getWatermark(did: string, dwnUrl: string, direction: SyncDirection) { - const wmKey = `${did}~${dwnUrl}~${direction}`; - const watermarkStore = this.getWatermarkStore(); - + private async getCursor(did: string, dwnUrl: string, direction: SyncDirection): Promise { + const cursorKey = `${did}~${dwnUrl}~${direction}`; + const cursorsStore = this.getCursorStore(); try { - return await watermarkStore.get(wmKey); + const cursorValue = await cursorsStore.get(cursorKey); + return cursorValue; } catch(error: any) { // Don't throw when a key wasn't found. if (error.notFound) { @@ -548,11 +557,10 @@ export class SyncManagerLevel implements SyncManager { } } - private async setWatermark(did: string, dwnUrl: string, direction: SyncDirection, watermark: string) { - const wmKey = `${did}~${dwnUrl}~${direction}`; - const watermarkStore = this.getWatermarkStore(); - - await watermarkStore.put(wmKey, watermark); + private async setCursor(did: string, dwnUrl: string, direction: SyncDirection, cursor: string) { + const cursorKey = `${did}~${dwnUrl}~${direction}`; + const cursorsStore = this.getCursorStore(); + await cursorsStore.put(cursorKey, cursor); } /** @@ -585,8 +593,8 @@ export class SyncManagerLevel implements SyncManager { return this._db.sublevel('history').sublevel(did).sublevel('messages'); } - private getWatermarkStore() { - return this._db.sublevel('watermarks'); + private getCursorStore() { + return this._db.sublevel('cursors'); } private getPushQueue() { diff --git a/packages/agent/tests/dwn-manager.spec.ts b/packages/agent/tests/dwn-manager.spec.ts index cfdab0dce..c3d050ff9 100644 --- a/packages/agent/tests/dwn-manager.spec.ts +++ b/packages/agent/tests/dwn-manager.spec.ts @@ -109,7 +109,7 @@ describe('DwnManager', () => { }); it('handles EventsGet', async () => { - const testWatermarkName = 'foo'; + const testCursor = 'foo'; // Attempt to process the EventsGet. let eventsGetResponse = await testAgent.agent.dwnManager.processRequest({ @@ -117,7 +117,7 @@ describe('DwnManager', () => { target : identity.did, messageType : 'EventsGet', messageOptions : { - watermark: testWatermarkName + cursor: testCursor, } }); @@ -126,12 +126,12 @@ describe('DwnManager', () => { expect(eventsGetResponse).to.have.property('reply'); const eventsGetMessage = eventsGetResponse.message as EventsGetMessage; - expect(eventsGetMessage.descriptor).to.have.property('watermark', testWatermarkName); + expect(eventsGetMessage.descriptor).to.have.property('cursor', testCursor); const eventsGetReply = eventsGetResponse.reply as EventsGetReply; expect(eventsGetReply).to.have.property('status'); expect(eventsGetReply.status.code).to.equal(200); - expect(eventsGetReply.events).to.have.length(0); + expect(eventsGetReply.entries).to.have.length(0); }); it('handles MessagesGet', async () => { @@ -177,11 +177,11 @@ describe('DwnManager', () => { const messagesGetReply = messagesGetResponse.reply as MessagesGetReply; expect(messagesGetReply).to.have.property('status'); expect(messagesGetReply.status.code).to.equal(200); - expect(messagesGetReply.messages).to.have.length(1); + expect(messagesGetReply.entries).to.have.length(1); - if (!Array.isArray(messagesGetReply.messages)) throw new Error('Type guard'); - if (messagesGetReply.messages.length !== 1) throw new Error('Type guard'); - const [ retrievedRecordsWrite ] = messagesGetReply.messages; + if (!Array.isArray(messagesGetReply.entries)) throw new Error('Type guard'); + if (messagesGetReply.entries.length !== 1) throw new Error('Type guard'); + const [ retrievedRecordsWrite ] = messagesGetReply.entries; expect(retrievedRecordsWrite.message).to.have.property('recordId', writeMessage.recordId); }); diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index e55587a92..70a205267 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -1,15 +1,18 @@ import type { PortableDid } from '@web5/dids'; +import type { RecordsQueryReply, RecordsQueryReplyEntry, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; -import { expect } from 'chai'; -import * as sinon from 'sinon'; -import { RecordsQueryReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; +import sinon from 'sinon'; +import chai, { expect } from 'chai'; +import chaiAsPromised from 'chai-as-promised'; import type { ManagedIdentity } from '../src/identity-manager.js'; -import { testDwnUrl } from './utils/test-config.js'; import { TestAgent } from './utils/test-agent.js'; -import { SyncManagerLevel } from '../src/sync-manager.js'; +import { testDwnUrl } from './utils/test-config.js'; import { TestManagedAgent } from '../src/test-managed-agent.js'; +import { SyncManagerLevel } from '../src/sync-manager.js'; + +chai.use(chaiAsPromised); let testDwnUrls: string[] = [testDwnUrl]; @@ -73,6 +76,101 @@ describe('SyncManagerLevel', () => { await testAgent.closeStorage(); }); + it('syncs multiple records in both directions', async () => { + // create 3 local records. + const localRecords: string[] = []; + for (let i = 0; i < 3; i++) { + const writeResponse = await testAgent.agent.dwnManager.processRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsWrite', + messageOptions : { + dataFormat: 'text/plain' + }, + dataStream: new Blob([`Hello, ${i}`]) + }); + + localRecords.push((writeResponse.message as RecordsWriteMessage).recordId); + } + + // create 3 remote records + const remoteRecords: string[] = []; + for (let i = 0; i < 3; i++) { + let writeResponse = await testAgent.agent.dwnManager.sendRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsWrite', + messageOptions : { + dataFormat: 'text/plain' + }, + dataStream: new Blob([`Hello, ${i}`]) + }); + remoteRecords.push((writeResponse.message as RecordsWriteMessage).recordId); + } + + // query local and check for only local records + let localQueryResponse = await testAgent.agent.dwnManager.processRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { dataFormat: 'text/plain' } } + }); + let localDwnQueryReply = localQueryResponse.reply as RecordsQueryReply; + expect(localDwnQueryReply.status.code).to.equal(200); + expect(localDwnQueryReply.entries).to.have.length(3); + let localRecordsFromQuery = localDwnQueryReply.entries?.map(entry => entry.recordId); + expect(localRecordsFromQuery).to.have.members(localRecords); + + // query remote and check for only remote records + let remoteQueryResponse = await testAgent.agent.dwnManager.sendRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { dataFormat: 'text/plain' } } + }); + let remoteDwnQueryReply = remoteQueryResponse.reply as RecordsQueryReply; + expect(remoteDwnQueryReply.status.code).to.equal(200); + expect(remoteDwnQueryReply.entries).to.have.length(3); + let remoteRecordsFromQuery = remoteDwnQueryReply.entries?.map(entry => entry.recordId); + expect(remoteRecordsFromQuery).to.have.members(remoteRecords); + + // Register Alice's DID to be synchronized. + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + + // Execute Sync to pull all records from Alice's remote DWN to Alice's local DWN. + await testAgent.agent.syncManager.push(); + await testAgent.agent.syncManager.pull(); + + // query local node to see all records + localQueryResponse = await testAgent.agent.dwnManager.processRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { dataFormat: 'text/plain' } } + }); + localDwnQueryReply = localQueryResponse.reply as RecordsQueryReply; + expect(localDwnQueryReply.status.code).to.equal(200); + expect(localDwnQueryReply.entries).to.have.length(6); + localRecordsFromQuery = localDwnQueryReply.entries?.map(entry => entry.recordId); + expect(localRecordsFromQuery).to.have.members([...localRecords, ...remoteRecords]); + + // query remote node to see all results + remoteQueryResponse = await testAgent.agent.dwnManager.sendRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { dataFormat: 'text/plain' } } + }); + remoteDwnQueryReply = remoteQueryResponse.reply as RecordsQueryReply; + expect(remoteDwnQueryReply.status.code).to.equal(200); + expect(remoteDwnQueryReply.entries).to.have.length(6); + remoteRecordsFromQuery = remoteDwnQueryReply.entries?.map(entry => entry.recordId); + expect(remoteRecordsFromQuery).to.have.members([...localRecords, ...remoteRecords]); + + }); + describe('pull()', () => { it('takes no action if no identities are registered', async () => { const didResolveSpy = sinon.spy(testAgent.agent.didResolver, 'resolve'); @@ -118,7 +216,6 @@ describe('SyncManagerLevel', () => { await testAgent.agent.syncManager.registerIdentity({ did: alice.did }); - // Execute Sync to pull all records from Alice's remote DWN to Alice's local DWN. await testAgent.agent.syncManager.pull(); @@ -179,7 +276,7 @@ describe('SyncManagerLevel', () => { expect(localReply2.status.code).to.equal(200); expect(localReply2.entries?.length).to.equal(1); - const entry = localReply2.entries![0]; + const entry = localReply2.entries![0] as RecordsQueryReplyEntry; expect(entry.encodedData).to.be.undefined; // encodedData is undefined // check for response encodedData if it doesn't exist issue a RecordsRead @@ -376,7 +473,7 @@ describe('SyncManagerLevel', () => { expect(remoteReply2.status.code).to.equal(200); expect(remoteReply2.entries?.length).to.equal(1); - const entry = remoteReply2.entries![0]; + const entry = remoteReply2.entries![0] as RecordsQueryReplyEntry; expect(entry.encodedData).to.be.undefined; // check for response encodedData if it doesn't exist issue a RecordsRead const recordId = (entry as RecordsWriteMessage).recordId; @@ -466,5 +563,64 @@ describe('SyncManagerLevel', () => { expect(remoteDwnQueryReply.entries).to.have.length(1); // Record does exist on remote DWN. }); }); + + describe('startSync()', () => { + it('calls push/pull in each interval', async () => { + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + + const pushSpy = sinon.stub(SyncManagerLevel.prototype, 'push'); + pushSpy.resolves(); + + const pullSpy = sinon.stub(SyncManagerLevel.prototype, 'pull'); + pullSpy.resolves(); + + const clock = sinon.useFakeTimers(); + + testAgent.agent.syncManager.startSync({ interval: 500 }); + + await clock.tickAsync(1_400); // just under 3 intervals + pushSpy.restore(); + pullSpy.restore(); + clock.restore(); + + expect(pushSpy.callCount).to.equal(2, 'push'); + expect(pullSpy.callCount).to.equal(2, 'pull'); + }); + + it('does not call push/pull again until a push/pull finishes', async () => { + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + + const clock = sinon.useFakeTimers(); + + const pushSpy = sinon.stub(SyncManagerLevel.prototype, 'push'); + pushSpy.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 1_500); // more than the interval + })); + + const pullSpy = sinon.stub(SyncManagerLevel.prototype, 'pull'); + pullSpy.resolves(); + + testAgent.agent.syncManager.startSync({ interval: 500 }); + + await clock.tickAsync(1_400); // less time than the push + + expect(pushSpy.callCount).to.equal(1, 'push'); + expect(pullSpy.callCount).to.equal(0, 'pull'); // not called yet + + await clock.tickAsync(100); //remaining time for pull to be called + + expect(pullSpy.callCount).to.equal(1, 'pull'); + + pushSpy.restore(); + pullSpy.restore(); + clock.restore(); + }); + }); }); }); \ No newline at end of file diff --git a/packages/api/package.json b/packages/api/package.json index 5bba6aba4..e21b8b5bf 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -76,7 +76,7 @@ "node": ">=18.0.0" }, "dependencies": { - "@tbd54566975/dwn-sdk-js": "0.2.8", + "@tbd54566975/dwn-sdk-js": "0.2.10", "@web5/agent": "0.2.5", "@web5/common": "0.2.2", "@web5/crypto": "0.2.2", diff --git a/packages/api/src/dwn-api.ts b/packages/api/src/dwn-api.ts index a7c0cec10..bc387a4dc 100644 --- a/packages/api/src/dwn-api.ts +++ b/packages/api/src/dwn-api.ts @@ -1,6 +1,8 @@ import type { DwnResponse, Web5Agent } from '@web5/agent'; import type { + RecordsQueryReply, RecordsReadOptions, + ProtocolsQueryReply, RecordsQueryOptions, RecordsWriteMessage, RecordsWriteOptions, @@ -236,7 +238,8 @@ export class DwnApi { agentResponse = await this.agent.processDwnRequest(agentRequest); } - const { reply: { entries = [], status } } = agentResponse; + const reply = agentResponse.reply as ProtocolsQueryReply; + const { entries = [], status } = reply; const protocols = entries.map((entry: ProtocolsQueryReplyEntry) => { const metadata = { author: this.connectedDid, }; @@ -359,7 +362,8 @@ export class DwnApi { agentResponse = await this.agent.processDwnRequest(agentRequest); } - const { reply: { entries, status, cursor } } = agentResponse; + const reply = agentResponse.reply as RecordsQueryReply; + const { entries, status, cursor } = reply; const records = entries.map((entry: RecordsQueryReplyEntry) => { const recordOptions = { diff --git a/packages/dev-env/docker-compose.yaml b/packages/dev-env/docker-compose.yaml index fe4f895d6..5034cb24c 100644 --- a/packages/dev-env/docker-compose.yaml +++ b/packages/dev-env/docker-compose.yaml @@ -3,6 +3,6 @@ version: "3.98" services: dwn-server: container_name: dwn-server - image: ghcr.io/tbd54566975/dwn-server:dwn-sdk-0.2.8 + image: ghcr.io/tbd54566975/dwn-server:dwn-sdk-0.2.10 ports: - "3000:3000"