From 0559e8b54fb41bcf04125e2e3cedeec87b24a9f9 Mon Sep 17 00:00:00 2001 From: Landon James Date: Sun, 17 Nov 2024 17:44:46 -0800 Subject: [PATCH] PR feedback --- .../src/attributes.rs | 38 ++++++-------- .../aws-smithy-observability-otel/src/lib.rs | 28 ++++++---- .../src/meter.rs | 51 +++++++++++-------- .../aws-smithy-observability/README.md | 2 +- .../src/attributes.rs | 13 +++-- .../aws-smithy-observability/src/global.rs | 29 +++++------ .../aws-smithy-observability/src/lib.rs | 5 +- .../aws-smithy-observability/src/meter.rs | 12 +---- .../aws-smithy-observability/src/noop.rs | 4 ++ .../aws-smithy-observability/src/provider.rs | 22 +++++--- 10 files changed, 112 insertions(+), 92 deletions(-) diff --git a/rust-runtime/aws-smithy-observability-otel/src/attributes.rs b/rust-runtime/aws-smithy-observability-otel/src/attributes.rs index d90601e572..befbdb25fc 100644 --- a/rust-runtime/aws-smithy-observability-otel/src/attributes.rs +++ b/rust-runtime/aws-smithy-observability-otel/src/attributes.rs @@ -44,16 +44,16 @@ pub(crate) fn option_attr_from_kv(input: &[KeyValue]) -> Option { impl From for Vec { fn from(value: AttributesWrap) -> Self { value - .attributes() - .iter() + .0 + .into_attributes() .map(|(k, v)| { KeyValue::new( - k.clone(), + k, match v { - AttributeValue::I64(val) => Value::I64(*val), - AttributeValue::F64(val) => Value::F64(*val), - AttributeValue::String(val) => Value::String(val.clone().into()), - AttributeValue::Bool(val) => Value::Bool(*val), + AttributeValue::I64(val) => Value::I64(val), + AttributeValue::F64(val) => Value::F64(val), + AttributeValue::String(val) => Value::String(val.into()), + AttributeValue::Bool(val) => Value::Bool(val), _ => Value::String("UNSUPPORTED ATTRIBUTE VALUE TYPE".into()), }, ) @@ -68,7 +68,7 @@ impl From<&[KeyValue]> for AttributesWrap { value.iter().for_each(|kv| { attrs.set( - kv.key.clone().into(), + kv.key.clone(), match &kv.value { Value::Bool(val) => AttributeValue::Bool(*val), Value::I64(val) => AttributeValue::I64(*val), @@ -96,13 +96,10 @@ mod tests { #[test] fn attr_to_kv() { let mut attrs = Attributes::new(); - attrs.set("I64".into(), AttributeValue::I64(64)); - attrs.set("F64".into(), AttributeValue::F64(64.0)); - attrs.set( - "String".into(), - AttributeValue::String("I AM A STRING".into()), - ); - attrs.set("Bool".into(), AttributeValue::Bool(true)); + attrs.set("I64", AttributeValue::I64(64)); + attrs.set("F64", AttributeValue::F64(64.0)); + attrs.set("String", AttributeValue::String("I AM A STRING".into())); + attrs.set("Bool", AttributeValue::Bool(true)); let kv = kv_from_option_attr(Some(&attrs)); @@ -130,15 +127,12 @@ mod tests { ]; let attrs = option_attr_from_kv(&kvs).unwrap(); + assert_eq!(attrs.get("Bool").unwrap(), &AttributeValue::Bool(true)); assert_eq!( - attrs.get("Bool".into()).unwrap(), - &AttributeValue::Bool(true) - ); - assert_eq!( - attrs.get("String".into()).unwrap(), + attrs.get("String").unwrap(), &AttributeValue::String("I AM A STRING".into()) ); - assert_eq!(attrs.get("I64".into()).unwrap(), &AttributeValue::I64(64)); - assert_eq!(attrs.get("F64".into()).unwrap(), &AttributeValue::F64(64.0)); + assert_eq!(attrs.get("I64").unwrap(), &AttributeValue::I64(64)); + assert_eq!(attrs.get("F64").unwrap(), &AttributeValue::F64(64.0)); } } diff --git a/rust-runtime/aws-smithy-observability-otel/src/lib.rs b/rust-runtime/aws-smithy-observability-otel/src/lib.rs index e2532f683f..331738b781 100644 --- a/rust-runtime/aws-smithy-observability-otel/src/lib.rs +++ b/rust-runtime/aws-smithy-observability-otel/src/lib.rs @@ -26,9 +26,7 @@ pub mod meter; mod tests { use crate::meter::AwsSdkOtelMeterProvider; - use aws_smithy_observability::global::{ - get_global_telemetry_provider, set_global_telemetry_provider, - }; + use aws_smithy_observability::global::{get_telemetry_provider, set_telemetry_provider}; use aws_smithy_observability::provider::TelemetryProvider; use opentelemetry_sdk::metrics::{data::Sum, PeriodicReader, SdkMeterProvider}; use opentelemetry_sdk::runtime::Tokio; @@ -44,13 +42,11 @@ mod tests { // Create the SDK metrics types from the OTel objects let sdk_mp = AwsSdkOtelMeterProvider::new(otel_mp); - let sdk_tp = TelemetryProvider::builder() - .meter_provider(Box::new(sdk_mp)) - .build(); + let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build(); // Set the global TelemetryProvider and then get it back out - let _ = set_global_telemetry_provider(Some(sdk_tp)); - let global_tp = get_global_telemetry_provider(); + let _ = set_telemetry_provider(sdk_tp); + let global_tp = get_telemetry_provider(); // Create an instrument and record a value let global_meter = global_tp @@ -62,7 +58,13 @@ mod tests { mono_counter.add(4, None, None); // Flush metric pipeline and extract metrics from exporter - global_tp.meter_provider().flush().unwrap(); + global_tp + .meter_provider() + .as_any() + .downcast_ref::() + .unwrap() + .shutdown() + .unwrap(); let finished_metrics = exporter.get_finished_metrics().unwrap(); let extracted_mono_counter_data = &finished_metrics[0].scope_metrics[0].metrics[0] @@ -75,7 +77,11 @@ mod tests { assert_eq!(extracted_mono_counter_data, &4); // Get the OTel TP out and shut it down - let otel_tp = set_global_telemetry_provider(None); - otel_tp.meter_provider().shutdown().unwrap(); + let foo = global_tp + .meter_provider() + .as_any() + .downcast_ref::() + .unwrap(); + foo.shutdown().unwrap(); } } diff --git a/rust-runtime/aws-smithy-observability-otel/src/meter.rs b/rust-runtime/aws-smithy-observability-otel/src/meter.rs index 16fbb461a8..1270b075a0 100644 --- a/rust-runtime/aws-smithy-observability-otel/src/meter.rs +++ b/rust-runtime/aws-smithy-observability-otel/src/meter.rs @@ -266,21 +266,17 @@ impl AwsSdkOtelMeterProvider { meter_provider: otel_meter_provider, } } -} -impl MeterProvider for AwsSdkOtelMeterProvider { - fn get_meter(&self, scope: &'static str, _attributes: Option<&Attributes>) -> Box { - Box::new(MeterWrap(self.meter_provider.meter(scope))) - } - - fn flush(&self) -> Result<(), ObservabilityError> { + /// Flush the metric pipeline. + pub fn flush(&self) -> Result<(), ObservabilityError> { match self.meter_provider.force_flush() { Ok(_) => Ok(()), Err(err) => Err(ObservabilityError::new(ErrorKind::MetricsFlush, err)), } } - fn shutdown(&self) -> Result<(), ObservabilityError> { + /// Gracefully shutdown the metric pipeline. + pub fn shutdown(&self) -> Result<(), ObservabilityError> { match self.meter_provider.force_flush() { Ok(_) => Ok(()), Err(err) => Err(ObservabilityError::new(ErrorKind::MetricsShutdown, err)), @@ -288,6 +284,16 @@ impl MeterProvider for AwsSdkOtelMeterProvider { } } +impl MeterProvider for AwsSdkOtelMeterProvider { + fn get_meter(&self, scope: &'static str, _attributes: Option<&Attributes>) -> Box { + Box::new(MeterWrap(self.meter_provider.meter(scope))) + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } +} + #[cfg(test)] mod tests { @@ -313,9 +319,7 @@ mod tests { // Create the SDK metrics types from the OTel objects let sdk_mp = AwsSdkOtelMeterProvider::new(otel_mp); - let sdk_tp = TelemetryProvider::builder() - .meter_provider(Box::new(sdk_mp)) - .build(); + let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build(); // Get the dyn versions of the SDK metrics objects let dyn_sdk_mp = sdk_tp.meter_provider(); @@ -332,7 +336,12 @@ mod tests { histogram.record(1.234, None, None); // Gracefully shutdown the metrics provider so all metrics are flushed through the pipeline - dyn_sdk_mp.shutdown().unwrap(); + dyn_sdk_mp + .as_any() + .downcast_ref::() + .unwrap() + .shutdown() + .unwrap(); // Extract the metrics from the exporter and assert that they are what we expect let finished_metrics = exporter.get_finished_metrics().unwrap(); @@ -373,9 +382,7 @@ mod tests { // Create the SDK metrics types from the OTel objects let sdk_mp = AwsSdkOtelMeterProvider::new(otel_mp); - let sdk_tp = TelemetryProvider::builder() - .meter_provider(Box::new(sdk_mp)) - .build(); + let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build(); // Get the dyn versions of the SDK metrics objects let dyn_sdk_mp = sdk_tp.meter_provider(); @@ -388,7 +395,7 @@ mod tests { Box::new(|measurement: &dyn AsyncMeasurement| { let mut attrs = Attributes::new(); attrs.set( - "TestGaugeAttr".into(), + "TestGaugeAttr", AttributeValue::String("TestGaugeAttr".into()), ); measurement.record(6.789, Some(&attrs), None); @@ -403,7 +410,7 @@ mod tests { Box::new(|measurement: &dyn AsyncMeasurement| { let mut attrs = Attributes::new(); attrs.set( - "TestAsyncUpDownCounterAttr".into(), + "TestAsyncUpDownCounterAttr", AttributeValue::String("TestAsyncUpDownCounterAttr".into()), ); measurement.record(12, Some(&attrs), None); @@ -418,7 +425,7 @@ mod tests { Box::new(|measurement: &dyn AsyncMeasurement| { let mut attrs = Attributes::new(); attrs.set( - "TestAsyncMonoCounterAttr".into(), + "TestAsyncMonoCounterAttr", AttributeValue::String("TestAsyncMonoCounterAttr".into()), ); measurement.record(123, Some(&attrs), None); @@ -429,8 +436,12 @@ mod tests { async_mono_counter.record(4, None, None); // Gracefully shutdown the metrics provider so all metrics are flushed through the pipeline - dyn_sdk_mp.flush().unwrap(); - dyn_sdk_mp.shutdown().unwrap(); + dyn_sdk_mp + .as_any() + .downcast_ref::() + .unwrap() + .shutdown() + .unwrap(); // Extract the metrics from the exporter let finished_metrics = exporter.get_finished_metrics().unwrap(); diff --git a/rust-runtime/aws-smithy-observability/README.md b/rust-runtime/aws-smithy-observability/README.md index 1f963acf40..a3b5b3a74a 100644 --- a/rust-runtime/aws-smithy-observability/README.md +++ b/rust-runtime/aws-smithy-observability/README.md @@ -1,6 +1,6 @@ # aws-smithy-observability -This crate contains traits allowing for the implementation of `TelemetryProvider`s for the AWS SDK for Rust. I also contains a `global` module for setting and interacting with the current `GlobalTelemetryProvider`. +This crate contains traits allowing for the implementation of `TelemetryProvider`s for the AWS SDK for Rust. It also contains a `global` module for setting and interacting with the current `GlobalTelemetryProvider`. This crate is part of the [AWS SDK for Rust](https://awslabs.github.io/aws-sdk-rust/) and the [smithy-rs](https://github.com/smithy-lang/smithy-rs) code generator. In most cases, it should not be used directly. diff --git a/rust-runtime/aws-smithy-observability/src/attributes.rs b/rust-runtime/aws-smithy-observability/src/attributes.rs index d90e89e414..055228ed35 100644 --- a/rust-runtime/aws-smithy-observability/src/attributes.rs +++ b/rust-runtime/aws-smithy-observability/src/attributes.rs @@ -39,19 +39,24 @@ impl Attributes { } /// Set an attribute. - pub fn set(&mut self, key: String, value: AttributeValue) { - self.attrs.insert(key, value); + pub fn set(&mut self, key: impl Into, value: impl Into) { + self.attrs.insert(key.into(), value.into()); } /// Get an attribute. - pub fn get(&self, key: String) -> Option<&AttributeValue> { - self.attrs.get(&key) + pub fn get(&self, key: impl Into) -> Option<&AttributeValue> { + self.attrs.get(&key.into()) } /// Get all of the attribute key value pairs. pub fn attributes(&self) -> &HashMap { &self.attrs } + + /// Get an owned [Iterator] of ([String], [AttributeValue]). + pub fn into_attributes(self) -> impl Iterator { + self.attrs.into_iter() + } } impl Default for Attributes { diff --git a/rust-runtime/aws-smithy-observability/src/global.rs b/rust-runtime/aws-smithy-observability/src/global.rs index d4cdf90b92..7355fe5257 100644 --- a/rust-runtime/aws-smithy-observability/src/global.rs +++ b/rust-runtime/aws-smithy-observability/src/global.rs @@ -17,28 +17,26 @@ use crate::provider::{GlobalTelemetryProvider, TelemetryProvider}; static GLOBAL_TELEMETRY_PROVIDER: Lazy> = Lazy::new(|| RwLock::new(GlobalTelemetryProvider::new(TelemetryProvider::default()))); -/// Set the current global [TelemetryProvider]. If [None] is supplied then a noop provider is set. -/// The previous [TelemetryProvider] is returned in an [Arc] so appropriate cleanup can be done if necessary. -pub fn set_global_telemetry_provider( - new_provider: Option, -) -> Arc { +/// Set the current global [TelemetryProvider]. +/// +/// This is meant to be run once at the beginning of an application. It will panic if two threads +/// attempt to call it at the same time. +pub fn set_telemetry_provider(new_provider: TelemetryProvider) { // TODO(smithyObservability): would probably be nicer to return a Result here, but the Guard held by the error from // .try_write is not Send so I struggled to build an ObservabilityError from it let mut old_provider = GLOBAL_TELEMETRY_PROVIDER .try_write() .expect("GLOBAL_TELEMETRY_PROVIDER RwLock Poisoned"); - let new_global_provider = if let Some(tp) = new_provider { - GlobalTelemetryProvider::new(tp) - } else { - GlobalTelemetryProvider::new(TelemetryProvider::default()) - }; + let new_global_provider = GlobalTelemetryProvider::new(new_provider); - mem::replace(&mut *old_provider, new_global_provider).telemetry_provider + let _ = mem::replace(&mut *old_provider, new_global_provider); } /// Get an [Arc] reference to the current global [TelemetryProvider]. -pub fn get_global_telemetry_provider() -> Arc { +/// +/// This can panic if called when another thread is calling [set_telemetry_provider]. +pub fn get_telemetry_provider() -> Arc { // TODO(smithyObservability): would probably be nicer to return a Result here, but the Guard held by the error from // .try_read is not Send so I struggled to build an ObservabilityError from it GLOBAL_TELEMETRY_PROVIDER @@ -62,16 +60,13 @@ mod tests { let my_provider = TelemetryProvider::default(); // Set the new counter and get a reference to the old one - let old_provider = set_global_telemetry_provider(Some(my_provider)); - - // Call shutdown on the old meter provider - let _old_meter = old_provider.meter_provider().shutdown().unwrap(); + set_telemetry_provider(my_provider); } #[test] #[serial] fn can_get_global_telemetry_provider() { - let curr_provider = get_global_telemetry_provider(); + let curr_provider = get_telemetry_provider(); // Use the global provider to create an instrument and record a value with it let curr_mp = curr_provider.meter_provider(); diff --git a/rust-runtime/aws-smithy-observability/src/lib.rs b/rust-runtime/aws-smithy-observability/src/lib.rs index 0c779de506..06261214e3 100644 --- a/rust-runtime/aws-smithy-observability/src/lib.rs +++ b/rust-runtime/aws-smithy-observability/src/lib.rs @@ -14,12 +14,15 @@ )] //! Smithy Observability -//TODO(smithyobservability): once we have finalized everything and integrated metrics with our runtime +// TODO(smithyobservability): once we have finalized everything and integrated metrics with our runtime // libraries update this with detailed usage docs and examples pub mod attributes; +pub use attributes::{AttributeValue, Attributes}; pub mod error; +pub use error::{ErrorKind, ObservabilityError}; pub mod global; pub mod meter; mod noop; pub mod provider; +pub use provider::{TelemetryProvider, TelemetryProviderBuilder}; diff --git a/rust-runtime/aws-smithy-observability/src/meter.rs b/rust-runtime/aws-smithy-observability/src/meter.rs index f7971ae5f4..3464fa21cf 100644 --- a/rust-runtime/aws-smithy-observability/src/meter.rs +++ b/rust-runtime/aws-smithy-observability/src/meter.rs @@ -7,22 +7,14 @@ //! real time. use crate::attributes::{Attributes, Context}; -use crate::error::ObservabilityError; /// Provides named instances of [Meter]. pub trait MeterProvider { /// Get or create a named [Meter]. fn get_meter(&self, scope: &'static str, attributes: Option<&Attributes>) -> Box; - /// Optional method to flush the metrics pipeline, default is noop - fn flush(&self) -> Result<(), ObservabilityError> { - Ok(()) - } - - /// Optional method to shutdown the metrics provider, default is noop - fn shutdown(&self) -> Result<(), ObservabilityError> { - Ok(()) - } + /// Foo + fn as_any(&self) -> &dyn std::any::Any; } /// The entry point to creating instruments. A grouping of related metrics. diff --git a/rust-runtime/aws-smithy-observability/src/noop.rs b/rust-runtime/aws-smithy-observability/src/noop.rs index f86b633cfa..291826a1c9 100644 --- a/rust-runtime/aws-smithy-observability/src/noop.rs +++ b/rust-runtime/aws-smithy-observability/src/noop.rs @@ -17,6 +17,10 @@ impl MeterProvider for NoopMeterProvider { fn get_meter(&self, _scope: &'static str, _attributes: Option<&Attributes>) -> Box { Box::new(NoopMeter) } + + fn as_any(&self) -> &dyn std::any::Any { + self + } } pub(crate) struct NoopMeter; diff --git a/rust-runtime/aws-smithy-observability/src/provider.rs b/rust-runtime/aws-smithy-observability/src/provider.rs index 43a9de6e59..12fc0f91ea 100644 --- a/rust-runtime/aws-smithy-observability/src/provider.rs +++ b/rust-runtime/aws-smithy-observability/src/provider.rs @@ -23,12 +23,23 @@ impl TelemetryProvider { } } + /// Returns a noop [TelemetryProvider] + pub fn noop() -> TelemetryProvider { + Self { + meter_provider: Box::new(NoopMeterProvider), + } + } + /// Get the set [MeterProvider] pub fn meter_provider(&self) -> &(dyn MeterProvider + Send + Sync) { self.meter_provider.as_ref() } } +// If we choose to expand our Telemetry provider and make Logging and Tracing +// configurable at some point in the future we can do that by adding default +// logger_provider and tracer_providers based on `tracing` to maintain backwards +// compatibilty with what we have today. impl Default for TelemetryProvider { fn default() -> Self { Self { @@ -37,10 +48,6 @@ impl Default for TelemetryProvider { } } -// If we choose to expand our Telemetry provider and make Logging and Tracing -// configurable at some point in the future we can do that by adding default -// logger_provider and tracer_providers based on `tracing` to maintain backwards -// compatibilty with what we have today. /// A builder for [TelemetryProvider]. #[non_exhaustive] pub struct TelemetryProviderBuilder { @@ -49,8 +56,11 @@ pub struct TelemetryProviderBuilder { impl TelemetryProviderBuilder { /// Set the [MeterProvider]. - pub fn meter_provider(mut self, meter_provider: Box) -> Self { - self.meter_provider = meter_provider; + pub fn meter_provider( + mut self, + meter_provider: impl MeterProvider + Send + Sync + 'static, + ) -> Self { + self.meter_provider = Box::new(meter_provider); self }