Skip to content

Commit

Permalink
retain producing correct write versions for geyser
Browse files Browse the repository at this point in the history
  • Loading branch information
brooksprumo committed Apr 2, 2024
1 parent 429c876 commit 90e134d
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 44 deletions.
62 changes: 22 additions & 40 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ use {
crate::{
account_info::{AccountInfo, StorageLocation},
account_storage::{
meta::{
StorableAccountsWithHashesAndWriteVersions, StoredAccountMeta,
StoredMetaWriteVersion,
},
meta::{StorableAccountsWithHashesAndWriteVersions, StoredAccountMeta},
AccountStorage, AccountStorageStatus, ShrinkInProgress,
},
accounts_cache::{AccountsCache, CachedAccount, SlotCache},
Expand Down Expand Up @@ -393,7 +390,6 @@ impl CurrentAncientAccountsFile {
(self.slot(), accounts, accounts_to_store.slot()),
None::<Vec<AccountHash>>,
self.accounts_file(),
None,
StoreReclaims::Ignore,
);
let bytes_written =
Expand Down Expand Up @@ -4004,7 +4000,6 @@ impl AccountsDb {
(slot, &shrink_collect.alive_accounts.alive_accounts()[..]),
None::<Vec<AccountHash>>,
shrink_in_progress.new_storage(),
None,
StoreReclaims::Ignore,
);

Expand Down Expand Up @@ -6317,7 +6312,6 @@ impl AccountsDb {
(slot, &accounts[..]),
Some(hashes),
&flushed_store,
None,
StoreReclaims::Default,
));
store_accounts_timing = store_accounts_timing_inner;
Expand Down Expand Up @@ -6398,16 +6392,26 @@ impl AccountsDb {
}
}

