Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add runtime traits under experimental flag. #2519

Merged
merged 9 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,24 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope
"rust.opentelemetry.io/sdk/tracer"
[#2486](https://github.com/open-telemetry/opentelemetry-rust/pull/2486)

- **Breaking** Introduced `experimental_async_runtime` feature for runtime-specific traits.
- Runtime-specific features (`rt-tokio`, `rt-tokio-current-thread`, and `rt-async-std`)
now depend on the `experimental_async_runtime` feature.
- Custom runtime implementations must enable the `experimental_async_runtime` feature.
- For users needing to enable the runtime related features
lalitb marked this conversation as resolved.
Show resolved Hide resolved
- **For users enabling runtime features**: No additional steps are required.
Enabling `rt-tokio`, `rt-tokio-current-thread`, or `rt-async-std` automatically enables `experimental_async_runtime`.
```toml
[dependencies]
opentelemetry_sdk = { version = "0.27.1", features = ["rt-tokio"] }
```
- **For users implementing a custom runtime**: Enable the experimental_async_runtime feature in your Cargo.toml
and implement the Runtime trait:
```toml
[dependencies]
opentelemetry_sdk = { version = "0.27.1", features = ["experimental_async_runtime"] }
lalitb marked this conversation as resolved.
Show resolved Hide resolved
```

## 0.27.1

Released 2024-Nov-27
Expand Down
7 changes: 4 additions & 3 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ logs = ["opentelemetry/logs", "serde_json"]
spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"]
metrics = ["opentelemetry/metrics", "glob", "async-trait"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
rt-tokio = ["tokio", "tokio-stream"]
rt-tokio-current-thread = ["tokio", "tokio-stream"]
rt-async-std = ["async-std"]
experimental_async_runtime = []
rt-tokio = ["tokio", "tokio-stream", "experimental_async_runtime"]
rt-tokio-current-thread = ["tokio", "tokio-stream", "experimental_async_runtime"]
rt-async-std = ["async-std", "experimental_async_runtime"]
internal-logs = ["tracing"]
experimental_metrics_periodicreader_with_async_runtime = ["metrics"]
spec_unstable_metrics_views = ["metrics"]
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
//! Support for recording and exporting telemetry asynchronously and perform
//! metrics aggregation can be added via the following flags:
//!
//! * `experimental_async_runtime`: Enables the experimental `Runtime` trait and related functionality.
//! * `rt-tokio`: Spawn telemetry tasks using [tokio]'s multi-thread runtime.
//! * `rt-tokio-current-thread`: Spawn telemetry tasks on a separate runtime so that the main runtime won't be blocked.
//! * `rt-async-std`: Spawn telemetry tasks using [async-std]'s runtime.
Expand Down Expand Up @@ -133,6 +134,7 @@ pub mod metrics;
#[cfg_attr(docsrs, doc(cfg(feature = "trace")))]
pub mod propagation;
pub mod resource;
#[cfg(feature = "experimental_async_runtime")]
pub mod runtime;
#[cfg(any(feature = "testing", test))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
Expand Down
92 changes: 72 additions & 20 deletions opentelemetry-sdk/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use thiserror::Error;
///
/// [Tokio]: https://crates.io/crates/tokio
/// [async-std]: https://crates.io/crates/async-std
#[cfg(feature = "experimental_async_runtime")]
pub trait Runtime: Clone + Send + Sync + 'static {
/// A future stream, which returns items in a previously specified interval. The item type is
/// not important.
Expand Down Expand Up @@ -44,13 +45,19 @@ pub trait Runtime: Clone + Send + Sync + 'static {
}

/// Runtime implementation, which works with Tokio's multi thread runtime.
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
#[cfg_attr(
docsrs,
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
)]
#[derive(Debug, Clone)]
pub struct Tokio;

