diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 8dff75832106a9..6f22db2d41a87b 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -456,7 +456,10 @@ impl BankingStage { let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - NUM_VOTE_PROCESSING_THREADS) as usize); // Keeps track of extraneous vote transactions for the vote threads - let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + let latest_unprocessed_votes = { + let bank = bank_forks.read().unwrap().working_bank(); + Arc::new(LatestUnprocessedVotes::new(&bank)) + }; let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); let committer = Committer::new( @@ -539,7 +542,10 @@ impl BankingStage { // Once an entry has been recorded, its blockhash is registered with the bank. let data_budget = Arc::new(DataBudget::default()); // Keeps track of extraneous vote transactions for the vote threads - let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + let latest_unprocessed_votes = { + let bank = bank_forks.read().unwrap().working_bank(); + Arc::new(LatestUnprocessedVotes::new(&bank)) + }; let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); let committer = Committer::new( diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index 82af221842dd0b..41e8e09fb372d2 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -102,6 +102,9 @@ impl Forwarder { // load all accounts from address loader; let current_bank = self.bank_forks.read().unwrap().working_bank(); + // if we have crossed an epoch boundary, recache any state + unprocessed_transaction_storage.cache_epoch_boundary_info(¤t_bank); + // sanitize and filter packets that are no longer valid (could be too old, a duplicate of something // already processed), then add to forwarding buffer. let filter_forwarding_result = unprocessed_transaction_storage diff --git a/core/src/banking_stage/latest_unprocessed_votes.rs b/core/src/banking_stage/latest_unprocessed_votes.rs index 0ddaaeafa4ac7e..b586a973cc3cd0 100644 --- a/core/src/banking_stage/latest_unprocessed_votes.rs +++ b/core/src/banking_stage/latest_unprocessed_votes.rs @@ -10,6 +10,7 @@ use { solana_sdk::{ account::from_account, clock::{Slot, UnixTimestamp}, + feature_set::{self}, hash::Hash, program_utils::limited_deserialize, pubkey::Pubkey, @@ -22,7 +23,7 @@ use { collections::HashMap, ops::DerefMut, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, Arc, RwLock, }, }, @@ -47,18 +48,23 @@ pub struct LatestValidatorVotePacket { } impl LatestValidatorVotePacket { - pub fn new(packet: Packet, vote_source: VoteSource) -> Result { + pub fn new( + packet: Packet, + vote_source: VoteSource, + deprecate_legacy_vote_ixs: bool, + ) -> Result { if !packet.meta().is_simple_vote_tx() { return Err(DeserializedPacketError::VoteTransactionError); } let vote = Arc::new(ImmutableDeserializedPacket::new(packet)?); - Self::new_from_immutable(vote, vote_source) + Self::new_from_immutable(vote, vote_source, deprecate_legacy_vote_ixs) } pub fn new_from_immutable( vote: Arc, vote_source: VoteSource, + deprecate_legacy_vote_ixs: bool, ) -> Result { let message = vote.transaction().get_message(); let (_, instruction) = message @@ -66,9 +72,20 @@ impl LatestValidatorVotePacket { .next() .ok_or(DeserializedPacketError::VoteTransactionError)?; + let instruction_filter = |ix: &VoteInstruction| { + if deprecate_legacy_vote_ixs { + matches!( + ix, + VoteInstruction::TowerSync(_) | VoteInstruction::TowerSyncSwitch(_, _), + ) + } else { + ix.is_single_vote_state_update() + } + }; + match limited_deserialize::(&instruction.data) { Ok(vote_state_update_instruction) - if vote_state_update_instruction.is_single_vote_state_update() => + if instruction_filter(&vote_state_update_instruction) => { let &pubkey = message .message @@ -127,26 +144,6 @@ impl LatestValidatorVotePacket { } } -pub(crate) fn weighted_random_order_by_stake<'a>( - bank: &Bank, - pubkeys: impl Iterator, -) -> impl Iterator + 'static { - // Efraimidis and Spirakis algo for weighted random sample without replacement - let staked_nodes = bank.current_epoch_staked_nodes(); - let mut pubkey_with_weight: Vec<(f64, Pubkey)> = pubkeys - .filter_map(|&pubkey| { - let stake = staked_nodes.get(&pubkey).copied().unwrap_or(0); - if stake == 0 { - None // Ignore votes from unstaked validators - } else { - Some((thread_rng().gen::().powf(1.0 / (stake as f64)), pubkey)) - } - }) - .collect::>(); - pubkey_with_weight.sort_by(|(w1, _), (w2, _)| w1.partial_cmp(w2).unwrap()); - pubkey_with_weight.into_iter().map(|(_, pubkey)| pubkey) -} - #[derive(Default, Debug)] pub(crate) struct VoteBatchInsertionMetrics { pub(crate) num_dropped_gossip: usize, @@ -157,11 +154,23 @@ pub(crate) struct VoteBatchInsertionMetrics { pub struct LatestUnprocessedVotes { latest_votes_per_pubkey: RwLock>>>, num_unprocessed_votes: AtomicUsize, + // These are only ever written to by the tpu vote thread + cached_staked_nodes: RwLock>>, + deprecate_legacy_vote_ixs: AtomicBool, + current_epoch: AtomicU64, } impl LatestUnprocessedVotes { - pub fn new() -> Self { - Self::default() + pub fn new(bank: &Bank) -> Self { + let deprecate_legacy_vote_ixs = bank + .feature_set + .is_active(&feature_set::deprecate_legacy_vote_ixs::id()); + Self { + cached_staked_nodes: RwLock::new(bank.current_epoch_staked_nodes().clone()), + current_epoch: AtomicU64::new(bank.epoch()), + deprecate_legacy_vote_ixs: AtomicBool::new(deprecate_legacy_vote_ixs), + ..Self::default() + } } pub fn len(&self) -> usize { @@ -172,6 +181,17 @@ impl LatestUnprocessedVotes { self.len() == 0 } + fn filter_unstaked_votes<'a>( + &'a self, + votes: impl Iterator + 'a, + ) -> impl Iterator + 'a { + let staked_nodes = self.cached_staked_nodes.read().unwrap(); + votes.filter(move |vote| { + let stake = staked_nodes.get(&vote.pubkey()).copied().unwrap_or(0); + stake > 0 + }) + } + pub(crate) fn insert_batch( &self, votes: impl Iterator, @@ -180,7 +200,7 @@ impl LatestUnprocessedVotes { let mut num_dropped_gossip = 0; let mut num_dropped_tpu = 0; - for vote in votes { + for vote in self.filter_unstaked_votes(votes) { if let Some(vote) = self.update_latest_vote(vote, should_replenish_taken_votes) { match vote.vote_source { VoteSource::Gossip => num_dropped_gossip += 1, @@ -291,6 +311,48 @@ impl LatestUnprocessedVotes { .and_then(|l| l.read().unwrap().timestamp()) } + #[cfg(test)] + pub(crate) fn set_staked_nodes(&self, staked_nodes: &[Pubkey]) { + let staked_nodes: HashMap = + staked_nodes.iter().map(|pk| (*pk, 1u64)).collect(); + *self.cached_staked_nodes.write().unwrap() = Arc::new(staked_nodes); + } + + fn weighted_random_order_by_stake(&self) -> impl Iterator { + // Efraimidis and Spirakis algo for weighted random sample without replacement + let staked_nodes = self.cached_staked_nodes.read().unwrap(); + let latest_votes_per_pubkey = self.latest_votes_per_pubkey.read().unwrap(); + let mut pubkey_with_weight: Vec<(f64, Pubkey)> = latest_votes_per_pubkey + .keys() + .filter_map(|&pubkey| { + let stake = staked_nodes.get(&pubkey).copied().unwrap_or(0); + if stake == 0 { + None // Ignore votes from unstaked validators + } else { + Some((thread_rng().gen::().powf(1.0 / (stake as f64)), pubkey)) + } + }) + .collect::>(); + pubkey_with_weight.sort_by(|(w1, _), (w2, _)| w1.partial_cmp(w2).unwrap()); + pubkey_with_weight.into_iter().map(|(_, pubkey)| pubkey) + } + + /// Recache the staked nodes based on a bank from the new epoch. + /// This should only be run by the TPU vote thread + pub(super) fn cache_epoch_boundary_info(&self, bank: &Bank) { + if bank.epoch() <= self.current_epoch.load(Ordering::Relaxed) { + return; + } + let mut staked_nodes = self.cached_staked_nodes.write().unwrap(); + *staked_nodes = bank.current_epoch_staked_nodes().clone(); + self.current_epoch.store(bank.epoch(), Ordering::Relaxed); + self.deprecate_legacy_vote_ixs.store( + bank.feature_set + .is_active(&feature_set::deprecate_legacy_vote_ixs::id()), + Ordering::Relaxed, + ); + } + /// Returns how many packets were forwardable /// Performs a weighted random order based on stake and stops forwarding at the first error /// Votes from validators with 0 stakes are ignored @@ -299,11 +361,7 @@ impl LatestUnprocessedVotes { bank: Arc, forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts, ) -> usize { - let pubkeys_by_stake = { - let binding = self.latest_votes_per_pubkey.read().unwrap(); - weighted_random_order_by_stake(&bank, binding.keys()) - }; - + let pubkeys_by_stake = self.weighted_random_order_by_stake(); let mut forwarded_count: usize = 0; for pubkey in pubkeys_by_stake { let Some(vote) = self.get_entry(pubkey) else { @@ -361,11 +419,7 @@ impl LatestUnprocessedVotes { ); } - let pubkeys_by_stake = { - let binding = self.latest_votes_per_pubkey.read().unwrap(); - weighted_random_order_by_stake(&bank, binding.keys()) - }; - pubkeys_by_stake + self.weighted_random_order_by_stake() .filter_map(|pubkey| { self.get_entry(pubkey).and_then(|lock| { let mut latest_vote = lock.write().unwrap(); @@ -410,6 +464,10 @@ impl LatestUnprocessedVotes { } }); } + + pub(super) fn should_deprecate_legacy_vote_ixs(&self) -> bool { + self.deprecate_legacy_vote_ixs.load(Ordering::Relaxed) + } } #[cfg(test)] @@ -424,7 +482,10 @@ mod tests { epoch_stakes::EpochStakes, genesis_utils::{self, ValidatorVoteKeypairs}, }, - solana_sdk::{hash::Hash, signature::Signer, system_transaction::transfer}, + solana_sdk::{ + epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, genesis_config::GenesisConfig, hash::Hash, + signature::Signer, system_transaction::transfer, + }, solana_vote_program::{ vote_state::TowerSync, vote_transaction::new_tower_sync_transaction, }, @@ -452,7 +513,7 @@ mod tests { .meta_mut() .flags .set(PacketFlags::SIMPLE_VOTE_TX, true); - LatestValidatorVotePacket::new(packet, vote_source).unwrap() + LatestValidatorVotePacket::new(packet, vote_source, true).unwrap() } fn deserialize_packets<'a>( @@ -461,7 +522,8 @@ mod tests { vote_source: VoteSource, ) -> impl Iterator + 'a { packet_indexes.iter().filter_map(move |packet_index| { - LatestValidatorVotePacket::new(packet_batch[*packet_index].clone(), vote_source).ok() + LatestValidatorVotePacket::new(packet_batch[*packet_index].clone(), vote_source, true) + .ok() }) } @@ -541,9 +603,13 @@ mod tests { #[test] fn test_update_latest_vote() { - let latest_unprocessed_votes = LatestUnprocessedVotes::new(); + let latest_unprocessed_votes = LatestUnprocessedVotes::default(); let keypair_a = ValidatorVoteKeypairs::new_rand(); let keypair_b = ValidatorVoteKeypairs::new_rand(); + latest_unprocessed_votes.set_staked_nodes(&[ + keypair_a.node_keypair.pubkey(), + keypair_b.node_keypair.pubkey(), + ]); let vote_a = from_slots(vec![(0, 2), (1, 1)], VoteSource::Gossip, &keypair_a, None); let vote_b = from_slots( @@ -744,7 +810,7 @@ mod tests { fn test_update_latest_vote_race() { // There was a race condition in updating the same pubkey in the hashmap // when the entry does not initially exist. - let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::default()); const NUM_VOTES: usize = 100; let keypairs = Arc::new( @@ -752,6 +818,11 @@ mod tests { .map(|_| ValidatorVoteKeypairs::new_rand()) .collect_vec(), ); + let staked_nodes = keypairs + .iter() + .map(|kp| kp.node_keypair.pubkey()) + .collect_vec(); + latest_unprocessed_votes.set_staked_nodes(&staked_nodes); // Insert votes in parallel let insert_vote = |latest_unprocessed_votes: &LatestUnprocessedVotes, @@ -783,7 +854,7 @@ mod tests { #[test] fn test_simulate_threads() { - let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::default()); let latest_unprocessed_votes_tpu = latest_unprocessed_votes.clone(); let keypairs = Arc::new( (0..10) @@ -791,6 +862,11 @@ mod tests { .collect_vec(), ); let keypairs_tpu = keypairs.clone(); + let staked_nodes = keypairs + .iter() + .map(|kp| kp.node_keypair.pubkey()) + .collect_vec(); + latest_unprocessed_votes.set_staked_nodes(&staked_nodes); let vote_limit = 1000; let gossip = Builder::new() @@ -846,11 +922,17 @@ mod tests { #[test] fn test_forwardable_packets() { - let latest_unprocessed_votes = LatestUnprocessedVotes::new(); - let mut bank = Bank::default_for_tests(); + let latest_unprocessed_votes = LatestUnprocessedVotes::default(); + let bank_0 = Bank::new_for_tests(&GenesisConfig::default()); + let mut bank = Bank::new_from_parent( + Arc::new(bank_0), + &Pubkey::new_unique(), + MINIMUM_SLOTS_PER_EPOCH, + ); + assert_eq!(bank.epoch(), 1); bank.set_epoch_stakes_for_test( - bank.epoch().saturating_add(1), - EpochStakes::new_for_tests(HashMap::new(), bank.epoch().saturating_add(1)), + bank.epoch().saturating_add(2), + EpochStakes::new_for_tests(HashMap::new(), bank.epoch().saturating_add(2)), ); let bank = Arc::new(bank); let mut forward_packet_batches_by_accounts = @@ -864,7 +946,8 @@ mod tests { latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */); latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */); - // Don't forward 0 stake accounts + // Recache on epoch boundary and don't forward 0 stake accounts + latest_unprocessed_votes.cache_epoch_boundary_info(&bank); let forwarded = latest_unprocessed_votes .get_and_insert_forwardable_packets(bank, &mut forward_packet_batches_by_accounts); assert_eq!(0, forwarded); @@ -882,11 +965,17 @@ mod tests { 200, ) .genesis_config; - let bank = Bank::new_for_tests(&config); + let bank_0 = Bank::new_for_tests(&config); + let bank = Bank::new_from_parent( + Arc::new(bank_0), + &Pubkey::new_unique(), + 2 * MINIMUM_SLOTS_PER_EPOCH, + ); let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); // Don't forward votes from gossip + latest_unprocessed_votes.cache_epoch_boundary_info(&bank); let forwarded = latest_unprocessed_votes.get_and_insert_forwardable_packets( Arc::new(bank), &mut forward_packet_batches_by_accounts, @@ -907,11 +996,17 @@ mod tests { 200, ) .genesis_config; - let bank = Arc::new(Bank::new_for_tests(&config)); + let bank_0 = Bank::new_for_tests(&config); + let bank = Arc::new(Bank::new_from_parent( + Arc::new(bank_0), + &Pubkey::new_unique(), + 3 * MINIMUM_SLOTS_PER_EPOCH, + )); let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); // Forward from TPU + latest_unprocessed_votes.cache_epoch_boundary_info(&bank); let forwarded = latest_unprocessed_votes.get_and_insert_forwardable_packets( bank.clone(), &mut forward_packet_batches_by_accounts, @@ -944,11 +1039,17 @@ mod tests { #[test] fn test_clear_forwarded_packets() { - let latest_unprocessed_votes = LatestUnprocessedVotes::new(); + let latest_unprocessed_votes = LatestUnprocessedVotes::default(); let keypair_a = ValidatorVoteKeypairs::new_rand(); let keypair_b = ValidatorVoteKeypairs::new_rand(); let keypair_c = ValidatorVoteKeypairs::new_rand(); let keypair_d = ValidatorVoteKeypairs::new_rand(); + latest_unprocessed_votes.set_staked_nodes(&[ + keypair_a.node_keypair.pubkey(), + keypair_b.node_keypair.pubkey(), + keypair_c.node_keypair.pubkey(), + keypair_d.node_keypair.pubkey(), + ]); let vote_a = from_slots(vec![(1, 1)], VoteSource::Gossip, &keypair_a, None); let mut vote_b = from_slots(vec![(2, 1)], VoteSource::Tpu, &keypair_b, None); @@ -982,4 +1083,97 @@ mod tests { latest_unprocessed_votes.get_latest_vote_slot(keypair_d.node_keypair.pubkey()) ); } + + #[test] + fn test_insert_batch_unstaked() { + let keypair_a = ValidatorVoteKeypairs::new_rand(); + let keypair_b = ValidatorVoteKeypairs::new_rand(); + let keypair_c = ValidatorVoteKeypairs::new_rand(); + let keypair_d = ValidatorVoteKeypairs::new_rand(); + + let vote_a = from_slots(vec![(1, 1)], VoteSource::Gossip, &keypair_a, None); + let vote_b = from_slots(vec![(2, 1)], VoteSource::Tpu, &keypair_b, None); + let vote_c = from_slots(vec![(3, 1)], VoteSource::Tpu, &keypair_c, None); + let vote_d = from_slots(vec![(4, 1)], VoteSource::Gossip, &keypair_d, None); + let votes = [ + vote_a.clone(), + vote_b.clone(), + vote_c.clone(), + vote_d.clone(), + ] + .into_iter(); + + let bank_0 = Bank::new_for_tests(&GenesisConfig::default()); + let latest_unprocessed_votes = LatestUnprocessedVotes::new(&bank_0); + + // Insert batch should filter out all votes as they are unstaked + latest_unprocessed_votes.insert_batch(votes.clone(), true); + assert!(latest_unprocessed_votes.is_empty()); + + // Bank in same epoch should not update stakes + let config = genesis_utils::create_genesis_config_with_leader( + 100, + &keypair_a.node_keypair.pubkey(), + 200, + ) + .genesis_config; + let bank_0 = Bank::new_for_tests(&config); + let bank = Bank::new_from_parent( + Arc::new(bank_0), + &Pubkey::new_unique(), + MINIMUM_SLOTS_PER_EPOCH - 1, + ); + assert_eq!(bank.epoch(), 0); + latest_unprocessed_votes.cache_epoch_boundary_info(&bank); + latest_unprocessed_votes.insert_batch(votes.clone(), true); + assert!(latest_unprocessed_votes.is_empty()); + + // Bank in next epoch should update stakes + let config = genesis_utils::create_genesis_config_with_leader( + 100, + &keypair_b.node_keypair.pubkey(), + 200, + ) + .genesis_config; + let bank_0 = Bank::new_for_tests(&config); + let bank = Bank::new_from_parent( + Arc::new(bank_0), + &Pubkey::new_unique(), + MINIMUM_SLOTS_PER_EPOCH, + ); + assert_eq!(bank.epoch(), 1); + latest_unprocessed_votes.cache_epoch_boundary_info(&bank); + latest_unprocessed_votes.insert_batch(votes.clone(), true); + assert_eq!(latest_unprocessed_votes.len(), 1); + assert_eq!( + latest_unprocessed_votes.get_latest_vote_slot(keypair_b.node_keypair.pubkey()), + Some(vote_b.slot()) + ); + + // Previously unstaked votes are not (yet) removed + let config = genesis_utils::create_genesis_config_with_leader( + 100, + &keypair_c.node_keypair.pubkey(), + 200, + ) + .genesis_config; + let bank_0 = Bank::new_for_tests(&config); + let bank = Bank::new_from_parent( + Arc::new(bank_0), + &Pubkey::new_unique(), + 3 * MINIMUM_SLOTS_PER_EPOCH, + ); + assert_eq!(bank.epoch(), 2); + latest_unprocessed_votes.cache_epoch_boundary_info(&bank); + latest_unprocessed_votes.insert_batch(votes.clone(), true); + assert_eq!(latest_unprocessed_votes.len(), 2); + assert_eq!( + latest_unprocessed_votes.get_latest_vote_slot(keypair_b.node_keypair.pubkey()), + Some(vote_b.slot()) + ); + assert_eq!( + latest_unprocessed_votes.get_latest_vote_slot(keypair_c.node_keypair.pubkey()), + Some(vote_c.slot()) + ); + } } diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index 35bc04a2997995..1ee7363e0d1924 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -414,6 +414,13 @@ impl UnprocessedTransactionStorage { ), } } + + pub(crate) fn cache_epoch_boundary_info(&mut self, bank: &Bank) { + match self { + Self::LocalTransactionStorage(_) => (), + Self::VoteStorage(vote_storage) => vote_storage.cache_epoch_boundary_info(bank), + } + } } impl VoteStorage { @@ -451,6 +458,8 @@ impl VoteStorage { LatestValidatorVotePacket::new_from_immutable( Arc::new(deserialized_packet), self.vote_source, + self.latest_unprocessed_votes + .should_deprecate_legacy_vote_ixs(), ) .ok() }), @@ -514,6 +523,10 @@ impl VoteStorage { should_process_packet, ); + let deprecate_legacy_vote_ixs = self + .latest_unprocessed_votes + .should_deprecate_legacy_vote_ixs(); + while let Some((packets, payload)) = scanner.iterate() { let vote_packets = packets.iter().map(|p| (*p).clone()).collect_vec(); @@ -523,6 +536,7 @@ impl VoteStorage { LatestValidatorVotePacket::new_from_immutable( vote_packets[*i].clone(), self.vote_source, + deprecate_legacy_vote_ixs, ) .ok() }), @@ -531,7 +545,12 @@ impl VoteStorage { } else { self.latest_unprocessed_votes.insert_batch( vote_packets.into_iter().filter_map(|packet| { - LatestValidatorVotePacket::new_from_immutable(packet, self.vote_source).ok() + LatestValidatorVotePacket::new_from_immutable( + packet, + self.vote_source, + deprecate_legacy_vote_ixs, + ) + .ok() }), true, // should_replenish_taken_votes ); @@ -540,6 +559,14 @@ impl VoteStorage { scanner.finalize().payload.reached_end_of_slot } + + fn cache_epoch_boundary_info(&mut self, bank: &Bank) { + if matches!(self.vote_source, VoteSource::Gossip) { + panic!("Gossip vote thread should not be checking epoch boundary"); + } + self.latest_unprocessed_votes + .cache_epoch_boundary_info(bank); + } } impl ThreadLocalUnprocessedPackets { @@ -1246,9 +1273,16 @@ mod tests { assert!(deserialized_packets.contains(&big_transfer)); } - for vote_source in [VoteSource::Gossip, VoteSource::Tpu] { + for (vote_source, staked) in [VoteSource::Gossip, VoteSource::Tpu] + .into_iter() + .flat_map(|vs| [(vs, true), (vs, false)]) + { + let latest_unprocessed_votes = LatestUnprocessedVotes::default(); + if staked { + latest_unprocessed_votes.set_staked_nodes(&[keypair.pubkey()]); + } let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( - Arc::new(LatestUnprocessedVotes::new()), + Arc::new(latest_unprocessed_votes), vote_source, ); transaction_storage.insert_batch(vec![ @@ -1256,7 +1290,7 @@ mod tests { ImmutableDeserializedPacket::new(vote.clone())?, ImmutableDeserializedPacket::new(big_transfer.clone())?, ]); - assert_eq!(1, transaction_storage.len()); + assert_eq!(if staked { 1 } else { 0 }, transaction_storage.len()); } Ok(()) } @@ -1282,8 +1316,10 @@ mod tests { )?; vote.meta_mut().flags.set(PacketFlags::SIMPLE_VOTE_TX, true); + let latest_unprocessed_votes = LatestUnprocessedVotes::default(); + latest_unprocessed_votes.set_staked_nodes(&[node_keypair.pubkey()]); let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( - Arc::new(LatestUnprocessedVotes::new()), + Arc::new(latest_unprocessed_votes), VoteSource::Tpu, );