Skip to content

Commit

Permalink
Support dead slot notification in geyser (#3163)
Browse files Browse the repository at this point in the history
* support dead slot in geyser

* Add unit tests for SlotStatusNotifier for dead slot

* Add unit tests for SlotStatusNotifier for dead slot

* stream the dead reason for slot
  • Loading branch information
lijunwangs authored Oct 28, 2024
1 parent 99898f5 commit 8abbd16
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 6 deletions.
60 changes: 57 additions & 3 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ impl ReplayStage {
&bank_notification_sender,
&rewards_recorder_sender,
&rpc_subscriptions,
&slot_status_notifier,
&mut duplicate_slots_tracker,
&duplicate_confirmed_slots,
&mut epoch_slots_frozen_slots,
Expand Down Expand Up @@ -2242,6 +2243,7 @@ impl ReplayStage {
root: Slot,
err: &BlockstoreProcessorError,
rpc_subscriptions: &Arc<RpcSubscriptions>,
slot_status_notifier: &Option<SlotStatusNotifier>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
duplicate_confirmed_slots: &DuplicateConfirmedSlots,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
Expand Down Expand Up @@ -2282,11 +2284,21 @@ impl ReplayStage {

blockstore.slots_stats.mark_dead(slot);

let err = format!("error: {err:?}");

if let Some(slot_status_notifier) = slot_status_notifier {
slot_status_notifier
.read()
.unwrap()
.notify_slot_dead(slot, err.clone());
}

rpc_subscriptions.notify_slot_update(SlotUpdate::Dead {
slot,
err: format!("error: {err:?}"),
err,
timestamp: timestamp(),
});

let dead_state = DeadState::new_from_state(
slot,
duplicate_slots_tracker,
Expand Down Expand Up @@ -2994,6 +3006,7 @@ impl ReplayStage {
bank_notification_sender: &Option<BankNotificationSenderConfig>,
rewards_recorder_sender: &Option<RewardsRecorderSender>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
slot_status_notifier: &Option<SlotStatusNotifier>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
duplicate_confirmed_slots: &DuplicateConfirmedSlots,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
Expand Down Expand Up @@ -3034,6 +3047,7 @@ impl ReplayStage {
root,
err,
rpc_subscriptions,
slot_status_notifier,
duplicate_slots_tracker,
duplicate_confirmed_slots,
epoch_slots_frozen_slots,
Expand Down Expand Up @@ -3082,6 +3096,7 @@ impl ReplayStage {
root,
&BlockstoreProcessorError::InvalidTransaction(err),
rpc_subscriptions,
slot_status_notifier,
duplicate_slots_tracker,
duplicate_confirmed_slots,
epoch_slots_frozen_slots,
Expand Down Expand Up @@ -3113,6 +3128,7 @@ impl ReplayStage {
root,
&result_err,
rpc_subscriptions,
slot_status_notifier,
duplicate_slots_tracker,
duplicate_confirmed_slots,
epoch_slots_frozen_slots,
Expand Down Expand Up @@ -3291,6 +3307,7 @@ impl ReplayStage {
bank_notification_sender: &Option<BankNotificationSenderConfig>,
rewards_recorder_sender: &Option<RewardsRecorderSender>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
slot_status_notifier: &Option<SlotStatusNotifier>,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
duplicate_confirmed_slots: &DuplicateConfirmedSlots,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
Expand Down Expand Up @@ -3373,6 +3390,7 @@ impl ReplayStage {
bank_notification_sender,
rewards_recorder_sender,
rpc_subscriptions,
slot_status_notifier,
duplicate_slots_tracker,
duplicate_confirmed_slots,
epoch_slots_frozen_slots,
Expand Down Expand Up @@ -4201,6 +4219,7 @@ pub(crate) mod tests {
solana_rpc::{
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
rpc::{create_test_transaction_entries, populate_blockstore_for_tests},
slot_status_notifier::SlotStatusNotifierInterface,
},
solana_runtime::{
accounts_background_service::AbsRequestSender,
Expand All @@ -4226,7 +4245,7 @@ pub(crate) mod tests {
std::{
fs::remove_dir_all,
iter,
sync::{atomic::AtomicU64, Arc, RwLock},
sync::{atomic::AtomicU64, Arc, Mutex, RwLock},
},
tempfile::tempdir,
test_case::test_case,
Expand Down Expand Up @@ -4871,6 +4890,34 @@ pub(crate) mod tests {
);
}

/// Test double for the slot status notifier: it keeps the set of slots that
/// were reported through `notify_slot_dead`.
struct SlotStatusNotifierForTest {
    /// Shared set of slots reported dead. The test holds another handle to the
    /// same set so it can inspect what was recorded.
    dead_slots: Arc<Mutex<HashSet<Slot>>>,
}

impl SlotStatusNotifierForTest {
    /// Builds a notifier that records dead slots into the supplied shared set.
    pub fn new(dead_slots: Arc<Mutex<HashSet<Slot>>>) -> SlotStatusNotifierForTest {
        SlotStatusNotifierForTest { dead_slots }
    }
}

impl SlotStatusNotifierInterface for SlotStatusNotifierForTest {
    // Only dead-slot notifications are tracked by this test double; every
    // other callback is a deliberate no-op.

    fn notify_slot_confirmed(&self, _slot: Slot, _parent: Option<Slot>) {}

    fn notify_slot_processed(&self, _slot: Slot, _parent: Option<Slot>) {}

    fn notify_slot_rooted(&self, _slot: Slot, _parent: Option<Slot>) {}

    fn notify_first_shred_received(&self, _slot: Slot) {}

    fn notify_completed(&self, _slot: Slot) {}

    fn notify_created_bank(&self, _slot: Slot, _parent: Slot) {}

    /// Records `slot` in the shared dead-slot set; the error text is ignored.
    fn notify_slot_dead(&self, slot: Slot, _error: String) {
        let mut recorded = self.dead_slots.lock().unwrap();
        recorded.insert(slot);
    }
}

// Given a shred and a fatal expected error, check that replaying that shred causes the fork to be
// marked as dead. Returns the error for the caller to verify.
fn check_dead_fork<F>(shred_to_insert: F) -> result::Result<(), BlockstoreProcessorError>
Expand Down Expand Up @@ -4939,13 +4986,20 @@ pub(crate) mod tests {
));
let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) =
unbounded();
let dead_slots = Arc::new(Mutex::new(HashSet::default()));

let slot_status_notifier: Option<SlotStatusNotifier> = Some(Arc::new(RwLock::new(
SlotStatusNotifierForTest::new(dead_slots.clone()),
)));

if let Err(err) = &res {
ReplayStage::mark_dead_slot(
&blockstore,
&bank1,
0,
err,
&rpc_subscriptions,
&slot_status_notifier,
&mut DuplicateSlotsTracker::default(),
&DuplicateConfirmedSlots::new(),
&mut EpochSlotsFrozenSlots::default(),
Expand All @@ -4956,7 +5010,7 @@ pub(crate) mod tests {
&mut PurgeRepairSlotCounter::default(),
);
}

assert!(dead_slots.lock().unwrap().contains(&bank1.slot()));
// Check that the erroring bank was marked as dead in the progress map
assert!(progress
.get(&bank1.slot())
Expand Down
8 changes: 6 additions & 2 deletions geyser-plugin-interface/src/geyser_plugin_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ pub enum GeyserPluginError {
}

/// The current status of a slot
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
#[repr(u32)]
pub enum SlotStatus {
/// The highest slot of the heaviest fork processed by the node. Ledger state at this slot is
Expand All @@ -328,6 +328,9 @@ pub enum SlotStatus {

/// A new bank fork is created with the slot
CreatedBank,

/// A slot is marked dead
Dead(String),
}

impl SlotStatus {
Expand All @@ -339,6 +342,7 @@ impl SlotStatus {
SlotStatus::FirstShredReceived => "first_shread_received",
SlotStatus::Completed => "completed",
SlotStatus::CreatedBank => "created_bank",
SlotStatus::Dead(_error) => "dead",
}
}
}
Expand Down Expand Up @@ -419,7 +423,7 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
&self,
slot: Slot,
parent: Option<u64>,
status: SlotStatus,
status: &SlotStatus,
) -> Result<()> {
Ok(())
}
Expand Down
6 changes: 5 additions & 1 deletion geyser-plugin-manager/src/slot_status_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ impl SlotStatusNotifierInterface for SlotStatusNotifierImpl {
/// Forwards a `SlotStatus::CreatedBank` update for `slot`, passing `parent`
/// as the slot's parent.
fn notify_created_bank(&self, slot: Slot, parent: Slot) {
    let status = SlotStatus::CreatedBank;
    self.notify_slot_status(slot, Some(parent), status);
}

fn notify_slot_dead(&self, slot: Slot, error: String) {
self.notify_slot_status(slot, None, SlotStatus::Dead(error));
}
}

impl SlotStatusNotifierImpl {
Expand All @@ -52,7 +56,7 @@ impl SlotStatusNotifierImpl {

for plugin in plugin_manager.plugins.iter() {
let mut measure = Measure::start("geyser-plugin-update-slot");
match plugin.update_slot_status(slot, parent, slot_status) {
match plugin.update_slot_status(slot, parent, &slot_status) {
Err(err) => {
error!(
"Failed to update slot status at slot {}, error: {} to plugin {}",
Expand Down
3 changes: 3 additions & 0 deletions rpc/src/slot_status_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ pub trait SlotStatusNotifierInterface {

/// Notified when a bank has been created for the slot.
fn notify_created_bank(&self, slot: Slot, parent: Slot);

/// Notified when the slot is marked "Dead"
fn notify_slot_dead(&self, slot: Slot, error: String);
}

/// Shared handle to a slot status notifier: a `SlotStatusNotifierInterface`
/// trait object (also `Sync + Send`) wrapped in `Arc<RwLock<..>>`.
pub type SlotStatusNotifier = Arc<RwLock<dyn SlotStatusNotifierInterface + Sync + Send>>;

0 comments on commit 8abbd16

Please sign in to comment.