diff --git a/Cargo.lock b/Cargo.lock index 756d24d..4a0e519 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -954,6 +954,16 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", + "serde", +] + [[package]] name = "derivation-path" version = "0.2.0" @@ -1295,6 +1305,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", + "serde_with 3.9.0", "solana-logger", "solana-metrics", "solana-program", @@ -1409,6 +1420,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "hmac" version = "0.8.1" @@ -1576,6 +1593,7 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown 0.12.3", + "serde", ] [[package]] @@ -1586,6 +1604,7 @@ checksum = "824b2ae422412366ba479e8111fd301f7b5faece8149317bb81925979a53f520" dependencies = [ "equivalent", "hashbrown 0.14.1", + "serde", ] [[package]] @@ -1917,6 +1936,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-derive" version = "0.4.1" @@ -2140,6 +2165,12 @@ dependencies = [ "universal-hash", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -2625,7 +2656,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07ff71d2c147a7b57362cead5e22f772cd52f6ab31cfcd9edcd7f6aeb2a0afbe" dependencies = [ "serde", - "serde_with_macros", + "serde_with_macros 2.3.3", +] + +[[package]] +name = "serde_with" +version = "3.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cecfa94848272156ea67b2b1a53f20fc7bc638c4a46d2f8abde08f05f4b857" +dependencies = [ + "base64 0.22.1", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.2.2", + "serde", + "serde_derive", + "serde_json", + "serde_with_macros 3.9.0", + "time", ] [[package]] @@ -2640,6 +2689,18 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "serde_with_macros" +version = "3.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8fee4991ef4f274617a51ad4af30519438dacb2f56ac773b08a1922ff743350" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "sha2" version = "0.9.9" @@ -2944,7 +3005,7 @@ dependencies = [ "serde_bytes", "serde_derive", "serde_json", - "serde_with", + "serde_with 2.3.3", "sha2 0.10.8", "sha3 0.10.8", "siphasher", @@ -3425,6 +3486,37 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "time" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinyvec" version = "1.6.0" diff --git a/Cargo.toml b/Cargo.toml index 3912c86..71773d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ rand = "0.8" serde = "1.0.160" serde_derive = "1.0.160" serde_json = "1.0.96" +serde_with = "=3.9.0" solana-account-decoder = "=2.0.15" solana-logger = "=2.0.15" solana-metrics = "=2.0.15" diff --git a/client/src/geyser_consumer.rs b/client/src/geyser_consumer.rs index 6bbd012..bb171e7 100644 --- a/client/src/geyser_consumer.rs +++ b/client/src/geyser_consumer.rs @@ -19,7 +19,8 @@ use std::{ use jito_geyser_protos::solana::geyser::{ geyser_client::GeyserClient, maybe_partial_account_update, EmptyRequest, MaybePartialAccountUpdate, SubscribeAccountUpdatesRequest, - SubscribePartialAccountUpdatesRequest, SubscribeSlotUpdateRequest, TimestampedAccountUpdate, + SubscribePartialAccountUpdatesRequest, SubscribeSlotEntryUpdateRequest, + SubscribeSlotUpdateRequest, TimestampedAccountUpdate, }; use log::*; use lru::LruCache; @@ -32,7 +33,9 @@ use tonic::{codegen::InterceptedService, transport::Channel, Response, Status}; use crate::{ geyser_consumer::GeyserConsumerError::{MissedHeartbeat, StreamClosed}, - types::{AccountUpdate, AccountUpdateNotification, PartialAccountUpdate, SlotUpdate}, + types::{ + AccountUpdate, AccountUpdateNotification, PartialAccountUpdate, SlotEntryUpdate, SlotUpdate, + }, GrpcInterceptor, Pubkey, Slot, }; @@ -242,6 +245,35 @@ impl GeyserConsumer { Ok(()) } + pub async fn consume_slot_entry_updates( + &self, + slot_updates_tx: UnboundedSender, + ) -> Result<()> { + let mut c = self.client.clone(); + + let resp = c + .subscribe_slot_entry_updates(SubscribeSlotEntryUpdateRequest {}) + .await?; + let mut stream = resp.into_inner(); + + while !self.exit.load(Ordering::Relaxed) { + match stream.message().await { + Ok(Some(slot_update)) => { + if slot_updates_tx + .send(SlotEntryUpdate::from(slot_update.entry_update.unwrap())) + .is_err() + { + return Err(GeyserConsumerError::ConsumerChannelDisconnected); + }; + } + Ok(None) => return Err(StreamClosed), + Err(e) => return Err(GeyserConsumerError::from(e)), + } + } + + Ok(()) + } + #[allow(clippy::too_many_arguments)] fn process_account_update( maybe_message: std::result::Result, Status>, diff --git a/client/src/types.rs b/client/src/types.rs index 3dd670f..701df64 100644 --- a/client/src/types.rs +++ b/client/src/types.rs @@ -136,3 +136,17 @@ impl From for SlotUpdate { } } } + +pub struct SlotEntryUpdate { + pub slot: Slot, + pub index: u64, +} + +impl From for SlotEntryUpdate { + fn from(proto: geyser::SlotEntryUpdate) -> Self { + Self { + slot: proto.slot, + index: proto.index, + } + } +} diff --git a/proto/proto/geyser.proto b/proto/proto/geyser.proto index 0287a2e..f870a5b 100644 --- a/proto/proto/geyser.proto +++ b/proto/proto/geyser.proto @@ -157,6 +157,29 @@ message GetHeartbeatIntervalResponse { uint64 heartbeat_interval_ms = 1; } +/// Modelled based off of https://github.com/solana-labs/solana/blob/v2.0/geyser-plugin-interface/src/geyser_plugin_interface.rs#L210 +/// If more details are needed can extend this structure. +message SlotEntryUpdate { + // The slot number of the block containing this Entry + uint64 slot = 1; + // The Entry's index in the block + uint64 index = 2; + // The number of executed transactions in the Entry + // If this number is zero, we can assume its a tick entry + uint64 executed_transaction_count = 3; +} + +message TimestampedSlotEntryUpdate { + // Time at which the message was generated + // Send relative timestamp in micros using u32 to reduce overhead. Provides ~71 mins of accuracy between sender and receiver + // See [compact_timestamp::to_system_time] + uint32 ts = 1; + // SlotEntryUpdate update + SlotEntryUpdate entry_update = 2; +} + +message SubscribeSlotEntryUpdateRequest {} + // The following __must__ be assumed: // - Clients may receive data for slots out of order. // - Clients may receive account updates for a given slot out of order. @@ -186,4 +209,8 @@ service Geyser { // Subscribes to block updates. rpc SubscribeBlockUpdates(SubscribeBlockUpdatesRequest) returns (stream TimestampedBlockUpdate) {} + + // Subscribes to entry updates. + // Returns the highest slot seen thus far and the entry index corresponding to the tick + rpc SubscribeSlotEntryUpdates(SubscribeSlotEntryUpdateRequest) returns (stream TimestampedSlotEntryUpdate) {} } diff --git a/proto/src/convert.rs b/proto/src/convert.rs index 72da2d0..e4dfaa9 100644 --- a/proto/src/convert.rs +++ b/proto/src/convert.rs @@ -23,8 +23,10 @@ use solana_transaction_status::{ TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedTransactionWithStatusMeta, }; -use crate::solana::{entries, tx_by_addr}; -use crate::{solana::storage::confirmed_block, StoredExtendedRewards, StoredTransactionStatusMeta}; +use crate::{ + solana::{entries, storage::confirmed_block, tx_by_addr}, + StoredExtendedRewards, StoredTransactionStatusMeta, +}; impl From> for confirmed_block::Rewards { fn from(rewards: Vec) -> Self { diff --git a/server/Cargo.toml b/server/Cargo.toml index 51048d4..9e6c3de 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -21,6 +21,7 @@ prost-types = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } +serde_with = { workspace = true } solana-logger = { workspace = true } solana-metrics = { workspace = true } solana-program = { workspace = true } diff --git a/server/example-config.json b/server/example-config.json index 6d75cf2..527832f 100644 --- a/server/example-config.json +++ b/server/example-config.json @@ -3,10 +3,11 @@ "bind_address": "0.0.0.0:10000", "account_update_buffer_size": 100000, "slot_update_buffer_size": 100000, + "slot_entry_update_buffer_size": 1000000, "block_update_buffer_size": 100000, "transaction_update_buffer_size": 100000, "geyser_service_config": { "heartbeat_interval_ms": 1000, "subscriber_buffer_size": 1000000 } -} +} \ No newline at end of file diff --git a/server/src/compact_timestamp.rs b/server/src/compact_timestamp.rs new file mode 100644 index 0000000..abbca81 --- /dev/null +++ b/server/src/compact_timestamp.rs @@ -0,0 +1,74 @@ +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +/// A compact timestamp representation for microseconds using u32 to reduce overhead. Provides ~71 mins of accuracy + +pub fn get_current_time_us_u32() -> u32 { + from_system_time(SystemTime::now()) +} + +/// Returns the absolute duration since other timestamp +pub fn duration_since_us(timestamp_a_us: u32, timestamp_b_us: u32) -> u32 { + // use i64 to avoid overflow + let normal_diff = if timestamp_a_us > timestamp_b_us { + timestamp_a_us - timestamp_b_us + } else { + timestamp_b_us - timestamp_a_us + }; + let wraparound_diff = u32::MAX.wrapping_sub(normal_diff).wrapping_add(1); + normal_diff.min(wraparound_diff) +} + +pub fn from_system_time(from: SystemTime) -> u32 { + from.duration_since(UNIX_EPOCH).unwrap().as_micros() as u32 +} + +pub fn to_system_time(timestamp_us: u32) -> SystemTime { + let now = SystemTime::now(); + let now_us = now.duration_since(UNIX_EPOCH).unwrap().as_micros() as u32; + let diff = duration_since_us(now_us, timestamp_us); + now - Duration::from_micros(diff as u64) +} + +#[cfg(test)] +mod test { + use std::thread::sleep; + + use super::*; + + #[test] + fn test_duration_since_us() { + // test overflow behavior + for i in -100i32..100 { + for j in -100i32..100 { + assert_eq!(duration_since_us(i as u32, j as u32), i.abs_diff(j)); + } + } + + // Test specific cases + assert_eq!(duration_since_us(100, 50), 50); + assert_eq!(duration_since_us(50, 100), 50); + assert_eq!(duration_since_us(0, u32::MAX), 1); + assert_eq!(duration_since_us(u32::MAX, 0), 1); + } + + #[test] + fn test_get_current_time_us_u32() { + let sleep_time_us = 10_000u32; + let t1 = get_current_time_us_u32(); + sleep(Duration::from_micros(sleep_time_us as u64)); + let t2 = get_current_time_us_u32(); + assert!(t2 > t1); + assert!(duration_since_us(t1, t2) > sleep_time_us); + } + + #[test] + fn test_to_system_time() { + let now = SystemTime::now(); + let timestamp = from_system_time(now); + let reconstructed = to_system_time(timestamp); + let diff = now + .duration_since(reconstructed) + .unwrap_or_else(|e| e.duration()); + assert!(diff < Duration::from_micros(1)); + } +} diff --git a/server/src/geyser_grpc_plugin.rs b/server/src/geyser_grpc_plugin.rs index 426e043..6aba96b 100644 --- a/server/src/geyser_grpc_plugin.rs +++ b/server/src/geyser_grpc_plugin.rs @@ -13,21 +13,22 @@ use std::{ use agave_geyser_plugin_interface::geyser_plugin_interface::{ GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions, - ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus, + ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus, }; use bs58; use crossbeam_channel::{bounded, Sender, TrySendError}; use jito_geyser_protos::solana::{ geyser::{ geyser_server::GeyserServer, AccountUpdate, BlockUpdate, SlotUpdate, SlotUpdateStatus, - TimestampedAccountUpdate, TimestampedBlockUpdate, TimestampedSlotUpdate, - TimestampedTransactionUpdate, TransactionUpdate, + TimestampedAccountUpdate, TimestampedBlockUpdate, TimestampedSlotEntryUpdate, + TimestampedSlotUpdate, TimestampedTransactionUpdate, TransactionUpdate, }, storage::confirmed_block::ConfirmedTransaction, }; use log::*; use serde_derive::Deserialize; use serde_json; +use serde_with::{serde_as, DefaultOnError}; use tokio::{runtime::Runtime, sync::oneshot}; use tonic::{ service::{interceptor::InterceptedService, Interceptor}, @@ -35,7 +36,10 @@ use tonic::{ Request, Status, }; -use crate::server::{GeyserService, GeyserServiceConfig}; +use crate::{ + compact_timestamp, + server::{GeyserService, GeyserServiceConfig}, +}; pub struct PluginData { runtime: Runtime, @@ -44,6 +48,7 @@ pub struct PluginData { /// Where updates are piped thru to the grpc service. account_update_sender: Sender, slot_update_sender: Sender, + slot_entry_update_sender: Sender, block_update_sender: Sender, transaction_update_sender: Sender, @@ -67,18 +72,46 @@ impl std::fmt::Debug for GeyserGrpcPlugin { } } +/// Helper macro to generate default functions for setting different values. +/// Sample usage: +/// generate_default_fns! { +/// default_slot_entry_update_buffer_size: usize = PluginConfig::DEFAULT_SLOT_ENTRY_UPDATE_BUFFER_SIZE, +/// } +macro_rules! generate_default_fns { + ($($name:ident: $type:ty = $value:expr),* $(,)?) => { + $( + fn $name() -> $type { + $value + } + )* + }; +} + +#[serde_as] #[derive(Clone, Debug, Deserialize)] pub struct PluginConfig { pub geyser_service_config: GeyserServiceConfig, pub bind_address: String, pub account_update_buffer_size: usize, pub slot_update_buffer_size: usize, + #[serde_as(deserialize_as = "DefaultOnError")] + #[serde(default = "default_slot_entry_update_buffer_size")] + pub slot_entry_update_buffer_size: usize, pub block_update_buffer_size: usize, pub transaction_update_buffer_size: usize, pub skip_startup_stream: Option, pub account_data_notifications_enabled: Option, } +impl PluginConfig { + const DEFAULT_SLOT_ENTRY_UPDATE_BUFFER_SIZE: usize = 1_000_000; +} + +// Can add default values for other fields here +generate_default_fns! { + default_slot_entry_update_buffer_size: usize = PluginConfig::DEFAULT_SLOT_ENTRY_UPDATE_BUFFER_SIZE, +} + impl GeyserPlugin for GeyserGrpcPlugin { fn name(&self) -> &'static str { "geyser-grpc-plugin" @@ -114,6 +147,8 @@ impl GeyserPlugin for GeyserGrpcPlugin { let highest_write_slot = Arc::new(AtomicU64::new(0)); let (account_update_sender, account_update_rx) = bounded(config.account_update_buffer_size); let (slot_update_sender, slot_update_rx) = bounded(config.slot_update_buffer_size); + let (slot_entry_update_sender, slot_entry_update_rx) = + bounded(config.slot_entry_update_buffer_size); let (block_update_sender, block_update_receiver) = bounded(config.block_update_buffer_size); let (transaction_update_sender, transaction_update_receiver) = @@ -123,6 +158,7 @@ impl GeyserPlugin for GeyserGrpcPlugin { config.geyser_service_config.clone(), account_update_rx, slot_update_rx, + slot_entry_update_rx, block_update_receiver, transaction_update_receiver, highest_write_slot.clone(), @@ -157,13 +193,16 @@ impl GeyserPlugin for GeyserGrpcPlugin { server_exit_sender: server_exit_tx, account_update_sender, slot_update_sender, + slot_entry_update_sender, block_update_sender, transaction_update_sender, highest_write_slot, is_startup_completed: AtomicBool::new(false), // don't skip startup to keep backwards compatability ignore_startup_updates: config.skip_startup_stream.unwrap_or(false), - account_data_notifications_enabled: config.account_data_notifications_enabled.unwrap_or(true), + account_data_notifications_enabled: config + .account_data_notifications_enabled + .unwrap_or(true), }); info!("plugin data initialized"); @@ -485,12 +524,71 @@ impl GeyserPlugin for GeyserGrpcPlugin { } fn account_data_notifications_enabled(&self) -> bool { - self.data.as_ref().map(|d| d.account_data_notifications_enabled).unwrap_or(true) + self.data + .as_ref() + .map(|d| d.account_data_notifications_enabled) + .unwrap_or(true) } fn transaction_notifications_enabled(&self) -> bool { true } + + fn entry_notifications_enabled(&self) -> bool { + true + } + + fn notify_entry(&self, entry: ReplicaEntryInfoVersions) -> PluginResult<()> { + let data = self.data.as_ref().expect("plugin must be initialized"); + + let slot_entry = utils::get_slot_and_index_from_replica_entry_info_versions(&entry); + + debug!( + "Updating slot entry {} at index {}", + slot_entry.slot, slot_entry.index + ); + + match data + .slot_entry_update_sender + .try_send(TimestampedSlotEntryUpdate { + ts: compact_timestamp::get_current_time_us_u32(), + entry_update: Some(slot_entry), + }) { + Ok(_) => Ok(()), + Err(TrySendError::Full(_)) => { + warn!("slot_entry_update channel full, skipping"); + Ok(()) + } + Err(TrySendError::Disconnected(_)) => { + error!("slot entry info send error"); + Err(GeyserPluginError::SlotStatusUpdateError { + msg: "slot_entry_update channel disconnected, exiting".to_string(), + }) + } + } + } +} + +mod utils { + use agave_geyser_plugin_interface::geyser_plugin_interface::ReplicaEntryInfoVersions; + use jito_geyser_protos::solana::geyser::SlotEntryUpdate; + + pub fn get_slot_and_index_from_replica_entry_info_versions( + entry: &ReplicaEntryInfoVersions, + ) -> SlotEntryUpdate { + match entry { + ReplicaEntryInfoVersions::V0_0_1(entry_info) => SlotEntryUpdate { + slot: entry_info.slot, + index: entry_info.index as u64, + executed_transaction_count: entry_info.executed_transaction_count, + }, + ReplicaEntryInfoVersions::V0_0_2(entry_info) => SlotEntryUpdate { + slot: entry_info.slot, + index: entry_info.index as u64, + executed_transaction_count: entry_info.executed_transaction_count, + }, + } + } } #[no_mangle] @@ -523,3 +621,104 @@ impl Interceptor for AccessTokenChecker { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_plugin_config_deserialization() { + let config_json = r#" + { + "libpath": "/path/to/container-output/libgeyser_grpc_plugin_server.so", + "bind_address": "0.0.0.0:10000", + "account_update_buffer_size": 100000, + "slot_update_buffer_size": 100000, + "slot_entry_update_buffer_size": 1000000, + "block_update_buffer_size": 100000, + "transaction_update_buffer_size": 100000, + "geyser_service_config": { + "heartbeat_interval_ms": 1000, + "subscriber_buffer_size": 1000000 + } + } + "#; + + let config: PluginConfig = serde_json::from_str(config_json).unwrap(); + + assert_eq!(config.bind_address, "0.0.0.0:10000"); + assert_eq!(config.account_update_buffer_size, 100000); + assert_eq!(config.slot_update_buffer_size, 100000); + assert_eq!(config.slot_entry_update_buffer_size, 1000000); + assert_eq!(config.block_update_buffer_size, 100000); + assert_eq!(config.transaction_update_buffer_size, 100000); + } + + // Please update the test when the default values are added + #[test] + fn test_plugin_config_missing_fields_error() { + let config_json = r#" + { + "bind_address": "0.0.0.0:10000", + "account_update_buffer_size": 100000, + "geyser_service_config": { + "heartbeat_interval_ms": 1000 + } + } + "#; + + let result: Result = serde_json::from_str(config_json); + assert!(result.is_err()); + } + + #[test] + fn test_plugin_config_invalid_types() { + let config_json = r#" + { + "bind_address": "0.0.0.0:10000", + "account_update_buffer_size": "not a number", + "slot_update_buffer_size": 100000, + "block_update_buffer_size": 100000, + "transaction_update_buffer_size": 100000, + "geyser_service_config": { + "heartbeat_interval_ms": 1000, + "subscriber_buffer_size": 1000000 + } + } + "#; + + let result: Result = serde_json::from_str(config_json); + assert!(result.is_err()); + } + + // We currently have default value for slot_entry_update_buffer_size, so this test will always pass + #[test] + fn test_plugin_config_no_slot_entry_update_buffer_size() { + let config_json = r#" + { + "libpath": "/path/to/container-output/libgeyser_grpc_plugin_server.so", + "bind_address": "0.0.0.0:10000", + "account_update_buffer_size": 100000, + "slot_update_buffer_size": 100000, + "block_update_buffer_size": 100000, + "transaction_update_buffer_size": 100000, + "geyser_service_config": { + "heartbeat_interval_ms": 1000, + "subscriber_buffer_size": 1000000 + } + } + "#; + + let config: PluginConfig = serde_json::from_str(config_json).unwrap(); + + assert_eq!(config.bind_address, "0.0.0.0:10000"); + assert_eq!(config.account_update_buffer_size, 100000); + assert_eq!(config.slot_update_buffer_size, 100000); + assert_eq!( + config.slot_entry_update_buffer_size, + PluginConfig::DEFAULT_SLOT_ENTRY_UPDATE_BUFFER_SIZE + ); + assert_eq!(config.block_update_buffer_size, 100000); + assert_eq!(config.transaction_update_buffer_size, 100000); + } +} diff --git a/server/src/lib.rs b/server/src/lib.rs index 0a4bb86..96cd45e 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,3 +1,4 @@ +pub mod compact_timestamp; pub mod geyser_grpc_plugin; pub mod server; pub(crate) mod subscription_stream; diff --git a/server/src/server.rs b/server/src/server.rs index 6465e85..2098bc1 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -15,8 +15,9 @@ use jito_geyser_protos::solana::geyser::{ GetHeartbeatIntervalResponse, Heartbeat, MaybePartialAccountUpdate, PartialAccountUpdate, SubscribeAccountUpdatesRequest, SubscribeBlockUpdatesRequest, SubscribePartialAccountUpdatesRequest, SubscribeProgramsUpdatesRequest, - SubscribeSlotUpdateRequest, SubscribeTransactionUpdatesRequest, TimestampedAccountUpdate, - TimestampedBlockUpdate, TimestampedSlotUpdate, TimestampedTransactionUpdate, + SubscribeSlotEntryUpdateRequest, SubscribeSlotUpdateRequest, + SubscribeTransactionUpdatesRequest, TimestampedAccountUpdate, TimestampedBlockUpdate, + TimestampedSlotEntryUpdate, TimestampedSlotUpdate, TimestampedTransactionUpdate, }; use log::*; use once_cell::sync::OnceCell; @@ -48,6 +49,7 @@ impl StreamClosedSender for SubscriptionClosedSender { type AccountUpdateSender = TokioSender>; type PartialAccountUpdateSender = TokioSender>; type SlotUpdateSender = TokioSender>; +type SlotEntryUpdateSender = TokioSender>; type TransactionUpdateSender = TokioSender>; type BlockUpdateSender = TokioSender>; @@ -144,6 +146,10 @@ struct SlotUpdateSubscription { subscription_tx: SlotUpdateSender, } +struct SlotEntryUpdateSubscription { + subscription_tx: SlotEntryUpdateSender, +} + impl ErrorStatusStreamer for SlotUpdateSubscription { fn stream_error(&self, status: Status) -> GeyserServiceResult<()> { self.subscription_tx @@ -157,6 +163,19 @@ impl ErrorStatusStreamer for SlotUpdateSubscription { } } +impl ErrorStatusStreamer for SlotEntryUpdateSubscription { + fn stream_error(&self, status: Status) -> GeyserServiceResult<()> { + self.subscription_tx + .try_send(Err(status)) + .map_err(|e| match e { + TokioTrySendError::Full(_) => GeyserServiceError::NotificationReceiverFull, + TokioTrySendError::Closed(_) => { + GeyserServiceError::NotificationReceiverDisconnected + } + }) + } +} + struct BlockUpdateSubscription { notification_sender: BlockUpdateSender, } @@ -212,6 +231,10 @@ enum SubscriptionAddedEvent { uuid: Uuid, notification_sender: SlotUpdateSender, }, + SlotEntryUpdateSubscription { + uuid: Uuid, + notification_sender: SlotEntryUpdateSender, + }, TransactionUpdateSubscription { uuid: Uuid, notification_sender: TransactionUpdateSender, @@ -234,6 +257,9 @@ impl Debug for SubscriptionAddedEvent { SubscriptionAddedEvent::SlotUpdateSubscription { uuid, .. } => { ("subscribe_slot_update".to_string(), uuid) } + SubscriptionAddedEvent::SlotEntryUpdateSubscription { uuid, .. } => { + ("subscribe_slot_entry_update".to_string(), uuid) + } SubscriptionAddedEvent::ProgramUpdateSubscription { uuid, .. } => { ("program_update_subscribe".to_string(), uuid) } @@ -264,6 +290,7 @@ enum SubscriptionClosedEvent { ProgramUpdateSubscription(Uuid), PartialAccountUpdateSubscription(Uuid), SlotUpdateSubscription(Uuid), + SlotEntryUpdateSubscription(Uuid), TransactionUpdateSubscription(Uuid), BlockUpdateSubscription(Uuid), } @@ -325,6 +352,8 @@ impl GeyserService { account_update_rx: Receiver, // Slot updates streamed from the validator. slot_update_rx: Receiver, + // Slot updates streamed from the validator. + slot_entry_update_rx: Receiver, // Block metadata receiver block_update_receiver: Receiver, // Transaction updates @@ -339,6 +368,7 @@ impl GeyserService { let t_hdl = Self::event_loop( account_update_rx, slot_update_rx, + slot_entry_update_rx, block_update_receiver, transaction_update_receiver, subscription_added_rx, @@ -365,9 +395,11 @@ impl GeyserService { /// 1. Add new subscriptions. /// 2. Cleanup closed subscriptions. /// 3. Receive geyser events and stream them to subscribers. + #[allow(clippy::too_many_arguments)] fn event_loop( account_update_rx: Receiver, slot_update_rx: Receiver, + slot_entry_update_rx: Receiver, block_update_receiver: Receiver, transaction_update_receiver: Receiver, subscription_added_rx: Receiver, @@ -387,6 +419,7 @@ impl GeyserService { PartialAccountUpdateSubscription, > = HashMap::new(); let mut slot_update_subscriptions: HashMap = HashMap::new(); + let mut slot_entry_update_subscriptions: HashMap = HashMap::new(); let mut transaction_update_subscriptions: HashMap = HashMap::new(); let mut block_update_subscriptions: HashMap = HashMap::new(); @@ -400,14 +433,14 @@ impl GeyserService { } recv(subscription_added_rx) -> maybe_subscription_added => { info!("received new subscription"); - if let Err(e) = Self::handle_subscription_added(maybe_subscription_added, &mut account_update_subscriptions, &mut partial_account_update_subscriptions, &mut slot_update_subscriptions, &mut program_update_subscriptions, &mut transaction_update_subscriptions, &mut block_update_subscriptions) { + if let Err(e) = Self::handle_subscription_added(maybe_subscription_added, &mut account_update_subscriptions, &mut partial_account_update_subscriptions, &mut slot_update_subscriptions, &mut slot_entry_update_subscriptions, &mut program_update_subscriptions, &mut transaction_update_subscriptions, &mut block_update_subscriptions) { error!("error adding new subscription: {}", e); return; } }, recv(subscription_closed_rx) -> maybe_subscription_closed => { info!("closing subscription"); - if let Err(e) = Self::handle_subscription_closed(maybe_subscription_closed, &mut account_update_subscriptions, &mut partial_account_update_subscriptions, &mut slot_update_subscriptions, &mut program_update_subscriptions, &mut transaction_update_subscriptions, &mut block_update_subscriptions) { + if let Err(e) = Self::handle_subscription_closed(maybe_subscription_closed, &mut account_update_subscriptions, &mut partial_account_update_subscriptions, &mut slot_update_subscriptions, &mut slot_entry_update_subscriptions, &mut program_update_subscriptions, &mut transaction_update_subscriptions, &mut block_update_subscriptions) { error!("error closing existing subscription: {}", e); return; } @@ -438,6 +471,18 @@ impl GeyserService { }, } }, + recv(slot_entry_update_rx) -> maybe_slot_entry_update => { + debug!("received slot entry update"); + match Self::handle_slot_entry_update_event(maybe_slot_entry_update, &slot_entry_update_subscriptions) { + Err(e) => { + error!("error handling a slot entry update event: {}", e); + return; + }, + Ok(failed_subscription_ids) => { + Self::drop_subscriptions(&failed_subscription_ids, &mut slot_entry_update_subscriptions); + }, + } + }, recv(block_update_receiver) -> maybe_block_update => { debug!("received block update"); match Self::handle_block_update_event(maybe_block_update, &block_update_subscriptions) { @@ -510,11 +555,13 @@ impl GeyserService { } /// Handles adding new subscriptions. + #[allow(clippy::too_many_arguments)] fn handle_subscription_added( maybe_subscription_added: Result, account_update_subscriptions: &mut HashMap, partial_account_update_subscriptions: &mut HashMap, slot_update_subscriptions: &mut HashMap, + slot_entry_update_subscriptions: &mut HashMap, program_update_subscriptions: &mut HashMap, transaction_update_subscriptions: &mut HashMap, block_update_subscriptions: &mut HashMap, @@ -555,6 +602,13 @@ impl GeyserService { } => { slot_update_subscriptions.insert(uuid, SlotUpdateSubscription { subscription_tx }); } + SubscriptionAddedEvent::SlotEntryUpdateSubscription { + uuid, + notification_sender: subscription_tx, + } => { + slot_entry_update_subscriptions + .insert(uuid, SlotEntryUpdateSubscription { subscription_tx }); + } SubscriptionAddedEvent::ProgramUpdateSubscription { uuid, notification_sender, @@ -596,11 +650,13 @@ impl GeyserService { } /// Handles closing existing subscriptions. + #[allow(clippy::too_many_arguments)] fn handle_subscription_closed( maybe_subscription_closed: Result, account_update_subscriptions: &mut HashMap, partial_account_update_subscriptions: &mut HashMap, slot_update_subscriptions: &mut HashMap, + slot_entry_update_subscriptions: &mut HashMap, program_update_subscriptions: &mut HashMap, transaction_update_subscriptions: &mut HashMap, block_update_subscriptions: &mut HashMap, @@ -618,6 +674,9 @@ impl GeyserService { SubscriptionClosedEvent::SlotUpdateSubscription(subscription_id) => { let _ = slot_update_subscriptions.remove(&subscription_id); } + SubscriptionClosedEvent::SlotEntryUpdateSubscription(subscription_id) => { + let _ = slot_entry_update_subscriptions.remove(&subscription_id); + } SubscriptionClosedEvent::ProgramUpdateSubscription(subscription_id) => { let _ = program_update_subscriptions.remove(&subscription_id); } @@ -740,6 +799,30 @@ impl GeyserService { Ok(failed_subscription_ids) } + /// Streams slot updates to subscribers + /// Returns a vector of UUIDs that failed to send to due to the subscription being closed + fn handle_slot_entry_update_event( + maybe_slot_entry_update: Result, + slot_entry_update_subscriptions: &HashMap, + ) -> GeyserServiceResult> { + let slot_entry_update = maybe_slot_entry_update?; + let failed_subscription_ids = slot_entry_update_subscriptions + .iter() + .filter_map(|(uuid, sub)| { + if matches!( + sub.subscription_tx.try_send(Ok(slot_entry_update.clone())), + Err(TokioTrySendError::Closed(_)) + ) { + Some(*uuid) + } else { + None + } + }) + .collect(); + + Ok(failed_subscription_ids) + } + /// Drop broken connections. fn drop_subscriptions( subscription_ids: &[Uuid], @@ -941,6 +1024,43 @@ impl Geyser for GeyserService { Ok(resp) } + type SubscribeSlotEntryUpdatesStream = SubscriptionStream; + async fn subscribe_slot_entry_updates( + &self, + _request: Request, + ) -> Result, Status> { + let (subscription_tx, subscription_rx) = + channel(self.service_config.subscriber_buffer_size); + + let uuid = Uuid::new_v4(); + self.subscription_added_tx + .try_send(SubscriptionAddedEvent::SlotEntryUpdateSubscription { + uuid, + notification_sender: subscription_tx, + }) + .map_err(|e| { + error!("failed to add subscribe_slot_entry_updates subscription: {e}"); + Status::internal("error adding subscription") + })?; + + let stream = SubscriptionStream::new( + subscription_rx, + uuid, + ( + self.subscription_closed_sender.clone(), + SubscriptionClosedEvent::SlotEntryUpdateSubscription(uuid), + ), + "subscribe_slot_entry_updates", + ); + let mut resp = Response::new(stream); + resp.metadata_mut().insert( + HIGHEST_WRITE_SLOT_HEADER, + MetadataValue::from(self.highest_write_slot.load(Ordering::Relaxed)), + ); + + Ok(resp) + } + type SubscribeTransactionUpdatesStream = SubscriptionStream; async fn subscribe_transaction_updates(