Skip to content

Commit

Permalink
Consider connection count of staked nodes when calculating allowed PPS
Browse files Browse the repository at this point in the history
  • Loading branch information
lijunwangs committed Apr 20, 2024
1 parent 6aacbf3 commit d5ebd8f
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 10 deletions.
6 changes: 6 additions & 0 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,7 @@ async fn handle_connection(
.available_load_capacity_in_throttling_duration(
params.peer_type,
params.total_stake,
stream_counter.get_connection_count(),
);

stream_counter.reset_throttling_params_if_needed();
Expand Down Expand Up @@ -1204,6 +1205,7 @@ impl ConnectionTable {
// use the same IP due to NAT. So counting all the streams from a given IP could be too restrictive.
Arc::new(ConnectionStreamCounter::new())
};
stream_counter.increment_connection_count(1);
connection_entry.push(ConnectionEntry::new(
exit.clone(),
peer_type,
Expand Down Expand Up @@ -1247,6 +1249,10 @@ impl ConnectionTable {
let new_size = e_ref.len();
if e_ref.is_empty() {
e.swap_remove_entry();
} else {
e_ref[0]
.stream_counter
.decrement_connection_count(old_size - new_size);
}
let connections_removed = old_size.saturating_sub(new_size);
self.total_size = self.total_size.saturating_sub(connections_removed);
Expand Down
64 changes: 54 additions & 10 deletions streamer/src/nonblocking/stream_throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use {
std::{
cmp,
sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc, RwLock,
},
time::{Duration, Instant},
Expand Down Expand Up @@ -151,6 +151,7 @@ impl StakedStreamLoadEMA {
&self,
peer_type: ConnectionPeerType,
total_stake: u64,
connection_count: usize,
) -> u64 {
match peer_type {
ConnectionPeerType::Unstaked => self.max_unstaked_load_in_throttling_window,
Expand All @@ -161,11 +162,14 @@ impl StakedStreamLoadEMA {
self.max_staked_load_in_ema_window / 4,
));

// Formula is (max_load ^ 2 / current_load) * (stake / total_stake)
// Formula is (max_load ^ 2 / current_load) * (stake / total_stake) / connection_count
// The connection count must be considered, otherwise the streams per second from the stake
// node can be as large as 8 (the default connection limit per peer) times as designed.
let capacity_in_ema_window = (u128::from(self.max_staked_load_in_ema_window)
* u128::from(self.max_staked_load_in_ema_window)
* u128::from(stake))
/ (current_load * u128::from(total_stake));
/ (current_load * u128::from(total_stake))
/ connection_count as u128;

let calculated_capacity = capacity_in_ema_window
* u128::from(STREAM_THROTTLING_INTERVAL_MS)
Expand Down Expand Up @@ -198,13 +202,15 @@ impl StakedStreamLoadEMA {
pub(crate) struct ConnectionStreamCounter {
pub(crate) stream_count: AtomicU64,
last_throttling_instant: RwLock<tokio::time::Instant>,
connection_count: AtomicUsize,
}

impl ConnectionStreamCounter {
pub(crate) fn new() -> Self {
Self {
stream_count: AtomicU64::default(),
last_throttling_instant: RwLock::new(tokio::time::Instant::now()),
connection_count: AtomicUsize::default(),
}
}

Expand All @@ -223,6 +229,18 @@ impl ConnectionStreamCounter {
}
}
}

pub(crate) fn increment_connection_count(&self, count: usize) -> usize {
self.connection_count.fetch_add(count, Ordering::Relaxed)
}

pub(crate) fn decrement_connection_count(&self, count: usize) -> usize {
self.connection_count.fetch_sub(count, Ordering::Relaxed)
}

pub(crate) fn get_connection_count(&self) -> usize {
self.connection_count.load(Ordering::Relaxed)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -253,6 +271,7 @@ pub mod test {
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Unstaked,
10000,
1,
),
10
);
Expand Down Expand Up @@ -281,6 +300,7 @@ pub mod test {
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(15),
10000,
1,
),
30
);
Expand All @@ -291,6 +311,7 @@ pub mod test {
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(1000),
10000,
1,
),
2000
);
Expand All @@ -302,6 +323,7 @@ pub mod test {
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(15),
10000,
1,
),
120
);
Expand All @@ -312,6 +334,7 @@ pub mod test {
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(1000),
10000,
1,
),
8000
);
Expand All @@ -324,6 +347,7 @@ pub mod test {
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(15),
10000,
1,
),
120
);
Expand All @@ -333,6 +357,7 @@ pub mod test {
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(1000),
10000,
1,
),
8000
);
Expand All @@ -343,6 +368,7 @@ pub mod test {
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(1),
40000,
1,
),
load_ema
.max_unstaked_load_in_throttling_window
Expand Down Expand Up @@ -372,7 +398,8 @@ pub mod test {
assert!(
(46u64..=47).contains(&load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(15),
10000
10000,
1,
))
);

Expand All @@ -381,7 +408,8 @@ pub mod test {
assert!((3124u64..=3125).contains(
&load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(1000),
10000
10000,
1,
)
));

Expand All @@ -391,7 +419,8 @@ pub mod test {
assert!(
(92u64..=94).contains(&load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(15),
10000
10000,
1,
))
);

Expand All @@ -400,7 +429,8 @@ pub mod test {
assert!((6248u64..=6250).contains(
&load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(1000),
10000
10000,
1,
)
));

Expand All @@ -411,7 +441,8 @@ pub mod test {
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(15),
10000
10000,
1,
),
150
);
Expand All @@ -420,17 +451,30 @@ pub mod test {
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(1000),
10000
10000,
1,
),
10000
);

// function = ((12.5K * 12.5K) / 25% of 12.5K) * stake / total_stake / connection_count
// as there are more than 1 connection from the staked node
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(1000),
10000,
4,
),
10000 / 4
);

// At 1/400000 stake weight, and minimum load, it should still allow
// max_unstaked_load_in_throttling_window + 1 streams.
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(1),
400000
400000,
1,
),
load_ema
.max_unstaked_load_in_throttling_window
Expand Down

0 comments on commit d5ebd8f

Please sign in to comment.