Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

watcher: make near archive watcher work #390

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion watcher/scripts/backfillArbitrum.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { Chain, contracts } from '@wormhole-foundation/sdk-base';
const watcher = new ArbitrumWatcher('Mainnet');
for (const blockNumber of blockNumbers) {
log.text = `Fetching block ${blockNumber}`;
const vaasByBlock = await watcher.getMessagesForBlocks(blockNumber, blockNumber);
const { vaasByBlock } = await watcher.getMessagesForBlocks(blockNumber, blockNumber);
await db.storeVaasByBlock(chain, vaasByBlock);
}
log.succeed('Uploaded messages to db successfully');
Expand Down
4 changes: 3 additions & 1 deletion watcher/scripts/locateMessageGaps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ import { ChainId, Network, toChain, toChainId } from '@wormhole-foundation/sdk-b
while (fromBlock <= rangeEnd && !found) {
const toBlock = Math.min(fromBlock + watcher.maximumBatchSize - 1, rangeEnd);
const messages = await watcher.getMessagesForBlocks(fromBlock, toBlock);
for (const message of Object.entries(messages).filter(([key, value]) => value.length > 0)) {
for (const message of Object.entries(messages.vaasByBlock).filter(
([key, value]) => value.length > 0
)) {
const locatedMessages = message[1].filter((msgKey) => {
const [_transaction, vaaKey] = msgKey.split(':');
const [_chain, msgEmitter, msgSeq] = vaaKey.split('/');
Expand Down
2 changes: 1 addition & 1 deletion watcher/src/consts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Mode } from '@wormhole-foundation/wormhole-monitor-common';
import { AxiosRequestConfig } from 'axios';

export const TIMEOUT = 0.5 * 1000;
export const HB_INTERVAL = 5 * 60 * 1000; // 5 Minutes
export const HB_INTERVAL = 15 * 60 * 1000; // 15 Minutes
export type WorkerData = {
network: Network;
chain: Chain;
Expand Down
7 changes: 5 additions & 2 deletions watcher/src/watchers/AlgorandWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ export class AlgorandWatcher extends Watcher {
return messages;
}

async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
async getMessagesForBlocks(
fromBlock: number,
toBlock: number
): Promise<{ vaasByBlock: VaasByBlock; optionalBlockHeight?: number }> {
const txIds = await this.getApplicationLogTransactionIds(fromBlock, toBlock);
const transactions = [];
for (const txId of txIds) {
Expand All @@ -124,6 +127,6 @@ export class AlgorandWatcher extends Watcher {
if (!vaasByBlock[toBlockKey]) {
vaasByBlock[toBlockKey] = [];
}
return vaasByBlock;
return { vaasByBlock };
}
}
7 changes: 5 additions & 2 deletions watcher/src/watchers/AptosWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ export class AptosWatcher extends Watcher {
);
}

async getMessagesForBlocks(fromSequence: number, toSequence: number): Promise<VaasByBlock> {
async getMessagesForBlocks(
fromSequence: number,
toSequence: number
): Promise<{ vaasByBlock: VaasByBlock; optionalBlockHeight?: number }> {
const limit = toSequence - fromSequence + 1;
const events: AptosEvent[] = (await this.client.getEventsByEventHandle(
this.coreBridgeAddress,
Expand All @@ -63,7 +66,7 @@ export class AptosWatcher extends Watcher {
vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] ?? []), vaaKey];
})
);
return vaasByBlock;
return { vaasByBlock };
}

isValidBlockKey(key: string) {
Expand Down
7 changes: 5 additions & 2 deletions watcher/src/watchers/CosmwasmWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ export class CosmwasmWatcher extends Watcher {
throw new Error(`Unable to parse result of ${this.latestBlockTag} on ${this.rpc}`);
}

async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
async getMessagesForBlocks(
fromBlock: number,
toBlock: number
): Promise<{ vaasByBlock: VaasByBlock; optionalBlockHeight?: number }> {
const address = contracts.coreBridge.get(this.network, this.chain);
if (!address) {
throw new Error(`Core contract not defined for ${this.chain}`);
Expand Down Expand Up @@ -153,7 +156,7 @@ export class CosmwasmWatcher extends Watcher {
}
}
}
return vaasByBlock;
return { vaasByBlock };
}
}

Expand Down
12 changes: 10 additions & 2 deletions watcher/src/watchers/EVMWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export class EVMWatcher extends Watcher {
this.lastTimestamp = 0;
this.latestFinalizedBlockNumber = 0;
this.finalizedBlockTag = finalizedBlockTag;
// Special cases for batch size
if (chain === 'Acala' || chain === 'Karura' || chain === 'Berachain') {
this.maximumBatchSize = 50;
} else if (
Expand All @@ -55,6 +56,10 @@ export class EVMWatcher extends Watcher {
) {
this.maximumBatchSize = 10;
}
// Special cases for watch loop delay
if (chain === 'Berachain') {
this.watchLoopDelay = 1000;
}
}

async getBlock(blockNumberOrTag: number | BlockTag): Promise<Block> {
Expand Down Expand Up @@ -221,7 +226,10 @@ export class EVMWatcher extends Watcher {
return block.number;
}

async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
async getMessagesForBlocks(
fromBlock: number,
toBlock: number
): Promise<{ vaasByBlock: VaasByBlock; optionalBlockHeight?: number }> {
const address = contracts.coreBridge.get(this.network, this.chain);
if (!address) {
throw new Error(`Core contract not defined for ${this.chain} on ${this.network}!`);
Expand Down Expand Up @@ -252,6 +260,6 @@ export class EVMWatcher extends Watcher {
const blockKey = makeBlockKey(blockNumber.toString(), timestampsByBlock[blockNumber]);
vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey];
}
return vaasByBlock;
return { vaasByBlock };
}
}
7 changes: 5 additions & 2 deletions watcher/src/watchers/InjectiveExplorerWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ export class InjectiveExplorerWatcher extends Watcher {
// should be core, but the explorer doesn't support it yet
// use "to": as the pagination key
// compare block height ("block_number":) with what is passed in.
async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
async getMessagesForBlocks(
fromBlock: number,
toBlock: number
): Promise<{ vaasByBlock: VaasByBlock; optionalBlockHeight?: number }> {
const coreAddress = contracts.coreBridge.get(this.network, this.chain);
const address = contracts.tokenBridge.get(this.network, this.chain);
if (!coreAddress || !address) {
Expand Down Expand Up @@ -169,7 +172,7 @@ export class InjectiveExplorerWatcher extends Watcher {
);
vaasByBlock[blockKey] = [];
}
return vaasByBlock;
return { vaasByBlock };
}
}

Expand Down
120 changes: 107 additions & 13 deletions watcher/src/watchers/NearArchiveWatcher.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
import { decode } from 'bs58';
import { Provider } from 'near-api-js/lib/providers';
import { BlockResult } from 'near-api-js/lib/providers/provider';
import { BlockResult, ExecutionStatus } from 'near-api-js/lib/providers/provider';
import { z } from 'zod';
import { VaasByBlock } from '../databases/types';
import { makeBlockKey } from '../databases/utils';
import { makeBlockKey, makeVaaKey } from '../databases/utils';
import {
fetchBlockByBlockId,
getMessagesFromBlockResults,
getNearProvider,
getTimestampByBlock,
isWormholePublishEventLog,
} from '../utils/near';
import { Watcher } from './Watcher';
import { assertEnvironmentVariable, sleep } from '@wormhole-foundation/wormhole-monitor-common';
import { Network, contracts } from '@wormhole-foundation/sdk-base';
import axios from 'axios';
import { AXIOS_CONFIG_JSON } from '../consts';
import { AXIOS_CONFIG_JSON, HB_INTERVAL } from '../consts';
import { EventLog } from 'src/types/near';

export class NearArchiveWatcher extends Watcher {
provider: Provider | null = null;

constructor(network: Network) {
super(network, 'Near');
this.maximumBatchSize = 1000;
this.maximumBatchSize = 1_000_000;
this.watchLoopDelay = 60 * 60 * 1000; // 1 hour
}

async getFinalizedBlockNumber(): Promise<number> {
Expand All @@ -37,7 +39,11 @@ export class NearArchiveWatcher extends Watcher {
}
}

async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
async getMessagesForBlocks(
fromBlock: number,
toBlock: number
): Promise<{ vaasByBlock: VaasByBlock; optionalBlockHeight?: number }> {
const quittingTimestamp = Date.now() + HB_INTERVAL * 0.75;
const origFromBlock = fromBlock;
const origToBlock = toBlock;
this.logger.info(`fetching info for blocks ${origFromBlock} to ${origToBlock}`);
Expand Down Expand Up @@ -114,21 +120,26 @@ export class NearArchiveWatcher extends Watcher {
}

this.logger.info(`Fetched ${blocks.length} blocks`);
const vaasByBlock: VaasByBlock = await getMessagesFromBlockResults(
const response: ConstrainedResponse = await this.getMessagesFromBlockResultsConstrained(
this.network,
provider,
blocks,
true
quittingTimestamp
);
// This is the case where there are no transactions in the time window.
if (response.lastBlockHeight === 0) {
response.lastBlockHeight = toBlock;
}
const lastBlockInfo = await fetchBlockByBlockId(provider, response.lastBlockHeight);
// Make a block for the to_block, if it isn't already there
const blockKey = makeBlockKey(
toBlockInfo.header.height.toString(),
new Date(toBlockInfo.header.timestamp / 1_000_000).toISOString()
response.lastBlockHeight.toString(),
new Date(lastBlockInfo.header.timestamp / 1_000_000).toISOString()
);
if (!vaasByBlock[blockKey]) {
vaasByBlock[blockKey] = [];
if (!response.vaasByBlock[blockKey]) {
response.vaasByBlock[blockKey] = [];
}
return vaasByBlock;
return response;
}

async getProvider(): Promise<Provider> {
Expand Down Expand Up @@ -191,7 +202,90 @@ export class NearArchiveWatcher extends Watcher {
}
return txs.reverse();
}

async getMessagesFromBlockResultsConstrained(
network: Network,
provider: Provider,
blocks: BlockResult[],
quittingTime: number
): Promise<ConstrainedResponse> {
const vaasByBlock: VaasByBlock = {};
let lastBlockHeight = 0;
let prevLastBlockHeight = 0;
this.logger.debug(`Fetching messages from ${blocks.length} blocks...`);
try {
for (let i = 0; i < blocks.length; i++) {
this.logger.debug(`Fetching messages from block ${i + 1}/${blocks.length}...`);
const { height, timestamp } = blocks[i].header;
prevLastBlockHeight = lastBlockHeight;
lastBlockHeight = height;
const blockKey = makeBlockKey(
height.toString(),
new Date(timestamp / 1_000_000).toISOString()
);
let localVaasByBlock: VaasByBlock = {};
localVaasByBlock[blockKey] = [];

const chunks = [];
this.logger.debug('attempting to fetch chunks');
for (const chunk of blocks[i].chunks) {
chunks.push(await provider.chunk(chunk.chunk_hash));
}

const transactions = chunks.flatMap(({ transactions }) => transactions);
const coreBridge = contracts.coreBridge.get(network, 'Near');
if (!coreBridge) {
throw new Error('Unable to get contract address for Near');
}
this.logger.debug(`attempting to fetch ${transactions.length} transactions`);
const totTx = transactions.length;
let txCount = 1;
for (const tx of transactions) {
this.logger.debug(`fetching transaction ${txCount}/${totTx}`);
txCount++;
const outcome = await provider.txStatus(tx.hash, coreBridge);
const logs = outcome.receipts_outcome
.filter(
({ outcome }) =>
(outcome as any).executor_id === coreBridge &&
(outcome.status as ExecutionStatus).SuccessValue
)
.flatMap(({ outcome }) => outcome.logs)
.filter((log) => log.startsWith('EVENT_JSON:')) // https://nomicon.io/Standards/EventsFormat
.map((log) => JSON.parse(log.slice(11)) as EventLog)
.filter(isWormholePublishEventLog);
for (const log of logs) {
const vaaKey = makeVaaKey(tx.hash, 'Near', log.emitter, log.seq.toString());
localVaasByBlock[blockKey] = [...localVaasByBlock[blockKey], vaaKey];
}
}
this.logger.debug(
`Fetched ${localVaasByBlock[blockKey].length} messages from block ${blockKey}`
);
vaasByBlock[blockKey] = localVaasByBlock[blockKey];
if (Date.now() >= quittingTime) {
this.logger.warn(`Quitting early due to time constraint.`);
break;
}
}
} catch (e) {
this.logger.error(`Near block getMessagesFromBlockResultsConstrained error: ${e}`);
this.logger.warn(`Quitting early due to error.`);
lastBlockHeight = prevLastBlockHeight;
}

const numMessages = Object.values(vaasByBlock).flat().length;
this.logger.debug(`Fetched ${numMessages} messages from ${blocks.length} blocks`);

return { vaasByBlock, lastBlockHeight };
}
}

type ConstrainedResponse = {
vaasByBlock: VaasByBlock;
lastBlockHeight: number;
};

type GetTransactionsByAccountIdResponse = {
txns: NearTxn[];
};
Expand Down
9 changes: 7 additions & 2 deletions watcher/src/watchers/NearWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ export class NearWatcher extends Watcher {
return block.header.height;
}

async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
async getMessagesForBlocks(
fromBlock: number,
toBlock: number
): Promise<{ vaasByBlock: VaasByBlock; optionalBlockHeight?: number }> {
// assume toBlock was retrieved from getFinalizedBlockNumber and is finalized
this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`);
const provider = await this.getProvider();
Expand All @@ -48,7 +51,9 @@ export class NearWatcher extends Watcher {
}
}

return getMessagesFromBlockResults(this.network, provider, blocks);
return {
vaasByBlock: await getMessagesFromBlockResults(this.network, provider, blocks),
};
}

async getProvider(): Promise<Provider> {
Expand Down
7 changes: 5 additions & 2 deletions watcher/src/watchers/SeiExplorerWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ export class SeiExplorerWatcher extends CosmwasmWatcher {

// retrieve blocks for core contract
// compare block height with what is passed in
async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
async getMessagesForBlocks(
fromBlock: number,
toBlock: number
): Promise<{ vaasByBlock: VaasByBlock; optionalBlockHeight?: number }> {
const address = contracts.coreBridge.get(this.network, this.chain);
if (!address) {
throw new Error(`Core contract not defined for ${this.chain}`);
Expand Down Expand Up @@ -237,6 +240,6 @@ export class SeiExplorerWatcher extends CosmwasmWatcher {
}
// NOTE: this does not set an empty entry for the latest block since we don't know if the graphql response
// is synced with the block height. Therefore, the latest block will only update when a new transaction appears.
return vaasByBlock;
return { vaasByBlock };
}
}
7 changes: 5 additions & 2 deletions watcher/src/watchers/SolanaWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ export class SolanaWatcher extends Watcher {
return block;
}

async getMessagesForBlocks(fromSlot: number, toSlot: number): Promise<VaasByBlock> {
async getMessagesForBlocks(
fromSlot: number,
toSlot: number
): Promise<{ vaasByBlock: VaasByBlock; optionalBlockHeight?: number }> {
// in the rare case of maximumBatchSize skipped blocks in a row,
// you might hit this error due to the recursion below
if (fromSlot > toSlot) throw new Error('solana: invalid block range');
Expand Down Expand Up @@ -251,7 +254,7 @@ export class SolanaWatcher extends Watcher {
toSlot.toString(),
new Date(toBlock.blockTime! * 1000).toISOString()
);
return { [lastBlockKey]: [], ...vaasByBlock };
return { vaasByBlock: { [lastBlockKey]: [], ...vaasByBlock } };
}

isValidVaaKey(key: string) {
Expand Down
Loading