Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
tracing: add ack_wait spans (#60)
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello authored Nov 28, 2023
1 parent 7056e80 commit cd4d639
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions capture/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
use rdkafka::producer::{DeliveryFuture, Producer};
use rdkafka::util::Timeout;
use tokio::task::JoinSet;
use tracing::instrument;
use tracing::log::{debug, error, info};
use tracing::{info_span, instrument, Instrument};

use crate::api::CaptureError;
use crate::config::KafkaConfig;
Expand Down Expand Up @@ -241,7 +241,9 @@ impl EventSink for KafkaSink {
let ack =
Self::kafka_send(self.producer.clone(), self.topic.clone(), event, limited).await?;
histogram!("capture_event_batch_size", 1.0);
Self::process_ack(ack).await
Self::process_ack(ack)
.instrument(info_span!("ack_wait_one"))
.await
}

#[instrument(skip_all)]
Expand All @@ -261,20 +263,25 @@ impl EventSink for KafkaSink {
}

// Await on all the produce promises, fail batch on first failure
while let Some(res) = set.join_next().await {
match res {
Ok(Ok(_)) => {}
Ok(Err(err)) => {
set.abort_all();
return Err(err);
}
Err(err) => {
set.abort_all();
error!("join error while waiting on Kafka ACK: {:?}", err);
return Err(CaptureError::RetryableSinkError);
async move {
while let Some(res) = set.join_next().await {
match res {
Ok(Ok(_)) => {}
Ok(Err(err)) => {
set.abort_all();
return Err(err);
}
Err(err) => {
set.abort_all();
error!("join error while waiting on Kafka ACK: {:?}", err);
return Err(CaptureError::RetryableSinkError);
}
}
}
Ok(())
}
.instrument(info_span!("ack_wait_many"))
.await?;

histogram!("capture_event_batch_size", batch_size as f64);
Ok(())
Expand Down

0 comments on commit cd4d639

Please sign in to comment.