Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create requests #42

Merged
merged 7 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions packages/automated-dispute/src/interfaces/protocolProvider.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { Timestamp } from "@ebo-agent/shared";
import { Address } from "viem";

import type { EboEvent, EboEventName } from "../types/events.js";
import type { Dispute, Request, Response } from "../types/prophet.js";
import type { Dispute, EboEvent, EboEventName, Epoch, Request, Response } from "../types/index.js";
import { ProtocolContractsNames } from "../constants.js";

export type ProtocolContract = (typeof ProtocolContractsNames)[number];
Expand All @@ -17,11 +15,7 @@ export interface IReadProvider {
*
* @returns A promise that resolves with the current epoch, block number, and timestamp.
*/
getCurrentEpoch(): Promise<{
currentEpoch: bigint;
currentEpochBlockNumber: bigint;
currentEpochTimestamp: Timestamp;
}>;
getCurrentEpoch(): Promise<Epoch>;

/**
* Gets the last finalized block number.
Expand Down
25 changes: 10 additions & 15 deletions packages/automated-dispute/src/providers/protocolProvider.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Timestamp } from "@ebo-agent/shared";
import { Caip2ChainId } from "@ebo-agent/blocknumber/dist/types.js";
import {
Address,
BaseError,
Expand All @@ -18,8 +18,7 @@ import {
import { privateKeyToAccount } from "viem/accounts";
import { arbitrum } from "viem/chains";

import type { EboEvent, EboEventName } from "../types/events.js";
import type { Dispute, Request, Response } from "../types/prophet.js";
import type { Dispute, EboEvent, EboEventName, Epoch, Request, Response } from "../types/index.js";
import { eboRequestCreatorAbi, epochManagerAbi, oracleAbi } from "../abis/index.js";
import {
InvalidAccountOnClient,
Expand Down Expand Up @@ -153,24 +152,20 @@ export class ProtocolProvider implements IProtocolProvider {
*
* @returns The current epoch, its block number and its timestamp
*/
async getCurrentEpoch(): Promise<{
currentEpoch: bigint;
currentEpochBlockNumber: bigint;
currentEpochTimestamp: Timestamp;
}> {
const [currentEpoch, currentEpochBlockNumber] = await Promise.all([
async getCurrentEpoch(): Promise<Epoch> {
const [epoch, epochFirstBlockNumber] = await Promise.all([
this.epochManagerContract.read.currentEpoch(),
this.epochManagerContract.read.currentEpochBlock(),
]);

const currentEpochBlock = await this.readClient.getBlock({
blockNumber: currentEpochBlockNumber,
const epochFirstBlock = await this.readClient.getBlock({
blockNumber: epochFirstBlockNumber,
});

return {
currentEpoch,
currentEpochBlockNumber,
currentEpochTimestamp: currentEpochBlock.timestamp,
number: epoch,
firstBlockNumber: epochFirstBlockNumber,
startTimestamp: epochFirstBlock.timestamp,
};
}

Expand Down Expand Up @@ -257,7 +252,7 @@ export class ProtocolProvider implements IProtocolProvider {
}

// TODO: use Caip2 Chain ID instead of string in return type
async getAvailableChains(): Promise<string[]> {
async getAvailableChains(): Promise<Caip2ChainId[]> {
// TODO: implement actual method
return ["eip155:1", "eip155:42161"];
}
Expand Down
39 changes: 21 additions & 18 deletions packages/automated-dispute/src/services/eboActor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ import { Mutex } from "async-mutex";
import { Heap } from "heap-js";
import { ContractFunctionRevertedError } from "viem";

import type {
Dispute,
DisputeStatus,
EboEvent,
EboEventName,
Epoch,
Request,
Response,
ResponseBody,
} from "../types/index.js";
import {
DisputeWithoutResponse,
EBORequestCreator_ChainNotAdded,
Expand All @@ -27,16 +37,7 @@ import {
FinalizeRequest,
UpdateDisputeStatus,
} from "../services/index.js";
import {
Dispute,
DisputeStatus,
EboEvent,
EboEventName,
Request,
RequestId,
Response,
ResponseBody,
} from "../types/index.js";
import { ActorRequest } from "../types/actorRequest.js";

/**
* Compare function to sort events chronologically in ascending order by block number
Expand Down Expand Up @@ -78,10 +79,7 @@ export class EboActor {
* @param logger an `ILogger` instance
*/
constructor(
private readonly actorRequest: {
id: RequestId;
epoch: bigint;
},
public readonly actorRequest: ActorRequest,
private readonly protocolProvider: ProtocolProvider,
private readonly blockNumberService: BlockNumberService,
private readonly registry: EboRegistry,
Expand Down Expand Up @@ -432,15 +430,19 @@ export class EboActor {
*
* Be aware that a request can be finalized but some of its disputes can still be pending resolution.
*
* At last, actors must be kept alive until their epoch concludes, to ensure no actor/request duplication.
*
* @param currentEpoch the epoch to check against actor termination
* @param blockNumber block number to check entities at
* @returns `true` if all entities are settled, otherwise `false`
*/
public canBeTerminated(blockNumber: bigint): boolean {
public canBeTerminated(currentEpoch: Epoch["number"], blockNumber: bigint): boolean {
const request = this.getActorRequest();
const isPastEpoch = currentEpoch > request.epoch;
const isRequestFinalized = request.status === "Finalized";
const nonSettledProposals = this.activeProposals(blockNumber);

return isRequestFinalized && nonSettledProposals.length === 0;
return isPastEpoch && isRequestFinalized && nonSettledProposals.length === 0;
}

/**
Expand Down Expand Up @@ -556,10 +558,11 @@ export class EboActor {
private async buildResponse(chainId: Caip2ChainId): Promise<ResponseBody> {
// FIXME(non-current epochs): adapt this code to fetch timestamps corresponding
// to the first block of any epoch, not just the current epoch
const { currentEpochTimestamp } = await this.protocolProvider.getCurrentEpoch();
const { startTimestamp: epochStartTimestamp } =
await this.protocolProvider.getCurrentEpoch();

const epochBlockNumber = await this.blockNumberService.getEpochBlockNumber(
currentEpochTimestamp,
epochStartTimestamp,
chainId,
);

Expand Down
13 changes: 7 additions & 6 deletions packages/automated-dispute/src/services/eboActorsManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Mutex } from "async-mutex";

import { RequestAlreadyHandled } from "../exceptions/index.js";
import { ProtocolProvider } from "../providers/protocolProvider.js";
import { RequestId } from "../types/prophet.js";
import { ActorRequest, RequestId } from "../types/index.js";
import { EboActor } from "./eboActor.js";
import { EboMemoryRegistry } from "./eboRegistry/eboMemoryRegistry.js";

Expand All @@ -21,7 +21,11 @@ export class EboActorsManager {
* @returns array of normalized request IDs
*/
public getRequestIds(): RequestId[] {
return [...this.requestActorMap.entries()].map((entry) => Address.normalize(entry[0]));
return [...this.requestActorMap.keys()].map((requestId) => Address.normalize(requestId));
}

public getActorsRequests(): ActorRequest[] {
return [...this.requestActorMap.values()].map((actor) => actor.actorRequest);
}

/**
Expand All @@ -30,10 +34,7 @@ export class EboActorsManager {
* @param actor an `EboActor` instance that handles a request.
*/
public createActor(
actorRequest: {
id: RequestId;
epoch: bigint;
},
actorRequest: ActorRequest,
protocolProvider: ProtocolProvider,
blockNumberService: BlockNumberService,
logger: ILogger,
Expand Down
107 changes: 90 additions & 17 deletions packages/automated-dispute/src/services/eboProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { isNativeError } from "util/types";
import { BlockNumberService } from "@ebo-agent/blocknumber";
import { Caip2ChainId } from "@ebo-agent/blocknumber/dist/types.js";
import { Address, ILogger } from "@ebo-agent/shared";

import { ProcessorAlreadyStarted } from "../exceptions/index.js";
import { ProtocolProvider } from "../providers/protocolProvider.js";
import { alreadyDeletedActorWarning, droppingUnhandledEventsWarning } from "../templates/index.js";
import { EboEvent, EboEventName } from "../types/events.js";
import { RequestId } from "../types/prophet.js";
import { ActorRequest, EboEvent, EboEventName, Epoch, RequestId } from "../types/index.js";
import { EboActorsManager } from "./eboActorsManager.js";

const DEFAULT_MS_BETWEEN_CHECKS = 10 * 60 * 1000; // 10 minutes
Expand Down Expand Up @@ -49,13 +49,11 @@ export class EboProcessor {

/** Sync new blocks and their events with their corresponding actors. */
private async sync() {
// TODO: detect new epoch by comparing subgraph's data with EpochManager's current epoch
// and trigger a request creation if there's no actor handling an <epoch, chain> request.
// This process should somehow check if there's already a request created for the epoch
// and chain that has no agent assigned and create it if that's the case.
try {
const currentEpoch = await this.getCurrentEpoch();

if (!this.lastCheckedBlock) {
this.lastCheckedBlock = await this.getEpochStartBlock();
this.lastCheckedBlock = currentEpoch.firstBlockNumber;
}

const lastBlock = await this.getLastFinalizedBlock();
Expand All @@ -74,7 +72,7 @@ export class EboProcessor {
try {
const events = eventsByRequestId.get(requestId) ?? [];

await this.syncRequest(requestId, events, lastBlock);
await this.syncRequest(requestId, events, currentEpoch.number, lastBlock);
} catch (err) {
this.onActorError(requestId, err as Error);
}
Expand All @@ -84,6 +82,8 @@ export class EboProcessor {

this.logger.info(`Consumed events up to block ${lastBlock}.`);

this.createMissingRequests(currentEpoch.number);

this.lastCheckedBlock = lastBlock;
} catch (err) {
if (isNativeError(err)) {
Expand All @@ -97,18 +97,18 @@ export class EboProcessor {
}

/**
* Fetches the first block of the current epoch.
* Fetches the current epoch for the protocol chain.
*
* @returns the first block of the current epoch
* @returns the current epoch properties of the protocol chain.
*/
private async getEpochStartBlock(): Promise<bigint> {
this.logger.info("Fetching current epoch start block...");
private async getCurrentEpoch(): Promise<Epoch> {
this.logger.info("Fetching current epoch...");

const { currentEpochBlockNumber } = await this.protocolProvider.getCurrentEpoch();
const currentEpoch = await this.protocolProvider.getCurrentEpoch();

this.logger.info(`Current epoch start block ${currentEpochBlockNumber} fetched.`);
this.logger.info(`Current epoch fetched.`);

return currentEpochBlockNumber;
return currentEpoch;
}

/**
Expand Down Expand Up @@ -182,9 +182,15 @@ export class EboProcessor {
*
* @param requestId the ID of the `Request`
* @param events a stream of consumed events
* @param currentEpoch the current epoch based on the last block
* @param lastBlock the last block checked
*/
private async syncRequest(requestId: RequestId, events: EboEventStream, lastBlock: bigint) {
private async syncRequest(
requestId: RequestId,
events: EboEventStream,
currentEpoch: Epoch["number"],
lastBlock: bigint,
) {
const firstEvent = events[0];
const actor = this.getOrCreateActor(requestId, firstEvent);

Expand All @@ -199,7 +205,7 @@ export class EboProcessor {
await actor.processEvents();
await actor.onLastBlockUpdated(lastBlock);

if (actor.canBeTerminated(lastBlock)) {
if (actor.canBeTerminated(currentEpoch, lastBlock)) {
this.terminateActor(requestId);
}
}
Expand Down Expand Up @@ -235,6 +241,7 @@ export class EboProcessor {
const actorRequest = {
id: Address.normalize(event.requestId),
epoch: event.metadata.epoch,
chainId: event.metadata.chainId,
};

const actor = this.actorsManager.createActor(
Expand All @@ -259,6 +266,72 @@ export class EboProcessor {
this.terminateActor(requestId);
}

/**
* Creates missing requests for the specified epoch, based on the
* available chains and the currently being handled requests.
*
* @param epoch the epoch number
*/
private async createMissingRequests(epoch: Epoch["number"]): Promise<void> {
try {
const handledEpochChains = this.actorsManager
.getActorsRequests()
.reduce((actorRequestMap, actorRequest: ActorRequest) => {
const epochRequests = actorRequestMap.get(actorRequest.epoch) ?? new Set();

epochRequests.add(actorRequest.chainId);

return actorRequestMap.set(actorRequest.epoch, epochRequests);
}, new Map<Epoch["number"], Set<Caip2ChainId>>());

this.logger.info("Fetching available chains...");

const availableChains: Caip2ChainId[] =
await this.protocolProvider.getAvailableChains();

this.logger.info("Available chains fetched.");

const unhandledEpochChain = availableChains.filter((chain) => {
const epochRequests = handledEpochChains.get(epoch);
const isHandled = epochRequests && epochRequests.has(chain);

return !isHandled;
});

this.logger.info("Creating missing requests...");

const epochChainRequests = unhandledEpochChain.map(async (chain) => {
try {
this.logger.info(`Creating request for chain ${chain} and epoch ${epoch}...`);

await this.protocolProvider.createRequest(epoch, [chain]);

this.logger.info(`Request created for chain ${chain} and epoch ${epoch}`);
} catch (err) {
// Request creation must be notified but it's not critical, as it will be
// retried during next sync.

// TODO: warn when getting a EBORequestCreator_RequestAlreadyCreated
// TODO: notify under any other error

this.logger.error(
`Could not create a request for epoch ${epoch} and chain ${chain}.`,
);
}
});

await Promise.all(epochChainRequests);

this.logger.info("Missing requests created.");
} catch (err) {
// TODO: notify

this.logger.error(
`Requests creation missing: ${isNativeError(err) ? err.message : err}`,
);
}
}

/**
* Removes the actor from tracking the request.
*
Expand Down
5 changes: 5 additions & 0 deletions packages/automated-dispute/src/types/actorRequest.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { Caip2ChainId } from "@ebo-agent/blocknumber/dist/types.js";

import { RequestId } from "./prophet.js";

export type ActorRequest = { id: RequestId; epoch: bigint; chainId: Caip2ChainId };
Loading