Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

geyser: add flag geyser-plugin-snapshot-disabled #4103

Merged
merged 5 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions accounts-db/src/accounts_db/geyser_plugin_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,23 @@ impl AccountsDb {
/// in the reverse order of the slots so that an account is only streamed once. At a slot, if the accounts is updated
/// multiple times only the last write (with highest write_version) is notified.
pub fn notify_account_restore_from_snapshot(&self) {
if self.accounts_update_notifier.is_none() {
return;
}

let mut slots = self.storage.all_slots();
let mut notified_accounts: HashSet<Pubkey> = HashSet::default();
let mut notify_stats = GeyserPluginNotifyAtSnapshotRestoreStats::default();
if let Some(accounts_update_notifier) = &self.accounts_update_notifier {
fanatid marked this conversation as resolved.
Show resolved Hide resolved
if accounts_update_notifier.notify_snapshot_disabled() {
accounts_update_notifier.notify_end_of_restore_from_snapshot();
} else {
let mut slots = self.storage.all_slots();
let mut notified_accounts: HashSet<Pubkey> = HashSet::default();
let mut notify_stats = GeyserPluginNotifyAtSnapshotRestoreStats::default();

slots.sort_by(|a, b| b.cmp(a));
for slot in slots {
self.notify_accounts_in_slot(slot, &mut notified_accounts, &mut notify_stats);
}

slots.sort_by(|a, b| b.cmp(a));
for slot in slots {
self.notify_accounts_in_slot(slot, &mut notified_accounts, &mut notify_stats);
accounts_update_notifier.notify_end_of_restore_from_snapshot();
notify_stats.report();
}
}

let accounts_update_notifier = self.accounts_update_notifier.as_ref().unwrap();
accounts_update_notifier.notify_end_of_restore_from_snapshot();
notify_stats.report();
}

pub fn notify_account_at_accounts_update(
Expand Down Expand Up @@ -196,6 +197,11 @@ pub mod tests {
}

impl AccountsUpdateNotifierInterface for GeyserTestPlugin {
/// Disable account notifications from snapshot
fn notify_snapshot_disabled(&self) -> bool {
false
}

/// Notified when an account is updated at runtime, due to transaction activities
fn notify_account_update(
&self,
Expand Down
3 changes: 3 additions & 0 deletions accounts-db/src/accounts_update_notifier_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use {
};

pub trait AccountsUpdateNotifierInterface: std::fmt::Debug {
/// Disable account notifications from snapshot
fn notify_snapshot_disabled(&self) -> bool;

/// Notified when an account is updated at runtime, due to transaction activities
fn notify_account_update(
&self,
Expand Down
3 changes: 3 additions & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ pub struct ValidatorConfig {
/// Specifies which plugins to start up with
pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
pub geyser_plugin_always_enabled: bool,
pub geyser_plugin_snapshot_disabled: bool,
pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub)
pub pubsub_config: PubSubConfig,
pub snapshot_config: SnapshotConfig,
Expand Down Expand Up @@ -303,6 +304,7 @@ impl Default for ValidatorConfig {
rpc_config: JsonRpcConfig::default(),
on_start_geyser_plugin_config_files: None,
geyser_plugin_always_enabled: false,
geyser_plugin_snapshot_disabled: false,
rpc_addrs: None,
pubsub_config: PubSubConfig::default(),
snapshot_config: SnapshotConfig::new_load_only(),
Expand Down Expand Up @@ -602,6 +604,7 @@ impl Validator {
GeyserPluginService::new_with_receiver(
confirmed_bank_receiver,
config.geyser_plugin_always_enabled,
config.geyser_plugin_snapshot_disabled,
geyser_plugin_config_files.as_ref(),
rpc_to_plugin_manager_receiver_and_exit,
)
Expand Down
12 changes: 10 additions & 2 deletions geyser-plugin-manager/src/accounts_update_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ use {
#[derive(Debug)]
pub(crate) struct AccountsUpdateNotifierImpl {
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
snapshot_disabled: bool,
}

impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl {
fn notify_snapshot_disabled(&self) -> bool {
self.snapshot_disabled
}

fn notify_account_update(
&self,
slot: Slot,
Expand Down Expand Up @@ -97,8 +102,11 @@ impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl {
}

impl AccountsUpdateNotifierImpl {
pub fn new(plugin_manager: Arc<RwLock<GeyserPluginManager>>) -> Self {
AccountsUpdateNotifierImpl { plugin_manager }
pub fn new(plugin_manager: Arc<RwLock<GeyserPluginManager>>, snapshot_disabled: bool) -> Self {
AccountsUpdateNotifierImpl {
plugin_manager,
snapshot_disabled,
}
}

fn accountinfo_from_shared_account_data<'a>(
Expand Down
9 changes: 7 additions & 2 deletions geyser-plugin-manager/src/geyser_plugin_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ impl GeyserPluginService {
pub fn new(
confirmed_bank_receiver: Receiver<SlotNotification>,
geyser_plugin_always_enabled: bool,
geyser_plugin_snapshot_disabled: bool,
geyser_plugin_config_files: &[PathBuf],
) -> Result<Self, GeyserPluginServiceError> {
Self::new_with_receiver(
confirmed_bank_receiver,
geyser_plugin_always_enabled,
geyser_plugin_snapshot_disabled,
geyser_plugin_config_files,
None,
)
Expand All @@ -72,6 +74,7 @@ impl GeyserPluginService {
pub fn new_with_receiver(
confirmed_bank_receiver: Receiver<SlotNotification>,
geyser_plugin_always_enabled: bool,
geyser_plugin_snapshot_disabled: bool,
geyser_plugin_config_files: &[PathBuf],
rpc_to_plugin_manager_receiver_and_exit: Option<(
Receiver<GeyserPluginManagerRequest>,
Expand All @@ -98,8 +101,10 @@ impl GeyserPluginService {

let accounts_update_notifier: Option<AccountsUpdateNotifier> =
if account_data_notifications_enabled {
let accounts_update_notifier =
AccountsUpdateNotifierImpl::new(plugin_manager.clone());
let accounts_update_notifier = AccountsUpdateNotifierImpl::new(
plugin_manager.clone(),
geyser_plugin_snapshot_disabled,
);
Some(Arc::new(accounts_update_notifier))
} else {
None
Expand Down
2 changes: 1 addition & 1 deletion ledger-tool/src/ledger_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ pub fn load_and_process_ledger(
let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
drop(confirmed_bank_sender);
let geyser_service =
GeyserPluginService::new(confirmed_bank_receiver, false, &geyser_config_files)
GeyserPluginService::new(confirmed_bank_receiver, false, false, &geyser_config_files)
.map_err(LoadAndProcessLedgerError::GeyserServiceSetup)?;
(
geyser_service.get_accounts_update_notifier(),
Expand Down
1 change: 1 addition & 0 deletions local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
rpc_config: config.rpc_config.clone(),
on_start_geyser_plugin_config_files: config.on_start_geyser_plugin_config_files.clone(),
geyser_plugin_always_enabled: config.geyser_plugin_always_enabled,
geyser_plugin_snapshot_disabled: config.geyser_plugin_snapshot_disabled,
rpc_addrs: config.rpc_addrs,
pubsub_config: config.pubsub_config.clone(),
snapshot_config: config.snapshot_config.clone(),
Expand Down
7 changes: 7 additions & 0 deletions validator/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,13 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.takes_value(false)
.help("Еnable Geyser interface even if no Geyser configs are specified."),
)
.arg(
Arg::with_name("geyser_plugin_snapshot_disabled")
fanatid marked this conversation as resolved.
Show resolved Hide resolved
.long("geyser-plugin-snapshot-disabled")
.value_name("BOOLEAN")
.takes_value(false)
.help("Disable on startup account notifications from snapshot."),
)
.arg(
Arg::with_name("snapshot_archive_format")
.long("snapshot-archive-format")
Expand Down
1 change: 1 addition & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1504,6 +1504,7 @@ pub fn main() {
},
on_start_geyser_plugin_config_files,
geyser_plugin_always_enabled: matches.is_present("geyser_plugin_always_enabled"),
geyser_plugin_snapshot_disabled: matches.is_present("geyser_plugin_snapshot_disabled"),
rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
(
SocketAddr::new(rpc_bind_address, rpc_port),
Expand Down
Loading