diff --git a/rpc/src/cache_block_meta_service.rs b/rpc/src/cache_block_meta_service.rs index 5d0dbefa7802a2..192b766e9f4a83 100644 --- a/rpc/src/cache_block_meta_service.rs +++ b/rpc/src/cache_block_meta_service.rs @@ -1,8 +1,10 @@ +//! The `CacheBlockMetaService` is responsible for persisting block metadata +//! from banks into the `Blockstore` + pub use solana_ledger::blockstore_processor::CacheBlockMetaSender; use { crossbeam_channel::{Receiver, RecvTimeoutError}, - solana_ledger::blockstore::Blockstore, - solana_measure::measure::Measure, + solana_ledger::blockstore::{Blockstore, BlockstoreError}, solana_runtime::bank::Bank, std::{ sync::{ @@ -20,8 +22,6 @@ pub struct CacheBlockMetaService { thread_hdl: JoinHandle<()>, } -const CACHE_BLOCK_TIME_WARNING_MS: u64 = 150; - impl CacheBlockMetaService { pub fn new( cache_block_meta_receiver: CacheBlockMetaReceiver, @@ -30,40 +30,39 @@ impl CacheBlockMetaService { ) -> Self { let thread_hdl = Builder::new() .name("solCacheBlkTime".to_string()) - .spawn(move || loop { - if exit.load(Ordering::Relaxed) { - break; - } - let recv_result = cache_block_meta_receiver.recv_timeout(Duration::from_secs(1)); - match recv_result { - Err(RecvTimeoutError::Disconnected) => { + .spawn(move || { + info!("CacheBlockMetaService has started"); + loop { + if exit.load(Ordering::Relaxed) { break; } - Ok(bank) => { - let mut cache_block_meta_timer = Measure::start("cache_block_meta_timer"); - Self::cache_block_meta(&bank, &blockstore); - cache_block_meta_timer.stop(); - if cache_block_meta_timer.as_ms() > CACHE_BLOCK_TIME_WARNING_MS { - warn!( - "cache_block_meta operation took: {}ms", - cache_block_meta_timer.as_ms() - ); + + let bank = match cache_block_meta_receiver.recv_timeout(Duration::from_secs(1)) + { + Ok(bank) => bank, + Err(RecvTimeoutError::Timeout) => continue, + Err(err @ RecvTimeoutError::Disconnected) => { + info!("CacheBlockMetaService is stopping because: {err}"); + break; } + }; + + if let Err(err) = Self::cache_block_meta(&bank, &blockstore) { + error!("CacheBlockMetaService is stopping because: {err}"); + // Set the exit flag to allow other services to gracefully stop + exit.store(true, Ordering::Relaxed); + break; } - _ => {} } + info!("CacheBlockMetaService has stopped"); }) .unwrap(); Self { thread_hdl } } - fn cache_block_meta(bank: &Bank, blockstore: &Blockstore) { - if let Err(e) = blockstore.cache_block_time(bank.slot(), bank.clock().unix_timestamp) { - error!("cache_block_time failed: slot {:?} {:?}", bank.slot(), e); - } - if let Err(e) = blockstore.cache_block_height(bank.slot(), bank.block_height()) { - error!("cache_block_height failed: slot {:?} {:?}", bank.slot(), e); - } + fn cache_block_meta(bank: &Bank, blockstore: &Blockstore) -> Result<(), BlockstoreError> { + blockstore.cache_block_time(bank.slot(), bank.clock().unix_timestamp)?; + blockstore.cache_block_height(bank.slot(), bank.block_height()) } pub fn join(self) -> thread::Result<()> {