Skip to content

Commit

Permalink
Apply changes from Span BatchProcessor to Log BatchProcessor (#1973)
Browse files Browse the repository at this point in the history
  • Loading branch information
cijothomas authored Jul 29, 2024
1 parent cd59346 commit 7a90760
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
10 changes: 7 additions & 3 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
## vNext

- `opentelemetry_sdk::logs::record::LogRecord` and `opentelemetry_sdk::logs::record::TraceContext` derive from `PartialEq` to facilitate Unit Testing.
- Fixed an issue causing a panic during shutdown when using the `TokioCurrentThread` tracing batch processor.
- Fixed an issue causing a panic during shutdown when using the
`TokioCurrentThread` in BatchExportProcessor for traces and logs.
[#1964](https://github.com/open-telemetry/opentelemetry-rust/pull/1964)
- Fix BatchSpanProcessor to trigger first export at the first interval
instead of doing it right away.
[#1973](https://github.com/open-telemetry/opentelemetry-rust/pull/1973)
- Fix BatchExportProcessor for traces and logs to trigger first export at the
first interval instead of doing it right away.
[#1970](https://github.com/open-telemetry/opentelemetry-rust/pull/1970)
[#1973](https://github.com/open-telemetry/opentelemetry-rust/pull/1973)


## v0.24.1

Expand Down
12 changes: 8 additions & 4 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,17 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
pub(crate) fn new(mut exporter: Box<dyn LogExporter>, config: BatchConfig, runtime: R) -> Self {
let (message_sender, message_receiver) =
runtime.batch_message_channel(config.max_queue_size);
let ticker = runtime
.interval(config.scheduled_delay)
.map(|_| BatchMessage::Flush(None));
let timeout_runtime = runtime.clone();
let inner_runtime = runtime.clone();

// Spawn worker process via user-defined spawn function.
runtime.spawn(Box::pin(async move {
// Timer will take a reference to the current runtime, so its important we do this within the
// runtime.spawn()
let ticker = inner_runtime
.interval(config.scheduled_delay)
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
.map(|_| BatchMessage::Flush(None));
let timeout_runtime = inner_runtime.clone();
let mut logs = Vec::new();
let mut messages = Box::pin(stream::select(message_receiver, ticker));

Expand Down

0 comments on commit 7a90760

Please sign in to comment.