From e7778eb1d6c8007a2240b3c1521f4a521d2aef4e Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 4 Nov 2024 22:34:26 +0000 Subject: [PATCH] splits out gossip CrdsData out of crds_value.rs (#3463) Reorganizing this code in preparation of upcoming changes to CrdsValue (de)serialization. --- core/src/cluster_slots_service.rs | 2 +- gossip/src/cluster_info.rs | 32 +- gossip/src/contact_info.rs | 2 +- gossip/src/crds.rs | 28 +- gossip/src/crds_data.rs | 718 +++++++++++++++++++++++++ gossip/src/crds_entry.rs | 7 +- gossip/src/crds_gossip.rs | 4 +- gossip/src/crds_gossip_pull.rs | 2 +- gossip/src/crds_gossip_push.rs | 2 +- gossip/src/crds_value.rs | 831 ++--------------------------- gossip/src/duplicate_shred.rs | 2 +- gossip/src/epoch_slots.rs | 4 +- gossip/src/legacy_contact_info.rs | 2 +- gossip/src/lib.rs | 1 + gossip/src/restart_crds_values.rs | 7 +- gossip/tests/crds_gossip.rs | 3 +- local-cluster/src/cluster_tests.rs | 5 +- turbine/src/cluster_nodes.rs | 3 +- validator/src/bootstrap.rs | 4 +- wen-restart/src/wen_restart.rs | 3 +- 20 files changed, 824 insertions(+), 838 deletions(-) create mode 100644 gossip/src/crds_data.rs diff --git a/core/src/cluster_slots_service.rs b/core/src/cluster_slots_service.rs index 7f6226e87e468f..8fbc0534d10c27 100644 --- a/core/src/cluster_slots_service.rs +++ b/core/src/cluster_slots_service.rs @@ -182,7 +182,7 @@ impl ClusterSlotsService { mod test { use { super::*, - solana_gossip::{cluster_info::Node, crds_value::LowestSlot}, + solana_gossip::{cluster_info::Node, crds_data::LowestSlot}, solana_sdk::signature::{Keypair, Signer}, solana_streamer::socket::SocketAddrSpace, }; diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index d8917c68f01629..1f33b04a3f4856 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -20,16 +20,17 @@ use { }, contact_info::{self, ContactInfo, Error as ContactInfoError}, crds::{Crds, Cursor, GossipRoute}, + crds_data::{ + self, CrdsData, EpochSlotsIndex, LowestSlot, NodeInstance, SnapshotHashes, Version, + Vote, MAX_WALLCLOCK, + }, crds_gossip::CrdsGossip, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{ get_max_bloom_filter_bytes, CrdsFilter, CrdsTimeouts, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, }, - crds_value::{ - self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, NodeInstance, - SnapshotHashes, Version, Vote, MAX_WALLCLOCK, - }, + crds_value::{CrdsValue, CrdsValueLabel}, duplicate_shred::DuplicateShred, epoch_slots::EpochSlots, gossip_error::GossipError, @@ -201,7 +202,7 @@ impl PruneData { /// New random PruneData for tests and benchmarks. #[cfg(test)] fn new_rand(rng: &mut R, self_keypair: &Keypair, num_nodes: Option) -> Self { - let wallclock = crds_value::new_rand_timestamp(rng); + let wallclock = crds_data::new_rand_timestamp(rng); let num_nodes = num_nodes.unwrap_or_else(|| rng.gen_range(0..MAX_PRUNE_DATA_NODES + 1)); let prunes = std::iter::repeat_with(Pubkey::new_unique) .take(num_nodes) @@ -310,7 +311,7 @@ pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; #[cfg_attr( feature = "frozen-abi", derive(AbiExample, AbiEnumVisitor), - frozen_abi(digest = "GfVFxfPfYcFLCaa29uxQxyKJAuTZ1cYqcRKhVrEKwDK7") + frozen_abi(digest = "AbxVYLVnKbfJRjtnj9gSHegnE31wcTGAm2nEFuuyJKr4") )] #[derive(Serialize, Deserialize, Debug)] #[allow(clippy::large_enum_variant)] @@ -915,7 +916,7 @@ impl ClusterInfo { let current_slots: Vec<_> = { let gossip_crds = self.time_gossip_read_lock("lookup_epoch_slots", &self.stats.epoch_slots_lookup); - (0..crds_value::MAX_EPOCH_SLOTS) + (0..crds_data::MAX_EPOCH_SLOTS) .filter_map(|ix| { let label = CrdsValueLabel::EpochSlots(ix, self_pubkey); let crds_value = gossip_crds.get::<&CrdsValue>(&label)?; @@ -934,7 +935,7 @@ impl ClusterInfo { let total_slots = max_slot as isize - min_slot as isize; // WARN if CRDS is not storing at least a full epoch worth of slots if DEFAULT_SLOTS_PER_EPOCH as isize > total_slots - && crds_value::MAX_EPOCH_SLOTS as usize <= current_slots.len() + && crds_data::MAX_EPOCH_SLOTS as usize <= current_slots.len() { self.stats.epoch_slots_filled.add_relaxed(1); warn!( @@ -951,7 +952,7 @@ impl ClusterInfo { let mut entries = Vec::default(); let keypair = self.keypair(); while !update.is_empty() { - let ix = epoch_slot_index % crds_value::MAX_EPOCH_SLOTS; + let ix = epoch_slot_index % crds_data::MAX_EPOCH_SLOTS; let now = timestamp(); let mut slots = if !reset { self.lookup_epoch_slots(ix) @@ -1321,7 +1322,7 @@ impl ClusterInfo { .map(|entry| entry.version.clone()) .or_else(|| { gossip_crds - .get::<&crds_value::LegacyVersion>(*pubkey) + .get::<&crds_data::LegacyVersion>(*pubkey) .map(|entry| entry.version.clone()) .map(solana_version::LegacyVersion2::from) }) @@ -2477,10 +2478,10 @@ impl ClusterInfo { let my_contact_info = self.my_contact_info(); move |values: &[CrdsValue]| { if should_check_duplicate_instance - && values.iter().any(|value| { - instance.check_duplicate(value) - || matches!(value.data(), CrdsData::ContactInfo(other) - if my_contact_info.check_duplicate(other)) + && values.iter().any(|value| match value.data() { + CrdsData::ContactInfo(other) => my_contact_info.check_duplicate(other), + CrdsData::NodeInstance(other) => instance.check_duplicate(other), + _ => false, }) { return Err(GossipError::DuplicateNodeInstance); @@ -3403,8 +3404,9 @@ mod tests { use { super::*, crate::{ + crds_data::{AccountsHashes, Vote as CrdsVote}, crds_gossip_pull::tests::MIN_NUM_BLOOM_FILTERS, - crds_value::{AccountsHashes, CrdsValue, CrdsValueLabel, Vote as CrdsVote}, + crds_value::{CrdsValue, CrdsValueLabel}, duplicate_shred::{self, tests::new_rand_shred, MAX_DUPLICATE_SHREDS}, socketaddr, }, diff --git a/gossip/src/contact_info.rs b/gossip/src/contact_info.rs index 2d65b0147b8bce..430bfa34730574 100644 --- a/gossip/src/contact_info.rs +++ b/gossip/src/contact_info.rs @@ -1,6 +1,6 @@ pub use solana_client::connection_cache::Protocol; use { - crate::{crds_value::MAX_WALLCLOCK, legacy_contact_info::LegacyContactInfo}, + crate::{crds_data::MAX_WALLCLOCK, legacy_contact_info::LegacyContactInfo}, assert_matches::{assert_matches, debug_assert_matches}, serde::{Deserialize, Deserializer, Serialize}, solana_sanitize::{Sanitize, SanitizeError}, diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index e6291de9368271..adcbef1655aa1d 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -28,10 +28,11 @@ use { crate::{ contact_info::ContactInfo, + crds_data::CrdsData, crds_entry::CrdsEntry, crds_gossip_pull::CrdsTimeouts, crds_shards::CrdsShards, - crds_value::{CrdsData, CrdsValue, CrdsValueLabel}, + crds_value::{CrdsValue, CrdsValueLabel}, }, assert_matches::debug_assert_matches, bincode::serialize, @@ -199,17 +200,22 @@ fn overrides(value: &CrdsValue, other: &VersionedCrdsValue) -> bool { // Contact-infos and node instances are special cased so that if there are // two running instances of the same node, the more recent start is // propagated through gossip regardless of wallclocks. - if let CrdsData::NodeInstance(value) = value.data() { - if let Some(out) = value.overrides(&other.value) { - return out; + match value.data() { + CrdsData::ContactInfo(value) => { + if let CrdsData::ContactInfo(other) = other.value.data() { + if let Some(out) = value.overrides(other) { + return out; + } + } } - } - if let CrdsData::ContactInfo(value) = value.data() { - if let CrdsData::ContactInfo(other) = other.value.data() { - if let Some(out) = value.overrides(other) { - return out; + CrdsData::NodeInstance(value) => { + if let CrdsData::NodeInstance(other) = other.value.data() { + if let Some(out) = value.overrides(other) { + return out; + } } } + _ => (), } match value.wallclock().cmp(&other.value.wallclock()) { Ordering::Less => false, @@ -311,7 +317,7 @@ impl Crds { Entry::Occupied(mut entry) => { stats.record_fail(&value, route); trace!( - "INSERT FAILED data: {} new.wallclock: {}", + "INSERT FAILED data: {:?} new.wallclock: {}", value.value.label(), value.value.wallclock(), ); @@ -786,7 +792,7 @@ fn should_report_message_signature(signature: &Signature) -> bool { mod tests { use { super::*, - crate::crds_value::{new_rand_timestamp, AccountsHashes, NodeInstance}, + crate::crds_data::{new_rand_timestamp, AccountsHashes, NodeInstance}, rand::{thread_rng, Rng, SeedableRng}, rand_chacha::ChaChaRng, rayon::ThreadPoolBuilder, diff --git a/gossip/src/crds_data.rs b/gossip/src/crds_data.rs new file mode 100644 index 00000000000000..a1f4fdbcc6c517 --- /dev/null +++ b/gossip/src/crds_data.rs @@ -0,0 +1,718 @@ +use { + crate::{ + cluster_info::MAX_ACCOUNTS_HASHES, + contact_info::ContactInfo, + deprecated, + duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS}, + epoch_slots::EpochSlots, + legacy_contact_info::LegacyContactInfo, + restart_crds_values::{RestartHeaviestFork, RestartLastVotedForkSlots}, + }, + rand::{CryptoRng, Rng}, + serde::de::{Deserialize, Deserializer}, + solana_sanitize::{Sanitize, SanitizeError}, + solana_sdk::{ + clock::Slot, + hash::Hash, + pubkey::{self, Pubkey}, + timing::timestamp, + transaction::Transaction, + }, + solana_vote::vote_parser, + std::{cmp::Ordering, collections::BTreeSet}, +}; + +pub(crate) const MAX_WALLCLOCK: u64 = 1_000_000_000_000_000; +pub(crate) const MAX_SLOT: u64 = 1_000_000_000_000_000; + +pub(crate) type VoteIndex = u8; +// TODO: Remove this in favor of vote_state::MAX_LOCKOUT_HISTORY once +// the fleet is updated to the new ClusterInfo::push_vote code. +const MAX_VOTES: VoteIndex = 32; + +pub(crate) type EpochSlotsIndex = u8; +pub(crate) const MAX_EPOCH_SLOTS: EpochSlotsIndex = 255; + +/// CrdsData that defines the different types of items CrdsValues can hold +/// * Merge Strategy - Latest wallclock is picked +/// * LowestSlot index is deprecated +#[allow(clippy::large_enum_variant)] +#[cfg_attr(feature = "frozen-abi", derive(AbiExample, AbiEnumVisitor))] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub enum CrdsData { + #[allow(private_interfaces)] + LegacyContactInfo(LegacyContactInfo), + Vote(VoteIndex, Vote), + LowestSlot(/*DEPRECATED:*/ u8, LowestSlot), + #[allow(private_interfaces)] + LegacySnapshotHashes(LegacySnapshotHashes), // Deprecated + #[allow(private_interfaces)] + AccountsHashes(AccountsHashes), // Deprecated + EpochSlots(EpochSlotsIndex, EpochSlots), + #[allow(private_interfaces)] + LegacyVersion(LegacyVersion), + #[allow(private_interfaces)] + Version(Version), + #[allow(private_interfaces)] + NodeInstance(NodeInstance), + DuplicateShred(DuplicateShredIndex, DuplicateShred), + SnapshotHashes(SnapshotHashes), + ContactInfo(ContactInfo), + RestartLastVotedForkSlots(RestartLastVotedForkSlots), + RestartHeaviestFork(RestartHeaviestFork), +} + +impl Sanitize for CrdsData { + fn sanitize(&self) -> Result<(), SanitizeError> { + match self { + CrdsData::LegacyContactInfo(val) => val.sanitize(), + CrdsData::Vote(ix, val) => { + if *ix >= MAX_VOTES { + return Err(SanitizeError::ValueOutOfBounds); + } + val.sanitize() + } + CrdsData::LowestSlot(ix, val) => { + if *ix as usize >= 1 { + return Err(SanitizeError::ValueOutOfBounds); + } + val.sanitize() + } + CrdsData::LegacySnapshotHashes(val) => val.sanitize(), + CrdsData::AccountsHashes(val) => val.sanitize(), + CrdsData::EpochSlots(ix, val) => { + if *ix as usize >= MAX_EPOCH_SLOTS as usize { + return Err(SanitizeError::ValueOutOfBounds); + } + val.sanitize() + } + CrdsData::LegacyVersion(version) => version.sanitize(), + CrdsData::Version(version) => version.sanitize(), + CrdsData::NodeInstance(node) => node.sanitize(), + CrdsData::DuplicateShred(ix, shred) => { + if *ix >= MAX_DUPLICATE_SHREDS { + Err(SanitizeError::ValueOutOfBounds) + } else { + shred.sanitize() + } + } + CrdsData::SnapshotHashes(val) => val.sanitize(), + CrdsData::ContactInfo(node) => node.sanitize(), + CrdsData::RestartLastVotedForkSlots(slots) => slots.sanitize(), + CrdsData::RestartHeaviestFork(fork) => fork.sanitize(), + } + } +} + +/// Random timestamp for tests and benchmarks. +pub(crate) fn new_rand_timestamp(rng: &mut R) -> u64 { + const DELAY: u64 = 10 * 60 * 1000; // 10 minutes + timestamp() - DELAY + rng.gen_range(0..2 * DELAY) +} + +impl CrdsData { + /// New random CrdsData for tests and benchmarks. + pub(crate) fn new_rand(rng: &mut R, pubkey: Option) -> CrdsData { + let kind = rng.gen_range(0..9); + // TODO: Implement other kinds of CrdsData here. + // TODO: Assign ranges to each arm proportional to their frequency in + // the mainnet crds table. + match kind { + 0 => CrdsData::ContactInfo(ContactInfo::new_rand(rng, pubkey)), + // Index for LowestSlot is deprecated and should be zero. + 1 => CrdsData::LowestSlot(0, LowestSlot::new_rand(rng, pubkey)), + 2 => CrdsData::LegacySnapshotHashes(LegacySnapshotHashes::new_rand(rng, pubkey)), + 3 => CrdsData::AccountsHashes(AccountsHashes::new_rand(rng, pubkey)), + 4 => CrdsData::Version(Version::new_rand(rng, pubkey)), + 5 => CrdsData::Vote(rng.gen_range(0..MAX_VOTES), Vote::new_rand(rng, pubkey)), + 6 => CrdsData::RestartLastVotedForkSlots(RestartLastVotedForkSlots::new_rand( + rng, pubkey, + )), + 7 => CrdsData::RestartHeaviestFork(RestartHeaviestFork::new_rand(rng, pubkey)), + _ => CrdsData::EpochSlots( + rng.gen_range(0..MAX_EPOCH_SLOTS), + EpochSlots::new_rand(rng, pubkey), + ), + } + } + + pub(crate) fn wallclock(&self) -> u64 { + match self { + CrdsData::LegacyContactInfo(contact_info) => contact_info.wallclock(), + CrdsData::Vote(_, vote) => vote.wallclock, + CrdsData::LowestSlot(_, obj) => obj.wallclock, + CrdsData::LegacySnapshotHashes(hash) => hash.wallclock, + CrdsData::AccountsHashes(hash) => hash.wallclock, + CrdsData::EpochSlots(_, p) => p.wallclock, + CrdsData::LegacyVersion(version) => version.wallclock, + CrdsData::Version(version) => version.wallclock, + CrdsData::NodeInstance(node) => node.wallclock, + CrdsData::DuplicateShred(_, shred) => shred.wallclock, + CrdsData::SnapshotHashes(hash) => hash.wallclock, + CrdsData::ContactInfo(node) => node.wallclock(), + CrdsData::RestartLastVotedForkSlots(slots) => slots.wallclock, + CrdsData::RestartHeaviestFork(fork) => fork.wallclock, + } + } + + pub(crate) fn pubkey(&self) -> Pubkey { + match &self { + CrdsData::LegacyContactInfo(contact_info) => *contact_info.pubkey(), + CrdsData::Vote(_, vote) => vote.from, + CrdsData::LowestSlot(_, slots) => slots.from, + CrdsData::LegacySnapshotHashes(hash) => hash.from, + CrdsData::AccountsHashes(hash) => hash.from, + CrdsData::EpochSlots(_, p) => p.from, + CrdsData::LegacyVersion(version) => version.from, + CrdsData::Version(version) => version.from, + CrdsData::NodeInstance(node) => node.from, + CrdsData::DuplicateShred(_, shred) => shred.from, + CrdsData::SnapshotHashes(hash) => hash.from, + CrdsData::ContactInfo(node) => *node.pubkey(), + CrdsData::RestartLastVotedForkSlots(slots) => slots.from, + CrdsData::RestartHeaviestFork(fork) => fork.from, + } + } +} + +#[cfg_attr(feature = "frozen-abi", derive(AbiExample))] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub(crate) struct AccountsHashes { + pub(crate) from: Pubkey, + pub(crate) hashes: Vec<(Slot, Hash)>, + pub(crate) wallclock: u64, +} + +impl Sanitize for AccountsHashes { + fn sanitize(&self) -> Result<(), SanitizeError> { + sanitize_wallclock(self.wallclock)?; + for (slot, _) in &self.hashes { + if *slot >= MAX_SLOT { + return Err(SanitizeError::ValueOutOfBounds); + } + } + self.from.sanitize() + } +} + +impl AccountsHashes { + /// New random AccountsHashes for tests and benchmarks. + pub(crate) fn new_rand(rng: &mut R, pubkey: Option) -> Self { + let num_hashes = rng.gen_range(0..MAX_ACCOUNTS_HASHES) + 1; + let hashes = std::iter::repeat_with(|| { + let slot = 47825632 + rng.gen_range(0..512); + let hash = Hash::new_unique(); + (slot, hash) + }) + .take(num_hashes) + .collect(); + Self { + from: pubkey.unwrap_or_else(pubkey::new_rand), + hashes, + wallclock: new_rand_timestamp(rng), + } + } +} + +type LegacySnapshotHashes = AccountsHashes; + +#[cfg_attr(feature = "frozen-abi", derive(AbiExample))] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub struct SnapshotHashes { + pub from: Pubkey, + pub full: (Slot, Hash), + pub incremental: Vec<(Slot, Hash)>, + pub wallclock: u64, +} + +impl Sanitize for SnapshotHashes { + fn sanitize(&self) -> Result<(), SanitizeError> { + sanitize_wallclock(self.wallclock)?; + if self.full.0 >= MAX_SLOT { + return Err(SanitizeError::ValueOutOfBounds); + } + for (slot, _) in &self.incremental { + if *slot >= MAX_SLOT { + return Err(SanitizeError::ValueOutOfBounds); + } + if self.full.0 >= *slot { + return Err(SanitizeError::InvalidValue); + } + } + self.from.sanitize() + } +} + +#[cfg_attr(feature = "frozen-abi", derive(AbiExample))] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub struct LowestSlot { + pub(crate) from: Pubkey, + root: Slot, //deprecated + pub lowest: Slot, + slots: BTreeSet, //deprecated + stash: Vec, //deprecated + wallclock: u64, +} + +impl LowestSlot { + pub(crate) fn new(from: Pubkey, lowest: Slot, wallclock: u64) -> Self { + Self { + from, + root: 0, + lowest, + slots: BTreeSet::new(), + stash: vec![], + wallclock, + } + } + + /// New random LowestSlot for tests and benchmarks. + fn new_rand(rng: &mut R, pubkey: Option) -> Self { + Self { + from: pubkey.unwrap_or_else(pubkey::new_rand), + root: rng.gen(), + lowest: rng.gen(), + slots: BTreeSet::default(), + stash: Vec::default(), + wallclock: new_rand_timestamp(rng), + } + } +} + +impl Sanitize for LowestSlot { + fn sanitize(&self) -> Result<(), SanitizeError> { + sanitize_wallclock(self.wallclock)?; + if self.lowest >= MAX_SLOT { + return Err(SanitizeError::ValueOutOfBounds); + } + if self.root != 0 { + return Err(SanitizeError::InvalidValue); + } + if !self.slots.is_empty() { + return Err(SanitizeError::InvalidValue); + } + if !self.stash.is_empty() { + return Err(SanitizeError::InvalidValue); + } + self.from.sanitize() + } +} + +#[cfg_attr(feature = "frozen-abi", derive(AbiExample))] +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +pub struct Vote { + pub(crate) from: Pubkey, + transaction: Transaction, + pub(crate) wallclock: u64, + #[serde(skip_serializing)] + slot: Option, +} + +impl Sanitize for Vote { + fn sanitize(&self) -> Result<(), SanitizeError> { + sanitize_wallclock(self.wallclock)?; + self.from.sanitize()?; + self.transaction.sanitize() + } +} + +impl Vote { + // Returns None if cannot parse transaction into a vote. + pub fn new(from: Pubkey, transaction: Transaction, wallclock: u64) -> Option { + vote_parser::parse_vote_transaction(&transaction).map(|(_, vote, ..)| Self { + from, + transaction, + wallclock, + slot: vote.last_voted_slot(), + }) + } + + /// New random Vote for tests and benchmarks. + fn new_rand(rng: &mut R, pubkey: Option) -> Self { + Self { + from: pubkey.unwrap_or_else(pubkey::new_rand), + transaction: Transaction::default(), + wallclock: new_rand_timestamp(rng), + slot: None, + } + } + + pub(crate) fn transaction(&self) -> &Transaction { + &self.transaction + } + + pub(crate) fn slot(&self) -> Option { + self.slot + } +} + +impl<'de> Deserialize<'de> for Vote { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + struct Vote { + from: Pubkey, + transaction: Transaction, + wallclock: u64, + } + let vote = Vote::deserialize(deserializer)?; + vote.transaction + .sanitize() + .map_err(serde::de::Error::custom)?; + Self::new(vote.from, vote.transaction, vote.wallclock) + .ok_or_else(|| serde::de::Error::custom("invalid vote tx")) + } +} + +#[cfg_attr(feature = "frozen-abi", derive(AbiExample))] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub(crate) struct LegacyVersion { + from: Pubkey, + wallclock: u64, + pub(crate) version: solana_version::LegacyVersion1, +} + +impl Sanitize for LegacyVersion { + fn sanitize(&self) -> Result<(), SanitizeError> { + sanitize_wallclock(self.wallclock)?; + self.from.sanitize()?; + self.version.sanitize() + } +} + +#[cfg_attr(feature = "frozen-abi", derive(AbiExample))] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub(crate) struct Version { + from: Pubkey, + wallclock: u64, + pub(crate) version: solana_version::LegacyVersion2, +} + +impl Sanitize for Version { + fn sanitize(&self) -> Result<(), SanitizeError> { + sanitize_wallclock(self.wallclock)?; + self.from.sanitize()?; + self.version.sanitize() + } +} + +impl Version { + pub(crate) fn new(from: Pubkey) -> Self { + Self { + from, + wallclock: timestamp(), + version: solana_version::LegacyVersion2::default(), + } + } + + /// New random Version for tests and benchmarks. + fn new_rand(rng: &mut R, pubkey: Option) -> Self { + Self { + from: pubkey.unwrap_or_else(pubkey::new_rand), + wallclock: new_rand_timestamp(rng), + version: solana_version::LegacyVersion2 { + major: rng.gen(), + minor: rng.gen(), + patch: rng.gen(), + commit: Some(rng.gen()), + feature_set: rng.gen(), + }, + } + } +} + +#[cfg_attr(feature = "frozen-abi", derive(AbiExample))] +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +pub(crate) struct NodeInstance { + from: Pubkey, + wallclock: u64, + timestamp: u64, // Timestamp when the instance was created. + token: u64, // Randomly generated value at node instantiation. +} + +impl NodeInstance { + pub(crate) fn new(rng: &mut R, from: Pubkey, now: u64) -> Self + where + R: Rng + CryptoRng, + { + Self { + from, + wallclock: now, + timestamp: now, + token: rng.gen(), + } + } + + // Clones the value with an updated wallclock. + pub(crate) fn with_wallclock(&self, wallclock: u64) -> Self { + Self { wallclock, ..*self } + } + + // Returns true if the crds-value is a duplicate instance of this node, + // with a more recent timestamp. + // The older instance is considered the duplicate instance. If a staked + // node is restarted it will receive its old instance value from gossip. + // Considering the new instance as the duplicate would prevent the node + // from restarting. + pub(crate) fn check_duplicate(&self, other: &NodeInstance) -> bool { + self.token != other.token && self.timestamp <= other.timestamp && self.from == other.from + } + + // Returns None if tokens are the same or other is not a node-instance from + // the same owner. Otherwise returns true if self has more recent timestamp + // than other, and so overrides it. + pub(crate) fn overrides(&self, other: &NodeInstance) -> Option { + if self.token == other.token || self.from != other.from { + return None; + } + match self.timestamp.cmp(&other.timestamp) { + Ordering::Less => Some(false), + Ordering::Greater => Some(true), + // Ties should be broken in a deterministic way across the cluster, + // so that nodes propagate the same value through gossip. + Ordering::Equal => Some(other.token < self.token), + } + } +} + +impl Sanitize for NodeInstance { + fn sanitize(&self) -> Result<(), SanitizeError> { + sanitize_wallclock(self.wallclock)?; + self.from.sanitize() + } +} + +pub(crate) fn sanitize_wallclock(wallclock: u64) -> Result<(), SanitizeError> { + if wallclock >= MAX_WALLCLOCK { + Err(SanitizeError::ValueOutOfBounds) + } else { + Ok(()) + } +} + +#[cfg(test)] +mod test { + use { + super::*, + crate::crds_value::CrdsValue, + bincode::Options, + solana_perf::test_tx::new_test_vote_tx, + solana_sdk::{ + signature::{Keypair, Signer}, + timing::timestamp, + }, + solana_vote_program::{vote_instruction, vote_state}, + }; + + #[test] + fn test_lowest_slot_sanitize() { + let ls = LowestSlot::new(Pubkey::default(), 0, 0); + let v = CrdsValue::new_unsigned(CrdsData::LowestSlot(0, ls.clone())); + assert_eq!(v.sanitize(), Ok(())); + + let mut o = ls.clone(); + o.root = 1; + let v = CrdsValue::new_unsigned(CrdsData::LowestSlot(0, o)); + assert_eq!(v.sanitize(), Err(SanitizeError::InvalidValue)); + + let o = ls.clone(); + let v = CrdsValue::new_unsigned(CrdsData::LowestSlot(1, o)); + assert_eq!(v.sanitize(), Err(SanitizeError::ValueOutOfBounds)); + + let mut o = ls.clone(); + o.slots.insert(1); + let v = CrdsValue::new_unsigned(CrdsData::LowestSlot(0, o)); + assert_eq!(v.sanitize(), Err(SanitizeError::InvalidValue)); + + let mut o = ls; + o.stash.push(deprecated::EpochIncompleteSlots::default()); + let v = CrdsValue::new_unsigned(CrdsData::LowestSlot(0, o)); + assert_eq!(v.sanitize(), Err(SanitizeError::InvalidValue)); + } + + #[test] + fn test_max_vote_index() { + let mut rng = rand::thread_rng(); + let keypair = Keypair::new(); + let vote = Vote::new(keypair.pubkey(), new_test_vote_tx(&mut rng), timestamp()).unwrap(); + let vote = CrdsValue::new_signed(CrdsData::Vote(MAX_VOTES, vote), &keypair); + assert!(vote.sanitize().is_err()); + } + + #[test] + fn test_vote_round_trip() { + let mut rng = rand::thread_rng(); + let vote = vote_state::Vote::new( + vec![1, 3, 7], // slots + Hash::new_unique(), + ); + let ix = vote_instruction::vote( + &Pubkey::new_unique(), // vote_pubkey + &Pubkey::new_unique(), // authorized_voter_pubkey + vote, + ); + let tx = Transaction::new_with_payer( + &[ix], // instructions + Some(&Pubkey::new_unique()), // payer + ); + let vote = Vote::new( + Pubkey::new_unique(), // from + tx, + rng.gen(), // wallclock + ) + .unwrap(); + assert_eq!(vote.slot, Some(7)); + let bytes = bincode::serialize(&vote).unwrap(); + let other = bincode::deserialize(&bytes[..]).unwrap(); + assert_eq!(vote, other); + assert_eq!(other.slot, Some(7)); + let bytes = bincode::options().serialize(&vote).unwrap(); + let other = bincode::options().deserialize(&bytes[..]).unwrap(); + assert_eq!(vote, other); + assert_eq!(other.slot, Some(7)); + } + + #[test] + fn test_max_epoch_slots_index() { + let keypair = Keypair::new(); + let item = CrdsValue::new_signed( + CrdsData::EpochSlots( + MAX_EPOCH_SLOTS, + EpochSlots::new(keypair.pubkey(), timestamp()), + ), + &keypair, + ); + assert_eq!(item.sanitize(), Err(SanitizeError::ValueOutOfBounds)); + } + + #[test] + fn test_node_instance_crds_label() { + fn make_crds_value(node: NodeInstance) -> CrdsValue { + CrdsValue::new_unsigned(CrdsData::NodeInstance(node)) + } + let mut rng = rand::thread_rng(); + let now = timestamp(); + let pubkey = Pubkey::new_unique(); + let node = NodeInstance::new(&mut rng, pubkey, now); + assert_eq!( + make_crds_value(node.clone()).label(), + make_crds_value(node.with_wallclock(now + 8)).label() + ); + let other = NodeInstance { + from: Pubkey::new_unique(), + ..node + }; + assert_ne!( + make_crds_value(node.clone()).label(), + make_crds_value(other).label() + ); + let other = NodeInstance { + wallclock: now + 8, + ..node + }; + assert_eq!( + make_crds_value(node.clone()).label(), + make_crds_value(other).label() + ); + let other = NodeInstance { + timestamp: now + 8, + ..node + }; + assert_eq!( + make_crds_value(node.clone()).label(), + make_crds_value(other).label() + ); + let other = NodeInstance { + token: rng.gen(), + ..node + }; + assert_eq!( + make_crds_value(node).label(), + make_crds_value(other).label() + ); + } + + #[test] + fn test_check_duplicate_instance() { + let now = timestamp(); + let mut rng = rand::thread_rng(); + let pubkey = Pubkey::new_unique(); + let node = NodeInstance::new(&mut rng, pubkey, now); + // Same token is not a duplicate. + let other = NodeInstance { + from: pubkey, + wallclock: now + 1, + timestamp: now + 1, + token: node.token, + }; + assert!(!node.check_duplicate(&other)); + assert!(!other.check_duplicate(&node)); + assert_eq!(node.overrides(&other), None); + assert_eq!(other.overrides(&node), None); + // Older timestamp is not a duplicate. + let other = NodeInstance { + from: pubkey, + wallclock: now + 1, + timestamp: now - 1, + token: rng.gen(), + }; + assert!(!node.check_duplicate(&other)); + assert!(other.check_duplicate(&node)); + assert_eq!(node.overrides(&other), Some(true)); + assert_eq!(other.overrides(&node), Some(false)); + // Updated wallclock is not a duplicate. + let other = node.with_wallclock(now + 8); + assert_eq!( + other, + NodeInstance { + from: pubkey, + wallclock: now + 8, + timestamp: now, + token: node.token, + } + ); + assert!(!node.check_duplicate(&other)); + assert!(!other.check_duplicate(&node)); + assert_eq!(node.overrides(&other), None); + assert_eq!(other.overrides(&node), None); + // Duplicate instance; tied timestamp. + for _ in 0..10 { + let other = NodeInstance { + from: pubkey, + wallclock: 0, + timestamp: now, + token: rng.gen(), + }; + assert!(node.check_duplicate(&other)); + assert!(other.check_duplicate(&node)); + assert_eq!(node.overrides(&other), Some(other.token < node.token)); + assert_eq!(other.overrides(&node), Some(node.token < other.token)); + } + // Duplicate instance; more recent timestamp. + for _ in 0..10 { + let other = NodeInstance { + from: pubkey, + wallclock: 0, + timestamp: now + 1, + token: rng.gen(), + }; + assert!(node.check_duplicate(&other)); + assert!(!other.check_duplicate(&node)); + assert_eq!(node.overrides(&other), Some(false)); + assert_eq!(other.overrides(&node), Some(true)); + } + // Different pubkey is not a duplicate. + let other = NodeInstance { + from: Pubkey::new_unique(), + wallclock: now + 1, + timestamp: now + 1, + token: rng.gen(), + }; + assert!(!node.check_duplicate(&other)); + assert!(!other.check_duplicate(&node)); + assert_eq!(node.overrides(&other), None); + assert_eq!(other.overrides(&node), None); + } +} diff --git a/gossip/src/crds_entry.rs b/gossip/src/crds_entry.rs index 33ab68cd40e164..0427f63fa45729 100644 --- a/gossip/src/crds_entry.rs +++ b/gossip/src/crds_entry.rs @@ -2,9 +2,8 @@ use { crate::{ contact_info::ContactInfo, crds::VersionedCrdsValue, - crds_value::{ - CrdsData, CrdsValue, CrdsValueLabel, LegacyVersion, LowestSlot, SnapshotHashes, Version, - }, + crds_data::{CrdsData, LegacyVersion, LowestSlot, SnapshotHashes, Version}, + crds_value::{CrdsValue, CrdsValueLabel}, }, indexmap::IndexMap, solana_sdk::pubkey::Pubkey, @@ -68,7 +67,7 @@ mod tests { super::*, crate::{ crds::{Crds, GossipRoute}, - crds_value::new_rand_timestamp, + crds_data::new_rand_timestamp, }, rand::seq::SliceRandom, solana_sdk::signature::Keypair, diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index 0d100687a651cd..4426a16fb23f52 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -10,10 +10,11 @@ use { cluster_info_metrics::GossipStats, contact_info::ContactInfo, crds::{Crds, GossipRoute}, + crds_data::CrdsData, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{CrdsFilter, CrdsGossipPull, CrdsTimeouts, ProcessPullStats}, crds_gossip_push::CrdsGossipPush, - crds_value::{CrdsData, CrdsValue}, + crds_value::CrdsValue, duplicate_shred::{self, DuplicateShredIndex, MAX_DUPLICATE_SHREDS}, ping_pong::PingCache, }, @@ -409,7 +410,6 @@ pub(crate) fn maybe_ping_gossip_addresses( mod test { use { super::*, - crate::crds_value::CrdsData, solana_sdk::{hash::hash, timing::timestamp}, }; diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index ec77fde1b7a5e4..302ab8ae5efe62 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -656,7 +656,7 @@ pub(crate) mod tests { super::*, crate::{ cluster_info::Protocol, - crds_value::{CrdsData, Vote}, + crds_data::{CrdsData, Vote}, legacy_contact_info::LegacyContactInfo, }, itertools::Itertools, diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index 49f04f6cd30d3a..7426edcbfb76b6 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -280,7 +280,7 @@ impl CrdsGossipPush { mod tests { use { super::*, - crate::{contact_info::ContactInfo, crds_value::CrdsData}, + crate::{contact_info::ContactInfo, crds_data::CrdsData}, std::time::{Duration, Instant}, }; diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index 9da9d30d904e44..4902de46538876 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -1,45 +1,20 @@ use { crate::{ - cluster_info::MAX_ACCOUNTS_HASHES, contact_info::ContactInfo, - deprecated, - duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS}, + crds_data::{CrdsData, EpochSlotsIndex, VoteIndex}, + duplicate_shred::DuplicateShredIndex, epoch_slots::EpochSlots, - legacy_contact_info::LegacyContactInfo, - restart_crds_values::{RestartHeaviestFork, RestartLastVotedForkSlots}, }, - bincode::{serialize, serialized_size}, - rand::{CryptoRng, Rng}, - serde::de::{Deserialize, Deserializer}, + bincode::serialize, + rand::Rng, solana_sanitize::{Sanitize, SanitizeError}, solana_sdk::{ - clock::Slot, - hash::Hash, - pubkey::{self, Pubkey}, + pubkey::Pubkey, signature::{Keypair, Signable, Signature, Signer}, - timing::timestamp, - transaction::Transaction, - }, - solana_vote::vote_parser, - std::{ - borrow::{Borrow, Cow}, - cmp::Ordering, - collections::BTreeSet, - fmt, }, + std::borrow::{Borrow, Cow}, }; -pub const MAX_WALLCLOCK: u64 = 1_000_000_000_000_000; -pub const MAX_SLOT: u64 = 1_000_000_000_000_000; - -pub type VoteIndex = u8; -// TODO: Remove this in favor of vote_state::MAX_LOCKOUT_HISTORY once -// the fleet is updated to the new ClusterInfo::push_vote code. -pub const MAX_VOTES: VoteIndex = 32; - -pub type EpochSlotsIndex = u8; -pub const MAX_EPOCH_SLOTS: EpochSlotsIndex = 255; - /// CrdsValue that is replicated across the cluster #[cfg_attr(feature = "frozen-abi", derive(AbiExample))] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] @@ -78,432 +53,6 @@ impl Signable for CrdsValue { } } -/// CrdsData that defines the different types of items CrdsValues can hold -/// * Merge Strategy - Latest wallclock is picked -/// * LowestSlot index is deprecated -#[allow(clippy::large_enum_variant)] -#[cfg_attr(feature = "frozen-abi", derive(AbiExample, AbiEnumVisitor))] -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] -pub enum CrdsData { - #[allow(private_interfaces)] - LegacyContactInfo(LegacyContactInfo), - Vote(VoteIndex, Vote), - LowestSlot(/*DEPRECATED:*/ u8, LowestSlot), - LegacySnapshotHashes(LegacySnapshotHashes), // Deprecated - AccountsHashes(AccountsHashes), // Deprecated - EpochSlots(EpochSlotsIndex, EpochSlots), - LegacyVersion(LegacyVersion), - Version(Version), - NodeInstance(NodeInstance), - DuplicateShred(DuplicateShredIndex, DuplicateShred), - SnapshotHashes(SnapshotHashes), - ContactInfo(ContactInfo), - RestartLastVotedForkSlots(RestartLastVotedForkSlots), - RestartHeaviestFork(RestartHeaviestFork), -} - -impl Sanitize for CrdsData { - fn sanitize(&self) -> Result<(), SanitizeError> { - match self { - CrdsData::LegacyContactInfo(val) => val.sanitize(), - CrdsData::Vote(ix, val) => { - if *ix >= MAX_VOTES { - return Err(SanitizeError::ValueOutOfBounds); - } - val.sanitize() - } - CrdsData::LowestSlot(ix, val) => { - if *ix as usize >= 1 { - return Err(SanitizeError::ValueOutOfBounds); - } - val.sanitize() - } - CrdsData::LegacySnapshotHashes(val) => val.sanitize(), - CrdsData::AccountsHashes(val) => val.sanitize(), - CrdsData::EpochSlots(ix, val) => { - if *ix as usize >= MAX_EPOCH_SLOTS as usize { - return Err(SanitizeError::ValueOutOfBounds); - } - val.sanitize() - } - CrdsData::LegacyVersion(version) => version.sanitize(), - CrdsData::Version(version) => version.sanitize(), - CrdsData::NodeInstance(node) => node.sanitize(), - CrdsData::DuplicateShred(ix, shred) => { - if *ix >= MAX_DUPLICATE_SHREDS { - Err(SanitizeError::ValueOutOfBounds) - } else { - shred.sanitize() - } - } - CrdsData::SnapshotHashes(val) => val.sanitize(), - CrdsData::ContactInfo(node) => node.sanitize(), - CrdsData::RestartLastVotedForkSlots(slots) => slots.sanitize(), - CrdsData::RestartHeaviestFork(fork) => fork.sanitize(), - } - } -} - -/// Random timestamp for tests and benchmarks. -pub(crate) fn new_rand_timestamp(rng: &mut R) -> u64 { - const DELAY: u64 = 10 * 60 * 1000; // 10 minutes - timestamp() - DELAY + rng.gen_range(0..2 * DELAY) -} - -impl CrdsData { - /// New random CrdsData for tests and benchmarks. - fn new_rand(rng: &mut R, pubkey: Option) -> CrdsData { - let kind = rng.gen_range(0..9); - // TODO: Implement other kinds of CrdsData here. - // TODO: Assign ranges to each arm proportional to their frequency in - // the mainnet crds table. - match kind { - 0 => CrdsData::ContactInfo(ContactInfo::new_rand(rng, pubkey)), - // Index for LowestSlot is deprecated and should be zero. - 1 => CrdsData::LowestSlot(0, LowestSlot::new_rand(rng, pubkey)), - 2 => CrdsData::LegacySnapshotHashes(LegacySnapshotHashes::new_rand(rng, pubkey)), - 3 => CrdsData::AccountsHashes(AccountsHashes::new_rand(rng, pubkey)), - 4 => CrdsData::Version(Version::new_rand(rng, pubkey)), - 5 => CrdsData::Vote(rng.gen_range(0..MAX_VOTES), Vote::new_rand(rng, pubkey)), - 6 => CrdsData::RestartLastVotedForkSlots(RestartLastVotedForkSlots::new_rand( - rng, pubkey, - )), - 7 => CrdsData::RestartHeaviestFork(RestartHeaviestFork::new_rand(rng, pubkey)), - _ => CrdsData::EpochSlots( - rng.gen_range(0..MAX_EPOCH_SLOTS), - EpochSlots::new_rand(rng, pubkey), - ), - } - } -} - -#[cfg_attr(feature = "frozen-abi", derive(AbiExample))] -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] -pub struct AccountsHashes { - pub from: Pubkey, - pub hashes: Vec<(Slot, Hash)>, - pub wallclock: u64, -} - -impl Sanitize for AccountsHashes { - fn sanitize(&self) -> Result<(), SanitizeError> { - sanitize_wallclock(self.wallclock)?; - for (slot, _) in &self.hashes { - if *slot >= MAX_SLOT { - return Err(SanitizeError::ValueOutOfBounds); - } - } - self.from.sanitize() - } -} - -impl AccountsHashes { - pub fn new(from: Pubkey, hashes: Vec<(Slot, Hash)>) -> Self { - Self { - from, - hashes, - wallclock: timestamp(), - } - } - - /// New random AccountsHashes for tests and benchmarks. - pub(crate) fn new_rand(rng: &mut R, pubkey: Option) -> Self { - let num_hashes = rng.gen_range(0..MAX_ACCOUNTS_HASHES) + 1; - let hashes = std::iter::repeat_with(|| { - let slot = 47825632 + rng.gen_range(0..512); - let hash = Hash::new_unique(); - (slot, hash) - }) - .take(num_hashes) - .collect(); - Self { - from: pubkey.unwrap_or_else(pubkey::new_rand), - hashes, - wallclock: new_rand_timestamp(rng), - } - } -} - -type LegacySnapshotHashes = AccountsHashes; - -#[cfg_attr(feature = "frozen-abi", derive(AbiExample))] -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] -pub struct SnapshotHashes { - pub from: Pubkey, - pub full: (Slot, Hash), - pub incremental: Vec<(Slot, Hash)>, - pub wallclock: u64, -} - -impl Sanitize for SnapshotHashes { - fn sanitize(&self) -> Result<(), SanitizeError> { - sanitize_wallclock(self.wallclock)?; - if self.full.0 >= MAX_SLOT { - return Err(SanitizeError::ValueOutOfBounds); - } - for (slot, _) in &self.incremental { - if *slot >= MAX_SLOT { - return Err(SanitizeError::ValueOutOfBounds); - } - if self.full.0 >= *slot { - return Err(SanitizeError::InvalidValue); - } - } - self.from.sanitize() - } -} - -#[cfg_attr(feature = "frozen-abi", derive(AbiExample))] -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] -pub struct LowestSlot { - pub from: Pubkey, - root: Slot, //deprecated - pub lowest: Slot, - slots: BTreeSet, //deprecated - stash: Vec, //deprecated - pub wallclock: u64, -} - -impl LowestSlot { - pub fn new(from: Pubkey, lowest: Slot, wallclock: u64) -> Self { - Self { - from, - root: 0, - lowest, - slots: BTreeSet::new(), - stash: vec![], - wallclock, - } - } - - /// New random LowestSlot for tests and benchmarks. - fn new_rand(rng: &mut R, pubkey: Option) -> Self { - Self { - from: pubkey.unwrap_or_else(pubkey::new_rand), - root: rng.gen(), - lowest: rng.gen(), - slots: BTreeSet::default(), - stash: Vec::default(), - wallclock: new_rand_timestamp(rng), - } - } -} - -impl Sanitize for LowestSlot { - fn sanitize(&self) -> Result<(), SanitizeError> { - sanitize_wallclock(self.wallclock)?; - if self.lowest >= MAX_SLOT { - return Err(SanitizeError::ValueOutOfBounds); - } - if self.root != 0 { - return Err(SanitizeError::InvalidValue); - } - if !self.slots.is_empty() { - return Err(SanitizeError::InvalidValue); - } - if !self.stash.is_empty() { - return Err(SanitizeError::InvalidValue); - } - self.from.sanitize() - } -} - -#[cfg_attr(feature = "frozen-abi", derive(AbiExample))] -#[derive(Clone, Debug, PartialEq, Eq, Serialize)] -pub struct Vote { - pub(crate) from: Pubkey, - transaction: Transaction, - pub(crate) wallclock: u64, - #[serde(skip_serializing)] - slot: Option, -} - -impl Sanitize for Vote { - fn sanitize(&self) -> Result<(), SanitizeError> { - sanitize_wallclock(self.wallclock)?; - self.from.sanitize()?; - self.transaction.sanitize() - } -} - -impl Vote { - // Returns None if cannot parse transaction into a vote. - pub fn new(from: Pubkey, transaction: Transaction, wallclock: u64) -> Option { - vote_parser::parse_vote_transaction(&transaction).map(|(_, vote, ..)| Self { - from, - transaction, - wallclock, - slot: vote.last_voted_slot(), - }) - } - - /// New random Vote for tests and benchmarks. - fn new_rand(rng: &mut R, pubkey: Option) -> Self { - Self { - from: pubkey.unwrap_or_else(pubkey::new_rand), - transaction: Transaction::default(), - wallclock: new_rand_timestamp(rng), - slot: None, - } - } - - pub(crate) fn transaction(&self) -> &Transaction { - &self.transaction - } - - pub(crate) fn slot(&self) -> Option { - self.slot - } -} - -impl<'de> Deserialize<'de> for Vote { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - #[derive(Deserialize)] - struct Vote { - from: Pubkey, - transaction: Transaction, - wallclock: u64, - } - let vote = Vote::deserialize(deserializer)?; - vote.transaction - .sanitize() - .map_err(serde::de::Error::custom)?; - Self::new(vote.from, vote.transaction, vote.wallclock) - .ok_or_else(|| serde::de::Error::custom("invalid vote tx")) - } -} - -#[cfg_attr(feature = "frozen-abi", derive(AbiExample))] -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] -pub struct LegacyVersion { - pub from: Pubkey, - pub wallclock: u64, - pub version: solana_version::LegacyVersion1, -} - -impl Sanitize for LegacyVersion { - fn sanitize(&self) -> Result<(), SanitizeError> { - sanitize_wallclock(self.wallclock)?; - self.from.sanitize()?; - self.version.sanitize() - } -} - -#[cfg_attr(feature = "frozen-abi", derive(AbiExample))] -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] -pub struct Version { - pub from: Pubkey, - pub wallclock: u64, - pub version: solana_version::LegacyVersion2, -} - -impl Sanitize for Version { - fn sanitize(&self) -> Result<(), SanitizeError> { - sanitize_wallclock(self.wallclock)?; - self.from.sanitize()?; - self.version.sanitize() - } -} - -impl Version { - pub fn new(from: Pubkey) -> Self { - Self { - from, - wallclock: timestamp(), - version: solana_version::LegacyVersion2::default(), - } - } - - /// New random Version for tests and benchmarks. - fn new_rand(rng: &mut R, pubkey: Option) -> Self { - Self { - from: pubkey.unwrap_or_else(pubkey::new_rand), - wallclock: new_rand_timestamp(rng), - version: solana_version::LegacyVersion2 { - major: rng.gen(), - minor: rng.gen(), - patch: rng.gen(), - commit: Some(rng.gen()), - feature_set: rng.gen(), - }, - } - } -} - -#[cfg_attr(feature = "frozen-abi", derive(AbiExample))] -#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] -pub struct NodeInstance { - from: Pubkey, - wallclock: u64, - timestamp: u64, // Timestamp when the instance was created. - token: u64, // Randomly generated value at node instantiation. -} - -impl NodeInstance { - pub fn new(rng: &mut R, from: Pubkey, now: u64) -> Self - where - R: Rng + CryptoRng, - { - Self { - from, - wallclock: now, - timestamp: now, - token: rng.gen(), - } - } - - // Clones the value with an updated wallclock. - pub(crate) fn with_wallclock(&self, wallclock: u64) -> Self { - Self { wallclock, ..*self } - } - - // Returns true if the crds-value is a duplicate instance of this node, - // with a more recent timestamp. - // The older instance is considered the duplicate instance. If a staked - // node is restarted it will receive its old instance value from gossip. - // Considering the new instance as the duplicate would prevent the node - // from restarting. - pub(crate) fn check_duplicate(&self, other: &CrdsValue) -> bool { - match &other.data { - CrdsData::NodeInstance(other) => { - self.token != other.token - && self.timestamp <= other.timestamp - && self.from == other.from - } - _ => false, - } - } - - // Returns None if tokens are the same or other is not a node-instance from - // the same owner. Otherwise returns true if self has more recent timestamp - // than other, and so overrides it. - pub(crate) fn overrides(&self, other: &CrdsValue) -> Option { - let CrdsData::NodeInstance(other) = &other.data else { - return None; - }; - if self.token == other.token || self.from != other.from { - return None; - } - match self.timestamp.cmp(&other.timestamp) { - Ordering::Less => Some(false), - Ordering::Greater => Some(true), - // Ties should be broken in a deterministic way across the cluster, - // so that nodes propagate the same value through gossip. - Ordering::Equal => Some(other.token < self.token), - } - } -} - -impl Sanitize for NodeInstance { - fn sanitize(&self) -> Result<(), SanitizeError> { - sanitize_wallclock(self.wallclock)?; - self.from.sanitize() - } -} - /// Type of the replicated value /// These are labels for values in a record that is associated with `Pubkey` #[derive(PartialEq, Hash, Eq, Clone, Debug)] @@ -524,37 +73,6 @@ pub enum CrdsValueLabel { RestartHeaviestFork(Pubkey), } -impl fmt::Display for CrdsValueLabel { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - CrdsValueLabel::LegacyContactInfo(_) => { - write!(f, "LegacyContactInfo({})", self.pubkey()) - } - CrdsValueLabel::Vote(ix, _) => write!(f, "Vote({}, {})", ix, self.pubkey()), - CrdsValueLabel::LowestSlot(_) => write!(f, "LowestSlot({})", self.pubkey()), - CrdsValueLabel::LegacySnapshotHashes(_) => { - write!(f, "LegacySnapshotHashes({})", self.pubkey()) - } - CrdsValueLabel::EpochSlots(ix, _) => write!(f, "EpochSlots({}, {})", ix, self.pubkey()), - CrdsValueLabel::AccountsHashes(_) => write!(f, "AccountsHashes({})", self.pubkey()), - CrdsValueLabel::LegacyVersion(_) => write!(f, "LegacyVersion({})", self.pubkey()), - CrdsValueLabel::Version(_) => write!(f, "Version({})", self.pubkey()), - CrdsValueLabel::NodeInstance(pk) => write!(f, "NodeInstance({pk})"), - CrdsValueLabel::DuplicateShred(ix, pk) => write!(f, "DuplicateShred({ix}, {pk})"), - CrdsValueLabel::SnapshotHashes(_) => { - write!(f, "SnapshotHashes({})", self.pubkey()) - } - CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()), - CrdsValueLabel::RestartLastVotedForkSlots(_) => { - write!(f, "RestartLastVotedForkSlots({})", self.pubkey()) - } - CrdsValueLabel::RestartHeaviestFork(_) => { - write!(f, "RestartHeaviestFork({})", self.pubkey()) - } - } - } -} - impl CrdsValueLabel { pub fn pubkey(&self) -> Pubkey { match self { @@ -618,98 +136,60 @@ impl CrdsValue { /// Totally unsecure unverifiable wallclock of the node that generated this message /// Latest wallclock is always picked. /// This is used to time out push messages. - pub fn wallclock(&self) -> u64 { - match &self.data { - CrdsData::LegacyContactInfo(contact_info) => contact_info.wallclock(), - CrdsData::Vote(_, vote) => vote.wallclock, - CrdsData::LowestSlot(_, obj) => obj.wallclock, - CrdsData::LegacySnapshotHashes(hash) => hash.wallclock, - CrdsData::AccountsHashes(hash) => hash.wallclock, - CrdsData::EpochSlots(_, p) => p.wallclock, - CrdsData::LegacyVersion(version) => version.wallclock, - CrdsData::Version(version) => version.wallclock, - CrdsData::NodeInstance(node) => node.wallclock, - CrdsData::DuplicateShred(_, shred) => shred.wallclock, - CrdsData::SnapshotHashes(hash) => hash.wallclock, - CrdsData::ContactInfo(node) => node.wallclock(), - CrdsData::RestartLastVotedForkSlots(slots) => slots.wallclock, - CrdsData::RestartHeaviestFork(fork) => fork.wallclock, - } + pub(crate) fn wallclock(&self) -> u64 { + self.data.wallclock() } - pub fn pubkey(&self) -> Pubkey { - match &self.data { - CrdsData::LegacyContactInfo(contact_info) => *contact_info.pubkey(), - CrdsData::Vote(_, vote) => vote.from, - CrdsData::LowestSlot(_, slots) => slots.from, - CrdsData::LegacySnapshotHashes(hash) => hash.from, - CrdsData::AccountsHashes(hash) => hash.from, - CrdsData::EpochSlots(_, p) => p.from, - CrdsData::LegacyVersion(version) => version.from, - CrdsData::Version(version) => version.from, - CrdsData::NodeInstance(node) => node.from, - CrdsData::DuplicateShred(_, shred) => shred.from, - CrdsData::SnapshotHashes(hash) => hash.from, - CrdsData::ContactInfo(node) => *node.pubkey(), - CrdsData::RestartLastVotedForkSlots(slots) => slots.from, - CrdsData::RestartHeaviestFork(fork) => fork.from, - } + + pub(crate) fn pubkey(&self) -> Pubkey { + self.data.pubkey() } + pub fn label(&self) -> CrdsValueLabel { - match &self.data { - CrdsData::LegacyContactInfo(_) => CrdsValueLabel::LegacyContactInfo(self.pubkey()), - CrdsData::Vote(ix, _) => CrdsValueLabel::Vote(*ix, self.pubkey()), - CrdsData::LowestSlot(_, _) => CrdsValueLabel::LowestSlot(self.pubkey()), - CrdsData::LegacySnapshotHashes(_) => { - CrdsValueLabel::LegacySnapshotHashes(self.pubkey()) - } - CrdsData::AccountsHashes(_) => CrdsValueLabel::AccountsHashes(self.pubkey()), - CrdsData::EpochSlots(ix, _) => CrdsValueLabel::EpochSlots(*ix, self.pubkey()), - CrdsData::LegacyVersion(_) => CrdsValueLabel::LegacyVersion(self.pubkey()), - CrdsData::Version(_) => CrdsValueLabel::Version(self.pubkey()), - CrdsData::NodeInstance(node) => CrdsValueLabel::NodeInstance(node.from), - CrdsData::DuplicateShred(ix, shred) => CrdsValueLabel::DuplicateShred(*ix, shred.from), - CrdsData::SnapshotHashes(_) => CrdsValueLabel::SnapshotHashes(self.pubkey()), - CrdsData::ContactInfo(node) => CrdsValueLabel::ContactInfo(*node.pubkey()), + let pubkey = self.data.pubkey(); + match self.data { + CrdsData::LegacyContactInfo(_) => CrdsValueLabel::LegacyContactInfo(pubkey), + CrdsData::Vote(ix, _) => CrdsValueLabel::Vote(ix, pubkey), + CrdsData::LowestSlot(_, _) => CrdsValueLabel::LowestSlot(pubkey), + CrdsData::LegacySnapshotHashes(_) => CrdsValueLabel::LegacySnapshotHashes(pubkey), + CrdsData::AccountsHashes(_) => CrdsValueLabel::AccountsHashes(pubkey), + CrdsData::EpochSlots(ix, _) => CrdsValueLabel::EpochSlots(ix, pubkey), + CrdsData::LegacyVersion(_) => CrdsValueLabel::LegacyVersion(pubkey), + CrdsData::Version(_) => CrdsValueLabel::Version(pubkey), + CrdsData::NodeInstance(_) => CrdsValueLabel::NodeInstance(pubkey), + CrdsData::DuplicateShred(ix, _) => CrdsValueLabel::DuplicateShred(ix, pubkey), + CrdsData::SnapshotHashes(_) => CrdsValueLabel::SnapshotHashes(pubkey), + CrdsData::ContactInfo(_) => CrdsValueLabel::ContactInfo(pubkey), CrdsData::RestartLastVotedForkSlots(_) => { - CrdsValueLabel::RestartLastVotedForkSlots(self.pubkey()) + CrdsValueLabel::RestartLastVotedForkSlots(pubkey) } - CrdsData::RestartHeaviestFork(_) => CrdsValueLabel::RestartHeaviestFork(self.pubkey()), + CrdsData::RestartHeaviestFork(_) => CrdsValueLabel::RestartHeaviestFork(pubkey), } } + pub(crate) fn contact_info(&self) -> Option<&ContactInfo> { - match &self.data { - CrdsData::ContactInfo(contact_info) => Some(contact_info), - _ => None, - } + let CrdsData::ContactInfo(node) = &self.data else { + return None; + }; + Some(node) } pub(crate) fn epoch_slots(&self) -> Option<&EpochSlots> { - match &self.data { - CrdsData::EpochSlots(_, slots) => Some(slots), - _ => None, - } + let CrdsData::EpochSlots(_, epoch_slots) = &self.data else { + return None; + }; + Some(epoch_slots) } /// Returns the size (in bytes) of a CrdsValue - pub fn size(&self) -> u64 { - serialized_size(&self).expect("unable to serialize contact info") + #[cfg(test)] + pub(crate) fn size(&self) -> u64 { + bincode::serialized_size(&self).expect("unable to serialize contact info") } /// Returns true if, regardless of prunes, this crds-value /// should be pushed to the receiving node. pub(crate) fn should_force_push(&self, peer: &Pubkey) -> bool { - match &self.data { - CrdsData::NodeInstance(node) => node.from == *peer, - _ => false, - } - } -} - -pub(crate) fn sanitize_wallclock(wallclock: u64) -> Result<(), SanitizeError> { - if wallclock >= MAX_WALLCLOCK { - Err(SanitizeError::ValueOutOfBounds) - } else { - Ok(()) + matches!(self.data, CrdsData::NodeInstance(_)) && &self.pubkey() == peer } } @@ -717,13 +197,13 @@ pub(crate) fn sanitize_wallclock(wallclock: u64) -> Result<(), SanitizeError> { mod test { use { super::*, - bincode::{deserialize, Options}, + crate::crds_data::{LowestSlot, NodeInstance, Vote}, + bincode::deserialize, solana_perf::test_tx::new_test_vote_tx, solana_sdk::{ signature::{Keypair, Signer}, timing::timestamp, }, - solana_vote_program::{vote_instruction, vote_state}, }; #[test] @@ -755,32 +235,6 @@ mod test { assert_eq!(v.label(), CrdsValueLabel::LowestSlot(key)); } - #[test] - fn test_lowest_slot_sanitize() { - let ls = LowestSlot::new(Pubkey::default(), 0, 0); - let v = CrdsValue::new_unsigned(CrdsData::LowestSlot(0, ls.clone())); - assert_eq!(v.sanitize(), Ok(())); - - let mut o = ls.clone(); - o.root = 1; - let v = CrdsValue::new_unsigned(CrdsData::LowestSlot(0, o)); - assert_eq!(v.sanitize(), Err(SanitizeError::InvalidValue)); - - let o = ls.clone(); - let v = CrdsValue::new_unsigned(CrdsData::LowestSlot(1, o)); - assert_eq!(v.sanitize(), Err(SanitizeError::ValueOutOfBounds)); - - let mut o = ls.clone(); - o.slots.insert(1); - let v = CrdsValue::new_unsigned(CrdsData::LowestSlot(0, o)); - assert_eq!(v.sanitize(), Err(SanitizeError::InvalidValue)); - - let mut o = ls; - o.stash.push(deprecated::EpochIncompleteSlots::default()); - let v = CrdsValue::new_unsigned(CrdsData::LowestSlot(0, o)); - assert_eq!(v.sanitize(), Err(SanitizeError::InvalidValue)); - } - #[test] fn test_signature() { let mut rng = rand::thread_rng(); @@ -801,61 +255,6 @@ mod test { verify_signatures(&mut v, &keypair, &wrong_keypair); } - #[test] - fn test_max_vote_index() { - let mut rng = rand::thread_rng(); - let keypair = Keypair::new(); - let vote = Vote::new(keypair.pubkey(), new_test_vote_tx(&mut rng), timestamp()).unwrap(); - let vote = CrdsValue::new_signed(CrdsData::Vote(MAX_VOTES, vote), &keypair); - assert!(vote.sanitize().is_err()); - } - - #[test] - fn test_vote_round_trip() { - let mut rng = rand::thread_rng(); - let vote = vote_state::Vote::new( - vec![1, 3, 7], // slots - Hash::new_unique(), - ); - let ix = vote_instruction::vote( - &Pubkey::new_unique(), // vote_pubkey - &Pubkey::new_unique(), // authorized_voter_pubkey - vote, - ); - let tx = Transaction::new_with_payer( - &[ix], // instructions - Some(&Pubkey::new_unique()), // payer - ); - let vote = Vote::new( - Pubkey::new_unique(), // from - tx, - rng.gen(), // wallclock - ) - .unwrap(); - assert_eq!(vote.slot, Some(7)); - let bytes = bincode::serialize(&vote).unwrap(); - let other = bincode::deserialize(&bytes[..]).unwrap(); - assert_eq!(vote, other); - assert_eq!(other.slot, Some(7)); - let bytes = bincode::options().serialize(&vote).unwrap(); - let other = bincode::options().deserialize(&bytes[..]).unwrap(); - assert_eq!(vote, other); - assert_eq!(other.slot, Some(7)); - } - - #[test] - fn test_max_epoch_slots_index() { - let keypair = Keypair::new(); - let item = CrdsValue::new_signed( - CrdsData::EpochSlots( - MAX_EPOCH_SLOTS, - EpochSlots::new(keypair.pubkey(), timestamp()), - ), - &keypair, - ); - assert_eq!(item.sanitize(), Err(SanitizeError::ValueOutOfBounds)); - } - fn serialize_deserialize_value(value: &mut CrdsValue, keypair: &Keypair) { let num_tries = 10; value.sign(keypair); @@ -886,150 +285,6 @@ mod test { serialize_deserialize_value(value, correct_keypair); } - #[test] - fn test_node_instance_crds_lable() { - fn make_crds_value(node: NodeInstance) -> CrdsValue { - CrdsValue::new_unsigned(CrdsData::NodeInstance(node)) - } - let mut rng = rand::thread_rng(); - let now = timestamp(); - let pubkey = Pubkey::new_unique(); - let node = NodeInstance::new(&mut rng, pubkey, now); - assert_eq!( - make_crds_value(node.clone()).label(), - make_crds_value(node.with_wallclock(now + 8)).label() - ); - let other = NodeInstance { - from: Pubkey::new_unique(), - ..node - }; - assert_ne!( - make_crds_value(node.clone()).label(), - make_crds_value(other).label() - ); - let other = NodeInstance { - wallclock: now + 8, - ..node - }; - assert_eq!( - make_crds_value(node.clone()).label(), - make_crds_value(other).label() - ); - let other = NodeInstance { - timestamp: now + 8, - ..node - }; - assert_eq!( - make_crds_value(node.clone()).label(), - make_crds_value(other).label() - ); - let other = NodeInstance { - token: rng.gen(), - ..node - }; - assert_eq!( - make_crds_value(node).label(), - make_crds_value(other).label() - ); - } - - #[test] - fn test_check_duplicate_instance() { - fn make_crds_value(node: NodeInstance) -> CrdsValue { - CrdsValue::new_unsigned(CrdsData::NodeInstance(node)) - } - let now = timestamp(); - let mut rng = rand::thread_rng(); - let pubkey = Pubkey::new_unique(); - let node = NodeInstance::new(&mut rng, pubkey, now); - let node_crds = make_crds_value(node.clone()); - // Same token is not a duplicate. - let other = NodeInstance { - from: pubkey, - wallclock: now + 1, - timestamp: now + 1, - token: node.token, - }; - let other_crds = make_crds_value(other.clone()); - assert!(!node.check_duplicate(&other_crds)); - assert!(!other.check_duplicate(&node_crds)); - assert_eq!(node.overrides(&other_crds), None); - assert_eq!(other.overrides(&node_crds), None); - // Older timestamp is not a duplicate. - let other = NodeInstance { - from: pubkey, - wallclock: now + 1, - timestamp: now - 1, - token: rng.gen(), - }; - let other_crds = make_crds_value(other.clone()); - assert!(!node.check_duplicate(&other_crds)); - assert!(other.check_duplicate(&node_crds)); - assert_eq!(node.overrides(&other_crds), Some(true)); - assert_eq!(other.overrides(&node_crds), Some(false)); - // Updated wallclock is not a duplicate. - let other = node.with_wallclock(now + 8); - assert_eq!( - other, - NodeInstance { - from: pubkey, - wallclock: now + 8, - timestamp: now, - token: node.token, - } - ); - let other_crds = make_crds_value(other.clone()); - assert!(!node.check_duplicate(&other_crds)); - assert!(!other.check_duplicate(&node_crds)); - assert_eq!(node.overrides(&other_crds), None); - assert_eq!(other.overrides(&node_crds), None); - // Duplicate instance; tied timestamp. - for _ in 0..10 { - let other = NodeInstance { - from: pubkey, - wallclock: 0, - timestamp: now, - token: rng.gen(), - }; - let other_crds = make_crds_value(other.clone()); - assert!(node.check_duplicate(&other_crds)); - assert!(other.check_duplicate(&node_crds)); - assert_eq!(node.overrides(&other_crds), Some(other.token < node.token)); - assert_eq!(other.overrides(&node_crds), Some(node.token < other.token)); - } - // Duplicate instance; more recent timestamp. - for _ in 0..10 { - let other = NodeInstance { - from: pubkey, - wallclock: 0, - timestamp: now + 1, - token: rng.gen(), - }; - let other_crds = make_crds_value(other.clone()); - assert!(node.check_duplicate(&other_crds)); - assert!(!other.check_duplicate(&node_crds)); - assert_eq!(node.overrides(&other_crds), Some(false)); - assert_eq!(other.overrides(&node_crds), Some(true)); - } - // Different pubkey is not a duplicate. - let other = NodeInstance { - from: Pubkey::new_unique(), - wallclock: now + 1, - timestamp: now + 1, - token: rng.gen(), - }; - let other_crds = make_crds_value(other.clone()); - assert!(!node.check_duplicate(&other_crds)); - assert!(!other.check_duplicate(&node_crds)); - assert_eq!(node.overrides(&other_crds), None); - assert_eq!(other.overrides(&node_crds), None); - // Different crds value is not a duplicate. - let other = ContactInfo::new_rand(&mut rng, Some(pubkey)); - let other = CrdsValue::new_unsigned(CrdsData::ContactInfo(other)); - assert!(!node.check_duplicate(&other)); - assert_eq!(node.overrides(&other), None); - } - #[test] fn test_should_force_push() { let mut rng = rand::thread_rng(); diff --git a/gossip/src/duplicate_shred.rs b/gossip/src/duplicate_shred.rs index a6d49f104fadea..411b5d4af61381 100644 --- a/gossip/src/duplicate_shred.rs +++ b/gossip/src/duplicate_shred.rs @@ -1,5 +1,5 @@ use { - crate::crds_value::sanitize_wallclock, + crate::crds_data::sanitize_wallclock, itertools::Itertools, solana_ledger::{ blockstore::BlockstoreError, diff --git a/gossip/src/epoch_slots.rs b/gossip/src/epoch_slots.rs index 373ac8840722e2..c7261b70e331b3 100644 --- a/gossip/src/epoch_slots.rs +++ b/gossip/src/epoch_slots.rs @@ -1,7 +1,7 @@ use { crate::{ cluster_info::MAX_CRDS_OBJECT_SIZE, - crds_value::{self, MAX_SLOT, MAX_WALLCLOCK}, + crds_data::{self, MAX_SLOT, MAX_WALLCLOCK}, }, bincode::serialized_size, bv::BitVec, @@ -324,7 +324,7 @@ impl EpochSlots { /// New random EpochSlots for tests and simulations. pub(crate) fn new_rand(rng: &mut R, pubkey: Option) -> Self { - let now = crds_value::new_rand_timestamp(rng); + let now = crds_data::new_rand_timestamp(rng); let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand); let mut epoch_slots = Self::new(pubkey, now); let num_slots = rng.gen_range(0..20); diff --git a/gossip/src/legacy_contact_info.rs b/gossip/src/legacy_contact_info.rs index a0b341ffcbe083..d9733ad9ce3802 100644 --- a/gossip/src/legacy_contact_info.rs +++ b/gossip/src/legacy_contact_info.rs @@ -5,7 +5,7 @@ use { contact_info::{ sanitize_quic_offset, ContactInfo, Error, Protocol, SOCKET_ADDR_UNSPECIFIED, }, - crds_value::MAX_WALLCLOCK, + crds_data::MAX_WALLCLOCK, }, solana_sanitize::{Sanitize, SanitizeError}, solana_sdk::pubkey::Pubkey, diff --git a/gossip/src/lib.rs b/gossip/src/lib.rs index b1c90c335edb4b..59b84a871f4fad 100644 --- a/gossip/src/lib.rs +++ b/gossip/src/lib.rs @@ -5,6 +5,7 @@ pub mod cluster_info; pub mod cluster_info_metrics; pub mod contact_info; pub mod crds; +pub mod crds_data; pub mod crds_entry; pub mod crds_gossip; pub mod crds_gossip_error; diff --git a/gossip/src/restart_crds_values.rs b/gossip/src/restart_crds_values.rs index 5da66eb46bf007..3198f5612f3549 100644 --- a/gossip/src/restart_crds_values.rs +++ b/gossip/src/restart_crds_values.rs @@ -1,5 +1,5 @@ use { - crate::crds_value::{new_rand_timestamp, sanitize_wallclock}, + crate::crds_data::{new_rand_timestamp, sanitize_wallclock}, bv::BitVec, itertools::Itertools, rand::Rng, @@ -219,7 +219,8 @@ mod test { super::*, crate::{ cluster_info::MAX_CRDS_OBJECT_SIZE, - crds_value::{CrdsData, CrdsValue, CrdsValueLabel}, + crds_data::CrdsData, + crds_value::{CrdsValue, CrdsValueLabel}, }, bincode::serialized_size, solana_sdk::{signature::Signer, signer::keypair::Keypair, timing::timestamp}, @@ -366,7 +367,7 @@ mod test { }; assert_eq!(fork.sanitize(), Ok(())); assert_eq!(fork.observed_stake, 800_000); - fork.wallclock = crate::crds_value::MAX_WALLCLOCK; + fork.wallclock = crate::crds_data::MAX_WALLCLOCK; assert_eq!(fork.sanitize(), Err(SanitizeError::ValueOutOfBounds)); } } diff --git a/gossip/tests/crds_gossip.rs b/gossip/tests/crds_gossip.rs index 8c83e9e96e8fbc..72b51313b0b007 100644 --- a/gossip/tests/crds_gossip.rs +++ b/gossip/tests/crds_gossip.rs @@ -8,11 +8,12 @@ use { cluster_info_metrics::GossipStats, contact_info::ContactInfo, crds::GossipRoute, + crds_data::CrdsData, crds_gossip::*, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{CrdsTimeouts, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS, - crds_value::{CrdsData, CrdsValue, CrdsValueLabel}, + crds_value::{CrdsValue, CrdsValueLabel}, ping_pong::PingCache, }, solana_rayon_threadlimit::get_thread_count, diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index 6c1140118a250b..8504eb9988189d 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -14,7 +14,8 @@ use { cluster_info::{self, ClusterInfo}, contact_info::ContactInfo, crds::Cursor, - crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel}, + crds_data::{self, CrdsData}, + crds_value::{CrdsValue, CrdsValueLabel}, gossip_error::GossipError, gossip_service::{self, discover_cluster, GossipService}, }, @@ -679,7 +680,7 @@ pub fn submit_vote_to_cluster_gossip( vec![CrdsValue::new_signed( CrdsData::Vote( 0, - crds_value::Vote::new(node_keypair.pubkey(), vote_tx, timestamp()).unwrap(), + crds_data::Vote::new(node_keypair.pubkey(), vote_tx, timestamp()).unwrap(), ), node_keypair, )], diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs index be3bca47458f30..d7dabbab4f4594 100644 --- a/turbine/src/cluster_nodes.rs +++ b/turbine/src/cluster_nodes.rs @@ -9,8 +9,9 @@ use { cluster_info::ClusterInfo, contact_info::{ContactInfo, Protocol}, crds::GossipRoute, + crds_data::CrdsData, crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, - crds_value::{CrdsData, CrdsValue}, + crds_value::CrdsValue, weighted_shuffle::WeightedShuffle, }, solana_ledger::shred::ShredId, diff --git a/validator/src/bootstrap.rs b/validator/src/bootstrap.rs index aeb036c4c4e2af..e68abf5c7a7504 100644 --- a/validator/src/bootstrap.rs +++ b/validator/src/bootstrap.rs @@ -9,7 +9,7 @@ use { solana_gossip::{ cluster_info::{ClusterInfo, Node}, contact_info::{ContactInfo, Protocol}, - crds_value, + crds_data, gossip_service::GossipService, }, solana_metrics::datapoint_info, @@ -1356,7 +1356,7 @@ fn should_use_local_snapshot( /// Get the node's highest snapshot hashes from CRDS fn get_snapshot_hashes_for_node(cluster_info: &ClusterInfo, node: &Pubkey) -> Option { cluster_info.get_snapshot_hashes_for_node(node).map( - |crds_value::SnapshotHashes { + |crds_data::SnapshotHashes { full, incremental, .. }| { let highest_incremental_snapshot_hash = incremental.into_iter().max(); diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs index 9296ba8e82bd42..703845e154c868 100644 --- a/wen-restart/src/wen_restart.rs +++ b/wen-restart/src/wen_restart.rs @@ -1402,7 +1402,8 @@ mod tests { cluster_info::ClusterInfo, contact_info::ContactInfo, crds::GossipRoute, - crds_value::{CrdsData, CrdsValue}, + crds_data::CrdsData, + crds_value::CrdsValue, restart_crds_values::{RestartHeaviestFork, RestartLastVotedForkSlots}, }, solana_ledger::{