Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unref accounts in shrink and pack when we're committed #2806

Merged
merged 14 commits into from
Sep 4, 2024
71 changes: 37 additions & 34 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,8 +524,9 @@ pub type BinnedHashData = Vec<Vec<CalculateHashIntermediate>>;
struct LoadAccountsIndexForShrink<'a, T: ShrinkCollectRefs<'a>> {
/// all alive accounts
alive_accounts: T,
/// pubkeys that were unref'd in the accounts index because they were dead
unrefed_pubkeys: Vec<&'a Pubkey>,
/// pubkeys that are going to be unref'd in the accounts index after we are
/// done with shrinking, because they are dead
pubkeys_to_unref: Vec<&'a Pubkey>,
/// pubkeys that are the last remaining zero lamport instance of an account
zero_lamport_single_ref_pubkeys: Vec<&'a Pubkey>,
/// true if all alive accounts are zero lamport accounts
Expand Down Expand Up @@ -3954,7 +3955,7 @@ impl AccountsDb {
) -> LoadAccountsIndexForShrink<'a, T> {
let count = accounts.len();
let mut alive_accounts = T::with_capacity(count, slot_to_shrink);
let mut unrefed_pubkeys = Vec::with_capacity(count);
let mut pubkeys_to_unref = Vec::with_capacity(count);
let mut zero_lamport_single_ref_pubkeys = Vec::with_capacity(count);

let mut alive = 0;
Expand All @@ -3967,7 +3968,6 @@ impl AccountsDb {
self.accounts_index.scan(
accounts.iter().map(|account| account.pubkey()),
|pubkey, slots_refs, _entry| {
let mut result = AccountsIndexScanResult::OnlyKeepInMemoryIfDirty;
let stored_account = &accounts[index];
let mut do_populate_accounts_for_shrink = |ref_count, slot_list| {
if stored_account.is_zero_lamport()
Expand Down Expand Up @@ -4003,8 +4003,7 @@ impl AccountsDb {
// It would have had a ref to the storage from the initial store, but it will
// not exist in the re-written slot. Unref it to keep the index consistent with
// rewriting the storage entries.
unrefed_pubkeys.push(pubkey);
result = AccountsIndexScanResult::Unref;
brooksprumo marked this conversation as resolved.
Show resolved Hide resolved
pubkeys_to_unref.push(pubkey);
dead += 1;
} else {
do_populate_accounts_for_shrink(ref_count, slot_list);
Expand All @@ -4021,7 +4020,7 @@ impl AccountsDb {
do_populate_accounts_for_shrink(ref_count, &slot_list);
}
index += 1;
result
AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
},
None,
false,
Expand All @@ -4039,7 +4038,7 @@ impl AccountsDb {

LoadAccountsIndexForShrink {
alive_accounts,
unrefed_pubkeys,
pubkeys_to_unref,
zero_lamport_single_ref_pubkeys,
all_are_zero_lamports,
}
Expand Down Expand Up @@ -4159,7 +4158,7 @@ impl AccountsDb {
.for_each(|stored_accounts| {
let LoadAccountsIndexForShrink {
alive_accounts,
mut unrefed_pubkeys,
pubkeys_to_unref: mut unrefed_pubkeys,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Here's another unrefed_pubkeys that can be renamed. (And the others in this function.)

It looks like ShrinkCollect has an unrefed_pubkeys field too. That can be renamed. IMO a subsequent PR is fine too.

all_are_zero_lamports,
mut zero_lamport_single_ref_pubkeys,
} = self.load_accounts_index_for_shrink(stored_accounts, stats, slot);
Expand Down Expand Up @@ -4309,6 +4308,33 @@ impl AccountsDb {
.fetch_add(time.as_us(), Ordering::Relaxed);
}

pub(crate) fn unref_shrunk_dead_accounts<'a>(
&self,
pubkeys: impl Iterator<Item = &'a Pubkey>,
slot: Slot,
) {
self.accounts_index.scan(
pubkeys,
|pubkey, slot_refs, _entry| {
if slot_refs.is_none() {
// We also expect that the accounts index must contain an
// entry for `pubkey`. Log a warning for now. In future,
// we will panic when this happens.
warn!("pubkey {pubkey} in slot {slot} was NOT found in accounts index during shrink");
datapoint_warn!(
"accounts_db-shink_pubkey_missing_from_index",
("store_slot", slot, i64),
("pubkey", pubkey.to_string(), String),
)
}
AccountsIndexScanResult::Unref
},
None,
false,
ScanFilter::All,
);
}

fn do_shrink_slot_store(&self, slot: Slot, store: &Arc<AccountStorageEntry>) {
if self.accounts_cache.contains(slot) {
// It is not correct to shrink a slot while it is in the write cache until flush is complete and the slot is removed from the write cache.
Expand Down Expand Up @@ -4350,34 +4376,11 @@ impl AccountsDb {
self.shrink_stats
.skipped_shrink
.fetch_add(1, Ordering::Relaxed);

self.accounts_index.scan(
shrink_collect.unrefed_pubkeys.into_iter(),
|pubkey, _slot_refs, entry| {
// pubkeys in `unrefed_pubkeys` were unref'd in `shrink_collect` above under the assumption that we would shrink everything.
// Since shrink is not occurring, we need to addref the pubkeys to get the system back to the prior state since the account still exists at this slot.
if let Some(entry) = entry {
entry.addref();
} else {
// We also expect that the accounts index must contain an
// entry for `pubkey`. Log a warning for now. In future,
// we will panic when this happens.
warn!("pubkey {pubkey} in slot {slot} was NOT found in accounts index during shrink");
datapoint_warn!(
"accounts_db-shink_pubkey_missing_from_index",
("store_slot", slot, i64),
("pubkey", pubkey.to_string(), String),
)
}
AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
},
None,
true,
ScanFilter::All,
);
return;
}

self.unref_shrunk_dead_accounts(shrink_collect.unrefed_pubkeys.iter().cloned(), slot);
brooksprumo marked this conversation as resolved.
Show resolved Hide resolved

let total_accounts_after_shrink = shrink_collect.alive_accounts.len();
debug!(
"shrinking: slot: {}, accounts: ({} => {}) bytes: {} original: {}",
Expand Down
109 changes: 52 additions & 57 deletions accounts-db/src/ancient_append_vecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ use {
ShrinkCollectAliveSeparatedByRefs, ShrinkStatsSub,
},
accounts_file::AccountsFile,
accounts_index::{AccountsIndexScanResult, ScanFilter},
active_stats::ActiveStatItem,
storable_accounts::{StorableAccounts, StorableAccountsBySlot},
},
rand::{thread_rng, Rng},
rayon::prelude::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator},
rayon::prelude::{IntoParallelRefIterator, ParallelIterator},
solana_measure::measure_us,
solana_sdk::clock::Slot,
std::{
Expand Down Expand Up @@ -440,7 +439,6 @@ impl AccountsDb {
accounts_to_combine.target_slots_sorted.last(),
many_refs_newest.last().map(|accounts| accounts.slot)
);
self.addref_accounts_failed_to_shrink_ancient(accounts_to_combine.accounts_to_combine);
return;
}

Expand Down Expand Up @@ -468,12 +466,19 @@ impl AccountsDb {

if pack.len() > accounts_to_combine.target_slots_sorted.len() {
// Not enough slots to contain the accounts we are trying to pack.
// `shrink_collect` previously unref'd some accounts. We need to addref them
// to restore the correct state since we failed to combine anything.
self.addref_accounts_failed_to_shrink_ancient(accounts_to_combine.accounts_to_combine);
return;
}

accounts_to_combine
.accounts_to_combine
.iter()
.for_each(|combine| {
self.unref_shrunk_dead_accounts(
combine.unrefed_pubkeys.iter().cloned(),
combine.slot,
);
});

let write_ancient_accounts = self.write_packed_storages(&accounts_to_combine, pack);

self.finish_combine_ancient_slots_packed_internal(
Expand All @@ -483,29 +488,6 @@ impl AccountsDb {
);
}

/// for each account in `unrefed_pubkeys`, in each `accounts_to_combine`, addref
fn addref_accounts_failed_to_shrink_ancient<'a>(
&self,
accounts_to_combine: Vec<ShrinkCollect<'a, ShrinkCollectAliveSeparatedByRefs<'a>>>,
) {
self.thread_pool_clean.install(|| {
accounts_to_combine.into_par_iter().for_each(|combine| {
self.accounts_index.scan(
combine.unrefed_pubkeys.into_iter(),
|_pubkey, _slots_refs, entry| {
if let Some(entry) = entry {
entry.addref();
}
AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
},
None,
true,
ScanFilter::All,
);
});
});
}

/// calculate all storage info for the storages in slots
/// Then, apply 'tuning' to filter out slots we do NOT want to combine.
fn collect_sort_filter_ancient_slots(
Expand Down Expand Up @@ -755,8 +737,12 @@ impl AccountsDb {
let mut target_slots_sorted = Vec::with_capacity(len);

// `shrink_collect` all accounts in the append vecs we want to combine.
// This also unrefs all dead accounts in those append vecs.
// This needs to serially iterate largest to smallest slot so that we unref older dead slots after we have visited the newer alive slots.
// NOTE(review): We no longer do the eager unref in `shrink_collect`, so serially
// iterating largest-to-smallest slot may no longer be required — confirm.
// There is a subtle difference for zero lamport accounts, which can lead to having more multi-refs than before.
// Consider account X in slots x, x+1, and x+2.
// With eager unref, we would collect X as `one_ref` at slot x+2 after shrink.
// Without eager unref, we collect X as `multi-ref` after shrink.
// Packing a multi-ref is less efficient than a `one_ref`, but that should be ok - in the next round of clean, it can hopefully turn this from multi-ref into one-ref.
let mut accounts_to_combine = accounts_per_storage
.iter()
.map(|(info, unique_accounts)| {
Expand Down Expand Up @@ -858,9 +844,6 @@ impl AccountsDb {
}
}
let unpackable_slots_count = remove.len();
remove.into_iter().rev().for_each(|i| {
self.addref_accounts_failed_to_shrink_ancient(vec![accounts_to_combine.remove(i)]);
});
target_slots_sorted.sort_unstable();
self.shrink_ancient_stats
.slots_cannot_move_count
Expand Down Expand Up @@ -1203,7 +1186,7 @@ pub mod tests {
},
accounts_file::StorageAccess,
accounts_hash::AccountHash,
accounts_index::UpsertReclaim,
accounts_index::{AccountsIndexScanResult, ScanFilter, UpsertReclaim},
append_vec::{
aligned_stored_size, AppendVec, AppendVecStoredAccountMeta,
MAXIMUM_APPEND_VEC_FILE_SIZE,
Expand Down Expand Up @@ -1765,13 +1748,7 @@ pub mod tests {
&tuning,
many_ref_slots,
);
let mut expected_accounts_to_combine = num_slots;
if two_refs && many_ref_slots == IncludeManyRefSlots::Skip && num_slots > 2
{
// We require more than 1 target slot. Since all slots have multi refs, we find no slots we can use as target slots.
// Thus, nothing can be packed.
expected_accounts_to_combine = 0;
}
let expected_accounts_to_combine = num_slots;
(0..accounts_to_combine
.target_slots_sorted
.len()
Expand Down Expand Up @@ -1874,7 +1851,7 @@ pub mod tests {
assert_eq!(
accounts_to_combine.accounts_to_combine.len(),
// if we are only trying to pack a single slot of multi-refs, it will succeed
if !two_refs || many_ref_slots == IncludeManyRefSlots::Include || num_slots == 1 {num_slots} else {0},
if !two_refs || many_ref_slots == IncludeManyRefSlots::Include || num_slots == 1 || num_slots == 2 {num_slots} else {0},
"method: {method:?}, num_slots: {num_slots}, two_refs: {two_refs}, many_refs: {many_ref_slots:?}"
);

Expand Down Expand Up @@ -3833,15 +3810,16 @@ pub mod tests {
}

#[test]
fn test_addref_accounts_failed_to_shrink_ancient() {
fn test_shrink_ancient_expected_unref() {
let db = AccountsDb::new_single_for_tests();
let empty_account = AccountSharedData::default();
for count in 0..3 {
let unrefed_pubkeys = (0..count)
.map(|_| solana_sdk::pubkey::new_rand())
.collect::<Vec<_>>();
// how many of `many_ref_accounts` should be found in the index with ref_count=1
let mut expected_ref_counts = HashMap::<Pubkey, u64>::default();
let mut expected_ref_counts_before_unref = HashMap::<Pubkey, u64>::default();
let mut expected_ref_counts_after_unref = HashMap::<Pubkey, u64>::default();

unrefed_pubkeys.iter().for_each(|k| {
for slot in 0..2 {
Expand All @@ -3857,8 +3835,8 @@ pub mod tests {
UpsertReclaim::IgnoreReclaims,
);
}
// set to 2 initially, made part of `unrefed_pubkeys`, expect it to be addref'd to 3
expected_ref_counts.insert(*k, 3);
expected_ref_counts_before_unref.insert(*k, 2);
expected_ref_counts_after_unref.insert(*k, 1);
});

let shrink_collect = ShrinkCollect::<ShrinkCollectAliveSeparatedByRefs> {
Expand All @@ -3878,25 +3856,42 @@ pub mod tests {
total_starting_accounts: 0,
all_are_zero_lamports: false,
};
let accounts_to_combine = AccountsToCombine {
accounts_keep_slots: HashMap::default(),
accounts_to_combine: vec![shrink_collect],
target_slots_sorted: Vec::default(),
unpackable_slots_count: 0,
};
db.addref_accounts_failed_to_shrink_ancient(accounts_to_combine.accounts_to_combine);

// Assert ref_counts before unref.
db.accounts_index.scan(
shrink_collect.unrefed_pubkeys.iter().cloned(),
|k, slot_refs, _entry| {
assert_eq!(
expected_ref_counts_before_unref.remove(k).unwrap(),
slot_refs.unwrap().1
);
AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
},
None,
false,
ScanFilter::All,
);
assert!(expected_ref_counts_before_unref.is_empty());

// unref ref_counts
db.unref_shrunk_dead_accounts(shrink_collect.unrefed_pubkeys.iter().cloned(), 0);

// Assert ref_counts after unref
db.accounts_index.scan(
unrefed_pubkeys.iter(),
shrink_collect.unrefed_pubkeys.iter().cloned(),
|k, slot_refs, _entry| {
assert_eq!(expected_ref_counts.remove(k).unwrap(), slot_refs.unwrap().1);
assert_eq!(
expected_ref_counts_after_unref.remove(k).unwrap(),
slot_refs.unwrap().1
);
AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
},
None,
false,
ScanFilter::All,
);
// should have removed all of them
assert!(expected_ref_counts.is_empty());
assert!(expected_ref_counts_after_unref.is_empty());
}
}

Expand Down