From ccee1a9ce9e860bfa04e74329fb47fd73f010b23 Mon Sep 17 00:00:00 2001
From: ivor-juspay <138492857+ivor-juspay@users.noreply.github.com>
Date: Mon, 3 Jun 2024 14:46:17 +0530
Subject: [PATCH] feat(consolidated-kafka-events): add consolidated kafka payment events (#4798)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: Sampras Lopes
---
 config/config.example.toml                    |  26 ++--
 config/deployments/env_specific.toml          |   1 +
 config/development.toml                       |   6 +-
 config/docker_compose.toml                    |  10 +-
 crates/router/src/events.rs                   |   1 +
 crates/router/src/services/kafka.rs           | 105 +++++++++++++---
 .../src/services/kafka/dispute_event.rs       |  77 ++++++++++++
 .../services/kafka/payment_attempt_event.rs   | 119 ++++++++++++++++++
 .../services/kafka/payment_intent_event.rs    |  75 +++++++++++
 .../router/src/services/kafka/refund_event.rs |  74 +++++++++++
 10 files changed, 455 insertions(+), 39 deletions(-)
 create mode 100644 crates/router/src/services/kafka/dispute_event.rs
 create mode 100644 crates/router/src/services/kafka/payment_attempt_event.rs
 create mode 100644 crates/router/src/services/kafka/payment_intent_event.rs
 create mode 100644 crates/router/src/services/kafka/refund_event.rs

diff --git a/config/config.example.toml b/config/config.example.toml
index 5433d7a8e46e..5d40d0e55ef9 100644
--- a/config/config.example.toml
+++ b/config/config.example.toml
@@ -263,8 +263,7 @@ stripe = { banks = "alior_bank,bank_millennium,bank_nowy_bfg_sa,bank_pekao_sa,ba
 
 # This data is used to call respective connectors for wallets and cards
 [connectors.supported]
-wallets = ["klarna",
-    "mifinity", "braintree", "applepay"]
+wallets = ["klarna", "mifinity", "braintree", "applepay"]
 rewards = ["cashtocode", "zen"]
 cards = [
     "adyen",
@@ -352,8 +351,8 @@ email_role_arn = "" # The amazon resource name ( arn ) of the role which
 sts_role_session_name = "" # An identifier for the assumed role session, used to uniquely identify a session.
 
 [user]
-password_validity_in_days = 90 # Number of days after which password should be updated
-two_factor_auth_expiry_in_secs = 300 # Number of seconds after which 2FA should be done again if doing update/change from inside
+password_validity_in_days = 90 # Number of days after which password should be updated
+two_factor_auth_expiry_in_secs = 300 # Number of seconds after which 2FA should be done again if doing update/change from inside
 
 #tokenization configuration which describe token lifetime and payment method for specific connector
 [tokenization]
@@ -364,7 +363,7 @@ stax = { long_lived_token = true, payment_method = "card,bank_debit" }
 square = { long_lived_token = false, payment_method = "card" }
 braintree = { long_lived_token = false, payment_method = "card" }
 gocardless = { long_lived_token = true, payment_method = "bank_debit" }
-billwerk = {long_lived_token = false, payment_method = "card"}
+billwerk = { long_lived_token = false, payment_method = "card" }
 
 [temp_locker_enable_config]
 stripe = { payment_method = "bank_transfer" }
@@ -397,16 +396,16 @@ slack_invite_url = "https://www.example.com/" # Slack invite url for hyperswit
 discord_invite_url = "https://www.example.com/" # Discord invite url for hyperswitch
 
 [mandates.supported_payment_methods]
-card.credit = { connector_list = "stripe,adyen,cybersource,bankofamerica"} # Mandate supported payment method type and connector for card
-wallet.paypal = { connector_list = "adyen" } # Mandate supported payment method type and connector for wallets
-pay_later.klarna = { connector_list = "adyen" } # Mandate supported payment method type and connector for pay_later
-bank_debit.ach = { connector_list = "gocardless" } # Mandate supported payment method type and connector for bank_debit
-bank_debit.becs = { connector_list = "gocardless" } # Mandate supported payment method type and connector for bank_debit
-bank_debit.sepa = { connector_list = "gocardless" } # Mandate supported payment method type and connector for bank_debit
-bank_redirect.ideal = { connector_list = "stripe,adyen,globalpay" } # Mandate supported payment method type and connector for bank_redirect
+card.credit = { connector_list = "stripe,adyen,cybersource,bankofamerica" } # Mandate supported payment method type and connector for card
+wallet.paypal = { connector_list = "adyen" } # Mandate supported payment method type and connector for wallets
+pay_later.klarna = { connector_list = "adyen" } # Mandate supported payment method type and connector for pay_later
+bank_debit.ach = { connector_list = "gocardless" } # Mandate supported payment method type and connector for bank_debit
+bank_debit.becs = { connector_list = "gocardless" } # Mandate supported payment method type and connector for bank_debit
+bank_debit.sepa = { connector_list = "gocardless" } # Mandate supported payment method type and connector for bank_debit
+bank_redirect.ideal = { connector_list = "stripe,adyen,globalpay" } # Mandate supported payment method type and connector for bank_redirect
 bank_redirect.sofort = { connector_list = "stripe,adyen,globalpay" }
 wallet.apple_pay = { connector_list = "stripe,adyen,cybersource,noon,bankofamerica" }
-wallet.google_pay = { connector_list = "bankofamerica"}
+wallet.google_pay = { connector_list = "bankofamerica" }
 bank_redirect.giropay = { connector_list = "adyen,globalpay" }
 
 
@@ -589,6 +588,7 @@ outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webh
 dispute_analytics_topic = "topic" # Kafka topic to be used for Dispute events
audit_events_topic = "topic" # Kafka topic to be used for Payment Audit events payout_analytics_topic = "topic" # Kafka topic to be used for Payouts and PayoutAttempt events +consolidated_events_topic = "topic" # Kafka topic to be used for Consolidated events # File storage configuration [file_storage] diff --git a/config/deployments/env_specific.toml b/config/deployments/env_specific.toml index fc02335dcf53..7215e1931a40 100644 --- a/config/deployments/env_specific.toml +++ b/config/deployments/env_specific.toml @@ -81,6 +81,7 @@ outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webh dispute_analytics_topic = "topic" # Kafka topic to be used for Dispute events audit_events_topic = "topic" # Kafka topic to be used for Payment Audit events payout_analytics_topic = "topic" # Kafka topic to be used for Payouts and PayoutAttempt events +consolidated_events_topic = "topic" # Kafka topic to be used for Consolidated events # File storage configuration [file_storage] diff --git a/config/development.toml b/config/development.toml index 56d74e674897..496ce6477a8d 100644 --- a/config/development.toml +++ b/config/development.toml @@ -89,8 +89,7 @@ vault_private_key = "" tunnel_private_key = "" [connectors.supported] -wallets = ["klarna", - "mifinity", "braintree", "applepay", "adyen"] +wallets = ["klarna", "mifinity", "braintree", "applepay", "adyen"] rewards = ["cashtocode", "zen"] cards = [ "aci", @@ -559,7 +558,7 @@ redis_lock_expiry_seconds = 180 # 3 * 60 seconds delay_between_retries_in_milliseconds = 500 [kv_config] -ttl = 900 # 15 * 60 seconds +ttl = 900 # 15 * 60 seconds soft_kill = false [frm] @@ -579,6 +578,7 @@ outgoing_webhook_logs_topic = "hyperswitch-outgoing-webhook-events" dispute_analytics_topic = "hyperswitch-dispute-events" audit_events_topic = "hyperswitch-audit-events" payout_analytics_topic = "hyperswitch-payout-events" +consolidated_events_topic = "hyperswitch-consolidated-events" [analytics] source = "sqlx" diff --git a/config/docker_compose.toml b/config/docker_compose.toml index 3dac84e21d83..b94b5c8afbb6 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -180,8 +180,7 @@ zsl.base_url = "https://api.sitoffalb.net/" apple_pay = { country = "AU,CN,HK,JP,MO,MY,NZ,SG,TW,AM,AT,AZ,BY,BE,BG,HR,CY,CZ,DK,EE,FO,FI,FR,GE,DE,GR,GL,GG,HU,IS,IE,IM,IT,KZ,JE,LV,LI,LT,LU,MT,MD,MC,ME,NL,NO,PL,PT,RO,SM,RS,SK,SI,ES,SE,CH,UA,GB,AR,CO,CR,BR,MX,PE,BH,IL,JO,KW,PS,QA,SA,AE,CA,UM,US,KR,VN,MA,ZA,VA,CL,SV,GT,HN,PA", currency = "AED,AUD,CHF,CAD,EUR,GBP,HKD,SGD,USD" } [connectors.supported] -wallets = ["klarna", - "mifinity", "braintree", "applepay"] +wallets = ["klarna", "mifinity", "braintree", "applepay"] rewards = ["cashtocode", "zen"] cards = [ "aci", @@ -239,7 +238,7 @@ cards = [ "worldline", "worldpay", "zen", - "zsl" + "zsl", ] [delayed_session_response] @@ -269,7 +268,7 @@ stax = { long_lived_token = true, payment_method = "card,bank_debit" } square = { long_lived_token = false, payment_method = "card" } braintree = { long_lived_token = false, payment_method = "card" } gocardless = { long_lived_token = true, payment_method = "bank_debit" } -billwerk = {long_lived_token = false, payment_method = "card"} +billwerk = { long_lived_token = false, payment_method = "card" } [temp_locker_enable_config] stripe = { payment_method = "bank_transfer" } @@ -433,6 +432,7 @@ outgoing_webhook_logs_topic = "hyperswitch-outgoing-webhook-events" dispute_analytics_topic = "hyperswitch-dispute-events" audit_events_topic = "hyperswitch-audit-events" 
payout_analytics_topic = "hyperswitch-payout-events" +consolidated_events_topic = "hyperswitch-consolidated-events" [analytics] source = "sqlx" @@ -454,7 +454,7 @@ connection_timeout = 10 queue_strategy = "Fifo" [kv_config] -ttl = 900 # 15 * 60 seconds +ttl = 900 # 15 * 60 seconds soft_kill = false [frm] diff --git a/crates/router/src/events.rs b/crates/router/src/events.rs index 4bca58248351..e1ecd09804cf 100644 --- a/crates/router/src/events.rs +++ b/crates/router/src/events.rs @@ -30,6 +30,7 @@ pub enum EventType { AuditEvent, #[cfg(feature = "payouts")] Payout, + Consolidated, } #[derive(Debug, Default, Deserialize, Clone)] diff --git a/crates/router/src/services/kafka.rs b/crates/router/src/services/kafka.rs index 6f2aa9fcd100..3523ab9261fd 100644 --- a/crates/router/src/services/kafka.rs +++ b/crates/router/src/services/kafka.rs @@ -12,9 +12,13 @@ use rdkafka::{ pub mod payout; use crate::events::EventType; mod dispute; +mod dispute_event; mod payment_attempt; +mod payment_attempt_event; mod payment_intent; +mod payment_intent_event; mod refund; +mod refund_event; use diesel_models::refund::Refund; use hyperswitch_domain_models::payments::{payment_attempt::PaymentAttempt, PaymentIntent}; use serde::Serialize; @@ -23,8 +27,10 @@ use time::{OffsetDateTime, PrimitiveDateTime}; #[cfg(feature = "payouts")] use self::payout::KafkaPayout; use self::{ - dispute::KafkaDispute, payment_attempt::KafkaPaymentAttempt, - payment_intent::KafkaPaymentIntent, refund::KafkaRefund, + dispute::KafkaDispute, dispute_event::KafkaDisputeEvent, payment_attempt::KafkaPaymentAttempt, + payment_attempt_event::KafkaPaymentAttemptEvent, payment_intent::KafkaPaymentIntent, + payment_intent_event::KafkaPaymentIntentEvent, refund::KafkaRefund, + refund_event::KafkaRefundEvent, }; use crate::types::storage::Dispute; @@ -89,6 +95,42 @@ impl<'a, T: KafkaMessage> KafkaMessage for KafkaEvent<'a, T> { } } +#[derive(serde::Serialize, Debug)] +struct KafkaConsolidatedLog<'a, T: KafkaMessage> { + #[serde(flatten)] + event: &'a T, + tenant_id: TenantID, +} + +#[derive(serde::Serialize, Debug)] +struct KafkaConsolidatedEvent<'a, T: KafkaMessage> { + log: KafkaConsolidatedLog<'a, T>, + log_type: EventType, +} + +impl<'a, T: KafkaMessage> KafkaConsolidatedEvent<'a, T> { + fn new(event: &'a T, tenant_id: TenantID) -> Self { + Self { + log: KafkaConsolidatedLog { event, tenant_id }, + log_type: event.event_type(), + } + } +} + +impl<'a, T: KafkaMessage> KafkaMessage for KafkaConsolidatedEvent<'a, T> { + fn key(&self) -> String { + self.log.event.key() + } + + fn event_type(&self) -> EventType { + EventType::Consolidated + } + + fn creation_timestamp(&self) -> Option { + self.log.event.creation_timestamp() + } +} + #[derive(Debug, serde::Deserialize, Clone, Default)] #[serde(default)] pub struct KafkaSettings { @@ -103,6 +145,7 @@ pub struct KafkaSettings { audit_events_topic: String, #[cfg(feature = "payouts")] payout_analytics_topic: String, + consolidated_events_topic: String, } impl KafkaSettings { @@ -175,6 +218,12 @@ impl KafkaSettings { )) })?; + common_utils::fp_utils::when(self.consolidated_events_topic.is_default_or_empty(), || { + Err(ApplicationError::InvalidConfigurationValueError( + "Consolidated Events topic must not be empty".into(), + )) + })?; + Ok(()) } } @@ -192,6 +241,7 @@ pub struct KafkaProducer { audit_events_topic: String, #[cfg(feature = "payouts")] payout_analytics_topic: String, + consolidated_events_topic: String, } struct RdKafkaProducer(ThreadedProducer); @@ -233,23 +283,13 @@ impl KafkaProducer { 
             audit_events_topic: conf.audit_events_topic.clone(),
             #[cfg(feature = "payouts")]
             payout_analytics_topic: conf.payout_analytics_topic.clone(),
+            consolidated_events_topic: conf.consolidated_events_topic.clone(),
         })
     }
 
     pub fn log_event<T: KafkaMessage>(&self, event: &T) -> MQResult<()> {
         router_env::logger::debug!("Logging Kafka Event {event:?}");
-        let topic = match event.event_type() {
-            EventType::PaymentIntent => &self.intent_analytics_topic,
-            EventType::PaymentAttempt => &self.attempt_analytics_topic,
-            EventType::Refund => &self.refund_analytics_topic,
-            EventType::ApiLogs => &self.api_logs_topic,
-            EventType::ConnectorApiLogs => &self.connector_logs_topic,
-            EventType::OutgoingWebhookLogs => &self.outgoing_webhook_logs_topic,
-            EventType::Dispute => &self.dispute_analytics_topic,
-            EventType::AuditEvent => &self.audit_events_topic,
-            #[cfg(feature = "payouts")]
-            EventType::Payout => &self.payout_analytics_topic,
-        };
+        let topic = self.get_topic(event.event_type());
         self.producer
             .0
             .send(
@@ -281,11 +321,18 @@ impl KafkaProducer {
                 format!("Failed to add negative attempt event {negative_event:?}")
             })?;
         };
