diff --git a/packages/ciphernode/evm/src/ciphernode_registry_sol.rs b/packages/ciphernode/evm/src/ciphernode_registry_sol.rs index c3bd3b9c..cae4524b 100644 --- a/packages/ciphernode/evm/src/ciphernode_registry_sol.rs +++ b/packages/ciphernode/evm/src/ciphernode_registry_sol.rs @@ -126,14 +126,12 @@ impl CiphernodeRegistrySolReader { } } - pub async fn load(params: CiphernodeRegistryReaderParams) -> Result> { - let addr = if let Some(snapshot) = params.repository.read().await? { + pub async fn load(params: CiphernodeRegistryReaderParams) -> Result { + Ok(if let Some(snapshot) = params.repository.read().await? { Self::from_snapshot(params, snapshot).await? } else { Self::new(params) - } - .start(); - Ok(addr) + }) } pub async fn attach( @@ -147,14 +145,16 @@ impl CiphernodeRegistrySolReader { repository: repository.clone(), }; - let addr = Self::load(params).await?; + let actor = Self::load(params).await?; + let last_block = actor.state.last_block; + let addr = actor.start(); EvmEventReader::attach( &addr.clone().into(), provider, extractor, contract_address, - None, + last_block, &bus.clone().into(), ) .await?; diff --git a/packages/ciphernode/evm/src/enclave_sol_reader.rs b/packages/ciphernode/evm/src/enclave_sol_reader.rs index 63f5c491..930c4d60 100644 --- a/packages/ciphernode/evm/src/enclave_sol_reader.rs +++ b/packages/ciphernode/evm/src/enclave_sol_reader.rs @@ -119,14 +119,12 @@ impl EnclaveSolReader { } } - pub async fn load(params: EnclaveSolReaderParams) -> Result> { - let addr = if let Some(snapshot) = params.repository.read().await? { + pub async fn load(params: EnclaveSolReaderParams) -> Result { + Ok(if let Some(snapshot) = params.repository.read().await? { Self::from_snapshot(params, snapshot).await? } else { Self::new(params) - } - .start(); - Ok(addr) + }) } pub async fn attach( @@ -135,19 +133,22 @@ impl EnclaveSolReader { contract_address: &str, repository: &Repository, ) -> Result> { - let addr = Self::load(EnclaveSolReaderParams { + let params = EnclaveSolReaderParams { bus: bus.clone(), repository: repository.clone(), - }) - .await?; + }; + + let actor = Self::load(params).await?; + let last_block = actor.state.last_block; + let addr = actor.start(); EvmEventReader::attach( - &addr.clone().into(), + &addr.clone().recipient(), provider, extractor, contract_address, - None, - &bus.clone().into(), + last_block, + &bus.clone(), ) .await?; diff --git a/packages/ciphernode/evm/src/lib.rs b/packages/ciphernode/evm/src/lib.rs index ba330d5d..db53cbe3 100644 --- a/packages/ciphernode/evm/src/lib.rs +++ b/packages/ciphernode/evm/src/lib.rs @@ -13,5 +13,5 @@ pub use ciphernode_registry_sol::{ pub use enclave_sol::EnclaveSol; pub use enclave_sol_reader::{EnclaveSolReader, EnclaveSolReaderParams, EnclaveSolReaderState}; pub use enclave_sol_writer::EnclaveSolWriter; -pub use event_reader::{EvmEventReader, ExtractorFn}; +pub use event_reader::{EnclaveEvmEvent, EvmEventReader, ExtractorFn}; pub use registry_filter_sol::{RegistryFilterSol, RegistryFilterSolWriter}; diff --git a/packages/ciphernode/evm/tests/evm_reader.rs b/packages/ciphernode/evm/tests/evm_reader.rs index 15daeddd..165142bf 100644 --- a/packages/ciphernode/evm/tests/evm_reader.rs +++ b/packages/ciphernode/evm/tests/evm_reader.rs @@ -1,4 +1,4 @@ -use actix::Actor; +use actix::{Actor, Addr, Handler}; use alloy::{ node_bindings::Anvil, primitives::{FixedBytes, LogData}, @@ -8,7 +8,7 @@ use alloy::{ }; use anyhow::Result; use enclave_core::{EnclaveEvent, EventBus, GetHistory, TestEvent}; -use evm::{helpers::WithChainId, EvmEventReader}; +use evm::{helpers::WithChainId, EnclaveEvmEvent, EvmEventReader}; use std::time::Duration; use tokio::time::sleep; @@ -37,6 +37,21 @@ fn test_event_extractor( } } +struct EvmEventUnwrapper { + bus: Addr, +} + +impl Actor for EvmEventUnwrapper { + type Context = actix::Context; +} + +impl Handler for EvmEventUnwrapper { + type Result = (); + fn handle(&mut self, msg: EnclaveEvmEvent, _: &mut Self::Context) -> Self::Result { + self.bus.do_send(msg.event); + } +} + #[actix::test] async fn evm_reader() -> Result<()> { // Create a WS provider @@ -47,13 +62,14 @@ async fn evm_reader() -> Result<()> { let arc_provider = WithChainId::new(provider).await?; let contract = EmitLogs::deploy(arc_provider.get_provider()).await?; let bus = EventBus::new(true).start(); - + let unwrapper = EvmEventUnwrapper { bus: bus.clone() }.start(); EvmEventReader::attach( - &bus.clone().into(), + &unwrapper.clone().into(), &arc_provider, test_event_extractor, &contract.address().to_string(), None, + &bus, ) .await?; @@ -100,7 +116,7 @@ async fn ensure_historical_events() -> Result<()> { let arc_provider = WithChainId::new(provider).await?; let contract = EmitLogs::deploy(arc_provider.get_provider()).await?; let bus = EventBus::new(true).start(); - + let unwrapper = EvmEventUnwrapper { bus: bus.clone() }.start(); let historical_msgs = vec!["these", "are", "historical", "events"]; let live_events = vec!["these", "events", "are", "live"]; @@ -114,11 +130,12 @@ async fn ensure_historical_events() -> Result<()> { } EvmEventReader::attach( - &bus.clone().into(), + &unwrapper.clone().into(), &arc_provider, test_event_extractor, &contract.address().to_string(), None, + &bus, ) .await?;