diff --git a/capture/src/capture.rs b/capture/src/capture.rs index 1b2c386..ecfec8e 100644 --- a/capture/src/capture.rs +++ b/capture/src/capture.rs @@ -22,7 +22,7 @@ use crate::token::validate_token; use crate::{ api::{CaptureError, CaptureResponse, CaptureResponseCode}, event::{EventFormData, EventQuery, ProcessedEvent, RawEvent}, - router, sink, + router, sinks, utils::uuid_v7, }; @@ -209,7 +209,7 @@ pub fn extract_and_verify_token(events: &[RawEvent]) -> Result<String, CaptureError> { #[instrument(skip_all, fields(events = events.len()))] pub async fn process_events<'a>( - sink: Arc<dyn sink::EventSink + Send + Sync>, + sink: Arc<dyn sinks::EventSink + Send + Sync>, events: &'a [RawEvent], context: &'a ProcessingContext, ) -> Result<(), CaptureError> { diff --git a/capture/src/lib.rs b/capture/src/lib.rs index 184efc7..058e994 100644 --- a/capture/src/lib.rs +++ b/capture/src/lib.rs @@ -8,7 +8,7 @@ pub mod prometheus; pub mod redis; pub mod router; pub mod server; -pub mod sink; +pub mod sinks; pub mod time; pub mod token; pub mod utils; diff --git a/capture/src/router.rs b/capture/src/router.rs index 52e49d6..ee282f5 100644 --- a/capture/src/router.rs +++ b/capture/src/router.rs @@ -11,14 +11,14 @@ use tower_http::trace::TraceLayer; use crate::health::HealthRegistry; use crate::{ - capture, limiters::billing_limits::BillingLimiter, redis::Client, sink, time::TimeSource, + capture, limiters::billing_limits::BillingLimiter, redis::Client, sinks, time::TimeSource, }; use crate::prometheus::{setup_metrics_recorder, track_metrics}; #[derive(Clone)] pub struct State { - pub sink: Arc<dyn sink::EventSink + Send + Sync>, + pub sink: Arc<dyn sinks::EventSink + Send + Sync>, pub timesource: Arc<dyn TimeSource + Send + Sync>, pub redis: Arc<dyn Client + Send + Sync>, pub billing: BillingLimiter, @@ -30,7 +30,7 @@ async fn index() -> &'static str { pub fn router< TZ: TimeSource + Send + Sync + 'static, - S: sink::EventSink + Send + Sync + 'static, + S: sinks::EventSink + Send + Sync + 'static, R: Client + Send + Sync + 'static, >( timesource: TZ, diff --git a/capture/src/server.rs b/capture/src/server.rs index bcf8da0..ce54fe2 100644 --- a/capture/src/server.rs +++ b/capture/src/server.rs @@ -9,7 +9,8 @@ use crate::health::{ComponentStatus, HealthRegistry}; use 
crate::limiters::billing_limits::BillingLimiter; use crate::limiters::partition_limits::PartitionLimiter; use crate::redis::RedisClient; -use crate::{router, sink}; +use crate::router; +use crate::sinks::{kafka, print}; pub async fn serve<F>(config: Config, listener: TcpListener, shutdown: F) where @@ -34,7 +35,7 @@ where router::router( crate::time::SystemTime {}, liveness, - sink::PrintSink {}, + print::PrintSink {}, redis_client, billing, config.export_prometheus, @@ -55,7 +56,7 @@ where partition.report_metrics().await; }); } - let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition) + let sink = kafka::KafkaSink::new(config.kafka, sink_liveness, partition) .expect("failed to start Kafka sink"); router::router( diff --git a/capture/src/sink.rs b/capture/src/sinks/kafka.rs similarity index 92% rename from capture/src/sink.rs rename to capture/src/sinks/kafka.rs index 97f64f0..2d36101 100644 --- a/capture/src/sink.rs +++ b/capture/src/sinks/kafka.rs @@ -2,11 +2,10 @@ use std::time::Duration; use async_trait::async_trait; use metrics::{absolute_counter, counter, gauge, histogram}; -use rdkafka::config::ClientConfig; use rdkafka::error::{KafkaError, RDKafkaErrorCode}; -use rdkafka::producer::future_producer::{FutureProducer, FutureRecord}; -use rdkafka::producer::{DeliveryFuture, Producer}; +use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer}; use rdkafka::util::Timeout; +use rdkafka::ClientConfig; use tokio::task::JoinSet; use tracing::log::{debug, error, info}; use tracing::{info_span, instrument, Instrument}; @@ -17,36 +16,7 @@ use crate::event::ProcessedEvent; use crate::health::HealthHandle; use crate::limiters::partition_limits::PartitionLimiter; use crate::prometheus::report_dropped_events; - -#[async_trait] -pub trait EventSink { - async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>; - async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>; -} - -pub struct PrintSink {} - 
-#[async_trait] -impl EventSink for PrintSink { - async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> { - info!("single event: {:?}", event); - counter!("capture_events_ingested_total", 1); - - Ok(()) - } - async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> { - let span = tracing::span!(tracing::Level::INFO, "batch of events"); - let _enter = span.enter(); - - histogram!("capture_event_batch_size", events.len() as f64); - counter!("capture_events_ingested_total", events.len() as u64); - for event in events { - info!("event: {:?}", event); - } - - Ok(()) - } -} +use crate::sinks::EventSink; struct KafkaContext { liveness: HealthHandle, @@ -295,7 +265,8 @@ mod tests { use crate::event::ProcessedEvent; use crate::health::HealthRegistry; use crate::limiters::partition_limits::PartitionLimiter; - use crate::sink::{EventSink, KafkaSink}; + use crate::sinks::kafka::KafkaSink; + use crate::sinks::EventSink; use crate::utils::uuid_v7; use rand::distributions::Alphanumeric; use rand::Rng; diff --git a/capture/src/sinks/mod.rs b/capture/src/sinks/mod.rs new file mode 100644 index 0000000..7183250 --- /dev/null +++ b/capture/src/sinks/mod.rs @@ -0,0 +1,13 @@ +use async_trait::async_trait; + +use crate::api::CaptureError; +use crate::event::ProcessedEvent; + +pub mod kafka; +pub mod print; + +#[async_trait] +pub trait EventSink { + async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>; + async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>; +} diff --git a/capture/src/sinks/print.rs b/capture/src/sinks/print.rs new file mode 100644 index 0000000..84add74 --- /dev/null +++ b/capture/src/sinks/print.rs @@ -0,0 +1,31 @@ +use async_trait::async_trait; +use metrics::{counter, histogram}; +use tracing::log::info; + +use crate::api::CaptureError; +use crate::event::ProcessedEvent; +use crate::sinks::EventSink; + +pub struct PrintSink {} + +#[async_trait] +impl EventSink for PrintSink { + async fn send(&self, event: 
ProcessedEvent) -> Result<(), CaptureError> { + info!("single event: {:?}", event); + counter!("capture_events_ingested_total", 1); + + Ok(()) + } + async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> { + let span = tracing::span!(tracing::Level::INFO, "batch of events"); + let _enter = span.enter(); + + histogram!("capture_event_batch_size", events.len() as f64); + counter!("capture_events_ingested_total", events.len() as u64); + for event in events { + info!("event: {:?}", event); + } + + Ok(()) + } +} diff --git a/capture/tests/django_compat.rs b/capture/tests/django_compat.rs index 6606fbc..b1c2df5 100644 --- a/capture/tests/django_compat.rs +++ b/capture/tests/django_compat.rs @@ -10,7 +10,7 @@ use capture::health::HealthRegistry; use capture::limiters::billing_limits::BillingLimiter; use capture::redis::MockRedisClient; use capture::router::router; -use capture::sink::EventSink; +use capture::sinks::EventSink; use capture::time::TimeSource; use serde::Deserialize; use serde_json::{json, Value};