Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added func to add trace parent to span from http headers, and reverse to add current span/context to http response #19

Merged
merged 1 commit into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions py/bitbazaar/tracing/_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def prepare_providers(
otlp["port"]
)
)

endpoint = "localhost:{}".format(otlp["port"])
log_provider.add_log_record_processor(
BatchLogRecordProcessor(
Expand Down
2 changes: 1 addition & 1 deletion py_rust/rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel = "nightly"
channel = "stable"
26 changes: 19 additions & 7 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ opentelemetry = [
'dep:opentelemetry',
'dep:opentelemetry-otlp',
'dep:opentelemetry-semantic-conventions',
'dep:http',
]

# Add your dependencies here
Expand Down Expand Up @@ -65,6 +66,7 @@ opentelemetry = { version = "0.21", default-features = false, features = [
], optional = true }
opentelemetry-otlp = { version = "0.14", features = ["logs", "trace", "metrics"], optional = true }
opentelemetry-semantic-conventions = { version = "0.13.0", optional = true }
http = { version = "1.0", optional = true }

[dev-dependencies]
rstest = "0.18"
Expand Down
54 changes: 54 additions & 0 deletions rust/bitbazaar/logging/global_log/global_fns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::borrow::Cow;

use parking_lot::{MappedMutexGuard, MutexGuard};

use super::{out::GLOBAL_LOG, GlobalLog};
use crate::prelude::*;

#[cfg(feature = "opentelemetry")]
/// Returns a new [Meter] with the provided name and default configuration.
///
/// A [Meter] should be scoped at most to a single application or crate. The
/// name needs to be unique so it does not collide with other names used by
/// an application, nor other applications.
///
/// If the name is empty, then an implementation defined default name will
/// be used instead.
pub fn meter(name: impl Into<Cow<'static, str>>) -> Result<opentelemetry::metrics::Meter, AnyErr> {
get_global()?.meter(name)
}

#[cfg(feature = "opentelemetry")]
/// Connect this program's span to the trace that is represented by the provided HTTP headers.
/// E.g. connect an axum handler's trace/span to the nginx trace/span.
pub fn set_span_parent_from_http_headers(
span: &tracing::Span,
headers: &http::HeaderMap,
) -> Result<(), AnyErr> {
get_global()?.set_span_parent_from_http_headers(span, headers)
}

#[cfg(feature = "opentelemetry")]
/// Set the response headers from the current span context. So downstream services can continue the current trace.
pub fn set_response_headers_from_ctx<B>(response: &mut http::Response<B>) -> Result<(), AnyErr> {
get_global()?.set_response_headers_from_ctx(response)
}

/// Force through logs, traces and metrics, useful in e.g. testing.
///
/// Note there doesn't seem to be an underlying interface to force through metrics.
pub fn flush() -> Result<(), AnyErr> {
get_global()?.flush()
}

/// Shutdown the logger, traces and metrics, should be called when the program is about to exit.
pub fn shutdown() -> Result<(), AnyErr> {
get_global()?.shutdown()
}

fn get_global<'a>() -> Result<MappedMutexGuard<'a, GlobalLog>, AnyErr> {
if GLOBAL_LOG.lock().is_none() {
return Err(anyerr!("GlobalLog not registered!"));
}
Ok(MutexGuard::map(GLOBAL_LOG.lock(), |x| x.as_mut().unwrap()))
}
32 changes: 32 additions & 0 deletions rust/bitbazaar/logging/global_log/http_headers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use opentelemetry::propagation::{Extractor, Injector};

// copy from crate opentelemetry-http (to not be dependants of on 3rd: http, ...)
pub struct HeaderInjector<'a>(pub &'a mut http::HeaderMap);

impl<'a> Injector for HeaderInjector<'a> {
/// Set a key and value in the `HeaderMap`. Does nothing if the key or value are not valid inputs.
fn set(&mut self, key: &str, value: String) {
if let Ok(name) = http::header::HeaderName::from_bytes(key.as_bytes()) {
if let Ok(val) = http::header::HeaderValue::from_str(&value) {
self.0.insert(name, val);
}
}
}
}

pub struct HeaderExtractor<'a>(pub &'a http::HeaderMap);

