Skip to content

Commit

Permalink
Add a subscription for slot entry updates
Browse files Browse the repository at this point in the history
  • Loading branch information
ebin-mathews committed Sep 9, 2024
1 parent 4d36b3f commit c1b31e1
Show file tree
Hide file tree
Showing 9 changed files with 497 additions and 12 deletions.
102 changes: 100 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ rand = "0.8"
serde = "1.0.160"
serde_derive = "1.0.160"
serde_json = "1.0.96"
serde_with = "3.9.0"
solana-account-decoder = "=1.18.22"
solana-geyser-plugin-interface = "=1.18.22"
solana-logger = "=1.18.22"
Expand Down
36 changes: 34 additions & 2 deletions client/src/geyser_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use std::{
use jito_geyser_protos::solana::geyser::{
geyser_client::GeyserClient, maybe_partial_account_update, EmptyRequest,
MaybePartialAccountUpdate, SubscribeAccountUpdatesRequest,
SubscribePartialAccountUpdatesRequest, SubscribeSlotUpdateRequest, TimestampedAccountUpdate,
SubscribePartialAccountUpdatesRequest, SubscribeSlotEntryUpdateRequest,
SubscribeSlotUpdateRequest, TimestampedAccountUpdate,
};
use log::*;
use lru::LruCache;
Expand All @@ -32,7 +33,9 @@ use tonic::{codegen::InterceptedService, transport::Channel, Response, Status};

use crate::{
geyser_consumer::GeyserConsumerError::{MissedHeartbeat, StreamClosed},
types::{AccountUpdate, AccountUpdateNotification, PartialAccountUpdate, SlotUpdate},
types::{
AccountUpdate, AccountUpdateNotification, PartialAccountUpdate, SlotEntryUpdate, SlotUpdate,
},
GrpcInterceptor, Pubkey, Slot,
};

Expand Down Expand Up @@ -242,6 +245,35 @@ impl GeyserConsumer {
Ok(())
}

pub async fn consume_slot_entry_updates(
&self,
slot_updates_tx: UnboundedSender<SlotEntryUpdate>,
) -> Result<()> {
let mut c = self.client.clone();

let resp = c
.subscribe_slot_entry_updates(SubscribeSlotEntryUpdateRequest {})
.await?;
let mut stream = resp.into_inner();

while !self.exit.load(Ordering::Relaxed) {
match stream.message().await {
Ok(Some(slot_update)) => {
if slot_updates_tx
.send(SlotEntryUpdate::from(slot_update.entry_update.unwrap()))
.is_err()
{
return Err(GeyserConsumerError::ConsumerChannelDisconnected);
};
}
Ok(None) => return Err(StreamClosed),
Err(e) => return Err(GeyserConsumerError::from(e)),
}
}

Ok(())
}

#[allow(clippy::too_many_arguments)]
fn process_account_update(
maybe_message: std::result::Result<Option<TimestampedAccountUpdate>, Status>,
Expand Down
14 changes: 14 additions & 0 deletions client/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,17 @@ impl From<geyser::SlotUpdate> for SlotUpdate {
}
}
}

pub struct SlotEntryUpdate {
pub slot: Slot,
pub index: u64,
}

impl From<geyser::SlotEntryUpdate> for SlotEntryUpdate {
fn from(proto: geyser::SlotEntryUpdate) -> Self {
Self {
slot: proto.slot,
index: proto.index,
}
}
}
23 changes: 23 additions & 0 deletions proto/proto/geyser.proto
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,25 @@ message GetHeartbeatIntervalResponse {
uint64 heartbeat_interval_ms = 1;
}

message SlotEntryUpdate {
// The slot number of the block containing this Entry
uint64 slot = 1;
// The Entry's index in the block
uint64 index = 2;
// The number of executed transactions in the Entry
// If this number is zero, we can assume its a tick entry
uint64 executed_transaction_count = 3;
}

message TimestampedSlotEntryUpdate {
// Time at which the message was generated
google.protobuf.Timestamp ts = 1;
// SlotEntryUpdate update
SlotEntryUpdate entry_update = 2;
}

message SubscribeSlotEntryUpdateRequest {}

// The following __must__ be assumed:
// - Clients may receive data for slots out of order.
// - Clients may receive account updates for a given slot out of order.
Expand Down Expand Up @@ -186,4 +205,8 @@ service Geyser {

// Subscribes to block updates.
rpc SubscribeBlockUpdates(SubscribeBlockUpdatesRequest) returns (stream TimestampedBlockUpdate) {}

// Subscribes to entry updates.
// Returns the highest slot seen thus far and the entry index corresponding to the tick
rpc SubscribeSlotEntryUpdates(SubscribeSlotEntryUpdateRequest) returns (stream TimestampedSlotEntryUpdate) {}
}
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ prost-types = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
solana-geyser-plugin-interface = { workspace = true }
solana-logger = { workspace = true }
solana-metrics = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion server/example-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
"bind_address": "0.0.0.0:10000",
"account_update_buffer_size": 100000,
"slot_update_buffer_size": 100000,
"slot_entry_update_buffer_size": 1000000,
"block_update_buffer_size": 100000,
"transaction_update_buffer_size": 100000,
"geyser_service_config": {
"heartbeat_interval_ms": 1000,
"subscriber_buffer_size": 1000000
}
}
}
Loading

0 comments on commit c1b31e1

Please sign in to comment.