Skip to content

Commit

Permalink
feat(maker): Monitor and expose status of services connected
Browse files Browse the repository at this point in the history
Monitor the following services:
    - coordinator
    - orderbook (we don't subscribe to orderbook feed yet)

in order to derive health status of the maker.

Health status is exposed via the HTTP API as well as reported as
dedicated prometheus metrics.

TODO: Subscribe to orderbook websocket stream to be able to detect if it is down
TODO: Subcribe to bitmex client stream to be able to spot outdated information

almost there, just exposing HTTP API

I still need to show quickly how to return different HTTP errors based on health status
  • Loading branch information
klochowicz committed Aug 29, 2023
1 parent 1df5c45 commit fc43972
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 25 deletions.
6 changes: 4 additions & 2 deletions crates/tests-e2e/src/maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ impl Maker {
Self::new(client, "http://localhost:18000")
}

/// Check whether maker is running and that it is connected to all services
/// it depends on
pub async fn is_running(&self) -> bool {
self.get("/").await.is_ok()
self.get("/health").await.is_ok()
}

pub async fn sync_on_chain(&self) -> Result<()> {
Expand Down Expand Up @@ -79,7 +81,7 @@ impl Maker {
remote_balance,
}),
)
.await?;
.await?;

Ok(())
}
Expand Down
7 changes: 4 additions & 3 deletions crates/tests-e2e/tests/maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ async fn maker_can_open_channel_to_coordinator_and_send_payment() -> Result<()>

let client = init_reqwest();

let maker = Maker::new_local(client.clone());
assert!(maker.is_running().await);

let coordinator = Coordinator::new_local(client.clone());
assert!(coordinator.is_running().await);

// Start maker after coordinator as its health check needs coordinator
let maker = Maker::new_local(client.clone());
wait_until!(maker.is_running().await);

let node_info_coordinator = coordinator.get_node_info().await?;

