diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs index cf4d17745b1b73..f57714ab93f0ee 100644 --- a/accounts-db/src/accounts_db.rs +++ b/accounts-db/src/accounts_db.rs @@ -486,7 +486,7 @@ pub(crate) struct ShrinkCollect<'a, T: ShrinkCollectRefs<'a>> { pub(crate) all_are_zero_lamports: bool, /// index entries that need to be held in memory while shrink is in progress /// These aren't read - they are just held so that entries cannot be flushed. - pub(crate) _index_entries_being_shrunk: Vec>, + pub(crate) index_entries_being_shrunk: Vec>, } pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig { @@ -3750,6 +3750,7 @@ impl AccountsDb { accounts: &'a [StoredAccountMeta<'a>], stats: &ShrinkStats, slot_to_shrink: Slot, + bin: Option, ) -> LoadAccountsIndexForShrink<'a, T> { let count = accounts.len(); let mut alive_accounts = T::with_capacity(count, slot_to_shrink); @@ -3757,15 +3758,29 @@ impl AccountsDb { let mut alive = 0; let mut dead = 0; - let mut index = 0; + let index = AtomicUsize::default(); let mut all_are_zero_lamports = true; let mut index_entries_being_shrunk = Vec::with_capacity(accounts.len()); + let bin_calc = &self.accounts_index.bin_calculator; self.accounts_index.scan( - accounts.iter().map(|account| account.pubkey()), + accounts.iter().filter_map(|account| { + let pk = account.pubkey(); + if let Some(bin) = bin { + if bin_calc.bin_from_pubkey(pk) == bin { + Some(pk) + } else { + // skipped this one + index.fetch_add(1, Ordering::Relaxed); + None + } + } else { + Some(pk) + } + }), |pubkey, slots_refs, entry| { let mut result = AccountsIndexScanResult::OnlyKeepInMemoryIfDirty; if let Some((slot_list, ref_count)) = slots_refs { - let stored_account = &accounts[index]; + let stored_account = &accounts[index.fetch_add(1, Ordering::Relaxed)]; let is_alive = slot_list.iter().any(|(slot, _acct_info)| { // if the accounts index contains an entry at this slot, then the append vec we're asking about contains this item and thus, it is alive at this slot *slot == slot_to_shrink @@ -3787,13 +3802,12 @@ impl AccountsDb { alive += 1; } } - index += 1; result }, None, true, ); - assert_eq!(index, std::cmp::min(accounts.len(), count)); + // assert_eq!(index, std::cmp::min(accounts.len(), count)); stats.alive_accounts.fetch_add(alive, Ordering::Relaxed); stats.dead_accounts.fetch_add(dead, Ordering::Relaxed); @@ -3847,6 +3861,7 @@ impl AccountsDb { store: &'a Arc, unique_accounts: &'b GetUniqueAccountsResult<'b>, stats: &ShrinkStats, + bin: Option, ) -> ShrinkCollect<'b, T> { let slot = store.slot(); @@ -3874,7 +3889,7 @@ impl AccountsDb { mut unrefed_pubkeys, all_are_zero_lamports, mut index_entries_being_shrunk, - } = self.load_accounts_index_for_shrink(stored_accounts, stats, slot); + } = self.load_accounts_index_for_shrink(stored_accounts, stats, slot, bin); // collect alive_accounts_collect @@ -3926,7 +3941,7 @@ impl AccountsDb { alive_total_bytes, total_starting_accounts: len, all_are_zero_lamports: all_are_zero_lamports_collect.into_inner().unwrap(), - _index_entries_being_shrunk: index_entries_being_shrunk_outer.into_inner().unwrap(), + index_entries_being_shrunk: index_entries_being_shrunk_outer.into_inner().unwrap(), } } @@ -3983,8 +3998,12 @@ impl AccountsDb { let unique_accounts = self.get_unique_accounts_from_storage_for_shrink(store, &self.shrink_stats); debug!("do_shrink_slot_store: slot: {}", slot); - let shrink_collect = - self.shrink_collect::>(store, &unique_accounts, &self.shrink_stats); + let shrink_collect = self.shrink_collect::>( + store, + &unique_accounts, + &self.shrink_stats, + None, + ); // This shouldn't happen if alive_bytes/approx_stored_count are accurate if Self::should_not_shrink( @@ -4539,6 +4558,7 @@ impl AccountsDb { old_storage, &unique_accounts, &self.shrink_ancient_stats.shrink_stats, + None, ); // could follow what shrink does more closely @@ -16980,6 +17000,7 @@ pub mod tests { &storage, &unique_accounts, &ShrinkStats::default(), + None, ); let expect_single_opposite_alive_account = if append_opposite_alive_account { diff --git a/accounts-db/src/ancient_append_vecs.rs b/accounts-db/src/ancient_append_vecs.rs index 1ebcc77763ae27..287efd764aae23 100644 --- a/accounts-db/src/ancient_append_vecs.rs +++ b/accounts-db/src/ancient_append_vecs.rs @@ -611,18 +611,89 @@ impl AccountsDb { // `shrink_collect` all accounts in the append vecs we want to combine. // This also unrefs all dead accounts in those append vecs. - let mut accounts_to_combine = self.thread_pool_clean.install(|| { - accounts_per_storage - .par_iter() - .map(|(info, unique_accounts)| { - self.shrink_collect::>( - &info.storage, - unique_accounts, - &self.shrink_ancient_stats.shrink_stats, - ) - }) - .collect::>() - }); + let mut accounts_to_combine = + self.thread_pool_clean.install(|| { + let mut result = Vec::default(); + for bin in 0..self.accounts_index.bins() { + let this_bin = accounts_per_storage + .par_iter() + .map(|(info, unique_accounts)| { + self.shrink_collect::>( + &info.storage, + unique_accounts, + &self.shrink_ancient_stats.shrink_stats, + Some(bin), + ) + }) + .collect::>(); + if result.is_empty() { + result = this_bin; + } else { + // accumulate results from each bin. We need 1 entry per slot + this_bin.into_iter().zip(result.iter_mut()).for_each( + |(mut this_bin, all_bins)| { + all_bins.alive_accounts.many_refs_old_alive.accounts.append( + &mut this_bin.alive_accounts.many_refs_old_alive.accounts, + ); + all_bins.alive_accounts.many_refs_old_alive.bytes += + this_bin.alive_accounts.many_refs_old_alive.bytes; + assert_eq!( + all_bins.alive_accounts.many_refs_old_alive.slot, + this_bin.alive_accounts.many_refs_old_alive.slot + ); + + all_bins + .alive_accounts + .many_refs_this_is_newest_alive + .accounts + .append( + &mut this_bin + .alive_accounts + .many_refs_this_is_newest_alive + .accounts, + ); + all_bins.alive_accounts.many_refs_this_is_newest_alive.bytes += + this_bin.alive_accounts.many_refs_this_is_newest_alive.bytes; + assert_eq!( + all_bins.alive_accounts.many_refs_this_is_newest_alive.slot, + this_bin.alive_accounts.many_refs_this_is_newest_alive.slot + ); + + all_bins + .alive_accounts + .one_ref + .accounts + .append(&mut this_bin.alive_accounts.one_ref.accounts); + all_bins.alive_accounts.one_ref.bytes += + this_bin.alive_accounts.one_ref.bytes; + all_bins.alive_total_bytes += this_bin.alive_total_bytes; + assert_eq!( + all_bins.alive_accounts.one_ref.slot, + this_bin.alive_accounts.one_ref.slot + ); + + // slot, capacity, total_starting_accounts: same for all bins + assert_eq!(all_bins.slot, this_bin.slot); + assert_eq!(all_bins.capacity, this_bin.capacity); + assert_eq!( + all_bins.total_starting_accounts, + this_bin.total_starting_accounts + ); + + all_bins.all_are_zero_lamports = all_bins.all_are_zero_lamports + && this_bin.all_are_zero_lamports; + all_bins + .unrefed_pubkeys + .append(&mut this_bin.unrefed_pubkeys); + all_bins + .index_entries_being_shrunk + .append(&mut this_bin.index_entries_being_shrunk); + }, + ); + } + } + result + }); let mut remove = Vec::default(); for (i, (shrink_collect, (info, _unique_accounts))) in accounts_to_combine @@ -3301,7 +3372,7 @@ pub mod tests { alive_total_bytes: 0, total_starting_accounts: 0, all_are_zero_lamports: false, - _index_entries_being_shrunk: Vec::default(), + index_entries_being_shrunk: Vec::default(), }; let accounts_to_combine = AccountsToCombine { accounts_keep_slots: HashMap::default(),