diff --git a/.changeset/afraid-geese-knock.md b/.changeset/afraid-geese-knock.md new file mode 100644 index 000000000..de5832607 --- /dev/null +++ b/.changeset/afraid-geese-knock.md @@ -0,0 +1,8 @@ +--- +"@web5/agent": patch +"@web5/identity-agent": patch +"@web5/proxy-agent": patch +"@web5/user-agent": patch +--- + +Fix sync race condition issue diff --git a/package.json b/package.json index db1067aaa..95818471f 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "@changesets/cli": "^2.27.5", "@npmcli/package-json": "5.0.0", "@typescript-eslint/eslint-plugin": "7.9.0", - "@web5/dwn-server": "0.4.8", + "@web5/dwn-server": "0.4.9", "audit-ci": "^7.0.1", "eslint-plugin-mocha": "10.4.3", "globals": "^13.24.0", diff --git a/packages/agent/src/sync-api.ts b/packages/agent/src/sync-api.ts index 29763f6b8..2883d8c4c 100644 --- a/packages/agent/src/sync-api.ts +++ b/packages/agent/src/sync-api.ts @@ -53,7 +53,7 @@ export class AgentSyncApi implements SyncEngine { return this._syncEngine.startSync(params); } - public stopSync(): void { - this._syncEngine.stopSync(); + public stopSync(timeout?: number): Promise { + return this._syncEngine.stopSync(timeout); } } \ No newline at end of file diff --git a/packages/agent/src/sync-engine-level.ts b/packages/agent/src/sync-engine-level.ts index f73b67af9..8dc86c369 100644 --- a/packages/agent/src/sync-engine-level.ts +++ b/packages/agent/src/sync-engine-level.ts @@ -315,7 +315,22 @@ export class SyncEngineLevel implements SyncEngine { } } - public stopSync(): void { + /** + * stopSync currently awaits the completion of the current sync operation before stopping the sync interval. + * TODO: implement a signal to gracefully stop sync immediately https://github.com/TBD54566975/web5-js/issues/890 + */ + public async stopSync(timeout: number = 2000): Promise { + let elapsedTimeout = 0; + + while(this._syncLock) { + if (elapsedTimeout >= timeout) { + throw new Error(`SyncEngineLevel: Existing sync operation did not complete within ${timeout} milliseconds.`); + } + + elapsedTimeout += 100; + await new Promise((resolve) => setTimeout(resolve, timeout < 100 ? timeout : 100)); + } + if (this._syncIntervalId) { clearInterval(this._syncIntervalId); this._syncIntervalId = undefined; diff --git a/packages/agent/src/test-harness.ts b/packages/agent/src/test-harness.ts index a98b7c9f2..4628233aa 100644 --- a/packages/agent/src/test-harness.ts +++ b/packages/agent/src/test-harness.ts @@ -85,6 +85,9 @@ export class PlatformAgentTestHarness { } public async clearStorage(): Promise { + // first stop any ongoing sync operations + await this.agent.sync.stopSync(); + // @ts-expect-error since normally this property shouldn't be set to undefined. this.agent.agentDid = undefined; await this.didResolverCache.clear(); diff --git a/packages/agent/src/types/sync.ts b/packages/agent/src/types/sync.ts index e37dfb508..a3002068c 100644 --- a/packages/agent/src/types/sync.ts +++ b/packages/agent/src/types/sync.ts @@ -38,6 +38,9 @@ export interface SyncEngine { startSync(params: { interval: string }): Promise; /** * Stops the periodic sync operation, will complete the current sync operation if one is already in progress. + * + * @param timeout the maximum amount of time, in milliseconds, to wait for the current sync operation to complete. Default is 2000 (2 seconds). + * @throws {Error} if the sync operation fails to stop before the timeout. */ - stopSync(): void; + stopSync(timeout?: number): Promise; } \ No newline at end of file diff --git a/packages/agent/tests/sync-engine-level.spec.ts b/packages/agent/tests/sync-engine-level.spec.ts index e2f8a405f..c887566ef 100644 --- a/packages/agent/tests/sync-engine-level.spec.ts +++ b/packages/agent/tests/sync-engine-level.spec.ts @@ -92,7 +92,6 @@ describe('SyncEngineLevel', () => { watermark, messageCid }); - console.log('key', key); const syncParams = SyncEngineLevel['parseSyncMessageParamsKey'](key); expect(syncParams.protocol).to.be.undefined; @@ -152,7 +151,6 @@ describe('SyncEngineLevel', () => { sinon.restore(); - syncEngine.stopSync(); await syncEngine.clear(); await testHarness.syncStore.clear(); await testHarness.dwnDataStore.clear(); @@ -471,7 +469,7 @@ describe('SyncEngineLevel', () => { did: alice.did.uri, }); - const clock = sinon.useFakeTimers(); + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); sinon.stub(syncEngine as any, 'push').resolves(); const pullSpy = sinon.stub(syncEngine as any, 'pull'); pullSpy.returns(new Promise((resolve) => { @@ -2444,7 +2442,7 @@ describe('SyncEngineLevel', () => { const pushSpy = sinon.stub(SyncEngineLevel.prototype as any, 'push'); pushSpy.resolves(); - const clock = sinon.useFakeTimers(); + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); testHarness.agent.sync.startSync({ interval: '500ms' }); @@ -2463,7 +2461,7 @@ describe('SyncEngineLevel', () => { did: alice.did.uri, }); - const clock = sinon.useFakeTimers(); + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); const pullSpy = sinon.stub(SyncEngineLevel.prototype as any, 'pull'); pullSpy.returns(new Promise((resolve) => { @@ -2505,7 +2503,7 @@ describe('SyncEngineLevel', () => { did: alice.did.uri, }); - const clock = sinon.useFakeTimers(); + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); const syncSpy = sinon.stub(SyncEngineLevel.prototype as any, 'sync'); // set to be a sync time longer than the interval @@ -2551,7 +2549,7 @@ describe('SyncEngineLevel', () => { did: alice.did.uri, }); - const clock = sinon.useFakeTimers(); + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); const syncSpy = sinon.stub(SyncEngineLevel.prototype as any, 'sync'); // set to be a sync time longer than the interval @@ -2586,5 +2584,270 @@ describe('SyncEngineLevel', () => { clock.restore(); }); }); + + describe('stopSync()', () => { + it('stops the sync interval', async () => { + await testHarness.agent.sync.registerIdentity({ + did: alice.did.uri, + }); + + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); + + const syncSpy = sinon.spy(SyncEngineLevel.prototype as any, 'sync'); + + // stub push and pull to take 3 ms each + const pullStub = sinon.stub(SyncEngineLevel.prototype as any, 'pull'); + pullStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 3); + })); + + const pushStub = sinon.stub(SyncEngineLevel.prototype as any, 'push'); + pushStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 3); + })); + + testHarness.agent.sync.startSync({ interval: '500ms' }); + + // expect the immediate sync call + expect(syncSpy.callCount).to.equal(1); + + + await clock.tickAsync(1_300); // just under 3 intervals + + // expect 2 sync interval calls + initial sync + expect(syncSpy.callCount).to.equal(3); + + await testHarness.agent.sync.stopSync(); + + await clock.tickAsync(1_000); // 2 intervals + + // sync calls remain unchanged + expect(syncSpy.callCount).to.equal(3); + + syncSpy.restore(); + clock.restore(); + }); + + it('waits for the current sync to complete before stopping', async () => { + await testHarness.agent.sync.registerIdentity({ + did: alice.did.uri, + }); + + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); + + const syncSpy = sinon.spy(SyncEngineLevel.prototype as any, 'sync'); + + // stub push and pull to take 3 ms each + const pullStub = sinon.stub(SyncEngineLevel.prototype as any, 'pull'); + pullStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 3); + })); + + const pushStub = sinon.stub(SyncEngineLevel.prototype as any, 'push'); + pushStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 3); + })); + + testHarness.agent.sync.startSync({ interval: '500ms' }); + + // expect the immediate sync call + expect(syncSpy.callCount).to.equal(1); + + await clock.tickAsync(1_300); // just under 3 intervals + + // expect 2 sync interval calls + initial sync + expect(syncSpy.callCount).to.equal(3); + + // cause pull to take longer + pullStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 1_000); + })); + + await clock.tickAsync(201); // Enough time for the next interval to start + + // next interval was called + expect(syncSpy.callCount).to.equal(4); + + // stop the sync + await new Promise((resolve) => { + const stopPromise = testHarness.agent.sync.stopSync(); + clock.tickAsync(1_000).then(async () => { + await stopPromise; + resolve(); + }); + }); + + // sync calls remain unchanged + expect(syncSpy.callCount).to.equal(4); + + // wait for future intervals + await clock.tickAsync(2_000); + + // sync calls remain unchanged + expect(syncSpy.callCount).to.equal(4); + + syncSpy.restore(); + clock.restore(); + }); + + it('throws if ongoing sync does not complete within 2 seconds', async () => { + await testHarness.agent.sync.registerIdentity({ + did: alice.did.uri, + }); + + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); + + const syncSpy = sinon.spy(SyncEngineLevel.prototype as any, 'sync'); + + // stub push and pull to take 3 ms each + const pullStub = sinon.stub(SyncEngineLevel.prototype as any, 'pull'); + pullStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 3); + })); + + const pushStub = sinon.stub(SyncEngineLevel.prototype as any, 'push'); + pushStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 3); + })); + + testHarness.agent.sync.startSync({ interval: '500ms' }); + + // expect the immediate sync call + expect(syncSpy.callCount).to.equal(1); + + await clock.tickAsync(1_300); // just under 3 intervals + + // expect 2 sync interval calls + initial sync + expect(syncSpy.callCount).to.equal(3); + + // cause pull to take longer + pullStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 2_700); // longer than the 2 seconds + })); + + await clock.tickAsync(201); // Enough time for the next interval to start + + // next interval was called + expect(syncSpy.callCount).to.equal(4); + + const stopPromise = testHarness.agent.sync.stopSync(); + + try { + await new Promise((resolve, reject) => { + stopPromise.catch((error) => reject(error)); + + clock.runToLastAsync().then(async () => { + try { + await stopPromise; + resolve(); + } catch(error) { + reject(error); + } + }); + + }); + expect.fail('Expected an error to be thrown'); + } catch(error:any) { + expect(error.message).to.equal('SyncEngineLevel: Existing sync operation did not complete within 2000 milliseconds.'); + } + + syncSpy.restore(); + clock.restore(); + }); + + it('only waits for the ongoing sync for the given timeout before failing', async () => { + await testHarness.agent.sync.registerIdentity({ + did: alice.did.uri, + }); + + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); + + const syncSpy = sinon.spy(SyncEngineLevel.prototype as any, 'sync'); + + // stub push and pull to take 3 ms each + const pullStub = sinon.stub(SyncEngineLevel.prototype as any, 'pull'); + pullStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 3); + })); + + const pushStub = sinon.stub(SyncEngineLevel.prototype as any, 'push'); + pushStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 3); + })); + + testHarness.agent.sync.startSync({ interval: '500ms' }); + + // expect the immediate sync call + expect(syncSpy.callCount).to.equal(1); + + await clock.tickAsync(10); // enough time for the sync round trip to complete + + // cause pull to take longer + pullStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 2_700); // longer than the 2 seconds + })); + + await clock.tickAsync(501); // Enough time for the next interval to start + + // next interval was called + expect(syncSpy.callCount).to.equal(2); + + const stopPromise = testHarness.agent.sync.stopSync(10); + try { + await new Promise((resolve, reject) => { + stopPromise.catch((error) => reject(error)); + + clock.tickAsync(10).then(async () => { + try { + await stopPromise; + resolve(); + } catch(error) { + reject(error); + } + }); + + }); + expect.fail('Expected an error to be thrown'); + } catch(error:any) { + expect(error.message).to.equal('SyncEngineLevel: Existing sync operation did not complete within 10 milliseconds.'); + } + + // call again with a longer timeout + await new Promise((resolve) => { + const stopPromise2 = testHarness.agent.sync.stopSync(3_000); + // enough time for the ongoing sync to complete + 100ms as the check interval + clock.tickAsync(2800).then(async () => { + stopPromise2.then(() => resolve()); + }); + }); + + await clock.runToLastAsync(); + syncSpy.restore(); + clock.restore(); + }); + + }); }); }); \ No newline at end of file diff --git a/packages/api/tests/web5.spec.ts b/packages/api/tests/web5.spec.ts index 5f8981d4d..da4a4d094 100644 --- a/packages/api/tests/web5.spec.ts +++ b/packages/api/tests/web5.spec.ts @@ -171,7 +171,6 @@ describe('web5 api', () => { beforeEach(async () => { sinon.restore(); - testHarness.agent.sync.stopSync(); await testHarness.clearStorage(); await testHarness.createAgentDid(); }); @@ -794,8 +793,6 @@ describe('web5 api', () => { expect(startSyncSpy.args[0][0].interval).to.equal('1m'); }); - - it('should request all permissions for a protocol if no specific permissions are provided', async () => { sinon.stub(Web5UserAgent, 'create').resolves(testHarness.agent as Web5UserAgent); diff --git a/packages/dev-env/docker-compose.yaml b/packages/dev-env/docker-compose.yaml index 5b4e854ad..116d52148 100644 --- a/packages/dev-env/docker-compose.yaml +++ b/packages/dev-env/docker-compose.yaml @@ -3,6 +3,6 @@ version: "3.98" services: dwn-server: container_name: dwn-server - image: ghcr.io/tbd54566975/dwn-server:0.4.8 + image: ghcr.io/tbd54566975/dwn-server:0.4.9 ports: - "3000:3000" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f79114ca2..83e2d7019 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -33,8 +33,8 @@ importers: specifier: 7.9.0 version: 7.9.0(@typescript-eslint/parser@7.14.1(eslint@9.7.0)(typescript@5.5.4))(eslint@9.7.0)(typescript@5.5.4) '@web5/dwn-server': - specifier: 0.4.8 - version: 0.4.8 + specifier: 0.4.9 + version: 0.4.9 audit-ci: specifier: ^7.0.1 version: 7.1.0 @@ -2504,8 +2504,8 @@ packages: resolution: {integrity: sha512-M9EfsEYcOtYuEvUQjow4vpxXbD0Sz5H8EuDXMtwuvP4UdYL0ATl+60F8+8HDmwPFeUy6M2wxuoixrLDwSRFwZA==} engines: {node: '>=18.0.0'} - '@web5/dwn-server@0.4.8': - resolution: {integrity: sha512-Mr+Oq8XTZN133gnQYjhN07sbjVkfdlhsigQhDqThX9ghf2Kk3kiakb+5tlwYgsFCyj8O6sW+UM07443xZS3qLA==} + '@web5/dwn-server@0.4.9': + resolution: {integrity: sha512-LCBu7gcmfWcT8i571LPK5bHsBqtF2b0gC1VjAqZTo7ESCjGPrL6byvntiGiYWfSfzl9zgxpb/dIdVu/Ia8xvFA==} hasBin: true '@webassemblyjs/ast@1.12.1': @@ -8343,7 +8343,7 @@ snapshots: level: 8.0.1 ms: 2.1.3 - '@web5/dwn-server@0.4.8': + '@web5/dwn-server@0.4.9': dependencies: '@tbd54566975/dwn-sdk-js': 0.4.6 '@tbd54566975/dwn-sql-store': 0.6.6