From 2020f542f818b317cc791acce9a1b63cbc161bfd Mon Sep 17 00:00:00 2001 From: Zak Stucke Date: Thu, 15 Feb 2024 19:50:14 +0200 Subject: [PATCH] Added func to add trace parent to span from http headers, and reverse to add current span/context to http response --- py/bitbazaar/tracing/_setup.py | 1 + py_rust/rust-toolchain.toml | 2 +- rust/Cargo.lock | 26 ++++-- rust/Cargo.toml | 2 + .../logging/global_log/global_fns.rs | 54 ++++++++++++ .../logging/global_log/http_headers.rs | 32 +++++++ rust/bitbazaar/logging/global_log/mod.rs | 3 + rust/bitbazaar/logging/global_log/out.rs | 88 ++++++++++++++----- rust/bitbazaar/logging/global_log/setup.rs | 21 ++++- rust/bitbazaar/logging/mod.rs | 4 +- 10 files changed, 201 insertions(+), 32 deletions(-) create mode 100644 rust/bitbazaar/logging/global_log/global_fns.rs create mode 100644 rust/bitbazaar/logging/global_log/http_headers.rs diff --git a/py/bitbazaar/tracing/_setup.py b/py/bitbazaar/tracing/_setup.py index cd39ceb3..1c71e4f5 100644 --- a/py/bitbazaar/tracing/_setup.py +++ b/py/bitbazaar/tracing/_setup.py @@ -129,6 +129,7 @@ def prepare_providers( otlp["port"] ) ) + endpoint = "localhost:{}".format(otlp["port"]) log_provider.add_log_record_processor( BatchLogRecordProcessor( diff --git a/py_rust/rust-toolchain.toml b/py_rust/rust-toolchain.toml index 5d56faf9..292fe499 100644 --- a/py_rust/rust-toolchain.toml +++ b/py_rust/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "nightly" +channel = "stable" diff --git a/rust/Cargo.lock b/rust/Cargo.lock index d502d149..8d47953a 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -123,7 +123,7 @@ dependencies = [ "bitflags 1.3.2", "bytes", "futures-util", - "http", + "http 0.2.11", "http-body", "hyper", "itoa", @@ -149,7 +149,7 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", + "http 0.2.11", "http-body", "mime", "rustversion", @@ -191,6 +191,7 @@ dependencies = [ "error-stack", "homedir", "hostname", + "http 1.0.0", "normpath", "once_cell", "opentelemetry", @@ -649,7 +650,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.11", "indexmap 2.2.2", "slab", "tokio", @@ -717,6 +718,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -724,7 +736,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.11", "pin-project-lite", ] @@ -751,7 +763,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", + "http 0.2.11", "http-body", "httparse", "httpdate", @@ -1060,7 +1072,7 @@ checksum = "f24cda83b20ed2433c68241f918d0f6fdec8b1d43b7a9590ab4420c5095ca930" dependencies = [ "async-trait", "futures-core", - "http", + "http 0.2.11", "opentelemetry", "opentelemetry-proto", "opentelemetry-semantic-conventions", @@ -1770,7 +1782,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", + "http 0.2.11", "http-body", "hyper", "hyper-timeout", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 693229a7..a12cc574 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -23,6 +23,7 @@ opentelemetry = [ 'dep:opentelemetry', 'dep:opentelemetry-otlp', 'dep:opentelemetry-semantic-conventions', + 'dep:http', ] # Add your dependencies here @@ -65,6 +66,7 @@ opentelemetry = { version = "0.21", default-features = false, features = [ ], optional = true } opentelemetry-otlp = { version = "0.14", features = ["logs", "trace", "metrics"], optional = true } opentelemetry-semantic-conventions = { version = "0.13.0", optional = true } +http = { version = "1.0", optional = true } [dev-dependencies] rstest = "0.18" diff --git a/rust/bitbazaar/logging/global_log/global_fns.rs b/rust/bitbazaar/logging/global_log/global_fns.rs new file mode 100644 index 00000000..0605bfc9 --- /dev/null +++ b/rust/bitbazaar/logging/global_log/global_fns.rs @@ -0,0 +1,54 @@ +use std::borrow::Cow; + +use parking_lot::{MappedMutexGuard, MutexGuard}; + +use super::{out::GLOBAL_LOG, GlobalLog}; +use crate::prelude::*; + +#[cfg(feature = "opentelemetry")] +/// Returns a new [Meter] with the provided name and default configuration. +/// +/// A [Meter] should be scoped at most to a single application or crate. The +/// name needs to be unique so it does not collide with other names used by +/// an application, nor other applications. +/// +/// If the name is empty, then an implementation defined default name will +/// be used instead. +pub fn meter(name: impl Into>) -> Result { + get_global()?.meter(name) +} + +#[cfg(feature = "opentelemetry")] +/// Connect this program's span to the trace that is represented by the provided HTTP headers. +/// E.g. connect an axum handler's trace/span to the nginx trace/span. +pub fn set_span_parent_from_http_headers( + span: &tracing::Span, + headers: &http::HeaderMap, +) -> Result<(), AnyErr> { + get_global()?.set_span_parent_from_http_headers(span, headers) +} + +#[cfg(feature = "opentelemetry")] +/// Set the response headers from the current span context. So downstream services can continue the current trace. +pub fn set_response_headers_from_ctx(response: &mut http::Response) -> Result<(), AnyErr> { + get_global()?.set_response_headers_from_ctx(response) +} + +/// Force through logs, traces and metrics, useful in e.g. testing. +/// +/// Note there doesn't seem to be an underlying interface to force through metrics. +pub fn flush() -> Result<(), AnyErr> { + get_global()?.flush() +} + +/// Shutdown the logger, traces and metrics, should be called when the program is about to exit. +pub fn shutdown() -> Result<(), AnyErr> { + get_global()?.shutdown() +} + +fn get_global<'a>() -> Result, AnyErr> { + if GLOBAL_LOG.lock().is_none() { + return Err(anyerr!("GlobalLog not registered!")); + } + Ok(MutexGuard::map(GLOBAL_LOG.lock(), |x| x.as_mut().unwrap())) +} diff --git a/rust/bitbazaar/logging/global_log/http_headers.rs b/rust/bitbazaar/logging/global_log/http_headers.rs new file mode 100644 index 00000000..ea88f5d8 --- /dev/null +++ b/rust/bitbazaar/logging/global_log/http_headers.rs @@ -0,0 +1,32 @@ +use opentelemetry::propagation::{Extractor, Injector}; + +// copy from crate opentelemetry-http (to not be dependants of on 3rd: http, ...) +pub struct HeaderInjector<'a>(pub &'a mut http::HeaderMap); + +impl<'a> Injector for HeaderInjector<'a> { + /// Set a key and value in the `HeaderMap`. Does nothing if the key or value are not valid inputs. + fn set(&mut self, key: &str, value: String) { + if let Ok(name) = http::header::HeaderName::from_bytes(key.as_bytes()) { + if let Ok(val) = http::header::HeaderValue::from_str(&value) { + self.0.insert(name, val); + } + } + } +} + +pub struct HeaderExtractor<'a>(pub &'a http::HeaderMap); + +impl<'a> Extractor for HeaderExtractor<'a> { + /// Get a value for a key from the `HeaderMap`. If the value is not valid ASCII, returns None. + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|value| value.to_str().ok()) + } + + /// Collect all the keys from the `HeaderMap`. + fn keys(&self) -> Vec<&str> { + self.0 + .keys() + .map(http::HeaderName::as_str) + .collect::>() + } +} diff --git a/rust/bitbazaar/logging/global_log/mod.rs b/rust/bitbazaar/logging/global_log/mod.rs index e4f5984a..3181450f 100644 --- a/rust/bitbazaar/logging/global_log/mod.rs +++ b/rust/bitbazaar/logging/global_log/mod.rs @@ -1,4 +1,7 @@ mod builder; +pub mod global_fns; +#[cfg(feature = "opentelemetry")] +mod http_headers; mod out; mod setup; diff --git a/rust/bitbazaar/logging/global_log/out.rs b/rust/bitbazaar/logging/global_log/out.rs index 2b24327a..aa71237b 100644 --- a/rust/bitbazaar/logging/global_log/out.rs +++ b/rust/bitbazaar/logging/global_log/out.rs @@ -9,7 +9,7 @@ use tracing_subscriber::prelude::*; use crate::errors::prelude::*; // When registering globally, hoist the guards out into here, to allow the CreatedSubscriber to go out of scope but keep the guards permanently. -static GLOBAL_GUARDS: Lazy>>> = Lazy::new(Mutex::default); +pub static GLOBAL_LOG: Lazy>> = Lazy::new(Mutex::default); /// The global logger/tracer for stdout, file and full open telemetry. Works with the tracing crates (info!, debug!, warn!, error!) and span funcs and decorators. /// @@ -44,7 +44,7 @@ pub struct GlobalLog { /// Need to store these guards, when they go out of scope the logging may stop. /// When made global these are hoisted into a static lazy var. - pub(crate) guards: Vec, + pub(crate) _guards: Vec, #[cfg(feature = "opentelemetry")] pub(crate) otlp_providers: OtlpProviders, @@ -79,12 +79,10 @@ impl GlobalLog { /// Register the logger as the global logger/tracer/metric manager, can only be done once during the lifetime of the program. /// /// If you need temporary globality, use the [`GlobalLog::as_tmp_global`] method. - pub fn register_global(&mut self) -> Result<(), AnyErr> { + pub fn register_global(mut self) -> Result<(), AnyErr> { if let Some(dispatch) = self.dispatch.take() { - // Keep hold of the guards: - GLOBAL_GUARDS - .lock() - .replace(std::mem::take(&mut self.guards)); + // Make it global: + GLOBAL_LOG.lock().replace(self); dispatch.init(); Ok(()) } else { @@ -93,18 +91,51 @@ impl GlobalLog { } #[cfg(feature = "opentelemetry")] - /// Returns a new [Meter] with the provided name and default configuration. - /// - /// A [Meter] should be scoped at most to a single application or crate. The - /// name needs to be unique so it does not collide with other names used by - /// an application, nor other applications. - /// - /// If the name is empty, then an implementation defined default name will - /// be used instead. - pub fn meter(&self, name: impl Into>) -> opentelemetry::metrics::Meter { + /// See [`super::global_fns::meter`]` + pub fn meter( + &self, + name: impl Into>, + ) -> Result { use opentelemetry::metrics::MeterProvider; - self.otlp_providers.meter_provider.meter(name) + Ok(self.otlp_providers.meter_provider.meter(name)) + } + + #[cfg(feature = "opentelemetry")] + /// See [`super::global_fns::update_span_with_http_headers`]` + pub fn set_span_parent_from_http_headers( + &self, + span: &tracing::Span, + headers: &http::HeaderMap, + ) -> Result<(), AnyErr> { + use tracing_opentelemetry::OpenTelemetrySpanExt; + + use crate::logging::global_log::http_headers::HeaderExtractor; + + let ctx_extractor = HeaderExtractor(headers); + let ctx = opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.extract(&ctx_extractor) + }); + span.set_parent(ctx); + Ok(()) + } + + #[cfg(feature = "opentelemetry")] + /// See [`super::global_fns::set_response_headers_from_ctx`]` + pub fn set_response_headers_from_ctx( + &self, + response: &mut http::Response, + ) -> Result<(), AnyErr> { + use tracing_opentelemetry::OpenTelemetrySpanExt; + + use crate::logging::global_log::http_headers::HeaderInjector; + + let ctx = tracing::Span::current().context(); + let mut injector = HeaderInjector(response.headers_mut()); + opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.inject_context(&ctx, &mut injector); + }); + Ok(()) } /// Temporarily make the logger global, for the duration of the given closure. @@ -118,9 +149,7 @@ impl GlobalLog { } } - /// Force through logs, traces and metrics, useful in e.g. testing. - /// - /// Note there doesn't seem to be an underlying interface to force through metrics. + /// See [`super::global_fns::flush`]` pub fn flush(&self) -> Result<(), AnyErr> { #[cfg(feature = "opentelemetry")] { @@ -137,4 +166,23 @@ impl GlobalLog { } Ok(()) } + + /// See [`super::global_fns::shutdown`]` + pub fn shutdown(&mut self) -> Result<(), AnyErr> { + #[cfg(feature = "opentelemetry")] + { + if let Some(prov) = &mut self.otlp_providers.logger_provider { + prov.shutdown(); + } + if let Some(prov) = &self.otlp_providers.tracer_provider { + // Doesn't have a shutdown interface. + prov.force_flush(); + } + self.otlp_providers + .meter_provider + .shutdown() + .change_context(AnyErr)?; + } + Ok(()) + } } diff --git a/rust/bitbazaar/logging/global_log/setup.rs b/rust/bitbazaar/logging/global_log/setup.rs index 887f2cb1..23de3549 100644 --- a/rust/bitbazaar/logging/global_log/setup.rs +++ b/rust/bitbazaar/logging/global_log/setup.rs @@ -109,8 +109,15 @@ pub fn builder_into_global_log(builder: GlobalLogBuilder) -> Result { + use opentelemetry::global::set_text_map_propagator; use opentelemetry_otlp::{new_exporter, new_pipeline, WithExportConfig}; - use opentelemetry_sdk::{logs as sdklogs, resource, trace as sdktrace}; + use opentelemetry_sdk::{ + logs as sdklogs, + propagation::{ + BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator, + }, + resource, trace as sdktrace, + }; if !crate::misc::is_tcp_port_listening("localhost", otlp.port)? { return Err(anyerr!("Can't connect to open telemetry collector on local port {}. Are you sure it's running?", otlp.port)); @@ -119,6 +126,16 @@ pub fn builder_into_global_log(builder: GlobalLogBuilder) -> Result Result