diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 4837e6e11b..b409a65b99 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -217,6 +217,14 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope - Update `EnvResourceDetector` to allow resource attribute values containing equal signs (`"="`). [#2120](https://github.com/open-telemetry/opentelemetry-rust/pull/2120) +- **Breaking** Introduced `experimental_async_runtime` feature for runtime-specific traits. + - Runtime-specific features (`rt-tokio`, `rt-tokio-current-thread`, and `rt-async-std`) + now depend on the `experimental_async_runtime` feature. + - For most users, no action is required. Enabling runtime features such as `rt-tokio`, `rt-tokio-current-thread`, + or `rt-async-std` will automatically enable the `experimental_async_runtime` feature. + - If you're implementing a custom runtime, you must explicitly enable the experimental_async_runtime` feature in your + Cargo.toml and implement the required `Runtime` traits. + ## 0.27.1 Released 2024-Nov-27 diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index 167846dce1..ca21fd55dd 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -49,9 +49,10 @@ logs = ["opentelemetry/logs", "serde_json"] spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"] metrics = ["opentelemetry/metrics", "glob", "async-trait"] testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"] -rt-tokio = ["tokio", "tokio-stream"] -rt-tokio-current-thread = ["tokio", "tokio-stream"] -rt-async-std = ["async-std"] +experimental_async_runtime = [] +rt-tokio = ["tokio", "tokio-stream", "experimental_async_runtime"] +rt-tokio-current-thread = ["tokio", "tokio-stream", "experimental_async_runtime"] +rt-async-std = ["async-std", "experimental_async_runtime"] internal-logs = ["tracing"] experimental_metrics_periodicreader_with_async_runtime = ["metrics"] spec_unstable_metrics_views = ["metrics"] diff --git a/opentelemetry-sdk/src/lib.rs b/opentelemetry-sdk/src/lib.rs index 4afda7deb7..4e2ef47ba7 100644 --- a/opentelemetry-sdk/src/lib.rs +++ b/opentelemetry-sdk/src/lib.rs @@ -94,6 +94,7 @@ //! Support for recording and exporting telemetry asynchronously and perform //! metrics aggregation can be added via the following flags: //! +//! * `experimental_async_runtime`: Enables the experimental `Runtime` trait and related functionality. //! * `rt-tokio`: Spawn telemetry tasks using [tokio]'s multi-thread runtime. //! * `rt-tokio-current-thread`: Spawn telemetry tasks on a separate runtime so that the main runtime won't be blocked. //! * `rt-async-std`: Spawn telemetry tasks using [async-std]'s runtime. @@ -133,6 +134,7 @@ pub mod metrics; #[cfg_attr(docsrs, doc(cfg(feature = "trace")))] pub mod propagation; pub mod resource; +#[cfg(feature = "experimental_async_runtime")] pub mod runtime; #[cfg(any(feature = "testing", test))] #[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))] diff --git a/opentelemetry-sdk/src/runtime.rs b/opentelemetry-sdk/src/runtime.rs index 7705c10e91..00720e0892 100644 --- a/opentelemetry-sdk/src/runtime.rs +++ b/opentelemetry-sdk/src/runtime.rs @@ -15,6 +15,7 @@ use thiserror::Error; /// /// [Tokio]: https://crates.io/crates/tokio /// [async-std]: https://crates.io/crates/async-std +#[cfg(feature = "experimental_async_runtime")] pub trait Runtime: Clone + Send + Sync + 'static { /// A future stream, which returns items in a previously specified interval. The item type is /// not important. @@ -44,13 +45,19 @@ pub trait Runtime: Clone + Send + Sync + 'static { } /// Runtime implementation, which works with Tokio's multi thread runtime. -#[cfg(feature = "rt-tokio")] -#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))] +#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))] +#[cfg_attr( + docsrs, + doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))) +)] #[derive(Debug, Clone)] pub struct Tokio; -#[cfg(feature = "rt-tokio")] -#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))] +#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))] +#[cfg_attr( + docsrs, + doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))) +)] impl Runtime for Tokio { type Interval = tokio_stream::wrappers::IntervalStream; type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>; @@ -71,13 +78,31 @@ impl Runtime for Tokio { } /// Runtime implementation, which works with Tokio's current thread runtime. -#[cfg(feature = "rt-tokio-current-thread")] -#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))] +#[cfg(all( + feature = "experimental_async_runtime", + feature = "rt-tokio-current-thread" +))] +#[cfg_attr( + docsrs, + doc(cfg(all( + feature = "experimental_async_runtime", + feature = "rt-tokio-current-thread" + ))) +)] #[derive(Debug, Clone)] pub struct TokioCurrentThread; -#[cfg(feature = "rt-tokio-current-thread")] -#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))] +#[cfg(all( + feature = "experimental_async_runtime", + feature = "rt-tokio-current-thread" +))] +#[cfg_attr( + docsrs, + doc(cfg(all( + feature = "experimental_async_runtime", + feature = "rt-tokio-current-thread" + ))) +)] impl Runtime for TokioCurrentThread { type Interval = tokio_stream::wrappers::IntervalStream; type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>; @@ -108,13 +133,19 @@ impl Runtime for TokioCurrentThread { } /// Runtime implementation, which works with async-std. -#[cfg(feature = "rt-async-std")] -#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))] +#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))] +#[cfg_attr( + docsrs, + doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))) +)] #[derive(Debug, Clone)] pub struct AsyncStd; -#[cfg(feature = "rt-async-std")] -#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))] +#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))] +#[cfg_attr( + docsrs, + doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))) +)] impl Runtime for AsyncStd { type Interval = async_std::stream::Interval; type Delay = BoxFuture<'static, ()>; @@ -138,6 +169,7 @@ impl Runtime for AsyncStd { /// /// [log]: crate::logs::BatchLogProcessor /// [span]: crate::trace::BatchSpanProcessor +#[cfg(feature = "experimental_async_runtime")] pub trait RuntimeChannel: Runtime { /// A future stream to receive batch messages from channels. type Receiver<T: Debug + Send>: Stream<Item = T> + Send; @@ -152,6 +184,7 @@ pub trait RuntimeChannel: Runtime { } /// Error returned by a [`TrySend`] implementation. +#[cfg(feature = "experimental_async_runtime")] #[derive(Debug, Error)] pub enum TrySendError { /// Send failed due to the channel being full. @@ -166,6 +199,7 @@ pub enum TrySendError { } /// TrySend is an abstraction of `Sender` that is capable of sending messages through a reference. +#[cfg(feature = "experimental_async_runtime")] pub trait TrySend: Sync + Send { /// The message that will be sent. type Message; @@ -176,7 +210,10 @@ pub trait TrySend: Sync + Send { fn try_send(&self, item: Self::Message) -> Result<(), TrySendError>; } -#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] +#[cfg(all( + feature = "experimental_async_runtime", + any(feature = "rt-tokio", feature = "rt-tokio-current-thread") +))] impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> { type Message = T; @@ -188,8 +225,11 @@ impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> { } } -#[cfg(feature = "rt-tokio")] -#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))] +#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))] +#[cfg_attr( + docsrs, + doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))) +)] impl RuntimeChannel for Tokio { type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>; type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>; @@ -206,8 +246,17 @@ impl RuntimeChannel for Tokio { } } -#[cfg(feature = "rt-tokio-current-thread")] -#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))] +#[cfg(all( + feature = "experimental_async_runtime", + feature = "rt-tokio-current-thread" +))] +#[cfg_attr( + docsrs, + doc(cfg(all( + feature = "experimental_async_runtime", + feature = "rt-tokio-current-thread" + ))) +)] impl RuntimeChannel for TokioCurrentThread { type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>; type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>; @@ -224,7 +273,7 @@ impl RuntimeChannel for TokioCurrentThread { } } -#[cfg(feature = "rt-async-std")] +#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))] impl<T: Send> TrySend for async_std::channel::Sender<T> { type Message = T; @@ -236,8 +285,11 @@ impl<T: Send> TrySend for async_std::channel::Sender<T> { } } -#[cfg(feature = "rt-async-std")] -#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))] +#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))] +#[cfg_attr( + docsrs, + doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))) +)] impl RuntimeChannel for AsyncStd { type Receiver<T: Debug + Send> = async_std::channel::Receiver<T>; type Sender<T: Debug + Send> = async_std::channel::Sender<T>;