Skip to content

Commit

Permalink
Add endpoint for configuring tracing filter at runtime (#2168) (#2171)
Browse files Browse the repository at this point in the history
* Add endpoint for configuring tracing filter at runtime

* Document how to use new /traceconfigz endpoint

* Errant dbg! statement

* Fix doccomment

* PR feedback

* Use tuple for error return type

* Add tests for bad requests and dropped filters
  • Loading branch information
inahga authored Oct 31, 2023
1 parent dda4531 commit 111da67
Show file tree
Hide file tree
Showing 12 changed files with 224 additions and 37 deletions.
4 changes: 3 additions & 1 deletion aggregator/src/bin/janus_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,10 @@ impl Command {
async fn install_tracing_and_metrics_handlers(
config: &CommonConfig,
) -> Result<(TraceGuards, MetricsExporterHandle)> {
let trace_guard = install_trace_subscriber(&config.logging_config)
// Discard the trace reload handler, since this program is short-lived.
let (trace_guard, _) = install_trace_subscriber(&config.logging_config)
.context("couldn't install tracing subscriber")?;

let metrics_guard = install_metrics_exporter(&config.metrics_config)
.await
.context("failed to install metrics exporter")?;
Expand Down
194 changes: 176 additions & 18 deletions aggregator/src/binary_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub mod job_driver;
use crate::{
config::{BinaryConfig, DbConfig},
metrics::{install_metrics_exporter, MetricsExporterConfiguration},
trace::{install_trace_subscriber, OpenTelemetryTraceConfiguration},
trace::{install_trace_subscriber, OpenTelemetryTraceConfiguration, TraceReloadHandle},
};
use anyhow::{anyhow, Context as _, Result};
use backoff::{future::retry, ExponentialBackoff};
Expand All @@ -18,6 +18,7 @@ use janus_aggregator_core::datastore::{Crypter, Datastore};
use janus_core::time::Clock;
use opentelemetry::metrics::Meter;
use ring::aead::{LessSafeKey, UnboundKey, AES_128_GCM};
use serde::{Deserialize, Serialize};
use std::{
fmt::{self, Debug, Formatter},
fs,
Expand All @@ -26,12 +27,15 @@ use std::{
panic,
path::PathBuf,
str::FromStr,
sync::Arc,
time::Duration,
};
use tokio::sync::oneshot;
use tokio_postgres::NoTls;
use tracing::{debug, info};
use trillium::{Handler, Headers, Info, Init};
use tracing_subscriber::EnvFilter;
use trillium::{Handler, Headers, Info, Init, Status};
use trillium_api::{api, Json, State};
use trillium_head::Head;
use trillium_router::Router;
use trillium_tokio::Stopper;
Expand Down Expand Up @@ -278,8 +282,9 @@ where
let config: Config = read_config(options.common_options())?;

// Install tracing/metrics handlers.
let _guards = install_trace_subscriber(&config.common_config().logging_config)
.context("couldn't install tracing subscriber")?;
let (_guards, trace_reload_handle) =
install_trace_subscriber(&config.common_config().logging_config)
.context("couldn't install tracing subscriber")?;
let _metrics_exporter = install_metrics_exporter(&config.common_config().metrics_config)
.await
.context("failed to install metrics exporter")?;
Expand All @@ -305,10 +310,9 @@ where
.context("couldn't create datastore")?;

let health_check_listen_address = config.common_config().health_check_listen_address;
let healthz_task_handle =
tokio::task::spawn(
async move { health_endpoint_server(health_check_listen_address).await },
);
let zpages_task_handle = tokio::task::spawn(async move {
zpages_server(health_check_listen_address, trace_reload_handle).await
});

let result = f(BinaryContext {
clock,
Expand All @@ -319,20 +323,22 @@ where
})
.await;

healthz_task_handle.abort();
zpages_task_handle.abort();

result
}

/// Listen for HTTP requests on a given port, and respond to requests for "/healthz" with an empty
/// body and status code 200. Each Janus component exposes this HTTP server to enable health
/// checks, and to indicate when it has successfully started up.
async fn health_endpoint_server(address: SocketAddr) {
let router = Router::new().get(
"/healthz",
|conn: trillium::Conn| async move { conn.ok("") },
);
let handler = (Head::new(), router);
/// A trillium server which serves z-pages, which are utility endpoints for health checks and
/// tracing configuration. It listens on the given address and port. It also takes the reload
/// handle necessary for reloading the tracing_subscriber configuration.
///
/// `/healthz` responds with an empty body and status code 200, which serves as a healthcheck to
/// indicate when Janus has started up.
///
/// `/traceconfigz` responds with the tracing_subscriber configuration, or allows configuring it
/// with a PUT request.
async fn zpages_server(address: SocketAddr, trace_reload_handle: TraceReloadHandle) {
let handler = zpages_handler(trace_reload_handle);
trillium_tokio::config()
.with_port(address.port())
.with_host(&address.ip().to_string())
Expand All @@ -341,6 +347,70 @@ async fn health_endpoint_server(address: SocketAddr) {
.await;
}

fn zpages_handler(trace_reload_handle: TraceReloadHandle) -> impl Handler {
(
Head::new(),
State(Arc::new(trace_reload_handle)),
Router::new()
.get(
"/healthz",
|conn: trillium::Conn| async move { conn.ok("") },
)
.get("/traceconfigz", api(get_traceconfigz))
.put("/traceconfigz", api(put_traceconfigz)),
)
}

async fn get_traceconfigz(
conn: &mut trillium::Conn,
State(trace_reload_handle): State<Arc<TraceReloadHandle>>,
) -> Result<Json<TraceconfigzBody>, Status> {
Ok(Json(TraceconfigzBody {
filter: trace_reload_handle
.with_current(|trace_filter| trace_filter.to_string())
.map_err(|err| {
conn.set_body(format!("failed to get current filter: {err}"));
Status::InternalServerError
})?,
}))
}

/// Allows modifying the runtime tracing filter. Accepts a request with a body corresponding to
/// [`TraceconfigzBody`]. If the `filter` field is empty, the filter will fallback to `error`.
/// See [`EnvFilter::try_new`] for details.
async fn put_traceconfigz(
conn: &mut trillium::Conn,
(State(trace_reload_handle), Json(req)): (
State<Arc<TraceReloadHandle>>,
Json<TraceconfigzBody>,
),
) -> Result<Json<TraceconfigzBody>, Status> {
let new_filter = EnvFilter::try_new(req.filter).map_err(|err| {
conn.set_body(format!("invalid filter: {err}"));
Status::BadRequest
})?;
trace_reload_handle.reload(new_filter).map_err(|err| {
conn.set_body(format!("failed to update filter: {err}"));
Status::InternalServerError
})?;
Ok(Json(TraceconfigzBody {
filter: trace_reload_handle
.with_current(|trace_filter| trace_filter.to_string())
.map_err(|err| {
conn.set_body(format!("failed to get current filter: {err}"));
Status::InternalServerError
})?,
}))
}

/// The response and request body used by /traceconfigz for reporting and updating its configuration.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
struct TraceconfigzBody {
/// The directive that filters spans and events. This field follows the [`EnvFilter`][1] syntax.
/// [1]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives
filter: String,
}

/// Register a signal handler for SIGTERM, and stop the [`Stopper`] when a SIGTERM signal is
/// received.
pub fn setup_signal_handler(stopper: Stopper) -> Result<(), std::io::Error> {
Expand Down Expand Up @@ -402,10 +472,98 @@ pub async fn setup_server(
#[cfg(test)]
mod tests {
use super::CommonBinaryOptions;
use crate::{
aggregator::http_handlers::test_util::take_response_body,
binary_utils::{zpages_handler, TraceconfigzBody},
};
use clap::CommandFactory;
use tracing_subscriber::{reload, EnvFilter};
use trillium::Status;
use trillium_testing::prelude::*;

#[test]
fn verify_app() {
CommonBinaryOptions::command().debug_assert()
}

#[tokio::test]
async fn healthz() {
let (_, filter_handle) = reload::Layer::new(EnvFilter::new("info"));
let handler = zpages_handler(filter_handle);

let test_conn = get("/healthz").run_async(&handler).await;
assert_eq!(test_conn.status(), Some(Status::Ok));
}

#[tokio::test]
async fn traceconfigz() {
let (_filter, filter_handle) = reload::Layer::new(EnvFilter::new("info"));
let handler = zpages_handler(filter_handle);

let mut test_conn = get("/traceconfigz").run_async(&handler).await;
assert_eq!(test_conn.status(), Some(Status::Ok));
assert_eq!(
serde_json::from_slice::<TraceconfigzBody>(&take_response_body(&mut test_conn).await)
.unwrap(),
TraceconfigzBody {
filter: "info".to_string()
}
);

let req = TraceconfigzBody {
filter: "debug".to_string(),
};
let mut test_conn = put("/traceconfigz")
.with_request_body(serde_json::to_vec(&req).unwrap())
.run_async(&handler)
.await;
assert_eq!(test_conn.status(), Some(Status::Ok));
assert_eq!(
serde_json::from_slice::<TraceconfigzBody>(&take_response_body(&mut test_conn).await)
.unwrap(),
req,
);

let req = TraceconfigzBody {
filter: "!(#*$#@)".to_string(),
};
let mut test_conn = dbg!(
put("/traceconfigz")
.with_request_body(serde_json::to_vec(&req).unwrap())
.run_async(&handler)
.await
);
assert_eq!(test_conn.status(), Some(Status::BadRequest));
assert!(
String::from_utf8_lossy(&take_response_body(&mut test_conn).await)
.starts_with("invalid filter:")
);
}

#[tokio::test]
async fn traceconfigz_dropped_filter() {
// Drop the filter immediately but leave the handle open.
let (_, filter_handle) = reload::Layer::new(EnvFilter::new("info"));
let handler = zpages_handler(filter_handle);

let mut test_conn = get("/traceconfigz").run_async(&handler).await;
assert_eq!(test_conn.status(), Some(Status::InternalServerError));
assert!(
String::from_utf8_lossy(&take_response_body(&mut test_conn).await)
.starts_with("failed to get current filter:")
);

let req = TraceconfigzBody {
filter: "debug".to_string(),
};
let mut test_conn = put("/traceconfigz")
.with_request_body(serde_json::to_vec(&req).unwrap())
.run_async(&handler)
.await;
assert_eq!(test_conn.status(), Some(Status::InternalServerError));
assert!(
String::from_utf8_lossy(&take_response_body(&mut test_conn).await)
.starts_with("failed to update filter:")
);
}
}
31 changes: 21 additions & 10 deletions aggregator/src/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use std::{collections::HashMap, net::SocketAddr};
use tracing::Level;
use tracing_chrome::{ChromeLayerBuilder, TraceStyle};
use tracing_log::LogTracer;
use tracing_subscriber::{filter::FromEnvError, layer::SubscriberExt, EnvFilter, Layer, Registry};
use tracing_subscriber::{
filter::FromEnvError, layer::SubscriberExt, reload, EnvFilter, Layer, Registry,
};

#[cfg(feature = "otlp")]
use {
Expand Down Expand Up @@ -119,17 +121,23 @@ fn make_trace_filter() -> Result<EnvFilter, FromEnvError> {
.from_env()
}

/// Configures and installs a tracing subscriber, to capture events logged with
/// [`tracing::info`] and the like. Captured events are written to stdout, with
/// formatting affected by the provided [`TraceConfiguration`].
pub fn install_trace_subscriber(config: &TraceConfiguration) -> Result<TraceGuards, Error> {
pub type TraceReloadHandle = reload::Handle<EnvFilter, Registry>;

/// Configures and installs a tracing subscriber, to capture events logged with [`tracing::info`]
/// and the like. Captured events are written to stdout, with formatting affected by the provided
/// [`TraceConfiguration`]. A handle to the stdout [`EnvFilter`] is provided, so that the filter
/// configuration can be altered later on at runtime.
pub fn install_trace_subscriber(
config: &TraceConfiguration,
) -> Result<(TraceGuards, TraceReloadHandle), Error> {
// If stdout is not a tty or if forced by config, output logs as JSON
// structures
let output_json = atty::isnt(Stream::Stdout) || config.force_json_output;

// Configure filters with RUST_LOG env var. Format discussed at
// https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html
let stdout_filter = EnvFilter::builder().from_env()?;
let (stdout_filter, stdout_filter_handle) =
reload::Layer::new(EnvFilter::builder().from_env()?);

let mut layers = Vec::new();
match (
Expand Down Expand Up @@ -234,10 +242,13 @@ pub fn install_trace_subscriber(config: &TraceConfiguration) -> Result<TraceGuar
// Install a logger that converts logs into tracing events
LogTracer::init()?;

Ok(TraceGuards {
uses_otel_tracer: config.open_telemetry_config.is_some(),
_chrome_guard: chrome_guard,
})
Ok((
TraceGuards {
uses_otel_tracer: config.open_telemetry_config.is_some(),
_chrome_guard: chrome_guard,
},
stdout_filter_handle,
))
}

pub struct TraceGuards {
Expand Down
16 changes: 16 additions & 0 deletions docs/DEPLOYING.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,22 @@ human-readable or structured JSON log format will be chosen automatically. This
can be overridden with the `force_json_output` and `stackdriver_json_output`
configuration parameters under `logging_config`.

The `RUST_LOG` environment variable can be overridden at runtime using the
`/traceconfigz` path on the `health_check_listen_address` endpoint. Here's
an example using this with `curl`:

```bash
$ HEALTH_CHECK_LISTEN_ADDRESS=http://localhost:8000

$ curl $HEALTH_CHECK_LISTEN_ADDRESS/traceconfigz
{"filter":"info"}

$ curl $HEALTH_CHECK_LISTEN_ADDRESS/traceconfigz -X PUT -d '{"filter":"debug"}'
{"filter":"debug"}
```

The `filter` field corresponds exactly to the [EnvFilter] format.

[EnvFilter]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html

##### Metrics
Expand Down
2 changes: 1 addition & 1 deletion docs/samples/advanced_config/aggregation_job_creator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ database:
# (optional, defaults to true)
check_schema_version: true

# Socket address for /healthz HTTP requests. Defaults to 127.0.0.1:9001.
# Socket address for /healthz and /traceconfigz HTTP requests. Defaults to 127.0.0.1:9001.
health_check_listen_address: "0.0.0.0:8000"

# Logging configuration. (optional)
Expand Down
2 changes: 1 addition & 1 deletion docs/samples/advanced_config/aggregation_job_driver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ database:
# (optional, defaults to true)
check_schema_version: true

# Socket address for /healthz HTTP requests. Defaults to 127.0.0.1:9001.
# Socket address for /healthz and /traceconfigz HTTP requests. Defaults to 127.0.0.1:9001.
health_check_listen_address: "0.0.0.0:8000"

# Logging configuration. (optional)
Expand Down
2 changes: 1 addition & 1 deletion docs/samples/advanced_config/aggregator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ database:
# (optional, defaults to true)
check_schema_version: true

# Socket address for /healthz HTTP requests. Defaults to 127.0.0.1:9001.
# Socket address for /healthz and /traceconfigz HTTP requests. Defaults to 127.0.0.1:9001.
health_check_listen_address: "0.0.0.0:8000"

# Logging configuration. (optional)
Expand Down
2 changes: 1 addition & 1 deletion docs/samples/advanced_config/collection_job_driver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ database:
# (optional, defaults to true)
check_schema_version: true

# Socket address for /healthz HTTP requests. Defaults to 127.0.0.1:9001.
# Socket address for /healthz and /traceconfigz HTTP requests. Defaults to 127.0.0.1:9001.
health_check_listen_address: "0.0.0.0:8000"

# Logging configuration. (optional)
Expand Down
Loading

0 comments on commit 111da67

Please sign in to comment.