diff --git a/capture/src/config.rs b/capture/src/config.rs
index 07b7f89..d91e7b7 100644
--- a/capture/src/config.rs
+++ b/capture/src/config.rs
@@ -13,6 +13,9 @@ pub struct Config {
     pub redis_url: String,
     pub otel_url: Option<String>,
 
+    #[envconfig(default = "false")]
+    pub overflow_enabled: bool,
+
     #[envconfig(default = "100")]
     pub overflow_per_second_limit: NonZeroU32,
diff --git a/capture/src/server.rs b/capture/src/server.rs
index 0704987..8585036 100644
--- a/capture/src/server.rs
+++ b/capture/src/server.rs
@@ -48,24 +48,30 @@ where
         .register("rdkafka".to_string(), Duration::seconds(30))
         .await;
 
-    let partition = OverflowLimiter::new(
-        config.overflow_per_second_limit,
-        config.overflow_burst_limit,
-        config.overflow_forced_keys,
-    );
-    if config.export_prometheus {
-        let partition = partition.clone();
-        tokio::spawn(async move {
-            partition.report_metrics().await;
-        });
-    }
-    {
-        // Ensure that the rate limiter state does not grow unbounded
-        let partition = partition.clone();
-        tokio::spawn(async move {
-            partition.clean_state().await;
-        });
-    }
+    let partition = match config.overflow_enabled {
+        false => None,
+        true => {
+            let partition = OverflowLimiter::new(
+                config.overflow_per_second_limit,
+                config.overflow_burst_limit,
+                config.overflow_forced_keys,
+            );
+            if config.export_prometheus {
+                let partition = partition.clone();
+                tokio::spawn(async move {
+                    partition.report_metrics().await;
+                });
+            }
+            {
+                // Ensure that the rate limiter state does not grow unbounded
+                let partition = partition.clone();
+                tokio::spawn(async move {
+                    partition.clean_state().await;
+                });
+            }
+            Some(partition)
+        }
+    };
     let sink = KafkaSink::new(config.kafka, sink_liveness, partition)
         .expect("failed to start Kafka sink");
diff --git a/capture/src/sinks/kafka.rs b/capture/src/sinks/kafka.rs
index 945e581..bff61b5 100644
--- a/capture/src/sinks/kafka.rs
+++ b/capture/src/sinks/kafka.rs
@@ -36,12 +36,11 @@ impl rdkafka::ClientContext for KafkaContext {
         for (topic, stats) in stats.topics {
             gauge!(
                 "capture_kafka_produce_avg_batch_size_bytes",
-                "topic" => topic.clone()
+                "topic" => topic.clone()
             )
             .set(stats.batchsize.avg as f64);
             gauge!(
                 "capture_kafka_produce_avg_batch_size_events",
-                "topic" => topic
             )
             .set(stats.batchcnt.avg as f64);
@@ -49,30 +48,58 @@ impl rdkafka::ClientContext for KafkaContext {
         for (_, stats) in stats.brokers {
             let id_string = format!("{}", stats.nodeid);
+            if let Some(rtt) = stats.rtt {
+                gauge!(
+                    "capture_kafka_produce_rtt_latency_ms",
+                    "quantile" => "p50",
+                    "broker" => id_string.clone()
+                )
+                .set(rtt.p50 as f64);
+                gauge!(
+                    "capture_kafka_produce_rtt_latency_ms",
+                    "quantile" => "p90",
+                    "broker" => id_string.clone()
+                )
+                .set(rtt.p90 as f64);
+                gauge!(
+                    "capture_kafka_produce_rtt_latency_ms",
+                    "quantile" => "p95",
+                    "broker" => id_string.clone()
+                )
+                .set(rtt.p95 as f64);
+                gauge!(
+                    "capture_kafka_produce_rtt_latency_ms",
+                    "quantile" => "p99",
+                    "broker" => id_string.clone()
+                )
+                .set(rtt.p99 as f64);
+            }
+
             gauge!(
                 "capture_kafka_broker_requests_pending",
-                "broker" => id_string.clone()
             )
             .set(stats.outbuf_cnt as f64);
             gauge!(
                 "capture_kafka_broker_responses_awaiting",
-                "broker" => id_string.clone()
             )
             .set(stats.waitresp_cnt as f64);
             counter!(
                 "capture_kafka_broker_tx_errors_total",
-                "broker" => id_string.clone()
             )
             .absolute(stats.txerrs);
             counter!(
                 "capture_kafka_broker_rx_errors_total",
-
-                "broker" => id_string
+                "broker" => id_string.clone()
             )
             .absolute(stats.rxerrs);
+            counter!(
+                "capture_kafka_broker_request_timeouts",
+                "broker" => id_string
+            )
+            .absolute(stats.req_timeouts);
         }
     }
 }
@@ -80,7 +107,7 @@ impl rdkafka::ClientContext for KafkaContext {
 #[derive(Clone)]
 pub struct KafkaSink {
     producer: FutureProducer<KafkaContext>,
-    partition: OverflowLimiter,
+    partition: Option<OverflowLimiter>,
     main_topic: String,
     historical_topic: String,
 }
@@ -89,7 +116,7 @@ impl KafkaSink {
     pub fn new(
         config: KafkaConfig,
         liveness: HealthHandle,
-        partition: OverflowLimiter,
+        partition: Option<OverflowLimiter>,
     ) -> anyhow::Result<KafkaSink> {
         info!("connecting to Kafka brokers at {}...", config.kafka_hosts);
@@ -150,7 +177,11 @@ impl KafkaSink {
             DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events
             DataType::AnalyticsMain => {
                 // TODO: deprecate capture-led overflow or move logic in handler
-                if self.partition.is_limited(&event_key) {
+                let is_limited = match &self.partition {
+                    None => false,
+                    Some(partition) => partition.is_limited(&event_key),
+                };
+                if is_limited {
                     (&self.main_topic, None) // Analytics overflow goes to the main topic without locality
                 } else {
                     (&self.main_topic, Some(event_key.as_str()))
@@ -280,11 +311,11 @@ mod tests {
         let handle = registry
             .register("one".to_string(), Duration::seconds(30))
             .await;
-        let limiter = OverflowLimiter::new(
+        let limiter = Some(OverflowLimiter::new(
             NonZeroU32::new(10).unwrap(),
             NonZeroU32::new(10).unwrap(),
             None,
-        );
+        ));
         let cluster = MockCluster::new(1).expect("failed to create mock brokers");
         let config = config::KafkaConfig {
             kafka_producer_linger_ms: 0,
diff --git a/capture/tests/common.rs b/capture/tests/common.rs
index 788e6e2..868b27c 100644
--- a/capture/tests/common.rs
+++ b/capture/tests/common.rs
@@ -29,6 +29,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
     print_sink: false,
     address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
     redis_url: "redis://localhost:6379/".to_string(),
+    overflow_enabled: false,
     overflow_burst_limit: NonZeroU32::new(5).unwrap(),
     overflow_per_second_limit: NonZeroU32::new(10).unwrap(),
     overflow_forced_keys: None,
diff --git a/capture/tests/events.rs b/capture/tests/events.rs
index 111b02c..7d2defc 100644
--- a/capture/tests/events.rs
+++ b/capture/tests/events.rs
@@ -174,6 +174,7 @@ async fn it_overflows_events_on_burst() -> Result<()> {
 
     let mut config = DEFAULT_CONFIG.clone();
     config.kafka.kafka_topic = topic.topic_name().to_string();
+    config.overflow_enabled = true;
     config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
     config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();
 
@@ -223,6 +224,7 @@ async fn it_does_not_overflow_team_with_different_ids() -> Result<()> {
 
     let mut config = DEFAULT_CONFIG.clone();
     config.kafka.kafka_topic = topic.topic_name().to_string();
+    config.overflow_enabled = true;
     config.overflow_burst_limit = NonZeroU32::new(1).unwrap();
     config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();
 
@@ -254,6 +256,58 @@ async fn it_does_not_overflow_team_with_different_ids() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn it_skips_overflows_when_disabled() -> Result<()> {
+    setup_tracing();
+
+    let token = random_string("token", 16);
+    let distinct_id = random_string("id", 16);
+
+    let topic = EphemeralTopic::new().await;
+
+    let mut config = DEFAULT_CONFIG.clone();
+    config.kafka.kafka_topic = topic.topic_name().to_string();
+    config.overflow_enabled = false;
+    config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
+    config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();
+
+    let server = ServerHandle::for_config(config).await;
+
+    let event = json!([{
+        "token": token,
+ "event": "event1", + "distinct_id": distinct_id + },{ + "token": token, + "event": "event2", + "distinct_id": distinct_id + },{ + "token": token, + "event": "event3", + "distinct_id": distinct_id + }]); + + let res = server.capture_events(event.to_string()).await; + assert_eq!(StatusCode::OK, res.status()); + + assert_eq!( + topic.next_message_key()?.unwrap(), + format!("{}:{}", token, distinct_id) + ); + + assert_eq!( + topic.next_message_key()?.unwrap(), + format!("{}:{}", token, distinct_id) + ); + + // Should have triggered overflow, but has not + assert_eq!( + topic.next_message_key()?.unwrap(), + format!("{}:{}", token, distinct_id) + ); + Ok(()) +} + #[tokio::test] async fn it_trims_distinct_id() -> Result<()> { setup_tracing(); diff --git a/feature-flags/src/redis.rs b/feature-flags/src/redis.rs index 3aeec47..89dde42 100644 --- a/feature-flags/src/redis.rs +++ b/feature-flags/src/redis.rs @@ -59,11 +59,6 @@ impl Client for RedisClient { Ok(fut?) } - // TODO: Ask Xavier if there's a better way to handle this. - // The problem: I want to match on the error type from this function, and do appropriate things like 400 or 500 response. - // Buuut, if I use anyhow::Error, I can't reverse-coerce into a NotFound or serde_pickle::Error. - // Thus, I need to create a custom error enum of all possible errors + my own custom not found, so I can match on it. - // Is this the canonical way? async fn get(&self, k: String) -> Result { let mut conn = self.client.get_async_connection().await?; diff --git a/feature-flags/src/test_utils.rs b/feature-flags/src/test_utils.rs index 57ac225..a2063b4 100644 --- a/feature-flags/src/test_utils.rs +++ b/feature-flags/src/test_utils.rs @@ -3,8 +3,7 @@ use serde_json::json; use std::sync::Arc; use crate::{ - flag_definitions, redis::{Client, RedisClient}, team::{self, Team} -}; + flag_definitions, redis::{Client, RedisClient}, team::{self, Team}} use rand::{distributions::Alphanumeric, Rng}; pub fn random_string(prefix: &str, length: usize) -> String {