From 8b90e98e9f7db747113ba82d19f3c54b08ee5083 Mon Sep 17 00:00:00 2001 From: Paulo Bressan Date: Thu, 21 Mar 2024 15:42:16 -0300 Subject: [PATCH] Support old kong metrics collector (#41) --- operator/src/lib.rs | 6 ++ operator/src/main.rs | 1 + operator/src/metrics.rs | 219 +++++++++++++++++++++++++++------------- 3 files changed, 154 insertions(+), 72 deletions(-) diff --git a/operator/src/lib.rs b/operator/src/lib.rs index f36b67d..e94bec7 100644 --- a/operator/src/lib.rs +++ b/operator/src/lib.rs @@ -41,6 +41,12 @@ impl From for Error { } } +impl From for Error { + fn from(value: reqwest::Error) -> Self { + Error::HttpError(value.to_string()) + } +} + #[derive(Clone)] pub struct State { registry: Registry, diff --git a/operator/src/main.rs b/operator/src/main.rs index 37f8c94..455d984 100644 --- a/operator/src/main.rs +++ b/operator/src/main.rs @@ -13,6 +13,7 @@ async fn main() -> io::Result<()> { let state = Arc::new(State::default()); metrics_collector::run_metrics_collector(state.clone()); + metrics_collector::run_kong_metrics_collector(state.clone()); metrics_collector::run_metrics_server(state.clone()); controller::run(state.clone()).await; diff --git a/operator/src/metrics.rs b/operator/src/metrics.rs index 0d767fe..a6c2c6f 100644 --- a/operator/src/metrics.rs +++ b/operator/src/metrics.rs @@ -8,9 +8,9 @@ use regex::Regex; use serde::{Deserialize, Deserializer}; use std::{net::SocketAddr, str::FromStr, sync::Arc}; use tokio::net::TcpListener; -use tracing::{error, info, instrument}; +use tracing::{error, info, instrument, warn}; -use crate::{get_config, Error, KupoPort, State}; +use crate::{get_config, Config, Error, KupoPort, State}; #[derive(Clone)] pub struct Metrics { @@ -87,13 +87,99 @@ impl Metrics { } } +async fn api_get_metrics( + state: Arc, +) -> Result>, hyper::Error> { + let metrics = state.metrics_collected(); + + let encoder = TextEncoder::new(); + let mut buffer = vec![]; + encoder.encode(&metrics, &mut buffer).unwrap(); + + let res = Response::builder() + .body( + Full::new(buffer.into()) + .map_err(|never| match never {}) + .boxed(), + ) + .unwrap(); + Ok(res) +} + +pub fn run_metrics_server(state: Arc) { + tokio::spawn(async move { + let addr = std::env::var("ADDR").unwrap_or("0.0.0.0:8080".into()); + let addr_result = SocketAddr::from_str(&addr); + if let Err(err) = addr_result { + error!(error = err.to_string(), "invalid prometheus addr"); + std::process::exit(1); + } + let addr = addr_result.unwrap(); + + let listener_result = TcpListener::bind(addr).await; + if let Err(err) = listener_result { + error!( + error = err.to_string(), + "fail to bind tcp prometheus server listener" + ); + std::process::exit(1); + } + let listener = listener_result.unwrap(); + + info!(addr = addr.to_string(), "metrics listening"); + + loop { + let state = state.clone(); + + let accept_result = listener.accept().await; + if let Err(err) = accept_result { + error!(error = err.to_string(), "accept client prometheus server"); + continue; + } + let (stream, _) = accept_result.unwrap(); + + let io = TokioIo::new(stream); + + tokio::task::spawn(async move { + let service = service_fn(move |_| api_get_metrics(state.clone())); + + if let Err(err) = http1::Builder::new().serve_connection(io, service).await { + error!(error = err.to_string(), "failed metrics server connection"); + } + }); + } + }); +} + +async fn collect_prometheus_metrics( + config: &Config, + query: String, +) -> Result { + let client = reqwest::Client::builder().build().unwrap(); + + let response = client + .get(format!("{}/query?query={query}", config.prometheus_url)) + .send() + .await?; + + let status = response.status(); + if status.is_client_error() || status.is_server_error() { + error!(status = status.to_string(), "request status code fail"); + return Err(Error::HttpError(format!( + "Prometheus request error. Status: {} Query: {}", + status, query + ))); + } + + Ok(response.json().await.unwrap()) +} + #[instrument("metrics collector run", skip_all)] pub fn run_metrics_collector(state: Arc) { tokio::spawn(async move { info!("collecting metrics running"); let config = get_config(); - let client = reqwest::Client::builder().build().unwrap(); let project_regex = Regex::new(r"prj-(.+)\..+").unwrap(); let network_regex = Regex::new(r"kupo-([\w]+)-.+").unwrap(); let mut last_execution = Utc::now(); @@ -111,31 +197,14 @@ pub fn run_metrics_collector(state: Arc) { end.timestamp_millis() / 1000 ); - let result = client - .get(format!("{}/query?query={query}", config.prometheus_url)) - .send() - .await; - - if let Err(err) = result { + let response = collect_prometheus_metrics(config, query).await; + if let Err(err) = response { error!(error = err.to_string(), "error to make prometheus request"); - state - .metrics - .metrics_failure(&Error::HttpError(err.to_string())); + state.metrics.metrics_failure(&err); continue; } + let response = response.unwrap(); - let response = result.unwrap(); - let status = response.status(); - if status.is_client_error() || status.is_server_error() { - error!(status = status.to_string(), "request status code fail"); - state.metrics.metrics_failure(&Error::HttpError(format!( - "Prometheus request error. Status: {} Query: {}", - status, query - ))); - continue; - } - - let response = response.json::().await.unwrap(); for result in response.data.result { if result.value == 0.0 || result.metric.consumer.is_none() @@ -147,6 +216,7 @@ pub fn run_metrics_collector(state: Arc) { let consumer = result.metric.consumer.unwrap(); let project_captures = project_regex.captures(&consumer); if project_captures.is_none() { + warn!(consumer, "invalid project to the regex"); continue; } let project_captures = project_captures.unwrap(); @@ -155,6 +225,7 @@ pub fn run_metrics_collector(state: Arc) { let instance = result.metric.exported_instance.unwrap(); let network_captures = network_regex.captures(&instance); if network_captures.is_none() { + warn!(instance, "invalid network to the regex"); continue; } let network_captures = network_captures.unwrap(); @@ -179,68 +250,72 @@ pub fn run_metrics_collector(state: Arc) { }); } -pub fn run_metrics_server(state: Arc) { +#[instrument("kong metrics collector run", skip_all)] +pub fn run_kong_metrics_collector(state: Arc) { tokio::spawn(async move { - let addr = std::env::var("ADDR").unwrap_or("0.0.0.0:8080".into()); - let addr_result = SocketAddr::from_str(&addr); - if let Err(err) = addr_result { - error!(error = err.to_string(), "invalid prometheus addr"); - std::process::exit(1); - } - let addr = addr_result.unwrap(); + info!("collecting kong metrics running"); - let listener_result = TcpListener::bind(addr).await; - if let Err(err) = listener_result { - error!( - error = err.to_string(), - "fail to bind tcp prometheus server listener" - ); - std::process::exit(1); - } - let listener = listener_result.unwrap(); - - info!(addr = addr.to_string(), "metrics listening"); + let config = get_config(); + let regex = Regex::new(r"(.+)\.(.+)-.+").unwrap(); + let mut last_execution = Utc::now(); loop { - let state = state.clone(); + tokio::time::sleep(config.metrics_delay).await; - let accept_result = listener.accept().await; - if let Err(err) = accept_result { - error!(error = err.to_string(), "accept client prometheus server"); + let end = Utc::now(); + let start = (end - last_execution).num_seconds(); + + last_execution = end; + + let query = format!( + "sum by (consumer) (increase(kong_http_requests_total{{service='kupo-v1-ingress-kong-proxy', code!~\"429|401|503\"}}[{start}s] @ {}))", + end.timestamp_millis() / 1000 + ); + + let response = collect_prometheus_metrics(config, query).await; + if let Err(err) = response { + error!(error = err.to_string(), "error to make prometheus request"); + state.metrics.metrics_failure(&err); continue; } - let (stream, _) = accept_result.unwrap(); + let response = response.unwrap(); - let io = TokioIo::new(stream); + for result in response.data.result { + if result.value == 0.0 || result.metric.consumer.is_none() { + continue; + } - tokio::task::spawn(async move { - let service = service_fn(move |_| api_get_metrics(state.clone())); + let consumer = result.metric.consumer.unwrap(); - if let Err(err) = http1::Builder::new().serve_connection(io, service).await { - error!(error = err.to_string(), "failed metrics server connection"); + let captures = regex.captures(&consumer); + if captures.is_none() { + warn!(consumer, "invalid consumer to the regex"); + continue; } - }); - } - }); -} -async fn api_get_metrics( - state: Arc, -) -> Result>, hyper::Error> { - let metrics = state.metrics_collected(); + let captures = captures.unwrap(); + let namespace = captures.get(1).unwrap().as_str(); + let network = captures.get(2).unwrap().as_str(); - let encoder = TextEncoder::new(); - let mut buffer = vec![]; - encoder.encode(&metrics, &mut buffer).unwrap(); + let dcu_per_request = config.dcu_per_request.get(network); + if dcu_per_request.is_none() { + let error = Error::ConfigError(format!( + "dcu_per_request not configured to {} network", + network + )); + error!(error = error.to_string()); + state.metrics.metrics_failure(&error); + continue; + } + let dcu_per_request = dcu_per_request.unwrap(); - let res = Response::builder() - .body( - Full::new(buffer.into()) - .map_err(|never| match never {}) - .boxed(), - ) - .unwrap(); - Ok(res) + let dcu = result.value * dcu_per_request; + + let project = namespace.split_once("prj-").unwrap().1; + state.metrics.count_dcu_consumed(project, network, dcu); + } + } + }); } #[derive(Debug, Deserialize)]