diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs
index 3e246f08820054..7bd7d90fb1195a 100644
--- a/accounts-db/src/accounts_db.rs
+++ b/accounts-db/src/accounts_db.rs
@@ -8566,7 +8566,8 @@ impl AccountsDb {
         // a given pubkey. If there is just a single item, there is no cleaning to
         // be done on that pubkey. Use only those pubkeys with multiple updates.
         if !dirty_pubkeys.is_empty() {
-            self.uncleaned_pubkeys.insert(slot, dirty_pubkeys);
+            let old = self.uncleaned_pubkeys.insert(slot, dirty_pubkeys);
+            assert!(old.is_none());
         }
         SlotIndexGenerationInfo {
             insert_time_us,
diff --git a/accounts-db/src/accounts_index.rs b/accounts-db/src/accounts_index.rs
index 2aae2d80a21553..1fe7470bb59d66 100644
--- a/accounts-db/src/accounts_index.rs
+++ b/accounts-db/src/accounts_index.rs
@@ -1735,7 +1735,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
             .map(|_| Vec::with_capacity(expected_items_per_bin))
             .collect::<Vec<_>>();
         let mut count = 0;
-        let mut dirty_pubkeys = items
+        let dirty_pubkeys = items
             .filter_map(|(pubkey, account_info)| {
                 let pubkey_bin = self.bin_calculator.bin_from_pubkey(&pubkey);
                 // this value is equivalent to what update() below would have created if we inserted a new item
@@ -1775,6 +1775,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
             } else {
                 // not using disk buckets, so just write to in-mem
                 // this is no longer the default case
+                let mut duplicates_from_in_memory = vec![];
                 items
                     .into_iter()
                     .for_each(|(pubkey, (slot, account_info))| {
@@ -1789,11 +1790,17 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
                         {
                             InsertNewEntryResults::DidNotExist => {}
                             InsertNewEntryResults::ExistedNewEntryZeroLamports => {}
-                            InsertNewEntryResults::ExistedNewEntryNonZeroLamports => {
-                                dirty_pubkeys.push(pubkey);
+                            InsertNewEntryResults::ExistedNewEntryNonZeroLamports(other_slot) => {
+                                if let Some(other_slot) = other_slot {
+                                    duplicates_from_in_memory.push((other_slot, pubkey));
+                                }
+                                duplicates_from_in_memory.push((slot, pubkey));
                             }
                         }
                     });
+
+                r_account_maps
+                    .startup_update_duplicates_from_in_memory_only(duplicates_from_in_memory);
             }
             insert_time.stop();
             insertion_time.fetch_add(insert_time.as_us(), Ordering::Relaxed);
@@ -1818,7 +1825,11 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
             .into_par_iter()
             .map(|pubkey_bin| {
                 let r_account_maps = &self.account_maps[pubkey_bin];
-                r_account_maps.populate_and_retrieve_duplicate_keys_from_startup()
+                if self.storage.storage.is_disk_index_enabled() {
+                    r_account_maps.populate_and_retrieve_duplicate_keys_from_startup()
+                } else {
+                    r_account_maps.startup_take_duplicates_from_in_memory_only()
+                }
             })
             .for_each(f);
     }
diff --git a/accounts-db/src/accounts_index/in_mem_accounts_index.rs b/accounts-db/src/accounts_index/in_mem_accounts_index.rs
index 330611e82f641b..5bd40a695ff327 100644
--- a/accounts-db/src/accounts_index/in_mem_accounts_index.rs
+++ b/accounts-db/src/accounts_index/in_mem_accounts_index.rs
@@ -135,7 +135,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> Debug for InMemAccoun
 pub enum InsertNewEntryResults {
     DidNotExist,
     ExistedNewEntryZeroLamports,
-    ExistedNewEntryNonZeroLamports,
+    ExistedNewEntryNonZeroLamports(Option<Slot>),
 }
 
 #[derive(Default, Debug)]
@@ -145,6 +145,11 @@ struct StartupInfoDuplicates<T: IndexValue> {
     duplicates: Vec<(Slot, Pubkey, T)>,
     /// pubkeys that were already added to disk and later found to be duplicates,
     duplicates_put_on_disk: HashSet<(Slot, Pubkey)>,
+
+    /// (slot, pubkey) pairs that are found to be duplicates when we are
+    /// starting from an in-memory-only index. This field is used only when
+    /// the disk index is disabled.
+    duplicates_from_in_memory_only: Vec<(Slot, Pubkey)>,
 }
 
 #[derive(Default, Debug)]
@@ -727,6 +732,14 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T
+    pub fn startup_update_duplicates_from_in_memory_only(&self, items: Vec<(Slot, Pubkey)>) {
+        assert!(self.storage.get_startup());
+        assert!(self.bucket.is_none());
+
+        let mut duplicates = self.startup_info.duplicates.lock().unwrap();
+        duplicates.duplicates_from_in_memory_only.extend(items);
+    }
+
     pub fn insert_new_entry_if_missing_with_lock(
         &self,
         pubkey: Pubkey,
@@ -737,17 +750,44 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T
+        let mut other_slot = None;
         let (found_in_mem, already_existed) = match entry {
             Entry::Occupied(occupied) => {
                 // in cache, so merge into cache
                 let (slot, account_info) = new_entry.into();
-                InMemAccountsIndex::<T, U>::lock_and_update_slot_list(
+
+                let slot_list = occupied.get().slot_list.read().unwrap();
+
+                // If there is only one entry in the slot list, it means that
+                // the previous entry inserted was a duplicate, which should be
+                // added to the duplicates list too. Note that we only need to do
+                // this for slot_list.len() == 1. For slot_list.len() > 1, the
+                // items previously inserted into the slot_list have already
+                // been added. We don't need to add them again.
+                if slot_list.len() == 1 {
+                    other_slot = Some(slot_list[0].0);
+                }
+                drop(slot_list);
+
+                let updated_slot_list_len = InMemAccountsIndex::<T, U>::lock_and_update_slot_list(
                     occupied.get(),
                     (slot, account_info),
                     None, // should be None because we don't expect a different slot # during index generation
                     &mut Vec::default(),
                     UpsertReclaim::IgnoreReclaims,
                 );
+
+                // In case of a race condition, multiple threads try to insert
+                // to the same pubkey with different slots. We only need to
+                // record `other_slot` once. If the slot list length after
+                // update is not 2, it means that someone else has already
+                // recorded `other_slot` before us. Therefore, we don't need to
+                // record it again.
+                if updated_slot_list_len != 2 {
+                    // clear `other_slot` if we don't win the race.
+                    other_slot = None;
+                }
+
                 (
                     true, /* found in mem */
                     true, /* already existed */
@@ -796,7 +836,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T
-            InsertNewEntryResults::ExistedNewEntryNonZeroLamports
+            InsertNewEntryResults::ExistedNewEntryNonZeroLamports(other_slot)
@@ ... @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T
+    pub fn startup_take_duplicates_from_in_memory_only(&self) -> Vec<(Slot, Pubkey)> {
+        let mut duplicates = self.startup_info.duplicates.lock().unwrap();
+        std::mem::take(&mut duplicates.duplicates_from_in_memory_only)
+    }
+
     /// synchronize the in-mem index with the disk index
     fn flush_internal(&self, flush_guard: &FlushGuard, can_advance_age: bool) {
         let current_age = self.storage.current_age();
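As a rough mental model of the bookkeeping this patch adds for the in-memory-only path, here is a small self-contained Rust sketch. The names InMemIndexSketch, insert_new_entry_if_missing, and take_duplicates are hypothetical, and Slot/Pubkey are simplified stand-ins, not the accounts-db types. It illustrates the same rule the diff encodes: when a pubkey already has exactly one entry in its slot list, the pre-existing slot (the `other_slot` case) is recorded as a duplicate alongside the incoming slot; every later insert records only the incoming slot.

use std::collections::HashMap;

type Slot = u64;
type Pubkey = [u8; 32];

#[derive(Default)]
struct InMemIndexSketch {
    slot_lists: HashMap<Pubkey, Vec<Slot>>,
    duplicates_from_in_memory_only: Vec<(Slot, Pubkey)>,
}

impl InMemIndexSketch {
    fn insert_new_entry_if_missing(&mut self, pubkey: Pubkey, slot: Slot) {
        let slot_list = self.slot_lists.entry(pubkey).or_default();
        if slot_list.len() == 1 {
            // The previously inserted entry is now known to be a duplicate too.
            self.duplicates_from_in_memory_only.push((slot_list[0], pubkey));
        }
        if !slot_list.is_empty() {
            // Every insert beyond the first is a duplicate of this pubkey.
            self.duplicates_from_in_memory_only.push((slot, pubkey));
        }
        slot_list.push(slot);
    }

    // Analogous to draining the collected duplicates at the end of index generation.
    fn take_duplicates(&mut self) -> Vec<(Slot, Pubkey)> {
        std::mem::take(&mut self.duplicates_from_in_memory_only)
    }
}

fn main() {
    let mut index = InMemIndexSketch::default();
    let key = [1u8; 32];
    index.insert_new_entry_if_missing(key, 10); // first insert, nothing recorded
    index.insert_new_entry_if_missing(key, 11); // records (10, key) and (11, key)
    index.insert_new_entry_if_missing(key, 12); // records only (12, key)
    assert_eq!(index.take_duplicates().len(), 3);
}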