Skip to content

Commit

Permalink
Added func to add trace parent to span from http headers, and reverse…
Browse files Browse the repository at this point in the history
… to add current span/context to http response
  • Loading branch information
zakstucke committed Feb 15, 2024
1 parent b9c3eec commit 2020f54
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 32 deletions.
1 change: 1 addition & 0 deletions py/bitbazaar/tracing/_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def prepare_providers(
otlp["port"]
)
)

endpoint = "localhost:{}".format(otlp["port"])
log_provider.add_log_record_processor(
BatchLogRecordProcessor(
Expand Down
2 changes: 1 addition & 1 deletion py_rust/rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel = "nightly"
channel = "stable"
26 changes: 19 additions & 7 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ opentelemetry = [
'dep:opentelemetry',
'dep:opentelemetry-otlp',
'dep:opentelemetry-semantic-conventions',
'dep:http',
]

# Add your dependencies here
Expand Down Expand Up @@ -65,6 +66,7 @@ opentelemetry = { version = "0.21", default-features = false, features = [
], optional = true }
opentelemetry-otlp = { version = "0.14", features = ["logs", "trace", "metrics"], optional = true }
opentelemetry-semantic-conventions = { version = "0.13.0", optional = true }
http = { version = "1.0", optional = true }

[dev-dependencies]
rstest = "0.18"
Expand Down
54 changes: 54 additions & 0 deletions rust/bitbazaar/logging/global_log/global_fns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::borrow::Cow;

use parking_lot::{MappedMutexGuard, MutexGuard};

use super::{out::GLOBAL_LOG, GlobalLog};
use crate::prelude::*;

#[cfg(feature = "opentelemetry")]
/// Returns a new [Meter] with the provided name and default configuration.
///
/// A [Meter] should be scoped at most to a single application or crate. The
/// name needs to be unique so it does not collide with other names used by
/// an application, nor other applications.
///
/// If the name is empty, then an implementation defined default name will
/// be used instead.
pub fn meter(name: impl Into<Cow<'static, str>>) -> Result<opentelemetry::metrics::Meter, AnyErr> {
get_global()?.meter(name)
}

#[cfg(feature = "opentelemetry")]
/// Connect this program's span to the trace that is represented by the provided HTTP headers.
/// E.g. connect an axum handler's trace/span to the nginx trace/span.
pub fn set_span_parent_from_http_headers(
span: &tracing::Span,
headers: &http::HeaderMap,
) -> Result<(), AnyErr> {
get_global()?.set_span_parent_from_http_headers(span, headers)
}

#[cfg(feature = "opentelemetry")]
/// Set the response headers from the current span context. So downstream services can continue the current trace.
pub fn set_response_headers_from_ctx<B>(response: &mut http::Response<B>) -> Result<(), AnyErr> {
get_global()?.set_response_headers_from_ctx(response)
}

/// Force through logs, traces and metrics, useful in e.g. testing.
///
/// Note there doesn't seem to be an underlying interface to force through metrics.
pub fn flush() -> Result<(), AnyErr> {
get_global()?.flush()
}

/// Shutdown the logger, traces and metrics, should be called when the program is about to exit.
pub fn shutdown() -> Result<(), AnyErr> {
get_global()?.shutdown()
}

fn get_global<'a>() -> Result<MappedMutexGuard<'a, GlobalLog>, AnyErr> {
if GLOBAL_LOG.lock().is_none() {
return Err(anyerr!("GlobalLog not registered!"));
}
Ok(MutexGuard::map(GLOBAL_LOG.lock(), |x| x.as_mut().unwrap()))
}
32 changes: 32 additions & 0 deletions rust/bitbazaar/logging/global_log/http_headers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use opentelemetry::propagation::{Extractor, Injector};

// copy from crate opentelemetry-http (to not be dependants of on 3rd: http, ...)
pub struct HeaderInjector<'a>(pub &'a mut http::HeaderMap);

impl<'a> Injector for HeaderInjector<'a> {
/// Set a key and value in the `HeaderMap`. Does nothing if the key or value are not valid inputs.
fn set(&mut self, key: &str, value: String) {
if let Ok(name) = http::header::HeaderName::from_bytes(key.as_bytes()) {
if let Ok(val) = http::header::HeaderValue::from_str(&value) {
self.0.insert(name, val);
}
}
}
}

pub struct HeaderExtractor<'a>(pub &'a http::HeaderMap);

