Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
carve out sinks submodule
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello committed Dec 12, 2023
1 parent c9b22b4 commit f3e08bb
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 44 deletions.
4 changes: 2 additions & 2 deletions capture/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::token::validate_token;
use crate::{
api::{CaptureError, CaptureResponse, CaptureResponseCode},
event::{EventFormData, EventQuery, ProcessedEvent, RawEvent},
router, sink,
router, sinks,
utils::uuid_v7,
};

Expand Down Expand Up @@ -209,7 +209,7 @@ pub fn extract_and_verify_token(events: &[RawEvent]) -> Result<String, CaptureEr

#[instrument(skip_all, fields(events = events.len()))]
pub async fn process_events<'a>(
sink: Arc<dyn sink::EventSink + Send + Sync>,
sink: Arc<dyn sinks::EventSink + Send + Sync>,
events: &'a [RawEvent],
context: &'a ProcessingContext,
) -> Result<(), CaptureError> {
Expand Down
2 changes: 1 addition & 1 deletion capture/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub mod prometheus;
pub mod redis;
pub mod router;
pub mod server;
pub mod sink;
pub mod sinks;
pub mod time;
pub mod token;
pub mod utils;
6 changes: 3 additions & 3 deletions capture/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ use tower_http::trace::TraceLayer;

use crate::health::HealthRegistry;
use crate::{
capture, limiters::billing_limits::BillingLimiter, redis::Client, sink, time::TimeSource,
capture, limiters::billing_limits::BillingLimiter, redis::Client, sinks, time::TimeSource,
};

use crate::prometheus::{setup_metrics_recorder, track_metrics};

#[derive(Clone)]
pub struct State {
pub sink: Arc<dyn sink::EventSink + Send + Sync>,
pub sink: Arc<dyn sinks::EventSink + Send + Sync>,
pub timesource: Arc<dyn TimeSource + Send + Sync>,
pub redis: Arc<dyn Client + Send + Sync>,
pub billing: BillingLimiter,
Expand All @@ -30,7 +30,7 @@ async fn index() -> &'static str {

pub fn router<
TZ: TimeSource + Send + Sync + 'static,
S: sink::EventSink + Send + Sync + 'static,
S: sinks::EventSink + Send + Sync + 'static,
R: Client + Send + Sync + 'static,
>(
timesource: TZ,
Expand Down
7 changes: 4 additions & 3 deletions capture/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use crate::health::{ComponentStatus, HealthRegistry};
use crate::limiters::billing_limits::BillingLimiter;
use crate::limiters::partition_limits::PartitionLimiter;
use crate::redis::RedisClient;
use crate::{router, sink};
use crate::router;
use crate::sinks::{kafka, print};

pub async fn serve<F>(config: Config, listener: TcpListener, shutdown: F)
where
Expand All @@ -34,7 +35,7 @@ where
router::router(
crate::time::SystemTime {},
liveness,
sink::PrintSink {},
print::PrintSink {},
redis_client,
billing,
config.export_prometheus,
Expand All @@ -55,7 +56,7 @@ where
partition.report_metrics().await;
});
}
let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition)
let sink = kafka::KafkaSink::new(config.kafka, sink_liveness, partition)
.expect("failed to start Kafka sink");

router::router(
Expand Down
39 changes: 5 additions & 34 deletions capture/src/sink.rs → capture/src/sinks/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ use std::time::Duration;

use async_trait::async_trait;
use metrics::{absolute_counter, counter, gauge, histogram};
use rdkafka::config::ClientConfig;
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
use rdkafka::producer::{DeliveryFuture, Producer};
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer};
use rdkafka::util::Timeout;
use rdkafka::ClientConfig;
use tokio::task::JoinSet;
use tracing::log::{debug, error, info};
use tracing::{info_span, instrument, Instrument};
Expand All @@ -17,36 +16,7 @@ use crate::event::ProcessedEvent;
use crate::health::HealthHandle;
use crate::limiters::partition_limits::PartitionLimiter;
use crate::prometheus::report_dropped_events;

#[async_trait]
pub trait EventSink {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
}

pub struct PrintSink {}

#[async_trait]
impl EventSink for PrintSink {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
info!("single event: {:?}", event);
counter!("capture_events_ingested_total", 1);

Ok(())
}
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
let span = tracing::span!(tracing::Level::INFO, "batch of events");
let _enter = span.enter();

histogram!("capture_event_batch_size", events.len() as f64);
counter!("capture_events_ingested_total", events.len() as u64);
for event in events {
info!("event: {:?}", event);
}

Ok(())
}
}
use crate::sinks::EventSink;

struct KafkaContext {
liveness: HealthHandle,
Expand Down Expand Up @@ -295,7 +265,8 @@ mod tests {
use crate::event::ProcessedEvent;
use crate::health::HealthRegistry;
use crate::limiters::partition_limits::PartitionLimiter;
use crate::sink::{EventSink, KafkaSink};
use crate::sinks::kafka::KafkaSink;
use crate::sinks::EventSink;
use crate::utils::uuid_v7;
use rand::distributions::Alphanumeric;
use rand::Rng;
Expand Down
13 changes: 13 additions & 0 deletions capture/src/sinks/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use async_trait::async_trait;

use crate::api::CaptureError;
use crate::event::ProcessedEvent;

pub mod kafka;
pub mod print;

#[async_trait]
pub trait EventSink {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
}
31 changes: 31 additions & 0 deletions capture/src/sinks/print.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use async_trait::async_trait;
use metrics::{counter, histogram};
use tracing::log::info;

use crate::api::CaptureError;
use crate::event::ProcessedEvent;
use crate::sinks::EventSink;

pub struct PrintSink {}

#[async_trait]
impl EventSink for PrintSink {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
info!("single event: {:?}", event);
counter!("capture_events_ingested_total", 1);

Ok(())
}
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
let span = tracing::span!(tracing::Level::INFO, "batch of events");
let _enter = span.enter();

histogram!("capture_event_batch_size", events.len() as f64);
counter!("capture_events_ingested_total", events.len() as u64);
for event in events {
info!("event: {:?}", event);
}

Ok(())
}
}
2 changes: 1 addition & 1 deletion capture/tests/django_compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use capture::health::HealthRegistry;
use capture::limiters::billing_limits::BillingLimiter;
use capture::redis::MockRedisClient;
use capture::router::router;
use capture::sink::EventSink;
use capture::sinks::EventSink;
use capture::time::TimeSource;
use serde::Deserialize;
use serde_json::{json, Value};
Expand Down

0 comments on commit f3e08bb

Please sign in to comment.