Skip to content

Commit

Permalink
Remove more old rewards logic: helper fns; and fixup tests
Browse files Browse the repository at this point in the history
  • Loading branch information
CriesofCarrots committed Dec 19, 2024
1 parent e1376d0 commit db82c35
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 377 deletions.
347 changes: 2 additions & 345 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ use {
serde_snapshot::BankIncrementalSnapshotPersistence,
snapshot_hash::SnapshotHash,
stake_account::StakeAccount,
stake_history::StakeHistory,
stake_weighted_timestamp::{
calculate_stake_weighted_timestamp, MaxAllowableDrift,
MAX_ALLOWABLE_DRIFT_PERCENTAGE_FAST, MAX_ALLOWABLE_DRIFT_PERCENTAGE_SLOW_V2,
Expand All @@ -65,8 +64,7 @@ use {
log::*,
rayon::{
iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator},
slice::ParallelSlice,
ThreadPool, ThreadPoolBuilder,
ThreadPoolBuilder,
},
serde::Serialize,
solana_accounts_db::{
Expand All @@ -88,7 +86,6 @@ use {
blockhash_queue::BlockhashQueue,
epoch_accounts_hash::EpochAccountsHash,
sorted_storages::SortedStorages,
stake_rewards::StakeReward,
storable_accounts::StorableAccounts,
},
solana_bpf_loader_program::syscalls::{
Expand Down Expand Up @@ -157,10 +154,7 @@ use {
},
transaction_context::{TransactionAccount, TransactionReturnData},
},
solana_stake_program::{
points::{InflationPointCalculationEvent, PointValue},
stake_state::StakeStateV2,
},
solana_stake_program::points::InflationPointCalculationEvent,
solana_svm::{
account_loader::{collect_rent_from_account, LoadedTransaction},
account_overrides::AccountOverrides,
Expand Down Expand Up @@ -2442,343 +2436,6 @@ impl Bank {
}
}

fn _load_vote_and_stake_accounts(
&self,
thread_pool: &ThreadPool,
reward_calc_tracer: Option<impl RewardCalcTracer>,
) -> LoadVoteAndStakeAccountsResult {
let stakes = self.stakes_cache.stakes();
let stake_delegations = self.filter_stake_delegations(&stakes);

// Obtain all unique voter pubkeys from stake delegations.
fn merge(mut acc: HashSet<Pubkey>, other: HashSet<Pubkey>) -> HashSet<Pubkey> {
if acc.len() < other.len() {
return merge(other, acc);
}
acc.extend(other);
acc
}
let voter_pubkeys = thread_pool.install(|| {
stake_delegations
.par_iter()
.fold(
HashSet::default,
|mut voter_pubkeys, (_stake_pubkey, stake_account)| {
voter_pubkeys.insert(stake_account.delegation().voter_pubkey);
voter_pubkeys
},
)
.reduce(HashSet::default, merge)
});
// Obtain vote-accounts for unique voter pubkeys.
let cached_vote_accounts = stakes.vote_accounts();
let solana_vote_program: Pubkey = solana_vote_program::id();
let vote_accounts_cache_miss_count = AtomicUsize::default();
let get_vote_account = |vote_pubkey: &Pubkey| -> Option<VoteAccount> {
if let Some(vote_account) = cached_vote_accounts.get(vote_pubkey) {
return Some(vote_account.clone());
}
// If accounts-db contains a valid vote account, then it should
// already have been cached in cached_vote_accounts; so the code
// below is only for sanity check, and can be removed once
// vote_accounts_cache_miss_count is shown to be always zero.
let account = self.get_account_with_fixed_root(vote_pubkey)?;
if account.owner() == &solana_vote_program
&& VoteState::deserialize(account.data()).is_ok()
{
vote_accounts_cache_miss_count.fetch_add(1, Relaxed);
}
VoteAccount::try_from(account).ok()
};
let invalid_vote_keys = DashMap::<Pubkey, InvalidCacheEntryReason>::new();
let make_vote_delegations_entry = |vote_pubkey| {
let Some(vote_account) = get_vote_account(&vote_pubkey) else {
invalid_vote_keys.insert(vote_pubkey, InvalidCacheEntryReason::Missing);
return None;
};
if vote_account.owner() != &solana_vote_program {
invalid_vote_keys.insert(vote_pubkey, InvalidCacheEntryReason::WrongOwner);
return None;
}
let vote_with_stake_delegations = VoteWithStakeDelegations {
vote_state: Arc::new(vote_account.vote_state().clone()),
vote_account: AccountSharedData::from(vote_account),
delegations: Vec::default(),
};
Some((vote_pubkey, vote_with_stake_delegations))
};
let vote_with_stake_delegations_map: DashMap<Pubkey, VoteWithStakeDelegations> =
thread_pool.install(|| {
voter_pubkeys
.into_par_iter()
.filter_map(make_vote_delegations_entry)
.collect()
});
// Join stake accounts with vote-accounts.
let push_stake_delegation = |(stake_pubkey, stake_account): (&Pubkey, &StakeAccount<_>)| {
let delegation = stake_account.delegation();
let Some(mut vote_delegations) =
vote_with_stake_delegations_map.get_mut(&delegation.voter_pubkey)
else {
return;
};
if let Some(reward_calc_tracer) = reward_calc_tracer.as_ref() {
let delegation =
InflationPointCalculationEvent::Delegation(*delegation, solana_vote_program);
let event = RewardCalculationEvent::Staking(stake_pubkey, &delegation);
reward_calc_tracer(&event);
}
let stake_delegation = (*stake_pubkey, stake_account.clone());
vote_delegations.delegations.push(stake_delegation);
};
thread_pool.install(|| {
stake_delegations
.into_par_iter()
.for_each(push_stake_delegation);
});
LoadVoteAndStakeAccountsResult {
vote_with_stake_delegations_map,
invalid_vote_keys,
vote_accounts_cache_miss_count: vote_accounts_cache_miss_count.into_inner(),
}
}

fn load_vote_and_stake_accounts(
&mut self,
thread_pool: &ThreadPool,
reward_calc_tracer: Option<impl RewardCalcTracer>,
metrics: &mut RewardsMetrics,
) -> VoteWithStakeDelegationsMap {
let (
LoadVoteAndStakeAccountsResult {
vote_with_stake_delegations_map,
invalid_vote_keys,
vote_accounts_cache_miss_count,
},
load_vote_and_stake_accounts_us,
) = measure_us!({
self._load_vote_and_stake_accounts(thread_pool, reward_calc_tracer.as_ref())
});
metrics
.load_vote_and_stake_accounts_us
.fetch_add(load_vote_and_stake_accounts_us, Relaxed);
metrics.vote_accounts_cache_miss_count += vote_accounts_cache_miss_count;
self.stakes_cache
.handle_invalid_keys(invalid_vote_keys, self.slot());
vote_with_stake_delegations_map
}

fn calculate_reward_points(
&self,
vote_with_stake_delegations_map: &VoteWithStakeDelegationsMap,
rewards: u64,
stake_history: &StakeHistory,
thread_pool: &ThreadPool,
metrics: &RewardsMetrics,
) -> Option<PointValue> {
let new_warmup_cooldown_rate_epoch = self.new_warmup_cooldown_rate_epoch();
let (points, calculate_points_us) = measure_us!(thread_pool.install(|| {
vote_with_stake_delegations_map
.par_iter()
.map(|entry| {
let VoteWithStakeDelegations {
vote_state,
delegations,
..
} = entry.value();

delegations
.par_iter()
.map(|(_stake_pubkey, stake_account)| {
solana_stake_program::points::calculate_points(
stake_account.stake_state(),
vote_state,
stake_history,
new_warmup_cooldown_rate_epoch,
)
.unwrap_or(0)
})
.sum::<u128>()
})
.sum()
}));
metrics
.calculate_points_us
.fetch_add(calculate_points_us, Relaxed);

(points > 0).then_some(PointValue { rewards, points })
}

fn redeem_rewards(
&self,
vote_with_stake_delegations_map: DashMap<Pubkey, VoteWithStakeDelegations>,
rewarded_epoch: Epoch,
point_value: PointValue,
stake_history: &StakeHistory,
thread_pool: &ThreadPool,
reward_calc_tracer: Option<impl RewardCalcTracer>,
metrics: &mut RewardsMetrics,
) -> (VoteRewards, StakeRewards) {
let new_warmup_cooldown_rate_epoch = self.new_warmup_cooldown_rate_epoch();
let vote_account_rewards: VoteRewards =
DashMap::with_capacity(vote_with_stake_delegations_map.len());
let stake_delegation_iterator = vote_with_stake_delegations_map.into_par_iter().flat_map(
|(
vote_pubkey,
VoteWithStakeDelegations {
vote_state,
vote_account,
delegations,
},
)| {
vote_account_rewards.insert(
vote_pubkey,
VoteReward {
vote_account,
commission: vote_state.commission,
vote_rewards: 0,
vote_needs_store: false,
},
);
delegations
.into_par_iter()
.map(move |delegation| (vote_pubkey, Arc::clone(&vote_state), delegation))
},
);

let (stake_rewards, redeem_rewards_us) = measure_us!(thread_pool.install(|| {
stake_delegation_iterator
.filter_map(|(vote_pubkey, vote_state, (stake_pubkey, stake_account))| {
// curry closure to add the contextual stake_pubkey
let reward_calc_tracer = reward_calc_tracer.as_ref().map(|outer| {
// inner
move |inner_event: &_| {
outer(&RewardCalculationEvent::Staking(&stake_pubkey, inner_event))
}
});
let (mut stake_account, stake_state) =
<(AccountSharedData, StakeStateV2)>::from(stake_account);
let redeemed = solana_stake_program::rewards::redeem_rewards(
rewarded_epoch,
stake_state,
&mut stake_account,
&vote_state,
&point_value,
stake_history,
reward_calc_tracer.as_ref(),
new_warmup_cooldown_rate_epoch,
);
if let Ok((stakers_reward, voters_reward)) = redeemed {
// track voter rewards
if let Some(VoteReward {
vote_account: _,
commission: _,
vote_rewards: vote_rewards_sum,
vote_needs_store,
}) = vote_account_rewards.get_mut(&vote_pubkey).as_deref_mut()
{
*vote_needs_store = true;
*vote_rewards_sum = vote_rewards_sum.saturating_add(voters_reward);
}

let post_balance = stake_account.lamports();
return Some(StakeReward {
stake_pubkey,
stake_reward_info: RewardInfo {
reward_type: RewardType::Staking,
lamports: i64::try_from(stakers_reward).unwrap(),
post_balance,
commission: Some(vote_state.commission),
},
stake_account,
});
} else {
debug!(
"solana_stake_program::rewards::redeem_rewards() failed for {}: {:?}",
stake_pubkey, redeemed
);
}
None
})
.collect()
}));
metrics.redeem_rewards_us += redeem_rewards_us;
(vote_account_rewards, stake_rewards)
}

fn store_stake_accounts(
&self,
thread_pool: &ThreadPool,
stake_rewards: &[StakeReward],
metrics: &RewardsMetrics,
) {
// store stake account even if stake_reward is 0
// because credits observed has changed
let now = Instant::now();
let slot = self.slot();
self.stakes_cache.update_stake_accounts(
thread_pool,
stake_rewards,
self.new_warmup_cooldown_rate_epoch(),
);
assert!(!self.freeze_started());
thread_pool.install(|| {
stake_rewards.par_chunks(512).for_each(|chunk| {
let to_store = (slot, chunk);
self.update_bank_hash_stats(&to_store);
self.rc.accounts.store_accounts_cached(to_store);
})
});
metrics
.store_stake_accounts_us
.fetch_add(now.elapsed().as_micros() as u64, Relaxed);
}

fn store_vote_accounts(
&self,
vote_account_rewards: VoteRewards,
metrics: &RewardsMetrics,
) -> Vec<(Pubkey, RewardInfo)> {
let (vote_rewards, store_vote_accounts_us) = measure_us!(vote_account_rewards
.into_iter()
.filter_map(
|(
vote_pubkey,
VoteReward {
mut vote_account,
commission,
vote_rewards,
vote_needs_store,
},
)| {
if let Err(err) = vote_account.checked_add_lamports(vote_rewards) {
debug!("reward redemption failed for {}: {:?}", vote_pubkey, err);
return None;
}

if vote_needs_store {
self.store_account(&vote_pubkey, &vote_account);
}

Some((
vote_pubkey,
RewardInfo {
reward_type: RewardType::Voting,
lamports: vote_rewards as i64,
post_balance: vote_account.lamports(),
commission: Some(commission),
},
))
},
)
.collect::<Vec<_>>());

metrics
.store_vote_accounts_us
.fetch_add(store_vote_accounts_us, Relaxed);
vote_rewards
}

/// return reward info for each vote account
/// return account data for each vote account that needs to be stored
/// This return value is a little awkward at the moment so that downstream existing code in the non-partitioned rewards code path can be re-used without duplication or modification.
Expand Down
Loading

0 comments on commit db82c35

Please sign in to comment.