Skip to content

Commit

Permalink
feat: compute DCU usage (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
paulobressan authored Dec 20, 2023
1 parent 1b492d6 commit 127323f
Show file tree
Hide file tree
Showing 14 changed files with 411 additions and 173 deletions.
15 changes: 3 additions & 12 deletions .github/workflows/validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,8 @@ jobs:
- name: Load Image into Kind
run: kind load docker-image ext-cardano-dbsync:1.0 --name k8scluster

- name: Apply manifest
run: kubectl apply -f test/manifest.yaml

- name: Wait containers is ready
run: sleep 8;

- name: Apply manifests
run: kubectl apply -f test/dbsyncport.yaml
run: kubectl apply -f test

- name: Validate if controller was executed
run: |
username=$(kubectl describe dbsyncports.demeter.run --namespace project useraccess | grep -oP 'Username: \K\S+')
password=$(kubectl describe dbsyncports.demeter.run --namespace project useraccess | grep -oP 'Password: \K\S+')
if [ -z "$username" ] && [ -z "$password" ]; then echo "Error: controller not executed" && exit 1; fi
- name: Validate controller
run: ./test/validate-execution
3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,13 @@ prometheus = "0.13.3"
actix-web = "4.4.0"

[[bin]]
doc = false
name = "controller"
path = "src/main.rs"

[[bin]]
doc = false
name = "crdgen"
path = "src/crdgen.rs"

[lib]
name = "controller"
path = "src/lib.rs"

28 changes: 22 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,30 @@

This project is a Kubernetes custom controller to create users on dbsync's Postgres. This controller defines a new CRD DbSyncPort on Kubernetes and when the new users enable the External Dbsync, the Demiter will generate a manifest with the kind DbSyncPort and the controller will be watching for creating a new user on Postgres.

> [!IMPORTANT]
> The metrics collector uses the `pg_stat_statements` extension enabled on Postgres. To enable that extension follow the steps bellow.
- set pg_stat_statements at `shared_preload_libraries` on postgresql.conf
```
shared_preload_libraries = 'pg_stat_statements'
```
- create the extension on postgres
```
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
```

## Environment

| Key | Value |
| -------------- | ------------------------------------- |
| ADDR | 0.0.0.0:5000 |
| DB_URL_MAINNET | postgres://user:password@host:post/db |
| DB_URL_PREPROD | postgres://user:password@host:post/db |
| DB_URL_PREVIEW | postgres://user:password@host:post/db |
| Key | Value |
| ---------------------- | ------------------------------------- |
| ADDR | 0.0.0.0:5000 |
| DB_URL_MAINNET | postgres://user:password@host:post/db |
| DB_URL_PREPROD | postgres://user:password@host:post/db |
| DB_URL_PREVIEW | postgres://user:password@host:post/db |
| METRICS_DELAY | 30 |
| DCU_PER_SECOND_MAINNET | 10 |
| DCU_PER_SECOND_PREPROD | 10 |
| DCU_PER_SECOND_PREVIEW | 10 |

## Commands

Expand Down
59 changes: 15 additions & 44 deletions src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
use crate::{
postgres::{self, user_already_exists, user_create, user_disable, user_enable},
Config, Error, Metrics, Result,
};
use futures::StreamExt;
use kube::{
api::{Patch, PatchParams},
Expand All @@ -20,6 +16,8 @@ use serde_json::json;
use std::{sync::Arc, time::Duration};
use tracing::error;

use crate::{postgres::Postgres, Config, Error, Metrics, State, Network};

pub static DB_SYNC_PORT_FINALIZER: &str = "dbsyncports.demeter.run";

struct Context {
Expand All @@ -36,25 +34,6 @@ impl Context {
}
}
}
#[derive(Clone, Default)]
pub struct State {
registry: prometheus::Registry,
}
impl State {
pub fn metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
self.registry.gather()
}
}

#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
pub enum Network {
#[serde(rename = "mainnet")]
Mainnet,
#[serde(rename = "preprod")]
Preprod,
#[serde(rename = "preview")]
Preview,
}

#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(kind = "DbSyncPort", group = "demeter.run", version = "v1", namespaced)]
Expand All @@ -75,11 +54,7 @@ impl DbSyncPort {
.unwrap_or(false)
}

