Skip to content

Commit

Permalink
feat: add update event function
Browse files Browse the repository at this point in the history
  • Loading branch information
Bogdan Shinkarenko authored and cyphersnake committed Oct 27, 2023
1 parent 3383960 commit a9d0199
Showing 1 changed file with 57 additions and 44 deletions.
101 changes: 57 additions & 44 deletions src/event_reader_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
transaction_parser::{BindTransactionInstructionLogs, TransactionParsedMeta},
};
use de_solana_client::GetTransactionsSignaturesForAddress;
use solana_sdk::signature::Signature;
use solana_transaction_status::EncodedTransaction;

macro_rules! unwrap_or_continue {
Expand Down Expand Up @@ -369,8 +370,11 @@ where
))
}

pub fn get_start_signature(self: Arc<Self>, resync_from_slot: u64) -> Result<()> {
let resync_start = match &self
pub async fn get_start_signature(
self: &Arc<Self>,
resync_from_slot: u64,
) -> Result<Option<Signature>> {
match &self
.client
.get_block(resync_from_slot)
.await
Expand All @@ -386,27 +390,17 @@ where
SolanaSignature::from_str(tx.signatures.first().ok_or_else(|| {
Error::WrongConfig("Error while getting tx signature".to_owned())
})?)
.map_err(|err| {
Error::WrongConfig(format!("Error while signature parsing: {err:?}"))
})?,
.map_err(|err| {
Error::WrongConfig(format!("Error while signature parsing: {err:?}"))
})?,
)),
other => Err(Error::WrongConfig(format!(
"Unexpected format of EncodedTransaction: {other:?}",
))),
}?;
}
}

pub async fn hard_resync_event(self: Arc<Self>, resync_from_slot: u64) -> Result<()> {

let resync_start = self.get_start_signature(resync_from_slot - 1)?;

let signatures = <RpcClient as GetTransactionsSignaturesForAddress>::get_signatures_data_for_address_with_config(
&self.client,
&self.program_id,
self.commitment_config,
resync_start
).await?.iter().map(|sig_data| sig_data.signature).collect::<Vec<_>>();

pub async fn update_events(self: Arc<Self>, signatures: Vec<Signature>) -> Result<()> {
let signatures_chunks = signatures
.as_slice()
.chunks(
Expand All @@ -420,50 +414,54 @@ where
let self_clone = self.clone();
let signatures_chunk = signatures_chunk.to_vec();

tasks.push(async move {
let mut is_chunk_successfull_processed = true;
tasks.push(
async move {
let mut is_chunk_successfull_processed = true;

for tx_signature in signatures_chunk.into_iter() {
info!(
for tx_signature in signatures_chunk.into_iter() {
info!(
"Hard resyncing transaction with signature: {}",
tx_signature.to_string()
);

let transaction = unwrap_or_continue!(
let transaction = unwrap_or_continue!(
self_clone.get_transaction_by_signature(tx_signature).await,
error_action = {
is_chunk_successfull_processed = false;
},
"Error while get transaction by signature: {err:?}"
);

let transaction_str = tx_signature.to_string();
if let Err(err) = (self_clone.transaction_consumer)(
tx_signature,
transaction,
Arc::clone(&self_clone.client),
Arc::clone(&self_clone.event_recipient),
)
let transaction_str = tx_signature.to_string();
if let Err(err) = (self_clone.transaction_consumer)(
tx_signature,
transaction,
Arc::clone(&self_clone.client),
Arc::clone(&self_clone.event_recipient),
)
.await
{
error!("Error while transaction {transaction_str} consuming {err:?}", err = err);
is_chunk_successfull_processed = false;
} else {
info!("Transaction {tx_signature} consumed as part of resync process");
{
error!(
"Error while transaction {transaction_str} consuming {err:?}",
err = err
);
is_chunk_successfull_processed = false;
} else {
info!("Transaction {tx_signature} consumed as part of resync process");
}

self_clone
.local_storage
.register_transaction(&self_clone.program_id, &tx_signature)?;
}

self_clone
.local_storage
.register_transaction(&self_clone.program_id, &tx_signature)?;
Result::Ok(is_chunk_successfull_processed)
}

Result::Ok(is_chunk_successfull_processed)
}
.instrument(span!(
Level::ERROR,
"Register chunk",
chunk_index = index,
))
Level::ERROR,
"Register chunk",
chunk_index = index,
)),
);
}

Expand Down Expand Up @@ -496,6 +494,21 @@ where
Ok(())
}

pub async fn hard_resync_event(self: Arc<Self>, resync_from_slot: u64) -> Result<()> {
let resync_start = self.get_start_signature(resync_from_slot - 1).await?;

let signatures = <RpcClient as GetTransactionsSignaturesForAddress>::get_signatures_data_for_address_with_config(
&self.client,
&self.program_id,
self.commitment_config,
resync_start
).await?.iter().map(|sig_data| sig_data.signature).collect::<Vec<_>>();

self.update_events(signatures).await?;

Ok(())
}

async fn resync_events(self: &Arc<Self>) -> Result<()> {
if !self.is_resync_enabled {
return Ok(());
Expand Down

0 comments on commit a9d0199

Please sign in to comment.