From 4905076fb6722f4b88cf4e701ae8572d1a55c2bb Mon Sep 17 00:00:00 2001 From: steviez Date: Wed, 21 Feb 2024 10:16:16 -0600 Subject: [PATCH] Remove channel that sends roots to BlockstoreCleanupService (#35211) Currently, ReplayStage sends new roots to BlockstoreCleanupService, and BlockstoreCleanupService decides when to clean based on advancement of the latest root. This is totally unnecessary as the latest root is cached by the Blockstore, and this value can simply be fetched. This change removes the channel completely, and instead just fetches the latest root from Blockstore directly. Moreso, some logic is added to check the latest root less frequently, based on the set purge interval. All in all, we went from sending > 100 slots/min across a crossbeam channel to reading an atomic roughly 3 times/min, while also removing the need for an additional thread that read from the channel. --- core/src/replay_stage.rs | 9 -- core/src/tvu.rs | 9 +- ledger/src/blockstore_cleanup_service.rs | 192 +++++++++-------------- 3 files changed, 74 insertions(+), 136 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 2dbe2fdbc2e932..31595e6b6504d9 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -281,7 +281,6 @@ pub struct ReplayStageConfig { pub exit: Arc, pub rpc_subscriptions: Arc, pub leader_schedule_cache: Arc, - pub latest_root_senders: Vec>, pub accounts_background_request_sender: AbsRequestSender, pub block_commitment_cache: Arc>, pub transaction_status_sender: Option, @@ -551,7 +550,6 @@ impl ReplayStage { exit, rpc_subscriptions, leader_schedule_cache, - latest_root_senders, accounts_background_request_sender, block_commitment_cache, transaction_status_sender, @@ -951,7 +949,6 @@ impl ReplayStage { &leader_schedule_cache, &lockouts_sender, &accounts_background_request_sender, - &latest_root_senders, &rpc_subscriptions, &block_commitment_cache, &mut heaviest_subtree_fork_choice, @@ -2230,7 +2227,6 @@ impl ReplayStage { leader_schedule_cache: &Arc, lockouts_sender: &Sender, accounts_background_request_sender: &AbsRequestSender, - latest_root_senders: &[Sender], rpc_subscriptions: &Arc, block_commitment_cache: &Arc>, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, @@ -2319,11 +2315,6 @@ impl ReplayStage { .unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err)); } } - latest_root_senders.iter().for_each(|s| { - if let Err(e) = s.send(new_root) { - trace!("latest root send failed: {:?}", e); - } - }); info!("new root {}", new_root); } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index d498ab405d39aa..b0fe93890761b4 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -246,14 +246,12 @@ impl Tvu { exit.clone(), ); - let (blockstore_cleanup_slot_sender, blockstore_cleanup_slot_receiver) = unbounded(); let replay_stage_config = ReplayStageConfig { vote_account: *vote_account, authorized_voter_keypairs, exit: exit.clone(), rpc_subscriptions: rpc_subscriptions.clone(), leader_schedule_cache: leader_schedule_cache.clone(), - latest_root_senders: vec![blockstore_cleanup_slot_sender], accounts_background_request_sender, block_commitment_cache, transaction_status_sender, @@ -322,12 +320,7 @@ impl Tvu { )?; let blockstore_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { - BlockstoreCleanupService::new( - blockstore_cleanup_slot_receiver, - blockstore.clone(), - max_ledger_shreds, - exit.clone(), - ) + BlockstoreCleanupService::new(blockstore.clone(), max_ledger_shreds, exit.clone()) }); let duplicate_shred_listener = DuplicateShredListener::new( diff --git a/ledger/src/blockstore_cleanup_service.rs b/ledger/src/blockstore_cleanup_service.rs index d9212bf6ddfb58..2f79be6694844d 100644 --- a/ledger/src/blockstore_cleanup_service.rs +++ b/ledger/src/blockstore_cleanup_service.rs @@ -9,9 +9,8 @@ use { blockstore::{Blockstore, PurgeType}, blockstore_db::{Result as BlockstoreResult, DATA_SHRED_CF}, }, - crossbeam_channel::{Receiver, RecvTimeoutError}, solana_measure::measure::Measure, - solana_sdk::clock::Slot, + solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT}, std::{ string::ToString, sync::{ @@ -19,7 +18,7 @@ use { Arc, }, thread::{self, Builder, JoinHandle}, - time::Duration, + time::{Duration, Instant}, }, }; @@ -36,46 +35,53 @@ pub const DEFAULT_MAX_LEDGER_SHREDS: u64 = 200_000_000; // Allow down to 50m, or 3.5 days at idle, 1hr at 50k load, around ~100GB pub const DEFAULT_MIN_MAX_LEDGER_SHREDS: u64 = 50_000_000; -// Check for removing slots at this interval so we don't purge too often -// and starve other blockstore users. -pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512; +// Perform blockstore cleanup at this interval to limit the overhead of cleanup +// Cleanup will be considered after the latest root has advanced by this value +const DEFAULT_CLEANUP_SLOT_INTERVAL: u64 = 512; +// The above slot interval can be roughly equated to a time interval. So, scale +// how often we check for cleanup with the interval. Doing so will avoid wasted +// checks when we know that the latest root could not have advanced far enough +// +// Given that the timing of new slots/roots is not exact, divide by 10 to avoid +// a long wait incase a check occurs just before the interval has elapsed +const LOOP_LIMITER: Duration = + Duration::from_millis(DEFAULT_CLEANUP_SLOT_INTERVAL * DEFAULT_MS_PER_SLOT / 10); pub struct BlockstoreCleanupService { t_cleanup: JoinHandle<()>, } impl BlockstoreCleanupService { - pub fn new( - new_root_receiver: Receiver, - blockstore: Arc, - max_ledger_shreds: u64, - exit: Arc, - ) -> Self { + pub fn new(blockstore: Arc, max_ledger_shreds: u64, exit: Arc) -> Self { let mut last_purge_slot = 0; - - info!( - "BlockstoreCleanupService active. max ledger shreds={}", - max_ledger_shreds - ); + let mut last_check_time = Instant::now(); let t_cleanup = Builder::new() .name("solBstoreClean".to_string()) - .spawn(move || loop { - if exit.load(Ordering::Relaxed) { - break; - } - if let Err(e) = Self::cleanup_ledger( - &new_root_receiver, - &blockstore, - max_ledger_shreds, - &mut last_purge_slot, - DEFAULT_PURGE_SLOT_INTERVAL, - ) { - match e { - RecvTimeoutError::Disconnected => break, - RecvTimeoutError::Timeout => (), + .spawn(move || { + info!( + "BlockstoreCleanupService has started with max \ + ledger shreds={max_ledger_shreds}", + ); + loop { + if exit.load(Ordering::Relaxed) { + break; + } + if last_check_time.elapsed() > LOOP_LIMITER { + Self::cleanup_ledger( + &blockstore, + max_ledger_shreds, + &mut last_purge_slot, + DEFAULT_CLEANUP_SLOT_INTERVAL, + ); + + last_check_time = Instant::now(); } + // Only sleep for 1 second instead of LOOP_LIMITER so that this + // thread can respond to the exit flag in a timely manner + thread::sleep(Duration::from_secs(1)); } + info!("BlockstoreCleanupService has stopped"); }) .unwrap(); @@ -136,8 +142,8 @@ impl BlockstoreCleanupService { .unwrap_or(lowest_slot); if highest_slot < lowest_slot { error!( - "Skipping cleanup: Blockstore highest slot {} < lowest slot {}", - highest_slot, lowest_slot + "Skipping Blockstore cleanup: \ + highest slot {highest_slot} < lowest slot {lowest_slot}", ); return (false, 0, num_shreds); } @@ -146,8 +152,8 @@ impl BlockstoreCleanupService { let num_slots = highest_slot - lowest_slot + 1; let mean_shreds_per_slot = num_shreds / num_slots; info!( - "{} alive shreds in slots [{}, {}], mean of {} shreds per slot", - num_shreds, lowest_slot, highest_slot, mean_shreds_per_slot + "Blockstore has {num_shreds} alive shreds in slots [{lowest_slot}, {highest_slot}], \ + mean of {mean_shreds_per_slot} shreds per slot", ); if num_shreds <= max_ledger_shreds { @@ -164,17 +170,11 @@ impl BlockstoreCleanupService { let lowest_cleanup_slot = std::cmp::min(lowest_slot + num_slots_to_clean - 1, root); (true, lowest_cleanup_slot, num_shreds) } else { - error!("Skipping cleanup: calculated mean of 0 shreds per slot"); + error!("Skipping Blockstore cleanup: calculated mean of 0 shreds per slot"); (false, 0, num_shreds) } } - fn receive_new_roots(new_root_receiver: &Receiver) -> Result { - let root = new_root_receiver.recv_timeout(Duration::from_secs(1))?; - // Get the newest root - Ok(new_root_receiver.try_iter().last().unwrap_or(root)) - } - /// Checks for new roots and initiates a cleanup if the last cleanup was at /// least `purge_interval` slots ago. A cleanup will no-op if the ledger /// already has fewer than `max_ledger_shreds`; otherwise, the cleanup will @@ -182,8 +182,6 @@ impl BlockstoreCleanupService { /// /// # Arguments /// - /// - `new_root_receiver`: signal receiver which contains the information - /// about what `Slot` is the current root. /// - `max_ledger_shreds`: the number of shreds to keep since the new root. /// - `last_purge_slot`: an both an input and output parameter indicating /// the id of the last purged slot. As an input parameter, it works @@ -191,85 +189,53 @@ impl BlockstoreCleanupService { /// ledger cleanup. As an output parameter, it will be updated if this /// function actually performs the ledger cleanup. /// - `purge_interval`: the minimum slot interval between two ledger - /// cleanup. When the root derived from `new_root_receiver` minus + /// cleanup. When the max root fetched from the Blockstore minus /// `last_purge_slot` is fewer than `purge_interval`, the function will /// simply return `Ok` without actually running the ledger cleanup. /// In this case, `purge_interval` will remain unchanged. /// /// Also see `blockstore::purge_slot`. pub fn cleanup_ledger( - new_root_receiver: &Receiver, blockstore: &Arc, max_ledger_shreds: u64, last_purge_slot: &mut u64, purge_interval: u64, - ) -> Result<(), RecvTimeoutError> { - let root = Self::receive_new_roots(new_root_receiver)?; + ) { + let root = blockstore.max_root(); if root - *last_purge_slot <= purge_interval { - return Ok(()); + return; } - - let disk_utilization_pre = blockstore.storage_size(); - info!( - "purge: last_root={}, last_purge_slot={}, purge_interval={}, disk_utilization={:?}", - root, last_purge_slot, purge_interval, disk_utilization_pre - ); - *last_purge_slot = root; + info!("Looking for Blockstore data to cleanup, latest root: {root}"); + let disk_utilization_pre = blockstore.storage_size(); let (slots_to_clean, lowest_cleanup_slot, total_shreds) = Self::find_slots_to_clean(blockstore, root, max_ledger_shreds); if slots_to_clean { - let purge_complete = Arc::new(AtomicBool::new(false)); - let blockstore = blockstore.clone(); - let purge_complete1 = purge_complete.clone(); - let _t_purge = Builder::new() - .name("solLedgerPurge".to_string()) - .spawn(move || { - let mut slot_update_time = Measure::start("slot_update"); - *blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot; - slot_update_time.stop(); - - info!("purging data older than {}", lowest_cleanup_slot); - - let mut purge_time = Measure::start("purge_slots"); - - // purge any slots older than lowest_cleanup_slot. - blockstore.purge_slots(0, lowest_cleanup_slot, PurgeType::CompactionFilter); - // Update only after purge operation. - // Safety: This value can be used by compaction_filters shared via Arc. - // Compactions are async and run as a multi-threaded background job. However, this - // shouldn't cause consistency issues for iterators and getters because we have - // already expired all affected keys (older than or equal to lowest_cleanup_slot) - // by the above `purge_slots`. According to the general RocksDB design where SST - // files are immutable, even running iterators aren't affected; the database grabs - // a snapshot of the live set of sst files at iterator's creation. - // Also, we passed the PurgeType::CompactionFilter, meaning no delete_range for - // transaction_status and address_signatures CFs. These are fine because they - // don't require strong consistent view for their operation. - blockstore.set_max_expired_slot(lowest_cleanup_slot); - - purge_time.stop(); - info!("{}", purge_time); - - purge_complete1.store(true, Ordering::Relaxed); - }) - .unwrap(); - - // Keep pulling roots off `new_root_receiver` while purging to avoid channel buildup - while !purge_complete.load(Ordering::Relaxed) { - if let Err(err) = Self::receive_new_roots(new_root_receiver) { - debug!("receive_new_roots: {}", err); - } - thread::sleep(Duration::from_secs(1)); - } + *blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot; + + let mut purge_time = Measure::start("purge_slots()"); + // purge any slots older than lowest_cleanup_slot. + blockstore.purge_slots(0, lowest_cleanup_slot, PurgeType::CompactionFilter); + // Update only after purge operation. + // Safety: This value can be used by compaction_filters shared via Arc. + // Compactions are async and run as a multi-threaded background job. However, this + // shouldn't cause consistency issues for iterators and getters because we have + // already expired all affected keys (older than or equal to lowest_cleanup_slot) + // by the above `purge_slots`. According to the general RocksDB design where SST + // files are immutable, even running iterators aren't affected; the database grabs + // a snapshot of the live set of sst files at iterator's creation. + // Also, we passed the PurgeType::CompactionFilter, meaning no delete_range for + // transaction_status and address_signatures CFs. These are fine because they + // don't require strong consistent view for their operation. + blockstore.set_max_expired_slot(lowest_cleanup_slot); + purge_time.stop(); + info!("Cleaned up Blockstore data older than slot {lowest_cleanup_slot}. {purge_time}"); } let disk_utilization_post = blockstore.storage_size(); Self::report_disk_metrics(disk_utilization_pre, disk_utilization_post, total_shreds); - - Ok(()) } fn report_disk_metrics( @@ -297,7 +263,6 @@ mod tests { use { super::*, crate::{blockstore::make_many_slot_entries, get_tmp_ledger_path_auto_delete}, - crossbeam_channel::unbounded, }; fn flush_blockstore_contents_to_disk(blockstore: Blockstore) -> Blockstore { @@ -388,7 +353,7 @@ mod tests { } #[test] - fn test_cleanup1() { + fn test_cleanup() { solana_logger::setup(); let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = Blockstore::open(ledger_path.path()).unwrap(); @@ -397,19 +362,11 @@ mod tests { // Initiate a flush so inserted shreds found by find_slots_to_clean() let blockstore = Arc::new(flush_blockstore_contents_to_disk(blockstore)); - let (sender, receiver) = unbounded(); - //send a signal to kill all but 5 shreds, which will be in the newest slots + // Mark 50 as a root to kill all but 5 shreds, which will be in the newest slots let mut last_purge_slot = 0; - sender.send(50).unwrap(); - BlockstoreCleanupService::cleanup_ledger( - &receiver, - &blockstore, - 5, - &mut last_purge_slot, - 10, - ) - .unwrap(); + blockstore.set_roots([50].iter()).unwrap(); + BlockstoreCleanupService::cleanup_ledger(&blockstore, 5, &mut last_purge_slot, 10); assert_eq!(last_purge_slot, 50); //check that 0-40 don't exist @@ -424,7 +381,6 @@ mod tests { solana_logger::setup(); let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); - let (sender, receiver) = unbounded(); let mut first_insert = Measure::start("first_insert"); let initial_slots = 50; @@ -451,15 +407,13 @@ mod tests { insert_time.stop(); let mut time = Measure::start("purge time"); - sender.send(slot + num_slots).unwrap(); + blockstore.set_roots([slot + num_slots].iter()).unwrap(); BlockstoreCleanupService::cleanup_ledger( - &receiver, &blockstore, initial_slots, &mut last_purge_slot, 10, - ) - .unwrap(); + ); time.stop(); info!( "slot: {} size: {} {} {}",