// Ensure the maker has a free UTXO available.
Expand Down
61 changes: 43 additions & 18 deletions maker/src/bin/maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use ln_dlc_node::node::InMemoryStore;
use ln_dlc_node::node::LnDlcNodeSettings;
use ln_dlc_node::seed::Bip39Seed;
use maker::cli::Opts;
use maker::health;
use maker::ln::ldk_config;
use maker::ln::EventHandler;
use maker::logger;
Expand Down Expand Up @@ -88,31 +89,49 @@ async fn main() -> Result<()> {
let _running_node = node.start(event_handler)?;

let node_pubkey = node.info.pubkey;
tokio::spawn(async move {
match trading::run(
&opts.orderbook,
node_pubkey,
network,
opts.concurrent_orders,
time::Duration::seconds(opts.order_expiry_after_seconds as i64),
)
.await
{
Ok(()) => {
tracing::error!("Maker stopped trading");
}
Err(error) => {
tracing::error!("Maker stopped trading: {error:#}");
tokio::spawn({
let orderbook = opts.orderbook.clone();
async move {
match trading::run(
&orderbook,
node_pubkey,
network,
opts.concurrent_orders,
time::Duration::seconds(opts.order_expiry_after_seconds as i64),
)
.await
{
Ok(()) => {
tracing::error!("Maker stopped trading");
}
Err(error) => {
tracing::error!("Maker stopped trading: {error:#}");
}
}
}
});

tokio::spawn({
let (health, health_tx) = health::Health::new();

let _monitor_coordinator_status = tokio::spawn({
let endpoint = opts.orderbook.clone();
let client = reqwest_client();
let interval = Duration::from_secs(10);
async move {
health::check_health_endpoint(&client, endpoint, health_tx.coordinator, interval).await;
}
});

// TODO: Monitor orderbook websocket stream with `health_tx.orderbook` when we subscribe to it

let _collect_prometheus_metrics = tokio::spawn({
let node = node.clone();
let health = health.clone();
async move {
loop {
let node = node.clone();
spawn_blocking(move || metrics::collect(node))
let health = health.clone();
spawn_blocking(move || metrics::collect(node, health))
.await
.expect("To spawn blocking thread");
tokio::time::sleep(PROCESS_PROMETHEUS_METRICS).await;
Expand All @@ -128,7 +147,7 @@ async fn main() -> Result<()> {
let mut conn = pool.get().expect("to get connection from pool");
run_migration(&mut conn);

let app = router(node, exporter, pool);
let app = router(node, exporter, pool, health);

// Start the metrics exporter
autometrics::prometheus_exporter::init();
Expand All @@ -150,3 +169,9 @@ async fn main() -> Result<()> {

Ok(())
}
fn reqwest_client() -> reqwest::Client {
reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
.expect("Failed to build reqwest client")
}
120 changes: 120 additions & 0 deletions maker/src/health.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// TODO: Add health collection metrics
// This can be later used to add health metrics to the health endpoint

use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use reqwest::Client;
use reqwest::StatusCode;
use reqwest::Url;
use std::time::Duration;
use tokio::sync::watch;

/// Health status of a service
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum ServiceStatus {
#[default]
Unknown,
Online,
Offline,
}

/// Health monitoring for the node
///
/// Simple endpoint querying is handled by provided configuration, for more complex health checks
/// the transmitters are exposed to be plugged in the services that need to publish their health
/// status.
#[derive(Clone)]
pub struct Health {
/// Coordinator HTTP API status
coordinator_rx: watch::Receiver<ServiceStatus>,
/// Orderbook websocket stream status
orderbook_rx: watch::Receiver<ServiceStatus>,
}

/// Transmitters that need to be plugged in the services that need to publish their health status.
pub struct Tx {
pub orderbook: watch::Sender<ServiceStatus>,
pub coordinator: watch::Sender<ServiceStatus>,
}

impl Health {
pub fn new() -> (Self, Tx) {
let (orderbook_tx, orderbook_rx) = watch::channel(ServiceStatus::Unknown);
let (coordinator_tx, coordinator_rx) = watch::channel(ServiceStatus::Unknown);

(
Self {
coordinator_rx,
orderbook_rx,
},
Tx {
orderbook: orderbook_tx,
coordinator: coordinator_tx,
},
)
}

// TODO: Any ideas of the most useful way of sending this information?
// - perhaps a serialized struct (json) with the status of each service?
pub fn get_health(&self) -> Result<String> {
let mut health_info = String::new();
health_info.push_str(&format!(
"Coordinator: {:?}\n",
self.get_coordinator_status()
));
health_info.push_str(&format!("Orderbook: {:?}\n", self.get_orderbook_status()));

if self.get_coordinator_status() == ServiceStatus::Online
// TODO: Uncomment this line when we retrieve the orderbook status
// && (self.get_orderbook_status() == ServiceStatus::Online)
{
Ok(health_info)
} else {
bail!("Status: ERROR\n + {health_info}");
}
}

pub fn get_coordinator_status(&self) -> ServiceStatus {
*self.coordinator_rx.borrow()
}

pub fn get_orderbook_status(&self) -> ServiceStatus {
*self.orderbook_rx.borrow()
}
// TODO: Add bitmex status? would anything else be useful here?
}

/// Simple way of checking if a service is online or offline
pub async fn check_health_endpoint(
client: &Client,
endpoint: Url,
tx: watch::Sender<ServiceStatus>,
interval: Duration,
) {
loop {
let status = if check_endpoint_availability(client, endpoint.clone())
.await
.is_ok()
{
ServiceStatus::Online
} else {
ServiceStatus::Offline
};

tx.send(status).expect("Receiver not to be dropped");
tokio::time::sleep(interval).await;
}
}

async fn check_endpoint_availability(client: &Client, endpoint: Url) -> Result<StatusCode> {
tracing::trace!(%endpoint, "Sending request to check health");
let response = client
.get(endpoint)
.send()
.await
.context("could not send request")?
.error_for_status()?;
Ok(response.status())
}
1 change: 1 addition & 0 deletions maker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use diesel_migrations::MigrationHarness;
mod tests;

pub mod cli;
pub mod health;
pub mod ln;
pub mod logger;
pub mod metrics;
Expand Down
45 changes: 43 additions & 2 deletions maker/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::health::Health;
use crate::health::ServiceStatus;
use lazy_static::lazy_static;
use lightning::ln::channelmanager::ChannelDetails;
use ln_dlc_node::node::InMemoryStore;
Expand All @@ -16,7 +18,16 @@ use std::sync::Arc;
use std::time::Duration;

lazy_static! {
pub static ref METER: Meter = global::meter("coordinator");
pub static ref METER: Meter = global::meter("maker");

// health metrics
pub static ref COORDINATOR_STATUS: ObservableGauge<u64> = METER.u64_observable_gauge("coordinator_status")
.with_description("Coordinator status")
.init();

pub static ref ORDERBOOK_STATUS: ObservableGauge<u64> = METER.u64_observable_gauge("orderbook_status")
.with_description("Orderbook status")
.init();

// channel details metrics
pub static ref CHANNEL_BALANCE_SATOSHI: ObservableGauge<u64> = METER
Expand Down Expand Up @@ -58,12 +69,42 @@ pub fn init_meter() -> PrometheusExporter {
opentelemetry_prometheus::exporter(controller).init()
}

pub fn collect(node: Arc<Node<InMemoryStore>>) {
pub fn collect(node: Arc<Node<InMemoryStore>>, health: Health) {
let cx = opentelemetry::Context::current();

let channels = node.channel_manager.list_channels();
channel_metrics(&cx, channels);
node_metrics(&cx, node);
health_metrics(&cx, &health);
}

fn health_metrics(cx: &Context, health: &Health) {
update_health_metric(cx, health.get_coordinator_status(), &COORDINATOR_STATUS);
update_health_metric(cx, health.get_orderbook_status(), &ORDERBOOK_STATUS);
}

/// Updates the health metric given a service status
fn update_health_metric(cx: &Context, service_status: ServiceStatus, gauge: &ObservableGauge<u64>) {
match service_status {
ServiceStatus::Unknown => {
gauge.observe(cx, 0, &[KeyValue::new("status", "unknown")]);
}
ServiceStatus::Online => {
gauge.observe(cx, 1, &[KeyValue::new("status", "online")]);
}
ServiceStatus::Offline => {
gauge.observe(cx, 0, &[KeyValue::new("status", "offline")]);
}
}

// let _ = check_endpoint_availability(client, "http://localhost:8080/health").await.map(|status| match status {
// StatusCode::OK => {
// COORDINATOR_STATUS.observe(cx, 1, &[KeyValue::new("status", "ok")]);
// }
// _ => {
// COORDINATOR_STATUS.observe(cx, 0, &[KeyValue::new("status", "error")]);
// }
// });
}

fn channel_metrics(cx: &Context, channels: Vec<ChannelDetails>) {
Expand Down
14 changes: 14 additions & 0 deletions maker/src/routes.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::health::Health;
use axum::extract::Path;
use axum::extract::State;
use axum::http::StatusCode;
Expand Down Expand Up @@ -29,17 +30,20 @@ pub struct AppState {
pub node: Arc<Node<InMemoryStore>>,
pub exporter: PrometheusExporter,
pub pool: Pool<ConnectionManager<PgConnection>>,
pub health: Health,
}

pub fn router(
node: Arc<Node<InMemoryStore>>,
exporter: PrometheusExporter,
pool: Pool<ConnectionManager<PgConnection>>,
health: Health,
) -> Router {
let app_state = Arc::new(AppState {
node,
exporter,
pool,
health,
});

Router::new()
Expand All @@ -52,6 +56,7 @@ pub fn router(
.route("/api/pay-invoice/:invoice", post(pay_invoice))
.route("/api/sync-on-chain", post(sync_on_chain))
.route("/metrics", get(get_metrics))
.route("/health", get(get_health))
.with_state(app_state)
}

Expand Down Expand Up @@ -267,3 +272,12 @@ pub async fn get_metrics(State(state): State<Arc<AppState>>) -> impl IntoRespons

(StatusCode::OK, open_telemetry_metrics + &autometrics)
}

/// Returns 500 if any of the vital services are offline
pub async fn get_health(State(state): State<Arc<AppState>>) -> Result<Json<String>, AppError> {
let resp = state
.health
.get_health()
.map_err(|e| AppError::InternalServerError(format!("Error: {e:#}")))?;
Ok(Json(resp))
}

0 comments on commit fc43972

Please sign in to comment.