This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

feat(overflow): add overflow_forced_keys envvar #54

Merged · 5 commits · Nov 27, 2023
2 changes: 2 additions & 0 deletions capture/src/config.rs
@@ -19,6 +19,8 @@ pub struct Config {
     #[envconfig(default = "1000")]
     pub burst_limit: NonZeroU32,
 
+    pub overflow_forced_keys: Option<String>, // Comma-delimited keys
+
     #[envconfig(nested = true)]
     pub kafka: KafkaConfig,
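For context, a minimal sketch of the parsing semantics this field gets downstream in `PartitionLimiter::new` (the standalone function name `parse_forced_keys` is illustrative, not part of the PR). Note that a set-but-empty value forces the empty-string key, since `split(',')` on `""` yields one empty item:

```rust
use std::collections::HashSet;

// Illustrative standalone version of the comma-splitting done in
// PartitionLimiter::new below; `parse_forced_keys` is a hypothetical name.
fn parse_forced_keys(raw: Option<String>) -> HashSet<String> {
    match raw {
        // Unset env var: no keys are forced into overflow.
        None => HashSet::new(),
        // "one,three" becomes {"one", "three"}; no trimming is applied,
        // so "one, three" would yield {"one", " three"}.
        Some(values) => values.split(',').map(String::from).collect(),
    }
}

fn main() {
    let keys = parse_forced_keys(Some(String::from("one,three")));
    assert!(keys.contains("one") && keys.contains("three"));
    assert!(!keys.contains("two"));

    // Edge case: an empty (but set) value forces the empty-string key.
    assert!(parse_forced_keys(Some(String::new())).contains(""));
}
```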
55 changes: 47 additions & 8 deletions capture/src/partition_limits.rs
@@ -6,26 +6,37 @@
 /// before it causes a negative impact. In this case, instead of passing the error to the customer
 /// with a 429, we relax our ordering constraints and temporarily override the key, meaning the
 /// customer's data will be spread across all partitions.
-use std::{num::NonZeroU32, sync::Arc};
+use std::collections::HashSet;
+use std::num::NonZeroU32;
+use std::sync::Arc;
 
 use governor::{clock, state::keyed::DefaultKeyedStateStore, Quota, RateLimiter};
 
 // See: https://docs.rs/governor/latest/governor/_guide/index.html#usage-in-multiple-threads
 #[derive(Clone)]
 pub struct PartitionLimiter {
     limiter: Arc<RateLimiter<String, DefaultKeyedStateStore<String>, clock::DefaultClock>>,
+    forced_keys: HashSet<String>,
 }
 
 impl PartitionLimiter {
-    pub fn new(per_second: NonZeroU32, burst: NonZeroU32) -> Self {
+    pub fn new(per_second: NonZeroU32, burst: NonZeroU32, forced_keys: Option<String>) -> Self {
         let quota = Quota::per_second(per_second).allow_burst(burst);
         let limiter = Arc::new(governor::RateLimiter::dashmap(quota));
 
-        PartitionLimiter { limiter }
+        let forced_keys: HashSet<String> = match forced_keys {
+            None => HashSet::new(),
+            Some(values) => values.split(',').map(String::from).collect(),
+        };
+
+        PartitionLimiter {
+            limiter,
+            forced_keys,
+        }
     }
 
     pub fn is_limited(&self, key: &String) -> bool {
-        self.limiter.check_key(key).is_err()
+        self.forced_keys.contains(key) || self.limiter.check_key(key).is_err()
     }
 }

@@ -36,8 +47,11 @@ mod tests {
 
     #[tokio::test]
     async fn low_limits() {
-        let limiter =
-            PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(1).unwrap());
+        let limiter = PartitionLimiter::new(
+            NonZeroU32::new(1).unwrap(),
+            NonZeroU32::new(1).unwrap(),
+            None,
+        );
         let token = String::from("test");
 
         assert!(!limiter.is_limited(&token));
@@ -46,13 +60,38 @@
 
     #[tokio::test]
     async fn bursting() {
-        let limiter =
-            PartitionLimiter::new(NonZeroU32::new(1).unwrap(), NonZeroU32::new(3).unwrap());
+        let limiter = PartitionLimiter::new(
+            NonZeroU32::new(1).unwrap(),
+            NonZeroU32::new(3).unwrap(),
+            None,
+        );
         let token = String::from("test");
 
         assert!(!limiter.is_limited(&token));
         assert!(!limiter.is_limited(&token));
         assert!(!limiter.is_limited(&token));
         assert!(limiter.is_limited(&token));
     }
+
+    #[tokio::test]
+    async fn forced_key() {
+        let key_one = String::from("one");
+        let key_two = String::from("two");
+        let key_three = String::from("three");
+        let forced_keys = Some(String::from("one,three"));
+
+        let limiter = PartitionLimiter::new(
+            NonZeroU32::new(1).unwrap(),
+            NonZeroU32::new(1).unwrap(),
+            forced_keys,
+        );
+
+        // One and three are limited from the start, two is not
+        assert!(limiter.is_limited(&key_one));
+        assert!(!limiter.is_limited(&key_two));
+        assert!(limiter.is_limited(&key_three));
+
+        // Two is limited on the second event
+        assert!(limiter.is_limited(&key_two));
+    }
 }
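A usage sketch of the new behaviour (assuming the PR's `PartitionLimiter` is in scope; the key names here are made up): because `is_limited` uses a short-circuiting `||`, forced keys report as limited without ever consulting the governor limiter or consuming quota, while all other keys are rate-limited exactly as before.

```rust
use std::num::NonZeroU32;

fn example() {
    let limiter = PartitionLimiter::new(
        NonZeroU32::new(10).unwrap(), // replenish 10 tokens per second
        NonZeroU32::new(20).unwrap(), // allow bursts of up to 20 events
        Some(String::from("noisy_key")),
    );

    // Forced keys are limited immediately and on every call.
    assert!(limiter.is_limited(&String::from("noisy_key")));

    // Other keys still pass through the keyed governor limiter.
    assert!(!limiter.is_limited(&String::from("quiet_key")));
}
```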
6 changes: 5 additions & 1 deletion capture/src/server.rs
@@ -44,7 +44,11 @@ where
         .register("rdkafka".to_string(), Duration::seconds(30))
         .await;
 
-    let partition = PartitionLimiter::new(config.per_second_limit, config.burst_limit);
+    let partition = PartitionLimiter::new(
+        config.per_second_limit,
+        config.burst_limit,
+        config.overflow_forced_keys,
+    );
     let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition).unwrap();
 
     router::router(
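Finally, a sketch of how the field would be populated end to end. This assumes the envconfig crate's derive maps the field to an `OVERFLOW_FORCED_KEYS` environment variable and exposes `init_from_env`; both match recent envconfig conventions but are not shown in this diff, so treat them as assumptions.

```rust
use envconfig::Envconfig;

// Hypothetical wiring that mirrors the server.rs hunk above;
// `Config` and `PartitionLimiter` are the types from this PR.
fn build_partition_limiter() -> PartitionLimiter {
    // With envconfig's derive, an unset OVERFLOW_FORCED_KEYS leaves the
    // Option as None, which disables key forcing entirely (assumed mapping).
    std::env::set_var("OVERFLOW_FORCED_KEYS", "one,three");

    let config = Config::init_from_env().expect("config from env");
    PartitionLimiter::new(
        config.per_second_limit,
        config.burst_limit,
        config.overflow_forced_keys,
    )
}
```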