Skip to content

Commit

Permalink
don't populate uncleaned_roots
Browse files Browse the repository at this point in the history
inline reduce
  • Loading branch information
HaoranYi committed Dec 31, 2024
1 parent 3d43824 commit 273d9da
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 30 deletions.
31 changes: 8 additions & 23 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8566,7 +8566,8 @@ impl AccountsDb {
// a given pubkey. If there is just a single item, there is no cleaning to
// be done on that pubkey. Use only those pubkeys with multiple updates.
if !dirty_pubkeys.is_empty() {
self.uncleaned_pubkeys.insert(slot, dirty_pubkeys);
let old = self.uncleaned_pubkeys.insert(slot, dirty_pubkeys);
assert!(old.is_none());
}
SlotIndexGenerationInfo {
insert_time_us,
Expand Down Expand Up @@ -8841,24 +8842,13 @@ impl AccountsDb {
struct DuplicatePubkeysVisitedInfo {
accounts_data_len_from_duplicates: u64,
num_duplicate_accounts: u64,
uncleaned_roots: IntSet<Slot>,
duplicates_lt_hash: Option<Box<DuplicatesLtHash>>,
}
impl DuplicatePubkeysVisitedInfo {
fn reduce(mut a: Self, mut b: Self) -> Self {
if a.uncleaned_roots.len() >= b.uncleaned_roots.len() {
a.merge(b);
a
} else {
b.merge(a);
b
}
}
fn merge(&mut self, other: Self) {
fn reduce(mut self, other: Self) -> Self {
self.accounts_data_len_from_duplicates +=
other.accounts_data_len_from_duplicates;
self.num_duplicate_accounts += other.num_duplicate_accounts;
self.uncleaned_roots.extend(other.uncleaned_roots);

match (
self.duplicates_lt_hash.is_some(),
Expand All @@ -8871,16 +8861,20 @@ impl AccountsDb {
.unwrap()
.0
.mix_in(&other.duplicates_lt_hash.as_ref().unwrap().0);
self
}
(true, false) => {
// nothing to do; `other` doesn't have a duplicates lt hash
self
}
(false, true) => {
// `self` doesn't have a duplicates lt hash, so pilfer from `other`
self.duplicates_lt_hash = other.duplicates_lt_hash;
self
}
(false, false) => {
// nothing to do; no duplicates lt hash at all
self
}
}
}
Expand All @@ -8900,7 +8894,6 @@ impl AccountsDb {
let DuplicatePubkeysVisitedInfo {
accounts_data_len_from_duplicates,
num_duplicate_accounts,
uncleaned_roots,
duplicates_lt_hash,
} = unique_pubkeys_by_bin
.par_iter()
Expand All @@ -8913,7 +8906,6 @@ impl AccountsDb {
let (
accounts_data_len_from_duplicates,
accounts_duplicates_num,
uncleaned_roots,
duplicates_lt_hash,
) = self.visit_duplicate_pubkeys_during_startup(
pubkeys,
Expand All @@ -8924,7 +8916,6 @@ impl AccountsDb {
let intermediate = DuplicatePubkeysVisitedInfo {
accounts_data_len_from_duplicates,
num_duplicate_accounts: accounts_duplicates_num,
uncleaned_roots,
duplicates_lt_hash,
};
DuplicatePubkeysVisitedInfo::reduce(accum, intermediate)
Expand All @@ -8942,11 +8933,8 @@ impl AccountsDb {
);
accounts_data_len_dedup_timer.stop();
timings.accounts_data_len_dedup_time_us = accounts_data_len_dedup_timer.as_us();
timings.slots_to_clean = uncleaned_roots.len() as u64;
timings.num_duplicate_accounts = num_duplicate_accounts;

self.accounts_index
.add_uncleaned_roots(uncleaned_roots.into_iter());
accounts_data_len.fetch_sub(accounts_data_len_from_duplicates, Ordering::Relaxed);
if let Some(duplicates_lt_hash) = duplicates_lt_hash {
let old_val = outer_duplicates_lt_hash.replace(duplicates_lt_hash);
Expand Down Expand Up @@ -9069,10 +9057,9 @@ impl AccountsDb {
rent_collector: &RentCollector,
timings: &GenerateIndexTimings,
should_calculate_duplicates_lt_hash: bool,
) -> (u64, u64, IntSet<Slot>, Option<Box<DuplicatesLtHash>>) {
) -> (u64, u64, Option<Box<DuplicatesLtHash>>) {
let mut accounts_data_len_from_duplicates = 0;
let mut num_duplicate_accounts = 0_u64;
let mut uncleaned_slots = IntSet::default();
let mut duplicates_lt_hash =
should_calculate_duplicates_lt_hash.then(|| Box::new(DuplicatesLtHash::default()));
let mut removed_rent_paying = 0;
Expand All @@ -9090,7 +9077,6 @@ impl AccountsDb {
// the slots where duplicate accounts are found in the index need to be in the 'uncleaned_slots' list, too.
let max = slot_list.iter().map(|(slot, _)| slot).max().unwrap();
slot_list.iter().for_each(|(slot, account_info)| {
uncleaned_slots.insert(*slot);
if slot == max {
// the info in 'max' is the most recent, current info for this pubkey
return;
Expand Down Expand Up @@ -9148,7 +9134,6 @@ impl AccountsDb {
(
accounts_data_len_from_duplicates as u64,
num_duplicate_accounts,
uncleaned_slots,
duplicates_lt_hash,
)
}
Expand Down
19 changes: 15 additions & 4 deletions accounts-db/src/accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1735,7 +1735,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
.map(|_| Vec::with_capacity(expected_items_per_bin))
.collect::<Vec<_>>();
let mut count = 0;
let mut dirty_pubkeys = items
let dirty_pubkeys = items
.filter_map(|(pubkey, account_info)| {
let pubkey_bin = self.bin_calculator.bin_from_pubkey(&pubkey);
// this value is equivalent to what update() below would have created if we inserted a new item
Expand Down Expand Up @@ -1775,6 +1775,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
} else {
// not using disk buckets, so just write to in-mem
// this is no longer the default case
let mut duplicates_from_in_memory = vec![];
items
.into_iter()
.for_each(|(pubkey, (slot, account_info))| {
Expand All @@ -1789,11 +1790,17 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
{
InsertNewEntryResults::DidNotExist => {}
InsertNewEntryResults::ExistedNewEntryZeroLamports => {}
InsertNewEntryResults::ExistedNewEntryNonZeroLamports => {
dirty_pubkeys.push(pubkey);
InsertNewEntryResults::ExistedNewEntryNonZeroLamports(other_slot) => {
if let Some(other_slot) = other_slot {
duplicates_from_in_memory.push((other_slot, pubkey));
}
duplicates_from_in_memory.push((slot, pubkey));
}
}
});

r_account_maps
.startup_update_duplicates_from_in_memory_only(duplicates_from_in_memory);
}
insert_time.stop();
insertion_time.fetch_add(insert_time.as_us(), Ordering::Relaxed);
Expand All @@ -1818,7 +1825,11 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
.into_par_iter()
.map(|pubkey_bin| {
let r_account_maps = &self.account_maps[pubkey_bin];
r_account_maps.populate_and_retrieve_duplicate_keys_from_startup()
if self.storage.storage.is_disk_index_enabled() {
r_account_maps.populate_and_retrieve_duplicate_keys_from_startup()
} else {
r_account_maps.startup_take_duplicates_from_in_memory_only()
}
})
.for_each(f);
}
Expand Down
51 changes: 48 additions & 3 deletions accounts-db/src/accounts_index/in_mem_accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> Debug for InMemAccoun
pub enum InsertNewEntryResults {
DidNotExist,
ExistedNewEntryZeroLamports,
ExistedNewEntryNonZeroLamports,
ExistedNewEntryNonZeroLamports(Option<Slot>),
}

#[derive(Default, Debug)]
Expand All @@ -145,6 +145,11 @@ struct StartupInfoDuplicates<T: IndexValue> {
duplicates: Vec<(Slot, Pubkey, T)>,
/// pubkeys that were already added to disk and later found to be duplicates,
duplicates_put_on_disk: HashSet<(Slot, Pubkey)>,

/// (slot, pubkey) pairs that are found to be duplicates when we are
/// starting from in-memory only index. This field is used only when disk
/// index is disabled.
duplicates_from_in_memory_only: Vec<(Slot, Pubkey)>,
}

#[derive(Default, Debug)]
Expand Down Expand Up @@ -727,6 +732,14 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
.fetch_add(m.end_as_us(), Ordering::Relaxed);
}

pub fn startup_update_duplicates_from_in_memory_only(&self, items: Vec<(Slot, Pubkey)>) {
assert!(self.storage.get_startup());
assert!(self.bucket.is_none());

let mut duplicates = self.startup_info.duplicates.lock().unwrap();
duplicates.duplicates_from_in_memory_only.extend(items);
}

pub fn insert_new_entry_if_missing_with_lock(
&self,
pubkey: Pubkey,
Expand All @@ -737,17 +750,44 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
let entry = map.entry(pubkey);
m.stop();
let new_entry_zero_lamports = new_entry.is_zero_lamport();
let mut other_slot = None;
let (found_in_mem, already_existed) = match entry {
Entry::Occupied(occupied) => {
// in cache, so merge into cache
let (slot, account_info) = new_entry.into();
InMemAccountsIndex::<T, U>::lock_and_update_slot_list(

let slot_list = occupied.get().slot_list.read().unwrap();

// If there is only one entry in the slot list, it means that
// the previous entry inserted was a duplicate, which should be
// added to the duplicates list too. Note that we only need to do
// this for slot_list.len() == 1. For slot_list.len() > 1, the
// items, previously inserted into the slot_list, have already
// been added. We don't need to add them again.
if slot_list.len() == 1 {
other_slot = Some(slot_list[0].0);
}
drop(slot_list);

let updated_slot_list_len = InMemAccountsIndex::<T, U>::lock_and_update_slot_list(
occupied.get(),
(slot, account_info),
None, // should be None because we don't expect a different slot # during index generation
&mut Vec::default(),
UpsertReclaim::IgnoreReclaims,
);

// In the case of a race condition, multiple threads may try to insert
// entries for the same pubkey with different slots. We only need to
// record `other_slot` once. If the slot list length after the
// update is not 2, it means that someone else has already
// recorded `other_slot` before us. Therefore, we don't need to
// record it again.
if updated_slot_list_len != 2 {
// clear `other_slot` if we don't win the race.
other_slot = None;
}

(
true, /* found in mem */
true, /* already existed */
Expand Down Expand Up @@ -796,7 +836,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
} else if new_entry_zero_lamports {
InsertNewEntryResults::ExistedNewEntryZeroLamports
} else {
InsertNewEntryResults::ExistedNewEntryNonZeroLamports
InsertNewEntryResults::ExistedNewEntryNonZeroLamports(other_slot)
}
}

Expand Down Expand Up @@ -1147,6 +1187,11 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
.collect()
}

pub fn startup_take_duplicates_from_in_memory_only(&self) -> Vec<(Slot, Pubkey)> {
let mut duplicates = self.startup_info.duplicates.lock().unwrap();
std::mem::take(&mut duplicates.duplicates_from_in_memory_only)
}

/// synchronize the in-mem index with the disk index
fn flush_internal(&self, flush_guard: &FlushGuard, can_advance_age: bool) {
let current_age = self.storage.current_age();
Expand Down

0 comments on commit 273d9da

Please sign in to comment.