From 9ed7a817628fbc4d8cda041162b8aa86b0cd62ea Mon Sep 17 00:00:00 2001
From: Richard Holzeis
Date: Mon, 28 Aug 2023 14:03:46 +0200
Subject: [PATCH] feat: track channel state of maker

---
Notes (not part of the commit message):
- The hunk that renamed the coordinator's meter from "coordinator" to "maker"
  was dropped; this commit only concerns the maker.
- The meter created in the new maker/src/metrics.rs is registered as "maker"
  (it was "coordinator", copied over from the coordinator's metrics module).
- Blob ids on the `index` lines still refer to the originally committed content.

 Cargo.lock             |   6 ++
 maker/Cargo.toml       |   6 ++
 maker/src/bin/maker.rs |  26 ++++++-
 maker/src/lib.rs       |   1 +
 maker/src/metrics.rs   | 156 +++++++++++++++++++++++++++++++++++++
 maker/src/routes.rs    |  44 ++++++++++-
 6 files changed, 237 insertions(+), 2 deletions(-)
 create mode 100644 maker/src/metrics.rs

diff --git a/Cargo.lock b/Cargo.lock
index 5da9beef2..f38d75525 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1879,6 +1879,7 @@ dependencies = [
  "async-stream",
  "async-trait",
  "atty",
+ "autometrics",
  "axum",
  "bdk",
  "bitcoin",
@@ -1888,9 +1889,14 @@ dependencies = [
  "diesel_migrations",
  "futures",
  "hex",
+ "lazy_static",
+ "lightning",
  "ln-dlc-node",
  "openssl",
+ "opentelemetry",
+ "opentelemetry-prometheus",
  "orderbook-commons",
+ "prometheus",
  "rand",
  "reqwest",
  "rust_decimal",
diff --git a/maker/Cargo.toml b/maker/Cargo.toml
index dbb962714..5bbefa4e5 100644
--- a/maker/Cargo.toml
+++ b/maker/Cargo.toml
@@ -8,6 +8,7 @@ anyhow = { version = "1", features = ["backtrace"] }
 async-stream = "0.3"
 async-trait = "0.1"
 atty = "0.2.14"
+autometrics = { version = "0.5", features = ["prometheus-exporter"] }
 axum = { version = "0.6.7", features = ["ws"] }
 bdk = { version = "0.27.0", default-features = false, features = ["key-value-db", "use-esplora-blocking"] }
 bitcoin = "0.29"
@@ -17,10 +18,15 @@ diesel = { version = "2.0.0", features = ["r2d2", "postgres"] }
 diesel_migrations = "2.0.0"
 futures = "0.3"
 hex = "0.4"
+lazy_static = "1.4.0"
+lightning = { version = "0.0.114", features = ["max_level_trace"] }
 ln-dlc-node = { path = "../crates/ln-dlc-node" }
 # adding this as explicit dependency as we need the "vendored" flag for cross compilation
 openssl = { version = "0.10.45", features = ["vendored"] }
+opentelemetry = "0.19.0"
+opentelemetry-prometheus = "0.12.0"
 orderbook-commons = { path = "../crates/orderbook-commons" }
+prometheus = "0.13.3"
 rand = "0.8.5"
 reqwest = "0.11.14"
 rust_decimal = { version = "1", features = ["serde-with-float"] }
diff --git a/maker/src/bin/maker.rs b/maker/src/bin/maker.rs
index e37c977cf..07bd8cc8d 100644
--- a/maker/src/bin/maker.rs
+++ b/maker/src/bin/maker.rs
@@ -10,6 +10,8 @@ use maker::cli::Opts;
 use maker::ln::ldk_config;
 use maker::ln::EventHandler;
 use maker::logger;
+use maker::metrics;
+use maker::metrics::init_meter;
 use maker::routes::router;
 use maker::run_migration;
 use maker::trading;
@@ -20,8 +22,12 @@ use std::net::IpAddr;
 use std::net::Ipv4Addr;
 use std::net::SocketAddr;
 use std::sync::Arc;
+use std::time::Duration;
+use tokio::task::spawn_blocking;
 use tracing::metadata::LevelFilter;
 
+const PROCESS_PROMETHEUS_METRICS: Duration = Duration::from_secs(10);
+
 #[tokio::main]
 async fn main() -> Result<()> {
     std::panic::set_hook(
@@ -36,6 +42,8 @@ async fn main() -> Result<()> {
         }),
     );
 
+    let exporter = init_meter();
+
     let opts = Opts::read();
     let data_dir = opts.data_dir()?;
     let address = opts.p2p_address;
@@ -99,6 +107,19 @@ async fn main() -> Result<()> {
         }
     });
 
+    tokio::spawn({
+        let node = node.clone();
+        async move {
+            loop {
+                let node = node.clone();
+                spawn_blocking(move || metrics::collect(node))
+                    .await
+                    .expect("To spawn blocking thread");
+                tokio::time::sleep(PROCESS_PROMETHEUS_METRICS).await;
+            }
+        }
+    });
+
     let manager = ConnectionManager::<PgConnection>::new(opts.database);
     let pool = r2d2::Pool::builder()
         .build(manager)
@@ -107,7 +128,10 @@ async fn main() -> Result<()> {
     let mut conn = pool.get().expect("to get connection from pool");
     run_migration(&mut conn);
 
-    let app = router(node, pool);
+    let app = router(node, exporter, pool);
+
+    // Start the metrics exporter
+    autometrics::prometheus_exporter::init();
 
     let addr = SocketAddr::from((http_address.ip(), http_address.port()));
     tracing::debug!("Listening on http://{}", addr);
diff --git a/maker/src/lib.rs b/maker/src/lib.rs
index 054a988ca..bbfdc2f40 100644
--- a/maker/src/lib.rs
+++ b/maker/src/lib.rs
@@ -9,6 +9,7 @@ mod tests;
 pub mod cli;
 pub mod ln;
 pub mod logger;
+pub mod metrics;
 pub mod routes;
 pub mod schema;
 pub mod trading;
diff --git a/maker/src/metrics.rs b/maker/src/metrics.rs
new file mode 100644
index 000000000..ecc8037f5
--- /dev/null
+++ b/maker/src/metrics.rs
@@ -0,0 +1,156 @@
+//! Observable gauges describing the maker's node and channels, exported via Prometheus.
+
+use lazy_static::lazy_static;
+use lightning::ln::channelmanager::ChannelDetails;
+use ln_dlc_node::node::InMemoryStore;
+use ln_dlc_node::node::Node;
+use opentelemetry::global;
+use opentelemetry::metrics::Meter;
+use opentelemetry::metrics::ObservableGauge;
+use opentelemetry::sdk::export::metrics::aggregation;
+use opentelemetry::sdk::metrics::controllers;
+use opentelemetry::sdk::metrics::processors;
+use opentelemetry::sdk::metrics::selectors;
+use opentelemetry::Context;
+use opentelemetry::KeyValue;
+use opentelemetry_prometheus::PrometheusExporter;
+use std::sync::Arc;
+use std::time::Duration;
+
+lazy_static! {
+    pub static ref METER: Meter = global::meter("maker");
+
+    // channel details metrics
+    pub static ref CHANNEL_BALANCE_SATOSHI: ObservableGauge<u64> = METER
+        .u64_observable_gauge("channel_balance_satoshi")
+        .with_description("Current channel balance in satoshi")
+        .init();
+    pub static ref CHANNEL_OUTBOUND_CAPACITY_SATOSHI: ObservableGauge<u64> = METER
+        .u64_observable_gauge("channel_outbound_capacity_satoshi")
+        .with_description("Channel outbound capacity in satoshi")
+        .init();
+    pub static ref CHANNEL_INBOUND_CAPACITY_SATOSHI: ObservableGauge<u64> = METER
+        .u64_observable_gauge("channel_inbound_capacity_satoshi")
+        .with_description("Channel inbound capacity in satoshi")
+        .init();
+    pub static ref CHANNEL_IS_USABLE: ObservableGauge<u64> = METER
+        .u64_observable_gauge("channel_is_usable")
+        .with_description("If a channel is usable")
+        .init();
+
+    // general node metrics
+    pub static ref CONNECTED_PEERS: ObservableGauge<u64> = METER
+        .u64_observable_gauge("node_connected_peers_total")
+        .with_description("Total number of connected peers")
+        .init();
+    pub static ref NODE_BALANCE_SATOSHI: ObservableGauge<u64> = METER
+        .u64_observable_gauge("node_balance_satoshi")
+        .with_description("Node balance in satoshi")
+        .init();
+}
+
+/// Builds a basic metrics controller (10s collect period) and returns its Prometheus exporter.
+pub fn init_meter() -> PrometheusExporter {
+    let controller = controllers::basic(processors::factory(
+        selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]),
+        aggregation::cumulative_temporality_selector(),
+    ))
+    .with_collect_period(Duration::from_secs(10))
+    .build();
+
+    opentelemetry_prometheus::exporter(controller).init()
+}
+
+/// Observes the channel gauges for every channel of `node`, then the node-level gauges.
+pub fn collect(node: Arc<Node<InMemoryStore>>) {
+    let cx = opentelemetry::Context::current();
+
+    let channels = node.channel_manager.list_channels();
+    channel_metrics(&cx, channels);
+    node_metrics(&cx, node);
+}
+
+fn channel_metrics(cx: &Context, channels: Vec<ChannelDetails>) {
+    for channel_detail in channels {
+        let key_values = [
+            KeyValue::new("channel_id", hex::encode(channel_detail.channel_id)),
+            KeyValue::new("is_outbound", channel_detail.is_outbound),
+            KeyValue::new("is_public", channel_detail.is_public),
+        ];
+        CHANNEL_BALANCE_SATOSHI.observe(cx, channel_detail.balance_msat / 1_000, &key_values);
+        CHANNEL_OUTBOUND_CAPACITY_SATOSHI.observe(
+            cx,
+            channel_detail.outbound_capacity_msat / 1_000,
+            &key_values,
+        );
+        CHANNEL_INBOUND_CAPACITY_SATOSHI.observe(
+            cx,
+            channel_detail.inbound_capacity_msat / 1_000,
+            &key_values,
+        );
+        CHANNEL_IS_USABLE.observe(cx, channel_detail.is_usable as u64, &key_values);
+    }
+}
+
+fn node_metrics(cx: &Context, node: Arc<Node<InMemoryStore>>) {
+    let connected_peers = node.list_peers().len();
+    CONNECTED_PEERS.observe(cx, connected_peers as u64, &[]);
+    let offchain = node.get_ldk_balance();
+
+    NODE_BALANCE_SATOSHI.observe(
+        cx,
+        offchain.available(),
+        &[
+            KeyValue::new("type", "off-chain"),
+            KeyValue::new("status", "available"),
+        ],
+    );
+    NODE_BALANCE_SATOSHI.observe(
+        cx,
+        offchain.pending_close(),
+        &[
+            KeyValue::new("type", "off-chain"),
+            KeyValue::new("status", "pending_close"),
+        ],
+    );
+
+    match node.get_on_chain_balance() {
+        Ok(onchain) => {
+            NODE_BALANCE_SATOSHI.observe(
+                cx,
+                onchain.confirmed,
+                &[
+                    KeyValue::new("type", "on-chain"),
+                    KeyValue::new("status", "confirmed"),
+                ],
+            );
+            NODE_BALANCE_SATOSHI.observe(
+                cx,
+                onchain.immature,
+                &[
+                    KeyValue::new("type", "on-chain"),
+                    KeyValue::new("status", "immature"),
+                ],
+            );
+            NODE_BALANCE_SATOSHI.observe(
+                cx,
+                onchain.trusted_pending,
+                &[
+                    KeyValue::new("type", "on-chain"),
+                    KeyValue::new("status", "trusted_pending"),
+                ],
+            );
+            NODE_BALANCE_SATOSHI.observe(
+                cx,
+                onchain.untrusted_pending,
+                &[
+                    KeyValue::new("type", "on-chain"),
+                    KeyValue::new("status", "untrusted_pending"),
+                ],
+            );
+        }
+        Err(err) => {
+            tracing::error!("Could not retrieve on-chain balance for metrics {err:#}")
+        }
+    }
+}
diff --git a/maker/src/routes.rs b/maker/src/routes.rs
index 0b981c798..d7d0a72c4 100644
--- a/maker/src/routes.rs
+++ b/maker/src/routes.rs
@@ -15,6 +15,9 @@ use ln_dlc_node::node::InMemoryStore;
 use ln_dlc_node::node::Node;
 use ln_dlc_node::node::NodeInfo;
 use ln_dlc_node::ChannelDetails;
