From 24950c0063d24bb35ef0a7c1575837ff8c739519 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Thu, 28 Mar 2024 16:52:07 -0700 Subject: [PATCH] Parse OTLP logs in doc processor (#4807) * Rename `OtlpTrace*` to `OtlpTraces*` * Parse OTLP logs in doc processor --- .../tutorials/otel-traces/kafka-source.yaml | 2 +- .../src/rate_limited_tracing.rs | 2 +- .../quickwit-config/src/source_config/mod.rs | 14 +- .../src/source_config/serialize.rs | 5 +- .../src/actors/doc_processor.rs | 311 ++++++++++++++---- .../quickwit-opentelemetry/src/otlp/logs.rs | 294 +++++++++++------ .../quickwit-opentelemetry/src/otlp/mod.rs | 21 +- .../quickwit-opentelemetry/src/otlp/traces.rs | 20 +- 8 files changed, 479 insertions(+), 190 deletions(-) diff --git a/config/tutorials/otel-traces/kafka-source.yaml b/config/tutorials/otel-traces/kafka-source.yaml index 59f78dcf4fb..b1eb22ad153 100644 --- a/config/tutorials/otel-traces/kafka-source.yaml +++ b/config/tutorials/otel-traces/kafka-source.yaml @@ -1,7 +1,7 @@ version: 0.7 source_id: kafka-source source_type: kafka -input_format: otlp_trace_proto +input_format: otlp_traces_proto params: topic: otlp_spans client_params: diff --git a/quickwit/quickwit-common/src/rate_limited_tracing.rs b/quickwit/quickwit-common/src/rate_limited_tracing.rs index 0fab23af045..fbfd6acd31a 100644 --- a/quickwit/quickwit-common/src/rate_limited_tracing.rs +++ b/quickwit/quickwit-common/src/rate_limited_tracing.rs @@ -86,7 +86,7 @@ macro_rules! rate_limited_warn { } #[macro_export] macro_rules! rate_limited_error { - ($unit:literal=$limit:literal, $($args:tt)*) => { + ($unit:ident=$limit:literal, $($args:tt)*) => { $crate::rate_limited_tracing::rate_limited_tracing!(error, $unit=$limit, $($args)*) }; } diff --git a/quickwit/quickwit-config/src/source_config/mod.rs b/quickwit/quickwit-config/src/source_config/mod.rs index 2bec4be7935..9992729811f 100644 --- a/quickwit/quickwit-config/src/source_config/mod.rs +++ b/quickwit/quickwit-config/src/source_config/mod.rs @@ -182,9 +182,17 @@ impl TestableForRegression for SourceConfig { pub enum SourceInputFormat { #[default] Json, - OtlpTraceJson, - #[serde(alias = "otlp_trace_proto")] - OtlpTraceProtobuf, + OtlpLogsJson, + #[serde(alias = "otlp_logs_proto")] + OtlpLogsProtobuf, + #[serde(alias = "otlp_trace_json")] + OtlpTracesJson, + #[serde( + alias = "otlp_trace_proto", + alias = "otlp_trace_protobuf", + alias = "otlp_traces_proto" + )] + OtlpTracesProtobuf, #[serde(alias = "plain")] PlainText, } diff --git a/quickwit/quickwit-config/src/source_config/serialize.rs b/quickwit/quickwit-config/src/source_config/serialize.rs index 3aacf5bf77f..ae2c00bb414 100644 --- a/quickwit/quickwit-config/src/source_config/serialize.rs +++ b/quickwit/quickwit-config/src/source_config/serialize.rs @@ -110,7 +110,10 @@ impl SourceConfigForSerialization { if let Some(transform_config) = &self.transform { if matches!( self.input_format, - SourceInputFormat::OtlpTraceJson | SourceInputFormat::OtlpTraceProtobuf + SourceInputFormat::OtlpLogsJson + | SourceInputFormat::OtlpLogsProtobuf + | SourceInputFormat::OtlpTracesJson + | SourceInputFormat::OtlpTracesProtobuf ) { bail!("VRL transforms are not supported for OTLP input formats"); } diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs index 5e4c424a8cf..6423c8e8c9a 100644 --- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs @@ -30,7 +30,8 @@ use 
quickwit_common::runtimes::RuntimeType; use quickwit_config::{SourceInputFormat, TransformConfig}; use quickwit_doc_mapper::{DocMapper, DocParsingError, JsonObject}; use quickwit_opentelemetry::otlp::{ - parse_otlp_spans_json, parse_otlp_spans_protobuf, JsonSpanIterator, OtlpTraceError, + parse_otlp_logs_json, parse_otlp_logs_protobuf, parse_otlp_spans_json, + parse_otlp_spans_protobuf, JsonLogIterator, JsonSpanIterator, OtlpLogsError, OtlpTracesError, }; use serde::Serialize; use serde_json::Value as JsonValue; @@ -67,8 +68,8 @@ impl JsonDoc { ) -> Result<Self, DocProcessorError> { match json_value { JsonValue::Object(json_obj) => Ok(Self::new(json_obj, num_bytes)), - _ => Err(DocProcessorError::Parsing( - "document must be a JSON object".to_string(), + _ => Err(DocProcessorError::JsonParsing( + "document is not an object".to_string(), )), } } @@ -82,38 +83,46 @@ impl JsonDoc { #[derive(Error, Debug)] pub enum DocProcessorError { - #[error("doc mapper parsing error: {0}")] + #[error("doc mapper parse error: {0}")] DocMapperParsing(DocParsingError), - #[error("OLTP trace parsing error: {0}")] - OltpTraceParsing(OtlpTraceError), - #[error("doc parsing error: {0}")] - Parsing(String), + #[error("JSON parse error: {0}")] + JsonParsing(String), + #[error("OTLP log records parse error: {0}")] + OltpLogsParsing(OtlpLogsError), + #[error("OTLP traces parse error: {0}")] + OltpTracesParsing(OtlpTracesError), #[cfg(feature = "vrl")] #[error("VRL transform error: {0}")] Transform(VrlTerminate), } -impl From<OtlpTraceError> for DocProcessorError { - fn from(error: OtlpTraceError) -> Self { - DocProcessorError::OltpTraceParsing(error) +impl From<OtlpLogsError> for DocProcessorError { + fn from(error: OtlpLogsError) -> Self { + Self::OltpLogsParsing(error) + } +} + +impl From<OtlpTracesError> for DocProcessorError { + fn from(error: OtlpTracesError) -> Self { + Self::OltpTracesParsing(error) + } } impl From<DocParsingError> for DocProcessorError { fn from(error: DocParsingError) -> Self { - DocProcessorError::DocMapperParsing(error) + Self::DocMapperParsing(error) } } impl From<serde_json::Error> for DocProcessorError { fn from(error: serde_json::Error) -> Self { - DocProcessorError::Parsing(error.to_string()) + Self::JsonParsing(error.to_string()) } } impl From<FromUtf8Error> for DocProcessorError { fn from(error: FromUtf8Error) -> Self { - DocProcessorError::Parsing(error.to_string()) + Self::JsonParsing(error.to_string()) } } @@ -132,8 +141,11 @@ fn try_into_vrl_doc( map.insert(key, value); VrlValue::Object(map) } - SourceInputFormat::OtlpTraceJson | SourceInputFormat::OtlpTraceProtobuf => { - panic!("OTP log or trace data does not support VRL transforms") + SourceInputFormat::OtlpLogsJson + | SourceInputFormat::OtlpLogsProtobuf + | SourceInputFormat::OtlpTracesJson + | SourceInputFormat::OtlpTracesProtobuf => { + panic!("OTLP logs or traces do not support VRL transforms") } }; let vrl_doc = VrlDoc::new(vrl_value, num_bytes); @@ -151,11 +163,19 @@ fn try_into_json_docs( .map(|json_obj| JsonDoc::new(json_obj, num_bytes)); JsonDocIterator::from(json_doc_result) } - SourceInputFormat::OtlpTraceJson => { + SourceInputFormat::OtlpLogsJson => { + let logs = parse_otlp_logs_json(&raw_doc); + JsonDocIterator::from(logs) + } + SourceInputFormat::OtlpLogsProtobuf => { + let logs = parse_otlp_logs_protobuf(&raw_doc); + JsonDocIterator::from(logs) + } + SourceInputFormat::OtlpTracesJson => { let spans = parse_otlp_spans_json(&raw_doc); JsonDocIterator::from(spans) } - SourceInputFormat::OtlpTraceProtobuf => { + SourceInputFormat::OtlpTracesProtobuf => { let spans = parse_otlp_spans_protobuf(&raw_doc); JsonDocIterator::from(spans) } @@
-200,6 +220,7 @@ fn parse_raw_doc( enum JsonDocIterator { One(Option<Result<JsonDoc, DocProcessorError>>), + Logs(JsonLogIterator), Spans(JsonSpanIterator), } @@ -209,6 +230,9 @@ impl Iterator for JsonDocIterator { fn next(&mut self) -> Option<Self::Item> { match self { Self::One(opt) => opt.take(), + Self::Logs(logs) => logs + .next() + .map(|(json_value, num_bytes)| JsonDoc::try_from_json_value(json_value, num_bytes)), Self::Spans(spans) => spans .next() .map(|(json_value, num_bytes)| JsonDoc::try_from_json_value(json_value, num_bytes)), @@ -227,10 +251,19 @@ where E: Into<DocProcessorError> } } -impl From<Result<JsonSpanIterator, OtlpTraceError>> for JsonDocIterator { - fn from(result: Result<JsonSpanIterator, OtlpTraceError>) -> Self { +impl From<Result<JsonLogIterator, OtlpLogsError>> for JsonDocIterator { + fn from(result: Result<JsonLogIterator, OtlpLogsError>) -> Self { match result { - Ok(json_doc) => Self::Spans(json_doc), + Ok(logs) => Self::Logs(logs), + Err(error) => Self::One(Some(Err(DocProcessorError::from(error)))), + } + } +} + +impl From<Result<JsonSpanIterator, OtlpTracesError>> for JsonDocIterator { + fn from(result: Result<JsonSpanIterator, OtlpTracesError>) -> Self { + match result { + Ok(spans) => Self::Spans(spans), Err(error) => Self::One(Some(Err(DocProcessorError::from(error)))), } } } @@ -246,9 +279,9 @@ pub struct DocProcessorCounters { /// - number of docs that could not be transformed. /// - number of docs for which the doc mapper returned an error. /// - number of valid docs. - pub num_doc_parsing_errors: AtomicU64, + pub num_doc_parse_errors: AtomicU64, pub num_transform_errors: AtomicU64, - pub num_oltp_trace_errors: AtomicU64, + pub num_oltp_parse_errors: AtomicU64, pub num_valid_docs: AtomicU64, /// Number of bytes that went through the indexer @@ -263,9 +296,9 @@ impl DocProcessorCounters { Self { index_id, source_id, - num_doc_parsing_errors: Default::default(), + num_doc_parse_errors: Default::default(), num_transform_errors: Default::default(), - num_oltp_trace_errors: Default::default(), + num_oltp_parse_errors: Default::default(), num_valid_docs: Default::default(), num_bytes_total: Default::default(), } } @@ -274,8 +307,8 @@ impl DocProcessorCounters { /// Returns the overall number of docs that went through the indexer (valid or not).
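/// This is the sum of the `num_valid_docs`, `num_doc_parse_errors`, `num_oltp_parse_errors`, and `num_transform_errors` counters.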
pub fn num_processed_docs(&self) -> u64 { self.num_valid_docs.load(Ordering::Relaxed) - + self.num_doc_parsing_errors.load(Ordering::Relaxed) - + self.num_oltp_trace_errors.load(Ordering::Relaxed) + + self.num_doc_parse_errors.load(Ordering::Relaxed) + + self.num_oltp_parse_errors.load(Ordering::Relaxed) + self.num_transform_errors.load(Ordering::Relaxed) } @@ -283,8 +316,8 @@ impl DocProcessorCounters { /// (For instance, because they were missing a required field or because /// their format was invalid) pub fn num_invalid_docs(&self) -> u64 { - self.num_doc_parsing_errors.load(Ordering::Relaxed) - + self.num_oltp_trace_errors.load(Ordering::Relaxed) + self.num_doc_parse_errors.load(Ordering::Relaxed) + + self.num_oltp_parse_errors.load(Ordering::Relaxed) + self.num_transform_errors.load(Ordering::Relaxed) } @@ -305,16 +338,16 @@ impl DocProcessorCounters { pub fn record_error(&self, error: DocProcessorError, num_bytes: u64) { let label = match error { DocProcessorError::DocMapperParsing(_) => { - self.num_doc_parsing_errors.fetch_add(1, Ordering::Relaxed); + self.num_doc_parse_errors.fetch_add(1, Ordering::Relaxed); "doc_mapper_error" } - DocProcessorError::OltpTraceParsing(_) => { - self.num_oltp_trace_errors.fetch_add(1, Ordering::Relaxed); - "otlp_trace_parsing_error" + DocProcessorError::JsonParsing(_) => { + self.num_doc_parse_errors.fetch_add(1, Ordering::Relaxed); + "json_parse_error" } - DocProcessorError::Parsing(_) => { - self.num_doc_parsing_errors.fetch_add(1, Ordering::Relaxed); - "parsing_error" + DocProcessorError::OltpLogsParsing(_) | DocProcessorError::OltpTracesParsing(_) => { + self.num_oltp_parse_errors.fetch_add(1, Ordering::Relaxed); + "otlp_parse_error" } #[cfg(feature = "vrl")] DocProcessorError::Transform(_) => { @@ -358,7 +391,7 @@ impl DocProcessor { ) -> anyhow::Result<Self> { let timestamp_field_opt = extract_timestamp_field(&*doc_mapper)?; if cfg!(not(feature = "vrl")) && transform_config_opt.is_some() { - bail!("VRL is not enabled. 
please recompile with the `vrl` feature") + bail!("VRL is not enabled: please recompile with the `vrl` feature") } let doc_processor = Self { doc_mapper, @@ -414,11 +447,10 @@ impl DocProcessor { } Err(error) => { rate_limited_warn!( - limit_per_min = 5, + limit_per_min = 10, index_id = self.counters.index_id, source_id = self.counters.source_id, - "{}", - error + "{error}", ); self.counters.record_error(error, num_bytes as u64); } @@ -563,8 +595,12 @@ mod tests { use quickwit_config::{build_doc_mapper, SearchSettings}; use quickwit_doc_mapper::{default_doc_mapper_for_test, DefaultDocMapper}; use quickwit_metastore::checkpoint::SourceCheckpointDelta; - use quickwit_opentelemetry::otlp::OtlpGrpcTracesService; + use quickwit_opentelemetry::otlp::{OtlpGrpcLogsService, OtlpGrpcTracesService}; + use quickwit_proto::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest; use quickwit_proto::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest; + use quickwit_proto::opentelemetry::proto::common::v1::any_value::Value as OtlpAnyValueValue; + use quickwit_proto::opentelemetry::proto::common::v1::AnyValue as OtlpAnyValue; + use quickwit_proto::opentelemetry::proto::logs::v1::{LogRecord, ResourceLogs, ScopeLogs}; use quickwit_proto::opentelemetry::proto::trace::v1::{ResourceSpans, ScopeSpans, Span}; use serde_json::Value as JsonValue; use tantivy::schema::NamedFieldDocument; @@ -574,7 +610,7 @@ mod tests { use crate::models::{PublishLock, RawDocBatch}; #[tokio::test] - async fn test_doc_processor_simple() -> anyhow::Result<()> { + async fn test_doc_processor_simple() { let index_id = "my-index"; let source_id = "my-source"; let universe = Universe::with_accelerated_time(); @@ -602,16 +638,17 @@ ], 0..4, )) - .await?; + .await.unwrap(); + let counters = doc_processor_handle .process_pending_and_observe() .await .state; assert_eq!(counters.index_id, index_id); assert_eq!(counters.source_id, source_id); - assert_eq!(counters.num_doc_parsing_errors.load(Ordering::Relaxed), 2); + assert_eq!(counters.num_doc_parse_errors.load(Ordering::Relaxed), 2); assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 0); - assert_eq!(counters.num_oltp_trace_errors.load(Ordering::Relaxed), 0); + assert_eq!(counters.num_oltp_parse_errors.load(Ordering::Relaxed), 0); assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2); assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 387); @@ -628,7 +665,7 @@ mod tests { let schema = doc_mapper.schema(); let NamedFieldDocument(named_field_doc_map) = batch.docs[0].doc.to_named_doc(&schema); - let doc_json = JsonValue::Object(doc_mapper.doc_to_json(named_field_doc_map)?); + let doc_json = JsonValue::Object(doc_mapper.doc_to_json(named_field_doc_map).unwrap()); assert_eq!( doc_json, serde_json::json!({ @@ -647,7 +684,6 @@ mod tests { }) ); universe.assert_quit().await; - Ok(()) } const DOCMAPPER_WITH_PARTITION_JSON: &str = r#" @@ -661,7 +697,7 @@ mod tests { }"#; #[tokio::test] - async fn test_doc_processor_partitioning() -> anyhow::Result<()> { + async fn test_doc_processor_partitioning() { let doc_mapper: Arc<dyn DocMapper> = Arc::new( serde_json::from_str::<DefaultDocMapper>(DOCMAPPER_WITH_PARTITION_JSON).unwrap(), ); @@ -688,7 +724,9 @@ mod tests { ], 0..2, )) - .await?; + .await + .unwrap(); + universe .send_exit_with_success(&doc_processor_mailbox) .await @@ -706,7 +744,6 @@ mod tests { assert_eq!(partition_ids[1], partition_ids[3]); assert_ne!(partition_ids[0], partition_ids[1]); universe.assert_quit().await; - Ok(()) } #[tokio::test] @@ -784,7 
+821,163 @@ mod tests { } #[tokio::test] - async fn test_doc_processor_otlp_trace_json() { + async fn test_doc_processor_otlp_logs_json() { + let root_uri = Uri::for_test("ram:///indexes"); + let index_config = OtlpGrpcLogsService::index_config(&root_uri).unwrap(); + let doc_mapper = + build_doc_mapper(&index_config.doc_mapping, &SearchSettings::default()).unwrap(); + + let universe = Universe::with_accelerated_time(); + let (indexer_mailbox, indexer_inbox) = universe.create_test_mailbox(); + let doc_processor = DocProcessor::try_new( + "my-index".to_string(), + "my-source".to_string(), + doc_mapper, + indexer_mailbox, + None, + SourceInputFormat::OtlpLogsJson, + ) + .unwrap(); + + let (doc_processor_mailbox, doc_processor_handle) = + universe.spawn_builder().spawn(doc_processor); + + let scope_logs = vec![ScopeLogs { + log_records: vec![ + LogRecord { + time_unix_nano: 1_000_000_000, + body: Some(OtlpAnyValue { + value: Some(OtlpAnyValueValue::StringValue( + "foo log message".to_string(), + )), + }), + ..Default::default() + }, + LogRecord { + time_unix_nano: 1_000_000_001, + body: Some(OtlpAnyValue { + value: Some(OtlpAnyValueValue::StringValue( + "bar log message".to_string(), + )), + }), + ..Default::default() + }, + ], + ..Default::default() + }]; + let resource_logs = vec![ResourceLogs { + scope_logs, + ..Default::default() + }]; + let request = ExportLogsServiceRequest { resource_logs }; + let raw_doc_json = serde_json::to_vec(&request).unwrap(); + let raw_doc_batch = RawDocBatch::for_test(&[&raw_doc_json], 0..2); + doc_processor_mailbox + .send_message(raw_doc_batch) + .await + .unwrap(); + + universe + .send_exit_with_success(&doc_processor_mailbox) + .await + .unwrap(); + + let counters = doc_processor_handle + .process_pending_and_observe() + .await + .state; + assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2); + + let batch = indexer_inbox.drain_for_test_typed::<RawDocBatch>(); + assert_eq!(batch.len(), 1); + assert_eq!(batch[0].docs.len(), 2); + + let (exit_status, _) = doc_processor_handle.join().await; + assert!(matches!(exit_status, ActorExitStatus::Success)); + universe.assert_quit().await; + } + + #[tokio::test] + async fn test_doc_processor_otlp_logs_proto() { + let root_uri = Uri::for_test("ram:///indexes"); + let index_config = OtlpGrpcLogsService::index_config(&root_uri).unwrap(); + let doc_mapper = + build_doc_mapper(&index_config.doc_mapping, &SearchSettings::default()).unwrap(); + + let universe = Universe::with_accelerated_time(); + let (indexer_mailbox, indexer_inbox) = universe.create_test_mailbox(); + let doc_processor = DocProcessor::try_new( + "my-index".to_string(), + "my-source".to_string(), + doc_mapper, + indexer_mailbox, + None, + SourceInputFormat::OtlpLogsProtobuf, + ) + .unwrap(); + + let (doc_processor_mailbox, doc_processor_handle) = + universe.spawn_builder().spawn(doc_processor); + + let scope_logs = vec![ScopeLogs { + log_records: vec![ + LogRecord { + time_unix_nano: 1_000_000_000, + body: Some(OtlpAnyValue { + value: Some(OtlpAnyValueValue::StringValue( + "foo log message".to_string(), + )), + }), + ..Default::default() + }, + LogRecord { + time_unix_nano: 1_000_000_001, + body: Some(OtlpAnyValue { + value: Some(OtlpAnyValueValue::StringValue( + "bar log message".to_string(), + )), + }), + ..Default::default() + }, + ], + ..Default::default() + }]; + let resource_logs = vec![ResourceLogs { + scope_logs, + ..Default::default() + }]; + let request = ExportLogsServiceRequest { resource_logs }; + let mut raw_doc_buffer = Vec::new(); + 
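// prost's `Message::encode` appends the Protobuf-encoded request to the buffer; + // it can only fail if the buffer runs out of capacity, which a `Vec` never does. + 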
request.encode(&mut raw_doc_buffer).unwrap(); + + let raw_doc_batch = RawDocBatch::for_test(&[&raw_doc_buffer], 0..2); + doc_processor_mailbox + .send_message(raw_doc_batch) + .await + .unwrap(); + + universe + .send_exit_with_success(&doc_processor_mailbox) + .await + .unwrap(); + + let counters = doc_processor_handle + .process_pending_and_observe() + .await + .state; + assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2); + + let batch = indexer_inbox.drain_for_test_typed::<RawDocBatch>(); + assert_eq!(batch.len(), 1); + assert_eq!(batch[0].docs.len(), 2); + + let (exit_status, _) = doc_processor_handle.join().await; + assert!(matches!(exit_status, ActorExitStatus::Success)); + universe.assert_quit().await; + } + + #[tokio::test] + async fn test_doc_processor_otlp_traces_json() { let root_uri = Uri::for_test("ram:///indexes"); let index_config = OtlpGrpcTracesService::index_config(&root_uri).unwrap(); let doc_mapper = @@ -798,7 +991,7 @@ mod tests { doc_mapper, indexer_mailbox, None, - SourceInputFormat::OtlpTraceJson, + SourceInputFormat::OtlpTracesJson, ) .unwrap(); @@ -845,7 +1038,6 @@ mod tests { .process_pending_and_observe() .await .state; - // assert_eq!(counters.num_parse_errors.load(Ordering::Relaxed), 1); assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2); let batch = indexer_inbox.drain_for_test_typed::<RawDocBatch>(); @@ -858,7 +1050,7 @@ mod tests { } #[tokio::test] - async fn test_doc_processor_otlp_trace_proto() { + async fn test_doc_processor_otlp_traces_proto() { let root_uri = Uri::for_test("ram:///indexes"); let index_config = OtlpGrpcTracesService::index_config(&root_uri).unwrap(); let doc_mapper = @@ -872,7 +1064,7 @@ mod tests { doc_mapper, indexer_mailbox, None, - SourceInputFormat::OtlpTraceProtobuf, + SourceInputFormat::OtlpTracesProtobuf, ) .unwrap(); @@ -921,7 +1113,6 @@ mod tests { .process_pending_and_observe() .await .state; - // assert_eq!(counters.num_parse_errors.load(Ordering::Relaxed), 1); assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2); let batch = indexer_inbox.drain_for_test_typed::<RawDocBatch>(); @@ -981,9 +1172,9 @@ mod tests_vrl { .state; assert_eq!(counters.index_id, index_id.to_string()); assert_eq!(counters.source_id, source_id.to_string()); - assert_eq!(counters.num_doc_parsing_errors.load(Ordering::Relaxed), 2); + assert_eq!(counters.num_doc_parse_errors.load(Ordering::Relaxed), 2); assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 0); - assert_eq!(counters.num_oltp_trace_errors.load(Ordering::Relaxed), 0); + assert_eq!(counters.num_oltp_parse_errors.load(Ordering::Relaxed), 0); assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2); assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 397); @@ -1071,9 +1262,9 @@ mod tests_vrl { .state; assert_eq!(counters.index_id, index_id); assert_eq!(counters.source_id, source_id); - assert_eq!(counters.num_doc_parsing_errors.load(Ordering::Relaxed), 0,); + assert_eq!(counters.num_doc_parse_errors.load(Ordering::Relaxed), 0,); assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 1,); - assert_eq!(counters.num_oltp_trace_errors.load(Ordering::Relaxed), 0,); + assert_eq!(counters.num_oltp_parse_errors.load(Ordering::Relaxed), 0,); assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2,); assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 200,); diff --git a/quickwit/quickwit-opentelemetry/src/otlp/logs.rs b/quickwit/quickwit-opentelemetry/src/otlp/logs.rs index 84b1f4926de..613203fdeeb 100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/logs.rs 
+++ b/quickwit/quickwit-opentelemetry/src/otlp/logs.rs @@ -18,9 +18,11 @@ // along with this program. If not, see <http://www.gnu.org/licenses/>. use std::cmp::{Ord, Ordering, PartialEq, PartialOrd}; -use std::collections::{BTreeSet, HashMap}; +use std::collections::{btree_set, BTreeSet, HashMap}; use async_trait::async_trait; +use prost::Message; +use quickwit_common::rate_limited_error; use quickwit_common::uri::Uri; use quickwit_config::{load_index_config_from_user_config, ConfigFormat, IndexConfig}; use quickwit_ingest::{ @@ -39,7 +41,7 @@ use tracing::{error, instrument, warn, Span as RuntimeSpan}; use super::{ extract_otel_index_id_from_metadata, is_zero, parse_log_record_body, OtelSignal, SpanId, - TraceId, + TraceId, TryFromSpanIdError, TryFromTraceIdError, }; use crate::otlp::extract_attributes; use crate::otlp::metrics::OTLP_SERVICE_METRICS; @@ -129,6 +131,18 @@ search_settings: default_search_fields: [body.message] "#; +#[derive(Debug, thiserror::Error)] +pub enum OtlpLogsError { + #[error("failed to deserialize JSON log records: `{0}`")] + Json(#[from] serde_json::Error), + #[error("failed to deserialize Protobuf log records: `{0}`")] + Protobuf(#[from] prost::DecodeError), + #[error("failed to parse log record: `{0}`")] + SpanId(#[from] TryFromSpanIdError), + #[error("failed to parse log record: `{0}`")] + TraceId(#[from] TryFromTraceIdError), +} + #[derive(Debug, Serialize, Deserialize)] pub struct LogRecord { pub timestamp_nanos: u64, @@ -273,7 +287,7 @@ impl OtlpGrpcLogsService { .inc_by(num_bytes); let response = ExportLogsServiceResponse { - // `rejected_spans=0` and `error_message=""` is consided a "full" success. + // `rejected_log_records=0` and `error_message=""` is considered a "full" success. partial_success: Some(ExportLogsPartialSuccess { rejected_log_records: num_parse_errors as i64, error_message, @@ -282,117 +296,16 @@ impl OtlpGrpcLogsService { Ok(response) } - #[instrument(skip_all, parent = parent_span, fields(num_spans = Empty, num_bytes = Empty, num_parse_errors = Empty))] + #[instrument(skip_all, parent = parent_span, fields(num_log_records = Empty, num_bytes = Empty, num_parse_errors = Empty))] fn parse_logs( request: ExportLogsServiceRequest, parent_span: RuntimeSpan, index_id: IndexId, ) -> Result<ParsedLogRecords, Status> { - let mut log_records = BTreeSet::new(); - let mut num_log_records = 0; - let mut num_parse_errors = 0; + let (log_records, mut num_parse_errors) = parse_otlp_logs(request)?; + let num_log_records = log_records.len() as u64; let mut error_message = String::new(); - for resource_log in request.resource_logs { - let mut resource_attributes = extract_attributes( - resource_log - .resource - .clone() - .map(|rsrc| rsrc.attributes) - .unwrap_or_else(Vec::new), - ); - let resource_dropped_attributes_count = resource_log - .resource - .map(|rsrc| rsrc.dropped_attributes_count) - .unwrap_or(0); - - let service_name = match resource_attributes.remove("service.name") { - Some(JsonValue::String(value)) => value.to_string(), - _ => "unknown_service".to_string(), - }; - for scope_log in resource_log.scope_logs { - let scope_name = scope_log - .scope - .as_ref() - .map(|scope| &scope.name) - .filter(|name| !name.is_empty()); - let scope_version = scope_log - .scope - .as_ref() - .map(|scope| &scope.version) - .filter(|version| !version.is_empty()); - let scope_attributes = extract_attributes( - scope_log - .scope - .clone() - .map(|scope| scope.attributes) - .unwrap_or_else(Vec::new), - ); - let scope_dropped_attributes_count = scope_log - .scope - .as_ref() - .map(|scope| 
scope.dropped_attributes_count) - .unwrap_or(0); - - for log_record in scope_log.log_records { - num_log_records += 1; - - if log_record.time_unix_nano == 0 { - num_parse_errors += 1; - continue; - } - let observed_timestamp_nanos = if log_record.observed_time_unix_nano != 0 { - Some(log_record.observed_time_unix_nano) - } else { - None - }; - let trace_id = if log_record.trace_id.iter().any(|&byte| byte != 0) { - let trace_id = TraceId::try_from(log_record.trace_id)?; - Some(trace_id) - } else { - None - }; - let span_id = if log_record.span_id.iter().any(|&byte| byte != 0) { - let span_id = SpanId::try_from(log_record.span_id)?; - Some(span_id) - } else { - None - }; - let trace_flags = Some(log_record.flags); - - let severity_text = if !log_record.severity_text.is_empty() { - Some(log_record.severity_text) - } else { - None - }; - let severity_number = log_record.severity_number; - let body = log_record.body.and_then(parse_log_record_body); - let attributes = extract_attributes(log_record.attributes); - let dropped_attributes_count = log_record.dropped_attributes_count; - - let log_record = LogRecord { - timestamp_nanos: log_record.time_unix_nano, - observed_timestamp_nanos, - service_name: service_name.clone(), - severity_text, - severity_number, - body, - attributes, - trace_id, - span_id, - trace_flags, - dropped_attributes_count, - resource_attributes: resource_attributes.clone(), - resource_dropped_attributes_count, - scope_name: scope_name.cloned(), - scope_version: scope_version.cloned(), - scope_attributes: scope_attributes.clone(), - scope_dropped_attributes_count, - }; - log_records.insert(OrdLogRecord(log_record)); - } - } - } let mut doc_batch = DocBatchBuilder::new(index_id).json_writer(); for log_record in log_records { if let Err(error) = doc_batch.ingest_doc(&log_record.0) { @@ -407,13 +320,13 @@ impl OtlpGrpcLogsService { current_span.record("num_bytes", doc_batch.num_bytes()); current_span.record("num_parse_errors", num_parse_errors); - let parsed_spans = ParsedLogRecords { + let parsed_logs = ParsedLogRecords { doc_batch, num_log_records, num_parse_errors, error_message, }; - Ok(parsed_spans) + Ok(parsed_logs) } #[instrument(skip_all, fields(num_bytes = doc_batch.num_bytes()))] @@ -477,6 +390,169 @@ impl LogsService for OtlpGrpcLogsService { } } +fn parse_otlp_logs( + request: ExportLogsServiceRequest, +) -> Result<(BTreeSet<OrdLogRecord>, u64), OtlpLogsError> { + let mut log_records = BTreeSet::new(); + let mut num_parse_errors = 0; + + for resource_log in request.resource_logs { + let mut resource_attributes = extract_attributes( + resource_log + .resource + .clone() + .map(|rsrc| rsrc.attributes) + .unwrap_or_default(), + ); + let resource_dropped_attributes_count = resource_log + .resource + .map(|rsrc| rsrc.dropped_attributes_count) + .unwrap_or(0); + + let service_name = match resource_attributes.remove("service.name") { + Some(JsonValue::String(value)) => value.to_string(), + _ => "unknown_service".to_string(), + }; + for scope_log in resource_log.scope_logs { + let scope_name = scope_log + .scope + .as_ref() + .map(|scope| &scope.name) + .filter(|name| !name.is_empty()); + let scope_version = scope_log + .scope + .as_ref() + .map(|scope| &scope.version) + .filter(|version| !version.is_empty()); + let scope_attributes = extract_attributes( + scope_log + .scope + .clone() + .map(|scope| scope.attributes) + .unwrap_or_default(), + ); + let scope_dropped_attributes_count = scope_log + .scope + .as_ref() + .map(|scope| scope.dropped_attributes_count) + .unwrap_or(0); + + for 
log_record in scope_log.log_records { + if log_record.time_unix_nano == 0 { + rate_limited_error!(limit_per_min = 10, "skipping log record with missing timestamp"); + num_parse_errors += 1; + continue; + } + let observed_timestamp_nanos = if log_record.observed_time_unix_nano != 0 { + Some(log_record.observed_time_unix_nano) + } else { + None + }; + let trace_id = if log_record.trace_id.iter().any(|&byte| byte != 0) { + let trace_id = TraceId::try_from(log_record.trace_id)?; + Some(trace_id) + } else { + None + }; + let span_id = if log_record.span_id.iter().any(|&byte| byte != 0) { + let span_id = SpanId::try_from(log_record.span_id)?; + Some(span_id) + } else { + None + }; + let trace_flags = Some(log_record.flags); + + let severity_text = if !log_record.severity_text.is_empty() { + Some(log_record.severity_text) + } else { + None + }; + let severity_number = log_record.severity_number; + let body = log_record.body.and_then(parse_log_record_body); + let attributes = extract_attributes(log_record.attributes); + let dropped_attributes_count = log_record.dropped_attributes_count; + + let log_record = LogRecord { + timestamp_nanos: log_record.time_unix_nano, + observed_timestamp_nanos, + service_name: service_name.clone(), + severity_text, + severity_number, + body, + attributes, + trace_id, + span_id, + trace_flags, + dropped_attributes_count, + resource_attributes: resource_attributes.clone(), + resource_dropped_attributes_count, + scope_name: scope_name.cloned(), + scope_version: scope_version.cloned(), + scope_attributes: scope_attributes.clone(), + scope_dropped_attributes_count, + }; + log_records.insert(OrdLogRecord(log_record)); + } + } + } + Ok((log_records, num_parse_errors)) +} + +/// An iterator of JSON OTLP log records for use in the doc processor. +pub struct JsonLogIterator { + logs: btree_set::IntoIter<OrdLogRecord>, + current_log_idx: usize, + num_logs: usize, + avg_log_size: usize, + avg_log_size_rem: usize, +} + +impl JsonLogIterator { + fn new(logs: BTreeSet<OrdLogRecord>, num_bytes: usize) -> Self { + let num_logs = logs.len(); + let avg_log_size = num_bytes.checked_div(num_logs).unwrap_or(0); + let avg_log_size_rem = avg_log_size + num_bytes.checked_rem(num_logs).unwrap_or(0); + + Self { + logs: logs.into_iter(), + current_log_idx: 0, + num_logs, + avg_log_size, + avg_log_size_rem, + } + } +} + +impl Iterator for JsonLogIterator { + type Item = (JsonValue, usize); + + fn next(&mut self) -> Option<Self::Item> { + let log_opt = self.logs.next().map(|OrdLogRecord(log)| { + serde_json::to_value(log).expect("`LogRecord` should be JSON serializable") + }); + if log_opt.is_some() { + self.current_log_idx += 1; + } + if self.current_log_idx < self.num_logs { + log_opt.map(|log| (log, self.avg_log_size)) + } else { + log_opt.map(|log| (log, self.avg_log_size_rem)) + } + } +} + +pub fn parse_otlp_logs_json(payload_json: &[u8]) -> Result<JsonLogIterator, OtlpLogsError> { + let request: ExportLogsServiceRequest = serde_json::from_slice(payload_json)?; + let (log_records, _num_parse_errors) = parse_otlp_logs(request)?; + Ok(JsonLogIterator::new(log_records, payload_json.len())) +} + +pub fn parse_otlp_logs_protobuf(payload_proto: &[u8]) -> Result<JsonLogIterator, OtlpLogsError> { + let request = ExportLogsServiceRequest::decode(payload_proto)?; + let (log_records, _num_parse_errors) = parse_otlp_logs(request)?; + Ok(JsonLogIterator::new(log_records, payload_proto.len())) +} + #[cfg(test)] mod tests { use quickwit_metastore::{metastore_for_test, CreateIndexRequestExt}; diff --git a/quickwit/quickwit-opentelemetry/src/otlp/mod.rs b/quickwit/quickwit-opentelemetry/src/otlp/mod.rs index 66866bf589c..8ff9b4b8e62 
100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/mod.rs +++ b/quickwit/quickwit-opentelemetry/src/otlp/mod.rs @@ -34,7 +34,10 @@ mod test_utils; mod trace_id; mod traces; -pub use logs::{OtlpGrpcLogsService, OTEL_LOGS_INDEX_ID}; +pub use logs::{ + parse_otlp_logs_json, parse_otlp_logs_protobuf, JsonLogIterator, OtlpGrpcLogsService, + OtlpLogsError, OTEL_LOGS_INDEX_ID, +}; pub use span_id::{SpanId, TryFromSpanIdError}; #[cfg(any(test, feature = "testsuite"))] pub use test_utils::make_resource_spans_for_test; @@ -42,7 +45,7 @@ use tonic::Status; pub use trace_id::{TraceId, TryFromTraceIdError}; pub use traces::{ parse_otlp_spans_json, parse_otlp_spans_protobuf, Event, JsonSpanIterator, Link, - OtlpGrpcTracesService, OtlpTraceError, Span, SpanFingerprint, SpanKind, SpanStatus, + OtlpGrpcTracesService, OtlpTracesError, Span, SpanFingerprint, SpanKind, SpanStatus, OTEL_TRACES_INDEX_ID, OTEL_TRACES_INDEX_ID_PATTERN, }; @@ -68,8 +71,14 @@ impl OtelSignal { } } -impl From<OtlpTraceError> for tonic::Status { - fn from(error: OtlpTraceError) -> Self { +impl From<OtlpLogsError> for tonic::Status { + fn from(error: OtlpLogsError) -> Self { + tonic::Status::invalid_argument(error.to_string()) + } +} + +impl From<OtlpTracesError> for tonic::Status { + fn from(error: OtlpTracesError) -> Self { tonic::Status::invalid_argument(error.to_string()) } } @@ -197,14 +206,14 @@ pub(crate) fn extract_otel_index_id_from_metadata( .transpose() .map_err(|error| { Status::internal(format!( - "Failed to extract index ID from request metadata: {}", + "failed to extract index ID from request metadata: {}", error )) })? .unwrap_or_else(|| otel_signal.default_index_id()); validate_identifier("index_id", index_id).map_err(|error| { Status::internal(format!( - "Invalid index ID pattern in request metadata: {}", + "invalid index ID pattern in request metadata: {}", error )) })?; diff --git a/quickwit/quickwit-opentelemetry/src/otlp/traces.rs b/quickwit/quickwit-opentelemetry/src/otlp/traces.rs index 5bf4081a20f..085d780d5f3 100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/traces.rs +++ b/quickwit/quickwit-opentelemetry/src/otlp/traces.rs @@ -169,10 +169,10 @@ search_settings: "#; #[derive(Debug, thiserror::Error)] -pub enum OtlpTraceError { - #[error("failed to deserialize JSON span: `{0}`")] +pub enum OtlpTracesError { + #[error("failed to deserialize JSON spans: `{0}`")] Json(#[from] serde_json::Error), - #[error("failed to deserialize Protobuf span: `{0}`")] + #[error("failed to deserialize Protobuf spans: `{0}`")] Protobuf(#[from] prost::DecodeError), #[error("failed to parse span: `{0}`")] SpanId(#[from] TryFromSpanIdError), @@ -246,7 +246,7 @@ impl Span { span: OtlpSpan, resource: &Resource, scope: &Scope, - ) -> Result<Span, OtlpTraceError> { + ) -> Result<Span, OtlpTracesError> { let trace_id = TraceId::try_from(span.trace_id)?; let span_id = SpanId::try_from(span.span_id)?; let parent_span_id = if !span.parent_span_id.is_empty() { @@ -629,7 +629,7 @@ pub struct Link { } impl Link { - fn try_from_otlp(link: OtlpLink) -> Result<Link, OtlpTraceError> { + fn try_from_otlp(link: OtlpLink) -> Result<Link, OtlpTracesError> { let link_trace_id = TraceId::try_from(link.trace_id)?; let link_span_id = SpanId::try_from(link.span_id)?; let link = Link { @@ -649,7 +649,7 @@ impl Link { fn parse_otlp_spans( request: ExportTraceServiceRequest, -) -> Result<BTreeSet<Span>, OtlpTraceError> { +) -> Result<BTreeSet<Span>, OtlpTracesError> { let mut spans = BTreeSet::new(); for resource_spans in request.resource_spans { @@ -843,7 +843,7 @@ impl TraceService for OtlpGrpcTracesService { } } -/// An JSON span iterator for use in the doc processor. 
+/// An iterator of JSON OTLP spans for use in the doc processor. pub struct JsonSpanIterator { spans: btree_set::IntoIter<Span>, current_span_idx: usize, @@ -886,13 +886,15 @@ impl Iterator for JsonSpanIterator { } } -pub fn parse_otlp_spans_json(payload_json: &[u8]) -> Result<JsonSpanIterator, OtlpTraceError> { +pub fn parse_otlp_spans_json(payload_json: &[u8]) -> Result<JsonSpanIterator, OtlpTracesError> { let request: ExportTraceServiceRequest = serde_json::from_slice(payload_json)?; let spans = parse_otlp_spans(request)?; Ok(JsonSpanIterator::new(spans, payload_json.len())) } -pub fn parse_otlp_spans_protobuf(payload_proto: &[u8]) -> Result<JsonSpanIterator, OtlpTraceError> { +pub fn parse_otlp_spans_protobuf( + payload_proto: &[u8], +) -> Result<JsonSpanIterator, OtlpTracesError> { let request = ExportTraceServiceRequest::decode(payload_proto)?; let spans = parse_otlp_spans(request)?; Ok(JsonSpanIterator::new(spans, payload_proto.len()))
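Notes (illustrative sketches, not part of the patch):

The serde aliases on `SourceInputFormat` are what keep existing source configs valid across the `OtlpTrace*` -> `OtlpTraces*` rename. A minimal sketch of that compatibility contract, assuming the enum's usual snake_case serde renaming and that `serde_json` is available:

```rust
use quickwit_config::SourceInputFormat;

fn main() {
    // Legacy and new spellings should all deserialize to the renamed variant.
    for spelling in ["otlp_trace_proto", "otlp_trace_protobuf", "otlp_traces_proto"] {
        let format: SourceInputFormat =
            serde_json::from_value(serde_json::json!(spelling)).unwrap();
        assert!(matches!(format, SourceInputFormat::OtlpTracesProtobuf));
    }
}
```

The byte accounting in the new `JsonLogIterator` mirrors `JsonSpanIterator`: individual record sizes are unknown after decoding, so each record is attributed the average payload size, and the division remainder is folded into the last record so that the per-record sizes always sum back to the payload size. A standalone sketch of the scheme, using a hypothetical `attribute_bytes` helper:

```rust
// Splits `num_bytes` evenly across `num_docs`, folding the remainder into the
// last document, the same way `JsonLogIterator::new` computes `avg_log_size`
// and `avg_log_size_rem`.
fn attribute_bytes(num_bytes: usize, num_docs: usize) -> Vec<usize> {
    let avg_size = num_bytes.checked_div(num_docs).unwrap_or(0);
    let avg_size_rem = avg_size + num_bytes.checked_rem(num_docs).unwrap_or(0);
    (0..num_docs)
        .map(|doc_idx| if doc_idx + 1 < num_docs { avg_size } else { avg_size_rem })
        .collect()
}

fn main() {
    // A 1003-byte payload holding 4 log records: 250 + 250 + 250 + 253 = 1003.
    let sizes = attribute_bytes(1003, 4);
    assert_eq!(sizes, vec![250, 250, 250, 253]);
    assert_eq!(sizes.iter().sum::<usize>(), 1003);
}
```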