diff --git a/apps/cli/src/commands/event-listener.ts b/apps/cli/src/commands/event-listener.ts new file mode 100644 index 000000000..e4b2dc2cf --- /dev/null +++ b/apps/cli/src/commands/event-listener.ts @@ -0,0 +1,25 @@ +import { resilientEventListener, config, RefereeAbi } from '@sentry/core'; +import Vorpal from 'vorpal'; + +export function eventListener(cli: Vorpal) { + cli + .command('event-listener', 'Starts the event listener with provided arguments') + .option('-r, --rpcUrl ', 'RPC URL') + .option('-c, --contractAddress ', 'Contract address') + .option('-a, --abi ', 'ABI of the contract') + .option('-e, --eventName ', 'Event name to listen for') + .action(async function (this: Vorpal.CommandInstance, args) { + const { rpcUrl, contractAddress, abi, eventName } = args.options; + + const { stop } = resilientEventListener({ + rpcUrl: rpcUrl ? rpcUrl : config.arbitrumOneWebSocketUrl, + contractAddress: contractAddress ? contractAddress : config.refereeAddress, + abi: abi ? JSON.parse(abi) : RefereeAbi, + eventName: eventName ? eventName : "RewardsClaimed", + log: (value: string) => this.log(value), + }); + + return new Promise((resolve, reject) => { }); // Keep the command alive + + }); +} diff --git a/apps/cli/src/index.ts b/apps/cli/src/index.ts index bb4f34979..03c1f8946 100644 --- a/apps/cli/src/index.ts +++ b/apps/cli/src/index.ts @@ -41,6 +41,7 @@ import { bootOperator } from './commands/operator-control/operator-runtime.js'; import { setOrAddPricingTiersCommand } from './commands/licenses/set-or-add-pricing-tiers.js'; import { addPromoCode } from './commands/licenses/add-promo-code.js'; import { removePromoCode } from './commands/licenses/remove-promo-code.js'; +import { eventListener } from './commands/event-listener.js'; const cli = new Vorpal(); @@ -57,6 +58,7 @@ checkKycStatus(cli); checkWhitelist(cli); createBlsKeyPair(cli); createMnemonic(cli); +eventListener(cli); getAllContractAddresses(cli); getAssertionCheckingStatus(cli); getBalancesForAddresses(cli); diff --git a/packages/core/package.json b/packages/core/package.json index 9fdeac74b..ff4caac06 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -15,9 +15,11 @@ "license": "UNLICENSED", "dependencies": { "@noble/curves": "^1.2.0", - "alchemy-sdk": "^3.1.0" + "alchemy-sdk": "^3.1.0", + "isomorphic-ws": "^5.0.0" }, "devDependencies": { - "@types/node": "^20.6.0" + "@types/node": "^20.6.0", + "@types/ws": "^8.5.10" } } diff --git a/packages/core/src/challenger/listenForAssertions.ts b/packages/core/src/challenger/listenForAssertions.ts index d5c81cfa4..6a295063f 100644 --- a/packages/core/src/challenger/listenForAssertions.ts +++ b/packages/core/src/challenger/listenForAssertions.ts @@ -1,7 +1,7 @@ -import {ethers} from "ethers"; +import { LogDescription } from "ethers"; import { RollupAdminLogicAbi } from "../abis/RollupAdminLogicAbi.js"; -import { getProvider } from "../utils/getProvider.js"; import { config } from "../config.js"; +import { resilientEventListener } from "../utils/resilientEventListener.js"; /** * Listens for NodeConfirmed events and triggers a callback function when the event is emitted. @@ -10,42 +10,32 @@ import { config } from "../config.js"; * @param log - The logging function to be used for logging. * @returns A function that can be called to stop listening for the event. */ -export function listenForAssertions(callback: (nodeNum: any, blockHash: any, sendRoot: any, event: any) => void, log: (log: string) => void): () => void { - const provider = getProvider("wss://arb-goerli.g.alchemy.com/v2/WNOJEZxrhn3a0PzKUVEZgeRJqxOL7brv"); // arb goerli while we run on testnet - - // create an instance of the rollup contract - const rollupContract = new ethers.Contract(config.rollupAddress, RollupAdminLogicAbi, {provider}); - +export function listenForAssertions(callback: (nodeNum: any, blockHash: any, sendRoot: any, event: any) => void, _log: (log: string) => void): () => void { // create a map to keep track of nodeNums that have called the callback const nodeNumMap: { [nodeNum: string]: boolean } = {}; // listen for the NodeConfirmed event - rollupContract.on("NodeConfirmed", (nodeNum, blockHash, sendRoot, event) => { + const listener = resilientEventListener({ + rpcUrl: "wss://arb-goerli.g.alchemy.com/v2/WNOJEZxrhn3a0PzKUVEZgeRJqxOL7brv", + contractAddress: config.rollupAddress, + abi: RollupAdminLogicAbi, + eventName: "NodeConfirmed", + log: _log, + callback: (log: LogDescription | null) => { + const nodeNum = BigInt(log?.args[0]); + const blockHash = log?.args[1]; + const sendRoot = log?.args[2]; - // if the nodeNum has not been seen before, call the callback and add it to the map - if (!nodeNumMap[nodeNum]) { - log(`[${new Date().toISOString()}] NodeConfirmed event received for new nodeNum: ${nodeNum}`); - nodeNumMap[nodeNum] = true; - void callback(nodeNum, blockHash, sendRoot, event); + // if the nodeNum has not been seen before, call the callback and add it to the map + if (!nodeNumMap[nodeNum.toString()]) { + nodeNumMap[nodeNum.toString()] = true; + void callback(nodeNum, blockHash, sendRoot, log); + } } }); - // Request the current block number immediately and then every 5 minutes - const fetchBlockNumber = async () => { - try { - const blockNumber = await provider.getBlockNumber(); - log(`[${new Date().toISOString()}] Health Check, Challenger still healthy. Current block number: ${blockNumber}`); - } catch (error) { - log(`[${new Date().toISOString()}] Error fetching block number, challenger may no longer be connected to the RPC: ${JSON.stringify(error)}`); - } - }; - fetchBlockNumber(); - const interval = setInterval(fetchBlockNumber, 300000); - // return a function that can be used to stop listening for the event return () => { - log(`[${new Date().toISOString()}] Stopping listening for NodeConfirmed events`); - rollupContract.removeAllListeners("NodeConfirmed"); - clearInterval(interval); + listener.stop(); }; } diff --git a/packages/core/src/config.ts b/packages/core/src/config.ts index af9cc1416..c9eb472f8 100644 --- a/packages/core/src/config.ts +++ b/packages/core/src/config.ts @@ -1,6 +1,8 @@ export let config = { "arbitrumBlockExplorer": "https://arbiscan.io", "arbitrumGoerliBlockExplorer": "https://goerli.arbiscan.io", + "arbitrumOneJsonRpcUrl": "https://arb-mainnet.g.alchemy.com/v2/p_LSgTIj_JtEt3JPM7IZIZFL1a70yvQJ", + "arbitrumOneWebSocketUrl": "wss://arb-mainnet.g.alchemy.com/v2/p_LSgTIj_JtEt3JPM7IZIZFL1a70yvQJ", "defaultRpcUrl": "https://arb-mainnet.g.alchemy.com/v2/p_LSgTIj_JtEt3JPM7IZIZFL1a70yvQJ", "esXaiAddress": "0x4C749d097832DE2FEcc989ce18fDc5f1BD76700c", "esXaiDeployedBlockNumber": 157193630, diff --git a/packages/core/src/operator/listenForChallenges.ts b/packages/core/src/operator/listenForChallenges.ts index 4e05232f2..c059e50a0 100644 --- a/packages/core/src/operator/listenForChallenges.ts +++ b/packages/core/src/operator/listenForChallenges.ts @@ -3,6 +3,7 @@ import { RefereeAbi } from "../abis/RefereeAbi.js"; import { getProvider } from "../utils/getProvider.js"; import { config } from "../config.js"; import { Challenge, getChallenge } from "../index.js"; +import { resilientEventListener } from "../utils/resilientEventListener.js"; /** * Listens for ChallengeSubmitted events and triggers a callback function when the event is emitted. @@ -11,31 +12,34 @@ import { Challenge, getChallenge } from "../index.js"; * @returns A function that can be called to stop listening for the event. */ export function listenForChallenges(callback: (challengeNumber: bigint, challenge: Challenge, event: any) => void): () => void { - // get a provider for the arb one network - const provider = getProvider(); - - // create an instance of the Referee contract - const refereeContract = new ethers.Contract(config.refereeAddress, RefereeAbi, provider); // create a map to keep track of challengeNumbers that have called the callback const challengeNumberMap: { [challengeNumber: string]: boolean } = {}; // listen for the ChallengeSubmitted event - refereeContract.on("ChallengeSubmitted", async (challengeNumber, event) => { - - // if the challengeNumber has not been seen before, call the callback and add it to the map - if (!challengeNumberMap[challengeNumber.toString()]) { - challengeNumberMap[challengeNumber.toString()] = true; - - // lookup the challenge - const challenge = await getChallenge(challengeNumber); - - void callback(challengeNumber, challenge, event); + const listener = resilientEventListener({ + rpcUrl: config.arbitrumOneWebSocketUrl, + contractAddress: config.refereeAddress, + abi: RefereeAbi, + eventName: "ChallengeSubmitted", + log: console.info, + callback: async (log) => { + const challengeNumber = BigInt(log?.args[0]); + + // if the challengeNumber has not been seen before, call the callback and add it to the map + if (!challengeNumberMap[challengeNumber.toString()]) { + challengeNumberMap[challengeNumber.toString()] = true; + + // lookup the challenge + const challenge = await getChallenge(challengeNumber); + + void callback(challengeNumber, challenge, log); + } } }); // return a function that can be used to stop listening for the event return () => { - refereeContract.removeAllListeners("ChallengeSubmitted"); + listener.stop(); }; } diff --git a/packages/core/src/operator/operatorRuntime.ts b/packages/core/src/operator/operatorRuntime.ts index 43dd8ce82..726f23844 100644 --- a/packages/core/src/operator/operatorRuntime.ts +++ b/packages/core/src/operator/operatorRuntime.ts @@ -32,7 +32,7 @@ export async function operatorRuntime( logFunction: (log: string) => void = (_) => {}, ): Promise<() => Promise> { - logFunction("Booting operator runtime."); + logFunction(`[${new Date().toISOString()}] Booting operator runtime.`); const provider = await getProvider(); @@ -53,20 +53,20 @@ export async function operatorRuntime( // get the address of the operator const operatorAddress = await signer.getAddress(); - logFunction(`Fetched address of operator ${operatorAddress}.`); + logFunction(`[${new Date().toISOString()}] Fetched address of operator ${operatorAddress}.`); // get a list of all the owners that are added to this operator - logFunction("Getting all wallets assigned to the operator."); + logFunction(`[${new Date().toISOString()}] Getting all wallets assigned to the operator.`); const owners = [operatorAddress, ...await retry(async () => await listOwnersForOperator(operatorAddress))]; - logFunction(`Received ${owners.length} wallets that are assigned to this operator.`); + logFunction(`[${new Date().toISOString()}] Received ${owners.length} wallets that are assigned to this operator.`); // get a list of all the node licenses for each of the owners let nodeLicenseIds: bigint[] = []; - logFunction("Getting all node licenses for each owner."); + logFunction(`[${new Date().toISOString()}] Getting all node licenses for each owner.`); for (const owner of owners) { - logFunction(`Fetching node licenses for owner ${owner}.`); + logFunction(`[${new Date().toISOString()}] Fetching node licenses for owner ${owner}.`); const licensesOfOwner = await listNodeLicenses(owner, (tokenId) => { - logFunction(`Fetched Sentry Key ${tokenId.toString()} for owner ${owner}.`); + logFunction(`[${new Date().toISOString()}] Fetched Sentry Key ${tokenId.toString()} for owner ${owner}.`); nodeLicenseStatusMap.set(tokenId, { ownerPublicKey: owner, status: NodeLicenseStatus.WAITING_IN_QUEUE, @@ -74,15 +74,15 @@ export async function operatorRuntime( safeStatusCallback(); }); nodeLicenseIds = [...nodeLicenseIds, ...licensesOfOwner]; - logFunction(`Fetched ${licensesOfOwner.length} node licenses for owner ${owner}.`); + logFunction(`[${new Date().toISOString()}] Fetched ${licensesOfOwner.length} node licenses for owner ${owner}.`); } - logFunction(`Total Sentry Keys fetched: ${nodeLicenseIds.length}.`); + logFunction(`[${new Date().toISOString()}] Total Sentry Keys fetched: ${nodeLicenseIds.length}.`); // create a mapping of all the timestamps these nodeLicenses were created at, so we can easily check the eligibility later - logFunction("Checking Sentry Key eligibility."); + logFunction(`[${new Date().toISOString()}] Checking Sentry Key eligibility.`); const mintTimestamps: { [nodeLicenseId: string]: bigint } = {}; for (const nodeLicenseId of nodeLicenseIds) { - logFunction(`Fetching metadata for Sentry Key ${nodeLicenseId}.`); + logFunction(`[${new Date().toISOString()}] Fetching metadata for Sentry Key ${nodeLicenseId}.`); nodeLicenseStatusMap.set(nodeLicenseId, { ...nodeLicenseStatusMap.get(nodeLicenseId) as NodeLicenseInformation, status: NodeLicenseStatus.FETCHING_MINT_TIMESTAMP, @@ -94,9 +94,9 @@ export async function operatorRuntime( status: NodeLicenseStatus.WAITING_FOR_NEXT_CHALLENGE, }); safeStatusCallback(); - logFunction(`Fetched metadata for Sentry Key ${nodeLicenseId}.`); + logFunction(`[${new Date().toISOString()}] Fetched metadata for Sentry Key ${nodeLicenseId}.`); } - logFunction("Finished creating the lookup of metadata for the Sentry Keys."); + logFunction(`[${new Date().toISOString()}] Finished creating the lookup of metadata for the Sentry Keys.`); /** * Processes a new challenge for all the node licenses. @@ -104,10 +104,10 @@ export async function operatorRuntime( * @param {Challenge} challenge - The challenge. */ async function processNewChallenge(challengeNumber: bigint, challenge: Challenge) { - logFunction(`Processing new challenge with number: ${challengeNumber}.`); + logFunction(`[${new Date().toISOString()}] Processing new challenge with number: ${challengeNumber}.`); for (const nodeLicenseId of nodeLicenseIds) { - logFunction(`Checking eligibility for nodeLicenseId ${nodeLicenseId}.`); + logFunction(`[${new Date().toISOString()}] Checking eligibility for nodeLicenseId ${nodeLicenseId}.`); // check the nodeLicense is eligible to submit to this challenge, it must have been minted before the challenge was opened. nodeLicenseStatusMap.set(nodeLicenseId, { @@ -117,7 +117,7 @@ export async function operatorRuntime( safeStatusCallback(); if (challenge.createdTimestamp <= mintTimestamps[nodeLicenseId.toString()]) { - logFunction(`Sentry Key ${nodeLicenseId} is not eligible for challenge ${challengeNumber}.`); + logFunction(`[${new Date().toISOString()}] Sentry Key ${nodeLicenseId} is not eligible for challenge ${challengeNumber}.`); nodeLicenseStatusMap.set(nodeLicenseId, { ...nodeLicenseStatusMap.get(nodeLicenseId) as NodeLicenseInformation, status: NodeLicenseStatus.WAITING_FOR_NEXT_CHALLENGE, @@ -135,7 +135,7 @@ export async function operatorRuntime( const [payoutEligible] = await retry(async () => await refereeContract.createAssertionHashAndCheckPayout(nodeLicenseId, challengeNumber, challenge.assertionStateRootOrConfirmData, challenge.challengerSignedHash)); if (!payoutEligible) { - logFunction(`Sentry Key ${nodeLicenseId} did not accrue esXAI for the challenge ${challengeNumber}. A Sentry Key receives esXAI every few days.`); + logFunction(`[${new Date().toISOString()}] Sentry Key ${nodeLicenseId} did not accrue esXAI for the challenge ${challengeNumber}. A Sentry Key receives esXAI every few days.`); nodeLicenseStatusMap.set(nodeLicenseId, { ...nodeLicenseStatusMap.get(nodeLicenseId) as NodeLicenseInformation, status: NodeLicenseStatus.WAITING_FOR_NEXT_CHALLENGE, @@ -147,7 +147,7 @@ export async function operatorRuntime( // check to see if this nodeLicense has already submitted, if we have, then go to next license const [{submitted}] = await retry(async () => await getSubmissionsForChallenges([challengeNumber], nodeLicenseId)); if (submitted) { - logFunction(`Sentry Key ${nodeLicenseId} has submitted for challenge ${challengeNumber} by another node. If multiple nodes are running, this message can be ignored.`); + logFunction(`[${new Date().toISOString()}] Sentry Key ${nodeLicenseId} has submitted for challenge ${challengeNumber} by another node. If multiple nodes are running, this message can be ignored.`); nodeLicenseStatusMap.set(nodeLicenseId, { ...nodeLicenseStatusMap.get(nodeLicenseId) as NodeLicenseInformation, status: NodeLicenseStatus.WAITING_FOR_NEXT_CHALLENGE, @@ -158,16 +158,16 @@ export async function operatorRuntime( // submit the claim to the challenge try { - logFunction(`Submitting assertion for Sentry Key ${nodeLicenseId} to challenge ${challengeNumber}.`); + logFunction(`[${new Date().toISOString()}] Submitting assertion for Sentry Key ${nodeLicenseId} to challenge ${challengeNumber}.`); await retry(async () => await submitAssertionToChallenge(nodeLicenseId, challengeNumber, challenge.assertionStateRootOrConfirmData, signer)); - logFunction(`Submitted assertion for Sentry Key ${nodeLicenseId} to challenge ${challengeNumber}. You have accrued esXAI.`); + logFunction(`[${new Date().toISOString()}] Submitted assertion for Sentry Key ${nodeLicenseId} to challenge ${challengeNumber}. You have accrued esXAI.`); nodeLicenseStatusMap.set(nodeLicenseId, { ...nodeLicenseStatusMap.get(nodeLicenseId) as NodeLicenseInformation, status: NodeLicenseStatus.WAITING_FOR_NEXT_CHALLENGE, }); safeStatusCallback(); } catch (err) { - logFunction(`Sentry Key ${nodeLicenseId} has submitted for challenge ${challengeNumber} by another node. If multiple nodes are running, this message can be ignored.`); + logFunction(`[${new Date().toISOString()}] Sentry Key ${nodeLicenseId} has submitted for challenge ${challengeNumber} by another node. If multiple nodes are running, this message can be ignored.`); } } @@ -175,7 +175,7 @@ export async function operatorRuntime( async function processClaimForChallenge(challengeNumber: bigint, nodeLicenseId: bigint) { - logFunction(`Checking KYC status of '${nodeLicenseStatusMap.get(nodeLicenseId)!.ownerPublicKey}' for Sentry Key '${nodeLicenseId}'.`); + logFunction(`[${new Date().toISOString()}] Checking KYC status of '${nodeLicenseStatusMap.get(nodeLicenseId)!.ownerPublicKey}' for Sentry Key '${nodeLicenseId}'.`); nodeLicenseStatusMap.set(nodeLicenseId, { ...nodeLicenseStatusMap.get(nodeLicenseId) as NodeLicenseInformation, status: `Checking KYC Status`, @@ -186,7 +186,7 @@ export async function operatorRuntime( const [{isKycApproved}] = await retry(async () => await checkKycStatus([nodeLicenseStatusMap.get(nodeLicenseId)!.ownerPublicKey])); if (isKycApproved) { - logFunction(`Requesting esXAI reward for challenge '${challengeNumber}'.`); + logFunction(`[${new Date().toISOString()}] Requesting esXAI reward for challenge '${challengeNumber}'.`); nodeLicenseStatusMap.set(nodeLicenseId, { ...nodeLicenseStatusMap.get(nodeLicenseId) as NodeLicenseInformation, status: `Requesting esXAI reward for challenge '${challengeNumber}'.'`, @@ -195,14 +195,14 @@ export async function operatorRuntime( await retry(async () => await claimReward(nodeLicenseId, challengeNumber, signer)); - logFunction(`esXAI claim was successful for Challenge '${challengeNumber}'.`); + logFunction(`[${new Date().toISOString()}] esXAI claim was successful for Challenge '${challengeNumber}'.`); nodeLicenseStatusMap.set(nodeLicenseId, { ...nodeLicenseStatusMap.get(nodeLicenseId) as NodeLicenseInformation, status: `esXAI claim was successful for Challenge '${challengeNumber}'`, }); safeStatusCallback(); } else { - logFunction(`Checked KYC status of '${nodeLicenseStatusMap.get(nodeLicenseId)!.ownerPublicKey}' for Sentry Key '${nodeLicenseId}'. It was not KYC'd and not able to claim the reward.`); + logFunction(`[${new Date().toISOString()}] Checked KYC status of '${nodeLicenseStatusMap.get(nodeLicenseId)!.ownerPublicKey}' for Sentry Key '${nodeLicenseId}'. It was not KYC'd and not able to claim the reward.`); nodeLicenseStatusMap.set(nodeLicenseId, { ...nodeLicenseStatusMap.get(nodeLicenseId) as NodeLicenseInformation, status: `Cannot Claim, Failed KYC`, @@ -216,13 +216,13 @@ export async function operatorRuntime( async function listenForChallengesCallback(challengeNumber: bigint, challenge: Challenge, event?: any) { if (challenge.openForSubmissions) { - logFunction(`Received new challenge with number: ${challengeNumber}.`); + logFunction(`[${new Date().toISOString()}] Received new challenge with number: ${challengeNumber}.`); if (!challengeNumberMap[challengeNumber.toString()]) { challengeNumberMap[challengeNumber.toString()] = true; await processNewChallenge(challengeNumber, challenge); } } else { - logFunction(`Looking for previously accrued rewards on Challenge '${challengeNumber}'.`); + logFunction(`[${new Date().toISOString()}] Looking for previously accrued rewards on Challenge '${challengeNumber}'.`); } // check the previous challenge, that should be closed now @@ -232,10 +232,10 @@ export async function operatorRuntime( } const closeChallengeListener = await listenForChallenges(listenForChallengesCallback); - logFunction(`Started listener for new challenges.`); + logFunction(`[${new Date().toISOString()}] Started listener for new challenges.`); // find any open challenges - logFunction(`Processing open challenges.`); + logFunction(`[${new Date().toISOString()}] Processing open challenges.`); const challenges = await listChallenges(false, listenForChallengesCallback); // create a function that checks all the submissions for a closed challenge @@ -247,7 +247,7 @@ export async function operatorRuntime( status: NodeLicenseStatus.QUERYING_FOR_UNCLAIMED_SUBMISSIONS, }); safeStatusCallback(); - logFunction(`Checking for unclaimed rewards on Sentry Key '${nodeLicenseId}'.`); + logFunction(`[${new Date().toISOString()}] Checking for unclaimed rewards on Sentry Key '${nodeLicenseId}'.`); await getSubmissionsForChallenges(challengeIds, nodeLicenseId, async (submission, index) => { @@ -258,7 +258,7 @@ export async function operatorRuntime( status: `Checking For Unclaimed Rewards on Challenge '${challengeId}'`, }); safeStatusCallback(); - logFunction(`Checking for unclaimed rewards on Challenge '${challengeId}'.`); + logFunction(`[${new Date().toISOString()}] Checking for unclaimed rewards on Challenge '${challengeId}'.`); // call the process claim and update statuses/logs accoridngly if (submission.submitted && !submission.claimed) { @@ -267,7 +267,7 @@ export async function operatorRuntime( status: `Found Unclaimed Reward for Challenge '${challengeId}'`, }); safeStatusCallback(); - logFunction(`Found unclaimed reward for challenge '${challengeId}'.`); + logFunction(`[${new Date().toISOString()}] Found unclaimed reward for challenge '${challengeId}'.`); await processClaimForChallenge(challengeId, nodeLicenseId); } }); @@ -283,15 +283,15 @@ export async function operatorRuntime( // iterate over all the challenges that are closed to see if any are available for claiming const closedChallengeIds = challenges.filter(([_, challenge]) => !challenge.openForSubmissions).map(([challengeNumber]) => challengeNumber); await processClosedChallenges(closedChallengeIds); - logFunction(`The operator has finished booting. The operator is running successfully. esXAI will accrue every few days.`); + logFunction(`[${new Date().toISOString()}] The operator has finished booting. The operator is running successfully. esXAI will accrue every few days.`); // Request the current block number immediately and then every 5 minutes const fetchBlockNumber = async () => { try { const blockNumber = await provider.getBlockNumber(); - logFunction(`[${new Date().toISOString()}] Health Check, Operator still healthy. Current block number: ${blockNumber}`); + logFunction(`[${new Date().toISOString()}] Health Check on JSON RPC, Operator still healthy. Current block number: ${blockNumber}`); } catch (error) { - logFunction(`[${new Date().toISOString()}] Error fetching block number, operator may no longer be connected to the RPC: ${JSON.stringify(error)}`); + logFunction(`[${new Date().toISOString()}] Error fetching block number, operator may no longer be connected to the JSON RPC: ${JSON.stringify(error)}.`); } }; fetchBlockNumber(); diff --git a/packages/core/src/utils/getProvider.ts b/packages/core/src/utils/getProvider.ts index 452d5e5cc..762fd7f40 100644 --- a/packages/core/src/utils/getProvider.ts +++ b/packages/core/src/utils/getProvider.ts @@ -1,4 +1,5 @@ import { Networkish, ethers } from 'ethers'; +import { config } from '../index.js'; // global storage of providers const providers: { [key: string]: ethers.JsonRpcProvider | ethers.WebSocketProvider | ethers.AlchemyProvider } = {}; @@ -11,11 +12,11 @@ const providers: { [key: string]: ethers.JsonRpcProvider | ethers.WebSocketProvi * @returns An ethers provider. */ export function getProvider( - rpcUrl: string | undefined = "wss://arb-mainnet.g.alchemy.com/v2/p_LSgTIj_JtEt3JPM7IZIZFL1a70yvQJ", + rpcUrl: string | undefined = config.arbitrumOneJsonRpcUrl, ignoreMemo: boolean = false, - alchemyNetwork: Networkish = {name: "arbitrum", chainId: 42161} + alchemyNetwork: Networkish = { name: "arbitrum", chainId: 42161 } ): ethers.JsonRpcProvider | ethers.WebSocketProvider | ethers.AlchemyProvider { - + const memoKey = rpcUrl != null ? rpcUrl : JSON.stringify(alchemyNetwork); if (!ignoreMemo && providers[memoKey]) { @@ -31,7 +32,7 @@ export function getProvider( provider = new ethers.WebSocketProvider(memoKey); } else { console.log("Provisioning alchemy provider."); - const apiKey = 'p_LSgTIj_JtEt3JPM7IZIZFL1a70yvQJ'; + const apiKey = 'p_LSgTIj_JtEt3JPM7IZIZFL1a70yvQJ'; provider = new ethers.AlchemyProvider(alchemyNetwork, apiKey); } diff --git a/packages/core/src/utils/index.ts b/packages/core/src/utils/index.ts index f87009094..d5d8aae46 100644 --- a/packages/core/src/utils/index.ts +++ b/packages/core/src/utils/index.ts @@ -8,4 +8,5 @@ export * from "./getClosestBlock.js"; export * from "./findEventTopic.js"; export * from "./getWalletBalance.js"; export * from "./verifyPrivateKey.js"; -export * from "./retry.js"; \ No newline at end of file +export * from "./retry.js"; +export * from "./resilientEventListener.js"; \ No newline at end of file diff --git a/packages/core/src/utils/resilientEventListener.ts b/packages/core/src/utils/resilientEventListener.ts new file mode 100644 index 000000000..23a06be42 --- /dev/null +++ b/packages/core/src/utils/resilientEventListener.ts @@ -0,0 +1,132 @@ +import WebSocket from 'isomorphic-ws'; +import {Contract, InterfaceAbi, LogDescription} from "ethers"; + +interface ResilientEventListenerArgs { + rpcUrl: string, + contractAddress: string, + abi: InterfaceAbi, + eventName: string, + log?: (value: string, ...values: string[]) => void; + callback?: (log: LogDescription | null) => void; +} + +const EXPECTED_PONG_BACK = 15000; +const KEEP_ALIVE_CHECK_INTERVAL = 60 * 1000; //7500; + +/** + * This function creates a resilient event listener for a given contract on an EVM-based network. + * It uses a WebSocket connection to the EVM node specified by the rpcUrl. + * The event listener is resilient in the sense that it will automatically reconnect in case of connection errors or closure. + * + * @param args - The arguments for the event listener. + * @param args.rpcUrl - The URL of the EVM node to connect to. + * @param args.contractAddress - The address of the contract to listen to. + * @param args.abi - The ABI of the contract. + * @param args.eventName - The name of the event to listen to. + * @param args.log - An optional logging function. If provided, it will be called with log messages. + * @param args.callback - An optional callback function. If provided, it will be called with the parsed log data whenever an event is received. + */ +export function resilientEventListener(args: ResilientEventListenerArgs) { + let ws: WebSocket | null = null; + + let pingTimeout: NodeJS.Timeout; + let keepAliveInterval: NodeJS.Timeout; + + const connect = () => { + ws = new WebSocket(args.rpcUrl); + + const contract = new Contract(args.contractAddress, args.abi); + const topicHash = contract.getEvent(args.eventName).getFragment().topicHash; + let subscriptionId: string; + + args.log && args.log(`[${new Date().toISOString()}] subscribing to event listener with topic hash: ${topicHash}`); + + const request = { + id: 1, + method: "eth_subscribe", + params: [ + "logs", + { + topics: [topicHash], + address: args.contractAddress, + } + ] + }; + + // sending this backs should return a result of true + const ping = { + id: 2, + method: "net_listening", + params:[], + }; + + ws.onerror = function error(err: any) { + args.log && args.log(`[${new Date().toISOString()}] WebSocket error: ${err}`); + }; + + ws.onclose = function close() { + args.log && args.log(`[${new Date().toISOString()}] WebSocket closed`); + if (keepAliveInterval) clearInterval(keepAliveInterval); + if (pingTimeout) clearTimeout(pingTimeout); + ws = null; + // Reconnect when the connection is closed + setTimeout(connect, 1000); + }; + + ws.onmessage = function message(event: any) { + let parsedData; + if (typeof event.data === 'string') { + parsedData = JSON.parse(event.data); + } else if (event.data instanceof ArrayBuffer) { + const dataString = new TextDecoder().decode(event.data); + parsedData = JSON.parse(dataString); + } + + if (parsedData?.id === request.id) { + subscriptionId = parsedData.result; + args.log && args.log(`[${new Date().toISOString()}] Subscription to event '${args.eventName}' established with subscription ID '${parsedData.result}'.`); + } else if(parsedData?.id === ping.id && parsedData?.result === true) { + args.log && args.log(`[${new Date().toISOString()}] Health check complete, subscription to '${args.eventName}' is still active.`) + if (pingTimeout) clearInterval(pingTimeout); + } else if (parsedData?.method === 'eth_subscription' && parsedData.params.subscription === subscriptionId) { + const log = parsedData.params.result; + const event = contract.interface.parseLog(log); + args.log && args.log(`[${new Date().toISOString()}] Received event ${event?.name}: ${event?.args}`); + args.callback && args.callback(event); + } + }; + + ws.onopen = function open() { + args.log && args.log(`[${new Date().toISOString()}] Opened connection to Web Socket RPC`) + ws!.send(JSON.stringify(request)); + + keepAliveInterval = setInterval(() => { + if (!ws) { + args.log && args.log(`[${new Date().toISOString()}] No websocket, exiting keep alive interval`); + return; + } + args.log && args.log(`[${new Date().toISOString()}] Performing health check on the Web Socket RPC, to maintain subscription to '${args.eventName}'.`); + + ws.send(JSON.stringify(ping)); + pingTimeout = setTimeout(() => { + if (ws) ws.terminate(); + }, EXPECTED_PONG_BACK); + + }, KEEP_ALIVE_CHECK_INTERVAL); + + }; + } + + const stop = () => { + if (ws) { + ws.close(); + ws = null; + } + if (keepAliveInterval) clearInterval(keepAliveInterval); + if (pingTimeout) clearTimeout(pingTimeout); + }; + + connect(); + + return { stop }; +} diff --git a/packages/core/src/utils/retry.ts b/packages/core/src/utils/retry.ts index 358c584cd..d2830951e 100644 --- a/packages/core/src/utils/retry.ts +++ b/packages/core/src/utils/retry.ts @@ -10,7 +10,7 @@ export async function retry(process: () => Promise, retries: number = 10): return await process(); } catch (error) { if (retries === 0) { - console.error(`There was an error retrying a mechanism ${retries} times. Please save this error for troubleshooting.`); + console.error(`There was an error retrying a mechanism 10 times. Please save this error for troubleshooting.`); throw error; } const delay = retries === 1 ? 300000 : Math.random() * (30000 - 5000) + 5000; // Delay for 5 to 30 seconds, but 5 minutes for the last retry diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5d8a7c377..fb9ed10ca 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -368,10 +368,16 @@ importers: alchemy-sdk: specifier: ^3.1.0 version: 3.1.0 + isomorphic-ws: + specifier: ^5.0.0 + version: 5.0.0(ws@8.15.1) devDependencies: '@types/node': specifier: ^20.6.0 version: 20.6.0 + '@types/ws': + specifier: ^8.5.10 + version: 8.5.10 packages/ui: dependencies: @@ -6384,6 +6390,11 @@ packages: '@types/node': 20.6.0 dev: false + /@types/ws@8.5.10: + resolution: {integrity: sha512-vmQSUcfalpIq0R9q7uTo2lXs6eGIpt9wtnLdMv9LVpIjCA/+ufZRozlVoVelIYixx1ugCBKDhn89vnsEGOCx9A==} + dependencies: + '@types/node': 20.6.0 + /@types/ws@8.5.6: resolution: {integrity: sha512-8B5EO9jLVCy+B58PLHvLDuOD8DRVMgQzq8d55SjLCOn9kqGyqOvy27exVaTio1q1nX5zLu8/6N0n2ThSxOM6tg==} dependencies: @@ -12869,6 +12880,14 @@ packages: ws: 8.13.0(bufferutil@4.0.8)(utf-8-validate@5.0.10) dev: false + /isomorphic-ws@5.0.0(ws@8.15.1): + resolution: {integrity: sha512-muId7Zzn9ywDsyXgTIafTry2sV3nySZeUDe6YedVd1Hvuuep5AsIlqK+XefWpYTyJG5e503F2xIuT2lcU6rCSw==} + peerDependencies: + ws: '*' + dependencies: + ws: 8.15.1 + dev: false + /isstream@0.1.2: resolution: {integrity: sha512-Yljz7ffyPbrLpLngrMtZ7NduUgVvi6wG9RJ9IUcyCd59YQ911PBJphODUcbOVbqYfxe1wuYf/LJ8PauMRwsM/g==} dev: true @@ -19107,7 +19126,7 @@ packages: '@types/serve-index': 1.9.3 '@types/serve-static': 1.15.3 '@types/sockjs': 0.3.35 - '@types/ws': 8.5.6 + '@types/ws': 8.5.10 ansi-html-community: 0.0.8 bonjour-service: 1.1.1 chokidar: 3.5.3 @@ -19431,6 +19450,19 @@ packages: utf-8-validate: 5.0.10 dev: false + /ws@8.15.1: + resolution: {integrity: sha512-W5OZiCjXEmk0yZ66ZN82beM5Sz7l7coYxpRkzS+p9PP+ToQry8szKh+61eNktr7EA9DOwvFGhfC605jDHbP6QQ==} + engines: {node: '>=10.0.0'} + peerDependencies: + bufferutil: ^4.0.1 + utf-8-validate: '>=5.0.2' + peerDependenciesMeta: + bufferutil: + optional: true + utf-8-validate: + optional: true + dev: false + /ws@8.5.0: resolution: {integrity: sha512-BWX0SWVgLPzYwF8lTzEy1egjhS4S4OEAHfsO8o65WOVsrnSRGaSiUaa9e0ggGlkMTtBlmOpEXiie9RUcBO86qg==} engines: {node: '>=10.0.0'}