impl<'a> Extractor for HeaderExtractor<'a> {
/// Get a value for a key from the `HeaderMap`. If the value is not valid ASCII, returns None.
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|value| value.to_str().ok())
}

/// Collect all the keys from the `HeaderMap`.
fn keys(&self) -> Vec<&str> {
self.0
.keys()
.map(http::HeaderName::as_str)
.collect::<Vec<_>>()
}
}
3 changes: 3 additions & 0 deletions rust/bitbazaar/logging/global_log/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
mod builder;
pub mod global_fns;
#[cfg(feature = "opentelemetry")]
mod http_headers;
mod out;
mod setup;

Expand Down
88 changes: 68 additions & 20 deletions rust/bitbazaar/logging/global_log/out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tracing_subscriber::prelude::*;
use crate::errors::prelude::*;

// When registering globally, hoist the guards out into here, to allow the CreatedSubscriber to go out of scope but keep the guards permanently.
static GLOBAL_GUARDS: Lazy<Mutex<Option<Vec<WorkerGuard>>>> = Lazy::new(Mutex::default);
pub static GLOBAL_LOG: Lazy<Mutex<Option<GlobalLog>>> = Lazy::new(Mutex::default);

/// The global logger/tracer for stdout, file and full open telemetry. Works with the tracing crates (info!, debug!, warn!, error!) and span funcs and decorators.
///
Expand Down Expand Up @@ -44,7 +44,7 @@ pub struct GlobalLog {

/// Need to store these guards, when they go out of scope the logging may stop.
/// When made global these are hoisted into a static lazy var.
pub(crate) guards: Vec<WorkerGuard>,
pub(crate) _guards: Vec<WorkerGuard>,

#[cfg(feature = "opentelemetry")]
pub(crate) otlp_providers: OtlpProviders,
Expand Down Expand Up @@ -79,12 +79,10 @@ impl GlobalLog {
/// Register the logger as the global logger/tracer/metric manager, can only be done once during the lifetime of the program.
///
/// If you need temporary globality, use the [`GlobalLog::as_tmp_global`] method.
pub fn register_global(&mut self) -> Result<(), AnyErr> {
pub fn register_global(mut self) -> Result<(), AnyErr> {
if let Some(dispatch) = self.dispatch.take() {
// Keep hold of the guards:
GLOBAL_GUARDS
.lock()
.replace(std::mem::take(&mut self.guards));
// Make it global:
GLOBAL_LOG.lock().replace(self);
dispatch.init();
Ok(())
} else {
Expand All @@ -93,18 +91,51 @@ impl GlobalLog {
}

#[cfg(feature = "opentelemetry")]
/// Returns a new [Meter] with the provided name and default configuration.
///
/// A [Meter] should be scoped at most to a single application or crate. The
/// name needs to be unique so it does not collide with other names used by
/// an application, nor other applications.
///
/// If the name is empty, then an implementation defined default name will
/// be used instead.
pub fn meter(&self, name: impl Into<Cow<'static, str>>) -> opentelemetry::metrics::Meter {
/// See [`super::global_fns::meter`]`
pub fn meter(
&self,
name: impl Into<Cow<'static, str>>,
) -> Result<opentelemetry::metrics::Meter, AnyErr> {
use opentelemetry::metrics::MeterProvider;

self.otlp_providers.meter_provider.meter(name)
Ok(self.otlp_providers.meter_provider.meter(name))
}

#[cfg(feature = "opentelemetry")]
/// See [`super::global_fns::update_span_with_http_headers`]`
pub fn set_span_parent_from_http_headers(
&self,
span: &tracing::Span,
headers: &http::HeaderMap,
) -> Result<(), AnyErr> {
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::logging::global_log::http_headers::HeaderExtractor;

let ctx_extractor = HeaderExtractor(headers);
let ctx = opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.extract(&ctx_extractor)
});
span.set_parent(ctx);
Ok(())
}

#[cfg(feature = "opentelemetry")]
/// See [`super::global_fns::set_response_headers_from_ctx`]`
pub fn set_response_headers_from_ctx<B>(
&self,
response: &mut http::Response<B>,
) -> Result<(), AnyErr> {
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::logging::global_log::http_headers::HeaderInjector;

let ctx = tracing::Span::current().context();
let mut injector = HeaderInjector(response.headers_mut());
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&ctx, &mut injector);
});
Ok(())
}

/// Temporarily make the logger global, for the duration of the given closure.
Expand All @@ -118,9 +149,7 @@ impl GlobalLog {
}
}

