diff --git a/Cargo.lock b/Cargo.lock
index 8e3b873..6bf02df 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1531,9 +1531,9 @@ dependencies = [

 [[package]]
 name = "rdkafka"
-version = "0.34.0"
+version = "0.36.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "053adfa02fab06e86c01d586cc68aa47ee0ff4489a59469081dc12cbcde578bf"
+checksum = "d54f02a5a40220f8a2dfa47ddb38ba9064475a5807a69504b6f91711df2eea63"
 dependencies = [
  "futures-channel",
  "futures-util",
@@ -1549,9 +1549,9 @@ dependencies = [

 [[package]]
 name = "rdkafka-sys"
-version = "4.6.0+2.2.0"
+version = "4.7.0+2.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ad63c279fca41a27c231c450a2d2ad18288032e9cbb159ad16c9d96eba35aaaf"
+checksum = "55e0d2f9ba6253f6ec72385e453294f8618e9e15c2c6aba2a5c01ccf9622d615"
 dependencies = [
  "cmake",
  "libc",
diff --git a/Cargo.toml b/Cargo.toml
index 5a0d501..983cbf2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,4 +1,6 @@
 [workspace]
+resolver = "2"
+
 members = [
     "capture",
     "capture-server"
@@ -25,7 +27,7 @@ uuid = { version = "1.3.3", features = ["serde"] }
 async-trait = "0.1.68"
 serde_urlencoded = "0.7.1"
 rand = "0.8.5"
-rdkafka = { version = "0.34", features = ["cmake-build", "ssl"] }
+rdkafka = { version = "0.36.0", features = ["cmake-build", "ssl"] }
 metrics = "0.21.1"
 metrics-exporter-prometheus = "0.12.1"
 thiserror = "1.0.48"
diff --git a/capture-server/tests/common.rs b/capture-server/tests/common.rs
index c0ee9ba..d74b5bf 100644
--- a/capture-server/tests/common.rs
+++ b/capture-server/tests/common.rs
@@ -18,7 +18,8 @@ use rdkafka::consumer::{BaseConsumer, Consumer};
 use rdkafka::util::Timeout;
 use rdkafka::{Message, TopicPartitionList};
 use tokio::sync::Notify;
-use tracing::debug;
+use tokio::time::timeout;
+use tracing::{debug, warn};

 use capture::config::{Config, KafkaConfig};
 use capture::server::serve;
@@ -32,6 +33,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
     kafka: KafkaConfig {
         kafka_producer_linger_ms: 0, // Send messages as soon as possible
         kafka_producer_queue_mib: 10,
+        kafka_message_timeout_ms: 10000, // 10s, ACKs can be slow on low volumes, should be tuned
         kafka_compression_codec: "none".to_string(),
         kafka_hosts: "kafka:9092".to_string(),
         kafka_topic: "events_plugin_ingestion".to_string(),
@@ -174,9 +176,14 @@ impl EphemeralTopic {
 impl Drop for EphemeralTopic {
     fn drop(&mut self) {
         debug!("dropping EphemeralTopic {}...", self.topic_name);
-        _ = self.consumer.unassign();
-        futures::executor::block_on(delete_topic(self.topic_name.clone()));
-        debug!("dropped topic");
+        self.consumer.unsubscribe();
+        match futures::executor::block_on(timeout(
+            Duration::from_secs(10),
+            delete_topic(self.topic_name.clone()),
+        )) {
+            Ok(_) => debug!("dropped topic"),
+            Err(err) => warn!("failed to drop topic: {}", err),
+        }
     }
 }

diff --git a/capture/src/capture.rs b/capture/src/capture.rs
index 6935ee3..da4a726 100644
--- a/capture/src/capture.rs
+++ b/capture/src/capture.rs
@@ -228,11 +228,10 @@ pub async fn process_events<'a>(
     tracing::debug!(events=?events, "processed {} events", events.len());

     if events.len() == 1 {
-        sink.send(events[0].clone()).await?;
+        sink.send(events[0].clone()).await
     } else {
-        sink.send_batch(events).await?;
+        sink.send_batch(events).await
     }
-    Ok(())
 }

 #[cfg(test)]
diff --git a/capture/src/config.rs b/capture/src/config.rs
index 8a471b3..d69d4a9 100644
--- a/capture/src/config.rs
+++ b/capture/src/config.rs
@@ -35,6 +35,8 @@ pub struct KafkaConfig {
     pub kafka_producer_linger_ms: u32, // Maximum time between producer batches during low traffic
     #[envconfig(default = "400")]
     pub kafka_producer_queue_mib: u32, // Size of the in-memory producer queue in mebibytes
+    #[envconfig(default = "20000")]
+    pub kafka_message_timeout_ms: u32, // Time before we stop retrying producing a message: 20 seconds
     #[envconfig(default = "none")]
     pub kafka_compression_codec: String, // none, gzip, snappy, lz4, zstd
     pub kafka_hosts: String,
diff --git a/capture/src/server.rs b/capture/src/server.rs
index c84bd20..ad91509 100644
--- a/capture/src/server.rs
+++ b/capture/src/server.rs
@@ -45,7 +45,8 @@ where
         .await;

     let partition = PartitionLimiter::new(config.per_second_limit, config.burst_limit);
-    let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition).unwrap();
+    let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition)
+        .expect("failed to start Kafka sink");

     router::router(
         crate::time::SystemTime {},
diff --git a/capture/src/sink.rs b/capture/src/sink.rs
index 9d915b0..c1d291a 100644
--- a/capture/src/sink.rs
+++ b/capture/src/sink.rs
@@ -3,12 +3,13 @@ use std::time::Duration;
 use async_trait::async_trait;
 use metrics::{absolute_counter, counter, gauge, histogram};
 use rdkafka::config::ClientConfig;
-use rdkafka::error::RDKafkaErrorCode;
+use rdkafka::error::{KafkaError, RDKafkaErrorCode};
 use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
-use rdkafka::producer::Producer;
+use rdkafka::producer::{DeliveryFuture, Producer};
 use rdkafka::util::Timeout;
 use tokio::task::JoinSet;
-use tracing::{debug, info, instrument};
+use tracing::instrument;
+use tracing::log::{debug, error, info};

 use crate::api::CaptureError;
 use crate::config::KafkaConfig;
@@ -128,6 +129,10 @@ impl KafkaSink {
             .set("bootstrap.servers", &config.kafka_hosts)
             .set("statistics.interval.ms", "10000")
             .set("linger.ms", config.kafka_producer_linger_ms.to_string())
+            .set(
+                "message.timeout.ms",
+                config.kafka_message_timeout_ms.to_string(),
+            )
             .set("compression.codec", config.kafka_compression_codec)
             .set(
                 "queue.buffering.max.kbytes",
@@ -157,17 +162,20 @@ impl KafkaSink {
             topic: config.kafka_topic,
         })
     }
-}

-impl KafkaSink {
+    pub fn flush(&self) -> Result<(), KafkaError> {
+        // TODO: hook it up on shutdown
+        self.producer.flush(Duration::new(30, 0))
+    }
+
     async fn kafka_send(
         producer: FutureProducer,
         topic: String,
         event: ProcessedEvent,
         limited: bool,
-    ) -> Result<(), CaptureError> {
+    ) -> Result<DeliveryFuture, CaptureError> {
         let payload = serde_json::to_string(&event).map_err(|e| {
-            tracing::error!("failed to serialize event: {}", e);
+            error!("failed to serialize event: {}", e);
             CaptureError::NonRetryableSinkError
         })?;

@@ -182,7 +190,7 @@ impl KafkaSink {
             timestamp: None,
             headers: None,
         }) {
-            Ok(_) => Ok(()),
+            Ok(ack) => Ok(ack),
             Err((e, _)) => match e.rdkafka_error_code() {
                 Some(RDKafkaErrorCode::InvalidMessageSize) => {
                     report_dropped_events("kafka_message_size", 1);
@@ -191,12 +199,38 @@ impl KafkaSink {
                 _ => {
                     // TODO(maybe someday): Don't drop them but write them somewhere and try again
                     report_dropped_events("kafka_write_error", 1);
-                    tracing::error!("failed to produce event: {}", e);
+                    error!("failed to produce event: {}", e);
                     Err(CaptureError::RetryableSinkError)
                 }
             },
         }
     }
+
+    async fn process_ack(delivery: DeliveryFuture) -> Result<(), CaptureError> {
+        match delivery.await {
+            Err(_) => {
+                // Cancelled due to timeout while retrying
+                counter!("capture_kafka_produce_errors_total", 1);
+                error!("failed to produce to Kafka before write timeout");
+                Err(CaptureError::RetryableSinkError)
+            }
+            Ok(Err((KafkaError::MessageProduction(RDKafkaErrorCode::MessageSizeTooLarge), _))) => {
+                // Rejected by broker due to message size
+                report_dropped_events("kafka_message_size", 1);
+                Err(CaptureError::EventTooBig)
+            }
+            Ok(Err((err, _))) => {
+                // Unretriable produce error
+                counter!("capture_kafka_produce_errors_total", 1);
+                error!("failed to produce to Kafka: {}", err);
+                Err(CaptureError::RetryableSinkError)
+            }
+            Ok(Ok(_)) => {
+                counter!("capture_events_ingested_total", 1);
+                Ok(())
+            }
+        }
+    }
 }

 #[async_trait]
@@ -204,12 +238,10 @@ impl EventSink for KafkaSink {
     #[instrument(skip_all)]
     async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
         let limited = self.partition.is_limited(&event.key());
-        Self::kafka_send(self.producer.clone(), self.topic.clone(), event, limited).await?;
-
+        let ack =
+            Self::kafka_send(self.producer.clone(), self.topic.clone(), event, limited).await?;
         histogram!("capture_event_batch_size", 1.0);
-        counter!("capture_events_ingested_total", 1);
-
-        Ok(())
+        Self::process_ack(ack).await
     }

     #[instrument(skip_all)]
@@ -219,16 +251,148 @@ impl EventSink for KafkaSink {
         for event in events {
             let producer = self.producer.clone();
             let topic = self.topic.clone();
             let limited = self.partition.is_limited(&event.key());
-            set.spawn(Self::kafka_send(producer, topic, event, limited));
+
+            // We await kafka_send to get events in the producer queue sequentially
+            let ack = Self::kafka_send(producer, topic, event, limited).await?;
+
+            // Then stash the returned DeliveryFuture, waiting concurrently for the write ACKs from brokers.
+            set.spawn(Self::process_ack(ack));
         }

-        // Await on all the produce promises
-        while (set.join_next().await).is_some() {}
+        // Await on all the produce promises, fail batch on first failure
+        while let Some(res) = set.join_next().await {
+            match res {
+                Ok(Ok(_)) => {}
+                Ok(Err(err)) => {
+                    set.abort_all();
+                    return Err(err);
+                }
+                Err(err) => {
+                    set.abort_all();
+                    error!("join error while waiting on Kafka ACK: {:?}", err);
+                    return Err(CaptureError::RetryableSinkError);
+                }
+            }
+        }

         histogram!("capture_event_batch_size", batch_size as f64);
-        counter!("capture_events_ingested_total", batch_size as u64);
-
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::api::CaptureError;
+    use crate::config;
+    use crate::event::ProcessedEvent;
+    use crate::health::HealthRegistry;
+    use crate::partition_limits::PartitionLimiter;
+    use crate::sink::{EventSink, KafkaSink};
+    use crate::utils::uuid_v7;
+    use rdkafka::mocking::MockCluster;
+    use rdkafka::producer::DefaultProducerContext;
+    use rdkafka::types::{RDKafkaApiKey, RDKafkaRespErr};
+    use std::num::NonZeroU32;
+    use time::Duration;
+
+    async fn start_on_mocked_sink() -> (MockCluster<'static, DefaultProducerContext>, KafkaSink) {
+        let registry = HealthRegistry::new("liveness");
+        let handle = registry
+            .register("one".to_string(), Duration::seconds(30))
+            .await;
+        let limiter =
+            PartitionLimiter::new(NonZeroU32::new(10).unwrap(), NonZeroU32::new(10).unwrap());
+        let cluster = MockCluster::new(1).expect("failed to create mock brokers");
+        let config = config::KafkaConfig {
+            kafka_producer_linger_ms: 0,
+            kafka_producer_queue_mib: 50,
+            kafka_message_timeout_ms: 500,
+            kafka_compression_codec: "none".to_string(),
+            kafka_hosts: cluster.bootstrap_servers(),
+            kafka_topic: "events_plugin_ingestion".to_string(),
+            kafka_tls: false,
+        };
+        let sink = KafkaSink::new(config, handle, limiter).expect("failed to create sink");
+        (cluster, sink)
+    }
+
+    #[tokio::test]
+    async fn kafka_sink_error_handling() {
+        // Uses a mocked Kafka broker that allows injecting write errors, to check error handling.
+        // We test different cases in a single test to amortize the startup cost of the producer.
+
+        let (cluster, sink) = start_on_mocked_sink().await;
+        let event: ProcessedEvent = ProcessedEvent {
+            uuid: uuid_v7(),
+            distinct_id: "id1".to_string(),
+            ip: "".to_string(),
+            data: "".to_string(),
+            now: "".to_string(),
+            sent_at: None,
+            token: "token1".to_string(),
+        };
+
+        // Wait for producer to be healthy, to keep kafka_message_timeout_ms short and tests faster
+        for _ in 0..20 {
+            if sink.send(event.clone()).await.is_ok() {
+                break;
+            }
+        }
+
+        // Send events to confirm happy path
+        sink.send(event.clone())
+            .await
+            .expect("failed to send one initial event");
+        sink.send_batch(vec![event.clone(), event.clone()])
+            .await
+            .expect("failed to send initial event batch");
+
+        // Simulate unretriable errors
+        cluster.clear_request_errors(RDKafkaApiKey::Produce);
+        let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE; 1];
+        cluster.request_errors(RDKafkaApiKey::Produce, &err);
+        match sink.send(event.clone()).await {
+            Err(CaptureError::EventTooBig) => {} // Expected
+            Err(err) => panic!("wrong error code {}", err),
+            Ok(()) => panic!("should have errored"),
+        };
+        cluster.clear_request_errors(RDKafkaApiKey::Produce);
+        let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_INVALID_PARTITIONS; 1];
+        cluster.request_errors(RDKafkaApiKey::Produce, &err);
+        match sink.send_batch(vec![event.clone(), event.clone()]).await {
+            Err(CaptureError::RetryableSinkError) => {} // Expected
+            Err(err) => panic!("wrong error code {}", err),
+            Ok(()) => panic!("should have errored"),
+        };
+
+        // Simulate transient errors, messages should go through OK
+        cluster.clear_request_errors(RDKafkaApiKey::Produce);
+        let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE; 2];
+        cluster.request_errors(RDKafkaApiKey::Produce, &err);
+        sink.send(event.clone())
+            .await
+            .expect("failed to send one event after recovery");
+        cluster.clear_request_errors(RDKafkaApiKey::Produce);
+        let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE; 2];
+        cluster.request_errors(RDKafkaApiKey::Produce, &err);
+        sink.send_batch(vec![event.clone(), event.clone()])
+            .await
+            .expect("failed to send event batch after recovery");
+
+        // Timeout on a sustained transient error
+        cluster.clear_request_errors(RDKafkaApiKey::Produce);
+        let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE; 50];
+        cluster.request_errors(RDKafkaApiKey::Produce, &err);
+        match sink.send(event.clone()).await {
+            Err(CaptureError::RetryableSinkError) => {} // Expected
+            Err(err) => panic!("wrong error code {}", err),
+            Ok(()) => panic!("should have errored"),
+        };
+        match sink.send_batch(vec![event.clone(), event.clone()]).await {
+            Err(CaptureError::RetryableSinkError) => {} // Expected
+            Err(err) => panic!("wrong error code {}", err),
+            Ok(()) => panic!("should have errored"),
+        };
+    }
+}