Skip to content

Commit

Permalink
feat(capture): allow routing some events by name to separate topics (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello authored Jul 11, 2024
1 parent 207f02f commit 0f7a506
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 5 deletions.
3 changes: 3 additions & 0 deletions rust/capture/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ impl IntoResponse for CaptureError {
/// Routing class attached to each processed event; the Kafka sink uses it
/// to pick the destination topic when producing the event.
pub enum DataType {
/// Live analytics events — routed to the main ingestion topic.
AnalyticsMain,
/// Historical-migration events — routed to the dedicated historical topic.
AnalyticsHistorical,
/// `$$client_ingestion_warning` events — routed to their own topic.
ClientIngestionWarning,
/// `$$heatmap` events — routed to their own topic.
HeatmapMain,
/// `$exception` events — routed to their own topic.
ExceptionMain,
}
#[derive(Clone, Debug, Serialize, Eq, PartialEq)]
pub struct ProcessedEvent {
Expand Down
6 changes: 6 additions & 0 deletions rust/capture/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ pub struct KafkaConfig {
pub kafka_topic: String,
#[envconfig(default = "events_plugin_ingestion_historical")]
pub kafka_historical_topic: String,
#[envconfig(default = "events_plugin_ingestion")]
pub kafka_client_ingestion_warning_topic: String,
#[envconfig(default = "events_plugin_ingestion")]
pub kafka_exceptions_topic: String,
#[envconfig(default = "events_plugin_ingestion")]
pub kafka_heatmaps_topic: String,
#[envconfig(default = "false")]
pub kafka_tls: bool,
}
15 changes: 15 additions & 0 deletions rust/capture/src/sinks/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ pub struct KafkaSink {
partition: Option<OverflowLimiter>,
main_topic: String,
historical_topic: String,
client_ingestion_warning_topic: String,
exceptions_topic: String,
heatmaps_topic: String,
}

impl KafkaSink {
Expand Down Expand Up @@ -158,6 +161,9 @@ impl KafkaSink {
partition,
main_topic: config.kafka_topic,
historical_topic: config.kafka_historical_topic,
client_ingestion_warning_topic: config.kafka_client_ingestion_warning_topic,
exceptions_topic: config.kafka_exceptions_topic,
heatmaps_topic: config.kafka_heatmaps_topic,
})
}

Expand Down Expand Up @@ -187,6 +193,12 @@ impl KafkaSink {
(&self.main_topic, Some(event_key.as_str()))
}
}
DataType::ClientIngestionWarning => (
&self.client_ingestion_warning_topic,
Some(event_key.as_str()),
),
DataType::HeatmapMain => (&self.heatmaps_topic, Some(event_key.as_str())),
DataType::ExceptionMain => (&self.exceptions_topic, Some(event_key.as_str())),
};