+
         self.log_event(&KafkaEvent::new(
             &KafkaPaymentAttempt::from_storage(attempt),
             tenant_id.clone(),
         ))
-        .attach_printable_lazy(|| format!("Failed to add positive attempt event {attempt:?}"))
+        .attach_printable_lazy(|| format!("Failed to add positive attempt event {attempt:?}"))?;
+
+        self.log_event(&KafkaConsolidatedEvent::new(
+            &KafkaPaymentAttemptEvent::from_storage(attempt),
+            tenant_id.clone(),
+        ))
+        .attach_printable_lazy(|| format!("Failed to add consolidated attempt event {attempt:?}"))
     }
 
     pub async fn log_payment_attempt_delete(
@@ -317,11 +364,18 @@ impl KafkaProducer {
                 format!("Failed to add negative intent event {negative_event:?}")
             })?;
         };
+
         self.log_event(&KafkaEvent::new(
             &KafkaPaymentIntent::from_storage(intent),
             tenant_id.clone(),
        ))
-        .attach_printable_lazy(|| format!("Failed to add positive intent event {intent:?}"))
+        .attach_printable_lazy(|| format!("Failed to add positive intent event {intent:?}"))?;
+
+        self.log_event(&KafkaConsolidatedEvent::new(
+            &KafkaPaymentIntentEvent::from_storage(intent),
+            tenant_id.clone(),
+        ))
+        .attach_printable_lazy(|| format!("Failed to add consolidated intent event {intent:?}"))
     }
 
     pub async fn log_payment_intent_delete(
@@ -353,11 +407,18 @@ impl KafkaProducer {
                 format!("Failed to add negative refund event {negative_event:?}")
             })?;
         };
