Skip to content

Commit

Permalink
Move distribution methods; partitioned epoch-rewards reorg, 3 of 5 (#528
Browse files Browse the repository at this point in the history
)

* Add distribution sub-submodule

* Move distribution methods to sub-submodule

* Move unit tests into distribution sub-submodule
  • Loading branch information
CriesofCarrots authored Apr 3, 2024
1 parent 64260fc commit 5d53389
Show file tree
Hide file tree
Showing 5 changed files with 401 additions and 368 deletions.
123 changes: 0 additions & 123 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1518,47 +1518,6 @@ impl Bank {
);
}

/// Process reward distribution for the block if it is inside reward interval.
fn distribute_partitioned_epoch_rewards(&mut self) {
let EpochRewardStatus::Active(status) = &self.epoch_reward_status else {
return;
};

let height = self.block_height();
let start_block_height = status.start_block_height;
let credit_start = start_block_height + self.get_reward_calculation_num_blocks();
let credit_end_exclusive = credit_start + status.stake_rewards_by_partition.len() as u64;
assert!(
self.epoch_schedule.get_slots_in_epoch(self.epoch)
> credit_end_exclusive.saturating_sub(credit_start)
);

if height >= credit_start && height < credit_end_exclusive {
let partition_index = height - credit_start;
self.distribute_epoch_rewards_in_partition(
&status.stake_rewards_by_partition,
partition_index,
);
}

if height.saturating_add(1) >= credit_end_exclusive {
datapoint_info!(
"epoch-rewards-status-update",
("slot", self.slot(), i64),
("block_height", height, i64),
("active", 0, i64),
("start_block_height", start_block_height, i64),
);

assert!(matches!(
self.epoch_reward_status,
EpochRewardStatus::Active(_)
));
self.epoch_reward_status = EpochRewardStatus::Inactive;
self.destroy_epoch_rewards_sysvar();
}
}

pub fn byte_limit_for_scans(&self) -> Option<usize> {
self.rc
.accounts
Expand Down Expand Up @@ -3208,39 +3167,6 @@ impl Bank {
.fetch_add(now.elapsed().as_micros() as u64, Relaxed);
}

/// store stake rewards in partition
/// return the sum of all the stored rewards
///
/// Note: even if staker's reward is 0, the stake account still needs to be stored because
/// credits observed has changed
fn store_stake_accounts_in_partition(&self, stake_rewards: &[StakeReward]) -> u64 {
// Verify that stake account `lamports + reward_amount` matches what we have in the
// rewarded account. This code will have a performance hit - an extra load and compare of
// the stake accounts. This is for debugging. Once we are confident, we can disable the
// check.
const VERIFY_REWARD_LAMPORT: bool = true;

if VERIFY_REWARD_LAMPORT {
for r in stake_rewards {
let stake_pubkey = r.stake_pubkey;
let reward_amount = r.get_stake_reward();
let post_stake_account = &r.stake_account;
if let Some(curr_stake_account) = self.get_account_with_fixed_root(&stake_pubkey) {
let pre_lamport = curr_stake_account.lamports();
let post_lamport = post_stake_account.lamports();
assert_eq!(pre_lamport + u64::try_from(reward_amount).unwrap(), post_lamport,
"stake account balance has changed since the reward calculation! account: {stake_pubkey}, pre balance: {pre_lamport}, post balance: {post_lamport}, rewards: {reward_amount}");
}
}
}

self.store_accounts((self.slot(), stake_rewards));
stake_rewards
.iter()
.map(|stake_reward| stake_reward.stake_reward_info.lamports)
.sum::<i64>() as u64
}

fn store_vote_accounts_partitioned(
&self,
vote_account_rewards: VoteRewardsAccounts,
Expand Down Expand Up @@ -3375,55 +3301,6 @@ impl Bank {
.for_each(|x| rewards.push((x.stake_pubkey, x.stake_reward_info)));
}

/// insert non-zero stake rewards to self.rewards
/// Return the number of rewards inserted
fn update_reward_history_in_partition(&self, stake_rewards: &[StakeReward]) -> usize {
let mut rewards = self.rewards.write().unwrap();
rewards.reserve(stake_rewards.len());
let initial_len = rewards.len();
stake_rewards
.iter()
.filter(|x| x.get_stake_reward() > 0)
.for_each(|x| rewards.push((x.stake_pubkey, x.stake_reward_info)));
rewards.len().saturating_sub(initial_len)
}

/// Process reward credits for a partition of rewards
/// Store the rewards to AccountsDB, update reward history record and total capitalization.
fn distribute_epoch_rewards_in_partition(
&self,
all_stake_rewards: &[Vec<StakeReward>],
partition_index: u64,
) {
let pre_capitalization = self.capitalization();
let this_partition_stake_rewards = &all_stake_rewards[partition_index as usize];

let (total_rewards_in_lamports, store_stake_accounts_us) =
measure_us!(self.store_stake_accounts_in_partition(this_partition_stake_rewards));

// increase total capitalization by the distributed rewards
self.capitalization
.fetch_add(total_rewards_in_lamports, Relaxed);

// decrease distributed capital from epoch rewards sysvar
self.update_epoch_rewards_sysvar(total_rewards_in_lamports);

// update reward history for this partitioned distribution
self.update_reward_history_in_partition(this_partition_stake_rewards);

let metrics = RewardsStoreMetrics {
pre_capitalization,
post_capitalization: self.capitalization(),
total_stake_accounts_count: all_stake_rewards.len(),
partition_index,
store_stake_accounts_us,
store_stake_accounts_count: this_partition_stake_rewards.len(),
distributed_rewards: total_rewards_in_lamports,
};

report_partitioned_reward_metrics(self, metrics);
}

fn update_recent_blockhashes_locked(&self, locked_blockhash_queue: &BlockhashQueue) {
#[allow(deprecated)]
self.update_sysvar_account(&sysvar::recent_blockhashes::id(), |account| {
Expand Down
Loading

0 comments on commit 5d53389

Please sign in to comment.