diff --git a/crates/tests-e2e/src/maker.rs b/crates/tests-e2e/src/maker.rs index 026dcf5ca..10aaa2ee6 100644 --- a/crates/tests-e2e/src/maker.rs +++ b/crates/tests-e2e/src/maker.rs @@ -29,8 +29,12 @@ impl Maker { Self::new(client, "http://localhost:18000") } + /// Check whether maker is running and that it is connected to all services + /// it depends on pub async fn is_running(&self) -> bool { - self.get("/").await.is_ok() + self.get("/health") + .await + .is_ok_and(|r| r.status().is_success()) } pub async fn sync_on_chain(&self) -> Result<()> { diff --git a/crates/tests-e2e/tests/maker.rs b/crates/tests-e2e/tests/maker.rs index a335c0e20..5bba58389 100644 --- a/crates/tests-e2e/tests/maker.rs +++ b/crates/tests-e2e/tests/maker.rs @@ -15,12 +15,13 @@ async fn maker_can_open_channel_to_coordinator_and_send_payment() -> Result<()> let client = init_reqwest(); - let maker = Maker::new_local(client.clone()); - assert!(maker.is_running().await); - let coordinator = Coordinator::new_local(client.clone()); assert!(coordinator.is_running().await); + // Start maker after coordinator as its health check needs coordinator + let maker = Maker::new_local(client.clone()); + wait_until!(maker.is_running().await); + let node_info_coordinator = coordinator.get_node_info().await?; // Ensure the maker has a free UTXO available. diff --git a/maker/src/bin/maker.rs b/maker/src/bin/maker.rs index 07bd8cc8d..b6d740e25 100644 --- a/maker/src/bin/maker.rs +++ b/maker/src/bin/maker.rs @@ -7,6 +7,7 @@ use ln_dlc_node::node::InMemoryStore; use ln_dlc_node::node::LnDlcNodeSettings; use ln_dlc_node::seed::Bip39Seed; use maker::cli::Opts; +use maker::health; use maker::ln::ldk_config; use maker::ln::EventHandler; use maker::logger; @@ -87,32 +88,55 @@ async fn main() -> Result<()> { let event_handler = EventHandler::new(node.clone()); let _running_node = node.start(event_handler)?; + let (health, health_tx) = health::Health::new(); + let node_pubkey = node.info.pubkey; - tokio::spawn(async move { - match trading::run( - &opts.orderbook, - node_pubkey, - network, - opts.concurrent_orders, - time::Duration::seconds(opts.order_expiry_after_seconds as i64), - ) - .await - { - Ok(()) => { - tracing::error!("Maker stopped trading"); - } - Err(error) => { - tracing::error!("Maker stopped trading: {error:#}"); + tokio::spawn({ + let orderbook = opts.orderbook.clone(); + async move { + match trading::run( + &orderbook, + node_pubkey, + network, + opts.concurrent_orders, + time::Duration::seconds(opts.order_expiry_after_seconds as i64), + health_tx.bitmex_pricefeed, + ) + .await + { + Ok(()) => { + tracing::error!("Maker stopped trading"); + } + Err(error) => { + tracing::error!("Maker stopped trading: {error:#}"); + } } } }); - tokio::spawn({ + let _monitor_coordinator_status = tokio::spawn({ + let endpoint = opts.orderbook.clone(); + let client = reqwest_client(); + let interval = Duration::from_secs(10); + async move { + health::check_health_endpoint(&client, endpoint, health_tx.coordinator, interval).await; + } + }); + + // TODO: Monitor orderbook websocket stream with `health_tx.orderbook` when we subscribe to it + health_tx + .orderbook + .send(health::ServiceStatus::Online) + .expect("to be able to send"); + + let _collect_prometheus_metrics = tokio::spawn({ let node = node.clone(); + let health = health.clone(); async move { loop { let node = node.clone(); - spawn_blocking(move || metrics::collect(node)) + let health = health.clone(); + spawn_blocking(move || metrics::collect(node, health)) .await .expect("To spawn blocking thread"); tokio::time::sleep(PROCESS_PROMETHEUS_METRICS).await; @@ -128,7 +152,7 @@ async fn main() -> Result<()> { let mut conn = pool.get().expect("to get connection from pool"); run_migration(&mut conn); - let app = router(node, exporter, pool); + let app = router(node, exporter, pool, health); // Start the metrics exporter autometrics::prometheus_exporter::init(); @@ -150,3 +174,9 @@ async fn main() -> Result<()> { Ok(()) } +fn reqwest_client() -> reqwest::Client { + reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build() + .expect("Failed to build reqwest client") +} diff --git a/maker/src/health.rs b/maker/src/health.rs new file mode 100644 index 000000000..026e73581 --- /dev/null +++ b/maker/src/health.rs @@ -0,0 +1,138 @@ +use anyhow::bail; +use anyhow::Context; +use anyhow::Result; +use reqwest::Client; +use reqwest::StatusCode; +use reqwest::Url; +use serde::Serialize; +use std::time::Duration; +use tokio::sync::watch; + +/// Health status of a service +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)] +pub enum ServiceStatus { + #[default] + Unknown, + Online, + Offline, +} + +/// Health monitoring for the node +/// +/// Simple endpoint querying is handled by provided configuration, for more complex health checks +/// the transmitters are exposed to be plugged in the services that need to publish their health +/// status. + +#[derive(Clone)] +pub struct Health { + /// Coordinator HTTP API status + coordinator_rx: watch::Receiver, + /// Orderbook websocket stream status + orderbook_rx: watch::Receiver, + /// Bitmex pricefeed stream status + bitmex_pricefeed_rx: watch::Receiver, +} + +/// Transmitters that need to be plugged in the services that need to publish their health status. +pub struct Tx { + pub orderbook: watch::Sender, + pub coordinator: watch::Sender, + pub bitmex_pricefeed: watch::Sender, +} + +/// Struct returned by maker's health endpoint. +#[derive(Debug, Serialize)] +pub struct OverallMakerHealth { + coordinator: ServiceStatus, + orderbook: ServiceStatus, + bitmex_pricefeed: ServiceStatus, +} + +impl OverallMakerHealth { + pub fn is_healthy(&self) -> bool { + self.coordinator == ServiceStatus::Online + && self.bitmex_pricefeed == ServiceStatus::Online + && self.orderbook == ServiceStatus::Online + } +} + +impl Health { + pub fn new() -> (Self, Tx) { + let (orderbook_tx, orderbook_rx) = watch::channel(ServiceStatus::Unknown); + let (coordinator_tx, coordinator_rx) = watch::channel(ServiceStatus::Unknown); + let (bitmex_pricefeed_tx, bitmex_pricefeed_rx) = watch::channel(ServiceStatus::Unknown); + + ( + Self { + coordinator_rx, + orderbook_rx, + bitmex_pricefeed_rx, + }, + Tx { + orderbook: orderbook_tx, + coordinator: coordinator_tx, + bitmex_pricefeed: bitmex_pricefeed_tx, + }, + ) + } + + pub fn get_health(&self) -> Result { + let health_info = OverallMakerHealth { + coordinator: self.get_coordinator_status(), + orderbook: self.get_orderbook_status(), + bitmex_pricefeed: self.get_bitmex_pricefeed_status(), + }; + + match health_info.is_healthy() { + true => Ok(health_info), + false => { + bail!("Status: ERROR\n + {health_info:?}"); + } + } + } + + pub fn get_coordinator_status(&self) -> ServiceStatus { + *self.coordinator_rx.borrow() + } + + pub fn get_orderbook_status(&self) -> ServiceStatus { + *self.orderbook_rx.borrow() + } + + pub fn get_bitmex_pricefeed_status(&self) -> ServiceStatus { + *self.bitmex_pricefeed_rx.borrow() + } +} + +/// Simple way of checking if a service is online or offline +pub async fn check_health_endpoint( + client: &Client, + endpoint: Url, + tx: watch::Sender, + interval: Duration, +) { + loop { + let status = if check_endpoint_availability(client, endpoint.clone()) + .await + .is_ok() + { + ServiceStatus::Online + } else { + ServiceStatus::Offline + }; + + tx.send(status).expect("Receiver not to be dropped"); + tokio::time::sleep(interval).await; + } +} + +async fn check_endpoint_availability(client: &Client, endpoint: Url) -> Result { + tracing::trace!(%endpoint, "Sending request to check health"); + let response = client + .get(endpoint) + .send() + .await + .context("could not send request")? + .error_for_status()?; + Ok(response.status()) +} diff --git a/maker/src/lib.rs b/maker/src/lib.rs index bbfdc2f40..4d9d2da19 100644 --- a/maker/src/lib.rs +++ b/maker/src/lib.rs @@ -7,6 +7,7 @@ use diesel_migrations::MigrationHarness; mod tests; pub mod cli; +pub mod health; pub mod ln; pub mod logger; pub mod metrics; diff --git a/maker/src/metrics.rs b/maker/src/metrics.rs index ecc8037f5..4f8ffcc60 100644 --- a/maker/src/metrics.rs +++ b/maker/src/metrics.rs @@ -1,3 +1,5 @@ +use crate::health::Health; +use crate::health::ServiceStatus; use lazy_static::lazy_static; use lightning::ln::channelmanager::ChannelDetails; use ln_dlc_node::node::InMemoryStore; @@ -16,7 +18,20 @@ use std::sync::Arc; use std::time::Duration; lazy_static! { - pub static ref METER: Meter = global::meter("coordinator"); + pub static ref METER: Meter = global::meter("maker"); + + // health metrics + pub static ref COORDINATOR_STATUS: ObservableGauge = METER.u64_observable_gauge("coordinator_status") + .with_description("Coordinator status") + .init(); + + pub static ref ORDERBOOK_STATUS: ObservableGauge = METER.u64_observable_gauge("orderbook_status") + .with_description("Orderbook status") + .init(); + + pub static ref BITMEX_PRICEFEED_STATUS: ObservableGauge = METER.u64_observable_gauge("bitmex_pricefeed_status") + .with_description("Bitmex pricefeed status") + .init(); // channel details metrics pub static ref CHANNEL_BALANCE_SATOSHI: ObservableGauge = METER @@ -58,12 +73,33 @@ pub fn init_meter() -> PrometheusExporter { opentelemetry_prometheus::exporter(controller).init() } -pub fn collect(node: Arc>) { +pub fn collect(node: Arc>, health: Health) { let cx = opentelemetry::Context::current(); let channels = node.channel_manager.list_channels(); channel_metrics(&cx, channels); node_metrics(&cx, node); + health_metrics(&cx, &health); +} + +fn health_metrics(cx: &Context, health: &Health) { + update_health_metric(cx, health.get_coordinator_status(), &COORDINATOR_STATUS); + update_health_metric(cx, health.get_orderbook_status(), &ORDERBOOK_STATUS); + update_health_metric( + cx, + health.get_bitmex_pricefeed_status(), + &BITMEX_PRICEFEED_STATUS, + ); +} + +/// Updates the health metric given a service status +fn update_health_metric(cx: &Context, service_status: ServiceStatus, gauge: &ObservableGauge) { + let value = match service_status { + ServiceStatus::Offline => 0, + ServiceStatus::Online => 1, + ServiceStatus::Unknown => 2, + }; + gauge.observe(cx, value, &[]); } fn channel_metrics(cx: &Context, channels: Vec) { diff --git a/maker/src/routes.rs b/maker/src/routes.rs index d7d0a72c4..3ce8c1513 100644 --- a/maker/src/routes.rs +++ b/maker/src/routes.rs @@ -1,3 +1,5 @@ +use crate::health::Health; +use crate::health::OverallMakerHealth; use axum::extract::Path; use axum::extract::State; use axum::http::StatusCode; @@ -29,17 +31,20 @@ pub struct AppState { pub node: Arc>, pub exporter: PrometheusExporter, pub pool: Pool>, + pub health: Health, } pub fn router( node: Arc>, exporter: PrometheusExporter, pool: Pool>, + health: Health, ) -> Router { let app_state = Arc::new(AppState { node, exporter, pool, + health, }); Router::new() @@ -52,6 +57,7 @@ pub fn router( .route("/api/pay-invoice/:invoice", post(pay_invoice)) .route("/api/sync-on-chain", post(sync_on_chain)) .route("/metrics", get(get_metrics)) + .route("/health", get(get_health)) .with_state(app_state) } @@ -267,3 +273,14 @@ pub async fn get_metrics(State(state): State>) -> impl IntoRespons (StatusCode::OK, open_telemetry_metrics + &autometrics) } + +/// Returns 500 if any of the vital services are offline +pub async fn get_health( + State(state): State>, +) -> Result, AppError> { + let resp = state + .health + .get_health() + .map_err(|e| AppError::InternalServerError(format!("Error: {e:#}")))?; + Ok(Json(resp)) +} diff --git a/maker/src/trading/mod.rs b/maker/src/trading/mod.rs index 9f5e5b260..f4bc8539d 100644 --- a/maker/src/trading/mod.rs +++ b/maker/src/trading/mod.rs @@ -1,3 +1,4 @@ +use crate::health::ServiceStatus; use anyhow::Result; use bitcoin::secp256k1::PublicKey; use bitcoin::Network; @@ -11,6 +12,7 @@ use rust_decimal::Decimal; use rust_decimal_macros::dec; use time::Duration; use time::OffsetDateTime; +use tokio::sync::watch; use trade::Direction; use uuid::Uuid; @@ -23,6 +25,7 @@ pub async fn run( network: Network, concurrent_orders: usize, order_expiry_after: Duration, + bitmex_pricefeed_tx: watch::Sender, ) -> Result<()> { let network = match network { Network::Bitcoin => bitmex_stream::Network::Mainnet, @@ -48,6 +51,9 @@ pub async fn run( }; while let Some(quote) = price_stream.try_next().await? { + bitmex_pricefeed_tx + .send(ServiceStatus::Online) + .expect("Receiver not to be dropped"); tracing::debug!("Received new quote {quote:?}"); // Clear stale orders. They should have expired by now. @@ -66,6 +72,9 @@ pub async fn run( }; } } + bitmex_pricefeed_tx + .send(ServiceStatus::Offline) + .expect("Receiver not to be dropped"); Ok(()) }