Skip to content

Commit

Permalink
feat: add deep health check for drainer (#3396)
Browse files Browse the repository at this point in the history
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: dracarys18 <[email protected]>
  • Loading branch information
3 people authored Feb 2, 2024
1 parent 91519d8 commit 63c383f
Show file tree
Hide file tree
Showing 9 changed files with 396 additions and 3 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/api_models/src/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub struct RouterHealthCheckResponse {
}

// Empty impl: no trait items are overridden for `RouterHealthCheckResponse` here.
impl common_utils::events::ApiEventMetric for RouterHealthCheckResponse {}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SchedulerHealthCheckResponse {
pub database: bool,
Expand Down
3 changes: 3 additions & 0 deletions crates/drainer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ hashicorp-vault = ["external_services/hashicorp-vault"]
vergen = ["router_env/vergen"]

[dependencies]
actix-web = "4.3.1"
async-bb8-diesel = { git = "https://github.com/jarnura/async-bb8-diesel", rev = "53b4ab901aab7635c8215fd1c2d542c8db443094" }
bb8 = "0.8"
clap = { version = "4.3.2", default-features = false, features = ["std", "derive", "help", "usage"] }
config = { version = "0.13.3", features = ["toml"] }
diesel = { version = "2.1.0", features = ["postgres"] }
error-stack = "0.3.1"
mime = "0.3.17"
once_cell = "1.18.0"
reqwest = { version = "0.11.18" }
serde = "1.0.193"
serde_json = "1.0.108"
serde_path_to_error = "0.1.14"
Expand Down
26 changes: 26 additions & 0 deletions crates/drainer/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,22 @@ pub enum DrainerError {
ParsingError(error_stack::Report<common_utils::errors::ParsingError>),
#[error("Unexpected error occurred: {0}")]
UnexpectedError(String),
#[error("I/O: {0}")]
IoError(std::io::Error),
}

/// Errors surfaced by the drainer's deep health check.
///
/// Each variant carries the failure text in `message`, which is interpolated into the
/// variant's display string. The type derives `serde::Serialize`.
#[derive(Debug, Error, Clone, serde::Serialize)]
pub enum HealthCheckError {
/// The database health check failed; `message` holds the underlying error text.
#[error("Database health check is failing with error: {message}")]
DbError { message: String },
/// The Redis health check failed; `message` holds the underlying error text.
#[error("Redis health check is failing with error: {message}")]
RedisError { message: String },
}

impl From<std::io::Error> for DrainerError {
fn from(err: std::io::Error) -> Self {
Self::IoError(err)
}
}

/// Result alias for drainer operations: `error_stack::Result` with [`DrainerError`] as the error context.
pub type DrainerResult<T> = error_stack::Result<T, DrainerError>;
Expand All @@ -30,3 +46,13 @@ impl From<error_stack::Report<redis::errors::RedisError>> for DrainerError {
Self::RedisError(err)
}
}

/// Allows [`HealthCheckError`] to be returned as an error response from actix-web handlers.
impl actix_web::ResponseError for HealthCheckError {
    /// Returns the HTTP status for this error.
    ///
    /// Every health-check failure maps to `500 Internal Server Error`.
    fn status_code(&self) -> actix_web::http::StatusCode {
        // Name the status type through actix-web itself rather than through reqwest's
        // re-export: the trait signature is defined in terms of actix-web's `StatusCode`,
        // so this stays valid even if reqwest moves to a different `http` crate version.
        use actix_web::http::StatusCode;

        // Exhaustive on purpose: adding a variant forces a decision about its status code.
        match self {
            Self::DbError { .. } | Self::RedisError { .. } => StatusCode::INTERNAL_SERVER_ERROR,
        }
    }
}
268 changes: 268 additions & 0 deletions crates/drainer/src/health_check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
use std::sync::Arc;

use actix_web::{web, Scope};
use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl};
use common_utils::errors::CustomResult;
use diesel_models::{Config, ConfigNew};
use error_stack::ResultExt;
use router_env::{instrument, logger, tracing};

use crate::{
connection::{pg_connection, redis_connection},
errors::HealthCheckError,
services::{self, Store},
settings::Settings,
};

/// Name of the Redis stream that the Redis health check appends to, reads from and trims.
pub const TEST_STREAM_NAME: &str = "TEST_STREAM_0";
/// Field/value pairs appended to [`TEST_STREAM_NAME`] by the Redis health check.
pub const TEST_STREAM_DATA: &[(&str, &str)] = &[("data", "sample_data")];

