diff --git a/capture-server/tests/common.rs b/capture-server/tests/common.rs
index fa86881..2959d83 100644
--- a/capture-server/tests/common.rs
+++ b/capture-server/tests/common.rs
@@ -28,6 +28,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
     print_sink: false,
     address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
     redis_url: "redis://localhost:6379/".to_string(),
+    overflow_enabled: true,
     overflow_burst_limit: NonZeroU32::new(5).unwrap(),
     overflow_per_second_limit: NonZeroU32::new(10).unwrap(),
     overflow_forced_keys: None,
diff --git a/capture/src/config.rs b/capture/src/config.rs
index 0c6ab1c..7f147c7 100644
--- a/capture/src/config.rs
+++ b/capture/src/config.rs
@@ -13,6 +13,9 @@ pub struct Config {
     pub redis_url: String,
     pub otel_url: Option<String>,

+    #[envconfig(default = "true")]
+    pub overflow_enabled: bool,
+
     #[envconfig(default = "100")]
     pub overflow_per_second_limit: NonZeroU32,

diff --git a/capture/src/server.rs b/capture/src/server.rs
index 22a1f3b..1cfbae0 100644
--- a/capture/src/server.rs
+++ b/capture/src/server.rs
@@ -46,24 +46,31 @@ where
         .register("rdkafka".to_string(), Duration::seconds(30))
         .await;

-    let partition = OverflowLimiter::new(
-        config.overflow_per_second_limit,
-        config.overflow_burst_limit,
-        config.overflow_forced_keys,
-    );
-    if config.export_prometheus {
-        let partition = partition.clone();
-        tokio::spawn(async move {
-            partition.report_metrics().await;
-        });
-    }
-    {
-        // Ensure that the rate limiter state does not grow unbounded
-        let partition = partition.clone();
-        tokio::spawn(async move {
-            partition.clean_state().await;
-        });
-    }
+    let partition = match config.overflow_enabled {
+        true => {
+            let partition = OverflowLimiter::new(
+                config.overflow_per_second_limit,
+                config.overflow_burst_limit,
+                config.overflow_forced_keys,
+            );
+            if config.export_prometheus {
+                let partition = partition.clone();
+                tokio::spawn(async move {
+                    partition.report_metrics().await;
+                });
+            }
+            {
+                // Ensure that the rate limiter state does not grow unbounded
+                let partition = partition.clone();
+                tokio::spawn(async move {
+                    partition.clean_state().await;
+                });
+            }
+            Some(partition)
+        }
+        false => None,
+    };
+
     let sink = KafkaSink::new(config.kafka, sink_liveness, partition)
         .expect("failed to start Kafka sink");
diff --git a/capture/src/sinks/kafka.rs b/capture/src/sinks/kafka.rs
index 4a2bd94..262b719 100644
--- a/capture/src/sinks/kafka.rs
+++ b/capture/src/sinks/kafka.rs
@@ -82,14 +82,14 @@ impl rdkafka::ClientContext for KafkaContext {
 pub struct KafkaSink {
     producer: FutureProducer<KafkaContext>,
     topic: String,
-    partition: OverflowLimiter,
+    partition: Option<OverflowLimiter>,
 }

 impl KafkaSink {
     pub fn new(
         config: KafkaConfig,
         liveness: HealthHandle,
-        partition: OverflowLimiter,
+        partition: Option<OverflowLimiter>,
     ) -> anyhow::Result<KafkaSink> {
         info!("connecting to Kafka brokers at {}...", config.kafka_hosts);
@@ -206,7 +206,10 @@ impl KafkaSink {
 impl Event for KafkaSink {
     #[instrument(skip_all)]
     async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
-        let limited = self.partition.is_limited(&event.key());
+        let limited = match &self.partition {
+            Some(limiter) => limiter.is_limited(&event.key()),
+            None => false,
+        };
         let ack =
             Self::kafka_send(self.producer.clone(), self.topic.clone(), event, limited).await?;
         histogram!("capture_event_batch_size").record(1.0);
@@ -222,7 +225,10 @@
         for event in events {
             let producer = self.producer.clone();
             let topic = self.topic.clone();
-            let limited = self.partition.is_limited(&event.key());
+            let limited = match &self.partition {
+                Some(limiter) => limiter.is_limited(&event.key()),
+                None => false,
+            };

             // We await kafka_send to get events in the producer queue sequentially
             let ack = Self::kafka_send(producer, topic, event, limited).await?;
@@ -295,7 +301,7 @@ mod tests {
             kafka_topic: "events_plugin_ingestion".to_string(),
             kafka_tls: false,
         };
-        let sink = KafkaSink::new(config, handle, limiter).expect("failed to create sink");
+        let sink = KafkaSink::new(config, handle, Some(limiter)).expect("failed to create sink");

         (cluster, sink)
     }
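Note (not part of the patch): a minimal, dependency-free sketch of the behavior this diff introduces. The OverflowLimiter and Sink types below are hypothetical stand-ins for the real capture types, and the OVERFLOW_ENABLED parsing is a simplification of what the envconfig derive generates; the point is that when the flag is off, no limiter is constructed and events are never marked as limited.

// Hypothetical stand-in for capture's token-bucket overflow limiter.
struct OverflowLimiter;

impl OverflowLimiter {
    fn is_limited(&self, _key: &str) -> bool {
        // The real limiter consults per-key rate-limit state here.
        false
    }
}

// Mirrors KafkaSink::send: with partition == None, nothing is limited.
struct Sink {
    partition: Option<OverflowLimiter>,
}

impl Sink {
    fn is_limited(&self, key: &str) -> bool {
        match &self.partition {
            Some(limiter) => limiter.is_limited(key),
            None => false,
        }
    }
}

fn main() {
    // Simplified stand-in for `#[envconfig(default = "true")]`:
    // OVERFLOW_ENABLED unset or "true" enables overflow handling.
    let overflow_enabled = std::env::var("OVERFLOW_ENABLED")
        .map(|v| v == "true")
        .unwrap_or(true);

    // bool::then builds the same Option<OverflowLimiter> that the
    // match in server.rs produces.
    let sink = Sink {
        partition: overflow_enabled.then(|| OverflowLimiter),
    };
    println!("limited: {}", sink.is_limited("token:distinct_id"));
}

A side effect worth noting from the server.rs hunk: when overflow is disabled, the two background tasks (report_metrics and clean_state) are never spawned, so there is no limiter state to report or clean.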