Skip to content

Commit

Permalink
feat(maker): Monitor and expose status of connected services
Browse files Browse the repository at this point in the history
Monitor the following services:
    - coordinator
    - orderbook (as we don't subscribe to orderbook feed yet, so it's hardcoded to "online")
    - bitmex pricefeed

in order to derive health status of the maker.

Health status is exposed via the HTTP API as well as reported as
dedicated prometheus metrics.

TODO: Subscribe to orderbook websocket stream to be able to detect if it is down
TODO: Subcribe to bitmex client stream to be able to spot outdated information
  • Loading branch information
klochowicz committed Sep 13, 2023
1 parent ffb7766 commit b9b6fec
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 24 deletions.
6 changes: 5 additions & 1 deletion crates/tests-e2e/src/maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ impl Maker {
Self::new(client, "http://localhost:18000")
}

/// Check whether maker is running and that it is connected to all services
/// it depends on
pub async fn is_running(&self) -> bool {
self.get("/").await.is_ok()
self.get("/health")
.await
.is_ok_and(|r| r.status().is_success())
}

pub async fn sync_on_chain(&self) -> Result<()> {
Expand Down
7 changes: 4 additions & 3 deletions crates/tests-e2e/tests/maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ async fn maker_can_open_channel_to_coordinator_and_send_payment() -> Result<()>

let client = init_reqwest();

let maker = Maker::new_local(client.clone());
assert!(maker.is_running().await);

let coordinator = Coordinator::new_local(client.clone());
assert!(coordinator.is_running().await);

// Start maker after coordinator as its health check needs coordinator
let maker = Maker::new_local(client.clone());
wait_until!(maker.is_running().await);

let node_info_coordinator = coordinator.get_node_info().await?;

// Ensure the maker has a free UTXO available.
Expand Down
66 changes: 48 additions & 18 deletions maker/src/bin/maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use ln_dlc_node::node::InMemoryStore;
use ln_dlc_node::node::LnDlcNodeSettings;
use ln_dlc_node::seed::Bip39Seed;
use maker::cli::Opts;
use maker::health;
use maker::ln::ldk_config;
use maker::ln::EventHandler;
use maker::logger;
Expand Down Expand Up @@ -87,32 +88,55 @@ async fn main() -> Result<()> {
let event_handler = EventHandler::new(node.clone());
let _running_node = node.start(event_handler)?;

let (health, health_tx) = health::Health::new();

let node_pubkey = node.info.pubkey;
tokio::spawn(async move {
match trading::run(
&opts.orderbook,
node_pubkey,
network,
opts.concurrent_orders,
time::Duration::seconds(opts.order_expiry_after_seconds as i64),
)
.await
{
Ok(()) => {
tracing::error!("Maker stopped trading");
}
Err(error) => {
tracing::error!("Maker stopped trading: {error:#}");
tokio::spawn({
let orderbook = opts.orderbook.clone();
async move {
match trading::run(
&orderbook,
node_pubkey,
network,
opts.concurrent_orders,
time::Duration::seconds(opts.order_expiry_after_seconds as i64),
health_tx.bitmex_pricefeed,
)
.await
{
Ok(()) => {
tracing::error!("Maker stopped trading");
}
Err(error) => {
tracing::error!("Maker stopped trading: {error:#}");
}
}
}
});

tokio::spawn({
let _monitor_coordinator_status = tokio::spawn({
let endpoint = opts.orderbook.clone();
let client = reqwest_client();
let interval = Duration::from_secs(10);
async move {
health::check_health_endpoint(&client, endpoint, health_tx.coordinator, interval).await;
}
});

// TODO: Monitor orderbook websocket stream with `health_tx.orderbook` when we subscribe to it
health_tx
.orderbook
.send(health::ServiceStatus::Online)
.expect("to be able to send");

let _collect_prometheus_metrics = tokio::spawn({
let node = node.clone();
let health = health.clone();
async move {
loop {
let node = node.clone();
spawn_blocking(move || metrics::collect(node))
let health = health.clone();
spawn_blocking(move || metrics::collect(node, health))
.await
.expect("To spawn blocking thread");
tokio::time::sleep(PROCESS_PROMETHEUS_METRICS).await;
Expand All @@ -128,7 +152,7 @@ async fn main() -> Result<()> {
let mut conn = pool.get().expect("to get connection from pool");
run_migration(&mut conn);

let app = router(node, exporter, pool);
let app = router(node, exporter, pool, health);

// Start the metrics exporter
autometrics::prometheus_exporter::init();
Expand All @@ -150,3 +174,9 @@ async fn main() -> Result<()> {

Ok(())
}
fn reqwest_client() -> reqwest::Client {
reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
.expect("Failed to build reqwest client")
}
138 changes: 138 additions & 0 deletions maker/src/health.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use reqwest::Client;
use reqwest::StatusCode;
use reqwest::Url;
use serde::Serialize;
use std::time::Duration;
use tokio::sync::watch;

/// Health status of a service
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)]
pub enum ServiceStatus {
#[default]
Unknown,
Online,
Offline,
}

/// Health monitoring for the node
///
/// Simple endpoint querying is handled by provided configuration, for more complex health checks
/// the transmitters are exposed to be plugged in the services that need to publish their health
/// status.
#[derive(Clone)]
pub struct Health {
/// Coordinator HTTP API status
coordinator_rx: watch::Receiver<ServiceStatus>,
/// Orderbook websocket stream status
orderbook_rx: watch::Receiver<ServiceStatus>,
/// Bitmex pricefeed stream status
bitmex_pricefeed_rx: watch::Receiver<ServiceStatus>,
}

/// Transmitters that need to be plugged in the services that need to publish their health status.
pub struct Tx {
pub orderbook: watch::Sender<ServiceStatus>,
pub coordinator: watch::Sender<ServiceStatus>,
pub bitmex_pricefeed: watch::Sender<ServiceStatus>,
}

/// Struct returned by maker's health endpoint.
#[derive(Debug, Serialize)]
pub struct OverallMakerHealth {
coordinator: ServiceStatus,
orderbook: ServiceStatus,
bitmex_pricefeed: ServiceStatus,
}

impl OverallMakerHealth {
pub fn is_healthy(&self) -> bool {
self.coordinator == ServiceStatus::Online
&& self.bitmex_pricefeed == ServiceStatus::Online
&& self.orderbook == ServiceStatus::Online
}
}

impl Health {
pub fn new() -> (Self, Tx) {
let (orderbook_tx, orderbook_rx) = watch::channel(ServiceStatus::Unknown);
let (coordinator_tx, coordinator_rx) = watch::channel(ServiceStatus::Unknown);
let (bitmex_pricefeed_tx, bitmex_pricefeed_rx) = watch::channel(ServiceStatus::Unknown);

(
Self {
coordinator_rx,
orderbook_rx,
bitmex_pricefeed_rx,
},
Tx {
orderbook: orderbook_tx,
coordinator: coordinator_tx,
bitmex_pricefeed: bitmex_pricefeed_tx,
},
)
}

pub fn get_health(&self) -> Result<OverallMakerHealth> {
let health_info = OverallMakerHealth {
coordinator: self.get_coordinator_status(),
orderbook: self.get_orderbook_status(),
bitmex_pricefeed: self.get_bitmex_pricefeed_status(),
};

match health_info.is_healthy() {
true => Ok(health_info),
false => {
bail!("Status: ERROR\n + {health_info:?}");
}
}
}

pub fn get_coordinator_status(&self) -> ServiceStatus {
*self.coordinator_rx.borrow()
}

pub fn get_orderbook_status(&self) -> ServiceStatus {
*self.orderbook_rx.borrow()
}

pub fn get_bitmex_pricefeed_status(&self) -> ServiceStatus {
*self.bitmex_pricefeed_rx.borrow()
}
}

/// Simple way of checking if a service is online or offline
pub async fn check_health_endpoint(
client: &Client,
endpoint: Url,
tx: watch::Sender<ServiceStatus>,
interval: Duration,
) {
loop {
let status = if check_endpoint_availability(client, endpoint.clone())
.await
.is_ok()
{
ServiceStatus::Online
} else {
ServiceStatus::Offline
};

tx.send(status).expect("Receiver not to be dropped");
tokio::time::sleep(interval).await;
}
}

async fn check_endpoint_availability(client: &Client, endpoint: Url) -> Result<StatusCode> {
tracing::trace!(%endpoint, "Sending request to check health");
let response = client
.get(endpoint)
.send()
.await
.context("could not send request")?
.error_for_status()?;
Ok(response.status())
}
1 change: 1 addition & 0 deletions maker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use diesel_migrations::MigrationHarness;
mod tests;

pub mod cli;
pub mod health;
pub mod ln;
pub mod logger;
pub mod metrics;
Expand Down
40 changes: 38 additions & 2 deletions maker/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::health::Health;
use crate::health::ServiceStatus;
use lazy_static::lazy_static;
use lightning::ln::channelmanager::ChannelDetails;
use ln_dlc_node::node::InMemoryStore;
Expand All @@ -16,7 +18,20 @@ use std::sync::Arc;
use std::time::Duration;

lazy_static! {
pub static ref METER: Meter = global::meter("coordinator");
pub static ref METER: Meter = global::meter("maker");

// health metrics
pub static ref COORDINATOR_STATUS: ObservableGauge<u64> = METER.u64_observable_gauge("coordinator_status")
.with_description("Coordinator status")
.init();

pub static ref ORDERBOOK_STATUS: ObservableGauge<u64> = METER.u64_observable_gauge("orderbook_status")
.with_description("Orderbook status")
.init();

pub static ref BITMEX_PRICEFEED_STATUS: ObservableGauge<u64> = METER.u64_observable_gauge("bitmex_pricefeed_status")
.with_description("Bitmex pricefeed status")
.init();

// channel details metrics
pub static ref CHANNEL_BALANCE_SATOSHI: ObservableGauge<u64> = METER
Expand Down Expand Up @@ -58,12 +73,33 @@ pub fn init_meter() -> PrometheusExporter {
opentelemetry_prometheus::exporter(controller).init()
}

pub fn collect(node: Arc<Node<InMemoryStore>>) {
pub fn collect(node: Arc<Node<InMemoryStore>>, health: Health) {
let cx = opentelemetry::Context::current();

let channels = node.channel_manager.list_channels();
channel_metrics(&cx, channels);
node_metrics(&cx, node);
health_metrics(&cx, &health);
}

fn health_metrics(cx: &Context, health: &Health) {
update_health_metric(cx, health.get_coordinator_status(), &COORDINATOR_STATUS);
update_health_metric(cx, health.get_orderbook_status(), &ORDERBOOK_STATUS);
update_health_metric(
cx,
health.get_bitmex_pricefeed_status(),
&BITMEX_PRICEFEED_STATUS,
);
}

/// Updates the health metric given a service status
fn update_health_metric(cx: &Context, service_status: ServiceStatus, gauge: &ObservableGauge<u64>) {
let value = match service_status {
ServiceStatus::Offline => 0,
ServiceStatus::Online => 1,
ServiceStatus::Unknown => 2,
};
gauge.observe(cx, value, &[]);
}

fn channel_metrics(cx: &Context, channels: Vec<ChannelDetails>) {
Expand Down
17 changes: 17 additions & 0 deletions maker/src/routes.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::health::Health;
use crate::health::OverallMakerHealth;
use axum::extract::Path;
use axum::extract::State;
use axum::http::StatusCode;
Expand Down Expand Up @@ -29,17 +31,20 @@ pub struct AppState {
pub node: Arc<Node<InMemoryStore>>,
pub exporter: PrometheusExporter,
pub pool: Pool<ConnectionManager<PgConnection>>,
pub health: Health,
}

pub fn router(
node: Arc<Node<InMemoryStore>>,
exporter: PrometheusExporter,
pool: Pool<ConnectionManager<PgConnection>>,
health: Health,
) -> Router {
let app_state = Arc::new(AppState {
node,
exporter,
pool,
health,
});

Router::new()
Expand All @@ -52,6 +57,7 @@ pub fn router(
.route("/api/pay-invoice/:invoice", post(pay_invoice))
.route("/api/sync-on-chain", post(sync_on_chain))
.route("/metrics", get(get_metrics))
.route("/health", get(get_health))
.with_state(app_state)
}

Expand Down Expand Up @@ -267,3 +273,14 @@ pub async fn get_metrics(State(state): State<Arc<AppState>>) -> impl IntoRespons

(StatusCode::OK, open_telemetry_metrics + &autometrics)
}

/// Returns 500 if any of the vital services are offline
pub async fn get_health(
State(state): State<Arc<AppState>>,
) -> Result<Json<OverallMakerHealth>, AppError> {
let resp = state
.health
.get_health()
.map_err(|e| AppError::InternalServerError(format!("Error: {e:#}")))?;
Ok(Json(resp))
}
Loading

0 comments on commit b9b6fec

Please sign in to comment.