Skip to content

Commit

Permalink
WIP: enable tracing for development purposes
Browse files: browse the repository at this point in the history
  • Loading branch information
dandavison committed Oct 17, 2024
1 parent 9690510 commit a66cd7c
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 2 deletions.
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ tokio-util = { version = "0.7", features = ["io", "io-util"] }
tokio-stream = "0.1"
tonic = { workspace = true, features = ["tls", "tls-roots"] }
tracing = "0.1"
tracing-opentelemetry = "0.25"
tracing-subscriber = { version = "0.3", features = ["parking_lot", "env-filter", "registry"] }
url = "2.2"
uuid = { version = "1.1", features = ["v4"] }
Expand Down
56 changes: 55 additions & 1 deletion core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ mod prometheus_server;
#[cfg(feature = "otel")]
pub use metrics::{default_buckets_for, MetricsCallBuffer};
#[cfg(feature = "otel")]
use opentelemetry::{
self,
trace::{SpanKind, Tracer, TracerProvider},
KeyValue,
};
use opentelemetry_otlp::WithExportConfig;
#[cfg(feature = "otel")]
use opentelemetry_sdk;
use otel::default_resource_instance;
#[cfg(feature = "otel")]
pub use otel::{build_otlp_metric_exporter, start_prometheus_metric_exporter};

pub use log_export::{CoreLogBuffer, CoreLogBufferedConsumer, CoreLogStreamConsumer};
Expand Down Expand Up @@ -166,6 +176,7 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyho
let mut console_pretty_layer = None;
let mut console_compact_layer = None;
let mut forward_layer = None;
let mut export_layer = None;
// ===================================

let tracing_sub = opts.logging.map(|logger| {
Expand Down Expand Up @@ -207,10 +218,53 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyho
Some(CoreLogConsumerLayer::new(consumer).with_filter(EnvFilter::new(filter)));
}
};

let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("telemetry")
.worker_threads(2)
.enable_all()
.build()
.unwrap();

// create otel export layer
runtime.block_on(async {
let tracer_cfg = opentelemetry_sdk::trace::Config::default()
.with_resource(default_resource_instance().clone());
let provider = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint("grpc://localhost:4317".to_string()),
// .with_metadata(MetadataMap::from_headers(headers.try_into()?)),
)
.with_trace_config(tracer_cfg)
// Using install_simple instead for now because install_batch is not producing spans and is emitting this error message:
// OpenTelemetry trace error occurred. cannot send message to batch processor as the channel is closed
// .install_batch(opentelemetry_sdk::runtime::Tokio)
.install_simple()
.unwrap();
opentelemetry::global::set_tracer_provider(provider.clone());

let tracer = provider.tracer_builder("sdk-core").build();
let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);
// .with_filter(EnvFilter::new(&tracing.filter))
export_layer = Some(opentelemetry);

let tracer = provider.tracer("sdk-core");

let _span = tracer
.span_builder("telemetry_init")
.with_kind(SpanKind::Server)
.with_attributes([KeyValue::new("temporal.worker", true)])
.start(&tracer);
});

let reg = tracing_subscriber::registry()
.with(console_pretty_layer)
.with(console_compact_layer)
.with(forward_layer);
.with(forward_layer)
.with(export_layer);

#[cfg(feature = "tokio-console")]
let reg = reg.with(console_subscriber::spawn());
Expand Down
2 changes: 1 addition & 1 deletion core/src/telemetry/otel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ impl GaugeF64 for MemoryGauge<f64> {
}
}

fn default_resource_instance() -> &'static Resource {
pub(crate) fn default_resource_instance() -> &'static Resource {
use once_cell::sync::OnceCell;

static INSTANCE: OnceCell<Resource> = OnceCell::new();
Expand Down
13 changes: 13 additions & 0 deletions core/src/worker/workflow/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
mod local_acts;

use opentelemetry::{
self,
trace::{SpanKind, Tracer},
KeyValue,
};

use super::{
cancel_external_state_machine::new_external_cancel,
cancel_workflow_state_machine::cancel_workflow,
Expand Down Expand Up @@ -526,6 +532,13 @@ impl WorkflowMachines {
/// Apply the next (unapplied) entire workflow task from history to these machines. Will replay
/// any events that need to be replayed until caught up to the newest WFT.
pub(crate) fn apply_next_wft_from_history(&mut self) -> Result<usize> {
let tracer = opentelemetry::global::tracer("apply_next_wft_from_history-tracer");
let _span = tracer
.span_builder("apply_next_wft_from_history")
.with_kind(SpanKind::Server)
.with_attributes([KeyValue::new("temporal.worker", true)])
.start(&tracer);

// If we have already seen the terminal event for the entire workflow in a previous WFT,
// then we don't need to do anything here, and in fact we need to avoid re-applying the
// final WFT.
Expand Down
8 changes: 8 additions & 0 deletions core/src/worker/workflow/managed_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{
MetricsContext,
};
use futures_util::future::AbortHandle;
use opentelemetry::{trace::SpanKind, trace::Tracer, KeyValue};
use std::{
collections::HashSet,
mem,
Expand Down Expand Up @@ -370,6 +371,13 @@ impl ManagedRun {
used_flags: Vec<u32>,
resp_chan: Option<oneshot::Sender<ActivationCompleteResult>>,
) -> Result<RunUpdateAct, NextPageReq> {
let tracer = opentelemetry::global::tracer("successful_completion-tracer");
let _span = tracer
.span_builder("successful_completion")
.with_kind(SpanKind::Server)
.with_attributes([KeyValue::new("temporal.worker", true)])
.start(&tracer);

let activation_was_only_eviction = self.activation_is_eviction();
let (task_token, has_pending_query, start_time) = if let Some(entry) = self.wft.as_ref() {
(
Expand Down
8 changes: 8 additions & 0 deletions core/src/worker/workflow/workflow_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
MetricsContext,
};
use futures::{stream, stream::PollNext, Stream, StreamExt};
use opentelemetry::{trace::SpanKind, trace::Tracer, KeyValue};
use std::{collections::VecDeque, fmt::Debug, future, sync::Arc};
use temporal_sdk_core_api::errors::PollWfError;
use temporal_sdk_core_protos::coresdk::workflow_activation::remove_from_cache::EvictionReason;
Expand Down Expand Up @@ -252,6 +253,13 @@ impl WFStream {
}

fn process_completion(&mut self, complete: NewOrFetchedComplete) -> Vec<ActivationOrAuto> {
let tracer = opentelemetry::global::tracer("process_completion-tracer");
let _span = tracer
.span_builder("process_completion")
.with_kind(SpanKind::Server)
.with_attributes([KeyValue::new("temporal.worker", true)])
.start(&tracer);

let rh = if let Some(rh) = self.runs.get_mut(complete.run_id()) {
rh
} else {
Expand Down

0 comments on commit a66cd7c

Please sign in to comment.