From 72db69ce0a1abefdf4635e544d7a90d726b2d21b Mon Sep 17 00:00:00 2001 From: brooks Date: Mon, 1 Apr 2024 10:29:37 -0400 Subject: [PATCH] retain producing correct write versions for geyser --- accounts-db/src/accounts_db.rs | 62 +++++++++----------------- accounts-db/src/ancient_append_vecs.rs | 1 - runtime/src/snapshot_minimizer.rs | 3 -- 3 files changed, 22 insertions(+), 44 deletions(-) diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs index 56e2df36712149..dc1c0dc7448a35 100644 --- a/accounts-db/src/accounts_db.rs +++ b/accounts-db/src/accounts_db.rs @@ -26,10 +26,7 @@ use { crate::{ account_info::{AccountInfo, StorageLocation}, account_storage::{ - meta::{ - StorableAccountsWithHashesAndWriteVersions, StoredAccountMeta, - StoredMetaWriteVersion, - }, + meta::{StorableAccountsWithHashesAndWriteVersions, StoredAccountMeta}, AccountStorage, AccountStorageStatus, ShrinkInProgress, }, accounts_cache::{AccountsCache, CachedAccount, SlotCache}, @@ -393,7 +390,6 @@ impl CurrentAncientAccountsFile { (self.slot(), accounts, accounts_to_store.slot()), None::>, self.accounts_file(), - None, StoreReclaims::Ignore, ); let bytes_written = @@ -4004,7 +4000,6 @@ impl AccountsDb { (slot, &shrink_collect.alive_accounts.alive_accounts()[..]), None::>, shrink_in_progress.new_storage(), - None, StoreReclaims::Ignore, ); @@ -6317,7 +6312,6 @@ impl AccountsDb { (slot, &accounts[..]), Some(hashes), &flushed_store, - None, StoreReclaims::Default, )); store_accounts_timing = store_accounts_timing_inner; @@ -6398,16 +6392,26 @@ impl AccountsDb { } } - fn write_accounts_to_cache<'a, 'b, T: ReadableAccount + Sync, P>( + fn write_accounts_to_cache<'a, 'b, T: ReadableAccount + Sync>( &self, slot: Slot, accounts_and_meta_to_store: &impl StorableAccounts<'b, T>, txn_iter: Box> + 'a>, - mut write_version_producer: P, - ) -> Vec - where - P: Iterator, - { + ) -> Vec { + let mut write_version_producer: Box> = + if self.accounts_update_notifier.is_some() { + let mut current_version = self + .write_version + .fetch_add(accounts_and_meta_to_store.len() as u64, Ordering::AcqRel); + Box::new(std::iter::from_fn(move || { + let ret = current_version; + current_version += 1; + Some(ret) + })) + } else { + Box::new(std::iter::empty()) + }; + txn_iter .enumerate() .map(|(i, txn)| { @@ -6440,17 +6444,10 @@ impl AccountsDb { .collect() } - fn store_accounts_to< - 'a: 'c, - 'b, - 'c, - P: Iterator, - T: ReadableAccount + Sync + ZeroLamport + 'b, - >( + fn store_accounts_to<'a: 'c, 'b, 'c, T: ReadableAccount + Sync + ZeroLamport + 'b>( &self, accounts: &'c impl StorableAccounts<'b, T>, hashes: Option>>, - mut write_version_producer: P, store_to: &StoreTo, transactions: Option<&[Option<&'a SanitizedTransaction>]>, ) -> Vec { @@ -6484,7 +6481,7 @@ impl AccountsDb { None => Box::new(std::iter::repeat(&None).take(accounts.len())), }; - self.write_accounts_to_cache(slot, accounts, txn_iter, write_version_producer) + self.write_accounts_to_cache(slot, accounts, txn_iter) } StoreTo::Storage(storage) => { if accounts.has_hash_and_write_version() { @@ -6496,9 +6493,7 @@ impl AccountsDb { ), ) } else { - let write_versions = (0..accounts.len()) - .map(|_| write_version_producer.next().unwrap()) - .collect::>(); + let write_versions = vec![0; accounts.len()]; match hashes { Some(hashes) => self.write_accounts_to_storage( slot, @@ -8397,7 +8392,6 @@ impl AccountsDb { self.store_accounts_custom( accounts, hashes, - None::>>, store_to, reset_accounts, transactions, @@ -8411,7 +8405,6 @@ impl AccountsDb { accounts: impl StorableAccounts<'a, T>, hashes: Option>>, storage: &Arc, - write_version_producer: Option>>, reclaim: StoreReclaims, ) -> StoreAccountsTiming { // stores on a frozen slot should not reset @@ -8421,7 +8414,6 @@ impl AccountsDb { self.store_accounts_custom( accounts, hashes, - write_version_producer, &StoreTo::Storage(storage), reset_accounts, None, @@ -8434,27 +8426,17 @@ impl AccountsDb { &self, accounts: impl StorableAccounts<'a, T>, hashes: Option>>, - write_version_producer: Option>>, store_to: &StoreTo, reset_accounts: bool, transactions: Option<&[Option<&SanitizedTransaction>]>, reclaim: StoreReclaims, update_index_thread_selection: UpdateIndexThreadSelection, ) -> StoreAccountsTiming { - let write_version_producer: Box> = - write_version_producer.unwrap_or_else(|| Box::new(std::iter::repeat(0))); - self.stats .store_num_accounts .fetch_add(accounts.len() as u64, Ordering::Relaxed); let mut store_accounts_time = Measure::start("store_accounts"); - let infos = self.store_accounts_to( - &accounts, - hashes, - write_version_producer, - store_to, - transactions, - ); + let infos = self.store_accounts_to(&accounts, hashes, store_to, transactions); store_accounts_time.stop(); self.stats .store_accounts @@ -9491,7 +9473,7 @@ pub mod tests { super::*, crate::{ account_info::StoredSize, - account_storage::meta::{AccountMeta, StoredMeta}, + account_storage::meta::{AccountMeta, StoredMeta, StoredMetaWriteVersion}, accounts_hash::MERKLE_FANOUT, accounts_index::{tests::*, AccountSecondaryIndexesIncludeExclude}, ancient_append_vecs, diff --git a/accounts-db/src/ancient_append_vecs.rs b/accounts-db/src/ancient_append_vecs.rs index f83f16e121a734..c4df48f0447593 100644 --- a/accounts-db/src/ancient_append_vecs.rs +++ b/accounts-db/src/ancient_append_vecs.rs @@ -438,7 +438,6 @@ impl AccountsDb { accounts_to_write, None::>, shrink_in_progress.new_storage(), - None, StoreReclaims::Ignore, )); diff --git a/runtime/src/snapshot_minimizer.rs b/runtime/src/snapshot_minimizer.rs index 009444b962ed48..e39c298fb1aaaf 100644 --- a/runtime/src/snapshot_minimizer.rs +++ b/runtime/src/snapshot_minimizer.rs @@ -359,12 +359,10 @@ impl<'a> SnapshotMinimizer<'a> { if aligned_total > 0 { let mut accounts = Vec::with_capacity(keep_accounts.len()); let mut hashes = Vec::with_capacity(keep_accounts.len()); - let mut write_versions = Vec::with_capacity(keep_accounts.len()); for alive_account in keep_accounts { accounts.push(alive_account); hashes.push(alive_account.hash()); - write_versions.push(alive_account.write_version()); } shrink_in_progress = Some(self.accounts_db().get_store_for_shrink(slot, aligned_total)); @@ -373,7 +371,6 @@ impl<'a> SnapshotMinimizer<'a> { (slot, &accounts[..]), Some(hashes), new_storage, - Some(Box::new(write_versions.into_iter())), StoreReclaims::Ignore, );