Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove export timeout configuration for PeriodicReader #2598

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,13 @@ limit.
`opentelemetry_sdk::trace::{InMemorySpanExporter, InMemorySpanExporterBuilder};`
`opentelemetry_sdk::metrics::{InMemoryMetricExporter, InMemoryMetricExporterBuilder};`

- *Breaking*: The `BatchLogProcessor` no longer supports configuration of `max_export_timeout`
- **Breaking**: The `BatchLogProcessor` no longer supports configuration of `max_export_timeout`
or the `OTEL_BLRP_EXPORT_TIMEOUT` environment variable. Timeout handling is now the
responsibility of the exporter.
For example, in the OTLP Logs exporter, the export timeout can be configured using:
- The environment variables `OTEL_EXPORTER_OTLP_TIMEOUT` or `OTEL_EXPORTER_OTLP_LOGS_TIMEOUT`.
- The opentelemetry_otlp API, via `.with_tonic().with_timeout()` or `.with_http().with_timeout()`.

Before:
```rust
let processor = BatchLogProcessor::builder(exporter)
Expand Down Expand Up @@ -337,12 +338,13 @@ let processor = BatchLogProcessor::builder(exporter)
.build();
```

- *Breaking*: The `BatchSpanProcessor` no longer supports configuration of `max_export_timeout`
- **Breaking**: The `BatchSpanProcessor` no longer supports configuration of `max_export_timeout`
or the `OTEL_BLRP_EXPORT_TIMEOUT` environment variable. Timeout handling is now the
responsibility of the exporter.
For example, in the OTLP Span exporter, the export timeout can be configured using:
- The environment variables `OTEL_EXPORTER_OTLP_TIMEOUT` or `OTEL_EXPORTER_OTLP_TRACES_TIMEOUT`.
- The opentelemetry_otlp API, via `.with_tonic().with_timeout()` or `.with_http().with_timeout()`.

Before:
```rust
let processor = BatchSpanProcessor::builder(exporter)
Expand Down Expand Up @@ -370,6 +372,29 @@ let processor = BatchSpanProcessor::builder(exporter)
.build();
```

- **Breaking**: The `PeriodicReader` no supports configuration of export timeout using
`with_timeout` API method.
Timeout handling is now the responsibility of the exporter.

For example, in the OTLP Metrics exporter, the export timeout can be configured using:
- The environment variables `OTEL_EXPORTER_OTLP_TIMEOUT` or `OTEL_EXPORTER_OTLP_METRICS_TIMEOUT`.
- The `opentelemetry_otlp` API, via `.with_tonic().with_timeout()` or `.with_http().with_timeout()`.

Before:
```rust
let reader = PeriodicReader::builder(exporter)
.with_interval(Duration::from_secs(30))
.with_timeout(Duration::from_secs(10)) // Previously configurable timeout
.build();
```

After:
```rust
let reader = PeriodicReader::builder(exporter)
lalitb marked this conversation as resolved.
Show resolved Hide resolved
.with_interval(Duration::from_secs(30))
.build();
```

## 0.27.1

Released 2024-Nov-27
Expand Down
54 changes: 10 additions & 44 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,14 @@ use super::{
data::ResourceMetrics, instrument::InstrumentKind, reader::MetricReader, Pipeline, Temporality,
};

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
const DEFAULT_INTERVAL: Duration = Duration::from_secs(60);

const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
const METRIC_EXPORT_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_TIMEOUT";

/// Configuration options for [PeriodicReader].
#[derive(Debug)]
pub struct PeriodicReaderBuilder<E> {
interval: Duration,
timeout: Duration,
exporter: E,
}

Expand All @@ -43,16 +40,8 @@ where
.ok()
.and_then(|v| v.parse().map(Duration::from_millis).ok())
.unwrap_or(DEFAULT_INTERVAL);
let timeout = env::var(METRIC_EXPORT_TIMEOUT_NAME)
.ok()
.and_then(|v| v.parse().map(Duration::from_millis).ok())
.unwrap_or(DEFAULT_TIMEOUT);

PeriodicReaderBuilder {
interval,
timeout,
exporter,
}
PeriodicReaderBuilder { interval, exporter }
}

/// Configures the intervening time between exports for a [PeriodicReader].
Expand All @@ -69,25 +58,9 @@ where
self
}

