Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sets write version to 0 when storing accounts #476

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 22 additions & 52 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ use {
crate::{
account_info::{AccountInfo, StorageLocation},
account_storage::{
meta::{
StorableAccountsWithHashesAndWriteVersions, StoredAccountMeta,
StoredMetaWriteVersion,
},
meta::{StorableAccountsWithHashesAndWriteVersions, StoredAccountMeta},
AccountStorage, AccountStorageStatus, ShrinkInProgress,
},
accounts_cache::{AccountsCache, CachedAccount, SlotCache},
Expand Down Expand Up @@ -393,7 +390,6 @@ impl CurrentAncientAccountsFile {
(self.slot(), accounts, accounts_to_store.slot()),
None::<Vec<AccountHash>>,
self.accounts_file(),
None,
StoreReclaims::Ignore,
);
let bytes_written =
Expand Down Expand Up @@ -4004,7 +4000,6 @@ impl AccountsDb {
(slot, &shrink_collect.alive_accounts.alive_accounts()[..]),
None::<Vec<AccountHash>>,
shrink_in_progress.new_storage(),
None,
StoreReclaims::Ignore,
);

Expand Down Expand Up @@ -5976,11 +5971,6 @@ impl AccountsDb {
AccountHash(Hash::new_from_array(hasher.finalize().into()))
}

fn bulk_assign_write_version(&self, count: usize) -> StoredMetaWriteVersion {
self.write_version
.fetch_add(count as StoredMetaWriteVersion, Ordering::AcqRel)
}

fn write_accounts_to_storage<
'a,
'b,
Expand Down Expand Up @@ -6322,7 +6312,6 @@ impl AccountsDb {
(slot, &accounts[..]),
Some(hashes),
&flushed_store,
None,
StoreReclaims::Default,
));
store_accounts_timing = store_accounts_timing_inner;
Expand Down Expand Up @@ -6403,16 +6392,26 @@ impl AccountsDb {
}
}

