diff --git a/crates/tests-e2e/src/maker.rs b/crates/tests-e2e/src/maker.rs index 026dcf5ca..fa7fff380 100644 --- a/crates/tests-e2e/src/maker.rs +++ b/crates/tests-e2e/src/maker.rs @@ -29,8 +29,10 @@ impl Maker { Self::new(client, "http://localhost:18000") } + /// Check whether maker is running and that it is connected to all services + /// it depends on pub async fn is_running(&self) -> bool { - self.get("/").await.is_ok() + self.get("/health").await.is_ok() } pub async fn sync_on_chain(&self) -> Result<()> { @@ -79,7 +81,7 @@ impl Maker { remote_balance, }), ) - .await?; + .await?; Ok(()) } diff --git a/crates/tests-e2e/tests/maker.rs b/crates/tests-e2e/tests/maker.rs index a335c0e20..5bba58389 100644 --- a/crates/tests-e2e/tests/maker.rs +++ b/crates/tests-e2e/tests/maker.rs @@ -15,12 +15,13 @@ async fn maker_can_open_channel_to_coordinator_and_send_payment() -> Result<()> let client = init_reqwest(); - let maker = Maker::new_local(client.clone()); - assert!(maker.is_running().await); - let coordinator = Coordinator::new_local(client.clone()); assert!(coordinator.is_running().await); + // Start maker after coordinator as its health check needs coordinator + let maker = Maker::new_local(client.clone()); + wait_until!(maker.is_running().await); + let node_info_coordinator = coordinator.get_node_info().await?; // Ensure the maker has a free UTXO available. diff --git a/maker/src/bin/maker.rs b/maker/src/bin/maker.rs index 07bd8cc8d..b2df04046 100644 --- a/maker/src/bin/maker.rs +++ b/maker/src/bin/maker.rs @@ -7,6 +7,7 @@ use ln_dlc_node::node::InMemoryStore; use ln_dlc_node::node::LnDlcNodeSettings; use ln_dlc_node::seed::Bip39Seed; use maker::cli::Opts; +use maker::health; use maker::ln::ldk_config; use maker::ln::EventHandler; use maker::logger; @@ -88,31 +89,49 @@ async fn main() -> Result<()> { let _running_node = node.start(event_handler)?; let node_pubkey = node.info.pubkey; - tokio::spawn(async move { - match trading::run( - &opts.orderbook, - node_pubkey, - network, - opts.concurrent_orders, - time::Duration::seconds(opts.order_expiry_after_seconds as i64), - ) - .await - { - Ok(()) => { - tracing::error!("Maker stopped trading"); - } - Err(error) => { - tracing::error!("Maker stopped trading: {error:#}"); + tokio::spawn({ + let orderbook = opts.orderbook.clone(); + async move { + match trading::run( + &orderbook, + node_pubkey, + network, + opts.concurrent_orders, + time::Duration::seconds(opts.order_expiry_after_seconds as i64), + ) + .await + { + Ok(()) => { + tracing::error!("Maker stopped trading"); + } + Err(error) => { + tracing::error!("Maker stopped trading: {error:#}"); + } } } }); - tokio::spawn({ + let (health, health_tx) = health::Health::new(); + + let _monitor_coordinator_status = tokio::spawn({ + let endpoint = opts.orderbook.clone(); + let client = reqwest_client(); + let interval = Duration::from_secs(10); + async move { + health::check_health_endpoint(&client, endpoint, health_tx.coordinator, interval).await; + } + }); + + // TODO: Monitor orderbook websocket stream with `health_tx.orderbook` when we subscribe to it + + let _collect_prometheus_metrics = tokio::spawn({ let node = node.clone(); + let health = health.clone(); async move { loop { let node = node.clone(); - spawn_blocking(move || metrics::collect(node)) + let health = health.clone(); + spawn_blocking(move || metrics::collect(node, health)) .await .expect("To spawn blocking thread"); tokio::time::sleep(PROCESS_PROMETHEUS_METRICS).await; @@ -128,7 +147,7 @@ async fn main() -> Result<()> { let mut conn = pool.get().expect("to get connection from pool"); run_migration(&mut conn); - let app = router(node, exporter, pool); + let app = router(node, exporter, pool, health); // Start the metrics exporter autometrics::prometheus_exporter::init(); @@ -150,3 +169,9 @@ async fn main() -> Result<()> { Ok(()) } +fn reqwest_client() -> reqwest::Client { + reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build() + .expect("Failed to build reqwest client") +} diff --git a/maker/src/health.rs b/maker/src/health.rs new file mode 100644 index 000000000..7968264a1 --- /dev/null +++ b/maker/src/health.rs @@ -0,0 +1,120 @@ +// TODO: Add health collection metrics +// This can be later used to add health metrics to the health endpoint + +use anyhow::bail; +use anyhow::Context; +use anyhow::Result; +use reqwest::Client; +use reqwest::StatusCode; +use reqwest::Url; +use std::time::Duration; +use tokio::sync::watch; + +/// Health status of a service +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub enum ServiceStatus { + #[default] + Unknown, + Online, + Offline, +} + +/// Health monitoring for the node +/// +/// Simple endpoint querying is handled by provided configuration, for more complex health checks +/// the transmitters are exposed to be plugged in the services that need to publish their health +/// status. + +#[derive(Clone)] +pub struct Health { + /// Coordinator HTTP API status + coordinator_rx: watch::Receiver, + /// Orderbook websocket stream status + orderbook_rx: watch::Receiver, +} + +/// Transmitters that need to be plugged in the services that need to publish their health status. +pub struct Tx { + pub orderbook: watch::Sender, + pub coordinator: watch::Sender, +} + +impl Health { + pub fn new() -> (Self, Tx) { + let (orderbook_tx, orderbook_rx) = watch::channel(ServiceStatus::Unknown); + let (coordinator_tx, coordinator_rx) = watch::channel(ServiceStatus::Unknown); + + ( + Self { + coordinator_rx, + orderbook_rx, + }, + Tx { + orderbook: orderbook_tx, + coordinator: coordinator_tx, + }, + ) + } + + // TODO: Any ideas of the most useful way of sending this information? + // - perhaps a serialized struct (json) with the status of each service? + pub fn get_health(&self) -> Result { + let mut health_info = String::new(); + health_info.push_str(&format!( + "Coordinator: {:?}\n", + self.get_coordinator_status() + )); + health_info.push_str(&format!("Orderbook: {:?}\n", self.get_orderbook_status())); + + if self.get_coordinator_status() == ServiceStatus::Online + // TODO: Uncomment this line when we retrieve the orderbook status + // && (self.get_orderbook_status() == ServiceStatus::Online) + { + Ok(health_info) + } else { + bail!("Status: ERROR\n + {health_info}"); + } + } + + pub fn get_coordinator_status(&self) -> ServiceStatus { + *self.coordinator_rx.borrow() + } + + pub fn get_orderbook_status(&self) -> ServiceStatus { + *self.orderbook_rx.borrow() + } + // TODO: Add bitmex status? would anything else be useful here? +} + +/// Simple way of checking if a service is online or offline +pub async fn check_health_endpoint( + client: &Client, + endpoint: Url, + tx: watch::Sender, + interval: Duration, +) { + loop { + let status = if check_endpoint_availability(client, endpoint.clone()) + .await + .is_ok() + { + ServiceStatus::Online + } else { + ServiceStatus::Offline + }; + + tx.send(status).expect("Receiver not to be dropped"); + tokio::time::sleep(interval).await; + } +} + +async fn check_endpoint_availability(client: &Client, endpoint: Url) -> Result { + tracing::trace!(%endpoint, "Sending request to check health"); + let response = client + .get(endpoint) + .send() + .await + .context("could not send request")? + .error_for_status()?; + Ok(response.status()) +} diff --git a/maker/src/lib.rs b/maker/src/lib.rs index bbfdc2f40..4d9d2da19 100644 --- a/maker/src/lib.rs +++ b/maker/src/lib.rs @@ -7,6 +7,7 @@ use diesel_migrations::MigrationHarness; mod tests; pub mod cli; +pub mod health; pub mod ln; pub mod logger; pub mod metrics; diff --git a/maker/src/metrics.rs b/maker/src/metrics.rs index ecc8037f5..cd6535b5c 100644 --- a/maker/src/metrics.rs +++ b/maker/src/metrics.rs @@ -1,3 +1,5 @@ +use crate::health::Health; +use crate::health::ServiceStatus; use lazy_static::lazy_static; use lightning::ln::channelmanager::ChannelDetails; use ln_dlc_node::node::InMemoryStore; @@ -16,7 +18,16 @@ use std::sync::Arc; use std::time::Duration; lazy_static! { - pub static ref METER: Meter = global::meter("coordinator"); + pub static ref METER: Meter = global::meter("maker"); + + // health metrics + pub static ref COORDINATOR_STATUS: ObservableGauge = METER.u64_observable_gauge("coordinator_status") + .with_description("Coordinator status") + .init(); + + pub static ref ORDERBOOK_STATUS: ObservableGauge = METER.u64_observable_gauge("orderbook_status") + .with_description("Orderbook status") + .init(); // channel details metrics pub static ref CHANNEL_BALANCE_SATOSHI: ObservableGauge = METER @@ -58,12 +69,42 @@ pub fn init_meter() -> PrometheusExporter { opentelemetry_prometheus::exporter(controller).init() } -pub fn collect(node: Arc>) { +pub fn collect(node: Arc>, health: Health) { let cx = opentelemetry::Context::current(); let channels = node.channel_manager.list_channels(); channel_metrics(&cx, channels); node_metrics(&cx, node); + health_metrics(&cx, &health); +} + +fn health_metrics(cx: &Context, health: &Health) { + update_health_metric(cx, health.get_coordinator_status(), &COORDINATOR_STATUS); + update_health_metric(cx, health.get_orderbook_status(), &ORDERBOOK_STATUS); +} + +/// Updates the health metric given a service status +fn update_health_metric(cx: &Context, service_status: ServiceStatus, gauge: &ObservableGauge) { + match service_status { + ServiceStatus::Unknown => { + gauge.observe(cx, 0, &[KeyValue::new("status", "unknown")]); + } + ServiceStatus::Online => { + gauge.observe(cx, 1, &[KeyValue::new("status", "online")]); + } + ServiceStatus::Offline => { + gauge.observe(cx, 0, &[KeyValue::new("status", "offline")]); + } + } + + // let _ = check_endpoint_availability(client, "http://localhost:8080/health").await.map(|status| match status { + // StatusCode::OK => { + // COORDINATOR_STATUS.observe(cx, 1, &[KeyValue::new("status", "ok")]); + // } + // _ => { + // COORDINATOR_STATUS.observe(cx, 0, &[KeyValue::new("status", "error")]); + // } + // }); } fn channel_metrics(cx: &Context, channels: Vec) { diff --git a/maker/src/routes.rs b/maker/src/routes.rs index d7d0a72c4..360dab0f7 100644 --- a/maker/src/routes.rs +++ b/maker/src/routes.rs @@ -1,3 +1,4 @@ +use crate::health::Health; use axum::extract::Path; use axum::extract::State; use axum::http::StatusCode; @@ -29,17 +30,20 @@ pub struct AppState { pub node: Arc>, pub exporter: PrometheusExporter, pub pool: Pool>, + pub health: Health, } pub fn router( node: Arc>, exporter: PrometheusExporter, pool: Pool>, + health: Health, ) -> Router { let app_state = Arc::new(AppState { node, exporter, pool, + health, }); Router::new() @@ -52,6 +56,7 @@ pub fn router( .route("/api/pay-invoice/:invoice", post(pay_invoice)) .route("/api/sync-on-chain", post(sync_on_chain)) .route("/metrics", get(get_metrics)) + .route("/health", get(get_health)) .with_state(app_state) } @@ -267,3 +272,12 @@ pub async fn get_metrics(State(state): State>) -> impl IntoRespons (StatusCode::OK, open_telemetry_metrics + &autometrics) } + +/// Returns 500 if any of the vital services are offline +pub async fn get_health(State(state): State>) -> Result, AppError> { + let resp = state + .health + .get_health() + .map_err(|e| AppError::InternalServerError(format!("Error: {e:#}")))?; + Ok(Json(resp)) +}