diff --git a/src/event_reader_service.rs b/src/event_reader_service.rs index a6bb3b6..b8c8a14 100644 --- a/src/event_reader_service.rs +++ b/src/event_reader_service.rs @@ -90,6 +90,14 @@ pub enum ResyncOrder { Historical, } +#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] +pub enum Rollback { + #[default] + None, + Beginning, + Signature(SolanaSignature), +} + #[derive(derive_builder::Builder)] pub struct EventsReader where @@ -126,8 +134,8 @@ where pub resync_signatures_chunk_size: Option, pub resync_ptr_setter: Arc BoxFuture<'static, Result<()>>>, pub resync_order: ResyncOrder, - #[builder(default = "Arc::new(RwLock::new(None))")] - pub resync_rollback: Arc>>, + #[builder(default = "Arc::new(RwLock::new(Rollback::None))")] + pub resync_rollback: Arc>, pub live_events_transaction_request_param: TransactionRequestParams, } @@ -488,18 +496,22 @@ where self: &Arc, last_transaction: Option, ) -> Result<()> { - if let Some(last_transaction) = self - .resync_rollback - .write() - .ok() - .and_then(|mut write| { - write.take().map(|tx| { - info!("Found rollback to {tx} transaction"); - tx - }) - }) - .or(last_transaction) - { + let next_resync_ptr = match self.resync_rollback.write().as_deref() { + Ok(Rollback::Beginning) => { + info!("Reset last resynced tx"); + self.local_storage + .reset_last_resynced_transaction(&self.program_id)?; + return Ok(()); + } + Ok(Rollback::None) => last_transaction, + Err(err) => { + error!("Error while lock rollback: {err:?}"); + last_transaction + } + Ok(Rollback::Signature(signature)) => Some(*signature), + }; + + if let Some(last_transaction) = next_resync_ptr { info!("Set last resynced tx to {last_transaction} transaction"); self.local_storage .set_last_resynced_transaction(&self.program_id, &last_transaction)?; diff --git a/src/storage.rs b/src/storage.rs index c5fbe55..9b61ef8 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -63,6 +63,11 @@ pub trait ResyncedTransactionsPtrStorage: RegisterTransaction { program_id: &Pubkey, transaction: &SolanaSignature, ) -> Result<(), ::Error>; + + fn reset_last_resynced_transaction( + &self, + program_id: &Pubkey, + ) -> Result<(), ::Error>; } #[cfg(feature = "rocksdb")] @@ -185,5 +190,14 @@ pub mod rocksdb { Ok(()) } + + fn reset_last_resynced_transaction( + &self, + program_id: &Pubkey, + ) -> Result<(), ::Error> { + self.delete([&program_id.to_bytes()[..], LAST_RESYNCED_SUFFIX].concat())?; + + Ok(()) + } } }