Skip to content

Commit

Permalink
retain producing correct write versions for geyser
Browse files Browse the repository at this point in the history
  • Loading branch information
brooksprumo committed Apr 1, 2024
1 parent 5a86917 commit ad35199
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 44 deletions.
62 changes: 22 additions & 40 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ use {
crate::{
account_info::{AccountInfo, StorageLocation},
account_storage::{
meta::{
StorableAccountsWithHashesAndWriteVersions, StoredAccountMeta,
StoredMetaWriteVersion,
},
meta::{StorableAccountsWithHashesAndWriteVersions, StoredAccountMeta},
AccountStorage, AccountStorageStatus, ShrinkInProgress,
},
accounts_cache::{AccountsCache, CachedAccount, SlotCache},
Expand Down Expand Up @@ -392,7 +389,6 @@ impl CurrentAncientAppendVec {
(self.slot(), accounts, accounts_to_store.slot()),
None::<Vec<AccountHash>>,
self.append_vec(),
None,
StoreReclaims::Ignore,
);
let bytes_written =
Expand Down Expand Up @@ -3971,7 +3967,6 @@ impl AccountsDb {
(slot, &shrink_collect.alive_accounts.alive_accounts()[..]),
None::<Vec<AccountHash>>,
shrink_in_progress.new_storage(),
None,
StoreReclaims::Ignore,
);

Expand Down Expand Up @@ -6283,7 +6278,6 @@ impl AccountsDb {
(slot, &accounts[..]),
Some(hashes),
&flushed_store,
None,
StoreReclaims::Default,
));
store_accounts_timing = store_accounts_timing_inner;
Expand Down Expand Up @@ -6407,16 +6401,26 @@ impl AccountsDb {
}
}

