From 37178d669438bd453b7e774cfd2b2392bd102654 Mon Sep 17 00:00:00 2001 From: benluelo Date: Tue, 28 Jan 2025 09:00:22 +0000 Subject: [PATCH 1/2] chore(voyager): cleanup --- .../src/header.rs | 126 ------------------ .../src/lib.rs | 3 +- .../src/header.rs | 22 +-- voyager/modules/client/cometbls/src/main.rs | 1 + .../client/state-lens/ics23-ics23/src/main.rs | 52 ++++---- .../state/cosmos-sdk-union/src/main.rs | 7 +- .../client-update/state-lens/src/main.rs | 74 +++++----- 7 files changed, 91 insertions(+), 194 deletions(-) delete mode 100644 lib/state-lens-ics23-mpt-light-client-types/src/header.rs diff --git a/lib/state-lens-ics23-mpt-light-client-types/src/header.rs b/lib/state-lens-ics23-mpt-light-client-types/src/header.rs deleted file mode 100644 index edcd4cead7..0000000000 --- a/lib/state-lens-ics23-mpt-light-client-types/src/header.rs +++ /dev/null @@ -1,126 +0,0 @@ -use unionlabs::{ - ibc::core::{client::height::Height, commitment::merkle_proof::MerkleProof}, - primitives::Bytes, -}; - -#[derive(Debug, Clone, PartialEq)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -#[cfg_attr(feature = "bincode", derive(bincode::Encode, bincode::Decode))] -pub struct Header { - pub l1_height: Height, - pub l2_height: Height, - pub l2_consensus_state_proof: MerkleProof, - pub l2_consensus_state: Bytes, -} - -#[cfg(feature = "ethabi")] -pub mod ethabi { - use alloy::sol_types::SolValue; - use unionlabs::{ - encoding::{Encode, EthAbi}, - union::ics23, - }; - - use crate::Header; - - impl Encode for Header { - fn encode(self) -> Vec { - Into::::into(self).abi_encode_params() - } - } - - alloy::sol! { - struct SolHeader { - uint64 l1Height; - uint64 l2Height; - bytes l2InclusionProof; - bytes l2ConsensusState; - } - } - - #[derive(Debug, Clone, PartialEq, thiserror::Error)] - pub enum Error {} - - impl From
for SolHeader { - fn from(value: Header) -> Self { - Self { - l1Height: value.l1_height.height(), - l2Height: value.l2_height.height(), - l2InclusionProof: encode_merkle_proof_for_evm(value.l2_consensus_state_proof) - .into(), - l2ConsensusState: value.l2_consensus_state.into(), - } - } - } - - // FIXME: deduplicate with voyager/module/client/cometbls, in unionlabs? - fn encode_merkle_proof_for_evm( - proof: unionlabs::ibc::core::commitment::merkle_proof::MerkleProof, - ) -> Vec { - alloy::sol! { - struct ExistenceProof { - bytes key; - bytes value; - bytes leafPrefix; - InnerOp[] path; - } - - struct NonExistenceProof { - bytes key; - ExistenceProof left; - ExistenceProof right; - } - - struct InnerOp { - bytes prefix; - bytes suffix; - } - - struct ProofSpec { - uint256 childSize; - uint256 minPrefixLength; - uint256 maxPrefixLength; - } - } - - let merkle_proof = ics23::merkle_proof::MerkleProof::try_from( - protos::ibc::core::commitment::v1::MerkleProof::from(proof), - ) - .unwrap(); - - let convert_inner_op = |i: unionlabs::union::ics23::inner_op::InnerOp| InnerOp { - prefix: i.prefix.into(), - suffix: i.suffix.into(), - }; - - let convert_existence_proof = - |e: unionlabs::union::ics23::existence_proof::ExistenceProof| ExistenceProof { - key: e.key.into(), - value: e.value.into(), - leafPrefix: e.leaf_prefix.into(), - path: e.path.into_iter().map(convert_inner_op).collect(), - }; - - let exist_default = || ics23::existence_proof::ExistenceProof { - key: vec![].into(), - value: vec![].into(), - leaf_prefix: vec![].into(), - path: vec![], - }; - - match merkle_proof { - ics23::merkle_proof::MerkleProof::Membership(a, b) => { - (convert_existence_proof(a), convert_existence_proof(b)).abi_encode_params() - } - ics23::merkle_proof::MerkleProof::NonMembership(a, b) => ( - NonExistenceProof { - key: a.key.into(), - left: convert_existence_proof(a.left.unwrap_or_else(exist_default)), - right: convert_existence_proof(a.right.unwrap_or_else(exist_default)), - }, - convert_existence_proof(b), - ) - .abi_encode_params(), - } - } -} diff --git a/lib/state-lens-ics23-mpt-light-client-types/src/lib.rs b/lib/state-lens-ics23-mpt-light-client-types/src/lib.rs index 3d184a96ce..491b00f903 100644 --- a/lib/state-lens-ics23-mpt-light-client-types/src/lib.rs +++ b/lib/state-lens-ics23-mpt-light-client-types/src/lib.rs @@ -1,5 +1,4 @@ pub mod client_state; pub mod consensus_state; -pub mod header; -pub use crate::{client_state::ClientState, consensus_state::ConsensusState, header::Header}; +pub use crate::{client_state::ClientState, consensus_state::ConsensusState}; diff --git a/lib/state-lens-light-client-types/src/header.rs b/lib/state-lens-light-client-types/src/header.rs index 5621c4b5ab..019934e1ca 100644 --- a/lib/state-lens-light-client-types/src/header.rs +++ b/lib/state-lens-light-client-types/src/header.rs @@ -14,18 +14,11 @@ pub struct Header { #[cfg(feature = "ethabi")] pub mod ethabi { use alloy::sol_types::SolValue; - use unionlabs::encoding::{Encode, EthAbi}; + use unionlabs::{ibc::core::client::height::Height, impl_ethabi_via_try_from_into}; use crate::Header; - impl Encode for Header { - fn encode(self) -> Vec { - Into::::into(self).abi_encode_params() - } - } - - #[derive(Debug, Clone, PartialEq, thiserror::Error)] - pub enum Error {} + impl_ethabi_via_try_from_into!(Header => SolHeader); alloy::sol! { struct SolHeader { @@ -46,4 +39,15 @@ pub mod ethabi { } } } + + impl From for Header { + fn from(value: SolHeader) -> Self { + Self { + l1_height: Height::new(value.l1Height), + l2_height: Height::new(value.l2Height), + l2_consensus_state_proof: value.l2InclusionProof.into(), + l2_consensus_state: value.l2ConsensusState.into(), + } + } + } } diff --git a/voyager/modules/client/cometbls/src/main.rs b/voyager/modules/client/cometbls/src/main.rs index 198269456c..803ae08c55 100644 --- a/voyager/modules/client/cometbls/src/main.rs +++ b/voyager/modules/client/cometbls/src/main.rs @@ -512,6 +512,7 @@ fn reencode_zkp_for_move(zkp: &[u8]) -> Result, SerializationError> { Ok(buf) } + #[model] struct MoveMembershipProof { sub_proof: ics23::existence_proof::ExistenceProof, diff --git a/voyager/modules/client/state-lens/ics23-ics23/src/main.rs b/voyager/modules/client/state-lens/ics23-ics23/src/main.rs index ca2c0f13d8..43bf9933f7 100644 --- a/voyager/modules/client/state-lens/ics23-ics23/src/main.rs +++ b/voyager/modules/client/state-lens/ics23-ics23/src/main.rs @@ -203,35 +203,39 @@ impl ClientModuleServer for Module { } } -fn encode_merkle_proof_for_evm( - proof: unionlabs::ibc::core::commitment::merkle_proof::MerkleProof, -) -> Vec { - alloy::sol! { - struct ExistenceProof { - bytes key; - bytes value; - bytes leafPrefix; - InnerOp[] path; - } +alloy::sol! { + #[derive(Debug)] + struct ExistenceProof { + bytes key; + bytes value; + bytes leafPrefix; + InnerOp[] path; + } - struct NonExistenceProof { - bytes key; - ExistenceProof left; - ExistenceProof right; - } + #[derive(Debug)] + struct NonExistenceProof { + bytes key; + ExistenceProof left; + ExistenceProof right; + } - struct InnerOp { - bytes prefix; - bytes suffix; - } + #[derive(Debug)] + struct InnerOp { + bytes prefix; + bytes suffix; + } - struct ProofSpec { - uint256 childSize; - uint256 minPrefixLength; - uint256 maxPrefixLength; - } + #[derive(Debug)] + struct ProofSpec { + uint256 childSize; + uint256 minPrefixLength; + uint256 maxPrefixLength; } +} +fn encode_merkle_proof_for_evm( + proof: unionlabs::ibc::core::commitment::merkle_proof::MerkleProof, +) -> Vec { let merkle_proof = ics23::merkle_proof::MerkleProof::try_from( protos::ibc::core::commitment::v1::MerkleProof::from(proof), ) diff --git a/voyager/modules/state/cosmos-sdk-union/src/main.rs b/voyager/modules/state/cosmos-sdk-union/src/main.rs index 5991b15271..1300308be1 100644 --- a/voyager/modules/state/cosmos-sdk-union/src/main.rs +++ b/voyager/modules/state/cosmos-sdk-union/src/main.rs @@ -23,7 +23,7 @@ use prost::Message; use protos::cosmwasm::wasm::v1::{QuerySmartContractStateRequest, QuerySmartContractStateResponse}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::{json, Value}; -use tracing::{error, instrument}; +use tracing::{debug, error, instrument}; use unionlabs::{ bech32::Bech32, ibc::core::client::height::Height, @@ -110,6 +110,7 @@ impl Module { Height::new_with_revision(self.chain_revision, height) } + #[instrument(skip_all, fields(?height))] pub async fn query_smart( &self, query: &Q, @@ -128,6 +129,8 @@ impl Module { ) .await?; + debug!(?response); + Ok(response.value.map(|value| { serde_json::from_slice( &QuerySmartContractStateResponse::decode(&*value) @@ -158,7 +161,7 @@ impl Module { ) .await .map_err(rpc_error( - format_args!("error fetching abci query"), + "error fetching abci query", Some(json!({ "height": height, "path": data })), )) .map(|response| response.response) diff --git a/voyager/plugins/client-update/state-lens/src/main.rs b/voyager/plugins/client-update/state-lens/src/main.rs index 8fbd6a6215..42ea903d37 100644 --- a/voyager/plugins/client-update/state-lens/src/main.rs +++ b/voyager/plugins/client-update/state-lens/src/main.rs @@ -12,7 +12,7 @@ use jsonrpsee::{ }; use serde::{Deserialize, Serialize}; use state_lens_light_client_types::Header; -use tracing::{debug, instrument}; +use tracing::{debug, info, instrument}; use unionlabs::ErrorReporter; use voyager_message::{ call::{Call, FetchUpdateHeaders, WaitForTrustedHeight}, @@ -259,36 +259,48 @@ impl PluginServer for Module { // if the L2 consensus state exists on the L1, we don't have to update the L2 on the L1. match l2_consensus_state_proof { - Some(_) => Ok(continuation), - None => Ok(conc([ - // update the L2 client on L1 and then dispatch the continuation - promise( - [call(FetchUpdateHeaders { - client_type: l2_client_type, - chain_id: self.chain_id.clone(), - counterparty_chain_id: l1_client_meta.chain_id.clone(), - client_id: RawClientId::new(state_lens_client_state.l2_client_id), - update_from, - update_to, - })], - [], - AggregateMsgUpdateClientsFromOrderedHeaders { - ibc_spec_id: IbcUnion::ID, - chain_id: l1_client_meta.chain_id.clone(), - client_id: RawClientId::new(state_lens_client_state.l2_client_id), - }, - ), - seq([ - call(WaitForTrustedHeight { - chain_id: l1_client_meta.chain_id.clone(), - ibc_spec_id: IbcUnion::ID, - client_id: RawClientId::new(state_lens_client_state.l2_client_id), - height: update_to, - finalized: true, - }), - continuation, - ]), - ])), + Some(_) => { + info!("consensus state already exists"); + Ok(continuation) + } + None => { + info!("consensus state does not exist, queueing update for l2 client"); + Ok(conc([ + // update the L2 client on L1 and then dispatch the continuation + promise( + [call(FetchUpdateHeaders { + client_type: l2_client_type, + chain_id: self.chain_id.clone(), + counterparty_chain_id: l1_client_meta.chain_id.clone(), + client_id: RawClientId::new( + state_lens_client_state.l2_client_id, + ), + update_from, + update_to, + })], + [], + AggregateMsgUpdateClientsFromOrderedHeaders { + ibc_spec_id: IbcUnion::ID, + chain_id: l1_client_meta.chain_id.clone(), + client_id: RawClientId::new( + state_lens_client_state.l2_client_id, + ), + }, + ), + seq([ + call(WaitForTrustedHeight { + chain_id: l1_client_meta.chain_id.clone(), + ibc_spec_id: IbcUnion::ID, + client_id: RawClientId::new( + state_lens_client_state.l2_client_id, + ), + height: update_to, + finalized: true, + }), + continuation, + ]), + ])) + } } } ModuleCall::FetchUpdateAfterL1Update(FetchUpdateAfterL1Update { From 219688eb13022a7f5b25dcd2b9cc44f5c1f0c9ca Mon Sep 17 00:00:00 2001 From: benluelo Date: Tue, 28 Jan 2025 12:54:53 +0000 Subject: [PATCH 2/2] chore(voyager): improvements and cleanup --- lib/chain-utils/src/keyring.rs | 4 +- lib/pg-queue/src/lib.rs | 12 +- lib/reconnecting-jsonrpc-ws-client/src/lib.rs | 2 +- lib/voyager-message/src/context.rs | 56 +- lib/voyager-message/src/module.rs | 9 +- lib/voyager-message/src/rpc.rs | 14 +- lib/voyager-message/src/rpc/server.rs | 20 +- lib/voyager-vm/src/in_memory.rs | 12 +- .../proof/cosmos-sdk-union/src/main.rs | 79 +- voyager/modules/proof/cosmos-sdk/src/main.rs | 45 +- voyager/modules/proof/ethereum/src/main.rs | 11 +- voyager/modules/proof/movement/src/main.rs | 21 +- .../client-update/state-lens/src/main.rs | 18 +- .../plugins/event-source/ethereum/src/main.rs | 1576 +++++++++-------- .../plugins/transaction/ethereum/src/main.rs | 20 +- voyager/src/cli.rs | 47 +- voyager/src/main.rs | 76 +- 17 files changed, 1042 insertions(+), 980 deletions(-) diff --git a/lib/chain-utils/src/keyring.rs b/lib/chain-utils/src/keyring.rs index d1387454a8..86e71c0ca7 100644 --- a/lib/chain-utils/src/keyring.rs +++ b/lib/chain-utils/src/keyring.rs @@ -4,7 +4,7 @@ use crossbeam_queue::ArrayQueue; use futures::Future; use rand::prelude::SliceRandom; use serde::{Deserialize, Serialize}; -use tracing::{info_span, warn, Instrument}; +use tracing::{debug, info_span, Instrument}; pub trait ChainKeyring { type Address: Hash + Eq + Clone + Display + Send + Sync; @@ -88,7 +88,7 @@ impl ConcurrentKeyring { f: F, ) -> Option { let Some(address) = self.addresses_buffer.pop() else { - warn!(keyring = %self.name, "high traffic in keyring"); + debug!(keyring = %self.name, "high traffic in keyring"); return None; }; diff --git a/lib/pg-queue/src/lib.rs b/lib/pg-queue/src/lib.rs index 38fcf2cefb..3998050478 100644 --- a/lib/pg-queue/src/lib.rs +++ b/lib/pg-queue/src/lib.rs @@ -323,7 +323,7 @@ impl voyager_vm::Queue for PgQueue { match row { Some(row) => { - let span = info_span!("processing item", id = row.id); + let span = info_span!("processing item", item_id = row.id); trace!(%row.item); @@ -384,20 +384,22 @@ impl voyager_vm::Queue for PgQueue { sqlx::query( " - INSERT INTO queue (item) - SELECT * FROM UNNEST($1::JSONB[]) + INSERT INTO queue (item, parents) + SELECT *, $1 as parents FROM UNNEST($2::JSONB[]) ", ) + .bind(vec![row.id]) .bind(ready.into_iter().map(Json).collect::>()) .execute(tx.as_mut()) .await?; sqlx::query( " - INSERT INTO optimize (item, tag) - SELECT * FROM UNNEST($1::JSONB[], $2::TEXT[]) + INSERT INTO optimize (item, tag, parents) + SELECT *, $1 as parents FROM UNNEST($2::JSONB[], $3::TEXT[]) ", ) + .bind(vec![row.id]) .bind(optimize.iter().map(|(op, _)| Json(op)).collect::>()) .bind(optimize.iter().map(|(_, tag)| *tag).collect::>()) .execute(tx.as_mut()) diff --git a/lib/reconnecting-jsonrpc-ws-client/src/lib.rs b/lib/reconnecting-jsonrpc-ws-client/src/lib.rs index 4e38ce254a..30c42b44a8 100644 --- a/lib/reconnecting-jsonrpc-ws-client/src/lib.rs +++ b/lib/reconnecting-jsonrpc-ws-client/src/lib.rs @@ -181,7 +181,7 @@ impl ClientT for Client { .as_deref() .ok_or_else(|| { jsonrpsee::core::client::Error::Custom(format!( - "not yet connected (request: {method})", + "not yet connected (notification: {method})", )) })? .notification(method, params) diff --git a/lib/voyager-message/src/context.rs b/lib/voyager-message/src/context.rs index eca377288b..2e4aff1317 100644 --- a/lib/voyager-message/src/context.rs +++ b/lib/voyager-message/src/context.rs @@ -562,39 +562,39 @@ impl Context { ) .await?; - main_rpc_server.start(Arc::new(modules)); - info!("checking for plugin health..."); - { - let mut futures = plugins - .iter() - .map(|(name, client)| async move { - match client - .client - .wait_until_connected(Duration::from_secs(10)) - .instrument(debug_span!("health check", %name)) - .await - { - Ok(_) => { - info!("plugin {name} connected") - } - Err(_) => { - warn!("plugin {name} failed to connect after 10 seconds") - } + let futures = plugins + .iter() + .map(|(name, client)| async move { + match client + .client + .wait_until_connected(Duration::from_secs(10)) + .instrument(debug_span!("health check", %name)) + .await + { + Ok(()) => { + info!("plugin {name} connected") } - }) - .collect::>(); - - match cancellation_token - .run_until_cancelled(async { while let Some(()) = futures.next().await {} }) - .await - { - Some(()) => {} - None => return Err(anyhow!("startup error")), - } + Err(_) => { + warn!("plugin {name} failed to connect after 10 seconds") + } + } + }) + .collect::>(); + + match cancellation_token + .run_until_cancelled(futures.collect::>()) + .await + { + Some(_) => {} + None => return Err(anyhow!("startup error")), } + main_rpc_server.start(Arc::new(modules)); + + info!("started"); + Ok(Self { rpc_server: main_rpc_server, plugins, diff --git a/lib/voyager-message/src/module.rs b/lib/voyager-message/src/module.rs index 180a91d899..8f0767036b 100644 --- a/lib/voyager-message/src/module.rs +++ b/lib/voyager-message/src/module.rs @@ -13,6 +13,7 @@ use crate::{ ChainId, ClientInfo, ClientStateMeta, ClientType, ConsensusStateMeta, IbcInterface, IbcSpec, }, data::Data, + rpc::ProofType, RawClientId, VoyagerMessage, }; @@ -350,14 +351,18 @@ pub trait ProofModule { /// Query a proof of IBC state on this chain, at the specified [`Height`], /// returning the state as a JSON [`Value`]. #[method(name = "queryIbcProof", with_extensions)] - async fn query_ibc_proof(&self, at: Height, path: V::StorePath) -> RpcResult; + async fn query_ibc_proof( + &self, + at: Height, + path: V::StorePath, + ) -> RpcResult<(Value, ProofType)>; } /// Type-erased version of [`ProofModuleClient`]. #[rpc(client, namespace = "proof")] pub trait RawProofModule { #[method(name = "queryIbcProof")] - async fn query_ibc_proof_raw(&self, at: Height, path: Value) -> RpcResult; + async fn query_ibc_proof_raw(&self, at: Height, path: Value) -> RpcResult<(Value, ProofType)>; } /// Client modules provide functionality to interact with a single light client diff --git a/lib/voyager-message/src/rpc.rs b/lib/voyager-message/src/rpc.rs index da796f178b..a980cd947a 100644 --- a/lib/voyager-message/src/rpc.rs +++ b/lib/voyager-message/src/rpc.rs @@ -5,7 +5,7 @@ use jsonrpsee::{ types::{ErrorObject, ErrorObjectOwned}, }; use macros::model; -use serde::de::DeserializeOwned; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::{json, Value}; use unionlabs::{ibc::core::client::height::Height, primitives::Bytes, ErrorReporter}; use voyager_core::{IbcSpecId, Timestamp}; @@ -167,16 +167,18 @@ impl IbcState { #[model] pub struct IbcProof { - // pub proof_type: ProofType, + pub proof_type: ProofType, /// The height that the proof was read at. pub height: Height, pub proof: Value, } -// enum ProofType { -// Membership, -// NonMembership, -// } +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ProofType { + Membership, + NonMembership, +} #[model] pub struct SelfClientState { diff --git a/lib/voyager-message/src/rpc/server.rs b/lib/voyager-message/src/rpc/server.rs index fd5e6f7df5..a2f83efd85 100644 --- a/lib/voyager-message/src/rpc/server.rs +++ b/lib/voyager-message/src/rpc/server.rs @@ -342,15 +342,19 @@ impl Server { .map_err(fatal_error)? .with_id(self.item_id); - let proof = proof_module + let (proof, proof_type) = proof_module .query_ibc_proof_raw(height, path) .await .map_err(json_rpc_error_to_error_object)?; // TODO: Use valuable here - debug!(%proof, "fetched ibc proof"); + debug!(%proof, ?proof_type, "fetched ibc proof"); - Ok(IbcProof { height, proof }) + Ok(IbcProof { + height, + proof, + proof_type, + }) }) .await } @@ -415,15 +419,19 @@ impl Server { .map_err(fatal_error)? .with_id(self.item_id); - let proof = proof_module + let (proof, proof_type) = proof_module .query_ibc_proof_raw(height, into_value(path.clone())) .await .map_err(json_rpc_error_to_error_object)?; // TODO: Use valuable here - trace!(%proof, "fetched ibc proof"); + debug!(%proof, ?proof_type, "fetched ibc proof"); - Ok(IbcProof { height, proof }) + Ok(IbcProof { + height, + proof, + proof_type, + }) }) .await } diff --git a/lib/voyager-vm/src/in_memory.rs b/lib/voyager-vm/src/in_memory.rs index baafaf1b03..3e653b77f9 100644 --- a/lib/voyager-vm/src/in_memory.rs +++ b/lib/voyager-vm/src/in_memory.rs @@ -104,17 +104,17 @@ impl Queue for InMemoryQueue { }; match op { - Some((id, item)) => { - let span = info_span!("processing item", %id); + Some((item_id, item)) => { + let span = info_span!("processing item", %item_id); self.done .lock() .expect("mutex is poisoned") - .insert(id, item.clone()); + .insert(item_id, item.clone()); let (r, res) = f( item.op.clone(), - ItemId::new(i64::from(id)).expect("infallible"), + ItemId::new(i64::from(item_id)).expect("infallible"), ) .instrument(span) .await; @@ -130,7 +130,7 @@ impl Queue for InMemoryQueue { optimizer_queue.entry(tag.to_owned()).or_default().insert( self.idx.fetch_add(1, Ordering::SeqCst), Item { - parents: vec![id], + parents: vec![item_id], op, }, ); @@ -139,7 +139,7 @@ impl Queue for InMemoryQueue { ready.insert( self.idx.fetch_add(1, Ordering::SeqCst), Item { - parents: vec![id], + parents: vec![item_id], op, }, ); diff --git a/voyager/modules/proof/cosmos-sdk-union/src/main.rs b/voyager/modules/proof/cosmos-sdk-union/src/main.rs index 6b6bbea593..5da67bcc6e 100644 --- a/voyager/modules/proof/cosmos-sdk-union/src/main.rs +++ b/voyager/modules/proof/cosmos-sdk-union/src/main.rs @@ -18,6 +18,7 @@ use tracing::{error, instrument}; use unionlabs::{ bech32::Bech32, bounded::BoundedI64, + cosmos::ics23::commitment_proof::CommitmentProof, ibc::core::{client::height::Height, commitment::merkle_proof::MerkleProof}, primitives::H256, ErrorReporter, @@ -26,6 +27,7 @@ use voyager_message::{ core::ChainId, into_value, module::{ProofModuleInfo, ProofModuleServer}, + rpc::ProofType, ProofModule, FATAL_JSONRPC_ERROR_CODE, }; use voyager_vm::BoxDynError; @@ -116,7 +118,7 @@ impl ProofModuleServer for Module { _: &Extensions, at: Height, path: StorePath, - ) -> RpcResult { + ) -> RpcResult<(Value, ProofType)> { let data = [0x03] .into_iter() .chain(*self.ibc_host_contract_address.data()) @@ -144,44 +146,53 @@ impl ProofModuleServer for Module { true, ) .await - .map_err(rpc_error("error querying connection proof", None))?; - - Ok(into_value( - MerkleProof::try_from(protos::ibc::core::commitment::v1::MerkleProof { - proofs: query_result - .response - .proof_ops - .ok_or_else(|| { + .map_err(rpc_error("error querying ibc proof", None))?; + + let proofs = query_result + .response + .proof_ops + .ok_or_else(|| { + ErrorObject::owned( + FATAL_JSONRPC_ERROR_CODE, + "proofOps must be present on abci query when called with prove = true", + None::<()>, + ) + })? + .ops + .into_iter() + .map(|op| { + ::decode(&*op.data) + .map_err(|e| { ErrorObject::owned( FATAL_JSONRPC_ERROR_CODE, - "proofOps must be present on abci query when called with prove = true", - None::<()>, + format!("invalid height value: {}", ErrorReporter(e)), + Some(json!({ "height": at })), ) - })? - .ops - .into_iter() - .map(|op| { - ::decode( - &*op.data, - ) - .map_err(|e| { - ErrorObject::owned( - FATAL_JSONRPC_ERROR_CODE, - format!("invalid height value: {}", ErrorReporter(e)), - Some(json!({ "height": at })), - ) - }) }) - .collect::, _>>()?, }) - .map_err(|e| { - ErrorObject::owned( - FATAL_JSONRPC_ERROR_CODE, - format!("invalid height value: {}", ErrorReporter(e)), - Some(json!({ "height": at })), - ) - })?, - )) + .collect::, _>>()?; + + let proof = + MerkleProof::try_from(protos::ibc::core::commitment::v1::MerkleProof { proofs }) + .map_err(|e| { + ErrorObject::owned( + FATAL_JSONRPC_ERROR_CODE, + format!("invalid merkle proof value: {}", ErrorReporter(e)), + Some(json!({ "height": at })), + ) + })?; + + let proof_type = if proof + .proofs + .iter() + .any(|p| matches!(&p, CommitmentProof::Nonexist(_))) + { + ProofType::NonMembership + } else { + ProofType::Membership + }; + + Ok((into_value(proof), proof_type)) } } diff --git a/voyager/modules/proof/cosmos-sdk/src/main.rs b/voyager/modules/proof/cosmos-sdk/src/main.rs index f9c9665134..f05ddb55bb 100644 --- a/voyager/modules/proof/cosmos-sdk/src/main.rs +++ b/voyager/modules/proof/cosmos-sdk/src/main.rs @@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use tracing::{error, instrument}; use unionlabs::{ + cosmos::ics23::commitment_proof::CommitmentProof, ibc::core::{client::height::Height, commitment::merkle_proof::MerkleProof}, primitives::H256, ErrorReporter, WasmClientType, @@ -26,6 +27,7 @@ use voyager_message::{ core::ChainId, into_value, module::{ProofModuleInfo, ProofModuleServer}, + rpc::ProofType, ProofModule, }; use voyager_vm::BoxDynError; @@ -115,7 +117,7 @@ impl ProofModuleServer for Module { _: &Extensions, at: Height, path: StorePath, - ) -> RpcResult { + ) -> RpcResult<(Value, ProofType)> { const IBC_STORE_PATH: &str = "store/ibc/key"; let path_string = path.to_string(); @@ -140,24 +142,33 @@ impl ProofModuleServer for Module { Some(json!({ "height": at, "path": path })), ))?; - Ok(into_value( - MerkleProof::try_from(protos::ibc::core::commitment::v1::MerkleProof { - proofs: query_result - .response - .proof_ops + let proofs = query_result + .response + .proof_ops + .unwrap() + .ops + .into_iter() + .map(|op| { + ::decode(&*op.data) .unwrap() - .ops - .into_iter() - .map(|op| { - ::decode( - &*op.data, - ) - .unwrap() - }) - .collect::>(), }) - .unwrap(), - )) + .collect::>(); + + let proof = + MerkleProof::try_from(protos::ibc::core::commitment::v1::MerkleProof { proofs }) + .unwrap(); + + let proof_type = if proof + .proofs + .iter() + .any(|p| matches!(&p, CommitmentProof::Nonexist(_))) + { + ProofType::NonMembership + } else { + ProofType::Membership + }; + + Ok((into_value(proof), proof_type)) } } diff --git a/voyager/modules/proof/ethereum/src/main.rs b/voyager/modules/proof/ethereum/src/main.rs index fcef96a6e8..9d25b8bf4f 100644 --- a/voyager/modules/proof/ethereum/src/main.rs +++ b/voyager/modules/proof/ethereum/src/main.rs @@ -24,6 +24,7 @@ use voyager_message::{ core::ChainId, into_value, module::{ProofModuleInfo, ProofModuleServer}, + rpc::ProofType, ProofModule, }; use voyager_vm::BoxDynError; @@ -85,7 +86,7 @@ impl ProofModuleServer for Module { _: &Extensions, at: Height, path: StorePath, - ) -> RpcResult { + ) -> RpcResult<(Value, ProofType)> { let location = ibc_commitment_key(path.key()); debug!( @@ -124,6 +125,12 @@ impl ProofModuleServer for Module { proof: proof.proof.into_iter().map(|bytes| bytes.into()).collect(), }; - Ok(into_value(proof)) + let proof_type = if proof.value == U256::ZERO { + ProofType::NonMembership + } else { + ProofType::Membership + }; + + Ok((into_value(proof), proof_type)) } } diff --git a/voyager/modules/proof/movement/src/main.rs b/voyager/modules/proof/movement/src/main.rs index 9b77ec6a9c..4fa301adcc 100644 --- a/voyager/modules/proof/movement/src/main.rs +++ b/voyager/modules/proof/movement/src/main.rs @@ -25,6 +25,7 @@ use voyager_message::{ core::ChainId, into_value, module::{ProofModuleInfo, ProofModuleServer}, + rpc::ProofType, ProofModule, }; use voyager_vm::BoxDynError; @@ -116,7 +117,7 @@ impl ProofModuleServer for Module { _: &Extensions, at: Height, _path: StorePath, - ) -> RpcResult { + ) -> RpcResult<(Value, ProofType)> { let ledger_version = self.ledger_version_of_height(at.height()).await; let vault_addr = self @@ -150,13 +151,17 @@ impl ProofModuleServer for Module { // at.revision_height, // ).await; - Ok(into_value(StorageProof { - state_value: None, - proof: SparseMerkleProof { - leaf: None, - siblings: Vec::new(), - }, - })) + Ok(( + into_value(StorageProof { + state_value: None, + proof: SparseMerkleProof { + leaf: None, + siblings: Vec::new(), + }, + }), + // TODO: Implement properly, see above + ProofType::Membership, + )) } } diff --git a/voyager/plugins/client-update/state-lens/src/main.rs b/voyager/plugins/client-update/state-lens/src/main.rs index 42ea903d37..c2ce588cf5 100644 --- a/voyager/plugins/client-update/state-lens/src/main.rs +++ b/voyager/plugins/client-update/state-lens/src/main.rs @@ -22,7 +22,7 @@ use voyager_message::{ hook::UpdateHook, into_value, module::{PluginInfo, PluginServer}, - rpc::missing_state, + rpc::{missing_state, ProofType}, DefaultCmd, ExtensionsExt, Plugin, PluginMessage, RawClientId, VoyagerClient, VoyagerMessage, FATAL_JSONRPC_ERROR_CODE, }; @@ -264,7 +264,7 @@ impl PluginServer for Module { Ok(continuation) } None => { - info!("consensus state does not exist, queueing update for l2 client"); + info!("consensus state does not exist, queuing update for l2 client"); Ok(conc([ // update the L2 client on L1 and then dispatch the continuation promise( @@ -402,15 +402,23 @@ impl PluginServer for Module { QueryHeight::Specific(l1_latest_height), l2_consensus_state_path, ) - .await? - .proof; + .await?; + + if l2_consensus_state_proof.proof_type != ProofType::Membership { + return Err(ErrorObject::owned( + FATAL_JSONRPC_ERROR_CODE, + "proof of the l2 consensus state must be a membership proof", + None::<()>, + )); + } + debug!(?l2_consensus_state_proof); let l2_consensus_state_proof_bytes = voyager_client .encode_proof::( l1_client_info.client_type.clone(), state_lens_client_info.ibc_interface, - l2_consensus_state_proof, + l2_consensus_state_proof.proof, ) .await?; diff --git a/voyager/plugins/event-source/ethereum/src/main.rs b/voyager/plugins/event-source/ethereum/src/main.rs index 218dd902c9..e3605684dd 100644 --- a/voyager/plugins/event-source/ethereum/src/main.rs +++ b/voyager/plugins/event-source/ethereum/src/main.rs @@ -26,8 +26,12 @@ use jsonrpsee::{ Extensions, }; use serde::{Deserialize, Serialize}; -use tracing::{debug, info, instrument, trace, warn}; -use unionlabs::{ibc::core::client::height::Height, primitives::H160, ErrorReporter}; +use tracing::{debug, info, info_span, instrument, trace, warn}; +use unionlabs::{ + ibc::core::client::height::Height, + primitives::{H160, H256}, + ErrorReporter, +}; use voyager_message::{ call::Call, core::{ChainId, ClientInfo, IbcSpec, QueryHeight}, @@ -265,811 +269,811 @@ impl PluginServer for Module { tx_hash, event, }) => { - let provable_height = Height::new(block_number); - let voyager_client = e.try_get::()?; - - match event { - IbcEvents::CreateClient(raw_event) => { - let client_info = voyager_client - .client_info::(self.chain_id.clone(), raw_event.client_id) - .await?; - - let event = CreateClient { - client_id: raw_event.client_id, - client_type: client_info.client_type.clone(), - } - .into(); - - ibc_union_spec::log_event(&event, &self.chain_id); - - Ok(data(ChainEvent { - chain_id: self.chain_id.clone(), - client_info: client_info.clone(), - counterparty_chain_id: ChainId::new(raw_event.counterparty_chain_id), - tx_hash, - provable_height, - ibc_spec_id: IbcUnion::ID, - event: into_value::(event), - })) - } - IbcEvents::RegisterClient(raw_event) => { - info!(?raw_event, "observed RegisterClient event"); + self.make_full_event(e.try_get::()?, block_number, tx_hash, event) + .await + } + ModuleCall::FetchGetLogs(FetchGetLogs { + block_number, + up_to, + }) => { + self.fetch_get_logs(e.try_get::()?, block_number, up_to) + .await + } + } + } +} + +impl Module { + #[instrument(skip_all, fields(block_number, ?up_to))] + async fn fetch_get_logs( + &self, + voyager_client: &VoyagerClient, + block_number: u64, + up_to: Option, + ) -> RpcResult> { + if up_to.is_some_and(|up_to| up_to < block_number) { + return Err(ErrorObject::owned( + FATAL_JSONRPC_ERROR_CODE, + "`up_to` must be either > `block_number` or null", + None::<()>, + )); + } - Ok(noop()) + let latest_height = voyager_client + .query_latest_height(self.chain_id.clone(), true) + .await?; + + if latest_height.height() < block_number { + debug!(block_number, "block is not yet finalized"); + + return Ok(seq([ + defer(now() + 1), + call(Call::Plugin(PluginMessage::new( + self.plugin_name(), + ModuleCall::from(FetchGetLogs { + block_number, + up_to, + }), + ))), + ])); + } + + debug!("fetching logs in execution block"); + + let logs = self + .provider + .get_logs( + &Filter::new() + .address(alloy::primitives::Address::from( + self.ibc_handler_address.get(), + )) + .from_block(block_number) + .to_block(block_number), + ) + .await + .map_err(|e| { + ErrorObject::owned( + -1, + format!( + "error fetching logs in block {block_number}: {}", + ErrorReporter(e) + ), + None::<()>, + ) + })?; + + info!("found {} logs", logs.len()); + + let events = logs.into_iter().flat_map(|log| { + let tx_hash = log + .transaction_hash + .expect("log should have transaction_hash") + .into(); + + info_span!("tx_hash", %tx_hash).in_scope(|| { + match Ibc::IbcEvents::decode_log(&log.inner, true) { + Ok(event) => { + trace!(?event, "found IbcHandler event"); + + Some(call(PluginMessage::new( + self.plugin_name(), + ModuleCall::from(MakeFullEvent { + block_number, + tx_hash, + event: match event.data { + Ibc::IbcEvents::RegisterClient(client_registered) => { + IbcEvents::RegisterClient(client_registered) + } + Ibc::IbcEvents::CreateClient(client_created) => { + IbcEvents::CreateClient(client_created) + } + Ibc::IbcEvents::UpdateClient(client_updated) => { + IbcEvents::UpdateClient(client_updated) + } + Ibc::IbcEvents::ConnectionOpenInit(connection_open_init) => { + IbcEvents::ConnectionOpenInit(connection_open_init) + } + Ibc::IbcEvents::ConnectionOpenTry(connection_open_try) => { + IbcEvents::ConnectionOpenTry(connection_open_try) + } + Ibc::IbcEvents::ConnectionOpenAck(connection_open_ack) => { + IbcEvents::ConnectionOpenAck(connection_open_ack) + } + Ibc::IbcEvents::ConnectionOpenConfirm( + connection_open_confirm, + ) => IbcEvents::ConnectionOpenConfirm(connection_open_confirm), + Ibc::IbcEvents::ChannelOpenInit(channel_open_init) => { + IbcEvents::ChannelOpenInit(channel_open_init) + } + Ibc::IbcEvents::ChannelOpenTry(channel_open_try) => { + IbcEvents::ChannelOpenTry(channel_open_try) + } + Ibc::IbcEvents::ChannelOpenAck(channel_open_ack) => { + IbcEvents::ChannelOpenAck(channel_open_ack) + } + Ibc::IbcEvents::ChannelOpenConfirm(channel_open_confirm) => { + IbcEvents::ChannelOpenConfirm(channel_open_confirm) + } + Ibc::IbcEvents::ChannelCloseInit(channel_close_init) => { + IbcEvents::ChannelCloseInit(channel_close_init) + } + Ibc::IbcEvents::ChannelCloseConfirm(channel_close_confirm) => { + IbcEvents::ChannelCloseConfirm(channel_close_confirm) + } + Ibc::IbcEvents::PacketSend(packet_send) => { + IbcEvents::PacketSend(packet_send) + } + Ibc::IbcEvents::PacketRecv(packet_recv) => { + IbcEvents::PacketRecv(packet_recv) + } + Ibc::IbcEvents::IntentPacketRecv(intent_packet_recv) => { + IbcEvents::IntentPacketRecv(intent_packet_recv) + } + Ibc::IbcEvents::WriteAck(write_acknowledgement) => { + IbcEvents::WriteAck(write_acknowledgement) + } + Ibc::IbcEvents::PacketAck(acknowledge_packet) => { + IbcEvents::PacketAck(acknowledge_packet) + } + Ibc::IbcEvents::PacketTimeout(timeout_packet) => { + IbcEvents::PacketTimeout(timeout_packet) + } + }, + }), + ))) } - IbcEvents::UpdateClient(raw_event) => { - let client_info = voyager_client - .client_info::(self.chain_id.clone(), raw_event.client_id) - .await?; - - let client_meta = voyager_client - .client_meta::( - self.chain_id.clone(), - provable_height.into(), - raw_event.client_id, - ) - .await?; - - let event = UpdateClient { - client_type: client_info.client_type.clone(), - client_id: raw_event.client_id, - height: raw_event.height, - } - .into(); - - ibc_union_spec::log_event(&event, &self.chain_id); - - Ok(data(ChainEvent { - chain_id: self.chain_id.clone(), - client_info: client_info.clone(), - counterparty_chain_id: client_meta.chain_id, - tx_hash, - provable_height, - ibc_spec_id: IbcUnion::ID, - event: into_value::(event), - })) + Err(e) => { + warn!( + ?log, + "could not decode IbcHandler event: {}", + ErrorReporter(e) + ); + None } + } + }) + }); - IbcEvents::ConnectionOpenInit(raw_event) => { - let client_info = voyager_client - .client_info::(self.chain_id.clone(), raw_event.client_id) - .await?; - - let client_meta = voyager_client - .client_meta::( - self.chain_id.clone(), - provable_height.into(), - raw_event.client_id, - ) - .await?; - - let event = ConnectionOpenInit { - client_id: raw_event.client_id, - connection_id: raw_event.connection_id, - counterparty_client_id: raw_event.counterparty_client_id, - } - .into(); - - ibc_union_spec::log_event(&event, &self.chain_id); - - Ok(data(ChainEvent { - chain_id: self.chain_id.clone(), - client_info, - counterparty_chain_id: client_meta.chain_id, - tx_hash, - ibc_spec_id: IbcUnion::ID, - provable_height, - event: into_value::(event), - })) - } - IbcEvents::ConnectionOpenTry(raw_event) => { - let client_info = voyager_client - .client_info::(self.chain_id.clone(), raw_event.client_id) - .await?; - - let client_meta = voyager_client - .client_meta::( - self.chain_id.clone(), - provable_height.into(), - raw_event.client_id, - ) - .await?; - - let event = ConnectionOpenTry { - client_id: raw_event.client_id, + let next_fetch = match up_to { + Some(up_to) => { + if up_to > block_number { + Some(call(Call::Plugin(PluginMessage::new( + self.plugin_name(), + ModuleCall::from(FetchGetLogs { + block_number: block_number + 1, + up_to: Some(up_to), + }), + )))) + } else { + None + } + } + None => Some(call(Call::Plugin(PluginMessage::new( + self.plugin_name(), + ModuleCall::from(FetchGetLogs { + block_number: block_number + 1, + up_to: None, + }), + )))), + }; + + Ok(conc(next_fetch.into_iter().chain(events))) + } + + #[instrument(skip_all, fields(block_number, %tx_hash))] + async fn make_full_event( + &self, + voyager_client: &VoyagerClient, + block_number: u64, + tx_hash: H256, + event: IbcEvents, + ) -> RpcResult> { + trace!(?event, "raw event"); + + let provable_height = Height::new(block_number); + + match event { + IbcEvents::CreateClient(raw_event) => { + let client_info = voyager_client + .client_info::(self.chain_id.clone(), raw_event.client_id) + .await?; + + let event = CreateClient { + client_id: raw_event.client_id, + client_type: client_info.client_type.clone(), + } + .into(); + + ibc_union_spec::log_event(&event, &self.chain_id); + + Ok(data(ChainEvent { + chain_id: self.chain_id.clone(), + client_info: client_info.clone(), + counterparty_chain_id: ChainId::new(raw_event.counterparty_chain_id), + tx_hash, + provable_height, + ibc_spec_id: IbcUnion::ID, + event: into_value::(event), + })) + } + IbcEvents::RegisterClient(raw_event) => { + info!(?raw_event, "observed RegisterClient event"); + + Ok(noop()) + } + IbcEvents::UpdateClient(raw_event) => { + let client_info = voyager_client + .client_info::(self.chain_id.clone(), raw_event.client_id) + .await?; + + let client_meta = voyager_client + .client_meta::( + self.chain_id.clone(), + provable_height.into(), + raw_event.client_id, + ) + .await?; + + let event = UpdateClient { + client_type: client_info.client_type.clone(), + client_id: raw_event.client_id, + height: raw_event.height, + } + .into(); + + ibc_union_spec::log_event(&event, &self.chain_id); + + Ok(data(ChainEvent { + chain_id: self.chain_id.clone(), + client_info: client_info.clone(), + counterparty_chain_id: client_meta.chain_id, + tx_hash, + provable_height, + ibc_spec_id: IbcUnion::ID, + event: into_value::(event), + })) + } + + IbcEvents::ConnectionOpenInit(raw_event) => { + let client_info = voyager_client + .client_info::(self.chain_id.clone(), raw_event.client_id) + .await?; + + let client_meta = voyager_client + .client_meta::( + self.chain_id.clone(), + provable_height.into(), + raw_event.client_id, + ) + .await?; + + let event = ConnectionOpenInit { + client_id: raw_event.client_id, + connection_id: raw_event.connection_id, + counterparty_client_id: raw_event.counterparty_client_id, + } + .into(); + + ibc_union_spec::log_event(&event, &self.chain_id); + + Ok(data(ChainEvent { + chain_id: self.chain_id.clone(), + client_info, + counterparty_chain_id: client_meta.chain_id, + tx_hash, + ibc_spec_id: IbcUnion::ID, + provable_height, + event: into_value::(event), + })) + } + IbcEvents::ConnectionOpenTry(raw_event) => { + let client_info = voyager_client + .client_info::(self.chain_id.clone(), raw_event.client_id) + .await?; + + let client_meta = voyager_client + .client_meta::( + self.chain_id.clone(), + provable_height.into(), + raw_event.client_id, + ) + .await?; + + let event = ConnectionOpenTry { + client_id: raw_event.client_id, + connection_id: raw_event.connection_id, + counterparty_client_id: raw_event.counterparty_client_id, + counterparty_connection_id: raw_event.counterparty_connection_id, + } + .into(); + + ibc_union_spec::log_event(&event, &self.chain_id); + + Ok(data(ChainEvent { + chain_id: self.chain_id.clone(), + client_info, + counterparty_chain_id: client_meta.chain_id, + tx_hash, + provable_height, + ibc_spec_id: IbcUnion::ID, + event: into_value::(event), + })) + } + IbcEvents::ConnectionOpenAck(raw_event) => { + let client_info = voyager_client + .client_info::(self.chain_id.clone(), raw_event.client_id) + .await?; + + let client_meta = voyager_client + .client_meta::( + self.chain_id.clone(), + provable_height.into(), + raw_event.client_id, + ) + .await?; + + let event = ConnectionOpenAck { + client_id: raw_event.client_id, + connection_id: raw_event.connection_id, + counterparty_client_id: raw_event.counterparty_client_id, + counterparty_connection_id: raw_event.counterparty_connection_id, + } + .into(); + + ibc_union_spec::log_event(&event, &self.chain_id); + + Ok(data(ChainEvent { + chain_id: self.chain_id.clone(), + client_info, + counterparty_chain_id: client_meta.chain_id, + tx_hash, + provable_height, + ibc_spec_id: IbcUnion::ID, + event: into_value::(event), + })) + } + IbcEvents::ConnectionOpenConfirm(raw_event) => { + let client_info = voyager_client + .client_info::(self.chain_id.clone(), raw_event.client_id) + .await?; + + let client_meta = voyager_client + .client_meta::( + self.chain_id.clone(), + provable_height.into(), + raw_event.client_id, + ) + .await?; + + let event = ConnectionOpenConfirm { + client_id: raw_event.client_id, + connection_id: raw_event.connection_id, + counterparty_client_id: raw_event.counterparty_client_id, + counterparty_connection_id: raw_event.counterparty_connection_id, + } + .into(); + + ibc_union_spec::log_event(&event, &self.chain_id); + + Ok(data(ChainEvent { + chain_id: self.chain_id.clone(), + client_info, + counterparty_chain_id: client_meta.chain_id, + tx_hash, + provable_height, + ibc_spec_id: IbcUnion::ID, + event: into_value::(event), + })) + } + IbcEvents::ChannelOpenInit(raw_event) => { + let connection = voyager_client + .query_ibc_state( + self.chain_id.clone(), + provable_height.into(), + ConnectionPath { connection_id: raw_event.connection_id, - counterparty_client_id: raw_event.counterparty_client_id, - counterparty_connection_id: raw_event.counterparty_connection_id, - } - .into(); - - ibc_union_spec::log_event(&event, &self.chain_id); - - Ok(data(ChainEvent { - chain_id: self.chain_id.clone(), - client_info, - counterparty_chain_id: client_meta.chain_id, - tx_hash, - provable_height, - ibc_spec_id: IbcUnion::ID, - event: into_value::(event), - })) - } - IbcEvents::ConnectionOpenAck(raw_event) => { - let client_info = voyager_client - .client_info::(self.chain_id.clone(), raw_event.client_id) - .await?; - - let client_meta = voyager_client - .client_meta::( - self.chain_id.clone(), - provable_height.into(), - raw_event.client_id, - ) - .await?; - - let event = ConnectionOpenAck { - client_id: raw_event.client_id, + }, + ) + .await? + .state + .ok_or_else(missing_state("connection must exist", None))?; + + let client_info = voyager_client + .client_info::(self.chain_id.clone(), connection.client_id) + .await?; + + let client_meta = voyager_client + .client_meta::( + self.chain_id.clone(), + provable_height.into(), + connection.client_id, + ) + .await?; + + let channel_id = raw_event.channel_id; + + let channel = voyager_client + .query_ibc_state( + self.chain_id.clone(), + provable_height.into(), + ChannelPath { channel_id }, + ) + .await? + .state + .ok_or_else(missing_state("connection must exist", None))?; + + let event = ChannelOpenInit { + port_id: raw_event.port_id.into(), + channel_id, + counterparty_port_id: raw_event.counterparty_port_id.into(), + connection, + version: channel.version, + } + .into(); + + ibc_union_spec::log_event(&event, &self.chain_id); + + Ok(data(ChainEvent { + chain_id: self.chain_id.clone(), + client_info, + counterparty_chain_id: client_meta.chain_id, + tx_hash, + provable_height, + ibc_spec_id: IbcUnion::ID, + event: into_value::(event), + })) + } + IbcEvents::ChannelOpenTry(raw_event) => { + let connection = voyager_client + .query_ibc_state( + self.chain_id.clone(), + provable_height.into(), + ConnectionPath { connection_id: raw_event.connection_id, - counterparty_client_id: raw_event.counterparty_client_id, - counterparty_connection_id: raw_event.counterparty_connection_id, - } - .into(); - - ibc_union_spec::log_event(&event, &self.chain_id); - - Ok(data(ChainEvent { - chain_id: self.chain_id.clone(), - client_info, - counterparty_chain_id: client_meta.chain_id, - tx_hash, - provable_height, - ibc_spec_id: IbcUnion::ID, - event: into_value::(event), - })) - } - IbcEvents::ConnectionOpenConfirm(raw_event) => { - let client_info = voyager_client - .client_info::(self.chain_id.clone(), raw_event.client_id) - .await?; - - let client_meta = voyager_client - .client_meta::( - self.chain_id.clone(), - provable_height.into(), - raw_event.client_id, - ) - .await?; - - let event = ConnectionOpenConfirm { - client_id: raw_event.client_id, + }, + ) + .await? + .state + .ok_or_else(missing_state("connection must exist", None))?; + + let client_info = voyager_client + .client_info::(self.chain_id.clone(), connection.client_id) + .await?; + + let client_meta = voyager_client + .client_meta::( + self.chain_id.clone(), + provable_height.into(), + connection.client_id, + ) + .await?; + + let channel_id = raw_event.channel_id; + + let channel = voyager_client + .query_ibc_state( + self.chain_id.clone(), + provable_height.into(), + ChannelPath { channel_id }, + ) + .await? + .state + .ok_or_else(missing_state("channel must exist", None))?; + + let event = ChannelOpenTry { + port_id: raw_event.port_id.into(), + channel_id, + counterparty_port_id: raw_event.counterparty_port_id.into(), + counterparty_channel_id: raw_event.counterparty_channel_id, + connection, + version: channel.version, + } + .into(); + + ibc_union_spec::log_event(&event, &self.chain_id); + + Ok(data(ChainEvent { + chain_id: self.chain_id.clone(), + client_info, + counterparty_chain_id: client_meta.chain_id, + tx_hash, + provable_height, + ibc_spec_id: IbcUnion::ID, + event: into_value::(event), + })) + } + IbcEvents::ChannelOpenAck(raw_event) => { + let connection = voyager_client + .query_ibc_state( + self.chain_id.clone(), + provable_height.into(), + ConnectionPath { connection_id: raw_event.connection_id, - counterparty_client_id: raw_event.counterparty_client_id, - counterparty_connection_id: raw_event.counterparty_connection_id, - } - .into(); - - ibc_union_spec::log_event(&event, &self.chain_id); - - Ok(data(ChainEvent { - chain_id: self.chain_id.clone(), - client_info, - counterparty_chain_id: client_meta.chain_id, - tx_hash, - provable_height, - ibc_spec_id: IbcUnion::ID, - event: into_value::(event), - })) - } - IbcEvents::ChannelOpenInit(raw_event) => { - let connection = voyager_client - .query_ibc_state( - self.chain_id.clone(), - provable_height.into(), - ConnectionPath { - connection_id: raw_event.connection_id, - }, - ) - .await? - .state - .ok_or_else(missing_state("connection must exist", None))?; - - let client_info = voyager_client - .client_info::(self.chain_id.clone(), connection.client_id) - .await?; - - let client_meta = voyager_client - .client_meta::( - self.chain_id.clone(), - provable_height.into(), - connection.client_id, - ) - .await?; - - let channel_id = raw_event.channel_id; - - let channel = voyager_client - .query_ibc_state( - self.chain_id.clone(), - provable_height.into(), - ChannelPath { channel_id }, - ) - .await? - .state - .ok_or_else(missing_state("connection must exist", None))?; - - let event = ChannelOpenInit { - port_id: raw_event.port_id.into(), - channel_id, - counterparty_port_id: raw_event.counterparty_port_id.into(), - connection, - version: channel.version, - } - .into(); - - ibc_union_spec::log_event(&event, &self.chain_id); - - Ok(data(ChainEvent { - chain_id: self.chain_id.clone(), - client_info, - counterparty_chain_id: client_meta.chain_id, - tx_hash, - provable_height, - ibc_spec_id: IbcUnion::ID, - event: into_value::(event), - })) - } - IbcEvents::ChannelOpenTry(raw_event) => { - let connection = voyager_client - .query_ibc_state( - self.chain_id.clone(), - provable_height.into(), - ConnectionPath { - connection_id: raw_event.connection_id, - }, - ) - .await? - .state - .ok_or_else(missing_state("connection must exist", None))?; - - let client_info = voyager_client - .client_info::(self.chain_id.clone(), connection.client_id) - .await?; - - let client_meta = voyager_client - .client_meta::( - self.chain_id.clone(), - provable_height.into(), - connection.client_id, - ) - .await?; - - let channel_id = raw_event.channel_id; - - let channel = voyager_client - .query_ibc_state( - self.chain_id.clone(), - provable_height.into(), - ChannelPath { channel_id }, - ) - .await? - .state - .ok_or_else(missing_state("channel must exist", None))?; - - let event = ChannelOpenTry { - port_id: raw_event.port_id.into(), - channel_id, - counterparty_port_id: raw_event.counterparty_port_id.into(), - counterparty_channel_id: raw_event.counterparty_channel_id, - connection, - version: channel.version, - } - .into(); - - ibc_union_spec::log_event(&event, &self.chain_id); - - Ok(data(ChainEvent { - chain_id: self.chain_id.clone(), - client_info, - counterparty_chain_id: client_meta.chain_id, - tx_hash, - provable_height, - ibc_spec_id: IbcUnion::ID, - event: into_value::(event), - })) - } - IbcEvents::ChannelOpenAck(raw_event) => { - let connection = voyager_client - .query_ibc_state( - self.chain_id.clone(), - provable_height.into(), - ConnectionPath { - connection_id: raw_event.connection_id, - }, - ) - .await? - .state - .ok_or_else(missing_state("connection must exist", None))?; - - let client_info = voyager_client - .client_info::(self.chain_id.clone(), connection.client_id) - .await?; - - let client_meta = voyager_client - .client_meta::( - self.chain_id.clone(), - provable_height.into(), - connection.client_id, - ) - .await?; - - let channel_id = raw_event.channel_id; - - let channel = voyager_client - .query_ibc_state( - self.chain_id.clone(), - provable_height.into(), - ChannelPath { channel_id }, - ) - .await? - .state - .ok_or_else(missing_state("channel must exist", None))?; - - let event = ChannelOpenAck { - port_id: raw_event.port_id.into(), - channel_id, - counterparty_port_id: raw_event.counterparty_port_id.into(), - counterparty_channel_id: raw_event.counterparty_channel_id, - connection, - version: channel.version, - } - .into(); - - ibc_union_spec::log_event(&event, &self.chain_id); - - Ok(data(ChainEvent { - chain_id: self.chain_id.clone(), - client_info, - counterparty_chain_id: client_meta.chain_id, - tx_hash, - provable_height, - ibc_spec_id: IbcUnion::ID, - event: into_value::(event), - })) - } - IbcEvents::ChannelOpenConfirm(raw_event) => { - let connection = voyager_client - .query_ibc_state( - self.chain_id.clone(), - provable_height.into(), - ConnectionPath { - connection_id: raw_event.connection_id, - }, - ) - .await? - .state - .ok_or_else(missing_state("connection must exist", None))?; - - let client_info = voyager_client - .client_info::(self.chain_id.clone(), connection.client_id) - .await?; - - let client_meta = voyager_client - .client_meta::( - self.chain_id.clone(), - provable_height.into(), - connection.client_id, - ) - .await?; - - let channel_id = raw_event.channel_id; - - let channel = voyager_client - .query_ibc_state( - self.chain_id.clone(), - provable_height.into(), - ChannelPath { channel_id }, - ) - .await? - .state - .ok_or_else(missing_state("channel must exist", None))?; - - let event = ChannelOpenConfirm { - port_id: raw_event.port_id.into(), - channel_id, - counterparty_port_id: channel.counterparty_port_id, - counterparty_channel_id: channel.counterparty_channel_id, - connection, - version: channel.version, - } - .into(); - - ibc_union_spec::log_event(&event, &self.chain_id); - - Ok(data(ChainEvent { - chain_id: self.chain_id.clone(), - client_info, - counterparty_chain_id: client_meta.chain_id, - tx_hash, - provable_height, - ibc_spec_id: IbcUnion::ID, - event: into_value::(event), - })) - } + }, + ) + .await? + .state + .ok_or_else(missing_state("connection must exist", None))?; - IbcEvents::ChannelCloseInit(_) | IbcEvents::ChannelCloseConfirm(_) => { - warn!("observed channel close message, these are not handled currently"); + let client_info = voyager_client + .client_info::(self.chain_id.clone(), connection.client_id) + .await?; - Ok(noop()) - } + let client_meta = voyager_client + .client_meta::( + self.chain_id.clone(), + provable_height.into(), + connection.client_id, + ) + .await?; - // packet origin is this chain - IbcEvents::PacketSend(event) => { - let ( - counterparty_chain_id, - client_info, - source_channel, - destination_channel, - ) = self - .make_packet_metadata( - provable_height, - event.packet.source_channel_id, - e.try_get()?, - ) - .await?; - - let event = PacketSend { - packet_data: event.packet.data.to_vec().into(), - packet: PacketMetadata { - source_channel, - destination_channel, - timeout_height: event.packet.timeout_height, - timeout_timestamp: event.packet.timeout_timestamp, - }, - } - .into(); - - ibc_union_spec::log_event(&event, &self.chain_id); - - Ok(data(ChainEvent { - chain_id: self.chain_id.clone(), - client_info, - counterparty_chain_id, - tx_hash, - provable_height, - ibc_spec_id: IbcUnion::ID, - event: into_value::(event), - })) - } - IbcEvents::PacketTimeout(event) => { - let ( - counterparty_chain_id, - client_info, - source_channel, - destination_channel, - ) = self - .make_packet_metadata( - provable_height, - event.packet.source_channel_id, - e.try_get()?, - ) - .await?; - - let event = PacketTimeout { - packet: PacketMetadata { - source_channel, - destination_channel, - timeout_height: event.packet.timeout_height, - timeout_timestamp: event.packet.timeout_timestamp, - }, - packet_data: event.packet.data.into(), - } - .into(); - - ibc_union_spec::log_event(&event, &self.chain_id); - - Ok(data(ChainEvent { - chain_id: self.chain_id.clone(), - client_info, - counterparty_chain_id, - tx_hash, - provable_height, - ibc_spec_id: IbcUnion::ID, - event: into_value::(event), - })) - } - IbcEvents::PacketAck(event) => { - let ( - counterparty_chain_id, - client_info, - source_channel, - destination_channel, - ) = self - .make_packet_metadata( - provable_height, - event.packet.source_channel_id, - e.try_get()?, - ) - .await?; - - let event = PacketAck { - packet: PacketMetadata { - source_channel, - destination_channel, - timeout_height: event.packet.timeout_height, - timeout_timestamp: event.packet.timeout_timestamp, - }, - packet_data: event.packet.data.into(), - acknowledgement: event.acknowledgement.into(), - } - .into(); - - ibc_union_spec::log_event(&event, &self.chain_id); - - Ok(data(ChainEvent { - chain_id: self.chain_id.clone(), - client_info, - counterparty_chain_id, - tx_hash, - provable_height, - ibc_spec_id: IbcUnion::ID, - event: into_value::(event), - })) - } - // packet origin is the counterparty chain - IbcEvents::WriteAck(event) => { - let ( - counterparty_chain_id, - client_info, - destination_channel, - source_channel, - ) = self - .make_packet_metadata( - provable_height, - event.packet.destination_channel_id, - e.try_get()?, - ) - .await?; - - let event = WriteAck { - packet_data: event.packet.data.to_vec().into(), - acknowledgement: event.acknowledgement.to_vec().into(), - packet: PacketMetadata { - source_channel, - destination_channel, - timeout_height: event.packet.timeout_height, - timeout_timestamp: event.packet.timeout_timestamp, - }, - } - .into(); - - ibc_union_spec::log_event(&event, &self.chain_id); - - Ok(data(ChainEvent { - chain_id: self.chain_id.clone(), - client_info, - counterparty_chain_id, - tx_hash, - provable_height, - ibc_spec_id: IbcUnion::ID, - event: into_value::(event), - })) - } - IbcEvents::PacketRecv(event) => { - let ( - counterparty_chain_id, - client_info, - destination_channel, - source_channel, - ) = self - .make_packet_metadata( - provable_height, - event.packet.destination_channel_id, - e.try_get()?, - ) - .await?; - - let event = PacketRecv { - packet_data: event.packet.data.to_vec().into(), - packet: PacketMetadata { - source_channel, - destination_channel, - timeout_height: event.packet.timeout_height, - timeout_timestamp: event.packet.timeout_timestamp, - }, - maker_msg: event.maker_msg.into(), - } - .into(); - - ibc_union_spec::log_event(&event, &self.chain_id); - - Ok(data(ChainEvent { - chain_id: self.chain_id.clone(), - client_info, - counterparty_chain_id, - tx_hash, - provable_height, - ibc_spec_id: IbcUnion::ID, - event: into_value::(event), - })) - } - IbcEvents::IntentPacketRecv(_event) => { - todo!() - } + let channel_id = raw_event.channel_id; + + let channel = voyager_client + .query_ibc_state( + self.chain_id.clone(), + provable_height.into(), + ChannelPath { channel_id }, + ) + .await? + .state + .ok_or_else(missing_state("channel must exist", None))?; + + let event = ChannelOpenAck { + port_id: raw_event.port_id.into(), + channel_id, + counterparty_port_id: raw_event.counterparty_port_id.into(), + counterparty_channel_id: raw_event.counterparty_channel_id, + connection, + version: channel.version, } + .into(); + + ibc_union_spec::log_event(&event, &self.chain_id); + + Ok(data(ChainEvent { + chain_id: self.chain_id.clone(), + client_info, + counterparty_chain_id: client_meta.chain_id, + tx_hash, + provable_height, + ibc_spec_id: IbcUnion::ID, + event: into_value::(event), + })) } - ModuleCall::FetchGetLogs(FetchGetLogs { - block_number, - up_to, - }) => { - if up_to.is_some_and(|up_to| up_to < block_number) { - return Err(ErrorObject::owned( - FATAL_JSONRPC_ERROR_CODE, - "`up_to` must be either > `block_number` or null", - None::<()>, - )); + IbcEvents::ChannelOpenConfirm(raw_event) => { + let connection = voyager_client + .query_ibc_state( + self.chain_id.clone(), + provable_height.into(), + ConnectionPath { + connection_id: raw_event.connection_id, + }, + ) + .await? + .state + .ok_or_else(missing_state("connection must exist", None))?; + + let client_info = voyager_client + .client_info::(self.chain_id.clone(), connection.client_id) + .await?; + + let client_meta = voyager_client + .client_meta::( + self.chain_id.clone(), + provable_height.into(), + connection.client_id, + ) + .await?; + + let channel_id = raw_event.channel_id; + + let channel = voyager_client + .query_ibc_state( + self.chain_id.clone(), + provable_height.into(), + ChannelPath { channel_id }, + ) + .await? + .state + .ok_or_else(missing_state("channel must exist", None))?; + + let event = ChannelOpenConfirm { + port_id: raw_event.port_id.into(), + channel_id, + counterparty_port_id: channel.counterparty_port_id, + counterparty_channel_id: channel.counterparty_channel_id, + connection, + version: channel.version, } + .into(); + + ibc_union_spec::log_event(&event, &self.chain_id); + + Ok(data(ChainEvent { + chain_id: self.chain_id.clone(), + client_info, + counterparty_chain_id: client_meta.chain_id, + tx_hash, + provable_height, + ibc_spec_id: IbcUnion::ID, + event: into_value::(event), + })) + } + + IbcEvents::ChannelCloseInit(_) | IbcEvents::ChannelCloseConfirm(_) => { + warn!("observed channel close message, these are not handled currently"); + + Ok(noop()) + } - let latest_height = e - .try_get::()? - .query_latest_height(self.chain_id.clone(), true) + // packet origin is this chain + IbcEvents::PacketSend(event) => { + let (counterparty_chain_id, client_info, source_channel, destination_channel) = + self.make_packet_metadata( + provable_height, + event.packet.source_channel_id, + voyager_client, + ) .await?; - if latest_height.height() < block_number { - debug!(block_number, "block is not yet finalized"); + let event = PacketSend { + packet_data: event.packet.data.to_vec().into(), + packet: PacketMetadata { + source_channel, + destination_channel, + timeout_height: event.packet.timeout_height, + timeout_timestamp: event.packet.timeout_timestamp, + }, + } + .into(); + + ibc_union_spec::log_event(&event, &self.chain_id); + + Ok(data(ChainEvent { + chain_id: self.chain_id.clone(), + client_info, + counterparty_chain_id, + tx_hash, + provable_height, + ibc_spec_id: IbcUnion::ID, + event: into_value::(event), + })) + } + IbcEvents::PacketTimeout(event) => { + let (counterparty_chain_id, client_info, source_channel, destination_channel) = + self.make_packet_metadata( + provable_height, + event.packet.source_channel_id, + voyager_client, + ) + .await?; - return Ok(seq([ - defer(now() + 1), - call(Call::Plugin(PluginMessage::new( - self.plugin_name(), - ModuleCall::from(FetchGetLogs { - block_number, - up_to, - }), - ))), - ])); + let event = PacketTimeout { + packet: PacketMetadata { + source_channel, + destination_channel, + timeout_height: event.packet.timeout_height, + timeout_timestamp: event.packet.timeout_timestamp, + }, + packet_data: event.packet.data.into(), } + .into(); + + ibc_union_spec::log_event(&event, &self.chain_id); + + Ok(data(ChainEvent { + chain_id: self.chain_id.clone(), + client_info, + counterparty_chain_id, + tx_hash, + provable_height, + ibc_spec_id: IbcUnion::ID, + event: into_value::(event), + })) + } + IbcEvents::PacketAck(event) => { + let (counterparty_chain_id, client_info, source_channel, destination_channel) = + self.make_packet_metadata( + provable_height, + event.packet.source_channel_id, + voyager_client, + ) + .await?; - debug!(%block_number, "fetching logs in execution block"); - - let logs = self - .provider - .get_logs( - &Filter::new() - .address(alloy::primitives::Address::from( - self.ibc_handler_address.get(), - )) - .from_block(block_number) - .to_block(block_number), + let event = PacketAck { + packet: PacketMetadata { + source_channel, + destination_channel, + timeout_height: event.packet.timeout_height, + timeout_timestamp: event.packet.timeout_timestamp, + }, + packet_data: event.packet.data.into(), + acknowledgement: event.acknowledgement.into(), + } + .into(); + + ibc_union_spec::log_event(&event, &self.chain_id); + + Ok(data(ChainEvent { + chain_id: self.chain_id.clone(), + client_info, + counterparty_chain_id, + tx_hash, + provable_height, + ibc_spec_id: IbcUnion::ID, + event: into_value::(event), + })) + } + // packet origin is the counterparty chain + IbcEvents::WriteAck(event) => { + let (counterparty_chain_id, client_info, destination_channel, source_channel) = + self.make_packet_metadata( + provable_height, + event.packet.destination_channel_id, + voyager_client, ) - .await - .map_err(|e| { - ErrorObject::owned( - -1, - format!( - "error fetching logs in block {block_number}: {}", - ErrorReporter(e) - ), - None::<()>, - ) - })?; - - info!(%block_number, "found {} logs", logs.len()); - - let events = logs.into_iter().flat_map(|log| { - let tx_hash = log - .transaction_hash - .expect("log should have transaction_hash") - .into(); - - match Ibc::IbcEvents::decode_log(&log.inner, true) { - Ok(event) => { - trace!(?event, "found IbcHandler event"); - - Some(call(PluginMessage::new( - self.plugin_name(), - ModuleCall::from(MakeFullEvent { - block_number, - tx_hash, - event: match event.data { - Ibc::IbcEvents::RegisterClient(client_registered) => { - IbcEvents::RegisterClient(client_registered) - } - Ibc::IbcEvents::CreateClient(client_created) => { - IbcEvents::CreateClient(client_created) - } - Ibc::IbcEvents::UpdateClient(client_updated) => { - IbcEvents::UpdateClient(client_updated) - } - Ibc::IbcEvents::ConnectionOpenInit( - connection_open_init, - ) => IbcEvents::ConnectionOpenInit(connection_open_init), - Ibc::IbcEvents::ConnectionOpenTry(connection_open_try) => { - IbcEvents::ConnectionOpenTry(connection_open_try) - } - Ibc::IbcEvents::ConnectionOpenAck(connection_open_ack) => { - IbcEvents::ConnectionOpenAck(connection_open_ack) - } - Ibc::IbcEvents::ConnectionOpenConfirm( - connection_open_confirm, - ) => IbcEvents::ConnectionOpenConfirm( - connection_open_confirm, - ), - Ibc::IbcEvents::ChannelOpenInit(channel_open_init) => { - IbcEvents::ChannelOpenInit(channel_open_init) - } - Ibc::IbcEvents::ChannelOpenTry(channel_open_try) => { - IbcEvents::ChannelOpenTry(channel_open_try) - } - Ibc::IbcEvents::ChannelOpenAck(channel_open_ack) => { - IbcEvents::ChannelOpenAck(channel_open_ack) - } - Ibc::IbcEvents::ChannelOpenConfirm( - channel_open_confirm, - ) => IbcEvents::ChannelOpenConfirm(channel_open_confirm), - Ibc::IbcEvents::ChannelCloseInit(channel_close_init) => { - IbcEvents::ChannelCloseInit(channel_close_init) - } - Ibc::IbcEvents::ChannelCloseConfirm( - channel_close_confirm, - ) => IbcEvents::ChannelCloseConfirm(channel_close_confirm), - Ibc::IbcEvents::PacketSend(packet_send) => { - IbcEvents::PacketSend(packet_send) - } - Ibc::IbcEvents::PacketRecv(packet_recv) => { - IbcEvents::PacketRecv(packet_recv) - } - Ibc::IbcEvents::IntentPacketRecv(intent_packet_recv) => { - IbcEvents::IntentPacketRecv(intent_packet_recv) - } - Ibc::IbcEvents::WriteAck(write_acknowledgement) => { - IbcEvents::WriteAck(write_acknowledgement) - } - Ibc::IbcEvents::PacketAck(acknowledge_packet) => { - IbcEvents::PacketAck(acknowledge_packet) - } - Ibc::IbcEvents::PacketTimeout(timeout_packet) => { - IbcEvents::PacketTimeout(timeout_packet) - } - }, - }), - ))) - } - Err(e) => { - warn!( - ?log, - "could not decode IbcHandler event: {}", - ErrorReporter(e) - ); - None - } - } - }); - - let next_fetch = match up_to { - Some(up_to) => { - if up_to > block_number { - Some(call(Call::Plugin(PluginMessage::new( - self.plugin_name(), - ModuleCall::from(FetchGetLogs { - block_number: block_number + 1, - up_to: Some(up_to), - }), - )))) - } else { - None - } - } - None => Some(call(Call::Plugin(PluginMessage::new( - self.plugin_name(), - ModuleCall::from(FetchGetLogs { - block_number: block_number + 1, - up_to: None, - }), - )))), - }; + .await?; - Ok(conc(next_fetch.into_iter().chain(events))) + let event = WriteAck { + packet_data: event.packet.data.to_vec().into(), + acknowledgement: event.acknowledgement.to_vec().into(), + packet: PacketMetadata { + source_channel, + destination_channel, + timeout_height: event.packet.timeout_height, + timeout_timestamp: event.packet.timeout_timestamp, + }, + } + .into(); + + ibc_union_spec::log_event(&event, &self.chain_id); + + Ok(data(ChainEvent { + chain_id: self.chain_id.clone(), + client_info, + counterparty_chain_id, + tx_hash, + provable_height, + ibc_spec_id: IbcUnion::ID, + event: into_value::(event), + })) + } + IbcEvents::PacketRecv(event) => { + let (counterparty_chain_id, client_info, destination_channel, source_channel) = + self.make_packet_metadata( + provable_height, + event.packet.destination_channel_id, + voyager_client, + ) + .await?; + + let event = PacketRecv { + packet_data: event.packet.data.to_vec().into(), + packet: PacketMetadata { + source_channel, + destination_channel, + timeout_height: event.packet.timeout_height, + timeout_timestamp: event.packet.timeout_timestamp, + }, + maker_msg: event.maker_msg.into(), + } + .into(); + + ibc_union_spec::log_event(&event, &self.chain_id); + + Ok(data(ChainEvent { + chain_id: self.chain_id.clone(), + client_info, + counterparty_chain_id, + tx_hash, + provable_height, + ibc_spec_id: IbcUnion::ID, + event: into_value::(event), + })) + } + IbcEvents::IntentPacketRecv(_event) => { + todo!() } } } diff --git a/voyager/plugins/transaction/ethereum/src/main.rs b/voyager/plugins/transaction/ethereum/src/main.rs index 25d943bf09..dbb00dd72a 100644 --- a/voyager/plugins/transaction/ethereum/src/main.rs +++ b/voyager/plugins/transaction/ethereum/src/main.rs @@ -362,8 +362,6 @@ impl Module { "submitted batched evm messages" ); - let mut retry_msgs = vec![]; - for (idx, (result, (msg, msg_name))) in result._0.into_iter().zip(msg_names).enumerate() { @@ -392,10 +390,8 @@ impl Module { revert = %result.returnData, well_known = false, data = %serde_json::to_string(&msg).unwrap(), - "evm message failed", + "evm message failed with 0x revert, likely an ABI issue", ); - - retry_msgs.push((true, msg)); } else { error!( msg = %msg_name, @@ -405,22 +401,10 @@ impl Module { data = %serde_json::to_string(&msg).unwrap(), "evm message failed", ); - - retry_msgs.push((false, msg)); } } - // NOTE: An empty iterator returns false - if retry_msgs - .iter() - .any(|(is_empty_revert, _)| *is_empty_revert) - { - Err(TxSubmitError::EmptyRevert( - retry_msgs.into_iter().map(|(_, msg)| msg).collect(), - )) - } else { - Ok(()) - } + Ok(()) } .instrument(info_span!( "evm tx", diff --git a/voyager/src/cli.rs b/voyager/src/cli.rs index 7fdb119a9c..85806a3dc4 100644 --- a/voyager/src/cli.rs +++ b/voyager/src/cli.rs @@ -9,7 +9,7 @@ use voyager_message::{ }; use voyager_vm::{BoxDynError, Op}; -use crate::config::default_rpc_laddr; +use crate::config::{default_rest_laddr, default_rpc_laddr}; #[derive(Debug, Parser)] #[command(arg_required_else_help = true)] @@ -68,6 +68,24 @@ pub enum Command { /// Automatically enqueue the op. #[arg(long, short = 'e', default_value_t = false)] enqueue: bool, + #[arg( + long, + global = true, + default_value_t = format!( + "http://{}", + default_rpc_laddr() + ) + )] + rpc_url: String, + #[arg( + long, + global = true, + default_value_t = format!( + "http://{}", + default_rest_laddr() + ) + )] + rest_url: String, }, /// Run Voyager. Start, @@ -115,6 +133,15 @@ pub enum QueueCmd { Enqueue { #[arg(value_parser(|s: &str| serde_json::from_str::>(s)))] op: Op, + #[arg( + long, + global = true, + default_value_t = format!( + "http://{}", + default_rest_laddr() + ) + )] + rest_url: String, }, // History { @@ -251,6 +278,15 @@ pub enum MsgCmd { /// Automatically enqueue the op. #[arg(long, short = 'e', default_value_t = false)] enqueue: bool, + #[arg( + long, + global = true, + default_value_t = format!( + "http://{}", + default_rest_laddr() + ) + )] + rest_url: String, }, UpdateClient { #[arg(long, value_parser(|s: &str| ok(ChainId::new(s.to_owned()))))] @@ -266,6 +302,15 @@ pub enum MsgCmd { /// Automatically enqueue the op. #[arg(long, short = 'e', default_value_t = false)] enqueue: bool, + #[arg( + long, + global = true, + default_value_t = format!( + "http://{}", + default_rest_laddr() + ) + )] + rest_url: String, }, } diff --git a/voyager/src/main.rs b/voyager/src/main.rs index deb1edf349..1bfab97741 100644 --- a/voyager/src/main.rs +++ b/voyager/src/main.rs @@ -10,8 +10,8 @@ )] use std::{ - collections::HashMap, ffi::OsStr, fmt::Write, fs::read_to_string, iter, net::SocketAddr, - path::PathBuf, process::ExitCode, + collections::HashMap, ffi::OsStr, fmt::Write, fs::read_to_string, iter, path::PathBuf, + process::ExitCode, }; use anyhow::{anyhow, Context as _}; @@ -266,8 +266,8 @@ async fn do_main(args: cli::AppArgs) -> anyhow::Result<()> { }; match cli_msg { - QueueCmd::Enqueue { op } => { - send_enqueue(&get_voyager_config()?.voyager.rest_laddr, op).await?; + QueueCmd::Enqueue { op, rest_url } => { + send_enqueue(&rest_url, op).await?; } // NOTE: Temporarily disabled until i figure out a better way to implement this with the new queue design // cli::QueueCmd::History { id, max_depth } => { @@ -321,53 +321,21 @@ async fn do_main(args: cli::AppArgs) -> anyhow::Result<()> { chain_id, height, enqueue, + rpc_url, + rest_url, } => { + let voyager_client = jsonrpsee::http_client::HttpClient::builder().build(rpc_url)?; + let start_height = match height { QueryHeight::Latest => { - let config = get_voyager_config()?; - - let context = Context::new( - config.plugins, - config.modules, - config.equivalent_chain_ids, - |h| { - h.register::(); - h.register::(); - }, - ) - .await?; - - let latest_height = context - .rpc_server - .query_latest_height(&chain_id, false) - .await?; - - context.shutdown().await; - - latest_height + voyager_client + .query_latest_height(chain_id.clone(), false) + .await? } QueryHeight::Finalized => { - let config = get_voyager_config()?; - - let context = Context::new( - config.plugins, - config.modules, - config.equivalent_chain_ids, - |h| { - h.register::(); - h.register::(); - }, - ) - .await?; - - let latest_height = context - .rpc_server - .query_latest_height(&chain_id, true) - .await?; - - context.shutdown().await; - - latest_height + voyager_client + .query_latest_height(chain_id.clone(), true) + .await? } QueryHeight::Specific(height) => height, }; @@ -379,7 +347,7 @@ async fn do_main(args: cli::AppArgs) -> anyhow::Result<()> { if enqueue { println!("enqueueing op for `{chain_id}` at `{start_height}`"); - send_enqueue(&get_voyager_config()?.voyager.rest_laddr, op).await?; + send_enqueue(&rest_url, op).await?; } else { print_json(&op); } @@ -520,6 +488,7 @@ async fn do_main(args: cli::AppArgs) -> anyhow::Result<()> { height, metadata, enqueue, + rest_url, } => { let voyager_config = get_voyager_config()?; @@ -537,7 +506,7 @@ async fn do_main(args: cli::AppArgs) -> anyhow::Result<()> { // weird race condition in Context::new that i don't feel like debugging right now tokio::time::sleep(std::time::Duration::from_secs(1)).await; - let msg = make_msg_create_client( + let op = make_msg_create_client( &ctx, tracking, height, @@ -551,9 +520,9 @@ async fn do_main(args: cli::AppArgs) -> anyhow::Result<()> { if enqueue { println!("enqueueing msg"); - send_enqueue(&voyager_config.voyager.rest_laddr, msg).await?; + send_enqueue(&rest_url, op).await?; } else { - print_json(&msg); + print_json(&op); } } MsgCmd::UpdateClient { @@ -562,6 +531,7 @@ async fn do_main(args: cli::AppArgs) -> anyhow::Result<()> { ibc_spec_id, update_to, enqueue, + rest_url, } => { let voyager_config = get_voyager_config()?; @@ -617,7 +587,7 @@ async fn do_main(args: cli::AppArgs) -> anyhow::Result<()> { if enqueue { println!("enqueueing msg"); - send_enqueue(&voyager_config.voyager.rest_laddr, op).await?; + send_enqueue(&rest_url, op).await?; } else { print_json(&op); } @@ -629,11 +599,11 @@ async fn do_main(args: cli::AppArgs) -> anyhow::Result<()> { } async fn send_enqueue( - rest_laddr: &SocketAddr, + rest_laddr: &str, op: Op, ) -> anyhow::Result { Ok(reqwest::Client::new() - .post(format!("http://{rest_laddr}/enqueue")) + .post(format!("{rest_laddr}/enqueue")) .json(&op) .send() .await?)