Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport: Granular slot updates v2.0 (#52) #58

Merged
merged 2 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 94 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ rand = "0.8"
serde = "1.0.160"
serde_derive = "1.0.160"
serde_json = "1.0.96"
serde_with = "=3.9.0"
solana-account-decoder = "=2.0.15"
solana-logger = "=2.0.15"
solana-metrics = "=2.0.15"
Expand Down
36 changes: 34 additions & 2 deletions client/src/geyser_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use std::{
use jito_geyser_protos::solana::geyser::{
geyser_client::GeyserClient, maybe_partial_account_update, EmptyRequest,
MaybePartialAccountUpdate, SubscribeAccountUpdatesRequest,
SubscribePartialAccountUpdatesRequest, SubscribeSlotUpdateRequest, TimestampedAccountUpdate,
SubscribePartialAccountUpdatesRequest, SubscribeSlotEntryUpdateRequest,
SubscribeSlotUpdateRequest, TimestampedAccountUpdate,
};
use log::*;
use lru::LruCache;
Expand All @@ -32,7 +33,9 @@ use tonic::{codegen::InterceptedService, transport::Channel, Response, Status};

use crate::{
geyser_consumer::GeyserConsumerError::{MissedHeartbeat, StreamClosed},
types::{AccountUpdate, AccountUpdateNotification, PartialAccountUpdate, SlotUpdate},
types::{
AccountUpdate, AccountUpdateNotification, PartialAccountUpdate, SlotEntryUpdate, SlotUpdate,
},
GrpcInterceptor, Pubkey, Slot,
};

Expand Down Expand Up @@ -242,6 +245,35 @@ impl GeyserConsumer {
Ok(())
}

pub async fn consume_slot_entry_updates(
&self,
slot_updates_tx: UnboundedSender<SlotEntryUpdate>,
) -> Result<()> {
let mut c = self.client.clone();

let resp = c
.subscribe_slot_entry_updates(SubscribeSlotEntryUpdateRequest {})
.await?;
let mut stream = resp.into_inner();

while !self.exit.load(Ordering::Relaxed) {
match stream.message().await {
Ok(Some(slot_update)) => {
if slot_updates_tx
.send(SlotEntryUpdate::from(slot_update.entry_update.unwrap()))
.is_err()
{
return Err(GeyserConsumerError::ConsumerChannelDisconnected);
};
}
Ok(None) => return Err(StreamClosed),
Err(e) => return Err(GeyserConsumerError::from(e)),
}
}

Ok(())
}

#[allow(clippy::too_many_arguments)]
fn process_account_update(
maybe_message: std::result::Result<Option<TimestampedAccountUpdate>, Status>,
Expand Down
14 changes: 14 additions & 0 deletions client/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,17 @@ impl From<geyser::SlotUpdate> for SlotUpdate {
}
}
}

pub struct SlotEntryUpdate {
pub slot: Slot,
pub index: u64,
}

impl From<geyser::SlotEntryUpdate> for SlotEntryUpdate {
fn from(proto: geyser::SlotEntryUpdate) -> Self {
Self {
slot: proto.slot,
index: proto.index,
}
}
}
27 changes: 27 additions & 0 deletions proto/proto/geyser.proto
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,29 @@ message GetHeartbeatIntervalResponse {
uint64 heartbeat_interval_ms = 1;
}

/// Modelled based off of https://github.com/solana-labs/solana/blob/v2.0/geyser-plugin-interface/src/geyser_plugin_interface.rs#L210
/// If more details are needed can extend this structure.
message SlotEntryUpdate {
// The slot number of the block containing this Entry
uint64 slot = 1;
// The Entry's index in the block
uint64 index = 2;
// The number of executed transactions in the Entry
// If this number is zero, we can assume its a tick entry
uint64 executed_transaction_count = 3;
}

message TimestampedSlotEntryUpdate {
// Time at which the message was generated
// Send relative timestamp in micros using u32 to reduce overhead. Provides ~71 mins of accuracy between sender and receiver
// See [compact_timestamp::to_system_time]
uint32 ts = 1;
// SlotEntryUpdate update
SlotEntryUpdate entry_update = 2;
}

message SubscribeSlotEntryUpdateRequest {}

// The following __must__ be assumed:
// - Clients may receive data for slots out of order.
// - Clients may receive account updates for a given slot out of order.
Expand Down Expand Up @@ -186,4 +209,8 @@ service Geyser {

// Subscribes to block updates.
rpc SubscribeBlockUpdates(SubscribeBlockUpdatesRequest) returns (stream TimestampedBlockUpdate) {}

// Subscribes to entry updates.
// Returns the highest slot seen thus far and the entry index corresponding to the tick
rpc SubscribeSlotEntryUpdates(SubscribeSlotEntryUpdateRequest) returns (stream TimestampedSlotEntryUpdate) {}
}
6 changes: 4 additions & 2 deletions proto/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ use solana_transaction_status::{
TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedTransactionWithStatusMeta,
};

use crate::solana::{entries, tx_by_addr};
use crate::{solana::storage::confirmed_block, StoredExtendedRewards, StoredTransactionStatusMeta};
use crate::{
solana::{entries, storage::confirmed_block, tx_by_addr},
StoredExtendedRewards, StoredTransactionStatusMeta,
};

impl From<Vec<Reward>> for confirmed_block::Rewards {
fn from(rewards: Vec<Reward>) -> Self {
Expand Down
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ prost-types = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
solana-logger = { workspace = true }
solana-metrics = { workspace = true }
solana-program = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion server/example-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
"bind_address": "0.0.0.0:10000",
"account_update_buffer_size": 100000,
"slot_update_buffer_size": 100000,
"slot_entry_update_buffer_size": 1000000,
"block_update_buffer_size": 100000,
"transaction_update_buffer_size": 100000,
"geyser_service_config": {
"heartbeat_interval_ms": 1000,
"subscriber_buffer_size": 1000000
}
}
}
Loading