From e1951eccc1888a5d1d098bfb47bf36c7bf31d81c Mon Sep 17 00:00:00 2001 From: Paul Noel Date: Wed, 6 Mar 2024 11:37:45 -0600 Subject: [PATCH] watcher: add NTT watcher --- common/src/consts.ts | 23 + database/ntt-lifecycle-schema.sql | 18 + package-lock.json | 18 + watcher/package.json | 1 + watcher/scripts/backfill.ts | 2 +- watcher/scripts/backfillNear.ts | 2 +- watcher/src/NTTConsts.ts | 172 ++++ watcher/src/databases/BigtableDatabase.ts | 20 +- watcher/src/databases/Database.ts | 5 +- watcher/src/databases/__tests__/utils.test.ts | 6 +- watcher/src/databases/utils.ts | 18 +- watcher/src/index.ts | 17 +- watcher/src/watchers/NTTArbitrumWatcher.ts | 136 ++++ watcher/src/watchers/NTTPayloads.ts | 184 +++++ watcher/src/watchers/NTTWatcher.ts | 767 ++++++++++++++++++ watcher/src/watchers/Watcher.ts | 27 +- .../__tests__/CosmwasmWatcher.test.ts | 2 +- watcher/src/watchers/utils.ts | 21 + 18 files changed, 1413 insertions(+), 26 deletions(-) create mode 100644 database/ntt-lifecycle-schema.sql create mode 100644 watcher/src/NTTConsts.ts create mode 100644 watcher/src/watchers/NTTArbitrumWatcher.ts create mode 100644 watcher/src/watchers/NTTPayloads.ts create mode 100644 watcher/src/watchers/NTTWatcher.ts diff --git a/common/src/consts.ts b/common/src/consts.ts index f297f7c8..5117f226 100644 --- a/common/src/consts.ts +++ b/common/src/consts.ts @@ -101,6 +101,20 @@ export const INITIAL_DEPLOYMENT_BLOCK_BY_NETWORK_AND_CHAIN: { ['devnet']: {}, }; +export const INITIAL_NTT_DEPLOYMENT_BLOCK_BY_NETWORK_AND_CHAIN: { + [key in Environment]: { [key in ChainName]?: string }; +} = { + ['mainnet']: {}, + ['testnet']: { + solana: '284788472', + sepolia: '5472203', + arbitrum_sepolia: '22501243', + base_sepolia: '7249669', + optimism_sepolia: '9232548', + }, + ['devnet']: {}, +}; + export const TOKEN_BRIDGE_EMITTERS: { [key in ChainName]?: string } = { solana: 'ec7372995d5cc8732397fb0ad35c0121e0eaa90d26f828a534cab54391b3a4f5', ethereum: '0000000000000000000000003ee18b2214aff97000d974cf647e7c347e8fa585', @@ -177,6 +191,15 @@ export const CIRCLE_DOMAIN_TO_CHAIN_ID: { [key: number]: ChainId } = { 6: CHAIN_ID_BASE, 7: CHAIN_ID_POLYGON, }; + +// TODO: This should be needed by processVaa.ts, if we go down that path +export const NTT_EMITTERS: { [key in ChainName]?: string } = { + // TODO: add NTT emitters +}; + +export const isNTTEmitter = (chain: ChainId | ChainName, emitter: string) => + NTT_EMITTERS[coalesceChainName(chain)]?.toLowerCase() === emitter.toLowerCase(); + export type CHAIN_INFO = { name: string; evm: boolean; diff --git a/database/ntt-lifecycle-schema.sql b/database/ntt-lifecycle-schema.sql new file mode 100644 index 00000000..02ec715e --- /dev/null +++ b/database/ntt-lifecycle-schema.sql @@ -0,0 +1,18 @@ + +CREATE TABLE life_cycle ( + from_chain INTEGER, + to_chain INTEGER, + from_token VARCHAR(96), + token_amount DECIMAL(78, 0), + transfer_sent_txhash VARCHAR(96), + redeemed_txhash VARCHAR(96), + ntt_transfer_key VARCHAR(256), + vaa_id VARCHAR(128), + digest VARCHAR(96) NOT NULL, + transfer_time TIMESTAMP, + redeem_time TIMESTAMP, + inbound_transfer_queued_time TIMESTAMP, + outbound_transfer_queued_time TIMESTAMP, + outbound_transfer_rate_limited_time TIMESTAMP, + PRIMARY KEY (digest) +); diff --git a/package-lock.json b/package-lock.json index 86021fdc..d6cebe36 100644 --- a/package-lock.json +++ b/package-lock.json @@ -26423,6 +26423,7 @@ "@google-cloud/pubsub": "^3.4.1", "@mysten/sui.js": "^0.33.0", "@solana/web3.js": "^1.73.0", + "@types/bn.js": "^5.1.5", "@wormhole-foundation/wormhole-monitor-common": "^0.0.1", "algosdk": "^2.4.0", "aptos": "^1.4.0", @@ -26885,6 +26886,14 @@ "@sinonjs/commons": "^1.7.0" } }, + "watcher/node_modules/@types/bn.js": { + "version": "5.1.5", + "resolved": "https://registry.npmjs.org/@types/bn.js/-/bn.js-5.1.5.tgz", + "integrity": "sha512-V46N0zwKRF5Q00AZ6hWtN0T8gGmDUaUzLWQvHFo5yThtVwK/VCenFY3wXVbOvNfajEpsTfQM4IN9k/d6gUVX3A==", + "dependencies": { + "@types/node": "*" + } + }, "watcher/node_modules/@types/node": { "version": "12.20.55", "resolved": "https://registry.npmjs.org/@types/node/-/node-12.20.55.tgz", @@ -33416,6 +33425,7 @@ "@jest/globals": "^29.3.1", "@mysten/sui.js": "^0.33.0", "@solana/web3.js": "^1.73.0", + "@types/bn.js": "^5.1.5", "@wormhole-foundation/wormhole-monitor-common": "^0.0.1", "algosdk": "^2.4.0", "aptos": "^1.4.0", @@ -33754,6 +33764,14 @@ "@sinonjs/commons": "^1.7.0" } }, + "@types/bn.js": { + "version": "5.1.5", + "resolved": "https://registry.npmjs.org/@types/bn.js/-/bn.js-5.1.5.tgz", + "integrity": "sha512-V46N0zwKRF5Q00AZ6hWtN0T8gGmDUaUzLWQvHFo5yThtVwK/VCenFY3wXVbOvNfajEpsTfQM4IN9k/d6gUVX3A==", + "requires": { + "@types/node": "*" + } + }, "@types/node": { "version": "12.20.55", "resolved": "https://registry.npmjs.org/@types/node/-/node-12.20.55.tgz", diff --git a/watcher/package.json b/watcher/package.json index 7d6857e4..184ec929 100644 --- a/watcher/package.json +++ b/watcher/package.json @@ -32,6 +32,7 @@ "@google-cloud/pubsub": "^3.4.1", "@mysten/sui.js": "^0.33.0", "@solana/web3.js": "^1.73.0", + "@types/bn.js": "^5.1.5", "@wormhole-foundation/wormhole-monitor-common": "^0.0.1", "algosdk": "^2.4.0", "aptos": "^1.4.0", diff --git a/watcher/scripts/backfill.ts b/watcher/scripts/backfill.ts index c9de7a35..a52c8db0 100644 --- a/watcher/scripts/backfill.ts +++ b/watcher/scripts/backfill.ts @@ -33,7 +33,7 @@ import { VaasByBlock } from '../src/databases/types'; const lastBlockEntries = Object.entries(localDb.lastBlockByChain); for (const [chain, blockKey] of lastBlockEntries) { console.log('backfilling last block for', chain, blockKey); - await remoteDb.storeLatestBlock(coalesceChainName(Number(chain) as ChainId), blockKey); + await remoteDb.storeLatestBlock(coalesceChainName(Number(chain) as ChainId), blockKey, false); await sleep(500); } })(); diff --git a/watcher/scripts/backfillNear.ts b/watcher/scripts/backfillNear.ts index c526a1ab..8c9718ef 100644 --- a/watcher/scripts/backfillNear.ts +++ b/watcher/scripts/backfillNear.ts @@ -36,7 +36,7 @@ const BATCH_SIZE = 100; const chain: ChainName = 'near'; const provider = await getNearProvider(network, NEAR_ARCHIVE_RPC); const fromBlock = Number( - (await db.getLastBlockByChain(chain)) ?? + (await db.getLastBlockByChain(chain, false)) ?? INITIAL_DEPLOYMENT_BLOCK_BY_NETWORK_AND_CHAIN[network][chain] ?? 0 ); diff --git a/watcher/src/NTTConsts.ts b/watcher/src/NTTConsts.ts new file mode 100644 index 00000000..90ea8f37 --- /dev/null +++ b/watcher/src/NTTConsts.ts @@ -0,0 +1,172 @@ +import { ChainName, keccak256 } from '@certusone/wormhole-sdk'; +import { Environment } from '@wormhole-foundation/wormhole-monitor-common'; +import { NativeTokenTransfer, NttManagerMessage } from './watchers/NTTPayloads'; + +// +// The following are from IRateLimiterEvents.sol +// + +/// @notice Emitted when an inbound transfer is queued +/// @dev Topic0 +/// 0x7f63c9251d82a933210c2b6d0b0f116252c3c116788120e64e8e8215df6f3162. +/// @param digest The digest of the message. +/// event InboundTransferQueued(bytes32 digest); +export const InboundTransferQueuedTopic = + '0x7f63c9251d82a933210c2b6d0b0f116252c3c116788120e64e8e8215df6f3162'; + +/// @notice Emitted when an outbound transfer is queued. +/// @dev Topic0 +/// 0x69add1952a6a6b9cb86f04d05f0cb605cbb469a50ae916139d34495a9991481f. +/// @param queueSequence The location of the transfer in the queue. +/// event OutboundTransferQueued(uint64 queueSequence); +export const OutboundTransferQueuedTopic = + '0x69add1952a6a6b9cb86f04d05f0cb605cbb469a50ae916139d34495a9991481f'; + +/// @notice Emitted when an outbound transfer is rate limited. +/// @dev Topic0 +/// 0x754d657d1363ee47d967b415652b739bfe96d5729ccf2f26625dcdbc147db68b. +/// @param sender The initial sender of the transfer. +/// @param amount The amount to be transferred. +/// @param currentCapacity The capacity left for transfers within the 24-hour window.:w +/// event OutboundTransferRateLimited( address indexed sender, uint64 sequence, uint256 amount, uint256 currentCapacity); +export const OutboundTransferRateLimitedTopic = + '0x754d657d1363ee47d967b415652b739bfe96d5729ccf2f26625dcdbc147db68b'; + +// +// The following are from INttManagerEvents.sol +// + +/// @notice Emitted when a message is sent from the nttManager. +/// @dev Topic0 +/// 0x9716fe52fe4e02cf924ae28f19f5748ef59877c6496041b986fbad3dae6a8ecf +/// @param recipient The recipient of the message. +/// @param amount The amount transferred. +/// @param fee The amount of ether sent along with the tx to cover the delivery fee. +/// @param recipientChain The chain ID of the recipient. +/// @param msgSequence The unique sequence ID of the message. +/// event TransferSent( bytes32 recipient, uint256 amount, uint256 fee, uint16 recipientChain, uint64 msgSequence); +export const TransferSentTopic = + '0x9716fe52fe4e02cf924ae28f19f5748ef59877c6496041b986fbad3dae6a8ecf'; + +/// @notice Emitted when the peer contract is updated. +/// @dev Topic0 +/// 0x51b8437a7e22240c473f4cbdb4ed3a4f4bf5a9e7b3c511d7cfe0197325735700. +/// @param chainId_ The chain ID of the peer contract. +/// @param oldPeerContract The old peer contract address. +/// @param peerContract The new peer contract address. +/// event PeerUpdated(uint16 indexed chainId_, bytes32 oldPeerContract, bytes32 peerContract); +export const PeerUpdatedTopic = + '0x51b8437a7e22240c473f4cbdb4ed3a4f4bf5a9e7b3c511d7cfe0197325735700'; + +/// @notice Emitted when a message has been attested to. +/// @dev Topic0 +/// 0x35a2101eaac94b493e0dfca061f9a7f087913fde8678e7cde0aca9897edba0e5. +/// @param digest The digest of the message. +/// @param transceiver The address of the transceiver. +/// @param index The index of the transceiver in the bitmap. +/// event MessageAttestedTo(bytes32 digest, address transceiver, uint8 index); +export const MessageAttestedToTopic = + '0x35a2101eaac94b493e0dfca061f9a7f087913fde8678e7cde0aca9897edba0e5'; + +/// @notice Emmitted when the threshold required transceivers is changed. +/// @dev Topic0 +/// 0x2a855b929b9a53c6fb5b5ed248b27e502b709c088e036a5aa17620c8fc5085a9. +/// @param oldThreshold The old threshold. +/// @param threshold The new threshold. +/// event ThresholdChanged(uint8 oldThreshold, uint8 threshold); +export const ThresholdChangedTopic = + '0x2a855b929b9a53c6fb5b5ed248b27e502b709c088e036a5aa17620c8fc5085a9'; + +/// @notice Emitted when an transceiver is removed from the nttManager. +/// @dev Topic0 +/// 0xc6289e62021fd0421276d06677862d6b328d9764cdd4490ca5ac78b173f25883. +/// @param transceiver The address of the transceiver. +/// @param transceiversNum The current number of transceivers. +/// @param threshold The current threshold of transceivers. +/// event TransceiverAdded(address transceiver, uint256 transceiversNum, uint8 threshold); +export const TransceiverAddedTopic = + '0xc6289e62021fd0421276d06677862d6b328d9764cdd4490ca5ac78b173f25883'; + +/// @notice Emitted when an transceiver is removed from the nttManager. +/// @dev Topic0 +/// 0x638e631f34d9501a3ff0295873b29f50d0207b5400bf0e48b9b34719e6b1a39e. +/// @param transceiver The address of the transceiver. +/// @param threshold The current threshold of transceivers. +/// event TransceiverRemoved(address transceiver, uint8 threshold); +export const TransceiverRemovedTopic = + '0x638e631f34d9501a3ff0295873b29f50d0207b5400bf0e48b9b34719e6b1a39e'; + +/// @notice Emitted when a message has already been executed to +/// notify client of against retries. +/// @dev Topic0 +/// 0x4069dff8c9df7e38d2867c0910bd96fd61787695e5380281148c04932d02bef2. +/// @param sourceNttManager The address of the source nttManager. +/// @param msgHash The keccak-256 hash of the message. +/// event MessageAlreadyExecuted(bytes32 indexed sourceNttManager, bytes32 indexed msgHash); +export const MessageAlreadyExecutedTopic = + '0x4069dff8c9df7e38d2867c0910bd96fd61787695e5380281148c04932d02bef2'; + +/// @notice Emitted when a transfer has been redeemed +/// (either minted or unlocked on the recipient chain). +/// @dev Topic0 +/// 0x504e6efe18ab9eed10dc6501a417f5b12a2f7f2b1593aed9b89f9bce3cf29a91. +/// @param Topic1 +/// digest The digest of the message. +/// event TransferRedeemed(bytes32 indexed digest); +export const TransferRedeemedTopic = + '0x504e6efe18ab9eed10dc6501a417f5b12a2f7f2b1593aed9b89f9bce3cf29a91'; + +// All topics: +export const NTT_TOPICS = [ + InboundTransferQueuedTopic, + OutboundTransferQueuedTopic, + OutboundTransferRateLimitedTopic, + TransferSentTopic, + PeerUpdatedTopic, + MessageAttestedToTopic, + ThresholdChangedTopic, + TransceiverAddedTopic, + TransceiverRemovedTopic, + MessageAlreadyExecutedTopic, + TransferRedeemedTopic, +]; + +export const NTT_CONTRACT: { [key in Environment]: { [key in ChainName]?: string } } = { + ['mainnet']: {}, + ['testnet']: { + solana: 'nTTh3bZ5Aer6xboWZe39RDEft4MeVxSQ8D1EYAVLZw9', + sepolia: '0xB231aD95f2301bc82eA44c515001F0F746D637e0', + arbitrum_sepolia: '0xEec94CD3083e067398256a79CcA7e740C5c8ef81', + base_sepolia: '0xB03b030b2f5B40819Df76467d67eD1C85Ff66fAD', + optimism_sepolia: '0x7f430D4e7939D994C0955A01FC75D9DE33F12D11', + }, + ['devnet']: {}, +}; + +export const getNttManagerMessageDigest = ( + emitterChain: number, + message: NttManagerMessage +): string => { + const chainIdBuffer = Buffer.alloc(2); + chainIdBuffer.writeUInt16BE(emitterChain); + const serialized = NttManagerMessage.serialize(message, NativeTokenTransfer.serialize); + const digest = keccak256(Buffer.concat([chainIdBuffer, serialized])); + return digest.toString('hex'); +}; + +export type LifeCycle = { + srcChainId: number; + destChainId: number; + sourceToken: string; + tokenAmount: bigint; + transferSentTxhash: string; + redeemedTxhash: string; + nttTransferKey: string; + vaaId: string; + digest: string; + transferTime: string; + redeemTime: string; + inboundTransferQueuedTime: string; + outboundTransferQueuedTime: string; + outboundTransferRateLimitedTime: string; +}; diff --git a/watcher/src/databases/BigtableDatabase.ts b/watcher/src/databases/BigtableDatabase.ts index b6f0f85d..c8e9a655 100644 --- a/watcher/src/databases/BigtableDatabase.ts +++ b/watcher/src/databases/BigtableDatabase.ts @@ -36,6 +36,7 @@ export class BigtableDatabase extends Database { bigtable: Bigtable; firestoreDb: FirebaseFirestore.Firestore; latestCollectionName: string; + latestNTTCollectionName: string; pubsubSignedVAATopic: string; pubsub: PubSub; constructor() { @@ -45,6 +46,7 @@ export class BigtableDatabase extends Database { this.vaasByTxHashTableId = assertEnvironmentVariable('BIGTABLE_VAAS_BY_TX_HASH_TABLE_ID'); this.instanceId = assertEnvironmentVariable('BIGTABLE_INSTANCE_ID'); this.latestCollectionName = assertEnvironmentVariable('FIRESTORE_LATEST_COLLECTION'); + this.latestNTTCollectionName = assertEnvironmentVariable('FIRESTORE_LATEST_NTT_COLLECTION'); this.pubsubSignedVAATopic = assertEnvironmentVariable('PUBSUB_SIGNED_VAA_TOPIC'); try { this.bigtable = new Bigtable(); @@ -60,11 +62,11 @@ export class BigtableDatabase extends Database { } } - async getLastBlockByChain(chain: ChainName): Promise { + async getLastBlockByChain(chain: ChainName, isNTT: boolean): Promise { const chainId = coalesceChainId(chain); - const lastObservedBlock = this.firestoreDb - .collection(this.latestCollectionName) - .doc(chainId.toString()); + const lastObservedBlock = isNTT + ? this.firestoreDb.collection(this.latestNTTCollectionName).doc(chainId.toString()) + : this.firestoreDb.collection(this.latestCollectionName).doc(chainId.toString()); const lastObservedBlockByChain = await lastObservedBlock.get(); const blockKeyData = lastObservedBlockByChain.data(); const lastBlockKey = blockKeyData?.lastBlockKey; @@ -76,16 +78,16 @@ export class BigtableDatabase extends Database { return null; } - async storeLatestBlock(chain: ChainName, lastBlockKey: string): Promise { + async storeLatestBlock(chain: ChainName, lastBlockKey: string, isNTT: boolean): Promise { if (this.firestoreDb === undefined) { this.logger.error('no firestore db set'); return; } const chainId = coalesceChainId(chain); this.logger.info(`storing last block=${lastBlockKey} for chain=${chainId}`); - const lastObservedBlock = this.firestoreDb - .collection(this.latestCollectionName) - .doc(`${chainId.toString()}`); + const lastObservedBlock = isNTT + ? this.firestoreDb.collection(this.latestNTTCollectionName).doc(`${chainId.toString()}`) + : this.firestoreDb.collection(this.latestCollectionName).doc(`${chainId.toString()}`); await lastObservedBlock.set({ lastBlockKey }); } @@ -156,7 +158,7 @@ export class BigtableDatabase extends Database { if (blockKeys.length) { const lastBlockKey = blockKeys[blockKeys.length - 1]; this.logger.info(`for chain=${chain}, storing last bigtable block=${lastBlockKey}`); - await this.storeLatestBlock(chain, lastBlockKey); + await this.storeLatestBlock(chain, lastBlockKey, false); } } } diff --git a/watcher/src/databases/Database.ts b/watcher/src/databases/Database.ts index f0401ff2..72379f4e 100644 --- a/watcher/src/databases/Database.ts +++ b/watcher/src/databases/Database.ts @@ -14,10 +14,13 @@ export class Database { } return filteredVaasByBlock; } - async getLastBlockByChain(chain: ChainName): Promise { + async getLastBlockByChain(chain: ChainName, isNTT: boolean): Promise { throw new Error('Not Implemented'); } async storeVaasByBlock(chain: ChainName, vaasByBlock: VaasByBlock): Promise { throw new Error('Not Implemented'); } + async storeLatestBlock(chain: ChainName, lastBlockKey: string, isNTT: boolean): Promise { + throw new Error('Not Implemented'); + } } diff --git a/watcher/src/databases/__tests__/utils.test.ts b/watcher/src/databases/__tests__/utils.test.ts index 766e9e8b..c37c25e1 100644 --- a/watcher/src/databases/__tests__/utils.test.ts +++ b/watcher/src/databases/__tests__/utils.test.ts @@ -11,13 +11,13 @@ test('getResumeBlockByChain', async () => { db.lastBlockByChain = { [CHAIN_ID_SOLANA]: blockKey }; // if a chain is in the database, that number should be returned expect(await db.getLastBlockByChain('solana')).toEqual(fauxBlock); - expect(await getResumeBlockByChain('mainnet', 'solana')).toEqual(Number(fauxBlock) + 1); + expect(await getResumeBlockByChain('mainnet', 'solana', false)).toEqual(Number(fauxBlock) + 1); // if a chain is not in the database, the initial deployment block should be returned expect(INITIAL_DEPLOYMENT_BLOCK_BY_NETWORK_AND_CHAIN['mainnet'].moonbeam).toBeDefined(); - expect(await getResumeBlockByChain('mainnet', 'moonbeam')).toEqual( + expect(await getResumeBlockByChain('mainnet', 'moonbeam', false)).toEqual( Number(INITIAL_DEPLOYMENT_BLOCK_BY_NETWORK_AND_CHAIN['mainnet'].moonbeam) ); // if neither, null should be returned expect(INITIAL_DEPLOYMENT_BLOCK_BY_NETWORK_AND_CHAIN['mainnet'].unset).toBeUndefined(); - expect(await getResumeBlockByChain('mainnet', 'unset')).toEqual(null); + expect(await getResumeBlockByChain('mainnet', 'unset', false)).toEqual(null); }); diff --git a/watcher/src/databases/utils.ts b/watcher/src/databases/utils.ts index 6f3c8d95..2dfc66f8 100644 --- a/watcher/src/databases/utils.ts +++ b/watcher/src/databases/utils.ts @@ -2,6 +2,7 @@ import { ChainId, ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib import { Environment, INITIAL_DEPLOYMENT_BLOCK_BY_NETWORK_AND_CHAIN, + INITIAL_NTT_DEPLOYMENT_BLOCK_BY_NETWORK_AND_CHAIN, MAX_UINT_64, padUint16, padUint64, @@ -74,12 +75,23 @@ export const initDb = (startWatching: boolean = true): Database => { return database; }; +export const storeLatestBlock = async ( + chain: ChainName, + lastBlockKey: string, + isNTT: boolean +): Promise => { + return database.storeLatestBlock(chain, lastBlockKey, isNTT); +}; + export const getResumeBlockByChain = async ( network: Environment, - chain: ChainName + chain: ChainName, + isNTT: boolean ): Promise => { - const lastBlock = await database.getLastBlockByChain(chain); - const initialBlock = INITIAL_DEPLOYMENT_BLOCK_BY_NETWORK_AND_CHAIN[network][chain]; + const lastBlock = await database.getLastBlockByChain(chain, isNTT); + const initialBlock = isNTT + ? INITIAL_NTT_DEPLOYMENT_BLOCK_BY_NETWORK_AND_CHAIN[network][chain] + : INITIAL_DEPLOYMENT_BLOCK_BY_NETWORK_AND_CHAIN[network][chain]; return lastBlock !== null ? Number(lastBlock) + 1 : initialBlock !== undefined diff --git a/watcher/src/index.ts b/watcher/src/index.ts index 439b4d95..81873ece 100644 --- a/watcher/src/index.ts +++ b/watcher/src/index.ts @@ -3,7 +3,7 @@ dotenv.config(); import { ChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; import { initDb } from './databases/utils'; -import { makeFinalizedWatcher } from './watchers/utils'; +import { makeFinalizedNTTWatcher, makeFinalizedWatcher } from './watchers/utils'; import { Environment, getEnvironment } from '@wormhole-foundation/wormhole-monitor-common'; initDb(); @@ -81,3 +81,18 @@ const supportedChains: ChainName[] = for (const chain of supportedChains) { makeFinalizedWatcher(network, chain).watch(); } + +const supportedNTTChains: ChainName[] = + network === 'testnet' + ? [ + //'solana', + 'sepolia', + 'arbitrum_sepolia', + 'base_sepolia', + 'optimism_sepolia', + ] + : []; + +for (const chain of supportedNTTChains) { + makeFinalizedNTTWatcher(network, chain).watch(); +} diff --git a/watcher/src/watchers/NTTArbitrumWatcher.ts b/watcher/src/watchers/NTTArbitrumWatcher.ts new file mode 100644 index 00000000..66fcbf5f --- /dev/null +++ b/watcher/src/watchers/NTTArbitrumWatcher.ts @@ -0,0 +1,136 @@ +import axios from 'axios'; +import { AXIOS_CONFIG_JSON, RPCS_BY_CHAIN } from '../consts'; +import { EVMWatcher } from './EVMWatcher'; +import { Environment } from '@wormhole-foundation/wormhole-monitor-common'; +import { NTTWatcher } from './NTTWatcher'; + +export class NTTArbitrumWatcher extends NTTWatcher { + rpc: string | undefined; + nttWatcher: EVMWatcher; + latestL2Finalized: number; + l1L2Map: Map; + lastEthTime: number; + + constructor(network: Environment) { + if (network === 'mainnet') { + super(network, 'arbitrum'); + } else { + super(network, 'arbitrum_sepolia'); + } + + this.rpc = RPCS_BY_CHAIN[this.network][this.chain]; + if (!this.rpc) { + throw new Error(`${this.chain} RPC is not defined!`); + } + this.nttWatcher = + network === 'mainnet' + ? new NTTWatcher(network, 'ethereum', 'finalized') + : new NTTWatcher(network, 'sepolia', 'finalized'); + this.latestL2Finalized = 0; + this.l1L2Map = new Map(); + this.lastEthTime = 0; + this.maximumBatchSize = 1000; + } + + async getFinalizedBlockNumber(): Promise { + if (!this.rpc) { + throw new Error(`${this.chain} RPC is not defined!`); + } + + // This gets the latest L2 block so we can get the associated L1 block number + const l1Result: BlockByNumberResult = ( + await axios.post( + this.rpc, + [ + { + jsonrpc: '2.0', + id: 1, + method: 'eth_getBlockByNumber', + params: ['latest', false], + }, + ], + AXIOS_CONFIG_JSON + ) + )?.data?.[0]?.result; + if (!l1Result || !l1Result.l1BlockNumber || !l1Result.number) { + throw new Error( + `Unable to parse result of ArbitrumWatcher::eth_getBlockByNumber for latest on ${this.rpc}` + ); + } + const associatedL1: number = parseInt(l1Result.l1BlockNumber, 16); + const l2BlkNum: number = parseInt(l1Result.number, 16); + this.logger.debug( + 'getFinalizedBlockNumber() checking map L1Block: ' + associatedL1 + ' => L2Block: ' + l2BlkNum + ); + + // Only update the map, if the L2 block number is newer + const inMapL2 = this.l1L2Map.get(associatedL1); + if (!inMapL2 || inMapL2 < l2BlkNum) { + this.logger.debug( + `Updating map with ${associatedL1} => ${l2BlkNum}, size = ${this.l1L2Map.size}` + ); + this.l1L2Map.set(associatedL1, l2BlkNum); + } + + // Only check every 30 seconds + const now = Date.now(); + if (now - this.lastEthTime < 30_000) { + return this.latestL2Finalized; + } + this.lastEthTime = now; + + // Get the latest finalized L1 block number + const evmFinal = await this.nttWatcher.getFinalizedBlockNumber(); + this.logger.debug(`Finalized EVM block number = ${evmFinal}`); + + this.logger.debug('Size of map = ' + this.l1L2Map.size); + // Walk the map looking for finalized L2 block number + for (let [l1, l2] of this.l1L2Map) { + if (l1 <= evmFinal) { + this.latestL2Finalized = l2; + this.logger.debug(`Removing key ${l1} from map`); + this.l1L2Map.delete(l1); + } + } + + this.logger.debug(`LatestL2Finalized = ${this.latestL2Finalized}`); + return this.latestL2Finalized; + } + + // This function is only used in test code. + getFirstMapEntry(): number[] { + if (this.l1L2Map.size > 0) { + for (let [l1, l2] of this.l1L2Map) { + return [l1, l2]; + } + } + return [0, 0]; + } +} + +type BlockByNumberResult = { + baseFeePerGas: string; + difficulty: string; + extraData: string; + gasLimit: string; + gasUsed: string; + hash: string; + l1BlockNumber: string; + logsBloom: string; + miner: string; + mixHash: string; + nonce: string; + number: string; + parentHash: string; + receiptsRoot: string; + sendCount: string; + sendRoot: string; + sha3Uncles: string; + size: string; + stateRoot: string; + timestamp: string; + totalDifficulty: string; + transactions: string[]; + transactionsRoot: string; + uncles: string[]; +}; diff --git a/watcher/src/watchers/NTTPayloads.ts b/watcher/src/watchers/NTTPayloads.ts new file mode 100644 index 00000000..707c10be --- /dev/null +++ b/watcher/src/watchers/NTTPayloads.ts @@ -0,0 +1,184 @@ +// +// This file was copied from the example-native-token-transfers repo +// File: solana/ts/sdk/payloads/common.ts +// + +import BN from 'bn.js'; + +export class TransceiverMessage { + static prefix: Buffer; + sourceNttManager: Buffer; + recipientNttManager: Buffer; + ntt_managerPayload: NttManagerMessage; + transceiverPayload: Buffer; + + constructor( + sourceNttManager: Buffer, + recipientNttManager: Buffer, + ntt_managerPayload: NttManagerMessage, + transceiverPayload: Buffer + ) { + this.sourceNttManager = sourceNttManager; + this.recipientNttManager = recipientNttManager; + this.ntt_managerPayload = ntt_managerPayload; + this.transceiverPayload = transceiverPayload; + } + + static deserialize( + data: Buffer, + deserializer: (data: Buffer) => NttManagerMessage + ): TransceiverMessage { + if (this.prefix == undefined) { + throw new Error('Unknown prefix.'); + } + const prefix = data.subarray(0, 4); + if (!prefix.equals(this.prefix)) { + throw new Error('Invalid transceiver prefix'); + } + const sourceNttManager = data.subarray(4, 36); + const recipientNttManager = data.subarray(36, 68); + const ntt_managerPayloadLen = data.readUInt16BE(68); + const ntt_managerPayload = deserializer(data.subarray(70, 70 + ntt_managerPayloadLen)); + const transceiverPayloadLen = data.readUInt16BE(70 + ntt_managerPayloadLen); + const transceiverPayload = data.subarray( + 72 + ntt_managerPayloadLen, + 72 + ntt_managerPayloadLen + transceiverPayloadLen + ); + return new TransceiverMessage( + sourceNttManager, + recipientNttManager, + ntt_managerPayload, + transceiverPayload + ); + } + + static serialize( + msg: TransceiverMessage, + serializer: (payload: NttManagerMessage) => Buffer + ): Buffer { + const payload = serializer(msg.ntt_managerPayload); + if (msg.sourceNttManager.length != 32) { + throw new Error('sourceNttManager must be 32 bytes'); + } + if (msg.recipientNttManager.length != 32) { + throw new Error('recipientNttManager must be 32 bytes'); + } + const payloadLen = new BN(payload.length).toBuffer('be', 2); + const transceiverPayloadLen = new BN(msg.transceiverPayload.length).toBuffer('be', 2); + const buffer = Buffer.concat([ + this.prefix, + msg.sourceNttManager, + msg.recipientNttManager, + payloadLen, + payload, + transceiverPayloadLen, + msg.transceiverPayload, + ]); + return buffer; + } +} + +export class NttManagerMessage { + id: Buffer; + sender: Buffer; + payload: A; + + constructor(id: Buffer, sender: Buffer, payload: A) { + if (id.length != 32) { + throw new Error('id must be 32 bytes'); + } + if (sender.length != 32) { + throw new Error('sender must be 32 bytes'); + } + this.id = id; + this.sender = sender; + this.payload = payload; + } + + static deserialize = ( + data: Buffer, + deserializer: (data: Buffer) => A + ): NttManagerMessage => { + const id = data.subarray(0, 32); + const sender = data.subarray(32, 64); + const payloadLen = data.readUint16BE(64); + const payload = deserializer(data.subarray(66, 66 + payloadLen)); + return new NttManagerMessage(id, sender, payload); + }; + + static serialize = (msg: NttManagerMessage, serializer: (payload: A) => Buffer): Buffer => { + const payload = serializer(msg.payload); + return Buffer.concat([msg.id, msg.sender, new BN(payload.length).toBuffer('be', 2), payload]); + }; +} + +export class WormholeTransceiverMessage extends TransceiverMessage { + static prefix = Buffer.from([0x99, 0x45, 0xff, 0x10]); +} + +export class NativeTokenTransfer { + static prefix = Buffer.from([0x99, 0x4e, 0x54, 0x54]); + trimmedAmount: TrimmedAmount; + sourceToken: Buffer; + recipientAddress: Buffer; + recipientChain: number; + + constructor( + sourceToken: Buffer, + amount: TrimmedAmount, + recipientChain: number, + recipientAddress: Buffer + ) { + this.trimmedAmount = amount; + this.sourceToken = sourceToken; + this.recipientAddress = recipientAddress; + this.recipientChain = recipientChain; + } + + static deserialize = (data: Buffer): NativeTokenTransfer => { + const prefix = data.subarray(0, 4); + if (!prefix.equals(NativeTokenTransfer.prefix)) { + throw new Error('Invalid NTT prefix'); + } + const amount = TrimmedAmount.deserialize(data.subarray(4, 13)); + const sourceToken = data.subarray(13, 45); + const recipientAddress = data.subarray(45, 77); + const recipientChain = data.readUInt16BE(77); + return new NativeTokenTransfer(sourceToken, amount, recipientChain, recipientAddress); + }; + + static serialize = (msg: NativeTokenTransfer): Buffer => { + const buffer = Buffer.concat([ + NativeTokenTransfer.prefix, + TrimmedAmount.serialize(msg.trimmedAmount), + msg.sourceToken, + msg.recipientAddress, + ]); + const recipientChain = Buffer.alloc(2); + recipientChain.writeUInt16BE(msg.recipientChain, 0); + return Buffer.concat([buffer, recipientChain]); + }; +} + +export class TrimmedAmount { + amount: bigint; + decimals: number; + + constructor(amount: bigint, decimals: number) { + this.amount = amount; + this.decimals = decimals; + } + + static deserialize(data: Buffer): TrimmedAmount { + const decimals = data.readUInt8(0); + const amount = data.readBigUInt64BE(1); + return new TrimmedAmount(amount, decimals); + } + + static serialize(amount: TrimmedAmount): Buffer { + const buffer = Buffer.alloc(9); + buffer.writeUInt8(amount.decimals, 0); + buffer.writeBigUInt64BE(amount.amount, 1); + return buffer; + } +} diff --git a/watcher/src/watchers/NTTWatcher.ts b/watcher/src/watchers/NTTWatcher.ts new file mode 100644 index 00000000..a66ae93f --- /dev/null +++ b/watcher/src/watchers/NTTWatcher.ts @@ -0,0 +1,767 @@ +import { Implementation__factory } from '@certusone/wormhole-sdk/lib/cjs/ethers-contracts/factories/Implementation__factory'; +import { + CONTRACTS, + ChainName, + Contracts, + EVMChainName, + coalesceChainId, +} from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; +import { Log } from '@ethersproject/abstract-provider'; +import axios from 'axios'; +import { BigNumber } from 'ethers'; +import { AXIOS_CONFIG_JSON, RPCS_BY_CHAIN } from '../consts'; +import { makeBlockKey } from '../databases/utils'; +import { Watcher } from './Watcher'; +import { + Environment, + assertEnvironmentVariable, +} from '@wormhole-foundation/wormhole-monitor-common'; +import { + InboundTransferQueuedTopic, + LifeCycle, + NTT_CONTRACT, + NTT_TOPICS, + OutboundTransferQueuedTopic, + OutboundTransferRateLimitedTopic, + TransferRedeemedTopic, + TransferSentTopic, + getNttManagerMessageDigest, +} from '../NTTConsts'; +import { NativeTokenTransfer, NttManagerMessage, WormholeTransceiverMessage } from './NTTPayloads'; +import { RELAYER_CONTRACTS, parseWormholeLog } from '@certusone/wormhole-sdk/lib/cjs/relayer'; +import { Firestore } from 'firebase-admin/firestore'; +import knex, { Knex } from 'knex'; +import { WormholeLogger } from '../utils/logger'; + +export const LOG_MESSAGE_PUBLISHED_TOPIC = + '0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2'; +export const wormholeInterface = Implementation__factory.createInterface(); + +export type BlockTag = 'finalized' | 'safe' | 'latest'; +export type Block = { + hash: string; + number: number; + timestamp: number; +}; +export type ErrorBlock = { + code: number; //6969, + message: string; //'Error: No response received from RPC endpoint in 60s' +}; + +export class NTTWatcher extends Watcher { + finalizedBlockTag: BlockTag; + lastTimestamp: number; + latestFinalizedBlockNumber: number; + lifecycleMap: Map; // digest -> lifecycle + pg: Knex; + + constructor(network: Environment, chain: EVMChainName, finalizedBlockTag: BlockTag = 'latest') { + super(network, chain, true); + this.lastTimestamp = 0; + this.latestFinalizedBlockNumber = 0; + this.finalizedBlockTag = finalizedBlockTag; + this.lifecycleMap = new Map(); + this.pg = knex({ + client: 'pg', + connection: { + user: assertEnvironmentVariable('PG_NTT_USER'), + password: assertEnvironmentVariable('PG_NTT_PASSWORD'), + database: assertEnvironmentVariable('PG_NTT_DATABASE'), + host: assertEnvironmentVariable('PG_NTT_HOST'), + port: Number(assertEnvironmentVariable('PG_NTT_PORT')), + }, + }); + this.logger.debug('NTTWatcher', network, chain, finalizedBlockTag); + } + + async getBlock(blockNumberOrTag: number | BlockTag): Promise { + const rpc = RPCS_BY_CHAIN[this.network][this.chain]; + if (!rpc) { + throw new Error(`${this.chain} RPC is not defined!`); + } + let result = ( + await axios.post( + rpc, + [ + { + jsonrpc: '2.0', + id: 1, + method: 'eth_getBlockByNumber', + params: [ + typeof blockNumberOrTag === 'number' + ? `0x${blockNumberOrTag.toString(16)}` + : blockNumberOrTag, + false, + ], + }, + ], + AXIOS_CONFIG_JSON + ) + )?.data?.[0]; + if (result && result.result === null) { + // Found null block + if ( + typeof blockNumberOrTag === 'number' && + blockNumberOrTag < this.latestFinalizedBlockNumber - 1000 + ) { + return { + hash: '', + number: BigNumber.from(blockNumberOrTag).toNumber(), + timestamp: BigNumber.from(this.lastTimestamp).toNumber(), + }; + } + } else if (result && result.error && result.error.code === 6969) { + return { + hash: '', + number: BigNumber.from(blockNumberOrTag).toNumber(), + timestamp: BigNumber.from(this.lastTimestamp).toNumber(), + }; + } + result = result?.result; + if (result && result.hash && result.number && result.timestamp) { + // Convert to Ethers compatible type + this.lastTimestamp = result.timestamp; + return { + hash: result.hash, + number: BigNumber.from(result.number).toNumber(), + timestamp: BigNumber.from(result.timestamp).toNumber(), + }; + } + throw new Error( + `Unable to parse result of eth_getBlockByNumber for ${blockNumberOrTag} on ${rpc}` + ); + } + async getBlocks(fromBlock: number, toBlock: number): Promise { + const rpc = RPCS_BY_CHAIN[this.network][this.chain]; + if (!rpc) { + throw new Error(`${this.chain} RPC is not defined!`); + } + const reqs: any[] = []; + for (let blockNumber = fromBlock; blockNumber <= toBlock; blockNumber++) { + reqs.push({ + jsonrpc: '2.0', + id: (blockNumber - fromBlock).toString(), + method: 'eth_getBlockByNumber', + params: [`0x${blockNumber.toString(16)}`, false], + }); + } + const results = (await axios.post(rpc, reqs, AXIOS_CONFIG_JSON))?.data; + if (results && results.length) { + // Convert to Ethers compatible type + return results.map( + (response: undefined | { result?: Block; error?: ErrorBlock }, idx: number) => { + // Karura is getting 6969 errors for some blocks, so we'll just return empty blocks for those instead of throwing an error. + // We take the timestamp from the previous block, which is not ideal but should be fine. + if ( + (response && + response.result === null && + fromBlock + idx < this.latestFinalizedBlockNumber - 1000) || + (response?.error && response.error?.code && response.error.code === 6969) + ) { + return { + hash: '', + number: BigNumber.from(fromBlock + idx).toNumber(), + timestamp: BigNumber.from(this.lastTimestamp).toNumber(), + }; + } + if ( + response?.result && + response.result?.hash && + response.result.number && + response.result.timestamp + ) { + this.lastTimestamp = response.result.timestamp; + return { + hash: response.result.hash, + number: BigNumber.from(response.result.number).toNumber(), + timestamp: BigNumber.from(response.result.timestamp).toNumber(), + }; + } + this.logger.error(reqs[idx], response, idx); + throw new Error( + `Unable to parse result of eth_getBlockByNumber for ${fromBlock + idx} on ${rpc}` + ); + } + ); + } + throw new Error( + `Unable to parse result of eth_getBlockByNumber for range ${fromBlock}-${toBlock} on ${rpc}` + ); + } + async getLogs( + fromBlock: number, + toBlock: number, + address: string, + topics: string[] + ): Promise> { + const rpc = RPCS_BY_CHAIN[this.network][this.chain]; + if (!rpc) { + throw new Error(`${this.chain} RPC is not defined!`); + } + const result = ( + await axios.post( + rpc, + [ + { + jsonrpc: '2.0', + id: 1, + method: 'eth_getLogs', + params: [ + { + fromBlock: `0x${fromBlock.toString(16)}`, + toBlock: `0x${toBlock.toString(16)}`, + address, + topics, + }, + ], + }, + ], + AXIOS_CONFIG_JSON + ) + )?.data?.[0]?.result; + if (result) { + // Convert to Ethers compatible type + return result.map((l: Log) => ({ + ...l, + blockNumber: BigNumber.from(l.blockNumber).toNumber(), + transactionIndex: BigNumber.from(l.transactionIndex).toNumber(), + logIndex: BigNumber.from(l.logIndex).toNumber(), + })); + } + throw new Error(`Unable to parse result of eth_getLogs for ${fromBlock}-${toBlock} on ${rpc}`); + } + + async getFinalizedBlockNumber(): Promise { + this.logger.info(`fetching block ${this.finalizedBlockTag}`); + const block: Block = await this.getBlock(this.finalizedBlockTag); + this.latestFinalizedBlockNumber = block.number; + return block.number; + } + + async getNttMessagesForBlocks(fromBlock: number, toBlock: number): Promise { + const nttAddress = NTT_CONTRACT[this.network][this.chain]; + if (!nttAddress) { + throw new Error(`NTT manager contract not defined for ${this.network}`); + } + const contracts: Contracts = + this.network === 'mainnet' + ? CONTRACTS.MAINNET[this.chain] + : this.network === 'testnet' + ? CONTRACTS.TESTNET[this.chain] + : CONTRACTS.DEVNET[this.chain]; + const address = contracts.core; + if (!address) { + throw new Error(`Core contract not defined for ${this.chain}`); + } + // Get and filter logs + const logs: Log[] = (await this.getLogs(fromBlock, toBlock, nttAddress, [])).filter(isNTTEvent); + const timestampsByBlock: { [block: number]: string } = {}; + // fetch timestamps for each block + this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`); + const blocks = await this.getBlocks(fromBlock, toBlock); + for (const block of blocks) { + const timestamp = new Date(block.timestamp * 1000).toISOString(); + timestampsByBlock[block.number] = timestamp; + } + this.logger.info(`processing ${logs.length} logs`); + let newEntry: boolean = false; + for (const log of logs) { + this.logger.debug('log:', log); + const blockNumber = log.blockNumber; + const txhash = log.transactionHash; + this.logger.debug(`blockNumber: ${blockNumber}, txhash: ${txhash}`); + if (log.topics[0] === TransferSentTopic) { + this.logger.debug('***********TransferSentTopic***************'); + const decodedTransfer: decodedTransferSent = decodeNttTransferSent(log.data); + this.logger.debug('decodedTransfer:', decodedTransfer); + if (decodedTransfer.recipient === '') { + this.logger.error('Could not decode transfer'); + continue; + } + const nttTransferKey = makeNttTransferKey( + nttAddress, + decodedTransfer.recipient, + decodedTransfer.msgSequence + ); + const coreLogs = await this.getLogs(blockNumber, blockNumber, address, [ + LOG_MESSAGE_PUBLISHED_TOPIC, + ]); + for (const coreLog of coreLogs) { + if (coreLog.transactionHash !== txhash) { + this.logger.error( + `Mismatched transaction hashes: ${coreLog.transactionHash} vs ${txhash}` + ); + continue; + } + // this.logger.debug('coreLog:', coreLog); + let emitter = coreLog.topics[1].slice(2); + // this.logger.debug(`emitter: ${emitter}`); + // If this emitter is a relayer, parse differently + if (isRelayer(this.network, this.chain, emitter)) { + this.logger.debug('Relayer detected'); + let { + args: { sequence, payload }, + } = wormholeInterface.parseLog(coreLog); + const vaaId = makeVaaId(coalesceChainId(this.chain), emitter, sequence); + // this.logger.debug('payload:', payload); + // Strip off leading 0x, if present + if (payload.startsWith('0x')) { + payload = payload.slice(2); + } + let { type, parsed } = parseWormholeLog(coreLog); + // this.logger.debug('type:', type); + // this.logger.debug('parsed:', parsed); + let payloadBuffer; + if (typeof parsed === 'string') { + // this.logger.debug('parsed is a string'); + payloadBuffer = Buffer.from(parsed, 'hex'); + } else if ('payload' in parsed) { + // this.logger.debug('parsed is an object'); + payloadBuffer = parsed.payload; + // this.logger.debug('payloadBuffer:', payloadBuffer.toString('hex')); + } else { + this.logger.error('Could not parse payload'); + continue; + } + // this.logger.debug('payloadBuffer:', payloadBuffer); + // This payload is a transceiver message + // Use the payload to create a digest + try { + const transceiverMessage = WormholeTransceiverMessage.deserialize( + payloadBuffer, + (a) => NttManagerMessage.deserialize(a, NativeTokenTransfer.deserialize) + ); + const calculatedDigest = getNttManagerMessageDigest( + coalesceChainId(this.chain), + transceiverMessage.ntt_managerPayload + ); + const sourceToken: string = + transceiverMessage.ntt_managerPayload.payload.sourceToken.toString('hex'); + const lc: LifeCycle = { + srcChainId: coalesceChainId(this.chain), + destChainId: decodedTransfer.recipientChain, + sourceToken, + tokenAmount: BigInt(decodedTransfer.amount), + transferSentTxhash: txhash.startsWith('0x') ? txhash.slice(2) : txhash, + redeemedTxhash: '', + nttTransferKey, + vaaId, + digest: calculatedDigest, + transferTime: timestampsByBlock[blockNumber], + redeemTime: '', + inboundTransferQueuedTime: '', + outboundTransferQueuedTime: '', + outboundTransferRateLimitedTime: '', + }; + this.lifecycleMap.set(calculatedDigest, lc); + // await saveToFirestore(lc); // Here for testing + await saveToPG(this.pg, lc, TransferSentTopic, this.logger); + newEntry = true; + this.logger.debug( + `For txhash ${txhash}, correlating nttTransferKey ${nttTransferKey} to vaaId ${vaaId} and digest ${calculatedDigest}` + ); + } catch (e) { + this.logger.error('Error:', e); + } + } else { + this.logger.debug('Not a relayer'); + let { + args: { sequence, payload }, + } = wormholeInterface.parseLog(coreLog); + const vaaId = makeVaaId(coalesceChainId(this.chain), emitter, sequence); + // this.logger.debug('payload:', payload); + // Strip off leading 0x, if present + if (payload.startsWith('0x')) { + payload = payload.slice(2); + } + const payloadBuffer = Buffer.from(payload, 'hex'); + // this.logger.debug('payloadBuffer:', payloadBuffer); + // This payload is a transceiver message + // Use the payload to create a digest + try { + const transceiverMessage = WormholeTransceiverMessage.deserialize( + payloadBuffer, + (a) => NttManagerMessage.deserialize(a, NativeTokenTransfer.deserialize) + ); + const calculatedDigest = getNttManagerMessageDigest( + coalesceChainId(this.chain), + transceiverMessage.ntt_managerPayload + ); + const sourceToken: string = + transceiverMessage.ntt_managerPayload.payload.sourceToken.toString('hex'); + const lc: LifeCycle = { + srcChainId: coalesceChainId(this.chain), + destChainId: decodedTransfer.recipientChain, + sourceToken, + tokenAmount: BigInt(decodedTransfer.amount), + transferSentTxhash: txhash.startsWith('0x') ? txhash.slice(2) : txhash, + redeemedTxhash: '', + nttTransferKey, + vaaId, + digest: calculatedDigest, + transferTime: timestampsByBlock[blockNumber], + redeemTime: '', + inboundTransferQueuedTime: '', + outboundTransferQueuedTime: '', + outboundTransferRateLimitedTime: '', + }; + this.lifecycleMap.set(calculatedDigest, lc); + // await saveToFirestore(lc); // Here for testing. + await saveToPG(this.pg, lc, TransferSentTopic, this.logger); + newEntry = true; + this.logger.debug( + `For txhash ${txhash}, correlating nttTransferKey ${nttTransferKey} to vaaId ${vaaId} and digest ${calculatedDigest}` + ); + } catch (e) { + this.logger.error('Error:', e); + } + } + } + } else if (log.topics[0] === TransferRedeemedTopic) { + this.logger.debug('***********TransferRedeemedTopic***************'); + let digest: string = log.topics[1]; + if (digest.startsWith('0x')) { + digest = digest.slice(2); + } + this.logger.debug('digest:', digest); + // Check if we have a lifecycle for this digest + const lc = this.lifecycleMap.get(digest); + if (lc) { + lc.redeemTime = timestampsByBlock[blockNumber]; + this.lifecycleMap.set(digest, lc); + // await saveToFirestore(lc); // Here for testing + await saveToPG(this.pg, lc, TransferRedeemedTopic, this.logger); + newEntry = true; + this.logger.debug(`For digest ${digest}, found lifecycle`); + } else { + this.logger.error(`TransferRedeemedTopic: Could not find lifecycle for digest ${digest}`); + const lc: LifeCycle = { + transferSentTxhash: '', + redeemedTxhash: txhash.startsWith('0x') ? txhash.slice(2) : txhash, + sourceToken: '', + nttTransferKey: '', + vaaId: '', + digest: digest, + transferTime: '', + redeemTime: timestampsByBlock[blockNumber], + srcChainId: 0, + destChainId: coalesceChainId(this.chain), + tokenAmount: 0n, + inboundTransferQueuedTime: '', + outboundTransferQueuedTime: '', + outboundTransferRateLimitedTime: '', + }; + this.lifecycleMap.set(digest, lc); + // await saveToFirestore(lc); // Here for testing + await saveToPG(this.pg, lc, TransferRedeemedTopic, this.logger); + newEntry = true; + } + } else if (log.topics[0] === InboundTransferQueuedTopic) { + this.logger.debug('***********InboundTransferQueuedTopic***************'); + let digest: string = log.data; + if (digest.startsWith('0x')) { + digest = digest.slice(2); + } + this.logger.debug('digest:', digest); + // Check if we have a lifecycle for this digest + const lc = this.lifecycleMap.get(digest); + if (lc) { + lc.inboundTransferQueuedTime = timestampsByBlock[blockNumber]; + this.lifecycleMap.set(digest, lc); + // await saveToFirestore(lc); // Here for testing + await saveToPG(this.pg, lc, InboundTransferQueuedTopic, this.logger); + newEntry = true; + this.logger.debug(`For digest ${digest}, found lifecycle`); + } else { + this.logger.error( + `InboundTransferQueuedTopic: Could not find lifecycle for digest ${digest}` + ); + } + } else if (log.topics[0] === OutboundTransferQueuedTopic) { + this.logger.debug('***********OutboundTransferQueuedTopic***************'); + let digest: string = log.data; + if (digest.startsWith('0x')) { + digest = digest.slice(2); + } + this.logger.debug('digest:', digest); + // Check if we have a lifecycle for this digest + const lc = this.lifecycleMap.get(digest); + if (lc) { + lc.outboundTransferQueuedTime = timestampsByBlock[blockNumber]; + this.lifecycleMap.set(digest, lc); + // await saveToFirestore(lc); // Here for testing + await saveToPG(this.pg, lc, OutboundTransferQueuedTopic, this.logger); + newEntry = true; + this.logger.debug(`For digest ${digest}, found lifecycle`); + } else { + this.logger.error( + `OutboundTransferQueuedTopic: Could not find lifecycle for digest ${digest}` + ); + } + } else if (log.topics[0] === OutboundTransferRateLimitedTopic) { + this.logger.debug('***********OutboundTransferRateLimitedTopic***************'); + let digest: string = log.data; + if (digest.startsWith('0x')) { + digest = digest.slice(2); + } + this.logger.debug('digest:', digest); + // Check if we have a lifecycle for this digest + const lc = this.lifecycleMap.get(digest); + if (lc) { + lc.outboundTransferRateLimitedTime = timestampsByBlock[blockNumber]; + this.lifecycleMap.set(digest, lc); + // await saveToFirestore(lc); // Here for testing + await saveToPG(this.pg, lc, OutboundTransferRateLimitedTopic, this.logger); + newEntry = true; + this.logger.debug(`For digest ${digest}, found lifecycle`); + } else { + this.logger.error( + `OutboundTransferRateLimitedTopic: Could not find lifecycle for digest ${digest}` + ); + } + } + } + // Here for testing + // if (newEntry) { + // dumpLifecycleMap(this.lifecycleMap); + // await sleep(3000); + // } + + // Create blockKey + const blockKey = makeBlockKey(toBlock.toString(), timestampsByBlock[toBlock]); + return blockKey; + } +} + +type decodedTransferSent = { + recipient: string; + amount: string; + fee: string; + recipientChain: number; + msgSequence: number; +}; + +/// event TransferSent( bytes32 recipient, uint256 amount, uint256 fee, uint16 recipientChain, uint64 msgSequence); +function decodeNttTransferSent(data: string): decodedTransferSent { + // There are 5 fields in this message. Each is 32 bytes long (64 characters) + // If data starts with '0x', we need to remove it + if (data.startsWith('0x')) { + data = data.slice(2); + } + // this.logger.debug('data:', data); + // this.logger.debug('data.length:', data.length); + let retVal: decodedTransferSent = { + recipient: '', + amount: '', + fee: '', + recipientChain: 0, + msgSequence: 0, + }; + if (data.length === 320) { + retVal.recipient = data.slice(0, 64); + retVal.amount = '0x' + data.slice(64, 128); + retVal.fee = '0x' + data.slice(128, 192); + retVal.recipientChain = Number('0x' + data.slice(192, 256)); + retVal.msgSequence = Number('0x' + data.slice(256, 320)); + } + return retVal; +} + +function makeNttTransferKey(mgrAddress: string, recipient: string, seq: number): string { + if (mgrAddress.startsWith('0x')) { + mgrAddress = mgrAddress.slice(2); + } + if (recipient.startsWith('0x')) { + recipient = recipient.slice(2); + } + return `${mgrAddress}/${recipient}/${seq}`; +} + +export const makeVaaId = (chainId: number, emitter: string, seq: number): string => + `${chainId}/${emitter}/${seq}`; + +function isNTTEvent(log: Log): boolean { + return NTT_TOPICS.some((topic) => log.topics[0].includes(topic)); +} + +function dumpLifecycleMap(lcMap: Map) { + console.log('Lifecycle Map:'); + for (const [digest, lc] of lcMap) { + console.log(digest, lc); + } +} + +async function saveToFirestore(lc: LifeCycle) { + console.log('Saving to Firestore:', lc); + const COLLECTION: string = 'nttLifecycle'; + // Document ID is the digest + const firestore = new Firestore(); + const collectionRef = firestore.collection(COLLECTION); + const document = collectionRef.doc(lc.digest); + const doc = await document.get(); + if (doc.exists) { + console.log('Document exists'); + lc = mergeLifecycles(lc, doc.data() as LifeCycle); + } + // Now save to firestore + console.log('Attempting to save to firestore...'); + const res = await document.set(lc); + console.log('Firestore response:', res); +} + +async function saveToPG(pg: Knex, lc: LifeCycle, initiatingEvent: string, logger: WormholeLogger) { + if (!pg) { + throw new Error('pg not initialized'); + } + if (lc.digest === '') { + throw new Error('digest is empty'); + } + + logger.debug('saveToPG: Attempting to get existing record...'); + await pg.transaction(async (trx) => { + const existing = await trx('life_cycle').where('digest', lc.digest).first(); + if (!existing) { + logger.debug('saveToPG: Inserting new record'); + await trx('life_cycle').insert({ + from_chain: lc.srcChainId, + to_chain: lc.destChainId, + from_token: lc.sourceToken, + token_amount: lc.tokenAmount, + transfer_sent_txhash: lc.transferSentTxhash, + redeemed_txhash: lc.redeemedTxhash, + ntt_transfer_key: lc.nttTransferKey, + vaa_id: lc.vaaId, + digest: lc.digest, + transfer_time: lc.transferTime.length > 0 ? formatIntoTimestamp(lc.transferTime) : null, + redeem_time: lc.redeemTime.length > 0 ? formatIntoTimestamp(lc.redeemTime) : null, + inbound_transfer_queued_time: + lc.inboundTransferQueuedTime.length > 0 + ? formatIntoTimestamp(lc.inboundTransferQueuedTime) + : null, + outbound_transfer_queued_time: + lc.outboundTransferQueuedTime.length > 0 + ? formatIntoTimestamp(lc.outboundTransferQueuedTime) + : null, + outbound_transfer_rate_limited_time: + lc.outboundTransferRateLimitedTime.length > 0 + ? formatIntoTimestamp(lc.outboundTransferRateLimitedTime) + : null, + }); + return; + } + // If the row already exists, then we need to update it with the information from the initiating event + logger.debug('saveToPG: Updating existing record'); + if (initiatingEvent === TransferSentTopic) { + await trx('life_cycle') + .where('digest', lc.digest) + .update({ + from_chain: lc.srcChainId, + to_chain: lc.destChainId, + from_token: lc.sourceToken, + token_amount: lc.tokenAmount, + transfer_sent_txhash: lc.transferSentTxhash, + ntt_transfer_key: lc.nttTransferKey, + vaa_id: lc.vaaId, + transfer_time: formatIntoTimestamp(lc.transferTime), + }); + } else if (initiatingEvent === TransferRedeemedTopic) { + await trx('life_cycle') + .where('digest', lc.digest) + .update({ + to_chain: lc.destChainId, + redeemed_txhash: lc.redeemedTxhash, + redeem_time: formatIntoTimestamp(lc.redeemTime), + }); + } else if (initiatingEvent === InboundTransferQueuedTopic) { + await trx('life_cycle') + .where('digest', lc.digest) + .update({ + inbound_transfer_queued_time: formatIntoTimestamp(lc.inboundTransferQueuedTime), + }); + } else if (initiatingEvent === OutboundTransferQueuedTopic) { + await trx('life_cycle') + .where('digest', lc.digest) + .update({ + outbound_transfer_queued_time: formatIntoTimestamp(lc.outboundTransferQueuedTime), + }); + } else if (initiatingEvent === OutboundTransferRateLimitedTopic) { + await trx('life_cycle') + .where('digest', lc.digest) + .update({ + outbound_transfer_rate_limited_time: formatIntoTimestamp( + lc.outboundTransferRateLimitedTime + ), + }); + } else { + logger.error(`saveToPG: Unknown initiating event: ${initiatingEvent} and lifeCycle: ${lc}`); + } + }); +} + +function formatIntoTimestamp(timestamp: string): string { + if (timestamp === '') { + return 'NULL'; + } + // Expected input format is:2024-03-01T02:30:45.000Z + // Convert the 'T' to a space + let parts = timestamp.split('T'); + // Remove the trailing 'Z' + parts[1] = parts[1].slice(0, -1); + return parts.join(' '); +} + +function mergeLifecycles(lc: LifeCycle, existing: LifeCycle): LifeCycle { + let merged: LifeCycle = { + srcChainId: lc.srcChainId !== 0 ? lc.srcChainId : existing.srcChainId, + destChainId: lc.destChainId !== 0 ? lc.destChainId : existing.destChainId, + sourceToken: lc.sourceToken !== '' ? lc.sourceToken : existing.sourceToken, + tokenAmount: lc.tokenAmount !== 0n ? lc.tokenAmount : existing.tokenAmount, + transferSentTxhash: + lc.transferSentTxhash !== '' ? lc.transferSentTxhash : existing.transferSentTxhash, + redeemedTxhash: lc.redeemedTxhash !== '' ? lc.redeemedTxhash : existing.redeemedTxhash, + nttTransferKey: lc.nttTransferKey !== '' ? lc.nttTransferKey : existing.nttTransferKey, + vaaId: lc.vaaId !== '' ? lc.vaaId : existing.vaaId, + digest: lc.digest, + transferTime: lc.transferTime !== '' ? lc.transferTime : existing.transferTime, + redeemTime: lc.redeemTime !== '' ? lc.redeemTime : existing.redeemTime, + inboundTransferQueuedTime: + lc.inboundTransferQueuedTime !== '' + ? lc.inboundTransferQueuedTime + : existing.inboundTransferQueuedTime, + outboundTransferQueuedTime: + lc.outboundTransferQueuedTime !== '' + ? lc.outboundTransferQueuedTime + : existing.outboundTransferQueuedTime, + outboundTransferRateLimitedTime: + lc.outboundTransferRateLimitedTime !== '' + ? lc.outboundTransferRateLimitedTime + : existing.outboundTransferRateLimitedTime, + }; + return merged; +} + +function isRelayer(network: Environment, chain: ChainName, emitter: string): boolean { + const ucNetwork = + network === 'mainnet' ? 'MAINNET' : network === 'testnet' ? 'TESTNET' : 'DEVNET'; + let relayer = RELAYER_CONTRACTS[ucNetwork][chain]?.wormholeRelayerAddress; + if (!relayer) { + return false; + } + if (relayer.startsWith('0x')) { + relayer = relayer.slice(2); + } + // Strip leading 0x off emitter + if (emitter.startsWith('0x')) { + emitter = emitter.slice(2); + } + // The relayer and the emitter may not have the same length, + // so pad the shorter one with leading 0s + let len = Math.max(relayer.length, emitter.length); + relayer = relayer.padStart(len, '0'); + emitter = emitter.padStart(len, '0'); + if (emitter.toLowerCase() === relayer.toLowerCase()) { + console.log('Relayer detected'); + return true; + } + return false; +} diff --git a/watcher/src/watchers/Watcher.ts b/watcher/src/watchers/Watcher.ts index aeb5c2fc..8315ed1b 100644 --- a/watcher/src/watchers/Watcher.ts +++ b/watcher/src/watchers/Watcher.ts @@ -7,7 +7,7 @@ import { import { z } from 'zod'; import { TIMEOUT } from '../consts'; import { VaasByBlock } from '../databases/types'; -import { getResumeBlockByChain, storeVaasByBlock } from '../databases/utils'; +import { getResumeBlockByChain, storeLatestBlock, storeVaasByBlock } from '../databases/utils'; import { getLogger, WormholeLogger } from '../utils/logger'; export class Watcher { @@ -15,11 +15,13 @@ export class Watcher { network: Environment; logger: WormholeLogger; maximumBatchSize: number = 100; + isNTT: boolean = false; - constructor(network: Environment, chain: ChainName) { + constructor(network: Environment, chain: ChainName, isNTT: boolean = false) { this.network = network; this.chain = chain; - this.logger = getLogger(chain); + this.isNTT = isNTT; + this.logger = isNTT ? getLogger('NTT_' + chain) : getLogger(chain); } async getFinalizedBlockNumber(): Promise { @@ -30,6 +32,10 @@ export class Watcher { throw new Error('Not Implemented'); } + async getNttMessagesForBlocks(fromBlock: number, toBlock: number): Promise { + throw new Error('Not Implemented'); + } + isValidBlockKey(key: string) { try { const [block, timestamp] = key.split('/'); @@ -52,7 +58,11 @@ export class Watcher { async watch(): Promise { let toBlock: number | null = null; - let fromBlock: number | null = await getResumeBlockByChain(this.network, this.chain); + let fromBlock: number | null = await getResumeBlockByChain( + this.network, + this.chain, + this.isNTT + ); let retry = 0; while (true) { try { @@ -61,8 +71,13 @@ export class Watcher { // fetch logs for the block range, inclusive of toBlock toBlock = Math.min(fromBlock + this.maximumBatchSize - 1, toBlock); this.logger.info(`fetching messages from ${fromBlock} to ${toBlock}`); - const vaasByBlock = await this.getMessagesForBlocks(fromBlock, toBlock); - await storeVaasByBlock(this.chain, vaasByBlock); + if (this.isNTT) { + const blockKey = await this.getNttMessagesForBlocks(fromBlock, toBlock); + await storeLatestBlock(this.chain, blockKey, true); + } else { + const vaasByBlock = await this.getMessagesForBlocks(fromBlock, toBlock); + await storeVaasByBlock(this.chain, vaasByBlock); + } fromBlock = toBlock + 1; } try { diff --git a/watcher/src/watchers/__tests__/CosmwasmWatcher.test.ts b/watcher/src/watchers/__tests__/CosmwasmWatcher.test.ts index 2d0ccd7d..f5315fcf 100644 --- a/watcher/src/watchers/__tests__/CosmwasmWatcher.test.ts +++ b/watcher/src/watchers/__tests__/CosmwasmWatcher.test.ts @@ -126,7 +126,7 @@ test('getFinalizedBlockNumber(injective)', async () => { expect(blockNumber).toBeGreaterThan(23333696); }); -test('getMessagesForBlocks(injective)', async () => { +test.skip('getMessagesForBlocks(injective)', async () => { const watcher = new InjectiveExplorerWatcher('mainnet'); const vaasByBlock = await watcher.getMessagesForBlocks(61720293, 61720294); const entries = Object.entries(vaasByBlock); diff --git a/watcher/src/watchers/utils.ts b/watcher/src/watchers/utils.ts index f1033750..2d49e969 100644 --- a/watcher/src/watchers/utils.ts +++ b/watcher/src/watchers/utils.ts @@ -15,6 +15,8 @@ import { SeiExplorerWatcher } from './SeiExplorerWatcher'; import { WormchainWatcher } from './WormchainWatcher'; import { NearArchiveWatcher } from './NearArchiveWatcher'; import { Environment } from '@wormhole-foundation/wormhole-monitor-common'; +import { NTTWatcher } from './NTTWatcher'; +import { NTTArbitrumWatcher } from './NTTArbitrumWatcher'; export function makeFinalizedWatcher(network: Environment, chainName: ChainName): Watcher { if (chainName === 'solana') { @@ -77,3 +79,22 @@ export function makeFinalizedWatcher(network: Environment, chainName: ChainName) throw new Error(`Attempted to create finalized watcher for unsupported chain ${chainName}`); } } + +export function makeFinalizedNTTWatcher(network: Environment, chainName: ChainName): Watcher { + if (network === 'testnet') { + // These are testnet only chains + if (chainName === 'sepolia' || chainName === 'holesky') { + return new NTTWatcher(network, chainName, 'finalized'); + } else if (chainName === 'base_sepolia' || chainName === 'optimism_sepolia') { + return new NTTWatcher(network, chainName); + } else if (chainName === 'arbitrum_sepolia') { + return new NTTArbitrumWatcher(network); + } else { + throw new Error( + `Attempted to create finalized watcher for unsupported testnet chain ${chainName}` + ); + } + } else { + throw new Error(`Attempted to create finalized watcher for unsupported chain ${chainName}`); + } +}