From e578e204bb918f0d16c1e1ea641699086338ad3d Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Fri, 6 Sep 2024 17:38:32 -0400 Subject: [PATCH] Modify `stopSync` to block if sync is currently active (#889) - Add a missing changset for https://github.com/TBD54566975/web5-js/pull/887: `stopSync` now blocks if a current sync is in progress before clearing the interval. An optional timeout can be defined, the default is 2 seconds. After this timeout it will throw. TestHarness has been updated to stop sync before clearing storage, previously this caused an issue where an ongoing sync would attempt to sign messages for DID that no longer had keys after clearing storage. https://github.com/TBD54566975/web5-js/issues/890 has been created to better address this by creating a signal to gracefully stop sync immediately. --- .changeset/afraid-geese-knock.md | 8 + package.json | 2 +- packages/agent/src/sync-api.ts | 4 +- packages/agent/src/sync-engine-level.ts | 17 +- packages/agent/src/test-harness.ts | 3 + packages/agent/src/types/sync.ts | 5 +- .../agent/tests/sync-engine-level.spec.ts | 277 +++++++++++++++++- packages/api/tests/web5.spec.ts | 3 - packages/dev-env/docker-compose.yaml | 2 +- pnpm-lock.yaml | 10 +- 10 files changed, 310 insertions(+), 21 deletions(-) create mode 100644 .changeset/afraid-geese-knock.md diff --git a/.changeset/afraid-geese-knock.md b/.changeset/afraid-geese-knock.md new file mode 100644 index 000000000..de5832607 --- /dev/null +++ b/.changeset/afraid-geese-knock.md @@ -0,0 +1,8 @@ +--- +"@web5/agent": patch +"@web5/identity-agent": patch +"@web5/proxy-agent": patch +"@web5/user-agent": patch +--- + +Fix sync race condition issue diff --git a/package.json b/package.json index db1067aaa..95818471f 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "@changesets/cli": "^2.27.5", "@npmcli/package-json": "5.0.0", "@typescript-eslint/eslint-plugin": "7.9.0", - "@web5/dwn-server": "0.4.8", + "@web5/dwn-server": "0.4.9", "audit-ci": "^7.0.1", "eslint-plugin-mocha": "10.4.3", "globals": "^13.24.0", diff --git a/packages/agent/src/sync-api.ts b/packages/agent/src/sync-api.ts index 29763f6b8..2883d8c4c 100644 --- a/packages/agent/src/sync-api.ts +++ b/packages/agent/src/sync-api.ts @@ -53,7 +53,7 @@ export class AgentSyncApi implements SyncEngine { return this._syncEngine.startSync(params); } - public stopSync(): void { - this._syncEngine.stopSync(); + public stopSync(timeout?: number): Promise { + return this._syncEngine.stopSync(timeout); } } \ No newline at end of file diff --git a/packages/agent/src/sync-engine-level.ts b/packages/agent/src/sync-engine-level.ts index f73b67af9..8dc86c369 100644 --- a/packages/agent/src/sync-engine-level.ts +++ b/packages/agent/src/sync-engine-level.ts @@ -315,7 +315,22 @@ export class SyncEngineLevel implements SyncEngine { } } - public stopSync(): void { + /** + * stopSync currently awaits the completion of the current sync operation before stopping the sync interval. + * TODO: implement a signal to gracefully stop sync immediately https://github.com/TBD54566975/web5-js/issues/890 + */ + public async stopSync(timeout: number = 2000): Promise { + let elapsedTimeout = 0; + + while(this._syncLock) { + if (elapsedTimeout >= timeout) { + throw new Error(`SyncEngineLevel: Existing sync operation did not complete within ${timeout} milliseconds.`); + } + + elapsedTimeout += 100; + await new Promise((resolve) => setTimeout(resolve, timeout < 100 ? timeout : 100)); + } + if (this._syncIntervalId) { clearInterval(this._syncIntervalId); this._syncIntervalId = undefined; diff --git a/packages/agent/src/test-harness.ts b/packages/agent/src/test-harness.ts index a98b7c9f2..4628233aa 100644 --- a/packages/agent/src/test-harness.ts +++ b/packages/agent/src/test-harness.ts @@ -85,6 +85,9 @@ export class PlatformAgentTestHarness { } public async clearStorage(): Promise { + // first stop any ongoing sync operations + await this.agent.sync.stopSync(); + // @ts-expect-error since normally this property shouldn't be set to undefined. this.agent.agentDid = undefined; await this.didResolverCache.clear(); diff --git a/packages/agent/src/types/sync.ts b/packages/agent/src/types/sync.ts index e37dfb508..a3002068c 100644 --- a/packages/agent/src/types/sync.ts +++ b/packages/agent/src/types/sync.ts @@ -38,6 +38,9 @@ export interface SyncEngine { startSync(params: { interval: string }): Promise; /** * Stops the periodic sync operation, will complete the current sync operation if one is already in progress. + * + * @param timeout the maximum amount of time, in milliseconds, to wait for the current sync operation to complete. Default is 2000 (2 seconds). + * @throws {Error} if the sync operation fails to stop before the timeout. */ - stopSync(): void; + stopSync(timeout?: number): Promise; } \ No newline at end of file diff --git a/packages/agent/tests/sync-engine-level.spec.ts b/packages/agent/tests/sync-engine-level.spec.ts index e2f8a405f..c887566ef 100644 --- a/packages/agent/tests/sync-engine-level.spec.ts +++ b/packages/agent/tests/sync-engine-level.spec.ts @@ -92,7 +92,6 @@ describe('SyncEngineLevel', () => { watermark, messageCid }); - console.log('key', key); const syncParams = SyncEngineLevel['parseSyncMessageParamsKey'](key); expect(syncParams.protocol).to.be.undefined; @@ -152,7 +151,6 @@ describe('SyncEngineLevel', () => { sinon.restore(); - syncEngine.stopSync(); await syncEngine.clear(); await testHarness.syncStore.clear(); await testHarness.dwnDataStore.clear(); @@ -471,7 +469,7 @@ describe('SyncEngineLevel', () => { did: alice.did.uri, }); - const clock = sinon.useFakeTimers(); + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); sinon.stub(syncEngine as any, 'push').resolves(); const pullSpy = sinon.stub(syncEngine as any, 'pull'); pullSpy.returns(new Promise((resolve) => { @@ -2444,7 +2442,7 @@ describe('SyncEngineLevel', () => { const pushSpy = sinon.stub(SyncEngineLevel.prototype as any, 'push'); pushSpy.resolves(); - const clock = sinon.useFakeTimers(); + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); testHarness.agent.sync.startSync({ interval: '500ms' }); @@ -2463,7 +2461,7 @@ describe('SyncEngineLevel', () => { did: alice.did.uri, }); - const clock = sinon.useFakeTimers(); + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); const pullSpy = sinon.stub(SyncEngineLevel.prototype as any, 'pull'); pullSpy.returns(new Promise((resolve) => { @@ -2505,7 +2503,7 @@ describe('SyncEngineLevel', () => { did: alice.did.uri, }); - const clock = sinon.useFakeTimers(); + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); const syncSpy = sinon.stub(SyncEngineLevel.prototype as any, 'sync'); // set to be a sync time longer than the interval @@ -2551,7 +2549,7 @@ describe('SyncEngineLevel', () => { did: alice.did.uri, }); - const clock = sinon.useFakeTimers(); + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); const syncSpy = sinon.stub(SyncEngineLevel.prototype as any, 'sync'); // set to be a sync time longer than the interval @@ -2586,5 +2584,270 @@ describe('SyncEngineLevel', () => { clock.restore(); }); }); + + describe('stopSync()', () => { + it('stops the sync interval', async () => { + await testHarness.agent.sync.registerIdentity({ + did: alice.did.uri, + }); + + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); + + const syncSpy = sinon.spy(SyncEngineLevel.prototype as any, 'sync'); + + // stub push and pull to take 3 ms each + const pullStub = sinon.stub(SyncEngineLevel.prototype as any, 'pull'); + pullStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 3); + })); + + const pushStub = sinon.stub(SyncEngineLevel.prototype as any, 'push'); + pushStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 3); + })); + + testHarness.agent.sync.startSync({ interval: '500ms' }); + + // expect the immediate sync call + expect(syncSpy.callCount).to.equal(1); + + + await clock.tickAsync(1_300); // just under 3 intervals + + // expect 2 sync interval calls + initial sync + expect(syncSpy.callCount).to.equal(3); + + await testHarness.agent.sync.stopSync(); + + await clock.tickAsync(1_000); // 2 intervals + + // sync calls remain unchanged + expect(syncSpy.callCount).to.equal(3); + + syncSpy.restore(); + clock.restore(); + }); + + it('waits for the current sync to complete before stopping', async () => { + await testHarness.agent.sync.registerIdentity({ + did: alice.did.uri, + }); + + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); + + const syncSpy = sinon.spy(SyncEngineLevel.prototype as any, 'sync'); + + // stub push and pull to take 3 ms each + const pullStub = sinon.stub(SyncEngineLevel.prototype as any, 'pull'); + pullStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 3); + })); + + const pushStub = sinon.stub(SyncEngineLevel.prototype as any, 'push'); + pushStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 3); + })); + + testHarness.agent.sync.startSync({ interval: '500ms' }); + + // expect the immediate sync call + expect(syncSpy.callCount).to.equal(1); + + await clock.tickAsync(1_300); // just under 3 intervals + + // expect 2 sync interval calls + initial sync + expect(syncSpy.callCount).to.equal(3); + + // cause pull to take longer + pullStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 1_000); + })); + + await clock.tickAsync(201); // Enough time for the next interval to start + + // next interval was called + expect(syncSpy.callCount).to.equal(4); + + // stop the sync + await new Promise((resolve) => { + const stopPromise = testHarness.agent.sync.stopSync(); + clock.tickAsync(1_000).then(async () => { + await stopPromise; + resolve(); + }); + }); + + // sync calls remain unchanged + expect(syncSpy.callCount).to.equal(4); + + // wait for future intervals + await clock.tickAsync(2_000); + + // sync calls remain unchanged + expect(syncSpy.callCount).to.equal(4); + + syncSpy.restore(); + clock.restore(); + }); + + it('throws if ongoing sync does not complete within 2 seconds', async () => { + await testHarness.agent.sync.registerIdentity({ + did: alice.did.uri, + }); + + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); + + const syncSpy = sinon.spy(SyncEngineLevel.prototype as any, 'sync'); + + // stub push and pull to take 3 ms each + const pullStub = sinon.stub(SyncEngineLevel.prototype as any, 'pull'); + pullStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 3); + })); + + const pushStub = sinon.stub(SyncEngineLevel.prototype as any, 'push'); + pushStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 3); + })); + + testHarness.agent.sync.startSync({ interval: '500ms' }); + + // expect the immediate sync call + expect(syncSpy.callCount).to.equal(1); + + await clock.tickAsync(1_300); // just under 3 intervals + + // expect 2 sync interval calls + initial sync + expect(syncSpy.callCount).to.equal(3); + + // cause pull to take longer + pullStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 2_700); // longer than the 2 seconds + })); + + await clock.tickAsync(201); // Enough time for the next interval to start + + // next interval was called + expect(syncSpy.callCount).to.equal(4); + + const stopPromise = testHarness.agent.sync.stopSync(); + + try { + await new Promise((resolve, reject) => { + stopPromise.catch((error) => reject(error)); + + clock.runToLastAsync().then(async () => { + try { + await stopPromise; + resolve(); + } catch(error) { + reject(error); + } + }); + + }); + expect.fail('Expected an error to be thrown'); + } catch(error:any) { + expect(error.message).to.equal('SyncEngineLevel: Existing sync operation did not complete within 2000 milliseconds.'); + } + + syncSpy.restore(); + clock.restore(); + }); + + it('only waits for the ongoing sync for the given timeout before failing', async () => { + await testHarness.agent.sync.registerIdentity({ + did: alice.did.uri, + }); + + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); + + const syncSpy = sinon.spy(SyncEngineLevel.prototype as any, 'sync'); + + // stub push and pull to take 3 ms each + const pullStub = sinon.stub(SyncEngineLevel.prototype as any, 'pull'); + pullStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 3); + })); + + const pushStub = sinon.stub(SyncEngineLevel.prototype as any, 'push'); + pushStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 3); + })); + + testHarness.agent.sync.startSync({ interval: '500ms' }); + + // expect the immediate sync call + expect(syncSpy.callCount).to.equal(1); + + await clock.tickAsync(10); // enough time for the sync round trip to complete + + // cause pull to take longer + pullStub.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 2_700); // longer than the 2 seconds + })); + + await clock.tickAsync(501); // Enough time for the next interval to start + + // next interval was called + expect(syncSpy.callCount).to.equal(2); + + const stopPromise = testHarness.agent.sync.stopSync(10); + try { + await new Promise((resolve, reject) => { + stopPromise.catch((error) => reject(error)); + + clock.tickAsync(10).then(async () => { + try { + await stopPromise; + resolve(); + } catch(error) { + reject(error); + } + }); + + }); + expect.fail('Expected an error to be thrown'); + } catch(error:any) { + expect(error.message).to.equal('SyncEngineLevel: Existing sync operation did not complete within 10 milliseconds.'); + } + + // call again with a longer timeout + await new Promise((resolve) => { + const stopPromise2 = testHarness.agent.sync.stopSync(3_000); + // enough time for the ongoing sync to complete + 100ms as the check interval + clock.tickAsync(2800).then(async () => { + stopPromise2.then(() => resolve()); + }); + }); + + await clock.runToLastAsync(); + syncSpy.restore(); + clock.restore(); + }); + + }); }); }); \ No newline at end of file diff --git a/packages/api/tests/web5.spec.ts b/packages/api/tests/web5.spec.ts index 5f8981d4d..da4a4d094 100644 --- a/packages/api/tests/web5.spec.ts +++ b/packages/api/tests/web5.spec.ts @@ -171,7 +171,6 @@ describe('web5 api', () => { beforeEach(async () => { sinon.restore(); - testHarness.agent.sync.stopSync(); await testHarness.clearStorage(); await testHarness.createAgentDid(); }); @@ -794,8 +793,6 @@ describe('web5 api', () => { expect(startSyncSpy.args[0][0].interval).to.equal('1m'); }); - - it('should request all permissions for a protocol if no specific permissions are provided', async () => { sinon.stub(Web5UserAgent, 'create').resolves(testHarness.agent as Web5UserAgent); diff --git a/packages/dev-env/docker-compose.yaml b/packages/dev-env/docker-compose.yaml index 5b4e854ad..116d52148 100644 --- a/packages/dev-env/docker-compose.yaml +++ b/packages/dev-env/docker-compose.yaml @@ -3,6 +3,6 @@ version: "3.98" services: dwn-server: container_name: dwn-server - image: ghcr.io/tbd54566975/dwn-server:0.4.8 + image: ghcr.io/tbd54566975/dwn-server:0.4.9 ports: - "3000:3000" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f79114ca2..83e2d7019 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -33,8 +33,8 @@ importers: specifier: 7.9.0 version: 7.9.0(@typescript-eslint/parser@7.14.1(eslint@9.7.0)(typescript@5.5.4))(eslint@9.7.0)(typescript@5.5.4) '@web5/dwn-server': - specifier: 0.4.8 - version: 0.4.8 + specifier: 0.4.9 + version: 0.4.9 audit-ci: specifier: ^7.0.1 version: 7.1.0 @@ -2504,8 +2504,8 @@ packages: resolution: {integrity: sha512-M9EfsEYcOtYuEvUQjow4vpxXbD0Sz5H8EuDXMtwuvP4UdYL0ATl+60F8+8HDmwPFeUy6M2wxuoixrLDwSRFwZA==} engines: {node: '>=18.0.0'} - '@web5/dwn-server@0.4.8': - resolution: {integrity: sha512-Mr+Oq8XTZN133gnQYjhN07sbjVkfdlhsigQhDqThX9ghf2Kk3kiakb+5tlwYgsFCyj8O6sW+UM07443xZS3qLA==} + '@web5/dwn-server@0.4.9': + resolution: {integrity: sha512-LCBu7gcmfWcT8i571LPK5bHsBqtF2b0gC1VjAqZTo7ESCjGPrL6byvntiGiYWfSfzl9zgxpb/dIdVu/Ia8xvFA==} hasBin: true '@webassemblyjs/ast@1.12.1': @@ -8343,7 +8343,7 @@ snapshots: level: 8.0.1 ms: 2.1.3 - '@web5/dwn-server@0.4.8': + '@web5/dwn-server@0.4.9': dependencies: '@tbd54566975/dwn-sdk-js': 0.4.6 '@tbd54566975/dwn-sql-store': 0.6.6