From 92e3587f79fbf34a8cfd29b255803d1a9dffb1ec Mon Sep 17 00:00:00 2001 From: benluelo Date: Sun, 19 Jan 2025 12:16:54 +0000 Subject: [PATCH] feat(voyager): add chain id equivalence --- lib/voyager-core/src/lib.rs | 2 + lib/voyager-message/src/context.rs | 228 +++++++----------- .../src/context/equivalent_chain_ids.rs | 81 +++++++ .../src/context/ibc_spec_handler.rs | 66 +++++ lib/voyager-message/src/lib.rs | 2 +- voyager/src/config.rs | 6 +- voyager/src/main.rs | 60 +++-- voyager/src/queue.rs | 13 +- 8 files changed, 299 insertions(+), 159 deletions(-) create mode 100644 lib/voyager-message/src/context/equivalent_chain_ids.rs create mode 100644 lib/voyager-message/src/context/ibc_spec_handler.rs diff --git a/lib/voyager-core/src/lib.rs b/lib/voyager-core/src/lib.rs index 357529533d..d502ce0098 100644 --- a/lib/voyager-core/src/lib.rs +++ b/lib/voyager-core/src/lib.rs @@ -383,6 +383,8 @@ macro_rules! str_newtype { Clone, PartialEq, Eq, + PartialOrd, + Ord, Hash, ::serde::Serialize, ::serde::Deserialize, diff --git a/lib/voyager-message/src/context.rs b/lib/voyager-message/src/context.rs index 5e8ada5f3e..18ca43c23c 100644 --- a/lib/voyager-message/src/context.rs +++ b/lib/voyager-message/src/context.rs @@ -14,13 +14,12 @@ use futures::{ Future, FutureExt, StreamExt, TryStreamExt, }; use jsonrpsee::{ - core::{client::ClientT, RpcResult}, + core::client::ClientT, server::middleware::rpc::RpcServiceT, types::{ErrorObject, ErrorObjectOwned}, }; -use macros::model; use schemars::JsonSchema; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use tokio::time::sleep; use tokio_util::sync::CancellationToken; @@ -29,29 +28,29 @@ use tracing::{ Instrument, }; use unionlabs::{ - ethereum::keccak256, - primitives::{encoding::HexUnprefixed, Bytes}, - traits::Member, - ErrorReporter, + ethereum::keccak256, primitives::encoding::HexUnprefixed, traits::Member, ErrorReporter, }; use voyager_core::{ConsensusType, IbcSpecId}; use voyager_vm::{ItemId, QueueError}; use crate::{ - core::{ChainId, ClientType, IbcInterface, IbcSpec}, - into_value, + context::ibc_spec_handler::IbcSpecHandlers, + core::{ChainId, ClientType, IbcInterface}, module::{ ClientBootstrapModuleInfo, ClientModuleInfo, ConsensusModuleInfo, PluginClient, PluginInfo, ProofModuleInfo, StateModuleInfo, }, rpc::{server::Server, VoyagerRpcServer}, - IdThreadClient, ParamsWithItemId, RawClientId, FATAL_JSONRPC_ERROR_CODE, + IdThreadClient, ParamsWithItemId, FATAL_JSONRPC_ERROR_CODE, }; pub const INVALID_CONFIG_EXIT_CODE: u8 = 13; pub const STARTUP_ERROR_EXIT_CODE: u8 = 14; -#[derive(macros::Debug)] +pub mod equivalent_chain_ids; +pub mod ibc_spec_handler; + +#[derive(Debug)] pub struct Context { pub rpc_server: Server, @@ -83,64 +82,6 @@ pub struct Modules { pub ibc_spec_handlers: IbcSpecHandlers, } -pub struct IbcSpecHandlers { - pub(crate) handlers: HashMap, -} - -impl IbcSpecHandlers { - #[allow(clippy::new_without_default)] - pub fn new() -> Self { - Self { - handlers: HashMap::default(), - } - } - - pub fn register(&mut self) { - self.handlers.insert(S::ID, IbcSpecHandler::new::()); - } - - pub fn get(&self, ibc_spec_id: &IbcSpecId) -> RpcResult<&IbcSpecHandler> { - self.handlers.get(ibc_spec_id).ok_or_else(|| { - ErrorObject::owned( - FATAL_JSONRPC_ERROR_CODE, - format!("unknown IBC spec `{ibc_spec_id}`"), - None::<()>, - ) - }) - } -} - -/// A type-erased version of the methods on [`IbcSpec`] (essentially a vtable). -pub struct IbcSpecHandler { - pub client_state_path: fn(RawClientId) -> anyhow::Result, - pub consensus_state_path: fn(RawClientId, String) -> anyhow::Result, - pub msg_update_client: fn(RawClientId, Bytes) -> anyhow::Result, -} - -impl IbcSpecHandler { - pub const fn new() -> Self { - Self { - client_state_path: |client_id| { - Ok(into_value(T::client_state_path(serde_json::from_value( - client_id.0, - )?))) - }, - consensus_state_path: |client_id, height| { - Ok(into_value(T::consensus_state_path( - serde_json::from_value(client_id.0)?, - height.parse()?, - ))) - }, - msg_update_client: |client_id, client_message| { - Ok(into_value(T::update_client_datagram( - serde_json::from_value(client_id.0)?, - client_message, - ))) - }, - } - } -} - impl voyager_vm::ContextT for Context {} #[derive(macros::Debug, Clone)] @@ -193,10 +134,6 @@ impl ModuleRpcClient { ) } - // pub fn client(&self) -> &impl jsonrpsee::core::client::ClientT { - // &self.client - // } - pub fn client(&self) -> &reconnecting_jsonrpc_ws_client::Client { &self.client } @@ -284,8 +221,8 @@ fn make_module_rpc_server_socket_path(name: &str) -> String { ) } -#[model] -#[derive(Hash, JsonSchema)] +#[derive(Debug, Clone, PartialEq, Hash, Serialize, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields)] pub struct PluginConfig { pub path: PathBuf, pub config: Value, @@ -293,8 +230,8 @@ pub struct PluginConfig { pub enabled: bool, } -#[model] -#[derive(JsonSchema)] +#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields)] pub struct ModulesConfig { pub state: Vec>, pub proof: Vec>, @@ -303,8 +240,8 @@ pub struct ModulesConfig { pub client_bootstrap: Vec>, } -#[model] -#[derive(JsonSchema)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields)] pub struct ModuleConfig { pub path: PathBuf, pub info: T, @@ -327,6 +264,7 @@ impl Context { pub async fn new( plugin_configs: Vec, module_configs: ModulesConfig, + equivalent_chain_ids: equivalent_chain_ids::EquivalentChainIds, register_ibc_spec_handlers: fn(&mut IbcSpecHandlers), ) -> anyhow::Result { let cancellation_token = CancellationToken::new(); @@ -432,15 +370,20 @@ impl Context { ibc_spec_id, }, rpc_client| { - let prev = modules - .state_modules - .insert((chain_id.clone(), ibc_spec_id.clone()), rpc_client); + for equivalent_chain_id in + equivalent_chain_ids.equivalents(chain_id).chain([chain_id]) + { + let prev = modules.state_modules.insert( + (equivalent_chain_id.clone(), ibc_spec_id.clone()), + rpc_client.clone(), + ); - if prev.is_some() { - return Err(anyhow!( - "multiple state modules configured for chain id \ - `{chain_id}` and IBC version `{ibc_spec_id}`", - )); + if prev.is_some() { + return Err(anyhow!( + "multiple state modules configured for chain id \ + `{equivalent_chain_id}` and IBC version `{ibc_spec_id}`", + )); + } } Ok(()) @@ -458,15 +401,20 @@ impl Context { ibc_spec_id, }, rpc_client| { - let prev = modules - .proof_modules - .insert((chain_id.clone(), ibc_spec_id.clone()), rpc_client); + for equivalent_chain_id in + equivalent_chain_ids.equivalents(chain_id).chain([chain_id]) + { + let prev = modules.proof_modules.insert( + (equivalent_chain_id.clone(), ibc_spec_id.clone()), + rpc_client.clone(), + ); - if prev.is_some() { - return Err(anyhow!( - "multiple proof modules configured for chain id \ - `{chain_id}` and IBC version `{ibc_spec_id}`", - )); + if prev.is_some() { + return Err(anyhow!( + "multiple proof modules configured for chain id \ + `{equivalent_chain_id}` and IBC version `{ibc_spec_id}`", + )); + } } Ok(()) @@ -484,23 +432,27 @@ impl Context { consensus_type, }, rpc_client| { - let prev = modules - .consensus_modules - .insert(chain_id.clone(), rpc_client); + for equivalent_chain_id in + equivalent_chain_ids.equivalents(chain_id).chain([chain_id]) + { + let prev = modules + .consensus_modules + .insert(equivalent_chain_id.clone(), rpc_client.clone()); - if prev.is_some() { - return Err(anyhow!( - "multiple consensus modules configured for chain id `{}`", - chain_id - )); - } + if prev.is_some() { + return Err(anyhow!( + "multiple consensus modules configured for chain id `{}`", + equivalent_chain_id + )); + } - let None = modules - .chain_consensus_types - .insert(chain_id.clone(), consensus_type.clone()) - else { - unreachable!() - }; + let None = modules + .chain_consensus_types + .insert(equivalent_chain_id.clone(), consensus_type.clone()) + else { + unreachable!() + }; + } Ok(()) }, @@ -571,32 +523,37 @@ impl Context { chain_id, }, rpc_client| { - let prev = modules - .client_bootstrap_modules - .insert((chain_id.clone(), client_type.clone()), rpc_client.clone()); + for equivalent_chain_id in + equivalent_chain_ids.equivalents(chain_id).chain([chain_id]) + { + let prev = modules.client_bootstrap_modules.insert( + (equivalent_chain_id.clone(), client_type.clone()), + rpc_client.clone(), + ); - if prev.is_some() { - return Err(anyhow!( - "multiple client bootstrap modules configured for client \ - type `{client_type}` and chain id `{chain_id}`", - )); - } + if prev.is_some() { + return Err(anyhow!( + "multiple client bootstrap modules configured for client \ + type `{client_type}` and chain id `{equivalent_chain_id}`", + )); + } - // TODO: Check consistency with client_consensus_types and chain_id? - - // if let Some(previous_consensus_type) = modules - // .client_consensus_types - // .insert(client_type.clone(), consensus_type.clone()) - // { - // if previous_consensus_type != consensus_type { - // return Err(anyhow!( - // "inconsistency in client consensus types: \ - // client type `{client_type}` is registered \ - // as tracking both `{previous_consensus_type}` \ - // and `{consensus_type}`" - // )); - // } - // } + // TODO: Check consistency with client_consensus_types and chain_id? + + // if let Some(previous_consensus_type) = modules + // .client_consensus_types + // .insert(client_type.clone(), consensus_type.clone()) + // { + // if previous_consensus_type != consensus_type { + // return Err(anyhow!( + // "inconsistency in client consensus types: \ + // client type `{client_type}` is registered \ + // as tracking both `{previous_consensus_type}` \ + // and `{consensus_type}`" + // )); + // } + // } + } Ok(()) }, @@ -847,7 +804,8 @@ impl Modules { } } -#[model] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields)] pub struct LoadedModulesInfo { pub state: Vec, pub proof: Vec, diff --git a/lib/voyager-message/src/context/equivalent_chain_ids.rs b/lib/voyager-message/src/context/equivalent_chain_ids.rs new file mode 100644 index 0000000000..86f7a8aa06 --- /dev/null +++ b/lib/voyager-message/src/context/equivalent_chain_ids.rs @@ -0,0 +1,81 @@ +use std::{collections::HashSet, sync::OnceLock}; + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::core::ChainId; + +/// [`ChainId`] to consider equivalent. +/// +/// Some chains expose multiple chain IDs due to requirements of certain components used in the chain (for example, different execution environments running in the same chain may have different chain ID specifications for transaction signing). In cases such as this, multiple chain IDs can refer to the same "abstract machine" of a chain. +#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)] +#[serde(try_from = "Vec>", into = "Vec>")] +// TODO: Implement JsonSchema manually to properly encode the constraints on the value +#[schemars(transparent)] +pub struct EquivalentChainIds { + // NOTE: All chain IDs in this list are required to be unique + // TODO: Ensure that all of the inner HashSets contain >= 2 items + pub(crate) inner: Vec>, +} + +impl EquivalentChainIds { + /// Get all chain IDs that are equivalent to `chain_id`. + /// + /// Note that this does ***NOT*** include `chain_id`. + pub fn equivalents<'a, 'b>( + &'b self, + chain_id: &'a ChainId, + ) -> impl Iterator + use<'a, 'b> { + static EMPTY_ITER: OnceLock> = OnceLock::new(); + + self.inner + .iter() + .find(|v| v.contains(chain_id)) + .unwrap_or(EMPTY_ITER.get_or_init(HashSet::new)) + .iter() + .filter(move |id| id != chain_id) + } + + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } +} + +#[allow( + clippy::implicit_hasher, + reason = "EquivalentChainIds.inner is not generic over the hasher" +)] +impl From for Vec> { + fn from(value: EquivalentChainIds) -> Self { + value.inner + } +} + +impl TryFrom>> for EquivalentChainIds { + type Error = EquivalentChainIdsError; + + fn try_from(value: Vec>) -> Result { + let mut all_ids = value.iter().flatten().collect::>(); + + all_ids.sort(); + + let (_, dups) = all_ids.partition_dedup(); + + if dups.is_empty() { + Ok(Self { inner: value }) + } else { + Err(EquivalentChainIdsError { + duplicates: dups.iter().map(|&c| c.clone()).collect(), + }) + } + } +} + +#[derive(Debug, Clone, PartialEq, thiserror::Error)] +#[error( + "duplicated chain IDs in chain ID equivalence lists: {}", + duplicates.iter().map(ToString::to_string).collect::>().join(", "), +)] +pub struct EquivalentChainIdsError { + pub duplicates: HashSet, +} diff --git a/lib/voyager-message/src/context/ibc_spec_handler.rs b/lib/voyager-message/src/context/ibc_spec_handler.rs new file mode 100644 index 0000000000..9082e62ebf --- /dev/null +++ b/lib/voyager-message/src/context/ibc_spec_handler.rs @@ -0,0 +1,66 @@ +use std::collections::HashMap; + +use jsonrpsee::{core::RpcResult, types::ErrorObject}; +use serde_json::Value; +use unionlabs::primitives::Bytes; +use voyager_core::IbcSpecId; + +use crate::{core::IbcSpec, into_value, RawClientId, FATAL_JSONRPC_ERROR_CODE}; + +pub struct IbcSpecHandlers { + pub(crate) handlers: HashMap, +} + +impl IbcSpecHandlers { + #[allow(clippy::new_without_default)] + pub fn new() -> Self { + Self { + handlers: HashMap::default(), + } + } + + pub fn register(&mut self) { + self.handlers.insert(S::ID, IbcSpecHandler::new::()); + } + + pub fn get(&self, ibc_spec_id: &IbcSpecId) -> RpcResult<&IbcSpecHandler> { + self.handlers.get(ibc_spec_id).ok_or_else(|| { + ErrorObject::owned( + FATAL_JSONRPC_ERROR_CODE, + format!("unknown IBC spec `{ibc_spec_id}`"), + None::<()>, + ) + }) + } +} + +/// A type-erased version of the methods on [`IbcSpec`] (essentially a vtable). +pub struct IbcSpecHandler { + pub client_state_path: fn(RawClientId) -> anyhow::Result, + pub consensus_state_path: fn(RawClientId, String) -> anyhow::Result, + pub msg_update_client: fn(RawClientId, Bytes) -> anyhow::Result, +} + +impl IbcSpecHandler { + pub const fn new() -> Self { + Self { + client_state_path: |client_id| { + Ok(into_value(T::client_state_path(serde_json::from_value( + client_id.0, + )?))) + }, + consensus_state_path: |client_id, height| { + Ok(into_value(T::consensus_state_path( + serde_json::from_value(client_id.0)?, + height.parse()?, + ))) + }, + msg_update_client: |client_id, client_message| { + Ok(into_value(T::update_client_datagram( + serde_json::from_value(client_id.0)?, + client_message, + ))) + }, + } + } +} diff --git a/lib/voyager-message/src/lib.rs b/lib/voyager-message/src/lib.rs index 42a235068c..27ceb4dc03 100644 --- a/lib/voyager-message/src/lib.rs +++ b/lib/voyager-message/src/lib.rs @@ -1,4 +1,4 @@ -#![feature(trait_alias)] +#![feature(trait_alias, slice_partition_dedup)] use std::{ borrow::Cow, diff --git a/voyager/src/config.rs b/voyager/src/config.rs index 26a62a2c10..d983305767 100644 --- a/voyager/src/config.rs +++ b/voyager/src/config.rs @@ -2,7 +2,9 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use voyager_message::context::{ModulesConfig, PluginConfig}; +use voyager_message::context::{ + equivalent_chain_ids::EquivalentChainIds, ModulesConfig, PluginConfig, +}; use crate::queue::QueueConfig; @@ -12,6 +14,8 @@ pub struct Config { // allows for using $schema in the config file #[serde(rename = "$schema", default, skip_serializing_if = "Option::is_none")] pub schema: Option, + #[serde(default, skip_serializing_if = "EquivalentChainIds::is_empty")] + pub equivalent_chain_ids: EquivalentChainIds, pub modules: ModulesConfig, pub plugins: Vec, pub voyager: VoyagerConfig, diff --git a/voyager/src/main.rs b/voyager/src/main.rs index f00a43dca9..ea60e32bb7 100644 --- a/voyager/src/main.rs +++ b/voyager/src/main.rs @@ -27,7 +27,10 @@ use tracing_subscriber::EnvFilter; use voyager_message::{ call::{FetchBlocks, FetchUpdateHeaders}, callback::AggregateMsgUpdateClientsFromOrderedHeaders, - context::{get_plugin_info, Context, IbcSpecHandler, ModulesConfig}, + context::{ + equivalent_chain_ids::EquivalentChainIds, get_plugin_info, + ibc_spec_handler::IbcSpecHandler, Context, ModulesConfig, + }, core::{IbcSpec, QueryHeight}, filter::{make_filter, run_filter, JaqInterestFilter}, rpc::{IbcState, VoyagerRpcClient}, @@ -138,7 +141,7 @@ async fn do_main(args: cli::AppArgs) -> anyhow::Result<()> { } ConfigCmd::Default => print_json(&Config { schema: None, - plugins: vec![], + equivalent_chain_ids: EquivalentChainIds::default(), modules: ModulesConfig { state: vec![], proof: vec![], @@ -146,6 +149,7 @@ async fn do_main(args: cli::AppArgs) -> anyhow::Result<()> { client: vec![], client_bootstrap: vec![], }, + plugins: vec![], voyager: VoyagerConfig { num_workers: 1, rest_laddr: default_rest_laddr(), @@ -322,10 +326,15 @@ async fn do_main(args: cli::AppArgs) -> anyhow::Result<()> { QueryHeight::Latest => { let config = get_voyager_config()?; - let context = Context::new(config.plugins, config.modules, |h| { - h.register::(); - h.register::(); - }) + let context = Context::new( + config.plugins, + config.modules, + config.equivalent_chain_ids, + |h| { + h.register::(); + h.register::(); + }, + ) .await?; let latest_height = context @@ -340,10 +349,15 @@ async fn do_main(args: cli::AppArgs) -> anyhow::Result<()> { QueryHeight::Finalized => { let config = get_voyager_config()?; - let context = Context::new(config.plugins, config.modules, |h| { - h.register::(); - h.register::(); - }) + let context = Context::new( + config.plugins, + config.modules, + config.equivalent_chain_ids, + |h| { + h.register::(); + h.register::(); + }, + ) .await?; let latest_height = context @@ -509,10 +523,15 @@ async fn do_main(args: cli::AppArgs) -> anyhow::Result<()> { } => { let voyager_config = get_voyager_config()?; - let ctx = Context::new(voyager_config.plugins, voyager_config.modules, |h| { - h.register::(); - h.register::(); - }) + let ctx = Context::new( + voyager_config.plugins, + voyager_config.modules, + voyager_config.equivalent_chain_ids, + |h| { + h.register::(); + h.register::(); + }, + ) .await?; // weird race condition in Context::new that i don't feel like debugging right now @@ -546,10 +565,15 @@ async fn do_main(args: cli::AppArgs) -> anyhow::Result<()> { } => { let voyager_config = get_voyager_config()?; - let ctx = Context::new(voyager_config.plugins, voyager_config.modules, |h| { - h.register::(); - h.register::(); - }) + let ctx = Context::new( + voyager_config.plugins, + voyager_config.modules, + voyager_config.equivalent_chain_ids, + |h| { + h.register::(); + h.register::(); + }, + ) .await?; // weird race condition in Context::new that i don't feel like debugging right now diff --git a/voyager/src/queue.rs b/voyager/src/queue.rs index 47e7e6aa6d..c1516c6669 100644 --- a/voyager/src/queue.rs +++ b/voyager/src/queue.rs @@ -148,10 +148,15 @@ impl Voyager { .context("error initializing queue")?; Ok(Self { - context: Context::new(config.plugins, config.modules, |h| { - h.register::(); - h.register::(); - }) + context: Context::new( + config.plugins, + config.modules, + config.equivalent_chain_ids, + |h| { + h.register::(); + h.register::(); + }, + ) .await .context("error initializing plugins")?, num_workers: config.voyager.num_workers,