diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index 719bc138479096..6fe4e9de0e9ff8 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -147,14 +147,20 @@ impl Cursor { } impl VersionedCrdsValue { - fn new(value: CrdsValue, cursor: Cursor, local_timestamp: u64) -> Self { + fn new(value: CrdsValue, cursor: Cursor, local_timestamp: u64, route: GossipRoute) -> Self { let value_hash = hash(&serialize(&value).unwrap()); + let num_push_dups = match route { + // set num_push_dups to 2^8 to indicate PullResponse + // unlikely a node receives the exact same message via push 2^8 times + GossipRoute::PullResponse => u8::MAX, + _ => 0u8, + }; VersionedCrdsValue { ordinal: cursor.ordinal(), value, local_timestamp, value_hash, - num_push_dups: 0u8, + num_push_dups, } } } @@ -222,7 +228,7 @@ impl Crds { ) -> Result<(), CrdsError> { let label = value.label(); let pubkey = value.pubkey(); - let value = VersionedCrdsValue::new(value, self.cursor, now); + let value = VersionedCrdsValue::new(value, self.cursor, now, route); match self.table.entry(label) { Entry::Vacant(entry) => { self.stats.lock().unwrap().record_insert(&value, route); @@ -303,6 +309,25 @@ impl Crds { Err(CrdsError::InsertFailed) } else if matches!(route, GossipRoute::PushMessage(_)) { let entry = entry.get_mut(); + if entry.num_push_dups == u8::MAX { + datapoint_info!( + "gossip_crds_redundant_pull", + ( + "origin", + value.value.pubkey().to_string().get(..8), + Option + ), + ( + "signature", + value.value.signature.to_string().get(..8), + Option + ) + ); + // now that we've recorded redundant pull, + // we can set num_push_dups to 0. It will get + // incremented to 1 right after as it should + entry.num_push_dups = 0; + } entry.num_push_dups = entry.num_push_dups.saturating_add(1); Err(CrdsError::DuplicatePush(entry.num_push_dups)) } else { @@ -1450,8 +1475,9 @@ mod tests { #[allow(clippy::neg_cmp_op_on_partial_ord)] fn test_equal() { let val = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::default())); - let v1 = VersionedCrdsValue::new(val.clone(), Cursor::default(), 1); - let v2 = VersionedCrdsValue::new(val, Cursor::default(), 1); + let v1 = + VersionedCrdsValue::new(val.clone(), Cursor::default(), 1, GossipRoute::LocalMessage); + let v2 = VersionedCrdsValue::new(val, Cursor::default(), 1, GossipRoute::LocalMessage); assert_eq!(v1, v2); assert!(!(v1 != v2)); assert!(!overrides(&v1.value, &v2)); @@ -1467,6 +1493,7 @@ mod tests { ))), Cursor::default(), 1, // local_timestamp + GossipRoute::LocalMessage, ); let v2 = VersionedCrdsValue::new( { @@ -1476,6 +1503,7 @@ mod tests { }, Cursor::default(), 1, // local_timestamp + GossipRoute::LocalMessage, ); assert_eq!(v1.value.label(), v2.value.label()); @@ -1501,6 +1529,7 @@ mod tests { ))), Cursor::default(), 1, // local_timestamp + GossipRoute::LocalMessage, ); let v2 = VersionedCrdsValue::new( CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::new_localhost( @@ -1509,6 +1538,7 @@ mod tests { ))), Cursor::default(), 1, // local_timestamp + GossipRoute::LocalMessage, ); assert_eq!(v1.value.label(), v2.value.label()); assert!(overrides(&v1.value, &v2)); @@ -1527,6 +1557,7 @@ mod tests { ))), Cursor::default(), 1, // local_timestamp + GossipRoute::LocalMessage, ); let v2 = VersionedCrdsValue::new( CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::new_localhost( @@ -1535,6 +1566,7 @@ mod tests { ))), Cursor::default(), 1, // local_timestamp + GossipRoute::LocalMessage, ); assert_ne!(v1, v2); assert!(!(v1 == v2));