Skip to content

Commit

Permalink
feat: handle errors during sync (#38)
Browse files Browse the repository at this point in the history
# 🤖 Linear

Closes GRT-139

## Description
TL;DR fetching events failures won't make the whole agent crash now.
:goal_net:

* Handle errors during sync by catching and logging them making the
`sync` more robust.
* Added more verbose logging inside the `sync` method to improve its
traceability.
  • Loading branch information
0xyaco authored Sep 12, 2024
1 parent baeea02 commit 4186fff
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 19 deletions.
91 changes: 73 additions & 18 deletions packages/automated-dispute/src/services/eboProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { isNativeError } from "util/types";
import { BlockNumberService } from "@ebo-agent/blocknumber";
import { Address, ILogger } from "@ebo-agent/shared";

Expand Down Expand Up @@ -52,42 +53,96 @@ export class EboProcessor {
// and trigger a request creation if there's no actor handling an <epoch, chain> request.
// This process should somehow check if there's already a request created for the epoch
// and chain that has no agent assigned and create it if that's the case.
try {
if (!this.lastCheckedBlock) {
this.lastCheckedBlock = await this.getEpochStartBlock();
}

if (!this.lastCheckedBlock) {
this.lastCheckedBlock = await this.getEpochStartBlock();
}
const lastBlock = await this.getLastFinalizedBlock();
const events = await this.getEvents(this.lastCheckedBlock, lastBlock);

const lastBlock = await this.protocolProvider.getLastFinalizedBlock();
const events = await this.protocolProvider.getEvents(this.lastCheckedBlock, lastBlock);
const eventsByRequestId = this.groupEventsByRequest(events);
const eventsByRequestId = this.groupEventsByRequest(events);
const synchableRequests = this.calculateSynchableRequests([
...eventsByRequestId.keys(),
]);

const synchableRequests = this.calculateSynchableRequests([...eventsByRequestId.keys()]);
const synchedRequests = [...synchableRequests].map(async (requestId: RequestId) => {
try {
const events = eventsByRequestId.get(requestId) ?? [];
this.logger.info(
`Reading events for the following requests:\n${synchableRequests.join(", ")}`,
);

await this.syncRequest(requestId, events, lastBlock);
} catch (err) {
this.onActorError(requestId, err as Error);
}
});
const synchedRequests = [...synchableRequests].map(async (requestId: RequestId) => {
try {
const events = eventsByRequestId.get(requestId) ?? [];

await this.syncRequest(requestId, events, lastBlock);
} catch (err) {
this.onActorError(requestId, err as Error);
}
});

await Promise.all(synchedRequests);

await Promise.all(synchedRequests);
this.logger.info(`Consumed events up to block ${lastBlock}.`);

this.lastCheckedBlock = lastBlock;
this.lastCheckedBlock = lastBlock;
} catch (err) {
if (isNativeError(err)) {
this.logger.error(`Sync failed: ` + `${err.message}\n\n` + `${err.stack}`);
} else {
this.logger.error(`Sync failed: ${err}`);
}

// TODO: notify
}
}

/**
* Fetches the first block of the current epoch.
*
* @returns the first block of the current epoch
*/
private async getEpochStartBlock() {
private async getEpochStartBlock(): Promise<bigint> {
this.logger.info("Fetching current epoch start block...");

const { currentEpochBlockNumber } = await this.protocolProvider.getCurrentEpoch();

this.logger.info(`Current epoch start block ${currentEpochBlockNumber} fetched.`);

return currentEpochBlockNumber;
}

/**
* Fetches the last finalized block on the protocol chain.
*
* @returns the last finalized block
*/
private async getLastFinalizedBlock(): Promise<bigint> {
this.logger.info("Fetching last finalized block...");

const lastBlock = await this.protocolProvider.getLastFinalizedBlock();

this.logger.info(`Last finalized block ${lastBlock} fetched.`);

return lastBlock;
}

/**
* Fetches the events to process during the sync.
*
* @param fromBlock block number lower bound for event search
* @param toBlock block number upper bound for event search
* @returns an array of events
*/
private async getEvents(fromBlock: bigint, toBlock: bigint): Promise<EboEvent<EboEventName>[]> {
this.logger.info(`Fetching events between blocks ${fromBlock} and ${toBlock}...`);

const events = await this.protocolProvider.getEvents(fromBlock, toBlock);

this.logger.info(`${events.length} events fetched.`);

return events;
}

/**
* Group events by its normalized request ID.
* .
Expand Down
52 changes: 51 additions & 1 deletion packages/automated-dispute/tests/services/eboProcessor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,57 @@ describe("EboProcessor", () => {
);
});

it("fetches events since last block checked after first events fetch", async () => {
it("keeps the last block checked unaltered when something fails during sync", async () => {
const initialCurrentBlock = 1n;

const { processor, protocolProvider, actorsManager } = mocks.buildEboProcessor(logger);
const { actor } = mocks.buildEboActor(request, logger);

const currentEpoch = {
currentEpoch: 1n,
currentEpochBlockNumber: 1n,
currentEpochTimestamp: BigInt(Date.UTC(2024, 1, 1, 0, 0, 0, 0)),
};

const mockProtocolProviderGetEvents = vi
.spyOn(protocolProvider, "getEvents")
.mockImplementationOnce(() => {
// Simulate failure during first synch
throw new Error();
})
.mockResolvedValueOnce([]);

vi.spyOn(protocolProvider, "getLastFinalizedBlock")
.mockResolvedValueOnce(initialCurrentBlock + 10n)
.mockResolvedValueOnce(initialCurrentBlock + 20n);

vi.spyOn(protocolProvider, "getCurrentEpoch").mockResolvedValue(currentEpoch);
vi.spyOn(actorsManager, "createActor").mockReturnValue(actor);
vi.spyOn(actor, "processEvents").mockImplementation(() => Promise.resolve());
vi.spyOn(actor, "onLastBlockUpdated").mockImplementation(() => Promise.resolve());
vi.spyOn(actor, "canBeTerminated").mockResolvedValue(false);

await processor.start(msBetweenChecks);

expect(mockProtocolProviderGetEvents).toHaveBeenNthCalledWith(
1,
currentEpoch.currentEpochBlockNumber,
initialCurrentBlock + 10n,
);

expect(mockProtocolProviderGetEvents).toHaveBeenCalledTimes(1);
expect(logger.error).toHaveBeenCalledWith(expect.stringMatching("Sync failed"));

await vi.advanceTimersByTimeAsync(msBetweenChecks);

expect(mockProtocolProviderGetEvents).toHaveBeenNthCalledWith(
2,
currentEpoch.currentEpochBlockNumber,
initialCurrentBlock + 20n,
);
});

it("fetches non-consumed events if event fetching fails", async () => {
const { processor, protocolProvider, actorsManager } = mocks.buildEboProcessor(logger);
const { actor } = mocks.buildEboActor(request, logger);

Expand Down

0 comments on commit 4186fff

Please sign in to comment.