Skip to content

Commit

Permalink
dont stream updates on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
buffalu committed Apr 8, 2024
1 parent 0946fd3 commit 62d5e6a
Showing 1 changed file with 34 additions and 4 deletions.
38 changes: 34 additions & 4 deletions server/src/geyser_grpc_plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
},
time::SystemTime,
};
use std::sync::atomic::AtomicBool;

use bs58;
use crossbeam_channel::{bounded, Sender, TrySendError};
Expand Down Expand Up @@ -44,6 +45,9 @@ pub struct PluginData {

/// Highest slot that an account write has been processed for thus far.
highest_write_slot: Arc<AtomicU64>,

is_startup_completed: AtomicBool,
ignore_startup_updates: bool
}

#[derive(Default)]
Expand All @@ -66,6 +70,7 @@ pub struct PluginConfig {
pub slot_update_buffer_size: usize,
pub block_update_buffer_size: usize,
pub transaction_update_buffer_size: usize,
pub skip_startup_stream: Option<bool>
}

impl GeyserPlugin for GeyserGrpcPlugin {
Expand All @@ -90,6 +95,8 @@ impl GeyserPlugin for GeyserGrpcPlugin {
msg: format!("Error deserializing PluginConfig: {err:?}"),
})?;

info!("loaded geyser config: {:?}", config);

let addr =
config
.bind_address
Expand Down Expand Up @@ -134,6 +141,9 @@ impl GeyserPlugin for GeyserGrpcPlugin {
block_update_sender,
transaction_update_sender,
highest_write_slot,
is_startup_completed: AtomicBool::new(false),
// don't skip startup to keep backwards compatability
ignore_startup_updates: config.skip_startup_stream.unwrap_or(false)
});
info!("plugin data initialized");

Expand All @@ -150,13 +160,23 @@ impl GeyserPlugin for GeyserGrpcPlugin {
data.runtime.shutdown_background();
}

fn notify_end_of_startup(&self) -> PluginResult<()> {
self.data.as_ref().unwrap().is_startup_completed.store(true, Ordering::Relaxed);
Ok(())
}

fn update_account(
&self,
account: ReplicaAccountInfoVersions,
slot: u64,
is_startup: bool,
) -> PluginResult<()> {
let data = self.data.as_ref().expect("plugin must be initialized");

if data.ignore_startup_updates && !data.is_startup_completed.load(Ordering::Relaxed) {
return Ok(());
}

let account_update = match account {
ReplicaAccountInfoVersions::V0_0_1(account) => TimestampedAccountUpdate {
ts: Some(prost_types::Timestamp::from(SystemTime::now())),
Expand Down Expand Up @@ -254,17 +274,18 @@ impl GeyserPlugin for GeyserGrpcPlugin {
}
}

fn notify_end_of_startup(&self) -> PluginResult<()> {
Ok(())
}

fn update_slot_status(
&self,
slot: u64,
parent_slot: Option<u64>,
status: SlotStatus,
) -> PluginResult<()> {
let data = self.data.as_ref().expect("plugin must be initialized");

if data.ignore_startup_updates && !data.is_startup_completed.load(Ordering::Relaxed) {
return Ok(());
}

debug!("Updating slot {:?} at with status {:?}", slot, status);

let status = match status {
Expand Down Expand Up @@ -301,6 +322,11 @@ impl GeyserPlugin for GeyserGrpcPlugin {
slot: u64,
) -> PluginResult<()> {
let data = self.data.as_ref().expect("plugin must be initialized");

if data.ignore_startup_updates && !data.is_startup_completed.load(Ordering::Relaxed) {
return Ok(());
}

let transaction_update = match transaction {
ReplicaTransactionInfoVersions::V0_0_1(tx) => TimestampedTransactionUpdate {
ts: Some(prost_types::Timestamp::from(SystemTime::now())),
Expand Down Expand Up @@ -348,6 +374,10 @@ impl GeyserPlugin for GeyserGrpcPlugin {
fn notify_block_metadata(&self, block_info: ReplicaBlockInfoVersions) -> PluginResult<()> {
let data = self.data.as_ref().expect("plugin must be initialized");

if data.ignore_startup_updates && !data.is_startup_completed.load(Ordering::Relaxed) {
return Ok(());
}

let block = match block_info {
ReplicaBlockInfoVersions::V0_0_1(block) => TimestampedBlockUpdate {
ts: Some(prost_types::Timestamp::from(SystemTime::now())),
Expand Down

0 comments on commit 62d5e6a

Please sign in to comment.