Skip to content

Commit

Permalink
geyser: fix always enabled flag (#3572)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Nov 14, 2024
1 parent 43cf84f commit 60b4196
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 11 deletions.
17 changes: 15 additions & 2 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ use {
solana_vote_program::vote_state,
solana_wen_restart::wen_restart::{wait_for_wen_restart, WenRestartConfig},
std::{
borrow::Cow,
collections::{HashMap, HashSet},
net::SocketAddr,
num::NonZeroUsize,
Expand Down Expand Up @@ -226,6 +227,7 @@ pub struct ValidatorConfig {
pub rpc_config: JsonRpcConfig,
/// Specifies which plugins to start up with
pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
pub geyser_plugin_always_enabled: bool,
pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub)
pub pubsub_config: PubSubConfig,
pub snapshot_config: SnapshotConfig,
Expand Down Expand Up @@ -301,6 +303,7 @@ impl Default for ValidatorConfig {
account_snapshot_paths: Vec::new(),
rpc_config: JsonRpcConfig::default(),
on_start_geyser_plugin_config_files: None,
geyser_plugin_always_enabled: false,
rpc_addrs: None,
pubsub_config: PubSubConfig::default(),
snapshot_config: SnapshotConfig::new_load_only(),
Expand Down Expand Up @@ -562,16 +565,26 @@ impl Validator {

let exit = Arc::new(AtomicBool::new(false));

let geyser_plugin_config_files = config
.on_start_geyser_plugin_config_files
.as_ref()
.map(Cow::Borrowed)
.or_else(|| {
config
.geyser_plugin_always_enabled
.then_some(Cow::Owned(vec![]))
});
let geyser_plugin_service =
if let Some(geyser_plugin_config_files) = &config.on_start_geyser_plugin_config_files {
if let Some(geyser_plugin_config_files) = geyser_plugin_config_files {
let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
bank_notification_senders.push(confirmed_bank_sender);
let rpc_to_plugin_manager_receiver_and_exit =
rpc_to_plugin_manager_receiver.map(|receiver| (receiver, exit.clone()));
Some(
GeyserPluginService::new_with_receiver(
confirmed_bank_receiver,
geyser_plugin_config_files,
config.geyser_plugin_always_enabled,
geyser_plugin_config_files.as_ref(),
rpc_to_plugin_manager_receiver_and_exit,
)
.map_err(|err| {
Expand Down
19 changes: 15 additions & 4 deletions geyser-plugin-manager/src/geyser_plugin_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ impl GeyserPluginService {
/// Creates and returns the GeyserPluginService.
/// # Arguments
/// * `confirmed_bank_receiver` - The receiver for confirmed bank notification
/// * `geyser_plugin_always_enabled` -- Subscribe on all types of notifiactions, even if
/// no config files are passed
/// * `geyser_plugin_config_file` - The config file path for the plugin. The
/// config file controls the plugin responsible
/// for transporting the data to external data stores. It is defined in JSON format.
Expand All @@ -56,13 +58,20 @@ impl GeyserPluginService {
/// It is usually used to configure the connection information for the external data store.
pub fn new(
confirmed_bank_receiver: Receiver<SlotNotification>,
geyser_plugin_always_enabled: bool,
geyser_plugin_config_files: &[PathBuf],
) -> Result<Self, GeyserPluginServiceError> {
Self::new_with_receiver(confirmed_bank_receiver, geyser_plugin_config_files, None)
Self::new_with_receiver(
confirmed_bank_receiver,
geyser_plugin_always_enabled,
geyser_plugin_config_files,
None,
)
}

pub fn new_with_receiver(
confirmed_bank_receiver: Receiver<SlotNotification>,
geyser_plugin_always_enabled: bool,
geyser_plugin_config_files: &[PathBuf],
rpc_to_plugin_manager_receiver_and_exit: Option<(
Receiver<GeyserPluginManagerRequest>,
Expand All @@ -80,9 +89,11 @@ impl GeyserPluginService {
}

let account_data_notifications_enabled =
plugin_manager.account_data_notifications_enabled();
let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled();
let entry_notifications_enabled = plugin_manager.entry_notifications_enabled();
plugin_manager.account_data_notifications_enabled() || geyser_plugin_always_enabled;
let transaction_notifications_enabled =
plugin_manager.transaction_notifications_enabled() || geyser_plugin_always_enabled;
let entry_notifications_enabled =
plugin_manager.entry_notifications_enabled() || geyser_plugin_always_enabled;
let plugin_manager = Arc::new(RwLock::new(plugin_manager));

let accounts_update_notifier: Option<AccountsUpdateNotifier> =
Expand Down
2 changes: 1 addition & 1 deletion ledger-tool/src/ledger_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ pub fn load_and_process_ledger(
let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
drop(confirmed_bank_sender);
let geyser_service =
GeyserPluginService::new(confirmed_bank_receiver, &geyser_config_files)
GeyserPluginService::new(confirmed_bank_receiver, false, &geyser_config_files)
.map_err(LoadAndProcessLedgerError::GeyserServiceSetup)?;
(
geyser_service.get_accounts_update_notifier(),
Expand Down
1 change: 1 addition & 0 deletions local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
account_snapshot_paths: config.account_snapshot_paths.clone(),
rpc_config: config.rpc_config.clone(),
on_start_geyser_plugin_config_files: config.on_start_geyser_plugin_config_files.clone(),
geyser_plugin_always_enabled: config.geyser_plugin_always_enabled,
rpc_addrs: config.rpc_addrs,
pubsub_config: config.pubsub_config.clone(),
snapshot_config: config.snapshot_config.clone(),
Expand Down
3 changes: 1 addition & 2 deletions validator/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1204,8 +1204,7 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
Arg::with_name("geyser_plugin_always_enabled")
.long("geyser-plugin-always-enabled")
.value_name("BOOLEAN")
.takes_value(true)
.default_value("false")
.takes_value(false)
.help("Еnable Geyser interface even if no Geyser configs are specified."),
)
.arg(
Expand Down
6 changes: 4 additions & 2 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1380,9 +1380,10 @@ pub fn main() {
.collect(),
)
} else {
value_t_or_exit!(matches, "geyser_plugin_always_enabled", bool).then(Vec::new)
None
};
let starting_with_geyser_plugins: bool = on_start_geyser_plugin_config_files.is_some();
let starting_with_geyser_plugins: bool = on_start_geyser_plugin_config_files.is_some()
|| matches.is_present("geyser_plugin_always_enabled");

let rpc_bigtable_config = if matches.is_present("enable_rpc_bigtable_ledger_storage")
|| matches.is_present("enable_bigtable_ledger_upload")
Expand Down Expand Up @@ -1495,6 +1496,7 @@ pub fn main() {
skip_preflight_health_check: matches.is_present("skip_preflight_health_check"),
},
on_start_geyser_plugin_config_files,
geyser_plugin_always_enabled: matches.is_present("geyser_plugin_always_enabled"),
rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
(
SocketAddr::new(rpc_bind_address, rpc_port),
Expand Down

0 comments on commit 60b4196

Please sign in to comment.