Skip to content

Commit

Permalink
feat(consolidated-kafka-events): add consolidated kafka payment events (
Browse files Browse the repository at this point in the history
#4798)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: Sampras Lopes <[email protected]>
  • Loading branch information
3 people authored Jun 3, 2024
1 parent 8650077 commit ccee1a9
Show file tree
Hide file tree
Showing 10 changed files with 455 additions and 39 deletions.
26 changes: 13 additions & 13 deletions config/config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,7 @@ stripe = { banks = "alior_bank,bank_millennium,bank_nowy_bfg_sa,bank_pekao_sa,ba

# This data is used to call respective connectors for wallets and cards
[connectors.supported]
wallets = ["klarna",
"mifinity", "braintree", "applepay"]
wallets = ["klarna", "mifinity", "braintree", "applepay"]
rewards = ["cashtocode", "zen"]
cards = [
"adyen",
Expand Down Expand Up @@ -352,8 +351,8 @@ email_role_arn = "" # The amazon resource name ( arn ) of the role which
sts_role_session_name = "" # An identifier for the assumed role session, used to uniquely identify a session.

[user]
password_validity_in_days = 90 # Number of days after which password should be updated
two_factor_auth_expiry_in_secs = 300 # Number of seconds after which 2FA should be done again if doing update/change from inside
password_validity_in_days = 90 # Number of days after which password should be updated
two_factor_auth_expiry_in_secs = 300 # Number of seconds after which 2FA should be done again if doing update/change from inside

#tokenization configuration which describe token lifetime and payment method for specific connector
[tokenization]
Expand All @@ -364,7 +363,7 @@ stax = { long_lived_token = true, payment_method = "card,bank_debit" }
square = { long_lived_token = false, payment_method = "card" }
braintree = { long_lived_token = false, payment_method = "card" }
gocardless = { long_lived_token = true, payment_method = "bank_debit" }
billwerk = {long_lived_token = false, payment_method = "card"}
billwerk = { long_lived_token = false, payment_method = "card" }

[temp_locker_enable_config]
stripe = { payment_method = "bank_transfer" }
Expand Down Expand Up @@ -397,16 +396,16 @@ slack_invite_url = "https://www.example.com/" # Slack invite url for hyperswit
discord_invite_url = "https://www.example.com/" # Discord invite url for hyperswitch

[mandates.supported_payment_methods]
card.credit = { connector_list = "stripe,adyen,cybersource,bankofamerica"} # Mandate supported payment method type and connector for card
wallet.paypal = { connector_list = "adyen" } # Mandate supported payment method type and connector for wallets
pay_later.klarna = { connector_list = "adyen" } # Mandate supported payment method type and connector for pay_later
bank_debit.ach = { connector_list = "gocardless" } # Mandate supported payment method type and connector for bank_debit
bank_debit.becs = { connector_list = "gocardless" } # Mandate supported payment method type and connector for bank_debit
bank_debit.sepa = { connector_list = "gocardless" } # Mandate supported payment method type and connector for bank_debit
bank_redirect.ideal = { connector_list = "stripe,adyen,globalpay" } # Mandate supported payment method type and connector for bank_redirect
card.credit = { connector_list = "stripe,adyen,cybersource,bankofamerica" } # Mandate supported payment method type and connector for card
wallet.paypal = { connector_list = "adyen" } # Mandate supported payment method type and connector for wallets
pay_later.klarna = { connector_list = "adyen" } # Mandate supported payment method type and connector for pay_later
bank_debit.ach = { connector_list = "gocardless" } # Mandate supported payment method type and connector for bank_debit
bank_debit.becs = { connector_list = "gocardless" } # Mandate supported payment method type and connector for bank_debit
bank_debit.sepa = { connector_list = "gocardless" } # Mandate supported payment method type and connector for bank_debit
bank_redirect.ideal = { connector_list = "stripe,adyen,globalpay" } # Mandate supported payment method type and connector for bank_redirect
bank_redirect.sofort = { connector_list = "stripe,adyen,globalpay" }
wallet.apple_pay = { connector_list = "stripe,adyen,cybersource,noon,bankofamerica" }
wallet.google_pay = { connector_list = "bankofamerica"}
wallet.google_pay = { connector_list = "bankofamerica" }
bank_redirect.giropay = { connector_list = "adyen,globalpay" }


Expand Down Expand Up @@ -589,6 +588,7 @@ outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webh
dispute_analytics_topic = "topic" # Kafka topic to be used for Dispute events
audit_events_topic = "topic" # Kafka topic to be used for Payment Audit events
payout_analytics_topic = "topic" # Kafka topic to be used for Payouts and PayoutAttempt events
consolidated_events_topic = "topic" # Kafka topic to be used for Consolidated events

# File storage configuration
[file_storage]
Expand Down
1 change: 1 addition & 0 deletions config/deployments/env_specific.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webh
dispute_analytics_topic = "topic" # Kafka topic to be used for Dispute events
audit_events_topic = "topic" # Kafka topic to be used for Payment Audit events
payout_analytics_topic = "topic" # Kafka topic to be used for Payouts and PayoutAttempt events
consolidated_events_topic = "topic" # Kafka topic to be used for Consolidated events

# File storage configuration
[file_storage]
Expand Down
6 changes: 3 additions & 3 deletions config/development.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ vault_private_key = ""
tunnel_private_key = ""

[connectors.supported]
wallets = ["klarna",
"mifinity", "braintree", "applepay", "adyen"]
wallets = ["klarna", "mifinity", "braintree", "applepay", "adyen"]
rewards = ["cashtocode", "zen"]
cards = [
"aci",
Expand Down Expand Up @@ -559,7 +558,7 @@ redis_lock_expiry_seconds = 180 # 3 * 60 seconds
delay_between_retries_in_milliseconds = 500

[kv_config]
ttl = 900 # 15 * 60 seconds
ttl = 900 # 15 * 60 seconds
soft_kill = false

[frm]
Expand All @@ -579,6 +578,7 @@ outgoing_webhook_logs_topic = "hyperswitch-outgoing-webhook-events"
dispute_analytics_topic = "hyperswitch-dispute-events"
audit_events_topic = "hyperswitch-audit-events"
payout_analytics_topic = "hyperswitch-payout-events"
consolidated_events_topic = "hyperswitch-consolidated-events"

[analytics]
source = "sqlx"
Expand Down
10 changes: 5 additions & 5 deletions config/docker_compose.toml
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ zsl.base_url = "https://api.sitoffalb.net/"
apple_pay = { country = "AU,CN,HK,JP,MO,MY,NZ,SG,TW,AM,AT,AZ,BY,BE,BG,HR,CY,CZ,DK,EE,FO,FI,FR,GE,DE,GR,GL,GG,HU,IS,IE,IM,IT,KZ,JE,LV,LI,LT,LU,MT,MD,MC,ME,NL,NO,PL,PT,RO,SM,RS,SK,SI,ES,SE,CH,UA,GB,AR,CO,CR,BR,MX,PE,BH,IL,JO,KW,PS,QA,SA,AE,CA,UM,US,KR,VN,MA,ZA,VA,CL,SV,GT,HN,PA", currency = "AED,AUD,CHF,CAD,EUR,GBP,HKD,SGD,USD" }

[connectors.supported]
wallets = ["klarna",
"mifinity", "braintree", "applepay"]
wallets = ["klarna", "mifinity", "braintree", "applepay"]
rewards = ["cashtocode", "zen"]
cards = [
"aci",
Expand Down Expand Up @@ -239,7 +238,7 @@ cards = [
"worldline",
"worldpay",
"zen",
"zsl"
"zsl",
]

[delayed_session_response]
Expand Down Expand Up @@ -269,7 +268,7 @@ stax = { long_lived_token = true, payment_method = "card,bank_debit" }
square = { long_lived_token = false, payment_method = "card" }
braintree = { long_lived_token = false, payment_method = "card" }
gocardless = { long_lived_token = true, payment_method = "bank_debit" }
billwerk = {long_lived_token = false, payment_method = "card"}
billwerk = { long_lived_token = false, payment_method = "card" }

[temp_locker_enable_config]
stripe = { payment_method = "bank_transfer" }
Expand Down Expand Up @@ -433,6 +432,7 @@ outgoing_webhook_logs_topic = "hyperswitch-outgoing-webhook-events"
dispute_analytics_topic = "hyperswitch-dispute-events"
audit_events_topic = "hyperswitch-audit-events"
payout_analytics_topic = "hyperswitch-payout-events"
consolidated_events_topic = "hyperswitch-consolidated-events"

[analytics]
source = "sqlx"
Expand All @@ -454,7 +454,7 @@ connection_timeout = 10
queue_strategy = "Fifo"

[kv_config]
ttl = 900 # 15 * 60 seconds
ttl = 900 # 15 * 60 seconds
soft_kill = false

[frm]
Expand Down
1 change: 1 addition & 0 deletions crates/router/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub enum EventType {
AuditEvent,
#[cfg(feature = "payouts")]
Payout,
Consolidated,
}

#[derive(Debug, Default, Deserialize, Clone)]
Expand Down
105 changes: 87 additions & 18 deletions crates/router/src/services/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ use rdkafka::{
pub mod payout;
use crate::events::EventType;
mod dispute;
mod dispute_event;
mod payment_attempt;
mod payment_attempt_event;
mod payment_intent;
mod payment_intent_event;
mod refund;
mod refund_event;
use diesel_models::refund::Refund;
use hyperswitch_domain_models::payments::{payment_attempt::PaymentAttempt, PaymentIntent};
use serde::Serialize;
Expand All @@ -23,8 +27,10 @@ use time::{OffsetDateTime, PrimitiveDateTime};
#[cfg(feature = "payouts")]
use self::payout::KafkaPayout;
use self::{
dispute::KafkaDispute, payment_attempt::KafkaPaymentAttempt,
payment_intent::KafkaPaymentIntent, refund::KafkaRefund,
dispute::KafkaDispute, dispute_event::KafkaDisputeEvent, payment_attempt::KafkaPaymentAttempt,
payment_attempt_event::KafkaPaymentAttemptEvent, payment_intent::KafkaPaymentIntent,
payment_intent_event::KafkaPaymentIntentEvent, refund::KafkaRefund,
refund_event::KafkaRefundEvent,
};
use crate::types::storage::Dispute;

Expand Down Expand Up @@ -89,6 +95,42 @@ impl<'a, T: KafkaMessage> KafkaMessage for KafkaEvent<'a, T> {
}
}

#[derive(serde::Serialize, Debug)]
struct KafkaConsolidatedLog<'a, T: KafkaMessage> {
#[serde(flatten)]
event: &'a T,
tenant_id: TenantID,
}

#[derive(serde::Serialize, Debug)]
struct KafkaConsolidatedEvent<'a, T: KafkaMessage> {
log: KafkaConsolidatedLog<'a, T>,
log_type: EventType,
}

impl<'a, T: KafkaMessage> KafkaConsolidatedEvent<'a, T> {
fn new(event: &'a T, tenant_id: TenantID) -> Self {
Self {
log: KafkaConsolidatedLog { event, tenant_id },
log_type: event.event_type(),
}
}
}

impl<'a, T: KafkaMessage> KafkaMessage for KafkaConsolidatedEvent<'a, T> {
fn key(&self) -> String {
self.log.event.key()
}

fn event_type(&self) -> EventType {
EventType::Consolidated
}

fn creation_timestamp(&self) -> Option<i64> {
self.log.event.creation_timestamp()
}
}

#[derive(Debug, serde::Deserialize, Clone, Default)]
#[serde(default)]
pub struct KafkaSettings {
Expand All @@ -103,6 +145,7 @@ pub struct KafkaSettings {
audit_events_topic: String,
#[cfg(feature = "payouts")]
payout_analytics_topic: String,
consolidated_events_topic: String,
}

impl KafkaSettings {
Expand Down Expand Up @@ -175,6 +218,12 @@ impl KafkaSettings {
))
})?;

common_utils::fp_utils::when(self.consolidated_events_topic.is_default_or_empty(), || {
Err(ApplicationError::InvalidConfigurationValueError(
"Consolidated Events topic must not be empty".into(),
))
})?;

Ok(())
}
}
Expand All @@ -192,6 +241,7 @@ pub struct KafkaProducer {
audit_events_topic: String,
#[cfg(feature = "payouts")]
payout_analytics_topic: String,
consolidated_events_topic: String,
}

