Skip to content

Commit

Permalink
feat: improve rollback mechanism
Browse files Browse the repository at this point in the history
This commit refactors the `event_reader_service.rs` file by introducing a new
 `Rollback` enum and modifying the `resync_rollback` field in the `EventsReader
` struct to use this enum. The `Rollback` enum is used to specify different
 types of rollbacks, including `None`, `Beginning`, and `Signature`. The `res
ync_rollback` field is now of type `Arc<RwLock<Rollback>>`.

Additionally, a new method `reset_last_resynced_transaction` is added to the
 `ResyncedTransactionsPtrStorage` trait and implemented for the `rocksdb` module
. This method allows resetting the last resynced transaction for a specific
 program ID.

These changes improve the clarity and maintainability of the codebase.
  • Loading branch information
cyphersnake committed Jun 28, 2023
1 parent 4a014a2 commit af95f34
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "solana-events-parser"
version = "0.4.6"
version = "0.5.0"
authors = [
"cyphersnake <[email protected]>",
"shinkarenko <[email protected]>",
Expand Down
40 changes: 26 additions & 14 deletions src/event_reader_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ pub enum ResyncOrder {
Historical,
}

#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub enum Rollback {
#[default]
None,
Beginning,
Signature(SolanaSignature),
}

#[derive(derive_builder::Builder)]
pub struct EventsReader<TransactionConsumerFn, EventRecipient, E>
where
Expand Down Expand Up @@ -126,8 +134,8 @@ where
pub resync_signatures_chunk_size: Option<usize>,
pub resync_ptr_setter: Arc<dyn Send + Sync + Fn(u64) -> BoxFuture<'static, Result<()>>>,
pub resync_order: ResyncOrder,
#[builder(default = "Arc::new(RwLock::new(None))")]
pub resync_rollback: Arc<RwLock<Option<SolanaSignature>>>,
#[builder(default = "Arc::new(RwLock::new(Rollback::None))")]
pub resync_rollback: Arc<RwLock<Rollback>>,
}

impl<TransactionConsumerFn, EventRecipient, E>
Expand Down Expand Up @@ -460,18 +468,22 @@ where
self: &Arc<Self>,
last_transaction: Option<SolanaSignature>,
) -> Result<()> {
if let Some(last_transaction) = self
.resync_rollback
.write()
.ok()
.and_then(|mut write| {
write.take().map(|tx| {
info!("Found rollback to {tx} transaction");
tx
})
})
.or(last_transaction)
{
let next_resync_ptr = match self.resync_rollback.write().as_deref() {
Ok(Rollback::Beginning) => {
info!("Reset last resynced tx");
self.local_storage
.reset_last_resynced_transaction(&self.program_id)?;
return Ok(());
}
Ok(Rollback::None) => last_transaction,
Err(err) => {
error!("Error while lock rollback: {err:?}");
last_transaction
}
Ok(Rollback::Signature(signature)) => Some(*signature),
};

if let Some(last_transaction) = next_resync_ptr {
info!("Set last resynced tx to {last_transaction} transaction");
self.local_storage
.set_last_resynced_transaction(&self.program_id, &last_transaction)?;
Expand Down
14 changes: 14 additions & 0 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ pub trait ResyncedTransactionsPtrStorage: RegisterTransaction {
program_id: &Pubkey,
transaction: &SolanaSignature,
) -> Result<(), <Self as RegisterTransaction>::Error>;

fn reset_last_resynced_transaction(
&self,
program_id: &Pubkey,
) -> Result<(), <Self as RegisterTransaction>::Error>;
}

#[cfg(feature = "rocksdb")]
Expand Down Expand Up @@ -183,5 +188,14 @@ pub mod rocksdb {

Ok(())
}

fn reset_last_resynced_transaction(
&self,
program_id: &Pubkey,
) -> Result<(), <Self as RegisterTransaction>::Error> {
self.delete([&program_id.to_bytes()[..], LAST_RESYNCED_SUFFIX].concat())?;

Ok(())
}
}
}

0 comments on commit af95f34

Please sign in to comment.