diff --git a/Cargo.lock b/Cargo.lock index 815fe60..94a760b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -471,6 +471,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bincode" version = "1.3.3" @@ -1029,6 +1035,16 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", + "serde", +] + [[package]] name = "derivation-path" version = "0.2.0" @@ -1369,6 +1385,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", + "serde_with 3.9.0", "solana-geyser-plugin-interface", "solana-logger", "solana-metrics", @@ -1493,6 +1510,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "hmac" version = "0.8.1" @@ -1676,6 +1699,7 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown 0.12.3", + "serde", ] [[package]] @@ -1686,6 +1710,7 @@ checksum = "824b2ae422412366ba479e8111fd301f7b5faece8149317bb81925979a53f520" dependencies = [ "equivalent", "hashbrown 0.14.1", + "serde", ] [[package]] @@ -2030,6 +2055,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-derive" version = "0.3.3" @@ -2295,6 +2326,12 @@ dependencies = [ "universal-hash", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -2837,7 +2874,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07ff71d2c147a7b57362cead5e22f772cd52f6ab31cfcd9edcd7f6aeb2a0afbe" dependencies = [ "serde", - "serde_with_macros", + "serde_with_macros 2.3.3", +] + +[[package]] +name = "serde_with" +version = "3.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cecfa94848272156ea67b2b1a53f20fc7bc638c4a46d2f8abde08f05f4b857" +dependencies = [ + "base64 0.22.1", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.2.2", + "serde", + "serde_derive", + "serde_json", + "serde_with_macros 3.9.0", + "time", ] [[package]] @@ -2852,6 +2907,18 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "serde_with_macros" +version = "3.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8fee4991ef4f274617a51ad4af30519438dacb2f56ac773b08a1922ff743350" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "sha2" version = "0.9.9" @@ -3203,7 +3270,7 @@ dependencies = [ "serde_bytes", "serde_derive", "serde_json", - "serde_with", + "serde_with 2.3.3", "sha2 0.10.8", "sha3 0.10.8", "siphasher", @@ -3659,6 +3726,37 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "time" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tiny-bip39" version = "0.8.2" diff --git a/Cargo.toml b/Cargo.toml index d4da6fd..fc77308 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ rand = "0.8" serde = "1.0.160" serde_derive = "1.0.160" serde_json = "1.0.96" +serde_with = "3.9.0" solana-account-decoder = "=1.18.22" solana-geyser-plugin-interface = "=1.18.22" solana-logger = "=1.18.22" diff --git a/client/src/geyser_consumer.rs b/client/src/geyser_consumer.rs index 6bbd012..bb171e7 100644 --- a/client/src/geyser_consumer.rs +++ b/client/src/geyser_consumer.rs @@ -19,7 +19,8 @@ use std::{ use jito_geyser_protos::solana::geyser::{ geyser_client::GeyserClient, maybe_partial_account_update, EmptyRequest, MaybePartialAccountUpdate, SubscribeAccountUpdatesRequest, - SubscribePartialAccountUpdatesRequest, SubscribeSlotUpdateRequest, TimestampedAccountUpdate, + SubscribePartialAccountUpdatesRequest, SubscribeSlotEntryUpdateRequest, + SubscribeSlotUpdateRequest, TimestampedAccountUpdate, }; use log::*; use lru::LruCache; @@ -32,7 +33,9 @@ use tonic::{codegen::InterceptedService, transport::Channel, Response, Status}; use crate::{ geyser_consumer::GeyserConsumerError::{MissedHeartbeat, StreamClosed}, - types::{AccountUpdate, AccountUpdateNotification, PartialAccountUpdate, SlotUpdate}, + types::{ + AccountUpdate, AccountUpdateNotification, PartialAccountUpdate, SlotEntryUpdate, SlotUpdate, + }, GrpcInterceptor, Pubkey, Slot, }; @@ -242,6 +245,35 @@ impl GeyserConsumer { Ok(()) } + pub async fn consume_slot_entry_updates( + &self, + slot_updates_tx: UnboundedSender, + ) -> Result<()> { + let mut c = self.client.clone(); + + let resp = c + .subscribe_slot_entry_updates(SubscribeSlotEntryUpdateRequest {}) + .await?; + let mut stream = resp.into_inner(); + + while !self.exit.load(Ordering::Relaxed) { + match stream.message().await { + Ok(Some(slot_update)) => { + if slot_updates_tx + .send(SlotEntryUpdate::from(slot_update.entry_update.unwrap())) + .is_err() + { + return Err(GeyserConsumerError::ConsumerChannelDisconnected); + }; + } + Ok(None) => return Err(StreamClosed), + Err(e) => return Err(GeyserConsumerError::from(e)), + } + } + + Ok(()) + } + #[allow(clippy::too_many_arguments)] fn process_account_update( maybe_message: std::result::Result, Status>, diff --git a/client/src/types.rs b/client/src/types.rs index 3dd670f..701df64 100644 --- a/client/src/types.rs +++ b/client/src/types.rs @@ -136,3 +136,17 @@ impl From for SlotUpdate { } } } + +pub struct SlotEntryUpdate { + pub slot: Slot, + pub index: u64, +} + +impl From for SlotEntryUpdate { + fn from(proto: geyser::SlotEntryUpdate) -> Self { + Self { + slot: proto.slot, + index: proto.index, + } + } +} diff --git a/proto/proto/geyser.proto b/proto/proto/geyser.proto index 0287a2e..5ed85e1 100644 --- a/proto/proto/geyser.proto +++ b/proto/proto/geyser.proto @@ -157,6 +157,27 @@ message GetHeartbeatIntervalResponse { uint64 heartbeat_interval_ms = 1; } +/// Modelled based off of https://github.com/solana-labs/solana/blob/v1.18/geyser-plugin-interface/src/geyser_plugin_interface.rs#L210 +/// If more details are needed can extend this structure. +message SlotEntryUpdate { + // The slot number of the block containing this Entry + uint64 slot = 1; + // The Entry's index in the block + uint64 index = 2; + // The number of executed transactions in the Entry + // If this number is zero, we can assume its a tick entry + uint64 executed_transaction_count = 3; +} + +message TimestampedSlotEntryUpdate { + // Time at which the message was generated + google.protobuf.Timestamp ts = 1; + // SlotEntryUpdate update + SlotEntryUpdate entry_update = 2; +} + +message SubscribeSlotEntryUpdateRequest {} + // The following __must__ be assumed: // - Clients may receive data for slots out of order. // - Clients may receive account updates for a given slot out of order. @@ -186,4 +207,8 @@ service Geyser { // Subscribes to block updates. rpc SubscribeBlockUpdates(SubscribeBlockUpdatesRequest) returns (stream TimestampedBlockUpdate) {} + + // Subscribes to entry updates. + // Returns the highest slot seen thus far and the entry index corresponding to the tick + rpc SubscribeSlotEntryUpdates(SubscribeSlotEntryUpdateRequest) returns (stream TimestampedSlotEntryUpdate) {} } diff --git a/server/Cargo.toml b/server/Cargo.toml index 93083ce..b04b180 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -20,6 +20,7 @@ prost-types = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } +serde_with = { workspace = true } solana-geyser-plugin-interface = { workspace = true } solana-logger = { workspace = true } solana-metrics = { workspace = true } diff --git a/server/example-config.json b/server/example-config.json index 6d75cf2..527832f 100644 --- a/server/example-config.json +++ b/server/example-config.json @@ -3,10 +3,11 @@ "bind_address": "0.0.0.0:10000", "account_update_buffer_size": 100000, "slot_update_buffer_size": 100000, + "slot_entry_update_buffer_size": 1000000, "block_update_buffer_size": 100000, "transaction_update_buffer_size": 100000, "geyser_service_config": { "heartbeat_interval_ms": 1000, "subscriber_buffer_size": 1000000 } -} +} \ No newline at end of file diff --git a/server/src/geyser_grpc_plugin.rs b/server/src/geyser_grpc_plugin.rs index 3b5a3c6..9c22c12 100644 --- a/server/src/geyser_grpc_plugin.rs +++ b/server/src/geyser_grpc_plugin.rs @@ -16,17 +16,18 @@ use crossbeam_channel::{bounded, Sender, TrySendError}; use jito_geyser_protos::solana::{ geyser::{ geyser_server::GeyserServer, AccountUpdate, BlockUpdate, SlotUpdate, SlotUpdateStatus, - TimestampedAccountUpdate, TimestampedBlockUpdate, TimestampedSlotUpdate, - TimestampedTransactionUpdate, TransactionUpdate, + TimestampedAccountUpdate, TimestampedBlockUpdate, TimestampedSlotEntryUpdate, + TimestampedSlotUpdate, TimestampedTransactionUpdate, TransactionUpdate, }, storage::confirmed_block::ConfirmedTransaction, }; use log::*; use serde_derive::Deserialize; use serde_json; +use serde_with::{serde_as, DefaultOnError}; use solana_geyser_plugin_interface::geyser_plugin_interface::{ GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions, - ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus, + ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus, }; use tokio::{runtime::Runtime, sync::oneshot}; use tonic::{ @@ -44,6 +45,7 @@ pub struct PluginData { /// Where updates are piped thru to the grpc service. account_update_sender: Sender, slot_update_sender: Sender, + slot_entry_update_sender: Sender, block_update_sender: Sender, transaction_update_sender: Sender, @@ -66,17 +68,45 @@ impl std::fmt::Debug for GeyserGrpcPlugin { } } +/// Helper macro to generate default functions for setting different values. +/// Sample usage: +/// generate_default_fns! { +/// default_slot_entry_update_buffer_size: usize = PluginConfig::DEFAULT_SLOT_ENTRY_UPDATE_BUFFER_SIZE, +/// } +macro_rules! generate_default_fns { + ($($name:ident: $type:ty = $value:expr),* $(,)?) => { + $( + fn $name() -> $type { + $value + } + )* + }; +} + +#[serde_as] #[derive(Clone, Debug, Deserialize)] pub struct PluginConfig { pub geyser_service_config: GeyserServiceConfig, pub bind_address: String, pub account_update_buffer_size: usize, pub slot_update_buffer_size: usize, + #[serde_as(deserialize_as = "DefaultOnError")] + #[serde(default = "default_slot_entry_update_buffer_size")] + pub slot_entry_update_buffer_size: usize, pub block_update_buffer_size: usize, pub transaction_update_buffer_size: usize, pub skip_startup_stream: Option, } +impl PluginConfig { + const DEFAULT_SLOT_ENTRY_UPDATE_BUFFER_SIZE: usize = 1_000_000; +} + +// Can add default values for other fields here +generate_default_fns! { + default_slot_entry_update_buffer_size: usize = PluginConfig::DEFAULT_SLOT_ENTRY_UPDATE_BUFFER_SIZE, +} + impl GeyserPlugin for GeyserGrpcPlugin { fn name(&self) -> &'static str { "geyser-grpc-plugin" @@ -112,6 +142,8 @@ impl GeyserPlugin for GeyserGrpcPlugin { let highest_write_slot = Arc::new(AtomicU64::new(0)); let (account_update_sender, account_update_rx) = bounded(config.account_update_buffer_size); let (slot_update_sender, slot_update_rx) = bounded(config.slot_update_buffer_size); + let (slot_entry_update_sender, slot_entry_update_rx) = + bounded(config.slot_entry_update_buffer_size); let (block_update_sender, block_update_receiver) = bounded(config.block_update_buffer_size); let (transaction_update_sender, transaction_update_receiver) = @@ -121,6 +153,7 @@ impl GeyserPlugin for GeyserGrpcPlugin { config.geyser_service_config.clone(), account_update_rx, slot_update_rx, + slot_entry_update_rx, block_update_receiver, transaction_update_receiver, highest_write_slot.clone(), @@ -155,6 +188,7 @@ impl GeyserPlugin for GeyserGrpcPlugin { server_exit_sender: server_exit_tx, account_update_sender, slot_update_sender, + slot_entry_update_sender, block_update_sender, transaction_update_sender, highest_write_slot, @@ -468,6 +502,66 @@ impl GeyserPlugin for GeyserGrpcPlugin { fn transaction_notifications_enabled(&self) -> bool { true } + + fn entry_notifications_enabled(&self) -> bool { + true + } + + fn notify_entry(&self, entry: ReplicaEntryInfoVersions) -> PluginResult<()> { + let data = self.data.as_ref().expect("plugin must be initialized"); + + if data.ignore_startup_updates && !data.is_startup_completed.load(Ordering::Relaxed) { + return Ok(()); + } + + let slot_entry = utils::get_slot_and_index_from_replica_entry_info_versions(&entry); + + debug!( + "Updating slot entry {} at index {}", + slot_entry.slot, slot_entry.index + ); + + match data + .slot_entry_update_sender + .try_send(TimestampedSlotEntryUpdate { + ts: Some(prost_types::Timestamp::from(SystemTime::now())), + entry_update: Some(slot_entry), + }) { + Ok(_) => Ok(()), + Err(TrySendError::Full(_)) => { + warn!("slot_entry_update channel full, skipping"); + Ok(()) + } + Err(TrySendError::Disconnected(_)) => { + error!("slot entry info send error"); + Err(GeyserPluginError::SlotStatusUpdateError { + msg: "slot_entry_update channel disconnected, exiting".to_string(), + }) + } + } + } +} + +mod utils { + use jito_geyser_protos::solana::geyser::SlotEntryUpdate; + use solana_geyser_plugin_interface::geyser_plugin_interface::ReplicaEntryInfoVersions; + + pub fn get_slot_and_index_from_replica_entry_info_versions( + entry: &ReplicaEntryInfoVersions, + ) -> SlotEntryUpdate { + match entry { + ReplicaEntryInfoVersions::V0_0_1(entry_info) => SlotEntryUpdate { + slot: entry_info.slot, + index: entry_info.index as u64, + executed_transaction_count: entry_info.executed_transaction_count, + }, + ReplicaEntryInfoVersions::V0_0_2(entry_info) => SlotEntryUpdate { + slot: entry_info.slot, + index: entry_info.index as u64, + executed_transaction_count: entry_info.executed_transaction_count, + }, + } + } } #[no_mangle] @@ -500,3 +594,104 @@ impl Interceptor for AccessTokenChecker { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_plugin_config_deserialization() { + let config_json = r#" + { + "libpath": "/path/to/container-output/libgeyser_grpc_plugin_server.so", + "bind_address": "0.0.0.0:10000", + "account_update_buffer_size": 100000, + "slot_update_buffer_size": 100000, + "slot_entry_update_buffer_size": 1000000, + "block_update_buffer_size": 100000, + "transaction_update_buffer_size": 100000, + "geyser_service_config": { + "heartbeat_interval_ms": 1000, + "subscriber_buffer_size": 1000000 + } + } + "#; + + let config: PluginConfig = serde_json::from_str(config_json).unwrap(); + + assert_eq!(config.bind_address, "0.0.0.0:10000"); + assert_eq!(config.account_update_buffer_size, 100000); + assert_eq!(config.slot_update_buffer_size, 100000); + assert_eq!(config.slot_entry_update_buffer_size, 1000000); + assert_eq!(config.block_update_buffer_size, 100000); + assert_eq!(config.transaction_update_buffer_size, 100000); + } + + // Please update the test when the default values are added + #[test] + fn test_plugin_config_missing_fields_error() { + let config_json = r#" + { + "bind_address": "0.0.0.0:10000", + "account_update_buffer_size": 100000, + "geyser_service_config": { + "heartbeat_interval_ms": 1000 + } + } + "#; + + let result: Result = serde_json::from_str(config_json); + assert!(result.is_err()); + } + + #[test] + fn test_plugin_config_invalid_types() { + let config_json = r#" + { + "bind_address": "0.0.0.0:10000", + "account_update_buffer_size": "not a number", + "slot_update_buffer_size": 100000, + "block_update_buffer_size": 100000, + "transaction_update_buffer_size": 100000, + "geyser_service_config": { + "heartbeat_interval_ms": 1000, + "subscriber_buffer_size": 1000000 + } + } + "#; + + let result: Result = serde_json::from_str(config_json); + assert!(result.is_err()); + } + + // We currently have default value for slot_entry_update_buffer_size, so this test will always pass + #[test] + fn test_plugin_config_no_slot_entry_update_buffer_size() { + let config_json = r#" + { + "libpath": "/path/to/container-output/libgeyser_grpc_plugin_server.so", + "bind_address": "0.0.0.0:10000", + "account_update_buffer_size": 100000, + "slot_update_buffer_size": 100000, + "block_update_buffer_size": 100000, + "transaction_update_buffer_size": 100000, + "geyser_service_config": { + "heartbeat_interval_ms": 1000, + "subscriber_buffer_size": 1000000 + } + } + "#; + + let config: PluginConfig = serde_json::from_str(config_json).unwrap(); + + assert_eq!(config.bind_address, "0.0.0.0:10000"); + assert_eq!(config.account_update_buffer_size, 100000); + assert_eq!(config.slot_update_buffer_size, 100000); + assert_eq!( + config.slot_entry_update_buffer_size, + PluginConfig::DEFAULT_SLOT_ENTRY_UPDATE_BUFFER_SIZE + ); + assert_eq!(config.block_update_buffer_size, 100000); + assert_eq!(config.transaction_update_buffer_size, 100000); + } +} diff --git a/server/src/server.rs b/server/src/server.rs index 6465e85..2098bc1 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -15,8 +15,9 @@ use jito_geyser_protos::solana::geyser::{ GetHeartbeatIntervalResponse, Heartbeat, MaybePartialAccountUpdate, PartialAccountUpdate, SubscribeAccountUpdatesRequest, SubscribeBlockUpdatesRequest, SubscribePartialAccountUpdatesRequest, SubscribeProgramsUpdatesRequest, - SubscribeSlotUpdateRequest, SubscribeTransactionUpdatesRequest, TimestampedAccountUpdate, - TimestampedBlockUpdate, TimestampedSlotUpdate, TimestampedTransactionUpdate, + SubscribeSlotEntryUpdateRequest, SubscribeSlotUpdateRequest, + SubscribeTransactionUpdatesRequest, TimestampedAccountUpdate, TimestampedBlockUpdate, + TimestampedSlotEntryUpdate, TimestampedSlotUpdate, TimestampedTransactionUpdate, }; use log::*; use once_cell::sync::OnceCell; @@ -48,6 +49,7 @@ impl StreamClosedSender for SubscriptionClosedSender { type AccountUpdateSender = TokioSender>; type PartialAccountUpdateSender = TokioSender>; type SlotUpdateSender = TokioSender>; +type SlotEntryUpdateSender = TokioSender>; type TransactionUpdateSender = TokioSender>; type BlockUpdateSender = TokioSender>; @@ -144,6 +146,10 @@ struct SlotUpdateSubscription { subscription_tx: SlotUpdateSender, } +struct SlotEntryUpdateSubscription { + subscription_tx: SlotEntryUpdateSender, +} + impl ErrorStatusStreamer for SlotUpdateSubscription { fn stream_error(&self, status: Status) -> GeyserServiceResult<()> { self.subscription_tx @@ -157,6 +163,19 @@ impl ErrorStatusStreamer for SlotUpdateSubscription { } } +impl ErrorStatusStreamer for SlotEntryUpdateSubscription { + fn stream_error(&self, status: Status) -> GeyserServiceResult<()> { + self.subscription_tx + .try_send(Err(status)) + .map_err(|e| match e { + TokioTrySendError::Full(_) => GeyserServiceError::NotificationReceiverFull, + TokioTrySendError::Closed(_) => { + GeyserServiceError::NotificationReceiverDisconnected + } + }) + } +} + struct BlockUpdateSubscription { notification_sender: BlockUpdateSender, } @@ -212,6 +231,10 @@ enum SubscriptionAddedEvent { uuid: Uuid, notification_sender: SlotUpdateSender, }, + SlotEntryUpdateSubscription { + uuid: Uuid, + notification_sender: SlotEntryUpdateSender, + }, TransactionUpdateSubscription { uuid: Uuid, notification_sender: TransactionUpdateSender, @@ -234,6 +257,9 @@ impl Debug for SubscriptionAddedEvent { SubscriptionAddedEvent::SlotUpdateSubscription { uuid, .. } => { ("subscribe_slot_update".to_string(), uuid) } + SubscriptionAddedEvent::SlotEntryUpdateSubscription { uuid, .. } => { + ("subscribe_slot_entry_update".to_string(), uuid) + } SubscriptionAddedEvent::ProgramUpdateSubscription { uuid, .. } => { ("program_update_subscribe".to_string(), uuid) } @@ -264,6 +290,7 @@ enum SubscriptionClosedEvent { ProgramUpdateSubscription(Uuid), PartialAccountUpdateSubscription(Uuid), SlotUpdateSubscription(Uuid), + SlotEntryUpdateSubscription(Uuid), TransactionUpdateSubscription(Uuid), BlockUpdateSubscription(Uuid), } @@ -325,6 +352,8 @@ impl GeyserService { account_update_rx: Receiver, // Slot updates streamed from the validator. slot_update_rx: Receiver, + // Slot updates streamed from the validator. + slot_entry_update_rx: Receiver, // Block metadata receiver block_update_receiver: Receiver, // Transaction updates @@ -339,6 +368,7 @@ impl GeyserService { let t_hdl = Self::event_loop( account_update_rx, slot_update_rx, + slot_entry_update_rx, block_update_receiver, transaction_update_receiver, subscription_added_rx, @@ -365,9 +395,11 @@ impl GeyserService { /// 1. Add new subscriptions. /// 2. Cleanup closed subscriptions. /// 3. Receive geyser events and stream them to subscribers. + #[allow(clippy::too_many_arguments)] fn event_loop( account_update_rx: Receiver, slot_update_rx: Receiver, + slot_entry_update_rx: Receiver, block_update_receiver: Receiver, transaction_update_receiver: Receiver, subscription_added_rx: Receiver, @@ -387,6 +419,7 @@ impl GeyserService { PartialAccountUpdateSubscription, > = HashMap::new(); let mut slot_update_subscriptions: HashMap = HashMap::new(); + let mut slot_entry_update_subscriptions: HashMap = HashMap::new(); let mut transaction_update_subscriptions: HashMap = HashMap::new(); let mut block_update_subscriptions: HashMap = HashMap::new(); @@ -400,14 +433,14 @@ impl GeyserService { } recv(subscription_added_rx) -> maybe_subscription_added => { info!("received new subscription"); - if let Err(e) = Self::handle_subscription_added(maybe_subscription_added, &mut account_update_subscriptions, &mut partial_account_update_subscriptions, &mut slot_update_subscriptions, &mut program_update_subscriptions, &mut transaction_update_subscriptions, &mut block_update_subscriptions) { + if let Err(e) = Self::handle_subscription_added(maybe_subscription_added, &mut account_update_subscriptions, &mut partial_account_update_subscriptions, &mut slot_update_subscriptions, &mut slot_entry_update_subscriptions, &mut program_update_subscriptions, &mut transaction_update_subscriptions, &mut block_update_subscriptions) { error!("error adding new subscription: {}", e); return; } }, recv(subscription_closed_rx) -> maybe_subscription_closed => { info!("closing subscription"); - if let Err(e) = Self::handle_subscription_closed(maybe_subscription_closed, &mut account_update_subscriptions, &mut partial_account_update_subscriptions, &mut slot_update_subscriptions, &mut program_update_subscriptions, &mut transaction_update_subscriptions, &mut block_update_subscriptions) { + if let Err(e) = Self::handle_subscription_closed(maybe_subscription_closed, &mut account_update_subscriptions, &mut partial_account_update_subscriptions, &mut slot_update_subscriptions, &mut slot_entry_update_subscriptions, &mut program_update_subscriptions, &mut transaction_update_subscriptions, &mut block_update_subscriptions) { error!("error closing existing subscription: {}", e); return; } @@ -438,6 +471,18 @@ impl GeyserService { }, } }, + recv(slot_entry_update_rx) -> maybe_slot_entry_update => { + debug!("received slot entry update"); + match Self::handle_slot_entry_update_event(maybe_slot_entry_update, &slot_entry_update_subscriptions) { + Err(e) => { + error!("error handling a slot entry update event: {}", e); + return; + }, + Ok(failed_subscription_ids) => { + Self::drop_subscriptions(&failed_subscription_ids, &mut slot_entry_update_subscriptions); + }, + } + }, recv(block_update_receiver) -> maybe_block_update => { debug!("received block update"); match Self::handle_block_update_event(maybe_block_update, &block_update_subscriptions) { @@ -510,11 +555,13 @@ impl GeyserService { } /// Handles adding new subscriptions. + #[allow(clippy::too_many_arguments)] fn handle_subscription_added( maybe_subscription_added: Result, account_update_subscriptions: &mut HashMap, partial_account_update_subscriptions: &mut HashMap, slot_update_subscriptions: &mut HashMap, + slot_entry_update_subscriptions: &mut HashMap, program_update_subscriptions: &mut HashMap, transaction_update_subscriptions: &mut HashMap, block_update_subscriptions: &mut HashMap, @@ -555,6 +602,13 @@ impl GeyserService { } => { slot_update_subscriptions.insert(uuid, SlotUpdateSubscription { subscription_tx }); } + SubscriptionAddedEvent::SlotEntryUpdateSubscription { + uuid, + notification_sender: subscription_tx, + } => { + slot_entry_update_subscriptions + .insert(uuid, SlotEntryUpdateSubscription { subscription_tx }); + } SubscriptionAddedEvent::ProgramUpdateSubscription { uuid, notification_sender, @@ -596,11 +650,13 @@ impl GeyserService { } /// Handles closing existing subscriptions. + #[allow(clippy::too_many_arguments)] fn handle_subscription_closed( maybe_subscription_closed: Result, account_update_subscriptions: &mut HashMap, partial_account_update_subscriptions: &mut HashMap, slot_update_subscriptions: &mut HashMap, + slot_entry_update_subscriptions: &mut HashMap, program_update_subscriptions: &mut HashMap, transaction_update_subscriptions: &mut HashMap, block_update_subscriptions: &mut HashMap, @@ -618,6 +674,9 @@ impl GeyserService { SubscriptionClosedEvent::SlotUpdateSubscription(subscription_id) => { let _ = slot_update_subscriptions.remove(&subscription_id); } + SubscriptionClosedEvent::SlotEntryUpdateSubscription(subscription_id) => { + let _ = slot_entry_update_subscriptions.remove(&subscription_id); + } SubscriptionClosedEvent::ProgramUpdateSubscription(subscription_id) => { let _ = program_update_subscriptions.remove(&subscription_id); } @@ -740,6 +799,30 @@ impl GeyserService { Ok(failed_subscription_ids) } + /// Streams slot updates to subscribers + /// Returns a vector of UUIDs that failed to send to due to the subscription being closed + fn handle_slot_entry_update_event( + maybe_slot_entry_update: Result, + slot_entry_update_subscriptions: &HashMap, + ) -> GeyserServiceResult> { + let slot_entry_update = maybe_slot_entry_update?; + let failed_subscription_ids = slot_entry_update_subscriptions + .iter() + .filter_map(|(uuid, sub)| { + if matches!( + sub.subscription_tx.try_send(Ok(slot_entry_update.clone())), + Err(TokioTrySendError::Closed(_)) + ) { + Some(*uuid) + } else { + None + } + }) + .collect(); + + Ok(failed_subscription_ids) + } + /// Drop broken connections. fn drop_subscriptions( subscription_ids: &[Uuid], @@ -941,6 +1024,43 @@ impl Geyser for GeyserService { Ok(resp) } + type SubscribeSlotEntryUpdatesStream = SubscriptionStream; + async fn subscribe_slot_entry_updates( + &self, + _request: Request, + ) -> Result, Status> { + let (subscription_tx, subscription_rx) = + channel(self.service_config.subscriber_buffer_size); + + let uuid = Uuid::new_v4(); + self.subscription_added_tx + .try_send(SubscriptionAddedEvent::SlotEntryUpdateSubscription { + uuid, + notification_sender: subscription_tx, + }) + .map_err(|e| { + error!("failed to add subscribe_slot_entry_updates subscription: {e}"); + Status::internal("error adding subscription") + })?; + + let stream = SubscriptionStream::new( + subscription_rx, + uuid, + ( + self.subscription_closed_sender.clone(), + SubscriptionClosedEvent::SlotEntryUpdateSubscription(uuid), + ), + "subscribe_slot_entry_updates", + ); + let mut resp = Response::new(stream); + resp.metadata_mut().insert( + HIGHEST_WRITE_SLOT_HEADER, + MetadataValue::from(self.highest_write_slot.load(Ordering::Relaxed)), + ); + + Ok(resp) + } + type SubscribeTransactionUpdatesStream = SubscriptionStream; async fn subscribe_transaction_updates(