Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(analytics): Adds tenant ID to Analytics Provider #4594

Closed
wants to merge 13 commits into from
3 changes: 2 additions & 1 deletion crates/analytics/src/clickhouse.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use actix_web::http::StatusCode;
use common_utils::errors::ParsingError;
use common_utils::{errors::ParsingError, types::TenantID};
use error_stack::{report, Report, ResultExt};
use router_env::logger;
use time::PrimitiveDateTime;
Expand Down Expand Up @@ -35,6 +35,7 @@ pub type ClickhouseResult<T> = error_stack::Result<T, ClickhouseError>;
#[derive(Clone, Debug)]
pub struct ClickhouseClient {
pub config: Arc<ClickhouseConfig>,
pub tenant_id: TenantID,
}

#[derive(Clone, Debug, serde::Deserialize)]
Expand Down
15 changes: 10 additions & 5 deletions crates/analytics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub mod search;
mod sqlx;
mod types;
use api_event::metrics::{ApiEventMetric, ApiEventMetricRow};
use common_utils::errors::CustomResult;
use common_utils::{errors::CustomResult, types::TenantID};
use disputes::metrics::{DisputeMetric, DisputeMetricRow};
use hyperswitch_interfaces::secrets_interface::{
secret_handler::SecretsHandler,
Expand Down Expand Up @@ -601,22 +601,27 @@ impl AnalyticsProvider {
}
}

pub async fn from_conf(config: &AnalyticsConfig) -> Self {
pub async fn from_conf(config: &AnalyticsConfig, tenant_id: TenantID) -> Self {
match config {
AnalyticsConfig::Sqlx { sqlx } => Self::Sqlx(SqlxClient::from_conf(sqlx).await),
AnalyticsConfig::Sqlx { sqlx } => {
Self::Sqlx(SqlxClient::from_conf(sqlx, tenant_id).await)
}
AnalyticsConfig::Clickhouse { clickhouse } => Self::Clickhouse(ClickhouseClient {
config: Arc::new(clickhouse.clone()),
tenant_id,
}),
AnalyticsConfig::CombinedCkh { sqlx, clickhouse } => Self::CombinedCkh(
SqlxClient::from_conf(sqlx).await,
SqlxClient::from_conf(sqlx, tenant_id.clone()).await,
ClickhouseClient {
config: Arc::new(clickhouse.clone()),
tenant_id,
},
),
AnalyticsConfig::CombinedSqlx { sqlx, clickhouse } => Self::CombinedSqlx(
SqlxClient::from_conf(sqlx).await,
SqlxClient::from_conf(sqlx, tenant_id.clone()).await,
ClickhouseClient {
config: Arc::new(clickhouse.clone()),
tenant_id,
},
),
}
Expand Down
12 changes: 9 additions & 3 deletions crates/analytics/src/sqlx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use api_models::{
analytics::refunds::RefundType,
enums::{DisputeStage, DisputeStatus},
};
use common_utils::errors::{CustomResult, ParsingError};
use common_utils::{
errors::{CustomResult, ParsingError},
types::TenantID,
};
use diesel_models::enums::{
AttemptStatus, AuthenticationType, Currency, PaymentMethod, RefundStatus,
};
Expand All @@ -31,6 +34,8 @@ use super::{
#[derive(Debug, Clone)]
pub struct SqlxClient {
pool: Pool<Postgres>,
#[allow(unused)]
tenant_id: TenantID,
}

impl Default for SqlxClient {
Expand All @@ -44,12 +49,13 @@ impl Default for SqlxClient {
pool: PgPoolOptions::new()
.connect_lazy(&database_url)
.expect("SQLX Pool Creation failed"),
tenant_id: TenantID::default(),
}
}
}

impl SqlxClient {
pub async fn from_conf(conf: &Database) -> Self {
pub async fn from_conf(conf: &Database, tenant_id: TenantID) -> Self {
let password = &conf.password.peek();
let database_url = format!(
"postgres://{}:{}@{}:{}/{}",
Expand All @@ -61,7 +67,7 @@ impl SqlxClient {
.acquire_timeout(std::time::Duration::from_secs(conf.connection_timeout))
.connect_lazy(&database_url)
.expect("SQLX Pool Creation failed");
Self { pool }
Self { pool, tenant_id }
}
}

Expand Down
10 changes: 10 additions & 0 deletions crates/common_utils/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,16 @@
}
}

/// This struct represnts the Tenant Id and stores it in a String

Check warning on line 234 in crates/common_utils/src/types.rs

View workflow job for this annotation

GitHub Actions / Spell check

"represnts" should be "represents".
#[derive(serde::Serialize, Clone, Debug)]
pub struct TenantID(#[allow(unused)] String);

impl Default for TenantID {
fn default() -> Self {
Self(String::from("default"))
}
}

/// Amount convertor trait for connector
pub trait AmountConvertor: Send {
/// Output type for the connector
Expand Down
6 changes: 1 addition & 5 deletions crates/router/src/db/kafka_store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use common_enums::enums::MerchantStorageScheme;
use common_utils::{errors::CustomResult, id_type, pii};
use common_utils::{errors::CustomResult, id_type, pii, types::TenantID};
use diesel_models::{
enums,
enums::ProcessTrackerStatus,
Expand All @@ -25,7 +25,6 @@ use scheduler::{
db::{process_tracker::ProcessTrackerInterface, queue::QueueInterface},
SchedulerInterface,
};
use serde::Serialize;
use storage_impl::redis::kv_store::RedisConnInterface;
use time::PrimitiveDateTime;

Expand Down Expand Up @@ -76,9 +75,6 @@ use crate::{
},
};

#[derive(Debug, Clone, Serialize)]
pub struct TenantID(pub String);

#[derive(Clone)]
pub struct KafkaStore {
kafka_producer: KafkaProducer,
Expand Down
10 changes: 7 additions & 3 deletions crates/router/src/routes/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use actix_web::{web, Scope};
use api_models::routing::RoutingRetrieveQuery;
#[cfg(feature = "olap")]
use common_enums::TransactionType;
use common_utils::types::TenantID;
#[cfg(feature = "email")]
use external_services::email::{ses::AwsSes, EmailService};
use external_services::file_storage::FileStorageInterface;
Expand Down Expand Up @@ -198,7 +199,7 @@ impl AppState {
.await
.expect("Failed to create store"),
kafka_client.clone(),
crate::db::kafka_store::TenantID("default".to_string()),
TenantID::default(),
)
.await,
),
Expand All @@ -218,8 +219,11 @@ impl AppState {
};

#[cfg(feature = "olap")]
let pool =
crate::analytics::AnalyticsProvider::from_conf(conf.analytics.get_inner()).await;
let pool = crate::analytics::AnalyticsProvider::from_conf(
conf.analytics.get_inner(),
TenantID::default(),
)
.await;

#[cfg(feature = "email")]
let email_client = Arc::new(create_email_client(&conf).await);
Expand Down
2 changes: 1 addition & 1 deletion crates/router/src/services/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::types::storage::Dispute;

// Using message queue result here to avoid confusion with Kafka result provided by library
pub type MQResult<T> = CustomResult<T, KafkaError>;
use crate::db::kafka_store::TenantID;
use common_utils::types::TenantID;

pub trait KafkaMessage
where
Expand Down
Loading