diff --git a/core/src/banking_stage/latest_unprocessed_votes.rs b/core/src/banking_stage/latest_unprocessed_votes.rs index c126d1a40ed6b4..48c09cefe13de4 100644 --- a/core/src/banking_stage/latest_unprocessed_votes.rs +++ b/core/src/banking_stage/latest_unprocessed_votes.rs @@ -6,7 +6,7 @@ use { itertools::Itertools, rand::{thread_rng, Rng}, solana_perf::packet::Packet, - solana_runtime::bank::Bank, + solana_runtime::{bank::Bank, epoch_stakes::EpochStakes}, solana_sdk::{ account::from_account, clock::{Slot, UnixTimestamp}, @@ -39,7 +39,7 @@ pub enum VoteSource { #[derive(Debug, Clone)] pub struct LatestValidatorVotePacket { vote_source: VoteSource, - pubkey: Pubkey, + vote_pubkey: Pubkey, vote: Option>, slot: Slot, hash: Hash, @@ -87,11 +87,8 @@ impl LatestValidatorVotePacket { Ok(vote_state_update_instruction) if instruction_filter(&vote_state_update_instruction) => { - let &pubkey = message - .message - .static_account_keys() - .first() - .ok_or(DeserializedPacketError::VoteTransactionError)?; + let vote_pubkey = + message.message.static_account_keys()[instruction.accounts[0] as usize]; let slot = vote_state_update_instruction.last_voted_slot().unwrap_or(0); let hash = vote_state_update_instruction.hash(); let timestamp = vote_state_update_instruction.timestamp(); @@ -100,7 +97,7 @@ impl LatestValidatorVotePacket { vote: Some(vote), slot, hash, - pubkey, + vote_pubkey, vote_source, forwarded: false, timestamp, @@ -114,8 +111,8 @@ impl LatestValidatorVotePacket { self.vote.as_ref().unwrap().clone() } - pub fn pubkey(&self) -> Pubkey { - self.pubkey + pub fn vote_pubkey(&self) -> Pubkey { + self.vote_pubkey } pub fn slot(&self) -> Slot { @@ -150,12 +147,12 @@ pub(crate) struct VoteBatchInsertionMetrics { pub(crate) num_dropped_tpu: usize, } -#[derive(Debug, Default)] +#[derive(Debug)] pub struct LatestUnprocessedVotes { latest_votes_per_pubkey: RwLock>>>, num_unprocessed_votes: AtomicUsize, // These are only ever written to by the tpu vote thread - cached_staked_nodes: RwLock>>, + cached_epoch_stakes: RwLock, deprecate_legacy_vote_ixs: AtomicBool, current_epoch: AtomicU64, } @@ -166,10 +163,30 @@ impl LatestUnprocessedVotes { .feature_set .is_active(&feature_set::deprecate_legacy_vote_ixs::id()); Self { - cached_staked_nodes: RwLock::new(bank.current_epoch_staked_nodes().clone()), + latest_votes_per_pubkey: RwLock::new(HashMap::default()), + num_unprocessed_votes: AtomicUsize::new(0), + cached_epoch_stakes: RwLock::new(bank.current_epoch_stakes().clone()), current_epoch: AtomicU64::new(bank.epoch()), deprecate_legacy_vote_ixs: AtomicBool::new(deprecate_legacy_vote_ixs), - ..Self::default() + } + } + + #[cfg(test)] + pub fn new_for_tests(vote_pubkeys_to_stake: &[Pubkey]) -> Self { + use solana_vote::vote_account::VoteAccount; + + let vote_accounts = vote_pubkeys_to_stake + .iter() + .map(|pubkey| (*pubkey, (1u64, VoteAccount::new_random()))) + .collect(); + let epoch_stakes = EpochStakes::new_for_tests(vote_accounts, 0); + + Self { + latest_votes_per_pubkey: RwLock::new(HashMap::default()), + num_unprocessed_votes: AtomicUsize::new(0), + cached_epoch_stakes: RwLock::new(epoch_stakes), + current_epoch: AtomicU64::new(0), + deprecate_legacy_vote_ixs: AtomicBool::new(true), } } @@ -185,9 +202,9 @@ impl LatestUnprocessedVotes { &'a self, votes: impl Iterator + 'a, ) -> impl Iterator + 'a { - let staked_nodes = self.cached_staked_nodes.read().unwrap(); + let epoch_stakes = self.cached_epoch_stakes.read().unwrap(); votes.filter(move |vote| { - let stake = staked_nodes.get(&vote.pubkey()).copied().unwrap_or(0); + let stake = epoch_stakes.vote_account_stake(&vote.vote_pubkey()); stake > 0 }) } @@ -231,7 +248,7 @@ impl LatestUnprocessedVotes { vote: LatestValidatorVotePacket, should_replenish_taken_votes: bool, ) -> Option { - let pubkey = vote.pubkey(); + let pubkey = vote.vote_pubkey(); let slot = vote.slot(); let timestamp = vote.timestamp(); @@ -311,21 +328,14 @@ impl LatestUnprocessedVotes { .and_then(|l| l.read().unwrap().timestamp()) } - #[cfg(test)] - pub(crate) fn set_staked_nodes(&self, staked_nodes: &[Pubkey]) { - let staked_nodes: HashMap = - staked_nodes.iter().map(|pk| (*pk, 1u64)).collect(); - *self.cached_staked_nodes.write().unwrap() = Arc::new(staked_nodes); - } - fn weighted_random_order_by_stake(&self) -> impl Iterator { // Efraimidis and Spirakis algo for weighted random sample without replacement - let staked_nodes = self.cached_staked_nodes.read().unwrap(); + let epoch_stakes = self.cached_epoch_stakes.read().unwrap(); let latest_votes_per_pubkey = self.latest_votes_per_pubkey.read().unwrap(); let mut pubkey_with_weight: Vec<(f64, Pubkey)> = latest_votes_per_pubkey .keys() .filter_map(|&pubkey| { - let stake = staked_nodes.get(&pubkey).copied().unwrap_or(0); + let stake = epoch_stakes.vote_account_stake(&pubkey); if stake == 0 { None // Ignore votes from unstaked validators } else { @@ -343,8 +353,8 @@ impl LatestUnprocessedVotes { if bank.epoch() <= self.current_epoch.load(Ordering::Relaxed) { return; } - let mut staked_nodes = self.cached_staked_nodes.write().unwrap(); - *staked_nodes = bank.current_epoch_staked_nodes().clone(); + let mut epoch_stakes = self.cached_epoch_stakes.write().unwrap(); + *epoch_stakes = bank.current_epoch_stakes().clone(); self.current_epoch.store(bank.epoch(), Ordering::Relaxed); self.deprecate_legacy_vote_ixs.store( bank.feature_set @@ -355,12 +365,9 @@ impl LatestUnprocessedVotes { // Evict any now unstaked pubkeys let mut latest_votes_per_pubkey = self.latest_votes_per_pubkey.write().unwrap(); let mut unstaked_votes = 0; - latest_votes_per_pubkey.retain(|pubkey, vote| { + latest_votes_per_pubkey.retain(|vote_pubkey, vote| { let is_present = !vote.read().unwrap().is_vote_taken(); - let should_evict = match staked_nodes.get(pubkey) { - None => true, - Some(stake) => *stake == 0, - }; + let should_evict = epoch_stakes.vote_account_stake(vote_pubkey) == 0; if is_present && should_evict { unstaked_votes += 1; } @@ -611,12 +618,12 @@ mod tests { assert_eq!(VoteSource::Gossip, deserialized_packets[1].vote_source); assert_eq!( - keypairs.node_keypair.pubkey(), - deserialized_packets[0].pubkey + keypairs.vote_keypair.pubkey(), + deserialized_packets[0].vote_pubkey ); assert_eq!( - keypairs.node_keypair.pubkey(), - deserialized_packets[1].pubkey + keypairs.vote_keypair.pubkey(), + deserialized_packets[1].vote_pubkey ); assert!(deserialized_packets[0].vote.is_some()); @@ -625,12 +632,11 @@ mod tests { #[test] fn test_update_latest_vote() { - let latest_unprocessed_votes = LatestUnprocessedVotes::default(); let keypair_a = ValidatorVoteKeypairs::new_rand(); let keypair_b = ValidatorVoteKeypairs::new_rand(); - latest_unprocessed_votes.set_staked_nodes(&[ - keypair_a.node_keypair.pubkey(), - keypair_b.node_keypair.pubkey(), + let latest_unprocessed_votes = LatestUnprocessedVotes::new_for_tests(&[ + keypair_a.vote_keypair.pubkey(), + keypair_b.vote_keypair.pubkey(), ]); let vote_a = from_slots(vec![(0, 2), (1, 1)], VoteSource::Gossip, &keypair_a, None); @@ -651,11 +657,11 @@ mod tests { assert_eq!( Some(1), - latest_unprocessed_votes.get_latest_vote_slot(keypair_a.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_vote_slot(keypair_a.vote_keypair.pubkey()) ); assert_eq!( Some(9), - latest_unprocessed_votes.get_latest_vote_slot(keypair_b.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_vote_slot(keypair_b.vote_keypair.pubkey()) ); let vote_a = from_slots( @@ -710,13 +716,13 @@ mod tests { assert_eq!( 10, latest_unprocessed_votes - .get_latest_vote_slot(keypair_a.node_keypair.pubkey()) + .get_latest_vote_slot(keypair_a.vote_keypair.pubkey()) .unwrap() ); assert_eq!( 9, latest_unprocessed_votes - .get_latest_vote_slot(keypair_b.node_keypair.pubkey()) + .get_latest_vote_slot(keypair_b.vote_keypair.pubkey()) .unwrap() ); @@ -739,11 +745,11 @@ mod tests { assert_eq!(2, latest_unprocessed_votes.len()); assert_eq!( Some(1), - latest_unprocessed_votes.get_latest_timestamp(keypair_a.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_timestamp(keypair_a.vote_keypair.pubkey()) ); assert_eq!( Some(2), - latest_unprocessed_votes.get_latest_timestamp(keypair_b.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_timestamp(keypair_b.vote_keypair.pubkey()) ); // Same votes with bigger timestamps should override @@ -765,11 +771,11 @@ mod tests { assert_eq!(2, latest_unprocessed_votes.len()); assert_eq!( Some(5), - latest_unprocessed_votes.get_latest_timestamp(keypair_a.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_timestamp(keypair_a.vote_keypair.pubkey()) ); assert_eq!( Some(6), - latest_unprocessed_votes.get_latest_timestamp(keypair_b.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_timestamp(keypair_b.vote_keypair.pubkey()) ); // Same votes with smaller timestamps should not override @@ -793,11 +799,11 @@ mod tests { assert_eq!(2, latest_unprocessed_votes.len()); assert_eq!( Some(5), - latest_unprocessed_votes.get_latest_timestamp(keypair_a.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_timestamp(keypair_a.vote_keypair.pubkey()) ); assert_eq!( Some(6), - latest_unprocessed_votes.get_latest_timestamp(keypair_b.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_timestamp(keypair_b.vote_keypair.pubkey()) ); // Drain all latest votes @@ -832,8 +838,6 @@ mod tests { fn test_update_latest_vote_race() { // There was a race condition in updating the same pubkey in the hashmap // when the entry does not initially exist. - let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::default()); - const NUM_VOTES: usize = 100; let keypairs = Arc::new( (0..NUM_VOTES) @@ -842,9 +846,10 @@ mod tests { ); let staked_nodes = keypairs .iter() - .map(|kp| kp.node_keypair.pubkey()) + .map(|kp| kp.vote_keypair.pubkey()) .collect_vec(); - latest_unprocessed_votes.set_staked_nodes(&staked_nodes); + let latest_unprocessed_votes = + Arc::new(LatestUnprocessedVotes::new_for_tests(&staked_nodes)); // Insert votes in parallel let insert_vote = |latest_unprocessed_votes: &LatestUnprocessedVotes, @@ -876,8 +881,6 @@ mod tests { #[test] fn test_simulate_threads() { - let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::default()); - let latest_unprocessed_votes_tpu = latest_unprocessed_votes.clone(); let keypairs = Arc::new( (0..10) .map(|_| ValidatorVoteKeypairs::new_rand()) @@ -886,9 +889,11 @@ mod tests { let keypairs_tpu = keypairs.clone(); let staked_nodes = keypairs .iter() - .map(|kp| kp.node_keypair.pubkey()) + .map(|kp| kp.vote_keypair.pubkey()) .collect_vec(); - latest_unprocessed_votes.set_staked_nodes(&staked_nodes); + let latest_unprocessed_votes = + Arc::new(LatestUnprocessedVotes::new_for_tests(&staked_nodes)); + let latest_unprocessed_votes_tpu = latest_unprocessed_votes.clone(); let vote_limit = 1000; let gossip = Builder::new() @@ -944,7 +949,7 @@ mod tests { #[test] fn test_forwardable_packets() { - let latest_unprocessed_votes = LatestUnprocessedVotes::default(); + let latest_unprocessed_votes = LatestUnprocessedVotes::new_for_tests(&[]); let bank_0 = Bank::new_for_tests(&GenesisConfig::default()); let mut bank = Bank::new_from_parent( Arc::new(bank_0), @@ -983,12 +988,9 @@ mod tests { .count() ); - let config = genesis_utils::create_genesis_config_with_leader( - 100, - &keypair_a.node_keypair.pubkey(), - 200, - ) - .genesis_config; + let config = + genesis_utils::create_genesis_config_with_vote_accounts(100, &[keypair_a], vec![200]) + .genesis_config; let bank_0 = Bank::new_for_tests(&config); let bank = Bank::new_from_parent( Arc::new(bank_0), @@ -1018,12 +1020,9 @@ mod tests { .count() ); - let config = genesis_utils::create_genesis_config_with_leader( - 100, - &keypair_b.node_keypair.pubkey(), - 200, - ) - .genesis_config; + let config = + genesis_utils::create_genesis_config_with_vote_accounts(100, &[keypair_b], vec![200]) + .genesis_config; let bank_0 = Bank::new_for_tests(&config); let bank = Arc::new(Bank::new_from_parent( Arc::new(bank_0), @@ -1069,16 +1068,15 @@ mod tests { #[test] fn test_clear_forwarded_packets() { - let latest_unprocessed_votes = LatestUnprocessedVotes::default(); let keypair_a = ValidatorVoteKeypairs::new_rand(); let keypair_b = ValidatorVoteKeypairs::new_rand(); let keypair_c = ValidatorVoteKeypairs::new_rand(); let keypair_d = ValidatorVoteKeypairs::new_rand(); - latest_unprocessed_votes.set_staked_nodes(&[ - keypair_a.node_keypair.pubkey(), - keypair_b.node_keypair.pubkey(), - keypair_c.node_keypair.pubkey(), - keypair_d.node_keypair.pubkey(), + let latest_unprocessed_votes = LatestUnprocessedVotes::new_for_tests(&[ + keypair_a.vote_keypair.pubkey(), + keypair_b.vote_keypair.pubkey(), + keypair_c.vote_keypair.pubkey(), + keypair_d.vote_keypair.pubkey(), ]); let vote_a = from_slots(vec![(1, 1)], VoteSource::Gossip, &keypair_a, None); @@ -1098,19 +1096,19 @@ mod tests { assert_eq!( Some(1), - latest_unprocessed_votes.get_latest_vote_slot(keypair_a.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_vote_slot(keypair_a.vote_keypair.pubkey()) ); assert_eq!( Some(2), - latest_unprocessed_votes.get_latest_vote_slot(keypair_b.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_vote_slot(keypair_b.vote_keypair.pubkey()) ); assert_eq!( Some(3), - latest_unprocessed_votes.get_latest_vote_slot(keypair_c.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_vote_slot(keypair_c.vote_keypair.pubkey()) ); assert_eq!( Some(4), - latest_unprocessed_votes.get_latest_vote_slot(keypair_d.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_vote_slot(keypair_d.vote_keypair.pubkey()) ); } @@ -1141,12 +1139,9 @@ mod tests { assert!(latest_unprocessed_votes.is_empty()); // Bank in same epoch should not update stakes - let config = genesis_utils::create_genesis_config_with_leader( - 100, - &keypair_a.node_keypair.pubkey(), - 200, - ) - .genesis_config; + let config = + genesis_utils::create_genesis_config_with_vote_accounts(100, &[&keypair_a], vec![200]) + .genesis_config; let bank_0 = Bank::new_for_tests(&config); let bank = Bank::new_from_parent( Arc::new(bank_0), @@ -1159,12 +1154,9 @@ mod tests { assert!(latest_unprocessed_votes.is_empty()); // Bank in next epoch should update stakes - let config = genesis_utils::create_genesis_config_with_leader( - 100, - &keypair_b.node_keypair.pubkey(), - 200, - ) - .genesis_config; + let config = + genesis_utils::create_genesis_config_with_vote_accounts(100, &[&keypair_b], vec![200]) + .genesis_config; let bank_0 = Bank::new_for_tests(&config); let bank = Bank::new_from_parent( Arc::new(bank_0), @@ -1176,17 +1168,14 @@ mod tests { latest_unprocessed_votes.insert_batch(votes.clone(), true); assert_eq!(latest_unprocessed_votes.len(), 1); assert_eq!( - latest_unprocessed_votes.get_latest_vote_slot(keypair_b.node_keypair.pubkey()), + latest_unprocessed_votes.get_latest_vote_slot(keypair_b.vote_keypair.pubkey()), Some(vote_b.slot()) ); // Previously unstaked votes are removed - let config = genesis_utils::create_genesis_config_with_leader( - 100, - &keypair_c.node_keypair.pubkey(), - 200, - ) - .genesis_config; + let config = + genesis_utils::create_genesis_config_with_vote_accounts(100, &[&keypair_c], vec![200]) + .genesis_config; let bank_0 = Bank::new_for_tests(&config); let bank = Bank::new_from_parent( Arc::new(bank_0), @@ -1199,7 +1188,7 @@ mod tests { latest_unprocessed_votes.insert_batch(votes.clone(), true); assert_eq!(latest_unprocessed_votes.len(), 1); assert_eq!( - latest_unprocessed_votes.get_latest_vote_slot(keypair_c.node_keypair.pubkey()), + latest_unprocessed_votes.get_latest_vote_slot(keypair_c.vote_keypair.pubkey()), Some(vote_c.slot()) ); } diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index 1ee7363e0d1924..7b7b92b2f33a2f 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -1277,10 +1277,12 @@ mod tests { .into_iter() .flat_map(|vs| [(vs, true), (vs, false)]) { - let latest_unprocessed_votes = LatestUnprocessedVotes::default(); - if staked { - latest_unprocessed_votes.set_staked_nodes(&[keypair.pubkey()]); - } + let staked_keys = if staked { + vec![vote_keypair.pubkey()] + } else { + vec![] + }; + let latest_unprocessed_votes = LatestUnprocessedVotes::new_for_tests(&staked_keys); let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( Arc::new(latest_unprocessed_votes), vote_source, @@ -1316,8 +1318,8 @@ mod tests { )?; vote.meta_mut().flags.set(PacketFlags::SIMPLE_VOTE_TX, true); - let latest_unprocessed_votes = LatestUnprocessedVotes::default(); - latest_unprocessed_votes.set_staked_nodes(&[node_keypair.pubkey()]); + let latest_unprocessed_votes = + LatestUnprocessedVotes::new_for_tests(&[vote_keypair.pubkey()]); let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( Arc::new(latest_unprocessed_votes), VoteSource::Tpu,