Fix accounts index generation at startup when we don't use disk index #4018

Open · wants to merge 11 commits into master
Changes from all commits
3 changes: 2 additions & 1 deletion accounts-db/src/accounts_db.rs
@@ -8566,7 +8566,8 @@ impl AccountsDb {
// a given pubkey. If there is just a single item, there is no cleaning to
// be done on that pubkey. Use only those pubkeys with multiple updates.
if !dirty_pubkeys.is_empty() {
self.uncleaned_pubkeys.insert(slot, dirty_pubkeys);
let old = self.uncleaned_pubkeys.insert(slot, dirty_pubkeys);
assert!(old.is_none());
}
SlotIndexGenerationInfo {
insert_time_us,
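The hunk above tightens an invariant rather than changing behavior: during index generation each slot is inserted into uncleaned_pubkeys at most once, so the insert should never replace an existing entry. Below is a minimal sketch of that pattern, assuming a plain std HashMap and placeholder Slot/Pubkey aliases instead of the concurrent map and SDK types used by AccountsDb; record_dirty_pubkeys is a hypothetical helper, not agave code.

```rust
use std::collections::HashMap;

// Placeholder types for illustration only; the real code uses Slot and Pubkey
// from the SDK and a concurrent map inside AccountsDb.
type Slot = u64;
type Pubkey = [u8; 32];

// During index generation each slot's storage is scanned once, so inserting
// that slot's dirty pubkeys must never replace an existing entry.
fn record_dirty_pubkeys(
    uncleaned: &mut HashMap<Slot, Vec<Pubkey>>,
    slot: Slot,
    dirty_pubkeys: Vec<Pubkey>,
) {
    if !dirty_pubkeys.is_empty() {
        // HashMap::insert returns the previous value for the key, if any.
        let old = uncleaned.insert(slot, dirty_pubkeys);
        // Some(_) here would mean the same slot was indexed twice.
        assert!(old.is_none());
    }
}

fn main() {
    let mut uncleaned = HashMap::new();
    record_dirty_pubkeys(&mut uncleaned, 42, vec![[1u8; 32]]);
    // A second call with slot 42 and a non-empty list would panic on the assert.
    assert_eq!(uncleaned.len(), 1);
}
```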
19 changes: 15 additions & 4 deletions accounts-db/src/accounts_index.rs
@@ -1735,7 +1735,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
.map(|_| Vec::with_capacity(expected_items_per_bin))
.collect::<Vec<_>>();
let mut count = 0;
let mut dirty_pubkeys = items
let dirty_pubkeys = items
.filter_map(|(pubkey, account_info)| {
let pubkey_bin = self.bin_calculator.bin_from_pubkey(&pubkey);
// this value is equivalent to what update() below would have created if we inserted a new item
@@ -1775,6 +1775,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
} else {
// not using disk buckets, so just write to in-mem
// this is no longer the default case
let mut duplicates_from_in_memory = vec![];
items
.into_iter()
.for_each(|(pubkey, (slot, account_info))| {
@@ -1789,11 +1790,17 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
{
InsertNewEntryResults::DidNotExist => {}
InsertNewEntryResults::ExistedNewEntryZeroLamports => {}
InsertNewEntryResults::ExistedNewEntryNonZeroLamports => {
dirty_pubkeys.push(pubkey);
InsertNewEntryResults::ExistedNewEntryNonZeroLamports(other_slot) => {
if let Some(other_slot) = other_slot {
duplicates_from_in_memory.push((other_slot, pubkey));
}
duplicates_from_in_memory.push((slot, pubkey));
}
}
});

r_account_maps
.startup_update_duplicates_from_in_memory_only(duplicates_from_in_memory);
}
insert_time.stop();
insertion_time.fetch_add(insert_time.as_us(), Ordering::Relaxed);
@@ -1818,7 +1825,11 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
.into_par_iter()
.map(|pubkey_bin| {
let r_account_maps = &self.account_maps[pubkey_bin];
r_account_maps.populate_and_retrieve_duplicate_keys_from_startup()
if self.storage.storage.is_disk_index_enabled() {
r_account_maps.populate_and_retrieve_duplicate_keys_from_startup()
} else {
r_account_maps.startup_take_duplicates_from_in_memory_only()
}
})
.for_each(f);
}
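For the in-memory-only startup path (disk index disabled), the changes above stop pushing duplicate pubkeys into dirty_pubkeys and instead collect (slot, pubkey) pairs per bin via startup_update_duplicates_from_in_memory_only, which are later drained with startup_take_duplicates_from_in_memory_only. The recording rule appears to be: the first sighting of a pubkey is not a duplicate; on the second sighting both the slot already stored in the slot list (other_slot) and the incoming slot are recorded; later sightings record only the incoming slot, because the earlier ones are already in the duplicates list. A simplified sketch of that rule, ignoring the zero-lamport case and using a HashMap in place of the per-bin in-mem index (insert_at_startup is a hypothetical helper):

```rust
use std::collections::HashMap;

type Slot = u64;
type Pubkey = [u8; 32];

// Hypothetical helper that mimics the duplicate-recording rule; the real code
// works on the per-bin in-mem index entries and their slot lists.
fn insert_at_startup(
    index: &mut HashMap<Pubkey, Vec<Slot>>,
    duplicates: &mut Vec<(Slot, Pubkey)>,
    slot: Slot,
    pubkey: Pubkey,
) {
    let slot_list = index.entry(pubkey).or_default();
    if slot_list.len() == 1 {
        // Second sighting: the entry already in the index is also a duplicate
        // and has not been recorded yet, so record it now (this is `other_slot`).
        duplicates.push((slot_list[0], pubkey));
    }
    if !slot_list.is_empty() {
        // Every sighting after the first records the incoming (slot, pubkey).
        duplicates.push((slot, pubkey));
    }
    slot_list.push(slot);
}

fn main() {
    let (mut index, mut dups) = (HashMap::new(), Vec::new());
    let key = [7u8; 32];
    insert_at_startup(&mut index, &mut dups, 10, key);
    insert_at_startup(&mut index, &mut dups, 11, key);
    insert_at_startup(&mut index, &mut dups, 12, key);
    // Slots 10, 11 and 12 each show up in the duplicates list exactly once.
    assert_eq!(dups, vec![(10, key), (11, key), (12, key)]);
}
```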
51 changes: 48 additions & 3 deletions accounts-db/src/accounts_index/in_mem_accounts_index.rs
@@ -135,7 +135,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> Debug for InMemAccountsIndex<T, U> {
pub enum InsertNewEntryResults {
DidNotExist,
ExistedNewEntryZeroLamports,
ExistedNewEntryNonZeroLamports,
ExistedNewEntryNonZeroLamports(Option<Slot>),
}

#[derive(Default, Debug)]
@@ -145,6 +145,11 @@ struct StartupInfoDuplicates<T: IndexValue> {
duplicates: Vec<(Slot, Pubkey, T)>,
/// pubkeys that were already added to disk and later found to be duplicates,
duplicates_put_on_disk: HashSet<(Slot, Pubkey)>,

/// (slot, pubkey) pairs found to be duplicates while generating the index
/// with the in-memory-only index at startup. This field is used only when
/// the disk index is disabled.
duplicates_from_in_memory_only: Vec<(Slot, Pubkey)>,
}

#[derive(Default, Debug)]
@@ -727,6 +732,14 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T, U> {
.fetch_add(m.end_as_us(), Ordering::Relaxed);
}

pub fn startup_update_duplicates_from_in_memory_only(&self, items: Vec<(Slot, Pubkey)>) {
assert!(self.storage.get_startup());
assert!(self.bucket.is_none());

let mut duplicates = self.startup_info.duplicates.lock().unwrap();
duplicates.duplicates_from_in_memory_only.extend(items);
}

pub fn insert_new_entry_if_missing_with_lock(
&self,
pubkey: Pubkey,
@@ -737,17 +750,44 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T, U> {
let entry = map.entry(pubkey);
m.stop();
let new_entry_zero_lamports = new_entry.is_zero_lamport();
let mut other_slot = None;
let (found_in_mem, already_existed) = match entry {
Entry::Occupied(occupied) => {
// in cache, so merge into cache
let (slot, account_info) = new_entry.into();
InMemAccountsIndex::<T, U>::lock_and_update_slot_list(

let slot_list = occupied.get().slot_list.read().unwrap();

// If there is only one entry in the slot list, it means that
// the previous entry inserted was a duplicate, which should be
// added to the duplicates list too. Note that we only need to do
// this for slot_list.len() == 1. For slot_list.len() > 1, the
// items previously inserted into the slot_list have already
// been added. We don't need to add them again.
if slot_list.len() == 1 {

Review comment (resolved): there is a race here. Two competing insert threads could both find slot_list.len() == 1. They then serialize access with the write lock in lock_and_update_slot_list, so one makes the length 2 and the next 3. I don't think this is bad in this case, but it is a race condition. lock_and_update_slot_list returns the slot-list length after insertion under the write lock; we could set other_slot to None if lock_and_update_slot_list returns != 2, which means someone else won the race.

Author: yeah, fixed in ef3e873.

other_slot = Some(slot_list[0].0);
}
drop(slot_list);

let updated_slot_list_len = InMemAccountsIndex::<T, U>::lock_and_update_slot_list(
occupied.get(),
(slot, account_info),
None, // should be None because we don't expect a different slot # during index generation
&mut Vec::default(),
UpsertReclaim::IgnoreReclaims,
);

// In case of a race condition, multiple threads try to insert
// to the same pubkey with different slots. We only need to
// record `other_slot` once. If the slot list length after
// update is not 2, it means that someone else has already
// recorded `other_slot` before us. Therefore, we don't need to
// record it again.
if updated_slot_list_len != 2 {
// clear `other_slot` if we don't win the race.
other_slot = None;
}

(
true, /* found in mem */
true, /* already existed */
@@ -796,7 +836,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T, U> {
} else if new_entry_zero_lamports {
InsertNewEntryResults::ExistedNewEntryZeroLamports
} else {
InsertNewEntryResults::ExistedNewEntryNonZeroLamports
InsertNewEntryResults::ExistedNewEntryNonZeroLamports(other_slot)
}
}

@@ -1147,6 +1187,11 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T, U> {
.collect()
}

pub fn startup_take_duplicates_from_in_memory_only(&self) -> Vec<(Slot, Pubkey)> {
let mut duplicates = self.startup_info.duplicates.lock().unwrap();
std::mem::take(&mut duplicates.duplicates_from_in_memory_only)
}

/// synchronize the in-mem index with the disk index
fn flush_internal(&self, flush_guard: &FlushGuard, can_advance_age: bool) {
let current_age = self.storage.current_age();
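The review thread in insert_new_entry_if_missing_with_lock concerns the race where two threads inserting the same pubkey both observe slot_list.len() == 1 before taking the write lock. The fix leans on lock_and_update_slot_list returning the slot-list length after the insertion: only the thread whose insert brings the length to exactly 2 keeps other_slot, so the pre-existing slot is reported as a duplicate exactly once. A standalone sketch of that resolution, assuming a Mutex<Vec<Slot>> in place of the real slot list and a hypothetical lock_and_push helper standing in for lock_and_update_slot_list:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

type Slot = u64;

// Stand-in for lock_and_update_slot_list: push under the lock and return the
// slot-list length after the insertion.
fn lock_and_push(slot_list: &Mutex<Vec<Slot>>, slot: Slot) -> usize {
    let mut list = slot_list.lock().unwrap();
    list.push(slot);
    list.len()
}

fn main() {
    // One entry already exists for this pubkey (at slot 1).
    let slot_list = Arc::new(Mutex::new(vec![1u64]));
    let recorded_other_slots = Arc::new(Mutex::new(Vec::new()));

    let handles: Vec<_> = vec![2u64, 3]
        .into_iter()
        .map(|slot| {
            let slot_list = Arc::clone(&slot_list);
            let recorded = Arc::clone(&recorded_other_slots);
            thread::spawn(move || {
                // Both threads may observe len == 1 here and tentatively pick
                // the already-present slot as `other_slot`.
                let mut other_slot = {
                    let list = slot_list.lock().unwrap();
                    (list.len() == 1).then(|| list[0])
                };
                let new_len = lock_and_push(&slot_list, slot);
                // Only the insert that brought the length to exactly 2 won the
                // race; every other thread clears other_slot so the
                // pre-existing slot is recorded once.
                if new_len != 2 {
                    other_slot = None;
                }
                if let Some(other) = other_slot {
                    recorded.lock().unwrap().push(other);
                }
            })
        })
        .collect();

    handles.into_iter().for_each(|h| h.join().unwrap());
    // Exactly one thread recorded the pre-existing slot (slot 1).
    assert_eq!(*recorded_other_slots.lock().unwrap(), vec![1u64]);
}
```

In the PR itself, the pairs collected this way are appended to StartupInfoDuplicates::duplicates_from_in_memory_only under its mutex by startup_update_duplicates_from_in_memory_only and later handed back with std::mem::take in startup_take_duplicates_from_in_memory_only.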