From 943dfaa024d54d620ec839ccf88e9f09d3581d3c Mon Sep 17 00:00:00 2001
From: Mariusz Klochowicz
Date: Mon, 28 Aug 2023 15:48:45 -0700
Subject: [PATCH] feat(maker): Monitor and expose status of connected services

Monitor the following services:
- coordinator
- orderbook (we don't subscribe to the orderbook feed yet)

in order to derive the health status of the maker. The health status is
exposed via the HTTP API and reported as dedicated Prometheus metrics.

TODO: Subscribe to the orderbook websocket stream to be able to detect if it is down
TODO: Subscribe to the bitmex client stream to be able to spot outdated information
TODO: Return different HTTP error codes from the health endpoint based on the health status
---
 maker/src/bin/maker.rs |  61 ++++++++++++++-------
 maker/src/health.rs    | 120 +++++++++++++++++++++++++++++++++++++++++
 maker/src/lib.rs       |   1 +
 maker/src/metrics.rs   |  45 +++++++++++++++-
 maker/src/routes.rs    |  14 +++++
 5 files changed, 221 insertions(+), 20 deletions(-)
 create mode 100644 maker/src/health.rs

diff --git a/maker/src/bin/maker.rs b/maker/src/bin/maker.rs
index 07bd8cc8d..b2df04046 100644
--- a/maker/src/bin/maker.rs
+++ b/maker/src/bin/maker.rs
@@ -7,6 +7,7 @@ use ln_dlc_node::node::InMemoryStore;
 use ln_dlc_node::node::LnDlcNodeSettings;
 use ln_dlc_node::seed::Bip39Seed;
 use maker::cli::Opts;
+use maker::health;
 use maker::ln::ldk_config;
 use maker::ln::EventHandler;
 use maker::logger;
@@ -88,31 +89,49 @@ async fn main() -> Result<()> {
     let _running_node = node.start(event_handler)?;
 
     let node_pubkey = node.info.pubkey;
-    tokio::spawn(async move {
-        match trading::run(
-            &opts.orderbook,
-            node_pubkey,
-            network,
-            opts.concurrent_orders,
-            time::Duration::seconds(opts.order_expiry_after_seconds as i64),
-        )
-        .await
-        {
-            Ok(()) => {
-                tracing::error!("Maker stopped trading");
-            }
-            Err(error) => {
-                tracing::error!("Maker stopped trading: {error:#}");
+    tokio::spawn({
+        let orderbook = opts.orderbook.clone();
+        async move {
+            match trading::run(
+                &orderbook,
+                node_pubkey,
+                network,
+                opts.concurrent_orders,
+                time::Duration::seconds(opts.order_expiry_after_seconds as i64),
+            )
+            .await
+            {
+                Ok(()) => {
+                    tracing::error!("Maker stopped trading");
+                }
+                Err(error) => {
+                    tracing::error!("Maker stopped trading: {error:#}");
+                }
             }
         }
     });
 
-    tokio::spawn({
+    let (health, health_tx) = health::Health::new();
+
+    let _monitor_coordinator_status = tokio::spawn({
+        let endpoint = opts.orderbook.clone();
+        let client = reqwest_client();
+        let interval = Duration::from_secs(10);
+        async move {
+            health::check_health_endpoint(&client, endpoint, health_tx.coordinator, interval).await;
+        }
+    });
+
+    // TODO: Monitor orderbook websocket stream with `health_tx.orderbook` when we subscribe to it
+
+    let _collect_prometheus_metrics = tokio::spawn({
         let node = node.clone();
+        let health = health.clone();
         async move {
             loop {
                 let node = node.clone();
-                spawn_blocking(move || metrics::collect(node))
+                let health = health.clone();
+                spawn_blocking(move || metrics::collect(node, health))
                     .await
                     .expect("To spawn blocking thread");
                 tokio::time::sleep(PROCESS_PROMETHEUS_METRICS).await;
@@ -128,7 +147,7 @@ async fn main() -> Result<()> {
     let mut conn = pool.get().expect("to get connection from pool");
     run_migration(&mut conn);
 
-    let app = router(node, exporter, pool);
+    let app = router(node, exporter, pool, health);
 
     // Start the metrics exporter
     autometrics::prometheus_exporter::init();
@@ -150,3 +169,9 @@ async fn main() -> Result<()> {
 
     Ok(())
 }
+fn reqwest_client() -> reqwest::Client {
+    reqwest::Client::builder()
+        .timeout(std::time::Duration::from_secs(30))
+        .build()
+        .expect("Failed to build reqwest client")
+}
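Note: an illustrative sketch (not part of this patch) of how the remaining `health_tx.orderbook` sender could be plugged in once we subscribe to the orderbook websocket stream (moving the sender out before the coordinator task captures `health_tx`); `orderbook_stream_alive` is a placeholder for whatever liveness signal that subscription ends up exposing:

    let _monitor_orderbook_status = tokio::spawn({
        let orderbook_tx = health_tx.orderbook;
        async move {
            loop {
                // Placeholder check; the real task would report based on the state of
                // the websocket connection instead of polling.
                let status = if orderbook_stream_alive().await {
                    health::ServiceStatus::Online
                } else {
                    health::ServiceStatus::Offline
                };
                orderbook_tx.send(status).expect("Receiver not to be dropped");
                tokio::time::sleep(Duration::from_secs(10)).await;
            }
        }
    });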
diff --git a/maker/src/health.rs b/maker/src/health.rs
new file mode 100644
index 000000000..7968264a1
--- /dev/null
+++ b/maker/src/health.rs
@@ -0,0 +1,120 @@
+// TODO: Add health collection metrics
+// This can be later used to add health metrics to the health endpoint
+
+use anyhow::bail;
+use anyhow::Context;
+use anyhow::Result;
+use reqwest::Client;
+use reqwest::StatusCode;
+use reqwest::Url;
+use std::time::Duration;
+use tokio::sync::watch;
+
+/// Health status of a service
+#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
+pub enum ServiceStatus {
+    #[default]
+    Unknown,
+    Online,
+    Offline,
+}
+
+/// Health monitoring for the node
+///
+/// Simple endpoint querying is handled by the provided configuration; for more complex health
+/// checks the transmitters are exposed to be plugged into the services that need to publish
+/// their health status.
+#[derive(Clone)]
+pub struct Health {
+    /// Coordinator HTTP API status
+    coordinator_rx: watch::Receiver<ServiceStatus>,
+    /// Orderbook websocket stream status
+    orderbook_rx: watch::Receiver<ServiceStatus>,
+}
+
+/// Transmitters that need to be plugged into the services that need to publish their health status.
+pub struct Tx {
+    pub orderbook: watch::Sender<ServiceStatus>,
+    pub coordinator: watch::Sender<ServiceStatus>,
+}
+
+impl Health {
+    pub fn new() -> (Self, Tx) {
+        let (orderbook_tx, orderbook_rx) = watch::channel(ServiceStatus::Unknown);
+        let (coordinator_tx, coordinator_rx) = watch::channel(ServiceStatus::Unknown);
+
+        (
+            Self {
+                coordinator_rx,
+                orderbook_rx,
+            },
+            Tx {
+                orderbook: orderbook_tx,
+                coordinator: coordinator_tx,
+            },
+        )
+    }
+
+    // TODO: Any ideas of the most useful way of sending this information?
+    // - perhaps a serialized struct (json) with the status of each service?
+    pub fn get_health(&self) -> Result<String> {
+        let mut health_info = String::new();
+        health_info.push_str(&format!(
+            "Coordinator: {:?}\n",
+            self.get_coordinator_status()
+        ));
+        health_info.push_str(&format!("Orderbook: {:?}\n", self.get_orderbook_status()));
+
+        if self.get_coordinator_status() == ServiceStatus::Online
+        // TODO: Uncomment this line when we retrieve the orderbook status
+        // && (self.get_orderbook_status() == ServiceStatus::Online)
+        {
+            Ok(health_info)
+        } else {
+            bail!("Status: ERROR\n
+            {health_info}");
+        }
+    }
+
+    pub fn get_coordinator_status(&self) -> ServiceStatus {
+        *self.coordinator_rx.borrow()
+    }
+
+    pub fn get_orderbook_status(&self) -> ServiceStatus {
+        *self.orderbook_rx.borrow()
+    }
+    // TODO: Add bitmex status? would anything else be useful here?
+}
+
+/// Simple way of checking if a service is online or offline
+pub async fn check_health_endpoint(
+    client: &Client,
+    endpoint: Url,
+    tx: watch::Sender<ServiceStatus>,
+    interval: Duration,
+) {
+    loop {
+        let status = if check_endpoint_availability(client, endpoint.clone())
+            .await
+            .is_ok()
+        {
+            ServiceStatus::Online
+        } else {
+            ServiceStatus::Offline
+        };
+
+        tx.send(status).expect("Receiver not to be dropped");
+        tokio::time::sleep(interval).await;
+    }
+}
+
+async fn check_endpoint_availability(client: &Client, endpoint: Url) -> Result<StatusCode> {
+    tracing::trace!(%endpoint, "Sending request to check health");
+    let response = client
+        .get(endpoint)
+        .send()
+        .await
+        .context("could not send request")?
+        .error_for_status()?;
+    Ok(response.status())
+}
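Note: a usage sketch (not part of this patch) of the `Health`/`Tx` pair above, e.g. as a unit test inside a `#[cfg(test)]` module in health.rs. Both statuses start as `Unknown`, so `get_health` fails until the coordinator monitor reports `Online`; the orderbook status is not gated on yet (see the TODO in `get_health`):

    #[test]
    fn health_reflects_latest_reported_status() {
        let (health, tx) = Health::new();

        // Nothing has reported yet: both services default to Unknown and the
        // aggregated health check fails.
        assert_eq!(health.get_coordinator_status(), ServiceStatus::Unknown);
        assert!(health.get_health().is_err());

        // Once the coordinator reports Online, the health check passes.
        tx.coordinator.send(ServiceStatus::Online).expect("receiver to be alive");
        assert_eq!(health.get_coordinator_status(), ServiceStatus::Online);
        assert!(health.get_health().is_ok());

        // An Offline report flips it back to an error.
        tx.coordinator.send(ServiceStatus::Offline).expect("receiver to be alive");
        assert!(health.get_health().is_err());
    }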
diff --git a/maker/src/lib.rs b/maker/src/lib.rs
index bbfdc2f40..4d9d2da19 100644
--- a/maker/src/lib.rs
+++ b/maker/src/lib.rs
@@ -7,6 +7,7 @@ use diesel_migrations::MigrationHarness;
 mod tests;
 
 pub mod cli;
+pub mod health;
 pub mod ln;
 pub mod logger;
 pub mod metrics;
diff --git a/maker/src/metrics.rs b/maker/src/metrics.rs
index ecc8037f5..cd6535b5c 100644
--- a/maker/src/metrics.rs
+++ b/maker/src/metrics.rs
@@ -1,3 +1,5 @@
+use crate::health::Health;
+use crate::health::ServiceStatus;
 use lazy_static::lazy_static;
 use lightning::ln::channelmanager::ChannelDetails;
 use ln_dlc_node::node::InMemoryStore;
@@ -16,7 +18,16 @@ use std::sync::Arc;
 use std::time::Duration;
 
 lazy_static! {
-    pub static ref METER: Meter = global::meter("coordinator");
+    pub static ref METER: Meter = global::meter("maker");
+
+    // health metrics
+    pub static ref COORDINATOR_STATUS: ObservableGauge<u64> = METER.u64_observable_gauge("coordinator_status")
+        .with_description("Coordinator status")
+        .init();
+
+    pub static ref ORDERBOOK_STATUS: ObservableGauge<u64> = METER.u64_observable_gauge("orderbook_status")
+        .with_description("Orderbook status")
+        .init();
 
     // channel details metrics
     pub static ref CHANNEL_BALANCE_SATOSHI: ObservableGauge<u64> = METER
@@ -58,12 +69,42 @@ pub fn init_meter() -> PrometheusExporter {
     opentelemetry_prometheus::exporter(controller).init()
 }
 
-pub fn collect(node: Arc<Node<InMemoryStore>>) {
+pub fn collect(node: Arc<Node<InMemoryStore>>, health: Health) {
     let cx = opentelemetry::Context::current();
     let channels = node.channel_manager.list_channels();
 
     channel_metrics(&cx, channels);
     node_metrics(&cx, node);
+    health_metrics(&cx, &health);
+}
+
+fn health_metrics(cx: &Context, health: &Health) {
+    update_health_metric(cx, health.get_coordinator_status(), &COORDINATOR_STATUS);
+    update_health_metric(cx, health.get_orderbook_status(), &ORDERBOOK_STATUS);
+}
+
+/// Updates the health metric given a service status
+fn update_health_metric(cx: &Context, service_status: ServiceStatus, gauge: &ObservableGauge<u64>) {
+    match service_status {
+        ServiceStatus::Unknown => {
+            gauge.observe(cx, 0, &[KeyValue::new("status", "unknown")]);
+        }
+        ServiceStatus::Online => {
+            gauge.observe(cx, 1, &[KeyValue::new("status", "online")]);
+        }
+        ServiceStatus::Offline => {
+            gauge.observe(cx, 0, &[KeyValue::new("status", "offline")]);
+        }
+    }
+
+    // let _ = check_endpoint_availability(client, "http://localhost:8080/health").await.map(|status| match status {
+    //     StatusCode::OK => {
+    //         COORDINATOR_STATUS.observe(cx, 1, &[KeyValue::new("status", "ok")]);
+    //     }
+    //     _ => {
+    //         COORDINATOR_STATUS.observe(cx, 0, &[KeyValue::new("status", "error")]);
+    //     }
+    // });
 }
 
 fn channel_metrics(cx: &Context, channels: Vec<ChannelDetails>) {
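Note (illustrative): with the gauges above, each scrape of /metrics should carry one sample per service, with the status encoded both as the value (1 for online, 0 otherwise) and as a label, roughly along these lines; exact series names and any extra labels depend on the opentelemetry-prometheus exporter configuration:

    coordinator_status{status="online"} 1
    orderbook_status{status="unknown"} 0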
diff --git a/maker/src/routes.rs b/maker/src/routes.rs
index d7d0a72c4..360dab0f7 100644
--- a/maker/src/routes.rs
+++ b/maker/src/routes.rs
@@ -1,3 +1,4 @@
+use crate::health::Health;
 use axum::extract::Path;
 use axum::extract::State;
 use axum::http::StatusCode;
@@ -29,17 +30,20 @@ pub struct AppState {
     pub node: Arc<Node<InMemoryStore>>,
     pub exporter: PrometheusExporter,
     pub pool: Pool<ConnectionManager<PgConnection>>,
+    pub health: Health,
 }
 
 pub fn router(
     node: Arc<Node<InMemoryStore>>,
     exporter: PrometheusExporter,
     pool: Pool<ConnectionManager<PgConnection>>,
+    health: Health,
 ) -> Router {
     let app_state = Arc::new(AppState {
         node,
         exporter,
         pool,
+        health,
     });
 
     Router::new()
@@ -52,6 +56,7 @@ pub fn router(
         .route("/api/pay-invoice/:invoice", post(pay_invoice))
         .route("/api/sync-on-chain", post(sync_on_chain))
         .route("/metrics", get(get_metrics))
+        .route("/health", get(get_health))
         .with_state(app_state)
 }
 
@@ -267,3 +272,12 @@ pub async fn get_metrics(State(state): State<Arc<AppState>>) -> impl IntoRespons
 
     (StatusCode::OK, open_telemetry_metrics + &autometrics)
 }
+
+/// Returns 500 if any of the vital services are offline
+pub async fn get_health(State(state): State<Arc<AppState>>) -> Result<Json<String>, AppError> {
+    let resp = state
+        .health
+        .get_health()
+        .map_err(|e| AppError::InternalServerError(format!("Error: {e:#}")))?;
+    Ok(Json(resp))
+}
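Note: a quick way to exercise the new route against a running maker (sketch, not part of this patch; the base URL is an assumption and depends on the maker's HTTP configuration). A healthy maker answers 200 OK with the per-service summary, and 500 once `get_health` bails:

    async fn check_maker_health() -> anyhow::Result<()> {
        // Assumed local address; adjust to the maker's configured HTTP endpoint.
        let response = reqwest::get("http://localhost:8000/health").await?;
        let status = response.status();
        let body = response.text().await?;
        println!("{status} -> {body}");
        Ok(())
    }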