/// Configures the timeout for an export to complete. PeriodicReader itself
/// does not enforce timeout. Instead timeout is passed on to the exporter
/// for each export attempt.
///
/// This option overrides any value set for the `OTEL_METRIC_EXPORT_TIMEOUT`
/// environment variable.
///
/// If this option is not used or `timeout` is equal to zero, 30 seconds is used
/// as the default.
pub fn with_timeout(mut self, timeout: Duration) -> Self {
if !timeout.is_zero() {
self.timeout = timeout;
}
self
}

/// Create a [PeriodicReader] with the given config.
pub fn build(self) -> PeriodicReader {
PeriodicReader::new(self.exporter, self.interval, self.timeout)
PeriodicReader::new(self.exporter, self.interval)
}
}

Expand Down Expand Up @@ -165,7 +138,7 @@ impl PeriodicReader {
PeriodicReaderBuilder::new(exporter)
}

fn new<E>(exporter: E, interval: Duration, timeout: Duration) -> Self
fn new<E>(exporter: E, interval: Duration) -> Self
where
E: PushMetricExporter,
{
Expand All @@ -189,7 +162,6 @@ impl PeriodicReader {
otel_info!(
name: "PeriodReaderThreadStarted",
interval_in_millisecs = interval.as_millis(),
timeout_in_millisecs = timeout.as_millis()
);
loop {
otel_debug!(
Expand All @@ -200,7 +172,7 @@ impl PeriodicReader {
otel_debug!(
name: "PeriodReaderThreadExportingDueToFlush"
);
if let Err(_e) = cloned_reader.collect_and_export(timeout) {
if let Err(_e) = cloned_reader.collect_and_export() {
response_sender.send(false).unwrap();
} else {
response_sender.send(true).unwrap();
Expand Down Expand Up @@ -231,7 +203,7 @@ impl PeriodicReader {
Ok(Message::Shutdown(response_sender)) => {
// Perform final export and break out of loop and exit the thread
otel_debug!(name: "PeriodReaderThreadExportingDueToShutdown");
let export_result = cloned_reader.collect_and_export(timeout);
let export_result = cloned_reader.collect_and_export();
let shutdown_result = exporter_arc.shutdown();
otel_debug!(
name: "PeriodReaderInvokedExporterShutdown",
Expand All @@ -255,7 +227,7 @@ impl PeriodicReader {
name: "PeriodReaderThreadExportingDueToTimer"
);

if let Err(_e) = cloned_reader.collect_and_export(timeout) {
if let Err(_e) = cloned_reader.collect_and_export() {
otel_debug!(
name: "PeriodReaderThreadExportingDueToTimerFailed"
);
Expand Down Expand Up @@ -307,8 +279,8 @@ impl PeriodicReader {
reader
}

fn collect_and_export(&self, timeout: Duration) -> MetricResult<()> {
self.inner.collect_and_export(timeout)
fn collect_and_export(&self) -> MetricResult<()> {
self.inner.collect_and_export()
}
}

Expand Down Expand Up @@ -352,23 +324,18 @@ impl PeriodicReaderInner {
}
}

fn collect_and_export(&self, timeout: Duration) -> MetricResult<()> {
fn collect_and_export(&self) -> MetricResult<()> {
// TODO: Reuse the internal vectors. Or refactor to avoid needing any
// owned data structures to be passed to exporters.
let mut rm = ResourceMetrics {
resource: Resource::empty(),
scope_metrics: Vec::new(),
};

// Measure time taken for collect, and subtract it from the timeout.
let current_time = Instant::now();
let collect_result = self.collect(&mut rm);
let time_taken_for_collect = current_time.elapsed();
let _timeout = if time_taken_for_collect > timeout {
Duration::from_secs(0)
} else {
timeout - time_taken_for_collect
};

cijothomas marked this conversation as resolved.
Show resolved Hide resolved
#[allow(clippy::question_mark)]
if let Err(e) = collect_result {
otel_warn!(
Expand All @@ -389,7 +356,6 @@ impl PeriodicReaderInner {
otel_debug!(name: "PeriodicReaderMetricsCollected", count = metrics_count, time_taken_in_millis = time_taken_for_collect.as_millis());

// Relying on futures executor to execute async call.
// TODO: Pass timeout to exporter
let exporter_result = futures_executor::block_on(self.exporter.export(&mut rm));
#[allow(clippy::question_mark)]
if let Err(e) = exporter_result {
Expand Down