Skip to content

Commit

Permalink
feat(analytics): add filter API for dispute analytics (#3724)
Browse files Browse the repository at this point in the history
  • Loading branch information
harsh-sharma-juspay authored Feb 20, 2024
1 parent 49c71d0 commit 6aeb440
Show file tree
Hide file tree
Showing 12 changed files with 255 additions and 1 deletion.
14 changes: 14 additions & 0 deletions crates/analytics/src/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
metrics::{latency::LatencyAvg, ApiEventMetricRow},
},
connector_events::events::ConnectorEventsResult,
disputes::filters::DisputeFilterRow,
outgoing_webhook_event::events::OutgoingWebhookLogsResult,
sdk_events::events::SdkEventsResult,
types::TableEngine,
Expand Down Expand Up @@ -168,6 +169,7 @@ impl super::outgoing_webhook_event::events::OutgoingWebhookLogsFilterAnalytics
for ClickhouseClient
{
}
/// Marker impl: clickhouse can load `DisputeFilterRow`s, enabling dispute filter queries.
impl super::disputes::filters::DisputeFilterAnalytics for ClickhouseClient {}

#[derive(Debug, serde::Serialize)]
struct CkhQuery {
Expand Down Expand Up @@ -277,6 +279,18 @@ impl TryInto<RefundFilterRow> for serde_json::Value {
}
}

/// Converts a raw clickhouse JSON row into a [`DisputeFilterRow`].
impl TryInto<DisputeFilterRow> for serde_json::Value {
    type Error = Report<ParsingError>;

    fn try_into(self) -> Result<DisputeFilterRow, Self::Error> {
        // Let serde do the structural work, then attach a typed parsing
        // context so failures are attributable to this row type.
        let parsed: Result<DisputeFilterRow, _> = serde_json::from_value(self);
        parsed
            .into_report()
            .change_context(ParsingError::StructParseFailure(
                "Failed to parse DisputeFilterRow in clickhouse results",
            ))
    }
}

impl TryInto<ApiEventMetricRow> for serde_json::Value {
type Error = Report<ParsingError>;

Expand Down
5 changes: 5 additions & 0 deletions crates/analytics/src/disputes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod core;

pub mod filters;

pub use self::core::get_filters;
91 changes: 91 additions & 0 deletions crates/analytics/src/disputes/core.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use api_models::analytics::{
disputes::DisputeDimensions, DisputeFilterValue, DisputeFiltersResponse,
GetDisputeFilterRequest,
};
use error_stack::ResultExt;

use super::filters::{get_dispute_filter_for_dimension, DisputeFilterRow};
use crate::{
errors::{AnalyticsError, AnalyticsResult},
AnalyticsProvider,
};

pub async fn get_filters(
pool: &AnalyticsProvider,
req: GetDisputeFilterRequest,
merchant_id: &String,
) -> AnalyticsResult<DisputeFiltersResponse> {
let mut res = DisputeFiltersResponse::default();
for dim in req.group_by_names {
let values = match pool {
AnalyticsProvider::Sqlx(pool) => {
get_dispute_filter_for_dimension(dim, merchant_id, &req.time_range, pool)
.await
}
AnalyticsProvider::Clickhouse(pool) => {
get_dispute_filter_for_dimension(dim, merchant_id, &req.time_range, pool)
.await
}
AnalyticsProvider::CombinedCkh(sqlx_pool, ckh_pool) => {
let ckh_result = get_dispute_filter_for_dimension(
dim,
merchant_id,
&req.time_range,
ckh_pool,
)
.await;
let sqlx_result = get_dispute_filter_for_dimension(
dim,
merchant_id,
&req.time_range,
sqlx_pool,
)
.await;
match (&sqlx_result, &ckh_result) {
(Ok(ref sqlx_res), Ok(ref ckh_res)) if sqlx_res != ckh_res => {
router_env::logger::error!(clickhouse_result=?ckh_res, postgres_result=?sqlx_res, "Mismatch between clickhouse & postgres disputes analytics filters")
},
_ => {}
};
ckh_result
}
AnalyticsProvider::CombinedSqlx(sqlx_pool, ckh_pool) => {
let ckh_result = get_dispute_filter_for_dimension(
dim,
merchant_id,
&req.time_range,
ckh_pool,
)
.await;
let sqlx_result = get_dispute_filter_for_dimension(
dim,
merchant_id,
&req.time_range,
sqlx_pool,
)
.await;
match (&sqlx_result, &ckh_result) {
(Ok(ref sqlx_res), Ok(ref ckh_res)) if sqlx_res != ckh_res => {
router_env::logger::error!(clickhouse_result=?ckh_res, postgres_result=?sqlx_res, "Mismatch between clickhouse & postgres disputes analytics filters")
},
_ => {}
};
sqlx_result
}
}
.change_context(AnalyticsError::UnknownError)?
.into_iter()
.filter_map(|fil: DisputeFilterRow| match dim {
DisputeDimensions::DisputeStatus => fil.dispute_status,
DisputeDimensions::DisputeStage => fil.dispute_stage,
DisputeDimensions::ConnectorStatus => fil.connector_status,
DisputeDimensions::Connector => fil.connector,
})
.collect::<Vec<String>>();
res.query_data.push(DisputeFilterValue {
dimension: dim,
values,
})
}
Ok(res)
}
52 changes: 52 additions & 0 deletions crates/analytics/src/disputes/filters.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use api_models::analytics::{disputes::DisputeDimensions, Granularity, TimeRange};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use time::PrimitiveDateTime;

use crate::{
query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, FiltersError, FiltersResult, LoadRow},
};
/// Capability marker for data sources that can load [`DisputeFilterRow`]s.
pub trait DisputeFilterAnalytics: LoadRow<DisputeFilterRow> {}

