From 0a57a475c1270e089770bad2641a59fa997daf1e Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Sun, 8 Dec 2024 17:11:35 +0000 Subject: [PATCH] fix in-memory startup indexgen --- accounts-db/src/accounts_db.rs | 3 +- accounts-db/src/accounts_index.rs | 42 ++++++++++++++----- .../accounts_index/in_mem_accounts_index.rs | 38 ++++++++++++++++- 3 files changed, 69 insertions(+), 14 deletions(-) diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs index 26923d5e05a224..bad351b570fe0a 100644 --- a/accounts-db/src/accounts_db.rs +++ b/accounts-db/src/accounts_db.rs @@ -8558,7 +8558,8 @@ impl AccountsDb { // a given pubkey. If there is just a single item, there is no cleaning to // be done on that pubkey. Use only those pubkeys with multiple updates. if !dirty_pubkeys.is_empty() { - self.uncleaned_pubkeys.insert(slot, dirty_pubkeys); + let old = self.uncleaned_pubkeys.insert(slot, dirty_pubkeys); + assert!(old.is_none()); } SlotIndexGenerationInfo { insert_time_us, diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs index f90168aeeccd59..76a71556715e0e 100644 --- a/accounts-db/src/accounts_index.rs +++ b/accounts-db/src/accounts_index.rs @@ -1713,6 +1713,10 @@ impl + Into> AccountsIndex { duplicates } + pub fn with_disk(&self) -> bool { + self.storage.storage.disk.is_some() + } + // Same functionally to upsert, but: // 1. operates on a batch of items // 2. holds the write lock for the duration of adding the items @@ -1730,12 +1734,12 @@ impl + Into> AccountsIndex { // this assumes the largest bin contains twice the expected amount of the average size per bin let bins = self.bins(); let expected_items_per_bin = approx_items_len * 2 / bins; - let use_disk = self.storage.storage.is_disk_index_enabled(); + let use_disk = self.with_disk(); let mut binned = (0..bins) .map(|_| Vec::with_capacity(expected_items_per_bin)) .collect::>(); let mut count = 0; - let mut dirty_pubkeys = items + let dirty_pubkeys = items .filter_map(|(pubkey, account_info)| { let pubkey_bin = self.bin_calculator.bin_from_pubkey(&pubkey); // this value is equivalent to what update() below would have created if we inserted a new item @@ -1775,6 +1779,7 @@ impl + Into> AccountsIndex { } else { // not using disk buckets, so just write to in-mem // this is no longer the default case + let mut dup_pubkeys = vec![]; items .into_iter() .for_each(|(pubkey, (slot, account_info))| { @@ -1789,11 +1794,16 @@ impl + Into> AccountsIndex { { InsertNewEntryResults::DidNotExist => {} InsertNewEntryResults::ExistedNewEntryZeroLamports => {} - InsertNewEntryResults::ExistedNewEntryNonZeroLamports => { - dirty_pubkeys.push(pubkey); + InsertNewEntryResults::ExistedNewEntryNonZeroLamports(other_slot) => { + if let Some(other_slot) = other_slot { + dup_pubkeys.push((other_slot, pubkey)); + } + dup_pubkeys.push((slot, pubkey)); } } }); + + r_account_maps.update_duplicates_from_in_memory_only_startup(dup_pubkeys); } insert_time.stop(); insertion_time.fetch_add(insert_time.as_us(), Ordering::Relaxed); @@ -1814,13 +1824,23 @@ impl + Into> AccountsIndex { &self, f: impl Fn(Vec<(Slot, Pubkey)>) + Sync + Send, ) { - (0..self.bins()) - .into_par_iter() - .map(|pubkey_bin| { - let r_account_maps = &self.account_maps[pubkey_bin]; - r_account_maps.populate_and_retrieve_duplicate_keys_from_startup() - }) - .for_each(f); + if self.with_disk() { + (0..self.bins()) + .into_par_iter() + .map(|pubkey_bin| { + let r_account_maps = &self.account_maps[pubkey_bin]; + r_account_maps.populate_and_retrieve_duplicate_keys_from_startup() + }) + .for_each(f); + } else { + (0..self.bins()) + .into_par_iter() + .map(|pubkey_bin| { + let r_account_maps = &self.account_maps[pubkey_bin]; + r_account_maps.get_duplicates_from_in_memory_only_startup() + }) + .for_each(f); + } } /// Updates the given pubkey at the given slot with the new account information. diff --git a/accounts-db/src/accounts_index/in_mem_accounts_index.rs b/accounts-db/src/accounts_index/in_mem_accounts_index.rs index 0e0c5db9aa4afc..89ac4d94d38461 100644 --- a/accounts-db/src/accounts_index/in_mem_accounts_index.rs +++ b/accounts-db/src/accounts_index/in_mem_accounts_index.rs @@ -135,7 +135,7 @@ impl + Into> Debug for InMemAccoun pub enum InsertNewEntryResults { DidNotExist, ExistedNewEntryZeroLamports, - ExistedNewEntryNonZeroLamports, + ExistedNewEntryNonZeroLamports(Option), } #[derive(Default, Debug)] @@ -153,6 +153,10 @@ struct StartupInfo + Into> { insert: Mutex>, /// pubkeys with more than 1 entry duplicates: Mutex>, + + /// (slot, pubkey) pairs that are duplicates when we are starting from in-memory only index. + /// And this field is only populated and used when we are building the in-memory only index. + duplicate_from_in_memory_only: Mutex>, } #[derive(Default, Debug)] @@ -727,6 +731,19 @@ impl + Into> InMemAccountsIndex) { + assert!(self.storage.get_startup()); + assert!(self.bucket.is_none()); + + let mut duplicates = self + .startup_info + .duplicate_from_in_memory_only + .lock() + .unwrap(); + + duplicates.extend(items); + } + pub fn insert_new_entry_if_missing_with_lock( &self, pubkey: Pubkey, @@ -737,10 +754,18 @@ impl + Into> InMemAccountsIndex { // in cache, so merge into cache let (slot, account_info) = new_entry.into(); + + let slot_list = occupied.get().slot_list.read().unwrap(); + if slot_list.len() == 1 { + other_slot = Some(slot_list[0].0); + } + drop(slot_list); + InMemAccountsIndex::::lock_and_update_slot_list( occupied.get(), (slot, account_info), @@ -796,7 +821,7 @@ impl + Into> InMemAccountsIndex + Into> InMemAccountsIndex Vec<(Slot, Pubkey)> { + let mut duplicates = self + .startup_info + .duplicate_from_in_memory_only + .lock() + .unwrap(); + std::mem::take(&mut *duplicates) + } + /// synchronize the in-mem index with the disk index fn flush_internal(&self, flush_guard: &FlushGuard, can_advance_age: bool) { let current_age = self.storage.current_age();