From fc0c52e99973396999ee85381db8ca3dc3f6e679 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 8 Jul 2024 18:58:08 +0900 Subject: [PATCH] Minor refactoring in the rate limiter. (#5201) The point is to be able to remove the needless private function, and test public functions. --- quickwit/quickwit-common/src/tower/rate.rs | 23 +++++++++++++--- quickwit/quickwit-ingest/Cargo.toml | 1 + .../src/ingest_v2/rate_meter.rs | 27 ++++++++++--------- 3 files changed, 34 insertions(+), 17 deletions(-) diff --git a/quickwit/quickwit-common/src/tower/rate.rs b/quickwit/quickwit-common/src/tower/rate.rs index 7d365a0668a..90f3ff99dd9 100644 --- a/quickwit/quickwit-common/src/tower/rate.rs +++ b/quickwit/quickwit-common/src/tower/rate.rs @@ -46,10 +46,9 @@ impl ConstantRate { /// /// # Panics /// - /// This function panics if `period` is 0. + /// This function panics if `period` is 0 while work is != 0. pub const fn new(work: u64, period: Duration) -> Self { - assert!(!period.is_zero()); - + assert!(!period.is_zero() || work == 0u64); Self { work, period } } @@ -69,8 +68,10 @@ impl ConstantRate { /// /// This function panics if `new_period` is 0. pub fn rescale(&self, new_period: Duration) -> Self { + if self.work == 0u64 { + return Self::new(0u64, new_period); + } assert!(!new_period.is_zero()); - let new_work = self.work() as u128 * new_period.as_nanos() / self.period().as_nanos(); Self::new(new_work as u64, new_period) } @@ -90,6 +91,20 @@ impl Rate for ConstantRate { mod tests { use super::*; + #[test] + #[should_panic] + fn test_rescale_zero_duration_panics() { + ConstantRate::bytes_per_period(ByteSize::b(1), Duration::default()); + } + + #[test] + fn test_rescale_zero_duration_accepted_if_no_work() { + let rate = ConstantRate::bytes_per_period(ByteSize::b(0), Duration::default()); + let rescaled_rate = rate.rescale(Duration::from_secs(1)); + assert_eq!(rescaled_rate.work_bytes(), ByteSize::b(0)); + assert_eq!(rescaled_rate.period(), Duration::from_secs(1)); + } + #[test] fn test_rescale() { let rate = ConstantRate::bytes_per_period(ByteSize::mib(5), Duration::from_secs(5)); diff --git a/quickwit/quickwit-ingest/Cargo.toml b/quickwit/quickwit-ingest/Cargo.toml index 495a6cee2df..5ac233859f8 100644 --- a/quickwit/quickwit-ingest/Cargo.toml +++ b/quickwit/quickwit-ingest/Cargo.toml @@ -48,6 +48,7 @@ mockall = { workspace = true } rand = { workspace = true } rand_distr = { workspace = true } tempfile = { workspace = true } +tokio = { workspace = true, features = ["test-util"]} quickwit-actors = { workspace = true, features = ["testsuite"] } quickwit-cluster = { workspace = true, features = ["testsuite"] } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/rate_meter.rs b/quickwit/quickwit-ingest/src/ingest_v2/rate_meter.rs index be0d9e9230f..b2eb4748916 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/rate_meter.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/rate_meter.rs @@ -17,9 +17,8 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::time::Instant; - use quickwit_common::tower::ConstantRate; +use tokio::time::Instant; /// A naive rate meter that tracks how much work was performed during a period of time defined by /// two successive calls to `harvest`. @@ -47,10 +46,7 @@ impl RateMeter { /// Returns the average work rate since the last call to this method and resets the internal /// state. pub fn harvest(&mut self) -> ConstantRate { - self.harvest_inner(Instant::now()) - } - - fn harvest_inner(&mut self, now: Instant) -> ConstantRate { + let now = Instant::now(); let elapsed = now.duration_since(self.harvested_at); let rate = ConstantRate::new(self.total_work, elapsed); self.total_work = 0; @@ -67,20 +63,25 @@ mod tests { use super::*; - #[test] - fn test_rate_meter() { + #[tokio::test] + async fn test_rate_meter() { + tokio::time::pause(); let mut rate_meter = RateMeter::default(); - assert_eq!(rate_meter.total_work, 0); - let now = Instant::now(); - rate_meter.harvested_at = now; + let rate = rate_meter.harvest(); + assert_eq!(rate.work(), 0); + assert!(rate.period().is_zero()); - let rate = rate_meter.harvest_inner(now + Duration::from_millis(100)); + tokio::time::advance(Duration::from_millis(100)).await; + + let rate = rate_meter.harvest(); assert_eq!(rate.work(), 0); assert_eq!(rate.period(), Duration::from_millis(100)); rate_meter.update(1); - let rate = rate_meter.harvest_inner(now + Duration::from_millis(200)); + tokio::time::advance(Duration::from_millis(100)).await; + + let rate = rate_meter.harvest(); assert_eq!(rate.work(), 1); assert_eq!(rate.period(), Duration::from_millis(100)); }