match self.producer.send_result(FutureRecord {
Expand Down Expand Up @@ -325,6 +337,9 @@ mod tests {
kafka_hosts: cluster.bootstrap_servers(),
kafka_topic: "events_plugin_ingestion".to_string(),
kafka_historical_topic: "events_plugin_ingestion_historical".to_string(),
kafka_client_ingestion_warning_topic: "events_plugin_ingestion".to_string(),
kafka_exceptions_topic: "events_plugin_ingestion".to_string(),
kafka_heatmaps_topic: "events_plugin_ingestion".to_string(),
kafka_tls: false,
};
let sink = KafkaSink::new(config, handle, limiter).expect("failed to create sink");
Expand Down
9 changes: 6 additions & 3 deletions rust/capture/src/v0_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,12 @@ pub fn process_single_event(
return Err(CaptureError::MissingEventName);
}

let data_type = match context.historical_migration {
true => DataType::AnalyticsHistorical,
false => DataType::AnalyticsMain,
let data_type = match (event.event.as_str(), context.historical_migration) {
("$$client_ingestion_warning", _) => DataType::ClientIngestionWarning,
("$exception", _) => DataType::ExceptionMain,
("$$heatmap", _) => DataType::HeatmapMain,
(_, true) => DataType::AnalyticsHistorical,
(_, false) => DataType::AnalyticsMain,
};

let data = serde_json::to_string(&event).map_err(|e| {
Expand Down
12 changes: 12 additions & 0 deletions rust/capture/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
kafka_hosts: "kafka:9092".to_string(),
kafka_topic: "events_plugin_ingestion".to_string(),
kafka_historical_topic: "events_plugin_ingestion_historical".to_string(),
kafka_client_ingestion_warning_topic: "events_plugin_ingestion".to_string(),
kafka_exceptions_topic: "events_plugin_ingestion".to_string(),
kafka_heatmaps_topic: "events_plugin_ingestion".to_string(),
kafka_tls: false,
},
otel_url: None,
Expand Down Expand Up @@ -179,6 +182,15 @@ impl EphemeralTopic {
}
}

/// Asserts that no further messages are waiting on this topic.
///
/// Polls the consumer once with a one-second timeout; receiving any
/// message (or consumer event) fails the assertion.
pub(crate) fn assert_empty(&self) {
    let leftover = self
        .consumer
        .poll(Timeout::After(Duration::from_secs(1)));
    assert!(leftover.is_none(), "topic holds more messages")
}

/// Returns the name of the ephemeral Kafka topic backing this fixture.
pub fn topic_name(&self) -> &str {
    self.topic_name.as_str()
}
Expand Down
105 changes: 103 additions & 2 deletions rust/capture/tests/events.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::num::NonZeroU32;
use time::Duration;

use crate::common::*;
use anyhow::Result;
use assert_json_diff::assert_json_include;
use capture::limiters::billing::QuotaResource;
use reqwest::StatusCode;
use serde_json::json;

use crate::common::*;
use uuid::Uuid;
mod common;

#[tokio::test]
Expand Down Expand Up @@ -411,3 +411,104 @@ async fn it_applies_billing_limits() -> Result<()> {

Ok(())
}

// End-to-end check that capture routes special-cased events ($exception,
// $$heatmap, $$client_ingestion_warning) to their dedicated Kafka topics
// while ordinary events still land on the main analytics topic.
// NOTE(review): "heapmaps" in the test name looks like a typo for
// "heatmaps" — renaming would change the test identifier, so left as-is.
#[tokio::test]
async fn it_routes_exceptions_and_heapmaps_to_separate_topics() -> Result<()> {
setup_tracing();

let token = random_string("token", 16);
let distinct_id = random_string("id", 16);
// One uuid per submitted event so each can be matched on its target topic:
// [0]=event1, [1]=$$heatmap, [2]=$exception, [3]=event2, [4]=warning.
let uuids: [Uuid; 5] = core::array::from_fn(|_| Uuid::now_v7());

// Four isolated topics, one per routing destination.
let main_topic = EphemeralTopic::new().await;
let warnings_topic = EphemeralTopic::new().await;
let exceptions_topic = EphemeralTopic::new().await;
let heatmaps_topic = EphemeralTopic::new().await;

// Point the capture server's Kafka config at the ephemeral topics.
let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = main_topic.topic_name().to_string();
config.kafka.kafka_client_ingestion_warning_topic = warnings_topic.topic_name().to_string();
config.kafka.kafka_exceptions_topic = exceptions_topic.topic_name().to_string();
config.kafka.kafka_heatmaps_topic = heatmaps_topic.topic_name().to_string();

let server = ServerHandle::for_config(config).await;

// Single batch interleaving special-cased and regular events, to show
// routing is per-event (by name), not per-request.
let event = json!([{
"token": token,
"event": "$$client_ingestion_warning",
"uuid": uuids[4],
"distinct_id": distinct_id
},{
"token": token,
"event": "event1",
"uuid": uuids[0],
"distinct_id": distinct_id
},{
"token": token,
"event": "$$heatmap",
"uuid": uuids[1],
"distinct_id": distinct_id
},{
"token": token,
"event": "$exception",
"uuid": uuids[2],
"distinct_id": distinct_id
},{
"token": token,
"event": "event2",
"uuid": uuids[3],
"distinct_id": distinct_id
}]);

let res = server.capture_events(event.to_string()).await;
assert_eq!(StatusCode::OK, res.status());

// Regular events are pushed to the main analytics topic
// (in submission order: event1 then event2, and nothing else).
assert_json_include!(
actual: main_topic.next_event()?,
expected: json!({
"token": token,
"uuid": uuids[0],
"distinct_id": distinct_id
})
);
assert_json_include!(
actual: main_topic.next_event()?,
expected: json!({
"token": token,
"uuid": uuids[3],
"distinct_id": distinct_id
})
);
main_topic.assert_empty();

// Special-cased events are pushed to their own topics
// (exactly one message each; assert_empty guards against cross-routing).
assert_json_include!(
actual: exceptions_topic.next_event()?,
expected: json!({
"token": token,
"uuid": uuids[2],
"distinct_id": distinct_id
})
);
exceptions_topic.assert_empty();
assert_json_include!(
actual: heatmaps_topic.next_event()?,
expected: json!({
"token": token,
"uuid": uuids[1],
"distinct_id": distinct_id
})
);
heatmaps_topic.assert_empty();
assert_json_include!(
actual: warnings_topic.next_event()?,
expected: json!({
"token": token,
"uuid": uuids[4],
"distinct_id": distinct_id
})
);
warnings_topic.assert_empty();
Ok(())
}

0 comments on commit 0f7a506

Please sign in to comment.