/// Namespace type for the drainer's health-check HTTP routes.
pub struct Health;

impl Health {
    /// Builds the `health` scope.
    ///
    /// `conf` and `store` are registered as application data so the handlers can extract
    /// them. Two GET routes are mounted: the scope root (handled by [`health`]) and
    /// `/ready` (handled by [`deep_health_check`]).
    pub fn server(conf: Settings, store: Arc<Store>) -> Scope {
        let shallow_route = web::resource("").route(web::get().to(health));
        let deep_route = web::resource("/ready").route(web::get().to(deep_health_check));

        let settings_data = web::Data::new(conf);
        let store_data = web::Data::new(store);

        web::scope("health")
            .app_data(settings_data)
            .app_data(store_data)
            .service(shallow_route)
            .service(deep_route)
    }
}

/// Shallow health endpoint: logs the call and answers `200 OK` with a fixed text body.
#[instrument(skip_all)]
pub async fn health() -> impl actix_web::Responder {
    logger::info!("Drainer health was called");

    let mut ok_response = actix_web::HttpResponse::Ok();
    ok_response.body("Drainer health is good")
}

/// HTTP handler for the deep health check.
///
/// Runs [`deep_health_check_func`] and returns its result as a JSON response. If the
/// check fails, the error is passed to `services::log_and_return_error_response`. If the
/// successful result cannot be serialized, the failure is logged and an empty body is
/// sent instead.
#[instrument(skip_all)]
pub async fn deep_health_check(
    conf: web::Data<Settings>,
    store: web::Data<Arc<Store>>,
) -> impl actix_web::Responder {
    let report = match deep_health_check_func(conf, store).await {
        Ok(report) => report,
        Err(err) => return services::log_and_return_error_response(err),
    };

    let body = match serde_json::to_string(&report) {
        Ok(json) => json,
        Err(err) => {
            logger::error!(serialization_error=?err);
            // Fall back to the default (empty) string when serialization fails.
            String::default()
        }
    };

    services::http_response_json(body)
}

/// Runs the database check followed by the Redis check and reports the outcome.
///
/// The checks run sequentially and the first failure aborts the function, so a returned
/// [`DrainerHealthCheckResponse`] always has both flags set to `true`.
///
/// # Errors
/// * [`HealthCheckError::DbError`] when `health_check_db` fails.
/// * [`HealthCheckError::RedisError`] when `health_check_redis` fails.
///
/// In both cases `message` is the failing error rendered with `to_string()`.
#[instrument(skip_all)]
pub async fn deep_health_check_func(
    conf: web::Data<Settings>,
    store: web::Data<Arc<Store>>,
) -> Result<DrainerHealthCheckResponse, error_stack::Report<HealthCheckError>> {
    logger::info!("Deep health check was called");

    logger::debug!("Database health check begin");
    let database = match store.health_check_db().await {
        Ok(_) => true,
        Err(db_err) => {
            return Err(error_stack::report!(HealthCheckError::DbError {
                message: db_err.to_string()
            }));
        }
    };
    logger::debug!("Database health check end");

    logger::debug!("Redis health check begin");
    let settings = conf.into_inner();
    let redis = match store.health_check_redis(&settings).await {
        Ok(_) => true,
        Err(redis_err) => {
            return Err(error_stack::report!(HealthCheckError::RedisError {
                message: redis_err.to_string()
            }));
        }
    };
    logger::debug!("Redis health check end");

    Ok(DrainerHealthCheckResponse { database, redis })
}

/// Payload returned by the deep health check, one flag per checked dependency.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DrainerHealthCheckResponse {
// `true` when the database check succeeded.
pub database: bool,
// `true` when the Redis check succeeded.
pub redis: bool,
}

/// Deep health probes for the services the drainer depends on.
#[async_trait::async_trait]
pub trait HealthCheckInterface {
/// Probes the database; returns `Ok(())` on success or a [`HealthCheckDBError`] report.
async fn health_check_db(&self) -> CustomResult<(), HealthCheckDBError>;
/// Probes Redis using `conf`; returns `Ok(())` on success or a [`HealthCheckRedisError`] report.
async fn health_check_redis(&self, conf: &Settings) -> CustomResult<(), HealthCheckRedisError>;
}

