diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 2dbe2fdbc2e932..31595e6b6504d9 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -281,7 +281,6 @@ pub struct ReplayStageConfig {
     pub exit: Arc<AtomicBool>,
     pub rpc_subscriptions: Arc<RpcSubscriptions>,
     pub leader_schedule_cache: Arc<LeaderScheduleCache>,
-    pub latest_root_senders: Vec<Sender<Slot>>,
    pub accounts_background_request_sender: AbsRequestSender,
     pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
     pub transaction_status_sender: Option<TransactionStatusSender>,
@@ -551,7 +550,6 @@ impl ReplayStage {
             exit,
             rpc_subscriptions,
             leader_schedule_cache,
-            latest_root_senders,
             accounts_background_request_sender,
             block_commitment_cache,
             transaction_status_sender,
@@ -951,7 +949,6 @@ impl ReplayStage {
                         &leader_schedule_cache,
                         &lockouts_sender,
                         &accounts_background_request_sender,
-                        &latest_root_senders,
                         &rpc_subscriptions,
                         &block_commitment_cache,
                         &mut heaviest_subtree_fork_choice,
@@ -2230,7 +2227,6 @@ impl ReplayStage {
        leader_schedule_cache: &Arc<LeaderScheduleCache>,
        lockouts_sender: &Sender<CommitmentAggregationData>,
        accounts_background_request_sender: &AbsRequestSender,
-        latest_root_senders: &[Sender<Slot>],
        rpc_subscriptions: &Arc<RpcSubscriptions>,
        block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
        heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
@@ -2319,11 +2315,6 @@ impl ReplayStage {
                    .unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
                }
            }
-            latest_root_senders.iter().for_each(|s| {
-                if let Err(e) = s.send(new_root) {
-                    trace!("latest root send failed: {:?}", e);
-                }
-            });
            info!("new root {}", new_root);
        }
 
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index d498ab405d39aa..b0fe93890761b4 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -246,14 +246,12 @@ impl Tvu {
             exit.clone(),
         );
 
-        let (blockstore_cleanup_slot_sender, blockstore_cleanup_slot_receiver) = unbounded();
         let replay_stage_config = ReplayStageConfig {
             vote_account: *vote_account,
             authorized_voter_keypairs,
             exit: exit.clone(),
             rpc_subscriptions: rpc_subscriptions.clone(),
             leader_schedule_cache: leader_schedule_cache.clone(),
-            latest_root_senders: vec![blockstore_cleanup_slot_sender],
             accounts_background_request_sender,
             block_commitment_cache,
             transaction_status_sender,
@@ -322,12 +320,7 @@ impl Tvu {
         )?;
 
         let blockstore_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
-            BlockstoreCleanupService::new(
-                blockstore_cleanup_slot_receiver,
-                blockstore.clone(),
-                max_ledger_shreds,
-                exit.clone(),
-            )
+            BlockstoreCleanupService::new(blockstore.clone(), max_ledger_shreds, exit.clone())
         });
 
         let duplicate_shred_listener = DuplicateShredListener::new(
diff --git a/ledger/src/blockstore_cleanup_service.rs b/ledger/src/blockstore_cleanup_service.rs
index d9212bf6ddfb58..2f79be6694844d 100644
--- a/ledger/src/blockstore_cleanup_service.rs
+++ b/ledger/src/blockstore_cleanup_service.rs
@@ -9,9 +9,8 @@ use {
         blockstore::{Blockstore, PurgeType},
         blockstore_db::{Result as BlockstoreResult, DATA_SHRED_CF},
     },
-    crossbeam_channel::{Receiver, RecvTimeoutError},
     solana_measure::measure::Measure,
-    solana_sdk::clock::Slot,
+    solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT},
     std::{
         string::ToString,
         sync::{
@@ -19,7 +18,7 @@ use {
             Arc,
         },
         thread::{self, Builder, JoinHandle},
-        time::Duration,
+        time::{Duration, Instant},
     },
 };
 
@@ -36,46 +35,53 @@ pub const DEFAULT_MAX_LEDGER_SHREDS: u64 = 200_000_000;
 // Allow down to 50m, or 3.5 days at idle, 1hr at 50k load, around ~100GB
 pub const DEFAULT_MIN_MAX_LEDGER_SHREDS: u64 = 50_000_000;
 
-// Check for removing slots at this interval so we don't purge too often
-// and starve other blockstore users.
-pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512;
+// Perform blockstore cleanup at this interval to limit the overhead of cleanup
+// Cleanup will be considered after the latest root has advanced by this value
+const DEFAULT_CLEANUP_SLOT_INTERVAL: u64 = 512;
+// The above slot interval can be roughly equated to a time interval. So, scale
+// how often we check for cleanup with the interval. Doing so will avoid wasted
+// checks when we know that the latest root could not have advanced far enough
+//
+// Given that the timing of new slots/roots is not exact, divide by 10 to avoid
+// a long wait in case a check occurs just before the interval has elapsed
+const LOOP_LIMITER: Duration =
+    Duration::from_millis(DEFAULT_CLEANUP_SLOT_INTERVAL * DEFAULT_MS_PER_SLOT / 10);
 
 pub struct BlockstoreCleanupService {
     t_cleanup: JoinHandle<()>,
 }
 
 impl BlockstoreCleanupService {
-    pub fn new(
-        new_root_receiver: Receiver<Slot>,
-        blockstore: Arc<Blockstore>,
-        max_ledger_shreds: u64,
-        exit: Arc<AtomicBool>,
-    ) -> Self {
+    pub fn new(blockstore: Arc<Blockstore>, max_ledger_shreds: u64, exit: Arc<AtomicBool>) -> Self {
         let mut last_purge_slot = 0;
-
-        info!(
-            "BlockstoreCleanupService active. max ledger shreds={}",
-            max_ledger_shreds
-        );
+        let mut last_check_time = Instant::now();
 
         let t_cleanup = Builder::new()
             .name("solBstoreClean".to_string())
-            .spawn(move || loop {
-                if exit.load(Ordering::Relaxed) {
-                    break;
-                }
-                if let Err(e) = Self::cleanup_ledger(
-                    &new_root_receiver,
-                    &blockstore,
-                    max_ledger_shreds,
-                    &mut last_purge_slot,
-                    DEFAULT_PURGE_SLOT_INTERVAL,
-                ) {
-                    match e {
-                        RecvTimeoutError::Disconnected => break,
-                        RecvTimeoutError::Timeout => (),
+            .spawn(move || {
+                info!(
+                    "BlockstoreCleanupService has started with max \
+                     ledger shreds={max_ledger_shreds}",
+                );
+                loop {
+                    if exit.load(Ordering::Relaxed) {
+                        break;
+                    }
+                    if last_check_time.elapsed() > LOOP_LIMITER {
+                        Self::cleanup_ledger(
+                            &blockstore,
+                            max_ledger_shreds,
+                            &mut last_purge_slot,
+                            DEFAULT_CLEANUP_SLOT_INTERVAL,
+                        );
+
+                        last_check_time = Instant::now();
                     }
+                    // Only sleep for 1 second instead of LOOP_LIMITER so that this
+                    // thread can respond to the exit flag in a timely manner
+                    thread::sleep(Duration::from_secs(1));
                 }
+                info!("BlockstoreCleanupService has stopped");
             })
             .unwrap();
@@ -136,8 +142,8 @@ impl BlockstoreCleanupService {
             .unwrap_or(lowest_slot);
         if highest_slot < lowest_slot {
             error!(
-                "Skipping cleanup: Blockstore highest slot {} < lowest slot {}",
-                highest_slot, lowest_slot
+                "Skipping Blockstore cleanup: \
+                 highest slot {highest_slot} < lowest slot {lowest_slot}",
             );
             return (false, 0, num_shreds);
         }
@@ -146,8 +152,8 @@
         let num_slots = highest_slot - lowest_slot + 1;
         let mean_shreds_per_slot = num_shreds / num_slots;
         info!(
-            "{} alive shreds in slots [{}, {}], mean of {} shreds per slot",
-            num_shreds, lowest_slot, highest_slot, mean_shreds_per_slot
+            "Blockstore has {num_shreds} alive shreds in slots [{lowest_slot}, {highest_slot}], \
+             mean of {mean_shreds_per_slot} shreds per slot",
         );
 
         if num_shreds <= max_ledger_shreds {
@@ -164,17 +170,11 @@
             let lowest_cleanup_slot = std::cmp::min(lowest_slot + num_slots_to_clean - 1, root);
             (true, lowest_cleanup_slot, num_shreds)
         } else {
-            error!("Skipping cleanup: calculated mean of 0 shreds per slot");
+            error!("Skipping Blockstore cleanup: calculated mean of 0 shreds per slot");
             (false, 0, num_shreds)
         }
     }
 
-    fn receive_new_roots(new_root_receiver: &Receiver<Slot>) -> Result<Slot, RecvTimeoutError> {
-        let root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
-        // Get the newest root
-        Ok(new_root_receiver.try_iter().last().unwrap_or(root))
-    }
-
     /// Checks for new roots and initiates a cleanup if the last cleanup was at
     /// least `purge_interval` slots ago. A cleanup will no-op if the ledger
     /// already has fewer than `max_ledger_shreds`; otherwise, the cleanup will
@@ -182,8 +182,6 @@ impl BlockstoreCleanupService {
     ///
     /// # Arguments
     ///
-    /// - `new_root_receiver`: signal receiver which contains the information
-    ///   about what `Slot` is the current root.
     /// - `max_ledger_shreds`: the number of shreds to keep since the new root.
     /// - `last_purge_slot`: both an input and output parameter indicating
     ///   the id of the last purged slot. As an input parameter, it works
@@ -191,85 +189,53 @@ impl BlockstoreCleanupService {
     ///   ledger cleanup. As an output parameter, it will be updated if this
     ///   function actually performs the ledger cleanup.
     /// - `purge_interval`: the minimum slot interval between two ledger
-    ///   cleanup. When the root derived from `new_root_receiver` minus
+    ///   cleanups. When the max root fetched from the Blockstore minus
     ///   `last_purge_slot` is fewer than `purge_interval`, the function will
-    ///   simply return `Ok` without actually running the ledger cleanup.
+    ///   simply return without actually running the ledger cleanup.
     ///   In this case, `purge_interval` will remain unchanged.
     ///
     /// Also see `blockstore::purge_slot`.
     pub fn cleanup_ledger(
-        new_root_receiver: &Receiver<Slot>,
         blockstore: &Arc<Blockstore>,
         max_ledger_shreds: u64,
         last_purge_slot: &mut u64,
         purge_interval: u64,
-    ) -> Result<(), RecvTimeoutError> {
-        let root = Self::receive_new_roots(new_root_receiver)?;
+    ) {
+        let root = blockstore.max_root();
         if root - *last_purge_slot <= purge_interval {
-            return Ok(());
+            return;
         }
-
-        let disk_utilization_pre = blockstore.storage_size();
-        info!(
-            "purge: last_root={}, last_purge_slot={}, purge_interval={}, disk_utilization={:?}",
-            root, last_purge_slot, purge_interval, disk_utilization_pre
-        );
-
         *last_purge_slot = root;
+        info!("Looking for Blockstore data to clean up, latest root: {root}");
+        let disk_utilization_pre = blockstore.storage_size();
 
         let (slots_to_clean, lowest_cleanup_slot, total_shreds) =
             Self::find_slots_to_clean(blockstore, root, max_ledger_shreds);
 
         if slots_to_clean {
-            let purge_complete = Arc::new(AtomicBool::new(false));
-            let blockstore = blockstore.clone();
-            let purge_complete1 = purge_complete.clone();
-            let _t_purge = Builder::new()
-                .name("solLedgerPurge".to_string())
-                .spawn(move || {
-                    let mut slot_update_time = Measure::start("slot_update");
-                    *blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot;
-                    slot_update_time.stop();
-
-                    info!("purging data older than {}", lowest_cleanup_slot);
-
-                    let mut purge_time = Measure::start("purge_slots");
-
-                    // purge any slots older than lowest_cleanup_slot.
-                    blockstore.purge_slots(0, lowest_cleanup_slot, PurgeType::CompactionFilter);
-                    // Update only after purge operation.
-                    // Safety: This value can be used by compaction_filters shared via Arc<AtomicU64>.
-                    // Compactions are async and run as a multi-threaded background job. However, this
-                    // shouldn't cause consistency issues for iterators and getters because we have
-                    // already expired all affected keys (older than or equal to lowest_cleanup_slot)
-                    // by the above `purge_slots`. According to the general RocksDB design where SST
-                    // files are immutable, even running iterators aren't affected; the database grabs
-                    // a snapshot of the live set of sst files at iterator's creation.
-                    // Also, we passed the PurgeType::CompactionFilter, meaning no delete_range for
-                    // transaction_status and address_signatures CFs. These are fine because they
-                    // don't require strong consistent view for their operation.
-                    blockstore.set_max_expired_slot(lowest_cleanup_slot);
-
-                    purge_time.stop();
-                    info!("{}", purge_time);
-
-                    purge_complete1.store(true, Ordering::Relaxed);
-                })
-                .unwrap();
-
-            // Keep pulling roots off `new_root_receiver` while purging to avoid channel buildup
-            while !purge_complete.load(Ordering::Relaxed) {
-                if let Err(err) = Self::receive_new_roots(new_root_receiver) {
-                    debug!("receive_new_roots: {}", err);
-                }
-                thread::sleep(Duration::from_secs(1));
-            }
+            *blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot;
+
+            let mut purge_time = Measure::start("purge_slots()");
+            // purge any slots older than lowest_cleanup_slot.
+            blockstore.purge_slots(0, lowest_cleanup_slot, PurgeType::CompactionFilter);
+            // Update only after purge operation.
+            // Safety: This value can be used by compaction_filters shared via Arc<AtomicU64>.
+            // Compactions are async and run as a multi-threaded background job. However, this
+            // shouldn't cause consistency issues for iterators and getters because we have
+            // already expired all affected keys (older than or equal to lowest_cleanup_slot)
+            // by the above `purge_slots`. According to the general RocksDB design where SST
+            // files are immutable, even running iterators aren't affected; the database grabs
+            // a snapshot of the live set of sst files at iterator's creation.
+            // Also, we passed the PurgeType::CompactionFilter, meaning no delete_range for
+            // transaction_status and address_signatures CFs. These are fine because they
+            // don't require strong consistent view for their operation.
+            blockstore.set_max_expired_slot(lowest_cleanup_slot);
+            purge_time.stop();
+            info!("Cleaned up Blockstore data older than slot {lowest_cleanup_slot}. {purge_time}");
{purge_time}"); } let disk_utilization_post = blockstore.storage_size(); Self::report_disk_metrics(disk_utilization_pre, disk_utilization_post, total_shreds); - - Ok(()) } fn report_disk_metrics( @@ -297,7 +263,6 @@ mod tests { use { super::*, crate::{blockstore::make_many_slot_entries, get_tmp_ledger_path_auto_delete}, - crossbeam_channel::unbounded, }; fn flush_blockstore_contents_to_disk(blockstore: Blockstore) -> Blockstore { @@ -388,7 +353,7 @@ mod tests { } #[test] - fn test_cleanup1() { + fn test_cleanup() { solana_logger::setup(); let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = Blockstore::open(ledger_path.path()).unwrap(); @@ -397,19 +362,11 @@ mod tests { // Initiate a flush so inserted shreds found by find_slots_to_clean() let blockstore = Arc::new(flush_blockstore_contents_to_disk(blockstore)); - let (sender, receiver) = unbounded(); - //send a signal to kill all but 5 shreds, which will be in the newest slots + // Mark 50 as a root to kill all but 5 shreds, which will be in the newest slots let mut last_purge_slot = 0; - sender.send(50).unwrap(); - BlockstoreCleanupService::cleanup_ledger( - &receiver, - &blockstore, - 5, - &mut last_purge_slot, - 10, - ) - .unwrap(); + blockstore.set_roots([50].iter()).unwrap(); + BlockstoreCleanupService::cleanup_ledger(&blockstore, 5, &mut last_purge_slot, 10); assert_eq!(last_purge_slot, 50); //check that 0-40 don't exist @@ -424,7 +381,6 @@ mod tests { solana_logger::setup(); let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); - let (sender, receiver) = unbounded(); let mut first_insert = Measure::start("first_insert"); let initial_slots = 50; @@ -451,15 +407,13 @@ mod tests { insert_time.stop(); let mut time = Measure::start("purge time"); - sender.send(slot + num_slots).unwrap(); + blockstore.set_roots([slot + num_slots].iter()).unwrap(); BlockstoreCleanupService::cleanup_ledger( - &receiver, &blockstore, initial_slots, &mut last_purge_slot, 10, - ) - .unwrap(); + ); time.stop(); info!( "slot: {} size: {} {} {}",