diff --git a/aggregator/src/bin/janus_cli.rs b/aggregator/src/bin/janus_cli.rs index ee718f65b..2b7dfb3f4 100644 --- a/aggregator/src/bin/janus_cli.rs +++ b/aggregator/src/bin/janus_cli.rs @@ -143,8 +143,10 @@ impl Command { async fn install_tracing_and_metrics_handlers( config: &CommonConfig, ) -> Result<(TraceGuards, MetricsExporterHandle)> { - let trace_guard = install_trace_subscriber(&config.logging_config) + // Discard the trace reload handler, since this program is short-lived. + let (trace_guard, _) = install_trace_subscriber(&config.logging_config) .context("couldn't install tracing subscriber")?; + let metrics_guard = install_metrics_exporter(&config.metrics_config) .await .context("failed to install metrics exporter")?; diff --git a/aggregator/src/binary_utils.rs b/aggregator/src/binary_utils.rs index 096bf3863..92f6733ad 100644 --- a/aggregator/src/binary_utils.rs +++ b/aggregator/src/binary_utils.rs @@ -5,7 +5,7 @@ pub mod job_driver; use crate::{ config::{BinaryConfig, DbConfig}, metrics::{install_metrics_exporter, MetricsExporterConfiguration}, - trace::{install_trace_subscriber, OpenTelemetryTraceConfiguration}, + trace::{install_trace_subscriber, OpenTelemetryTraceConfiguration, TraceReloadHandle}, }; use anyhow::{anyhow, Context as _, Result}; use backoff::{future::retry, ExponentialBackoff}; @@ -18,6 +18,7 @@ use janus_aggregator_core::datastore::{Crypter, Datastore}; use janus_core::time::Clock; use opentelemetry::metrics::Meter; use ring::aead::{LessSafeKey, UnboundKey, AES_128_GCM}; +use serde::{Deserialize, Serialize}; use std::{ fmt::{self, Debug, Formatter}, fs, @@ -26,12 +27,15 @@ use std::{ panic, path::PathBuf, str::FromStr, + sync::Arc, time::Duration, }; use tokio::sync::oneshot; use tokio_postgres::NoTls; use tracing::{debug, info}; -use trillium::{Handler, Headers, Info, Init}; +use tracing_subscriber::EnvFilter; +use trillium::{Handler, Headers, Info, Init, Status}; +use trillium_api::{api, Json, State}; use trillium_head::Head; use trillium_router::Router; use trillium_tokio::Stopper; @@ -278,8 +282,9 @@ where let config: Config = read_config(options.common_options())?; // Install tracing/metrics handlers. - let _guards = install_trace_subscriber(&config.common_config().logging_config) - .context("couldn't install tracing subscriber")?; + let (_guards, trace_reload_handle) = + install_trace_subscriber(&config.common_config().logging_config) + .context("couldn't install tracing subscriber")?; let _metrics_exporter = install_metrics_exporter(&config.common_config().metrics_config) .await .context("failed to install metrics exporter")?; @@ -305,10 +310,9 @@ where .context("couldn't create datastore")?; let health_check_listen_address = config.common_config().health_check_listen_address; - let healthz_task_handle = - tokio::task::spawn( - async move { health_endpoint_server(health_check_listen_address).await }, - ); + let zpages_task_handle = tokio::task::spawn(async move { + zpages_server(health_check_listen_address, trace_reload_handle).await + }); let result = f(BinaryContext { clock, @@ -319,20 +323,22 @@ where }) .await; - healthz_task_handle.abort(); + zpages_task_handle.abort(); result } -/// Listen for HTTP requests on a given port, and respond to requests for "/healthz" with an empty -/// body and status code 200. Each Janus component exposes this HTTP server to enable health -/// checks, and to indicate when it has successfully started up. -async fn health_endpoint_server(address: SocketAddr) { - let router = Router::new().get( - "/healthz", - |conn: trillium::Conn| async move { conn.ok("") }, - ); - let handler = (Head::new(), router); +/// A trillium server which serves z-pages, which are utility endpoints for health checks and +/// tracing configuration. It listens on the given address and port. It also takes the reload +/// handle necessary for reloading the tracing_subscriber configuration. +/// +/// `/healthz` responds with an empty body and status code 200, which serves as a healthcheck to +/// indicate when Janus has started up. +/// +/// `/traceconfigz` responds with the tracing_subscriber configuration, or allows configuring it +/// with a PUT request. +async fn zpages_server(address: SocketAddr, trace_reload_handle: TraceReloadHandle) { + let handler = zpages_handler(trace_reload_handle); trillium_tokio::config() .with_port(address.port()) .with_host(&address.ip().to_string()) @@ -341,6 +347,70 @@ async fn health_endpoint_server(address: SocketAddr) { .await; } +fn zpages_handler(trace_reload_handle: TraceReloadHandle) -> impl Handler { + ( + Head::new(), + State(Arc::new(trace_reload_handle)), + Router::new() + .get( + "/healthz", + |conn: trillium::Conn| async move { conn.ok("") }, + ) + .get("/traceconfigz", api(get_traceconfigz)) + .put("/traceconfigz", api(put_traceconfigz)), + ) +} + +async fn get_traceconfigz( + conn: &mut trillium::Conn, + State(trace_reload_handle): State>, +) -> Result, Status> { + Ok(Json(TraceconfigzBody { + filter: trace_reload_handle + .with_current(|trace_filter| trace_filter.to_string()) + .map_err(|err| { + conn.set_body(format!("failed to get current filter: {err}")); + Status::InternalServerError + })?, + })) +} + +/// Allows modifying the runtime tracing filter. Accepts a request with a body corresponding to +/// [`TraceconfigzBody`]. If the `filter` field is empty, the filter will fallback to `error`. +/// See [`EnvFilter::try_new`] for details. +async fn put_traceconfigz( + conn: &mut trillium::Conn, + (State(trace_reload_handle), Json(req)): ( + State>, + Json, + ), +) -> Result, Status> { + let new_filter = EnvFilter::try_new(req.filter).map_err(|err| { + conn.set_body(format!("invalid filter: {err}")); + Status::BadRequest + })?; + trace_reload_handle.reload(new_filter).map_err(|err| { + conn.set_body(format!("failed to update filter: {err}")); + Status::InternalServerError + })?; + Ok(Json(TraceconfigzBody { + filter: trace_reload_handle + .with_current(|trace_filter| trace_filter.to_string()) + .map_err(|err| { + conn.set_body(format!("failed to get current filter: {err}")); + Status::InternalServerError + })?, + })) +} + +/// The response and request body used by /traceconfigz for reporting and updating its configuration. +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +struct TraceconfigzBody { + /// The directive that filters spans and events. This field follows the [`EnvFilter`][1] syntax. + /// [1]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives + filter: String, +} + /// Register a signal handler for SIGTERM, and stop the [`Stopper`] when a SIGTERM signal is /// received. pub fn setup_signal_handler(stopper: Stopper) -> Result<(), std::io::Error> { @@ -402,10 +472,98 @@ pub async fn setup_server( #[cfg(test)] mod tests { use super::CommonBinaryOptions; + use crate::{ + aggregator::http_handlers::test_util::take_response_body, + binary_utils::{zpages_handler, TraceconfigzBody}, + }; use clap::CommandFactory; + use tracing_subscriber::{reload, EnvFilter}; + use trillium::Status; + use trillium_testing::prelude::*; #[test] fn verify_app() { CommonBinaryOptions::command().debug_assert() } + + #[tokio::test] + async fn healthz() { + let (_, filter_handle) = reload::Layer::new(EnvFilter::new("info")); + let handler = zpages_handler(filter_handle); + + let test_conn = get("/healthz").run_async(&handler).await; + assert_eq!(test_conn.status(), Some(Status::Ok)); + } + + #[tokio::test] + async fn traceconfigz() { + let (_filter, filter_handle) = reload::Layer::new(EnvFilter::new("info")); + let handler = zpages_handler(filter_handle); + + let mut test_conn = get("/traceconfigz").run_async(&handler).await; + assert_eq!(test_conn.status(), Some(Status::Ok)); + assert_eq!( + serde_json::from_slice::(&take_response_body(&mut test_conn).await) + .unwrap(), + TraceconfigzBody { + filter: "info".to_string() + } + ); + + let req = TraceconfigzBody { + filter: "debug".to_string(), + }; + let mut test_conn = put("/traceconfigz") + .with_request_body(serde_json::to_vec(&req).unwrap()) + .run_async(&handler) + .await; + assert_eq!(test_conn.status(), Some(Status::Ok)); + assert_eq!( + serde_json::from_slice::(&take_response_body(&mut test_conn).await) + .unwrap(), + req, + ); + + let req = TraceconfigzBody { + filter: "!(#*$#@)".to_string(), + }; + let mut test_conn = dbg!( + put("/traceconfigz") + .with_request_body(serde_json::to_vec(&req).unwrap()) + .run_async(&handler) + .await + ); + assert_eq!(test_conn.status(), Some(Status::BadRequest)); + assert!( + String::from_utf8_lossy(&take_response_body(&mut test_conn).await) + .starts_with("invalid filter:") + ); + } + + #[tokio::test] + async fn traceconfigz_dropped_filter() { + // Drop the filter immediately but leave the handle open. + let (_, filter_handle) = reload::Layer::new(EnvFilter::new("info")); + let handler = zpages_handler(filter_handle); + + let mut test_conn = get("/traceconfigz").run_async(&handler).await; + assert_eq!(test_conn.status(), Some(Status::InternalServerError)); + assert!( + String::from_utf8_lossy(&take_response_body(&mut test_conn).await) + .starts_with("failed to get current filter:") + ); + + let req = TraceconfigzBody { + filter: "debug".to_string(), + }; + let mut test_conn = put("/traceconfigz") + .with_request_body(serde_json::to_vec(&req).unwrap()) + .run_async(&handler) + .await; + assert_eq!(test_conn.status(), Some(Status::InternalServerError)); + assert!( + String::from_utf8_lossy(&take_response_body(&mut test_conn).await) + .starts_with("failed to update filter:") + ); + } } diff --git a/aggregator/src/trace.rs b/aggregator/src/trace.rs index c5e30d5ae..385f1a309 100644 --- a/aggregator/src/trace.rs +++ b/aggregator/src/trace.rs @@ -6,7 +6,9 @@ use std::{collections::HashMap, net::SocketAddr}; use tracing::Level; use tracing_chrome::{ChromeLayerBuilder, TraceStyle}; use tracing_log::LogTracer; -use tracing_subscriber::{filter::FromEnvError, layer::SubscriberExt, EnvFilter, Layer, Registry}; +use tracing_subscriber::{ + filter::FromEnvError, layer::SubscriberExt, reload, EnvFilter, Layer, Registry, +}; #[cfg(feature = "otlp")] use { @@ -119,17 +121,23 @@ fn make_trace_filter() -> Result { .from_env() } -/// Configures and installs a tracing subscriber, to capture events logged with -/// [`tracing::info`] and the like. Captured events are written to stdout, with -/// formatting affected by the provided [`TraceConfiguration`]. -pub fn install_trace_subscriber(config: &TraceConfiguration) -> Result { +pub type TraceReloadHandle = reload::Handle; + +/// Configures and installs a tracing subscriber, to capture events logged with [`tracing::info`] +/// and the like. Captured events are written to stdout, with formatting affected by the provided +/// [`TraceConfiguration`]. A handle to the stdout [`EnvFilter`] is provided, so that the filter +/// configuration can be altered later on at runtime. +pub fn install_trace_subscriber( + config: &TraceConfiguration, +) -> Result<(TraceGuards, TraceReloadHandle), Error> { // If stdout is not a tty or if forced by config, output logs as JSON // structures let output_json = atty::isnt(Stream::Stdout) || config.force_json_output; // Configure filters with RUST_LOG env var. Format discussed at // https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html - let stdout_filter = EnvFilter::builder().from_env()?; + let (stdout_filter, stdout_filter_handle) = + reload::Layer::new(EnvFilter::builder().from_env()?); let mut layers = Vec::new(); match ( @@ -234,10 +242,13 @@ pub fn install_trace_subscriber(config: &TraceConfiguration) -> Result