Skip to content

Commit

Permalink
splits out gossip CrdsData out of crds_value.rs (#3463)
Browse files Browse the repository at this point in the history
Reorganizing this code in preparation of upcoming changes to CrdsValue
(de)serialization.
  • Loading branch information
behzadnouri authored Nov 4, 2024
1 parent eb3b18e commit e7778eb
Show file tree
Hide file tree
Showing 20 changed files with 824 additions and 838 deletions.
2 changes: 1 addition & 1 deletion core/src/cluster_slots_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl ClusterSlotsService {
mod test {
use {
super::*,
solana_gossip::{cluster_info::Node, crds_value::LowestSlot},
solana_gossip::{cluster_info::Node, crds_data::LowestSlot},
solana_sdk::signature::{Keypair, Signer},
solana_streamer::socket::SocketAddrSpace,
};
Expand Down
32 changes: 17 additions & 15 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@ use {
},
contact_info::{self, ContactInfo, Error as ContactInfoError},
crds::{Crds, Cursor, GossipRoute},
crds_data::{
self, CrdsData, EpochSlotsIndex, LowestSlot, NodeInstance, SnapshotHashes, Version,
Vote, MAX_WALLCLOCK,
},
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{
get_max_bloom_filter_bytes, CrdsFilter, CrdsTimeouts, ProcessPullStats,
CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
},
crds_value::{
self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, NodeInstance,
SnapshotHashes, Version, Vote, MAX_WALLCLOCK,
},
crds_value::{CrdsValue, CrdsValueLabel},
duplicate_shred::DuplicateShred,
epoch_slots::EpochSlots,
gossip_error::GossipError,
Expand Down Expand Up @@ -201,7 +202,7 @@ impl PruneData {
/// New random PruneData for tests and benchmarks.
#[cfg(test)]
fn new_rand<R: Rng>(rng: &mut R, self_keypair: &Keypair, num_nodes: Option<usize>) -> Self {
let wallclock = crds_value::new_rand_timestamp(rng);
let wallclock = crds_data::new_rand_timestamp(rng);
let num_nodes = num_nodes.unwrap_or_else(|| rng.gen_range(0..MAX_PRUNE_DATA_NODES + 1));
let prunes = std::iter::repeat_with(Pubkey::new_unique)
.take(num_nodes)
Expand Down Expand Up @@ -310,7 +311,7 @@ pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
#[cfg_attr(
feature = "frozen-abi",
derive(AbiExample, AbiEnumVisitor),
frozen_abi(digest = "GfVFxfPfYcFLCaa29uxQxyKJAuTZ1cYqcRKhVrEKwDK7")
frozen_abi(digest = "AbxVYLVnKbfJRjtnj9gSHegnE31wcTGAm2nEFuuyJKr4")
)]
#[derive(Serialize, Deserialize, Debug)]
#[allow(clippy::large_enum_variant)]
Expand Down Expand Up @@ -915,7 +916,7 @@ impl ClusterInfo {
let current_slots: Vec<_> = {
let gossip_crds =
self.time_gossip_read_lock("lookup_epoch_slots", &self.stats.epoch_slots_lookup);
(0..crds_value::MAX_EPOCH_SLOTS)
(0..crds_data::MAX_EPOCH_SLOTS)
.filter_map(|ix| {
let label = CrdsValueLabel::EpochSlots(ix, self_pubkey);
let crds_value = gossip_crds.get::<&CrdsValue>(&label)?;
Expand All @@ -934,7 +935,7 @@ impl ClusterInfo {
let total_slots = max_slot as isize - min_slot as isize;
// WARN if CRDS is not storing at least a full epoch worth of slots
if DEFAULT_SLOTS_PER_EPOCH as isize > total_slots
&& crds_value::MAX_EPOCH_SLOTS as usize <= current_slots.len()
&& crds_data::MAX_EPOCH_SLOTS as usize <= current_slots.len()
{
self.stats.epoch_slots_filled.add_relaxed(1);
warn!(
Expand All @@ -951,7 +952,7 @@ impl ClusterInfo {
let mut entries = Vec::default();
let keypair = self.keypair();
while !update.is_empty() {
let ix = epoch_slot_index % crds_value::MAX_EPOCH_SLOTS;
let ix = epoch_slot_index % crds_data::MAX_EPOCH_SLOTS;
let now = timestamp();
let mut slots = if !reset {
self.lookup_epoch_slots(ix)
Expand Down Expand Up @@ -1321,7 +1322,7 @@ impl ClusterInfo {
.map(|entry| entry.version.clone())
.or_else(|| {
gossip_crds
.get::<&crds_value::LegacyVersion>(*pubkey)
.get::<&crds_data::LegacyVersion>(*pubkey)
.map(|entry| entry.version.clone())
.map(solana_version::LegacyVersion2::from)
})
Expand Down Expand Up @@ -2477,10 +2478,10 @@ impl ClusterInfo {
let my_contact_info = self.my_contact_info();
move |values: &[CrdsValue]| {
if should_check_duplicate_instance
&& values.iter().any(|value| {
instance.check_duplicate(value)
|| matches!(value.data(), CrdsData::ContactInfo(other)
if my_contact_info.check_duplicate(other))
&& values.iter().any(|value| match value.data() {
CrdsData::ContactInfo(other) => my_contact_info.check_duplicate(other),
CrdsData::NodeInstance(other) => instance.check_duplicate(other),
_ => false,
})
{
return Err(GossipError::DuplicateNodeInstance);
Expand Down Expand Up @@ -3403,8 +3404,9 @@ mod tests {
use {
super::*,
crate::{
crds_data::{AccountsHashes, Vote as CrdsVote},
crds_gossip_pull::tests::MIN_NUM_BLOOM_FILTERS,
crds_value::{AccountsHashes, CrdsValue, CrdsValueLabel, Vote as CrdsVote},
crds_value::{CrdsValue, CrdsValueLabel},
duplicate_shred::{self, tests::new_rand_shred, MAX_DUPLICATE_SHREDS},
socketaddr,
},
Expand Down
2 changes: 1 addition & 1 deletion gossip/src/contact_info.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub use solana_client::connection_cache::Protocol;
use {
crate::{crds_value::MAX_WALLCLOCK, legacy_contact_info::LegacyContactInfo},
crate::{crds_data::MAX_WALLCLOCK, legacy_contact_info::LegacyContactInfo},
assert_matches::{assert_matches, debug_assert_matches},
serde::{Deserialize, Deserializer, Serialize},
solana_sanitize::{Sanitize, SanitizeError},
Expand Down
28 changes: 17 additions & 11 deletions gossip/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@
use {
crate::{
contact_info::ContactInfo,
crds_data::CrdsData,
crds_entry::CrdsEntry,
crds_gossip_pull::CrdsTimeouts,
crds_shards::CrdsShards,
crds_value::{CrdsData, CrdsValue, CrdsValueLabel},
crds_value::{CrdsValue, CrdsValueLabel},
},
assert_matches::debug_assert_matches,
bincode::serialize,
Expand Down Expand Up @@ -199,17 +200,22 @@ fn overrides(value: &CrdsValue, other: &VersionedCrdsValue) -> bool {
// Contact-infos and node instances are special cased so that if there are
// two running instances of the same node, the more recent start is
// propagated through gossip regardless of wallclocks.
if let CrdsData::NodeInstance(value) = value.data() {
if let Some(out) = value.overrides(&other.value) {
return out;
match value.data() {
CrdsData::ContactInfo(value) => {
if let CrdsData::ContactInfo(other) = other.value.data() {
if let Some(out) = value.overrides(other) {
return out;
}
}
}
}
if let CrdsData::ContactInfo(value) = value.data() {
if let CrdsData::ContactInfo(other) = other.value.data() {
if let Some(out) = value.overrides(other) {
return out;
CrdsData::NodeInstance(value) => {
if let CrdsData::NodeInstance(other) = other.value.data() {
if let Some(out) = value.overrides(other) {
return out;
}
}
}
_ => (),
}
match value.wallclock().cmp(&other.value.wallclock()) {
Ordering::Less => false,
Expand Down Expand Up @@ -311,7 +317,7 @@ impl Crds {
Entry::Occupied(mut entry) => {
stats.record_fail(&value, route);
trace!(
"INSERT FAILED data: {} new.wallclock: {}",
"INSERT FAILED data: {:?} new.wallclock: {}",
value.value.label(),
value.value.wallclock(),
);
Expand Down Expand Up @@ -786,7 +792,7 @@ fn should_report_message_signature(signature: &Signature) -> bool {
mod tests {
use {
super::*,
crate::crds_value::{new_rand_timestamp, AccountsHashes, NodeInstance},
crate::crds_data::{new_rand_timestamp, AccountsHashes, NodeInstance},
rand::{thread_rng, Rng, SeedableRng},
rand_chacha::ChaChaRng,
rayon::ThreadPoolBuilder,
Expand Down
Loading

0 comments on commit e7778eb

Please sign in to comment.