impl<'a> Extractor for HeaderExtractor<'a> {
/// Get a value for a key from the `HeaderMap`. If the value is not valid ASCII, returns None.
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|value| value.to_str().ok())
}

/// Collect all the keys from the `HeaderMap`.
fn keys(&self) -> Vec<&str> {
self.0
.keys()
.map(http::HeaderName::as_str)
.collect::<Vec<_>>()
}
}
3 changes: 3 additions & 0 deletions rust/bitbazaar/logging/global_log/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
mod builder;
pub mod global_fns;
#[cfg(feature = "opentelemetry")]
mod http_headers;
mod out;
mod setup;

Expand Down
88 changes: 68 additions & 20 deletions rust/bitbazaar/logging/global_log/out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tracing_subscriber::prelude::*;
use crate::errors::prelude::*;

// When registering globally, hoist the guards out into here, to allow the CreatedSubscriber to go out of scope but keep the guards permanently.
static GLOBAL_GUARDS: Lazy<Mutex<Option<Vec<WorkerGuard>>>> = Lazy::new(Mutex::default);
pub static GLOBAL_LOG: Lazy<Mutex<Option<GlobalLog>>> = Lazy::new(Mutex::default);

/// The global logger/tracer for stdout, file and full open telemetry. Works with the tracing crates (info!, debug!, warn!, error!) and span funcs and decorators.
///
Expand Down Expand Up @@ -44,7 +44,7 @@ pub struct GlobalLog {

/// Need to store these guards, when they go out of scope the logging may stop.
/// When made global these are hoisted into a static lazy var.
pub(crate) guards: Vec<WorkerGuard>,
pub(crate) _guards: Vec<WorkerGuard>,

#[cfg(feature = "opentelemetry")]
pub(crate) otlp_providers: OtlpProviders,
Expand Down Expand Up @@ -79,12 +79,10 @@ impl GlobalLog {
/// Register the logger as the global logger/tracer/metric manager, can only be done once during the lifetime of the program.
///
/// If you need temporary globality, use the [`GlobalLog::as_tmp_global`] method.
pub fn register_global(&mut self) -> Result<(), AnyErr> {
pub fn register_global(mut self) -> Result<(), AnyErr> {
if let Some(dispatch) = self.dispatch.take() {
// Keep hold of the guards:
GLOBAL_GUARDS
.lock()
.replace(std::mem::take(&mut self.guards));
// Make it global:
GLOBAL_LOG.lock().replace(self);
dispatch.init();
Ok(())
} else {
Expand All @@ -93,18 +91,51 @@ impl GlobalLog {
}

#[cfg(feature = "opentelemetry")]
/// Returns a new [Meter] with the provided name and default configuration.
///
/// A [Meter] should be scoped at most to a single application or crate. The
/// name needs to be unique so it does not collide with other names used by
/// an application, nor other applications.
///
/// If the name is empty, then an implementation defined default name will
/// be used instead.
pub fn meter(&self, name: impl Into<Cow<'static, str>>) -> opentelemetry::metrics::Meter {
/// See [`super::global_fns::meter`]`
pub fn meter(
&self,
name: impl Into<Cow<'static, str>>,
) -> Result<opentelemetry::metrics::Meter, AnyErr> {
use opentelemetry::metrics::MeterProvider;

self.otlp_providers.meter_provider.meter(name)
Ok(self.otlp_providers.meter_provider.meter(name))
}

#[cfg(feature = "opentelemetry")]
/// See [`super::global_fns::update_span_with_http_headers`]`
pub fn set_span_parent_from_http_headers(
&self,
span: &tracing::Span,
headers: &http::HeaderMap,
) -> Result<(), AnyErr> {
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::logging::global_log::http_headers::HeaderExtractor;

let ctx_extractor = HeaderExtractor(headers);
let ctx = opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.extract(&ctx_extractor)
});
span.set_parent(ctx);
Ok(())
}

#[cfg(feature = "opentelemetry")]
/// See [`super::global_fns::set_response_headers_from_ctx`]`
pub fn set_response_headers_from_ctx<B>(
&self,
response: &mut http::Response<B>,
) -> Result<(), AnyErr> {
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::logging::global_log::http_headers::HeaderInjector;

let ctx = tracing::Span::current().context();
let mut injector = HeaderInjector(response.headers_mut());
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&ctx, &mut injector);
});
Ok(())
}

/// Temporarily make the logger global, for the duration of the given closure.
Expand All @@ -118,9 +149,7 @@ impl GlobalLog {
}
}

