diff --git a/packages/automated-dispute/src/services/eboProcessor.ts b/packages/automated-dispute/src/services/eboProcessor.ts index aea44fb..f192f68 100644 --- a/packages/automated-dispute/src/services/eboProcessor.ts +++ b/packages/automated-dispute/src/services/eboProcessor.ts @@ -1,3 +1,4 @@ +import { isNativeError } from "util/types"; import { BlockNumberService } from "@ebo-agent/blocknumber"; import { Address, ILogger } from "@ebo-agent/shared"; @@ -52,29 +53,47 @@ export class EboProcessor { // and trigger a request creation if there's no actor handling an request. // This process should somehow check if there's already a request created for the epoch // and chain that has no agent assigned and create it if that's the case. + try { + if (!this.lastCheckedBlock) { + this.lastCheckedBlock = await this.getEpochStartBlock(); + } - if (!this.lastCheckedBlock) { - this.lastCheckedBlock = await this.getEpochStartBlock(); - } + const lastBlock = await this.getLastFinalizedBlock(); + const events = await this.getEvents(this.lastCheckedBlock, lastBlock); - const lastBlock = await this.protocolProvider.getLastFinalizedBlock(); - const events = await this.protocolProvider.getEvents(this.lastCheckedBlock, lastBlock); - const eventsByRequestId = this.groupEventsByRequest(events); + const eventsByRequestId = this.groupEventsByRequest(events); + const synchableRequests = this.calculateSynchableRequests([ + ...eventsByRequestId.keys(), + ]); - const synchableRequests = this.calculateSynchableRequests([...eventsByRequestId.keys()]); - const synchedRequests = [...synchableRequests].map(async (requestId: RequestId) => { - try { - const events = eventsByRequestId.get(requestId) ?? []; + this.logger.info( + `Reading events for the following requests:\n${synchableRequests.join(", ")}`, + ); - await this.syncRequest(requestId, events, lastBlock); - } catch (err) { - this.onActorError(requestId, err as Error); - } - }); + const synchedRequests = [...synchableRequests].map(async (requestId: RequestId) => { + try { + const events = eventsByRequestId.get(requestId) ?? []; + + await this.syncRequest(requestId, events, lastBlock); + } catch (err) { + this.onActorError(requestId, err as Error); + } + }); + + await Promise.all(synchedRequests); - await Promise.all(synchedRequests); + this.logger.info(`Consumed events up to block ${lastBlock}.`); - this.lastCheckedBlock = lastBlock; + this.lastCheckedBlock = lastBlock; + } catch (err) { + if (isNativeError(err)) { + this.logger.error(`Sync failed: ` + `${err.message}\n\n` + `${err.stack}`); + } else { + this.logger.error(`Sync failed: ${err}`); + } + + // TODO: notify + } } /** @@ -82,12 +101,48 @@ export class EboProcessor { * * @returns the first block of the current epoch */ - private async getEpochStartBlock() { + private async getEpochStartBlock(): Promise { + this.logger.info("Fetching current epoch start block..."); + const { currentEpochBlockNumber } = await this.protocolProvider.getCurrentEpoch(); + this.logger.info(`Current epoch start block ${currentEpochBlockNumber} fetched.`); + return currentEpochBlockNumber; } + /** + * Fetches the last finalized block on the protocol chain. + * + * @returns the last finalized block + */ + private async getLastFinalizedBlock(): Promise { + this.logger.info("Fetching last finalized block..."); + + const lastBlock = await this.protocolProvider.getLastFinalizedBlock(); + + this.logger.info(`Last finalized block ${lastBlock} fetched.`); + + return lastBlock; + } + + /** + * Fetches the events to process during the sync. + * + * @param fromBlock block number lower bound for event search + * @param toBlock block number upper bound for event search + * @returns an array of events + */ + private async getEvents(fromBlock: bigint, toBlock: bigint): Promise[]> { + this.logger.info(`Fetching events between blocks ${fromBlock} and ${toBlock}...`); + + const events = await this.protocolProvider.getEvents(fromBlock, toBlock); + + this.logger.info(`${events.length} events fetched.`); + + return events; + } + /** * Group events by its normalized request ID. * . diff --git a/packages/automated-dispute/tests/services/eboProcessor.spec.ts b/packages/automated-dispute/tests/services/eboProcessor.spec.ts index ac55fb7..e4a5f0b 100644 --- a/packages/automated-dispute/tests/services/eboProcessor.spec.ts +++ b/packages/automated-dispute/tests/services/eboProcessor.spec.ts @@ -132,7 +132,57 @@ describe("EboProcessor", () => { ); }); - it("fetches events since last block checked after first events fetch", async () => { + it("keeps the last block checked unaltered when something fails during sync", async () => { + const initialCurrentBlock = 1n; + + const { processor, protocolProvider, actorsManager } = mocks.buildEboProcessor(logger); + const { actor } = mocks.buildEboActor(request, logger); + + const currentEpoch = { + currentEpoch: 1n, + currentEpochBlockNumber: 1n, + currentEpochTimestamp: BigInt(Date.UTC(2024, 1, 1, 0, 0, 0, 0)), + }; + + const mockProtocolProviderGetEvents = vi + .spyOn(protocolProvider, "getEvents") + .mockImplementationOnce(() => { + // Simulate failure during first synch + throw new Error(); + }) + .mockResolvedValueOnce([]); + + vi.spyOn(protocolProvider, "getLastFinalizedBlock") + .mockResolvedValueOnce(initialCurrentBlock + 10n) + .mockResolvedValueOnce(initialCurrentBlock + 20n); + + vi.spyOn(protocolProvider, "getCurrentEpoch").mockResolvedValue(currentEpoch); + vi.spyOn(actorsManager, "createActor").mockReturnValue(actor); + vi.spyOn(actor, "processEvents").mockImplementation(() => Promise.resolve()); + vi.spyOn(actor, "onLastBlockUpdated").mockImplementation(() => Promise.resolve()); + vi.spyOn(actor, "canBeTerminated").mockResolvedValue(false); + + await processor.start(msBetweenChecks); + + expect(mockProtocolProviderGetEvents).toHaveBeenNthCalledWith( + 1, + currentEpoch.currentEpochBlockNumber, + initialCurrentBlock + 10n, + ); + + expect(mockProtocolProviderGetEvents).toHaveBeenCalledTimes(1); + expect(logger.error).toHaveBeenCalledWith(expect.stringMatching("Sync failed")); + + await vi.advanceTimersByTimeAsync(msBetweenChecks); + + expect(mockProtocolProviderGetEvents).toHaveBeenNthCalledWith( + 2, + currentEpoch.currentEpochBlockNumber, + initialCurrentBlock + 20n, + ); + }); + + it("fetches non-consumed events if event fetching fails", async () => { const { processor, protocolProvider, actorsManager } = mocks.buildEboProcessor(logger); const { actor } = mocks.buildEboActor(request, logger);