diff --git a/server/src/geyser_grpc_plugin.rs b/server/src/geyser_grpc_plugin.rs index 7bd348d..7f199f1 100644 --- a/server/src/geyser_grpc_plugin.rs +++ b/server/src/geyser_grpc_plugin.rs @@ -9,6 +9,7 @@ use std::{ }, time::SystemTime, }; +use std::sync::atomic::AtomicBool; use bs58; use crossbeam_channel::{bounded, Sender, TrySendError}; @@ -44,6 +45,9 @@ pub struct PluginData { /// Highest slot that an account write has been processed for thus far. highest_write_slot: Arc, + + is_startup_completed: AtomicBool, + ignore_startup_updates: bool } #[derive(Default)] @@ -66,6 +70,7 @@ pub struct PluginConfig { pub slot_update_buffer_size: usize, pub block_update_buffer_size: usize, pub transaction_update_buffer_size: usize, + pub skip_startup_stream: Option } impl GeyserPlugin for GeyserGrpcPlugin { @@ -90,6 +95,8 @@ impl GeyserPlugin for GeyserGrpcPlugin { msg: format!("Error deserializing PluginConfig: {err:?}"), })?; + info!("loaded geyser config: {:?}", config); + let addr = config .bind_address @@ -134,6 +141,9 @@ impl GeyserPlugin for GeyserGrpcPlugin { block_update_sender, transaction_update_sender, highest_write_slot, + is_startup_completed: AtomicBool::new(false), + // don't skip startup to keep backwards compatability + ignore_startup_updates: config.skip_startup_stream.unwrap_or(false) }); info!("plugin data initialized"); @@ -150,6 +160,11 @@ impl GeyserPlugin for GeyserGrpcPlugin { data.runtime.shutdown_background(); } + fn notify_end_of_startup(&self) -> PluginResult<()> { + self.data.as_ref().unwrap().is_startup_completed.store(true, Ordering::Relaxed); + Ok(()) + } + fn update_account( &self, account: ReplicaAccountInfoVersions, @@ -157,6 +172,11 @@ impl GeyserPlugin for GeyserGrpcPlugin { is_startup: bool, ) -> PluginResult<()> { let data = self.data.as_ref().expect("plugin must be initialized"); + + if data.ignore_startup_updates && !data.is_startup_completed.load(Ordering::Relaxed) { + return Ok(()); + } + let account_update = match account { ReplicaAccountInfoVersions::V0_0_1(account) => TimestampedAccountUpdate { ts: Some(prost_types::Timestamp::from(SystemTime::now())), @@ -254,10 +274,6 @@ impl GeyserPlugin for GeyserGrpcPlugin { } } - fn notify_end_of_startup(&self) -> PluginResult<()> { - Ok(()) - } - fn update_slot_status( &self, slot: u64, @@ -265,6 +281,11 @@ impl GeyserPlugin for GeyserGrpcPlugin { status: SlotStatus, ) -> PluginResult<()> { let data = self.data.as_ref().expect("plugin must be initialized"); + + if data.ignore_startup_updates && !data.is_startup_completed.load(Ordering::Relaxed) { + return Ok(()); + } + debug!("Updating slot {:?} at with status {:?}", slot, status); let status = match status { @@ -301,6 +322,11 @@ impl GeyserPlugin for GeyserGrpcPlugin { slot: u64, ) -> PluginResult<()> { let data = self.data.as_ref().expect("plugin must be initialized"); + + if data.ignore_startup_updates && !data.is_startup_completed.load(Ordering::Relaxed) { + return Ok(()); + } + let transaction_update = match transaction { ReplicaTransactionInfoVersions::V0_0_1(tx) => TimestampedTransactionUpdate { ts: Some(prost_types::Timestamp::from(SystemTime::now())), @@ -348,6 +374,10 @@ impl GeyserPlugin for GeyserGrpcPlugin { fn notify_block_metadata(&self, block_info: ReplicaBlockInfoVersions) -> PluginResult<()> { let data = self.data.as_ref().expect("plugin must be initialized"); + if data.ignore_startup_updates && !data.is_startup_completed.load(Ordering::Relaxed) { + return Ok(()); + } + let block = match block_info { ReplicaBlockInfoVersions::V0_0_1(block) => TimestampedBlockUpdate { ts: Some(prost_types::Timestamp::from(SystemTime::now())),