feat(maker): Monitor and expose status of services connected #1180

Merged
merged 1 commit into from Sep 13, 2023
6 changes: 5 additions & 1 deletion crates/tests-e2e/src/maker.rs
@@ -29,8 +29,12 @@ impl Maker {
Self::new(client, "http://localhost:18000")
}

/// Check whether the maker is running and connected to all the services
/// it depends on.
pub async fn is_running(&self) -> bool {
self.get("/").await.is_ok()
self.get("/health")
.await
.is_ok_and(|r| r.status().is_success())
}

pub async fn sync_on_chain(&self) -> Result<()> {
7 changes: 4 additions & 3 deletions crates/tests-e2e/tests/maker.rs
@@ -15,12 +15,13 @@ async fn maker_can_open_channel_to_coordinator_and_send_payment() -> Result<()>

let client = init_reqwest();

let maker = Maker::new_local(client.clone());
assert!(maker.is_running().await);

let coordinator = Coordinator::new_local(client.clone());
assert!(coordinator.is_running().await);

// Start the maker after the coordinator, as the maker's health check needs the coordinator
let maker = Maker::new_local(client.clone());
wait_until!(maker.is_running().await);

let node_info_coordinator = coordinator.get_node_info().await?;

// Ensure the maker has a free UTXO available.
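The `wait_until!` polling matters here because the maker's `/health` check only passes once the coordinator is reachable, which may take a moment after startup. A minimal sketch of what such a macro could look like — the real macro lives in the e2e test utilities, and the timeout and poll interval below are assumptions, not the crate's actual values:

// Sketch of a `wait_until!`-style polling macro for async tests;
// timeout and poll interval are assumed, not taken from the crate.
macro_rules! wait_until {
    ($condition:expr) => {{
        let timeout = std::time::Duration::from_secs(30); // assumed
        let start = std::time::Instant::now();
        while !$condition {
            assert!(
                start.elapsed() < timeout,
                "wait_until! timed out: {}",
                stringify!($condition)
            );
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        }
    }};
}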
66 changes: 48 additions & 18 deletions maker/src/bin/maker.rs
@@ -7,6 +7,7 @@ use ln_dlc_node::node::InMemoryStore;
use ln_dlc_node::node::LnDlcNodeSettings;
use ln_dlc_node::seed::Bip39Seed;
use maker::cli::Opts;
use maker::health;
use maker::ln::ldk_config;
use maker::ln::EventHandler;
use maker::logger;
@@ -87,32 +88,55 @@ async fn main() -> Result<()> {
let event_handler = EventHandler::new(node.clone());
let _running_node = node.start(event_handler)?;

let (health, health_tx) = health::Health::new();

let node_pubkey = node.info.pubkey;
tokio::spawn(async move {
match trading::run(
&opts.orderbook,
node_pubkey,
network,
opts.concurrent_orders,
time::Duration::seconds(opts.order_expiry_after_seconds as i64),
)
.await
{
Ok(()) => {
tracing::error!("Maker stopped trading");
}
Err(error) => {
tracing::error!("Maker stopped trading: {error:#}");
tokio::spawn({
let orderbook = opts.orderbook.clone();
async move {
match trading::run(
&orderbook,
node_pubkey,
network,
opts.concurrent_orders,
time::Duration::seconds(opts.order_expiry_after_seconds as i64),
health_tx.bitmex_pricefeed,
)
.await
{
Ok(()) => {
tracing::error!("Maker stopped trading");
}
Err(error) => {
tracing::error!("Maker stopped trading: {error:#}");
}
}
}
});

tokio::spawn({
let _monitor_coordinator_status = tokio::spawn({
let endpoint = opts.orderbook.clone();
let client = reqwest_client();
let interval = Duration::from_secs(10);
async move {
health::check_health_endpoint(&client, endpoint, health_tx.coordinator, interval).await;
}
});

// TODO: Monitor orderbook websocket stream with `health_tx.orderbook` when we subscribe to it
health_tx
.orderbook
.send(health::ServiceStatus::Online)
.expect("to be able to send");

let _collect_prometheus_metrics = tokio::spawn({
let node = node.clone();
let health = health.clone();
async move {
loop {
let node = node.clone();
spawn_blocking(move || metrics::collect(node))
let health = health.clone();
spawn_blocking(move || metrics::collect(node, health))
.await
.expect("To spawn blocking thread");
tokio::time::sleep(PROCESS_PROMETHEUS_METRICS).await;
@@ -128,7 +152,7 @@ async fn main() -> Result<()> {
let mut conn = pool.get().expect("to get connection from pool");
run_migration(&mut conn);

let app = router(node, exporter, pool);
let app = router(node, exporter, pool, health);

// Start the metrics exporter
autometrics::prometheus_exporter::init();
@@ -150,3 +174,9 @@ async fn main() -> Result<()> {

Ok(())
}

fn reqwest_client() -> reqwest::Client {
reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
.expect("Failed to build reqwest client")
}
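`trading::run` now takes the `health_tx.bitmex_pricefeed` sender as an extra argument so the trading loop can publish the pricefeed's status itself. The trading crate is not part of this diff; what follows is a hypothetical sketch of how such a loop might report through the channel — `connect_pricefeed` is a stand-in, not a real function:

use maker::health::ServiceStatus;
use tokio::sync::watch;

// Stand-in for establishing the BitMEX websocket connection.
async fn connect_pricefeed() -> anyhow::Result<()> {
    Ok(())
}

// Hypothetical: publish Online/Offline through the watch channel as the
// connection comes and goes.
async fn run_pricefeed(status_tx: watch::Sender<ServiceStatus>) {
    loop {
        match connect_pricefeed().await {
            Ok(_stream) => {
                let _ = status_tx.send(ServiceStatus::Online);
                // ... consume the stream until it ends or errors ...
            }
            Err(_) => {
                let _ = status_tx.send(ServiceStatus::Offline);
            }
        }
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    }
}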
138 changes: 138 additions & 0 deletions maker/src/health.rs
@@ -0,0 +1,138 @@
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use reqwest::Client;
use reqwest::StatusCode;
use reqwest::Url;
use serde::Serialize;
use std::time::Duration;
use tokio::sync::watch;

/// Health status of a service
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)]
pub enum ServiceStatus {
#[default]
Unknown,
Online,
Offline,
}

/// Health monitoring for the node
///
/// Simple endpoint querying is handled by the provided configuration; for more complex health
/// checks, the transmitters are exposed so they can be plugged into the services that need to
/// publish their health status.
#[derive(Clone)]
pub struct Health {
/// Coordinator HTTP API status
coordinator_rx: watch::Receiver<ServiceStatus>,
/// Orderbook websocket stream status
orderbook_rx: watch::Receiver<ServiceStatus>,
/// Bitmex pricefeed stream status
bitmex_pricefeed_rx: watch::Receiver<ServiceStatus>,
}

/// Transmitters to be plugged into the services that need to publish their health status.
pub struct Tx {
pub orderbook: watch::Sender<ServiceStatus>,
pub coordinator: watch::Sender<ServiceStatus>,
pub bitmex_pricefeed: watch::Sender<ServiceStatus>,
}

/// Struct returned by maker's health endpoint.
#[derive(Debug, Serialize)]
pub struct OverallMakerHealth {
coordinator: ServiceStatus,
orderbook: ServiceStatus,
bitmex_pricefeed: ServiceStatus,
}

impl OverallMakerHealth {
pub fn is_healthy(&self) -> bool {
self.coordinator == ServiceStatus::Online
&& self.bitmex_pricefeed == ServiceStatus::Online
&& self.orderbook == ServiceStatus::Online
}
}

impl Health {
pub fn new() -> (Self, Tx) {
let (orderbook_tx, orderbook_rx) = watch::channel(ServiceStatus::Unknown);
let (coordinator_tx, coordinator_rx) = watch::channel(ServiceStatus::Unknown);
let (bitmex_pricefeed_tx, bitmex_pricefeed_rx) = watch::channel(ServiceStatus::Unknown);

(
Self {
coordinator_rx,
orderbook_rx,
bitmex_pricefeed_rx,
},
Tx {
orderbook: orderbook_tx,
coordinator: coordinator_tx,
bitmex_pricefeed: bitmex_pricefeed_tx,
},
)
}

pub fn get_health(&self) -> Result<OverallMakerHealth> {
let health_info = OverallMakerHealth {
coordinator: self.get_coordinator_status(),
orderbook: self.get_orderbook_status(),
bitmex_pricefeed: self.get_bitmex_pricefeed_status(),
};

match health_info.is_healthy() {
true => Ok(health_info),
false => {
bail!("Status: ERROR\n + {health_info:?}");
}
}
}

pub fn get_coordinator_status(&self) -> ServiceStatus {
*self.coordinator_rx.borrow()
}

pub fn get_orderbook_status(&self) -> ServiceStatus {
*self.orderbook_rx.borrow()
}

pub fn get_bitmex_pricefeed_status(&self) -> ServiceStatus {
*self.bitmex_pricefeed_rx.borrow()
}
}

/// Simple way of checking if a service is online or offline
pub async fn check_health_endpoint(
client: &Client,
endpoint: Url,
tx: watch::Sender<ServiceStatus>,
interval: Duration,
) {
loop {
let status = if check_endpoint_availability(client, endpoint.clone())
.await
.is_ok()
{
ServiceStatus::Online
} else {
ServiceStatus::Offline
};

tx.send(status).expect("Receiver not to be dropped");
tokio::time::sleep(interval).await;
}
}

async fn check_endpoint_availability(client: &Client, endpoint: Url) -> Result<StatusCode> {
tracing::trace!(%endpoint, "Sending request to check health");
let response = client
.get(endpoint)
.send()
.await
.context("could not send request")?
.error_for_status()?;
Ok(response.status())
}
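Taken together, a minimal usage sketch of this module (not part of the PR; the endpoint URL and interval are placeholders):

use maker::health;
use std::time::Duration;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (health, health_tx) = health::Health::new();

    // Poll one dependency's health endpoint in the background.
    let client = reqwest::Client::new();
    let endpoint = reqwest::Url::parse("http://localhost:8000/health")?; // placeholder
    tokio::spawn(async move {
        health::check_health_endpoint(&client, endpoint, health_tx.coordinator, Duration::from_secs(10))
            .await;
    });

    // `get_health` bails unless every service has reported `Online`, so the
    // orderbook and pricefeed channels would still need their own publishers.
    match health.get_health() {
        Ok(overall) => println!("healthy: {overall:?}"),
        Err(e) => println!("not healthy: {e:#}"),
    }
    Ok(())
}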
1 change: 1 addition & 0 deletions maker/src/lib.rs
@@ -7,6 +7,7 @@ use diesel_migrations::MigrationHarness;
mod tests;

pub mod cli;
pub mod health;
pub mod ln;
pub mod logger;
pub mod metrics;
40 changes: 38 additions & 2 deletions maker/src/metrics.rs
@@ -1,3 +1,5 @@
use crate::health::Health;
use crate::health::ServiceStatus;
use lazy_static::lazy_static;
use lightning::ln::channelmanager::ChannelDetails;
use ln_dlc_node::node::InMemoryStore;
@@ -16,7 +18,20 @@ use std::sync::Arc;
use std::time::Duration;

lazy_static! {
pub static ref METER: Meter = global::meter("coordinator");
pub static ref METER: Meter = global::meter("maker");

// health metrics
pub static ref COORDINATOR_STATUS: ObservableGauge<u64> = METER.u64_observable_gauge("coordinator_status")
.with_description("Coordinator status")
.init();

pub static ref ORDERBOOK_STATUS: ObservableGauge<u64> = METER.u64_observable_gauge("orderbook_status")
.with_description("Orderbook status")
.init();

pub static ref BITMEX_PRICEFEED_STATUS: ObservableGauge<u64> = METER.u64_observable_gauge("bitmex_pricefeed_status")
.with_description("Bitmex pricefeed status")
.init();

// channel details metrics
pub static ref CHANNEL_BALANCE_SATOSHI: ObservableGauge<u64> = METER
@@ -58,12 +73,33 @@ pub fn init_meter() -> PrometheusExporter {
opentelemetry_prometheus::exporter(controller).init()
}

pub fn collect(node: Arc<Node<InMemoryStore>>) {
pub fn collect(node: Arc<Node<InMemoryStore>>, health: Health) {
let cx = opentelemetry::Context::current();

let channels = node.channel_manager.list_channels();
channel_metrics(&cx, channels);
node_metrics(&cx, node);
health_metrics(&cx, &health);
}

fn health_metrics(cx: &Context, health: &Health) {
update_health_metric(cx, health.get_coordinator_status(), &COORDINATOR_STATUS);
update_health_metric(cx, health.get_orderbook_status(), &ORDERBOOK_STATUS);
update_health_metric(
cx,
health.get_bitmex_pricefeed_status(),
&BITMEX_PRICEFEED_STATUS,
);
}

/// Updates the health metric given a service status
fn update_health_metric(cx: &Context, service_status: ServiceStatus, gauge: &ObservableGauge<u64>) {
let value = match service_status {
ServiceStatus::Offline => 0,
ServiceStatus::Online => 1,
ServiceStatus::Unknown => 2,
};
gauge.observe(cx, value, &[]);
}

fn channel_metrics(cx: &Context, channels: Vec<ChannelDetails>) {
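The status-to-integer encoding above (Offline = 0, Online = 1, Unknown = 2) is what dashboards and alerts will key on. A hypothetical test pinning it down, assuming the mapping were factored out of `update_health_metric`:

use maker::health::ServiceStatus;

// Hypothetical refactor: the encoding as a pure function.
fn status_to_gauge_value(status: ServiceStatus) -> u64 {
    match status {
        ServiceStatus::Offline => 0,
        ServiceStatus::Online => 1,
        ServiceStatus::Unknown => 2,
    }
}

#[test]
fn gauge_encoding_is_stable() {
    assert_eq!(status_to_gauge_value(ServiceStatus::Offline), 0);
    assert_eq!(status_to_gauge_value(ServiceStatus::Online), 1);
    assert_eq!(status_to_gauge_value(ServiceStatus::Unknown), 2);
}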
17 changes: 17 additions & 0 deletions maker/src/routes.rs
@@ -1,3 +1,5 @@
use crate::health::Health;
use crate::health::OverallMakerHealth;
use axum::extract::Path;
use axum::extract::State;
use axum::http::StatusCode;
@@ -29,17 +31,20 @@ pub struct AppState {
pub node: Arc<Node<InMemoryStore>>,
pub exporter: PrometheusExporter,
pub pool: Pool<ConnectionManager<PgConnection>>,
pub health: Health,
}

pub fn router(
node: Arc<Node<InMemoryStore>>,
exporter: PrometheusExporter,
pool: Pool<ConnectionManager<PgConnection>>,
health: Health,
) -> Router {
let app_state = Arc::new(AppState {
node,
exporter,
pool,
health,
});

Router::new()
@@ -52,6 +57,7 @@ pub fn router(
.route("/api/pay-invoice/:invoice", post(pay_invoice))
.route("/api/sync-on-chain", post(sync_on_chain))
.route("/metrics", get(get_metrics))
.route("/health", get(get_health))
Contributor

For whom is this new endpoint?

Contributor Author

It's useful in development, testing, and manual checks when we want to quickly see the state.

.with_state(app_state)
}

@@ -267,3 +273,14 @@ pub async fn get_metrics(State(state): State<Arc<AppState>>) -> impl IntoResponse

(StatusCode::OK, open_telemetry_metrics + &autometrics)
}

/// Returns 500 if any of the vital services are offline
pub async fn get_health(
State(state): State<Arc<AppState>>,
) -> Result<Json<OverallMakerHealth>, AppError> {
let resp = state
.health
.get_health()
.map_err(|e| AppError::InternalServerError(format!("Error: {e:#}")))?;
Ok(Json(resp))
}
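As the thread above notes, the endpoint is handy for quick manual checks. A small sketch of exercising it — this assumes the maker's HTTP API listens on localhost:18000, as in the e2e tests:

// Sketch: probe the new endpoint. Expect 200 with a JSON body when all
// services are online, 500 otherwise (port taken from the e2e tests).
async fn check_maker_health() -> anyhow::Result<()> {
    let response = reqwest::get("http://localhost:18000/health").await?;
    println!("status: {}", response.status());
    println!("body: {}", response.text().await?);
    Ok(())
}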