From a9d01993bee1b9985799102f5948ed2a9f4ca499 Mon Sep 17 00:00:00 2001 From: Bogdan Shinkarenko Date: Thu, 19 Oct 2023 13:43:59 +1000 Subject: [PATCH] feat: add update event function --- src/event_reader_service.rs | 101 ++++++++++++++++++++---------------- 1 file changed, 57 insertions(+), 44 deletions(-) diff --git a/src/event_reader_service.rs b/src/event_reader_service.rs index d54c662..233749d 100644 --- a/src/event_reader_service.rs +++ b/src/event_reader_service.rs @@ -22,6 +22,7 @@ use crate::{ transaction_parser::{BindTransactionInstructionLogs, TransactionParsedMeta}, }; use de_solana_client::GetTransactionsSignaturesForAddress; +use solana_sdk::signature::Signature; use solana_transaction_status::EncodedTransaction; macro_rules! unwrap_or_continue { @@ -369,8 +370,11 @@ where )) } - pub fn get_start_signature(self: Arc, resync_from_slot: u64) -> Result<()> { - let resync_start = match &self + pub async fn get_start_signature( + self: &Arc, + resync_from_slot: u64, + ) -> Result> { + match &self .client .get_block(resync_from_slot) .await @@ -386,27 +390,17 @@ where SolanaSignature::from_str(tx.signatures.first().ok_or_else(|| { Error::WrongConfig("Error while getting tx signature".to_owned()) })?) - .map_err(|err| { - Error::WrongConfig(format!("Error while signature parsing: {err:?}")) - })?, + .map_err(|err| { + Error::WrongConfig(format!("Error while signature parsing: {err:?}")) + })?, )), other => Err(Error::WrongConfig(format!( "Unexpected format of EncodedTransaction: {other:?}", ))), - }?; + } } - pub async fn hard_resync_event(self: Arc, resync_from_slot: u64) -> Result<()> { - - let resync_start = self.get_start_signature(resync_from_slot - 1)?; - - let signatures = ::get_signatures_data_for_address_with_config( - &self.client, - &self.program_id, - self.commitment_config, - resync_start - ).await?.iter().map(|sig_data| sig_data.signature).collect::>(); - + pub async fn update_events(self: Arc, signatures: Vec) -> Result<()> { let signatures_chunks = signatures .as_slice() .chunks( @@ -420,16 +414,17 @@ where let self_clone = self.clone(); let signatures_chunk = signatures_chunk.to_vec(); - tasks.push(async move { - let mut is_chunk_successfull_processed = true; + tasks.push( + async move { + let mut is_chunk_successfull_processed = true; - for tx_signature in signatures_chunk.into_iter() { - info!( + for tx_signature in signatures_chunk.into_iter() { + info!( "Hard resyncing transaction with signature: {}", tx_signature.to_string() ); - let transaction = unwrap_or_continue!( + let transaction = unwrap_or_continue!( self_clone.get_transaction_by_signature(tx_signature).await, error_action = { is_chunk_successfull_processed = false; @@ -437,33 +432,36 @@ where "Error while get transaction by signature: {err:?}" ); - let transaction_str = tx_signature.to_string(); - if let Err(err) = (self_clone.transaction_consumer)( - tx_signature, - transaction, - Arc::clone(&self_clone.client), - Arc::clone(&self_clone.event_recipient), - ) + let transaction_str = tx_signature.to_string(); + if let Err(err) = (self_clone.transaction_consumer)( + tx_signature, + transaction, + Arc::clone(&self_clone.client), + Arc::clone(&self_clone.event_recipient), + ) .await - { - error!("Error while transaction {transaction_str} consuming {err:?}", err = err); - is_chunk_successfull_processed = false; - } else { - info!("Transaction {tx_signature} consumed as part of resync process"); + { + error!( + "Error while transaction {transaction_str} consuming {err:?}", + err = err + ); + is_chunk_successfull_processed = false; + } else { + info!("Transaction {tx_signature} consumed as part of resync process"); + } + + self_clone + .local_storage + .register_transaction(&self_clone.program_id, &tx_signature)?; } - self_clone - .local_storage - .register_transaction(&self_clone.program_id, &tx_signature)?; + Result::Ok(is_chunk_successfull_processed) } - - Result::Ok(is_chunk_successfull_processed) - } .instrument(span!( - Level::ERROR, - "Register chunk", - chunk_index = index, - )) + Level::ERROR, + "Register chunk", + chunk_index = index, + )), ); } @@ -496,6 +494,21 @@ where Ok(()) } + pub async fn hard_resync_event(self: Arc, resync_from_slot: u64) -> Result<()> { + let resync_start = self.get_start_signature(resync_from_slot - 1).await?; + + let signatures = ::get_signatures_data_for_address_with_config( + &self.client, + &self.program_id, + self.commitment_config, + resync_start + ).await?.iter().map(|sig_data| sig_data.signature).collect::>(); + + self.update_events(signatures).await?; + + Ok(()) + } + async fn resync_events(self: &Arc) -> Result<()> { if !self.is_resync_enabled { return Ok(());