+use opentelemetry_prometheus::PrometheusExporter;
+use prometheus::Encoder;
+use prometheus::TextEncoder;
 use serde::Deserialize;
 use serde::Serialize;
 use serde_json::json;
@@ -24,14 +27,20 @@ use tokio::task::spawn_blocking;
 
 pub struct AppState {
     pub node: Arc<Node<InMemoryStore>>,
+    pub exporter: PrometheusExporter,
     pub pool: Pool<ConnectionManager<PgConnection>>,
 }
 
 pub fn router(
     node: Arc<Node<InMemoryStore>>,
+    exporter: PrometheusExporter,
     pool: Pool<ConnectionManager<PgConnection>>,
 ) -> Router {
-    let app_state = Arc::new(AppState { node, pool });
+    let app_state = Arc::new(AppState {
+        node,
+        exporter,
+        pool,
+    });
 
     Router::new()
         .route("/", get(index))
@@ -42,6 +51,7 @@ pub fn router(
         .route("/api/connect", post(connect_to_peer))
         .route("/api/pay-invoice/:invoice", post(pay_invoice))
         .route("/api/sync-on-chain", post(sync_on_chain))
+        .route("/metrics", get(get_metrics))
         .with_state(app_state)
 }
 
@@ -225,3 +235,35 @@ pub async fn sync_on_chain(State(state): State<Arc<AppState>>) -> Result<(), App
 
     Ok(())
 }
+
+pub async fn get_metrics(State(state): State<Arc<AppState>>) -> impl IntoResponse {
+    let autometrics = match autometrics::prometheus_exporter::encode_to_string() {
+        Ok(metrics) => metrics,
+        Err(err) => {
+            tracing::error!("Could not collect autometrics {err:#}");
+            return (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", err));
+        }
+    };
+
+    let exporter = state.exporter.clone();
+    let encoder = TextEncoder::new();
+    let metric_families = exporter.registry().gather();
+    let mut result = vec![];
+    match encoder.encode(&metric_families, &mut result) {
+        Ok(()) => (),
+        Err(err) => {
+            tracing::error!("Could not collect opentelemetry metrics {err:#}");
+            return (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", err));
+        }
+    };
+
+    let open_telemetry_metrics = match String::from_utf8(result) {
+        Ok(s) => s,
+        Err(err) => {
+            tracing::error!("Could not format metrics as string {err:#}");
+            return (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", err));
+        }
+    };
+
+    (StatusCode::OK, open_telemetry_metrics + &autometrics)
+}