Skip to content

Commit

Permalink
add in metric for tracking redundant pull messages
Browse files Browse the repository at this point in the history
  • Loading branch information
gregcusack committed Mar 7, 2024
1 parent adefcbb commit 215a465
Showing 1 changed file with 37 additions and 5 deletions.
42 changes: 37 additions & 5 deletions gossip/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,20 @@ impl Cursor {
}

impl VersionedCrdsValue {
fn new(value: CrdsValue, cursor: Cursor, local_timestamp: u64) -> Self {
fn new(value: CrdsValue, cursor: Cursor, local_timestamp: u64, route: GossipRoute) -> Self {
let value_hash = hash(&serialize(&value).unwrap());
let num_push_dups = match route {
// set num_push_dups to 2^8 to indicate PullResponse
// unlikely a node receives the exact same message via push 2^8 times
GossipRoute::PullResponse => u8::MAX,
_ => 0u8,
};
VersionedCrdsValue {
ordinal: cursor.ordinal(),
value,
local_timestamp,
value_hash,
num_push_dups: 0u8,
num_push_dups,
}
}
}
Expand Down Expand Up @@ -222,7 +228,7 @@ impl Crds {
) -> Result<(), CrdsError> {
let label = value.label();
let pubkey = value.pubkey();
let value = VersionedCrdsValue::new(value, self.cursor, now);
let value = VersionedCrdsValue::new(value, self.cursor, now, route);
match self.table.entry(label) {
Entry::Vacant(entry) => {
self.stats.lock().unwrap().record_insert(&value, route);
Expand Down Expand Up @@ -303,6 +309,25 @@ impl Crds {
Err(CrdsError::InsertFailed)
} else if matches!(route, GossipRoute::PushMessage(_)) {
let entry = entry.get_mut();
if entry.num_push_dups == u8::MAX {
datapoint_info!(
"gossip_crds_redundant_pull",
(
"origin",
value.value.pubkey().to_string().get(..8),
Option<String>
),
(
"signature",
value.value.signature.to_string().get(..8),
Option<String>
)
);
// now that we've recorded redundant pull,
// we can set num_push_dups to 0. It will get
// incremented to 1 right after as it should
entry.num_push_dups = 0;
}
entry.num_push_dups = entry.num_push_dups.saturating_add(1);
Err(CrdsError::DuplicatePush(entry.num_push_dups))
} else {
Expand Down Expand Up @@ -1450,8 +1475,9 @@ mod tests {
#[allow(clippy::neg_cmp_op_on_partial_ord)]
fn test_equal() {
let val = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::default()));
let v1 = VersionedCrdsValue::new(val.clone(), Cursor::default(), 1);
let v2 = VersionedCrdsValue::new(val, Cursor::default(), 1);
let v1 =
VersionedCrdsValue::new(val.clone(), Cursor::default(), 1, GossipRoute::LocalMessage);
let v2 = VersionedCrdsValue::new(val, Cursor::default(), 1, GossipRoute::LocalMessage);
assert_eq!(v1, v2);
assert!(!(v1 != v2));
assert!(!overrides(&v1.value, &v2));
Expand All @@ -1467,6 +1493,7 @@ mod tests {
))),
Cursor::default(),
1, // local_timestamp
GossipRoute::LocalMessage,
);
let v2 = VersionedCrdsValue::new(
{
Expand All @@ -1476,6 +1503,7 @@ mod tests {
},
Cursor::default(),
1, // local_timestamp
GossipRoute::LocalMessage,
);

assert_eq!(v1.value.label(), v2.value.label());
Expand All @@ -1501,6 +1529,7 @@ mod tests {
))),
Cursor::default(),
1, // local_timestamp
GossipRoute::LocalMessage,
);
let v2 = VersionedCrdsValue::new(
CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::new_localhost(
Expand All @@ -1509,6 +1538,7 @@ mod tests {
))),
Cursor::default(),
1, // local_timestamp
GossipRoute::LocalMessage,
);
assert_eq!(v1.value.label(), v2.value.label());
assert!(overrides(&v1.value, &v2));
Expand All @@ -1527,6 +1557,7 @@ mod tests {
))),
Cursor::default(),
1, // local_timestamp
GossipRoute::LocalMessage,
);
let v2 = VersionedCrdsValue::new(
CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::new_localhost(
Expand All @@ -1535,6 +1566,7 @@ mod tests {
))),
Cursor::default(),
1, // local_timestamp
GossipRoute::LocalMessage,
);
assert_ne!(v1, v2);
assert!(!(v1 == v2));
Expand Down

0 comments on commit 215a465

Please sign in to comment.