From 7a9076032d270ae283c0549c7ef2f03833bbd2e5 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 29 Jul 2024 11:07:07 -0700 Subject: [PATCH] Apply changes from Span BatchProcessor to Log BatchProcessor (#1973) --- opentelemetry-sdk/CHANGELOG.md | 10 +++++++--- opentelemetry-sdk/src/logs/log_processor.rs | 12 ++++++++---- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 717c73d0f3..410c942ac0 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -3,11 +3,15 @@ ## vNext - `opentelemetry_sdk::logs::record::LogRecord` and `opentelemetry_sdk::logs::record::TraceContext` derive from `PartialEq` to facilitate Unit Testing. -- Fixed an issue causing a panic during shutdown when using the `TokioCurrentThread` tracing batch processor. +- Fixed an issue causing a panic during shutdown when using the + `TokioCurrentThread` in BatchExportProcessor for traces and logs. [#1964](https://github.com/open-telemetry/opentelemetry-rust/pull/1964) -- Fix BatchSpanProcessor to trigger first export at the first interval - instead of doing it right away. + [#1973](https://github.com/open-telemetry/opentelemetry-rust/pull/1973) +- Fix BatchExportProcessor for traces and logs to trigger first export at the + first interval instead of doing it right away. [#1970](https://github.com/open-telemetry/opentelemetry-rust/pull/1970) + [#1973](https://github.com/open-telemetry/opentelemetry-rust/pull/1973) + ## v0.24.1 diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 95a0378a2d..57bb8afe71 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -201,13 +201,17 @@ impl BatchLogProcessor { pub(crate) fn new(mut exporter: Box, config: BatchConfig, runtime: R) -> Self { let (message_sender, message_receiver) = runtime.batch_message_channel(config.max_queue_size); - let ticker = runtime - .interval(config.scheduled_delay) - .map(|_| BatchMessage::Flush(None)); - let timeout_runtime = runtime.clone(); + let inner_runtime = runtime.clone(); // Spawn worker process via user-defined spawn function. runtime.spawn(Box::pin(async move { + // Timer will take a reference to the current runtime, so its important we do this within the + // runtime.spawn() + let ticker = inner_runtime + .interval(config.scheduled_delay) + .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. + .map(|_| BatchMessage::Flush(None)); + let timeout_runtime = inner_runtime.clone(); let mut logs = Vec::new(); let mut messages = Box::pin(stream::select(message_receiver, ticker));