From 478f190cd4c29089a300f7a1f6b1f591adfcec24 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" Date: Wed, 5 Jun 2024 10:37:14 -0500 Subject: [PATCH] skip packing multi ref ancient slots until sufficient normal slots are packed (#1583) * don't pack multi ref slots until safe * sub 1 to get math perfect --- accounts-db/src/accounts_db.rs | 6 + accounts-db/src/ancient_append_vecs.rs | 460 +++++++++++++++++-------- 2 files changed, 329 insertions(+), 137 deletions(-) diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs index 03c85d405d5517..c36cb57ce38bc0 100644 --- a/accounts-db/src/accounts_db.rs +++ b/accounts-db/src/accounts_db.rs @@ -1942,6 +1942,7 @@ pub(crate) struct ShrinkAncientStats { pub(crate) slots_considered: AtomicU64, pub(crate) ancient_scanned: AtomicU64, pub(crate) bytes_ancient_created: AtomicU64, + pub(crate) many_ref_slots_skipped: AtomicU64, } #[derive(Debug, Default)] @@ -2231,6 +2232,11 @@ impl ShrinkAncientStats { self.bytes_ancient_created.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "many_ref_slots_skipped", + self.many_ref_slots_skipped.swap(0, Ordering::Relaxed), + i64 + ), ); } } diff --git a/accounts-db/src/ancient_append_vecs.rs b/accounts-db/src/ancient_append_vecs.rs index bd57f923637c46..2c41e246420c35 100644 --- a/accounts-db/src/ancient_append_vecs.rs +++ b/accounts-db/src/ancient_append_vecs.rs @@ -252,6 +252,15 @@ struct WriteAncientAccounts<'a> { metrics: ShrinkStatsSub, } +#[derive(Debug, PartialEq, Clone, Copy)] +/// specify what to do with slots with accounts with many refs +enum IncludeManyRefSlots { + /// include them in packing + Include, + // skip them. ie. don't include them until sufficient slots of single refs have been created + Skip, +} + impl AccountsDb { /// Combine account data from storages in 'sorted_slots' into packed storages. /// This keeps us from accumulating storages for each slot older than an epoch. @@ -336,12 +345,17 @@ impl AccountsDb { if ancient_slot_infos.all_infos.is_empty() { return; // nothing to do } - let accounts_per_storage = self + let mut accounts_per_storage = self .get_unique_accounts_from_storage_for_combining_ancient_slots( &ancient_slot_infos.all_infos[..], ); - let mut accounts_to_combine = self.calc_accounts_to_combine(&accounts_per_storage); + let mut accounts_to_combine = self.calc_accounts_to_combine( + &mut accounts_per_storage, + &tuning, + ancient_slot_infos.total_alive_bytes_shrink.0, + IncludeManyRefSlots::Skip, + ); metrics.unpackable_slots_count += accounts_to_combine.unpackable_slots_count; let mut many_refs_newest = accounts_to_combine @@ -622,13 +636,17 @@ impl AccountsDb { /// 'accounts_per_storage' should be sorted by slot fn calc_accounts_to_combine<'a>( &self, - accounts_per_storage: &'a Vec<(&'a SlotInfo, GetUniqueAccountsResult)>, + accounts_per_storage: &'a mut Vec<(&'a SlotInfo, GetUniqueAccountsResult)>, + tuning: &PackedAncientStorageTuning, + alive_bytes: u64, + mut many_ref_slots: IncludeManyRefSlots, ) -> AccountsToCombine<'a> { + // reverse sort by slot # + accounts_per_storage.sort_unstable_by(|a, b| b.0.slot.cmp(&a.0.slot)); let mut accounts_keep_slots = HashMap::default(); let len = accounts_per_storage.len(); let mut target_slots_sorted = Vec::with_capacity(len); - // reverse sort by slot # // `shrink_collect` all accounts in the append vecs we want to combine. // This also unrefs all dead accounts in those append vecs. 
         let mut accounts_to_combine = self.thread_pool_clean.install(|| {
@@ -644,13 +662,56 @@ impl AccountsDb {
                 .collect::<Vec<_>>()
         });
 
+        // We want a ceiling, so subtract 1 before dividing and add 1 after.
+        // For 0 < alive_bytes <= `ideal_storage_size`, `min_resulting_packed_slots` = 1.
+        // We obviously require 1 packed slot if we have at least 1 alive byte.
+        let min_resulting_packed_slots =
+            alive_bytes.saturating_sub(1) / u64::from(tuning.ideal_storage_size) + 1;
         let mut remove = Vec::default();
+        let mut last_slot = None;
         for (i, (shrink_collect, (info, _unique_accounts))) in accounts_to_combine
             .iter_mut()
             .zip(accounts_per_storage.iter())
             .enumerate()
         {
+            // assert that iteration is in descending slot order since the code below relies on this.
+            if let Some(last_slot) = last_slot {
+                assert!(last_slot > info.slot);
+            }
+            last_slot = Some(info.slot);
+
             let many_refs_old_alive = &mut shrink_collect.alive_accounts.many_refs_old_alive;
+            if many_ref_slots == IncludeManyRefSlots::Skip
+                && !shrink_collect
+                    .alive_accounts
+                    .many_refs_this_is_newest_alive
+                    .accounts
+                    .is_empty()
+            {
+                let mut required_packed_slots = min_resulting_packed_slots;
+                if many_refs_old_alive.accounts.is_empty() {
+                    // if THIS slot can be used as a target slot, then even if we have multi refs
+                    // this is ok.
+                    required_packed_slots = required_packed_slots.saturating_sub(1);
+                }
+
+                if (target_slots_sorted.len() as u64) >= required_packed_slots {
+                    // We have prepared enough normal target slots that, from now on, we can safely
+                    // pack any 'many ref' slots.
+                    many_ref_slots = IncludeManyRefSlots::Include;
+                } else {
+                    // Skip this slot because too few valid target slots have been processed so far.
+                    // There are 'many ref newest' accounts in this slot. They must be packed into slots that are >= the current slot value.
+                    // We require `min_resulting_packed_slots` target slots. If we have not yet encountered enough slots without `many ref newest` accounts, then keep trying.
+                    // On the next pass, THIS slot will be older relative to newly ancient slot #s, so those newly ancient slots will be higher in this list.
+ self.shrink_ancient_stats + .many_ref_slots_skipped + .fetch_add(1, Ordering::Relaxed); + remove.push(i); + continue; + } + } + if !many_refs_old_alive.accounts.is_empty() { many_refs_old_alive.accounts.iter().for_each(|account| { // these accounts could indicate clean bugs or low memory conditions where we are forced to flush non-roots @@ -689,7 +750,7 @@ impl AccountsDb { } let unpackable_slots_count = remove.len(); remove.into_iter().rev().for_each(|i| { - accounts_to_combine.remove(i); + self.addref_accounts_failed_to_shrink_ancient(vec![accounts_to_combine.remove(i)]); }); target_slots_sorted.sort_unstable(); AccountsToCombine { @@ -1408,7 +1469,7 @@ pub mod tests { for all_slots_shrunk in [false, true] { for num_slots in 0..3 { let (db, storages, slots, infos) = get_sample_storages(num_slots, None); - let accounts_per_storage = infos + let mut accounts_per_storage = infos .iter() .zip( storages @@ -1417,7 +1478,13 @@ pub mod tests { ) .collect::>(); - let accounts_to_combine = db.calc_accounts_to_combine(&accounts_per_storage); + let alive_bytes = 1000; + let accounts_to_combine = db.calc_accounts_to_combine( + &mut accounts_per_storage, + &default_tuning(), + alive_bytes, + IncludeManyRefSlots::Include, + ); let mut stats = ShrinkStatsSub::default(); let mut write_ancient_accounts = WriteAncientAccounts::default(); @@ -1489,151 +1556,256 @@ pub mod tests { } #[test] - fn test_calc_accounts_to_combine_simple() { + fn test_calc_accounts_to_combine_many_refs() { // n storages // 1 account each // all accounts have 1 ref or all accounts have 2 refs - for add_dead_account in [true, false] { - for method in TestWriteMultipleRefs::iter() { - for num_slots in 0..3 { - for unsorted_slots in [false, true] { - for two_refs in [false, true] { - let (db, mut storages, slots, mut infos) = - get_sample_storages(num_slots, None); - let slots_vec; - if unsorted_slots { - slots_vec = slots.rev().collect::>(); - storages = storages.into_iter().rev().collect(); - infos = infos.into_iter().rev().collect(); - } else { - slots_vec = slots.collect::>() - } + solana_logger::setup(); - let original_results = storages - .iter() - .map(|store| db.get_unique_accounts_from_storage(store)) - .collect::>(); - if two_refs { - original_results.iter().for_each(|results| { - results.stored_accounts.iter().for_each(|account| { - db.accounts_index.get_and_then(account.pubkey(), |entry| { - (false, entry.unwrap().addref()) - }); - }) - }); - } + let alive_bytes_per_slot = 2; - if add_dead_account { - storages.iter().for_each(|storage| { - let pk = solana_sdk::pubkey::new_rand(); - let alive = false; - append_single_account_with_default_hash( - storage, - &pk, - &AccountSharedData::default(), - alive, - Some(&db.accounts_index), - ); - assert!(db.accounts_index.purge_exact( - &pk, - &[storage.slot()] - .into_iter() - .collect::>(), - &mut Vec::default() - )); - }); - } - let original_results = storages - .iter() - .map(|store| db.get_unique_accounts_from_storage(store)) - .collect::>(); + // pack 2.5 ancient slots into 1 packed slot ideally + let tuning = PackedAncientStorageTuning { + ideal_storage_size: NonZeroU64::new(alive_bytes_per_slot * 2 + 1).unwrap(), + ..default_tuning() + }; + for many_ref_slots in [IncludeManyRefSlots::Skip, IncludeManyRefSlots::Include] { + for num_slots in 0..6 { + for unsorted_slots in [false, true] { + for two_refs in [false, true] { + let (db, mut storages, _slots, mut infos) = + get_sample_storages(num_slots, None); + if unsorted_slots { + storages = 
storages.into_iter().rev().collect(); + infos = infos.into_iter().rev().collect(); + } - let accounts_per_storage = infos - .iter() - .zip(original_results.into_iter()) - .collect::>(); + let original_results = storages + .iter() + .map(|store| db.get_unique_accounts_from_storage(store)) + .collect::>(); + if two_refs { + original_results.iter().for_each(|results| { + results.stored_accounts.iter().for_each(|account| { + db.accounts_index.get_and_then(account.pubkey(), |entry| { + (false, entry.unwrap().addref()) + }); + }) + }); + } - let accounts_to_combine = - db.calc_accounts_to_combine(&accounts_per_storage); - assert_eq!( - accounts_to_combine.accounts_to_combine.len(), - num_slots, - "method: {method:?}, num_slots: {num_slots}, two_refs: {two_refs}" - ); + let original_results = storages + .iter() + .map(|store| db.get_unique_accounts_from_storage(store)) + .collect::>(); - if add_dead_account { - assert!(!accounts_to_combine - .accounts_to_combine - .iter() - .any(|a| a.unrefed_pubkeys.is_empty())); - } - // all accounts should be in one_ref and all slots are available as target slots - assert_eq!( - accounts_to_combine.target_slots_sorted, + let mut accounts_per_storage = infos + .iter() + .zip(original_results.into_iter()) + .collect::>(); + + let alive_bytes = num_slots as u64 * alive_bytes_per_slot; + let accounts_to_combine = db.calc_accounts_to_combine( + &mut accounts_per_storage, + &tuning, + alive_bytes, + many_ref_slots, + ); + let mut expected_accounts_to_combine = num_slots; + if two_refs && many_ref_slots == IncludeManyRefSlots::Skip && num_slots > 2 + { + // We require more than 1 target slot. Since all slots have multi refs, we find no slots we can use as target slots. + // Thus, nothing can be packed. + expected_accounts_to_combine = 0; + } + (0..accounts_to_combine + .target_slots_sorted + .len() + .saturating_sub(1)) + .for_each(|i| { + let slots = &accounts_to_combine.target_slots_sorted; + assert!(slots[i] < slots[i + 1]); + }); + + log::debug!("output slots: {:?}, num_slots: {num_slots}, two_refs: {two_refs}, many_refs: {many_ref_slots:?}, expected accounts to combine: {expected_accounts_to_combine}, target slots: {:?}, accounts_to_combine: {}", accounts_to_combine.target_slots_sorted, + accounts_to_combine.target_slots_sorted, + accounts_to_combine.accounts_to_combine.len(),); + assert_eq!( + accounts_to_combine.accounts_to_combine.len(), + expected_accounts_to_combine, + "num_slots: {num_slots}, two_refs: {two_refs}, many_refs: {many_ref_slots:?}" + ); + } + } + } + } + } + + #[test] + fn test_calc_accounts_to_combine_simple() { + // n storages + // 1 account each + // all accounts have 1 ref or all accounts have 2 refs + for many_ref_slots in [IncludeManyRefSlots::Skip, IncludeManyRefSlots::Include] { + for add_dead_account in [true, false] { + for method in TestWriteMultipleRefs::iter() { + for num_slots in 0..3 { + for unsorted_slots in [false, true] { + for two_refs in [false, true] { + let (db, mut storages, slots, mut infos) = + get_sample_storages(num_slots, None); + let slots_vec; if unsorted_slots { - slots_vec.iter().cloned().rev().collect::>() + slots_vec = slots.rev().collect::>(); + storages = storages.into_iter().rev().collect(); + infos = infos.into_iter().rev().collect(); } else { - slots_vec.clone() + slots_vec = slots.collect::>() } - ); - assert!(accounts_to_combine.accounts_keep_slots.is_empty()); - assert!(accounts_to_combine.accounts_to_combine.iter().all( - |shrink_collect| shrink_collect - .alive_accounts - .many_refs_old_alive - 
.accounts - .is_empty() - )); - if two_refs { - assert!(accounts_to_combine.accounts_to_combine.iter().all( - |shrink_collect| shrink_collect - .alive_accounts - .one_ref - .accounts - .is_empty() - )); - assert!(accounts_to_combine.accounts_to_combine.iter().all( - |shrink_collect| !shrink_collect - .alive_accounts - .many_refs_this_is_newest_alive - .accounts - .is_empty() - )); - } else { - assert!(accounts_to_combine.accounts_to_combine.iter().all( - |shrink_collect| !shrink_collect - .alive_accounts - .one_ref - .accounts - .is_empty() - )); + + let original_results = storages + .iter() + .map(|store| db.get_unique_accounts_from_storage(store)) + .collect::>(); + if two_refs { + original_results.iter().for_each(|results| { + results.stored_accounts.iter().for_each(|account| { + db.accounts_index + .get_and_then(account.pubkey(), |entry| { + (false, entry.unwrap().addref()) + }); + }) + }); + } + + if add_dead_account { + storages.iter().for_each(|storage| { + let pk = solana_sdk::pubkey::new_rand(); + let alive = false; + append_single_account_with_default_hash( + storage, + &pk, + &AccountSharedData::default(), + alive, + Some(&db.accounts_index), + ); + assert!(db.accounts_index.purge_exact( + &pk, + &[storage.slot()] + .into_iter() + .collect::>(), + &mut Vec::default() + )); + }); + } + let original_results = storages + .iter() + .map(|store| db.get_unique_accounts_from_storage(store)) + .collect::>(); + + let mut accounts_per_storage = infos + .iter() + .zip(original_results.into_iter()) + .collect::>(); + + let alive_bytes = num_slots; + let accounts_to_combine = db.calc_accounts_to_combine( + &mut accounts_per_storage, + &default_tuning(), + alive_bytes as u64, + many_ref_slots, + ); + assert_eq!( + accounts_to_combine.accounts_to_combine.len(), + // if we are only trying to pack a single slot of multi-refs, it will succeed + if !two_refs || many_ref_slots == IncludeManyRefSlots::Include || num_slots == 1 {num_slots} else {0}, + "method: {method:?}, num_slots: {num_slots}, two_refs: {two_refs}, many_refs: {many_ref_slots:?}" + ); + + if add_dead_account { + assert!(!accounts_to_combine + .accounts_to_combine + .iter() + .any(|a| a.unrefed_pubkeys.is_empty())); + } + // all accounts should be in one_ref and all slots are available as target slots + assert_eq!( + accounts_to_combine.target_slots_sorted, + if !two_refs + || many_ref_slots == IncludeManyRefSlots::Include + || num_slots == 1 + { + if unsorted_slots { + slots_vec.iter().cloned().rev().collect::>() + } else { + slots_vec.clone() + } + } else { + vec![] + }, + ); + assert!(accounts_to_combine.accounts_keep_slots.is_empty()); assert!(accounts_to_combine.accounts_to_combine.iter().all( |shrink_collect| shrink_collect .alive_accounts - .many_refs_this_is_newest_alive + .many_refs_old_alive .accounts .is_empty() )); - } - - // test write_ancient_accounts_to_same_slot_multiple_refs since we built interesting 'AccountsToCombine' - let write_ancient_accounts = match method { - TestWriteMultipleRefs::MultipleRefs => { - let mut write_ancient_accounts = - WriteAncientAccounts::default(); - db.write_ancient_accounts_to_same_slot_multiple_refs( - accounts_to_combine.accounts_keep_slots.values(), - &mut write_ancient_accounts, - ); - write_ancient_accounts - } - TestWriteMultipleRefs::PackedStorages => { - let packed_contents = Vec::default(); - db.write_packed_storages(&accounts_to_combine, packed_contents) + if two_refs { + assert!(accounts_to_combine.accounts_to_combine.iter().all( + |shrink_collect| shrink_collect + 
.alive_accounts + .one_ref + .accounts + .is_empty() + )); + assert!(accounts_to_combine.accounts_to_combine.iter().all( + |shrink_collect| !shrink_collect + .alive_accounts + .many_refs_this_is_newest_alive + .accounts + .is_empty() + )); + } else { + assert!(accounts_to_combine.accounts_to_combine.iter().all( + |shrink_collect| !shrink_collect + .alive_accounts + .one_ref + .accounts + .is_empty() + )); + assert!(accounts_to_combine.accounts_to_combine.iter().all( + |shrink_collect| shrink_collect + .alive_accounts + .many_refs_this_is_newest_alive + .accounts + .is_empty() + )); } - }; - assert!(write_ancient_accounts.shrinks_in_progress.is_empty()); + // test write_ancient_accounts_to_same_slot_multiple_refs since we built interesting 'AccountsToCombine' + let write_ancient_accounts = match method { + TestWriteMultipleRefs::MultipleRefs => { + let mut write_ancient_accounts = + WriteAncientAccounts::default(); + db.write_ancient_accounts_to_same_slot_multiple_refs( + accounts_to_combine.accounts_keep_slots.values(), + &mut write_ancient_accounts, + ); + write_ancient_accounts + } + TestWriteMultipleRefs::PackedStorages => { + let packed_contents = Vec::default(); + db.write_packed_storages( + &accounts_to_combine, + packed_contents, + ) + } + }; + + assert!(write_ancient_accounts.shrinks_in_progress.is_empty()); + } } } } @@ -1698,12 +1870,18 @@ pub mod tests { .map(|store| db.get_unique_accounts_from_storage(store)) .collect::>(); assert_eq!(original_results.first().unwrap().stored_accounts.len(), 2); - let accounts_per_storage = infos + let mut accounts_per_storage = infos .iter() .zip(original_results.into_iter()) .collect::>(); - let accounts_to_combine = db.calc_accounts_to_combine(&accounts_per_storage); + let alive_bytes = 1000; // just something + let accounts_to_combine = db.calc_accounts_to_combine( + &mut accounts_per_storage, + &default_tuning(), + alive_bytes, + IncludeManyRefSlots::Include, + ); let slots_vec = slots.collect::>(); assert_eq!(accounts_to_combine.accounts_to_combine.len(), num_slots); // all accounts should be in many_refs @@ -1835,6 +2013,7 @@ pub mod tests { #[test] fn test_calc_accounts_to_combine_opposite() { + solana_logger::setup(); // 1 storage // 2 accounts // 1 with 1 ref @@ -1881,12 +2060,18 @@ pub mod tests { .map(|store| db.get_unique_accounts_from_storage(store)) .collect::>(); assert_eq!(original_results.first().unwrap().stored_accounts.len(), 2); - let accounts_per_storage = infos + let mut accounts_per_storage = infos .iter() .zip(original_results.into_iter()) .collect::>(); - let accounts_to_combine = db.calc_accounts_to_combine(&accounts_per_storage); + let alive_bytes = 0; // just something + let accounts_to_combine = db.calc_accounts_to_combine( + &mut accounts_per_storage, + &default_tuning(), + alive_bytes, + IncludeManyRefSlots::Include, + ); let slots_vec = slots.collect::>(); assert_eq!(accounts_to_combine.accounts_to_combine.len(), num_slots); // all accounts should be in many_refs_this_is_newest_alive @@ -2545,6 +2730,7 @@ pub mod tests { #[test] fn test_truncate_to_max_storages() { + solana_logger::setup(); for filter in [false, true] { let ideal_storage_size_large = get_ancient_append_vec_capacity(); let mut infos = create_test_infos(1);
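Note (illustrative only, not part of the patch): `min_resulting_packed_slots` in `calc_accounts_to_combine` above is a ceiling division of the total alive bytes by `ideal_storage_size`, done by subtracting 1 before dividing and adding 1 after (the "sub 1 to get math perfect" bullet in the commit message). A minimal standalone sketch of that arithmetic, with a hypothetical helper name and made-up byte counts:

// Ceiling division used for `min_resulting_packed_slots`.
// `min_packed_slots` is a hypothetical name used only for this sketch.
fn min_packed_slots(alive_bytes: u64, ideal_storage_size: u64) -> u64 {
    alive_bytes.saturating_sub(1) / ideal_storage_size + 1
}

fn main() {
    let ideal = 1_000u64;
    assert_eq!(min_packed_slots(1, ideal), 1); // any alive byte requires at least one packed slot
    assert_eq!(min_packed_slots(1_000, ideal), 1); // an exact multiple does not round up
    assert_eq!(min_packed_slots(1_001, ideal), 2); // one byte over needs a second slot
}

In the patch itself `ideal_storage_size` comes from `PackedAncientStorageTuning` as a `NonZeroU64` (see the test's `NonZeroU64::new(...)`), so the division cannot divide by zero.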
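Also illustrative, and deliberately simplified: the skip heuristic walks ancient slots from newest to oldest and only admits a slot holding 'many refs this is newest' accounts once enough ordinary target slots have been prepared (one fewer if the slot itself can serve as a target). The sketch below uses hypothetical types and boolean flags standing in for the real `ShrinkCollect` data:

#[derive(Clone, Copy, PartialEq)]
enum IncludeManyRefSlots {
    Include,
    Skip,
}

// Hypothetical, simplified summary of one ancient slot, for this sketch only.
struct SlotSummary {
    slot: u64,
    // accounts whose newest alive copy is here but that are referenced from other slots
    has_many_ref_newest: bool,
    // older alive copies with multiple refs; such a slot cannot become a target slot
    has_many_ref_old: bool,
}

// Mirror of the "skip many-ref slots until enough targets exist" decision,
// returning the slots kept for packing on this pass.
fn choose_packable(slots_newest_first: &[SlotSummary], min_resulting_packed_slots: u64) -> Vec<u64> {
    let mut mode = IncludeManyRefSlots::Skip;
    let mut target_slots = Vec::new();
    let mut kept = Vec::new();
    for s in slots_newest_first {
        if mode == IncludeManyRefSlots::Skip && s.has_many_ref_newest {
            // A slot that could itself be a target needs one fewer previously prepared target.
            let required = if s.has_many_ref_old {
                min_resulting_packed_slots
            } else {
                min_resulting_packed_slots.saturating_sub(1)
            };
            if (target_slots.len() as u64) >= required {
                // Enough targets are queued; from now on many-ref slots are safe to pack.
                mode = IncludeManyRefSlots::Include;
            } else {
                // Too few targets so far; leave this slot for a later pass.
                continue;
            }
        }
        if !s.has_many_ref_old {
            target_slots.push(s.slot);
        }
        kept.push(s.slot);
    }
    kept
}

fn main() {
    // Three ancient slots, newest first; the two newest hold many-ref-newest accounts.
    let slots = [
        SlotSummary { slot: 12, has_many_ref_newest: true, has_many_ref_old: false },
        SlotSummary { slot: 11, has_many_ref_newest: true, has_many_ref_old: false },
        SlotSummary { slot: 10, has_many_ref_newest: false, has_many_ref_old: false },
    ];
    // Needing 2 packed slots, only the plain slot is packed on this pass.
    assert_eq!(choose_packable(&slots, 2), vec![10]);
    // Needing just 1 packed slot, every slot packs (slot 12 can serve as its own target).
    assert_eq!(choose_packable(&slots, 1), vec![12, 11, 10]);
}

The real code additionally restores ref counts for the skipped slots via `addref_accounts_failed_to_shrink_ancient` and bumps the `many_ref_slots_skipped` stat; both are omitted from this sketch.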