Skip to content

Commit

Permalink
gossip: reduce votes stored per validator from 31 to 2
Browse files Browse the repository at this point in the history
  • Loading branch information
AshwinSekar committed Oct 16, 2024
1 parent 38404a6 commit e341e02
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 81 deletions.
117 changes: 39 additions & 78 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use {
},
crds_value::{
self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, NodeInstance,
SnapshotHashes, Version, Vote, MAX_WALLCLOCK,
SnapshotHashes, Version, Vote, MAX_VOTES, MAX_WALLCLOCK,
},
duplicate_shred::DuplicateShred,
epoch_slots::EpochSlots,
Expand Down Expand Up @@ -77,7 +77,6 @@ use {
streamer::{PacketBatchReceiver, PacketBatchSender},
},
solana_vote::vote_parser,
solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY,
std::{
borrow::{Borrow, Cow},
collections::{HashMap, HashSet, VecDeque},
Expand Down Expand Up @@ -1054,7 +1053,7 @@ impl ClusterInfo {
}

pub fn push_vote_at_index(&self, vote: Transaction, vote_index: u8) {
assert!((vote_index as usize) < MAX_LOCKOUT_HISTORY);
assert!(vote_index < MAX_VOTES);
let self_pubkey = self.id();
let now = timestamp();
let vote = Vote::new(self_pubkey, vote, now).unwrap();
Expand All @@ -1066,13 +1065,19 @@ impl ClusterInfo {
}
}

fn find_vote_index_to_evict(&self, should_evict_vote: impl Fn(&Vote) -> bool) -> u8 {
/// If there are less than MAX_VOTES present, returns the next index without a vote.
/// If there are MAX_VOTES present:
/// - Finds the oldest wallclock vote with slot < `new_vote_slot` and returns its index
/// - If all votes are newer returns MAX_VOTES
fn find_vote_index_to_evict(&self, new_vote_slot: Slot) -> u8 {
let should_evict_vote =
|vote: &Vote| -> bool { vote.slot().map(|slot| new_vote_slot > slot).unwrap_or(true) };
let self_pubkey = self.id();
let mut num_crds_votes = 0;
let vote_index = {
let gossip_crds =
self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read);
(0..MAX_LOCKOUT_HISTORY as u8)
(0..MAX_VOTES)
.filter_map(|ix| {
let vote = CrdsValueLabel::Vote(ix, self_pubkey);
let vote: &CrdsData = gossip_crds.get(&vote)?;
Expand All @@ -1088,34 +1093,20 @@ impl ClusterInfo {
.min() // Boot the oldest evicted vote by wallclock.
.map(|(_ /*wallclock*/, ix)| ix)
};
vote_index.unwrap_or(num_crds_votes)
if num_crds_votes < MAX_VOTES {
num_crds_votes
} else {
vote_index.unwrap_or(num_crds_votes)
}
}

pub fn push_vote(&self, tower: &[Slot], vote: Transaction) {
debug_assert!(tower.iter().tuple_windows().all(|(a, b)| a < b));
// Find a crds vote which is evicted from the tower, and recycle its
// vote-index. This can be either an old vote which is popped off the
// deque, or recent vote which has expired before getting enough
// confirmations.
// If all votes are still in the tower, add a new vote-index. If more
// than one vote is evicted, the oldest one by wallclock is returned in
// order to allow more recent votes more time to propagate through
// gossip.
// TODO: When there are more than one vote evicted from the tower, only
// one crds vote is overwritten here. Decide what to do with the rest.

// Returns true if the tower does not contain the vote.slot.
let should_evict_vote = |vote: &Vote| -> bool {
match vote.slot() {
Some(slot) => !tower.contains(&slot),
None => {
error!("crds vote with no slots!");
true
}
}
};
let vote_index = self.find_vote_index_to_evict(should_evict_vote);
if (vote_index as usize) >= MAX_LOCKOUT_HISTORY {
let vote_index =
self.find_vote_index_to_evict(tower.last().copied().expect("vote cannot be empty"));
if vote_index >= MAX_VOTES {
// In this case we have restarted with a mangled/missing tower and are attempting
// to push an old vote. This could be a slashable offense so better to panic here.
let (_, vote, hash, _) = vote_parser::parse_vote_transaction(&vote).unwrap();
panic!(
"invalid vote index: {}, switch: {}, vote slots: {:?}, tower: {:?}",
Expand All @@ -1133,7 +1124,7 @@ impl ClusterInfo {
let self_pubkey = self.id();
let gossip_crds =
self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read);
(0..MAX_LOCKOUT_HISTORY as u8).find(|ix| {
(0..MAX_VOTES).find(|ix| {
let vote = CrdsValueLabel::Vote(*ix, self_pubkey);
let Some(vote) = gossip_crds.get::<&CrdsData>(&vote) else {
return false;
Expand All @@ -1158,13 +1149,8 @@ impl ClusterInfo {
} else {
// If you don't see a vote with the same slot yet, this means you probably
// restarted, and need to repush and evict the oldest vote
let should_evict_vote = |vote: &Vote| -> bool {
vote.slot()
.map(|slot| refresh_vote_slot > slot)
.unwrap_or(true)
};
let vote_index = self.find_vote_index_to_evict(should_evict_vote);
if (vote_index as usize) >= MAX_LOCKOUT_HISTORY {
let vote_index = self.find_vote_index_to_evict(refresh_vote_slot);
if vote_index >= MAX_VOTES {
warn!(
"trying to refresh slot {} but all votes in gossip table are for newer slots",
refresh_vote_slot,
Expand Down Expand Up @@ -3415,10 +3401,14 @@ mod tests {
duplicate_shred::{self, tests::new_rand_shred, MAX_DUPLICATE_SHREDS},
socketaddr,
},
crds_value::MAX_VOTES,
itertools::izip,
solana_ledger::shred::Shredder,
solana_sdk::signature::{Keypair, Signer},
solana_vote_program::{vote_instruction, vote_state::Vote},
solana_vote_program::{
vote_instruction,
vote_state::{Vote, MAX_LOCKOUT_HISTORY},
},
std::{
iter::repeat_with,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddrV4},
Expand Down Expand Up @@ -3967,7 +3957,7 @@ mod tests {
}

let initial_votes = cluster_info.get_votes(&mut Cursor::default());
assert_eq!(initial_votes.len(), MAX_LOCKOUT_HISTORY);
assert_eq!(initial_votes.len(), MAX_VOTES as usize);

// Trying to refresh a vote less than all votes in gossip should do nothing
let refresh_slot = lowest_vote_slot - 1;
Expand Down Expand Up @@ -4002,7 +3992,7 @@ mod tests {

// This should evict the latest vote since it's for a slot less than refresh_slot
let votes = cluster_info.get_votes(&mut Cursor::default());
assert_eq!(votes.len(), MAX_LOCKOUT_HISTORY);
assert_eq!(votes.len(), MAX_VOTES as usize);
assert!(votes.contains(&refresh_tx));
assert!(!votes.contains(&first_vote.unwrap()));
}
Expand Down Expand Up @@ -4148,62 +4138,33 @@ mod tests {

#[test]
fn test_push_votes_with_tower() {
let get_vote_slots = |cluster_info: &ClusterInfo| -> Vec<Slot> {
let get_vote_slots = |cluster_info: &ClusterInfo| -> Vec<usize> {
let (labels, _) = cluster_info.get_votes_with_labels(&mut Cursor::default());
let gossip_crds = cluster_info.gossip.crds.read().unwrap();
let mut vote_slots = HashSet::new();
for label in labels {
let CrdsData::Vote(_, vote) = &gossip_crds.get::<&CrdsData>(&label).unwrap() else {
panic!("this should not happen!");
};
assert!(vote_slots.insert(vote.slot().unwrap()));
assert!(vote_slots.insert(vote.slot().unwrap() as usize));
}
vote_slots.into_iter().collect()
vote_slots.into_iter().sorted().collect()
};
let keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0);
let cluster_info = ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified);
let mut tower = Vec::new();

// Should just evict the oldest vote
for k in 0..MAX_LOCKOUT_HISTORY {
let slot = k as Slot;
tower.push(slot);
let vote = new_vote_transaction(vec![slot]);
cluster_info.push_vote(&tower, vote);
}
let vote_slots = get_vote_slots(&cluster_info);
assert_eq!(vote_slots.len(), MAX_LOCKOUT_HISTORY);
for vote_slot in vote_slots {
assert!(vote_slot < MAX_LOCKOUT_HISTORY as u64);
}
// Push a new vote evicting one.
let slot = MAX_LOCKOUT_HISTORY as Slot;
tower.push(slot);
tower.remove(23);
let vote = new_vote_transaction(vec![slot]);
// New versioned-crds-value should have wallclock later than existing
// entries, otherwise might not get inserted into the table.
sleep(Duration::from_millis(5));
cluster_info.push_vote(&tower, vote);
let vote_slots = get_vote_slots(&cluster_info);
assert_eq!(vote_slots.len(), MAX_LOCKOUT_HISTORY);
for vote_slot in vote_slots {
assert!(vote_slot <= slot);
assert!(vote_slot != 23);
}
// Push a new vote evicting two.
// Older one should be evicted from the crds table.
let slot = slot + 1;
tower.push(slot);
tower.remove(17);
tower.remove(5);
let vote = new_vote_transaction(vec![slot]);
cluster_info.push_vote(&tower, vote);
let vote_slots = get_vote_slots(&cluster_info);
assert_eq!(vote_slots.len(), MAX_LOCKOUT_HISTORY);
for vote_slot in vote_slots {
assert!(vote_slot <= slot);
assert!(vote_slot != 23);
assert!(vote_slot != 5);
let vote_slots = get_vote_slots(&cluster_info);
let min_vote = k.saturating_sub((MAX_VOTES - 1) as usize);
assert!(vote_slots.into_iter().eq(min_vote..=k));
sleep(Duration::from_millis(1));
}
}

Expand Down
7 changes: 4 additions & 3 deletions gossip/src/crds_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ pub const MAX_WALLCLOCK: u64 = 1_000_000_000_000_000;
pub const MAX_SLOT: u64 = 1_000_000_000_000_000;

pub type VoteIndex = u8;
// TODO: Remove this in favor of vote_state::MAX_LOCKOUT_HISTORY once
// the fleet is updated to the new ClusterInfo::push_vote code.
pub const MAX_VOTES: VoteIndex = 32;
/// Number of votes per validator to store.
/// TODO: update comment after testing
/// Only need latest vote + potential refresh of that vote.
pub const MAX_VOTES: VoteIndex = 2;

pub type EpochSlotsIndex = u8;
pub const MAX_EPOCH_SLOTS: EpochSlotsIndex = 255;
Expand Down

0 comments on commit e341e02

Please sign in to comment.