From e273d87a29961581404238fac61b65ed1ff3d977 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Sat, 21 Sep 2024 02:50:36 -0700 Subject: [PATCH 1/6] support notify first shred received in geyser --- Cargo.lock | 1 + core/src/tvu.rs | 4 +++- core/src/validator.rs | 5 +++++ .../src/geyser_plugin_interface.rs | 4 ++++ .../src/geyser_plugin_service.rs | 16 ++++++++++++---- .../src/slot_status_notifier.rs | 7 +++++++ turbine/Cargo.toml | 1 + turbine/src/retransmit_stage.rs | 16 +++++++++++++++- 8 files changed, 48 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 51b063750b0bf4..6c030fe71362cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8373,6 +8373,7 @@ dependencies = [ "rustls 0.23.13", "solana-entry", "solana-feature-set", + "solana-geyser-plugin-manager", "solana-gossip", "solana-ledger", "solana-logger", diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 5134228c34ac4c..663dc3da5008d7 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -25,7 +25,7 @@ use { bytes::Bytes, crossbeam_channel::{unbounded, Receiver, Sender}, solana_client::connection_cache::ConnectionCache, - solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierArc, + solana_geyser_plugin_manager::{block_metadata_notifier_interface::BlockMetadataNotifierArc, slot_status_notifier::SlotStatusNotifier}, solana_gossip::{ cluster_info::ClusterInfo, duplicate_shred_handler::DuplicateShredHandler, duplicate_shred_listener::DuplicateShredListener, @@ -161,6 +161,7 @@ impl Tvu { outstanding_repair_requests: Arc>, cluster_slots: Arc, wen_restart_repair_slots: Option>>>, + slot_status_notifier: Option, ) -> Result { let in_wen_restart = wen_restart_repair_slots.is_some(); @@ -210,6 +211,7 @@ impl Tvu { retransmit_receiver, max_slots.clone(), Some(rpc_subscriptions.clone()), + slot_status_notifier, ); let (ancestor_duplicate_slots_sender, ancestor_duplicate_slots_receiver) = unbounded(); diff --git a/core/src/validator.rs b/core/src/validator.rs index aa9b9e81ab6fb2..3ea9593a5efbb1 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -674,6 +674,10 @@ impl Validator { .as_ref() .and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier()); + let slot_status_notifier = geyser_plugin_service + .as_ref() + .and_then(|geyser_plugin_service| geyser_plugin_service.get_slot_status_notifier()); + info!( "Geyser plugin: accounts_update_notifier: {}, transaction_notifier: {}, \ entry_notifier: {}", @@ -1405,6 +1409,7 @@ impl Validator { outstanding_repair_requests.clone(), cluster_slots.clone(), wen_restart_repair_slots.clone(), + slot_status_notifier, ) .map_err(ValidatorError::Other)?; diff --git a/geyser-plugin-interface/src/geyser_plugin_interface.rs b/geyser-plugin-interface/src/geyser_plugin_interface.rs index 1dec992bd6f10b..6ac7bb848b1444 100644 --- a/geyser-plugin-interface/src/geyser_plugin_interface.rs +++ b/geyser-plugin-interface/src/geyser_plugin_interface.rs @@ -319,6 +319,9 @@ pub enum SlotStatus { /// The highest slot that has been voted on by supermajority of the cluster, ie. is confirmed. Confirmed, + + /// First Shred Received + FirstShredReceived, } impl SlotStatus { @@ -327,6 +330,7 @@ impl SlotStatus { SlotStatus::Confirmed => "confirmed", SlotStatus::Processed => "processed", SlotStatus::Rooted => "rooted", + SlotStatus::FirstShredReceived => "first_shread_received", } } } diff --git a/geyser-plugin-manager/src/geyser_plugin_service.rs b/geyser-plugin-manager/src/geyser_plugin_service.rs index ff3e050dc4b391..3cc7f8398c6d16 100644 --- a/geyser-plugin-manager/src/geyser_plugin_service.rs +++ b/geyser-plugin-manager/src/geyser_plugin_service.rs @@ -5,7 +5,7 @@ use { block_metadata_notifier_interface::BlockMetadataNotifierArc, entry_notifier::EntryNotifierImpl, geyser_plugin_manager::{GeyserPluginManager, GeyserPluginManagerRequest}, - slot_status_notifier::SlotStatusNotifierImpl, + slot_status_notifier::{SlotStatusNotifier, SlotStatusNotifierImpl}, slot_status_observer::SlotStatusObserver, transaction_notifier::TransactionNotifierImpl, }, @@ -37,6 +37,7 @@ pub struct GeyserPluginService { transaction_notifier: Option, entry_notifier: Option, block_metadata_notifier: Option, + slot_status_notifier: Option, } impl GeyserPluginService { @@ -107,9 +108,10 @@ impl GeyserPluginService { None }; - let (slot_status_observer, block_metadata_notifier): ( + let (slot_status_observer, block_metadata_notifier, slot_status_notifier): ( Option, Option, + Option ) = if account_data_notifications_enabled || transaction_notifications_enabled || entry_notifications_enabled @@ -119,14 +121,15 @@ impl GeyserPluginService { ( Some(SlotStatusObserver::new( confirmed_bank_receiver, - slot_status_notifier, + slot_status_notifier.clone(), )), Some(Arc::new(BlockMetadataNotifierImpl::new( plugin_manager.clone(), ))), + Some(slot_status_notifier), ) } else { - (None, None) + (None, None, None) }; // Initialize plugin manager rpc handler thread if needed @@ -143,6 +146,7 @@ impl GeyserPluginService { transaction_notifier, entry_notifier, block_metadata_notifier, + slot_status_notifier, }) } @@ -172,6 +176,10 @@ impl GeyserPluginService { self.block_metadata_notifier.clone() } + pub fn get_slot_status_notifier(&self) -> Option { + self.slot_status_notifier.clone() + } + pub fn join(self) -> thread::Result<()> { if let Some(mut slot_status_observer) = self.slot_status_observer { slot_status_observer.join()?; diff --git a/geyser-plugin-manager/src/slot_status_notifier.rs b/geyser-plugin-manager/src/slot_status_notifier.rs index 1557bb2d4d8c36..00aa5bfc08ba85 100644 --- a/geyser-plugin-manager/src/slot_status_notifier.rs +++ b/geyser-plugin-manager/src/slot_status_notifier.rs @@ -17,6 +17,9 @@ pub trait SlotStatusNotifierInterface { /// Notified when a slot is rooted. fn notify_slot_rooted(&self, slot: Slot, parent: Option); + + /// Notified when the first shred is received for a slot. + fn notify_shred_received(&self, slot: Slot); } pub type SlotStatusNotifier = Arc>; @@ -37,6 +40,10 @@ impl SlotStatusNotifierInterface for SlotStatusNotifierImpl { fn notify_slot_rooted(&self, slot: Slot, parent: Option) { self.notify_slot_status(slot, parent, SlotStatus::Rooted); } + + fn notify_shred_received(&self, slot: Slot) { + self.notify_slot_status(slot, None, SlotStatus::FirstShredReceived); + } } impl SlotStatusNotifierImpl { diff --git a/turbine/Cargo.toml b/turbine/Cargo.toml index 0addbaf6866f74..6951fb6662d368 100644 --- a/turbine/Cargo.toml +++ b/turbine/Cargo.toml @@ -26,6 +26,7 @@ rustls = { workspace = true } solana-entry = { workspace = true } solana-feature-set = { workspace = true } solana-gossip = { workspace = true } +solana-geyser-plugin-manager = { workspace = true } solana-ledger = { workspace = true } solana-measure = { workspace = true } solana-metrics = { workspace = true } diff --git a/turbine/src/retransmit_stage.rs b/turbine/src/retransmit_stage.rs index d8d13e7f935cad..cbdd2b1ee7029f 100644 --- a/turbine/src/retransmit_stage.rs +++ b/turbine/src/retransmit_stage.rs @@ -10,6 +10,7 @@ use { rand::Rng, rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol}, + solana_geyser_plugin_manager::slot_status_notifier::SlotStatusNotifier, solana_ledger::{ leader_schedule_cache::LeaderScheduleCache, shred::{self, ShredId}, @@ -184,6 +185,7 @@ fn retransmit( shred_deduper: &mut ShredDeduper<2>, max_slots: &MaxSlots, rpc_subscriptions: Option<&RpcSubscriptions>, + slot_status_notifier: Option, ) -> Result<(), RecvTimeoutError> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); let mut shreds = shreds_receiver.recv_timeout(RECV_TIMEOUT)?; @@ -299,7 +301,7 @@ fn retransmit( .reduce(HashMap::new, RetransmitSlotStats::merge) }) }; - stats.upsert_slot_stats(slot_stats, root_bank.slot(), rpc_subscriptions); + stats.upsert_slot_stats(slot_stats, root_bank.slot(), rpc_subscriptions, slot_status_notifier); timer_start.stop(); stats.total_time += timer_start.as_us(); stats.maybe_submit(&root_bank, &working_bank, cluster_info, cluster_nodes_cache); @@ -381,6 +383,7 @@ pub fn retransmitter( shreds_receiver: Receiver>>, max_slots: Arc, rpc_subscriptions: Option>, + slot_status_notifier: Option, ) -> JoinHandle<()> { let cluster_nodes_cache = ClusterNodesCache::::new( CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, @@ -412,6 +415,7 @@ pub fn retransmitter( &mut shred_deduper, &max_slots, rpc_subscriptions.as_deref(), + slot_status_notifier.clone(), ) { Ok(()) => (), Err(RecvTimeoutError::Timeout) => (), @@ -435,6 +439,7 @@ impl RetransmitStage { retransmit_receiver: Receiver>>, max_slots: Arc, rpc_subscriptions: Option>, + slot_status_notifier: Option, ) -> Self { let retransmit_thread_handle = retransmitter( retransmit_sockets, @@ -445,6 +450,7 @@ impl RetransmitStage { retransmit_receiver, max_slots, rpc_subscriptions, + slot_status_notifier, ); Self { @@ -507,6 +513,7 @@ impl RetransmitStats { feed: I, root: Slot, rpc_subscriptions: Option<&RpcSubscriptions>, + slot_status_notifier: Option, ) where I: IntoIterator, { @@ -523,6 +530,13 @@ impl RetransmitStats { datapoint_info!("retransmit-first-shred", ("slot", slot, i64)); } } + + if let Some(slot_status_notifier) = &slot_status_notifier { + if slot > root { + slot_status_notifier.read().unwrap().notify_shred_received(slot); + } + } + self.slot_stats.put(slot, slot_stats); } Some(entry) => { From 9549543c319005750311b2eada1bb0567181a133 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Sat, 21 Sep 2024 15:53:53 -0700 Subject: [PATCH 2/6] Fixed unit test --- core/src/tvu.rs | 6 +++++- .../src/geyser_plugin_service.rs | 2 +- .../src/slot_status_notifier.rs | 4 ++-- turbine/src/retransmit_stage.rs | 16 ++++++++++++---- 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 663dc3da5008d7..5d5e18bc241395 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -25,7 +25,10 @@ use { bytes::Bytes, crossbeam_channel::{unbounded, Receiver, Sender}, solana_client::connection_cache::ConnectionCache, - solana_geyser_plugin_manager::{block_metadata_notifier_interface::BlockMetadataNotifierArc, slot_status_notifier::SlotStatusNotifier}, + solana_geyser_plugin_manager::{ + block_metadata_notifier_interface::BlockMetadataNotifierArc, + slot_status_notifier::SlotStatusNotifier, + }, solana_gossip::{ cluster_info::ClusterInfo, duplicate_shred_handler::DuplicateShredHandler, duplicate_shred_listener::DuplicateShredListener, @@ -544,6 +547,7 @@ pub mod tests { outstanding_repair_requests, cluster_slots, wen_restart_repair_slots, + None, ) .expect("assume success"); if enable_wen_restart { diff --git a/geyser-plugin-manager/src/geyser_plugin_service.rs b/geyser-plugin-manager/src/geyser_plugin_service.rs index 3cc7f8398c6d16..8e293cbddbbeb0 100644 --- a/geyser-plugin-manager/src/geyser_plugin_service.rs +++ b/geyser-plugin-manager/src/geyser_plugin_service.rs @@ -111,7 +111,7 @@ impl GeyserPluginService { let (slot_status_observer, block_metadata_notifier, slot_status_notifier): ( Option, Option, - Option + Option, ) = if account_data_notifications_enabled || transaction_notifications_enabled || entry_notifications_enabled diff --git a/geyser-plugin-manager/src/slot_status_notifier.rs b/geyser-plugin-manager/src/slot_status_notifier.rs index 00aa5bfc08ba85..18ea942810ef41 100644 --- a/geyser-plugin-manager/src/slot_status_notifier.rs +++ b/geyser-plugin-manager/src/slot_status_notifier.rs @@ -19,7 +19,7 @@ pub trait SlotStatusNotifierInterface { fn notify_slot_rooted(&self, slot: Slot, parent: Option); /// Notified when the first shred is received for a slot. - fn notify_shred_received(&self, slot: Slot); + fn notify_first_shred_received(&self, slot: Slot); } pub type SlotStatusNotifier = Arc>; @@ -41,7 +41,7 @@ impl SlotStatusNotifierInterface for SlotStatusNotifierImpl { self.notify_slot_status(slot, parent, SlotStatus::Rooted); } - fn notify_shred_received(&self, slot: Slot) { + fn notify_first_shred_received(&self, slot: Slot) { self.notify_slot_status(slot, None, SlotStatus::FirstShredReceived); } } diff --git a/turbine/src/retransmit_stage.rs b/turbine/src/retransmit_stage.rs index cbdd2b1ee7029f..30cdccb98d15fd 100644 --- a/turbine/src/retransmit_stage.rs +++ b/turbine/src/retransmit_stage.rs @@ -9,8 +9,8 @@ use { lru::LruCache, rand::Rng, rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, - solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol}, solana_geyser_plugin_manager::slot_status_notifier::SlotStatusNotifier, + solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol}, solana_ledger::{ leader_schedule_cache::LeaderScheduleCache, shred::{self, ShredId}, @@ -301,7 +301,12 @@ fn retransmit( .reduce(HashMap::new, RetransmitSlotStats::merge) }) }; - stats.upsert_slot_stats(slot_stats, root_bank.slot(), rpc_subscriptions, slot_status_notifier); + stats.upsert_slot_stats( + slot_stats, + root_bank.slot(), + rpc_subscriptions, + slot_status_notifier, + ); timer_start.stop(); stats.total_time += timer_start.as_us(); stats.maybe_submit(&root_bank, &working_bank, cluster_info, cluster_nodes_cache); @@ -383,7 +388,7 @@ pub fn retransmitter( shreds_receiver: Receiver>>, max_slots: Arc, rpc_subscriptions: Option>, - slot_status_notifier: Option, + slot_status_notifier: Option, ) -> JoinHandle<()> { let cluster_nodes_cache = ClusterNodesCache::::new( CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, @@ -533,7 +538,10 @@ impl RetransmitStats { if let Some(slot_status_notifier) = &slot_status_notifier { if slot > root { - slot_status_notifier.read().unwrap().notify_shred_received(slot); + slot_status_notifier + .read() + .unwrap() + .notify_first_shred_received(slot); } } From e0714b17d34206fd2432520450be83edffac5987 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Mon, 23 Sep 2024 09:41:21 -0700 Subject: [PATCH 3/6] cargo.lock --- programs/sbf/Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 9807b7e2afa2e2..a3207197f0827f 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6976,6 +6976,7 @@ dependencies = [ "rustls 0.23.13", "solana-entry", "solana-feature-set", + "solana-geyser-plugin-manager", "solana-gossip", "solana-ledger", "solana-measure", From 4ae723b430c0e99a1f8442fa9c4b6c23ec8d5a64 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Mon, 30 Sep 2024 01:21:05 -0700 Subject: [PATCH 4/6] fixed test issue --- turbine/benches/retransmit_stage.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/turbine/benches/retransmit_stage.rs b/turbine/benches/retransmit_stage.rs index c5490d5670e6c6..75c7ad06bdde34 100644 --- a/turbine/benches/retransmit_stage.rs +++ b/turbine/benches/retransmit_stage.rs @@ -126,6 +126,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { shreds_receiver, Arc::default(), // solana_rpc::max_slots::MaxSlots None, + None, ); let mut index = 0; From af05a21d87dba5914bcfc53df39a4861f11f73c6 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Mon, 30 Sep 2024 07:18:44 -0700 Subject: [PATCH 5/6] Cargo.toml dependency order --- turbine/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/turbine/Cargo.toml b/turbine/Cargo.toml index 6951fb6662d368..ff035eaf11fdea 100644 --- a/turbine/Cargo.toml +++ b/turbine/Cargo.toml @@ -25,8 +25,8 @@ rayon = { workspace = true } rustls = { workspace = true } solana-entry = { workspace = true } solana-feature-set = { workspace = true } -solana-gossip = { workspace = true } solana-geyser-plugin-manager = { workspace = true } +solana-gossip = { workspace = true } solana-ledger = { workspace = true } solana-measure = { workspace = true } solana-metrics = { workspace = true } From 11a1fba86a0bd178a45b3d492809f6d5b06dca90 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 3 Oct 2024 01:37:08 -0700 Subject: [PATCH 6/6] Pass reference instead of copy --- turbine/src/retransmit_stage.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/turbine/src/retransmit_stage.rs b/turbine/src/retransmit_stage.rs index 30cdccb98d15fd..b5e67cd3203a40 100644 --- a/turbine/src/retransmit_stage.rs +++ b/turbine/src/retransmit_stage.rs @@ -185,7 +185,7 @@ fn retransmit( shred_deduper: &mut ShredDeduper<2>, max_slots: &MaxSlots, rpc_subscriptions: Option<&RpcSubscriptions>, - slot_status_notifier: Option, + slot_status_notifier: Option<&SlotStatusNotifier>, ) -> Result<(), RecvTimeoutError> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); let mut shreds = shreds_receiver.recv_timeout(RECV_TIMEOUT)?; @@ -420,7 +420,7 @@ pub fn retransmitter( &mut shred_deduper, &max_slots, rpc_subscriptions.as_deref(), - slot_status_notifier.clone(), + slot_status_notifier.as_ref(), ) { Ok(()) => (), Err(RecvTimeoutError::Timeout) => (), @@ -518,7 +518,7 @@ impl RetransmitStats { feed: I, root: Slot, rpc_subscriptions: Option<&RpcSubscriptions>, - slot_status_notifier: Option, + slot_status_notifier: Option<&SlotStatusNotifier>, ) where I: IntoIterator, { @@ -536,7 +536,7 @@ impl RetransmitStats { } } - if let Some(slot_status_notifier) = &slot_status_notifier { + if let Some(slot_status_notifier) = slot_status_notifier { if slot > root { slot_status_notifier .read()