Skip to content

Commit

Permalink
Update Tokio runtime metrics (#3344)
Browse files Browse the repository at this point in the history
* Cleanup after OpenTelemetry upgrade

* Rename of configuration field

* Adopt new name of "num_alive_tasks"

* Add new metric for total number of spawned tasks
  • Loading branch information
divergentdave authored Jul 29, 2024
1 parent e249402 commit c95f2f9
Showing 1 changed file with 73 additions and 65 deletions.
138 changes: 73 additions & 65 deletions aggregator/src/metrics/tokio_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ use std::time::{Duration, SystemTime};

use derivative::Derivative;
use opentelemetry::{metrics::MetricsError, InstrumentationLibrary, KeyValue};
use opentelemetry_sdk::{
metrics::{
data::{
DataPoint, Gauge, Histogram, HistogramDataPoint, Metric, ScopeMetrics, Sum, Temporality,
},
reader::MetricProducer,
use opentelemetry_sdk::metrics::{
data::{
DataPoint, Gauge, Histogram, HistogramDataPoint, Metric, ScopeMetrics, Sum, Temporality,
},
AttributeSet,
reader::MetricProducer,
};
use tokio::runtime::{self, HistogramScale, RuntimeMetrics};

Expand All @@ -31,7 +28,7 @@ pub(crate) fn configure_runtime(
if config.enable_poll_time_histogram {
runtime_builder.enable_metrics_poll_count_histogram();
runtime_builder.metrics_poll_count_histogram_scale(config.poll_time_histogram_scale.into());
if let Some(resolution) = config.poll_time_histogram_resolution_microseconds {
if let Some(resolution) = config.poll_time_histogram_resolution_us {
let resolution = Duration::from_micros(resolution);
runtime_builder.metrics_poll_count_histogram_resolution(resolution);
}
Expand All @@ -53,25 +50,20 @@ pub(super) struct TokioRuntimeMetrics {
poll_count_histogram_num_buckets: usize,
poll_count_histogram_bucket_bounds: Vec<f64>,
#[derivative(Debug = "ignore")]
attributes_local: AttributeSet,
attributes_local: Vec<KeyValue>,
#[derivative(Debug = "ignore")]
attributes_local_overflow: AttributeSet,
attributes_local_overflow: Vec<KeyValue>,
#[derivative(Debug = "ignore")]
attributes_remote: AttributeSet,
attributes_remote: Vec<KeyValue>,
#[derivative(Debug = "ignore")]
attributes_local_queue_worker: Vec<AttributeSet>,
attributes_injection_queue: AttributeSet,
attributes_blocking_queue: AttributeSet,
attributes_local_queue_worker: Vec<Vec<KeyValue>>,
attributes_injection_queue: Vec<KeyValue>,
attributes_blocking_queue: Vec<KeyValue>,
}

impl TokioRuntimeMetrics {
pub(super) fn new(runtime_metrics: RuntimeMetrics) -> Self {
let scope = InstrumentationLibrary::new(
"tokio-runtime-metrics",
None::<&'static str>,
None::<&'static str>,
None,
);
let scope = InstrumentationLibrary::builder("tokio-runtime-metrics").build();

let start_time = SystemTime::now();

Expand All @@ -92,13 +84,13 @@ impl TokioRuntimeMetrics {
})
.collect();

let attributes_local = AttributeSet::from([KeyValue::new("queue", "local")].as_slice());
let attributes_local = Vec::from([KeyValue::new("queue", "local")].as_slice());
let attributes_local_overflow =
AttributeSet::from([KeyValue::new("queue", "local_overflow")].as_slice());
let attributes_remote = AttributeSet::from([KeyValue::new("queue", "remote")].as_slice());
Vec::from([KeyValue::new("queue", "local_overflow")].as_slice());
let attributes_remote = Vec::from([KeyValue::new("queue", "remote")].as_slice());
let attributes_local_queue_worker = (0..num_workers)
.map(|i| {
AttributeSet::from(
Vec::from(
[
KeyValue::new("queue", "local"),
KeyValue::new("worker", i64::try_from(i).unwrap()),
Expand All @@ -108,9 +100,8 @@ impl TokioRuntimeMetrics {
})
.collect();
let attributes_injection_queue =
AttributeSet::from([KeyValue::new("queue", "injection")].as_slice());
let attributes_blocking_queue =
AttributeSet::from([KeyValue::new("queue", "blocking")].as_slice());
Vec::from([KeyValue::new("queue", "injection")].as_slice());
let attributes_blocking_queue = Vec::from([KeyValue::new("queue", "blocking")].as_slice());

Self {
runtime_metrics,
Expand All @@ -134,8 +125,9 @@ impl MetricProducer for TokioRuntimeMetrics {
let now = SystemTime::now();

let num_blocking_threads = self.runtime_metrics.num_blocking_threads();
let active_tasks_count = self.runtime_metrics.active_tasks_count();
let num_alive_tasks = self.runtime_metrics.num_alive_tasks();
let num_idle_blocking_threads = self.runtime_metrics.num_idle_blocking_threads();
let spawned_task_count = self.runtime_metrics.spawned_tasks_count();
let remote_schedule_count = self.runtime_metrics.remote_schedule_count();
let budget_forced_yield_count = self.runtime_metrics.budget_forced_yield_count();
let injection_queue_depth = self.runtime_metrics.injection_queue_depth();
Expand Down Expand Up @@ -182,10 +174,10 @@ impl MetricProducer for TokioRuntimeMetrics {
Metric {
name: "tokio.thread.worker.count".into(),
description: "Number of runtime worker threads".into(),
unit: "{thread}",
unit: "{thread}".into(),
data: Box::new(Gauge::<u64> {
data_points: Vec::from([DataPoint {
attributes: AttributeSet::default(),
attributes: Vec::default(),
start_time: Some(self.start_time),
time: Some(now),
value: u64::try_from(self.num_workers).unwrap_or(u64::MAX),
Expand All @@ -198,10 +190,10 @@ impl MetricProducer for TokioRuntimeMetrics {
description: "Number of additional threads spawned by the runtime for blocking \
operations"
.into(),
unit: "{thread}",
unit: "{thread}".into(),
data: Box::new(Gauge::<u64> {
data_points: Vec::from([DataPoint {
attributes: AttributeSet::default(),
attributes: Vec::new(),
start_time: Some(self.start_time),
time: Some(now),
value: u64::try_from(num_blocking_threads).unwrap_or(u64::MAX),
Expand All @@ -210,15 +202,15 @@ impl MetricProducer for TokioRuntimeMetrics {
}),
},
Metric {
name: "tokio.task.active.count".into(),
description: "Number of active tasks in the runtime".into(),
unit: "{task}",
name: "tokio.task.alive.count".into(),
description: "Number of alive tasks in the runtime".into(),
unit: "{task}".into(),
data: Box::new(Gauge::<u64> {
data_points: Vec::from([DataPoint {
attributes: AttributeSet::default(),
attributes: Vec::new(),
start_time: Some(self.start_time),
time: Some(now),
value: u64::try_from(active_tasks_count).unwrap_or(u64::MAX),
value: u64::try_from(num_alive_tasks).unwrap_or(u64::MAX),
exemplars: Vec::new(),
}]),
}),
Expand All @@ -227,24 +219,40 @@ impl MetricProducer for TokioRuntimeMetrics {
name: "tokio.thread.blocking.idle.count".into(),
description: "Number of additional threads for blocking operations which are idle"
.into(),
unit: "{thread}",
unit: "{thread}".into(),
data: Box::new(Gauge::<u64> {
data_points: Vec::from([DataPoint {
attributes: AttributeSet::default(),
attributes: Vec::new(),
start_time: Some(self.start_time),
time: Some(now),
value: u64::try_from(num_idle_blocking_threads).unwrap_or(u64::MAX),
exemplars: Vec::new(),
}]),
}),
},
Metric {
name: "tokio.task.spawned".into(),
description: "Total number of tasks spawned in the runtime".into(),
unit: "{task}".into(),
data: Box::new(Sum::<u64> {
data_points: Vec::from([DataPoint {
attributes: Vec::new(),
start_time: Some(self.start_time),
time: Some(now),
value: spawned_task_count,
exemplars: Vec::new(),
}]),
temporality: Temporality::Cumulative,
is_monotonic: true,
}),
},
Metric {
name: "tokio.task.scheduled".into(),
description: "Number of tasks scheduled, either to the thread's own local queue, \
from a worker thread to the injection queue due to overflow, or \
from a remote thread to the injection queue"
.into(),
unit: "{task}",
unit: "{task}".into(),
data: Box::new(Sum::<u64> {
data_points: Vec::from([
DataPoint {
Expand Down Expand Up @@ -278,10 +286,10 @@ impl MetricProducer for TokioRuntimeMetrics {
description: "Number of times tasks have been forced to yield because their task \
budget was exhausted"
.into(),
unit: "",
unit: "".into(),
data: Box::new(Sum::<u64> {
data_points: Vec::from([DataPoint {
attributes: AttributeSet::default(),
attributes: Vec::new(),
start_time: Some(self.start_time),
time: Some(now),
value: budget_forced_yield_count,
Expand All @@ -294,10 +302,10 @@ impl MetricProducer for TokioRuntimeMetrics {
Metric {
name: "tokio.park".into(),
description: "Total number of times worker threads have parked".into(),
unit: "",
unit: "".into(),
data: Box::new(Sum::<u64> {
data_points: Vec::from([DataPoint {
attributes: AttributeSet::default(),
attributes: Vec::new(),
start_time: Some(self.start_time),
time: Some(now),
value: park_count,
Expand All @@ -312,10 +320,10 @@ impl MetricProducer for TokioRuntimeMetrics {
description: "Total number of times worker threads unparked and parked again \
without doing any work"
.into(),
unit: "",
unit: "".into(),
data: Box::new(Sum::<u64> {
data_points: Vec::from([DataPoint {
attributes: AttributeSet::default(),
attributes: Vec::new(),
start_time: Some(self.start_time),
time: Some(now),
value: noop_count,
Expand All @@ -328,10 +336,10 @@ impl MetricProducer for TokioRuntimeMetrics {
Metric {
name: "tokio.task.stolen".into(),
description: "Total number of tasks stolen between worker threads".into(),
unit: "{task}",
unit: "{task}".into(),
data: Box::new(Sum::<u64> {
data_points: Vec::from([DataPoint {
attributes: AttributeSet::default(),
attributes: Vec::new(),
start_time: Some(self.start_time),
time: Some(now),
value: steal_count,
Expand All @@ -345,10 +353,10 @@ impl MetricProducer for TokioRuntimeMetrics {
name: "tokio.steals".into(),
description: "Number of times worker threads successfully stole one or more tasks"
.into(),
unit: "{operation}",
unit: "{operation}".into(),
data: Box::new(Sum::<u64> {
data_points: Vec::from([DataPoint {
attributes: AttributeSet::default(),
attributes: Vec::new(),
start_time: Some(self.start_time),
time: Some(now),
value: steal_operations,
Expand All @@ -361,10 +369,10 @@ impl MetricProducer for TokioRuntimeMetrics {
Metric {
name: "tokio.thread.worker.busy.time".into(),
description: "Total amount of time that all worker threads have been busy".into(),
unit: "s",
unit: "s".into(),
data: Box::new(Sum::<f64> {
data_points: Vec::from([DataPoint {
attributes: AttributeSet::default(),
attributes: Vec::new(),
start_time: Some(self.start_time),
time: Some(now),
value: total_busy_duration.as_secs_f64(),
Expand All @@ -379,7 +387,7 @@ impl MetricProducer for TokioRuntimeMetrics {
description: "Number of tasks currently in the runtime's injection queue, \
blocking thread pool queue, or a worker's local queue"
.into(),
unit: "{task}",
unit: "{task}".into(),
data: Box::new(Gauge::<u64> {
data_points: {
let mut data_points = Vec::with_capacity(self.num_workers + 2);
Expand Down Expand Up @@ -417,10 +425,10 @@ impl MetricProducer for TokioRuntimeMetrics {
Metric {
name: "tokio.task.poll.time".into(),
description: "Histogram of task poll times".into(),
unit: "s",
unit: "s".into(),
data: Box::new(Histogram::<f64> {
data_points: Vec::from([HistogramDataPoint {
attributes: AttributeSet::default(),
attributes: Vec::new(),
start_time: self.start_time,
time: now,
count: poll_count,
Expand All @@ -437,10 +445,10 @@ impl MetricProducer for TokioRuntimeMetrics {
Metric {
name: "tokio.task.poll.time.average".into(),
description: "Exponentially weighted moving average of task poll times".into(),
unit: "s",
unit: "s".into(),
data: Box::new(Gauge::<f64> {
data_points: Vec::from([DataPoint {
attributes: AttributeSet::default(),
attributes: Vec::new(),
start_time: Some(self.start_time),
time: Some(now),
value: mean_poll_time.as_secs_f64(),
Expand All @@ -452,10 +460,10 @@ impl MetricProducer for TokioRuntimeMetrics {
name: "tokio.io.fd.count".into(),
description: "Number of file descriptors currently registered with the I/O driver"
.into(),
unit: "{fd}",
unit: "{fd}".into(),
data: Box::new(Gauge::<u64> {
data_points: Vec::from([DataPoint {
attributes: AttributeSet::default(),
attributes: Vec::new(),
start_time: Some(self.start_time),
time: Some(now),
value: io_driver_fd_registered_count
Expand All @@ -467,10 +475,10 @@ impl MetricProducer for TokioRuntimeMetrics {
Metric {
name: "tokio.io.fd.registered".into(),
description: "Total number of file descriptors registered by the I/O driver".into(),
unit: "{fd}",
unit: "{fd}".into(),
data: Box::new(Sum::<u64> {
data_points: Vec::from([DataPoint {
attributes: AttributeSet::default(),
attributes: Vec::new(),
start_time: Some(self.start_time),
time: Some(now),
value: io_driver_fd_registered_count,
Expand All @@ -484,10 +492,10 @@ impl MetricProducer for TokioRuntimeMetrics {
name: "tokio.io.fd.deregistered".into(),
description: "Total number of file descriptors deregistered by the I/O driver"
.into(),
unit: "{fd}",
unit: "{fd}".into(),
data: Box::new(Sum::<u64> {
data_points: Vec::from([DataPoint {
attributes: AttributeSet::default(),
attributes: Vec::new(),
start_time: Some(self.start_time),
time: Some(now),
value: io_driver_fd_deregistered_count,
Expand All @@ -500,10 +508,10 @@ impl MetricProducer for TokioRuntimeMetrics {
Metric {
name: "tokio.io.ready_events".into(),
description: "Number of ready events processed by the I/O driver".into(),
unit: "{event}",
unit: "{event}".into(),
data: Box::new(Sum::<u64> {
data_points: Vec::from([DataPoint {
attributes: AttributeSet::default(),
attributes: Vec::new(),
start_time: Some(self.start_time),
time: Some(now),
value: io_driver_ready_count,
Expand Down

0 comments on commit c95f2f9

Please sign in to comment.