fn write_accounts_to_cache<'a, 'b, T: ReadableAccount + Sync, P>(
fn write_accounts_to_cache<'a, 'b, T: ReadableAccount + Sync>(
&self,
slot: Slot,
accounts_and_meta_to_store: &impl StorableAccounts<'b, T>,
txn_iter: Box<dyn std::iter::Iterator<Item = &Option<&SanitizedTransaction>> + 'a>,
mut write_version_producer: P,
) -> Vec<AccountInfo>
where
P: Iterator<Item = u64>,
{
) -> Vec<AccountInfo> {
let mut write_version_producer: Box<dyn Iterator<Item = u64>> =
if self.accounts_update_notifier.is_some() {
let mut current_version = self
.write_version
.fetch_add(accounts_and_meta_to_store.len() as u64, Ordering::AcqRel);
Box::new(std::iter::from_fn(move || {
let ret = current_version;
current_version += 1;
Some(ret)
}))
} else {
Box::new(std::iter::empty())
};
Comment on lines +6401 to +6413
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The write_verison_producer is only used by notify_account_at_accounts_update(), below. This function only does anything when geyser is enabled (i.e. accounts_update_notifier is SOME). If geyser is disabled, then the write version producer is not used. Hence why the else case here does nothing.

And in the if case, we continue to increment write version as before, since geyser needs that behavior.

A nice fallout is that we can remove the write version producer from all function signatures now.


txn_iter
.enumerate()
.map(|(i, txn)| {
Expand Down Expand Up @@ -6445,17 +6444,10 @@ impl AccountsDb {
.collect()
}

fn store_accounts_to<
'a: 'c,
'b,
'c,
P: Iterator<Item = u64>,
T: ReadableAccount + Sync + ZeroLamport + 'b,
>(
fn store_accounts_to<'a: 'c, 'b, 'c, T: ReadableAccount + Sync + ZeroLamport + 'b>(
&self,
accounts: &'c impl StorableAccounts<'b, T>,
hashes: Option<Vec<impl Borrow<AccountHash>>>,
mut write_version_producer: P,
store_to: &StoreTo,
transactions: Option<&[Option<&'a SanitizedTransaction>]>,
) -> Vec<AccountInfo> {
Expand Down Expand Up @@ -6489,7 +6481,7 @@ impl AccountsDb {
None => Box::new(std::iter::repeat(&None).take(accounts.len())),
};

self.write_accounts_to_cache(slot, accounts, txn_iter, write_version_producer)
self.write_accounts_to_cache(slot, accounts, txn_iter)
}
StoreTo::Storage(storage) => {
if accounts.has_hash_and_write_version() {
Expand All @@ -6501,9 +6493,7 @@ impl AccountsDb {
),
)
} else {
let write_versions = (0..accounts.len())
.map(|_| write_version_producer.next().unwrap())
.collect::<Vec<_>>();
let write_versions = vec![0; accounts.len()];
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, we set the write versions here to a vector of zeroes. Subsequent PRs will remove the AndWriteVersion part from StorableAccountsWithHashesAndWriteVersions, which will allow removing this vector.

match hashes {
Some(hashes) => self.write_accounts_to_storage(
slot,
Expand Down Expand Up @@ -8402,7 +8392,6 @@ impl AccountsDb {
self.store_accounts_custom(
accounts,
hashes,
None::<Box<dyn Iterator<Item = u64>>>,
store_to,
reset_accounts,
transactions,
Expand All @@ -8416,7 +8405,6 @@ impl AccountsDb {
accounts: impl StorableAccounts<'a, T>,
hashes: Option<Vec<impl Borrow<AccountHash>>>,
storage: &Arc<AccountStorageEntry>,
write_version_producer: Option<Box<dyn Iterator<Item = StoredMetaWriteVersion>>>,
reclaim: StoreReclaims,
) -> StoreAccountsTiming {
// stores on a frozen slot should not reset
Expand All @@ -8426,7 +8414,6 @@ impl AccountsDb {
self.store_accounts_custom(
accounts,
hashes,
write_version_producer,
&StoreTo::Storage(storage),
reset_accounts,
None,
Expand All @@ -8439,34 +8426,17 @@ impl AccountsDb {
&self,
accounts: impl StorableAccounts<'a, T>,
hashes: Option<Vec<impl Borrow<AccountHash>>>,
write_version_producer: Option<Box<dyn Iterator<Item = u64>>>,
store_to: &StoreTo,
reset_accounts: bool,
transactions: Option<&[Option<&SanitizedTransaction>]>,
reclaim: StoreReclaims,
update_index_thread_selection: UpdateIndexThreadSelection,
) -> StoreAccountsTiming {
let write_version_producer: Box<dyn Iterator<Item = u64>> = write_version_producer
.unwrap_or_else(|| {
let mut current_version = self.bulk_assign_write_version(accounts.len());
Box::new(std::iter::from_fn(move || {
let ret = current_version;
current_version += 1;
Some(ret)
}))
});
brooksprumo marked this conversation as resolved.
Show resolved Hide resolved

self.stats
.store_num_accounts
.fetch_add(accounts.len() as u64, Ordering::Relaxed);
let mut store_accounts_time = Measure::start("store_accounts");
let infos = self.store_accounts_to(
&accounts,
hashes,
write_version_producer,
store_to,
transactions,
);
let infos = self.store_accounts_to(&accounts, hashes, store_to, transactions);
store_accounts_time.stop();
self.stats
.store_accounts
Expand Down Expand Up @@ -9503,7 +9473,7 @@ pub mod tests {
super::*,
crate::{
account_info::StoredSize,
account_storage::meta::{AccountMeta, StoredMeta},
account_storage::meta::{AccountMeta, StoredMeta, StoredMetaWriteVersion},
accounts_file::AccountsFileProvider,
accounts_hash::MERKLE_FANOUT,
accounts_index::{tests::*, AccountSecondaryIndexesIncludeExclude},
Expand Down
1 change: 0 additions & 1 deletion accounts-db/src/ancient_append_vecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,6 @@ impl AccountsDb {
accounts_to_write,
None::<Vec<AccountHash>>,
shrink_in_progress.new_storage(),
None,
StoreReclaims::Ignore,
));

Expand Down
3 changes: 0 additions & 3 deletions runtime/src/snapshot_minimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,12 +359,10 @@ impl<'a> SnapshotMinimizer<'a> {
if aligned_total > 0 {
let mut accounts = Vec::with_capacity(keep_accounts.len());
let mut hashes = Vec::with_capacity(keep_accounts.len());
let mut write_versions = Vec::with_capacity(keep_accounts.len());

for alive_account in keep_accounts {
accounts.push(alive_account);
hashes.push(alive_account.hash());
write_versions.push(alive_account.write_version());
}

shrink_in_progress = Some(self.accounts_db().get_store_for_shrink(slot, aligned_total));
Expand All @@ -373,7 +371,6 @@ impl<'a> SnapshotMinimizer<'a> {
(slot, &accounts[..]),
Some(hashes),
new_storage,
Some(Box::new(write_versions.into_iter())),
StoreReclaims::Ignore,
);

Expand Down
Loading