Skip to content

Commit

Permalink
geyser: add flag geyser-plugin-snapshot-disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Dec 13, 2024
1 parent d11072e commit d48cdb9
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 19 deletions.
29 changes: 15 additions & 14 deletions accounts-db/src/accounts_db/geyser_plugin_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,23 @@ impl AccountsDb {
/// in the reverse order of the slots so that an account is only streamed once. At a slot, if the accounts is updated
/// multiple times only the last write (with highest write_version) is notified.
pub fn notify_account_restore_from_snapshot(&self) {
if self.accounts_update_notifier.is_none() {
return;
}

let mut slots = self.storage.all_slots();
let mut notified_accounts: HashSet<Pubkey> = HashSet::default();
let mut notify_stats = GeyserPluginNotifyAtSnapshotRestoreStats::default();
if let Some(accounts_update_notifier) = &self.accounts_update_notifier {
if accounts_update_notifier.notify_snapshot_disabled() {
accounts_update_notifier.notify_end_of_restore_from_snapshot();
} else {
let mut slots = self.storage.all_slots();
let mut notified_accounts: HashSet<Pubkey> = HashSet::default();
let mut notify_stats = GeyserPluginNotifyAtSnapshotRestoreStats::default();

slots.sort_by(|a, b| b.cmp(a));
for slot in slots {
self.notify_accounts_in_slot(slot, &mut notified_accounts, &mut notify_stats);
}

slots.sort_by(|a, b| b.cmp(a));
for slot in slots {
self.notify_accounts_in_slot(slot, &mut notified_accounts, &mut notify_stats);
accounts_update_notifier.notify_end_of_restore_from_snapshot();
notify_stats.report();
}
}

let accounts_update_notifier = self.accounts_update_notifier.as_ref().unwrap();
accounts_update_notifier.notify_end_of_restore_from_snapshot();
notify_stats.report();
}

pub fn notify_account_at_accounts_update(
Expand Down
2 changes: 2 additions & 0 deletions accounts-db/src/accounts_update_notifier_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use {
};

pub trait AccountsUpdateNotifierInterface: std::fmt::Debug {
fn notify_snapshot_disabled(&self) -> bool;

/// Notified when an account is updated at runtime, due to transaction activities
fn notify_account_update(
&self,
Expand Down
3 changes: 3 additions & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ pub struct ValidatorConfig {
/// Specifies which plugins to start up with
pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
pub geyser_plugin_always_enabled: bool,
pub geyser_plugin_snapshot_disabled: bool,
pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub)
pub pubsub_config: PubSubConfig,
pub snapshot_config: SnapshotConfig,
Expand Down Expand Up @@ -303,6 +304,7 @@ impl Default for ValidatorConfig {
rpc_config: JsonRpcConfig::default(),
on_start_geyser_plugin_config_files: None,
geyser_plugin_always_enabled: false,
geyser_plugin_snapshot_disabled: false,
rpc_addrs: None,
pubsub_config: PubSubConfig::default(),
snapshot_config: SnapshotConfig::new_load_only(),
Expand Down Expand Up @@ -602,6 +604,7 @@ impl Validator {
GeyserPluginService::new_with_receiver(
confirmed_bank_receiver,
config.geyser_plugin_always_enabled,
config.geyser_plugin_snapshot_disabled,
geyser_plugin_config_files.as_ref(),
rpc_to_plugin_manager_receiver_and_exit,
)
Expand Down
12 changes: 10 additions & 2 deletions geyser-plugin-manager/src/accounts_update_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ use {
#[derive(Debug)]
pub(crate) struct AccountsUpdateNotifierImpl {
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
snapshot_disabled: bool,
}

impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl {
fn notify_snapshot_disabled(&self) -> bool {
self.snapshot_disabled
}

fn notify_account_update(
&self,
slot: Slot,
Expand Down Expand Up @@ -97,8 +102,11 @@ impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl {
}

impl AccountsUpdateNotifierImpl {
pub fn new(plugin_manager: Arc<RwLock<GeyserPluginManager>>) -> Self {
AccountsUpdateNotifierImpl { plugin_manager }
pub fn new(plugin_manager: Arc<RwLock<GeyserPluginManager>>, snapshot_disabled: bool) -> Self {
AccountsUpdateNotifierImpl {
plugin_manager,
snapshot_disabled,
}
}

fn accountinfo_from_shared_account_data<'a>(
Expand Down
9 changes: 7 additions & 2 deletions geyser-plugin-manager/src/geyser_plugin_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ impl GeyserPluginService {
pub fn new(
confirmed_bank_receiver: Receiver<SlotNotification>,
geyser_plugin_always_enabled: bool,
geyser_plugin_snapshot_disabled: bool,
geyser_plugin_config_files: &[PathBuf],
) -> Result<Self, GeyserPluginServiceError> {
Self::new_with_receiver(
confirmed_bank_receiver,
geyser_plugin_always_enabled,
geyser_plugin_snapshot_disabled,
geyser_plugin_config_files,
None,
)
Expand All @@ -72,6 +74,7 @@ impl GeyserPluginService {
pub fn new_with_receiver(
confirmed_bank_receiver: Receiver<SlotNotification>,
geyser_plugin_always_enabled: bool,
geyser_plugin_snapshot_disabled: bool,
geyser_plugin_config_files: &[PathBuf],
rpc_to_plugin_manager_receiver_and_exit: Option<(
Receiver<GeyserPluginManagerRequest>,
Expand All @@ -98,8 +101,10 @@ impl GeyserPluginService {

let accounts_update_notifier: Option<AccountsUpdateNotifier> =
if account_data_notifications_enabled {
let accounts_update_notifier =
AccountsUpdateNotifierImpl::new(plugin_manager.clone());
let accounts_update_notifier = AccountsUpdateNotifierImpl::new(
plugin_manager.clone(),
geyser_plugin_snapshot_disabled,
);
Some(Arc::new(accounts_update_notifier))
} else {
None
Expand Down
2 changes: 1 addition & 1 deletion ledger-tool/src/ledger_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ pub fn load_and_process_ledger(
let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded();
drop(confirmed_bank_sender);
let geyser_service =
GeyserPluginService::new(confirmed_bank_receiver, false, &geyser_config_files)
GeyserPluginService::new(confirmed_bank_receiver, false, false, &geyser_config_files)
.map_err(LoadAndProcessLedgerError::GeyserServiceSetup)?;
(
geyser_service.get_accounts_update_notifier(),
Expand Down
7 changes: 7 additions & 0 deletions validator/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,13 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.takes_value(false)
.help("Еnable Geyser interface even if no Geyser configs are specified."),
)
.arg(
Arg::with_name("geyser_plugin_snapshot_disabled")
.long("geyser-plugin-snapshot-disabled")
.value_name("BOOLEAN")
.takes_value(false)
.help("Disable on startup account notifications from snapshot."),
)
.arg(
Arg::with_name("snapshot_archive_format")
.long("snapshot-archive-format")
Expand Down
1 change: 1 addition & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1504,6 +1504,7 @@ pub fn main() {
},
on_start_geyser_plugin_config_files,
geyser_plugin_always_enabled: matches.is_present("geyser_plugin_always_enabled"),
geyser_plugin_snapshot_disabled: matches.is_present("geyser_plugin_snapshot_disabled"),
rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
(
SocketAddr::new(rpc_bind_address, rpc_port),
Expand Down

0 comments on commit d48cdb9

Please sign in to comment.