diff --git a/core/src/banking_stage/latest_unprocessed_votes.rs b/core/src/banking_stage/latest_unprocessed_votes.rs index 084e4125b842ae..6e9f20441d9efb 100644 --- a/core/src/banking_stage/latest_unprocessed_votes.rs +++ b/core/src/banking_stage/latest_unprocessed_votes.rs @@ -14,6 +14,7 @@ use { }, solana_vote_program::vote_instruction::VoteInstruction, std::{ + cmp, collections::HashMap, ops::DerefMut, sync::{ @@ -166,12 +167,13 @@ impl LatestUnprocessedVotes { pub(crate) fn insert_batch( &self, votes: impl Iterator, + should_replenish_taken_votes: bool, ) -> VoteBatchInsertionMetrics { let mut num_dropped_gossip = 0; let mut num_dropped_tpu = 0; for vote in votes { - if let Some(vote) = self.update_latest_vote(vote) { + if let Some(vote) = self.update_latest_vote(vote, should_replenish_taken_votes) { match vote.vote_source { VoteSource::Gossip => num_dropped_gossip += 1, VoteSource::Tpu => num_dropped_tpu += 1, @@ -199,26 +201,41 @@ impl LatestUnprocessedVotes { pub fn update_latest_vote( &self, vote: LatestValidatorVotePacket, + should_replenish_taken_votes: bool, ) -> Option { let pubkey = vote.pubkey(); let slot = vote.slot(); let timestamp = vote.timestamp(); + // Allow votes for later slots or the same slot with later timestamp (refreshed votes) + // We directly compare as options to prioritize votes for same slot with timestamp as + // Some > None + let allow_update = |latest_vote: &LatestValidatorVotePacket| -> bool { + match slot.cmp(&latest_vote.slot()) { + cmp::Ordering::Less => return false, + cmp::Ordering::Greater => return true, + cmp::Ordering::Equal => {} + }; + + // Slots are equal, now check timestamp + match timestamp.cmp(&latest_vote.timestamp()) { + cmp::Ordering::Less => return false, + cmp::Ordering::Greater => return true, + cmp::Ordering::Equal => {} + }; + + // Timestamps are equal, lastly check if vote was taken previously + // and should be replenished + should_replenish_taken_votes && latest_vote.is_vote_taken() + }; + let with_latest_vote = |latest_vote: &RwLock, vote: LatestValidatorVotePacket| -> Option { - let (latest_slot, latest_timestamp) = latest_vote - .read() - .map(|vote| (vote.slot(), vote.timestamp())) - .unwrap(); - // Allow votes for later slots or the same slot with later timestamp (refreshed votes) - // We directly compare as options to prioritize votes for same slot with timestamp as - // Some > None - if slot > latest_slot || ((slot == latest_slot) && (timestamp > latest_timestamp)) { + let should_try_update = allow_update(&latest_vote.read().unwrap()); + if should_try_update { let mut latest_vote = latest_vote.write().unwrap(); - let latest_slot = latest_vote.slot(); - let latest_timestamp = latest_vote.timestamp(); - if slot > latest_slot || ((slot == latest_slot) && (timestamp > latest_timestamp)) { + if allow_update(&latest_vote) { let old_vote = std::mem::replace(latest_vote.deref_mut(), vote); if old_vote.is_vote_taken() { self.num_unprocessed_votes.fetch_add(1, Ordering::Relaxed); @@ -536,10 +553,10 @@ mod tests { ); assert!(latest_unprocessed_votes - .update_latest_vote(vote_a) + .update_latest_vote(vote_a, false /* should replenish */) .is_none()); assert!(latest_unprocessed_votes - .update_latest_vote(vote_b) + .update_latest_vote(vote_b, false /* should replenish */) .is_none()); assert_eq!(2, latest_unprocessed_votes.len()); @@ -569,7 +586,7 @@ mod tests { assert_eq!( 1, latest_unprocessed_votes - .update_latest_vote(vote_a) + .update_latest_vote(vote_a, false /* should replenish */) .unwrap() .slot ); @@ -577,7 +594,7 @@ mod tests { assert_eq!( 6, latest_unprocessed_votes - .update_latest_vote(vote_b) + .update_latest_vote(vote_b, false /* should replenish */) .unwrap() .slot ); @@ -597,8 +614,8 @@ mod tests { &keypair_b, None, ); - latest_unprocessed_votes.update_latest_vote(vote_a); - latest_unprocessed_votes.update_latest_vote(vote_b); + latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */); + latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */); assert_eq!(2, latest_unprocessed_votes.len()); assert_eq!( @@ -627,8 +644,8 @@ mod tests { &keypair_b, Some(2), ); - latest_unprocessed_votes.update_latest_vote(vote_a); - latest_unprocessed_votes.update_latest_vote(vote_b); + latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */); + latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */); assert_eq!(2, latest_unprocessed_votes.len()); assert_eq!( @@ -653,8 +670,8 @@ mod tests { &keypair_b, Some(6), ); - latest_unprocessed_votes.update_latest_vote(vote_a); - latest_unprocessed_votes.update_latest_vote(vote_b); + latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */); + latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */); assert_eq!(2, latest_unprocessed_votes.len()); assert_eq!( @@ -679,8 +696,10 @@ mod tests { &keypair_b, Some(3), ); - latest_unprocessed_votes.update_latest_vote(vote_a); - latest_unprocessed_votes.update_latest_vote(vote_b); + latest_unprocessed_votes + .update_latest_vote(vote_a.clone(), false /* should replenish */); + latest_unprocessed_votes + .update_latest_vote(vote_b.clone(), false /* should replenish */); assert_eq!(2, latest_unprocessed_votes.len()); assert_eq!( @@ -691,6 +710,33 @@ mod tests { Some(6), latest_unprocessed_votes.get_latest_timestamp(keypair_b.node_keypair.pubkey()) ); + + // Drain all latest votes + for packet in latest_unprocessed_votes + .latest_votes_per_pubkey + .read() + .unwrap() + .values() + { + packet.write().unwrap().take_vote().inspect(|_vote| { + latest_unprocessed_votes + .num_unprocessed_votes + .fetch_sub(1, Ordering::Relaxed); + }); + } + assert_eq!(0, latest_unprocessed_votes.len()); + + // Same votes with same timestamps should not replenish without flag + latest_unprocessed_votes + .update_latest_vote(vote_a.clone(), false /* should replenish */); + latest_unprocessed_votes + .update_latest_vote(vote_b.clone(), false /* should replenish */); + assert_eq!(0, latest_unprocessed_votes.len()); + + // Same votes with same timestamps should replenish with the flag + latest_unprocessed_votes.update_latest_vote(vote_a, true /* should replenish */); + latest_unprocessed_votes.update_latest_vote(vote_b, true /* should replenish */); + assert_eq!(0, latest_unprocessed_votes.len()); } #[test] @@ -711,7 +757,7 @@ mod tests { keypairs: &Arc>, i: usize| { let vote = from_slots(vec![(i as u64, 1)], VoteSource::Gossip, &keypairs[i], None); - latest_unprocessed_votes.update_latest_vote(vote); + latest_unprocessed_votes.update_latest_vote(vote, false /* should replenish */); }; let hdl = Builder::new() @@ -756,7 +802,8 @@ mod tests { &keypairs[rng.gen_range(0..10)], None, ); - latest_unprocessed_votes.update_latest_vote(vote); + latest_unprocessed_votes + .update_latest_vote(vote, false /* should replenish */); } }) .unwrap(); @@ -771,7 +818,8 @@ mod tests { &keypairs_tpu[rng.gen_range(0..10)], None, ); - latest_unprocessed_votes_tpu.update_latest_vote(vote); + latest_unprocessed_votes_tpu + .update_latest_vote(vote, false /* should replenish */); if i % 214 == 0 { // Simulate draining and processing packets let latest_votes_per_pubkey = latest_unprocessed_votes_tpu @@ -807,8 +855,8 @@ mod tests { let vote_a = from_slots(vec![(1, 1)], VoteSource::Gossip, &keypair_a, None); let vote_b = from_slots(vec![(2, 1)], VoteSource::Tpu, &keypair_b, None); - latest_unprocessed_votes.update_latest_vote(vote_a); - latest_unprocessed_votes.update_latest_vote(vote_b); + latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */); + latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */); // Don't forward 0 stake accounts let forwarded = latest_unprocessed_votes @@ -902,10 +950,10 @@ mod tests { let vote_c = from_slots(vec![(3, 1)], VoteSource::Tpu, &keypair_c, None); let vote_d = from_slots(vec![(4, 1)], VoteSource::Gossip, &keypair_d, None); - latest_unprocessed_votes.update_latest_vote(vote_a); - latest_unprocessed_votes.update_latest_vote(vote_b); - latest_unprocessed_votes.update_latest_vote(vote_c); - latest_unprocessed_votes.update_latest_vote(vote_d); + latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */); + latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */); + latest_unprocessed_votes.update_latest_vote(vote_c, false /* should replenish */); + latest_unprocessed_votes.update_latest_vote(vote_d, false /* should replenish */); assert_eq!(4, latest_unprocessed_votes.len()); latest_unprocessed_votes.clear_forwarded_packets(); diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index e790993dfcccd0..7a529bd457856c 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -442,18 +442,18 @@ impl VoteStorage { &mut self, deserialized_packets: Vec, ) -> VoteBatchInsertionMetrics { - self.latest_unprocessed_votes - .insert_batch( - deserialized_packets - .into_iter() - .filter_map(|deserialized_packet| { - LatestValidatorVotePacket::new_from_immutable( - Arc::new(deserialized_packet), - self.vote_source, - ) - .ok() - }), - ) + self.latest_unprocessed_votes.insert_batch( + deserialized_packets + .into_iter() + .filter_map(|deserialized_packet| { + LatestValidatorVotePacket::new_from_immutable( + Arc::new(deserialized_packet), + self.vote_source, + ) + .ok() + }), + false, // should_replenish_taken_votes + ) } fn filter_forwardable_packets_and_add_batches( @@ -524,12 +524,15 @@ impl VoteStorage { ) .ok() }), + true, // should_replenish_taken_votes ); } else { - self.latest_unprocessed_votes - .insert_batch(vote_packets.into_iter().filter_map(|packet| { + self.latest_unprocessed_votes.insert_batch( + vote_packets.into_iter().filter_map(|packet| { LatestValidatorVotePacket::new_from_immutable(packet, self.vote_source).ok() - })); + }), + true, // should_replenish_taken_votes + ); } } @@ -998,6 +1001,7 @@ mod tests { super::*, solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}, solana_perf::packet::{Packet, PacketFlags}, + solana_runtime::genesis_utils, solana_sdk::{ hash::Hash, signature::{Keypair, Signer}, @@ -1266,6 +1270,58 @@ mod tests { Ok(()) } + #[test] + fn test_process_packets_retryable_indexes_reinserted() -> Result<(), Box> { + let node_keypair = Keypair::new(); + let genesis_config = + genesis_utils::create_genesis_config_with_leader(100, &node_keypair.pubkey(), 200) + .genesis_config; + let (bank, _bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config); + let vote_keypair = Keypair::new(); + let mut vote = Packet::from_data( + None, + new_tower_sync_transaction( + TowerSync::default(), + Hash::new_unique(), + &node_keypair, + &vote_keypair, + &vote_keypair, + None, + ), + )?; + vote.meta_mut().flags.set(PacketFlags::SIMPLE_VOTE_TX, true); + + let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( + Arc::new(LatestUnprocessedVotes::new()), + VoteSource::Tpu, + ); + + transaction_storage.insert_batch(vec![ImmutableDeserializedPacket::new(vote.clone())?]); + assert_eq!(1, transaction_storage.len()); + + // When processing packets, return all packets as retryable so that they + // are reinserted into storage + let _ = transaction_storage.process_packets( + bank.clone(), + &BankingStageStats::default(), + &mut LeaderSlotMetricsTracker::new(0), + |packets, _payload| { + // Return all packets indexes as retryable + Some( + packets + .iter() + .enumerate() + .map(|(index, _packet)| index) + .collect_vec(), + ) + }, + ); + + // All packets should remain in the transaction storage + assert_eq!(1, transaction_storage.len()); + Ok(()) + } + #[test] fn test_prepare_packets_to_forward() { solana_logger::setup();