#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
#[cfg_attr(
docsrs,
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
)]
impl Runtime for Tokio {
type Interval = tokio_stream::wrappers::IntervalStream;
type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;
Expand All @@ -71,13 +78,31 @@ impl Runtime for Tokio {
}

/// Runtime implementation, which works with Tokio's current thread runtime.
#[cfg(feature = "rt-tokio-current-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
#[cfg(all(
feature = "experimental_async_runtime",
feature = "rt-tokio-current-thread"
))]
#[cfg_attr(
docsrs,
doc(cfg(all(
feature = "experimental_async_runtime",
feature = "rt-tokio-current-thread"
)))
)]
#[derive(Debug, Clone)]
pub struct TokioCurrentThread;

#[cfg(feature = "rt-tokio-current-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
#[cfg(all(
feature = "experimental_async_runtime",
feature = "rt-tokio-current-thread"
))]
#[cfg_attr(
docsrs,
doc(cfg(all(
feature = "experimental_async_runtime",
feature = "rt-tokio-current-thread"
)))
)]
impl Runtime for TokioCurrentThread {
type Interval = tokio_stream::wrappers::IntervalStream;
type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;
Expand Down Expand Up @@ -108,13 +133,19 @@ impl Runtime for TokioCurrentThread {
}

/// Runtime implementation, which works with async-std.
#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
#[cfg_attr(
docsrs,
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
)]
#[derive(Debug, Clone)]
pub struct AsyncStd;

#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
#[cfg_attr(
docsrs,
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
)]
impl Runtime for AsyncStd {
type Interval = async_std::stream::Interval;
type Delay = BoxFuture<'static, ()>;
Expand All @@ -138,6 +169,7 @@ impl Runtime for AsyncStd {
///
/// [log]: crate::logs::BatchLogProcessor
/// [span]: crate::trace::BatchSpanProcessor
#[cfg(feature = "experimental_async_runtime")]
pub trait RuntimeChannel: Runtime {
/// A future stream to receive batch messages from channels.
type Receiver<T: Debug + Send>: Stream<Item = T> + Send;
Expand All @@ -152,6 +184,7 @@ pub trait RuntimeChannel: Runtime {
}

/// Error returned by a [`TrySend`] implementation.
#[cfg(feature = "experimental_async_runtime")]
#[derive(Debug, Error)]
pub enum TrySendError {
/// Send failed due to the channel being full.
Expand All @@ -166,6 +199,7 @@ pub enum TrySendError {
}

/// TrySend is an abstraction of `Sender` that is capable of sending messages through a reference.
#[cfg(feature = "experimental_async_runtime")]
pub trait TrySend: Sync + Send {
/// The message that will be sent.
type Message;
Expand All @@ -176,7 +210,10 @@ pub trait TrySend: Sync + Send {
fn try_send(&self, item: Self::Message) -> Result<(), TrySendError>;
}

#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
#[cfg(all(
feature = "experimental_async_runtime",
any(feature = "rt-tokio", feature = "rt-tokio-current-thread")
))]
impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> {
type Message = T;

Expand All @@ -188,8 +225,11 @@ impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> {
}
}

#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
#[cfg_attr(
docsrs,
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
)]
impl RuntimeChannel for Tokio {
type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;
Expand All @@ -206,8 +246,17 @@ impl RuntimeChannel for Tokio {
}
}

#[cfg(feature = "rt-tokio-current-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
#[cfg(all(
feature = "experimental_async_runtime",
feature = "rt-tokio-current-thread"
))]
#[cfg_attr(
docsrs,
doc(cfg(all(
feature = "experimental_async_runtime",
feature = "rt-tokio-current-thread"
)))
)]
impl RuntimeChannel for TokioCurrentThread {
type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;
Expand All @@ -224,7 +273,7 @@ impl RuntimeChannel for TokioCurrentThread {
}
}

#[cfg(feature = "rt-async-std")]
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
impl<T: Send> TrySend for async_std::channel::Sender<T> {
type Message = T;

Expand All @@ -236,8 +285,11 @@ impl<T: Send> TrySend for async_std::channel::Sender<T> {
}
}

#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
#[cfg_attr(
docsrs,
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
)]
impl RuntimeChannel for AsyncStd {
type Receiver<T: Debug + Send> = async_std::channel::Receiver<T>;
type Sender<T: Debug + Send> = async_std::channel::Sender<T>;
Expand Down
Loading