fn write_accounts_to_cache<'a, 'b, T: ReadableAccount + Sync, P>(
fn write_accounts_to_cache<'a, 'b, T: ReadableAccount + Sync>(
&self,
slot: Slot,
accounts_and_meta_to_store: &impl StorableAccounts<'b, T>,
txn_iter: Box<dyn std::iter::Iterator<Item = &Option<&SanitizedTransaction>> + 'a>,
mut write_version_producer: P,
) -> Vec<AccountInfo>
where
P: Iterator<Item = u64>,
{
) -> Vec<AccountInfo> {
let mut write_version_producer: Box<dyn Iterator<Item = u64>> =
if self.accounts_update_notifier.is_some() {
let mut current_version = self
.write_version
.fetch_add(accounts_and_meta_to_store.len() as u64, Ordering::AcqRel);
Box::new(std::iter::from_fn(move || {
let ret = current_version;
current_version += 1;
Some(ret)
}))
} else {
Box::new(std::iter::empty())
};

txn_iter
.enumerate()
.map(|(i, txn)| {
Expand Down Expand Up @@ -6440,17 +6444,10 @@ impl AccountsDb {
.collect()
}

fn store_accounts_to<
'a: 'c,
'b,
'c,
P: Iterator<Item = u64>,
T: ReadableAccount + Sync + ZeroLamport + 'b,
>(
fn store_accounts_to<'a: 'c, 'b, 'c, T: ReadableAccount + Sync + ZeroLamport + 'b>(
&self,
accounts: &'c impl StorableAccounts<'b, T>,
hashes: Option<Vec<impl Borrow<AccountHash>>>,
mut write_version_producer: P,
store_to: &StoreTo,
transactions: Option<&[Option<&'a SanitizedTransaction>]>,
) -> Vec<AccountInfo> {
Expand Down Expand Up @@ -6484,7 +6481,7 @@ impl AccountsDb {
None => Box::new(std::iter::repeat(&None).take(accounts.len())),
};

self.write_accounts_to_cache(slot, accounts, txn_iter, write_version_producer)
self.write_accounts_to_cache(slot, accounts, txn_iter)
}
StoreTo::Storage(storage) => {
if accounts.has_hash_and_write_version() {
Expand All @@ -6496,9 +6493,7 @@ impl AccountsDb {
),
)
} else {
let write_versions = (0..accounts.len())
.map(|_| write_version_producer.next().unwrap())
.collect::<Vec<_>>();
let write_versions = vec![0; accounts.len()];
match hashes {
Some(hashes) => self.write_accounts_to_storage(
slot,
Expand Down Expand Up @@ -8397,7 +8392,6 @@ impl AccountsDb {
self.store_accounts_custom(
accounts,
hashes,
None::<Box<dyn Iterator<Item = u64>>>,
store_to,
reset_accounts,
transactions,
Expand All @@ -8411,7 +8405,6 @@ impl AccountsDb {
accounts: impl StorableAccounts<'a, T>,
hashes: Option<Vec<impl Borrow<AccountHash>>>,
storage: &Arc<AccountStorageEntry>,
write_version_producer: Option<Box<dyn Iterator<Item = StoredMetaWriteVersion>>>,
reclaim: StoreReclaims,
) -> StoreAccountsTiming {
// stores on a frozen slot should not reset
Expand All @@ -8421,7 +8414,6 @@ impl AccountsDb {
self.store_accounts_custom(
accounts,
hashes,
write_version_producer,
&StoreTo::Storage(storage),
reset_accounts,
None,
Expand All @@ -8434,27 +8426,17 @@ impl AccountsDb {
&self,
accounts: impl StorableAccounts<'a, T>,
hashes: Option<Vec<impl Borrow<AccountHash>>>,
write_version_producer: Option<Box<dyn Iterator<Item = u64>>>,
store_to: &StoreTo,
reset_accounts: bool,
transactions: Option<&[Option<&SanitizedTransaction>]>,
reclaim: StoreReclaims,
update_index_thread_selection: UpdateIndexThreadSelection,
) -> StoreAccountsTiming {
let write_version_producer: Box<dyn Iterator<Item = u64>> =
write_version_producer.unwrap_or_else(|| Box::new(std::iter::repeat(0)));

self.stats
.store_num_accounts
.fetch_add(accounts.len() as u64, Ordering::Relaxed);
let mut store_accounts_time = Measure::start("store_accounts");
let infos = self.store_accounts_to(
&accounts,
hashes,
write_version_producer,
store_to,
transactions,
);
let infos = self.store_accounts_to(&accounts, hashes, store_to, transactions);
store_accounts_time.stop();
self.stats
.store_accounts
Expand Down Expand Up @@ -9491,7 +9473,7 @@ pub mod tests {
super::*,
crate::{
account_info::StoredSize,
account_storage::meta::{AccountMeta, StoredMeta},
account_storage::meta::{AccountMeta, StoredMeta, StoredMetaWriteVersion},
accounts_file::AccountsFileProvider,
accounts_hash::MERKLE_FANOUT,
accounts_index::{tests::*, AccountSecondaryIndexesIncludeExclude},
Expand Down
1 change: 0 additions & 1 deletion accounts-db/src/ancient_append_vecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,6 @@ impl AccountsDb {
accounts_to_write,
None::<Vec<AccountHash>>,
shrink_in_progress.new_storage(),
None,
StoreReclaims::Ignore,
));

Expand Down
3 changes: 0 additions & 3 deletions runtime/src/snapshot_minimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,12 +359,10 @@ impl<'a> SnapshotMinimizer<'a> {
if aligned_total > 0 {
let mut accounts = Vec::with_capacity(keep_accounts.len());
let mut hashes = Vec::with_capacity(keep_accounts.len());
let mut write_versions = Vec::with_capacity(keep_accounts.len());

for alive_account in keep_accounts {
accounts.push(alive_account);
hashes.push(alive_account.hash());
write_versions.push(alive_account.write_version());
}

shrink_in_progress = Some(self.accounts_db().get_store_for_shrink(slot, aligned_total));
Expand All @@ -373,7 +371,6 @@ impl<'a> SnapshotMinimizer<'a> {
(slot, &accounts[..]),
Some(hashes),
new_storage,
Some(Box::new(write_versions.into_iter())),
StoreReclaims::Ignore,
);

Expand Down

0 comments on commit 90e134d

Please sign in to comment.