diff --git a/src/domain/polling/index.ts b/src/domain/polling/index.ts index 6eb739e..b9b8f69 100644 --- a/src/domain/polling/index.ts +++ b/src/domain/polling/index.ts @@ -8,13 +8,6 @@ import { import { ethers } from "ethers"; import { BytesLike } from "ethers/lib/utils"; -import { ConditionalOrder, OrderStatus } from "../../types"; -import { - formatStatus, - getLogger, - handleOnChainCustomError, - metrics, -} from "../../utils"; import { ConditionalOrder as ConditionalOrderSDK, OrderBookApi, @@ -30,6 +23,14 @@ import { formatEpoch, } from "@cowprotocol/cow-sdk"; import { ChainContext } from "../../services"; +import { ConditionalOrder, OrderStatus } from "../../types"; +import { + LoggerWithMethods, + formatStatus, + getLogger, + handleOnChainCustomError, + metrics, +} from "../../utils"; import { badOrder, policy } from "./filtering"; import { pollConditionalOrder } from "./poll"; @@ -91,6 +92,8 @@ const API_ERRORS_DROP: DropApiErrorsArray = [ // ApiErrors.IncompatibleSigningScheme - we control this in the watch-tower // ApiErrors.AppDataHashMismatch - we never submit full appData +const CHUNK_SIZE = 20; // How many orders to process before saving + /** * Watch for new blocks and check for orders to place * @@ -128,19 +131,29 @@ export async function checkForAndPlaceOrder( blockNumber.toString(), ownerCounter.toString() ); - const ordersPendingDelete = []; - // enumerate all the `ConditionalOrder`s for a given owner + + let ordersPendingDelete = []; + log.debug(`Process owner ${owner} (${conditionalOrders.size} orders)`); + for (const conditionalOrder of conditionalOrders) { orderCounter++; + + // Check if we reached the chunk size + if (orderCounter % CHUNK_SIZE === 1 && orderCounter > 1) { + // Delete orders pending delete, if any + _deleteOrders(ordersPendingDelete, conditionalOrders, log, chainId); + // Reset tracker + ordersPendingDelete = []; + + log.debug(`Processed ${orderCounter}, saving registry`); + + // Save the registry after processing each chunk + await registry.write(); + } + const ownerRef = `${ownerCounter}.${orderCounter}`; - const orderRef = `${chainId}:${ownerRef}@${blockNumber}`; - const log = getLogger( - "checkForAndPlaceOrder:checkForAndPlaceOrder", - chainId.toString(), - blockNumber.toString(), - ownerRef - ); + const orderRef = `${chainId}:${blockNumber}:${ownerRef}`; const logOrderDetails = `Processing order ${conditionalOrder.id} from TX ${conditionalOrder.tx} with params:`; const { result: lastHint } = conditionalOrder.pollResult || {}; @@ -158,7 +171,6 @@ export async function checkForAndPlaceOrder( case policy.FilterAction.DROP: log.info("Dropping conditional order. Reason: AcceptPolicy: DROP"); ordersPendingDelete.push(conditionalOrder); - continue; case policy.FilterAction.SKIP: log.debug("Skipping conditional order. Reason: AcceptPolicy: SKIP"); @@ -217,7 +229,6 @@ export async function checkForAndPlaceOrder( conditionalOrder.pollResult = { lastExecutionTimestamp: blockTimestamp, blockNumber: blockNumber, - result: pollResult, }; @@ -247,15 +258,7 @@ export async function checkForAndPlaceOrder( } // Delete orders we don't want to keep watching - for (const conditionalOrder of ordersPendingDelete) { - const deleted = conditionalOrders.delete(conditionalOrder); - const action = deleted ? "Stop Watching" : "Failed to stop watching"; - - log.debug( - `${action} conditional order ${conditionalOrder.id} from TX ${conditionalOrder.tx}` - ); - metrics.activeOrdersTotal.labels(chainId.toString()).dec(); - } + _deleteOrders(ordersPendingDelete, conditionalOrders, log, chainId); } // It may be handy in other versions of the watch tower implemented in other languages @@ -280,6 +283,24 @@ export async function checkForAndPlaceOrder( throw Error(`At least one unexpected error processing conditional orders`); } } + +function _deleteOrders( + ordersPendingDelete: ConditionalOrder[], + conditionalOrders: Set, + log: LoggerWithMethods, + chainId: SupportedChainId +) { + for (const conditionalOrder of ordersPendingDelete) { + const deleted = conditionalOrders.delete(conditionalOrder); + const action = deleted ? "Stop Watching" : "Failed to stop watching"; + + log.debug( + `${action} conditional order ${conditionalOrder.id} from TX ${conditionalOrder.tx}` + ); + metrics.activeOrdersTotal.labels(chainId.toString()).dec(); + } +} + async function _processConditionalOrder( owner: string, conditionalOrder: ConditionalOrder, @@ -513,7 +534,7 @@ async function _placeOrder(params: { // If the operation is a dry run, don't post to the API log.info(`Post order ${orderUid} to OrderBook on chain ${chainId}`); - log.debug(`Post order details`, postOrder); + log.debug(`Post order ${orderUid} details`, postOrder); if (!dryRun) { const orderUid = await orderBookApi.sendOrder(postOrder); metrics.orderBookDiscreteOrdersTotal.labels(...metricLabels).inc(); diff --git a/src/services/chain.ts b/src/services/chain.ts index e431f67..cb314cc 100644 --- a/src/services/chain.ts +++ b/src/services/chain.ts @@ -1,21 +1,23 @@ import { - Registry, + ApiBaseUrls, + OrderBookApi, + SupportedChainId, +} from "@cowprotocol/cow-sdk"; +import { ethers, providers } from "ethers"; +import { DBService } from "."; +import { addContract } from "../domain/events"; +import { checkForAndPlaceOrder } from "../domain/polling"; +import { policy } from "../domain/polling/filtering"; +import { + ComposableCoW, ConditionalOrderCreatedEvent, + ContextOptions, Multicall3, - ComposableCoW, Multicall3__factory, + Registry, RegistryBlock, blockToRegistryBlock, - ContextOptions, } from "../types"; -import { - SupportedChainId, - OrderBookApi, - ApiBaseUrls, -} from "@cowprotocol/cow-sdk"; -import { addContract } from "../domain/events"; -import { checkForAndPlaceOrder } from "../domain/polling"; -import { ethers, providers } from "ethers"; import { LoggerWithMethods, composableCowContract, @@ -23,8 +25,6 @@ import { isRunningInKubernetesPod, metrics, } from "../utils"; -import { DBService } from "."; -import { policy } from "../domain/polling/filtering"; const WATCHDOG_FREQUENCY_SECS = 5; // 5 seconds const WATCHDOG_TIMEOUT_DEFAULT_SECS = 30; @@ -295,7 +295,7 @@ export class ChainContext { oneShot ? "Chain watcher is in sync" : "Chain watcher is warmed up" }` ); - log.debug(`Last processed block: ${lastProcessedBlock}`); + log.debug(`Last processed block: ${JSON.stringify(lastProcessedBlock)}`); // If one-shot, return if (oneShot) { @@ -591,11 +591,15 @@ function _formatResult(result: boolean) { } function getProvider(rpcUrl: string): providers.Provider { + const log = getLogger("getProvider", rpcUrl); // if the rpcUrl is a websocket url, use the WebSocketProvider if (rpcUrl.startsWith("ws")) { + log.debug("Instantiating WS"); return new providers.WebSocketProvider(rpcUrl); } + log.debug("Instantiating HTTP"); + // otherwise, use the JsonRpcProvider return new providers.JsonRpcProvider(rpcUrl); } diff --git a/src/services/storage.ts b/src/services/storage.ts index b0744fa..7c3d145 100644 --- a/src/services/storage.ts +++ b/src/services/storage.ts @@ -28,6 +28,8 @@ export class DBService { } public async open() { + const log = getLogger("dbService:open"); + log.info("Opening database..."); await this.db.open(); } diff --git a/src/types/model.ts b/src/types/model.ts index 6171d9d..e6fe803 100644 --- a/src/types/model.ts +++ b/src/types/model.ts @@ -2,10 +2,13 @@ import Slack = require("node-slack"); import { BytesLike, ethers } from "ethers"; -import { ConditionalOrderParams, PollResult } from "@cowprotocol/cow-sdk"; +import { + ConditionalOrderParams, + ConditionalOrder as ConditionalOrderSdk, + PollResult, +} from "@cowprotocol/cow-sdk"; import { DBService } from "../services"; -import { metrics } from "../utils"; -import { ConditionalOrder as ConditionalOrderSdk } from "@cowprotocol/cow-sdk"; +import { getLogger, metrics } from "../utils"; // Standardise the storage key const LAST_NOTIFIED_ERROR_STORAGE_KEY = "LAST_NOTIFIED_ERROR"; @@ -237,6 +240,14 @@ export class Registry { // Write all atomically await batch.write(); + + const log = getLogger( + `Registry:write:${this.version}:${this.network}:${ + this.lastProcessedBlock?.number + }:${this.lastNotifiedError || ""}` + ); + + log.debug("batch written 📝"); } public stringifyOrders(): string { diff --git a/src/types/types.d.ts b/src/types/types.d.ts index e7a35f0..b135b36 100644 --- a/src/types/types.d.ts +++ b/src/types/types.d.ts @@ -14,7 +14,7 @@ export interface Config { deploymentBlock: number; watchdogTimeout?: number; /** - * Throttle block processing to only process blocks every N blocks. Set to 1 to process every block, 2 to process every other block, etc. + * Throttle block processing to only process blocks every N blocks. Set to 1 to process every block (default), 2 to process every other block, etc. */ processEveryNumBlocks?: number; orderBookApi?: string;