Skip to content

Commit

Permalink
Support old kong metrics collector (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
paulobressan authored Mar 21, 2024
1 parent 4710dd0 commit 8b90e98
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 72 deletions.
6 changes: 6 additions & 0 deletions operator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ impl From<kube::Error> for Error {
}
}

impl From<reqwest::Error> for Error {
fn from(value: reqwest::Error) -> Self {
Error::HttpError(value.to_string())
}
}

#[derive(Clone)]
pub struct State {
registry: Registry,
Expand Down
1 change: 1 addition & 0 deletions operator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ async fn main() -> io::Result<()> {
let state = Arc::new(State::default());

metrics_collector::run_metrics_collector(state.clone());
metrics_collector::run_kong_metrics_collector(state.clone());
metrics_collector::run_metrics_server(state.clone());

controller::run(state.clone()).await;
Expand Down
219 changes: 147 additions & 72 deletions operator/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use regex::Regex;
use serde::{Deserialize, Deserializer};
use std::{net::SocketAddr, str::FromStr, sync::Arc};
use tokio::net::TcpListener;
use tracing::{error, info, instrument};
use tracing::{error, info, instrument, warn};

use crate::{get_config, Error, KupoPort, State};
use crate::{get_config, Config, Error, KupoPort, State};

#[derive(Clone)]
pub struct Metrics {
Expand Down Expand Up @@ -87,13 +87,99 @@ impl Metrics {
}
}

async fn api_get_metrics(
state: Arc<State>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let metrics = state.metrics_collected();

let encoder = TextEncoder::new();
let mut buffer = vec![];
encoder.encode(&metrics, &mut buffer).unwrap();

let res = Response::builder()
.body(
Full::new(buffer.into())
.map_err(|never| match never {})
.boxed(),
)
.unwrap();
Ok(res)
}

pub fn run_metrics_server(state: Arc<State>) {
tokio::spawn(async move {
let addr = std::env::var("ADDR").unwrap_or("0.0.0.0:8080".into());
let addr_result = SocketAddr::from_str(&addr);
if let Err(err) = addr_result {
error!(error = err.to_string(), "invalid prometheus addr");
std::process::exit(1);
}
let addr = addr_result.unwrap();

let listener_result = TcpListener::bind(addr).await;
if let Err(err) = listener_result {
error!(
error = err.to_string(),
"fail to bind tcp prometheus server listener"
);
std::process::exit(1);
}
let listener = listener_result.unwrap();

info!(addr = addr.to_string(), "metrics listening");

loop {
let state = state.clone();

let accept_result = listener.accept().await;
if let Err(err) = accept_result {
error!(error = err.to_string(), "accept client prometheus server");
continue;
}
let (stream, _) = accept_result.unwrap();

let io = TokioIo::new(stream);

tokio::task::spawn(async move {
let service = service_fn(move |_| api_get_metrics(state.clone()));

if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
error!(error = err.to_string(), "failed metrics server connection");
}
});
}
});
}

async fn collect_prometheus_metrics(
config: &Config,
query: String,
) -> Result<PrometheusResponse, Error> {
let client = reqwest::Client::builder().build().unwrap();

let response = client
.get(format!("{}/query?query={query}", config.prometheus_url))
.send()
.await?;

let status = response.status();
if status.is_client_error() || status.is_server_error() {
error!(status = status.to_string(), "request status code fail");
return Err(Error::HttpError(format!(
"Prometheus request error. Status: {} Query: {}",
status, query
)));
}

Ok(response.json().await.unwrap())
}

#[instrument("metrics collector run", skip_all)]
pub fn run_metrics_collector(state: Arc<State>) {
tokio::spawn(async move {
info!("collecting metrics running");

let config = get_config();
let client = reqwest::Client::builder().build().unwrap();
let project_regex = Regex::new(r"prj-(.+)\..+").unwrap();
let network_regex = Regex::new(r"kupo-([\w]+)-.+").unwrap();
let mut last_execution = Utc::now();
Expand All @@ -111,31 +197,14 @@ pub fn run_metrics_collector(state: Arc<State>) {
end.timestamp_millis() / 1000
);

let result = client
.get(format!("{}/query?query={query}", config.prometheus_url))
.send()
.await;

if let Err(err) = result {
let response = collect_prometheus_metrics(config, query).await;
if let Err(err) = response {
error!(error = err.to_string(), "error to make prometheus request");
state
.metrics
.metrics_failure(&Error::HttpError(err.to_string()));
state.metrics.metrics_failure(&err);
continue;
}
let response = response.unwrap();

let response = result.unwrap();
let status = response.status();
if status.is_client_error() || status.is_server_error() {
error!(status = status.to_string(), "request status code fail");
state.metrics.metrics_failure(&Error::HttpError(format!(
"Prometheus request error. Status: {} Query: {}",
status, query
)));
continue;
}

let response = response.json::<PrometheusResponse>().await.unwrap();
for result in response.data.result {
if result.value == 0.0
|| result.metric.consumer.is_none()
Expand All @@ -147,6 +216,7 @@ pub fn run_metrics_collector(state: Arc<State>) {
let consumer = result.metric.consumer.unwrap();
let project_captures = project_regex.captures(&consumer);
if project_captures.is_none() {
warn!(consumer, "invalid project to the regex");
continue;
}
let project_captures = project_captures.unwrap();
Expand All @@ -155,6 +225,7 @@ pub fn run_metrics_collector(state: Arc<State>) {
let instance = result.metric.exported_instance.unwrap();
let network_captures = network_regex.captures(&instance);
if network_captures.is_none() {
warn!(instance, "invalid network to the regex");
continue;
}
let network_captures = network_captures.unwrap();
Expand All @@ -179,68 +250,72 @@ pub fn run_metrics_collector(state: Arc<State>) {
});
}

pub fn run_metrics_server(state: Arc<State>) {
#[instrument("kong metrics collector run", skip_all)]
pub fn run_kong_metrics_collector(state: Arc<State>) {
tokio::spawn(async move {
let addr = std::env::var("ADDR").unwrap_or("0.0.0.0:8080".into());
let addr_result = SocketAddr::from_str(&addr);
if let Err(err) = addr_result {
error!(error = err.to_string(), "invalid prometheus addr");
std::process::exit(1);
}
let addr = addr_result.unwrap();
info!("collecting kong metrics running");

let listener_result = TcpListener::bind(addr).await;
if let Err(err) = listener_result {
error!(
error = err.to_string(),
"fail to bind tcp prometheus server listener"
);
std::process::exit(1);
}
let listener = listener_result.unwrap();

info!(addr = addr.to_string(), "metrics listening");
let config = get_config();
let regex = Regex::new(r"(.+)\.(.+)-.+").unwrap();
let mut last_execution = Utc::now();

loop {
let state = state.clone();
tokio::time::sleep(config.metrics_delay).await;

let accept_result = listener.accept().await;
if let Err(err) = accept_result {
error!(error = err.to_string(), "accept client prometheus server");
let end = Utc::now();
let start = (end - last_execution).num_seconds();

last_execution = end;

let query = format!(
"sum by (consumer) (increase(kong_http_requests_total{{service='kupo-v1-ingress-kong-proxy', code!~\"429|401|503\"}}[{start}s] @ {}))",
end.timestamp_millis() / 1000
);

let response = collect_prometheus_metrics(config, query).await;
if let Err(err) = response {
error!(error = err.to_string(), "error to make prometheus request");
state.metrics.metrics_failure(&err);
continue;
}
let (stream, _) = accept_result.unwrap();
let response = response.unwrap();

let io = TokioIo::new(stream);
for result in response.data.result {
if result.value == 0.0 || result.metric.consumer.is_none() {
continue;
}

tokio::task::spawn(async move {
let service = service_fn(move |_| api_get_metrics(state.clone()));
let consumer = result.metric.consumer.unwrap();

if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
error!(error = err.to_string(), "failed metrics server connection");
let captures = regex.captures(&consumer);
if captures.is_none() {
warn!(consumer, "invalid consumer to the regex");
continue;
}
});
}
});
}

async fn api_get_metrics(
state: Arc<State>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let metrics = state.metrics_collected();
let captures = captures.unwrap();
let namespace = captures.get(1).unwrap().as_str();
let network = captures.get(2).unwrap().as_str();

let encoder = TextEncoder::new();
let mut buffer = vec![];
encoder.encode(&metrics, &mut buffer).unwrap();
let dcu_per_request = config.dcu_per_request.get(network);
if dcu_per_request.is_none() {
let error = Error::ConfigError(format!(
"dcu_per_request not configured to {} network",
network
));
error!(error = error.to_string());
state.metrics.metrics_failure(&error);
continue;
}
let dcu_per_request = dcu_per_request.unwrap();

let res = Response::builder()
.body(
Full::new(buffer.into())
.map_err(|never| match never {})
.boxed(),
)
.unwrap();
Ok(res)
let dcu = result.value * dcu_per_request;

let project = namespace.split_once("prj-").unwrap().1;
state.metrics.count_dcu_consumed(project, network, dcu);
}
}
});
}

#[derive(Debug, Deserialize)]
Expand Down

0 comments on commit 8b90e98

Please sign in to comment.