From 60b41963ea26e739fd0988639297bf4bda82231a Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Thu, 14 Nov 2024 23:32:49 +0200 Subject: [PATCH] geyser: fix always enabled flag (#3572) --- core/src/validator.rs | 17 +++++++++++++++-- .../src/geyser_plugin_service.rs | 19 +++++++++++++++---- ledger-tool/src/ledger_utils.rs | 2 +- local-cluster/src/validator_configs.rs | 1 + validator/src/cli.rs | 3 +-- validator/src/main.rs | 6 ++++-- 6 files changed, 37 insertions(+), 11 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index 8986fea5f005d2..510c0fb0f71cba 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -131,6 +131,7 @@ use { solana_vote_program::vote_state, solana_wen_restart::wen_restart::{wait_for_wen_restart, WenRestartConfig}, std::{ + borrow::Cow, collections::{HashMap, HashSet}, net::SocketAddr, num::NonZeroUsize, @@ -226,6 +227,7 @@ pub struct ValidatorConfig { pub rpc_config: JsonRpcConfig, /// Specifies which plugins to start up with pub on_start_geyser_plugin_config_files: Option>, + pub geyser_plugin_always_enabled: bool, pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub) pub pubsub_config: PubSubConfig, pub snapshot_config: SnapshotConfig, @@ -301,6 +303,7 @@ impl Default for ValidatorConfig { account_snapshot_paths: Vec::new(), rpc_config: JsonRpcConfig::default(), on_start_geyser_plugin_config_files: None, + geyser_plugin_always_enabled: false, rpc_addrs: None, pubsub_config: PubSubConfig::default(), snapshot_config: SnapshotConfig::new_load_only(), @@ -562,8 +565,17 @@ impl Validator { let exit = Arc::new(AtomicBool::new(false)); + let geyser_plugin_config_files = config + .on_start_geyser_plugin_config_files + .as_ref() + .map(Cow::Borrowed) + .or_else(|| { + config + .geyser_plugin_always_enabled + .then_some(Cow::Owned(vec![])) + }); let geyser_plugin_service = - if let Some(geyser_plugin_config_files) = &config.on_start_geyser_plugin_config_files { + if let Some(geyser_plugin_config_files) = geyser_plugin_config_files { let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded(); bank_notification_senders.push(confirmed_bank_sender); let rpc_to_plugin_manager_receiver_and_exit = @@ -571,7 +583,8 @@ impl Validator { Some( GeyserPluginService::new_with_receiver( confirmed_bank_receiver, - geyser_plugin_config_files, + config.geyser_plugin_always_enabled, + geyser_plugin_config_files.as_ref(), rpc_to_plugin_manager_receiver_and_exit, ) .map_err(|err| { diff --git a/geyser-plugin-manager/src/geyser_plugin_service.rs b/geyser-plugin-manager/src/geyser_plugin_service.rs index 61fca230030c11..f624fc66c90c3e 100644 --- a/geyser-plugin-manager/src/geyser_plugin_service.rs +++ b/geyser-plugin-manager/src/geyser_plugin_service.rs @@ -45,6 +45,8 @@ impl GeyserPluginService { /// Creates and returns the GeyserPluginService. /// # Arguments /// * `confirmed_bank_receiver` - The receiver for confirmed bank notification + /// * `geyser_plugin_always_enabled` -- Subscribe on all types of notifiactions, even if + /// no config files are passed /// * `geyser_plugin_config_file` - The config file path for the plugin. The /// config file controls the plugin responsible /// for transporting the data to external data stores. It is defined in JSON format. @@ -56,13 +58,20 @@ impl GeyserPluginService { /// It is usually used to configure the connection information for the external data store. pub fn new( confirmed_bank_receiver: Receiver, + geyser_plugin_always_enabled: bool, geyser_plugin_config_files: &[PathBuf], ) -> Result { - Self::new_with_receiver(confirmed_bank_receiver, geyser_plugin_config_files, None) + Self::new_with_receiver( + confirmed_bank_receiver, + geyser_plugin_always_enabled, + geyser_plugin_config_files, + None, + ) } pub fn new_with_receiver( confirmed_bank_receiver: Receiver, + geyser_plugin_always_enabled: bool, geyser_plugin_config_files: &[PathBuf], rpc_to_plugin_manager_receiver_and_exit: Option<( Receiver, @@ -80,9 +89,11 @@ impl GeyserPluginService { } let account_data_notifications_enabled = - plugin_manager.account_data_notifications_enabled(); - let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled(); - let entry_notifications_enabled = plugin_manager.entry_notifications_enabled(); + plugin_manager.account_data_notifications_enabled() || geyser_plugin_always_enabled; + let transaction_notifications_enabled = + plugin_manager.transaction_notifications_enabled() || geyser_plugin_always_enabled; + let entry_notifications_enabled = + plugin_manager.entry_notifications_enabled() || geyser_plugin_always_enabled; let plugin_manager = Arc::new(RwLock::new(plugin_manager)); let accounts_update_notifier: Option = diff --git a/ledger-tool/src/ledger_utils.rs b/ledger-tool/src/ledger_utils.rs index df541aeea41ea0..0d58d8d91b883f 100644 --- a/ledger-tool/src/ledger_utils.rs +++ b/ledger-tool/src/ledger_utils.rs @@ -272,7 +272,7 @@ pub fn load_and_process_ledger( let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded(); drop(confirmed_bank_sender); let geyser_service = - GeyserPluginService::new(confirmed_bank_receiver, &geyser_config_files) + GeyserPluginService::new(confirmed_bank_receiver, false, &geyser_config_files) .map_err(LoadAndProcessLedgerError::GeyserServiceSetup)?; ( geyser_service.get_accounts_update_notifier(), diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 78a7acd681ef75..e90475aad2a06f 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -15,6 +15,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { account_snapshot_paths: config.account_snapshot_paths.clone(), rpc_config: config.rpc_config.clone(), on_start_geyser_plugin_config_files: config.on_start_geyser_plugin_config_files.clone(), + geyser_plugin_always_enabled: config.geyser_plugin_always_enabled, rpc_addrs: config.rpc_addrs, pubsub_config: config.pubsub_config.clone(), snapshot_config: config.snapshot_config.clone(), diff --git a/validator/src/cli.rs b/validator/src/cli.rs index a4d22797ad542f..c7845269e77019 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -1204,8 +1204,7 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { Arg::with_name("geyser_plugin_always_enabled") .long("geyser-plugin-always-enabled") .value_name("BOOLEAN") - .takes_value(true) - .default_value("false") + .takes_value(false) .help("Đ•nable Geyser interface even if no Geyser configs are specified."), ) .arg( diff --git a/validator/src/main.rs b/validator/src/main.rs index 34fd5e1227bd16..922d676d750238 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1380,9 +1380,10 @@ pub fn main() { .collect(), ) } else { - value_t_or_exit!(matches, "geyser_plugin_always_enabled", bool).then(Vec::new) + None }; - let starting_with_geyser_plugins: bool = on_start_geyser_plugin_config_files.is_some(); + let starting_with_geyser_plugins: bool = on_start_geyser_plugin_config_files.is_some() + || matches.is_present("geyser_plugin_always_enabled"); let rpc_bigtable_config = if matches.is_present("enable_rpc_bigtable_ledger_storage") || matches.is_present("enable_bigtable_ledger_upload") @@ -1495,6 +1496,7 @@ pub fn main() { skip_preflight_health_check: matches.is_present("skip_preflight_health_check"), }, on_start_geyser_plugin_config_files, + geyser_plugin_always_enabled: matches.is_present("geyser_plugin_always_enabled"), rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| { ( SocketAddr::new(rpc_bind_address, rpc_port),