Better logs when doc processing fails. (#4323)
* Better logs when doc processing fails.

* Small log improvement.
fmassot authored Dec 26, 2023
1 parent 2a01fe8 commit 4e89425
103 changes: 63 additions & 40 deletions quickwit/quickwit-indexing/src/actors/doc_processor.rs
@@ -35,6 +35,7 @@ use serde::Serialize;
use serde_json::Value as JsonValue;
use tantivy::schema::{Field, Value};
use tantivy::{DateTime, TantivyDocument};
use thiserror::Error;
use tokio::runtime::Handle;
use tracing::warn;

@@ -66,7 +67,9 @@ impl JsonDoc {
) -> Result<Self, DocProcessorError> {
match json_value {
JsonValue::Object(json_obj) => Ok(Self::new(json_obj, num_bytes)),
_ => Err(DocProcessorError::Parse),
_ => Err(DocProcessorError::Parsing(
"document must be a JSON object".to_string(),
)),
}
}

@@ -77,23 +80,40 @@ impl JsonDoc {
}
}

#[derive(Debug)]
#[derive(Error, Debug)]
pub enum DocProcessorError {
Parse,
Schema,
#[error("doc mapper parsing error: {0}")]
DocMapperParsing(DocParsingError),
#[error("OLTP trace parsing error: {0}")]
OltpTraceParsing(OtlpTraceError),
#[error("doc parsing error: {0}")]
Parsing(String),
#[cfg(feature = "vrl")]
#[error("VRL transform error: {0}")]
Transform(VrlTerminate),
}

impl From<OtlpTraceError> for DocProcessorError {
fn from(error: OtlpTraceError) -> Self {
DocProcessorError::OltpTraceParsing(error)
}
}

impl From<DocParsingError> for DocProcessorError {
fn from(error: DocParsingError) -> Self {
DocProcessorError::DocMapperParsing(error)
}
}

impl From<serde_json::Error> for DocProcessorError {
fn from(_error: serde_json::Error) -> Self {
DocProcessorError::Parse
fn from(error: serde_json::Error) -> Self {
DocProcessorError::Parsing(error.to_string())
}
}

impl From<FromUtf8Error> for DocProcessorError {
fn from(_error: FromUtf8Error) -> Self {
DocProcessorError::Parse
fn from(error: FromUtf8Error) -> Self {
DocProcessorError::Parsing(error.to_string())
}
}

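The `From` conversions above are what let the rest of the file replace explicit error mapping with the `?` operator. A minimal sketch of how they compose (not part of the commit; `json_value_from_bytes` is a hypothetical helper and assumes the imports and types of this file are in scope):

fn json_value_from_bytes(bytes: Vec<u8>) -> Result<JsonValue, DocProcessorError> {
    // `?` converts the FromUtf8Error into DocProcessorError::Parsing.
    let text = String::from_utf8(bytes)?;
    // `?` converts the serde_json::Error into DocProcessorError::Parsing as well.
    let json_value: JsonValue = serde_json::from_str(&text)?;
    Ok(json_value)
}
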
@@ -211,7 +231,7 @@ impl From<Result<JsonSpanIterator, OtlpTraceError>> for JsonDocIterator {
fn from(result: Result<JsonSpanIterator, OtlpTraceError>) -> Self {
match result {
Ok(json_doc) => Self::Spans(json_doc),
Err(_) => Self::One(Some(Err(DocProcessorError::Parse))),
Err(error) => Self::One(Some(Err(DocProcessorError::from(error)))),
}
}
}
@@ -226,9 +246,9 @@ pub struct DocProcessorCounters {
/// - number of docs that could not be transformed.
/// - number of docs for which the doc mapper returned an error.
/// - number of valid docs.
pub num_parse_errors: AtomicU64,
pub num_doc_parsing_errors: AtomicU64,
pub num_transform_errors: AtomicU64,
pub num_schema_errors: AtomicU64,
pub num_oltp_trace_errors: AtomicU64,
pub num_valid_docs: AtomicU64,

/// Number of bytes that went through the indexer
@@ -243,9 +263,9 @@ impl DocProcessorCounters {
Self {
index_id,
source_id,
num_parse_errors: Default::default(),
num_doc_parsing_errors: Default::default(),
num_transform_errors: Default::default(),
num_schema_errors: Default::default(),
num_oltp_trace_errors: Default::default(),
num_valid_docs: Default::default(),
num_bytes_total: Default::default(),
}
@@ -254,17 +274,17 @@
/// Returns the overall number of docs that went through the indexer (valid or not).
pub fn num_processed_docs(&self) -> u64 {
self.num_valid_docs.load(Ordering::Relaxed)
+ self.num_parse_errors.load(Ordering::Relaxed)
+ self.num_schema_errors.load(Ordering::Relaxed)
+ self.num_doc_parsing_errors.load(Ordering::Relaxed)
+ self.num_oltp_trace_errors.load(Ordering::Relaxed)
+ self.num_transform_errors.load(Ordering::Relaxed)
}

/// Returns the overall number of docs that were sent to the indexer but were invalid.
/// (For instance, because they were missing a required field or because their
/// format was invalid)
pub fn num_invalid_docs(&self) -> u64 {
self.num_parse_errors.load(Ordering::Relaxed)
+ self.num_schema_errors.load(Ordering::Relaxed)
self.num_doc_parsing_errors.load(Ordering::Relaxed)
+ self.num_oltp_trace_errors.load(Ordering::Relaxed)
+ self.num_transform_errors.load(Ordering::Relaxed)
}

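A quick illustration of the renamed counters (hypothetical usage, not part of the commit; it assumes `DocProcessorCounters::new` takes the index and source IDs as strings and that `Ordering` is in scope):

let counters = DocProcessorCounters::new("my-index".to_string(), "my-source".to_string());
counters.num_valid_docs.fetch_add(2, Ordering::Relaxed);
counters.num_doc_parsing_errors.fetch_add(1, Ordering::Relaxed);
counters.num_oltp_trace_errors.fetch_add(1, Ordering::Relaxed);
assert_eq!(counters.num_invalid_docs(), 2);   // the two recorded errors
assert_eq!(counters.num_processed_docs(), 4); // valid docs + all errors
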
@@ -284,13 +304,17 @@ impl DocProcessorCounters {

pub fn record_error(&self, error: DocProcessorError, num_bytes: u64) {
let label = match error {
DocProcessorError::Parse => {
self.num_parse_errors.fetch_add(1, Ordering::Relaxed);
"parse_error"
DocProcessorError::DocMapperParsing(_) => {
self.num_doc_parsing_errors.fetch_add(1, Ordering::Relaxed);
"doc_mapper_error"
}
DocProcessorError::Schema => {
self.num_schema_errors.fetch_add(1, Ordering::Relaxed);
"schema_error"
DocProcessorError::OltpTraceParsing(_) => {
self.num_oltp_trace_errors.fetch_add(1, Ordering::Relaxed);
"otlp_trace_parsing_error"
}
DocProcessorError::Parsing(_) => {
self.num_doc_parsing_errors.fetch_add(1, Ordering::Relaxed);
"parsing_error"
}
#[cfg(feature = "vrl")]
DocProcessorError::Transform(_) => {
@@ -365,7 +389,9 @@ impl DocProcessor {
let timestamp = doc
.get_first(timestamp_field)
.and_then(|val| val.as_datetime())
.ok_or(DocProcessorError::Schema)?;
.ok_or(DocProcessorError::from(DocParsingError::RequiredField(
"timestamp field is required".to_string(),
)))?;
Ok(Some(timestamp))
}

Expand All @@ -387,6 +413,12 @@ impl DocProcessor {
processed_docs.push(processed_doc);
}
Err(error) => {
warn!(
index_id = self.counters.index_id,
source_id = self.counters.source_id,
"{}",
error
);
self.counters.record_error(error, num_bytes as u64);
}
}
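The `warn!` call added above logs the error through its `Display` impl (the `#[error(...)]` messages), whereas the old code in `process_json_doc` logged the `Debug` form via `error = ?error`. A small sketch of the difference, not part of the commit:

let error = DocProcessorError::Parsing("document must be a JSON object".to_string());
// Display, as printed by `warn!("{}", error)`:
assert_eq!(error.to_string(), "doc parsing error: document must be a JSON object");
// Debug, as the previous `error = ?error` field would have logged it:
assert_eq!(format!("{error:?}"), r#"Parsing("document must be a JSON object")"#);
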
@@ -396,16 +428,7 @@
fn process_json_doc(&self, json_doc: JsonDoc) -> Result<ProcessedDoc, DocProcessorError> {
let num_bytes = json_doc.num_bytes;

let (partition, doc) = self
.doc_mapper
.doc_from_json_obj(json_doc.json_obj)
.map_err(|error| {
warn!(index_id=self.counters.index_id, source_id=self.counters.source_id, error=?error);
match error {
DocParsingError::RequiredField(_) => DocProcessorError::Schema,
_ => DocProcessorError::Parse,
}
})?;
let (partition, doc) = self.doc_mapper.doc_from_json_obj(json_doc.json_obj)?;
let timestamp_opt = self.extract_timestamp(&doc)?;
Ok(ProcessedDoc {
doc,
@@ -585,9 +608,9 @@ mod tests {
.state;
assert_eq!(counters.index_id, index_id);
assert_eq!(counters.source_id, source_id);
assert_eq!(counters.num_parse_errors.load(Ordering::Relaxed), 1);
assert_eq!(counters.num_doc_parsing_errors.load(Ordering::Relaxed), 2);
assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 0);
assert_eq!(counters.num_schema_errors.load(Ordering::Relaxed), 1);
assert_eq!(counters.num_oltp_trace_errors.load(Ordering::Relaxed), 0);
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 387);

@@ -978,9 +1001,9 @@ mod tests_vrl {
.state;
assert_eq!(counters.index_id, index_id.to_string());
assert_eq!(counters.source_id, source_id.to_string());
assert_eq!(counters.num_parse_errors.load(Ordering::Relaxed), 1);
assert_eq!(counters.num_doc_parsing_errors.load(Ordering::Relaxed), 1);
assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 0);
assert_eq!(counters.num_schema_errors.load(Ordering::Relaxed), 1);
assert_eq!(counters.num_oltp_trace_errors.load(Ordering::Relaxed), 1);
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 397);

@@ -1068,9 +1091,9 @@ mod tests_vrl {
.state;
assert_eq!(counters.index_id, index_id);
assert_eq!(counters.source_id, source_id);
assert_eq!(counters.num_parse_errors.load(Ordering::Relaxed), 0,);
assert_eq!(counters.num_doc_parsing_errors.load(Ordering::Relaxed), 0,);
assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 1,);
assert_eq!(counters.num_schema_errors.load(Ordering::Relaxed), 0,);
assert_eq!(counters.num_oltp_trace_errors.load(Ordering::Relaxed), 0,);
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2,);
assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 200,);

