From a2190b57a7c394e3c59b9cc7134671ab1d4aaada Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Wed, 4 Sep 2024 16:21:37 -0400 Subject: [PATCH] sync engine refactor --- packages/agent/src/sync-engine-level.ts | 56 ++++------- .../agent/tests/sync-engine-level.spec.ts | 97 ++++++++++++++++--- 2 files changed, 104 insertions(+), 49 deletions(-) diff --git a/packages/agent/src/sync-engine-level.ts b/packages/agent/src/sync-engine-level.ts index d77166536..eb922406e 100644 --- a/packages/agent/src/sync-engine-level.ts +++ b/packages/agent/src/sync-engine-level.ts @@ -265,25 +265,18 @@ export class SyncEngineLevel implements SyncEngine { } public async sync(direction?: 'push' | 'pull'): Promise { - if (this._syncIntervalId) { - throw new Error('SyncEngineLevel: Cannot call sync while a sync interval is active. Call `stopSync()` first.'); - } - if (this._syncLock) { - throw new Error('SyncEngineLevel: Cannot call sync while a sync operation is in progress.'); + throw new Error('SyncEngineLevel: Sync operation is already in progress.'); } + this._syncLock = true; try { - this._syncLock = true; if (!direction || direction === 'push') { await this.push(); } if (!direction || direction === 'pull') { await this.pull(); } - } catch (error: any) { - this._syncLock = false; - throw error; } finally { this._syncLock = false; } @@ -295,38 +288,31 @@ export class SyncEngineLevel implements SyncEngine { // Convert the interval string to milliseconds. const intervalMilliseconds = ms(interval); - if (this._syncIntervalId) { - this.stopSync(); - } + const intervalSync = async () => { + if (this._syncLock) { + return; + } - if (!this._syncLock) { + clearInterval(this._syncIntervalId); + this._syncIntervalId = undefined; await this.sync(); - } - return new Promise((resolve, reject) => { - const intervalSync = async () => { - if (this._syncLock) { - return; - } - - // clears the interval and sets the syncIntervalId to undefined - this.stopSync(); + if (!this._syncIntervalId) { + this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds); + } + }; - try { - await this.sync(); - } catch (error: any) { - this.stopSync(); - reject(error); - } + if (this._syncIntervalId) { + clearInterval(this._syncIntervalId); + } - if (!this._syncIntervalId) { - // only set a new interval if none is set. The most recently called `startSync` will set the final interval. - this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds); - } - }; + // Set up a new interval. + this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds); - this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds); - }); + // initiate an immediate sync + if (!this._syncLock) { + await this.sync(); + } } public stopSync(): void { diff --git a/packages/agent/tests/sync-engine-level.spec.ts b/packages/agent/tests/sync-engine-level.spec.ts index 07633d90c..e2f8a405f 100644 --- a/packages/agent/tests/sync-engine-level.spec.ts +++ b/packages/agent/tests/sync-engine-level.spec.ts @@ -465,24 +465,41 @@ describe('SyncEngineLevel', () => { expect(pullSpy.calledOnce).to.be.true; }); - it('throws if sync is attempted while an interval sync is running', async () => { + it('throws an error if the sync is currently already running', async () => { // Register Alice's DID to be synchronized. await testHarness.agent.sync.registerIdentity({ did: alice.did.uri, }); - // start the sync engine with an interval of 10 seconds - syncEngine.startSync({ interval: '10s' }); + const clock = sinon.useFakeTimers(); + sinon.stub(syncEngine as any, 'push').resolves(); + const pullSpy = sinon.stub(syncEngine as any, 'pull'); + pullSpy.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 90); + })); + + // do not await + syncEngine.sync(); + await clock.tickAsync(50); + + // do not block for subsequent syncs + pullSpy.returns(Promise.resolve()); try { - // Execute Sync to push and pull all records from Alice's remote DWN to Alice's local DWN. await syncEngine.sync(); - expect.fail('Expected an error to be thrown'); - } catch (error: any) { - // Execute Sync to push and pull all records from Alice's remote DWN to Alice's local DWN. - expect(error.message).to.equal('SyncEngineLevel: Cannot call sync while a sync interval is active. Call `stopSync()` first.'); + } catch(error:any) { + expect(error.message).to.equal('SyncEngineLevel: Sync operation is already in progress.'); } + + await clock.tickAsync(50); + + // no error thrown + await syncEngine.sync(); + + clock.restore(); }); }); @@ -2436,8 +2453,9 @@ describe('SyncEngineLevel', () => { pushSpy.restore(); clock.restore(); - expect(pullSpy.callCount).to.equal(2, 'push'); - expect(pushSpy.callCount).to.equal(2, 'pull'); + // one when starting the sync, and another for each interval + expect(pullSpy.callCount).to.equal(3, 'push'); + expect(pushSpy.callCount).to.equal(3, 'pull'); }); it('does not call sync() again until a sync round finishes', async () => { @@ -2461,14 +2479,22 @@ describe('SyncEngineLevel', () => { await clock.tickAsync(1_400); // less time than the push + // only once for when starting the sync expect(pullSpy.callCount).to.equal(1, 'pull'); expect(pullSpy.callCount).to.equal(1, 'push'); - await clock.tickAsync(600); //remaining time for a 2nd sync + await clock.tickAsync(200); //remaining time and one interval + // once when starting, and once for the interval expect(pullSpy.callCount).to.equal(2, 'pull'); expect(pushSpy.callCount).to.equal(2, 'push'); + await clock.tickAsync(500); // one more interval + + // one more for the interval + expect(pullSpy.callCount).to.equal(3, 'pull'); + expect(pushSpy.callCount).to.equal(3, 'push'); + pullSpy.restore(); pushSpy.restore(); clock.restore(); @@ -2493,7 +2519,8 @@ describe('SyncEngineLevel', () => { await clock.tickAsync(1_400); // less than the initial interval + the sync time - expect(syncSpy.callCount).to.equal(1); + // once for the initial call and once for each interval call + expect(syncSpy.callCount).to.equal(2); // set to be a short sync time syncSpy.returns(new Promise((resolve) => { @@ -2506,12 +2533,54 @@ describe('SyncEngineLevel', () => { await clock.tickAsync(301); // exactly the new interval + 1 - expect(syncSpy.callCount).to.equal(2); + // one for the initial 'startSync' call and one for each interval call + expect(syncSpy.callCount).to.equal(4); await clock.tickAsync(601); // two more intervals - expect(syncSpy.callCount).to.equal(4); + expect(syncSpy.callCount).to.equal(6); + + syncSpy.restore(); + clock.restore(); + }); + + it('should replace the interval timer with the latest interval timer', async () => { + + await testHarness.agent.sync.registerIdentity({ + did: alice.did.uri, + }); + + const clock = sinon.useFakeTimers(); + + const syncSpy = sinon.stub(SyncEngineLevel.prototype as any, 'sync'); + // set to be a sync time longer than the interval + syncSpy.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 100); + })); + + testHarness.agent.sync.startSync({ interval: '500ms' }); + + // two intervals + await clock.tickAsync(1_001); + + // this should equal 3, once for the initial call and once for each interval call + expect(syncSpy.callCount).to.equal(3); + + syncSpy.resetHistory(); + testHarness.agent.sync.startSync({ interval: '200ms' }); + + await clock.tickAsync(401); // two intervals + + // one for the initial 'startSync' call and one for each interval call + expect(syncSpy.callCount).to.equal(3); + + await clock.tickAsync(401); // two more intervals + + // one additional calls for each interval + expect(syncSpy.callCount).to.equal(5); syncSpy.restore(); clock.restore();