From c95f2f9a09b1429eb88e4add8b87857a2a877732 Mon Sep 17 00:00:00 2001 From: David Cook Date: Mon, 29 Jul 2024 16:28:31 -0500 Subject: [PATCH] Update Tokio runtime metrics (#3344) * Cleanup after OpenTelemetry upgrade * Rename of configuration field * Adopt new name of "num_alive_tasks" * Add new metric for total number of spawned tasks --- aggregator/src/metrics/tokio_runtime.rs | 138 +++++++++++++----------- 1 file changed, 73 insertions(+), 65 deletions(-) diff --git a/aggregator/src/metrics/tokio_runtime.rs b/aggregator/src/metrics/tokio_runtime.rs index 177d9c2bc..0c1828fcc 100644 --- a/aggregator/src/metrics/tokio_runtime.rs +++ b/aggregator/src/metrics/tokio_runtime.rs @@ -2,14 +2,11 @@ use std::time::{Duration, SystemTime}; use derivative::Derivative; use opentelemetry::{metrics::MetricsError, InstrumentationLibrary, KeyValue}; -use opentelemetry_sdk::{ - metrics::{ - data::{ - DataPoint, Gauge, Histogram, HistogramDataPoint, Metric, ScopeMetrics, Sum, Temporality, - }, - reader::MetricProducer, +use opentelemetry_sdk::metrics::{ + data::{ + DataPoint, Gauge, Histogram, HistogramDataPoint, Metric, ScopeMetrics, Sum, Temporality, }, - AttributeSet, + reader::MetricProducer, }; use tokio::runtime::{self, HistogramScale, RuntimeMetrics}; @@ -31,7 +28,7 @@ pub(crate) fn configure_runtime( if config.enable_poll_time_histogram { runtime_builder.enable_metrics_poll_count_histogram(); runtime_builder.metrics_poll_count_histogram_scale(config.poll_time_histogram_scale.into()); - if let Some(resolution) = config.poll_time_histogram_resolution_microseconds { + if let Some(resolution) = config.poll_time_histogram_resolution_us { let resolution = Duration::from_micros(resolution); runtime_builder.metrics_poll_count_histogram_resolution(resolution); } @@ -53,25 +50,20 @@ pub(super) struct TokioRuntimeMetrics { poll_count_histogram_num_buckets: usize, poll_count_histogram_bucket_bounds: Vec, #[derivative(Debug = "ignore")] - attributes_local: AttributeSet, + attributes_local: Vec, #[derivative(Debug = "ignore")] - attributes_local_overflow: AttributeSet, + attributes_local_overflow: Vec, #[derivative(Debug = "ignore")] - attributes_remote: AttributeSet, + attributes_remote: Vec, #[derivative(Debug = "ignore")] - attributes_local_queue_worker: Vec, - attributes_injection_queue: AttributeSet, - attributes_blocking_queue: AttributeSet, + attributes_local_queue_worker: Vec>, + attributes_injection_queue: Vec, + attributes_blocking_queue: Vec, } impl TokioRuntimeMetrics { pub(super) fn new(runtime_metrics: RuntimeMetrics) -> Self { - let scope = InstrumentationLibrary::new( - "tokio-runtime-metrics", - None::<&'static str>, - None::<&'static str>, - None, - ); + let scope = InstrumentationLibrary::builder("tokio-runtime-metrics").build(); let start_time = SystemTime::now(); @@ -92,13 +84,13 @@ impl TokioRuntimeMetrics { }) .collect(); - let attributes_local = AttributeSet::from([KeyValue::new("queue", "local")].as_slice()); + let attributes_local = Vec::from([KeyValue::new("queue", "local")].as_slice()); let attributes_local_overflow = - AttributeSet::from([KeyValue::new("queue", "local_overflow")].as_slice()); - let attributes_remote = AttributeSet::from([KeyValue::new("queue", "remote")].as_slice()); + Vec::from([KeyValue::new("queue", "local_overflow")].as_slice()); + let attributes_remote = Vec::from([KeyValue::new("queue", "remote")].as_slice()); let attributes_local_queue_worker = (0..num_workers) .map(|i| { - AttributeSet::from( + Vec::from( [ KeyValue::new("queue", "local"), KeyValue::new("worker", i64::try_from(i).unwrap()), @@ -108,9 +100,8 @@ impl TokioRuntimeMetrics { }) .collect(); let attributes_injection_queue = - AttributeSet::from([KeyValue::new("queue", "injection")].as_slice()); - let attributes_blocking_queue = - AttributeSet::from([KeyValue::new("queue", "blocking")].as_slice()); + Vec::from([KeyValue::new("queue", "injection")].as_slice()); + let attributes_blocking_queue = Vec::from([KeyValue::new("queue", "blocking")].as_slice()); Self { runtime_metrics, @@ -134,8 +125,9 @@ impl MetricProducer for TokioRuntimeMetrics { let now = SystemTime::now(); let num_blocking_threads = self.runtime_metrics.num_blocking_threads(); - let active_tasks_count = self.runtime_metrics.active_tasks_count(); + let num_alive_tasks = self.runtime_metrics.num_alive_tasks(); let num_idle_blocking_threads = self.runtime_metrics.num_idle_blocking_threads(); + let spawned_task_count = self.runtime_metrics.spawned_tasks_count(); let remote_schedule_count = self.runtime_metrics.remote_schedule_count(); let budget_forced_yield_count = self.runtime_metrics.budget_forced_yield_count(); let injection_queue_depth = self.runtime_metrics.injection_queue_depth(); @@ -182,10 +174,10 @@ impl MetricProducer for TokioRuntimeMetrics { Metric { name: "tokio.thread.worker.count".into(), description: "Number of runtime worker threads".into(), - unit: "{thread}", + unit: "{thread}".into(), data: Box::new(Gauge:: { data_points: Vec::from([DataPoint { - attributes: AttributeSet::default(), + attributes: Vec::default(), start_time: Some(self.start_time), time: Some(now), value: u64::try_from(self.num_workers).unwrap_or(u64::MAX), @@ -198,10 +190,10 @@ impl MetricProducer for TokioRuntimeMetrics { description: "Number of additional threads spawned by the runtime for blocking \ operations" .into(), - unit: "{thread}", + unit: "{thread}".into(), data: Box::new(Gauge:: { data_points: Vec::from([DataPoint { - attributes: AttributeSet::default(), + attributes: Vec::new(), start_time: Some(self.start_time), time: Some(now), value: u64::try_from(num_blocking_threads).unwrap_or(u64::MAX), @@ -210,15 +202,15 @@ impl MetricProducer for TokioRuntimeMetrics { }), }, Metric { - name: "tokio.task.active.count".into(), - description: "Number of active tasks in the runtime".into(), - unit: "{task}", + name: "tokio.task.alive.count".into(), + description: "Number of alive tasks in the runtime".into(), + unit: "{task}".into(), data: Box::new(Gauge:: { data_points: Vec::from([DataPoint { - attributes: AttributeSet::default(), + attributes: Vec::new(), start_time: Some(self.start_time), time: Some(now), - value: u64::try_from(active_tasks_count).unwrap_or(u64::MAX), + value: u64::try_from(num_alive_tasks).unwrap_or(u64::MAX), exemplars: Vec::new(), }]), }), @@ -227,10 +219,10 @@ impl MetricProducer for TokioRuntimeMetrics { name: "tokio.thread.blocking.idle.count".into(), description: "Number of additional threads for blocking operations which are idle" .into(), - unit: "{thread}", + unit: "{thread}".into(), data: Box::new(Gauge:: { data_points: Vec::from([DataPoint { - attributes: AttributeSet::default(), + attributes: Vec::new(), start_time: Some(self.start_time), time: Some(now), value: u64::try_from(num_idle_blocking_threads).unwrap_or(u64::MAX), @@ -238,13 +230,29 @@ impl MetricProducer for TokioRuntimeMetrics { }]), }), }, + Metric { + name: "tokio.task.spawned".into(), + description: "Total number of tasks spawned in the runtime".into(), + unit: "{task}".into(), + data: Box::new(Sum:: { + data_points: Vec::from([DataPoint { + attributes: Vec::new(), + start_time: Some(self.start_time), + time: Some(now), + value: spawned_task_count, + exemplars: Vec::new(), + }]), + temporality: Temporality::Cumulative, + is_monotonic: true, + }), + }, Metric { name: "tokio.task.scheduled".into(), description: "Number of tasks scheduled, either to the thread's own local queue, \ from a worker thread to the injection queue due to overflow, or \ from a remote thread to the injection queue" .into(), - unit: "{task}", + unit: "{task}".into(), data: Box::new(Sum:: { data_points: Vec::from([ DataPoint { @@ -278,10 +286,10 @@ impl MetricProducer for TokioRuntimeMetrics { description: "Number of times tasks have been forced to yield because their task \ budget was exhausted" .into(), - unit: "", + unit: "".into(), data: Box::new(Sum:: { data_points: Vec::from([DataPoint { - attributes: AttributeSet::default(), + attributes: Vec::new(), start_time: Some(self.start_time), time: Some(now), value: budget_forced_yield_count, @@ -294,10 +302,10 @@ impl MetricProducer for TokioRuntimeMetrics { Metric { name: "tokio.park".into(), description: "Total number of times worker threads have parked".into(), - unit: "", + unit: "".into(), data: Box::new(Sum:: { data_points: Vec::from([DataPoint { - attributes: AttributeSet::default(), + attributes: Vec::new(), start_time: Some(self.start_time), time: Some(now), value: park_count, @@ -312,10 +320,10 @@ impl MetricProducer for TokioRuntimeMetrics { description: "Total number of times worker threads unparked and parked again \ without doing any work" .into(), - unit: "", + unit: "".into(), data: Box::new(Sum:: { data_points: Vec::from([DataPoint { - attributes: AttributeSet::default(), + attributes: Vec::new(), start_time: Some(self.start_time), time: Some(now), value: noop_count, @@ -328,10 +336,10 @@ impl MetricProducer for TokioRuntimeMetrics { Metric { name: "tokio.task.stolen".into(), description: "Total number of tasks stolen between worker threads".into(), - unit: "{task}", + unit: "{task}".into(), data: Box::new(Sum:: { data_points: Vec::from([DataPoint { - attributes: AttributeSet::default(), + attributes: Vec::new(), start_time: Some(self.start_time), time: Some(now), value: steal_count, @@ -345,10 +353,10 @@ impl MetricProducer for TokioRuntimeMetrics { name: "tokio.steals".into(), description: "Number of times worker threads successfully stole one or more tasks" .into(), - unit: "{operation}", + unit: "{operation}".into(), data: Box::new(Sum:: { data_points: Vec::from([DataPoint { - attributes: AttributeSet::default(), + attributes: Vec::new(), start_time: Some(self.start_time), time: Some(now), value: steal_operations, @@ -361,10 +369,10 @@ impl MetricProducer for TokioRuntimeMetrics { Metric { name: "tokio.thread.worker.busy.time".into(), description: "Total amount of time that all worker threads have been busy".into(), - unit: "s", + unit: "s".into(), data: Box::new(Sum:: { data_points: Vec::from([DataPoint { - attributes: AttributeSet::default(), + attributes: Vec::new(), start_time: Some(self.start_time), time: Some(now), value: total_busy_duration.as_secs_f64(), @@ -379,7 +387,7 @@ impl MetricProducer for TokioRuntimeMetrics { description: "Number of tasks currently in the runtime's injection queue, \ blocking thread pool queue, or a worker's local queue" .into(), - unit: "{task}", + unit: "{task}".into(), data: Box::new(Gauge:: { data_points: { let mut data_points = Vec::with_capacity(self.num_workers + 2); @@ -417,10 +425,10 @@ impl MetricProducer for TokioRuntimeMetrics { Metric { name: "tokio.task.poll.time".into(), description: "Histogram of task poll times".into(), - unit: "s", + unit: "s".into(), data: Box::new(Histogram:: { data_points: Vec::from([HistogramDataPoint { - attributes: AttributeSet::default(), + attributes: Vec::new(), start_time: self.start_time, time: now, count: poll_count, @@ -437,10 +445,10 @@ impl MetricProducer for TokioRuntimeMetrics { Metric { name: "tokio.task.poll.time.average".into(), description: "Exponentially weighted moving average of task poll times".into(), - unit: "s", + unit: "s".into(), data: Box::new(Gauge:: { data_points: Vec::from([DataPoint { - attributes: AttributeSet::default(), + attributes: Vec::new(), start_time: Some(self.start_time), time: Some(now), value: mean_poll_time.as_secs_f64(), @@ -452,10 +460,10 @@ impl MetricProducer for TokioRuntimeMetrics { name: "tokio.io.fd.count".into(), description: "Number of file descriptors currently registered with the I/O driver" .into(), - unit: "{fd}", + unit: "{fd}".into(), data: Box::new(Gauge:: { data_points: Vec::from([DataPoint { - attributes: AttributeSet::default(), + attributes: Vec::new(), start_time: Some(self.start_time), time: Some(now), value: io_driver_fd_registered_count @@ -467,10 +475,10 @@ impl MetricProducer for TokioRuntimeMetrics { Metric { name: "tokio.io.fd.registered".into(), description: "Total number of file descriptors registered by the I/O driver".into(), - unit: "{fd}", + unit: "{fd}".into(), data: Box::new(Sum:: { data_points: Vec::from([DataPoint { - attributes: AttributeSet::default(), + attributes: Vec::new(), start_time: Some(self.start_time), time: Some(now), value: io_driver_fd_registered_count, @@ -484,10 +492,10 @@ impl MetricProducer for TokioRuntimeMetrics { name: "tokio.io.fd.deregistered".into(), description: "Total number of file descriptors deregistered by the I/O driver" .into(), - unit: "{fd}", + unit: "{fd}".into(), data: Box::new(Sum:: { data_points: Vec::from([DataPoint { - attributes: AttributeSet::default(), + attributes: Vec::new(), start_time: Some(self.start_time), time: Some(now), value: io_driver_fd_deregistered_count, @@ -500,10 +508,10 @@ impl MetricProducer for TokioRuntimeMetrics { Metric { name: "tokio.io.ready_events".into(), description: "Number of ready events processed by the I/O driver".into(), - unit: "{event}", + unit: "{event}".into(), data: Box::new(Sum:: { data_points: Vec::from([DataPoint { - attributes: AttributeSet::default(), + attributes: Vec::new(), start_time: Some(self.start_time), time: Some(now), value: io_driver_ready_count,