From b6b5b1a04fde8b3bae36be1b99960c7279ce300e Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 3 Oct 2024 10:11:52 -0700 Subject: [PATCH] Support notify first shred received in geyser (#3030) * support notify first shred received in geyser --- Cargo.lock | 1 + core/src/tvu.rs | 8 ++++++- core/src/validator.rs | 5 ++++ .../src/geyser_plugin_interface.rs | 4 ++++ .../src/geyser_plugin_service.rs | 16 +++++++++---- .../src/slot_status_notifier.rs | 7 ++++++ programs/sbf/Cargo.lock | 1 + turbine/Cargo.toml | 1 + turbine/benches/retransmit_stage.rs | 1 + turbine/src/retransmit_stage.rs | 24 ++++++++++++++++++- 10 files changed, 62 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 51b063750b0bf4..6c030fe71362cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8373,6 +8373,7 @@ dependencies = [ "rustls 0.23.13", "solana-entry", "solana-feature-set", + "solana-geyser-plugin-manager", "solana-gossip", "solana-ledger", "solana-logger", diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 5134228c34ac4c..5d5e18bc241395 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -25,7 +25,10 @@ use { bytes::Bytes, crossbeam_channel::{unbounded, Receiver, Sender}, solana_client::connection_cache::ConnectionCache, - solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierArc, + solana_geyser_plugin_manager::{ + block_metadata_notifier_interface::BlockMetadataNotifierArc, + slot_status_notifier::SlotStatusNotifier, + }, solana_gossip::{ cluster_info::ClusterInfo, duplicate_shred_handler::DuplicateShredHandler, duplicate_shred_listener::DuplicateShredListener, @@ -161,6 +164,7 @@ impl Tvu { outstanding_repair_requests: Arc>, cluster_slots: Arc, wen_restart_repair_slots: Option>>>, + slot_status_notifier: Option, ) -> Result { let in_wen_restart = wen_restart_repair_slots.is_some(); @@ -210,6 +214,7 @@ impl Tvu { retransmit_receiver, max_slots.clone(), Some(rpc_subscriptions.clone()), + 
slot_status_notifier, ); let (ancestor_duplicate_slots_sender, ancestor_duplicate_slots_receiver) = unbounded(); @@ -542,6 +547,7 @@ pub mod tests { outstanding_repair_requests, cluster_slots, wen_restart_repair_slots, + None, ) .expect("assume success"); if enable_wen_restart { diff --git a/core/src/validator.rs b/core/src/validator.rs index aa9b9e81ab6fb2..3ea9593a5efbb1 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -674,6 +674,10 @@ impl Validator { .as_ref() .and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier()); + let slot_status_notifier = geyser_plugin_service + .as_ref() + .and_then(|geyser_plugin_service| geyser_plugin_service.get_slot_status_notifier()); + info!( "Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \ entry_notifier: {}", @@ -1405,6 +1409,7 @@ impl Validator { outstanding_repair_requests.clone(), cluster_slots.clone(), wen_restart_repair_slots.clone(), + slot_status_notifier, ) .map_err(ValidatorError::Other)?; diff --git a/geyser-plugin-interface/src/geyser_plugin_interface.rs b/geyser-plugin-interface/src/geyser_plugin_interface.rs index 1dec992bd6f10b..6ac7bb848b1444 100644 --- a/geyser-plugin-interface/src/geyser_plugin_interface.rs +++ b/geyser-plugin-interface/src/geyser_plugin_interface.rs @@ -319,6 +319,9 @@ pub enum SlotStatus { /// The highest slot that has been voted on by supermajority of the cluster, ie. is confirmed. 
Confirmed, + + /// First Shred Received + FirstShredReceived, } impl SlotStatus { @@ -327,6 +330,7 @@ SlotStatus::Confirmed => "confirmed", SlotStatus::Processed => "processed", SlotStatus::Rooted => "rooted", + SlotStatus::FirstShredReceived => "first_shred_received", } } } diff --git a/geyser-plugin-manager/src/geyser_plugin_service.rs b/geyser-plugin-manager/src/geyser_plugin_service.rs index ff3e050dc4b391..8e293cbddbbeb0 100644 --- a/geyser-plugin-manager/src/geyser_plugin_service.rs +++ b/geyser-plugin-manager/src/geyser_plugin_service.rs @@ -5,7 +5,7 @@ use { block_metadata_notifier_interface::BlockMetadataNotifierArc, entry_notifier::EntryNotifierImpl, geyser_plugin_manager::{GeyserPluginManager, GeyserPluginManagerRequest}, - slot_status_notifier::SlotStatusNotifierImpl, + slot_status_notifier::{SlotStatusNotifier, SlotStatusNotifierImpl}, slot_status_observer::SlotStatusObserver, transaction_notifier::TransactionNotifierImpl, }, @@ -37,6 +37,7 @@ pub struct GeyserPluginService { transaction_notifier: Option, entry_notifier: Option, block_metadata_notifier: Option, + slot_status_notifier: Option, } impl GeyserPluginService { @@ -107,9 +108,10 @@ impl GeyserPluginService { None }; - let (slot_status_observer, block_metadata_notifier): ( + let (slot_status_observer, block_metadata_notifier, slot_status_notifier): ( Option, Option, + Option, ) = if account_data_notifications_enabled || transaction_notifications_enabled || entry_notifications_enabled @@ -119,14 +121,15 @@ ( Some(SlotStatusObserver::new( confirmed_bank_receiver, - slot_status_notifier, + slot_status_notifier.clone(), )), Some(Arc::new(BlockMetadataNotifierImpl::new( plugin_manager.clone(), ))), + Some(slot_status_notifier), ) } else { - (None, None) + (None, None, None) }; // Initialize plugin manager rpc handler thread if needed @@ -143,6 +146,7 @@ transaction_notifier, entry_notifier, block_metadata_notifier, + 
slot_status_notifier, }) } @@ -172,6 +176,10 @@ impl GeyserPluginService { self.block_metadata_notifier.clone() } + pub fn get_slot_status_notifier(&self) -> Option { + self.slot_status_notifier.clone() + } + pub fn join(self) -> thread::Result<()> { if let Some(mut slot_status_observer) = self.slot_status_observer { slot_status_observer.join()?; diff --git a/geyser-plugin-manager/src/slot_status_notifier.rs b/geyser-plugin-manager/src/slot_status_notifier.rs index 1557bb2d4d8c36..18ea942810ef41 100644 --- a/geyser-plugin-manager/src/slot_status_notifier.rs +++ b/geyser-plugin-manager/src/slot_status_notifier.rs @@ -17,6 +17,9 @@ pub trait SlotStatusNotifierInterface { /// Notified when a slot is rooted. fn notify_slot_rooted(&self, slot: Slot, parent: Option); + + /// Notified when the first shred is received for a slot. + fn notify_first_shred_received(&self, slot: Slot); } pub type SlotStatusNotifier = Arc>; @@ -37,6 +40,10 @@ impl SlotStatusNotifierInterface for SlotStatusNotifierImpl { fn notify_slot_rooted(&self, slot: Slot, parent: Option) { self.notify_slot_status(slot, parent, SlotStatus::Rooted); } + + fn notify_first_shred_received(&self, slot: Slot) { + self.notify_slot_status(slot, None, SlotStatus::FirstShredReceived); + } } impl SlotStatusNotifierImpl { diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 9807b7e2afa2e2..a3207197f0827f 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6976,6 +6976,7 @@ dependencies = [ "rustls 0.23.13", "solana-entry", "solana-feature-set", + "solana-geyser-plugin-manager", "solana-gossip", "solana-ledger", "solana-measure", diff --git a/turbine/Cargo.toml b/turbine/Cargo.toml index 0addbaf6866f74..ff035eaf11fdea 100644 --- a/turbine/Cargo.toml +++ b/turbine/Cargo.toml @@ -25,6 +25,7 @@ rayon = { workspace = true } rustls = { workspace = true } solana-entry = { workspace = true } solana-feature-set = { workspace = true } +solana-geyser-plugin-manager = { workspace = true } 
solana-gossip = { workspace = true } solana-ledger = { workspace = true } solana-measure = { workspace = true } diff --git a/turbine/benches/retransmit_stage.rs b/turbine/benches/retransmit_stage.rs index c5490d5670e6c6..75c7ad06bdde34 100644 --- a/turbine/benches/retransmit_stage.rs +++ b/turbine/benches/retransmit_stage.rs @@ -126,6 +126,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { shreds_receiver, Arc::default(), // solana_rpc::max_slots::MaxSlots None, + None, ); let mut index = 0; diff --git a/turbine/src/retransmit_stage.rs b/turbine/src/retransmit_stage.rs index d8d13e7f935cad..b5e67cd3203a40 100644 --- a/turbine/src/retransmit_stage.rs +++ b/turbine/src/retransmit_stage.rs @@ -9,6 +9,7 @@ use { lru::LruCache, rand::Rng, rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, + solana_geyser_plugin_manager::slot_status_notifier::SlotStatusNotifier, solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol}, solana_ledger::{ leader_schedule_cache::LeaderScheduleCache, @@ -184,6 +185,7 @@ fn retransmit( shred_deduper: &mut ShredDeduper<2>, max_slots: &MaxSlots, rpc_subscriptions: Option<&RpcSubscriptions>, + slot_status_notifier: Option<&SlotStatusNotifier>, ) -> Result<(), RecvTimeoutError> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); let mut shreds = shreds_receiver.recv_timeout(RECV_TIMEOUT)?; @@ -299,7 +301,12 @@ fn retransmit( .reduce(HashMap::new, RetransmitSlotStats::merge) }) }; - stats.upsert_slot_stats(slot_stats, root_bank.slot(), rpc_subscriptions); + stats.upsert_slot_stats( + slot_stats, + root_bank.slot(), + rpc_subscriptions, + slot_status_notifier, + ); timer_start.stop(); stats.total_time += timer_start.as_us(); stats.maybe_submit(&root_bank, &working_bank, cluster_info, cluster_nodes_cache); @@ -381,6 +388,7 @@ pub fn retransmitter( shreds_receiver: Receiver>>, max_slots: Arc, rpc_subscriptions: Option>, + slot_status_notifier: Option, ) -> JoinHandle<()> { let cluster_nodes_cache = ClusterNodesCache::::new( 
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, @@ -412,6 +420,7 @@ pub fn retransmitter( &mut shred_deduper, &max_slots, rpc_subscriptions.as_deref(), + slot_status_notifier.as_ref(), ) { Ok(()) => (), Err(RecvTimeoutError::Timeout) => (), @@ -435,6 +444,7 @@ impl RetransmitStage { retransmit_receiver: Receiver>>, max_slots: Arc, rpc_subscriptions: Option>, + slot_status_notifier: Option, ) -> Self { let retransmit_thread_handle = retransmitter( retransmit_sockets, @@ -445,6 +455,7 @@ impl RetransmitStage { retransmit_receiver, max_slots, rpc_subscriptions, + slot_status_notifier, ); Self { @@ -507,6 +518,7 @@ impl RetransmitStats { feed: I, root: Slot, rpc_subscriptions: Option<&RpcSubscriptions>, + slot_status_notifier: Option<&SlotStatusNotifier>, ) where I: IntoIterator, { @@ -523,6 +535,16 @@ impl RetransmitStats { datapoint_info!("retransmit-first-shred", ("slot", slot, i64)); } } + + if let Some(slot_status_notifier) = slot_status_notifier { + if slot > root { + slot_status_notifier + .read() + .unwrap() + .notify_first_shred_received(slot); + } + } + self.slot_stats.put(slot, slot_stats); } Some(entry) => {