#[async_trait::async_trait]
impl HealthCheckInterface for Store {
    /// Probes the database through a connection taken from `self.master_pool`.
    ///
    /// Inside one `transaction_async` call it:
    /// 1. selects the SQL expression `1 + 1`,
    /// 2. inserts a config row with key `test_key`,
    /// 3. deletes the row with key `test_key`.
    ///
    /// # Errors
    /// Returns [`HealthCheckDBError::DbReadError`], [`HealthCheckDBError::DbWriteError`] or
    /// [`HealthCheckDBError::DbDeleteError`] for the matching failed step. Errors raised by
    /// the transaction itself are converted through `From<diesel::result::Error>`.
    async fn health_check_db(&self) -> CustomResult<(), HealthCheckDBError> {
        let conn = pg_connection(&self.master_pool).await;

        conn.transaction_async(|conn| {
            Box::pin(async move {
                let query =
                    diesel::select(diesel::dsl::sql::<diesel::sql_types::Integer>("1 + 1"));
                let _x: i32 = query.get_result_async(&conn).await.map_err(|err| {
                    logger::error!(read_err=?err,"Error while reading element in the database");
                    HealthCheckDBError::DbReadError
                })?;

                logger::debug!("Database read was successful");

                // NOTE(review): presumably the insert fails if a row keyed `test_key`
                // already exists — confirm against the configs table constraints.
                let config = ConfigNew {
                    key: "test_key".to_string(),
                    config: "test_value".to_string(),
                };

                config.insert(&conn).await.map_err(|err| {
                    logger::error!(write_err=?err,"Error while writing to database");
                    HealthCheckDBError::DbWriteError
                })?;

                logger::debug!("Database write was successful");

                Config::delete_by_key(&conn, "test_key").await.map_err(|err| {
                    logger::error!(delete_err=?err,"Error while deleting element in the database");
                    HealthCheckDBError::DbDeleteError
                })?;

                logger::debug!("Database delete was successful");

                Ok::<_, HealthCheckDBError>(())
            })
        })
        .await?;

        Ok(())
    }

    /// Probes Redis with a key round-trip followed by a stream round-trip.
    ///
    /// Steps, in order: set `test_key` (expiry argument `30`), get it, delete it, append
    /// [`TEST_STREAM_DATA`] to [`TEST_STREAM_NAME`], read the stream from id `0-0`, then
    /// trim the stream using the id of the last entry read as the `MinID` threshold.
    ///
    /// # Errors
    /// Returns the [`HealthCheckRedisError`] variant matching the first step that fails.
    /// `StreamReadFailed` is also returned when the read yields no entries for the stream.
    async fn health_check_redis(&self, conf: &Settings) -> CustomResult<(), HealthCheckRedisError> {
        let redis_conn = redis_connection(conf).await;

        redis_conn
            .serialize_and_set_key_with_expiry("test_key", "test_value", 30)
            .await
            .change_context(HealthCheckRedisError::SetFailed)?;

        logger::debug!("Redis set_key was successful");

        redis_conn
            .get_key("test_key")
            .await
            .change_context(HealthCheckRedisError::GetFailed)?;

        logger::debug!("Redis get_key was successful");

        redis_conn
            .delete_key("test_key")
            .await
            .change_context(HealthCheckRedisError::DeleteFailed)?;

        logger::debug!("Redis delete_key was successful");

        redis_conn
            .stream_append_entry(
                TEST_STREAM_NAME,
                &redis_interface::RedisEntryId::AutoGeneratedID,
                TEST_STREAM_DATA.to_vec(),
            )
            .await
            .change_context(HealthCheckRedisError::StreamAppendFailed)?;

        logger::debug!("Stream append succeeded");

        // NOTE(review): this read goes through the store's own `redis_conn`, while every
        // other step uses the connection created from `conf` above — confirm whether the
        // mix is intentional.
        let output = self
            .redis_conn
            .stream_read_entries(TEST_STREAM_NAME, "0-0", Some(10))
            .await
            .change_context(HealthCheckRedisError::StreamReadFailed)?;
        logger::debug!("Stream read succeeded");

        // `ok_or_else` keeps the error report from being constructed on the success path.
        let id_to_trim = output
            .get(TEST_STREAM_NAME)
            .and_then(|entries| entries.last())
            .map(|last_entry| last_entry.0.clone())
            .ok_or_else(|| error_stack::report!(HealthCheckRedisError::StreamReadFailed))?;
        logger::debug!("Stream parse succeeded");

        redis_conn
            .stream_trim_entries(
                TEST_STREAM_NAME,
                (
                    redis_interface::StreamCapKind::MinID,
                    redis_interface::StreamCapTrim::Exact,
                    id_to_trim,
                ),
            )
            .await
            .change_context(HealthCheckRedisError::StreamTrimFailed)?;
        logger::debug!("Stream trim succeeded");

        Ok(())
    }
}

