Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: reduce time doing index lookup during ancient packing (and later for shrink) #99

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 31 additions & 10 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ pub(crate) struct ShrinkCollect<'a, T: ShrinkCollectRefs<'a>> {
pub(crate) all_are_zero_lamports: bool,
/// index entries that need to be held in memory while shrink is in progress
/// These aren't read - they are just held so that entries cannot be flushed.
pub(crate) _index_entries_being_shrunk: Vec<AccountMapEntry<AccountInfo>>,
pub(crate) index_entries_being_shrunk: Vec<AccountMapEntry<AccountInfo>>,
}

pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig {
Expand Down Expand Up @@ -3750,22 +3750,37 @@ impl AccountsDb {
accounts: &'a [StoredAccountMeta<'a>],
stats: &ShrinkStats,
slot_to_shrink: Slot,
bin: Option<usize>,
) -> LoadAccountsIndexForShrink<'a, T> {
let count = accounts.len();
let mut alive_accounts = T::with_capacity(count, slot_to_shrink);
let mut unrefed_pubkeys = Vec::with_capacity(count);

let mut alive = 0;
let mut dead = 0;
let mut index = 0;
let index = AtomicUsize::default();
let mut all_are_zero_lamports = true;
let mut index_entries_being_shrunk = Vec::with_capacity(accounts.len());
let bin_calc = &self.accounts_index.bin_calculator;
self.accounts_index.scan(
accounts.iter().map(|account| account.pubkey()),
accounts.iter().filter_map(|account| {
let pk = account.pubkey();
if let Some(bin) = bin {
if bin_calc.bin_from_pubkey(pk) == bin {
Some(pk)
} else {
// skipped this one
index.fetch_add(1, Ordering::Relaxed);
None
}
} else {
Some(pk)
}
}),
|pubkey, slots_refs, entry| {
let mut result = AccountsIndexScanResult::OnlyKeepInMemoryIfDirty;
if let Some((slot_list, ref_count)) = slots_refs {
let stored_account = &accounts[index];
let stored_account = &accounts[index.fetch_add(1, Ordering::Relaxed)];
let is_alive = slot_list.iter().any(|(slot, _acct_info)| {
// if the accounts index contains an entry at this slot, then the append vec we're asking about contains this item and thus, it is alive at this slot
*slot == slot_to_shrink
Expand All @@ -3787,13 +3802,12 @@ impl AccountsDb {
alive += 1;
}
}
index += 1;
result
},
None,
true,
);
assert_eq!(index, std::cmp::min(accounts.len(), count));
// assert_eq!(index, std::cmp::min(accounts.len(), count));
stats.alive_accounts.fetch_add(alive, Ordering::Relaxed);
stats.dead_accounts.fetch_add(dead, Ordering::Relaxed);

Expand Down Expand Up @@ -3847,6 +3861,7 @@ impl AccountsDb {
store: &'a Arc<AccountStorageEntry>,
unique_accounts: &'b GetUniqueAccountsResult<'b>,
stats: &ShrinkStats,
bin: Option<usize>,
) -> ShrinkCollect<'b, T> {
let slot = store.slot();

Expand Down Expand Up @@ -3874,7 +3889,7 @@ impl AccountsDb {
mut unrefed_pubkeys,
all_are_zero_lamports,
mut index_entries_being_shrunk,
} = self.load_accounts_index_for_shrink(stored_accounts, stats, slot);
} = self.load_accounts_index_for_shrink(stored_accounts, stats, slot, bin);

// collect
alive_accounts_collect
Expand Down Expand Up @@ -3926,7 +3941,7 @@ impl AccountsDb {
alive_total_bytes,
total_starting_accounts: len,
all_are_zero_lamports: all_are_zero_lamports_collect.into_inner().unwrap(),
_index_entries_being_shrunk: index_entries_being_shrunk_outer.into_inner().unwrap(),
index_entries_being_shrunk: index_entries_being_shrunk_outer.into_inner().unwrap(),
}
}

Expand Down Expand Up @@ -3983,8 +3998,12 @@ impl AccountsDb {
let unique_accounts =
self.get_unique_accounts_from_storage_for_shrink(store, &self.shrink_stats);
debug!("do_shrink_slot_store: slot: {}", slot);
let shrink_collect =
self.shrink_collect::<AliveAccounts<'_>>(store, &unique_accounts, &self.shrink_stats);
let shrink_collect = self.shrink_collect::<AliveAccounts<'_>>(
store,
&unique_accounts,
&self.shrink_stats,
None,
);

