Skip to content

Commit

Permalink
chore(capture): add billing limits integration tests (#23389)
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello authored Jul 2, 2024
1 parent a3a6964 commit 22c14f9
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 30 deletions.
2 changes: 2 additions & 0 deletions rust/capture/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ pub struct Config {
#[envconfig(default = "capture")]
pub otel_service_name: String,

// Used for integration tests
#[envconfig(default = "true")]
pub export_prometheus: bool,
pub redis_key_prefix: Option<String>,
}

#[derive(Envconfig, Clone)]
Expand Down
64 changes: 45 additions & 19 deletions rust/capture/src/limiters/billing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub enum QuotaResource {
}

impl QuotaResource {
fn as_str(&self) -> &'static str {
pub fn as_str(&self) -> &'static str {
match self {
Self::Events => "events",
Self::Recordings => "recordings",
Expand All @@ -53,6 +53,7 @@ pub enum LimiterError {
pub struct BillingLimiter {
limited: Arc<RwLock<HashSet<String>>>,
redis: Arc<dyn Client + Send + Sync>,
redis_key_prefix: String,
interval: Duration,
updated: Arc<RwLock<OffsetDateTime>>,
}
Expand All @@ -69,6 +70,7 @@ impl BillingLimiter {
pub fn new(
interval: Duration,
redis: Arc<dyn Client + Send + Sync>,
redis_key_prefix: Option<String>,
) -> anyhow::Result<BillingLimiter> {
let limited = Arc::new(RwLock::new(HashSet::new()));

Expand All @@ -81,22 +83,20 @@ impl BillingLimiter {
limited,
updated,
redis,
redis_key_prefix: redis_key_prefix.unwrap_or_default(),
})
}

#[instrument(skip_all)]
async fn fetch_limited(
client: &Arc<dyn Client + Send + Sync>,
key_prefix: &str,
resource: &QuotaResource,
) -> anyhow::Result<Vec<String>> {
let now = OffsetDateTime::now_utc().unix_timestamp();

let key = format!("{key_prefix}{QUOTA_LIMITER_CACHE_KEY}{}", resource.as_str());
client
.zrangebyscore(
format!("{QUOTA_LIMITER_CACHE_KEY}{}", resource.as_str()),
now.to_string(),
String::from("+Inf"),
)
.zrangebyscore(key, now.to_string(), String::from("+Inf"))
.await
}

Expand Down Expand Up @@ -131,7 +131,7 @@ impl BillingLimiter {
// On prod atm we call this around 15 times per second at peak times, and it usually
// completes in <1ms.

let set = Self::fetch_limited(&self.redis, &resource).await;
let set = Self::fetch_limited(&self.redis, &self.redis_key_prefix, &resource).await;

tracing::debug!("fetched set from redis, caching");

Expand Down Expand Up @@ -178,25 +178,51 @@ mod tests {

#[tokio::test]
async fn test_dynamic_limited() {
let client = MockRedisClient::new().zrangebyscore_ret(vec![String::from("banana")]);
let client = MockRedisClient::new()
.zrangebyscore_ret("@posthog/quota-limits/events", vec![String::from("banana")]);
let client = Arc::new(client);

let limiter = BillingLimiter::new(Duration::microseconds(1), client)
let limiter = BillingLimiter::new(Duration::microseconds(1), client, None)
.expect("Failed to create billing limiter");

assert_eq!(
limiter
.is_limited("idk it doesn't matter", QuotaResource::Events)
assert!(
!limiter
.is_limited("not_limited", QuotaResource::Events)
.await,
false
);
assert!(limiter.is_limited("banana", QuotaResource::Events).await);
}

#[tokio::test]
async fn test_custom_key_prefix() {
let client = MockRedisClient::new().zrangebyscore_ret(
"prefix//@posthog/quota-limits/events",
vec![String::from("banana")],
);
let client = Arc::new(client);

assert_eq!(
limiter
.is_limited("some_org_hit_limits", QuotaResource::Events)
// Default lookup without prefix fails
let limiter = BillingLimiter::new(Duration::microseconds(1), client.clone(), None)
.expect("Failed to create billing limiter");
assert!(!limiter.is_limited("banana", QuotaResource::Events).await);

// Limiter using the correct prefix
let prefixed_limiter = BillingLimiter::new(
Duration::microseconds(1),
client,
Some("prefix//".to_string()),
)
.expect("Failed to create billing limiter");

assert!(
!prefixed_limiter
.is_limited("not_limited", QuotaResource::Events)
.await,
false
);
assert!(limiter.is_limited("banana", QuotaResource::Events).await);
assert!(
prefixed_limiter
.is_limited("banana", QuotaResource::Events)
.await
);
}
}
19 changes: 11 additions & 8 deletions rust/capture/src/redis.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::time::Duration;

use anyhow::Result;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use redis::AsyncCommands;
use tokio::time::timeout;
Expand Down Expand Up @@ -48,19 +49,18 @@ impl Client for RedisClient {
// mockall got really annoying with async and results so I'm just gonna do my own
#[derive(Clone)]
pub struct MockRedisClient {
zrangebyscore_ret: Vec<String>,
zrangebyscore_ret: HashMap<String, Vec<String>>,
}

impl MockRedisClient {
pub fn new() -> MockRedisClient {
MockRedisClient {
zrangebyscore_ret: Vec::new(),
zrangebyscore_ret: HashMap::new(),
}
}

pub fn zrangebyscore_ret(&mut self, ret: Vec<String>) -> Self {
self.zrangebyscore_ret = ret;

pub fn zrangebyscore_ret(&mut self, key: &str, ret: Vec<String>) -> Self {
self.zrangebyscore_ret.insert(key.to_owned(), ret);
self.clone()
}
}
Expand All @@ -74,7 +74,10 @@ impl Default for MockRedisClient {
#[async_trait]
impl Client for MockRedisClient {
// A very simplified wrapper, but works for our usage
async fn zrangebyscore(&self, _k: String, _min: String, _max: String) -> Result<Vec<String>> {
Ok(self.zrangebyscore_ret.clone())
async fn zrangebyscore(&self, key: String, _min: String, _max: String) -> Result<Vec<String>> {
match self.zrangebyscore_ret.get(&key) {
Some(val) => Ok(val.clone()),
None => Err(anyhow!("unknown key")),
}
}
}
8 changes: 6 additions & 2 deletions rust/capture/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ where
let redis_client =
Arc::new(RedisClient::new(config.redis_url).expect("failed to create redis client"));

let billing = BillingLimiter::new(Duration::seconds(5), redis_client.clone())
.expect("failed to create billing limiter");
let billing = BillingLimiter::new(
Duration::seconds(5),
redis_client.clone(),
config.redis_key_prefix,
)
.expect("failed to create billing limiter");

let app = if config.print_sink {
// Print sink is only used for local debug, don't allow a container with it to run on prod
Expand Down
34 changes: 34 additions & 0 deletions rust/capture/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::default::Default;
use std::net::SocketAddr;
use std::num::NonZeroU32;
use std::ops::Add;
use std::str::FromStr;
use std::string::ToString;
use std::sync::{Arc, Once};
Expand All @@ -17,12 +18,15 @@ use rdkafka::config::{ClientConfig, FromClientConfig};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::util::Timeout;
use rdkafka::{Message, TopicPartitionList};
use redis::{Client, Commands};
use time::OffsetDateTime;
use tokio::net::TcpListener;
use tokio::sync::Notify;
use tokio::time::timeout;
use tracing::{debug, warn};

use capture::config::{Config, KafkaConfig};
use capture::limiters::billing::QuotaResource;
use capture::server::serve;

pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
Expand All @@ -47,6 +51,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
otel_sampling_rate: 0.0,
otel_service_name: "capture-testing".to_string(),
export_prometheus: false,
redis_key_prefix: None,
});

static TRACING_INIT: Once = Once::new();
Expand Down Expand Up @@ -206,6 +211,35 @@ async fn delete_topic(topic: String) {
.expect("failed to delete topic");
}

pub struct PrefixedRedis {
key_prefix: String,
client: Client,
}

impl PrefixedRedis {
pub async fn new() -> Self {
Self {
key_prefix: random_string("test", 8) + "/",
client: Client::open(DEFAULT_CONFIG.redis_url.clone())
.expect("failed to create redis client"),
}
}

pub fn key_prefix(&self) -> Option<String> {
Some(self.key_prefix.to_string())
}

pub fn add_billing_limit(&self, res: QuotaResource, token: &str, until: time::Duration) {
let key = format!("{}@posthog/quota-limits/{}", self.key_prefix, res.as_str());
let score = OffsetDateTime::now_utc().add(until).unix_timestamp();
self.client
.get_connection()
.expect("failed to get connection")
.zadd::<String, i64, &str, i64>(key, token, score)
.expect("failed to insert in redis");
}
}

pub fn random_string(prefix: &str, length: usize) -> String {
let suffix: String = rand::thread_rng()
.sample_iter(Alphanumeric)
Expand Down
2 changes: 1 addition & 1 deletion rust/capture/tests/django_compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> {
let timesource = FixedTime { time: case.now };

let redis = Arc::new(MockRedisClient::new());
let billing = BillingLimiter::new(Duration::weeks(1), redis.clone())
let billing = BillingLimiter::new(Duration::weeks(1), redis.clone(), None)
.expect("failed to create billing limiter");

let app = router(
Expand Down
62 changes: 62 additions & 0 deletions rust/capture/tests/events.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::num::NonZeroU32;
use time::Duration;

use anyhow::Result;
use assert_json_diff::assert_json_include;
use capture::limiters::billing::QuotaResource;
use reqwest::StatusCode;
use serde_json::json;

Expand Down Expand Up @@ -349,3 +351,63 @@ async fn it_trims_distinct_id() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn it_applies_billing_limits() -> Result<()> {
setup_tracing();
let token1 = random_string("token", 16);
let token2 = random_string("token", 16);
let token3 = random_string("token", 16);
let distinct_id = random_string("id", 16);

let topic = EphemeralTopic::new().await;

// Setup billing limits:
// - token1 limit is expired -> accept messages
// - token2 limit is active -> drop messages
// - token3 is not in redis -> accept by default
let redis = PrefixedRedis::new().await;
redis.add_billing_limit(QuotaResource::Events, &token1, Duration::seconds(-60));
redis.add_billing_limit(QuotaResource::Events, &token2, Duration::seconds(60));

let mut config = DEFAULT_CONFIG.clone();
config.redis_key_prefix = redis.key_prefix();
config.kafka.kafka_topic = topic.topic_name().to_string();
let server = ServerHandle::for_config(config).await;

for payload in [
json!({
"token": token1,
"batch": [{"event": "event1","distinct_id": distinct_id}]
}),
json!({
"token": token2,
"batch": [{"event": "to drop","distinct_id": distinct_id}]
}),
json!({
"token": token3,
"batch": [{"event": "event1","distinct_id": distinct_id}]
}),
] {
let res = server.capture_events(payload.to_string()).await;
assert_eq!(StatusCode::OK, res.status());
}

// Batches 1 and 3 go through, batch 2 is dropped
assert_json_include!(
actual: topic.next_event()?,
expected: json!({
"token": token1,
"distinct_id": distinct_id
})
);
assert_json_include!(
actual: topic.next_event()?,
expected: json!({
"token": token3,
"distinct_id": distinct_id
})
);

Ok(())
}

0 comments on commit 22c14f9

Please sign in to comment.