Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace purges_old_accounts by a boolean field in CleaningInfo #2566

Merged
merged 6 commits into from
Aug 14, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 132 additions & 128 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1349,6 +1349,8 @@ impl StoreAccountsTiming {
struct CleaningInfo {
slot_list: SlotList<AccountInfo>,
ref_count: u64,
/// True for pubkeys mapping to older versions of accounts that should be purged.
should_purge: bool,
}

/// This is the return type of AccountsDb::construct_candidate_clean_keys.
Expand Down Expand Up @@ -2684,41 +2686,52 @@ impl AccountsDb {
/// These should NOT be unref'd later from the accounts index.
fn clean_accounts_older_than_root(
&self,
purges: Vec<Pubkey>,
candidates: &[RwLock<HashMap<Pubkey, CleaningInfo>>],
max_clean_root_inclusive: Option<Slot>,
ancient_account_cleans: &AtomicU64,
epoch_schedule: &EpochSchedule,
) -> (ReclaimResult, PubkeysRemovedFromAccountsIndex) {
let pubkeys_removed_from_accounts_index = HashSet::default();
if purges.is_empty() {
// return early if there are no old accounts to purge
if !candidates
Copy link

@jeffwashington jeffwashington Aug 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could be very expensive and I think will scan all bins every time. I'd prefer to see a bool set at the caller that knows if there were any should_purge accounts. You could set a local atomic in the index scan in the calling fn if any account is set to should_purge. That could be passed to this fn or this fn could just assume that there is at least one should_purge entry if it is called with candidates. This first check is really just a cya to make sure that we know the behavior if there are no should_purge accounts. So, we could probably get rid of it in any case if we are careful of what the resulting behavior would be. I acknowledge that this code is a correct drop in replacement for purges before.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved the check to the caller and used an atomic bool to record whether any purgeable candidate exists.

.iter()
.map(|candidate_bin| {
candidate_bin
.read()
.unwrap()
.iter()
.any(|(_, v)| v.should_purge)
})
.reduce(|acc, e| acc || e)
.unwrap()
{
return (
ReclaimResult::default(),
pubkeys_removed_from_accounts_index,
);
}
// This number isn't carefully chosen; just guessed randomly such that
// the hot loop will be the order of ~Xms.
const INDEX_CLEAN_BULK_COUNT: usize = 4096;

let one_epoch_old = self.get_oldest_non_ancient_slot(epoch_schedule);
let pubkeys_removed_from_accounts_index = Mutex::new(pubkeys_removed_from_accounts_index);

let mut clean_rooted = Measure::start("clean_old_root-ms");
let reclaim_vecs = purges
.par_chunks(INDEX_CLEAN_BULK_COUNT)
.filter_map(|pubkeys: &[Pubkey]| {
let reclaim_vecs = candidates
.par_iter()
.filter_map(|candidates_bin| {
Comment on lines +2699 to +2701

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:chefs-kiss:

let mut reclaims = Vec::new();
for pubkey in pubkeys {
let removed_from_index = self.accounts_index.clean_rooted_entries(
pubkey,
&mut reclaims,
max_clean_root_inclusive,
);
if removed_from_index {
pubkeys_removed_from_accounts_index
.lock()
.unwrap()
.insert(*pubkey);
for (pubkey, cleaning_info) in candidates_bin.read().unwrap().iter() {
if cleaning_info.should_purge {
let removed_from_index = self.accounts_index.clean_rooted_entries(
pubkey,
&mut reclaims,
max_clean_root_inclusive,
);
if removed_from_index {
pubkeys_removed_from_accounts_index
.lock()
.unwrap()
.insert(*pubkey);
}
}
}

Expand Down Expand Up @@ -2791,6 +2804,7 @@ impl AccountsDb {
CleaningInfo {
slot_list,
ref_count,
should_purge: _,
},
) in bin.iter().filter(|x| !x.1.slot_list.is_empty())
{
Expand Down Expand Up @@ -3286,128 +3300,112 @@ impl AccountsDb {
let useful_accum = AtomicU64::new(0);

// parallel scan the index.
let purges_old_accounts = {
let do_clean_scan = || {
candidates
.par_iter()
.map(|candidates_bin| {
let mut purges_old_accounts = Vec::new();
let mut found_not_zero = 0;
let mut not_found_on_fork = 0;
let mut missing = 0;
let mut useful = 0;
let mut candidates_bin = candidates_bin.write().unwrap();
// Iterate over each HashMap entry to
// avoid capturing the HashMap in the
// closure passed to scan thus making
// conflicting read and write borrows.
candidates_bin
.iter_mut()
.for_each(|(candidate_pubkey, candidate_info)| {
self.accounts_index.scan(
[*candidate_pubkey].iter(),
|candidate_pubkey, slot_list_and_ref_count, _entry| {
let mut useless = true;
if let Some((slot_list, ref_count)) =
slot_list_and_ref_count
{
// find the highest rooted slot in the slot list
let index_in_slot_list =
self.accounts_index.latest_slot(
None,
slot_list,
max_clean_root_inclusive,
);
let do_clean_scan = || {
candidates.par_iter().for_each(|candidates_bin| {
let mut found_not_zero = 0;
let mut not_found_on_fork = 0;
let mut missing = 0;
let mut useful = 0;
let mut candidates_bin = candidates_bin.write().unwrap();
// Iterate over each HashMap entry to
// avoid capturing the HashMap in the
// closure passed to scan thus making
// conflicting read and write borrows.
candidates_bin
.iter_mut()
.for_each(|(candidate_pubkey, candidate_info)| {
self.accounts_index.scan(
[*candidate_pubkey].iter(),
|_candidate_pubkey, slot_list_and_ref_count, _entry| {
let mut useless = true;
if let Some((slot_list, ref_count)) = slot_list_and_ref_count {
// find the highest rooted slot in the slot list
let index_in_slot_list = self.accounts_index.latest_slot(
None,
slot_list,
max_clean_root_inclusive,
);

match index_in_slot_list {
Some(index_in_slot_list) => {
// found info relative to max_clean_root
let (slot, account_info) =
&slot_list[index_in_slot_list];
if account_info.is_zero_lamport() {
useless = false;
// The latest one is zero lamports. We may be able to purge it.
// Add all the rooted entries that contain this pubkey.
// We know the highest rooted entry is zero lamports.
candidate_info.slot_list =
self.accounts_index.get_rooted_entries(
slot_list,
max_clean_root_inclusive,
);
candidate_info.ref_count = ref_count;
} else {
found_not_zero += 1;
}
if uncleaned_roots.contains(slot) {
// Assertion enforced by `accounts_index.get()`, the latest slot
// will not be greater than the given `max_clean_root`
if let Some(max_clean_root_inclusive) =
max_clean_root_inclusive
{
assert!(
slot <= &max_clean_root_inclusive
);
}
if slot_list.len() > 1 {
// no need to purge old accounts if there is only 1 slot in the slot list
purges_old_accounts
.push(*candidate_pubkey);
useless = false;
} else {
self.clean_accounts_stats
.uncleaned_roots_slot_list_1
.fetch_add(1, Ordering::Relaxed);
}
}
match index_in_slot_list {
Some(index_in_slot_list) => {
// found info relative to max_clean_root
let (slot, account_info) =
&slot_list[index_in_slot_list];
if account_info.is_zero_lamport() {
useless = false;
// The latest one is zero lamports. We may be able to purge it.
// Add all the rooted entries that contain this pubkey.
// We know the highest rooted entry is zero lamports.
candidate_info.slot_list =
self.accounts_index.get_rooted_entries(
slot_list,
max_clean_root_inclusive,
);
candidate_info.ref_count = ref_count;
} else {
found_not_zero += 1;
}
if uncleaned_roots.contains(slot) {
// Assertion enforced by `accounts_index.get()`, the latest slot
// will not be greater than the given `max_clean_root`
if let Some(max_clean_root_inclusive) =
max_clean_root_inclusive
{
assert!(slot <= &max_clean_root_inclusive);
}
None => {
// This pubkey is in the index but not in a root slot, so clean
// it up by adding it to the to-be-purged list.
//
// Also, this pubkey must have been touched by some slot since
// it was in the dirty list, so we assume that the slot it was
// touched in must be unrooted.
not_found_on_fork += 1;
if slot_list.len() > 1 {
// no need to purge old accounts if there is only 1 slot in the slot list
candidate_info.should_purge = true;
useless = false;
purges_old_accounts.push(*candidate_pubkey);
} else {
self.clean_accounts_stats
.uncleaned_roots_slot_list_1
.fetch_add(1, Ordering::Relaxed);
}
}
} else {
missing += 1;
}
if !useless {
useful += 1;
None => {
// This pubkey is in the index but not in a root slot, so clean
// it up by adding it to the to-be-purged list.
//
// Also, this pubkey must have been touched by some slot since
// it was in the dirty list, so we assume that the slot it was
// touched in must be unrooted.
not_found_on_fork += 1;
candidate_info.should_purge = true;
useless = false;
}
AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
},
None,
false,
);
});
found_not_zero_accum.fetch_add(found_not_zero, Ordering::Relaxed);
not_found_on_fork_accum.fetch_add(not_found_on_fork, Ordering::Relaxed);
missing_accum.fetch_add(missing, Ordering::Relaxed);
useful_accum.fetch_add(useful, Ordering::Relaxed);
purges_old_accounts
})
.reduce(Vec::new, |mut a, b| {
// Collapse down the vecs into one.
a.extend(b);
a
})
};
if is_startup {
do_clean_scan()
} else {
self.thread_pool_clean.install(do_clean_scan)
}
}
} else {
missing += 1;
}
if !useless {
useful += 1;
}
AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
},
None,
false,
);
});
found_not_zero_accum.fetch_add(found_not_zero, Ordering::Relaxed);
not_found_on_fork_accum.fetch_add(not_found_on_fork, Ordering::Relaxed);
missing_accum.fetch_add(missing, Ordering::Relaxed);
useful_accum.fetch_add(useful, Ordering::Relaxed);
})
};
if is_startup {
do_clean_scan();
} else {
self.thread_pool_clean.install(do_clean_scan);
}

accounts_scan.stop();

let mut clean_old_rooted = Measure::start("clean_old_roots");
let ((purged_account_slots, removed_accounts), mut pubkeys_removed_from_accounts_index) =
self.clean_accounts_older_than_root(
purges_old_accounts,
&candidates,
max_clean_root_inclusive,
&ancient_account_cleans,
epoch_schedule,
Expand All @@ -3427,6 +3425,7 @@ impl AccountsDb {
CleaningInfo {
slot_list,
ref_count,
should_purge: _,
},
) in candidates_bin.write().unwrap().iter_mut()
{
Expand Down Expand Up @@ -3508,6 +3507,7 @@ impl AccountsDb {
let CleaningInfo {
slot_list,
ref_count: _,
should_purge: _,
} = cleaning_info;
(!slot_list.is_empty()).then_some((
*pubkey,
Expand Down Expand Up @@ -3784,6 +3784,7 @@ impl AccountsDb {
let CleaningInfo {
slot_list,
ref_count: _,
should_purge: _,
} = cleaning_info;
if slot_list.is_empty() {
return false;
Expand Down Expand Up @@ -12817,6 +12818,7 @@ pub mod tests {
CleaningInfo {
slot_list: rooted_entries,
ref_count,
should_purge: false,
},
);
}
Expand All @@ -12827,6 +12829,7 @@ pub mod tests {
CleaningInfo {
slot_list: list,
ref_count,
should_purge: _,
},
) in candidates_bin.iter()
{
Expand Down Expand Up @@ -15131,6 +15134,7 @@ pub mod tests {
CleaningInfo {
slot_list: vec![(slot, account_info)],
ref_count: 1,
should_purge: false,
},
);
let accounts_db = AccountsDb::new_single_for_tests();
Expand Down