+
         self.log_event(&KafkaEvent::new(
             &KafkaRefund::from_storage(refund),
             tenant_id.clone(),
         ))
-        .attach_printable_lazy(|| format!("Failed to add positive refund event {refund:?}"))
+        .attach_printable_lazy(|| format!("Failed to add positive refund event {refund:?}"))?;
+
+        self.log_event(&KafkaConsolidatedEvent::new(
+            &KafkaRefundEvent::from_storage(refund),
+            tenant_id.clone(),
+        ))
+        .attach_printable_lazy(|| format!("Failed to add consolidated refund event {refund:?}"))
     }
 
     pub async fn log_refund_delete(
@@ -389,11 +450,18 @@ impl KafkaProducer {
                 format!("Failed to add negative dispute event {negative_event:?}")
             })?;
         };
+
         self.log_event(&KafkaEvent::new(
             &KafkaDispute::from_storage(dispute),
             tenant_id.clone(),
         ))
-        .attach_printable_lazy(|| format!("Failed to add positive dispute event {dispute:?}"))
+        .attach_printable_lazy(|| format!("Failed to add positive dispute event {dispute:?}"))?;
+
+        self.log_event(&KafkaConsolidatedEvent::new(
+            &KafkaDisputeEvent::from_storage(dispute),
+            tenant_id.clone(),
+        ))
+        .attach_printable_lazy(|| format!("Failed to add consolidated dispute event {dispute:?}"))
     }
 
