This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

carve out the limiters and sinks sub-modules #70

Merged · 4 commits · Dec 13, 2023
Changes from 2 commits
6 changes: 3 additions & 3 deletions capture/src/capture.rs
@@ -15,14 +15,14 @@ use metrics::counter;
 use time::OffsetDateTime;
 use tracing::instrument;

-use crate::billing_limits::QuotaResource;
 use crate::event::{Compression, ProcessingContext};
+use crate::limiters::billing_limits::QuotaResource;
 use crate::prometheus::report_dropped_events;
 use crate::token::validate_token;
 use crate::{
     api::{CaptureError, CaptureResponse, CaptureResponseCode},
     event::{EventFormData, EventQuery, ProcessedEvent, RawEvent},
-    router, sink,
+    router, sinks,
     utils::uuid_v7,
 };


@@ -209,7 +209,7 @@ pub fn extract_and_verify_token(events: &[RawEvent]) -> Result<String, CaptureError> {

 #[instrument(skip_all, fields(events = events.len()))]
 pub async fn process_events<'a>(
-    sink: Arc<dyn sink::EventSink + Send + Sync>,
+    sink: Arc<dyn sinks::EventSink + Send + Sync>,
     events: &'a [RawEvent],
     context: &'a ProcessingContext,
 ) -> Result<(), CaptureError> {
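For `process_events` callers, only the trait's module path changes. A minimal sketch of constructing the sink argument under the new layout (`make_sink` is a hypothetical helper; `PrintSink` comes from the new `sinks::print` module):

```rust
use std::sync::Arc;

use capture::sinks::{self, print::PrintSink};

// Build the trait object under the new path; the old one was
// `capture::sink::EventSink`.
fn make_sink() -> Arc<dyn sinks::EventSink + Send + Sync> {
    Arc::new(PrintSink {})
}
```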
5 changes: 2 additions & 3 deletions capture/src/lib.rs
@@ -1,15 +1,14 @@
 pub mod api;
-pub mod billing_limits;
 pub mod capture;
 pub mod config;
 pub mod event;
 pub mod health;
-pub mod partition_limits;
+pub mod limiters;
 pub mod prometheus;
 pub mod redis;
 pub mod router;
 pub mod server;
-pub mod sink;
+pub mod sinks;
 pub mod time;
 pub mod token;
 pub mod utils;
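For downstream code, the carve-out is purely a path change; a minimal before/after sketch of the imports, using only paths that appear in this diff:

```rust
// Before this PR:
// use capture::billing_limits::BillingLimiter;
// use capture::partition_limits::PartitionLimiter;
// use capture::sink::EventSink;

// After this PR:
use capture::limiters::billing_limits::BillingLimiter;
use capture::limiters::partition_limits::PartitionLimiter;
use capture::sinks::EventSink;
```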
capture/src/billing_limits.rs → capture/src/limiters/billing_limits.rs
@@ -166,7 +166,7 @@ mod tests {
     use time::Duration;

     use crate::{
-        billing_limits::{BillingLimiter, QuotaResource},
+        limiters::billing_limits::{BillingLimiter, QuotaResource},
         redis::MockRedisClient,
     };

2 changes: 2 additions & 0 deletions capture/src/limiters/mod.rs
@@ -0,0 +1,2 @@
+pub mod billing_limits;
+pub mod partition_limits;
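A possible refinement (not part of this PR) would be re-exporting the limiter types from the `limiters` root so call sites lose one path segment; a hypothetical sketch:

```rust
// capture/src/limiters/mod.rs — hypothetical variant with re-exports.
pub mod billing_limits;
pub mod partition_limits;

pub use billing_limits::BillingLimiter;
pub use partition_limits::PartitionLimiter;
```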
capture/src/partition_limits.rs → capture/src/limiters/partition_limits.rs
@@ -53,7 +53,7 @@ impl PartitionLimiter {

 #[cfg(test)]
 mod tests {
-    use crate::partition_limits::PartitionLimiter;
+    use crate::limiters::partition_limits::PartitionLimiter;
     use std::num::NonZeroU32;

     #[tokio::test]
8 changes: 5 additions & 3 deletions capture/src/router.rs
@@ -10,13 +10,15 @@ use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer};
 use tower_http::trace::TraceLayer;

 use crate::health::HealthRegistry;
-use crate::{billing_limits::BillingLimiter, capture, redis::Client, sink, time::TimeSource};
+use crate::{
+    capture, limiters::billing_limits::BillingLimiter, redis::Client, sinks, time::TimeSource,
+};

 use crate::prometheus::{setup_metrics_recorder, track_metrics};

 #[derive(Clone)]
 pub struct State {
-    pub sink: Arc<dyn sink::EventSink + Send + Sync>,
+    pub sink: Arc<dyn sinks::EventSink + Send + Sync>,
     pub timesource: Arc<dyn TimeSource + Send + Sync>,
     pub redis: Arc<dyn Client + Send + Sync>,
     pub billing: BillingLimiter,
@@ -28,7 +30,7 @@ async fn index() -> &'static str {

 pub fn router<
     TZ: TimeSource + Send + Sync + 'static,
-    S: sink::EventSink + Send + Sync + 'static,
+    S: sinks::EventSink + Send + Sync + 'static,
     R: Client + Send + Sync + 'static,
 >(
     timesource: TZ,
11 changes: 6 additions & 5 deletions capture/src/server.rs
@@ -4,12 +4,13 @@ use std::sync::Arc;

 use time::Duration;

-use crate::billing_limits::BillingLimiter;
 use crate::config::Config;
 use crate::health::{ComponentStatus, HealthRegistry};
-use crate::partition_limits::PartitionLimiter;
+use crate::limiters::billing_limits::BillingLimiter;
+use crate::limiters::partition_limits::PartitionLimiter;
 use crate::redis::RedisClient;
-use crate::{router, sink};
+use crate::router;
+use crate::sinks::{kafka, print};

 pub async fn serve<F>(config: Config, listener: TcpListener, shutdown: F)
 where
@@ -34,7 +35,7 @@
         router::router(
             crate::time::SystemTime {},
             liveness,
-            sink::PrintSink {},
+            print::PrintSink {},
             redis_client,
             billing,
             config.export_prometheus,
@@ -55,7 +56,7 @@
             partition.report_metrics().await;
         });
     }
-    let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition)
+    let sink = kafka::KafkaSink::new(config.kafka, sink_liveness, partition)
         .expect("failed to start Kafka sink");

     router::router(
43 changes: 7 additions & 36 deletions capture/src/sink.rs → capture/src/sinks/kafka.rs
@@ -2,11 +2,10 @@ use std::time::Duration;

 use async_trait::async_trait;
 use metrics::{absolute_counter, counter, gauge, histogram};
-use rdkafka::config::ClientConfig;
 use rdkafka::error::{KafkaError, RDKafkaErrorCode};
-use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
-use rdkafka::producer::{DeliveryFuture, Producer};
+use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer};
 use rdkafka::util::Timeout;
+use rdkafka::ClientConfig;
 use tokio::task::JoinSet;
 use tracing::log::{debug, error, info};
 use tracing::{info_span, instrument, Instrument};
@@ -15,38 +14,9 @@ use crate::api::CaptureError;
 use crate::config::KafkaConfig;
 use crate::event::ProcessedEvent;
 use crate::health::HealthHandle;
-use crate::partition_limits::PartitionLimiter;
+use crate::limiters::partition_limits::PartitionLimiter;
 use crate::prometheus::report_dropped_events;
-
-#[async_trait]
-pub trait EventSink {
-    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
-    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
-}
-
-pub struct PrintSink {}
-
-#[async_trait]
-impl EventSink for PrintSink {
-    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
-        info!("single event: {:?}", event);
-        counter!("capture_events_ingested_total", 1);
-
-        Ok(())
-    }
-    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
-        let span = tracing::span!(tracing::Level::INFO, "batch of events");
-        let _enter = span.enter();
-
-        histogram!("capture_event_batch_size", events.len() as f64);
-        counter!("capture_events_ingested_total", events.len() as u64);
-        for event in events {
-            info!("event: {:?}", event);
-        }
-
-        Ok(())
-    }
-}
+use crate::sinks::EventSink;

 struct KafkaContext {
     liveness: HealthHandle,
@@ -294,8 +264,9 @@ mod tests {
     use crate::config;
     use crate::event::ProcessedEvent;
     use crate::health::HealthRegistry;
-    use crate::partition_limits::PartitionLimiter;
-    use crate::sink::{EventSink, KafkaSink};
+    use crate::limiters::partition_limits::PartitionLimiter;
+    use crate::sinks::kafka::KafkaSink;
+    use crate::sinks::EventSink;
     use crate::utils::uuid_v7;
     use rand::distributions::Alphanumeric;
     use rand::Rng;
13 changes: 13 additions & 0 deletions capture/src/sinks/mod.rs
@@ -0,0 +1,13 @@
+use async_trait::async_trait;
+
+use crate::api::CaptureError;
+use crate::event::ProcessedEvent;
+
+pub mod kafka;
+pub mod print;
+
+#[async_trait]
+pub trait EventSink {
+    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
+    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
+}
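The trait is unchanged, only relocated, so external implementations keep working after an import fix. A sketch of implementing it from outside the module; `NoopSink` is a hypothetical test double, not part of this PR:

```rust
use async_trait::async_trait;

use capture::api::CaptureError;
use capture::event::ProcessedEvent;
use capture::sinks::EventSink;

// Hypothetical sink that discards every event; handy in tests.
pub struct NoopSink;

#[async_trait]
impl EventSink for NoopSink {
    async fn send(&self, _event: ProcessedEvent) -> Result<(), CaptureError> {
        Ok(())
    }

    async fn send_batch(&self, _events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
        Ok(())
    }
}
```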
31 changes: 31 additions & 0 deletions capture/src/sinks/print.rs
@@ -0,0 +1,31 @@
+use async_trait::async_trait;
+use metrics::{counter, histogram};
+use tracing::log::info;
+
+use crate::api::CaptureError;
+use crate::event::ProcessedEvent;
+use crate::sinks::EventSink;
+
+pub struct PrintSink {}
+
+#[async_trait]
+impl EventSink for PrintSink {
+    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
+        info!("single event: {:?}", event);
+        counter!("capture_events_ingested_total", 1);
+
+        Ok(())
+    }
+    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
+        let span = tracing::span!(tracing::Level::INFO, "batch of events");
+        let _enter = span.enter();
+
+        histogram!("capture_event_batch_size", events.len() as f64);
+        counter!("capture_events_ingested_total", events.len() as u64);
+        for event in events {
+            info!("event: {:?}", event);
+        }
+
+        Ok(())
+    }
+}
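Both sinks are reached through the same trait-object surface, so callers stay sink-agnostic; a small sketch of a generic consumer (the `drain` helper is hypothetical):

```rust
use std::sync::Arc;

use capture::api::CaptureError;
use capture::event::ProcessedEvent;
use capture::sinks::EventSink;

// Forwards a batch to whichever sink was wired in at startup;
// works for PrintSink and KafkaSink alike.
async fn drain(
    sink: Arc<dyn EventSink + Send + Sync>,
    events: Vec<ProcessedEvent>,
) -> Result<(), CaptureError> {
    sink.send_batch(events).await
}
```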
4 changes: 2 additions & 2 deletions capture/tests/django_compat.rs
@@ -5,12 +5,12 @@ use axum_test_helper::TestClient;
 use base64::engine::general_purpose;
 use base64::Engine;
 use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode};
-use capture::billing_limits::BillingLimiter;
 use capture::event::ProcessedEvent;
 use capture::health::HealthRegistry;
+use capture::limiters::billing_limits::BillingLimiter;
 use capture::redis::MockRedisClient;
 use capture::router::router;
-use capture::sink::EventSink;
+use capture::sinks::EventSink;
 use capture::time::TimeSource;
 use serde::Deserialize;
 use serde_json::{json, Value};