Skip to content

Commit

Permalink
Cluster info geyzer notification firs impl
Browse files Browse the repository at this point in the history
add cluster notifier registration and test

Add cluster node remove geyzer notification

rebase on master and correct PR comments

fix rebase, fmt, clippy
  • Loading branch information
musitdev authored and godmodegalactus committed Mar 11, 2024
1 parent 3863bb1 commit 897e0bc
Show file tree
Hide file tree
Showing 15 changed files with 1,085 additions and 814 deletions.
1,569 changes: 759 additions & 810 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 11 additions & 2 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,13 +659,19 @@ impl Validator {
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier());

let cluster_info_notifier = geyser_plugin_service
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_cluster_info_notifier());

info!(
"Geyser plugin: accounts_update_notifier: {}, \
transaction_notifier: {}, \
entry_notifier: {}",
entry_notifier: {} \
cluster_info_notifier: {}",
accounts_update_notifier.is_some(),
transaction_notifier.is_some(),
entry_notifier.is_some()
entry_notifier.is_some(),
cluster_info_notifier.is_some(),
);

let system_monitor_service = Some(SystemMonitorService::new(
Expand Down Expand Up @@ -744,6 +750,9 @@ impl Validator {
identity_keypair.clone(),
socket_addr_space,
);

//register Geyzer notifier.
cluster_info.set_clusterinfo_notifier(cluster_info_notifier);
cluster_info.set_contact_debug_interval(config.contact_debug_interval);
cluster_info.set_entrypoints(cluster_entrypoints);
cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
Expand Down
1 change: 1 addition & 0 deletions geyser-plugin-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ edition = { workspace = true }

[dependencies]
log = { workspace = true }
solana-gossip = { workspace = true }
solana-sdk = { workspace = true }
solana-transaction-status = { workspace = true }
thiserror = { workspace = true }
Expand Down
52 changes: 52 additions & 0 deletions geyser-plugin-interface/src/geyser_plugin_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
/// the GeyserPlugin trait to work with the runtime.
/// In addition, the dynamic library must export a "C" function _create_plugin which
/// creates the implementation of the plugin.
use std::net::SocketAddr;
use {
solana_sdk::{
clock::{Slot, UnixTimestamp},
pubkey::Pubkey,
signature::Signature,
transaction::SanitizedTransaction,
},
Expand All @@ -13,6 +15,37 @@ use {
thiserror::Error,
};

#[derive(Debug, Clone, PartialEq, Eq)]
#[repr(C)]
/// Information about a node in the cluster.
pub struct ReplicaClusterInfoNode {
pub id: Pubkey,
/// gossip address
pub gossip: Option<SocketAddr>,
/// address to connect to for replication
pub tvu: Option<SocketAddr>,
/// TVU over QUIC protocol.
pub tvu_quic: Option<SocketAddr>,
/// repair service over QUIC protocol.
pub serve_repair_quic: Option<SocketAddr>,
/// transactions address
pub tpu: Option<SocketAddr>,
/// address to forward unprocessed transactions to
pub tpu_forwards: Option<SocketAddr>,
/// address to which to send bank state requests
pub tpu_vote: Option<SocketAddr>,
/// address to which to send JSON-RPC requests
pub rpc: Option<SocketAddr>,
/// websocket for JSON-RPC push notifications
pub rpc_pubsub: Option<SocketAddr>,
/// address to send repair requests to
pub serve_repair: Option<SocketAddr>,
/// latest wallclock picked
pub wallclock: u64,
/// node shred version
pub shred_version: u16,
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[repr(C)]
/// Information about an account being updated
Expand Down Expand Up @@ -380,6 +413,18 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
Ok(())
}

/// Called when a cluster info is updated on gossip network.
#[allow(unused_variables)]
fn update_cluster_info(&self, cluster_info: &ReplicaClusterInfoNode) -> Result<()> {
Ok(())
}

/// Called when a cluster info is removed on gossip network.
#[allow(unused_variables)]
fn notify_clusterinfo_remove(&self, pubkey: &Pubkey) -> Result<()> {
Ok(())
}

/// Called when all accounts are notified of during startup.
fn notify_end_of_startup(&self) -> Result<()> {
Ok(())
Expand Down Expand Up @@ -438,4 +483,11 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
fn entry_notifications_enabled(&self) -> bool {
false
}

/// Check if the plugin is interested in cluster info data
/// Default is false -- if the plugin is interested in
/// cluster info data, return true.
fn clusterinfo_notifications_enabled(&self) -> bool {
false
}
}
2 changes: 2 additions & 0 deletions geyser-plugin-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ libloading = { workspace = true }
log = { workspace = true }
serde_json = { workspace = true }
solana-accounts-db = { workspace = true }
solana-client = { workspace = true }
solana-entry = { workspace = true }
solana-gossip = { workspace = true }
solana-ledger = { workspace = true }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
Expand Down
152 changes: 152 additions & 0 deletions geyser-plugin-manager/src/cluster_info_notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Module responsible for notifying plugins of transactions
use {
crate::geyser_plugin_manager::GeyserPluginManager,
agave_geyser_plugin_interface::geyser_plugin_interface::ReplicaClusterInfoNode,
log::*,
solana_client::connection_cache::Protocol,
solana_gossip::{
cluster_info_notifier_interface::ClusterInfoNotifierInterface,
legacy_contact_info::LegacyContactInfo,
},
solana_measure::measure::Measure,
solana_metrics::*,
solana_sdk::pubkey::Pubkey,
std::sync::{Arc, RwLock},
};

// This implementation of ClusterInfoNotifierImpl is passed to the rpc's TransactionStatusService
// at the validator startup. TransactionStatusService invokes the notify_transaction method
// for new transactions. The implementation in turn invokes the notify_transaction of each
// plugin enabled with transaction notification managed by the GeyserPluginManager.
#[derive(Debug)]
pub(crate) struct ClusterInfoNotifierImpl {
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
}

impl ClusterInfoNotifierImpl {
pub fn new(plugin_manager: Arc<RwLock<GeyserPluginManager>>) -> Self {
ClusterInfoNotifierImpl { plugin_manager }
}

fn clusterinfo_from_legacy_contact_info(
legacy_info: &LegacyContactInfo,
) -> ReplicaClusterInfoNode {
ReplicaClusterInfoNode {
id: *legacy_info.pubkey(),
// gossip address
gossip: legacy_info.gossip().ok(),
// address to connect to for replication
tvu: legacy_info.tvu(Protocol::UDP).ok(),
// TVU over QUIC protocol.
tvu_quic: legacy_info.tvu(Protocol::QUIC).ok(),
// repair service over QUIC protocol.
serve_repair_quic: legacy_info.serve_repair(Protocol::QUIC).ok(),
// transactions address
tpu: legacy_info.tpu(Protocol::UDP).ok(),
// address to forward unprocessed transactions to
tpu_forwards: legacy_info.tpu_forwards(Protocol::UDP).ok(),
// address to which to send bank state requests
tpu_vote: legacy_info.tpu_vote().ok(),
// address to which to send JSON-RPC requests
rpc: legacy_info.rpc().ok(),
// websocket for JSON-RPC push notifications
rpc_pubsub: legacy_info.rpc_pubsub().ok(),
// address to send repair requests to
serve_repair: legacy_info.serve_repair(Protocol::UDP).ok(),
// latest wallclock picked
wallclock: legacy_info.wallclock(),
// node shred version
shred_version: legacy_info.shred_version(),
}
}
}

impl ClusterInfoNotifierInterface for ClusterInfoNotifierImpl {
fn notify_clusterinfo_update(&self, contact_info: &LegacyContactInfo) {
let cluster_info =
ClusterInfoNotifierImpl::clusterinfo_from_legacy_contact_info(contact_info);
let mut measure2 = Measure::start("geyser-plugin-notify_plugins_of_cluster_info_update");
let plugin_manager = self.plugin_manager.read().unwrap();

if plugin_manager.plugins.is_empty() {
return;
}
for plugin in plugin_manager.plugins.iter() {
let mut measure = Measure::start("geyser-plugin-update-cluster_info");
match plugin.update_cluster_info(&cluster_info) {
Err(err) => {
error!(
"Failed to update cluster_info {}, error: {} to plugin {}",
bs58::encode(cluster_info.id).into_string(),
err,
plugin.name()
)
}
Ok(_) => {
trace!(
"Successfully updated cluster_info {} to plugin {}",
bs58::encode(cluster_info.id).into_string(),
plugin.name()
);
}
}
measure.stop();
inc_new_counter_debug!(
"geyser-plugin-update-cluster_info-us",
measure.as_us() as usize,
100000,
100000
);
}
measure2.stop();
inc_new_counter_debug!(
"geyser-plugin-notify_plugins_of_cluster_info_update-us",
measure2.as_us() as usize,
100000,
100000
);
}

fn notify_clusterinfo_remove(&self, pubkey: &Pubkey) {
let mut measure2 = Measure::start("geyser-plugin-notify_plugins_of_cluster_info_update");
let plugin_manager = self.plugin_manager.read().unwrap();

if plugin_manager.plugins.is_empty() {
return;
}
for plugin in plugin_manager.plugins.iter() {
let mut measure = Measure::start("geyser-plugin-remove-cluster_info");
match plugin.notify_clusterinfo_remove(pubkey) {
Err(err) => {
error!(
"Failed to remove cluster_info {}, error: {} to plugin {}",
bs58::encode(pubkey).into_string(),
err,
plugin.name()
)
}
Ok(_) => {
trace!(
"Successfully remove cluster_info {} to plugin {}",
bs58::encode(pubkey).into_string(),
plugin.name()
);
}
}
measure.stop();
inc_new_counter_debug!(
"geyser-plugin-remove-cluster_info-us",
measure.as_us() as usize,
100000,
100000
);
}
measure2.stop();
inc_new_counter_debug!(
"geyser-plugin-notify_plugins_of_cluster_info_remove-us",
measure2.as_us() as usize,
100000,
100000
);
}
}
9 changes: 9 additions & 0 deletions geyser-plugin-manager/src/geyser_plugin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ impl GeyserPluginManager {
false
}

/// Check if the plugin is interested in cluster info data
pub fn clusterinfo_notifications_enabled(&self) -> bool {
for plugin in &self.plugins {
if plugin.entry_notifications_enabled() {
return true;
}
}
false
}
/// Admin RPC request handler
pub(crate) fn list_plugins(&self) -> JsonRpcResult<Vec<String>> {
Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect())
Expand Down
17 changes: 17 additions & 0 deletions geyser-plugin-manager/src/geyser_plugin_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use {
accounts_update_notifier::AccountsUpdateNotifierImpl,
block_metadata_notifier::BlockMetadataNotifierImpl,
block_metadata_notifier_interface::BlockMetadataNotifierArc,
cluster_info_notifier::ClusterInfoNotifierImpl,
entry_notifier::EntryNotifierImpl,
geyser_plugin_manager::{GeyserPluginManager, GeyserPluginManagerRequest},
slot_status_notifier::SlotStatusNotifierImpl,
Expand All @@ -12,6 +13,7 @@ use {
crossbeam_channel::Receiver,
log::*,
solana_accounts_db::accounts_update_notifier_interface::AccountsUpdateNotifier,
solana_gossip::cluster_info_notifier_interface::ClusterInfoUpdateNotifierLock,
solana_ledger::entry_notifier_interface::EntryNotifierArc,
solana_rpc::{
optimistically_confirmed_bank_tracker::SlotNotification,
Expand All @@ -37,6 +39,7 @@ pub struct GeyserPluginService {
transaction_notifier: Option<TransactionNotifierArc>,
entry_notifier: Option<EntryNotifierArc>,
block_metadata_notifier: Option<BlockMetadataNotifierArc>,
cluster_info_notifier: Option<ClusterInfoUpdateNotifierLock>,
}

impl GeyserPluginService {
Expand Down Expand Up @@ -81,8 +84,17 @@ impl GeyserPluginService {
plugin_manager.account_data_notifications_enabled();
let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled();
let entry_notifications_enabled = plugin_manager.entry_notifications_enabled();
let cluster_info_notifications_enabled = plugin_manager.clusterinfo_notifications_enabled();
let plugin_manager = Arc::new(RwLock::new(plugin_manager));

let cluster_info_notifier: Option<ClusterInfoUpdateNotifierLock> =
if cluster_info_notifications_enabled {
let cluster_info_notifier = ClusterInfoNotifierImpl::new(plugin_manager.clone());
Some(Arc::new(RwLock::new(cluster_info_notifier)))
} else {
None
};

let accounts_update_notifier: Option<AccountsUpdateNotifier> =
if account_data_notifications_enabled {
let accounts_update_notifier =
Expand Down Expand Up @@ -143,6 +155,7 @@ impl GeyserPluginService {
transaction_notifier,
entry_notifier,
block_metadata_notifier,
cluster_info_notifier,
})
}

Expand Down Expand Up @@ -172,6 +185,10 @@ impl GeyserPluginService {
self.block_metadata_notifier.clone()
}

pub fn get_cluster_info_notifier(&self) -> Option<ClusterInfoUpdateNotifierLock> {
self.cluster_info_notifier.clone()
}

pub fn join(self) -> thread::Result<()> {
if let Some(mut slot_status_observer) = self.slot_status_observer {
slot_status_observer.join()?;
Expand Down
1 change: 1 addition & 0 deletions geyser-plugin-manager/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod accounts_update_notifier;
pub mod block_metadata_notifier;
pub mod block_metadata_notifier_interface;
pub mod cluster_info_notifier;
pub mod entry_notifier;
pub mod geyser_plugin_manager;
pub mod geyser_plugin_service;
Expand Down
8 changes: 8 additions & 0 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
note = "Please use `solana_net_utils::{MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, VALIDATOR_PORT_RANGE}` instead"
)]
#[allow(deprecated)]
use crate::cluster_info_notifier_interface::ClusterInfoUpdateNotifierLock;
pub use solana_net_utils::{MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, VALIDATOR_PORT_RANGE};
use {
crate::{
Expand Down Expand Up @@ -430,6 +431,13 @@ impl ClusterInfo {
me
}

pub fn set_clusterinfo_notifier(
&self,
cluster_info_notifier: Option<ClusterInfoUpdateNotifierLock>,
) {
self.gossip.set_clusterinfo_notifier(cluster_info_notifier);
}

pub fn set_contact_debug_interval(&mut self, new: u64) {
self.contact_debug_interval = new;
}
Expand Down
Loading

0 comments on commit 897e0bc

Please sign in to comment.