From 78db32c58d47fe8adaa154c891096b10c338c947 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 21 Jan 2025 14:33:20 -0800 Subject: [PATCH 1/6] Add the full log validation test in Integration test (#2525) --- .../tests/integration_test/src/test_utils.rs | 14 ++- .../tests/integration_test/tests/logs.rs | 90 ++++++------------- .../tests/logs_serialize_deserialize.rs | 64 +++++++++++++ 3 files changed, 104 insertions(+), 64 deletions(-) create mode 100644 opentelemetry-otlp/tests/integration_test/tests/logs_serialize_deserialize.rs diff --git a/opentelemetry-otlp/tests/integration_test/src/test_utils.rs b/opentelemetry-otlp/tests/integration_test/src/test_utils.rs index b0264a5b76..d5662407f9 100644 --- a/opentelemetry-otlp/tests/integration_test/src/test_utils.rs +++ b/opentelemetry-otlp/tests/integration_test/src/test_utils.rs @@ -20,8 +20,7 @@ use anyhow::Result; use opentelemetry::{otel_debug, otel_info}; -use std::fs; -use std::fs::File; +use std::fs::{self, File, OpenOptions}; use std::os::unix::fs::PermissionsExt; use std::sync::{Arc, Mutex, Once, OnceLock}; use testcontainers::core::wait::HttpWaitStrategy; @@ -125,6 +124,17 @@ fn upsert_empty_file(path: &str) -> File { file } +/// Cleans up file specified as argument by truncating its content. +/// +/// This function is meant to clean up the generated json file before a test starts, +/// preventing entries from previous tests from interfering with the current test's results. +pub fn cleanup_file(file_path: &str) { + let _ = OpenOptions::new() + .write(true) + .truncate(true) + .open(file_path); // ignore result, as file may not exist +} + /// /// Shuts down our collector container. This should be run as part of each test /// suite shutting down! 
diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs index eb5bc7170b..62992a162f 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs @@ -2,12 +2,14 @@ use anyhow::Result; use ctor::dtor; +use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter}; use integration_test_runner::test_utils; use opentelemetry_otlp::LogExporter; use opentelemetry_sdk::logs::LoggerProvider; use opentelemetry_sdk::{logs as sdklogs, Resource}; use std::fs::File; use std::io::Read; +use std::os::unix::fs::MetadataExt; fn init_logs(is_simple: bool) -> Result { let exporter_builder = LogExporter::builder(); @@ -88,26 +90,26 @@ mod logtests { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))] pub async fn logs_batch_tokio_multi_thread() -> Result<()> { - logs_batch_tokio_helper().await + logs_tokio_helper(false).await } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))] pub async fn logs_batch_tokio_multi_with_one_worker() -> Result<()> { - logs_batch_tokio_helper().await + logs_tokio_helper(false).await } #[tokio::test(flavor = "current_thread")] #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))] pub async fn logs_batch_tokio_current() -> Result<()> { - logs_batch_tokio_helper().await + logs_tokio_helper(false).await } - async fn logs_batch_tokio_helper() -> Result<()> { - use crate::{assert_logs_results, init_logs}; + async fn logs_tokio_helper(is_simple: bool) -> Result<()> { + use crate::{assert_logs_results_contains, init_logs}; test_utils::start_collector_container().await?; - let logger_provider = init_logs(false).unwrap(); + let logger_provider = init_logs(is_simple).unwrap(); let layer = 
OpenTelemetryTracingBridge::new(&logger_provider); let subscriber = tracing_subscriber::registry().with(layer); // generate a random uuid and store it to expected guid @@ -119,20 +121,20 @@ mod logtests { let _ = logger_provider.shutdown(); tokio::time::sleep(Duration::from_secs(5)).await; - assert_logs_results(test_utils::LOGS_FILE, expected_uuid.as_str())?; + assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?; Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[cfg(any(feature = "tonic-client", feature = "reqwest-client"))] pub async fn logs_simple_tokio_multi_thread() -> Result<()> { - logs_simple_tokio_helper().await + logs_tokio_helper(true).await } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] #[cfg(any(feature = "tonic-client", feature = "reqwest-client"))] pub async fn logs_simple_tokio_multi_with_one_worker() -> Result<()> { - logs_simple_tokio_helper().await + logs_tokio_helper(true).await } // Ignored, to be investigated @@ -140,37 +142,16 @@ mod logtests { #[tokio::test(flavor = "current_thread")] #[cfg(any(feature = "tonic-client", feature = "reqwest-client"))] pub async fn logs_simple_tokio_current() -> Result<()> { - logs_simple_tokio_helper().await - } - - async fn logs_simple_tokio_helper() -> Result<()> { - use crate::{assert_logs_results, init_logs}; - test_utils::start_collector_container().await?; - - let logger_provider = init_logs(true).unwrap(); - let layer = OpenTelemetryTracingBridge::new(&logger_provider); - let subscriber = tracing_subscriber::registry().with(layer); - info!("Tracing initialized"); - // generate a random uuid and store it to expected guid - let expected_uuid = Uuid::new_v4().to_string(); - { - let _guard = tracing::subscriber::set_default(subscriber); - info!(target: "my-target", uuid = expected_uuid, "hello from {}. 
My price is {}.", "banana", 2.99); - } - - let _ = logger_provider.shutdown(); - tokio::time::sleep(Duration::from_secs(5)).await; - assert_logs_results(test_utils::LOGS_FILE, expected_uuid.as_str())?; - Ok(()) + logs_tokio_helper(true).await } #[test] #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))] pub fn logs_batch_non_tokio_main() -> Result<()> { - logs_batch_non_tokio_helper() + logs_non_tokio_helper(false) } - fn logs_batch_non_tokio_helper() -> Result<()> { + fn logs_non_tokio_helper(is_simple: bool) -> Result<()> { // Initialize the logger provider inside a tokio runtime // as this allows tonic client to capture the runtime, // but actual export occurs from the dedicated std::thread @@ -179,7 +160,7 @@ mod logtests { let logger_provider = rt.block_on(async { // While we're here setup our collector container too, as this needs tokio to run test_utils::start_collector_container().await?; - init_logs(false) + init_logs(is_simple) })?; let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider); let subscriber = tracing_subscriber::registry().with(layer); @@ -192,43 +173,18 @@ mod logtests { let _ = logger_provider.shutdown(); std::thread::sleep(Duration::from_secs(5)); - assert_logs_results(test_utils::LOGS_FILE, expected_uuid.as_str())?; + assert_logs_results_contains(test_utils::LOGS_FILE, expected_uuid.as_str())?; Ok(()) } #[test] #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))] pub fn logs_simple_non_tokio_main() -> Result<()> { - logs_simple_non_tokio_helper() - } - - fn logs_simple_non_tokio_helper() -> Result<()> { - // Initialize the logger provider inside a tokio runtime - // as this allows tonic client to capture the runtime, - // but actual export occurs from the main non-tokio thread. 
- let rt = tokio::runtime::Runtime::new()?; - let logger_provider = rt.block_on(async { - // While we're here setup our collector container too, as this needs tokio to run - test_utils::start_collector_container().await?; - init_logs(true) - })?; - let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider); - let subscriber = tracing_subscriber::registry().with(layer); - // generate a random uuid and store it to expected guid - let expected_uuid = Uuid::new_v4().to_string(); - { - let _guard = tracing::subscriber::set_default(subscriber); - info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99); - } - - let _ = logger_provider.shutdown(); - std::thread::sleep(Duration::from_secs(5)); - assert_logs_results(test_utils::LOGS_FILE, expected_uuid.as_str())?; - Ok(()) + logs_non_tokio_helper(true) } } -pub fn assert_logs_results(result: &str, expected_content: &str) -> Result<()> { +pub fn assert_logs_results_contains(result: &str, expected_content: &str) -> Result<()> { let file = File::open(result)?; let mut contents = String::new(); let mut reader = std::io::BufReader::new(&file); @@ -237,6 +193,16 @@ pub fn assert_logs_results(result: &str, expected_content: &str) -> Result<()> { Ok(()) } +pub fn assert_logs_results(result: &str, expected: &str) -> Result<()> { + let left = read_logs_from_json(File::open(expected)?)?; + let right = read_logs_from_json(File::open(result)?)?; + + LogsAsserter::new(left, right).assert(); + + assert!(File::open(result).unwrap().metadata().unwrap().size() > 0); + Ok(()) +} + /// /// Make sure we stop the collector container, otherwise it will sit around hogging our /// ports and subsequent test runs will fail. 
diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs_serialize_deserialize.rs b/opentelemetry-otlp/tests/integration_test/tests/logs_serialize_deserialize.rs new file mode 100644 index 0000000000..37854ba397 --- /dev/null +++ b/opentelemetry-otlp/tests/integration_test/tests/logs_serialize_deserialize.rs @@ -0,0 +1,64 @@ +#![cfg(unix)] + +use anyhow::Result; +use ctor::dtor; +use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter}; +use integration_test_runner::test_utils; +use opentelemetry_appender_tracing::layer; +use opentelemetry_otlp::LogExporter; +use opentelemetry_sdk::logs::LoggerProvider; +use opentelemetry_sdk::Resource; +use std::fs::File; +use std::os::unix::fs::MetadataExt; +use tracing::info; +use tracing_subscriber::layer::SubscriberExt; + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[cfg(feature = "tonic-client")] +pub async fn test_logs() -> Result<()> { + test_utils::start_collector_container().await?; + test_utils::cleanup_file("./actual/logs.json"); // Ensure logs.json is empty before the test + let exporter_builder = LogExporter::builder().with_tonic(); + let exporter = exporter_builder.build()?; + let mut logger_provider_builder = LoggerProvider::builder(); + logger_provider_builder = logger_provider_builder.with_batch_exporter(exporter); + let logger_provider = logger_provider_builder + .with_resource( + Resource::builder_empty() + .with_service_name("logs-integration-test") + .build(), + ) + .build(); + let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider); + let subscriber = tracing_subscriber::registry().with(layer); + + { + let _guard = tracing::subscriber::set_default(subscriber); + info!(target: "my-target", "hello from {}. 
My price is {}.", "banana", 2.99); + } + + let _ = logger_provider.shutdown(); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json")?; + Ok(()) +} + +fn assert_logs_results(result: &str, expected: &str) -> Result<()> { + let left = read_logs_from_json(File::open(expected)?)?; + let right = read_logs_from_json(File::open(result)?)?; + + LogsAsserter::new(left, right).assert(); + + assert!(File::open(result).unwrap().metadata().unwrap().size() > 0); + Ok(()) +} + +/// +/// Make sure we stop the collector container, otherwise it will sit around hogging our +/// ports and subsequent test runs will fail. +/// +#[dtor] +fn shutdown() { + println!("metrics::shutdown"); + test_utils::stop_collector_container(); +} From d2a6b3b2fc3ca10c834ef0bad1a57611b589f495 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 21 Jan 2025 15:00:40 -0800 Subject: [PATCH 2/6] BatchSpanProcessor optimizations - Separate control signal queue, and wake up background thread only when required. (#2526) Co-authored-by: Cijo Thomas --- opentelemetry-sdk/src/trace/span_processor.rs | 215 +++++++++++++++--- 1 file changed, 180 insertions(+), 35 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 53ee2f9bc0..87bf76b9c2 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -85,6 +85,7 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// `on_end` is called after a `Span` is ended (i.e., the end timestamp is /// already set). This method is called synchronously within the `Span::end` /// API, therefore it should not block or throw an exception. + /// TODO - This method should take reference to `SpanData` fn on_end(&self, span: SpanData); /// Force the spans lying in the cache to be exported. 
fn force_flush(&self) -> TraceResult<()>; @@ -163,6 +164,7 @@ impl SpanProcessor for SimpleSpanProcessor { } } +use crate::export::trace::ExportResult; /// The `BatchSpanProcessor` collects finished spans in a buffer and exports them /// in batches to the configured `SpanExporter`. This processor is ideal for /// high-throughput environments, as it minimizes the overhead of exporting spans @@ -217,8 +219,8 @@ impl SpanProcessor for SimpleSpanProcessor { /// provider.shutdown(); /// } /// ``` -use futures_executor::block_on; use std::sync::mpsc::sync_channel; +use std::sync::mpsc::Receiver; use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::SyncSender; @@ -226,7 +228,8 @@ use std::sync::mpsc::SyncSender; #[allow(clippy::large_enum_variant)] #[derive(Debug)] enum BatchMessage { - ExportSpan(SpanData), + //ExportSpan(SpanData), + ExportSpan(Arc), ForceFlush(SyncSender>), Shutdown(SyncSender>), SetResource(Arc), @@ -235,12 +238,17 @@ enum BatchMessage { /// A batch span processor with a dedicated background thread. #[derive(Debug)] pub struct BatchSpanProcessor { - message_sender: SyncSender, + span_sender: SyncSender, // Data channel to store spans + message_sender: SyncSender, // Control channel to store control messages. handle: Mutex>>, forceflush_timeout: Duration, shutdown_timeout: Duration, is_shutdown: AtomicBool, dropped_span_count: Arc, + export_span_message_sent: Arc, + current_batch_size: Arc, + max_export_batch_size: usize, + max_queue_size: usize, } impl BatchSpanProcessor { @@ -255,7 +263,12 @@ impl BatchSpanProcessor { where E: SpanExporter + Send + 'static, { - let (message_sender, message_receiver) = sync_channel(config.max_queue_size); + let (span_sender, span_receiver) = sync_channel::(config.max_queue_size); + let (message_sender, message_receiver) = sync_channel::(64); // Is this a reasonable bound? 
+ let max_queue_size = config.max_queue_size; + let max_export_batch_size = config.max_export_batch_size; + let current_batch_size = Arc::new(AtomicUsize::new(0)); + let current_batch_size_for_thread = current_batch_size.clone(); let handle = thread::Builder::new() .name("OpenTelemetry.Traces.BatchProcessor".to_string()) @@ -268,7 +281,7 @@ impl BatchSpanProcessor { ); let mut spans = Vec::with_capacity(config.max_export_batch_size); let mut last_export_time = Instant::now(); - + let current_batch_size = current_batch_size_for_thread; loop { let remaining_time_option = config .scheduled_delay @@ -279,28 +292,52 @@ impl BatchSpanProcessor { }; match message_receiver.recv_timeout(remaining_time) { Ok(message) => match message { - BatchMessage::ExportSpan(span) => { - spans.push(span); - if spans.len() >= config.max_queue_size - || last_export_time.elapsed() >= config.scheduled_delay - { - if let Err(err) = block_on(exporter.export(spans.split_off(0))) - { - otel_error!( - name: "BatchSpanProcessor.ExportError", - error = format!("{}", err) - ); - } - last_export_time = Instant::now(); - } + BatchMessage::ExportSpan(export_span_message_sent) => { + // Reset the export span message sent flag now that it has been processed. 
+ export_span_message_sent.store(false, Ordering::Relaxed); + otel_debug!( + name: "BatchSpanProcessor.ExportingDueToBatchSize", + ); + let _ = Self::get_spans_and_export( + &span_receiver, + &mut exporter, + &mut spans, + &mut last_export_time, + ¤t_batch_size, + &config, + ); } BatchMessage::ForceFlush(sender) => { - let result = block_on(exporter.export(spans.split_off(0))); + otel_debug!(name: "BatchSpanProcessor.ExportingDueToForceFlush"); + let result = Self::get_spans_and_export( + &span_receiver, + &mut exporter, + &mut spans, + &mut last_export_time, + ¤t_batch_size, + &config, + ); let _ = sender.send(result); } BatchMessage::Shutdown(sender) => { - let result = block_on(exporter.export(spans.split_off(0))); + otel_debug!(name: "BatchSpanProcessor.ExportingDueToShutdown"); + let result = Self::get_spans_and_export( + &span_receiver, + &mut exporter, + &mut spans, + &mut last_export_time, + ¤t_batch_size, + &config, + ); let _ = sender.send(result); + + otel_debug!( + name: "BatchSpanProcessor.ThreadExiting", + reason = "ShutdownRequested" + ); + // + // break out the loop and return from the current background thread. 
+ // break; } BatchMessage::SetResource(resource) => { @@ -308,15 +345,18 @@ impl BatchSpanProcessor { } }, Err(RecvTimeoutError::Timeout) => { - if last_export_time.elapsed() >= config.scheduled_delay { - if let Err(err) = block_on(exporter.export(spans.split_off(0))) { - otel_error!( - name: "BatchSpanProcessor.ExportError", - error = format!("{}", err) - ); - } - last_export_time = Instant::now(); - } + otel_debug!( + name: "BatchSpanProcessor.ExportingDueToTimer", + ); + + let _ = Self::get_spans_and_export( + &span_receiver, + &mut exporter, + &mut spans, + &mut last_export_time, + ¤t_batch_size, + &config, + ); } Err(RecvTimeoutError::Disconnected) => { // Channel disconnected, only thing to do is break @@ -336,12 +376,17 @@ impl BatchSpanProcessor { .expect("Failed to spawn thread"); //TODO: Handle thread spawn failure Self { + span_sender, message_sender, handle: Mutex::new(Some(handle)), forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable is_shutdown: AtomicBool::new(false), dropped_span_count: Arc::new(AtomicUsize::new(0)), + max_queue_size, + export_span_message_sent: Arc::new(AtomicBool::new(false)), + current_batch_size, + max_export_batch_size, } } @@ -355,6 +400,72 @@ impl BatchSpanProcessor { config: BatchConfig::default(), } } + + // This method gets upto `max_export_batch_size` amount of spans from the channel and exports them. + // It returns the result of the export operation. + // It expects the span vec to be empty when it's called. 
+ #[inline] + fn get_spans_and_export( + spans_receiver: &Receiver, + exporter: &mut E, + spans: &mut Vec, + last_export_time: &mut Instant, + current_batch_size: &AtomicUsize, + config: &BatchConfig, + ) -> ExportResult + where + E: SpanExporter + Send + Sync + 'static, + { + // Get upto `max_export_batch_size` amount of spans from the channel and push them to the span vec + while let Ok(span) = spans_receiver.try_recv() { + spans.push(span); + if spans.len() == config.max_export_batch_size { + break; + } + } + + let count_of_spans = spans.len(); // Count of spans that will be exported + let result = Self::export_with_timeout_sync( + config.max_export_timeout, + exporter, + spans, + last_export_time, + ); // This method clears the spans vec after exporting + + current_batch_size.fetch_sub(count_of_spans, Ordering::Relaxed); + result + } + + #[allow(clippy::vec_box)] + fn export_with_timeout_sync( + _: Duration, // TODO, enforcing timeout in exporter. + exporter: &mut E, + batch: &mut Vec, + last_export_time: &mut Instant, + ) -> ExportResult + where + E: SpanExporter + Send + Sync + 'static, + { + *last_export_time = Instant::now(); + + if batch.is_empty() { + return TraceResult::Ok(()); + } + + let export = exporter.export(batch.split_off(0)); + let export_result = futures_executor::block_on(export); + + match export_result { + Ok(_) => TraceResult::Ok(()), + Err(err) => { + otel_error!( + name: "BatchSpanProcessor.ExportError", + error = format!("{}", err) + ); + TraceResult::Err(err) + } + } + } } impl SpanProcessor for BatchSpanProcessor { @@ -369,10 +480,11 @@ impl SpanProcessor for BatchSpanProcessor { // this is a warning, as the user is trying to emit after the processor has been shutdown otel_warn!( name: "BatchSpanProcessor.Emit.ProcessorShutdown", + message = "BatchSpanProcessor has been shutdown. No further spans will be emitted." 
); return; } - let result = self.message_sender.try_send(BatchMessage::ExportSpan(span)); + let result = self.span_sender.try_send(span); if result.is_err() { // Increment dropped span count. The first time we have to drop a span, @@ -382,6 +494,36 @@ impl SpanProcessor for BatchSpanProcessor { message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped."); } } + // At this point, sending the span to the data channel was successful. + // Increment the current batch size and check if it has reached the max export batch size. + if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size + { + // Check if a control message for exporting spans is already sent to the worker thread. + // If not, send a control message to export spans. + // `export_span_message_sent` is set to false ONLY when the worker thread has processed the control message. + + if !self.export_span_message_sent.load(Ordering::Relaxed) { + // This is a cost-efficient check as atomic load operations do not require exclusive access to cache line. + // Perform atomic swap to `export_span_message_sent` ONLY when the atomic load operation above returns false. + // Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures. + // We could have used compare_exchange as well here, but it's more verbose than swap. + if !self.export_span_message_sent.swap(true, Ordering::Relaxed) { + match self.message_sender.try_send(BatchMessage::ExportSpan( + self.export_span_message_sent.clone(), + )) { + Ok(_) => { + // Control message sent successfully. + } + Err(_err) => { + // TODO: Log error + // If the control message could not be sent, reset the `export_span_message_sent` flag. 
+ self.export_span_message_sent + .store(false, Ordering::Relaxed); + } + } + } + } + } } /// Flushes all pending spans. @@ -401,17 +543,20 @@ impl SpanProcessor for BatchSpanProcessor { /// Shuts down the processor. fn shutdown(&self) -> TraceResult<()> { + if self.is_shutdown.swap(true, Ordering::Relaxed) { + return Err(TraceError::Other("Processor already shutdown".into())); + } let dropped_spans = self.dropped_span_count.load(Ordering::Relaxed); + let max_queue_size = self.max_queue_size; if dropped_spans > 0 { otel_warn!( - name: "BatchSpanProcessor.LogsDropped", + name: "BatchSpanProcessor.SpansDropped", dropped_span_count = dropped_spans, + max_queue_size = max_queue_size, message = "Spans were dropped due to a queue being full or other error. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals." ); } - if self.is_shutdown.swap(true, Ordering::Relaxed) { - return Err(TraceError::Other("Processor already shutdown".into())); - } + let (sender, receiver) = sync_channel(1); self.message_sender .try_send(BatchMessage::Shutdown(sender)) From 90b0dd46c8005e2ca69fe85d4348275e243d7675 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 21 Jan 2025 17:38:46 -0800 Subject: [PATCH 3/6] Remove cardinality capping in Metrics (#2528) --- opentelemetry-sdk/CHANGELOG.md | 6 ++++++ opentelemetry-sdk/src/metrics/internal/aggregate.rs | 7 +++++-- opentelemetry-sdk/src/metrics/mod.rs | 2 ++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index d1e4910523..b053853a11 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -257,6 +257,12 @@ metadata, a feature introduced in version 0.1.40. 
[#2418](https://github.com/ope - If you're implementing a custom runtime, you must explicitly enable the experimental_async_runtime` feature in your Cargo.toml and implement the required `Runtime` traits. +- Removed Metrics Cardinality Limit feature. This was originally introduced in +[#1066](https://github.com/open-telemetry/opentelemetry-rust/pull/1066) with a +hardcoded limit of 2000 and no ability to change it. This feature will be +re-introduced in a future date, along with the ability to change the cardinality +limit. + ## 0.27.1 Released 2024-Nov-27 diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index fc9d5975c3..8713bce3c4 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -18,8 +18,11 @@ use super::{ pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000; /// Checks whether aggregator has hit cardinality limit for metric streams -pub(crate) fn is_under_cardinality_limit(size: usize) -> bool { - size < STREAM_CARDINALITY_LIMIT +pub(crate) fn is_under_cardinality_limit(_size: usize) -> bool { + true + + // TODO: Implement this feature, after allowing the ability to customize the cardinality limit. + // size < STREAM_CARDINALITY_LIMIT } /// Receives measurements to be aggregated. 
diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 5faeba724a..a6a53a4f7a 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -259,11 +259,13 @@ mod tests { assert_eq!(data_point.value, 50, "Unexpected data point value"); } + #[ignore = "https://github.com/open-telemetry/opentelemetry-rust/issues/1065"] #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn counter_aggregation_overflow_delta() { counter_aggregation_overflow_helper(Temporality::Delta); } + #[ignore = "https://github.com/open-telemetry/opentelemetry-rust/issues/1065"] #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn counter_aggregation_overflow_cumulative() { counter_aggregation_overflow_helper(Temporality::Cumulative); From acaa98d8147fd7f97bd7076adcee025eb25ebc69 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 22 Jan 2025 10:15:42 -0800 Subject: [PATCH 4/6] Doc additions for Simple and Batch processors (#2529) Co-authored-by: Lalit Kumar Bhasin --- opentelemetry-sdk/src/lib.rs | 5 +-- opentelemetry-sdk/src/logs/log_emitter.rs | 36 +++++++++++++++++-- opentelemetry-sdk/src/logs/log_processor.rs | 13 ++++++- opentelemetry-sdk/src/trace/provider.rs | 36 +++++++++++++++++-- opentelemetry-sdk/src/trace/span_processor.rs | 18 ++++++++++ 5 files changed, 97 insertions(+), 11 deletions(-) diff --git a/opentelemetry-sdk/src/lib.rs b/opentelemetry-sdk/src/lib.rs index 4e2ef47ba7..3425dd17a3 100644 --- a/opentelemetry-sdk/src/lib.rs +++ b/opentelemetry-sdk/src/lib.rs @@ -44,10 +44,7 @@ //! [examples]: https://github.com/open-telemetry/opentelemetry-rust/tree/main/examples //! [`trace`]: https://docs.rs/opentelemetry/latest/opentelemetry/trace/index.html //! -//! # Metrics (Alpha) -//! -//! Note: the metrics implementation is **still in progress** and **subject to major -//! changes**. +//! # Metrics //! //! ### Creating instruments and recording measurements //! 
diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index ba63896277..4d5e950fb5 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -185,7 +185,17 @@ pub struct Builder { } impl Builder { - /// The `LogExporter` that this provider should use. + /// Adds a [SimpleLogProcessor] with the configured exporter to the pipeline. + /// + /// # Arguments + /// + /// * `exporter` - The exporter to be used by the SimpleLogProcessor. + /// + /// # Returns + /// + /// A new `Builder` instance with the SimpleLogProcessor added to the pipeline. + /// + /// Processors are invoked in the order they are added. pub fn with_simple_exporter(self, exporter: T) -> Self { let mut processors = self.processors; processors.push(Box::new(SimpleLogProcessor::new(exporter))); @@ -193,13 +203,33 @@ impl Builder { Builder { processors, ..self } } - /// The `LogExporter` setup using a default `BatchLogProcessor` that this provider should use. + /// Adds a [BatchLogProcessor] with the configured exporter to the pipeline. + /// + /// # Arguments + /// + /// * `exporter` - The exporter to be used by the BatchLogProcessor. + /// + /// # Returns + /// + /// A new `Builder` instance with the BatchLogProcessor added to the pipeline. + /// + /// Processors are invoked in the order they are added. pub fn with_batch_exporter(self, exporter: T) -> Self { let batch = BatchLogProcessor::builder(exporter).build(); self.with_log_processor(batch) } - /// The `LogProcessor` that this provider should use. + /// Adds a custom [LogProcessor] to the pipeline. + /// + /// # Arguments + /// + /// * `processor` - The `LogProcessor` to be added. + /// + /// # Returns + /// + /// A new `Builder` instance with the custom `LogProcessor` added to the pipeline. + /// + /// Processors are invoked in the order they are added. 
pub fn with_log_processor(self, processor: T) -> Self { let mut processors = self.processors; processors.push(Box::new(processor)); diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 9135141574..c7898933e5 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -104,7 +104,18 @@ pub trait LogProcessor: Send + Sync + Debug { } /// A [`LogProcessor`] designed for testing and debugging purpose, that immediately -/// exports log records as they are emitted. +/// exports log records as they are emitted. Log records are exported synchronously +/// in the same thread that emits the log record. +/// When using this processor with the OTLP Exporter, the following exporter +/// features are supported: +/// - `grpc-tonic`: This requires LoggerProvider to be created within a tokio +/// runtime. Logs can be emitted from any thread, including tokio runtime +/// threads. +/// - `reqwest-blocking-client`: LoggerProvider may be created anywhere, but +/// logs must be emitted from a non-tokio runtime thread. +/// - `reqwest-client`: LoggerProvider may be created anywhere, but logs must be +/// emitted from a tokio runtime thread. +/// /// ## Example /// /// ### Using a SimpleLogProcessor diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 447404cbb0..43761c5107 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -280,7 +280,17 @@ pub struct Builder { } impl Builder { - /// The `SpanExporter` that this provider should use. + /// Adds a [SimpleSpanProcessor] with the configured exporter to the pipeline. + /// + /// # Arguments + /// + /// * `exporter` - The exporter to be used by the SimpleSpanProcessor. + /// + /// # Returns + /// + /// A new `Builder` instance with the SimpleSpanProcessor added to the pipeline. + /// + /// Processors are invoked in the order they are added. 
pub fn with_simple_exporter(self, exporter: T) -> Self { let mut processors = self.processors; processors.push(Box::new(SimpleSpanProcessor::new(Box::new(exporter)))); @@ -288,13 +298,33 @@ impl Builder { Builder { processors, ..self } } - /// The [`SpanExporter`] setup using a default [`BatchSpanProcessor`] that this provider should use. + /// Adds a [BatchSpanProcessor] with the configured exporter to the pipeline. + /// + /// # Arguments + /// + /// * `exporter` - The exporter to be used by the BatchSpanProcessor. + /// + /// # Returns + /// + /// A new `Builder` instance with the BatchSpanProcessor added to the pipeline. + /// + /// Processors are invoked in the order they are added. pub fn with_batch_exporter(self, exporter: T) -> Self { let batch = BatchSpanProcessor::builder(exporter).build(); self.with_span_processor(batch) } - /// The [`SpanProcessor`] that this provider should use. + /// Adds a custom [SpanProcessor] to the pipeline. + /// + /// # Arguments + /// + /// * `processor` - The `SpanProcessor` to be added. + /// + /// # Returns + /// + /// A new `Builder` instance with the custom `SpanProcessor` added to the pipeline. + /// + /// Processors are invoked in the order they are added. pub fn with_span_processor(self, processor: T) -> Self { let mut processors = self.processors; processors.push(Box::new(processor)); diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 87bf76b9c2..f41ddd178e 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -102,6 +102,17 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// `SpanExporter`, as soon as they are finished, without any batching. This is /// typically useful for debugging and testing. For scenarios requiring higher /// performance/throughput, consider using [BatchSpanProcessor]. +/// Spans are exported synchronously +/// in the same thread that emits the log record. 
+/// When using this processor with the OTLP Exporter, the following exporter +/// features are supported: +/// - `grpc-tonic`: This requires TracerProvider to be created within a tokio +/// runtime. Spans can be emitted from any thread, including tokio runtime +/// threads. +/// - `reqwest-blocking-client`: TracerProvider may be created anywhere, but +/// spans must be emitted from a non-tokio runtime thread. +/// - `reqwest-client`: TracerProvider may be created anywhere, but spans must be +/// emitted from a tokio runtime thread. #[derive(Debug)] pub struct SimpleSpanProcessor { exporter: Mutex>, @@ -171,6 +182,13 @@ use crate::export::trace::ExportResult; /// individually. It uses a **dedicated background thread** to manage and export spans /// asynchronously, ensuring that the application's main execution flow is not blocked. /// +/// When using this processor with the OTLP Exporter, the following exporter +/// features are supported: +/// - `grpc-tonic`: This requires `TracerProvider` to be created within a tokio +/// runtime. +/// - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`. +/// +/// In other words, other clients like `reqwest` and `hyper` are not supported. 
/// /// # Example /// /// This example demonstrates how to configure and use the `BatchSpanProcessor` From b1debf0797c4cc8ce83095138f3e14e84a570906 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Wed, 22 Jan 2025 15:10:02 -0800 Subject: [PATCH 5/6] Short circuit the event_enabled check (#2533) --- opentelemetry-sdk/src/logs/log_emitter.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 4d5e950fb5..497cbb6db1 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -323,13 +323,10 @@ impl opentelemetry::logs::Logger for Logger { #[cfg(feature = "spec_unstable_logs_enabled")] fn event_enabled(&self, level: Severity, target: &str) -> bool { - let provider = &self.provider; - - let mut enabled = false; - for processor in provider.log_processors() { - enabled = enabled || processor.event_enabled(level, target, self.scope.name().as_ref()); - } - enabled + self.provider + .log_processors() + .iter() + .any(|processor| processor.event_enabled(level, target, self.scope.name().as_ref())) } } From 57d129734c895029c74a1bca2204f80810c66d21 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 22 Jan 2025 16:50:18 -0800 Subject: [PATCH 6/6] Refactor LogExporter mod (#2534) --- opentelemetry-appender-tracing/benches/logs.rs | 2 +- opentelemetry-appender-tracing/src/layer.rs | 2 +- opentelemetry-otlp/src/exporter/http/logs.rs | 2 +- opentelemetry-otlp/src/exporter/http/mod.rs | 4 ++-- opentelemetry-otlp/src/exporter/tonic/logs.rs | 2 +- opentelemetry-otlp/src/logs.rs | 4 ++-- opentelemetry-proto/src/transform/logs.rs | 4 ++-- opentelemetry-sdk/CHANGELOG.md | 7 +++++++ opentelemetry-sdk/benches/log_exporter.rs | 2 +- opentelemetry-sdk/src/export/mod.rs | 4 ---- .../src/{export/logs/mod.rs => logs/export.rs} | 0 opentelemetry-sdk/src/logs/log_emitter.rs | 4 ++-- opentelemetry-sdk/src/logs/log_processor.rs | 8 +++----- 
.../src/logs/log_processor_with_async_runtime.rs | 5 ++--- opentelemetry-sdk/src/logs/mod.rs | 2 ++ opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs | 3 +-- opentelemetry-stdout/src/logs/exporter.rs | 4 ++-- opentelemetry-stdout/src/logs/mod.rs | 2 +- stress/src/logs.rs | 2 +- 19 files changed, 32 insertions(+), 31 deletions(-) rename opentelemetry-sdk/src/{export/logs/mod.rs => logs/export.rs} (100%) diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index e5fb98273c..6c09f5c966 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -16,8 +16,8 @@ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::InstrumentationScope; use opentelemetry_appender_tracing::layer as tracing_layer; -use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; use opentelemetry_sdk::logs::LogResult; +use opentelemetry_sdk::logs::{LogBatch, LogExporter}; use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LoggerProvider}; use opentelemetry_sdk::Resource; use pprof::criterion::{Output, PProfProfiler}; diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index af752c5f8e..32a8a5b1f0 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -213,7 +213,7 @@ mod tests { use opentelemetry::trace::TracerProvider as _; use opentelemetry::trace::{TraceContextExt, TraceFlags, Tracer}; use opentelemetry::{logs::AnyValue, Key}; - use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; + use opentelemetry_sdk::logs::{LogBatch, LogExporter}; use opentelemetry_sdk::logs::{LogRecord, LogResult, LoggerProvider}; use opentelemetry_sdk::testing::logs::InMemoryLogExporter; use opentelemetry_sdk::trace::{Sampler, TracerProvider}; diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 
9d00602eed..8b828730cd 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::otel_debug; -use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; +use opentelemetry_sdk::logs::{LogBatch, LogExporter}; use opentelemetry_sdk::logs::{LogError, LogResult}; use super::OtlpHttpClient; diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 4d1af8c880..87f425aca4 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -13,10 +13,10 @@ use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema; use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; #[cfg(feature = "trace")] use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; -#[cfg(feature = "logs")] -use opentelemetry_sdk::export::logs::LogBatch; #[cfg(feature = "trace")] use opentelemetry_sdk::export::trace::SpanData; +#[cfg(feature = "logs")] +use opentelemetry_sdk::logs::LogBatch; use prost::Message; use std::collections::HashMap; use std::env; diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 053331b428..a4d276fd8f 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -3,7 +3,7 @@ use opentelemetry::otel_debug; use opentelemetry_proto::tonic::collector::logs::v1::{ logs_service_client::LogsServiceClient, ExportLogsServiceRequest, }; -use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; +use opentelemetry_sdk::logs::{LogBatch, LogExporter}; use opentelemetry_sdk::logs::{LogError, LogResult}; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; diff --git a/opentelemetry-otlp/src/logs.rs 
b/opentelemetry-otlp/src/logs.rs index aa4ea8fa07..3b17c30feb 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -8,7 +8,7 @@ use std::fmt::Debug; use opentelemetry_sdk::logs::LogResult; -use opentelemetry_sdk::export::logs::LogBatch; +use opentelemetry_sdk::logs::LogBatch; use crate::{HasExportConfig, NoExporterBuilderSet}; @@ -140,7 +140,7 @@ impl LogExporter { } } -impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { +impl opentelemetry_sdk::logs::LogExporter for LogExporter { #[allow(clippy::manual_async_fn)] fn export( &self, diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index b6f28490d7..37be35709b 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -12,7 +12,7 @@ pub mod tonic { transform::common::{to_nanos, tonic::ResourceAttributesWithSchema}, }; use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity}; - use opentelemetry_sdk::export::logs::LogBatch; + use opentelemetry_sdk::logs::LogBatch; use std::borrow::Cow; use std::collections::HashMap; @@ -222,7 +222,7 @@ mod tests { use crate::transform::common::tonic::ResourceAttributesWithSchema; use opentelemetry::logs::LogRecord as _; use opentelemetry::InstrumentationScope; - use opentelemetry_sdk::{export::logs::LogBatch, logs::LogRecord, Resource}; + use opentelemetry_sdk::{logs::LogBatch, logs::LogRecord, Resource}; use std::time::SystemTime; fn create_test_log_data( diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index b053853a11..61e9207cdc 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -263,6 +263,13 @@ hardcoded limit of 2000 and no ability to change it. This feature will be re-introduced in a future date, along with the ability to change the cardinality limit. +- Refactor modules. This is a *breaking* change if you author a custom + LogExporter or LogProcessor.
+ before: + `opentelemetry_sdk::export::logs::{ExportResult, LogBatch, LogExporter};` + now: + `opentelemetry_sdk::logs::{ExportResult, LogBatch, LogExporter}` + ## 0.27.1 Released 2024-Nov-27 diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index c2ecb78ce9..523725fc7e 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -20,7 +20,7 @@ use opentelemetry::logs::{LogRecord as _, Logger as _, LoggerProvider as _, Seve use opentelemetry_sdk::logs::LogResult; use opentelemetry::InstrumentationScope; -use opentelemetry_sdk::export::logs::LogBatch; +use opentelemetry_sdk::logs::LogBatch; use opentelemetry_sdk::logs::LogProcessor; use opentelemetry_sdk::logs::LogRecord; use opentelemetry_sdk::logs::LoggerProvider; diff --git a/opentelemetry-sdk/src/export/mod.rs b/opentelemetry-sdk/src/export/mod.rs index 21dc2b570c..92f111c4e4 100644 --- a/opentelemetry-sdk/src/export/mod.rs +++ b/opentelemetry-sdk/src/export/mod.rs @@ -1,9 +1,5 @@ //! 
Telemetry Export -#[cfg(feature = "logs")] -#[cfg_attr(docsrs, doc(cfg(feature = "logs")))] -pub mod logs; - #[cfg(feature = "trace")] #[cfg_attr(docsrs, doc(cfg(feature = "trace")))] pub mod trace; diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/logs/export.rs similarity index 100% rename from opentelemetry-sdk/src/export/logs/mod.rs rename to opentelemetry-sdk/src/logs/export.rs diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 497cbb6db1..f6b66d2c2c 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -1,6 +1,6 @@ use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext}; -use crate::{export::logs::LogExporter, Resource}; -use crate::{logs::LogError, logs::LogResult}; +use crate::Resource; +use crate::{logs::LogError, logs::LogExporter, logs::LogResult}; use opentelemetry::{otel_debug, otel_info, trace::TraceContextExt, Context, InstrumentationScope}; #[cfg(feature = "spec_unstable_logs_enabled")] diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index c7898933e5..15eeb5361a 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -32,8 +32,7 @@ //! 
``` use crate::{ - export::logs::{ExportResult, LogBatch, LogExporter}, - logs::{LogError, LogRecord, LogResult}, + logs::{ExportResult, LogBatch, LogError, LogExporter, LogRecord, LogResult}, Resource, }; use std::sync::mpsc::{self, RecvTimeoutError, SyncSender}; @@ -123,7 +122,7 @@ pub trait LogProcessor: Send + Sync + Debug { /// ```rust /// use opentelemetry_sdk::logs::{SimpleLogProcessor, LoggerProvider}; /// use opentelemetry::global; -/// use opentelemetry_sdk::export::logs::LogExporter; +/// use opentelemetry_sdk::logs::LogExporter; /// use opentelemetry_sdk::testing::logs::InMemoryLogExporter; /// /// let exporter = InMemoryLogExporter::default(); // Replace with an actual exporter @@ -818,9 +817,8 @@ mod tests { BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY, }; - use crate::export::logs::{LogBatch, LogExporter}; - use crate::logs::LogRecord; use crate::logs::LogResult; + use crate::logs::{LogBatch, LogExporter, LogRecord}; use crate::testing::logs::InMemoryLogExporterBuilder; use crate::{ logs::{ diff --git a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs index b5a3df2197..ccd918be5d 100644 --- a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs @@ -1,6 +1,5 @@ use crate::{ - export::logs::{ExportResult, LogBatch, LogExporter}, - logs::{LogError, LogRecord, LogResult}, + logs::{ExportResult, LogBatch, LogError, LogExporter, LogRecord, LogResult}, Resource, }; @@ -282,7 +281,6 @@ where #[cfg(all(test, feature = "testing", feature = "logs"))] mod tests { - use crate::export::logs::{LogBatch, LogExporter}; use crate::logs::log_processor::{ OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY, @@ -290,6 +288,7 @@ mod tests { use 
crate::logs::log_processor_with_async_runtime::BatchLogProcessor; use crate::logs::LogRecord; use crate::logs::LogResult; + use crate::logs::{LogBatch, LogExporter}; use crate::runtime; use crate::testing::logs::InMemoryLogExporterBuilder; use crate::{ diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 97ae74ee85..9fa8824001 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -1,10 +1,12 @@ //! # OpenTelemetry Log SDK mod error; +mod export; mod log_emitter; mod log_processor; pub(crate) mod record; pub use error::{LogError, LogResult}; +pub use export::{ExportResult, LogBatch, LogExporter}; pub use log_emitter::{Builder, Logger, LoggerProvider}; pub use log_processor::{ BatchConfig, BatchConfigBuilder, BatchLogProcessor, BatchLogProcessorBuilder, LogProcessor, diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index dff6d93c7e..a59b8e4c2d 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -1,6 +1,5 @@ -use crate::export::logs::{LogBatch, LogExporter}; use crate::logs::LogRecord; -use crate::logs::{LogError, LogResult}; +use crate::logs::{LogBatch, LogError, LogExporter, LogResult}; use crate::Resource; use opentelemetry::InstrumentationScope; use std::borrow::Cow; diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index 6313474dd1..b4b1c56d9e 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Utc}; use core::fmt; -use opentelemetry_sdk::export::logs::LogBatch; +use opentelemetry_sdk::logs::LogBatch; use opentelemetry_sdk::logs::LogResult; use opentelemetry_sdk::Resource; use std::sync::atomic; @@ -29,7 +29,7 @@ impl fmt::Debug for LogExporter { } } -impl 
opentelemetry_sdk::export::logs::LogExporter for LogExporter { +impl opentelemetry_sdk::logs::LogExporter for LogExporter { /// Export spans to stdout #[allow(clippy::manual_async_fn)] fn export( diff --git a/opentelemetry-stdout/src/logs/mod.rs b/opentelemetry-stdout/src/logs/mod.rs index 76a8b1debe..8c48429366 100644 --- a/opentelemetry-stdout/src/logs/mod.rs +++ b/opentelemetry-stdout/src/logs/mod.rs @@ -2,7 +2,7 @@ //! //! The stdout [`LogExporter`] writes debug printed [`LogRecord`]s to Stdout. //! -//! [`LogExporter`]: opentelemetry_sdk::export::logs::LogExporter +//! [`LogExporter`]: opentelemetry_sdk::logs::LogExporter //! [`LogRecord`]: opentelemetry::logs::LogRecord mod exporter; pub use exporter::*; diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 2242d48eea..4119ac0d35 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -11,7 +11,7 @@ use opentelemetry::InstrumentationScope; use opentelemetry_appender_tracing::layer; -use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; +use opentelemetry_sdk::logs::{LogBatch, LogExporter}; use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LogResult, LoggerProvider}; use tracing::error;