diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index b84a5f4691da5a..775f0b89f099e0 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -699,6 +699,7 @@ impl ReplayStage { &bank_notification_sender, &rewards_recorder_sender, &rpc_subscriptions, + &slot_status_notifier, &mut duplicate_slots_tracker, &duplicate_confirmed_slots, &mut epoch_slots_frozen_slots, @@ -2242,6 +2243,7 @@ impl ReplayStage { root: Slot, err: &BlockstoreProcessorError, rpc_subscriptions: &Arc, + slot_status_notifier: &Option, duplicate_slots_tracker: &mut DuplicateSlotsTracker, duplicate_confirmed_slots: &DuplicateConfirmedSlots, epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots, @@ -2282,11 +2284,21 @@ impl ReplayStage { blockstore.slots_stats.mark_dead(slot); + let err = format!("error: {err:?}"); + + if let Some(slot_status_notifier) = slot_status_notifier { + slot_status_notifier + .read() + .unwrap() + .notify_slot_dead(slot, err.clone()); + } + rpc_subscriptions.notify_slot_update(SlotUpdate::Dead { slot, - err: format!("error: {err:?}"), + err, timestamp: timestamp(), }); + let dead_state = DeadState::new_from_state( slot, duplicate_slots_tracker, @@ -2994,6 +3006,7 @@ impl ReplayStage { bank_notification_sender: &Option, rewards_recorder_sender: &Option, rpc_subscriptions: &Arc, + slot_status_notifier: &Option, duplicate_slots_tracker: &mut DuplicateSlotsTracker, duplicate_confirmed_slots: &DuplicateConfirmedSlots, epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots, @@ -3034,6 +3047,7 @@ impl ReplayStage { root, err, rpc_subscriptions, + slot_status_notifier, duplicate_slots_tracker, duplicate_confirmed_slots, epoch_slots_frozen_slots, @@ -3082,6 +3096,7 @@ impl ReplayStage { root, &BlockstoreProcessorError::InvalidTransaction(err), rpc_subscriptions, + slot_status_notifier, duplicate_slots_tracker, duplicate_confirmed_slots, epoch_slots_frozen_slots, @@ -3113,6 +3128,7 @@ impl ReplayStage { root, &result_err, rpc_subscriptions, + slot_status_notifier, duplicate_slots_tracker, duplicate_confirmed_slots, epoch_slots_frozen_slots, @@ -3291,6 +3307,7 @@ impl ReplayStage { bank_notification_sender: &Option, rewards_recorder_sender: &Option, rpc_subscriptions: &Arc, + slot_status_notifier: &Option, duplicate_slots_tracker: &mut DuplicateSlotsTracker, duplicate_confirmed_slots: &DuplicateConfirmedSlots, epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots, @@ -3373,6 +3390,7 @@ impl ReplayStage { bank_notification_sender, rewards_recorder_sender, rpc_subscriptions, + slot_status_notifier, duplicate_slots_tracker, duplicate_confirmed_slots, epoch_slots_frozen_slots, @@ -4201,6 +4219,7 @@ pub(crate) mod tests { solana_rpc::{ optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, rpc::{create_test_transaction_entries, populate_blockstore_for_tests}, + slot_status_notifier::SlotStatusNotifierInterface, }, solana_runtime::{ accounts_background_service::AbsRequestSender, @@ -4226,7 +4245,7 @@ pub(crate) mod tests { std::{ fs::remove_dir_all, iter, - sync::{atomic::AtomicU64, Arc, RwLock}, + sync::{atomic::AtomicU64, Arc, Mutex, RwLock}, }, tempfile::tempdir, test_case::test_case, @@ -4871,6 +4890,34 @@ pub(crate) mod tests { ); } + struct SlotStatusNotifierForTest { + dead_slots: Arc>>, + } + + impl SlotStatusNotifierForTest { + pub fn new(dead_slots: Arc>>) -> Self { + Self { dead_slots } + } + } + + impl SlotStatusNotifierInterface for SlotStatusNotifierForTest { + fn notify_slot_confirmed(&self, _slot: Slot, _parent: Option) {} + + fn notify_slot_processed(&self, _slot: Slot, _parent: Option) {} + + fn notify_slot_rooted(&self, _slot: Slot, _parent: Option) {} + + fn notify_first_shred_received(&self, _slot: Slot) {} + + fn notify_completed(&self, _slot: Slot) {} + + fn notify_created_bank(&self, _slot: Slot, _parent: Slot) {} + + fn notify_slot_dead(&self, slot: Slot, _error: String) { + self.dead_slots.lock().unwrap().insert(slot); + } + } + // Given a shred and a fatal expected error, check that replaying that shred causes causes the fork to be // marked as dead. Returns the error for caller to verify. fn check_dead_fork(shred_to_insert: F) -> result::Result<(), BlockstoreProcessorError> @@ -4939,6 +4986,12 @@ pub(crate) mod tests { )); let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) = unbounded(); + let dead_slots = Arc::new(Mutex::new(HashSet::default())); + + let slot_status_notifier: Option = Some(Arc::new(RwLock::new( + SlotStatusNotifierForTest::new(dead_slots.clone()), + ))); + if let Err(err) = &res { ReplayStage::mark_dead_slot( &blockstore, @@ -4946,6 +4999,7 @@ pub(crate) mod tests { 0, err, &rpc_subscriptions, + &slot_status_notifier, &mut DuplicateSlotsTracker::default(), &DuplicateConfirmedSlots::new(), &mut EpochSlotsFrozenSlots::default(), @@ -4956,7 +5010,7 @@ pub(crate) mod tests { &mut PurgeRepairSlotCounter::default(), ); } - + assert!(dead_slots.lock().unwrap().contains(&bank1.slot())); // Check that the erroring bank was marked as dead in the progress map assert!(progress .get(&bank1.slot()) diff --git a/geyser-plugin-interface/src/geyser_plugin_interface.rs b/geyser-plugin-interface/src/geyser_plugin_interface.rs index 3f4394a84bae8e..fafc2704fb7bfc 100644 --- a/geyser-plugin-interface/src/geyser_plugin_interface.rs +++ b/geyser-plugin-interface/src/geyser_plugin_interface.rs @@ -306,7 +306,7 @@ pub enum GeyserPluginError { } /// The current status of a slot -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] #[repr(u32)] pub enum SlotStatus { /// The highest slot of the heaviest fork processed by the node. Ledger state at this slot is @@ -328,6 +328,9 @@ pub enum SlotStatus { /// A new bank fork is created with the slot CreatedBank, + + /// A slot is marked dead + Dead(String), } impl SlotStatus { @@ -339,6 +342,7 @@ impl SlotStatus { SlotStatus::FirstShredReceived => "first_shread_received", SlotStatus::Completed => "completed", SlotStatus::CreatedBank => "created_bank", + SlotStatus::Dead(_error) => "dead", } } } @@ -419,7 +423,7 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug { &self, slot: Slot, parent: Option, - status: SlotStatus, + status: &SlotStatus, ) -> Result<()> { Ok(()) } diff --git a/geyser-plugin-manager/src/slot_status_notifier.rs b/geyser-plugin-manager/src/slot_status_notifier.rs index 27dcd03dac28f3..3269c5966856ff 100644 --- a/geyser-plugin-manager/src/slot_status_notifier.rs +++ b/geyser-plugin-manager/src/slot_status_notifier.rs @@ -37,6 +37,10 @@ impl SlotStatusNotifierInterface for SlotStatusNotifierImpl { fn notify_created_bank(&self, slot: Slot, parent: Slot) { self.notify_slot_status(slot, Some(parent), SlotStatus::CreatedBank); } + + fn notify_slot_dead(&self, slot: Slot, error: String) { + self.notify_slot_status(slot, None, SlotStatus::Dead(error)); + } } impl SlotStatusNotifierImpl { @@ -52,7 +56,7 @@ impl SlotStatusNotifierImpl { for plugin in plugin_manager.plugins.iter() { let mut measure = Measure::start("geyser-plugin-update-slot"); - match plugin.update_slot_status(slot, parent, slot_status) { + match plugin.update_slot_status(slot, parent, &slot_status) { Err(err) => { error!( "Failed to update slot status at slot {}, error: {} to plugin {}", diff --git a/rpc/src/slot_status_notifier.rs b/rpc/src/slot_status_notifier.rs index 38e9bf60a6e091..4b48d7d858e321 100644 --- a/rpc/src/slot_status_notifier.rs +++ b/rpc/src/slot_status_notifier.rs @@ -21,6 +21,9 @@ pub trait SlotStatusNotifierInterface { /// Notified when the slot has bank created. fn notify_created_bank(&self, slot: Slot, parent: Slot); + + /// Notified when the slot is marked "Dead" + fn notify_slot_dead(&self, slot: Slot, error: String); } pub type SlotStatusNotifier = Arc>;