fn write_accounts_to_cache<'a, 'b, T: ReadableAccount + Sync, P>(
fn write_accounts_to_cache<'a, 'b, T: ReadableAccount + Sync>(
&self,
slot: Slot,
accounts_and_meta_to_store: &impl StorableAccounts<'b, T>,
txn_iter: Box<dyn std::iter::Iterator<Item = &Option<&SanitizedTransaction>> + 'a>,
mut write_version_producer: P,
) -> Vec<AccountInfo>
where
P: Iterator<Item = u64>,
{
) -> Vec<AccountInfo> {
let mut write_version_producer: Box<dyn Iterator<Item = u64>> =
if self.accounts_update_notifier.is_some() {
let mut current_version = self
.write_version
.fetch_add(accounts_and_meta_to_store.len() as u64, Ordering::AcqRel);
Box::new(std::iter::from_fn(move || {
let ret = current_version;
current_version += 1;
Some(ret)
}))
} else {
Box::new(std::iter::empty())
};

txn_iter
.enumerate()
.map(|(i, txn)| {
Expand Down Expand Up @@ -6449,17 +6453,10 @@ impl AccountsDb {
.collect()
}

fn store_accounts_to<
'a: 'c,
'b,
'c,
P: Iterator<Item = u64>,
T: ReadableAccount + Sync + ZeroLamport + 'b,
>(
fn store_accounts_to<'a: 'c, 'b, 'c, T: ReadableAccount + Sync + ZeroLamport + 'b>(
&self,
accounts: &'c impl StorableAccounts<'b, T>,
hashes: Option<Vec<impl Borrow<AccountHash>>>,
mut write_version_producer: P,
store_to: &StoreTo,
transactions: Option<&[Option<&'a SanitizedTransaction>]>,
) -> Vec<AccountInfo> {
Expand All @@ -6485,7 +6482,7 @@ impl AccountsDb {
None => Box::new(std::iter::repeat(&None).take(accounts.len())),
};

self.write_accounts_to_cache(slot, accounts, txn_iter, write_version_producer)
self.write_accounts_to_cache(slot, accounts, txn_iter)
}
StoreTo::Storage(storage) => {
if accounts.has_hash_and_write_version() {
Expand All @@ -6497,9 +6494,7 @@ impl AccountsDb {
),
)
} else {
let write_versions = (0..accounts.len())
.map(|_| write_version_producer.next().unwrap())
.collect::<Vec<_>>();
let write_versions = vec![0; accounts.len()];
match hashes {
Some(hashes) => self.write_accounts_to_storage(
slot,
Expand Down Expand Up @@ -8398,7 +8393,6 @@ impl AccountsDb {
self.store_accounts_custom(
accounts,
hashes,
None::<Box<dyn Iterator<Item = u64>>>,
store_to,
reset_accounts,
transactions,
Expand All @@ -8412,7 +8406,6 @@ impl AccountsDb {
accounts: impl StorableAccounts<'a, T>,
hashes: Option<Vec<impl Borrow<AccountHash>>>,
storage: &Arc<AccountStorageEntry>,
write_version_producer: Option<Box<dyn Iterator<Item = StoredMetaWriteVersion>>>,
reclaim: StoreReclaims,
) -> StoreAccountsTiming {
// stores on a frozen slot should not reset
Expand All @@ -8422,7 +8415,6 @@ impl AccountsDb {
self.store_accounts_custom(
accounts,
hashes,
write_version_producer,
&StoreTo::Storage(storage),
reset_accounts,
None,
Expand All @@ -8435,27 +8427,17 @@ impl AccountsDb {
&self,
accounts: impl StorableAccounts<'a, T>,
hashes: Option<Vec<impl Borrow<AccountHash>>>,
write_version_producer: Option<Box<dyn Iterator<Item = u64>>>,
store_to: &StoreTo,
reset_accounts: bool,
transactions: Option<&[Option<&SanitizedTransaction>]>,
reclaim: StoreReclaims,
update_index_thread_selection: UpdateIndexThreadSelection,
) -> StoreAccountsTiming {
let write_version_producer: Box<dyn Iterator<Item = u64>> =
write_version_producer.unwrap_or_else(|| Box::new(std::iter::repeat(0)));

self.stats
.store_num_accounts
.fetch_add(accounts.len() as u64, Ordering::Relaxed);
let mut store_accounts_time = Measure::start("store_accounts");
let infos = self.store_accounts_to(
&accounts,
hashes,
write_version_producer,
store_to,
transactions,
);
let infos = self.store_accounts_to(&accounts, hashes, store_to, transactions);
store_accounts_time.stop();
self.stats
.store_accounts
Expand Down Expand Up @@ -9492,7 +9474,7 @@ pub mod tests {
super::*,
crate::{
account_info::StoredSize,
account_storage::meta::{AccountMeta, StoredMeta},
account_storage::meta::{AccountMeta, StoredMeta, StoredMetaWriteVersion},
accounts_hash::MERKLE_FANOUT,
accounts_index::{tests::*, AccountSecondaryIndexesIncludeExclude},
ancient_append_vecs,
Expand Down
1 change: 0 additions & 1 deletion accounts-db/src/ancient_append_vecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,6 @@ impl AccountsDb {
accounts_to_write,
None::<Vec<AccountHash>>,
shrink_in_progress.new_storage(),
None,
StoreReclaims::Ignore,
));

Expand Down
3 changes: 0 additions & 3 deletions runtime/src/snapshot_minimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,12 +359,10 @@ impl<'a> SnapshotMinimizer<'a> {
if aligned_total > 0 {
let mut accounts = Vec::with_capacity(keep_accounts.len());
let mut hashes = Vec::with_capacity(keep_accounts.len());
let mut write_versions = Vec::with_capacity(keep_accounts.len());

for alive_account in keep_accounts {
accounts.push(alive_account);
hashes.push(alive_account.hash());
write_versions.push(alive_account.write_version());
}

shrink_in_progress = Some(self.accounts_db().get_store_for_shrink(slot, aligned_total));
Expand All @@ -373,7 +371,6 @@ impl<'a> SnapshotMinimizer<'a> {
(slot, &accounts[..]),
Some(hashes),
new_storage,
Some(Box::new(write_versions.into_iter())),
StoreReclaims::Ignore,
);

Expand Down

0 comments on commit ad35199

Please sign in to comment.