properly process Kafka produce ACKs to propagate errors (#59)
xvello authored Nov 27, 2023
1 parent f8a07fb commit 7228307
Showing 7 changed files with 207 additions and 32 deletions.
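At its core, the change stops treating a successful enqueue as a successful write: KafkaSink now keeps the DeliveryFuture that rdkafka's send_result() returns and awaits the broker ACK, mapping timeouts and broker rejections onto CaptureError so callers see produce failures. A minimal sketch of that pattern (illustrative names and a default-context producer, not the repository's exact code; see the capture/src/sink.rs diff below for the real implementation):

use rdkafka::error::KafkaError;
use rdkafka::producer::{FutureProducer, FutureRecord};

/// Enqueue one message, then await the broker ACK so produce errors
/// surface to the caller instead of being silently dropped.
async fn produce_and_ack(
    producer: &FutureProducer,
    topic: &str,
    payload: &str,
) -> Result<(), KafkaError> {
    // send_result() fails fast if the local producer queue is full;
    // otherwise it hands back a DeliveryFuture for the in-flight write.
    let delivery = producer
        .send_result(FutureRecord::<(), str>::to(topic).payload(payload))
        .map_err(|(err, _record)| err)?;

    // The future resolves once the broker replies, or once rdkafka gives
    // up after message.timeout.ms and cancels the delivery.
    match delivery.await {
        Ok(Ok((_partition, _offset))) => Ok(()),
        Ok(Err((err, _msg))) => Err(err),
        Err(_canceled) => Err(KafkaError::Canceled),
    }
}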
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
@@ -1,4 +1,6 @@
[workspace]
+resolver = "2"
+
members = [
    "capture",
    "capture-server"
@@ -25,7 +27,7 @@ uuid = { version = "1.3.3", features = ["serde"] }
async-trait = "0.1.68"
serde_urlencoded = "0.7.1"
rand = "0.8.5"
-rdkafka = { version = "0.34", features = ["cmake-build", "ssl"] }
+rdkafka = { version = "0.36.0", features = ["cmake-build", "ssl"] }
metrics = "0.21.1"
metrics-exporter-prometheus = "0.12.1"
thiserror = "1.0.48"
15 changes: 11 additions & 4 deletions capture-server/tests/common.rs
@@ -18,7 +18,8 @@ use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::util::Timeout;
use rdkafka::{Message, TopicPartitionList};
use tokio::sync::Notify;
-use tracing::debug;
+use tokio::time::timeout;
+use tracing::{debug, warn};

use capture::config::{Config, KafkaConfig};
use capture::server::serve;
@@ -32,6 +33,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
    kafka: KafkaConfig {
        kafka_producer_linger_ms: 0, // Send messages as soon as possible
        kafka_producer_queue_mib: 10,
+        kafka_message_timeout_ms: 10000, // 10s, ACKs can be slow on low volumes, should be tuned
        kafka_compression_codec: "none".to_string(),
        kafka_hosts: "kafka:9092".to_string(),
        kafka_topic: "events_plugin_ingestion".to_string(),
@@ -174,9 +176,14 @@ impl EphemeralTopic {
impl Drop for EphemeralTopic {
    fn drop(&mut self) {
        debug!("dropping EphemeralTopic {}...", self.topic_name);
-        _ = self.consumer.unassign();
-        futures::executor::block_on(delete_topic(self.topic_name.clone()));
-        debug!("dropped topic");
+        self.consumer.unsubscribe();
+        match futures::executor::block_on(timeout(
+            Duration::from_secs(10),
+            delete_topic(self.topic_name.clone()),
+        )) {
+            Ok(_) => debug!("dropped topic"),
+            Err(err) => warn!("failed to drop topic: {}", err),
+        }
    }
}

5 changes: 2 additions & 3 deletions capture/src/capture.rs
@@ -228,11 +228,10 @@ pub async fn process_events<'a>(
    tracing::debug!(events=?events, "processed {} events", events.len());

    if events.len() == 1 {
-        sink.send(events[0].clone()).await?;
+        sink.send(events[0].clone()).await
    } else {
-        sink.send_batch(events).await?;
+        sink.send_batch(events).await
    }
-    Ok(())
}

#[cfg(test)]
2 changes: 2 additions & 0 deletions capture/src/config.rs
@@ -35,6 +35,8 @@
    pub kafka_producer_linger_ms: u32, // Maximum time between producer batches during low traffic
    #[envconfig(default = "400")]
    pub kafka_producer_queue_mib: u32, // Size of the in-memory producer queue in mebibytes
+    #[envconfig(default = "20000")]
+    pub kafka_message_timeout_ms: u32, // Time before we stop retrying producing a message: 20 seconds
    #[envconfig(default = "none")]
    pub kafka_compression_codec: String, // none, gzip, snappy, lz4, zstd
    pub kafka_hosts: String,
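The new kafka_message_timeout_ms knob defaults to 20 seconds and is wired into rdkafka's message.timeout.ms in the capture/src/sink.rs diff below. A hedged sketch of overriding it at deploy time, assuming envconfig's default mapping of field names to uppercased environment variables (the variable names here are inferred from that convention, not confirmed by this diff):

use capture::config::KafkaConfig;
use envconfig::Envconfig;

fn main() {
    // Assumed names: envconfig reads each field from its uppercased name,
    // so kafka_message_timeout_ms would map to KAFKA_MESSAGE_TIMEOUT_MS.
    // Fields without a default (kafka_hosts, kafka_topic) still need values.
    std::env::set_var("KAFKA_HOSTS", "kafka:9092");
    std::env::set_var("KAFKA_TOPIC", "events_plugin_ingestion");
    std::env::set_var("KAFKA_MESSAGE_TIMEOUT_MS", "5000");

    let config = KafkaConfig::init_from_env().expect("invalid Kafka config");
    assert_eq!(config.kafka_message_timeout_ms, 5000);
}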
3 changes: 2 additions & 1 deletion capture/src/server.rs
@@ -45,7 +45,8 @@
        .await;

    let partition = PartitionLimiter::new(config.per_second_limit, config.burst_limit);
-    let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition).unwrap();
+    let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition)
+        .expect("failed to start Kafka sink");

    router::router(
        crate::time::SystemTime {},
202 changes: 183 additions & 19 deletions capture/src/sink.rs
@@ -3,12 +3,13 @@ use std::time::Duration;
use async_trait::async_trait;
use metrics::{absolute_counter, counter, gauge, histogram};
use rdkafka::config::ClientConfig;
-use rdkafka::error::RDKafkaErrorCode;
+use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
-use rdkafka::producer::Producer;
+use rdkafka::producer::{DeliveryFuture, Producer};
use rdkafka::util::Timeout;
use tokio::task::JoinSet;
-use tracing::{debug, info, instrument};
+use tracing::instrument;
+use tracing::log::{debug, error, info};

use crate::api::CaptureError;
use crate::config::KafkaConfig;
@@ -128,6 +129,10 @@ impl KafkaSink {
.set("bootstrap.servers", &config.kafka_hosts)
.set("statistics.interval.ms", "10000")
.set("linger.ms", config.kafka_producer_linger_ms.to_string())
.set(
"message.timeout.ms",
config.kafka_message_timeout_ms.to_string(),
)
.set("compression.codec", config.kafka_compression_codec)
.set(
"queue.buffering.max.kbytes",
@@ -157,17 +162,20 @@
            topic: config.kafka_topic,
        })
    }
}

impl KafkaSink {
    pub fn flush(&self) -> Result<(), KafkaError> {
        // TODO: hook it up on shutdown
        self.producer.flush(Duration::new(30, 0))
    }

    async fn kafka_send(
        producer: FutureProducer<KafkaContext>,
        topic: String,
        event: ProcessedEvent,
        limited: bool,
-    ) -> Result<(), CaptureError> {
+    ) -> Result<DeliveryFuture, CaptureError> {
        let payload = serde_json::to_string(&event).map_err(|e| {
-            tracing::error!("failed to serialize event: {}", e);
+            error!("failed to serialize event: {}", e);
            CaptureError::NonRetryableSinkError
        })?;

@@ -182,7 +190,7 @@
            timestamp: None,
            headers: None,
        }) {
-            Ok(_) => Ok(()),
+            Ok(ack) => Ok(ack),
            Err((e, _)) => match e.rdkafka_error_code() {
                Some(RDKafkaErrorCode::InvalidMessageSize) => {
                    report_dropped_events("kafka_message_size", 1);
@@ -191,25 +199,49 @@
                _ => {
                    // TODO(maybe someday): Don't drop them but write them somewhere and try again
                    report_dropped_events("kafka_write_error", 1);
-                    tracing::error!("failed to produce event: {}", e);
+                    error!("failed to produce event: {}", e);
                    Err(CaptureError::RetryableSinkError)
                }
            },
        }
    }

+    async fn process_ack(delivery: DeliveryFuture) -> Result<(), CaptureError> {
+        match delivery.await {
+            Err(_) => {
+                // Cancelled due to timeout while retrying
+                counter!("capture_kafka_produce_errors_total", 1);
+                error!("failed to produce to Kafka before write timeout");
+                Err(CaptureError::RetryableSinkError)
+            }
+            Ok(Err((KafkaError::MessageProduction(RDKafkaErrorCode::MessageSizeTooLarge), _))) => {
+                // Rejected by broker due to message size
+                report_dropped_events("kafka_message_size", 1);
+                Err(CaptureError::EventTooBig)
+            }
+            Ok(Err((err, _))) => {
+                // Unretriable produce error
+                counter!("capture_kafka_produce_errors_total", 1);
+                error!("failed to produce to Kafka: {}", err);
+                Err(CaptureError::RetryableSinkError)
+            }
+            Ok(Ok(_)) => {
+                counter!("capture_events_ingested_total", 1);
+                Ok(())
+            }
+        }
+    }
}

#[async_trait]
impl EventSink for KafkaSink {
    #[instrument(skip_all)]
    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
        let limited = self.partition.is_limited(&event.key());
-        Self::kafka_send(self.producer.clone(), self.topic.clone(), event, limited).await?;
-
+        let ack =
+            Self::kafka_send(self.producer.clone(), self.topic.clone(), event, limited).await?;
        histogram!("capture_event_batch_size", 1.0);
-        counter!("capture_events_ingested_total", 1);
-
-        Ok(())
+        Self::process_ack(ack).await
    }

    #[instrument(skip_all)]
@@ -219,16 +251,148 @@
        for event in events {
            let producer = self.producer.clone();
            let topic = self.topic.clone();

            let limited = self.partition.is_limited(&event.key());
-            set.spawn(Self::kafka_send(producer, topic, event, limited));
+
+            // We await kafka_send to get events in the producer queue sequentially
+            let ack = Self::kafka_send(producer, topic, event, limited).await?;
+
+            // Then stash the returned DeliveryFuture, waiting concurrently for the write ACKs from brokers.
+            set.spawn(Self::process_ack(ack));
        }

-        // Await on all the produce promises
-        while (set.join_next().await).is_some() {}
+        // Await on all the produce promises, fail batch on first failure
+        while let Some(res) = set.join_next().await {
+            match res {
+                Ok(Ok(_)) => {}
+                Ok(Err(err)) => {
+                    set.abort_all();
+                    return Err(err);
+                }
+                Err(err) => {
+                    set.abort_all();
+                    error!("join error while waiting on Kafka ACK: {:?}", err);
+                    return Err(CaptureError::RetryableSinkError);
+                }
+            }
+        }

        histogram!("capture_event_batch_size", batch_size as f64);
        counter!("capture_events_ingested_total", batch_size as u64);
        Ok(())
    }
}

+#[cfg(test)]
+mod tests {
+    use crate::api::CaptureError;
+    use crate::config;
+    use crate::event::ProcessedEvent;
+    use crate::health::HealthRegistry;
+    use crate::partition_limits::PartitionLimiter;
+    use crate::sink::{EventSink, KafkaSink};
+    use crate::utils::uuid_v7;
+    use rdkafka::mocking::MockCluster;
+    use rdkafka::producer::DefaultProducerContext;
+    use rdkafka::types::{RDKafkaApiKey, RDKafkaRespErr};
+    use std::num::NonZeroU32;
+    use time::Duration;
+
+    async fn start_on_mocked_sink() -> (MockCluster<'static, DefaultProducerContext>, KafkaSink) {
+        let registry = HealthRegistry::new("liveness");
+        let handle = registry
+            .register("one".to_string(), Duration::seconds(30))
+            .await;
+        let limiter =
+            PartitionLimiter::new(NonZeroU32::new(10).unwrap(), NonZeroU32::new(10).unwrap());
+        let cluster = MockCluster::new(1).expect("failed to create mock brokers");
+        let config = config::KafkaConfig {
+            kafka_producer_linger_ms: 0,
+            kafka_producer_queue_mib: 50,
+            kafka_message_timeout_ms: 500,
+            kafka_compression_codec: "none".to_string(),
+            kafka_hosts: cluster.bootstrap_servers(),
+            kafka_topic: "events_plugin_ingestion".to_string(),
+            kafka_tls: false,
+        };
+        let sink = KafkaSink::new(config, handle, limiter).expect("failed to create sink");
+        (cluster, sink)
+    }
+
+    #[tokio::test]
+    async fn kafka_sink_error_handling() {
+        // Uses a mocked Kafka broker that allows injecting write errors, to check error handling.
+        // We test different cases in a single test to amortize the startup cost of the producer.
+
+        let (cluster, sink) = start_on_mocked_sink().await;
+        let event: ProcessedEvent = ProcessedEvent {
+            uuid: uuid_v7(),
+            distinct_id: "id1".to_string(),
+            ip: "".to_string(),
+            data: "".to_string(),
+            now: "".to_string(),
+            sent_at: None,
+            token: "token1".to_string(),
+        };
+
+        // Wait for producer to be healthy, to keep kafka_message_timeout_ms short and tests faster
+        for _ in 0..20 {
+            if sink.send(event.clone()).await.is_ok() {
+                break;
+            }
+        }
+
+        // Send events to confirm happy path
+        sink.send(event.clone())
+            .await
+            .expect("failed to send one initial event");
+        sink.send_batch(vec![event.clone(), event.clone()])
+            .await
+            .expect("failed to send initial event batch");
+
+        // Simulate unretriable errors
+        cluster.clear_request_errors(RDKafkaApiKey::Produce);
+        let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE; 1];
+        cluster.request_errors(RDKafkaApiKey::Produce, &err);
+        match sink.send(event.clone()).await {
+            Err(CaptureError::EventTooBig) => {} // Expected
+            Err(err) => panic!("wrong error code {}", err),
+            Ok(()) => panic!("should have errored"),
+        };
+        cluster.clear_request_errors(RDKafkaApiKey::Produce);
+        let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_INVALID_PARTITIONS; 1];
+        cluster.request_errors(RDKafkaApiKey::Produce, &err);
+        match sink.send_batch(vec![event.clone(), event.clone()]).await {
+            Err(CaptureError::RetryableSinkError) => {} // Expected
+            Err(err) => panic!("wrong error code {}", err),
+            Ok(()) => panic!("should have errored"),
+        };
+
+        // Simulate transient errors, messages should go through OK
+        cluster.clear_request_errors(RDKafkaApiKey::Produce);
+        let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE; 2];
+        cluster.request_errors(RDKafkaApiKey::Produce, &err);
+        sink.send(event.clone())
+            .await
+            .expect("failed to send one event after recovery");
+        cluster.clear_request_errors(RDKafkaApiKey::Produce);
+        let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE; 2];
+        cluster.request_errors(RDKafkaApiKey::Produce, &err);
+        sink.send_batch(vec![event.clone(), event.clone()])
+            .await
+            .expect("failed to send event batch after recovery");
+
+        // Timeout on a sustained transient error
+        cluster.clear_request_errors(RDKafkaApiKey::Produce);
+        let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE; 50];
+        cluster.request_errors(RDKafkaApiKey::Produce, &err);
+        match sink.send(event.clone()).await {
+            Err(CaptureError::RetryableSinkError) => {} // Expected
+            Err(err) => panic!("wrong error code {}", err),
+            Ok(()) => panic!("should have errored"),
+        };
+        match sink.send_batch(vec![event.clone(), event.clone()]).await {
+            Err(CaptureError::RetryableSinkError) => {} // Expected
+            Err(err) => panic!("wrong error code {}", err),
+            Ok(()) => panic!("should have errored"),
+        };
+    }
+}
