From a67bcd82716c9a84b07b2f7da50823eee8f696da Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Tue, 21 Nov 2023 12:20:23 +0100 Subject: [PATCH 1/4] feat(overflow): add overflow_forced_keys envvar --- capture/src/config.rs | 2 ++ capture/src/partition_limits.rs | 37 ++++++++++++++++++++++++++++----- capture/src/server.rs | 2 +- 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/capture/src/config.rs b/capture/src/config.rs index 8a471b3..02e561a 100644 --- a/capture/src/config.rs +++ b/capture/src/config.rs @@ -19,6 +19,8 @@ pub struct Config { #[envconfig(default = "1000")] pub burst_limit: NonZeroU32, + pub overflow_forced_keys: Option<String>, // Comma-delimited keys + #[envconfig(nested = true)] pub kafka: KafkaConfig, diff --git a/capture/src/partition_limits.rs b/capture/src/partition_limits.rs index fe63ec1..8c56b21 100644 --- a/capture/src/partition_limits.rs +++ b/capture/src/partition_limits.rs @@ -7,6 +7,7 @@ /// with a 429, we relax our ordering constraints and temporarily override the key, meaning the /// customers data will be spread across all partitions. 
use std::{num::NonZeroU32, sync::Arc}; +use std::collections::HashSet; use governor::{clock, state::keyed::DefaultKeyedStateStore, Quota, RateLimiter}; @@ -14,18 +15,24 @@ use governor::{clock, state::keyed::DefaultKeyedStateStore, Quota, RateLimiter}; #[derive(Clone)] pub struct PartitionLimiter { limiter: Arc<RateLimiter<String, DefaultKeyedStateStore<String>, clock::DefaultClock>>, + forced_keys: HashSet<String>, } impl PartitionLimiter { - pub fn new(per_second: NonZeroU32, burst: NonZeroU32) -> Self { + pub fn new(per_second: NonZeroU32, burst: NonZeroU32, forced_keys: Option<String>) -> Self { let quota = Quota::per_second(per_second).allow_burst(burst); let limiter = Arc::new(governor::RateLimiter::dashmap(quota)); - PartitionLimiter { limiter } + let forced_keys: HashSet<String> = match forced_keys { + None => HashSet::new(), + Some(values) => values.split(',').map(String::from).collect(), + }; + + PartitionLimiter { limiter, forced_keys } } pub fn is_limited(&self, key: &String) -> bool { - self.limiter.check_key(key).is_err() + self.forced_keys.contains(key) || self.limiter.check_key(key).is_err() } } @@ -37,7 +44,7 @@ mod tests { #[tokio::test] async fn low_limits() { let limiter = - PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(1).unwrap()); + PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(1).unwrap(), None); let token = String::from("test"); assert!(!limiter.is_limited(&token)); @@ -47,7 +54,7 @@ mod tests { #[tokio::test] async fn bursting() { let limiter = - PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(3).unwrap()); + PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(3).unwrap(), None); let token = String::from("test"); assert!(!limiter.is_limited(&token)); @@ -55,4 +62,24 @@ mod tests { assert!(!limiter.is_limited(&token)); assert!(limiter.is_limited(&token)); } + + #[tokio::test] + async fn forced_key() { + let key_one = String::from("one"); + let key_two = String::from("two"); + let key_three = String::from("three"); + let forced_keys = 
Some(String::from("one,three")); + + let limiter = + PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(1).unwrap(), forced_keys); + + // One and three are limited from the start, two is not + assert!(limiter.is_limited(&key_one)); + assert!(!limiter.is_limited(&key_two)); + assert!(limiter.is_limited(&key_three)); + + // Two is limited on the second event + assert!(limiter.is_limited(&key_two)); + + } } diff --git a/capture/src/server.rs b/capture/src/server.rs index c84bd20..bb4ad11 100644 --- a/capture/src/server.rs +++ b/capture/src/server.rs @@ -44,7 +44,7 @@ where .register("rdkafka".to_string(), Duration::seconds(30)) .await; - let partition = PartitionLimiter::new(config.per_second_limit, config.burst_limit); + let partition = PartitionLimiter::new(config.per_second_limit, config.burst_limit, config.overflow_forced_keys); let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition).unwrap(); router::router( From 7f0bc104a39159db18192b33b5522e53db0d37aa Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Tue, 21 Nov 2023 12:25:13 +0100 Subject: [PATCH 2/4] fmt --- capture/src/partition_limits.rs | 30 +++++++++++++++++++++--------- capture/src/server.rs | 6 +++++- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/capture/src/partition_limits.rs b/capture/src/partition_limits.rs index 8c56b21..3866657 100644 --- a/capture/src/partition_limits.rs +++ b/capture/src/partition_limits.rs @@ -6,8 +6,9 @@ /// before it causes a negative impact. In this case, instead of passing the error to the customer /// with a 429, we relax our ordering constraints and temporarily override the key, meaning the /// customers data will be spread across all partitions. 
-use std::{num::NonZeroU32, sync::Arc}; use std::collections::HashSet; +use std::num::NonZeroU32; +use std::sync::Arc; use governor::{clock, state::keyed::DefaultKeyedStateStore, Quota, RateLimiter}; @@ -28,7 +29,10 @@ impl PartitionLimiter { Some(values) => values.split(',').map(String::from).collect(), }; - PartitionLimiter { limiter, forced_keys } + PartitionLimiter { + limiter, + forced_keys, + } } pub fn is_limited(&self, key: &String) -> bool { @@ -43,8 +47,11 @@ mod tests { #[tokio::test] async fn low_limits() { - let limiter = - PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(1).unwrap(), None); + let limiter = PartitionLimiter::new( + NonZeroU32::new(1).unwrap(), + NonZeroU32::new(1).unwrap(), + None, + ); let token = String::from("test"); assert!(!limiter.is_limited(&token)); @@ -53,8 +60,11 @@ mod tests { #[tokio::test] async fn bursting() { - let limiter = - PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(3).unwrap(), None); + let limiter = PartitionLimiter::new( + NonZeroU32::new(1).unwrap(), + NonZeroU32::new(3).unwrap(), + None, + ); let token = String::from("test"); assert!(!limiter.is_limited(&token)); @@ -70,8 +80,11 @@ mod tests { let key_three = String::from("three"); let forced_keys = Some(String::from("one,three")); - let limiter = - PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(1).unwrap(), forced_keys); + let limiter = PartitionLimiter::new( + NonZeroU32::new(1).unwrap(), + NonZeroU32::new(1).unwrap(), + forced_keys, + ); // One and three are limited from the start, two is not assert!(limiter.is_limited(&key_one)); @@ -80,6 +93,5 @@ mod tests { // Two is limited on the second event assert!(limiter.is_limited(&key_two)); - } } diff --git a/capture/src/server.rs b/capture/src/server.rs index bb4ad11..d5586ac 100644 --- a/capture/src/server.rs +++ b/capture/src/server.rs @@ -44,7 +44,11 @@ where .register("rdkafka".to_string(), Duration::seconds(30)) .await; - let partition = 
PartitionLimiter::new(config.per_second_limit, config.burst_limit, config.overflow_forced_keys); + let partition = PartitionLimiter::new( + config.per_second_limit, + config.burst_limit, + config.overflow_forced_keys, + ); let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition).unwrap(); router::router( From 6166b8ddbf44f8ca5ca999eabab4331cf0f76a8e Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Tue, 21 Nov 2023 12:31:47 +0100 Subject: [PATCH 3/4] update integration test config --- capture-server/tests/common.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/capture-server/tests/common.rs b/capture-server/tests/common.rs index c0ee9ba..2183419 100644 --- a/capture-server/tests/common.rs +++ b/capture-server/tests/common.rs @@ -29,6 +29,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config { redis_url: "redis://localhost:6379/".to_string(), burst_limit: NonZeroU32::new(5).unwrap(), per_second_limit: NonZeroU32::new(10).unwrap(), + overflow_forced_keys: None, kafka: KafkaConfig { kafka_producer_linger_ms: 0, // Send messages as soon as possible kafka_producer_queue_mib: 10, From 8ddfce80c66ff80fff4e2aee2635491e681ea18b Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Mon, 27 Nov 2023 14:02:19 +0100 Subject: [PATCH 4/4] fix test --- capture/src/sink.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/capture/src/sink.rs b/capture/src/sink.rs index c1d291a..93ba6c5 100644 --- a/capture/src/sink.rs +++ b/capture/src/sink.rs @@ -301,8 +301,11 @@ mod tests { let handle = registry .register("one".to_string(), Duration::seconds(30)) .await; - let limiter = - PartitionLimiter::new(NonZeroU32::new(10).unwrap(), NonZeroU32::new(10).unwrap()); + let limiter = PartitionLimiter::new( + NonZeroU32::new(10).unwrap(), + NonZeroU32::new(10).unwrap(), + None, + ); let cluster = MockCluster::new(1).expect("failed to create mock brokers"); let config = config::KafkaConfig { kafka_producer_linger_ms: 0,