v2.0: reworks max number of outgoing push messages (backport of #3016) #3038

Merged 1 commit on Oct 1, 2024
18 changes: 5 additions & 13 deletions gossip/src/crds_gossip_push.rs
@@ -21,10 +21,8 @@ use {
         push_active_set::PushActiveSet,
         received_cache::ReceivedCache,
     },
-    bincode::serialized_size,
     itertools::Itertools,
     solana_sdk::{
-        packet::PACKET_DATA_SIZE,
         pubkey::Pubkey,
         signature::{Keypair, Signer},
         timing::timestamp,
@@ -53,8 +51,6 @@ const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2;
 const CRDS_GOSSIP_PUSH_ACTIVE_SET_SIZE: usize = CRDS_GOSSIP_PUSH_FANOUT + 3;
 
 pub struct CrdsGossipPush {
-    /// Max bytes per message
-    max_bytes: usize,
     /// Active set of validators for push
     active_set: RwLock<PushActiveSet>,
     /// Cursor into the crds table for values to push.
@@ -74,8 +70,6 @@ pub struct CrdsGossipPush
 impl Default for CrdsGossipPush {
     fn default() -> Self {
         Self {
-            // Allow upto 64 Crds Values per PUSH
-            max_bytes: PACKET_DATA_SIZE * 64,
             active_set: RwLock::default(),
             crds_cursor: Mutex::default(),
             received_cache: Mutex::new(ReceivedCache::new(2 * CRDS_UNIQUE_PUBKEY_CAPACITY)),
@@ -180,10 +174,10 @@ impl CrdsGossipPush {
         usize, // number of values
         usize, // number of push messages
     ) {
+        const MAX_NUM_PUSHES: usize = 1 << 12;
 

Why was 4k chosen? Any idea how close we get to this limit during steady state? Or what we burst up to?

If we were to sustain at this rate, how much egress would that be? Something like 300Mbps?


Mostly based on testing on an unstaked node in east asia, mainnet/testnet metrics plus some margin for spikes. This should leave enough margin during steady state with the caveat below.

How often this function is invoked partly depends on how often the node receives push messages. So I don't have a mathematical mapping between this limit and the egress rate. But I think we have enough metrics to monitor this on testnet and get some estimation.

For now this seems like a working patch addressing contact-info propagation issue for unstaked east asia nodes.


The max possible limit is actually pretty high, which may be a concern. Some napkin math puts our max rate at about 4.42 GB/s:

910 calls to new_push_requests/s
4096 CrdsValues/call
Max CrdsValue size: 1187 bytes/CrdsValue

910 calls/s * 4096 CrdsValues/call = 3,727,360 CrdsValues/s
3,727,360 CrdsValues/s * 1187 bytes/CrdsValue = 4.42 GB/s

Although, in order to send that much traffic, the node would have to be receiving, at the lowest:

4.42 GB/s / 9 push fanout peers = 491 MB/s

With a staked node on testnet, it is closer to:

4.42 GB/s / 4 push fanout peers = 1.1 GB/s

With this same staked node on testnet:
Push Burst @ ~192 MB/s on validator startup.
Push Steady State @ ~10MB/s

So our steady-state push bandwidth is pretty low, but the peak is significant and will likely be higher for more highly staked nodes.
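The arithmetic above can be checked directly. A standalone sketch; the figures 910, 4096, and 1187 are the reviewer's measurements and assumptions, not values read from the code:

```rust
// The reviewer's napkin math, written out as a checkable calculation.
// All inputs are observed/assumed figures from the review thread.
fn main() {
    let calls_per_sec: u64 = 910; // observed new_push_requests calls per second
    let values_per_call: u64 = 4096; // MAX_NUM_PUSHES = 1 << 12
    let bytes_per_value: u64 = 1187; // max serialized CrdsValue size, per the review

    let values_per_sec = calls_per_sec * values_per_call;
    assert_eq!(values_per_sec, 3_727_360);

    let egress_bytes_per_sec = values_per_sec * bytes_per_value;
    assert_eq!(egress_bytes_per_sec, 4_424_376_320); // ~4.42 GB/s

    // Sustaining that egress requires matching ingress to trigger the calls;
    // with 9 push fanout peers:
    let ingress_bytes_per_sec = egress_bytes_per_sec / 9;
    println!(
        "egress {} B/s (~4.42 GB/s), required ingress ~{} MB/s",
        egress_bytes_per_sec,
        ingress_bytes_per_sec / 1_000_000, // 491
    );
}
```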

4096 is chosen to ensure that an unstaked node, not receiving any push messages, can keep up with the demand of incoming pull requests while also being able to send out its own ContactInfo. Initially, an unstaked node only receives data via pull requests, so the data from the pull responses fills up its table quickly. The problem is that the node cannot push out all of the new CrdsValues quickly enough before it refreshes its ContactInfo in the table, which pushes the node's ContactInfo to the end of the table.

Out of the 910 calls to new_push_requests per second, only 10 run within the run_gossip set of threads. The other ~900/s are made via the handle_batch_push_messages set of threads. But if a node is not receiving any push messages, handle_batch_push_messages exits early, so no calls to new_push_requests are made from it.

As a result, the node only has 10 threads dedicated to sending push messages, and those 10 threads, in their previous state (before this PR), cannot send enough push messages before the node's ContactInfo gets refreshed.

All that is to say: we need a high enough limit in the new_push_requests function to send enough data that the node can send its ContactInfo via push, using just the run_gossip threads, before it gets refreshed.


I think that max calculation does not take into account that the frequency with which the function is called limits how many push messages can be generated in each call.

For example, for the function to be called ~1000 times per second, each call must take only ~1ms, in which case it cannot generate many push messages in that short period of time.

In other words: more frequent calls => fewer push messages per call.
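The point above can be sketched as a toy model (the numbers are illustrative, not measurements): a fixed per-thread throughput budget spread over more frequent calls yields fewer pushes per call.

```rust
// Toy model: a thread that can generate at most `budget_pushes_per_sec`
// pushes per second, split evenly across `calls_per_sec` invocations.
// Illustrative only; not code from the PR.
fn pushes_per_call(budget_pushes_per_sec: u64, calls_per_sec: u64) -> u64 {
    budget_pushes_per_sec / calls_per_sec
}

fn main() {
    let budget = 40_960; // hypothetical per-thread budget, pushes/s
    // Infrequent calls can each hit the 4096 cap:
    assert_eq!(pushes_per_call(budget, 10), 4096);
    // Frequent calls generate far fewer pushes each:
    assert_eq!(pushes_per_call(budget, 910), 45);
    println!("more frequent calls => fewer pushes per call");
}
```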


Ahh, you are right, it does not. Good point.

         let active_set = self.active_set.read().unwrap();
         let mut num_pushes = 0;
         let mut num_values = 0;
-        let mut total_bytes: usize = 0;
         let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new();
         let wallclock_window = self.wallclock_window(now);
         let mut crds_cursor = self.crds_cursor.lock().unwrap();
@@ -193,12 +187,7 @@ impl CrdsGossipPush {
             .get_entries(crds_cursor.deref_mut())
             .map(|entry| &entry.value)
             .filter(|value| wallclock_window.contains(&value.wallclock()));
-        for value in entries {
-            let serialized_size = serialized_size(&value).unwrap();
-            total_bytes = total_bytes.saturating_add(serialized_size as usize);
-            if total_bytes > self.max_bytes {
-                break;
-            }
+        'outer: for value in entries {
             num_values += 1;
             let origin = value.pubkey();
             let nodes = active_set.get_nodes(
@@ -210,6 +199,9 @@ impl CrdsGossipPush {
             for node in nodes.take(self.push_fanout) {
                 push_messages.entry(*node).or_default().push(value.clone());
                 num_pushes += 1;
+                if num_pushes >= MAX_NUM_PUSHES {
+                    break 'outer;
+                }
             }
         }
         drop(crds);
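Stripped of the crds plumbing, the PR's new control flow is a labeled break once the per-call push budget is spent. A minimal sketch with toy types: `values` stands in for the crds entries and `fanout_peers` for the nodes returned by the active set; this is not the real API.

```rust
use std::collections::HashMap;

const MAX_NUM_PUSHES: usize = 1 << 12; // 4096, as in the PR

// Toy stand-in for CrdsGossipPush::new_push_requests: fan each value out to
// every peer, but cap the total number of pushes per call instead of capping
// total serialized bytes (the pre-PR behavior).
fn new_push_requests(values: &[u64], fanout_peers: &[u8]) -> (HashMap<u8, Vec<u64>>, usize) {
    let mut push_messages: HashMap<u8, Vec<u64>> = HashMap::new();
    let mut num_pushes = 0;
    'outer: for value in values {
        for node in fanout_peers {
            push_messages.entry(*node).or_default().push(*value);
            num_pushes += 1;
            if num_pushes >= MAX_NUM_PUSHES {
                break 'outer; // budget spent: stop iterating values entirely
            }
        }
    }
    (push_messages, num_pushes)
}

fn main() {
    let values: Vec<u64> = (0..2000).collect();
    let peers = [1u8, 2, 3];
    let (_msgs, num_pushes) = new_push_requests(&values, &peers);
    // 2000 values * 3 peers = 6000 candidate pushes, capped at 4096.
    assert_eq!(num_pushes, 4096);
    println!("num_pushes = {num_pushes}");
}
```

The labeled `break 'outer` matters: hitting the cap must abandon the outer loop over values, not just the inner loop over fanout peers.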