Skip to content

Commit

Permalink
unref accounts in shrink and pack when we're committed
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington committed Aug 30, 2024
1 parent dbaea8f commit cf8ee7f
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 65 deletions.
74 changes: 41 additions & 33 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3964,7 +3964,6 @@ impl AccountsDb {
self.accounts_index.scan(
accounts.iter().map(|account| account.pubkey()),
|pubkey, slots_refs, _entry| {
let mut result = AccountsIndexScanResult::OnlyKeepInMemoryIfDirty;
let stored_account = &accounts[index];
let mut do_populate_accounts_for_shrink = |ref_count, slot_list| {
if stored_account.is_zero_lamport()
Expand Down Expand Up @@ -4001,7 +4000,6 @@ impl AccountsDb {
// not exist in the re-written slot. Unref it to keep the index consistent with
// rewriting the storage entries.
unrefed_pubkeys.push(pubkey);
result = AccountsIndexScanResult::Unref;
dead += 1;
} else {
do_populate_accounts_for_shrink(ref_count, slot_list);
Expand All @@ -4018,7 +4016,7 @@ impl AccountsDb {
do_populate_accounts_for_shrink(ref_count, &slot_list);
}
index += 1;
result
AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
},
None,
true,
Expand Down Expand Up @@ -4306,6 +4304,35 @@ impl AccountsDb {
.fetch_add(time.as_us(), Ordering::Relaxed);
}

/// Unref each pubkey in `unrefed_pubkeys` in the accounts index.
/// These pubkeys were found dead in the storage at `slot` (their live
/// version exists in a newer slot), and the caller has committed to
/// rewriting that storage, so the index ref counts must be decremented
/// to stay consistent with the rewritten storage entries.
pub(crate) fn unref_shrunk_dead_accounts<'a>(
    &self,
    unrefed_pubkeys: impl Iterator<Item = &'a Pubkey>,
    slot: Slot,
) {
    self.accounts_index.scan(
        unrefed_pubkeys,
        |pubkey, slot_refs, _entry| {
            // `slot_refs` is `None` when the index has no entry for `pubkey`.
            if slot_refs.is_none() {
                // We also expect that the accounts index must contain an
                // entry for `pubkey`. Log a warning for now. In future,
                // we will panic when this happens.
                warn!("pubkey {pubkey} in slot {slot} was NOT found in accounts index during shrink");
                // NOTE(review): metric name contains a typo ("shink");
                // kept as-is since renaming would break any existing
                // dashboards/alerts keyed on it.
                datapoint_warn!(
                    "accounts_db-shink_pubkey_missing_from_index",
                    ("store_slot", slot, i64),
                    ("pubkey", pubkey.to_string(), String),
                )
            }
            // Decrement the ref count regardless; the storage entry is gone.
            AccountsIndexScanResult::Unref
        },
        None,
        false,
        ScanFilter::All,
    );
}

fn do_shrink_slot_store(&self, slot: Slot, store: &Arc<AccountStorageEntry>) {
if self.accounts_cache.contains(slot) {
// It is not correct to shrink a slot while it is in the write cache until flush is complete and the slot is removed from the write cache.
Expand Down Expand Up @@ -4338,43 +4365,24 @@ impl AccountsDb {
// clean needs to take care of this dead slot
self.accounts_index.add_uncleaned_roots([slot]);
}
info!(
"Unexpected shrink for slot {} alive {} capacity {}, \
likely caused by a bug for calculating alive bytes.",
slot, shrink_collect.alive_total_bytes, shrink_collect.capacity
);
if !shrink_collect.all_are_zero_lamports {
// if all are zero lamports, then we expect that we would like to mark the whole slot dead, but we cannot. That's clean's job.
info!(
"Unexpected shrink for slot {} alive {} capacity {}, \
likely caused by a bug for calculating alive bytes. All alive bytes are zero: {}, {}",
slot, shrink_collect.alive_total_bytes, shrink_collect.capacity,
store.alive_bytes(), shrink_collect.zero_lamport_single_ref_pubkeys.len() * aligned_stored_size(0),
);
}

self.shrink_stats
.skipped_shrink
.fetch_add(1, Ordering::Relaxed);

self.accounts_index.scan(
shrink_collect.unrefed_pubkeys.into_iter(),
|pubkey, _slot_refs, entry| {
// pubkeys in `unrefed_pubkeys` were unref'd in `shrink_collect` above under the assumption that we would shrink everything.
// Since shrink is not occurring, we need to addref the pubkeys to get the system back to the prior state since the account still exists at this slot.
if let Some(entry) = entry {
entry.addref();
} else {
// We also expect that the accounts index must contain an
// entry for `pubkey`. Log a warning for now. In future,
// we will panic when this happens.
warn!("pubkey {pubkey} in slot {slot} was NOT found in accounts index during shrink");
datapoint_warn!(
"accounts_db-shink_pubkey_missing_from_index",
("store_slot", slot, i64),
("pubkey", pubkey.to_string(), String),
)
}
AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
},
None,
true,
ScanFilter::All,
);
return;
}

self.unref_shrunk_dead_accounts(shrink_collect.unrefed_pubkeys.iter().cloned(), slot);

let total_accounts_after_shrink = shrink_collect.alive_accounts.len();
debug!(
"shrinking: slot: {}, accounts: ({} => {}) bytes: {} original: {}",
Expand Down
43 changes: 11 additions & 32 deletions accounts-db/src/ancient_append_vecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ use {
ShrinkCollectAliveSeparatedByRefs, ShrinkStatsSub,
},
accounts_file::AccountsFile,
accounts_index::{AccountsIndexScanResult, ScanFilter},
active_stats::ActiveStatItem,
storable_accounts::{StorableAccounts, StorableAccountsBySlot},
},
rand::{thread_rng, Rng},
rayon::prelude::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator},
rayon::prelude::{IntoParallelRefIterator, ParallelIterator},
solana_measure::measure_us,
solana_sdk::clock::Slot,
std::{
Expand Down Expand Up @@ -440,7 +439,6 @@ impl AccountsDb {
accounts_to_combine.target_slots_sorted.last(),
many_refs_newest.last().map(|accounts| accounts.slot)
);
self.addref_accounts_failed_to_shrink_ancient(accounts_to_combine.accounts_to_combine);
return;
}

Expand Down Expand Up @@ -468,12 +466,19 @@ impl AccountsDb {

if pack.len() > accounts_to_combine.target_slots_sorted.len() {
// Not enough slots to contain the accounts we are trying to pack.
// `shrink_collect` previously unref'd some accounts. We need to addref them
// to restore the correct state since we failed to combine anything.
self.addref_accounts_failed_to_shrink_ancient(accounts_to_combine.accounts_to_combine);
return;
}

accounts_to_combine
.accounts_to_combine
.iter()
.for_each(|combine| {
self.unref_shrunk_dead_accounts(
combine.unrefed_pubkeys.iter().cloned(),
combine.slot,
);
});

let write_ancient_accounts = self.write_packed_storages(&accounts_to_combine, pack);

self.finish_combine_ancient_slots_packed_internal(
Expand All @@ -483,29 +488,6 @@ impl AccountsDb {
);
}

/// Re-addref, in the accounts index, every pubkey listed in
/// `unrefed_pubkeys` of each `ShrinkCollect`. `shrink_collect`
/// previously unref'd these accounts assuming the shrink would
/// complete; since it did not, restore the prior ref counts.
fn addref_accounts_failed_to_shrink_ancient<'a>(
    &self,
    accounts_to_combine: Vec<ShrinkCollect<'a, ShrinkCollectAliveSeparatedByRefs<'a>>>,
) {
    // Each slot's collection is independent, so process them in
    // parallel on the clean thread pool.
    self.thread_pool_clean.install(|| {
        accounts_to_combine
            .into_par_iter()
            .for_each(|shrink_collect| {
                self.accounts_index.scan(
                    shrink_collect.unrefed_pubkeys.into_iter(),
                    |_pubkey, _slots_refs, index_entry| {
                        // Undo the earlier unref. A missing index entry
                        // is silently tolerated here.
                        if let Some(index_entry) = index_entry {
                            index_entry.addref();
                        }
                        AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
                    },
                    None,
                    true,
                    ScanFilter::All,
                );
            });
    });
}

/// calculate all storage info for the storages in slots
/// Then, apply 'tuning' to filter out slots we do NOT want to combine.
fn collect_sort_filter_ancient_slots(
Expand Down Expand Up @@ -858,9 +840,6 @@ impl AccountsDb {
}
}
let unpackable_slots_count = remove.len();
remove.into_iter().rev().for_each(|i| {
self.addref_accounts_failed_to_shrink_ancient(vec![accounts_to_combine.remove(i)]);
});
target_slots_sorted.sort_unstable();
self.shrink_ancient_stats
.slots_cannot_move_count
Expand Down

0 comments on commit cf8ee7f

Please sign in to comment.