diff --git a/rust/capture/src/api.rs b/rust/capture/src/api.rs index 97d84857075e4..646bd2d3c9c8b 100644 --- a/rust/capture/src/api.rs +++ b/rust/capture/src/api.rs @@ -84,6 +84,9 @@ impl IntoResponse for CaptureError { pub enum DataType { AnalyticsMain, AnalyticsHistorical, + ClientIngestionWarning, + HeatmapMain, + ExceptionMain, } #[derive(Clone, Debug, Serialize, Eq, PartialEq)] pub struct ProcessedEvent { diff --git a/rust/capture/src/config.rs b/rust/capture/src/config.rs index cfc38877fec33..6c66d09e68454 100644 --- a/rust/capture/src/config.rs +++ b/rust/capture/src/config.rs @@ -54,6 +54,12 @@ pub struct KafkaConfig { pub kafka_topic: String, #[envconfig(default = "events_plugin_ingestion_historical")] pub kafka_historical_topic: String, + #[envconfig(default = "events_plugin_ingestion")] + pub kafka_client_ingestion_warning_topic: String, + #[envconfig(default = "events_plugin_ingestion")] + pub kafka_exceptions_topic: String, + #[envconfig(default = "events_plugin_ingestion")] + pub kafka_heatmaps_topic: String, #[envconfig(default = "false")] pub kafka_tls: bool, } diff --git a/rust/capture/src/sinks/kafka.rs b/rust/capture/src/sinks/kafka.rs index b82d3c342a115..760c6f31740ba 100644 --- a/rust/capture/src/sinks/kafka.rs +++ b/rust/capture/src/sinks/kafka.rs @@ -110,6 +110,9 @@ pub struct KafkaSink { partition: Option, main_topic: String, historical_topic: String, + client_ingestion_warning_topic: String, + exceptions_topic: String, + heatmaps_topic: String, } impl KafkaSink { @@ -158,6 +161,9 @@ impl KafkaSink { partition, main_topic: config.kafka_topic, historical_topic: config.kafka_historical_topic, + client_ingestion_warning_topic: config.kafka_client_ingestion_warning_topic, + exceptions_topic: config.kafka_exceptions_topic, + heatmaps_topic: config.kafka_heatmaps_topic, }) } @@ -187,6 +193,12 @@ impl KafkaSink { (&self.main_topic, Some(event_key.as_str())) } } + DataType::ClientIngestionWarning => ( + &self.client_ingestion_warning_topic, + Some(event_key.as_str()), + ), + DataType::HeatmapMain => (&self.heatmaps_topic, Some(event_key.as_str())), + DataType::ExceptionMain => (&self.exceptions_topic, Some(event_key.as_str())), }; match self.producer.send_result(FutureRecord { @@ -325,6 +337,9 @@ mod tests { kafka_hosts: cluster.bootstrap_servers(), kafka_topic: "events_plugin_ingestion".to_string(), kafka_historical_topic: "events_plugin_ingestion_historical".to_string(), + kafka_client_ingestion_warning_topic: "events_plugin_ingestion".to_string(), + kafka_exceptions_topic: "events_plugin_ingestion".to_string(), + kafka_heatmaps_topic: "events_plugin_ingestion".to_string(), kafka_tls: false, }; let sink = KafkaSink::new(config, handle, limiter).expect("failed to create sink"); diff --git a/rust/capture/src/v0_endpoint.rs b/rust/capture/src/v0_endpoint.rs index ff4b90f2662e2..83290e1016742 100644 --- a/rust/capture/src/v0_endpoint.rs +++ b/rust/capture/src/v0_endpoint.rs @@ -182,9 +182,12 @@ pub fn process_single_event( return Err(CaptureError::MissingEventName); } - let data_type = match context.historical_migration { - true => DataType::AnalyticsHistorical, - false => DataType::AnalyticsMain, + let data_type = match (event.event.as_str(), context.historical_migration) { + ("$$client_ingestion_warning", _) => DataType::ClientIngestionWarning, + ("$exception", _) => DataType::ExceptionMain, + ("$$heatmap", _) => DataType::HeatmapMain, + (_, true) => DataType::AnalyticsHistorical, + (_, false) => DataType::AnalyticsMain, }; let data = serde_json::to_string(&event).map_err(|e| { diff --git a/rust/capture/tests/common.rs b/rust/capture/tests/common.rs index 5dd4c639aa5f0..f469f8c6815a4 100644 --- a/rust/capture/tests/common.rs +++ b/rust/capture/tests/common.rs @@ -45,6 +45,9 @@ pub static DEFAULT_CONFIG: Lazy = Lazy::new(|| Config { kafka_hosts: "kafka:9092".to_string(), kafka_topic: "events_plugin_ingestion".to_string(), kafka_historical_topic: "events_plugin_ingestion_historical".to_string(), + kafka_client_ingestion_warning_topic: "events_plugin_ingestion".to_string(), + kafka_exceptions_topic: "events_plugin_ingestion".to_string(), + kafka_heatmaps_topic: "events_plugin_ingestion".to_string(), kafka_tls: false, }, otel_url: None, @@ -179,6 +182,15 @@ impl EphemeralTopic { } } + pub(crate) fn assert_empty(&self) { + assert!( + self.consumer + .poll(Timeout::After(Duration::from_secs(1))) + .is_none(), + "topic holds more messages" + ) + } + pub fn topic_name(&self) -> &str { &self.topic_name } diff --git a/rust/capture/tests/events.rs b/rust/capture/tests/events.rs index 0554aae905ec8..8fc95c10bc2b7 100644 --- a/rust/capture/tests/events.rs +++ b/rust/capture/tests/events.rs @@ -1,13 +1,13 @@ use std::num::NonZeroU32; use time::Duration; +use crate::common::*; use anyhow::Result; use assert_json_diff::assert_json_include; use capture::limiters::billing::QuotaResource; use reqwest::StatusCode; use serde_json::json; - -use crate::common::*; +use uuid::Uuid; mod common; #[tokio::test] @@ -411,3 +411,104 @@ async fn it_applies_billing_limits() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn it_routes_exceptions_and_heapmaps_to_separate_topics() -> Result<()> { + setup_tracing(); + + let token = random_string("token", 16); + let distinct_id = random_string("id", 16); + let uuids: [Uuid; 5] = core::array::from_fn(|_| Uuid::now_v7()); + + let main_topic = EphemeralTopic::new().await; + let warnings_topic = EphemeralTopic::new().await; + let exceptions_topic = EphemeralTopic::new().await; + let heatmaps_topic = EphemeralTopic::new().await; + + let mut config = DEFAULT_CONFIG.clone(); + config.kafka.kafka_topic = main_topic.topic_name().to_string(); + config.kafka.kafka_client_ingestion_warning_topic = warnings_topic.topic_name().to_string(); + config.kafka.kafka_exceptions_topic = exceptions_topic.topic_name().to_string(); + config.kafka.kafka_heatmaps_topic = heatmaps_topic.topic_name().to_string(); + + let server = ServerHandle::for_config(config).await; + + let event = json!([{ + "token": token, + "event": "$$client_ingestion_warning", + "uuid": uuids[4], + "distinct_id": distinct_id + },{ + "token": token, + "event": "event1", + "uuid": uuids[0], + "distinct_id": distinct_id + },{ + "token": token, + "event": "$$heatmap", + "uuid": uuids[1], + "distinct_id": distinct_id + },{ + "token": token, + "event": "$exception", + "uuid": uuids[2], + "distinct_id": distinct_id + },{ + "token": token, + "event": "event2", + "uuid": uuids[3], + "distinct_id": distinct_id + }]); + + let res = server.capture_events(event.to_string()).await; + assert_eq!(StatusCode::OK, res.status()); + + // Regular events are pushed to the main analytics topic + assert_json_include!( + actual: main_topic.next_event()?, + expected: json!({ + "token": token, + "uuid": uuids[0], + "distinct_id": distinct_id + }) + ); + assert_json_include!( + actual: main_topic.next_event()?, + expected: json!({ + "token": token, + "uuid": uuids[3], + "distinct_id": distinct_id + }) + ); + main_topic.assert_empty(); + + // Special-cased events are pushed to their own topics + assert_json_include!( + actual: exceptions_topic.next_event()?, + expected: json!({ + "token": token, + "uuid": uuids[2], + "distinct_id": distinct_id + }) + ); + exceptions_topic.assert_empty(); + assert_json_include!( + actual: heatmaps_topic.next_event()?, + expected: json!({ + "token": token, + "uuid": uuids[1], + "distinct_id": distinct_id + }) + ); + heatmaps_topic.assert_empty(); + assert_json_include!( + actual: warnings_topic.next_event()?, + expected: json!({ + "token": token, + "uuid": uuids[4], + "distinct_id": distinct_id + }) + ); + warnings_topic.assert_empty(); + Ok(()) +}