// This shouldn't happen if alive_bytes/approx_stored_count are accurate
if Self::should_not_shrink(
Expand Down Expand Up @@ -4539,6 +4558,7 @@ impl AccountsDb {
old_storage,
&unique_accounts,
&self.shrink_ancient_stats.shrink_stats,
None,
);

// could follow what shrink does more closely
Expand Down Expand Up @@ -16980,6 +17000,7 @@ pub mod tests {
&storage,
&unique_accounts,
&ShrinkStats::default(),
None,
);
let expect_single_opposite_alive_account =
if append_opposite_alive_account {
Expand Down
97 changes: 84 additions & 13 deletions accounts-db/src/ancient_append_vecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,18 +611,89 @@ impl AccountsDb {

// `shrink_collect` all accounts in the append vecs we want to combine.
// This also unrefs all dead accounts in those append vecs.
let mut accounts_to_combine = self.thread_pool_clean.install(|| {
accounts_per_storage
.par_iter()
.map(|(info, unique_accounts)| {
self.shrink_collect::<ShrinkCollectAliveSeparatedByRefs<'_>>(
&info.storage,
unique_accounts,
&self.shrink_ancient_stats.shrink_stats,
)
})
.collect::<Vec<_>>()
});
let mut accounts_to_combine =
self.thread_pool_clean.install(|| {
let mut result = Vec::default();
for bin in 0..self.accounts_index.bins() {
let this_bin = accounts_per_storage
.par_iter()
.map(|(info, unique_accounts)| {
self.shrink_collect::<ShrinkCollectAliveSeparatedByRefs<'_>>(
&info.storage,
unique_accounts,
&self.shrink_ancient_stats.shrink_stats,
Some(bin),
)
})
.collect::<Vec<_>>();
if result.is_empty() {
result = this_bin;
} else {
// accumulate results from each bin. We need 1 entry per slot
this_bin.into_iter().zip(result.iter_mut()).for_each(
|(mut this_bin, all_bins)| {
all_bins.alive_accounts.many_refs_old_alive.accounts.append(
&mut this_bin.alive_accounts.many_refs_old_alive.accounts,
);
all_bins.alive_accounts.many_refs_old_alive.bytes +=
this_bin.alive_accounts.many_refs_old_alive.bytes;
assert_eq!(
all_bins.alive_accounts.many_refs_old_alive.slot,
this_bin.alive_accounts.many_refs_old_alive.slot
);

all_bins
.alive_accounts
.many_refs_this_is_newest_alive
.accounts
.append(
&mut this_bin
.alive_accounts
.many_refs_this_is_newest_alive
.accounts,
);
all_bins.alive_accounts.many_refs_this_is_newest_alive.bytes +=
this_bin.alive_accounts.many_refs_this_is_newest_alive.bytes;
assert_eq!(
all_bins.alive_accounts.many_refs_this_is_newest_alive.slot,
this_bin.alive_accounts.many_refs_this_is_newest_alive.slot
);

all_bins
.alive_accounts
.one_ref
.accounts
.append(&mut this_bin.alive_accounts.one_ref.accounts);
all_bins.alive_accounts.one_ref.bytes +=
this_bin.alive_accounts.one_ref.bytes;
all_bins.alive_total_bytes += this_bin.alive_total_bytes;
assert_eq!(
all_bins.alive_accounts.one_ref.slot,
this_bin.alive_accounts.one_ref.slot
);

// slot, capacity, total_starting_accounts: same for all bins
assert_eq!(all_bins.slot, this_bin.slot);
assert_eq!(all_bins.capacity, this_bin.capacity);
assert_eq!(
all_bins.total_starting_accounts,
this_bin.total_starting_accounts
);

all_bins.all_are_zero_lamports = all_bins.all_are_zero_lamports
&& this_bin.all_are_zero_lamports;
all_bins
.unrefed_pubkeys
.append(&mut this_bin.unrefed_pubkeys);
all_bins
.index_entries_being_shrunk
.append(&mut this_bin.index_entries_being_shrunk);
},
);
}
}
result
});

let mut remove = Vec::default();
for (i, (shrink_collect, (info, _unique_accounts))) in accounts_to_combine
Expand Down Expand Up @@ -3301,7 +3372,7 @@ pub mod tests {
alive_total_bytes: 0,
total_starting_accounts: 0,
all_are_zero_lamports: false,
_index_entries_being_shrunk: Vec::default(),
index_entries_being_shrunk: Vec::default(),
};
let accounts_to_combine = AccountsToCombine {
accounts_keep_slots: HashMap::default(),
Expand Down
Loading