Skip to content

Commit

Permalink
fix in-memory startup indexgen
Browse files Browse the repository at this point in the history
  • Loading branch information
HaoranYi committed Dec 10, 2024
1 parent 2cea8e6 commit 0a57a47
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 14 deletions.
3 changes: 2 additions & 1 deletion accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8558,7 +8558,8 @@ impl AccountsDb {
// a given pubkey. If there is just a single item, there is no cleaning to
// be done on that pubkey. Use only those pubkeys with multiple updates.
if !dirty_pubkeys.is_empty() {
self.uncleaned_pubkeys.insert(slot, dirty_pubkeys);
let old = self.uncleaned_pubkeys.insert(slot, dirty_pubkeys);
assert!(old.is_none());
}
SlotIndexGenerationInfo {
insert_time_us,
Expand Down
42 changes: 31 additions & 11 deletions accounts-db/src/accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1713,6 +1713,10 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
duplicates
}

pub fn with_disk(&self) -> bool {
self.storage.storage.disk.is_some()
}

// Same functionally to upsert, but:
// 1. operates on a batch of items
// 2. holds the write lock for the duration of adding the items
Expand All @@ -1730,12 +1734,12 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
// this assumes the largest bin contains twice the expected amount of the average size per bin
let bins = self.bins();
let expected_items_per_bin = approx_items_len * 2 / bins;
let use_disk = self.storage.storage.is_disk_index_enabled();
let use_disk = self.with_disk();
let mut binned = (0..bins)
.map(|_| Vec::with_capacity(expected_items_per_bin))
.collect::<Vec<_>>();
let mut count = 0;
let mut dirty_pubkeys = items
let dirty_pubkeys = items
.filter_map(|(pubkey, account_info)| {
let pubkey_bin = self.bin_calculator.bin_from_pubkey(&pubkey);
// this value is equivalent to what update() below would have created if we inserted a new item
Expand Down Expand Up @@ -1775,6 +1779,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
} else {
// not using disk buckets, so just write to in-mem
// this is no longer the default case
let mut dup_pubkeys = vec![];
items
.into_iter()
.for_each(|(pubkey, (slot, account_info))| {
Expand All @@ -1789,11 +1794,16 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
{
InsertNewEntryResults::DidNotExist => {}
InsertNewEntryResults::ExistedNewEntryZeroLamports => {}
InsertNewEntryResults::ExistedNewEntryNonZeroLamports => {
dirty_pubkeys.push(pubkey);
InsertNewEntryResults::ExistedNewEntryNonZeroLamports(other_slot) => {
if let Some(other_slot) = other_slot {
dup_pubkeys.push((other_slot, pubkey));
}
dup_pubkeys.push((slot, pubkey));
}
}
});

r_account_maps.update_duplicates_from_in_memory_only_startup(dup_pubkeys);
}
insert_time.stop();
insertion_time.fetch_add(insert_time.as_us(), Ordering::Relaxed);
Expand All @@ -1814,13 +1824,23 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
&self,
f: impl Fn(Vec<(Slot, Pubkey)>) + Sync + Send,
) {
(0..self.bins())
.into_par_iter()
.map(|pubkey_bin| {
let r_account_maps = &self.account_maps[pubkey_bin];
r_account_maps.populate_and_retrieve_duplicate_keys_from_startup()
})
.for_each(f);
if self.with_disk() {
(0..self.bins())
.into_par_iter()
.map(|pubkey_bin| {
let r_account_maps = &self.account_maps[pubkey_bin];
r_account_maps.populate_and_retrieve_duplicate_keys_from_startup()
})
.for_each(f);
} else {
(0..self.bins())
.into_par_iter()
.map(|pubkey_bin| {
let r_account_maps = &self.account_maps[pubkey_bin];
r_account_maps.get_duplicates_from_in_memory_only_startup()
})
.for_each(f);
}
}

/// Updates the given pubkey at the given slot with the new account information.
Expand Down
38 changes: 36 additions & 2 deletions accounts-db/src/accounts_index/in_mem_accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> Debug for InMemAccoun
pub enum InsertNewEntryResults {
DidNotExist,
ExistedNewEntryZeroLamports,
ExistedNewEntryNonZeroLamports,
ExistedNewEntryNonZeroLamports(Option<Slot>),
}

#[derive(Default, Debug)]
Expand All @@ -153,6 +153,10 @@ struct StartupInfo<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> {
insert: Mutex<Vec<(Pubkey, (Slot, U))>>,
/// pubkeys with more than 1 entry
duplicates: Mutex<StartupInfoDuplicates<T>>,

/// (slot, pubkey) pairs that are duplicates when we are starting from in-memory only index.
/// And this field is only populated and used when we are building the in-memory only index.
duplicate_from_in_memory_only: Mutex<Vec<(Slot, Pubkey)>>,
}

#[derive(Default, Debug)]
Expand Down Expand Up @@ -727,6 +731,19 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
.fetch_add(m.end_as_us(), Ordering::Relaxed);
}

pub fn update_duplicates_from_in_memory_only_startup(&self, items: Vec<(Slot, Pubkey)>) {
assert!(self.storage.get_startup());
assert!(self.bucket.is_none());

let mut duplicates = self
.startup_info
.duplicate_from_in_memory_only
.lock()
.unwrap();

duplicates.extend(items);
}

pub fn insert_new_entry_if_missing_with_lock(
&self,
pubkey: Pubkey,
Expand All @@ -737,10 +754,18 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
let entry = map.entry(pubkey);
m.stop();
let new_entry_zero_lamports = new_entry.is_zero_lamport();
let mut other_slot = None;
let (found_in_mem, already_existed) = match entry {
Entry::Occupied(occupied) => {
// in cache, so merge into cache
let (slot, account_info) = new_entry.into();

let slot_list = occupied.get().slot_list.read().unwrap();
if slot_list.len() == 1 {
other_slot = Some(slot_list[0].0);
}
drop(slot_list);

InMemAccountsIndex::<T, U>::lock_and_update_slot_list(
occupied.get(),
(slot, account_info),
Expand Down Expand Up @@ -796,7 +821,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
} else if new_entry_zero_lamports {
InsertNewEntryResults::ExistedNewEntryZeroLamports
} else {
InsertNewEntryResults::ExistedNewEntryNonZeroLamports
InsertNewEntryResults::ExistedNewEntryNonZeroLamports(other_slot)
}
}

Expand Down Expand Up @@ -1147,6 +1172,15 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
.collect()
}

pub fn get_duplicates_from_in_memory_only_startup(&self) -> Vec<(Slot, Pubkey)> {
let mut duplicates = self
.startup_info
.duplicate_from_in_memory_only
.lock()
.unwrap();
std::mem::take(&mut *duplicates)
}

/// synchronize the in-mem index with the disk index
fn flush_internal(&self, flush_guard: &FlushGuard, can_advance_age: bool) {
let current_age = self.storage.current_age();
Expand Down

0 comments on commit 0a57a47

Please sign in to comment.