From 905ffbc5d8aaee55c621c5b9cf81aa3d9e2a2cb2 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Tue, 5 Nov 2024 13:58:33 -0800 Subject: [PATCH 01/14] Add force_flush to LogExporter --- opentelemetry-sdk/src/export/logs/mod.rs | 20 ++++++++++++++++++++ opentelemetry-sdk/src/logs/log_processor.rs | 3 ++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 2656544ce0..5516483459 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -2,6 +2,7 @@ use crate::logs::LogRecord; use crate::Resource; use async_trait::async_trait; +use futures_util::future::BoxFuture; #[cfg(feature = "logs_level_enabled")] use opentelemetry::logs::Severity; use opentelemetry::logs::{LogError, LogResult}; @@ -91,6 +92,25 @@ pub trait LogExporter: Send + Sync + Debug { // By default, all logs are enabled true } + + /// This is a hint to ensure that the export of any Spans the exporter + /// has received prior to the call to this function SHOULD be completed + /// as soon as possible, preferably before returning from this method. + /// + /// This function SHOULD provide a way to let the caller know + /// whether it succeeded, failed or timed out. + /// + /// This function SHOULD only be called in cases where it is absolutely necessary, + /// such as when using some FaaS providers that may suspend the process after + /// an invocation, but before the exporter exports the completed spans. + /// + /// This function SHOULD complete or abort within some timeout. This function can be + /// implemented as a blocking API or an asynchronous API which notifies the caller via + /// a callback or an event. OpenTelemetry client authors can decide if they want to + /// make the flush timeout configurable. + fn force_flush(&mut self) -> BoxFuture<'static, ExportResult> { + Box::pin(async { Ok(()) }) + } /// Set the resource for the exporter. fn set_resource(&mut self, _resource: &Resource) {} } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index fa5308159f..8d657fe04c 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -261,7 +261,8 @@ impl BatchLogProcessor { &timeout_runtime, logs.split_off(0), ) - .await; + .await + .and(exporter.as_mut().force_flush().await); if let Some(channel) = res_channel { if let Err(send_error) = channel.send(result) { From 9cc40c14f4c4d3b3344eeb63f91457f0750c60e0 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Wed, 6 Nov 2024 10:45:36 -0800 Subject: [PATCH 02/14] Fix outdated comments --- opentelemetry-sdk/src/export/logs/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 5516483459..485dc49c09 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -93,7 +93,7 @@ pub trait LogExporter: Send + Sync + Debug { true } - /// This is a hint to ensure that the export of any Spans the exporter + /// This is a hint to ensure that the export of any Logs the exporter /// has received prior to the call to this function SHOULD be completed /// as soon as possible, preferably before returning from this method. /// @@ -102,7 +102,7 @@ pub trait LogExporter: Send + Sync + Debug { /// /// This function SHOULD only be called in cases where it is absolutely necessary, /// such as when using some FaaS providers that may suspend the process after - /// an invocation, but before the exporter exports the completed spans. + /// an invocation, but before the exporter exports the completed logs. /// /// This function SHOULD complete or abort within some timeout. This function can be /// implemented as a blocking API or an asynchronous API which notifies the caller via From d67c9751afc79f31d90f6d8e89c2cc1aecd01910 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Thu, 7 Nov 2024 13:32:13 -0800 Subject: [PATCH 03/14] Change force_flush to async --- opentelemetry-sdk/src/export/logs/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index db6b3b1e4d..335ac85ffb 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -3,7 +3,6 @@ use crate::logs::LogRecord; use crate::logs::{LogError, LogResult}; use crate::Resource; use async_trait::async_trait; -use futures_util::future::BoxFuture; #[cfg(feature = "logs_level_enabled")] use opentelemetry::logs::Severity; use opentelemetry::InstrumentationScope; @@ -108,8 +107,8 @@ pub trait LogExporter: Send + Sync + Debug { /// implemented as a blocking API or an asynchronous API which notifies the caller via /// a callback or an event. OpenTelemetry client authors can decide if they want to /// make the flush timeout configurable. - fn force_flush(&mut self) -> BoxFuture<'static, ExportResult> { - Box::pin(async { Ok(()) }) + async fn force_flush(&mut self) -> ExportResult { + Ok(()) } /// Set the resource for the exporter. fn set_resource(&mut self, _resource: &Resource) {} From 3aa6f8dd45eebd799803648f40cb9930ddbf46dd Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Wed, 13 Nov 2024 16:25:58 -0800 Subject: [PATCH 04/14] Remove async from force_flush in LogExporter --- opentelemetry-sdk/src/export/logs/mod.rs | 2 +- opentelemetry-sdk/src/logs/log_processor.rs | 22 +++++++++++++++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 335ac85ffb..460152fa7f 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -107,7 +107,7 @@ pub trait LogExporter: Send + Sync + Debug { /// implemented as a blocking API or an asynchronous API which notifies the caller via /// a callback or an event. OpenTelemetry client authors can decide if they want to /// make the flush timeout configurable. - async fn force_flush(&mut self) -> ExportResult { + fn force_flush(&mut self) -> ExportResult { Ok(()) } /// Set the resource for the exporter. diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index ee65ed3a74..2081236241 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -259,7 +259,7 @@ impl BatchLogProcessor { logs.split_off(0), ) .await - .and(exporter.as_mut().force_flush().await); + .and(exporter.as_mut().force_flush()); if let Some(channel) = res_channel { if let Err(send_error) = channel.send(result) { @@ -779,6 +779,25 @@ mod tests { let _ = provider.shutdown(); } + #[tokio::test(flavor = "multi_thread")] + async fn test_batch_forceflush() { + let exporter = InMemoryLogExporterBuilder::default().build(); + // TODO: Verify exporter.force_flush() is called + + let processor = BatchLogProcessor::new( + Box::new(exporter.clone()), + BatchConfig::default(), + runtime::Tokio, + ); + + let mut record = LogRecord::default(); + let instrumentation = InstrumentationScope::default(); + + processor.emit(&mut record, &instrumentation); + processor.force_flush().unwrap(); + assert_eq!(1, exporter.get_emitted_logs().unwrap().len()); + } + #[tokio::test(flavor = "multi_thread")] async fn test_batch_shutdown() { // assert we will receive an error @@ -796,7 +815,6 @@ mod tests { let instrumentation = InstrumentationScope::default(); processor.emit(&mut record, &instrumentation); - processor.force_flush().unwrap(); processor.shutdown().unwrap(); // todo: expect to see errors here. How should we assert this? processor.emit(&mut record, &instrumentation); From 9290828d978e709daefb7fd1d3a5cf377102d2bd Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Mon, 18 Nov 2024 15:13:08 -0800 Subject: [PATCH 05/14] Add force_flush exporter call to SimpleLogProcessor --- opentelemetry-sdk/src/logs/log_processor.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index b9f5e9094a..9f5dc3f343 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -129,7 +129,11 @@ impl LogProcessor for SimpleLogProcessor { } fn force_flush(&self) -> LogResult<()> { - Ok(()) + if let Ok(mut exporter) = self.exporter.lock() { + exporter.force_flush() + } else { + Err(LogError::MutexPoisoned("SimpleLogProcessor".into())) + } } fn shutdown(&self) -> LogResult<()> { From 1bf3d45115bc824507f60b93e147ffa6a3932207 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Tue, 21 Jan 2025 14:40:09 -0800 Subject: [PATCH 06/14] Address feedback --- opentelemetry-sdk/src/export/logs/mod.rs | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index cbb2ed1910..c7cc25731d 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -144,21 +144,10 @@ pub trait LogExporter: Send + Sync + Debug { true } - /// This is a hint to ensure that the export of any Logs the exporter - /// has received prior to the call to this function SHOULD be completed - /// as soon as possible, preferably before returning from this method. /// - /// This function SHOULD provide a way to let the caller know - /// whether it succeeded, failed or timed out. + /// This method SHOULD block the current thread until all pending log records are exported. + /// If the export was not successful, an error is returned. /// - /// This function SHOULD only be called in cases where it is absolutely necessary, - /// such as when using some FaaS providers that may suspend the process after - /// an invocation, but before the exporter exports the completed logs. - /// - /// This function SHOULD complete or abort within some timeout. This function can be - /// implemented as a blocking API or an asynchronous API which notifies the caller via - /// a callback or an event. OpenTelemetry client authors can decide if they want to - /// make the flush timeout configurable. fn force_flush(&mut self) -> ExportResult { Ok(()) } From 55dfd625748c9d5ae384952927a491f723f00ab7 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Tue, 21 Jan 2025 14:59:03 -0800 Subject: [PATCH 07/14] Fix build --- opentelemetry-sdk/src/logs/log_processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 251e196104..8bd3d514db 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -478,8 +478,8 @@ impl BatchLogProcessor { result = export_with_timeout_sync( config.max_export_timeout, exporter.as_mut(), - &timeout_runtime, logs.split_off(0), + &mut last_export_time, ) .await .and(exporter.as_mut().force_flush()); From eae55feeb62f9e530bf9f0d85b7caef2d6ae1a2c Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Tue, 21 Jan 2025 15:25:19 -0800 Subject: [PATCH 08/14] Remvoe force_flush call after export --- opentelemetry-sdk/src/logs/log_processor.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 8bd3d514db..56fac409cd 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -481,8 +481,6 @@ impl BatchLogProcessor { logs.split_off(0), &mut last_export_time, ) - .await - .and(exporter.as_mut().force_flush()); current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed); } From bacb62870cb099901510b8ef4ca1fa1f37b01e46 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Tue, 21 Jan 2025 15:27:32 -0800 Subject: [PATCH 09/14] Fix build --- opentelemetry-sdk/src/logs/log_processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 56fac409cd..d9a26c2723 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -480,7 +480,7 @@ impl BatchLogProcessor { exporter.as_mut(), logs.split_off(0), &mut last_export_time, - ) + ); current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed); } From b67e2b0c270b53eec7ffdc68e0d91dd51f05706c Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Wed, 22 Jan 2025 14:04:38 -0800 Subject: [PATCH 10/14] Fix parameters --- opentelemetry-sdk/src/logs/log_processor.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index d9a26c2723..56f5b8376e 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -477,8 +477,8 @@ impl BatchLogProcessor { result = export_with_timeout_sync( config.max_export_timeout, - exporter.as_mut(), - logs.split_off(0), + &exporter, + logs, &mut last_export_time, ); From a154ce877ce640edf2a04690dd5804860fbbc53c Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Wed, 22 Jan 2025 14:17:26 -0800 Subject: [PATCH 11/14] Remove unneeded reference --- opentelemetry-sdk/src/logs/log_processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 56f5b8376e..4efaf69982 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -477,7 +477,7 @@ impl BatchLogProcessor { result = export_with_timeout_sync( config.max_export_timeout, - &exporter, + exporter, logs, &mut last_export_time, ); From 54bb9fafa2016855ca821feb527642e3326a3ad3 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Wed, 22 Jan 2025 15:15:05 -0800 Subject: [PATCH 12/14] Remove unnecessary mut --- opentelemetry-sdk/src/logs/log_processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 4efaf69982..507c298c8a 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -479,7 +479,7 @@ impl BatchLogProcessor { config.max_export_timeout, exporter, logs, - &mut last_export_time, + last_export_time, ); current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed); From 86d70583f599dd8913e71c05b5c6b5e505ac0ade Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Wed, 22 Jan 2025 15:28:23 -0800 Subject: [PATCH 13/14] Remove runtime::tokio --- opentelemetry-sdk/src/logs/log_processor.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 507c298c8a..888afcfbcb 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -480,7 +480,7 @@ impl BatchLogProcessor { exporter, logs, last_export_time, - ); + ); // This method clears the logs vec after exporting current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed); } @@ -1070,12 +1070,10 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_batch_forceflush() { let exporter = InMemoryLogExporterBuilder::default().build(); - // TODO: Verify exporter.force_flush() is called let processor = BatchLogProcessor::new( Box::new(exporter.clone()), BatchConfig::default(), - runtime::Tokio, ); let mut record = LogRecord::default(); @@ -1099,6 +1097,7 @@ mod tests { let instrumentation = InstrumentationScope::default(); processor.emit(&mut record, &instrumentation); + processor.force_flush().unwrap(); processor.shutdown().unwrap(); // todo: expect to see errors here. How should we assert this? processor.emit(&mut record, &instrumentation); From 6e7e418e2626351327c25071a4a2f09f2c64c5a1 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Wed, 22 Jan 2025 15:38:53 -0800 Subject: [PATCH 14/14] Fix test --- opentelemetry-sdk/src/logs/log_processor.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 888afcfbcb..671b36bdff 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1071,10 +1071,7 @@ mod tests { async fn test_batch_forceflush() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new( - Box::new(exporter.clone()), - BatchConfig::default(), - ); + let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); let mut record = LogRecord::default(); let instrumentation = InstrumentationScope::default();