Skip to content

Commit

Permalink
sync engine refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed Sep 4, 2024
1 parent b5a9a2b commit a2190b5
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 49 deletions.
56 changes: 21 additions & 35 deletions packages/agent/src/sync-engine-level.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,25 +265,18 @@ export class SyncEngineLevel implements SyncEngine {
}

public async sync(direction?: 'push' | 'pull'): Promise<void> {
if (this._syncIntervalId) {
throw new Error('SyncEngineLevel: Cannot call sync while a sync interval is active. Call `stopSync()` first.');
}

if (this._syncLock) {
throw new Error('SyncEngineLevel: Cannot call sync while a sync operation is in progress.');
throw new Error('SyncEngineLevel: Sync operation is already in progress.');
}

this._syncLock = true;
try {
this._syncLock = true;
if (!direction || direction === 'push') {
await this.push();
}
if (!direction || direction === 'pull') {
await this.pull();
}
} catch (error: any) {
this._syncLock = false;
throw error;
} finally {
this._syncLock = false;
}
Expand All @@ -295,38 +288,31 @@ export class SyncEngineLevel implements SyncEngine {
// Convert the interval string to milliseconds.
const intervalMilliseconds = ms(interval);

if (this._syncIntervalId) {
this.stopSync();
}
const intervalSync = async () => {
if (this._syncLock) {
return;
}

if (!this._syncLock) {
clearInterval(this._syncIntervalId);
this._syncIntervalId = undefined;
await this.sync();
}

return new Promise((resolve, reject) => {
const intervalSync = async () => {
if (this._syncLock) {
return;
}

// clears the interval and sets the syncIntervalId to undefined
this.stopSync();
if (!this._syncIntervalId) {
this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds);
}
};

try {
await this.sync();
} catch (error: any) {
this.stopSync();
reject(error);
}
if (this._syncIntervalId) {
clearInterval(this._syncIntervalId);
}

if (!this._syncIntervalId) {
// only set a new interval if none is set. The most recently called `startSync` will set the final interval.
this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds);
}
};
// Set up a new interval.
this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds);

this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds);
});
// initiate an immediate sync
if (!this._syncLock) {
await this.sync();
}
}

public stopSync(): void {
Expand Down
97 changes: 83 additions & 14 deletions packages/agent/tests/sync-engine-level.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -465,24 +465,41 @@ describe('SyncEngineLevel', () => {
expect(pullSpy.calledOnce).to.be.true;
});

it('throws if sync is attempted while an interval sync is running', async () => {
it('throws an error if the sync is currently already running', async () => {
// Register Alice's DID to be synchronized.
await testHarness.agent.sync.registerIdentity({
did: alice.did.uri,
});

// start the sync engine with an interval of 10 seconds
syncEngine.startSync({ interval: '10s' });
const clock = sinon.useFakeTimers();
sinon.stub(syncEngine as any, 'push').resolves();
const pullSpy = sinon.stub(syncEngine as any, 'pull');
pullSpy.returns(new Promise<void>((resolve) => {
clock.setTimeout(() => {
resolve();
}, 90);
}));

// do not await
syncEngine.sync();

await clock.tickAsync(50);

// do not block for subsequent syncs
pullSpy.returns(Promise.resolve());
try {
// Execute Sync to push and pull all records from Alice's remote DWN to Alice's local DWN.
await syncEngine.sync();

expect.fail('Expected an error to be thrown');
} catch (error: any) {
// Execute Sync to push and pull all records from Alice's remote DWN to Alice's local DWN.
expect(error.message).to.equal('SyncEngineLevel: Cannot call sync while a sync interval is active. Call `stopSync()` first.');
} catch(error:any) {
expect(error.message).to.equal('SyncEngineLevel: Sync operation is already in progress.');
}

await clock.tickAsync(50);

// no error thrown
await syncEngine.sync();

clock.restore();
});
});

Expand Down Expand Up @@ -2436,8 +2453,9 @@ describe('SyncEngineLevel', () => {
pushSpy.restore();
clock.restore();

expect(pullSpy.callCount).to.equal(2, 'push');
expect(pushSpy.callCount).to.equal(2, 'pull');
// one when starting the sync, and another for each interval
expect(pullSpy.callCount).to.equal(3, 'push');
expect(pushSpy.callCount).to.equal(3, 'pull');
});

it('does not call sync() again until a sync round finishes', async () => {
Expand All @@ -2461,14 +2479,22 @@ describe('SyncEngineLevel', () => {

await clock.tickAsync(1_400); // less time than the push

// only once for when starting the sync
expect(pullSpy.callCount).to.equal(1, 'pull');
expect(pullSpy.callCount).to.equal(1, 'push');

await clock.tickAsync(600); //remaining time for a 2nd sync
await clock.tickAsync(200); //remaining time and one interval

// once when starting, and once for the interval
expect(pullSpy.callCount).to.equal(2, 'pull');
expect(pushSpy.callCount).to.equal(2, 'push');

await clock.tickAsync(500); // one more interval

// one more for the interval
expect(pullSpy.callCount).to.equal(3, 'pull');
expect(pushSpy.callCount).to.equal(3, 'push');

pullSpy.restore();
pushSpy.restore();
clock.restore();
Expand All @@ -2493,7 +2519,8 @@ describe('SyncEngineLevel', () => {

await clock.tickAsync(1_400); // less than the initial interval + the sync time

expect(syncSpy.callCount).to.equal(1);
// once for the initial call and once for each interval call
expect(syncSpy.callCount).to.equal(2);

// set to be a short sync time
syncSpy.returns(new Promise<void>((resolve) => {
Expand All @@ -2506,12 +2533,54 @@ describe('SyncEngineLevel', () => {

await clock.tickAsync(301); // exactly the new interval + 1

expect(syncSpy.callCount).to.equal(2);
// one for the initial 'startSync' call and one for each interval call
expect(syncSpy.callCount).to.equal(4);


await clock.tickAsync(601); // two more intervals

expect(syncSpy.callCount).to.equal(4);
expect(syncSpy.callCount).to.equal(6);

syncSpy.restore();
clock.restore();
});

it('should replace the interval timer with the latest interval timer', async () => {

await testHarness.agent.sync.registerIdentity({
did: alice.did.uri,
});

const clock = sinon.useFakeTimers();

const syncSpy = sinon.stub(SyncEngineLevel.prototype as any, 'sync');
// set to be a sync time longer than the interval
syncSpy.returns(new Promise<void>((resolve) => {
clock.setTimeout(() => {
resolve();
}, 100);
}));

testHarness.agent.sync.startSync({ interval: '500ms' });

// two intervals
await clock.tickAsync(1_001);

// this should equal 3, once for the initial call and once for each interval call
expect(syncSpy.callCount).to.equal(3);

syncSpy.resetHistory();
testHarness.agent.sync.startSync({ interval: '200ms' });

await clock.tickAsync(401); // two intervals

// one for the initial 'startSync' call and one for each interval call
expect(syncSpy.callCount).to.equal(3);

await clock.tickAsync(401); // two more intervals

// one additional calls for each interval
expect(syncSpy.callCount).to.equal(5);

syncSpy.restore();
clock.restore();
Expand Down

0 comments on commit a2190b5

Please sign in to comment.