Skip to content

Commit

Permalink
ref(system): Expose better runtime metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Nov 26, 2024
1 parent 434c1c5 commit 821b1f4
Show file tree
Hide file tree
Showing 9 changed files with 436 additions and 64 deletions.
3 changes: 2 additions & 1 deletion relay-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,14 @@ pub fn run(config: Config) -> anyhow::Result<()> {

// Creates the main runtime.
let main_runtime = crate::service::create_runtime("main-rt", config.cpu_concurrency());
let main_rt_metrics = main_runtime.metrics();

// Run the system and block until a shutdown signal is sent to this process. Inside, start a
// web server and run all relevant services. See the `actors` module documentation for more
// information on all services.
main_runtime.block_on(async {
Controller::start(config.shutdown_timeout());
let (state, mut runner) = ServiceState::start(config.clone())?;
let (state, mut runner) = ServiceState::start(main_rt_metrics, config.clone())?;
runner.start(HttpServer::new(config, state.clone())?);

tokio::select! {
Expand Down
18 changes: 9 additions & 9 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use relay_redis::redis::Script;
use relay_redis::{PooledClient, RedisScripts};
use relay_redis::{RedisError, RedisPool, RedisPools};
use relay_system::{channel, Addr, Service, ServiceRunner};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;

/// Indicates the type of failure of the server.
Expand Down Expand Up @@ -68,16 +67,15 @@ pub struct Registry {
pub project_cache_handle: ProjectCacheHandle,
}

/// Constructs a tokio [`Runtime`] configured for running [services](relay_system::Service).
pub fn create_runtime(name: &str, threads: usize) -> Runtime {
tokio::runtime::Builder::new_multi_thread()
.thread_name(name)
/// Constructs a Tokio [`Runtime`] configured for running [services](relay_system::Service).
pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runtime {
relay_system::Runtime::builder(name)
.worker_threads(threads)
// Relay uses `spawn_blocking` only for Redis connections within the project
// cache, those should never exceed 100 concurrent connections
// (limited by connection pool).
//
// Relay also does not use other blocking opertions from Tokio which require
// Relay also does not use other blocking operations from Tokio which require
// this pool, no usage of `tokio::fs` and `tokio::io::{Stdin, Stdout, Stderr}`.
//
// We limit the maximum amount of threads here, we've seen that Tokio
Expand All @@ -88,9 +86,7 @@ pub fn create_runtime(name: &str, threads: usize) -> Runtime {
// threads to encourage the runtime to not keep too many idle blocking threads
// around.
.thread_keep_alive(Duration::from_secs(1))
.enable_all()
.build()
.unwrap()
}

fn create_processor_pool(config: &Config) -> Result<ThreadPool> {
Expand Down Expand Up @@ -145,7 +141,10 @@ pub struct ServiceState {

impl ServiceState {
/// Starts all services and returns addresses to all of them.
pub fn start(config: Arc<Config>) -> Result<(Self, ServiceRunner)> {
pub fn start(
rt_metrics: relay_system::RuntimeMetrics,
config: Arc<Config>,
) -> Result<(Self, ServiceRunner)> {
let mut runner = ServiceRunner::new();
let upstream_relay = runner.start(UpstreamRelayService::new(config.clone()));
let test_store = runner.start(TestStoreService::new(config.clone()));
Expand Down Expand Up @@ -305,6 +304,7 @@ impl ServiceState {

runner.start(RelayStats::new(
config.clone(),
rt_metrics,
upstream_relay.clone(),
#[cfg(feature = "processing")]
redis_pools.clone(),
Expand Down
72 changes: 47 additions & 25 deletions relay-server/src/services/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ use relay_config::{Config, RelayMode};
#[cfg(feature = "processing")]
use relay_redis::{RedisPool, RedisPools};
use relay_statsd::metric;
use relay_system::{Addr, Service};
use relay_system::{Addr, RuntimeMetrics, Service};
use tokio::time::interval;

use crate::services::upstream::{IsNetworkOutage, UpstreamRelay};
use crate::statsd::{RelayGauges, TokioGauges};
use crate::statsd::{RelayGauges, RuntimeCounters, RuntimeGauges};

/// Relay Stats Service.
///
/// Service which collects stats periodically and emits them via statsd.
pub struct RelayStats {
config: Arc<Config>,
runtime: RuntimeMetrics,
upstream_relay: Addr<UpstreamRelay>,
#[cfg(feature = "processing")]
redis_pools: Option<RedisPools>,
Expand All @@ -23,70 +24,91 @@ pub struct RelayStats {
impl RelayStats {
pub fn new(
config: Arc<Config>,
runtime: RuntimeMetrics,
upstream_relay: Addr<UpstreamRelay>,
#[cfg(feature = "processing")] redis_pools: Option<RedisPools>,
) -> Self {
Self {
config,
upstream_relay,
runtime,
#[cfg(feature = "processing")]
redis_pools,
}
}

async fn tokio_metrics(&self) {
let m = tokio::runtime::Handle::current().metrics();

metric!(gauge(TokioGauges::ActiveTasksCount) = m.num_alive_tasks() as u64);
metric!(gauge(TokioGauges::BlockingQueueDepth) = m.blocking_queue_depth() as u64);
metric!(gauge(TokioGauges::BudgetForcedYieldCount) = m.budget_forced_yield_count());
metric!(gauge(TokioGauges::NumBlockingThreads) = m.num_blocking_threads() as u64);
metric!(gauge(TokioGauges::NumIdleBlockingThreads) = m.num_idle_blocking_threads() as u64);
metric!(gauge(TokioGauges::NumWorkers) = m.num_workers() as u64);
for worker in 0..m.num_workers() {
metric!(gauge(RuntimeGauges::NumIdleThreads) = self.runtime.num_idle_threads() as u64);
metric!(gauge(RuntimeGauges::NumAliveTasks) = self.runtime.num_alive_tasks() as u64);
metric!(
gauge(RuntimeGauges::BlockingQueueDepth) = self.runtime.blocking_queue_depth() as u64
);
metric!(
gauge(RuntimeGauges::NumBlockingThreads) = self.runtime.num_blocking_threads() as u64
);
metric!(
gauge(RuntimeGauges::NumBlockingThreads) = self.runtime.num_blocking_threads() as u64
);
metric!(
gauge(RuntimeGauges::NumIdleBlockingThreads) =
self.runtime.num_idle_blocking_threads() as u64
);

metric!(
counter(RuntimeCounters::BudgetForcedYieldCount) +=
self.runtime.budget_forced_yield_count()
);

metric!(gauge(RuntimeGauges::NumWorkers) = self.runtime.num_workers() as u64);
for worker in 0..self.runtime.num_workers() {
let worker_name = worker.to_string();

metric!(
gauge(TokioGauges::WorkerLocalQueueDepth) =
m.worker_local_queue_depth(worker) as u64,
gauge(RuntimeGauges::WorkerLocalQueueDepth) =
self.runtime.worker_local_queue_depth(worker) as u64,
worker = &worker_name,
);
metric!(
gauge(TokioGauges::WorkerLocalScheduleCount) =
m.worker_local_schedule_count(worker),
gauge(RuntimeGauges::WorkerMeanPollTime) =
self.runtime.worker_mean_poll_time(worker).as_secs_f64(),
worker = &worker_name,
);

metric!(
gauge(TokioGauges::WorkerMeanPollTime) =
m.worker_mean_poll_time(worker).as_secs_f64(),
counter(RuntimeCounters::WorkerLocalScheduleCount) +=
self.runtime.worker_local_schedule_count(worker),
worker = &worker_name,
);
metric!(
gauge(TokioGauges::WorkerNoopCount) = m.worker_noop_count(worker),
counter(RuntimeCounters::WorkerNoopCount) += self.runtime.worker_noop_count(worker),
worker = &worker_name,
);
metric!(
gauge(TokioGauges::WorkerOverflowCount) = m.worker_overflow_count(worker),
counter(RuntimeCounters::WorkerOverflowCount) +=
self.runtime.worker_overflow_count(worker),
worker = &worker_name,
);
metric!(
gauge(TokioGauges::WorkerParkCount) = m.worker_park_count(worker),
counter(RuntimeCounters::WorkerParkCount) += self.runtime.worker_park_count(worker),
worker = &worker_name,
);
metric!(
gauge(TokioGauges::WorkerPollCount) = m.worker_poll_count(worker),
counter(RuntimeCounters::WorkerPollCount) += self.runtime.worker_poll_count(worker),
worker = &worker_name,
);
metric!(
gauge(TokioGauges::WorkerStealCount) = m.worker_steal_count(worker),
counter(RuntimeCounters::WorkerStealCount) +=
self.runtime.worker_steal_count(worker),
worker = &worker_name,
);
metric!(
gauge(TokioGauges::WorkerStealOperations) = m.worker_steal_operations(worker),
counter(RuntimeCounters::WorkerStealOperations) +=
self.runtime.worker_steal_operations(worker),
worker = &worker_name,
);
metric!(
gauge(TokioGauges::WorkerTotalBusyDuration) =
m.worker_total_busy_duration(worker).as_secs_f64(),
counter(RuntimeCounters::WorkerTotalBusyDuration) +=
self.runtime.worker_total_busy_duration(worker).as_millis() as u64,
worker = &worker_name,
);
}
Expand Down
70 changes: 42 additions & 28 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use relay_statsd::{CounterMetric, GaugeMetric, HistogramMetric, TimerMetric};
#[cfg(doc)]
use tokio::runtime::RuntimeMetrics;
use relay_system::RuntimeMetrics;

/// Gauge metrics used by Relay
pub enum RelayGauges {
Expand Down Expand Up @@ -76,14 +76,14 @@ impl GaugeMetric for RelayGauges {
}
}

/// Gauge metrics collected from the Tokio Runtime.
pub enum TokioGauges {
/// Exposes [`RuntimeMetrics::active_tasks_count`].
ActiveTasksCount,
/// Gauge metrics collected from the Runtime.
pub enum RuntimeGauges {
/// Exposes [`RuntimeMetrics::num_idle_threads`].
NumIdleThreads,
/// Exposes [`RuntimeMetrics::num_alive_tasks`].
NumAliveTasks,
/// Exposes [`RuntimeMetrics::blocking_queue_depth`].
BlockingQueueDepth,
/// Exposes [`RuntimeMetrics::budget_forced_yield_count`].
BudgetForcedYieldCount,
/// Exposes [`RuntimeMetrics::num_blocking_threads`].
NumBlockingThreads,
/// Exposes [`RuntimeMetrics::num_idle_blocking_threads`].
Expand All @@ -95,16 +95,37 @@ pub enum TokioGauges {
/// This metric is tagged with:
/// - `worker`: the worker id.
WorkerLocalQueueDepth,
/// Exposes [`RuntimeMetrics::worker_local_schedule_count`].
/// Exposes [`RuntimeMetrics::worker_mean_poll_time`].
///
/// This metric is tagged with:
/// - `worker`: the worker id.
WorkerLocalScheduleCount,
/// Exposes [`RuntimeMetrics::worker_mean_poll_time`].
WorkerMeanPollTime,
}

impl GaugeMetric for RuntimeGauges {
fn name(&self) -> &'static str {
match self {
RuntimeGauges::NumIdleThreads => "runtime.idle_threads",
RuntimeGauges::NumAliveTasks => "runtime.alive_tasks",
RuntimeGauges::BlockingQueueDepth => "runtime.blocking_queue_depth",
RuntimeGauges::NumBlockingThreads => "runtime.num_blocking_threads",
RuntimeGauges::NumIdleBlockingThreads => "runtime.num_idle_blocking_threads",
RuntimeGauges::NumWorkers => "runtime.num_workers",
RuntimeGauges::WorkerLocalQueueDepth => "runtime.worker_local_queue_depth",
RuntimeGauges::WorkerMeanPollTime => "runtime.worker_mean_poll_time",
}
}
}

/// Counter metrics collected from the Runtime.
pub enum RuntimeCounters {
/// Exposes [`RuntimeMetrics::budget_forced_yield_count`].
BudgetForcedYieldCount,
/// Exposes [`RuntimeMetrics::worker_local_schedule_count`].
///
/// This metric is tagged with:
/// - `worker`: the worker id.
WorkerMeanPollTime,
WorkerLocalScheduleCount,
/// Exposes [`RuntimeMetrics::worker_noop_count`].
///
/// This metric is tagged with:
Expand Down Expand Up @@ -142,25 +163,18 @@ pub enum TokioGauges {
WorkerTotalBusyDuration,
}

impl GaugeMetric for TokioGauges {
impl CounterMetric for RuntimeCounters {
fn name(&self) -> &'static str {
match self {
TokioGauges::ActiveTasksCount => "tokio.active_task_count",
TokioGauges::BlockingQueueDepth => "tokio.blocking_queue_depth",
TokioGauges::BudgetForcedYieldCount => "tokio.budget_forced_yield_count",
TokioGauges::NumBlockingThreads => "tokio.num_blocking_threads",
TokioGauges::NumIdleBlockingThreads => "tokio.num_idle_blocking_threads",
TokioGauges::NumWorkers => "tokio.num_workers",
TokioGauges::WorkerLocalQueueDepth => "tokio.worker_local_queue_depth",
TokioGauges::WorkerLocalScheduleCount => "tokio.worker_local_schedule_count",
TokioGauges::WorkerMeanPollTime => "tokio.worker_mean_poll_time",
TokioGauges::WorkerNoopCount => "tokio.worker_noop_count",
TokioGauges::WorkerOverflowCount => "tokio.worker_overflow_count",
TokioGauges::WorkerParkCount => "tokio.worker_park_count",
TokioGauges::WorkerPollCount => "tokio.worker_poll_count",
TokioGauges::WorkerStealCount => "tokio.worker_steal_count",
TokioGauges::WorkerStealOperations => "tokio.worker_steal_operations",
TokioGauges::WorkerTotalBusyDuration => "tokio.worker_total_busy_duration",
RuntimeCounters::BudgetForcedYieldCount => "runtime.budget_forced_yield_count",
RuntimeCounters::WorkerLocalScheduleCount => "runtime.worker_local_schedule_count",
RuntimeCounters::WorkerNoopCount => "runtime.worker_noop_count",
RuntimeCounters::WorkerOverflowCount => "runtime.worker_overflow_count",
RuntimeCounters::WorkerParkCount => "runtime.worker_park_count",
RuntimeCounters::WorkerPollCount => "runtime.worker_poll_count",
RuntimeCounters::WorkerStealCount => "runtime.worker_steal_count",
RuntimeCounters::WorkerStealOperations => "runtime.worker_steal_operations",
RuntimeCounters::WorkerTotalBusyDuration => "runtime.worker_total_busy_duration",
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion relay-system/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ futures = { workspace = true }
once_cell = { workspace = true }
relay-log = { workspace = true }
relay-statsd = { workspace = true }
tokio = { workspace = true, features = ["rt", "signal", "macros", "sync", "time"] }
tokio = { workspace = true, features = ["rt", "signal", "macros", "sync", "time", "rt-multi-thread"] }
pin-project-lite = { workspace = true }


Expand Down
Loading

0 comments on commit 821b1f4

Please sign in to comment.