diff --git a/capture/src/sink.rs b/capture/src/sink.rs index 93ba6c5..13397dc 100644 --- a/capture/src/sink.rs +++ b/capture/src/sink.rs @@ -8,8 +8,8 @@ use rdkafka::producer::future_producer::{FutureProducer, FutureRecord}; use rdkafka::producer::{DeliveryFuture, Producer}; use rdkafka::util::Timeout; use tokio::task::JoinSet; -use tracing::instrument; use tracing::log::{debug, error, info}; +use tracing::{info_span, instrument, Instrument}; use crate::api::CaptureError; use crate::config::KafkaConfig; @@ -241,7 +241,9 @@ impl EventSink for KafkaSink { let ack = Self::kafka_send(self.producer.clone(), self.topic.clone(), event, limited).await?; histogram!("capture_event_batch_size", 1.0); - Self::process_ack(ack).await + Self::process_ack(ack) + .instrument(info_span!("ack_wait_one")) + .await } #[instrument(skip_all)] @@ -261,20 +263,25 @@ impl EventSink for KafkaSink { } // Await on all the produce promises, fail batch on first failure - while let Some(res) = set.join_next().await { - match res { - Ok(Ok(_)) => {} - Ok(Err(err)) => { - set.abort_all(); - return Err(err); - } - Err(err) => { - set.abort_all(); - error!("join error while waiting on Kafka ACK: {:?}", err); - return Err(CaptureError::RetryableSinkError); + async move { + while let Some(res) = set.join_next().await { + match res { + Ok(Ok(_)) => {} + Ok(Err(err)) => { + set.abort_all(); + return Err(err); + } + Err(err) => { + set.abort_all(); + error!("join error while waiting on Kafka ACK: {:?}", err); + return Err(CaptureError::RetryableSinkError); + } } } + Ok(()) } + .instrument(info_span!("ack_wait_many")) + .await?; histogram!("capture_event_batch_size", batch_size as f64); Ok(())