/// Force through logs, traces and metrics, useful in e.g. testing.
///
/// Note there doesn't seem to be an underlying interface to force through metrics.
/// See [`super::global_fns::flush`]`
pub fn flush(&self) -> Result<(), AnyErr> {
#[cfg(feature = "opentelemetry")]
{
Expand All @@ -137,4 +166,23 @@ impl GlobalLog {
}
Ok(())
}

/// See [`super::global_fns::shutdown`]`
pub fn shutdown(&mut self) -> Result<(), AnyErr> {
#[cfg(feature = "opentelemetry")]
{
if let Some(prov) = &mut self.otlp_providers.logger_provider {
prov.shutdown();
}
if let Some(prov) = &self.otlp_providers.tracer_provider {
// Doesn't have a shutdown interface.
prov.force_flush();
}
self.otlp_providers
.meter_provider
.shutdown()
.change_context(AnyErr)?;
}
Ok(())
}
}
21 changes: 19 additions & 2 deletions rust/bitbazaar/logging/global_log/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,15 @@ pub fn builder_into_global_log(builder: GlobalLogBuilder) -> Result<GlobalLog, A
}
#[cfg(feature = "opentelemetry")]
super::builder::Output::Otlp(otlp) => {
use opentelemetry::global::set_text_map_propagator;
use opentelemetry_otlp::{new_exporter, new_pipeline, WithExportConfig};
use opentelemetry_sdk::{logs as sdklogs, resource, trace as sdktrace};
use opentelemetry_sdk::{
logs as sdklogs,
propagation::{
BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator,
},
resource, trace as sdktrace,
};

if !crate::misc::is_tcp_port_listening("localhost", otlp.port)? {
return Err(anyerr!("Can't connect to open telemetry collector on local port {}. Are you sure it's running?", otlp.port));
Expand All @@ -119,6 +126,16 @@ pub fn builder_into_global_log(builder: GlobalLogBuilder) -> Result<GlobalLog, A
let endpoint = format!("grpc://localhost:{}", otlp.port);
let get_exporter = || new_exporter().tonic().with_endpoint(&endpoint);

// Configure the global propagator to use between different services, without this step when you try and connect other services they'll strangely not work (this defaults to a no-op)
//
// Only enable to the 2 main standard propagators, the w3c trace context and baggage.
//
// https://opentelemetry.io/docs/concepts/sdk-configuration/general-sdk-configuration/#otel_propagators
set_text_map_propagator(TextMapCompositePropagator::new(vec![
Box::new(TraceContextPropagator::new()),
Box::new(BaggagePropagator::new()),
]));

let resource = resource::Resource::new(vec![
opentelemetry::KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
Expand Down Expand Up @@ -186,7 +203,7 @@ pub fn builder_into_global_log(builder: GlobalLogBuilder) -> Result<GlobalLog, A
let dispatch: Dispatch = subscriber.into();
Ok(GlobalLog {
dispatch: Some(dispatch),
guards,
_guards: guards,
#[cfg(feature = "opentelemetry")]
otlp_providers,
})
Expand Down
Loading
Loading