wip: add uncleaned roots when we shrink away non-zero lamport accounts #2865

Draft
wants to merge 7 commits into base: master
115 changes: 108 additions & 7 deletions accounts-db/src/accounts_db.rs
@@ -1091,6 +1091,10 @@ pub struct AccountStorageEntry {
approx_store_count: AtomicUsize,

alive_bytes: AtomicUsize,

/// Offsets of accounts in this storage that are zero-lamport and single-ref.
/// These are still alive, but shrink will be able to remove them.
zero_lamport_single_ref_offsets: RwLock<IntSet<usize>>,

Review comment: Do we need to keep all the offsets here? Is the count of the number of single-ref zeros in the storage enough?
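A minimal sketch of the count-only alternative being asked about, assuming a plain counter really is enough; the type and method names below are illustrative and not part of this PR:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical count-only alternative: track how many single-ref zero-lamport
/// accounts live in this storage instead of tracking their offsets.
#[derive(Default)]
pub struct ZeroLamportSingleRefCount(AtomicUsize);

impl ZeroLamportSingleRefCount {
    /// Record one more single-ref zero-lamport account; returns the new count.
    pub fn increment(&self) -> usize {
        self.0.fetch_add(1, Ordering::Relaxed) + 1
    }

    /// Bytes that shrink could reclaim, given the per-account size of a
    /// zero-lamport entry in this storage format.
    pub fn dead_bytes(&self, bytes_per_zero_lamport_account: usize) -> usize {
        self.0.load(Ordering::Relaxed) * bytes_per_zero_lamport_account
    }
}
```

As the author notes further down, a bare counter cannot deduplicate repeated reports of the same offset, which is the stated reason the offset set may still be needed.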

}

impl AccountStorageEntry {
@@ -1112,6 +1116,7 @@ impl AccountStorageEntry {
count_and_status: SeqLock::new((0, AccountStorageStatus::Available)),
approx_store_count: AtomicUsize::new(0),
alive_bytes: AtomicUsize::new(0),
zero_lamport_single_ref_offsets: RwLock::default(),
}
}

@@ -1130,6 +1135,7 @@ impl AccountStorageEntry {
approx_store_count: AtomicUsize::new(self.approx_stored_count()),
alive_bytes: AtomicUsize::new(self.alive_bytes()),
accounts,
zero_lamport_single_ref_offsets: RwLock::default(),
})
}

@@ -1146,6 +1152,7 @@ impl AccountStorageEntry {
count_and_status: SeqLock::new((0, AccountStorageStatus::Available)),
approx_store_count: AtomicUsize::new(num_accounts),
alive_bytes: AtomicUsize::new(0),
zero_lamport_single_ref_offsets: RwLock::default(),
}
}

@@ -1991,6 +1998,7 @@ pub(crate) struct ShrinkAncientStats {
pub(crate) many_refs_old_alive: AtomicU64,
pub(crate) slots_eligible_to_shrink: AtomicU64,
pub(crate) total_dead_bytes: AtomicU64,
pub(crate) slot: AtomicU64,
pub(crate) total_alive_bytes: AtomicU64,
}

@@ -2041,6 +2049,9 @@ pub struct ShrinkStats {
purged_zero_lamports: AtomicU64,
accounts_not_found_in_index: AtomicU64,
num_ancient_slots_shrunk: AtomicU64,
adding_dead_slots_to_clean: AtomicU64,
adding_shrink_slots_from_zeros: AtomicU64,
marking_zero_dead_accounts: AtomicU64,
}

impl ShrinkStats {
@@ -2164,6 +2175,21 @@ impl ShrinkStats {
self.accounts_not_found_in_index.swap(0, Ordering::Relaxed),
i64
),
(
"adding_dead_slots_to_clean",
self.adding_dead_slots_to_clean.swap(0, Ordering::Relaxed),
i64
),
(
"adding_shrink_slots_from_zeros",
self.adding_shrink_slots_from_zeros.swap(0, Ordering::Relaxed),
i64
),
(
"marking_zero_dead_accounts",
self.marking_zero_dead_accounts.swap(0, Ordering::Relaxed),
i64
),
);
}
}
@@ -2323,6 +2349,11 @@ impl ShrinkAncientStats {
self.slots_eligible_to_shrink.swap(0, Ordering::Relaxed),
i64
),
(
"slot",
self.slot.swap(0, Ordering::Relaxed),
i64
),
(
"total_dead_bytes",
self.total_dead_bytes.swap(0, Ordering::Relaxed),
@@ -3053,7 +3084,9 @@ impl AccountsDb {
result,
-((epoch_schedule.slots_per_epoch as i64).saturating_sub(1)),
);
result.min(max_root_inclusive)
let result = result.min(max_root_inclusive);
log::error!("oldest non ancient slot: {result}, max root: {max_root_inclusive}, diff: {}, offset: {:?}", max_root_inclusive.saturating_sub(result), self.ancient_append_vec_offset);
result
}

/// Collect all the uncleaned slots, up to a max slot
@@ -3597,6 +3630,7 @@ impl AccountsDb {
self.clean_accounts_stats.report();
datapoint_info!(
"clean_accounts",
("slot", max_clean_root_inclusive.unwrap_or_default(), i64),
("total_us", measure_all.as_us(), i64),
(
"collect_delta_keys_us",
@@ -3972,6 +4006,7 @@ impl AccountsDb {
let mut alive = 0;
let mut dead = 0;
let mut index = 0;
let mut adding_slots_to_clean = 0;
let mut index_scan_returned_some_count = 0;
let mut index_scan_returned_none_count = 0;
let mut all_are_zero_lamports = true;
@@ -4016,6 +4051,21 @@ impl AccountsDb {
// rewriting the storage entries.
pubkeys_to_unref.push(pubkey);
dead += 1;

// If we are marking something dead, and the only remaining alive account is zero lamport, then make that zero lamport slot ready to be cleaned.
// If that slot happens to only contain zero lamport accounts, the whole slot will go away
if slot_list.len() == 1 && ref_count == 2 {
// probably should have this check after we unref
// should we also check for ref counts here?
if let Some((slot_alive, acct_info)) = slot_list.first() {
if acct_info.is_zero_lamport() && !acct_info.is_cached() {
self.zero_lamport_single_ref_found(
*slot_alive,
acct_info.offset(),
);
}
}
}
} else {
do_populate_accounts_for_shrink(ref_count, slot_list);
}

Author comment: this should probably ALSO run when we are doing clean and we find a zero lamport account that has a single ref. This is a reason why the hashset may be necessary, so we don't double-count it.
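A tiny sketch of the de-duplication property the comment above relies on, with std's HashSet standing in for IntSet; reporting the same offset from both clean and shrink only counts it once:

```rust
use std::collections::HashSet;

/// Illustrative only: record a single-ref zero-lamport offset, returning whether
/// it is newly seen. `insert` returns false for a duplicate, so a second report
/// of the same offset from another code path is a no-op.
fn report_zero_lamport_single_ref(seen: &mut HashSet<usize>, offset: usize) -> bool {
    seen.insert(offset)
}

fn main() {
    let mut seen = HashSet::new();
    assert!(report_zero_lamport_single_ref(&mut seen, 42)); // first report, e.g. from shrink
    assert!(!report_zero_lamport_single_ref(&mut seen, 42)); // duplicate report, e.g. from clean
    assert_eq!(seen.len(), 1); // counted exactly once
}
```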
@@ -4353,6 +4403,37 @@ impl AccountsDb {
);
}

pub(crate) fn zero_lamport_single_ref_found(&self, slot: Slot, offset: usize) {
if let Some(store) = self.storage.get_slot_storage_entry(slot) {
let mut zero_lamport_single_ref_offsets =
store.zero_lamport_single_ref_offsets.write().unwrap();

Review comment: Can we move this logic one level up, i.e. after `index.scan(...)`? In `index.scan(...)`, we keep a counter of how many single-ref zeros there are; after the scan, we do the check below. In this way, we don't need `zero_lamport_single_ref_offsets: RwLock<IntSet>` on the account storage at all. Can we just keep the count, zero_lamport_single_ref_counts, on the account storage instead, for the later usage in do_shrink_store()? (A sketch of this alternative follows the function below.)
if zero_lamport_single_ref_offsets.insert(offset) {
// this wasn't previously marked as zero lamport single ref
if zero_lamport_single_ref_offsets.len() == store.count() {
// all accounts in this storage can be dead
self.accounts_index.add_uncleaned_roots([slot].into_iter());
self.shrink_stats
.adding_dead_slots_to_clean
.fetch_add(1, Ordering::Relaxed);
} else {
Review comment (nit): we can check whether slot is already in self.shrink_candidate_slots here; if it is already in it, we can skip the is_shrinking_productive and is_candidate_for_shrink checks below.

if Self::is_shrinking_productive(slot, &store)
&& self.is_candidate_for_shrink(&store)
{
// this store might be eligible for shrinking now
self.shrink_candidate_slots.lock().unwrap().insert(slot);
self.shrink_stats
.adding_shrink_slots_from_zeros
.fetch_add(1, Ordering::Relaxed);
} else {
self.shrink_stats
.marking_zero_dead_accounts
.fetch_add(1, Ordering::Relaxed);
}
}
}
}
}
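A rough sketch of the reviewer's "one level up" suggestion: count single-ref zeros while scanning the index, then decide once per storage afterwards, mirroring the branches of zero_lamport_single_ref_found above. All names here (after_index_scan, StorageAction, the counts) are illustrative, not part of this PR:

```rust
/// Illustrative outcome of the post-scan check.
enum StorageAction {
    /// No single-ref zero-lamport accounts were found.
    Nothing,
    /// Every alive account is a single-ref zero lamport: hand the slot to clean
    /// as an uncleaned root so the whole slot can go away.
    MarkSlotUncleaned,
    /// Only some accounts qualify: re-evaluate the storage as a shrink candidate.
    ConsiderForShrink,
}

/// Decide once, after `index.scan(...)`, based only on counts.
fn after_index_scan(single_ref_zero_count: usize, alive_account_count: usize) -> StorageAction {
    if single_ref_zero_count == 0 {
        StorageAction::Nothing
    } else if single_ref_zero_count == alive_account_count {
        StorageAction::MarkSlotUncleaned
    } else {
        StorageAction::ConsiderForShrink
    }
}

fn main() {
    // Example: 3 of 10 alive accounts are single-ref zero-lamport accounts.
    assert!(matches!(after_index_scan(3, 10), StorageAction::ConsiderForShrink));
    assert!(matches!(after_index_scan(10, 10), StorageAction::MarkSlotUncleaned));
}
```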

fn do_shrink_slot_store(&self, slot: Slot, store: &Arc<AccountStorageEntry>) {
if self.accounts_cache.contains(slot) {
// It is not correct to shrink a slot while it is in the write cache until flush is complete and the slot is removed from the write cache.
@@ -8034,7 +8115,14 @@ impl AccountsDb {
let alive_bytes = store.alive_bytes() as u64;
let total_bytes = store.capacity();

if Self::should_not_shrink(alive_bytes, total_bytes) {
let zero_lamport_dead_bytes = store.accounts.dead_bytes_due_to_zero_lamport_single_ref(
store.zero_lamport_single_ref_offsets.read().unwrap().len(),
);

if Self::should_not_shrink(
alive_bytes.saturating_sub(zero_lamport_dead_bytes as u64),
total_bytes,
) {
trace!(
"shrink_slot_forced ({}): not able to shrink at all: alive/stored: {}/{} ({}b / {}b) save: {}",
slot,
@@ -8061,12 +8149,17 @@ impl AccountsDb {
} else {
store.capacity()
};

let zero_lamport_dead_bytes = store.accounts.dead_bytes_due_to_zero_lamport_single_ref(
store.zero_lamport_single_ref_offsets.read().unwrap().len(),
);

let alive_bytes = store.alive_bytes().saturating_sub(zero_lamport_dead_bytes) as u64;

match self.shrink_ratio {
AccountShrinkThreshold::TotalSpace { shrink_ratio: _ } => {
(store.alive_bytes() as u64) < total_bytes
}
AccountShrinkThreshold::TotalSpace { shrink_ratio: _ } => alive_bytes < total_bytes,
AccountShrinkThreshold::IndividualStore { shrink_ratio } => {
(store.alive_bytes() as f64 / total_bytes as f64) < shrink_ratio
(alive_bytes as f64 / total_bytes as f64) < shrink_ratio
}
}
}
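A small worked example, with made-up numbers, of why is_candidate_for_shrink now subtracts the zero-lamport dead bytes: a storage can sit above the shrink ratio on raw alive bytes, yet become a shrink candidate once the bytes held by single-ref zero-lamport accounts are treated as dead.

```rust
fn main() {
    // Illustrative numbers only.
    let capacity: u64 = 1_000_000;
    let alive_bytes: u64 = 850_000;
    // e.g. what dead_bytes_due_to_zero_lamport_single_ref(...) might report
    let zero_lamport_dead_bytes: u64 = 200_000;
    let shrink_ratio = 0.80;

    let old_ratio = alive_bytes as f64 / capacity as f64; // 0.85: not a candidate
    let new_ratio =
        alive_bytes.saturating_sub(zero_lamport_dead_bytes) as f64 / capacity as f64; // 0.65: candidate

    assert!(old_ratio >= shrink_ratio);
    assert!(new_ratio < shrink_ratio);
}
```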
Expand Down Expand Up @@ -8833,7 +8926,7 @@ impl AccountsDb {

fn generate_index_for_slot(
&self,
storage: &AccountStorageEntry,
storage: &Arc<AccountStorageEntry>,
slot: Slot,
store_id: AccountsFileId,
rent_collector: &RentCollector,
@@ -8850,15 +8943,23 @@ impl AccountsDb {
let mut amount_to_top_off_rent = 0;
let mut stored_size_alive = 0;

let mut all_accounts_are_zero_lamports = true;

let (dirty_pubkeys, insert_time_us, mut generate_index_results) = {
let mut items_local = Vec::default();
storage.accounts.scan_index(|info| {
stored_size_alive += info.stored_size_aligned;
if info.index_info.lamports > 0 {
accounts_data_len += info.index_info.data_len;
all_accounts_are_zero_lamports = false;
}
items_local.push(info.index_info);
});
if all_accounts_are_zero_lamports {
// this whole slot can likely be marked dead and dropped. Clean has to determine that. There could be an older non-zero account for any of these zero lamport accounts.
self.dirty_stores.insert(slot, Arc::clone(storage));
self.accounts_index.add_uncleaned_roots([slot].into_iter());
}
let items = items_local.into_iter().map(|info| {
if let Some(amount_to_top_off_rent_this_account) = Self::stats_for_rent_payers(
&info.pubkey,
7 changes: 7 additions & 0 deletions accounts-db/src/accounts_file.rs
@@ -98,6 +98,13 @@ impl AccountsFile {
}
}

pub(crate) fn dead_bytes_due_to_zero_lamport_single_ref(&self, count: usize) -> usize {
match self {
Self::AppendVec(av) => av.dead_bytes_due_to_zero_lamport_single_ref(count),
Self::TieredStorage(ts) => ts.dead_bytes_due_to_zero_lamport_single_ref(count),
}
}

pub fn flush(&self) -> Result<()> {
match self {
Self::AppendVec(av) => av.flush(),
1 change: 1 addition & 0 deletions accounts-db/src/ancient_append_vecs.rs
@@ -393,6 +393,7 @@ impl AccountsDb {
tuning: PackedAncientStorageTuning,
metrics: &mut ShrinkStatsSub,
) {
self.shrink_ancient_stats.slot.store(sorted_slots.first().cloned().unwrap_or_default(), Ordering::Relaxed);
self.shrink_ancient_stats
.slots_considered
.fetch_add(sorted_slots.len() as u64, Ordering::Relaxed);
4 changes: 4 additions & 0 deletions accounts-db/src/append_vec.rs
@@ -409,6 +409,10 @@ impl AppendVec {
}
}

pub fn dead_bytes_due_to_zero_lamport_single_ref(&self, count: usize) -> usize {
aligned_stored_size(0) * count
}

pub fn flush(&self) -> Result<()> {
match &self.backing {
AppendVecFileBacking::Mmap(mmap_only) => {
5 changes: 5 additions & 0 deletions accounts-db/src/tiered_storage.rs
@@ -164,6 +164,11 @@ impl TieredStorage {
self.reader()
.map_or(MAX_TIERED_STORAGE_FILE_SIZE, |reader| reader.capacity())
}

pub fn dead_bytes_due_to_zero_lamport_single_ref(&self, count: usize) -> usize {
// approximately 42 bytes per zero lamport account
count * 42
}
}

#[cfg(test)]