/// Fetches the distinct values of `dimension` for `merchant` within `time_range`.
///
/// Builds a `SELECT DISTINCT <dimension>` query on the dispute collection,
/// filtered by merchant id and the time range, and executes it against `pool`.
///
/// # Errors
/// `FiltersError::QueryBuildingError` if the query cannot be constructed;
/// `FiltersError::QueryExecutionFailure` if the data source rejects it.
pub async fn get_dispute_filter_for_dimension<T>(
    dimension: DisputeDimensions,
    merchant: &String,
    time_range: &TimeRange,
    pool: &T,
) -> FiltersResult<Vec<DisputeFilterRow>>
where
    T: AnalyticsDataSource + DisputeFilterAnalytics,
    PrimitiveDateTime: ToSql<T>,
    AnalyticsCollection: ToSql<T>,
    Granularity: GroupByClause<T>,
    Aggregate<&'static str>: ToSql<T>,
    Window<&'static str>: ToSql<T>,
{
    let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::Dispute);

    // Only the requested dimension's column is selected; combined with
    // `set_distinct` below this yields the set of distinct filter values.
    query_builder.add_select_column(dimension).switch()?;
    time_range
        .set_filter_clause(&mut query_builder)
        .attach_printable("Error filtering time range")
        .switch()?;

    query_builder
        .add_filter_clause("merchant_id", merchant)
        .switch()?;

    query_builder.set_distinct();

    // NOTE(review): `execute_query` appears to return a nested Result — the
    // outer level (building) gets QueryBuildingError, the inner (execution)
    // gets QueryExecutionFailure. Mirrors the other filter modules; confirm
    // against the query module's signature.
    query_builder
        .execute_query::<DisputeFilterRow, _>(pool)
        .await
        .change_context(FiltersError::QueryBuildingError)?
        .change_context(FiltersError::QueryExecutionFailure)
}
/// One row of the distinct-values query for dispute filters.
///
/// Every field is `Option` because a query selects only the column for the
/// requested dimension; the columns not selected are absent and load as `None`.
#[derive(Debug, serde::Serialize, Eq, PartialEq, serde::Deserialize)]
pub struct DisputeFilterRow {
    pub connector: Option<String>,
    pub dispute_status: Option<String>,
    pub connector_status: Option<String>,
    pub dispute_stage: Option<String>,
}
1 change: 1 addition & 0 deletions crates/analytics/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod clickhouse;
pub mod core;
pub mod disputes;
pub mod errors;
pub mod metrics;
pub mod payments;
Expand Down
3 changes: 3 additions & 0 deletions crates/analytics/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use api_models::{
analytics::{
self as analytics_api,
api_event::ApiEventDimensions,
disputes::DisputeDimensions,
payments::{PaymentDimensions, PaymentDistributions},
refunds::{RefundDimensions, RefundType},
sdk_events::{SdkEventDimensions, SdkEventNames},
Expand Down Expand Up @@ -362,6 +363,8 @@ impl_to_sql_for_to_string!(
PaymentDimensions,
&PaymentDistributions,
RefundDimensions,
&DisputeDimensions,
DisputeDimensions,
PaymentMethod,
PaymentMethodType,
AuthenticationType,
Expand Down
30 changes: 30 additions & 0 deletions crates/analytics/src/sqlx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl super::payments::metrics::PaymentMetricAnalytics for SqlxClient {}
impl super::payments::distribution::PaymentDistributionAnalytics for SqlxClient {}
impl super::refunds::metrics::RefundMetricAnalytics for SqlxClient {}
impl super::refunds::filters::RefundFilterAnalytics for SqlxClient {}
/// Marker impl: the postgres (sqlx) backend can load `DisputeFilterRow`s.
impl super::disputes::filters::DisputeFilterAnalytics for SqlxClient {}

#[async_trait::async_trait]
impl AnalyticsDataSource for SqlxClient {
Expand Down Expand Up @@ -425,6 +426,35 @@ impl<'a> FromRow<'a, PgRow> for super::refunds::filters::RefundFilterRow {
}
}

/// Hydrates a [`DisputeFilterRow`] from a postgres row, tolerating absent
/// columns (the filter query selects only the requested dimension's column).
impl<'a> FromRow<'a, PgRow> for super::disputes::filters::DisputeFilterRow {
    fn from_row(row: &'a PgRow) -> sqlx::Result<Self> {
        // Read an optional text column; a missing column is not an error —
        // it simply was not part of this query's projection.
        let optional_column = |name: &str| -> sqlx::Result<Option<String>> {
            row.try_get(name).or_else(|e| match e {
                ColumnNotFound(_) => Ok(Default::default()),
                e => Err(e),
            })
        };
        Ok(Self {
            dispute_stage: optional_column("dispute_stage")?,
            dispute_status: optional_column("dispute_status")?,
            connector: optional_column("connector")?,
            connector_status: optional_column("connector_status")?,
        })
    }
}

impl ToSql<SqlxClient> for PrimitiveDateTime {
fn to_sql(&self, _table_engine: &TableEngine) -> error_stack::Result<String, ParsingError> {
Ok(self.to_string())
Expand Down
24 changes: 24 additions & 0 deletions crates/api_models/src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use masking::Secret;

use self::{
api_event::{ApiEventDimensions, ApiEventMetrics},
disputes::DisputeDimensions,
payments::{PaymentDimensions, PaymentDistributions, PaymentMetrics},
refunds::{RefundDimensions, RefundMetrics},
sdk_events::{SdkEventDimensions, SdkEventMetrics},
Expand Down Expand Up @@ -247,3 +248,26 @@ pub struct GetApiEventMetricRequest {
#[serde(default)]
pub delta: bool,
}

/// Request body for the dispute filters endpoint.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetDisputeFilterRequest {
    /// Time window over which distinct filter values are collected.
    pub time_range: TimeRange,
    /// Dimensions to collect distinct values for; defaults to empty.
    #[serde(default)]
    pub group_by_names: Vec<DisputeDimensions>,
}

/// Response for the dispute filters endpoint: one entry per requested dimension.
#[derive(Debug, Default, serde::Serialize, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct DisputeFiltersResponse {
    pub query_data: Vec<DisputeFilterValue>,
}

/// Distinct values observed for a single dispute dimension.
#[derive(Debug, serde::Serialize, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct DisputeFilterValue {
    /// The dimension these values belong to.
    pub dimension: DisputeDimensions,
    /// The distinct values found for that dimension.
    pub values: Vec<String>,
}
1 change: 1 addition & 0 deletions crates/api_models/src/analytics/disputes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub enum DisputeDimensions {
Connector,
DisputeStatus,
ConnectorStatus,
DisputeStage,
}

impl From<DisputeDimensions> for NameDescription {
Expand Down
4 changes: 3 additions & 1 deletion crates/api_models/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ impl_misc_api_event_type!(
SdkEventsRequest,
ReportRequest,
ConnectorEventsRequest,
OutgoingWebhookLogsRequest
OutgoingWebhookLogsRequest,
GetDisputeFilterRequest,
DisputeFiltersResponse
);

#[cfg(feature = "stripe")]
Expand Down
30 changes: 30 additions & 0 deletions crates/router/src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ pub mod routes {
web::resource("metrics/api_events")
.route(web::post().to(get_api_events_metrics)),
)
.service(
web::resource("filters/disputes")
.route(web::post().to(get_dispute_filters)),
)
}
route
}
Expand Down Expand Up @@ -582,4 +586,30 @@ pub mod routes {
))
.await
}