#[cfg(feature = "payouts")] @@ -437,6 +505,7 @@ impl KafkaProducer { EventType::AuditEvent => &self.audit_events_topic, #[cfg(feature = "payouts")] EventType::Payout => &self.payout_analytics_topic, + EventType::Consolidated => &self.consolidated_events_topic, } } } diff --git a/crates/router/src/services/kafka/dispute_event.rs b/crates/router/src/services/kafka/dispute_event.rs new file mode 100644 index 000000000000..71e0a11e04d5 --- /dev/null +++ b/crates/router/src/services/kafka/dispute_event.rs @@ -0,0 +1,77 @@ +use diesel_models::enums as storage_enums; +use masking::Secret; +use time::OffsetDateTime; + +use crate::types::storage::dispute::Dispute; + +#[serde_with::skip_serializing_none] +#[derive(serde::Serialize, Debug)] +pub struct KafkaDisputeEvent<'a> { + pub dispute_id: &'a String, + pub dispute_amount: i64, + pub currency: &'a String, + pub dispute_stage: &'a storage_enums::DisputeStage, + pub dispute_status: &'a storage_enums::DisputeStatus, + pub payment_id: &'a String, + pub attempt_id: &'a String, + pub merchant_id: &'a String, + pub connector_status: &'a String, + pub connector_dispute_id: &'a String, + pub connector_reason: Option<&'a String>, + pub connector_reason_code: Option<&'a String>, + #[serde(default, with = "time::serde::timestamp::milliseconds::option")] + pub challenge_required_by: Option, + #[serde(default, with = "time::serde::timestamp::milliseconds::option")] + pub connector_created_at: Option, + #[serde(default, with = "time::serde::timestamp::milliseconds::option")] + pub connector_updated_at: Option, + #[serde(default, with = "time::serde::timestamp::milliseconds")] + pub created_at: OffsetDateTime, + #[serde(default, with = "time::serde::timestamp::milliseconds")] + pub modified_at: OffsetDateTime, + pub connector: &'a String, + pub evidence: &'a Secret, + pub profile_id: Option<&'a String>, + pub merchant_connector_id: Option<&'a String>, +} + +impl<'a> KafkaDisputeEvent<'a> { + pub fn from_storage(dispute: &'a Dispute) -> Self { + Self { + dispute_id: &dispute.dispute_id, + dispute_amount: dispute.amount.parse::().unwrap_or_default(), + currency: &dispute.currency, + dispute_stage: &dispute.dispute_stage, + dispute_status: &dispute.dispute_status, + payment_id: &dispute.payment_id, + attempt_id: &dispute.attempt_id, + merchant_id: &dispute.merchant_id, + connector_status: &dispute.connector_status, + connector_dispute_id: &dispute.connector_dispute_id, + connector_reason: dispute.connector_reason.as_ref(), + connector_reason_code: dispute.connector_reason_code.as_ref(), + challenge_required_by: dispute.challenge_required_by.map(|i| i.assume_utc()), + connector_created_at: dispute.connector_created_at.map(|i| i.assume_utc()), + connector_updated_at: dispute.connector_updated_at.map(|i| i.assume_utc()), + created_at: dispute.created_at.assume_utc(), + modified_at: dispute.modified_at.assume_utc(), + connector: &dispute.connector, + evidence: &dispute.evidence, + profile_id: dispute.profile_id.as_ref(), + merchant_connector_id: dispute.merchant_connector_id.as_ref(), + } + } +} + +impl<'a> super::KafkaMessage for KafkaDisputeEvent<'a> { + fn key(&self) -> String { + format!( + "{}_{}_{}", + self.merchant_id, self.payment_id, self.dispute_id + ) + } + + fn event_type(&self) -> crate::events::EventType { + crate::events::EventType::Dispute + } +} diff --git a/crates/router/src/services/kafka/payment_attempt_event.rs b/crates/router/src/services/kafka/payment_attempt_event.rs new file mode 100644 index 000000000000..bb4d69eda27f --- /dev/null +++ 
@@ -0,0 +1,119 @@
+// use diesel_models::enums::MandateDetails;
+use common_utils::types::MinorUnit;
+use diesel_models::enums as storage_enums;
+use hyperswitch_domain_models::{
+    mandates::MandateDetails, payments::payment_attempt::PaymentAttempt,
+};
+use time::OffsetDateTime;
+
+#[serde_with::skip_serializing_none]
+#[derive(serde::Serialize, Debug)]
+pub struct KafkaPaymentAttemptEvent<'a> {
+    pub payment_id: &'a String,
+    pub merchant_id: &'a String,
+    pub attempt_id: &'a String,
+    pub status: storage_enums::AttemptStatus,
+    pub amount: MinorUnit,
+    pub currency: Option<storage_enums::Currency>,
+    pub save_to_locker: Option<bool>,
+    pub connector: Option<&'a String>,
+    pub error_message: Option<&'a String>,
+    pub offer_amount: Option<MinorUnit>,
+    pub surcharge_amount: Option<MinorUnit>,
+    pub tax_amount: Option<MinorUnit>,
+    pub payment_method_id: Option<&'a String>,
+    pub payment_method: Option<storage_enums::PaymentMethod>,
+    pub connector_transaction_id: Option<&'a String>,
+    pub capture_method: Option<storage_enums::CaptureMethod>,
+    #[serde(default, with = "time::serde::timestamp::milliseconds::option")]
+    pub capture_on: Option<OffsetDateTime>,
+    pub confirm: bool,
+    pub authentication_type: Option<storage_enums::AuthenticationType>,
+    #[serde(with = "time::serde::timestamp::milliseconds")]
+    pub created_at: OffsetDateTime,
+    #[serde(with = "time::serde::timestamp::milliseconds")]
+    pub modified_at: OffsetDateTime,
+    #[serde(default, with = "time::serde::timestamp::milliseconds::option")]
+    pub last_synced: Option<OffsetDateTime>,
+    pub cancellation_reason: Option<&'a String>,
+    pub amount_to_capture: Option<MinorUnit>,
+    pub mandate_id: Option<&'a String>,
+    pub browser_info: Option<String>,
+    pub error_code: Option<&'a String>,
+    pub connector_metadata: Option<String>,
+    // TODO: These types should implement copy ideally
+    pub payment_experience: Option<&'a storage_enums::PaymentExperience>,
+    pub payment_method_type: Option<&'a storage_enums::PaymentMethodType>,
+    pub payment_method_data: Option<String>,
+    pub error_reason: Option<&'a String>,
+    pub multiple_capture_count: Option<i16>,
+    pub amount_capturable: MinorUnit,
+    pub merchant_connector_id: Option<&'a String>,
+    pub net_amount: MinorUnit,
+    pub unified_code: Option<&'a String>,
+    pub unified_message: Option<&'a String>,
+    pub mandate_data: Option<&'a MandateDetails>,
+    pub client_source: Option<&'a String>,
+    pub client_version: Option<&'a String>,
+}
+
+impl<'a> KafkaPaymentAttemptEvent<'a> {
+    pub fn from_storage(attempt: &'a PaymentAttempt) -> Self {
+        Self {
+            payment_id: &attempt.payment_id,
+            merchant_id: &attempt.merchant_id,
+            attempt_id: &attempt.attempt_id,
+            status: attempt.status,
+            amount: attempt.amount,
+            currency: attempt.currency,
+            save_to_locker: attempt.save_to_locker,
+            connector: attempt.connector.as_ref(),
+            error_message: attempt.error_message.as_ref(),
+            offer_amount: attempt.offer_amount,
+            surcharge_amount: attempt.surcharge_amount,
+            tax_amount: attempt.tax_amount,
+            payment_method_id: attempt.payment_method_id.as_ref(),
+            payment_method: attempt.payment_method,
+            connector_transaction_id: attempt.connector_transaction_id.as_ref(),
+            capture_method: attempt.capture_method,
+            capture_on: attempt.capture_on.map(|i| i.assume_utc()),
+            confirm: attempt.confirm,
+            authentication_type: attempt.authentication_type,
+            created_at: attempt.created_at.assume_utc(),
+            modified_at: attempt.modified_at.assume_utc(),
+            last_synced: attempt.last_synced.map(|i| i.assume_utc()),
+            cancellation_reason: attempt.cancellation_reason.as_ref(),
+            amount_to_capture: attempt.amount_to_capture,
+            mandate_id: attempt.mandate_id.as_ref(),
+            browser_info: attempt.browser_info.as_ref().map(|v| v.to_string()),
+            error_code: attempt.error_code.as_ref(),
+            connector_metadata: attempt.connector_metadata.as_ref().map(|v| v.to_string()),
+            payment_experience: attempt.payment_experience.as_ref(),
+            payment_method_type: attempt.payment_method_type.as_ref(),
+            payment_method_data: attempt.payment_method_data.as_ref().map(|v| v.to_string()),
+            error_reason: attempt.error_reason.as_ref(),
+            multiple_capture_count: attempt.multiple_capture_count,
+            amount_capturable: attempt.amount_capturable,
+            merchant_connector_id: attempt.merchant_connector_id.as_ref(),
+            net_amount: attempt.net_amount,
+            unified_code: attempt.unified_code.as_ref(),
+            unified_message: attempt.unified_message.as_ref(),
+            mandate_data: attempt.mandate_data.as_ref(),
+            client_source: attempt.client_source.as_ref(),
+            client_version: attempt.client_version.as_ref(),
+        }
+    }
+}
+
+impl<'a> super::KafkaMessage for KafkaPaymentAttemptEvent<'a> {
+    fn key(&self) -> String {
+        format!(
+            "{}_{}_{}",
+            self.merchant_id, self.payment_id, self.attempt_id
+        )
+    }
+
+    fn event_type(&self) -> crate::events::EventType {
+        crate::events::EventType::PaymentAttempt
+    }
+}
diff --git a/crates/router/src/services/kafka/payment_intent_event.rs b/crates/router/src/services/kafka/payment_intent_event.rs
new file mode 100644
index 000000000000..a3fbd9ddc458
--- /dev/null
+++ b/crates/router/src/services/kafka/payment_intent_event.rs
@@ -0,0 +1,75 @@
+use common_utils::{id_type, types::MinorUnit};
+use diesel_models::enums as storage_enums;
+use hyperswitch_domain_models::payments::PaymentIntent;
+use time::OffsetDateTime;
+
+#[serde_with::skip_serializing_none]
+#[derive(serde::Serialize, Debug)]
+pub struct KafkaPaymentIntentEvent<'a> {
+    pub payment_id: &'a String,
+    pub merchant_id: &'a String,
+    pub status: storage_enums::IntentStatus,
+    pub amount: MinorUnit,
+    pub currency: Option<storage_enums::Currency>,
+    pub amount_captured: Option<MinorUnit>,
+    pub customer_id: Option<&'a id_type::CustomerId>,
+    pub description: Option<&'a String>,
+    pub return_url: Option<&'a String>,
+    pub connector_id: Option<&'a String>,
+    pub statement_descriptor_name: Option<&'a String>,
+    pub statement_descriptor_suffix: Option<&'a String>,
+    #[serde(with = "time::serde::timestamp::milliseconds")]
+    pub created_at: OffsetDateTime,
+    #[serde(with = "time::serde::timestamp::milliseconds")]
+    pub modified_at: OffsetDateTime,
+    #[serde(default, with = "time::serde::timestamp::milliseconds::option")]
+    pub last_synced: Option<OffsetDateTime>,
+    pub setup_future_usage: Option<storage_enums::FutureUsage>,
+    pub off_session: Option<bool>,
+    pub client_secret: Option<&'a String>,
+    pub active_attempt_id: String,
+    pub business_country: Option<storage_enums::CountryAlpha2>,
+    pub business_label: Option<&'a String>,
+    pub attempt_count: i16,
+    pub payment_confirm_source: Option<storage_enums::PaymentSource>,
+}
+
+impl<'a> KafkaPaymentIntentEvent<'a> {
+    pub fn from_storage(intent: &'a PaymentIntent) -> Self {
+        Self {
+            payment_id: &intent.payment_id,
+            merchant_id: &intent.merchant_id,
+            status: intent.status,
+            amount: intent.amount,
+            currency: intent.currency,
+            amount_captured: intent.amount_captured,
+            customer_id: intent.customer_id.as_ref(),
+            description: intent.description.as_ref(),
+            return_url: intent.return_url.as_ref(),
+            connector_id: intent.connector_id.as_ref(),
+            statement_descriptor_name: intent.statement_descriptor_name.as_ref(),
+            statement_descriptor_suffix: intent.statement_descriptor_suffix.as_ref(),
+            created_at: intent.created_at.assume_utc(),
+            modified_at: intent.modified_at.assume_utc(),
+            last_synced: intent.last_synced.map(|i| i.assume_utc()),
+            setup_future_usage: intent.setup_future_usage,
+            off_session: intent.off_session,
+            client_secret: intent.client_secret.as_ref(),
+            active_attempt_id: intent.active_attempt.get_id(),
+            business_country: intent.business_country,
+            business_label: intent.business_label.as_ref(),
+            attempt_count: intent.attempt_count,
+            payment_confirm_source: intent.payment_confirm_source,
+        }
+    }
+}
+
+impl<'a> super::KafkaMessage for KafkaPaymentIntentEvent<'a> {
+    fn key(&self) -> String {
+        format!("{}_{}", self.merchant_id, self.payment_id)
+    }
+
+    fn event_type(&self) -> crate::events::EventType {
+        crate::events::EventType::PaymentIntent
+    }
+}
diff --git a/crates/router/src/services/kafka/refund_event.rs b/crates/router/src/services/kafka/refund_event.rs
new file mode 100644
index 000000000000..6aa80b243c11
--- /dev/null
+++ b/crates/router/src/services/kafka/refund_event.rs
@@ -0,0 +1,74 @@
+use common_utils::types::MinorUnit;
+use diesel_models::{enums as storage_enums, refund::Refund};
+use time::OffsetDateTime;
+
+#[serde_with::skip_serializing_none]
+#[derive(serde::Serialize, Debug)]
+pub struct KafkaRefundEvent<'a> {
+    pub internal_reference_id: &'a String,
+    pub refund_id: &'a String, // merchant_reference id
+    pub payment_id: &'a String,
+    pub merchant_id: &'a String,
+    pub connector_transaction_id: &'a String,
+    pub connector: &'a String,
+    pub connector_refund_id: Option<&'a String>,
+    pub external_reference_id: Option<&'a String>,
+    pub refund_type: &'a storage_enums::RefundType,
+    pub total_amount: &'a MinorUnit,
+    pub currency: &'a storage_enums::Currency,
+    pub refund_amount: &'a MinorUnit,
+    pub refund_status: &'a storage_enums::RefundStatus,
+    pub sent_to_gateway: &'a bool,
+    pub refund_error_message: Option<&'a String>,
+    pub refund_arn: Option<&'a String>,
+    #[serde(default, with = "time::serde::timestamp::milliseconds")]
+    pub created_at: OffsetDateTime,
+    #[serde(default, with = "time::serde::timestamp::milliseconds")]
+    pub modified_at: OffsetDateTime,
+    pub description: Option<&'a String>,
+    pub attempt_id: &'a String,
+    pub refund_reason: Option<&'a String>,
+    pub refund_error_code: Option<&'a String>,
+}
+
+impl<'a> KafkaRefundEvent<'a> {
+    pub fn from_storage(refund: &'a Refund) -> Self {
+        Self {
+            internal_reference_id: &refund.internal_reference_id,
+            refund_id: &refund.refund_id,
+            payment_id: &refund.payment_id,
+            merchant_id: &refund.merchant_id,
+            connector_transaction_id: &refund.connector_transaction_id,
+            connector: &refund.connector,
+            connector_refund_id: refund.connector_refund_id.as_ref(),
+            external_reference_id: refund.external_reference_id.as_ref(),
+            refund_type: &refund.refund_type,
+            total_amount: &refund.total_amount,
+            currency: &refund.currency,
+            refund_amount: &refund.refund_amount,
+            refund_status: &refund.refund_status,
+            sent_to_gateway: &refund.sent_to_gateway,
+            refund_error_message: refund.refund_error_message.as_ref(),
+            refund_arn: refund.refund_arn.as_ref(),
+            created_at: refund.created_at.assume_utc(),
+            modified_at: refund.updated_at.assume_utc(),
+            description: refund.description.as_ref(),
+            attempt_id: &refund.attempt_id,
+            refund_reason: refund.refund_reason.as_ref(),
+            refund_error_code: refund.refund_error_code.as_ref(),
+        }
+    }
+}
+
+impl<'a> super::KafkaMessage for KafkaRefundEvent<'a> {
+    fn key(&self) -> String {
+        format!(
+            "{}_{}_{}_{}",
+            self.merchant_id, self.payment_id, self.attempt_id, self.refund_id
+        )
+    }
+
+    fn event_type(&self) -> crate::events::EventType {
+        crate::events::EventType::Refund
+    }
+}