/// Force through logs, traces and metrics, useful in e.g. testing.
///
/// Note there doesn't seem to be an underlying interface to force through metrics.
/// See [`super::global_fns::flush`]`
pub fn flush(&self) -> Result<(), AnyErr> {
#[cfg(feature = "opentelemetry")]
{
Expand All @@ -137,4 +166,23 @@ impl GlobalLog {
}
Ok(())
}

/// See [`super::global_fns::shutdown`]`
pub fn shutdown(&mut self) -> Result<(), AnyErr> {
#[cfg(feature = "opentelemetry")]
{
if let Some(prov) = &mut self.otlp_providers.logger_provider {
prov.shutdown();
}
if let Some(prov) = &self.otlp_providers.tracer_provider {
// Doesn't have a shutdown interface.
prov.force_flush();
}
self.otlp_providers
.meter_provider
.shutdown()
.change_context(AnyErr)?;
}
Ok(())
}
}
21 changes: 19 additions & 2 deletions rust/bitbazaar/logging/global_log/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,15 @@ pub fn builder_into_global_log(builder: GlobalLogBuilder) -> Result<GlobalLog, A
}
#[cfg(feature = "opentelemetry")]
super::builder::Output::Otlp(otlp) => {
use opentelemetry::global::set_text_map_propagator;
use opentelemetry_otlp::{new_exporter, new_pipeline, WithExportConfig};
use opentelemetry_sdk::{logs as sdklogs, resource, trace as sdktrace};
use opentelemetry_sdk::{
logs as sdklogs,
propagation::{
BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator,
},
resource, trace as sdktrace,
};

if !crate::misc::is_tcp_port_listening("localhost", otlp.port)? {
return Err(anyerr!("Can't connect to open telemetry collector on local port {}. Are you sure it's running?", otlp.port));
Expand All @@ -119,6 +126,16 @@ pub fn builder_into_global_log(builder: GlobalLogBuilder) -> Result<GlobalLog, A
let endpoint = format!("grpc://localhost:{}", otlp.port);
let get_exporter = || new_exporter().tonic().with_endpoint(&endpoint);

// Configure the global propagator to use between different services, without this step when you try and connect other services they'll strangely not work (this defaults to a no-op)
//
// Only enable to the 2 main standard propagators, the w3c trace context and baggage.
//
// https://opentelemetry.io/docs/concepts/sdk-configuration/general-sdk-configuration/#otel_propagators
set_text_map_propagator(TextMapCompositePropagator::new(vec![
Box::new(TraceContextPropagator::new()),
Box::new(BaggagePropagator::new()),
]));

let resource = resource::Resource::new(vec![
opentelemetry::KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
Expand Down Expand Up @@ -186,7 +203,7 @@ pub fn builder_into_global_log(builder: GlobalLogBuilder) -> Result<GlobalLog, A
let dispatch: Dispatch = subscriber.into();
Ok(GlobalLog {
dispatch: Some(dispatch),
guards,
_guards: guards,
#[cfg(feature = "opentelemetry")]
otlp_providers,
})
Expand Down
Loading

0 comments on commit 2020f54

Please sign in to comment.