/// HTTP handler: returns the distinct dispute filter values for a merchant.
///
/// Guarded by JWT auth with the `Analytics` permission; the merchant id comes
/// from the authenticated merchant account, never from the request body.
pub async fn get_dispute_filters(
    state: web::Data<AppState>,
    req: actix_web::HttpRequest,
    json_payload: web::Json<api_models::analytics::GetDisputeFilterRequest>,
) -> impl Responder {
    let flow = AnalyticsFlow::GetDisputeFilters;
    Box::pin(api::server_wrap(
        flow,
        state,
        &req,
        json_payload.into_inner(),
        |state, auth: AuthenticationData, req| async move {
            analytics::disputes::get_filters(
                &state.pool,
                req,
                &auth.merchant_account.merchant_id,
            )
            .await
            .map(ApplicationResponse::Json)
        },
        &auth::JWTAuth(Permission::Analytics),
        api_locking::LockAction::NotApplicable,
    ))
    .await
}
}
1 change: 1 addition & 0 deletions crates/router_env/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub enum AnalyticsFlow {
GetApiEventFilters,
GetConnectorEvents,
GetOutgoingWebhookEvents,
GetDisputeFilters,
}

impl FlowMetric for AnalyticsFlow {}

0 comments on commit 6aeb440

Please sign in to comment.