async fn reconcile(
&self,
ctx: Arc<Context>,
pg_client: &mut tokio_postgres::Client,
) -> Result<Action> {
async fn reconcile(&self, ctx: Arc<Context>, pg: &mut Postgres) -> Result<Action, Error> {
let client = ctx.client.clone();
let ns = self.namespace().unwrap();
let name = self.name_any();
Expand All @@ -89,9 +64,9 @@ impl DbSyncPort {
let password = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);

if !self.was_executed() {
match user_already_exists(pg_client, &username).await? {
true => user_enable(pg_client, &username, &password).await?,
false => user_create(pg_client, &username, &password).await?,
match pg.user_already_exists(&username).await? {
true => pg.user_enable(&username, &password).await?,
false => pg.user_create(&username, &password).await?,
};

let new_status = Patch::Apply(json!({
Expand All @@ -114,19 +89,15 @@ impl DbSyncPort {
Ok(Action::requeue(Duration::from_secs(5 * 60)))
}

async fn cleanup(
&self,
ctx: Arc<Context>,
pg_client: &mut tokio_postgres::Client,
) -> Result<Action> {
async fn cleanup(&self, ctx: Arc<Context>, pg: &mut Postgres) -> Result<Action, Error> {
let username = self.status.as_ref().unwrap().username.clone();
user_disable(pg_client, &username).await?;
pg.user_disable(&username).await?;
ctx.metrics.count_user_deactivated(&username);
Ok(Action::await_change())
}
}

async fn reconcile(crd: Arc<DbSyncPort>, ctx: Arc<Context>) -> Result<Action> {
async fn reconcile(crd: Arc<DbSyncPort>, ctx: Arc<Context>) -> Result<Action, Error> {
let url = match crd.spec.network {
Network::Mainnet => &ctx.config.db_url_mainnet,
Network::Preprod => &ctx.config.db_url_preprod,
Expand All @@ -136,12 +107,12 @@ async fn reconcile(crd: Arc<DbSyncPort>, ctx: Arc<Context>) -> Result<Action> {
let ns = crd.namespace().unwrap();
let crds: Api<DbSyncPort> = Api::namespaced(ctx.client.clone(), &ns);

let mut pg_client = postgres::connect(url).await?;
let mut postgres = Postgres::new(url).await?;

finalizer(&crds, DB_SYNC_PORT_FINALIZER, crd, |event| async {
match event {
Event::Apply(crd) => crd.reconcile(ctx.clone(), &mut pg_client).await,
Event::Cleanup(crd) => crd.cleanup(ctx.clone(), &mut pg_client).await,
Event::Apply(crd) => crd.reconcile(ctx.clone(), &mut postgres).await,
Event::Cleanup(crd) => crd.cleanup(ctx.clone(), &mut postgres).await,
}
})
.await
Expand All @@ -154,11 +125,11 @@ fn error_policy(crd: Arc<DbSyncPort>, err: &Error, ctx: Arc<Context>) -> Action
Action::requeue(Duration::from_secs(5))
}

pub async fn run(state: State, config: Config) -> Result<(), Error> {
pub async fn run(state: Arc<State>, config: Config) -> Result<(), Error> {
let client = Client::try_default().await?;
let crds = Api::<DbSyncPort>::all(client.clone());
let metrics = Metrics::default().register(&state.registry).unwrap();
let ctx = Context::new(client, metrics, config);

let ctx = Context::new(client, state.metrics.clone(), config);

Controller::new(crds, WatcherConfig::default().any_semantic())
.shutdown_on_signal()
Expand Down
1 change: 1 addition & 0 deletions src/crdgen.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use ext_cardano_dbsync::controller;
use kube::CustomResourceExt;

fn main() {
Expand Down
117 changes: 101 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use prometheus::Registry;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use thiserror::Error;

use std::{fmt::Display, time::Duration};

#[derive(Error, Debug)]
pub enum Error {
#[error("Postgres Error: {0}")]
Expand All @@ -10,51 +15,131 @@ pub enum Error {

#[error("Finalizer Error: {0}")]
FinalizerError(#[source] Box<kube::runtime::finalizer::Error<Error>>),

#[error("Env Error: {0}")]
EnvError(#[source] std::env::VarError),

#[error("Prometheus Error: {0}")]
PrometheusError(#[source] prometheus::Error),

#[error("Parse Int Error: {0}")]
ParseIntError(#[source] std::num::ParseIntError),

#[error("Parse Float Error: {0}")]
ParseFloatError(#[source] std::num::ParseFloatError),
}

impl Error {
pub fn metric_label(&self) -> String {
format!("{self:?}").to_lowercase()
}
}

pub type Result<T, E = Error> = std::result::Result<T, E>;

impl From<tokio_postgres::Error> for Error {
fn from(value: tokio_postgres::Error) -> Self {
Error::PgError(value)
}
}

impl From<kube::Error> for Error {
fn from(value: kube::Error) -> Self {
Error::KubeError(value)
}
}
impl From<std::env::VarError> for Error {
fn from(value: std::env::VarError) -> Self {
Error::EnvError(value)
}
}
impl From<prometheus::Error> for Error {
fn from(value: prometheus::Error) -> Self {
Error::PrometheusError(value)
}
}
impl From<std::num::ParseIntError> for Error {
fn from(value: std::num::ParseIntError) -> Self {
Error::ParseIntError(value)
}
}
impl From<std::num::ParseFloatError> for Error {
fn from(value: std::num::ParseFloatError) -> Self {
Error::ParseFloatError(value)
}
}

#[derive(Clone, Default)]
pub struct State {
registry: Registry,
pub metrics: Metrics,
}
impl State {
pub fn new() -> Self {
let registry = Registry::default();
let metrics = Metrics::default().register(&registry).unwrap();
Self { registry, metrics }
}

pub fn metrics_collected(&self) -> Vec<prometheus::proto::MetricFamily> {
self.registry.gather()
}
}

#[derive(Clone)]
pub struct Config {
pub db_url_mainnet: String,
pub db_url_preprod: String,
pub db_url_preview: String,

pub dcu_per_second_mainnet: f64,
pub dcu_per_second_preprod: f64,
pub dcu_per_second_preview: f64,

pub metrics_delay: Duration,
}
impl Config {
pub fn new() -> Self {
Self {
db_url_mainnet: std::env::var("DB_URL_MAINNET").expect("DB_URL_MAINNET must be set"),
db_url_preprod: std::env::var("DB_URL_PREPROD").expect("DB_URL_PREPROD must be set"),
db_url_preview: std::env::var("DB_URL_PREVIEW").expect("DB_URL_PREVIEW must be set"),
}
pub fn try_new() -> Result<Self, Error> {
let db_url_mainnet = std::env::var("DB_URL_MAINNET")?;
let db_url_preprod = std::env::var("DB_URL_PREPROD")?;
let db_url_preview = std::env::var("DB_URL_PREVIEW")?;

let metrics_delay = Duration::from_secs(std::env::var("METRICS_DELAY")?.parse::<u64>()?);

let dcu_per_second_mainnet = std::env::var("DCU_PER_SECOND_MAINNET")?.parse::<f64>()?;
let dcu_per_second_preprod = std::env::var("DCU_PER_SECOND_PREPROD")?.parse::<f64>()?;
let dcu_per_second_preview = std::env::var("DCU_PER_SECOND_PREVIEW")?.parse::<f64>()?;

Ok(Self {
db_url_mainnet,
db_url_preprod,
db_url_preview,
metrics_delay,
dcu_per_second_mainnet,
dcu_per_second_preprod,
dcu_per_second_preview,
})
}
}
impl Default for Config {
fn default() -> Self {
Self::new()

#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
pub enum Network {
#[serde(rename = "mainnet")]
Mainnet,
#[serde(rename = "preprod")]
Preprod,
#[serde(rename = "preview")]
Preview,
}
impl Display for Network {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Network::Mainnet => write!(f, "mainnet"),
Network::Preprod => write!(f, "preprod"),
Network::Preview => write!(f, "preview"),
}
}
}

pub mod controller;
pub mod metrics;
pub mod postgres;
pub use crate::controller::*;

mod metrics;
pub use metrics::Metrics;
pub use controller::*;
pub use metrics::*;
Loading

0 comments on commit 127323f

Please sign in to comment.