struct RdKafkaProducer(ThreadedProducer<DefaultProducerContext>);
Expand Down Expand Up @@ -233,23 +283,13 @@ impl KafkaProducer {
audit_events_topic: conf.audit_events_topic.clone(),
#[cfg(feature = "payouts")]
payout_analytics_topic: conf.payout_analytics_topic.clone(),
consolidated_events_topic: conf.consolidated_events_topic.clone(),
})
}

pub fn log_event<T: KafkaMessage>(&self, event: &T) -> MQResult<()> {
router_env::logger::debug!("Logging Kafka Event {event:?}");
let topic = match event.event_type() {
EventType::PaymentIntent => &self.intent_analytics_topic,
EventType::PaymentAttempt => &self.attempt_analytics_topic,
EventType::Refund => &self.refund_analytics_topic,
EventType::ApiLogs => &self.api_logs_topic,
EventType::ConnectorApiLogs => &self.connector_logs_topic,
EventType::OutgoingWebhookLogs => &self.outgoing_webhook_logs_topic,
EventType::Dispute => &self.dispute_analytics_topic,
EventType::AuditEvent => &self.audit_events_topic,
#[cfg(feature = "payouts")]
EventType::Payout => &self.payout_analytics_topic,
};
let topic = self.get_topic(event.event_type());
self.producer
.0
.send(
Expand Down Expand Up @@ -281,11 +321,18 @@ impl KafkaProducer {
format!("Failed to add negative attempt event {negative_event:?}")
})?;
};

