Skip to content

Commit

Permalink
pr: parallelize calculate_delta_lt_hash()
Browse files Browse the repository at this point in the history
  • Loading branch information
brooksprumo committed Oct 4, 2024
1 parent 5b77101 commit 30db43f
Showing 1 changed file with 138 additions and 73 deletions.
211 changes: 138 additions & 73 deletions runtime/src/bank/accounts_lt_hash.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use {
super::Bank,
rayon::prelude::*,
solana_accounts_db::accounts_db::AccountsDb,
solana_lattice_hash::lt_hash::{Checksum as LtChecksum, LtHash},
solana_measure::{meas_dur, measure::Measure},
Expand All @@ -8,7 +9,7 @@ use {
pubkey::Pubkey,
},
solana_svm::transaction_processing_callback::AccountState,
std::time::Duration,
std::{ops::AddAssign, time::Duration},
};

impl Bank {
Expand Down Expand Up @@ -62,104 +63,168 @@ impl Bank {
// Get all the accounts stored in this slot.
// Since this bank is in the middle of being frozen, it hasn't been rooted.
// That means the accounts should all be in the write cache, and loading will be fast.
let (accounts_curr, loading_accounts_curr_time) = meas_dur!({
let (accounts_curr, time_loading_accounts_curr) = meas_dur!({
self.rc
.accounts
.accounts_db
.get_pubkey_hash_account_for_slot(slot)
});
let num_accounts_total = accounts_curr.len();

let mut num_accounts_unmodified = 0_usize;
let mut num_cache_misses = 0_usize;
let mut loading_accounts_prev_time = Duration::default();
let mut computing_hashes_time = Duration::default();
let mut mixing_hashes_time = Duration::default();
let mut delta_lt_hash = LtHash::identity();
let cache_for_accounts_lt_hash = self.cache_for_accounts_lt_hash.read().unwrap();
for curr in accounts_curr {
let pubkey = &curr.pubkey;

// load the initial state of the account
let (initial_state_of_account, measure_load) = meas_dur!({
match cache_for_accounts_lt_hash.get(pubkey) {
Some(initial_state_of_account) => initial_state_of_account.clone(),
None => {
num_cache_misses += 1;
// If the initial state of the account is not in the accounts lt hash cache,
// it is likely this account was stored *outside* of transaction processing
// (e.g. as part of rent collection). Do not populate the read cache,
// as this account likely will not be accessed again soon.
let account_slot = self
.rc
.accounts
.load_with_fixed_root_do_not_populate_read_cache(
&strictly_ancestors,
pubkey,
);
match account_slot {
Some((account, _slot)) => InitialStateOfAccount::Alive(account),
None => InitialStateOfAccount::Dead,
}
}
}
});
loading_accounts_prev_time += measure_load;

// mix out the previous version of the account
match initial_state_of_account {
InitialStateOfAccount::Dead => {
// nothing to do here
}
InitialStateOfAccount::Alive(prev_account) => {
if accounts_equal(&curr.account, &prev_account) {
// this account didn't actually change, so can skip it for lt hashing
num_accounts_unmodified += 1;
continue;
}
let (prev_lt_hash, measure_hashing) =
meas_dur!(AccountsDb::lt_hash_account(&prev_account, pubkey));
let (_, measure_mixing) = meas_dur!(delta_lt_hash.mix_out(&prev_lt_hash.0));
computing_hashes_time += measure_hashing;
mixing_hashes_time += measure_mixing;
}
#[derive(Debug, Default)]
struct Stats {
num_cache_misses: usize,
num_accounts_unmodified: usize,
time_loading_accounts_prev: Duration,
time_comparing_accounts: Duration,
time_computing_hashes: Duration,
time_mixing_hashes: Duration,
}
impl AddAssign for Stats {
fn add_assign(&mut self, other: Self) {
self.num_cache_misses += other.num_cache_misses;
self.num_accounts_unmodified += other.num_accounts_unmodified;
self.time_loading_accounts_prev += other.time_loading_accounts_prev;
self.time_comparing_accounts += other.time_comparing_accounts;
self.time_computing_hashes += other.time_computing_hashes;
self.time_mixing_hashes += other.time_mixing_hashes;
}

// mix in the new version of the account
let (curr_lt_hash, measure_hashing) =
meas_dur!(AccountsDb::lt_hash_account(&curr.account, pubkey));
let (_, measure_mixing) = meas_dur!(delta_lt_hash.mix_in(&curr_lt_hash.0));
computing_hashes_time += measure_hashing;
mixing_hashes_time += measure_mixing;
}
drop(cache_for_accounts_lt_hash);

let do_calculate_delta_lt_hash = || {
// Work on chunks of 128 pubkeys, which is 4 KiB.
// And 4 KiB is likely the smallest a real page size will be.
// And a single page is likely the smallest size a disk read will actually read.
// This can be tuned larger, but likely not smaller.
const CHUNK_SIZE: usize = 128;
let cache_for_accounts_lt_hash = self.cache_for_accounts_lt_hash.read().unwrap();
accounts_curr
.par_iter()
.fold_chunks(
CHUNK_SIZE,
|| (LtHash::identity(), Stats::default()),
|mut accum, elem| {
let pubkey = &elem.pubkey;
let curr_account = &elem.account;

// load the initial state of the account
let (initial_state_of_account, measure_load) = meas_dur!({
match cache_for_accounts_lt_hash.get(pubkey) {
Some(initial_state_of_account) => initial_state_of_account.clone(),
None => {
accum.1.num_cache_misses += 1;
// If the initial state of the account is not in the accounts
// lt hash cache, it is likely this account was stored
// *outside* of transaction processing (e.g. as part of rent
// collection). Do not populate the read cache, as this
// account likely will not be accessed again soon.
let account_slot = self
.rc
.accounts
.load_with_fixed_root_do_not_populate_read_cache(
&strictly_ancestors,
pubkey,
);
match account_slot {
Some((account, _slot)) => {
InitialStateOfAccount::Alive(account)
}
None => InitialStateOfAccount::Dead,
}
}
}
});
accum.1.time_loading_accounts_prev += measure_load;

// mix out the previous version of the account
match initial_state_of_account {
InitialStateOfAccount::Dead => {
// nothing to do here
}
InitialStateOfAccount::Alive(prev_account) => {
let (are_accounts_equal, measure_is_equal) =
meas_dur!(accounts_equal(curr_account, &prev_account));
accum.1.time_comparing_accounts += measure_is_equal;
if are_accounts_equal {
// this account didn't actually change, so skip it for lt hashing
accum.1.num_accounts_unmodified += 1;
return accum;
}
let (prev_lt_hash, measure_hashing) =
meas_dur!(AccountsDb::lt_hash_account(&prev_account, pubkey));
let (_, measure_mixing) =
meas_dur!(accum.0.mix_out(&prev_lt_hash.0));
accum.1.time_computing_hashes += measure_hashing;
accum.1.time_mixing_hashes += measure_mixing;
}
}

// mix in the new version of the account
let (curr_lt_hash, measure_hashing) =
meas_dur!(AccountsDb::lt_hash_account(curr_account, pubkey));
let (_, measure_mixing) = meas_dur!(accum.0.mix_in(&curr_lt_hash.0));
accum.1.time_computing_hashes += measure_hashing;
accum.1.time_mixing_hashes += measure_mixing;

accum
},
)
.reduce(
|| (LtHash::identity(), Stats::default()),
|mut accum, elem| {
accum.0.mix_in(&elem.0);
accum.1 += elem.1;
accum
},
)
};
let (delta_lt_hash, stats) = self
.rc
.accounts
.accounts_db
.thread_pool
.install(do_calculate_delta_lt_hash);

let total_time = measure_total.end_as_duration();
let num_accounts_modified = num_accounts_total.saturating_sub(num_accounts_unmodified);
let num_accounts_modified =
num_accounts_total.saturating_sub(stats.num_accounts_unmodified);
datapoint_info!(
"bank-accounts_lt_hash",
("slot", slot, i64),
("num_accounts_total", num_accounts_total, i64),
("num_accounts_modified", num_accounts_modified, i64),
("num_accounts_unmodified", num_accounts_unmodified, i64),
("num_cache_misses", num_cache_misses, i64),
("total_us", total_time.as_micros(), i64),
(
"loading_accounts_prev_us",
loading_accounts_prev_time.as_micros(),
"num_accounts_unmodified",
stats.num_accounts_unmodified,
i64
),
("num_cache_misses", stats.num_cache_misses, i64),
("total_us", total_time.as_micros(), i64),
(
"loading_accounts_curr_us",
loading_accounts_curr_time.as_micros(),
time_loading_accounts_curr.as_micros(),
i64
),
(
"par_loading_accounts_prev_us",
stats.time_loading_accounts_prev.as_micros(),
i64
),
(
"par_comparing_accounts_us",
stats.time_comparing_accounts.as_micros(),
i64
),
(
"par_computing_hashes_us",
stats.time_computing_hashes.as_micros(),
i64
),
(
"computing_hashes_us",
computing_hashes_time.as_micros(),
"par_mixing_hashes_us",
stats.time_mixing_hashes.as_micros(),
i64
),
("mixing_hashes_us", mixing_hashes_time.as_micros(), i64),
);

delta_lt_hash
Expand Down

0 comments on commit 30db43f

Please sign in to comment.