Skip to content

Commit

Permalink
watcher: add nearArchiveWatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
panoel committed Oct 11, 2023
1 parent 594501b commit 10c26a9
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 22 deletions.
17 changes: 8 additions & 9 deletions watcher/scripts/backfillNear.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ import {
import { BlockResult, Provider } from 'near-api-js/lib/providers/provider';
import ora from 'ora';
import { initDb, makeBlockKey } from '../src/databases/utils';
import { getNearProvider, getTransactionsByAccountId, NEAR_ARCHIVE_RPC } from '../src/utils/near';
import {
getNearProvider,
getTimestampByBlock,
getTransactionsByAccountId,
NEAR_ARCHIVE_RPC,
} from '../src/utils/near';
import { getMessagesFromBlockResults } from '../src/watchers/NearWatcher';
import { Transaction } from '../src/types/near';
import { VaasByBlock } from '../src/databases/types';
Expand All @@ -23,11 +28,6 @@ import { VaasByBlock } from '../src/databases/types';

const BATCH_SIZE = 100;

async function getTimestampByBlock(provider: Provider, blockHeight: number): Promise<number> {
const block: BlockResult = await provider.block({ blockId: blockHeight });
return block.header.timestamp;
}

(async () => {
const db = initDb(false); // Don't start watching
const chain: ChainName = 'near';
Expand Down Expand Up @@ -58,16 +58,15 @@ async function getTimestampByBlock(provider: Provider, blockHeight: number): Pro
log.text = `Fetching blocks... ${i + 1}/${blockHashes.length}`;
let success: boolean = false;
while (!success) {
success = true;
try {
const block = await provider.block({ blockId: blockHashes[i] });
if (block.header.height > fromBlock && block.header.height <= toBlock.header.height) {
blocks.push(block);
}
success = true;
} catch (e) {
console.log('Error fetching block', e);
console.error('Error fetching block', e);
await sleep(5000);
success = false;
}
}
}
Expand Down
1 change: 1 addition & 0 deletions watcher/src/databases/BigtableDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export class BigtableDatabase extends Database {
this.firestoreDb = getFirestore();
this.pubsub = new PubSub();
} catch (e) {
this.logger.error(e);
throw new Error('Could not load bigtable db');
}
}
Expand Down
38 changes: 33 additions & 5 deletions watcher/src/utils/near.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import {
Transaction,
WormholePublishEventLog,
} from '../types/near';
import { BlockId, BlockResult } from 'near-api-js/lib/providers/provider';
import { sleep } from '@wormhole-foundation/wormhole-monitor-common';

// The following is obtained by going to: https://explorer.near.org/accounts/contract.wormhole_crypto.near
// and watching the network tab in the browser to see where the explorer is going.
Expand All @@ -22,6 +24,32 @@ export const getNearProvider = async (rpc: string): Promise<Provider> => {
return provider;
};

export async function getTimestampByBlock(
provider: Provider,
blockHeight: number
): Promise<number> {
const block: BlockResult = await fetchBlockByBlockId(provider, blockHeight);
return block.header.timestamp;
}

export async function fetchBlockByBlockId(
provider: Provider,
blockHeight: BlockId
): Promise<BlockResult> {
let success: boolean = false;
let block: BlockResult = {} as BlockResult;
while (!success) {
try {
block = await provider.block({ blockId: blockHeight });
success = true;
} catch (e) {
console.error('Error fetching block', e);
await sleep(5000);
}
}
return block;
}

// This function will only return transactions in the time window.
export const getTransactionsByAccountId = async (
accountId: string,
Expand All @@ -42,11 +70,11 @@ export const getTransactionsByAccountId = async (

while (!done) {
// using this api: https://github.com/near/near-explorer/blob/beead42ba2a91ad8d2ac3323c29b1148186eec98/backend/src/router/transaction/list.ts#L127
console.log(
`Near explorer URL: [${NEAR_EXPLORER_TRANSACTION_URL}?batch=1&input={"0":${JSON.stringify(
params
)}}]`
);
// console.log(
// `Near explorer URL: [${NEAR_EXPLORER_TRANSACTION_URL}?batch=1&input={"0":${JSON.stringify(
// params
// )}}]`
// );
const res = (
(
await axios.get(
Expand Down
146 changes: 146 additions & 0 deletions watcher/src/watchers/NearArchiveWatcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import { CONTRACTS } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { decode } from 'bs58';
import { Provider } from 'near-api-js/lib/providers';
import { BlockResult, ExecutionStatus } from 'near-api-js/lib/providers/provider';
import ora from 'ora';
import { z } from 'zod';
import { VaasByBlock } from '../databases/types';
import { makeBlockKey, makeVaaKey } from '../databases/utils';
import { EventLog, Transaction } from '../types/near';
import {
NEAR_ARCHIVE_RPC,
fetchBlockByBlockId,
getNearProvider,
getTimestampByBlock,
getTransactionsByAccountId,
isWormholePublishEventLog,
} from '../utils/near';
import { Watcher } from './Watcher';
import { sleep } from '@wormhole-foundation/wormhole-monitor-common';

export class NearArchiveWatcher extends Watcher {
provider: Provider | null = null;

constructor() {
super('near');
}

async getFinalizedBlockNumber(): Promise<number> {
this.logger.info(`fetching final block for ${this.chain}`);
const provider = await this.getProvider();
const block = await provider.block({ finality: 'final' });
return block.header.height;
}

async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
// assume toBlock was retrieved from getFinalizedBlockNumber and is finalized
this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`);
const provider = await this.getProvider();
const fromBlockTimestamp: number = await getTimestampByBlock(provider, fromBlock);
const toBlockInfo: BlockResult = await fetchBlockByBlockId(provider, toBlock);
const transactions: Transaction[] = await getTransactionsByAccountId(
CONTRACTS.MAINNET.near.core,
this.maximumBatchSize,
fromBlockTimestamp,
toBlockInfo.header.timestamp.toString().padEnd(19, '9') // pad to nanoseconds
);
this.logger.info(`Fetched ${transactions.length} transactions from NEAR Explorer`);

// filter out transactions that precede last seen block
const blocks: BlockResult[] = [];
const blockHashes = [...new Set(transactions.map((tx) => tx.blockHash))]; // de-dup blocks
for (let i = 0; i < blockHashes.length; i++) {
let success: boolean = false;
while (!success) {
try {
const block = await fetchBlockByBlockId(provider, blockHashes[i]);
if (block.header.height > fromBlock && block.header.height <= toBlockInfo.header.height) {
blocks.push(block);
}
success = true;
} catch (e) {
console.error('Error fetching block', e);
await sleep(5000);
}
}
}

this.logger.info(`Fetched ${blocks.length} blocks`);
const vaasByBlock: VaasByBlock = await getMessagesFromBlockResults(provider, blocks, true);
// Make a block for the to_block, if it isn't already there
const blockKey = makeBlockKey(
toBlockInfo.header.height.toString(),
new Date(toBlockInfo.header.timestamp / 1_000_000).toISOString()
);
if (!vaasByBlock[blockKey]) {
vaasByBlock[blockKey] = [];
}
return vaasByBlock;
}

async getProvider(): Promise<Provider> {
return (this.provider = this.provider || (await getNearProvider(NEAR_ARCHIVE_RPC)));
}

isValidVaaKey(key: string) {
try {
const [txHash, vaaKey] = key.split(':');
const txHashDecoded = Buffer.from(decode(txHash)).toString('hex');
const [_, emitter, sequence] = vaaKey.split('/');
return (
/^[0-9a-fA-F]{64}$/.test(z.string().parse(txHashDecoded)) &&
/^[0-9a-fA-F]{64}$/.test(z.string().parse(emitter)) &&
z.number().int().parse(Number(sequence)) >= 0
);
} catch (e) {
return false;
}
}
}

export const getMessagesFromBlockResults = async (
provider: Provider,
blocks: BlockResult[],
debug: boolean = false
): Promise<VaasByBlock> => {
const vaasByBlock: VaasByBlock = {};
let log: ora.Ora;
if (debug) log = ora(`Fetching messages from ${blocks.length} blocks...`).start();
for (let i = 0; i < blocks.length; i++) {
if (debug) log!.text = `Fetching messages from block ${i + 1}/${blocks.length}...`;
const { height, timestamp } = blocks[i].header;
const blockKey = makeBlockKey(height.toString(), new Date(timestamp / 1_000_000).toISOString());
vaasByBlock[blockKey] = [];

const chunks = [];
for (const chunk of blocks[i].chunks) {
chunks.push(await provider.chunk(chunk.chunk_hash));
}

const transactions = chunks.flatMap(({ transactions }) => transactions);
for (const tx of transactions) {
const outcome = await provider.txStatus(tx.hash, CONTRACTS.MAINNET.near.core);
const logs = outcome.receipts_outcome
.filter(
({ outcome }) =>
(outcome as any).executor_id === CONTRACTS.MAINNET.near.core &&
(outcome.status as ExecutionStatus).SuccessValue
)
.flatMap(({ outcome }) => outcome.logs)
.filter((log) => log.startsWith('EVENT_JSON:')) // https://nomicon.io/Standards/EventsFormat
.map((log) => JSON.parse(log.slice(11)) as EventLog)
.filter(isWormholePublishEventLog);
for (const log of logs) {
const vaaKey = makeVaaKey(tx.hash, 'near', log.emitter, log.seq.toString());
vaasByBlock[blockKey] = [...vaasByBlock[blockKey], vaaKey];
}
}
}

if (debug) {
const numMessages = Object.values(vaasByBlock).flat().length;
log!.succeed(`Fetched ${numMessages} messages from ${blocks.length} blocks`);
}

return vaasByBlock;
};
16 changes: 10 additions & 6 deletions watcher/src/watchers/__tests__/NearWatcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,24 @@ import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN } from '@wormhole-foundation/wormhole
import { RPCS_BY_CHAIN } from '../../consts';
import { getNearProvider, getTransactionsByAccountId, NEAR_ARCHIVE_RPC } from '../../utils/near';
import { getMessagesFromBlockResults, NearWatcher } from '../NearWatcher';
import { NearArchiveWatcher } from '../NearArchiveWatcher';

jest.setTimeout(60000);

const INITIAL_NEAR_BLOCK = Number(INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN.near ?? 0);

test('getFinalizedBlockNumber', async () => {
const watcher = new NearWatcher();
const watcher = new NearArchiveWatcher();
const blockNumber = await watcher.getFinalizedBlockNumber();
expect(blockNumber).toBeGreaterThan(INITIAL_NEAR_BLOCK);
});

// No more "too old" blocks
test('getMessagesForBlocks', async () => {
// requests that are too old for rpc node should error, be caught, and return an empty object
const watcher = new NearWatcher();
const watcher = new NearArchiveWatcher();
const messages = await watcher.getMessagesForBlocks(INITIAL_NEAR_BLOCK, INITIAL_NEAR_BLOCK);
expect(Object.keys(messages).length).toEqual(0);
expect(Object.keys(messages).length).toEqual(1);
});

describe('getNearProvider', () => {
Expand All @@ -42,6 +44,7 @@ test('getTransactionsByAccountId', async () => {
let transactions = await getTransactionsByAccountId(
CONTRACTS.MAINNET.near.core,
10,
1669731480649090392,
'1669732480649090392'
);
expect(transactions.length).toEqual(10);
Expand All @@ -51,6 +54,7 @@ test('getTransactionsByAccountId', async () => {
transactions = await getTransactionsByAccountId(
CONTRACTS.MAINNET.near.core,
15,
1661429814932000000,
'1661429914932000000'
);
expect(transactions.length).toEqual(2);
Expand All @@ -59,9 +63,9 @@ test('getTransactionsByAccountId', async () => {

describe('getMessagesFromBlockResults', () => {
test('with Provider', async () => {
const watcher = new NearWatcher();
const watcher = new NearArchiveWatcher();
const provider = await watcher.getProvider();
const messages = getMessagesFromBlockResults(provider, [
const messages = await getMessagesFromBlockResults(provider, [
await provider.block({ finality: 'final' }),
]);
expect(messages).toBeTruthy();
Expand All @@ -85,7 +89,7 @@ describe('getMessagesFromBlockResults', () => {
});

// validate keys
const watcher = new NearWatcher();
const watcher = new NearArchiveWatcher();
const blockKey = Object.keys(messages).at(-1)!;
expect(watcher.isValidBlockKey(blockKey)).toBe(true);
expect(watcher.isValidVaaKey(messages[blockKey][0])).toBe(true);
Expand Down
4 changes: 2 additions & 2 deletions watcher/src/watchers/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import { CosmwasmWatcher } from './CosmwasmWatcher';
import { EVMWatcher } from './EVMWatcher';
import { InjectiveExplorerWatcher } from './InjectiveExplorerWatcher';
import { MoonbeamWatcher } from './MoonbeamWatcher';
import { NearWatcher } from './NearWatcher';
import { PolygonWatcher } from './PolygonWatcher';
import { SolanaWatcher } from './SolanaWatcher';
import { TerraExplorerWatcher } from './TerraExplorerWatcher';
import { Watcher } from './Watcher';
import { SuiWatcher } from './SuiWatcher';
import { SeiExplorerWatcher } from './SeiExplorerWatcher';
import { WormchainWatcher } from './WormchainWatcher';
import { NearArchiveWatcher } from './NearArchiveWatcher';

export function makeFinalizedWatcher(chainName: ChainName): Watcher {
if (chainName === 'solana') {
Expand Down Expand Up @@ -44,7 +44,7 @@ export function makeFinalizedWatcher(chainName: ChainName): Watcher {
} else if (chainName === 'aptos') {
return new AptosWatcher();
} else if (chainName === 'near') {
return new NearWatcher();
return new NearArchiveWatcher();
} else if (chainName === 'injective') {
return new InjectiveExplorerWatcher();
} else if (chainName === 'sei') {
Expand Down

0 comments on commit 10c26a9

Please sign in to comment.