self.log_event(&KafkaEvent::new(
&KafkaPaymentAttempt::from_storage(attempt),
tenant_id.clone(),
))
.attach_printable_lazy(|| format!("Failed to add positive attempt event {attempt:?}"))
.attach_printable_lazy(|| format!("Failed to add positive attempt event {attempt:?}"))?;

self.log_event(&KafkaConsolidatedEvent::new(
&KafkaPaymentAttemptEvent::from_storage(attempt),
tenant_id.clone(),
))
.attach_printable_lazy(|| format!("Failed to add consolidated attempt event {attempt:?}"))
}

pub async fn log_payment_attempt_delete(
Expand Down Expand Up @@ -317,11 +364,18 @@ impl KafkaProducer {
format!("Failed to add negative intent event {negative_event:?}")
})?;
};

self.log_event(&KafkaEvent::new(
&KafkaPaymentIntent::from_storage(intent),
tenant_id.clone(),
))
.attach_printable_lazy(|| format!("Failed to add positive intent event {intent:?}"))
.attach_printable_lazy(|| format!("Failed to add positive intent event {intent:?}"))?;

self.log_event(&KafkaConsolidatedEvent::new(
&KafkaPaymentIntentEvent::from_storage(intent),
tenant_id.clone(),
))
.attach_printable_lazy(|| format!("Failed to add consolidated intent event {intent:?}"))
}

