From 70723312d677c3f0aac960688613b45160528f90 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Mon, 21 Oct 2024 19:30:45 -0400 Subject: [PATCH] Currently sync interval stops if there is a failure, instead log failure and wait for the interval (#958) This PR fixes an issue where the sync interval would stop completely if any error occurred during sync. The error is now caught and logged, and sync is attempted again on the next interval. Additionally, enqueueing operations for each DID/remote no longer waits for each one to complete sequentially; it now uses `Promise.allSettled`, so a failure for a single peer is logged instead of failing the whole enqueue step. --- .changeset/lucky-dots-clap.md | 8 +++ packages/agent/src/sync-engine-level.ts | 19 +++++- .../agent/tests/sync-engine-level.spec.ts | 68 +++++++++++++++++++ 3 files changed, 92 insertions(+), 3 deletions(-) create mode 100644 .changeset/lucky-dots-clap.md diff --git a/.changeset/lucky-dots-clap.md b/.changeset/lucky-dots-clap.md new file mode 100644 index 000000000..20abdcc11 --- /dev/null +++ b/.changeset/lucky-dots-clap.md @@ -0,0 +1,8 @@ +--- +"@web5/agent": patch +"@web5/identity-agent": patch +"@web5/proxy-agent": patch +"@web5/user-agent": patch +--- + +Prevent SyncEngine from stopping completely during a sync failure, next interval will try again. 
diff --git a/packages/agent/src/sync-engine-level.ts b/packages/agent/src/sync-engine-level.ts index 3d043e90d..4b42650b5 100644 --- a/packages/agent/src/sync-engine-level.ts +++ b/packages/agent/src/sync-engine-level.ts @@ -338,7 +338,12 @@ export class SyncEngineLevel implements SyncEngine { clearInterval(this._syncIntervalId); this._syncIntervalId = undefined; - await this.sync(); + + try { + await this.sync(); + } catch (error) { + console.error('SyncEngineLevel: Error during sync operation', error); + } if (!this._syncIntervalId) { this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds); @@ -405,7 +410,7 @@ export class SyncEngineLevel implements SyncEngine { syncDirection: SyncDirection, syncPeerState: SyncState[] }) { - for (let syncState of syncPeerState) { + const enqueueOps = await Promise.allSettled(syncPeerState.map(async (syncState) => { // Get the event log from the remote DWN if pull sync, or local DWN if push sync. const eventLog = await this.getDwnEventLog({ did : syncState.did, @@ -435,7 +440,15 @@ export class SyncEngineLevel implements SyncEngine { : this.getPushQueue(); await syncQueue.batch(syncOperations as any); } - } + })); + + // log any errors that occurred during the enqueuing process + enqueueOps.forEach((result, index) => { + if (result.status === 'rejected') { + const peerState = syncPeerState[index]; + console.error(`SyncEngineLevel: Error enqueuing sync operation for peerState: ${JSON.stringify(peerState)}`, result.reason); + } + }); } private static generateSyncMessageParamsKey({ did, delegateDid, dwnUrl, protocol, watermark, messageCid }:SyncMessageParams): string { diff --git a/packages/agent/tests/sync-engine-level.spec.ts b/packages/agent/tests/sync-engine-level.spec.ts index 4ce1a509d..097f01e2a 100644 --- a/packages/agent/tests/sync-engine-level.spec.ts +++ b/packages/agent/tests/sync-engine-level.spec.ts @@ -500,6 +500,36 @@ describe('SyncEngineLevel', () => { clock.restore(); }); + + it('sync logs failures 
when enqueueing sync operations', async () => { + // returns 3 DID peers to sync with + sinon.stub(syncEngine as any, 'getSyncPeerState').resolves([{ + did: 'did:example:alice', + }, { + did: 'did:example:bob', + }, { + did: 'did:example:carol', + }]); + + const getDwnEventLogSpy = sinon.stub(syncEngine as any, 'getDwnEventLog').resolves([]); + getDwnEventLogSpy.onCall(2).rejects(new Error('Failed to get event log')); + + // spy on the console error + const consoleErrorSpy = sinon.stub(console, 'error').resolves(); + + await syncEngine.sync(); + + expect(consoleErrorSpy.callCount).to.equal(1); + expect(consoleErrorSpy.firstCall.args[0]).to.include('Error enqueuing sync operation for peerState'); + + // reset the error spy + consoleErrorSpy.resetHistory(); + + // sync again, this time no errors should be thrown + await syncEngine.sync(); + + expect(consoleErrorSpy.notCalled).to.be.true; + }); }); describe('pull()', () => { @@ -2002,6 +2032,44 @@ describe('SyncEngineLevel', () => { syncSpy.restore(); clock.restore(); }); + + it('should log sync errors, but continue syncing the next interval', async () => { + await testHarness.agent.sync.registerIdentity({ + did: alice.did.uri, + }); + + const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true }); + const syncSpy = sinon.stub(SyncEngineLevel.prototype as any, 'sync'); + + syncSpy.returns(new Promise((resolve, reject) => { + clock.setTimeout(() => { + resolve(); + }, 100); + })); + + // first call is the initial sync, 2nd and onward are the intervals + // on the 2nd interval (3rd call), we reject the promise, a 4th call should be made + syncSpy.onThirdCall().rejects(new Error('Sync error')); + + // spy on console.error to check if the error message is logged + const consoleErrorSpy = sinon.stub(console, 'error').resolves(); + + testHarness.agent.sync.startSync({ interval: '500ms' }); + + // three intervals + await clock.tickAsync(1_500); + + // this should equal 4, once for the initial call and once for each 
interval call + expect(syncSpy.callCount).to.equal(4); + + // check if the error message is logged + expect(consoleErrorSpy.callCount).to.equal(1); + expect(consoleErrorSpy.args[0][0]).to.include('SyncEngineLevel: Error during sync operation'); + + syncSpy.restore(); + consoleErrorSpy.restore(); + clock.restore(); + }); }); describe('stopSync()', () => {