Merge pull request #1170 from get10101/feat/track-channel-state-of-maker-coordinator

feat: track channel state of maker
holzeis authored Aug 28, 2023
2 parents 9d20215 + 9ed7a81 commit 1df5c45
Showing 7 changed files with 234 additions and 3 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion coordinator/src/metrics.rs
@@ -19,7 +19,7 @@ use trade::ContractSymbol;
use trade::Direction;

lazy_static! {
-    pub static ref METER: Meter = global::meter("coordinator");
+    pub static ref METER: Meter = global::meter("maker");

    // channel details metrics
    pub static ref CHANNEL_BALANCE_SATOSHI: ObservableGauge<u64> = METER
6 changes: 6 additions & 0 deletions maker/Cargo.toml
@@ -8,6 +8,7 @@ anyhow = { version = "1", features = ["backtrace"] }
async-stream = "0.3"
async-trait = "0.1"
atty = "0.2.14"
autometrics = { version = "0.5", features = ["prometheus-exporter"] }
axum = { version = "0.6.7", features = ["ws"] }
bdk = { version = "0.27.0", default-features = false, features = ["key-value-db", "use-esplora-blocking"] }
bitcoin = "0.29"
@@ -17,10 +18,15 @@ diesel = { version = "2.0.0", features = ["r2d2", "postgres"] }
diesel_migrations = "2.0.0"
futures = "0.3"
hex = "0.4"
lazy_static = "1.4.0"
lightning = { version = "0.0.114", features = ["max_level_trace"] }
ln-dlc-node = { path = "../crates/ln-dlc-node" }
# adding this as explicit dependency as we need the "vendored" flag for cross compilation
openssl = { version = "0.10.45", features = ["vendored"] }
opentelemetry = "0.19.0"
opentelemetry-prometheus = "0.12.0"
orderbook-commons = { path = "../crates/orderbook-commons" }
prometheus = "0.13.3"
rand = "0.8.5"
reqwest = "0.11.14"
rust_decimal = { version = "1", features = ["serde-with-float"] }
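The new autometrics dependency brings both the prometheus-exporter feature initialized in maker.rs and an attribute macro for per-function metrics. This diff only wires up the exporter, but as a hypothetical illustration, a function could be instrumented like so (the handler name here is made up):

use autometrics::autometrics;

// Hypothetical sketch, not part of this diff: `#[autometrics]` records call
// count, error rate and latency for the annotated function, and the exporter
// initialized in maker.rs exposes those series alongside the gauges defined
// in maker/src/metrics.rs.
#[autometrics]
async fn health_check() -> &'static str {
    "maker is up"
}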
26 changes: 25 additions & 1 deletion maker/src/bin/maker.rs
@@ -10,6 +10,8 @@ use maker::cli::Opts;
use maker::ln::ldk_config;
use maker::ln::EventHandler;
use maker::logger;
use maker::metrics;
use maker::metrics::init_meter;
use maker::routes::router;
use maker::run_migration;
use maker::trading;
@@ -20,8 +22,12 @@ use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::spawn_blocking;
use tracing::metadata::LevelFilter;

const PROCESS_PROMETHEUS_METRICS: Duration = Duration::from_secs(10);

#[tokio::main]
async fn main() -> Result<()> {
    std::panic::set_hook(
@@ -36,6 +42,8 @@ async fn main() -> Result<()> {
        }),
    );

    let exporter = init_meter();

    let opts = Opts::read();
    let data_dir = opts.data_dir()?;
    let address = opts.p2p_address;
@@ -99,6 +107,19 @@
        }
    });

    tokio::spawn({
        let node = node.clone();
        async move {
            loop {
                let node = node.clone();
                spawn_blocking(move || metrics::collect(node))
                    .await
                    .expect("To spawn blocking thread");
                tokio::time::sleep(PROCESS_PROMETHEUS_METRICS).await;
            }
        }
    });

    let manager = ConnectionManager::<PgConnection>::new(opts.database);
    let pool = r2d2::Pool::builder()
        .build(manager)
@@ -107,7 +128,10 @@
    let mut conn = pool.get().expect("to get connection from pool");
    run_migration(&mut conn);

-    let app = router(node, pool);
+    let app = router(node, exporter, pool);

    // Start the metrics exporter
    autometrics::prometheus_exporter::init();

    let addr = SocketAddr::from((http_address.ip(), http_address.port()));
    tracing::debug!("Listening on http://{}", addr);
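The spawned loop above hands metrics::collect to spawn_blocking, presumably because it walks channel and wallet state synchronously, and then sleeps for PROCESS_PROMETHEUS_METRICS. For comparison only, the same cadence could be written with tokio::time::interval; this is a hypothetical variant, not what the commit adds:

// Hypothetical alternative to the loop above (not part of this diff):
// an interval fires on a fixed schedule, so the cadence does not drift
// when a collection cycle itself takes a while.
tokio::spawn({
    let node = node.clone();
    async move {
        let mut interval = tokio::time::interval(PROCESS_PROMETHEUS_METRICS);
        loop {
            interval.tick().await;
            let node = node.clone();
            if let Err(e) = spawn_blocking(move || metrics::collect(node)).await {
                tracing::error!("metrics collection task failed: {e:#}");
            }
        }
    }
});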
1 change: 1 addition & 0 deletions maker/src/lib.rs
@@ -9,6 +9,7 @@ mod tests;
pub mod cli;
pub mod ln;
pub mod logger;
pub mod metrics;
pub mod routes;
pub mod schema;
pub mod trading;
152 changes: 152 additions & 0 deletions maker/src/metrics.rs
@@ -0,0 +1,152 @@
use lazy_static::lazy_static;
use lightning::ln::channelmanager::ChannelDetails;
use ln_dlc_node::node::InMemoryStore;
use ln_dlc_node::node::Node;
use opentelemetry::global;
use opentelemetry::metrics::Meter;
use opentelemetry::metrics::ObservableGauge;
use opentelemetry::sdk::export::metrics::aggregation;
use opentelemetry::sdk::metrics::controllers;
use opentelemetry::sdk::metrics::processors;
use opentelemetry::sdk::metrics::selectors;
use opentelemetry::Context;
use opentelemetry::KeyValue;
use opentelemetry_prometheus::PrometheusExporter;
use std::sync::Arc;
use std::time::Duration;

lazy_static! {
    pub static ref METER: Meter = global::meter("coordinator");

    // channel details metrics
    pub static ref CHANNEL_BALANCE_SATOSHI: ObservableGauge<u64> = METER
        .u64_observable_gauge("channel_balance_satoshi")
        .with_description("Current channel balance in satoshi")
        .init();
    pub static ref CHANNEL_OUTBOUND_CAPACITY_SATOSHI: ObservableGauge<u64> = METER
        .u64_observable_gauge("channel_outbound_capacity_satoshi")
        .with_description("Channel outbound capacity in satoshi")
        .init();
    pub static ref CHANNEL_INBOUND_CAPACITY_SATOSHI: ObservableGauge<u64> = METER
        .u64_observable_gauge("channel_inbound_capacity_satoshi")
        .with_description("Channel inbound capacity in satoshi")
        .init();
    pub static ref CHANNEL_IS_USABLE: ObservableGauge<u64> = METER
        .u64_observable_gauge("channel_is_usable")
        .with_description("If a channel is usable")
        .init();

    // general node metrics
    pub static ref CONNECTED_PEERS: ObservableGauge<u64> = METER
        .u64_observable_gauge("node_connected_peers_total")
        .with_description("Total number of connected peers")
        .init();
    pub static ref NODE_BALANCE_SATOSHI: ObservableGauge<u64> = METER
        .u64_observable_gauge("node_balance_satoshi")
        .with_description("Node balance in satoshi")
        .init();
}

pub fn init_meter() -> PrometheusExporter {
    let controller = controllers::basic(processors::factory(
        selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]),
        aggregation::cumulative_temporality_selector(),
    ))
    .with_collect_period(Duration::from_secs(10))
    .build();

    opentelemetry_prometheus::exporter(controller).init()
}

pub fn collect(node: Arc<Node<InMemoryStore>>) {
    let cx = opentelemetry::Context::current();

    let channels = node.channel_manager.list_channels();
    channel_metrics(&cx, channels);
    node_metrics(&cx, node);
}

fn channel_metrics(cx: &Context, channels: Vec<ChannelDetails>) {
    for channel_detail in channels {
        let key_values = [
            KeyValue::new("channel_id", hex::encode(channel_detail.channel_id)),
            KeyValue::new("is_outbound", channel_detail.is_outbound),
            KeyValue::new("is_public", channel_detail.is_public),
        ];
        CHANNEL_BALANCE_SATOSHI.observe(cx, channel_detail.balance_msat / 1_000, &key_values);
        CHANNEL_OUTBOUND_CAPACITY_SATOSHI.observe(
            cx,
            channel_detail.outbound_capacity_msat / 1_000,
            &key_values,
        );
        CHANNEL_INBOUND_CAPACITY_SATOSHI.observe(
            cx,
            channel_detail.inbound_capacity_msat / 1_000,
            &key_values,
        );
        CHANNEL_IS_USABLE.observe(cx, channel_detail.is_usable as u64, &key_values);
    }
}

fn node_metrics(cx: &Context, node: Arc<Node<InMemoryStore>>) {
    let connected_peers = node.list_peers().len();
    CONNECTED_PEERS.observe(cx, connected_peers as u64, &[]);
    let offchain = node.get_ldk_balance();

    NODE_BALANCE_SATOSHI.observe(
        cx,
        offchain.available(),
        &[
            KeyValue::new("type", "off-chain"),
            KeyValue::new("status", "available"),
        ],
    );
    NODE_BALANCE_SATOSHI.observe(
        cx,
        offchain.pending_close(),
        &[
            KeyValue::new("type", "off-chain"),
            KeyValue::new("status", "pending_close"),
        ],
    );

    match node.get_on_chain_balance() {
        Ok(onchain) => {
            NODE_BALANCE_SATOSHI.observe(
                cx,
                onchain.confirmed,
                &[
                    KeyValue::new("type", "on-chain"),
                    KeyValue::new("status", "confirmed"),
                ],
            );
            NODE_BALANCE_SATOSHI.observe(
                cx,
                onchain.immature,
                &[
                    KeyValue::new("type", "on-chain"),
                    KeyValue::new("status", "immature"),
                ],
            );
            NODE_BALANCE_SATOSHI.observe(
                cx,
                onchain.trusted_pending,
                &[
                    KeyValue::new("type", "on-chain"),
                    KeyValue::new("status", "trusted_pending"),
                ],
            );
            NODE_BALANCE_SATOSHI.observe(
                cx,
                onchain.untrusted_pending,
                &[
                    KeyValue::new("type", "on-chain"),
                    KeyValue::new("status", "untrusted_pending"),
                ],
            );
        }
        Err(err) => {
            tracing::error!("Could not retrieve on-chain balance for metrics {err:#}")
        }
    }
}
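Taken together, init_meter builds a basic controller with a 10-second collect period and hands it to the Prometheus exporter, while collect drives the observable gauges by hand. A minimal sketch of exercising that wiring, assuming init_meter registers the controller as the global meter provider (which the use of global::meter above suggests) and given an Arc<Node<InMemoryStore>> like the one built in maker.rs; this is not a test shipped with the commit:

// Minimal sketch, not part of this diff: run one collection cycle by hand and
// dump whatever the exporter's Prometheus registry has gathered.
let exporter = init_meter();
metrics::collect(node.clone());

for family in exporter.registry().gather() {
    println!("{}", family.get_name());
}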
44 changes: 43 additions & 1 deletion maker/src/routes.rs
@@ -15,6 +15,9 @@ use ln_dlc_node::node::InMemoryStore;
use ln_dlc_node::node::Node;
use ln_dlc_node::node::NodeInfo;
use ln_dlc_node::ChannelDetails;
use opentelemetry_prometheus::PrometheusExporter;
use prometheus::Encoder;
use prometheus::TextEncoder;
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
@@ -24,14 +27,20 @@ use tokio::task::spawn_blocking;

pub struct AppState {
    pub node: Arc<Node<InMemoryStore>>,
    pub exporter: PrometheusExporter,
    pub pool: Pool<ConnectionManager<PgConnection>>,
}

pub fn router(
    node: Arc<Node<InMemoryStore>>,
    exporter: PrometheusExporter,
    pool: Pool<ConnectionManager<PgConnection>>,
) -> Router {
-    let app_state = Arc::new(AppState { node, pool });
+    let app_state = Arc::new(AppState {
+        node,
+        exporter,
+        pool,
+    });

    Router::new()
        .route("/", get(index))
@@ -42,6 +51,7 @@ pub fn router(
        .route("/api/connect", post(connect_to_peer))
        .route("/api/pay-invoice/:invoice", post(pay_invoice))
        .route("/api/sync-on-chain", post(sync_on_chain))
        .route("/metrics", get(get_metrics))
        .with_state(app_state)
}

@@ -225,3 +235,35 @@ pub async fn sync_on_chain(State(state): State<Arc<AppState>>) -> Result<(), App

    Ok(())
}

pub async fn get_metrics(State(state): State<Arc<AppState>>) -> impl IntoResponse {
    let autometrics = match autometrics::prometheus_exporter::encode_to_string() {
        Ok(metrics) => metrics,
        Err(err) => {
            tracing::error!("Could not collect autometrics {err:#}");
            return (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", err));
        }
    };

    let exporter = state.exporter.clone();
    let encoder = TextEncoder::new();
    let metric_families = exporter.registry().gather();
    let mut result = vec![];
    match encoder.encode(&metric_families, &mut result) {
        Ok(()) => (),
        Err(err) => {
            tracing::error!("Could not collect opentelemetry metrics {err:#}");
            return (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", err));
        }
    };

    let open_telemetry_metrics = match String::from_utf8(result) {
        Ok(s) => s,
        Err(err) => {
            tracing::error!("Could not format metrics as string {err:#}");
            return (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", err));
        }
    };

    (StatusCode::OK, open_telemetry_metrics + &autometrics)
}
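get_metrics concatenates two Prometheus text-exposition payloads: autometrics' own exporter output and whatever the OpenTelemetry exporter's registry has gathered. A hypothetical smoke test against the new route, using the reqwest dependency already listed in maker/Cargo.toml; the listen address is an assumption, so substitute the maker's actual HTTP address:

// Hypothetical smoke test, not part of this diff. The listen address is an
// assumption; use the maker's real HTTP address.
let body = reqwest::get("http://127.0.0.1:18000/metrics")
    .await?
    .text()
    .await?;

// Gauges defined in maker/src/metrics.rs should show up once the collection
// loop in maker.rs has run at least once.
assert!(body.contains("node_connected_peers_total"));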