pub async fn log_payment_intent_delete(
Expand Down Expand Up @@ -353,11 +407,18 @@ impl KafkaProducer {
format!("Failed to add negative refund event {negative_event:?}")
})?;
};

self.log_event(&KafkaEvent::new(
&KafkaRefund::from_storage(refund),
tenant_id.clone(),
))
.attach_printable_lazy(|| format!("Failed to add positive refund event {refund:?}"))
.attach_printable_lazy(|| format!("Failed to add positive refund event {refund:?}"))?;

self.log_event(&KafkaConsolidatedEvent::new(
&KafkaRefundEvent::from_storage(refund),
tenant_id.clone(),
))
.attach_printable_lazy(|| format!("Failed to add consolidated refund event {refund:?}"))
}

pub async fn log_refund_delete(
Expand Down Expand Up @@ -389,11 +450,18 @@ impl KafkaProducer {
format!("Failed to add negative dispute event {negative_event:?}")
})?;
};

self.log_event(&KafkaEvent::new(
&KafkaDispute::from_storage(dispute),
tenant_id.clone(),
))
.attach_printable_lazy(|| format!("Failed to add positive dispute event {dispute:?}"))
.attach_printable_lazy(|| format!("Failed to add positive dispute event {dispute:?}"))?;

self.log_event(&KafkaConsolidatedEvent::new(
&KafkaDisputeEvent::from_storage(dispute),
tenant_id.clone(),
))
.attach_printable_lazy(|| format!("Failed to add consolidated dispute event {dispute:?}"))
}

#[cfg(feature = "payouts")]
Expand Down Expand Up @@ -437,6 +505,7 @@ impl KafkaProducer {
EventType::AuditEvent => &self.audit_events_topic,
#[cfg(feature = "payouts")]
EventType::Payout => &self.payout_analytics_topic,
EventType::Consolidated => &self.consolidated_events_topic,
}
}
}
Expand Down
Loading

0 comments on commit ccee1a9

Please sign in to comment.