/// Failure modes of the database health check.
// The lint is silenced because the variants deliberately share the `Db` prefix / `Error` suffix.
#[allow(clippy::enum_variant_names)]
#[derive(Debug, thiserror::Error)]
pub enum HealthCheckDBError {
/// Produced from `diesel::result::Error::DatabaseError` by the `From` conversion.
#[error("Error while connecting to database")]
DbError,
/// The test insert failed.
#[error("Error while writing to database")]
DbWriteError,
/// The test select failed.
#[error("Error while reading element in the database")]
DbReadError,
/// The test delete failed.
#[error("Error while deleting element in the database")]
DbDeleteError,
/// Catch-all for diesel errors that have no dedicated variant.
#[error("Unpredictable error occurred")]
UnknownError,
/// Produced from diesel's transaction-related errors by the `From` conversion.
#[error("Error in database transaction")]
TransactionError,
}

impl From<diesel::result::Error> for HealthCheckDBError {
fn from(error: diesel::result::Error) -> Self {
match error {
diesel::result::Error::DatabaseError(_, _) => Self::DbError,

diesel::result::Error::RollbackErrorOnCommit { .. }
| diesel::result::Error::RollbackTransaction
| diesel::result::Error::AlreadyInTransaction
| diesel::result::Error::NotInTransaction
| diesel::result::Error::BrokenTransactionManager => Self::TransactionError,

_ => Self::UnknownError,
}
}
}

/// Failure modes of the Redis health check, one per step of the probe.
// The lint is silenced because every variant deliberately ends in `Failed`.
#[allow(clippy::enum_variant_names)]
#[derive(Debug, thiserror::Error)]
pub enum HealthCheckRedisError {
/// Setting the test key failed.
#[error("Failed to set key value in Redis")]
SetFailed,
/// Reading the test key back failed.
#[error("Failed to get key value in Redis")]
GetFailed,
/// Deleting the test key failed.
#[error("Failed to delete key value in Redis")]
DeleteFailed,
/// Appending the test entry to the stream failed.
#[error("Failed to append data to the stream in Redis")]
StreamAppendFailed,
/// Reading the stream failed, or the read returned no entries for the test stream.
#[error("Failed to read data from the stream in Redis")]
StreamReadFailed,
/// Trimming the stream failed.
#[error("Failed to trim data from the stream in Redis")]
StreamTrimFailed,
}
22 changes: 21 additions & 1 deletion crates/drainer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod connection;
pub mod errors;
mod handler;
mod health_check;
pub mod logger;
pub(crate) mod metrics;
mod query;
Expand All @@ -11,14 +12,18 @@ mod types;
mod utils;
use std::sync::Arc;

use actix_web::dev::Server;
use common_utils::signals::get_allowed_signals;
use diesel_models::kv;
use error_stack::{IntoReport, ResultExt};
use router_env::{instrument, tracing};
use tokio::sync::mpsc;

use crate::{
connection::pg_connection, services::Store, settings::DrainerSettings, types::StreamData,
connection::pg_connection,
services::Store,
settings::{DrainerSettings, Settings},
types::StreamData,
};

pub async fn start_drainer(store: Arc<Store>, conf: DrainerSettings) -> errors::DrainerResult<()> {
Expand Down Expand Up @@ -49,3 +54,18 @@ pub async fn start_drainer(store: Arc<Store>, conf: DrainerSettings) -> errors::

Ok(())
}

/// Builds the drainer's HTTP server, which exposes the health-check scope.
///
/// The server is bound to `conf.server.host` and `conf.server.port`. Each application
/// instance created by the factory closure gets its own clone of `conf` and `store`.
/// The value returned is the [`Server`] produced by `.run()`.
///
/// # Errors
/// Propagates a failure from `bind`, converted into [`errors::DrainerError`] by `?`.
pub async fn start_web_server(
    conf: Settings,
    store: Arc<Store>,
) -> Result<Server, errors::DrainerError> {
    // Clone the server settings first: `conf` is moved into the factory closure below.
    let server = conf.server.clone();
    let web_server = actix_web::HttpServer::new(move || {
        actix_web::App::new().service(health_check::Health::server(conf.clone(), store.clone()))
    })
    .bind((server.host.as_str(), server.port))?
    .run();

    Ok(web_server)
}
Loading

0 comments on commit 63c383f

Please sign in to comment.