From d48cdb92b12df2876eec9e19e6cda8c0b457c119 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sat, 7 Dec 2024 09:34:04 -0500 Subject: [PATCH] geyser: add flag `geyser-plugin-snapshot-disabled` --- .../src/accounts_db/geyser_plugin_utils.rs | 29 ++++++++++--------- .../src/accounts_update_notifier_interface.rs | 2 ++ core/src/validator.rs | 3 ++ .../src/accounts_update_notifier.rs | 12 ++++++-- .../src/geyser_plugin_service.rs | 9 ++++-- ledger-tool/src/ledger_utils.rs | 2 +- validator/src/cli.rs | 7 +++++ validator/src/main.rs | 1 + 8 files changed, 46 insertions(+), 19 deletions(-) diff --git a/accounts-db/src/accounts_db/geyser_plugin_utils.rs b/accounts-db/src/accounts_db/geyser_plugin_utils.rs index 8cddf857f1681e..8416ffd042cc38 100644 --- a/accounts-db/src/accounts_db/geyser_plugin_utils.rs +++ b/accounts-db/src/accounts_db/geyser_plugin_utils.rs @@ -39,22 +39,23 @@ impl AccountsDb { /// in the reverse order of the slots so that an account is only streamed once. At a slot, if the accounts is updated /// multiple times only the last write (with highest write_version) is notified. pub fn notify_account_restore_from_snapshot(&self) { - if self.accounts_update_notifier.is_none() { - return; - } - - let mut slots = self.storage.all_slots(); - let mut notified_accounts: HashSet = HashSet::default(); - let mut notify_stats = GeyserPluginNotifyAtSnapshotRestoreStats::default(); + if let Some(accounts_update_notifier) = &self.accounts_update_notifier { + if accounts_update_notifier.notify_snapshot_disabled() { + accounts_update_notifier.notify_end_of_restore_from_snapshot(); + } else { + let mut slots = self.storage.all_slots(); + let mut notified_accounts: HashSet = HashSet::default(); + let mut notify_stats = GeyserPluginNotifyAtSnapshotRestoreStats::default(); + + slots.sort_by(|a, b| b.cmp(a)); + for slot in slots { + self.notify_accounts_in_slot(slot, &mut notified_accounts, &mut notify_stats); + } - slots.sort_by(|a, b| b.cmp(a)); - for slot in slots { - self.notify_accounts_in_slot(slot, &mut notified_accounts, &mut notify_stats); + accounts_update_notifier.notify_end_of_restore_from_snapshot(); + notify_stats.report(); + } } - - let accounts_update_notifier = self.accounts_update_notifier.as_ref().unwrap(); - accounts_update_notifier.notify_end_of_restore_from_snapshot(); - notify_stats.report(); } pub fn notify_account_at_accounts_update( diff --git a/accounts-db/src/accounts_update_notifier_interface.rs b/accounts-db/src/accounts_update_notifier_interface.rs index ec86fce8cd6898..6b54bca14e835a 100644 --- a/accounts-db/src/accounts_update_notifier_interface.rs +++ b/accounts-db/src/accounts_update_notifier_interface.rs @@ -7,6 +7,8 @@ use { }; pub trait AccountsUpdateNotifierInterface: std::fmt::Debug { + fn notify_snapshot_disabled(&self) -> bool; + /// Notified when an account is updated at runtime, due to transaction activities fn notify_account_update( &self, diff --git a/core/src/validator.rs b/core/src/validator.rs index c3318ee070f2bc..dc74dd226c201b 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -227,6 +227,7 @@ pub struct ValidatorConfig { /// Specifies which plugins to start up with pub on_start_geyser_plugin_config_files: Option>, pub geyser_plugin_always_enabled: bool, + pub geyser_plugin_snapshot_disabled: bool, pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub) pub pubsub_config: PubSubConfig, pub snapshot_config: SnapshotConfig, @@ -303,6 +304,7 @@ impl Default for ValidatorConfig { rpc_config: JsonRpcConfig::default(), on_start_geyser_plugin_config_files: None, geyser_plugin_always_enabled: false, + geyser_plugin_snapshot_disabled: false, rpc_addrs: None, pubsub_config: PubSubConfig::default(), snapshot_config: SnapshotConfig::new_load_only(), @@ -602,6 +604,7 @@ impl Validator { GeyserPluginService::new_with_receiver( confirmed_bank_receiver, config.geyser_plugin_always_enabled, + config.geyser_plugin_snapshot_disabled, geyser_plugin_config_files.as_ref(), rpc_to_plugin_manager_receiver_and_exit, ) diff --git a/geyser-plugin-manager/src/accounts_update_notifier.rs b/geyser-plugin-manager/src/accounts_update_notifier.rs index 60df441a7e3cef..6f7a58b1499fab 100644 --- a/geyser-plugin-manager/src/accounts_update_notifier.rs +++ b/geyser-plugin-manager/src/accounts_update_notifier.rs @@ -22,9 +22,14 @@ use { #[derive(Debug)] pub(crate) struct AccountsUpdateNotifierImpl { plugin_manager: Arc>, + snapshot_disabled: bool, } impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl { + fn notify_snapshot_disabled(&self) -> bool { + self.snapshot_disabled + } + fn notify_account_update( &self, slot: Slot, @@ -97,8 +102,11 @@ impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl { } impl AccountsUpdateNotifierImpl { - pub fn new(plugin_manager: Arc>) -> Self { - AccountsUpdateNotifierImpl { plugin_manager } + pub fn new(plugin_manager: Arc>, snapshot_disabled: bool) -> Self { + AccountsUpdateNotifierImpl { + plugin_manager, + snapshot_disabled, + } } fn accountinfo_from_shared_account_data<'a>( diff --git a/geyser-plugin-manager/src/geyser_plugin_service.rs b/geyser-plugin-manager/src/geyser_plugin_service.rs index f624fc66c90c3e..4a2959364cc232 100644 --- a/geyser-plugin-manager/src/geyser_plugin_service.rs +++ b/geyser-plugin-manager/src/geyser_plugin_service.rs @@ -59,11 +59,13 @@ impl GeyserPluginService { pub fn new( confirmed_bank_receiver: Receiver, geyser_plugin_always_enabled: bool, + geyser_plugin_snapshot_disabled: bool, geyser_plugin_config_files: &[PathBuf], ) -> Result { Self::new_with_receiver( confirmed_bank_receiver, geyser_plugin_always_enabled, + geyser_plugin_snapshot_disabled, geyser_plugin_config_files, None, ) @@ -72,6 +74,7 @@ impl GeyserPluginService { pub fn new_with_receiver( confirmed_bank_receiver: Receiver, geyser_plugin_always_enabled: bool, + geyser_plugin_snapshot_disabled: bool, geyser_plugin_config_files: &[PathBuf], rpc_to_plugin_manager_receiver_and_exit: Option<( Receiver, @@ -98,8 +101,10 @@ impl GeyserPluginService { let accounts_update_notifier: Option = if account_data_notifications_enabled { - let accounts_update_notifier = - AccountsUpdateNotifierImpl::new(plugin_manager.clone()); + let accounts_update_notifier = AccountsUpdateNotifierImpl::new( + plugin_manager.clone(), + geyser_plugin_snapshot_disabled, + ); Some(Arc::new(accounts_update_notifier)) } else { None diff --git a/ledger-tool/src/ledger_utils.rs b/ledger-tool/src/ledger_utils.rs index 58c2a54f725055..78bbb5f3efaf43 100644 --- a/ledger-tool/src/ledger_utils.rs +++ b/ledger-tool/src/ledger_utils.rs @@ -275,7 +275,7 @@ pub fn load_and_process_ledger( let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded(); drop(confirmed_bank_sender); let geyser_service = - GeyserPluginService::new(confirmed_bank_receiver, false, &geyser_config_files) + GeyserPluginService::new(confirmed_bank_receiver, false, false, &geyser_config_files) .map_err(LoadAndProcessLedgerError::GeyserServiceSetup)?; ( geyser_service.get_accounts_update_notifier(), diff --git a/validator/src/cli.rs b/validator/src/cli.rs index aed7a3bffc1d9f..bdc95b2efc0272 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -1201,6 +1201,13 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .takes_value(false) .help("Đ•nable Geyser interface even if no Geyser configs are specified."), ) + .arg( + Arg::with_name("geyser_plugin_snapshot_disabled") + .long("geyser-plugin-snapshot-disabled") + .value_name("BOOLEAN") + .takes_value(false) + .help("Disable on startup account notifications from snapshot."), + ) .arg( Arg::with_name("snapshot_archive_format") .long("snapshot-archive-format") diff --git a/validator/src/main.rs b/validator/src/main.rs index a7de615b3be9ac..33e133243d23ec 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1504,6 +1504,7 @@ pub fn main() { }, on_start_geyser_plugin_config_files, geyser_plugin_always_enabled: matches.is_present("geyser_plugin_always_enabled"), + geyser_plugin_snapshot_disabled: matches.is_present("geyser_plugin_snapshot_disabled"), rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| { ( SocketAddr::new(rpc_bind_address, rpc_port),