Skip to content

Commit

Permalink
unref accounts in shrink and pack when we're committed
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington committed Aug 30, 2024
1 parent dbaea8f commit d4ce8cc
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 60 deletions.
60 changes: 32 additions & 28 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3964,7 +3964,6 @@ impl AccountsDb {
self.accounts_index.scan(
accounts.iter().map(|account| account.pubkey()),
|pubkey, slots_refs, _entry| {
let mut result = AccountsIndexScanResult::OnlyKeepInMemoryIfDirty;
let stored_account = &accounts[index];
let mut do_populate_accounts_for_shrink = |ref_count, slot_list| {
if stored_account.is_zero_lamport()
Expand Down Expand Up @@ -4001,7 +4000,6 @@ impl AccountsDb {
// not exist in the re-written slot. Unref it to keep the index consistent with
// rewriting the storage entries.
unrefed_pubkeys.push(pubkey);
result = AccountsIndexScanResult::Unref;
dead += 1;
} else {
do_populate_accounts_for_shrink(ref_count, slot_list);
Expand All @@ -4018,7 +4016,7 @@ impl AccountsDb {
do_populate_accounts_for_shrink(ref_count, &slot_list);
}
index += 1;
result
AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
},
None,
true,
Expand Down Expand Up @@ -4306,6 +4304,35 @@ impl AccountsDb {
.fetch_add(time.as_us(), Ordering::Relaxed);
}

/// Unref each pubkey in `unrefed_pubkeys` in the accounts index.
///
/// Called once we are committed to rewriting (shrinking or packing) `slot`:
/// these accounts will not exist in the rewritten storage, so their index
/// ref counts are decremented to keep the index consistent with the new
/// storage contents.
///
/// If a pubkey is unexpectedly absent from the index (`slot_refs` is
/// `None`), we log a warning and emit a datapoint rather than panic;
/// the comment below notes this may become a panic in the future.
pub(crate) fn unref_shrunk_dead_accounts<'a>(
    &self,
    unrefed_pubkeys: impl Iterator<Item = &'a Pubkey>,
    slot: Slot,
) {
    self.accounts_index.scan(
        unrefed_pubkeys,
        |pubkey, slot_refs, _entry| {
            // These pubkeys were collected as dead during shrink/pack; the
            // rewrite is committed, so unref them to reflect their removal
            // from this slot's storage.
            if slot_refs.is_none() {
                // We also expect that the accounts index must contain an
                // entry for `pubkey`. Log a warning for now. In future,
                // we will panic when this happens.
                warn!("pubkey {pubkey} in slot {slot} was NOT found in accounts index during shrink");
                datapoint_warn!(
                    "accounts_db-shink_pubkey_missing_from_index",
                    ("store_slot", slot, i64),
                    ("pubkey", pubkey.to_string(), String),
                )
            }
            // Decrement the ref count for every scanned pubkey.
            AccountsIndexScanResult::Unref
        },
        None,
        // NOTE(review): the boolean and filter arguments mirror the other
        // `accounts_index.scan` call sites in this file; their exact
        // semantics are defined by `AccountsIndex::scan` — confirm there.
        false,
        ScanFilter::All,
    );
}

fn do_shrink_slot_store(&self, slot: Slot, store: &Arc<AccountStorageEntry>) {
if self.accounts_cache.contains(slot) {
// It is not correct to shrink a slot while it is in the write cache until flush is complete and the slot is removed from the write cache.
Expand Down Expand Up @@ -4347,34 +4374,11 @@ impl AccountsDb {
self.shrink_stats
.skipped_shrink
.fetch_add(1, Ordering::Relaxed);

self.accounts_index.scan(
shrink_collect.unrefed_pubkeys.into_iter(),
|pubkey, _slot_refs, entry| {
// pubkeys in `unrefed_pubkeys` were unref'd in `shrink_collect` above under the assumption that we would shrink everything.
// Since shrink is not occurring, we need to addref the pubkeys to get the system back to the prior state since the account still exists at this slot.
if let Some(entry) = entry {
entry.addref();
} else {
// We also expect that the accounts index must contain an
// entry for `pubkey`. Log a warning for now. In future,
// we will panic when this happens.
warn!("pubkey {pubkey} in slot {slot} was NOT found in accounts index during shrink");
datapoint_warn!(
"accounts_db-shink_pubkey_missing_from_index",
("store_slot", slot, i64),
("pubkey", pubkey.to_string(), String),
)
}
AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
},
None,
true,
ScanFilter::All,
);
return;
}

self.unref_shrunk_dead_accounts(shrink_collect.unrefed_pubkeys.iter().cloned(), slot);

let total_accounts_after_shrink = shrink_collect.alive_accounts.len();
debug!(
"shrinking: slot: {}, accounts: ({} => {}) bytes: {} original: {}",
Expand Down
43 changes: 11 additions & 32 deletions accounts-db/src/ancient_append_vecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ use {
ShrinkCollectAliveSeparatedByRefs, ShrinkStatsSub,
},
accounts_file::AccountsFile,
accounts_index::{AccountsIndexScanResult, ScanFilter},
active_stats::ActiveStatItem,
storable_accounts::{StorableAccounts, StorableAccountsBySlot},
},
rand::{thread_rng, Rng},
rayon::prelude::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator},
rayon::prelude::{IntoParallelRefIterator, ParallelIterator},
solana_measure::measure_us,
solana_sdk::clock::Slot,
std::{
Expand Down Expand Up @@ -440,7 +439,6 @@ impl AccountsDb {
accounts_to_combine.target_slots_sorted.last(),
many_refs_newest.last().map(|accounts| accounts.slot)
);
self.addref_accounts_failed_to_shrink_ancient(accounts_to_combine.accounts_to_combine);
return;
}

Expand Down Expand Up @@ -468,12 +466,19 @@ impl AccountsDb {

if pack.len() > accounts_to_combine.target_slots_sorted.len() {
// Not enough slots to contain the accounts we are trying to pack.
// `shrink_collect` previously unref'd some accounts. We need to addref them
// to restore the correct state since we failed to combine anything.
self.addref_accounts_failed_to_shrink_ancient(accounts_to_combine.accounts_to_combine);
return;
}

accounts_to_combine
.accounts_to_combine
.iter()
.for_each(|combine| {
self.unref_shrunk_dead_accounts(
combine.unrefed_pubkeys.iter().cloned(),
combine.slot,
);
});

let write_ancient_accounts = self.write_packed_storages(&accounts_to_combine, pack);

self.finish_combine_ancient_slots_packed_internal(
Expand All @@ -483,29 +488,6 @@ impl AccountsDb {
);
}

/// For each account in `unrefed_pubkeys`, in each `accounts_to_combine`, addref.
///
/// Used when combining ancient slots bails out after `shrink_collect`
/// already unref'd these pubkeys: re-increment each ref count so the
/// index returns to its prior state, since the accounts still exist in
/// their original storages.
fn addref_accounts_failed_to_shrink_ancient<'a>(
    &self,
    accounts_to_combine: Vec<ShrinkCollect<'a, ShrinkCollectAliveSeparatedByRefs<'a>>>,
) {
    // Each slot's pubkey set is independent, so process them in parallel
    // on the clean thread pool.
    self.thread_pool_clean.install(|| {
        accounts_to_combine.into_par_iter().for_each(|combine| {
            self.accounts_index.scan(
                combine.unrefed_pubkeys.into_iter(),
                |_pubkey, _slots_refs, entry| {
                    // Undo the earlier unref. A missing entry is silently
                    // skipped here (contrast with the warn path used during
                    // shrink elsewhere in this codebase).
                    if let Some(entry) = entry {
                        entry.addref();
                    }
                    AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
                },
                None,
                // NOTE(review): the boolean and filter arguments mirror other
                // `accounts_index.scan` call sites; exact semantics are
                // defined by `AccountsIndex::scan` — confirm there.
                true,
                ScanFilter::All,
            );
        });
    });
}

/// calculate all storage info for the storages in slots
/// Then, apply 'tuning' to filter out slots we do NOT want to combine.
fn collect_sort_filter_ancient_slots(
Expand Down Expand Up @@ -858,9 +840,6 @@ impl AccountsDb {
}
}
let unpackable_slots_count = remove.len();
remove.into_iter().rev().for_each(|i| {
self.addref_accounts_failed_to_shrink_ancient(vec![accounts_to_combine.remove(i)]);
});
target_slots_sorted.sort_unstable();
self.shrink_ancient_stats
.slots_cannot_move_count
Expand Down

0 comments on commit d4ce8cc

Please sign in to comment.