Skip to content

Commit

Permalink
Merge pull request #8 from demeter-run/7-fix-dbsync-controller-to-use…
Browse files Browse the repository at this point in the history
…-threads

fix dbsync controller to use threads
  • Loading branch information
paulobressan authored Dec 20, 2023
2 parents 127323f + ba036e3 commit ee43773
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 13 deletions.
14 changes: 6 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use actix_web::{
use dotenv::dotenv;
use prometheus::{Encoder, TextEncoder};
use std::{io, sync::Arc};
use tracing::error;

use ext_cardano_dbsync::{controller, metrics as metrics_collector, Config, State};

Expand All @@ -29,8 +28,11 @@ async fn main() -> io::Result<()> {
let state = Arc::new(State::new());
let config = Config::try_new().unwrap();

let controller = controller::run(state.clone(), config.clone());
let metrics_collector = metrics_collector::run_metrics_collector(state.clone(), config.clone());
let controller = tokio::spawn(controller::run(state.clone(), config.clone()));
let metrics_collector = tokio::spawn(metrics_collector::run_metrics_collector(
state.clone(),
config.clone(),
));

let addr = std::env::var("ADDR").unwrap_or("0.0.0.0:8080".into());

Expand All @@ -43,11 +45,7 @@ async fn main() -> io::Result<()> {
})
.bind(addr)?;

let result = tokio::join!(controller, metrics_collector, server.run()).1;
if let Err(err) = result {
error!("{err}");
std::process::exit(1)
}
tokio::join!(controller, metrics_collector, server.run()).2?;

Ok(())
}
29 changes: 24 additions & 5 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use crate::{
pub struct Metrics {
pub users_created: IntCounterVec,
pub users_deactivated: IntCounterVec,
pub failures: IntCounterVec,
pub reconcile_failures: IntCounterVec,
pub metrics_failures: IntCounterVec,
pub dcu: IntCounterVec,
}

Expand All @@ -36,7 +37,7 @@ impl Default for Metrics {
)
.unwrap();

let failures = IntCounterVec::new(
let reconcile_failures = IntCounterVec::new(
opts!(
"crd_controller_reconciliation_errors_total",
"reconciliation errors",
Expand All @@ -45,6 +46,15 @@ impl Default for Metrics {
)
.unwrap();

let metrics_failures = IntCounterVec::new(
opts!(
"metrics_controller_errors_total",
"errors to calculation metrics",
),
&["error"],
)
.unwrap();

let dcu = IntCounterVec::new(
opts!("dmtr_consumed_dcus", "quantity of dcu consumed",),
&["project", "service", "service_type", "tenancy"],
Expand All @@ -54,27 +64,34 @@ impl Default for Metrics {
Metrics {
users_created,
users_deactivated,
failures,
reconcile_failures,
metrics_failures,
dcu,
}
}
}

impl Metrics {
pub fn register(self, registry: &Registry) -> Result<Self, prometheus::Error> {
registry.register(Box::new(self.failures.clone()))?;
registry.register(Box::new(self.reconcile_failures.clone()))?;
registry.register(Box::new(self.users_created.clone()))?;
registry.register(Box::new(self.users_deactivated.clone()))?;
registry.register(Box::new(self.dcu.clone()))?;
Ok(self)
}

pub fn reconcile_failure(&self, crd: &DbSyncPort, e: &Error) {
self.failures
self.reconcile_failures
.with_label_values(&[crd.name_any().as_ref(), e.metric_label().as_ref()])
.inc()
}

pub fn metrics_failure(&self, e: &Error) {
self.metrics_failures
.with_label_values(&[e.metric_label().as_ref()])
.inc()
}

pub fn count_user_created(&self, username: &str) {
self.users_created.with_label_values(&[username]).inc();
}
Expand Down Expand Up @@ -124,13 +141,15 @@ pub async fn run_metrics_collector(state: Arc<State>, config: Config) -> Result<
let postgres_result = Postgres::new(url).await;
if let Err(err) = postgres_result {
error!("Error to connect postgres: {err}");
state.metrics.metrics_failure(&err);
continue;
}
let postgres = postgres_result.unwrap();

let user_statements_result = postgres.user_metrics().await;
if let Err(err) = user_statements_result {
error!("Error get user statements: {err}");
state.metrics.metrics_failure(&err);
continue;
}

Expand Down

0 comments on commit ee43773

Please sign in to comment.