From 10c26a9623f7c65727506724c05fd65b57d60dbb Mon Sep 17 00:00:00 2001 From: Paul Noel Date: Thu, 5 Oct 2023 11:08:14 -0500 Subject: [PATCH] watcher: add nearArchiveWatcher --- watcher/scripts/backfillNear.ts | 17 +- watcher/src/databases/BigtableDatabase.ts | 1 + watcher/src/utils/near.ts | 38 ++++- watcher/src/watchers/NearArchiveWatcher.ts | 146 ++++++++++++++++++ .../watchers/__tests__/NearWatcher.test.ts | 16 +- watcher/src/watchers/utils.ts | 4 +- 6 files changed, 200 insertions(+), 22 deletions(-) create mode 100644 watcher/src/watchers/NearArchiveWatcher.ts diff --git a/watcher/scripts/backfillNear.ts b/watcher/scripts/backfillNear.ts index f4ad94d8..7802fb2b 100644 --- a/watcher/scripts/backfillNear.ts +++ b/watcher/scripts/backfillNear.ts @@ -8,7 +8,12 @@ import { import { BlockResult, Provider } from 'near-api-js/lib/providers/provider'; import ora from 'ora'; import { initDb, makeBlockKey } from '../src/databases/utils'; -import { getNearProvider, getTransactionsByAccountId, NEAR_ARCHIVE_RPC } from '../src/utils/near'; +import { + getNearProvider, + getTimestampByBlock, + getTransactionsByAccountId, + NEAR_ARCHIVE_RPC, +} from '../src/utils/near'; import { getMessagesFromBlockResults } from '../src/watchers/NearWatcher'; import { Transaction } from '../src/types/near'; import { VaasByBlock } from '../src/databases/types'; @@ -23,11 +28,6 @@ import { VaasByBlock } from '../src/databases/types'; const BATCH_SIZE = 100; -async function getTimestampByBlock(provider: Provider, blockHeight: number): Promise { - const block: BlockResult = await provider.block({ blockId: blockHeight }); - return block.header.timestamp; -} - (async () => { const db = initDb(false); // Don't start watching const chain: ChainName = 'near'; @@ -58,16 +58,15 @@ async function getTimestampByBlock(provider: Provider, blockHeight: number): Pro log.text = `Fetching blocks... ${i + 1}/${blockHashes.length}`; let success: boolean = false; while (!success) { - success = true; try { const block = await provider.block({ blockId: blockHashes[i] }); if (block.header.height > fromBlock && block.header.height <= toBlock.header.height) { blocks.push(block); } + success = true; } catch (e) { - console.log('Error fetching block', e); + console.error('Error fetching block', e); await sleep(5000); - success = false; } } } diff --git a/watcher/src/databases/BigtableDatabase.ts b/watcher/src/databases/BigtableDatabase.ts index 02c8e4cf..b6f0f85d 100644 --- a/watcher/src/databases/BigtableDatabase.ts +++ b/watcher/src/databases/BigtableDatabase.ts @@ -55,6 +55,7 @@ export class BigtableDatabase extends Database { this.firestoreDb = getFirestore(); this.pubsub = new PubSub(); } catch (e) { + this.logger.error(e); throw new Error('Could not load bigtable db'); } } diff --git a/watcher/src/utils/near.ts b/watcher/src/utils/near.ts index e6d4654f..a3008bda 100644 --- a/watcher/src/utils/near.ts +++ b/watcher/src/utils/near.ts @@ -9,6 +9,8 @@ import { Transaction, WormholePublishEventLog, } from '../types/near'; +import { BlockId, BlockResult } from 'near-api-js/lib/providers/provider'; +import { sleep } from '@wormhole-foundation/wormhole-monitor-common'; // The following is obtained by going to: https://explorer.near.org/accounts/contract.wormhole_crypto.near // and watching the network tab in the browser to see where the explorer is going. @@ -22,6 +24,32 @@ export const getNearProvider = async (rpc: string): Promise => { return provider; }; +export async function getTimestampByBlock( + provider: Provider, + blockHeight: number +): Promise { + const block: BlockResult = await fetchBlockByBlockId(provider, blockHeight); + return block.header.timestamp; +} + +export async function fetchBlockByBlockId( + provider: Provider, + blockHeight: BlockId +): Promise { + let success: boolean = false; + let block: BlockResult = {} as BlockResult; + while (!success) { + try { + block = await provider.block({ blockId: blockHeight }); + success = true; + } catch (e) { + console.error('Error fetching block', e); + await sleep(5000); + } + } + return block; +} + // This function will only return transactions in the time window. export const getTransactionsByAccountId = async ( accountId: string, @@ -42,11 +70,11 @@ export const getTransactionsByAccountId = async ( while (!done) { // using this api: https://github.com/near/near-explorer/blob/beead42ba2a91ad8d2ac3323c29b1148186eec98/backend/src/router/transaction/list.ts#L127 - console.log( - `Near explorer URL: [${NEAR_EXPLORER_TRANSACTION_URL}?batch=1&input={"0":${JSON.stringify( - params - )}}]` - ); + // console.log( + // `Near explorer URL: [${NEAR_EXPLORER_TRANSACTION_URL}?batch=1&input={"0":${JSON.stringify( + // params + // )}}]` + // ); const res = ( ( await axios.get( diff --git a/watcher/src/watchers/NearArchiveWatcher.ts b/watcher/src/watchers/NearArchiveWatcher.ts new file mode 100644 index 00000000..b966f81f --- /dev/null +++ b/watcher/src/watchers/NearArchiveWatcher.ts @@ -0,0 +1,146 @@ +import { CONTRACTS } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; +import { decode } from 'bs58'; +import { Provider } from 'near-api-js/lib/providers'; +import { BlockResult, ExecutionStatus } from 'near-api-js/lib/providers/provider'; +import ora from 'ora'; +import { z } from 'zod'; +import { VaasByBlock } from '../databases/types'; +import { makeBlockKey, makeVaaKey } from '../databases/utils'; +import { EventLog, Transaction } from '../types/near'; +import { + NEAR_ARCHIVE_RPC, + fetchBlockByBlockId, + getNearProvider, + getTimestampByBlock, + getTransactionsByAccountId, + isWormholePublishEventLog, +} from '../utils/near'; +import { Watcher } from './Watcher'; +import { sleep } from '@wormhole-foundation/wormhole-monitor-common'; + +export class NearArchiveWatcher extends Watcher { + provider: Provider | null = null; + + constructor() { + super('near'); + } + + async getFinalizedBlockNumber(): Promise { + this.logger.info(`fetching final block for ${this.chain}`); + const provider = await this.getProvider(); + const block = await provider.block({ finality: 'final' }); + return block.header.height; + } + + async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise { + // assume toBlock was retrieved from getFinalizedBlockNumber and is finalized + this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`); + const provider = await this.getProvider(); + const fromBlockTimestamp: number = await getTimestampByBlock(provider, fromBlock); + const toBlockInfo: BlockResult = await fetchBlockByBlockId(provider, toBlock); + const transactions: Transaction[] = await getTransactionsByAccountId( + CONTRACTS.MAINNET.near.core, + this.maximumBatchSize, + fromBlockTimestamp, + toBlockInfo.header.timestamp.toString().padEnd(19, '9') // pad to nanoseconds + ); + this.logger.info(`Fetched ${transactions.length} transactions from NEAR Explorer`); + + // filter out transactions that precede last seen block + const blocks: BlockResult[] = []; + const blockHashes = [...new Set(transactions.map((tx) => tx.blockHash))]; // de-dup blocks + for (let i = 0; i < blockHashes.length; i++) { + let success: boolean = false; + while (!success) { + try { + const block = await fetchBlockByBlockId(provider, blockHashes[i]); + if (block.header.height > fromBlock && block.header.height <= toBlockInfo.header.height) { + blocks.push(block); + } + success = true; + } catch (e) { + console.error('Error fetching block', e); + await sleep(5000); + } + } + } + + this.logger.info(`Fetched ${blocks.length} blocks`); + const vaasByBlock: VaasByBlock = await getMessagesFromBlockResults(provider, blocks, true); + // Make a block for the to_block, if it isn't already there + const blockKey = makeBlockKey( + toBlockInfo.header.height.toString(), + new Date(toBlockInfo.header.timestamp / 1_000_000).toISOString() + ); + if (!vaasByBlock[blockKey]) { + vaasByBlock[blockKey] = []; + } + return vaasByBlock; + } + + async getProvider(): Promise { + return (this.provider = this.provider || (await getNearProvider(NEAR_ARCHIVE_RPC))); + } + + isValidVaaKey(key: string) { + try { + const [txHash, vaaKey] = key.split(':'); + const txHashDecoded = Buffer.from(decode(txHash)).toString('hex'); + const [_, emitter, sequence] = vaaKey.split('/'); + return ( + /^[0-9a-fA-F]{64}$/.test(z.string().parse(txHashDecoded)) && + /^[0-9a-fA-F]{64}$/.test(z.string().parse(emitter)) && + z.number().int().parse(Number(sequence)) >= 0 + ); + } catch (e) { + return false; + } + } +} + +export const getMessagesFromBlockResults = async ( + provider: Provider, + blocks: BlockResult[], + debug: boolean = false +): Promise => { + const vaasByBlock: VaasByBlock = {}; + let log: ora.Ora; + if (debug) log = ora(`Fetching messages from ${blocks.length} blocks...`).start(); + for (let i = 0; i < blocks.length; i++) { + if (debug) log!.text = `Fetching messages from block ${i + 1}/${blocks.length}...`; + const { height, timestamp } = blocks[i].header; + const blockKey = makeBlockKey(height.toString(), new Date(timestamp / 1_000_000).toISOString()); + vaasByBlock[blockKey] = []; + + const chunks = []; + for (const chunk of blocks[i].chunks) { + chunks.push(await provider.chunk(chunk.chunk_hash)); + } + + const transactions = chunks.flatMap(({ transactions }) => transactions); + for (const tx of transactions) { + const outcome = await provider.txStatus(tx.hash, CONTRACTS.MAINNET.near.core); + const logs = outcome.receipts_outcome + .filter( + ({ outcome }) => + (outcome as any).executor_id === CONTRACTS.MAINNET.near.core && + (outcome.status as ExecutionStatus).SuccessValue + ) + .flatMap(({ outcome }) => outcome.logs) + .filter((log) => log.startsWith('EVENT_JSON:')) // https://nomicon.io/Standards/EventsFormat + .map((log) => JSON.parse(log.slice(11)) as EventLog) + .filter(isWormholePublishEventLog); + for (const log of logs) { + const vaaKey = makeVaaKey(tx.hash, 'near', log.emitter, log.seq.toString()); + vaasByBlock[blockKey] = [...vaasByBlock[blockKey], vaaKey]; + } + } + } + + if (debug) { + const numMessages = Object.values(vaasByBlock).flat().length; + log!.succeed(`Fetched ${numMessages} messages from ${blocks.length} blocks`); + } + + return vaasByBlock; +}; diff --git a/watcher/src/watchers/__tests__/NearWatcher.test.ts b/watcher/src/watchers/__tests__/NearWatcher.test.ts index 7c4ad41e..d1d4181a 100644 --- a/watcher/src/watchers/__tests__/NearWatcher.test.ts +++ b/watcher/src/watchers/__tests__/NearWatcher.test.ts @@ -4,22 +4,24 @@ import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN } from '@wormhole-foundation/wormhole import { RPCS_BY_CHAIN } from '../../consts'; import { getNearProvider, getTransactionsByAccountId, NEAR_ARCHIVE_RPC } from '../../utils/near'; import { getMessagesFromBlockResults, NearWatcher } from '../NearWatcher'; +import { NearArchiveWatcher } from '../NearArchiveWatcher'; jest.setTimeout(60000); const INITIAL_NEAR_BLOCK = Number(INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN.near ?? 0); test('getFinalizedBlockNumber', async () => { - const watcher = new NearWatcher(); + const watcher = new NearArchiveWatcher(); const blockNumber = await watcher.getFinalizedBlockNumber(); expect(blockNumber).toBeGreaterThan(INITIAL_NEAR_BLOCK); }); +// No more "too old" blocks test('getMessagesForBlocks', async () => { // requests that are too old for rpc node should error, be caught, and return an empty object - const watcher = new NearWatcher(); + const watcher = new NearArchiveWatcher(); const messages = await watcher.getMessagesForBlocks(INITIAL_NEAR_BLOCK, INITIAL_NEAR_BLOCK); - expect(Object.keys(messages).length).toEqual(0); + expect(Object.keys(messages).length).toEqual(1); }); describe('getNearProvider', () => { @@ -42,6 +44,7 @@ test('getTransactionsByAccountId', async () => { let transactions = await getTransactionsByAccountId( CONTRACTS.MAINNET.near.core, 10, + 1669731480649090392, '1669732480649090392' ); expect(transactions.length).toEqual(10); @@ -51,6 +54,7 @@ test('getTransactionsByAccountId', async () => { transactions = await getTransactionsByAccountId( CONTRACTS.MAINNET.near.core, 15, + 1661429814932000000, '1661429914932000000' ); expect(transactions.length).toEqual(2); @@ -59,9 +63,9 @@ test('getTransactionsByAccountId', async () => { describe('getMessagesFromBlockResults', () => { test('with Provider', async () => { - const watcher = new NearWatcher(); + const watcher = new NearArchiveWatcher(); const provider = await watcher.getProvider(); - const messages = getMessagesFromBlockResults(provider, [ + const messages = await getMessagesFromBlockResults(provider, [ await provider.block({ finality: 'final' }), ]); expect(messages).toBeTruthy(); @@ -85,7 +89,7 @@ describe('getMessagesFromBlockResults', () => { }); // validate keys - const watcher = new NearWatcher(); + const watcher = new NearArchiveWatcher(); const blockKey = Object.keys(messages).at(-1)!; expect(watcher.isValidBlockKey(blockKey)).toBe(true); expect(watcher.isValidVaaKey(messages[blockKey][0])).toBe(true); diff --git a/watcher/src/watchers/utils.ts b/watcher/src/watchers/utils.ts index 4d0a31b7..099d7338 100644 --- a/watcher/src/watchers/utils.ts +++ b/watcher/src/watchers/utils.ts @@ -7,7 +7,6 @@ import { CosmwasmWatcher } from './CosmwasmWatcher'; import { EVMWatcher } from './EVMWatcher'; import { InjectiveExplorerWatcher } from './InjectiveExplorerWatcher'; import { MoonbeamWatcher } from './MoonbeamWatcher'; -import { NearWatcher } from './NearWatcher'; import { PolygonWatcher } from './PolygonWatcher'; import { SolanaWatcher } from './SolanaWatcher'; import { TerraExplorerWatcher } from './TerraExplorerWatcher'; @@ -15,6 +14,7 @@ import { Watcher } from './Watcher'; import { SuiWatcher } from './SuiWatcher'; import { SeiExplorerWatcher } from './SeiExplorerWatcher'; import { WormchainWatcher } from './WormchainWatcher'; +import { NearArchiveWatcher } from './NearArchiveWatcher'; export function makeFinalizedWatcher(chainName: ChainName): Watcher { if (chainName === 'solana') { @@ -44,7 +44,7 @@ export function makeFinalizedWatcher(chainName: ChainName): Watcher { } else if (chainName === 'aptos') { return new AptosWatcher(); } else if (chainName === 'near') { - return new NearWatcher(); + return new NearArchiveWatcher(); } else if (chainName === 'injective') { return new InjectiveExplorerWatcher(); } else if (chainName === 'sei') {