Skip to content

Commit

Permalink
rpc: Cleanup CacheBlockMetaService (#4077)
Browse files Browse the repository at this point in the history
* add a basic module description
* add thread start/stop logs for tracing
* make the thread stop (and set exit flag) on Blockstore error
  • Loading branch information
steviez authored Dec 13, 2024
1 parent 3261a3a commit 6dd364b
Showing 1 changed file with 27 additions and 28 deletions.
55 changes: 27 additions & 28 deletions rpc/src/cache_block_meta_service.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
//! The `CacheBlockMetaService` is responsible for persisting block metadata
//! from banks into the `Blockstore`
pub use solana_ledger::blockstore_processor::CacheBlockMetaSender;
use {
crossbeam_channel::{Receiver, RecvTimeoutError},
solana_ledger::blockstore::Blockstore,
solana_measure::measure::Measure,
solana_ledger::blockstore::{Blockstore, BlockstoreError},
solana_runtime::bank::Bank,
std::{
sync::{
Expand All @@ -20,8 +22,6 @@ pub struct CacheBlockMetaService {
thread_hdl: JoinHandle<()>,
}

const CACHE_BLOCK_TIME_WARNING_MS: u64 = 150;

impl CacheBlockMetaService {
pub fn new(
cache_block_meta_receiver: CacheBlockMetaReceiver,
Expand All @@ -30,40 +30,39 @@ impl CacheBlockMetaService {
) -> Self {
let thread_hdl = Builder::new()
.name("solCacheBlkTime".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let recv_result = cache_block_meta_receiver.recv_timeout(Duration::from_secs(1));
match recv_result {
Err(RecvTimeoutError::Disconnected) => {
.spawn(move || {
info!("CacheBlockMetaService has started");
loop {
if exit.load(Ordering::Relaxed) {
break;
}
Ok(bank) => {
let mut cache_block_meta_timer = Measure::start("cache_block_meta_timer");
Self::cache_block_meta(&bank, &blockstore);
cache_block_meta_timer.stop();
if cache_block_meta_timer.as_ms() > CACHE_BLOCK_TIME_WARNING_MS {
warn!(
"cache_block_meta operation took: {}ms",
cache_block_meta_timer.as_ms()
);

let bank = match cache_block_meta_receiver.recv_timeout(Duration::from_secs(1))
{
Ok(bank) => bank,
Err(RecvTimeoutError::Timeout) => continue,
Err(err @ RecvTimeoutError::Disconnected) => {
info!("CacheBlockMetaService is stopping because: {err}");
break;
}
};

if let Err(err) = Self::cache_block_meta(&bank, &blockstore) {
error!("CacheBlockMetaService is stopping because: {err}");
// Set the exit flag to allow other services to gracefully stop
exit.store(true, Ordering::Relaxed);
break;
}
_ => {}
}
info!("CacheBlockMetaService has stopped");
})
.unwrap();
Self { thread_hdl }
}

fn cache_block_meta(bank: &Bank, blockstore: &Blockstore) {
if let Err(e) = blockstore.cache_block_time(bank.slot(), bank.clock().unix_timestamp) {
error!("cache_block_time failed: slot {:?} {:?}", bank.slot(), e);
}
if let Err(e) = blockstore.cache_block_height(bank.slot(), bank.block_height()) {
error!("cache_block_height failed: slot {:?} {:?}", bank.slot(), e);
}
fn cache_block_meta(bank: &Bank, blockstore: &Blockstore) -> Result<(), BlockstoreError> {
blockstore.cache_block_time(bank.slot(), bank.clock().unix_timestamp)?;
blockstore.cache_block_height(bank.slot(), bank.block_height())
}

pub fn join(self) -> thread::Result<()> {
Expand Down

0 comments on commit 6dd364b

Please sign in to comment.