diff --git a/core/src/banking_stage/latest_unprocessed_votes.rs b/core/src/banking_stage/latest_unprocessed_votes.rs index c126d1a40ed6b4..bb97142bda5e81 100644 --- a/core/src/banking_stage/latest_unprocessed_votes.rs +++ b/core/src/banking_stage/latest_unprocessed_votes.rs @@ -6,7 +6,7 @@ use { itertools::Itertools, rand::{thread_rng, Rng}, solana_perf::packet::Packet, - solana_runtime::bank::Bank, + solana_runtime::{bank::Bank, epoch_stakes::EpochStakes}, solana_sdk::{ account::from_account, clock::{Slot, UnixTimestamp}, @@ -39,7 +39,7 @@ pub enum VoteSource { #[derive(Debug, Clone)] pub struct LatestValidatorVotePacket { vote_source: VoteSource, - pubkey: Pubkey, + vote_pubkey: Pubkey, vote: Option>, slot: Slot, hash: Hash, @@ -87,10 +87,16 @@ impl LatestValidatorVotePacket { Ok(vote_state_update_instruction) if instruction_filter(&vote_state_update_instruction) => { - let &pubkey = message + let vote_account_index = instruction + .accounts + .first() + .copied() + .ok_or(DeserializedPacketError::VoteTransactionError)?; + let vote_pubkey = message .message .static_account_keys() - .first() + .get(vote_account_index as usize) + .copied() .ok_or(DeserializedPacketError::VoteTransactionError)?; let slot = vote_state_update_instruction.last_voted_slot().unwrap_or(0); let hash = vote_state_update_instruction.hash(); @@ -100,7 +106,7 @@ impl LatestValidatorVotePacket { vote: Some(vote), slot, hash, - pubkey, + vote_pubkey, vote_source, forwarded: false, timestamp, @@ -114,8 +120,8 @@ impl LatestValidatorVotePacket { self.vote.as_ref().unwrap().clone() } - pub fn pubkey(&self) -> Pubkey { - self.pubkey + pub fn vote_pubkey(&self) -> Pubkey { + self.vote_pubkey } pub fn slot(&self) -> Slot { @@ -150,12 +156,12 @@ pub(crate) struct VoteBatchInsertionMetrics { pub(crate) num_dropped_tpu: usize, } -#[derive(Debug, Default)] +#[derive(Debug)] pub struct LatestUnprocessedVotes { - latest_votes_per_pubkey: RwLock>>>, + latest_vote_per_vote_pubkey: RwLock>>>, num_unprocessed_votes: AtomicUsize, // These are only ever written to by the tpu vote thread - cached_staked_nodes: RwLock>>, + cached_epoch_stakes: RwLock, deprecate_legacy_vote_ixs: AtomicBool, current_epoch: AtomicU64, } @@ -166,10 +172,30 @@ impl LatestUnprocessedVotes { .feature_set .is_active(&feature_set::deprecate_legacy_vote_ixs::id()); Self { - cached_staked_nodes: RwLock::new(bank.current_epoch_staked_nodes().clone()), + latest_vote_per_vote_pubkey: RwLock::new(HashMap::default()), + num_unprocessed_votes: AtomicUsize::new(0), + cached_epoch_stakes: RwLock::new(bank.current_epoch_stakes().clone()), current_epoch: AtomicU64::new(bank.epoch()), deprecate_legacy_vote_ixs: AtomicBool::new(deprecate_legacy_vote_ixs), - ..Self::default() + } + } + + #[cfg(test)] + pub fn new_for_tests(vote_pubkeys_to_stake: &[Pubkey]) -> Self { + use solana_vote::vote_account::VoteAccount; + + let vote_accounts = vote_pubkeys_to_stake + .iter() + .map(|pubkey| (*pubkey, (1u64, VoteAccount::new_random()))) + .collect(); + let epoch_stakes = EpochStakes::new_for_tests(vote_accounts, 0); + + Self { + latest_vote_per_vote_pubkey: RwLock::new(HashMap::default()), + num_unprocessed_votes: AtomicUsize::new(0), + cached_epoch_stakes: RwLock::new(epoch_stakes), + current_epoch: AtomicU64::new(0), + deprecate_legacy_vote_ixs: AtomicBool::new(true), } } @@ -185,9 +211,9 @@ impl LatestUnprocessedVotes { &'a self, votes: impl Iterator + 'a, ) -> impl Iterator + 'a { - let staked_nodes = self.cached_staked_nodes.read().unwrap(); + let epoch_stakes = self.cached_epoch_stakes.read().unwrap(); votes.filter(move |vote| { - let stake = staked_nodes.get(&vote.pubkey()).copied().unwrap_or(0); + let stake = epoch_stakes.vote_account_stake(&vote.vote_pubkey()); stake > 0 }) } @@ -216,7 +242,7 @@ impl LatestUnprocessedVotes { } fn get_entry(&self, pubkey: Pubkey) -> Option>> { - self.latest_votes_per_pubkey + self.latest_vote_per_vote_pubkey .read() .unwrap() .get(&pubkey) @@ -231,7 +257,7 @@ impl LatestUnprocessedVotes { vote: LatestValidatorVotePacket, should_replenish_taken_votes: bool, ) -> Option { - let pubkey = vote.pubkey(); + let vote_pubkey = vote.vote_pubkey(); let slot = vote.slot(); let timestamp = vote.timestamp(); @@ -276,11 +302,16 @@ impl LatestUnprocessedVotes { Some(vote) }; - if let Some(latest_vote) = self.get_entry(pubkey) { + if let Some(latest_vote) = self.get_entry(vote_pubkey) { with_latest_vote(&latest_vote, vote) } else { // Grab write-lock to insert new vote. - match self.latest_votes_per_pubkey.write().unwrap().entry(pubkey) { + match self + .latest_vote_per_vote_pubkey + .write() + .unwrap() + .entry(vote_pubkey) + { std::collections::hash_map::Entry::Occupied(entry) => { with_latest_vote(entry.get(), vote) } @@ -295,7 +326,7 @@ impl LatestUnprocessedVotes { #[cfg(test)] pub fn get_latest_vote_slot(&self, pubkey: Pubkey) -> Option { - self.latest_votes_per_pubkey + self.latest_vote_per_vote_pubkey .read() .unwrap() .get(&pubkey) @@ -304,28 +335,21 @@ impl LatestUnprocessedVotes { #[cfg(test)] fn get_latest_timestamp(&self, pubkey: Pubkey) -> Option { - self.latest_votes_per_pubkey + self.latest_vote_per_vote_pubkey .read() .unwrap() .get(&pubkey) .and_then(|l| l.read().unwrap().timestamp()) } - #[cfg(test)] - pub(crate) fn set_staked_nodes(&self, staked_nodes: &[Pubkey]) { - let staked_nodes: HashMap = - staked_nodes.iter().map(|pk| (*pk, 1u64)).collect(); - *self.cached_staked_nodes.write().unwrap() = Arc::new(staked_nodes); - } - fn weighted_random_order_by_stake(&self) -> impl Iterator { // Efraimidis and Spirakis algo for weighted random sample without replacement - let staked_nodes = self.cached_staked_nodes.read().unwrap(); - let latest_votes_per_pubkey = self.latest_votes_per_pubkey.read().unwrap(); - let mut pubkey_with_weight: Vec<(f64, Pubkey)> = latest_votes_per_pubkey + let epoch_stakes = self.cached_epoch_stakes.read().unwrap(); + let latest_vote_per_vote_pubkey = self.latest_vote_per_vote_pubkey.read().unwrap(); + let mut pubkey_with_weight: Vec<(f64, Pubkey)> = latest_vote_per_vote_pubkey .keys() .filter_map(|&pubkey| { - let stake = staked_nodes.get(&pubkey).copied().unwrap_or(0); + let stake = epoch_stakes.vote_account_stake(&pubkey); if stake == 0 { None // Ignore votes from unstaked validators } else { @@ -343,24 +367,24 @@ impl LatestUnprocessedVotes { if bank.epoch() <= self.current_epoch.load(Ordering::Relaxed) { return; } - let mut staked_nodes = self.cached_staked_nodes.write().unwrap(); - *staked_nodes = bank.current_epoch_staked_nodes().clone(); - self.current_epoch.store(bank.epoch(), Ordering::Relaxed); - self.deprecate_legacy_vote_ixs.store( - bank.feature_set - .is_active(&feature_set::deprecate_legacy_vote_ixs::id()), - Ordering::Relaxed, - ); + { + let mut epoch_stakes = self.cached_epoch_stakes.write().unwrap(); + *epoch_stakes = bank.current_epoch_stakes().clone(); + self.current_epoch.store(bank.epoch(), Ordering::Relaxed); + self.deprecate_legacy_vote_ixs.store( + bank.feature_set + .is_active(&feature_set::deprecate_legacy_vote_ixs::id()), + Ordering::Relaxed, + ); + } // Evict any now unstaked pubkeys - let mut latest_votes_per_pubkey = self.latest_votes_per_pubkey.write().unwrap(); + let epoch_stakes = self.cached_epoch_stakes.read().unwrap(); + let mut latest_vote_per_vote_pubkey = self.latest_vote_per_vote_pubkey.write().unwrap(); let mut unstaked_votes = 0; - latest_votes_per_pubkey.retain(|pubkey, vote| { + latest_vote_per_vote_pubkey.retain(|vote_pubkey, vote| { let is_present = !vote.read().unwrap().is_vote_taken(); - let should_evict = match staked_nodes.get(pubkey) { - None => true, - Some(stake) => *stake == 0, - }; + let should_evict = epoch_stakes.vote_account_stake(vote_pubkey) == 0; if is_present && should_evict { unstaked_votes += 1; } @@ -474,7 +498,7 @@ impl LatestUnprocessedVotes { /// Sometimes we forward and hold the packets, sometimes we forward and clear. /// This also clears all gossip votes since by definition they have been forwarded pub fn clear_forwarded_packets(&self) { - self.latest_votes_per_pubkey + self.latest_vote_per_vote_pubkey .read() .unwrap() .values() @@ -611,12 +635,12 @@ mod tests { assert_eq!(VoteSource::Gossip, deserialized_packets[1].vote_source); assert_eq!( - keypairs.node_keypair.pubkey(), - deserialized_packets[0].pubkey + keypairs.vote_keypair.pubkey(), + deserialized_packets[0].vote_pubkey ); assert_eq!( - keypairs.node_keypair.pubkey(), - deserialized_packets[1].pubkey + keypairs.vote_keypair.pubkey(), + deserialized_packets[1].vote_pubkey ); assert!(deserialized_packets[0].vote.is_some()); @@ -625,12 +649,11 @@ mod tests { #[test] fn test_update_latest_vote() { - let latest_unprocessed_votes = LatestUnprocessedVotes::default(); let keypair_a = ValidatorVoteKeypairs::new_rand(); let keypair_b = ValidatorVoteKeypairs::new_rand(); - latest_unprocessed_votes.set_staked_nodes(&[ - keypair_a.node_keypair.pubkey(), - keypair_b.node_keypair.pubkey(), + let latest_unprocessed_votes = LatestUnprocessedVotes::new_for_tests(&[ + keypair_a.vote_keypair.pubkey(), + keypair_b.vote_keypair.pubkey(), ]); let vote_a = from_slots(vec![(0, 2), (1, 1)], VoteSource::Gossip, &keypair_a, None); @@ -651,11 +674,11 @@ mod tests { assert_eq!( Some(1), - latest_unprocessed_votes.get_latest_vote_slot(keypair_a.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_vote_slot(keypair_a.vote_keypair.pubkey()) ); assert_eq!( Some(9), - latest_unprocessed_votes.get_latest_vote_slot(keypair_b.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_vote_slot(keypair_b.vote_keypair.pubkey()) ); let vote_a = from_slots( @@ -710,13 +733,13 @@ mod tests { assert_eq!( 10, latest_unprocessed_votes - .get_latest_vote_slot(keypair_a.node_keypair.pubkey()) + .get_latest_vote_slot(keypair_a.vote_keypair.pubkey()) .unwrap() ); assert_eq!( 9, latest_unprocessed_votes - .get_latest_vote_slot(keypair_b.node_keypair.pubkey()) + .get_latest_vote_slot(keypair_b.vote_keypair.pubkey()) .unwrap() ); @@ -739,11 +762,11 @@ mod tests { assert_eq!(2, latest_unprocessed_votes.len()); assert_eq!( Some(1), - latest_unprocessed_votes.get_latest_timestamp(keypair_a.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_timestamp(keypair_a.vote_keypair.pubkey()) ); assert_eq!( Some(2), - latest_unprocessed_votes.get_latest_timestamp(keypair_b.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_timestamp(keypair_b.vote_keypair.pubkey()) ); // Same votes with bigger timestamps should override @@ -765,11 +788,11 @@ mod tests { assert_eq!(2, latest_unprocessed_votes.len()); assert_eq!( Some(5), - latest_unprocessed_votes.get_latest_timestamp(keypair_a.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_timestamp(keypair_a.vote_keypair.pubkey()) ); assert_eq!( Some(6), - latest_unprocessed_votes.get_latest_timestamp(keypair_b.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_timestamp(keypair_b.vote_keypair.pubkey()) ); // Same votes with smaller timestamps should not override @@ -793,16 +816,16 @@ mod tests { assert_eq!(2, latest_unprocessed_votes.len()); assert_eq!( Some(5), - latest_unprocessed_votes.get_latest_timestamp(keypair_a.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_timestamp(keypair_a.vote_keypair.pubkey()) ); assert_eq!( Some(6), - latest_unprocessed_votes.get_latest_timestamp(keypair_b.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_timestamp(keypair_b.vote_keypair.pubkey()) ); // Drain all latest votes for packet in latest_unprocessed_votes - .latest_votes_per_pubkey + .latest_vote_per_vote_pubkey .read() .unwrap() .values() @@ -832,8 +855,6 @@ mod tests { fn test_update_latest_vote_race() { // There was a race condition in updating the same pubkey in the hashmap // when the entry does not initially exist. - let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::default()); - const NUM_VOTES: usize = 100; let keypairs = Arc::new( (0..NUM_VOTES) @@ -842,9 +863,10 @@ mod tests { ); let staked_nodes = keypairs .iter() - .map(|kp| kp.node_keypair.pubkey()) + .map(|kp| kp.vote_keypair.pubkey()) .collect_vec(); - latest_unprocessed_votes.set_staked_nodes(&staked_nodes); + let latest_unprocessed_votes = + Arc::new(LatestUnprocessedVotes::new_for_tests(&staked_nodes)); // Insert votes in parallel let insert_vote = |latest_unprocessed_votes: &LatestUnprocessedVotes, @@ -876,8 +898,6 @@ mod tests { #[test] fn test_simulate_threads() { - let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::default()); - let latest_unprocessed_votes_tpu = latest_unprocessed_votes.clone(); let keypairs = Arc::new( (0..10) .map(|_| ValidatorVoteKeypairs::new_rand()) @@ -886,9 +906,11 @@ mod tests { let keypairs_tpu = keypairs.clone(); let staked_nodes = keypairs .iter() - .map(|kp| kp.node_keypair.pubkey()) + .map(|kp| kp.vote_keypair.pubkey()) .collect_vec(); - latest_unprocessed_votes.set_staked_nodes(&staked_nodes); + let latest_unprocessed_votes = + Arc::new(LatestUnprocessedVotes::new_for_tests(&staked_nodes)); + let latest_unprocessed_votes_tpu = latest_unprocessed_votes.clone(); let vote_limit = 1000; let gossip = Builder::new() @@ -921,19 +943,21 @@ mod tests { .update_latest_vote(vote, false /* should replenish */); if i % 214 == 0 { // Simulate draining and processing packets - let latest_votes_per_pubkey = latest_unprocessed_votes_tpu - .latest_votes_per_pubkey + let latest_vote_per_vote_pubkey = latest_unprocessed_votes_tpu + .latest_vote_per_vote_pubkey .read() .unwrap(); - latest_votes_per_pubkey.iter().for_each(|(_pubkey, lock)| { - let mut latest_vote = lock.write().unwrap(); - if !latest_vote.is_vote_taken() { - latest_vote.take_vote(); - latest_unprocessed_votes_tpu - .num_unprocessed_votes - .fetch_sub(1, Ordering::Relaxed); - } - }); + latest_vote_per_vote_pubkey + .iter() + .for_each(|(_pubkey, lock)| { + let mut latest_vote = lock.write().unwrap(); + if !latest_vote.is_vote_taken() { + latest_vote.take_vote(); + latest_unprocessed_votes_tpu + .num_unprocessed_votes + .fetch_sub(1, Ordering::Relaxed); + } + }); } } }) @@ -944,7 +968,7 @@ mod tests { #[test] fn test_forwardable_packets() { - let latest_unprocessed_votes = LatestUnprocessedVotes::default(); + let latest_unprocessed_votes = LatestUnprocessedVotes::new_for_tests(&[]); let bank_0 = Bank::new_for_tests(&GenesisConfig::default()); let mut bank = Bank::new_from_parent( Arc::new(bank_0), @@ -983,12 +1007,9 @@ mod tests { .count() ); - let config = genesis_utils::create_genesis_config_with_leader( - 100, - &keypair_a.node_keypair.pubkey(), - 200, - ) - .genesis_config; + let config = + genesis_utils::create_genesis_config_with_vote_accounts(100, &[keypair_a], vec![200]) + .genesis_config; let bank_0 = Bank::new_for_tests(&config); let bank = Bank::new_from_parent( Arc::new(bank_0), @@ -1018,12 +1039,9 @@ mod tests { .count() ); - let config = genesis_utils::create_genesis_config_with_leader( - 100, - &keypair_b.node_keypair.pubkey(), - 200, - ) - .genesis_config; + let config = + genesis_utils::create_genesis_config_with_vote_accounts(100, &[keypair_b], vec![200]) + .genesis_config; let bank_0 = Bank::new_for_tests(&config); let bank = Arc::new(Bank::new_from_parent( Arc::new(bank_0), @@ -1069,16 +1087,15 @@ mod tests { #[test] fn test_clear_forwarded_packets() { - let latest_unprocessed_votes = LatestUnprocessedVotes::default(); let keypair_a = ValidatorVoteKeypairs::new_rand(); let keypair_b = ValidatorVoteKeypairs::new_rand(); let keypair_c = ValidatorVoteKeypairs::new_rand(); let keypair_d = ValidatorVoteKeypairs::new_rand(); - latest_unprocessed_votes.set_staked_nodes(&[ - keypair_a.node_keypair.pubkey(), - keypair_b.node_keypair.pubkey(), - keypair_c.node_keypair.pubkey(), - keypair_d.node_keypair.pubkey(), + let latest_unprocessed_votes = LatestUnprocessedVotes::new_for_tests(&[ + keypair_a.vote_keypair.pubkey(), + keypair_b.vote_keypair.pubkey(), + keypair_c.vote_keypair.pubkey(), + keypair_d.vote_keypair.pubkey(), ]); let vote_a = from_slots(vec![(1, 1)], VoteSource::Gossip, &keypair_a, None); @@ -1098,19 +1115,19 @@ mod tests { assert_eq!( Some(1), - latest_unprocessed_votes.get_latest_vote_slot(keypair_a.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_vote_slot(keypair_a.vote_keypair.pubkey()) ); assert_eq!( Some(2), - latest_unprocessed_votes.get_latest_vote_slot(keypair_b.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_vote_slot(keypair_b.vote_keypair.pubkey()) ); assert_eq!( Some(3), - latest_unprocessed_votes.get_latest_vote_slot(keypair_c.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_vote_slot(keypair_c.vote_keypair.pubkey()) ); assert_eq!( Some(4), - latest_unprocessed_votes.get_latest_vote_slot(keypair_d.node_keypair.pubkey()) + latest_unprocessed_votes.get_latest_vote_slot(keypair_d.vote_keypair.pubkey()) ); } @@ -1141,12 +1158,9 @@ mod tests { assert!(latest_unprocessed_votes.is_empty()); // Bank in same epoch should not update stakes - let config = genesis_utils::create_genesis_config_with_leader( - 100, - &keypair_a.node_keypair.pubkey(), - 200, - ) - .genesis_config; + let config = + genesis_utils::create_genesis_config_with_vote_accounts(100, &[&keypair_a], vec![200]) + .genesis_config; let bank_0 = Bank::new_for_tests(&config); let bank = Bank::new_from_parent( Arc::new(bank_0), @@ -1159,12 +1173,9 @@ mod tests { assert!(latest_unprocessed_votes.is_empty()); // Bank in next epoch should update stakes - let config = genesis_utils::create_genesis_config_with_leader( - 100, - &keypair_b.node_keypair.pubkey(), - 200, - ) - .genesis_config; + let config = + genesis_utils::create_genesis_config_with_vote_accounts(100, &[&keypair_b], vec![200]) + .genesis_config; let bank_0 = Bank::new_for_tests(&config); let bank = Bank::new_from_parent( Arc::new(bank_0), @@ -1176,17 +1187,14 @@ mod tests { latest_unprocessed_votes.insert_batch(votes.clone(), true); assert_eq!(latest_unprocessed_votes.len(), 1); assert_eq!( - latest_unprocessed_votes.get_latest_vote_slot(keypair_b.node_keypair.pubkey()), + latest_unprocessed_votes.get_latest_vote_slot(keypair_b.vote_keypair.pubkey()), Some(vote_b.slot()) ); // Previously unstaked votes are removed - let config = genesis_utils::create_genesis_config_with_leader( - 100, - &keypair_c.node_keypair.pubkey(), - 200, - ) - .genesis_config; + let config = + genesis_utils::create_genesis_config_with_vote_accounts(100, &[&keypair_c], vec![200]) + .genesis_config; let bank_0 = Bank::new_for_tests(&config); let bank = Bank::new_from_parent( Arc::new(bank_0), @@ -1199,7 +1207,7 @@ mod tests { latest_unprocessed_votes.insert_batch(votes.clone(), true); assert_eq!(latest_unprocessed_votes.len(), 1); assert_eq!( - latest_unprocessed_votes.get_latest_vote_slot(keypair_c.node_keypair.pubkey()), + latest_unprocessed_votes.get_latest_vote_slot(keypair_c.vote_keypair.pubkey()), Some(vote_c.slot()) ); } diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index cdcba288f90f8d..f612f5eaf08b11 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -1278,10 +1278,12 @@ mod tests { [VoteSource::Gossip, VoteSource::Tpu].into_iter(), [true, false].into_iter() ) { - let latest_unprocessed_votes = LatestUnprocessedVotes::default(); - if staked { - latest_unprocessed_votes.set_staked_nodes(&[keypair.pubkey()]); - } + let staked_keys = if staked { + vec![vote_keypair.pubkey()] + } else { + vec![] + }; + let latest_unprocessed_votes = LatestUnprocessedVotes::new_for_tests(&staked_keys); let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( Arc::new(latest_unprocessed_votes), vote_source, @@ -1317,8 +1319,8 @@ mod tests { )?; vote.meta_mut().flags.set(PacketFlags::SIMPLE_VOTE_TX, true); - let latest_unprocessed_votes = LatestUnprocessedVotes::default(); - latest_unprocessed_votes.set_staked_nodes(&[node_keypair.pubkey()]); + let latest_unprocessed_votes = + LatestUnprocessedVotes::new_for_tests(&[vote_keypair.pubkey()]); let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( Arc::new(latest_unprocessed_votes), VoteSource::Tpu,