This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

carve out the limiters and sinks sub-modules #70

Merged
merged 4 commits on Dec 13, 2023
4 changes: 2 additions & 2 deletions capture-server/tests/common.rs
@@ -28,8 +28,8 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
print_sink: false,
address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
redis_url: "redis://localhost:6379/".to_string(),
burst_limit: NonZeroU32::new(5).unwrap(),
per_second_limit: NonZeroU32::new(10).unwrap(),
overflow_burst_limit: NonZeroU32::new(5).unwrap(),
Contributor Author

These two are not defined in the chart, safe to rename now

overflow_per_second_limit: NonZeroU32::new(10).unwrap(),
overflow_forced_keys: None,
kafka: KafkaConfig {
kafka_producer_linger_ms: 0, // Send messages as soon as possible
12 changes: 6 additions & 6 deletions capture-server/tests/events.rs
@@ -77,7 +77,7 @@ async fn it_captures_a_batch() -> Result<()> {
}

#[tokio::test]
async fn it_is_limited_with_burst() -> Result<()> {
async fn it_overflows_events_on_burst() -> Result<()> {
setup_tracing();

let token = random_string("token", 16);
@@ -87,8 +87,8 @@ async fn it_is_limited_with_burst() -> Result<()> {

let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
config.burst_limit = NonZeroU32::new(2).unwrap();
config.per_second_limit = NonZeroU32::new(1).unwrap();
config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

let server = ServerHandle::for_config(config);

@@ -125,7 +125,7 @@ async fn it_is_limited_with_burst() -> Result<()> {
}

#[tokio::test]
async fn it_does_not_partition_limit_different_ids() -> Result<()> {
async fn it_does_not_overflow_team_with_different_ids() -> Result<()> {
setup_tracing();

let token = random_string("token", 16);
@@ -136,8 +136,8 @@ async fn it_does_not_partition_limit_different_ids() -> Result<()> {

let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
config.burst_limit = NonZeroU32::new(1).unwrap();
config.per_second_limit = NonZeroU32::new(1).unwrap();
config.overflow_burst_limit = NonZeroU32::new(1).unwrap();
config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

let server = ServerHandle::for_config(config);

6 changes: 3 additions & 3 deletions capture/src/capture.rs
@@ -15,14 +15,14 @@ use metrics::counter;
use time::OffsetDateTime;
use tracing::instrument;

use crate::billing_limits::QuotaResource;
use crate::event::{Compression, ProcessingContext};
use crate::limiters::billing::QuotaResource;
use crate::prometheus::report_dropped_events;
use crate::token::validate_token;
use crate::{
api::{CaptureError, CaptureResponse, CaptureResponseCode},
event::{EventFormData, EventQuery, ProcessedEvent, RawEvent},
router, sink,
router, sinks,
utils::uuid_v7,
};

@@ -209,7 +209,7 @@ pub fn extract_and_verify_token(events: &[RawEvent]) -> Result<String, CaptureError> {

#[instrument(skip_all, fields(events = events.len()))]
pub async fn process_events<'a>(
sink: Arc<dyn sink::EventSink + Send + Sync>,
sink: Arc<dyn sinks::Event + Send + Sync>,
events: &'a [RawEvent],
context: &'a ProcessingContext,
) -> Result<(), CaptureError> {
4 changes: 2 additions & 2 deletions capture/src/config.rs
@@ -14,10 +14,10 @@ pub struct Config {
pub otel_url: Option<String>,

#[envconfig(default = "100")]
pub per_second_limit: NonZeroU32,
pub overflow_per_second_limit: NonZeroU32,

#[envconfig(default = "1000")]
pub burst_limit: NonZeroU32,
pub overflow_burst_limit: NonZeroU32,

pub overflow_forced_keys: Option<String>, // Comma-delimited keys

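Since these fields are read through envconfig, renaming them also renames the environment variables that configure them, which is why the reviewer note above about the chart matters. A minimal sketch of consuming the renamed settings, assuming envconfig's default of deriving variable names from the uppercased field names and the `init_from_env` constructor from recent envconfig versions:

```rust
use envconfig::Envconfig;
use std::num::NonZeroU32;

#[derive(Envconfig)]
pub struct OverflowConfig {
    // Sustained per-key event rate before traffic is rerouted to overflow.
    #[envconfig(default = "100")]
    pub overflow_per_second_limit: NonZeroU32,

    // Extra burst capacity allowed on top of the sustained rate.
    #[envconfig(default = "1000")]
    pub overflow_burst_limit: NonZeroU32,
}

fn main() {
    // Hypothetical deployment value; the OVERFLOW_PER_SECOND_LIMIT name
    // assumes envconfig's uppercased-field-name convention.
    std::env::set_var("OVERFLOW_PER_SECOND_LIMIT", "200");

    let config = OverflowConfig::init_from_env().unwrap();
    assert_eq!(config.overflow_per_second_limit.get(), 200);
    assert_eq!(config.overflow_burst_limit.get(), 1000); // falls back to the default
}
```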
5 changes: 2 additions & 3 deletions capture/src/lib.rs
@@ -1,15 +1,14 @@
pub mod api;
pub mod billing_limits;
pub mod capture;
pub mod config;
pub mod event;
pub mod health;
pub mod partition_limits;
pub mod limiters;
pub mod prometheus;
pub mod redis;
pub mod router;
pub mod server;
pub mod sink;
pub mod sinks;
pub mod time;
pub mod token;
pub mod utils;
capture/src/billing_limits.rs → capture/src/limiters/billing.rs
@@ -166,7 +166,7 @@ mod tests {
use time::Duration;

use crate::{
billing_limits::{BillingLimiter, QuotaResource},
limiters::billing::{BillingLimiter, QuotaResource},
redis::MockRedisClient,
};

2 changes: 2 additions & 0 deletions capture/src/limiters/mod.rs
@@ -0,0 +1,2 @@
pub mod billing;
pub mod overflow;
capture/src/partition_limits.rs → capture/src/limiters/overflow.rs
@@ -1,11 +1,12 @@
/// When a customer is writing too often to the same key, we get hot partitions. This negatively
/// affects our write latency and cluster health. We try to provide ordering guarantees wherever
/// possible, but this does require that we map key -> partition.
/// The analytics ingestion pipeline provides ordering guarantees for events of the same
/// token and distinct_id. We currently achieve this through a locality constraint on the
/// Kafka partition (consistent partition hashing through a computed key).
///
/// If the write-rate reaches a certain amount, we need to be able to handle the hot partition
/// before it causes a negative impact. In this case, instead of passing the error to the customer
/// with a 429, we relax our ordering constraints and temporarily override the key, meaning the
/// customer's data will be spread across all partitions.
/// Volume spikes to a given key can create lag on the destination partition and induce
/// ingestion lag. To protect the downstream systems, capture can relax this locality
/// constraint when bursts are detected. When that happens, the excess traffic will be
/// spread across all partitions and be processed by the overflow consumer, without
/// strict ordering guarantees.
use std::collections::HashSet;
use std::num::NonZeroU32;
use std::sync::Arc;
@@ -16,12 +17,12 @@ use rand::Rng;

// See: https://docs.rs/governor/latest/governor/_guide/index.html#usage-in-multiple-threads
#[derive(Clone)]
pub struct PartitionLimiter {
pub struct OverflowLimiter {
limiter: Arc<RateLimiter<String, DefaultKeyedStateStore<String>, clock::DefaultClock>>,
forced_keys: HashSet<String>,
}

impl PartitionLimiter {
impl OverflowLimiter {
pub fn new(per_second: NonZeroU32, burst: NonZeroU32, forced_keys: Option<String>) -> Self {
let quota = Quota::per_second(per_second).allow_burst(burst);
let limiter = Arc::new(governor::RateLimiter::dashmap(quota));
@@ -31,7 +32,7 @@ impl PartitionLimiter {
Some(values) => values.split(',').map(String::from).collect(),
};

PartitionLimiter {
OverflowLimiter {
limiter,
forced_keys,
}
@@ -71,12 +72,12 @@

#[cfg(test)]
mod tests {
use crate::partition_limits::PartitionLimiter;
use crate::limiters::overflow::OverflowLimiter;
use std::num::NonZeroU32;

#[tokio::test]
async fn low_limits() {
let limiter = PartitionLimiter::new(
let limiter = OverflowLimiter::new(
NonZeroU32::new(1).unwrap(),
NonZeroU32::new(1).unwrap(),
None,
@@ -89,7 +90,7 @@

#[tokio::test]
async fn bursting() {
let limiter = PartitionLimiter::new(
let limiter = OverflowLimiter::new(
NonZeroU32::new(1).unwrap(),
NonZeroU32::new(3).unwrap(),
None,
@@ -109,7 +110,7 @@
let key_three = String::from("three");
let forced_keys = Some(String::from("one,three"));

let limiter = PartitionLimiter::new(
let limiter = OverflowLimiter::new(
NonZeroU32::new(1).unwrap(),
NonZeroU32::new(1).unwrap(),
forced_keys,
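To make the rename concrete: callers ask the limiter whether a partition key is over quota and drop the Kafka key when it is, which is what routes the excess traffic down the overflow path. A minimal usage sketch — the `partition_key` helper is illustrative, not code from this PR; `is_limited` is a synchronous check, as visible in the KafkaSink diff below:

```rust
use std::num::NonZeroU32;

use capture::limiters::overflow::OverflowLimiter;

/// Illustrative helper: returns None when the key is over quota, so the
/// producer falls back to spreading events across all partitions.
fn partition_key(limiter: &OverflowLimiter, key: String) -> Option<String> {
    if limiter.is_limited(&key) {
        None
    } else {
        Some(key)
    }
}

fn main() {
    // 1 event/second sustained, burst of 2, no force-overflowed keys.
    let limiter = OverflowLimiter::new(
        NonZeroU32::new(1).unwrap(),
        NonZeroU32::new(2).unwrap(),
        None,
    );

    let key = String::from("token:distinct_id");
    assert!(partition_key(&limiter, key.clone()).is_some()); // within burst
    assert!(partition_key(&limiter, key.clone()).is_some()); // still within burst
    assert!(partition_key(&limiter, key).is_none()); // third event overflows
}
```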
6 changes: 3 additions & 3 deletions capture/src/router.rs
@@ -10,13 +10,13 @@ use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer};
use tower_http::trace::TraceLayer;

use crate::health::HealthRegistry;
use crate::{billing_limits::BillingLimiter, capture, redis::Client, sink, time::TimeSource};
use crate::{capture, limiters::billing::BillingLimiter, redis::Client, sinks, time::TimeSource};

use crate::prometheus::{setup_metrics_recorder, track_metrics};

#[derive(Clone)]
pub struct State {
pub sink: Arc<dyn sink::EventSink + Send + Sync>,
pub sink: Arc<dyn sinks::Event + Send + Sync>,
pub timesource: Arc<dyn TimeSource + Send + Sync>,
pub redis: Arc<dyn Client + Send + Sync>,
pub billing: BillingLimiter,
@@ -28,7 +28,7 @@ async fn index() -> &'static str {

pub fn router<
TZ: TimeSource + Send + Sync + 'static,
S: sink::EventSink + Send + Sync + 'static,
S: sinks::Event + Send + Sync + 'static,
R: Client + Send + Sync + 'static,
>(
timesource: TZ,
22 changes: 10 additions & 12 deletions capture/src/server.rs
@@ -4,12 +4,14 @@ use std::sync::Arc;

use time::Duration;

use crate::billing_limits::BillingLimiter;
use crate::config::Config;
use crate::health::{ComponentStatus, HealthRegistry};
use crate::partition_limits::PartitionLimiter;
use crate::limiters::billing::BillingLimiter;
use crate::limiters::overflow::OverflowLimiter;
use crate::redis::RedisClient;
use crate::{router, sink};
use crate::router;
use crate::sinks::kafka::KafkaSink;
use crate::sinks::print::PrintSink;

pub async fn serve<F>(config: Config, listener: TcpListener, shutdown: F)
where
@@ -34,7 +36,7 @@
router::router(
crate::time::SystemTime {},
liveness,
sink::PrintSink {},
PrintSink {},
redis_client,
billing,
config.export_prometheus,
@@ -44,9 +46,9 @@
.register("rdkafka".to_string(), Duration::seconds(30))
.await;

let partition = PartitionLimiter::new(
config.per_second_limit,
config.burst_limit,
let partition = OverflowLimiter::new(
config.overflow_per_second_limit,
config.overflow_burst_limit,
config.overflow_forced_keys,
);
if config.export_prometheus {
Expand All @@ -55,18 +57,14 @@ where
partition.report_metrics().await;
});
}

{
// Ensure that the rate limiter state does not grow unbounded

let partition = partition.clone();

tokio::spawn(async move {
partition.clean_state().await;
});
}

let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition)
let sink = KafkaSink::new(config.kafka, sink_liveness, partition)
.expect("failed to start Kafka sink");

router::router(
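The clean_state task spawned above keeps the keyed rate-limiter state from growing without bound. Its body lives in overflow.rs and is not part of this diff; a plausible sketch, assuming governor's retain_recent/shrink_to_fit maintenance calls on keyed limiters and a hypothetical cleanup interval:

```rust
impl OverflowLimiter {
    /// Periodically drop stale per-key limiter state so the DashMap
    /// behind the governor limiter stays bounded.
    pub async fn clean_state(&self) {
        // Hypothetical interval; the real value is defined in overflow.rs.
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
        loop {
            interval.tick().await;
            self.limiter.retain_recent(); // evict keys whose quota is fully replenished
            self.limiter.shrink_to_fit(); // release the freed map capacity
        }
    }
}
```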
51 changes: 11 additions & 40 deletions capture/src/sink.rs → capture/src/sinks/kafka.rs
@@ -2,11 +2,10 @@ use std::time::Duration;

use async_trait::async_trait;
use metrics::{absolute_counter, counter, gauge, histogram};
use rdkafka::config::ClientConfig;
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
use rdkafka::producer::{DeliveryFuture, Producer};
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer};
use rdkafka::util::Timeout;
use rdkafka::ClientConfig;
use tokio::task::JoinSet;
use tracing::log::{debug, error, info};
use tracing::{info_span, instrument, Instrument};
@@ -15,38 +14,9 @@ use crate::api::CaptureError;
use crate::config::KafkaConfig;
use crate::event::ProcessedEvent;
use crate::health::HealthHandle;
use crate::partition_limits::PartitionLimiter;
use crate::limiters::overflow::OverflowLimiter;
use crate::prometheus::report_dropped_events;

#[async_trait]
pub trait EventSink {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
}

pub struct PrintSink {}

#[async_trait]
impl EventSink for PrintSink {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
info!("single event: {:?}", event);
counter!("capture_events_ingested_total", 1);

Ok(())
}
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
let span = tracing::span!(tracing::Level::INFO, "batch of events");
let _enter = span.enter();

histogram!("capture_event_batch_size", events.len() as f64);
counter!("capture_events_ingested_total", events.len() as u64);
for event in events {
info!("event: {:?}", event);
}

Ok(())
}
}
use crate::sinks::Event;

struct KafkaContext {
liveness: HealthHandle,
@@ -113,14 +83,14 @@ impl rdkafka::ClientContext for KafkaContext {
pub struct KafkaSink {
producer: FutureProducer<KafkaContext>,
topic: String,
partition: PartitionLimiter,
partition: OverflowLimiter,
}

impl KafkaSink {
pub fn new(
config: KafkaConfig,
liveness: HealthHandle,
partition: PartitionLimiter,
partition: OverflowLimiter,
) -> anyhow::Result<KafkaSink> {
info!("connecting to Kafka brokers at {}...", config.kafka_hosts);

@@ -234,7 +204,7 @@ impl KafkaSink {
}

#[async_trait]
impl EventSink for KafkaSink {
impl Event for KafkaSink {
#[instrument(skip_all)]
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
let limited = self.partition.is_limited(&event.key());
@@ -294,8 +264,8 @@ mod tests {
use crate::config;
use crate::event::ProcessedEvent;
use crate::health::HealthRegistry;
use crate::partition_limits::PartitionLimiter;
use crate::sink::{EventSink, KafkaSink};
use crate::limiters::overflow::OverflowLimiter;
use crate::sinks::kafka::KafkaSink;
use crate::sinks::Event;
use crate::utils::uuid_v7;
use rand::distributions::Alphanumeric;
use rand::Rng;
Expand All @@ -310,7 +281,7 @@ mod tests {
let handle = registry
.register("one".to_string(), Duration::seconds(30))
.await;
let limiter = PartitionLimiter::new(
let limiter = OverflowLimiter::new(
NonZeroU32::new(10).unwrap(),
NonZeroU32::new(10).unwrap(),
None,
13 changes: 13 additions & 0 deletions capture/src/sinks/mod.rs
@@ -0,0 +1,13 @@
use async_trait::async_trait;

use crate::api::CaptureError;
use crate::event::ProcessedEvent;

pub mod kafka;
pub mod print;

#[async_trait]
pub trait Event {
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
}
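The counterpart file capture/src/sinks/print.rs is not shown in this diff, but server.rs now imports PrintSink from it, so it should carry over the PrintSink removed from sink.rs above, re-targeted at the renamed Event trait. A sketch reconstructed from the removed lines (module paths assumed from the new layout):

```rust
use async_trait::async_trait;
use metrics::{counter, histogram};
use tracing::log::info;

use crate::api::CaptureError;
use crate::event::ProcessedEvent;
use crate::sinks::Event;

pub struct PrintSink {}

#[async_trait]
impl Event for PrintSink {
    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
        info!("single event: {:?}", event);
        counter!("capture_events_ingested_total", 1);

        Ok(())
    }

    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
        let span = tracing::span!(tracing::Level::INFO, "batch of events");
        let _enter = span.enter();

        histogram!("capture_event_batch_size", events.len() as f64);
        counter!("capture_events_ingested_total", events.len() as u64);
        for event in events {
            info!("event: {:?}", event);
        }

        Ok(())
    }
}
```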