From b1cb053a55e9ce4d78f7770b53e39700311d9cd4 Mon Sep 17 00:00:00 2001 From: Sanchith Hegde <22217505+SanchithHegde@users.noreply.github.com> Date: Thu, 6 Jun 2024 14:00:09 +0530 Subject: [PATCH] refactor(webhooks): extract incoming and outgoing webhooks into separate modules (#4870) --- crates/router/src/core/webhooks.rs | 2385 +---------------- crates/router/src/core/webhooks/incoming.rs | 1628 +++++++++++ crates/router/src/core/webhooks/outgoing.rs | 775 ++++++ .../src/core/webhooks/webhook_events.rs | 2 +- crates/router/src/routes/webhooks.rs | 2 +- 5 files changed, 2412 insertions(+), 2380 deletions(-) create mode 100644 crates/router/src/core/webhooks/incoming.rs create mode 100644 crates/router/src/core/webhooks/outgoing.rs diff --git a/crates/router/src/core/webhooks.rs b/crates/router/src/core/webhooks.rs index 225607777055..a209d90f316f 100644 --- a/crates/router/src/core/webhooks.rs +++ b/crates/router/src/core/webhooks.rs @@ -1,2387 +1,16 @@ +mod incoming; +mod outgoing; pub mod types; pub mod utils; #[cfg(feature = "olap")] pub mod webhook_events; -use std::{str::FromStr, time::Instant}; - -use actix_web::FromRequest; -#[cfg(feature = "payouts")] -use api_models::payouts as payout_models; -use api_models::{ - payments::HeaderPayload, - webhook_events::{OutgoingWebhookRequestContent, OutgoingWebhookResponseContent}, - webhooks::{self, WebhookResponseTracker}, -}; -use common_utils::{ - errors::ReportSwitchExt, events::ApiEventsType, ext_traits::Encode, request::RequestContent, -}; -use error_stack::{report, ResultExt}; -use masking::{ExposeInterface, Mask, PeekInterface, Secret}; -use router_env::{ - instrument, - tracing::{self, Instrument}, - tracing_actix_web::RequestId, -}; - -#[cfg(feature = "payouts")] -use super::payouts; -use super::{errors::StorageErrorExt, metrics}; -#[cfg(feature = "stripe")] -use crate::compatibility::stripe::webhooks as stripe_webhooks; -#[cfg(not(feature = "payouts"))] -use crate::routes::SessionState; -use crate::{ - consts, - core::{ - api_locking, - errors::{self, ConnectorErrorExt, CustomResult, RouterResponse}, - payments, refunds, +pub(crate) use self::{ + incoming::incoming_webhooks_wrapper, + outgoing::{ + create_event_and_trigger_outgoing_webhook, get_outgoing_webhook_request, + trigger_webhook_and_raise_event, }, - db::StorageInterface, - events::{ - api_logs::ApiEvent, - outgoing_webhook_logs::{OutgoingWebhookEvent, OutgoingWebhookEventMetric}, - }, - logger, - routes::{ - app::{ReqState, SessionStateInfo}, - lock_utils, - metrics::request::add_attributes, - }, - services::{self, authentication as auth}, - types::{ - api::{self, mandates::MandateResponseExt}, - domain::{self, types as domain_types}, - storage::{self, enums}, - transformers::{ForeignFrom, ForeignInto, ForeignTryFrom}, - }, - utils::{self as helper_utils, generate_id, OptionExt, ValueExt}, - workflows::outgoing_webhook_retry, }; -#[cfg(feature = "payouts")] -use crate::{routes::SessionState, types::storage::PayoutAttemptUpdate}; -const OUTGOING_WEBHOOK_TIMEOUT_SECS: u64 = 5; const MERCHANT_ID: &str = "merchant_id"; - -pub async fn payments_incoming_webhook_flow( - state: SessionState, - req_state: ReqState, - merchant_account: domain::MerchantAccount, - business_profile: diesel_models::business_profile::BusinessProfile, - key_store: domain::MerchantKeyStore, - webhook_details: api::IncomingWebhookDetails, - source_verified: bool, -) -> CustomResult { - let consume_or_trigger_flow = if source_verified { - payments::CallConnectorAction::HandleResponse(webhook_details.resource_object) - } else { - payments::CallConnectorAction::Trigger - }; - let payments_response = match webhook_details.object_reference_id { - webhooks::ObjectReferenceId::PaymentId(id) => { - let payment_id = get_payment_id( - state.store.as_ref(), - &id, - merchant_account.merchant_id.as_str(), - merchant_account.storage_scheme, - ) - .await?; - - let lock_action = api_locking::LockAction::Hold { - input: api_locking::LockingInput { - unique_locking_key: payment_id, - api_identifier: lock_utils::ApiIdentifier::Payments, - override_lock_retries: None, - }, - }; - - lock_action - .clone() - .perform_locking_action(&state, merchant_account.merchant_id.to_string()) - .await?; - - let response = Box::pin(payments::payments_core::< - api::PSync, - api::PaymentsResponse, - _, - _, - _, - >( - state.clone(), - req_state, - merchant_account.clone(), - key_store.clone(), - payments::operations::PaymentStatus, - api::PaymentsRetrieveRequest { - resource_id: id, - merchant_id: Some(merchant_account.merchant_id.clone()), - force_sync: true, - connector: None, - param: None, - merchant_connector_details: None, - client_secret: None, - expand_attempts: None, - expand_captures: None, - }, - services::AuthFlow::Merchant, - consume_or_trigger_flow, - None, - HeaderPayload::default(), - )) - .await; - - lock_action - .free_lock_action(&state, merchant_account.merchant_id.to_owned()) - .await?; - - match response { - Ok(value) => value, - Err(err) - if matches!( - err.current_context(), - &errors::ApiErrorResponse::PaymentNotFound - ) && state - .conf - .webhooks - .ignore_error - .payment_not_found - .unwrap_or(true) => - { - metrics::WEBHOOK_PAYMENT_NOT_FOUND.add( - &metrics::CONTEXT, - 1, - &[add_attributes( - "merchant_id", - merchant_account.merchant_id.clone(), - )], - ); - return Ok(WebhookResponseTracker::NoEffect); - } - error @ Err(_) => error?, - } - } - _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure).attach_printable( - "Did not get payment id as object reference id in webhook payments flow", - )?, - }; - - match payments_response { - services::ApplicationResponse::JsonWithHeaders((payments_response, _)) => { - let payment_id = payments_response - .payment_id - .clone() - .get_required_value("payment_id") - .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("payment id not received from payments core")?; - - let status = payments_response.status; - - let event_type: Option = payments_response.status.foreign_into(); - - // If event is NOT an UnsupportedEvent, trigger Outgoing Webhook - if let Some(outgoing_event_type) = event_type { - let primary_object_created_at = payments_response.created; - create_event_and_trigger_outgoing_webhook( - state, - merchant_account, - business_profile, - &key_store, - outgoing_event_type, - enums::EventClass::Payments, - payment_id.clone(), - enums::EventObjectType::PaymentDetails, - api::OutgoingWebhookContent::PaymentDetails(payments_response), - primary_object_created_at, - ) - .await?; - }; - - let response = WebhookResponseTracker::Payment { payment_id, status }; - - Ok(response) - } - - _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("received non-json response from payments core")?, - } -} - -#[cfg(feature = "payouts")] -pub async fn payouts_incoming_webhook_flow( - state: SessionState, - merchant_account: domain::MerchantAccount, - business_profile: diesel_models::business_profile::BusinessProfile, - key_store: domain::MerchantKeyStore, - webhook_details: api::IncomingWebhookDetails, - event_type: webhooks::IncomingWebhookEvent, - source_verified: bool, -) -> CustomResult { - metrics::INCOMING_PAYOUT_WEBHOOK_METRIC.add(&metrics::CONTEXT, 1, &[]); - if source_verified { - let db = &*state.store; - //find payout_attempt by object_reference_id - let payout_attempt = match webhook_details.object_reference_id { - webhooks::ObjectReferenceId::PayoutId(payout_id_type) => match payout_id_type { - webhooks::PayoutIdType::PayoutAttemptId(id) => db - .find_payout_attempt_by_merchant_id_payout_attempt_id( - &merchant_account.merchant_id, - &id, - merchant_account.storage_scheme, - ) - .await - .change_context(errors::ApiErrorResponse::WebhookResourceNotFound) - .attach_printable("Failed to fetch the payout attempt")?, - webhooks::PayoutIdType::ConnectorPayoutId(id) => db - .find_payout_attempt_by_merchant_id_connector_payout_id( - &merchant_account.merchant_id, - &id, - merchant_account.storage_scheme, - ) - .await - .change_context(errors::ApiErrorResponse::WebhookResourceNotFound) - .attach_printable("Failed to fetch the payout attempt")?, - }, - _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("received a non-payout id when processing payout webhooks")?, - }; - - let payouts = db - .find_payout_by_merchant_id_payout_id( - &merchant_account.merchant_id, - &payout_attempt.payout_id, - merchant_account.storage_scheme, - ) - .await - .change_context(errors::ApiErrorResponse::WebhookResourceNotFound) - .attach_printable("Failed to fetch the payout")?; - - let payout_attempt_update = PayoutAttemptUpdate::StatusUpdate { - connector_payout_id: payout_attempt.connector_payout_id.clone(), - status: common_enums::PayoutStatus::foreign_try_from(event_type) - .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("failed payout status mapping from event type")?, - error_message: None, - error_code: None, - is_eligible: payout_attempt.is_eligible, - }; - - let action_req = - payout_models::PayoutRequest::PayoutActionRequest(payout_models::PayoutActionRequest { - payout_id: payouts.payout_id.clone(), - }); - - let payout_data = - payouts::make_payout_data(&state, &merchant_account, &key_store, &action_req).await?; - - let updated_payout_attempt = db - .update_payout_attempt( - &payout_attempt, - payout_attempt_update, - &payout_data.payouts, - merchant_account.storage_scheme, - ) - .await - .change_context(errors::ApiErrorResponse::WebhookResourceNotFound) - .attach_printable_lazy(|| { - format!( - "Failed while updating payout attempt: payout_attempt_id: {}", - payout_attempt.payout_attempt_id - ) - })?; - - let event_type: Option = updated_payout_attempt.status.foreign_into(); - - // If event is NOT an UnsupportedEvent, trigger Outgoing Webhook - if let Some(outgoing_event_type) = event_type { - let router_response = - payouts::response_handler(&merchant_account, &payout_data).await?; - - let payout_create_response: payout_models::PayoutCreateResponse = match router_response - { - services::ApplicationResponse::Json(response) => response, - _ => Err(errors::ApiErrorResponse::WebhookResourceNotFound) - .attach_printable("Failed to fetch the payout create response")?, - }; - - create_event_and_trigger_outgoing_webhook( - state, - merchant_account, - business_profile, - &key_store, - outgoing_event_type, - enums::EventClass::Payouts, - updated_payout_attempt.payout_id.clone(), - enums::EventObjectType::PayoutDetails, - api::OutgoingWebhookContent::PayoutDetails(payout_create_response), - Some(updated_payout_attempt.created_at), - ) - .await?; - } - - Ok(WebhookResponseTracker::Payout { - payout_id: updated_payout_attempt.payout_id, - status: updated_payout_attempt.status, - }) - } else { - metrics::INCOMING_PAYOUT_WEBHOOK_SIGNATURE_FAILURE_METRIC.add(&metrics::CONTEXT, 1, &[]); - Err(report!( - errors::ApiErrorResponse::WebhookAuthenticationFailed - )) - } -} - -#[instrument(skip_all)] -#[allow(clippy::too_many_arguments)] -pub async fn refunds_incoming_webhook_flow( - state: SessionState, - merchant_account: domain::MerchantAccount, - business_profile: diesel_models::business_profile::BusinessProfile, - key_store: domain::MerchantKeyStore, - webhook_details: api::IncomingWebhookDetails, - connector_name: &str, - source_verified: bool, - event_type: webhooks::IncomingWebhookEvent, -) -> CustomResult { - let db = &*state.store; - //find refund by connector refund id - let refund = match webhook_details.object_reference_id { - webhooks::ObjectReferenceId::RefundId(refund_id_type) => match refund_id_type { - webhooks::RefundIdType::RefundId(id) => db - .find_refund_by_merchant_id_refund_id( - &merchant_account.merchant_id, - &id, - merchant_account.storage_scheme, - ) - .await - .change_context(errors::ApiErrorResponse::WebhookResourceNotFound) - .attach_printable("Failed to fetch the refund")?, - webhooks::RefundIdType::ConnectorRefundId(id) => db - .find_refund_by_merchant_id_connector_refund_id_connector( - &merchant_account.merchant_id, - &id, - connector_name, - merchant_account.storage_scheme, - ) - .await - .change_context(errors::ApiErrorResponse::WebhookResourceNotFound) - .attach_printable("Failed to fetch the refund")?, - }, - _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("received a non-refund id when processing refund webhooks")?, - }; - let refund_id = refund.refund_id.to_owned(); - //if source verified then update refund status else trigger refund sync - let updated_refund = if source_verified { - let refund_update = storage::RefundUpdate::StatusUpdate { - connector_refund_id: None, - sent_to_gateway: true, - refund_status: common_enums::RefundStatus::foreign_try_from(event_type) - .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("failed refund status mapping from event type")?, - updated_by: merchant_account.storage_scheme.to_string(), - }; - db.update_refund( - refund.to_owned(), - refund_update, - merchant_account.storage_scheme, - ) - .await - .to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound) - .attach_printable_lazy(|| format!("Failed while updating refund: refund_id: {refund_id}"))? - } else { - Box::pin(refunds::refund_retrieve_core( - state.clone(), - merchant_account.clone(), - key_store.clone(), - api_models::refunds::RefundsRetrieveRequest { - refund_id: refund_id.to_owned(), - force_sync: Some(true), - merchant_connector_details: None, - }, - )) - .await - .attach_printable_lazy(|| format!("Failed while updating refund: refund_id: {refund_id}"))? - }; - let event_type: Option = updated_refund.refund_status.foreign_into(); - - // If event is NOT an UnsupportedEvent, trigger Outgoing Webhook - if let Some(outgoing_event_type) = event_type { - let refund_response: api_models::refunds::RefundResponse = - updated_refund.clone().foreign_into(); - create_event_and_trigger_outgoing_webhook( - state, - merchant_account, - business_profile, - &key_store, - outgoing_event_type, - enums::EventClass::Refunds, - refund_id, - enums::EventObjectType::RefundDetails, - api::OutgoingWebhookContent::RefundDetails(refund_response), - Some(updated_refund.created_at), - ) - .await?; - } - - Ok(WebhookResponseTracker::Refund { - payment_id: updated_refund.payment_id, - refund_id: updated_refund.refund_id, - status: updated_refund.refund_status, - }) -} - -pub async fn get_payment_attempt_from_object_reference_id( - state: &SessionState, - object_reference_id: webhooks::ObjectReferenceId, - merchant_account: &domain::MerchantAccount, -) -> CustomResult< - hyperswitch_domain_models::payments::payment_attempt::PaymentAttempt, - errors::ApiErrorResponse, -> { - let db = &*state.store; - match object_reference_id { - api::ObjectReferenceId::PaymentId(api::PaymentIdType::ConnectorTransactionId(ref id)) => db - .find_payment_attempt_by_merchant_id_connector_txn_id( - &merchant_account.merchant_id, - id, - merchant_account.storage_scheme, - ) - .await - .to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound), - api::ObjectReferenceId::PaymentId(api::PaymentIdType::PaymentAttemptId(ref id)) => db - .find_payment_attempt_by_attempt_id_merchant_id( - id, - &merchant_account.merchant_id, - merchant_account.storage_scheme, - ) - .await - .to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound), - api::ObjectReferenceId::PaymentId(api::PaymentIdType::PreprocessingId(ref id)) => db - .find_payment_attempt_by_preprocessing_id_merchant_id( - id, - &merchant_account.merchant_id, - merchant_account.storage_scheme, - ) - .await - .to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound), - _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("received a non-payment id for retrieving payment")?, - } -} - -#[allow(clippy::too_many_arguments)] -pub async fn get_or_update_dispute_object( - state: SessionState, - option_dispute: Option, - dispute_details: api::disputes::DisputePayload, - merchant_id: &str, - payment_attempt: &hyperswitch_domain_models::payments::payment_attempt::PaymentAttempt, - event_type: webhooks::IncomingWebhookEvent, - business_profile: &diesel_models::business_profile::BusinessProfile, - connector_name: &str, -) -> CustomResult { - let db = &*state.store; - match option_dispute { - None => { - metrics::INCOMING_DISPUTE_WEBHOOK_NEW_RECORD_METRIC.add(&metrics::CONTEXT, 1, &[]); - let dispute_id = generate_id(consts::ID_LENGTH, "dp"); - let new_dispute = diesel_models::dispute::DisputeNew { - dispute_id, - amount: dispute_details.amount.clone(), - currency: dispute_details.currency, - dispute_stage: dispute_details.dispute_stage, - dispute_status: common_enums::DisputeStatus::foreign_try_from(event_type) - .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("event type to dispute status mapping failed")?, - payment_id: payment_attempt.payment_id.to_owned(), - connector: connector_name.to_owned(), - attempt_id: payment_attempt.attempt_id.to_owned(), - merchant_id: merchant_id.to_owned(), - connector_status: dispute_details.connector_status, - connector_dispute_id: dispute_details.connector_dispute_id, - connector_reason: dispute_details.connector_reason, - connector_reason_code: dispute_details.connector_reason_code, - challenge_required_by: dispute_details.challenge_required_by, - connector_created_at: dispute_details.created_at, - connector_updated_at: dispute_details.updated_at, - profile_id: Some(business_profile.profile_id.clone()), - evidence: None, - merchant_connector_id: payment_attempt.merchant_connector_id.clone(), - dispute_amount: dispute_details.amount.parse::().unwrap_or(0), - }; - state - .store - .insert_dispute(new_dispute.clone()) - .await - .to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound) - } - Some(dispute) => { - logger::info!("Dispute Already exists, Updating the dispute details"); - metrics::INCOMING_DISPUTE_WEBHOOK_UPDATE_RECORD_METRIC.add(&metrics::CONTEXT, 1, &[]); - let dispute_status = diesel_models::enums::DisputeStatus::foreign_try_from(event_type) - .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("event type to dispute state conversion failure")?; - crate::core::utils::validate_dispute_stage_and_dispute_status( - dispute.dispute_stage, - dispute.dispute_status, - dispute_details.dispute_stage, - dispute_status, - ) - .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("dispute stage and status validation failed")?; - let update_dispute = diesel_models::dispute::DisputeUpdate::Update { - dispute_stage: dispute_details.dispute_stage, - dispute_status, - connector_status: dispute_details.connector_status, - connector_reason: dispute_details.connector_reason, - connector_reason_code: dispute_details.connector_reason_code, - challenge_required_by: dispute_details.challenge_required_by, - connector_updated_at: dispute_details.updated_at, - }; - db.update_dispute(dispute, update_dispute) - .await - .to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound) - } - } -} - -#[allow(clippy::too_many_arguments)] -pub async fn external_authentication_incoming_webhook_flow( - state: SessionState, - req_state: ReqState, - merchant_account: domain::MerchantAccount, - key_store: domain::MerchantKeyStore, - source_verified: bool, - event_type: webhooks::IncomingWebhookEvent, - request_details: &api::IncomingWebhookRequestDetails<'_>, - connector: &(dyn api::Connector + Sync), - object_ref_id: api::ObjectReferenceId, - business_profile: diesel_models::business_profile::BusinessProfile, - merchant_connector_account: domain::MerchantConnectorAccount, -) -> CustomResult { - if source_verified { - let authentication_details = connector - .get_external_authentication_details(request_details) - .switch()?; - let trans_status = authentication_details.trans_status; - let authentication_update = storage::AuthenticationUpdate::PostAuthenticationUpdate { - authentication_status: common_enums::AuthenticationStatus::foreign_from( - trans_status.clone(), - ), - trans_status, - authentication_value: authentication_details.authentication_value, - eci: authentication_details.eci, - }; - let authentication = - if let webhooks::ObjectReferenceId::ExternalAuthenticationID(authentication_id_type) = - object_ref_id - { - match authentication_id_type { - webhooks::AuthenticationIdType::AuthenticationId(authentication_id) => state - .store - .find_authentication_by_merchant_id_authentication_id( - merchant_account.merchant_id.clone(), - authentication_id.clone(), - ) - .await - .to_not_found_response(errors::ApiErrorResponse::AuthenticationNotFound { - id: authentication_id, - }) - .attach_printable("Error while fetching authentication record"), - webhooks::AuthenticationIdType::ConnectorAuthenticationId( - connector_authentication_id, - ) => state - .store - .find_authentication_by_merchant_id_connector_authentication_id( - merchant_account.merchant_id.clone(), - connector_authentication_id.clone(), - ) - .await - .to_not_found_response(errors::ApiErrorResponse::AuthenticationNotFound { - id: connector_authentication_id, - }) - .attach_printable("Error while fetching authentication record"), - } - } else { - Err(errors::ApiErrorResponse::WebhookProcessingFailure).attach_printable( - "received a non-external-authentication id for retrieving authentication", - ) - }?; - let updated_authentication = state - .store - .update_authentication_by_merchant_id_authentication_id( - authentication, - authentication_update, - ) - .await - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Error while updating authentication")?; - // Check if it's a payment authentication flow, payment_id would be there only for payment authentication flows - if let Some(payment_id) = updated_authentication.payment_id { - let is_pull_mechanism_enabled = helper_utils::check_if_pull_mechanism_for_external_3ds_enabled_from_connector_metadata(merchant_connector_account.metadata.map(|metadata| metadata.expose())); - // Merchant doesn't have pull mechanism enabled and if it's challenge flow, we have to authorize whenever we receive a ARes webhook - if !is_pull_mechanism_enabled - && updated_authentication.authentication_type - == Some(common_enums::DecoupledAuthenticationType::Challenge) - && event_type == webhooks::IncomingWebhookEvent::ExternalAuthenticationARes - { - let payment_confirm_req = api::PaymentsRequest { - payment_id: Some(api_models::payments::PaymentIdType::PaymentIntentId( - payment_id, - )), - merchant_id: Some(merchant_account.merchant_id.clone()), - ..Default::default() - }; - let payments_response = Box::pin(payments::payments_core::< - api::Authorize, - api::PaymentsResponse, - _, - _, - _, - >( - state.clone(), - req_state, - merchant_account.clone(), - key_store.clone(), - payments::PaymentConfirm, - payment_confirm_req, - services::api::AuthFlow::Merchant, - payments::CallConnectorAction::Trigger, - None, - HeaderPayload::with_source(enums::PaymentSource::ExternalAuthenticator), - )) - .await?; - match payments_response { - services::ApplicationResponse::JsonWithHeaders((payments_response, _)) => { - let payment_id = payments_response - .payment_id - .clone() - .get_required_value("payment_id") - .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("payment id not received from payments core")?; - let status = payments_response.status; - let event_type: Option = - payments_response.status.foreign_into(); - // Set poll_id as completed in redis to allow the fetch status of poll through retrieve_poll_status api from client - let poll_id = super::utils::get_poll_id( - merchant_account.merchant_id.clone(), - super::utils::get_external_authentication_request_poll_id(&payment_id), - ); - let redis_conn = state - .store - .get_redis_conn() - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Failed to get redis connection")?; - redis_conn - .set_key_without_modifying_ttl( - &poll_id, - api_models::poll::PollStatus::Completed.to_string(), - ) - .await - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Failed to add poll_id in redis")?; - // If event is NOT an UnsupportedEvent, trigger Outgoing Webhook - if let Some(outgoing_event_type) = event_type { - let primary_object_created_at = payments_response.created; - create_event_and_trigger_outgoing_webhook( - state, - merchant_account, - business_profile, - &key_store, - outgoing_event_type, - enums::EventClass::Payments, - payment_id.clone(), - enums::EventObjectType::PaymentDetails, - api::OutgoingWebhookContent::PaymentDetails(payments_response), - primary_object_created_at, - ) - .await?; - }; - let response = WebhookResponseTracker::Payment { payment_id, status }; - Ok(response) - } - _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure).attach_printable( - "Did not get payment id as object reference id in webhook payments flow", - )?, - } - } else { - Ok(WebhookResponseTracker::NoEffect) - } - } else { - Ok(WebhookResponseTracker::NoEffect) - } - } else { - logger::error!( - "Webhook source verification failed for external authentication webhook flow" - ); - Err(report!( - errors::ApiErrorResponse::WebhookAuthenticationFailed - )) - } -} - -pub async fn mandates_incoming_webhook_flow( - state: SessionState, - merchant_account: domain::MerchantAccount, - business_profile: diesel_models::business_profile::BusinessProfile, - key_store: domain::MerchantKeyStore, - webhook_details: api::IncomingWebhookDetails, - source_verified: bool, - event_type: webhooks::IncomingWebhookEvent, -) -> CustomResult { - if source_verified { - let db = &*state.store; - let mandate = match webhook_details.object_reference_id { - webhooks::ObjectReferenceId::MandateId(webhooks::MandateIdType::MandateId( - mandate_id, - )) => db - .find_mandate_by_merchant_id_mandate_id( - &merchant_account.merchant_id, - mandate_id.as_str(), - merchant_account.storage_scheme, - ) - .await - .to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?, - webhooks::ObjectReferenceId::MandateId( - webhooks::MandateIdType::ConnectorMandateId(connector_mandate_id), - ) => db - .find_mandate_by_merchant_id_connector_mandate_id( - &merchant_account.merchant_id, - connector_mandate_id.as_str(), - merchant_account.storage_scheme, - ) - .await - .to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?, - _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("received a non-mandate id for retrieving mandate")?, - }; - let mandate_status = common_enums::MandateStatus::foreign_try_from(event_type) - .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("event type to mandate status mapping failed")?; - let mandate_id = mandate.mandate_id.clone(); - let updated_mandate = db - .update_mandate_by_merchant_id_mandate_id( - &merchant_account.merchant_id, - &mandate_id, - storage::MandateUpdate::StatusUpdate { mandate_status }, - mandate, - merchant_account.storage_scheme, - ) - .await - .to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?; - let mandates_response = Box::new( - api::mandates::MandateResponse::from_db_mandate( - &state, - key_store.clone(), - updated_mandate.clone(), - merchant_account.storage_scheme, - ) - .await?, - ); - let event_type: Option = updated_mandate.mandate_status.foreign_into(); - if let Some(outgoing_event_type) = event_type { - create_event_and_trigger_outgoing_webhook( - state, - merchant_account, - business_profile, - &key_store, - outgoing_event_type, - enums::EventClass::Mandates, - updated_mandate.mandate_id.clone(), - enums::EventObjectType::MandateDetails, - api::OutgoingWebhookContent::MandateDetails(mandates_response), - Some(updated_mandate.created_at), - ) - .await?; - } - Ok(WebhookResponseTracker::Mandate { - mandate_id: updated_mandate.mandate_id, - status: updated_mandate.mandate_status, - }) - } else { - logger::error!("Webhook source verification failed for mandates webhook flow"); - Err(report!( - errors::ApiErrorResponse::WebhookAuthenticationFailed - )) - } -} - -#[allow(clippy::too_many_arguments)] -#[instrument(skip_all)] -pub(crate) async fn frm_incoming_webhook_flow( - state: SessionState, - req_state: ReqState, - merchant_account: domain::MerchantAccount, - key_store: domain::MerchantKeyStore, - source_verified: bool, - event_type: webhooks::IncomingWebhookEvent, - object_ref_id: api::ObjectReferenceId, - business_profile: diesel_models::business_profile::BusinessProfile, -) -> CustomResult { - if source_verified { - let payment_attempt = - get_payment_attempt_from_object_reference_id(&state, object_ref_id, &merchant_account) - .await?; - let payment_response = match event_type { - webhooks::IncomingWebhookEvent::FrmApproved => { - Box::pin(payments::payments_core::< - api::Capture, - api::PaymentsResponse, - _, - _, - _, - >( - state.clone(), - req_state, - merchant_account.clone(), - key_store.clone(), - payments::PaymentApprove, - api::PaymentsCaptureRequest { - payment_id: payment_attempt.payment_id, - amount_to_capture: payment_attempt.amount_to_capture, - ..Default::default() - }, - services::api::AuthFlow::Merchant, - payments::CallConnectorAction::Trigger, - None, - HeaderPayload::default(), - )) - .await? - } - webhooks::IncomingWebhookEvent::FrmRejected => { - Box::pin(payments::payments_core::< - api::Void, - api::PaymentsResponse, - _, - _, - _, - >( - state.clone(), - req_state, - merchant_account.clone(), - key_store.clone(), - payments::PaymentReject, - api::PaymentsCancelRequest { - payment_id: payment_attempt.payment_id.clone(), - cancellation_reason: Some( - "Rejected by merchant based on FRM decision".to_string(), - ), - ..Default::default() - }, - services::api::AuthFlow::Merchant, - payments::CallConnectorAction::Trigger, - None, - HeaderPayload::default(), - )) - .await? - } - _ => Err(errors::ApiErrorResponse::EventNotFound)?, - }; - match payment_response { - services::ApplicationResponse::JsonWithHeaders((payments_response, _)) => { - let payment_id = payments_response - .payment_id - .clone() - .get_required_value("payment_id") - .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("payment id not received from payments core")?; - let status = payments_response.status; - let event_type: Option = payments_response.status.foreign_into(); - if let Some(outgoing_event_type) = event_type { - let primary_object_created_at = payments_response.created; - create_event_and_trigger_outgoing_webhook( - state, - merchant_account, - business_profile, - &key_store, - outgoing_event_type, - enums::EventClass::Payments, - payment_id.clone(), - enums::EventObjectType::PaymentDetails, - api::OutgoingWebhookContent::PaymentDetails(payments_response), - primary_object_created_at, - ) - .await?; - }; - let response = WebhookResponseTracker::Payment { payment_id, status }; - Ok(response) - } - _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure).attach_printable( - "Did not get payment id as object reference id in webhook payments flow", - )?, - } - } else { - logger::error!("Webhook source verification failed for frm webhooks flow"); - Err(report!( - errors::ApiErrorResponse::WebhookAuthenticationFailed - )) - } -} - -#[allow(clippy::too_many_arguments)] -#[instrument(skip_all)] -pub async fn disputes_incoming_webhook_flow( - state: SessionState, - merchant_account: domain::MerchantAccount, - business_profile: diesel_models::business_profile::BusinessProfile, - key_store: domain::MerchantKeyStore, - webhook_details: api::IncomingWebhookDetails, - source_verified: bool, - connector: &(dyn api::Connector + Sync), - request_details: &api::IncomingWebhookRequestDetails<'_>, - event_type: webhooks::IncomingWebhookEvent, -) -> CustomResult { - metrics::INCOMING_DISPUTE_WEBHOOK_METRIC.add(&metrics::CONTEXT, 1, &[]); - if source_verified { - let db = &*state.store; - let dispute_details = connector.get_dispute_details(request_details).switch()?; - let payment_attempt = get_payment_attempt_from_object_reference_id( - &state, - webhook_details.object_reference_id, - &merchant_account, - ) - .await?; - let option_dispute = db - .find_by_merchant_id_payment_id_connector_dispute_id( - &merchant_account.merchant_id, - &payment_attempt.payment_id, - &dispute_details.connector_dispute_id, - ) - .await - .to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound)?; - let dispute_object = get_or_update_dispute_object( - state.clone(), - option_dispute, - dispute_details, - &merchant_account.merchant_id, - &payment_attempt, - event_type, - &business_profile, - connector.id(), - ) - .await?; - let disputes_response = Box::new(dispute_object.clone().foreign_into()); - let event_type: enums::EventType = dispute_object.dispute_status.foreign_into(); - - create_event_and_trigger_outgoing_webhook( - state, - merchant_account, - business_profile, - &key_store, - event_type, - enums::EventClass::Disputes, - dispute_object.dispute_id.clone(), - enums::EventObjectType::DisputeDetails, - api::OutgoingWebhookContent::DisputeDetails(disputes_response), - Some(dispute_object.created_at), - ) - .await?; - metrics::INCOMING_DISPUTE_WEBHOOK_MERCHANT_NOTIFIED_METRIC.add(&metrics::CONTEXT, 1, &[]); - Ok(WebhookResponseTracker::Dispute { - dispute_id: dispute_object.dispute_id, - payment_id: dispute_object.payment_id, - status: dispute_object.dispute_status, - }) - } else { - metrics::INCOMING_DISPUTE_WEBHOOK_SIGNATURE_FAILURE_METRIC.add(&metrics::CONTEXT, 1, &[]); - Err(report!( - errors::ApiErrorResponse::WebhookAuthenticationFailed - )) - } -} - -async fn bank_transfer_webhook_flow( - state: SessionState, - req_state: ReqState, - merchant_account: domain::MerchantAccount, - business_profile: diesel_models::business_profile::BusinessProfile, - key_store: domain::MerchantKeyStore, - webhook_details: api::IncomingWebhookDetails, - source_verified: bool, -) -> CustomResult { - let response = if source_verified { - let payment_attempt = get_payment_attempt_from_object_reference_id( - &state, - webhook_details.object_reference_id, - &merchant_account, - ) - .await?; - let payment_id = payment_attempt.payment_id; - let request = api::PaymentsRequest { - payment_id: Some(api_models::payments::PaymentIdType::PaymentIntentId( - payment_id, - )), - payment_token: payment_attempt.payment_token, - ..Default::default() - }; - Box::pin(payments::payments_core::< - api::Authorize, - api::PaymentsResponse, - _, - _, - _, - >( - state.clone(), - req_state, - merchant_account.to_owned(), - key_store.clone(), - payments::PaymentConfirm, - request, - services::api::AuthFlow::Merchant, - payments::CallConnectorAction::Trigger, - None, - HeaderPayload::with_source(common_enums::PaymentSource::Webhook), - )) - .await - } else { - Err(report!( - errors::ApiErrorResponse::WebhookAuthenticationFailed - )) - }; - - match response? { - services::ApplicationResponse::JsonWithHeaders((payments_response, _)) => { - let payment_id = payments_response - .payment_id - .clone() - .get_required_value("payment_id") - .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("did not receive payment id from payments core response")?; - - let event_type: Option = payments_response.status.foreign_into(); - let status = payments_response.status; - - // If event is NOT an UnsupportedEvent, trigger Outgoing Webhook - if let Some(outgoing_event_type) = event_type { - let primary_object_created_at = payments_response.created; - create_event_and_trigger_outgoing_webhook( - state, - merchant_account, - business_profile, - &key_store, - outgoing_event_type, - enums::EventClass::Payments, - payment_id.clone(), - enums::EventObjectType::PaymentDetails, - api::OutgoingWebhookContent::PaymentDetails(payments_response), - primary_object_created_at, - ) - .await?; - } - - Ok(WebhookResponseTracker::Payment { payment_id, status }) - } - - _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("received non-json response from payments core")?, - } -} - -#[allow(clippy::too_many_arguments)] -#[instrument(skip_all)] -pub(crate) async fn create_event_and_trigger_outgoing_webhook( - state: SessionState, - merchant_account: domain::MerchantAccount, - business_profile: diesel_models::business_profile::BusinessProfile, - merchant_key_store: &domain::MerchantKeyStore, - event_type: enums::EventType, - event_class: enums::EventClass, - primary_object_id: String, - primary_object_type: enums::EventObjectType, - content: api::OutgoingWebhookContent, - primary_object_created_at: Option, -) -> CustomResult<(), errors::ApiErrorResponse> { - let delivery_attempt = enums::WebhookDeliveryAttempt::InitialAttempt; - let idempotent_event_id = - utils::get_idempotent_event_id(&primary_object_id, event_type, delivery_attempt); - let webhook_url_result = get_webhook_url_from_business_profile(&business_profile); - - if !state.conf.webhooks.outgoing_enabled - || webhook_url_result.is_err() - || webhook_url_result.as_ref().is_ok_and(String::is_empty) - { - logger::debug!( - business_profile_id=%business_profile.profile_id, - %idempotent_event_id, - "Outgoing webhooks are disabled in application configuration, or merchant webhook URL \ - could not be obtained; skipping outgoing webhooks for event" - ); - return Ok(()); - } - - let event_id = utils::generate_event_id(); - let merchant_id = business_profile.merchant_id.clone(); - let now = common_utils::date_time::now(); - - let outgoing_webhook = api::OutgoingWebhook { - merchant_id: merchant_id.clone(), - event_id: event_id.clone(), - event_type, - content: content.clone(), - timestamp: now, - }; - - let request_content = get_outgoing_webhook_request( - &merchant_account, - outgoing_webhook, - business_profile.payment_response_hash_key.as_deref(), - ) - .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("Failed to construct outgoing webhook request content")?; - - let new_event = domain::Event { - event_id: event_id.clone(), - event_type, - event_class, - is_webhook_notified: false, - primary_object_id, - primary_object_type, - created_at: now, - merchant_id: Some(business_profile.merchant_id.clone()), - business_profile_id: Some(business_profile.profile_id.clone()), - primary_object_created_at, - idempotent_event_id: Some(idempotent_event_id.clone()), - initial_attempt_id: Some(event_id.clone()), - request: Some( - domain_types::encrypt( - request_content - .encode_to_string_of_json() - .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("Failed to encode outgoing webhook request content") - .map(Secret::new)?, - merchant_key_store.key.get_inner().peek(), - ) - .await - .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("Failed to encrypt outgoing webhook request content")?, - ), - response: None, - delivery_attempt: Some(delivery_attempt), - }; - - let event_insert_result = state - .store - .insert_event(new_event, merchant_key_store) - .await; - - let event = match event_insert_result { - Ok(event) => Ok(event), - Err(error) => { - if error.current_context().is_db_unique_violation() { - logger::debug!("Event with idempotent ID `{idempotent_event_id}` already exists in the database"); - return Ok(()); - } else { - logger::error!(event_insertion_failure=?error); - Err(error - .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) - .attach_printable("Failed to insert event in events table")) - } - } - }?; - - let process_tracker = add_outgoing_webhook_retry_task_to_process_tracker( - &*state.store, - &business_profile, - &event, - ) - .await - .map_err(|error| { - logger::error!( - ?error, - "Failed to add outgoing webhook retry task to process tracker" - ); - error - }) - .ok(); - - let cloned_key_store = merchant_key_store.clone(); - // Using a tokio spawn here and not arbiter because not all caller of this function - // may have an actix arbiter - tokio::spawn( - async move { - Box::pin(trigger_webhook_and_raise_event( - state, - business_profile, - &cloned_key_store, - event, - request_content, - delivery_attempt, - Some(content), - process_tracker, - )) - .await; - } - .in_current_span(), - ); - - Ok(()) -} - -#[allow(clippy::too_many_arguments)] -#[instrument(skip_all)] -pub(crate) async fn trigger_webhook_and_raise_event( - state: SessionState, - business_profile: diesel_models::business_profile::BusinessProfile, - merchant_key_store: &domain::MerchantKeyStore, - event: domain::Event, - request_content: OutgoingWebhookRequestContent, - delivery_attempt: enums::WebhookDeliveryAttempt, - content: Option, - process_tracker: Option, -) { - logger::debug!( - event_id=%event.event_id, - idempotent_event_id=?event.idempotent_event_id, - initial_attempt_id=?event.initial_attempt_id, - "Attempting to send webhook" - ); - - let merchant_id = business_profile.merchant_id.clone(); - let trigger_webhook_result = trigger_webhook_to_merchant( - state.clone(), - business_profile, - merchant_key_store, - event.clone(), - request_content, - delivery_attempt, - process_tracker, - ) - .await; - - raise_webhooks_analytics_event(state, trigger_webhook_result, content, merchant_id, event); -} - -async fn trigger_webhook_to_merchant( - state: SessionState, - business_profile: diesel_models::business_profile::BusinessProfile, - merchant_key_store: &domain::MerchantKeyStore, - event: domain::Event, - request_content: OutgoingWebhookRequestContent, - delivery_attempt: enums::WebhookDeliveryAttempt, - process_tracker: Option, -) -> CustomResult<(), errors::WebhooksFlowError> { - let webhook_url = match ( - get_webhook_url_from_business_profile(&business_profile), - process_tracker.clone(), - ) { - (Ok(webhook_url), _) => Ok(webhook_url), - (Err(error), Some(process_tracker)) => { - if !error - .current_context() - .is_webhook_delivery_retryable_error() - { - logger::debug!("Failed to obtain merchant webhook URL, aborting retries"); - state - .store - .as_scheduler() - .finish_process_with_business_status(process_tracker, "FAILURE".into()) - .await - .change_context( - errors::WebhooksFlowError::OutgoingWebhookProcessTrackerTaskUpdateFailed, - )?; - } - Err(error) - } - (Err(error), None) => Err(error), - }?; - - let event_id = event.event_id; - - let headers = request_content - .headers - .into_iter() - .map(|(name, value)| (name, value.into_masked())) - .collect(); - let request = services::RequestBuilder::new() - .method(services::Method::Post) - .url(&webhook_url) - .attach_default_headers() - .headers(headers) - .set_body(RequestContent::RawBytes( - request_content.body.expose().into_bytes(), - )) - .build(); - - let response = state - .api_client - .send_request(&state, request, Some(OUTGOING_WEBHOOK_TIMEOUT_SECS), false) - .await; - - metrics::WEBHOOK_OUTGOING_COUNT.add( - &metrics::CONTEXT, - 1, - &[metrics::KeyValue::new( - MERCHANT_ID, - business_profile.merchant_id.clone(), - )], - ); - logger::debug!(outgoing_webhook_response=?response); - - let update_event_if_client_error = - |state: SessionState, - merchant_key_store: domain::MerchantKeyStore, - merchant_id: String, - event_id: String, - error_message: String| async move { - let is_webhook_notified = false; - - let response_to_store = OutgoingWebhookResponseContent { - body: None, - headers: None, - status_code: None, - error_message: Some(error_message), - }; - - let event_update = domain::EventUpdate::UpdateResponse { - is_webhook_notified, - response: Some( - domain_types::encrypt( - response_to_store - .encode_to_string_of_json() - .change_context( - errors::WebhooksFlowError::OutgoingWebhookResponseEncodingFailed, - ) - .map(Secret::new)?, - merchant_key_store.key.get_inner().peek(), - ) - .await - .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) - .attach_printable("Failed to encrypt outgoing webhook response content")?, - ), - }; - - state - .store - .update_event_by_merchant_id_event_id( - &merchant_id, - &event_id, - event_update, - &merchant_key_store, - ) - .await - .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) - }; - - let api_client_error_handler = - |state: SessionState, - merchant_key_store: domain::MerchantKeyStore, - merchant_id: String, - event_id: String, - client_error: error_stack::Report, - delivery_attempt: enums::WebhookDeliveryAttempt| async move { - // Not including detailed error message in response information since it contains too - // much of diagnostic information to be exposed to the merchant. - update_event_if_client_error( - state, - merchant_key_store, - merchant_id, - event_id, - "Unable to send request to merchant server".to_string(), - ) - .await?; - - let error = - client_error.change_context(errors::WebhooksFlowError::CallToMerchantFailed); - logger::error!( - ?error, - ?delivery_attempt, - "An error occurred when sending webhook to merchant" - ); - - Ok::<_, error_stack::Report>(()) - }; - let update_event_in_storage = |state: SessionState, - merchant_key_store: domain::MerchantKeyStore, - merchant_id: String, - event_id: String, - response: reqwest::Response| async move { - let status_code = response.status(); - let is_webhook_notified = status_code.is_success(); - - let response_headers = response - .headers() - .iter() - .map(|(name, value)| { - ( - name.as_str().to_owned(), - value - .to_str() - .map(|s| Secret::from(String::from(s))) - .unwrap_or_else(|error| { - logger::warn!( - "Response header {} contains non-UTF-8 characters: {error:?}", - name.as_str() - ); - Secret::from(String::from("Non-UTF-8 header value")) - }), - ) - }) - .collect::>(); - let response_body = response - .text() - .await - .map(Secret::from) - .unwrap_or_else(|error| { - logger::warn!("Response contains non-UTF-8 characters: {error:?}"); - Secret::from(String::from("Non-UTF-8 response body")) - }); - let response_to_store = OutgoingWebhookResponseContent { - body: Some(response_body), - headers: Some(response_headers), - status_code: Some(status_code.as_u16()), - error_message: None, - }; - - let event_update = domain::EventUpdate::UpdateResponse { - is_webhook_notified, - response: Some( - domain_types::encrypt( - response_to_store - .encode_to_string_of_json() - .change_context( - errors::WebhooksFlowError::OutgoingWebhookResponseEncodingFailed, - ) - .map(Secret::new)?, - merchant_key_store.key.get_inner().peek(), - ) - .await - .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) - .attach_printable("Failed to encrypt outgoing webhook response content")?, - ), - }; - state - .store - .update_event_by_merchant_id_event_id( - &merchant_id, - &event_id, - event_update, - &merchant_key_store, - ) - .await - .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) - }; - let increment_webhook_outgoing_received_count = |merchant_id: String| { - metrics::WEBHOOK_OUTGOING_RECEIVED_COUNT.add( - &metrics::CONTEXT, - 1, - &[metrics::KeyValue::new(MERCHANT_ID, merchant_id)], - ) - }; - let success_response_handler = - |state: SessionState, - merchant_id: String, - process_tracker: Option, - business_status: &'static str| async move { - increment_webhook_outgoing_received_count(merchant_id); - - match process_tracker { - Some(process_tracker) => state - .store - .as_scheduler() - .finish_process_with_business_status(process_tracker, business_status.into()) - .await - .change_context( - errors::WebhooksFlowError::OutgoingWebhookProcessTrackerTaskUpdateFailed, - ), - None => Ok(()), - } - }; - let error_response_handler = |merchant_id: String, - delivery_attempt: enums::WebhookDeliveryAttempt, - status_code: u16, - log_message: &'static str| { - metrics::WEBHOOK_OUTGOING_NOT_RECEIVED_COUNT.add( - &metrics::CONTEXT, - 1, - &[metrics::KeyValue::new(MERCHANT_ID, merchant_id)], - ); - - let error = report!(errors::WebhooksFlowError::NotReceivedByMerchant); - logger::warn!(?error, ?delivery_attempt, ?status_code, %log_message); - }; - - match delivery_attempt { - enums::WebhookDeliveryAttempt::InitialAttempt => match response { - Err(client_error) => { - api_client_error_handler( - state.clone(), - merchant_key_store.clone(), - business_profile.merchant_id.clone(), - event_id.clone(), - client_error, - delivery_attempt, - ) - .await? - } - Ok(response) => { - let status_code = response.status(); - let _updated_event = update_event_in_storage( - state.clone(), - merchant_key_store.clone(), - business_profile.merchant_id.clone(), - event_id.clone(), - response, - ) - .await?; - - if status_code.is_success() { - success_response_handler( - state.clone(), - business_profile.merchant_id, - process_tracker, - "INITIAL_DELIVERY_ATTEMPT_SUCCESSFUL", - ) - .await?; - } else { - error_response_handler( - business_profile.merchant_id, - delivery_attempt, - status_code.as_u16(), - "Ignoring error when sending webhook to merchant", - ); - } - } - }, - enums::WebhookDeliveryAttempt::AutomaticRetry => { - let process_tracker = process_tracker - .get_required_value("process_tracker") - .change_context(errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed) - .attach_printable("`process_tracker` is unavailable in automatic retry flow")?; - match response { - Err(client_error) => { - api_client_error_handler( - state.clone(), - merchant_key_store.clone(), - business_profile.merchant_id.clone(), - event_id.clone(), - client_error, - delivery_attempt, - ) - .await?; - // Schedule a retry attempt for webhook delivery - outgoing_webhook_retry::retry_webhook_delivery_task( - &*state.store, - &business_profile.merchant_id, - process_tracker, - ) - .await - .change_context( - errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed, - )?; - } - Ok(response) => { - let status_code = response.status(); - let _updated_event = update_event_in_storage( - state.clone(), - merchant_key_store.clone(), - business_profile.merchant_id.clone(), - event_id.clone(), - response, - ) - .await?; - - if status_code.is_success() { - success_response_handler( - state.clone(), - business_profile.merchant_id, - Some(process_tracker), - "COMPLETED_BY_PT", - ) - .await?; - } else { - error_response_handler( - business_profile.merchant_id.clone(), - delivery_attempt, - status_code.as_u16(), - "An error occurred when sending webhook to merchant", - ); - // Schedule a retry attempt for webhook delivery - outgoing_webhook_retry::retry_webhook_delivery_task( - &*state.store, - &business_profile.merchant_id, - process_tracker, - ) - .await - .change_context( - errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed, - )?; - } - } - } - } - enums::WebhookDeliveryAttempt::ManualRetry => match response { - Err(client_error) => { - api_client_error_handler( - state.clone(), - merchant_key_store.clone(), - business_profile.merchant_id.clone(), - event_id.clone(), - client_error, - delivery_attempt, - ) - .await? - } - Ok(response) => { - let status_code = response.status(); - let _updated_event = update_event_in_storage( - state.clone(), - merchant_key_store.clone(), - business_profile.merchant_id.clone(), - event_id.clone(), - response, - ) - .await?; - - if status_code.is_success() { - increment_webhook_outgoing_received_count(business_profile.merchant_id.clone()); - } else { - error_response_handler( - business_profile.merchant_id, - delivery_attempt, - status_code.as_u16(), - "Ignoring error when sending webhook to merchant", - ); - } - } - }, - } - - Ok(()) -} - -fn raise_webhooks_analytics_event( - state: SessionState, - trigger_webhook_result: CustomResult<(), errors::WebhooksFlowError>, - content: Option, - merchant_id: String, - event: domain::Event, -) { - let error = if let Err(error) = trigger_webhook_result { - logger::error!(?error, "Failed to send webhook to merchant"); - - serde_json::to_value(error.current_context()) - .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) - .map_err(|error| { - logger::error!(?error, "Failed to serialize outgoing webhook error as JSON"); - error - }) - .ok() - } else { - None - }; - - let outgoing_webhook_event_content = content - .as_ref() - .and_then(api::OutgoingWebhookContent::get_outgoing_webhook_event_content); - let webhook_event = OutgoingWebhookEvent::new( - merchant_id, - event.event_id, - event.event_type, - outgoing_webhook_event_content, - error, - event.initial_attempt_id, - ); - state.event_handler().log_event(&webhook_event); -} - -#[allow(clippy::too_many_arguments)] -pub async fn webhooks_wrapper( - flow: &impl router_env::types::FlowMetric, - state: SessionState, - req_state: ReqState, - req: &actix_web::HttpRequest, - merchant_account: domain::MerchantAccount, - key_store: domain::MerchantKeyStore, - connector_name_or_mca_id: &str, - body: actix_web::web::Bytes, -) -> RouterResponse { - let start_instant = Instant::now(); - let (application_response, webhooks_response_tracker, serialized_req) = - Box::pin(webhooks_core::( - state.clone(), - req_state, - req, - merchant_account.clone(), - key_store, - connector_name_or_mca_id, - body.clone(), - )) - .await?; - - logger::info!(incoming_webhook_payload = ?serialized_req); - - let request_duration = Instant::now() - .saturating_duration_since(start_instant) - .as_millis(); - - let request_id = RequestId::extract(req) - .await - .attach_printable("Unable to extract request id from request") - .change_context(errors::ApiErrorResponse::InternalServerError)?; - let auth_type = auth::AuthenticationType::WebhookAuth { - merchant_id: merchant_account.merchant_id.clone(), - }; - let status_code = 200; - let api_event = ApiEventsType::Webhooks { - connector: connector_name_or_mca_id.to_string(), - payment_id: webhooks_response_tracker.get_payment_id(), - }; - let response_value = serde_json::to_value(&webhooks_response_tracker) - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Could not convert webhook effect to string")?; - - let api_event = ApiEvent::new( - Some(merchant_account.merchant_id.clone()), - flow, - &request_id, - request_duration, - status_code, - serialized_req, - Some(response_value), - None, - auth_type, - None, - api_event, - req, - req.method(), - ); - state.event_handler().log_event(&api_event); - Ok(application_response) -} - -#[instrument(skip_all)] -pub async fn webhooks_core( - state: SessionState, - req_state: ReqState, - req: &actix_web::HttpRequest, - merchant_account: domain::MerchantAccount, - key_store: domain::MerchantKeyStore, - connector_name_or_mca_id: &str, - body: actix_web::web::Bytes, -) -> errors::RouterResult<( - services::ApplicationResponse, - WebhookResponseTracker, - serde_json::Value, -)> { - metrics::WEBHOOK_INCOMING_COUNT.add( - &metrics::CONTEXT, - 1, - &[metrics::KeyValue::new( - MERCHANT_ID, - merchant_account.merchant_id.clone(), - )], - ); - let mut request_details = api::IncomingWebhookRequestDetails { - method: req.method().clone(), - uri: req.uri().clone(), - headers: req.headers(), - query_params: req.query_string().to_string(), - body: &body, - }; - - // Fetch the merchant connector account to get the webhooks source secret - // `webhooks source secret` is a secret shared between the merchant and connector - // This is used for source verification and webhooks integrity - let (merchant_connector_account, connector, connector_name) = fetch_optional_mca_and_connector( - &state, - &merchant_account, - connector_name_or_mca_id, - &key_store, - ) - .await?; - - let decoded_body = connector - .decode_webhook_body( - &*state.clone().store, - &request_details, - &merchant_account.merchant_id, - ) - .await - .switch() - .attach_printable("There was an error in incoming webhook body decoding")?; - - request_details.body = &decoded_body; - - let event_type = match connector - .get_webhook_event_type(&request_details) - .allow_webhook_event_type_not_found( - state - .clone() - .conf - .webhooks - .ignore_error - .event_type - .unwrap_or(true), - ) - .switch() - .attach_printable("Could not find event type in incoming webhook body")? - { - Some(event_type) => event_type, - // Early return allows us to acknowledge the webhooks that we do not support - None => { - logger::error!( - webhook_payload =? request_details.body, - "Failed while identifying the event type", - ); - - metrics::WEBHOOK_EVENT_TYPE_IDENTIFICATION_FAILURE_COUNT.add( - &metrics::CONTEXT, - 1, - &[ - metrics::KeyValue::new(MERCHANT_ID, merchant_account.merchant_id.clone()), - metrics::KeyValue::new("connector", connector_name.to_string()), - ], - ); - - let response = connector - .get_webhook_api_response(&request_details) - .switch() - .attach_printable("Failed while early return in case of event type parsing")?; - - return Ok(( - response, - WebhookResponseTracker::NoEffect, - serde_json::Value::Null, - )); - } - }; - logger::info!(event_type=?event_type); - - let is_webhook_event_supported = !matches!( - event_type, - webhooks::IncomingWebhookEvent::EventNotSupported - ); - let is_webhook_event_enabled = !utils::is_webhook_event_disabled( - &*state.clone().store, - connector_name.as_str(), - &merchant_account.merchant_id, - &event_type, - ) - .await; - - //process webhook further only if webhook event is enabled and is not event_not_supported - let process_webhook_further = is_webhook_event_enabled && is_webhook_event_supported; - - logger::info!(process_webhook=?process_webhook_further); - - let flow_type: api::WebhookFlow = event_type.into(); - let mut event_object: Box = Box::new(serde_json::Value::Null); - let webhook_effect = if process_webhook_further - && !matches!(flow_type, api::WebhookFlow::ReturnResponse) - { - let object_ref_id = connector - .get_webhook_object_reference_id(&request_details) - .switch() - .attach_printable("Could not find object reference id in incoming webhook body")?; - let connector_enum = api_models::enums::Connector::from_str(&connector_name) - .change_context(errors::ApiErrorResponse::InvalidDataValue { - field_name: "connector", - }) - .attach_printable_lazy(|| { - format!("unable to parse connector name {connector_name:?}") - })?; - let connectors_with_source_verification_call = &state.conf.webhook_source_verification_call; - - let merchant_connector_account = match merchant_connector_account { - Some(merchant_connector_account) => merchant_connector_account, - None => { - helper_utils::get_mca_from_object_reference_id( - &*state.clone().store, - object_ref_id.clone(), - &merchant_account, - &connector_name, - &key_store, - ) - .await? - } - }; - - let source_verified = if connectors_with_source_verification_call - .connectors_with_webhook_source_verification_call - .contains(&connector_enum) - { - connector - .verify_webhook_source_verification_call( - &state, - &merchant_account, - merchant_connector_account.clone(), - &connector_name, - &request_details, - ) - .await - .or_else(|error| match error.current_context() { - errors::ConnectorError::WebhookSourceVerificationFailed => { - logger::error!(?error, "Source Verification Failed"); - Ok(false) - } - _ => Err(error), - }) - .switch() - .attach_printable("There was an issue in incoming webhook source verification")? - } else { - connector - .verify_webhook_source( - &request_details, - &merchant_account, - merchant_connector_account.clone(), - connector_name.as_str(), - ) - .await - .or_else(|error| match error.current_context() { - errors::ConnectorError::WebhookSourceVerificationFailed => { - logger::error!(?error, "Source Verification Failed"); - Ok(false) - } - _ => Err(error), - }) - .switch() - .attach_printable("There was an issue in incoming webhook source verification")? - }; - - if source_verified { - metrics::WEBHOOK_SOURCE_VERIFIED_COUNT.add( - &metrics::CONTEXT, - 1, - &[metrics::KeyValue::new( - MERCHANT_ID, - merchant_account.merchant_id.clone(), - )], - ); - } else if connector.is_webhook_source_verification_mandatory() { - // if webhook consumption is mandatory for connector, fail webhook - // so that merchant can retrigger it after updating merchant_secret - return Err(errors::ApiErrorResponse::WebhookAuthenticationFailed.into()); - } - - logger::info!(source_verified=?source_verified); - - event_object = connector - .get_webhook_resource_object(&request_details) - .switch() - .attach_printable("Could not find resource object in incoming webhook body")?; - - let webhook_details = api::IncomingWebhookDetails { - object_reference_id: object_ref_id.clone(), - resource_object: serde_json::to_vec(&event_object) - .change_context(errors::ParsingError::EncodeError("byte-vec")) - .attach_printable("Unable to convert webhook payload to a value") - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable( - "There was an issue when encoding the incoming webhook body to bytes", - )?, - }; - - let profile_id = merchant_connector_account - .profile_id - .as_ref() - .get_required_value("profile_id") - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Could not find profile_id in merchant connector account")?; - - let business_profile = state - .store - .find_business_profile_by_profile_id(profile_id) - .await - .to_not_found_response(errors::ApiErrorResponse::BusinessProfileNotFound { - id: profile_id.to_string(), - })?; - - match flow_type { - api::WebhookFlow::Payment => Box::pin(payments_incoming_webhook_flow( - state.clone(), - req_state, - merchant_account, - business_profile, - key_store, - webhook_details, - source_verified, - )) - .await - .attach_printable("Incoming webhook flow for payments failed")?, - - api::WebhookFlow::Refund => Box::pin(refunds_incoming_webhook_flow( - state.clone(), - merchant_account, - business_profile, - key_store, - webhook_details, - connector_name.as_str(), - source_verified, - event_type, - )) - .await - .attach_printable("Incoming webhook flow for refunds failed")?, - - api::WebhookFlow::Dispute => Box::pin(disputes_incoming_webhook_flow( - state.clone(), - merchant_account, - business_profile, - key_store, - webhook_details, - source_verified, - connector, - &request_details, - event_type, - )) - .await - .attach_printable("Incoming webhook flow for disputes failed")?, - - api::WebhookFlow::BankTransfer => Box::pin(bank_transfer_webhook_flow( - state.clone(), - req_state, - merchant_account, - business_profile, - key_store, - webhook_details, - source_verified, - )) - .await - .attach_printable("Incoming bank-transfer webhook flow failed")?, - - api::WebhookFlow::ReturnResponse => WebhookResponseTracker::NoEffect, - - api::WebhookFlow::Mandate => Box::pin(mandates_incoming_webhook_flow( - state.clone(), - merchant_account, - business_profile, - key_store, - webhook_details, - source_verified, - event_type, - )) - .await - .attach_printable("Incoming webhook flow for mandates failed")?, - - api::WebhookFlow::ExternalAuthentication => { - Box::pin(external_authentication_incoming_webhook_flow( - state.clone(), - req_state, - merchant_account, - key_store, - source_verified, - event_type, - &request_details, - connector, - object_ref_id, - business_profile, - merchant_connector_account, - )) - .await - .attach_printable("Incoming webhook flow for external authentication failed")? - } - api::WebhookFlow::FraudCheck => Box::pin(frm_incoming_webhook_flow( - state.clone(), - req_state, - merchant_account, - key_store, - source_verified, - event_type, - object_ref_id, - business_profile, - )) - .await - .attach_printable("Incoming webhook flow for fraud check failed")?, - - #[cfg(feature = "payouts")] - api::WebhookFlow::Payout => Box::pin(payouts_incoming_webhook_flow( - state.clone(), - merchant_account, - business_profile, - key_store, - webhook_details, - event_type, - source_verified, - )) - .await - .attach_printable("Incoming webhook flow for payouts failed")?, - - _ => Err(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Unsupported Flow Type received in incoming webhooks")?, - } - } else { - metrics::WEBHOOK_INCOMING_FILTERED_COUNT.add( - &metrics::CONTEXT, - 1, - &[metrics::KeyValue::new( - MERCHANT_ID, - merchant_account.merchant_id.clone(), - )], - ); - WebhookResponseTracker::NoEffect - }; - - let response = connector - .get_webhook_api_response(&request_details) - .switch() - .attach_printable("Could not get incoming webhook api response from connector")?; - - let serialized_request = event_object - .masked_serialize() - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Could not convert webhook effect to string")?; - Ok((response, webhook_effect, serialized_request)) -} - -#[inline] -pub async fn get_payment_id( - db: &dyn StorageInterface, - payment_id: &api::PaymentIdType, - merchant_id: &str, - storage_scheme: enums::MerchantStorageScheme, -) -> errors::RouterResult { - let pay_id = || async { - match payment_id { - api_models::payments::PaymentIdType::PaymentIntentId(ref id) => Ok(id.to_string()), - api_models::payments::PaymentIdType::ConnectorTransactionId(ref id) => db - .find_payment_attempt_by_merchant_id_connector_txn_id( - merchant_id, - id, - storage_scheme, - ) - .await - .map(|p| p.payment_id), - api_models::payments::PaymentIdType::PaymentAttemptId(ref id) => db - .find_payment_attempt_by_attempt_id_merchant_id(id, merchant_id, storage_scheme) - .await - .map(|p| p.payment_id), - api_models::payments::PaymentIdType::PreprocessingId(ref id) => db - .find_payment_attempt_by_preprocessing_id_merchant_id( - id, - merchant_id, - storage_scheme, - ) - .await - .map(|p| p.payment_id), - } - }; - - pay_id() - .await - .to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) -} - -fn get_connector_by_connector_name( - state: &SessionState, - connector_name: &str, - merchant_connector_id: Option, -) -> CustomResult<(&'static (dyn api::Connector + Sync), String), errors::ApiErrorResponse> { - let authentication_connector = - api_models::enums::convert_authentication_connector(connector_name); - #[cfg(feature = "frm")] - { - let frm_connector = api_models::enums::convert_frm_connector(connector_name); - if frm_connector.is_some() { - let frm_connector_data = - api::FraudCheckConnectorData::get_connector_by_name(connector_name)?; - return Ok(( - *frm_connector_data.connector, - frm_connector_data.connector_name.to_string(), - )); - } - } - - let (connector, connector_name) = if authentication_connector.is_some() { - let authentication_connector_data = - api::AuthenticationConnectorData::get_connector_by_name(connector_name)?; - ( - authentication_connector_data.connector, - authentication_connector_data.connector_name.to_string(), - ) - } else { - let connector_data = api::ConnectorData::get_connector_by_name( - &state.conf.connectors, - connector_name, - api::GetToken::Connector, - merchant_connector_id, - ) - .change_context(errors::ApiErrorResponse::InvalidRequestData { - message: "invalid connector name received".to_string(), - }) - .attach_printable("Failed construction of ConnectorData")?; - ( - connector_data.connector, - connector_data.connector_name.to_string(), - ) - }; - Ok((*connector, connector_name)) -} - -/// This function fetches the merchant connector account ( if the url used is /{merchant_connector_id}) -/// if merchant connector id is not passed in the request, then this will return None for mca -async fn fetch_optional_mca_and_connector( - state: &SessionState, - merchant_account: &domain::MerchantAccount, - connector_name_or_mca_id: &str, - key_store: &domain::MerchantKeyStore, -) -> CustomResult< - ( - Option, - &'static (dyn api::Connector + Sync), - String, - ), - errors::ApiErrorResponse, -> { - let db = &state.store; - if connector_name_or_mca_id.starts_with("mca_") { - let mca = db - .find_by_merchant_connector_account_merchant_id_merchant_connector_id( - &merchant_account.merchant_id, - connector_name_or_mca_id, - key_store, - ) - .await - .to_not_found_response(errors::ApiErrorResponse::MerchantConnectorAccountNotFound { - id: connector_name_or_mca_id.to_string(), - }) - .attach_printable( - "error while fetching merchant_connector_account from connector_id", - )?; - let (connector, connector_name) = get_connector_by_connector_name( - state, - &mca.connector_name, - Some(mca.merchant_connector_id.clone()), - )?; - - Ok((Some(mca), connector, connector_name)) - } else { - // Merchant connector account is already being queried, it is safe to set connector id as None - let (connector, connector_name) = - get_connector_by_connector_name(state, connector_name_or_mca_id, None)?; - Ok((None, connector, connector_name)) - } -} - -pub async fn add_outgoing_webhook_retry_task_to_process_tracker( - db: &dyn StorageInterface, - business_profile: &diesel_models::business_profile::BusinessProfile, - event: &domain::Event, -) -> CustomResult { - let schedule_time = outgoing_webhook_retry::get_webhook_delivery_retry_schedule_time( - db, - &business_profile.merchant_id, - 0, - ) - .await - .ok_or(errors::StorageError::ValueNotFound( - "Process tracker schedule time".into(), // Can raise a better error here - )) - .attach_printable("Failed to obtain initial process tracker schedule time")?; - - let tracking_data = types::OutgoingWebhookTrackingData { - merchant_id: business_profile.merchant_id.clone(), - business_profile_id: business_profile.profile_id.clone(), - event_type: event.event_type, - event_class: event.event_class, - primary_object_id: event.primary_object_id.clone(), - primary_object_type: event.primary_object_type, - initial_attempt_id: event.initial_attempt_id.clone(), - }; - - let runner = storage::ProcessTrackerRunner::OutgoingWebhookRetryWorkflow; - let task = "OUTGOING_WEBHOOK_RETRY"; - let tag = ["OUTGOING_WEBHOOKS"]; - let process_tracker_id = scheduler::utils::get_process_tracker_id( - runner, - task, - &event.event_id, - &business_profile.merchant_id, - ); - let process_tracker_entry = storage::ProcessTrackerNew::new( - process_tracker_id, - task, - runner, - tag, - tracking_data, - schedule_time, - ) - .map_err(errors::StorageError::from)?; - - match db.insert_process(process_tracker_entry).await { - Ok(process_tracker) => { - crate::routes::metrics::TASKS_ADDED_COUNT.add( - &metrics::CONTEXT, - 1, - &[add_attributes("flow", "OutgoingWebhookRetry")], - ); - Ok(process_tracker) - } - Err(error) => { - crate::routes::metrics::TASK_ADDITION_FAILURES_COUNT.add( - &metrics::CONTEXT, - 1, - &[add_attributes("flow", "OutgoingWebhookRetry")], - ); - Err(error) - } - } -} - -fn get_webhook_url_from_business_profile( - business_profile: &diesel_models::business_profile::BusinessProfile, -) -> CustomResult { - let webhook_details_json = business_profile - .webhook_details - .clone() - .get_required_value("webhook_details") - .change_context(errors::WebhooksFlowError::MerchantWebhookDetailsNotFound)?; - - let webhook_details: api::WebhookDetails = - webhook_details_json - .parse_value("WebhookDetails") - .change_context(errors::WebhooksFlowError::MerchantWebhookDetailsNotFound)?; - - webhook_details - .webhook_url - .get_required_value("webhook_url") - .change_context(errors::WebhooksFlowError::MerchantWebhookUrlNotConfigured) - .map(ExposeInterface::expose) -} - -pub(crate) fn get_outgoing_webhook_request( - merchant_account: &domain::MerchantAccount, - outgoing_webhook: api::OutgoingWebhook, - payment_response_hash_key: Option<&str>, -) -> CustomResult { - #[inline] - fn get_outgoing_webhook_request_inner( - outgoing_webhook: api::OutgoingWebhook, - payment_response_hash_key: Option<&str>, - ) -> CustomResult { - let mut headers = vec![( - reqwest::header::CONTENT_TYPE.to_string(), - mime::APPLICATION_JSON.essence_str().into(), - )]; - - let transformed_outgoing_webhook = WebhookType::from(outgoing_webhook); - - let outgoing_webhooks_signature = transformed_outgoing_webhook - .get_outgoing_webhooks_signature(payment_response_hash_key)?; - - if let Some(signature) = outgoing_webhooks_signature.signature { - WebhookType::add_webhook_header(&mut headers, signature) - } - - Ok(OutgoingWebhookRequestContent { - body: outgoing_webhooks_signature.payload, - headers: headers - .into_iter() - .map(|(name, value)| (name, Secret::new(value.into_inner()))) - .collect(), - }) - } - - match merchant_account.get_compatible_connector() { - #[cfg(feature = "stripe")] - Some(api_models::enums::Connector::Stripe) => get_outgoing_webhook_request_inner::< - stripe_webhooks::StripeOutgoingWebhook, - >( - outgoing_webhook, payment_response_hash_key - ), - _ => get_outgoing_webhook_request_inner::( - outgoing_webhook, - payment_response_hash_key, - ), - } -} diff --git a/crates/router/src/core/webhooks/incoming.rs b/crates/router/src/core/webhooks/incoming.rs new file mode 100644 index 000000000000..2f1a37981ec6 --- /dev/null +++ b/crates/router/src/core/webhooks/incoming.rs @@ -0,0 +1,1628 @@ +use std::{str::FromStr, time::Instant}; + +use actix_web::FromRequest; +#[cfg(feature = "payouts")] +use api_models::payouts as payout_models; +use api_models::{ + payments::HeaderPayload, + webhooks::{self, WebhookResponseTracker}, +}; +use common_utils::{errors::ReportSwitchExt, events::ApiEventsType}; +use error_stack::{report, ResultExt}; +use masking::ExposeInterface; +use router_env::{instrument, tracing, tracing_actix_web::RequestId}; + +use super::{types, utils, MERCHANT_ID}; +use crate::{ + consts, + core::{ + api_locking, + errors::{self, ConnectorErrorExt, CustomResult, RouterResponse, StorageErrorExt}, + metrics, payments, refunds, utils as core_utils, + }, + db::StorageInterface, + events::api_logs::ApiEvent, + logger, + routes::{ + app::{ReqState, SessionStateInfo}, + lock_utils, + metrics::request::add_attributes, + SessionState, + }, + services::{self, authentication as auth}, + types::{ + api::{self, mandates::MandateResponseExt}, + domain, + storage::{self, enums}, + transformers::{ForeignFrom, ForeignInto, ForeignTryFrom}, + }, + utils::{self as helper_utils, generate_id, OptionExt}, +}; +#[cfg(feature = "payouts")] +use crate::{core::payouts, types::storage::PayoutAttemptUpdate}; + +#[allow(clippy::too_many_arguments)] +pub async fn incoming_webhooks_wrapper( + flow: &impl router_env::types::FlowMetric, + state: SessionState, + req_state: ReqState, + req: &actix_web::HttpRequest, + merchant_account: domain::MerchantAccount, + key_store: domain::MerchantKeyStore, + connector_name_or_mca_id: &str, + body: actix_web::web::Bytes, +) -> RouterResponse { + let start_instant = Instant::now(); + let (application_response, webhooks_response_tracker, serialized_req) = + Box::pin(incoming_webhooks_core::( + state.clone(), + req_state, + req, + merchant_account.clone(), + key_store, + connector_name_or_mca_id, + body.clone(), + )) + .await?; + + logger::info!(incoming_webhook_payload = ?serialized_req); + + let request_duration = Instant::now() + .saturating_duration_since(start_instant) + .as_millis(); + + let request_id = RequestId::extract(req) + .await + .attach_printable("Unable to extract request id from request") + .change_context(errors::ApiErrorResponse::InternalServerError)?; + let auth_type = auth::AuthenticationType::WebhookAuth { + merchant_id: merchant_account.merchant_id.clone(), + }; + let status_code = 200; + let api_event = ApiEventsType::Webhooks { + connector: connector_name_or_mca_id.to_string(), + payment_id: webhooks_response_tracker.get_payment_id(), + }; + let response_value = serde_json::to_value(&webhooks_response_tracker) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Could not convert webhook effect to string")?; + + let api_event = ApiEvent::new( + Some(merchant_account.merchant_id.clone()), + flow, + &request_id, + request_duration, + status_code, + serialized_req, + Some(response_value), + None, + auth_type, + None, + api_event, + req, + req.method(), + ); + state.event_handler().log_event(&api_event); + Ok(application_response) +} + +#[instrument(skip_all)] +async fn incoming_webhooks_core( + state: SessionState, + req_state: ReqState, + req: &actix_web::HttpRequest, + merchant_account: domain::MerchantAccount, + key_store: domain::MerchantKeyStore, + connector_name_or_mca_id: &str, + body: actix_web::web::Bytes, +) -> errors::RouterResult<( + services::ApplicationResponse, + WebhookResponseTracker, + serde_json::Value, +)> { + metrics::WEBHOOK_INCOMING_COUNT.add( + &metrics::CONTEXT, + 1, + &[metrics::KeyValue::new( + MERCHANT_ID, + merchant_account.merchant_id.clone(), + )], + ); + let mut request_details = api::IncomingWebhookRequestDetails { + method: req.method().clone(), + uri: req.uri().clone(), + headers: req.headers(), + query_params: req.query_string().to_string(), + body: &body, + }; + + // Fetch the merchant connector account to get the webhooks source secret + // `webhooks source secret` is a secret shared between the merchant and connector + // This is used for source verification and webhooks integrity + let (merchant_connector_account, connector, connector_name) = fetch_optional_mca_and_connector( + &state, + &merchant_account, + connector_name_or_mca_id, + &key_store, + ) + .await?; + + let decoded_body = connector + .decode_webhook_body( + &*state.clone().store, + &request_details, + &merchant_account.merchant_id, + ) + .await + .switch() + .attach_printable("There was an error in incoming webhook body decoding")?; + + request_details.body = &decoded_body; + + let event_type = match connector + .get_webhook_event_type(&request_details) + .allow_webhook_event_type_not_found( + state + .clone() + .conf + .webhooks + .ignore_error + .event_type + .unwrap_or(true), + ) + .switch() + .attach_printable("Could not find event type in incoming webhook body")? + { + Some(event_type) => event_type, + // Early return allows us to acknowledge the webhooks that we do not support + None => { + logger::error!( + webhook_payload =? request_details.body, + "Failed while identifying the event type", + ); + + metrics::WEBHOOK_EVENT_TYPE_IDENTIFICATION_FAILURE_COUNT.add( + &metrics::CONTEXT, + 1, + &[ + metrics::KeyValue::new(MERCHANT_ID, merchant_account.merchant_id.clone()), + metrics::KeyValue::new("connector", connector_name.to_string()), + ], + ); + + let response = connector + .get_webhook_api_response(&request_details) + .switch() + .attach_printable("Failed while early return in case of event type parsing")?; + + return Ok(( + response, + WebhookResponseTracker::NoEffect, + serde_json::Value::Null, + )); + } + }; + logger::info!(event_type=?event_type); + + let is_webhook_event_supported = !matches!( + event_type, + webhooks::IncomingWebhookEvent::EventNotSupported + ); + let is_webhook_event_enabled = !utils::is_webhook_event_disabled( + &*state.clone().store, + connector_name.as_str(), + &merchant_account.merchant_id, + &event_type, + ) + .await; + + //process webhook further only if webhook event is enabled and is not event_not_supported + let process_webhook_further = is_webhook_event_enabled && is_webhook_event_supported; + + logger::info!(process_webhook=?process_webhook_further); + + let flow_type: api::WebhookFlow = event_type.into(); + let mut event_object: Box = Box::new(serde_json::Value::Null); + let webhook_effect = if process_webhook_further + && !matches!(flow_type, api::WebhookFlow::ReturnResponse) + { + let object_ref_id = connector + .get_webhook_object_reference_id(&request_details) + .switch() + .attach_printable("Could not find object reference id in incoming webhook body")?; + let connector_enum = api_models::enums::Connector::from_str(&connector_name) + .change_context(errors::ApiErrorResponse::InvalidDataValue { + field_name: "connector", + }) + .attach_printable_lazy(|| { + format!("unable to parse connector name {connector_name:?}") + })?; + let connectors_with_source_verification_call = &state.conf.webhook_source_verification_call; + + let merchant_connector_account = match merchant_connector_account { + Some(merchant_connector_account) => merchant_connector_account, + None => { + helper_utils::get_mca_from_object_reference_id( + &*state.clone().store, + object_ref_id.clone(), + &merchant_account, + &connector_name, + &key_store, + ) + .await? + } + }; + + let source_verified = if connectors_with_source_verification_call + .connectors_with_webhook_source_verification_call + .contains(&connector_enum) + { + connector + .verify_webhook_source_verification_call( + &state, + &merchant_account, + merchant_connector_account.clone(), + &connector_name, + &request_details, + ) + .await + .or_else(|error| match error.current_context() { + errors::ConnectorError::WebhookSourceVerificationFailed => { + logger::error!(?error, "Source Verification Failed"); + Ok(false) + } + _ => Err(error), + }) + .switch() + .attach_printable("There was an issue in incoming webhook source verification")? + } else { + connector + .verify_webhook_source( + &request_details, + &merchant_account, + merchant_connector_account.clone(), + connector_name.as_str(), + ) + .await + .or_else(|error| match error.current_context() { + errors::ConnectorError::WebhookSourceVerificationFailed => { + logger::error!(?error, "Source Verification Failed"); + Ok(false) + } + _ => Err(error), + }) + .switch() + .attach_printable("There was an issue in incoming webhook source verification")? + }; + + if source_verified { + metrics::WEBHOOK_SOURCE_VERIFIED_COUNT.add( + &metrics::CONTEXT, + 1, + &[metrics::KeyValue::new( + MERCHANT_ID, + merchant_account.merchant_id.clone(), + )], + ); + } else if connector.is_webhook_source_verification_mandatory() { + // if webhook consumption is mandatory for connector, fail webhook + // so that merchant can retrigger it after updating merchant_secret + return Err(errors::ApiErrorResponse::WebhookAuthenticationFailed.into()); + } + + logger::info!(source_verified=?source_verified); + + event_object = connector + .get_webhook_resource_object(&request_details) + .switch() + .attach_printable("Could not find resource object in incoming webhook body")?; + + let webhook_details = api::IncomingWebhookDetails { + object_reference_id: object_ref_id.clone(), + resource_object: serde_json::to_vec(&event_object) + .change_context(errors::ParsingError::EncodeError("byte-vec")) + .attach_printable("Unable to convert webhook payload to a value") + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable( + "There was an issue when encoding the incoming webhook body to bytes", + )?, + }; + + let profile_id = merchant_connector_account + .profile_id + .as_ref() + .get_required_value("profile_id") + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Could not find profile_id in merchant connector account")?; + + let business_profile = state + .store + .find_business_profile_by_profile_id(profile_id) + .await + .to_not_found_response(errors::ApiErrorResponse::BusinessProfileNotFound { + id: profile_id.to_string(), + })?; + + match flow_type { + api::WebhookFlow::Payment => Box::pin(payments_incoming_webhook_flow( + state.clone(), + req_state, + merchant_account, + business_profile, + key_store, + webhook_details, + source_verified, + )) + .await + .attach_printable("Incoming webhook flow for payments failed")?, + + api::WebhookFlow::Refund => Box::pin(refunds_incoming_webhook_flow( + state.clone(), + merchant_account, + business_profile, + key_store, + webhook_details, + connector_name.as_str(), + source_verified, + event_type, + )) + .await + .attach_printable("Incoming webhook flow for refunds failed")?, + + api::WebhookFlow::Dispute => Box::pin(disputes_incoming_webhook_flow( + state.clone(), + merchant_account, + business_profile, + key_store, + webhook_details, + source_verified, + connector, + &request_details, + event_type, + )) + .await + .attach_printable("Incoming webhook flow for disputes failed")?, + + api::WebhookFlow::BankTransfer => Box::pin(bank_transfer_webhook_flow( + state.clone(), + req_state, + merchant_account, + business_profile, + key_store, + webhook_details, + source_verified, + )) + .await + .attach_printable("Incoming bank-transfer webhook flow failed")?, + + api::WebhookFlow::ReturnResponse => WebhookResponseTracker::NoEffect, + + api::WebhookFlow::Mandate => Box::pin(mandates_incoming_webhook_flow( + state.clone(), + merchant_account, + business_profile, + key_store, + webhook_details, + source_verified, + event_type, + )) + .await + .attach_printable("Incoming webhook flow for mandates failed")?, + + api::WebhookFlow::ExternalAuthentication => { + Box::pin(external_authentication_incoming_webhook_flow( + state.clone(), + req_state, + merchant_account, + key_store, + source_verified, + event_type, + &request_details, + connector, + object_ref_id, + business_profile, + merchant_connector_account, + )) + .await + .attach_printable("Incoming webhook flow for external authentication failed")? + } + api::WebhookFlow::FraudCheck => Box::pin(frm_incoming_webhook_flow( + state.clone(), + req_state, + merchant_account, + key_store, + source_verified, + event_type, + object_ref_id, + business_profile, + )) + .await + .attach_printable("Incoming webhook flow for fraud check failed")?, + + #[cfg(feature = "payouts")] + api::WebhookFlow::Payout => Box::pin(payouts_incoming_webhook_flow( + state.clone(), + merchant_account, + business_profile, + key_store, + webhook_details, + event_type, + source_verified, + )) + .await + .attach_printable("Incoming webhook flow for payouts failed")?, + + _ => Err(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Unsupported Flow Type received in incoming webhooks")?, + } + } else { + metrics::WEBHOOK_INCOMING_FILTERED_COUNT.add( + &metrics::CONTEXT, + 1, + &[metrics::KeyValue::new( + MERCHANT_ID, + merchant_account.merchant_id.clone(), + )], + ); + WebhookResponseTracker::NoEffect + }; + + let response = connector + .get_webhook_api_response(&request_details) + .switch() + .attach_printable("Could not get incoming webhook api response from connector")?; + + let serialized_request = event_object + .masked_serialize() + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Could not convert webhook effect to string")?; + Ok((response, webhook_effect, serialized_request)) +} + +#[instrument(skip_all)] +async fn payments_incoming_webhook_flow( + state: SessionState, + req_state: ReqState, + merchant_account: domain::MerchantAccount, + business_profile: diesel_models::business_profile::BusinessProfile, + key_store: domain::MerchantKeyStore, + webhook_details: api::IncomingWebhookDetails, + source_verified: bool, +) -> CustomResult { + let consume_or_trigger_flow = if source_verified { + payments::CallConnectorAction::HandleResponse(webhook_details.resource_object) + } else { + payments::CallConnectorAction::Trigger + }; + let payments_response = match webhook_details.object_reference_id { + webhooks::ObjectReferenceId::PaymentId(id) => { + let payment_id = get_payment_id( + state.store.as_ref(), + &id, + merchant_account.merchant_id.as_str(), + merchant_account.storage_scheme, + ) + .await?; + + let lock_action = api_locking::LockAction::Hold { + input: api_locking::LockingInput { + unique_locking_key: payment_id, + api_identifier: lock_utils::ApiIdentifier::Payments, + override_lock_retries: None, + }, + }; + + lock_action + .clone() + .perform_locking_action(&state, merchant_account.merchant_id.to_string()) + .await?; + + let response = Box::pin(payments::payments_core::< + api::PSync, + api::PaymentsResponse, + _, + _, + _, + >( + state.clone(), + req_state, + merchant_account.clone(), + key_store.clone(), + payments::operations::PaymentStatus, + api::PaymentsRetrieveRequest { + resource_id: id, + merchant_id: Some(merchant_account.merchant_id.clone()), + force_sync: true, + connector: None, + param: None, + merchant_connector_details: None, + client_secret: None, + expand_attempts: None, + expand_captures: None, + }, + services::AuthFlow::Merchant, + consume_or_trigger_flow, + None, + HeaderPayload::default(), + )) + .await; + + lock_action + .free_lock_action(&state, merchant_account.merchant_id.to_owned()) + .await?; + + match response { + Ok(value) => value, + Err(err) + if matches!( + err.current_context(), + &errors::ApiErrorResponse::PaymentNotFound + ) && state + .conf + .webhooks + .ignore_error + .payment_not_found + .unwrap_or(true) => + { + metrics::WEBHOOK_PAYMENT_NOT_FOUND.add( + &metrics::CONTEXT, + 1, + &[add_attributes( + "merchant_id", + merchant_account.merchant_id.clone(), + )], + ); + return Ok(WebhookResponseTracker::NoEffect); + } + error @ Err(_) => error?, + } + } + _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure).attach_printable( + "Did not get payment id as object reference id in webhook payments flow", + )?, + }; + + match payments_response { + services::ApplicationResponse::JsonWithHeaders((payments_response, _)) => { + let payment_id = payments_response + .payment_id + .clone() + .get_required_value("payment_id") + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("payment id not received from payments core")?; + + let status = payments_response.status; + + let event_type: Option = payments_response.status.foreign_into(); + + // If event is NOT an UnsupportedEvent, trigger Outgoing Webhook + if let Some(outgoing_event_type) = event_type { + let primary_object_created_at = payments_response.created; + super::create_event_and_trigger_outgoing_webhook( + state, + merchant_account, + business_profile, + &key_store, + outgoing_event_type, + enums::EventClass::Payments, + payment_id.clone(), + enums::EventObjectType::PaymentDetails, + api::OutgoingWebhookContent::PaymentDetails(payments_response), + primary_object_created_at, + ) + .await?; + }; + + let response = WebhookResponseTracker::Payment { payment_id, status }; + + Ok(response) + } + + _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("received non-json response from payments core")?, + } +} + +#[cfg(feature = "payouts")] +#[instrument(skip_all)] +async fn payouts_incoming_webhook_flow( + state: SessionState, + merchant_account: domain::MerchantAccount, + business_profile: diesel_models::business_profile::BusinessProfile, + key_store: domain::MerchantKeyStore, + webhook_details: api::IncomingWebhookDetails, + event_type: webhooks::IncomingWebhookEvent, + source_verified: bool, +) -> CustomResult { + metrics::INCOMING_PAYOUT_WEBHOOK_METRIC.add(&metrics::CONTEXT, 1, &[]); + if source_verified { + let db = &*state.store; + //find payout_attempt by object_reference_id + let payout_attempt = match webhook_details.object_reference_id { + webhooks::ObjectReferenceId::PayoutId(payout_id_type) => match payout_id_type { + webhooks::PayoutIdType::PayoutAttemptId(id) => db + .find_payout_attempt_by_merchant_id_payout_attempt_id( + &merchant_account.merchant_id, + &id, + merchant_account.storage_scheme, + ) + .await + .change_context(errors::ApiErrorResponse::WebhookResourceNotFound) + .attach_printable("Failed to fetch the payout attempt")?, + webhooks::PayoutIdType::ConnectorPayoutId(id) => db + .find_payout_attempt_by_merchant_id_connector_payout_id( + &merchant_account.merchant_id, + &id, + merchant_account.storage_scheme, + ) + .await + .change_context(errors::ApiErrorResponse::WebhookResourceNotFound) + .attach_printable("Failed to fetch the payout attempt")?, + }, + _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("received a non-payout id when processing payout webhooks")?, + }; + + let payouts = db + .find_payout_by_merchant_id_payout_id( + &merchant_account.merchant_id, + &payout_attempt.payout_id, + merchant_account.storage_scheme, + ) + .await + .change_context(errors::ApiErrorResponse::WebhookResourceNotFound) + .attach_printable("Failed to fetch the payout")?; + + let payout_attempt_update = PayoutAttemptUpdate::StatusUpdate { + connector_payout_id: payout_attempt.connector_payout_id.clone(), + status: common_enums::PayoutStatus::foreign_try_from(event_type) + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("failed payout status mapping from event type")?, + error_message: None, + error_code: None, + is_eligible: payout_attempt.is_eligible, + }; + + let action_req = + payout_models::PayoutRequest::PayoutActionRequest(payout_models::PayoutActionRequest { + payout_id: payouts.payout_id.clone(), + }); + + let payout_data = + payouts::make_payout_data(&state, &merchant_account, &key_store, &action_req).await?; + + let updated_payout_attempt = db + .update_payout_attempt( + &payout_attempt, + payout_attempt_update, + &payout_data.payouts, + merchant_account.storage_scheme, + ) + .await + .change_context(errors::ApiErrorResponse::WebhookResourceNotFound) + .attach_printable_lazy(|| { + format!( + "Failed while updating payout attempt: payout_attempt_id: {}", + payout_attempt.payout_attempt_id + ) + })?; + + let event_type: Option = updated_payout_attempt.status.foreign_into(); + + // If event is NOT an UnsupportedEvent, trigger Outgoing Webhook + if let Some(outgoing_event_type) = event_type { + let router_response = + payouts::response_handler(&merchant_account, &payout_data).await?; + + let payout_create_response: payout_models::PayoutCreateResponse = match router_response + { + services::ApplicationResponse::Json(response) => response, + _ => Err(errors::ApiErrorResponse::WebhookResourceNotFound) + .attach_printable("Failed to fetch the payout create response")?, + }; + + super::create_event_and_trigger_outgoing_webhook( + state, + merchant_account, + business_profile, + &key_store, + outgoing_event_type, + enums::EventClass::Payouts, + updated_payout_attempt.payout_id.clone(), + enums::EventObjectType::PayoutDetails, + api::OutgoingWebhookContent::PayoutDetails(payout_create_response), + Some(updated_payout_attempt.created_at), + ) + .await?; + } + + Ok(WebhookResponseTracker::Payout { + payout_id: updated_payout_attempt.payout_id, + status: updated_payout_attempt.status, + }) + } else { + metrics::INCOMING_PAYOUT_WEBHOOK_SIGNATURE_FAILURE_METRIC.add(&metrics::CONTEXT, 1, &[]); + Err(report!( + errors::ApiErrorResponse::WebhookAuthenticationFailed + )) + } +} + +#[allow(clippy::too_many_arguments)] +#[instrument(skip_all)] +async fn refunds_incoming_webhook_flow( + state: SessionState, + merchant_account: domain::MerchantAccount, + business_profile: diesel_models::business_profile::BusinessProfile, + key_store: domain::MerchantKeyStore, + webhook_details: api::IncomingWebhookDetails, + connector_name: &str, + source_verified: bool, + event_type: webhooks::IncomingWebhookEvent, +) -> CustomResult { + let db = &*state.store; + //find refund by connector refund id + let refund = match webhook_details.object_reference_id { + webhooks::ObjectReferenceId::RefundId(refund_id_type) => match refund_id_type { + webhooks::RefundIdType::RefundId(id) => db + .find_refund_by_merchant_id_refund_id( + &merchant_account.merchant_id, + &id, + merchant_account.storage_scheme, + ) + .await + .change_context(errors::ApiErrorResponse::WebhookResourceNotFound) + .attach_printable("Failed to fetch the refund")?, + webhooks::RefundIdType::ConnectorRefundId(id) => db + .find_refund_by_merchant_id_connector_refund_id_connector( + &merchant_account.merchant_id, + &id, + connector_name, + merchant_account.storage_scheme, + ) + .await + .change_context(errors::ApiErrorResponse::WebhookResourceNotFound) + .attach_printable("Failed to fetch the refund")?, + }, + _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("received a non-refund id when processing refund webhooks")?, + }; + let refund_id = refund.refund_id.to_owned(); + //if source verified then update refund status else trigger refund sync + let updated_refund = if source_verified { + let refund_update = storage::RefundUpdate::StatusUpdate { + connector_refund_id: None, + sent_to_gateway: true, + refund_status: common_enums::RefundStatus::foreign_try_from(event_type) + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("failed refund status mapping from event type")?, + updated_by: merchant_account.storage_scheme.to_string(), + }; + db.update_refund( + refund.to_owned(), + refund_update, + merchant_account.storage_scheme, + ) + .await + .to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound) + .attach_printable_lazy(|| format!("Failed while updating refund: refund_id: {refund_id}"))? + } else { + Box::pin(refunds::refund_retrieve_core( + state.clone(), + merchant_account.clone(), + key_store.clone(), + api_models::refunds::RefundsRetrieveRequest { + refund_id: refund_id.to_owned(), + force_sync: Some(true), + merchant_connector_details: None, + }, + )) + .await + .attach_printable_lazy(|| format!("Failed while updating refund: refund_id: {refund_id}"))? + }; + let event_type: Option = updated_refund.refund_status.foreign_into(); + + // If event is NOT an UnsupportedEvent, trigger Outgoing Webhook + if let Some(outgoing_event_type) = event_type { + let refund_response: api_models::refunds::RefundResponse = + updated_refund.clone().foreign_into(); + super::create_event_and_trigger_outgoing_webhook( + state, + merchant_account, + business_profile, + &key_store, + outgoing_event_type, + enums::EventClass::Refunds, + refund_id, + enums::EventObjectType::RefundDetails, + api::OutgoingWebhookContent::RefundDetails(refund_response), + Some(updated_refund.created_at), + ) + .await?; + } + + Ok(WebhookResponseTracker::Refund { + payment_id: updated_refund.payment_id, + refund_id: updated_refund.refund_id, + status: updated_refund.refund_status, + }) +} + +async fn get_payment_attempt_from_object_reference_id( + state: &SessionState, + object_reference_id: webhooks::ObjectReferenceId, + merchant_account: &domain::MerchantAccount, +) -> CustomResult< + hyperswitch_domain_models::payments::payment_attempt::PaymentAttempt, + errors::ApiErrorResponse, +> { + let db = &*state.store; + match object_reference_id { + api::ObjectReferenceId::PaymentId(api::PaymentIdType::ConnectorTransactionId(ref id)) => db + .find_payment_attempt_by_merchant_id_connector_txn_id( + &merchant_account.merchant_id, + id, + merchant_account.storage_scheme, + ) + .await + .to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound), + api::ObjectReferenceId::PaymentId(api::PaymentIdType::PaymentAttemptId(ref id)) => db + .find_payment_attempt_by_attempt_id_merchant_id( + id, + &merchant_account.merchant_id, + merchant_account.storage_scheme, + ) + .await + .to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound), + api::ObjectReferenceId::PaymentId(api::PaymentIdType::PreprocessingId(ref id)) => db + .find_payment_attempt_by_preprocessing_id_merchant_id( + id, + &merchant_account.merchant_id, + merchant_account.storage_scheme, + ) + .await + .to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound), + _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("received a non-payment id for retrieving payment")?, + } +} + +#[allow(clippy::too_many_arguments)] +async fn get_or_update_dispute_object( + state: SessionState, + option_dispute: Option, + dispute_details: api::disputes::DisputePayload, + merchant_id: &str, + payment_attempt: &hyperswitch_domain_models::payments::payment_attempt::PaymentAttempt, + event_type: webhooks::IncomingWebhookEvent, + business_profile: &diesel_models::business_profile::BusinessProfile, + connector_name: &str, +) -> CustomResult { + let db = &*state.store; + match option_dispute { + None => { + metrics::INCOMING_DISPUTE_WEBHOOK_NEW_RECORD_METRIC.add(&metrics::CONTEXT, 1, &[]); + let dispute_id = generate_id(consts::ID_LENGTH, "dp"); + let new_dispute = diesel_models::dispute::DisputeNew { + dispute_id, + amount: dispute_details.amount.clone(), + currency: dispute_details.currency, + dispute_stage: dispute_details.dispute_stage, + dispute_status: common_enums::DisputeStatus::foreign_try_from(event_type) + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("event type to dispute status mapping failed")?, + payment_id: payment_attempt.payment_id.to_owned(), + connector: connector_name.to_owned(), + attempt_id: payment_attempt.attempt_id.to_owned(), + merchant_id: merchant_id.to_owned(), + connector_status: dispute_details.connector_status, + connector_dispute_id: dispute_details.connector_dispute_id, + connector_reason: dispute_details.connector_reason, + connector_reason_code: dispute_details.connector_reason_code, + challenge_required_by: dispute_details.challenge_required_by, + connector_created_at: dispute_details.created_at, + connector_updated_at: dispute_details.updated_at, + profile_id: Some(business_profile.profile_id.clone()), + evidence: None, + merchant_connector_id: payment_attempt.merchant_connector_id.clone(), + dispute_amount: dispute_details.amount.parse::().unwrap_or(0), + }; + state + .store + .insert_dispute(new_dispute.clone()) + .await + .to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound) + } + Some(dispute) => { + logger::info!("Dispute Already exists, Updating the dispute details"); + metrics::INCOMING_DISPUTE_WEBHOOK_UPDATE_RECORD_METRIC.add(&metrics::CONTEXT, 1, &[]); + let dispute_status = diesel_models::enums::DisputeStatus::foreign_try_from(event_type) + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("event type to dispute state conversion failure")?; + crate::core::utils::validate_dispute_stage_and_dispute_status( + dispute.dispute_stage, + dispute.dispute_status, + dispute_details.dispute_stage, + dispute_status, + ) + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("dispute stage and status validation failed")?; + let update_dispute = diesel_models::dispute::DisputeUpdate::Update { + dispute_stage: dispute_details.dispute_stage, + dispute_status, + connector_status: dispute_details.connector_status, + connector_reason: dispute_details.connector_reason, + connector_reason_code: dispute_details.connector_reason_code, + challenge_required_by: dispute_details.challenge_required_by, + connector_updated_at: dispute_details.updated_at, + }; + db.update_dispute(dispute, update_dispute) + .await + .to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound) + } + } +} + +#[allow(clippy::too_many_arguments)] +#[instrument(skip_all)] +async fn external_authentication_incoming_webhook_flow( + state: SessionState, + req_state: ReqState, + merchant_account: domain::MerchantAccount, + key_store: domain::MerchantKeyStore, + source_verified: bool, + event_type: webhooks::IncomingWebhookEvent, + request_details: &api::IncomingWebhookRequestDetails<'_>, + connector: &(dyn api::Connector + Sync), + object_ref_id: api::ObjectReferenceId, + business_profile: diesel_models::business_profile::BusinessProfile, + merchant_connector_account: domain::MerchantConnectorAccount, +) -> CustomResult { + if source_verified { + let authentication_details = connector + .get_external_authentication_details(request_details) + .switch()?; + let trans_status = authentication_details.trans_status; + let authentication_update = storage::AuthenticationUpdate::PostAuthenticationUpdate { + authentication_status: common_enums::AuthenticationStatus::foreign_from( + trans_status.clone(), + ), + trans_status, + authentication_value: authentication_details.authentication_value, + eci: authentication_details.eci, + }; + let authentication = + if let webhooks::ObjectReferenceId::ExternalAuthenticationID(authentication_id_type) = + object_ref_id + { + match authentication_id_type { + webhooks::AuthenticationIdType::AuthenticationId(authentication_id) => state + .store + .find_authentication_by_merchant_id_authentication_id( + merchant_account.merchant_id.clone(), + authentication_id.clone(), + ) + .await + .to_not_found_response(errors::ApiErrorResponse::AuthenticationNotFound { + id: authentication_id, + }) + .attach_printable("Error while fetching authentication record"), + webhooks::AuthenticationIdType::ConnectorAuthenticationId( + connector_authentication_id, + ) => state + .store + .find_authentication_by_merchant_id_connector_authentication_id( + merchant_account.merchant_id.clone(), + connector_authentication_id.clone(), + ) + .await + .to_not_found_response(errors::ApiErrorResponse::AuthenticationNotFound { + id: connector_authentication_id, + }) + .attach_printable("Error while fetching authentication record"), + } + } else { + Err(errors::ApiErrorResponse::WebhookProcessingFailure).attach_printable( + "received a non-external-authentication id for retrieving authentication", + ) + }?; + let updated_authentication = state + .store + .update_authentication_by_merchant_id_authentication_id( + authentication, + authentication_update, + ) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Error while updating authentication")?; + // Check if it's a payment authentication flow, payment_id would be there only for payment authentication flows + if let Some(payment_id) = updated_authentication.payment_id { + let is_pull_mechanism_enabled = helper_utils::check_if_pull_mechanism_for_external_3ds_enabled_from_connector_metadata(merchant_connector_account.metadata.map(|metadata| metadata.expose())); + // Merchant doesn't have pull mechanism enabled and if it's challenge flow, we have to authorize whenever we receive a ARes webhook + if !is_pull_mechanism_enabled + && updated_authentication.authentication_type + == Some(common_enums::DecoupledAuthenticationType::Challenge) + && event_type == webhooks::IncomingWebhookEvent::ExternalAuthenticationARes + { + let payment_confirm_req = api::PaymentsRequest { + payment_id: Some(api_models::payments::PaymentIdType::PaymentIntentId( + payment_id, + )), + merchant_id: Some(merchant_account.merchant_id.clone()), + ..Default::default() + }; + let payments_response = Box::pin(payments::payments_core::< + api::Authorize, + api::PaymentsResponse, + _, + _, + _, + >( + state.clone(), + req_state, + merchant_account.clone(), + key_store.clone(), + payments::PaymentConfirm, + payment_confirm_req, + services::api::AuthFlow::Merchant, + payments::CallConnectorAction::Trigger, + None, + HeaderPayload::with_source(enums::PaymentSource::ExternalAuthenticator), + )) + .await?; + match payments_response { + services::ApplicationResponse::JsonWithHeaders((payments_response, _)) => { + let payment_id = payments_response + .payment_id + .clone() + .get_required_value("payment_id") + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("payment id not received from payments core")?; + let status = payments_response.status; + let event_type: Option = + payments_response.status.foreign_into(); + // Set poll_id as completed in redis to allow the fetch status of poll through retrieve_poll_status api from client + let poll_id = core_utils::get_poll_id( + merchant_account.merchant_id.clone(), + core_utils::get_external_authentication_request_poll_id(&payment_id), + ); + let redis_conn = state + .store + .get_redis_conn() + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to get redis connection")?; + redis_conn + .set_key_without_modifying_ttl( + &poll_id, + api_models::poll::PollStatus::Completed.to_string(), + ) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to add poll_id in redis")?; + // If event is NOT an UnsupportedEvent, trigger Outgoing Webhook + if let Some(outgoing_event_type) = event_type { + let primary_object_created_at = payments_response.created; + super::create_event_and_trigger_outgoing_webhook( + state, + merchant_account, + business_profile, + &key_store, + outgoing_event_type, + enums::EventClass::Payments, + payment_id.clone(), + enums::EventObjectType::PaymentDetails, + api::OutgoingWebhookContent::PaymentDetails(payments_response), + primary_object_created_at, + ) + .await?; + }; + let response = WebhookResponseTracker::Payment { payment_id, status }; + Ok(response) + } + _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure).attach_printable( + "Did not get payment id as object reference id in webhook payments flow", + )?, + } + } else { + Ok(WebhookResponseTracker::NoEffect) + } + } else { + Ok(WebhookResponseTracker::NoEffect) + } + } else { + logger::error!( + "Webhook source verification failed for external authentication webhook flow" + ); + Err(report!( + errors::ApiErrorResponse::WebhookAuthenticationFailed + )) + } +} + +#[instrument(skip_all)] +async fn mandates_incoming_webhook_flow( + state: SessionState, + merchant_account: domain::MerchantAccount, + business_profile: diesel_models::business_profile::BusinessProfile, + key_store: domain::MerchantKeyStore, + webhook_details: api::IncomingWebhookDetails, + source_verified: bool, + event_type: webhooks::IncomingWebhookEvent, +) -> CustomResult { + if source_verified { + let db = &*state.store; + let mandate = match webhook_details.object_reference_id { + webhooks::ObjectReferenceId::MandateId(webhooks::MandateIdType::MandateId( + mandate_id, + )) => db + .find_mandate_by_merchant_id_mandate_id( + &merchant_account.merchant_id, + mandate_id.as_str(), + merchant_account.storage_scheme, + ) + .await + .to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?, + webhooks::ObjectReferenceId::MandateId( + webhooks::MandateIdType::ConnectorMandateId(connector_mandate_id), + ) => db + .find_mandate_by_merchant_id_connector_mandate_id( + &merchant_account.merchant_id, + connector_mandate_id.as_str(), + merchant_account.storage_scheme, + ) + .await + .to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?, + _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("received a non-mandate id for retrieving mandate")?, + }; + let mandate_status = common_enums::MandateStatus::foreign_try_from(event_type) + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("event type to mandate status mapping failed")?; + let mandate_id = mandate.mandate_id.clone(); + let updated_mandate = db + .update_mandate_by_merchant_id_mandate_id( + &merchant_account.merchant_id, + &mandate_id, + storage::MandateUpdate::StatusUpdate { mandate_status }, + mandate, + merchant_account.storage_scheme, + ) + .await + .to_not_found_response(errors::ApiErrorResponse::MandateNotFound)?; + let mandates_response = Box::new( + api::mandates::MandateResponse::from_db_mandate( + &state, + key_store.clone(), + updated_mandate.clone(), + merchant_account.storage_scheme, + ) + .await?, + ); + let event_type: Option = updated_mandate.mandate_status.foreign_into(); + if let Some(outgoing_event_type) = event_type { + super::create_event_and_trigger_outgoing_webhook( + state, + merchant_account, + business_profile, + &key_store, + outgoing_event_type, + enums::EventClass::Mandates, + updated_mandate.mandate_id.clone(), + enums::EventObjectType::MandateDetails, + api::OutgoingWebhookContent::MandateDetails(mandates_response), + Some(updated_mandate.created_at), + ) + .await?; + } + Ok(WebhookResponseTracker::Mandate { + mandate_id: updated_mandate.mandate_id, + status: updated_mandate.mandate_status, + }) + } else { + logger::error!("Webhook source verification failed for mandates webhook flow"); + Err(report!( + errors::ApiErrorResponse::WebhookAuthenticationFailed + )) + } +} + +#[allow(clippy::too_many_arguments)] +#[instrument(skip_all)] +async fn frm_incoming_webhook_flow( + state: SessionState, + req_state: ReqState, + merchant_account: domain::MerchantAccount, + key_store: domain::MerchantKeyStore, + source_verified: bool, + event_type: webhooks::IncomingWebhookEvent, + object_ref_id: api::ObjectReferenceId, + business_profile: diesel_models::business_profile::BusinessProfile, +) -> CustomResult { + if source_verified { + let payment_attempt = + get_payment_attempt_from_object_reference_id(&state, object_ref_id, &merchant_account) + .await?; + let payment_response = match event_type { + webhooks::IncomingWebhookEvent::FrmApproved => { + Box::pin(payments::payments_core::< + api::Capture, + api::PaymentsResponse, + _, + _, + _, + >( + state.clone(), + req_state, + merchant_account.clone(), + key_store.clone(), + payments::PaymentApprove, + api::PaymentsCaptureRequest { + payment_id: payment_attempt.payment_id, + amount_to_capture: payment_attempt.amount_to_capture, + ..Default::default() + }, + services::api::AuthFlow::Merchant, + payments::CallConnectorAction::Trigger, + None, + HeaderPayload::default(), + )) + .await? + } + webhooks::IncomingWebhookEvent::FrmRejected => { + Box::pin(payments::payments_core::< + api::Void, + api::PaymentsResponse, + _, + _, + _, + >( + state.clone(), + req_state, + merchant_account.clone(), + key_store.clone(), + payments::PaymentReject, + api::PaymentsCancelRequest { + payment_id: payment_attempt.payment_id.clone(), + cancellation_reason: Some( + "Rejected by merchant based on FRM decision".to_string(), + ), + ..Default::default() + }, + services::api::AuthFlow::Merchant, + payments::CallConnectorAction::Trigger, + None, + HeaderPayload::default(), + )) + .await? + } + _ => Err(errors::ApiErrorResponse::EventNotFound)?, + }; + match payment_response { + services::ApplicationResponse::JsonWithHeaders((payments_response, _)) => { + let payment_id = payments_response + .payment_id + .clone() + .get_required_value("payment_id") + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("payment id not received from payments core")?; + let status = payments_response.status; + let event_type: Option = payments_response.status.foreign_into(); + if let Some(outgoing_event_type) = event_type { + let primary_object_created_at = payments_response.created; + super::create_event_and_trigger_outgoing_webhook( + state, + merchant_account, + business_profile, + &key_store, + outgoing_event_type, + enums::EventClass::Payments, + payment_id.clone(), + enums::EventObjectType::PaymentDetails, + api::OutgoingWebhookContent::PaymentDetails(payments_response), + primary_object_created_at, + ) + .await?; + }; + let response = WebhookResponseTracker::Payment { payment_id, status }; + Ok(response) + } + _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure).attach_printable( + "Did not get payment id as object reference id in webhook payments flow", + )?, + } + } else { + logger::error!("Webhook source verification failed for frm webhooks flow"); + Err(report!( + errors::ApiErrorResponse::WebhookAuthenticationFailed + )) + } +} + +#[allow(clippy::too_many_arguments)] +#[instrument(skip_all)] +async fn disputes_incoming_webhook_flow( + state: SessionState, + merchant_account: domain::MerchantAccount, + business_profile: diesel_models::business_profile::BusinessProfile, + key_store: domain::MerchantKeyStore, + webhook_details: api::IncomingWebhookDetails, + source_verified: bool, + connector: &(dyn api::Connector + Sync), + request_details: &api::IncomingWebhookRequestDetails<'_>, + event_type: webhooks::IncomingWebhookEvent, +) -> CustomResult { + metrics::INCOMING_DISPUTE_WEBHOOK_METRIC.add(&metrics::CONTEXT, 1, &[]); + if source_verified { + let db = &*state.store; + let dispute_details = connector.get_dispute_details(request_details).switch()?; + let payment_attempt = get_payment_attempt_from_object_reference_id( + &state, + webhook_details.object_reference_id, + &merchant_account, + ) + .await?; + let option_dispute = db + .find_by_merchant_id_payment_id_connector_dispute_id( + &merchant_account.merchant_id, + &payment_attempt.payment_id, + &dispute_details.connector_dispute_id, + ) + .await + .to_not_found_response(errors::ApiErrorResponse::WebhookResourceNotFound)?; + let dispute_object = get_or_update_dispute_object( + state.clone(), + option_dispute, + dispute_details, + &merchant_account.merchant_id, + &payment_attempt, + event_type, + &business_profile, + connector.id(), + ) + .await?; + let disputes_response = Box::new(dispute_object.clone().foreign_into()); + let event_type: enums::EventType = dispute_object.dispute_status.foreign_into(); + + super::create_event_and_trigger_outgoing_webhook( + state, + merchant_account, + business_profile, + &key_store, + event_type, + enums::EventClass::Disputes, + dispute_object.dispute_id.clone(), + enums::EventObjectType::DisputeDetails, + api::OutgoingWebhookContent::DisputeDetails(disputes_response), + Some(dispute_object.created_at), + ) + .await?; + metrics::INCOMING_DISPUTE_WEBHOOK_MERCHANT_NOTIFIED_METRIC.add(&metrics::CONTEXT, 1, &[]); + Ok(WebhookResponseTracker::Dispute { + dispute_id: dispute_object.dispute_id, + payment_id: dispute_object.payment_id, + status: dispute_object.dispute_status, + }) + } else { + metrics::INCOMING_DISPUTE_WEBHOOK_SIGNATURE_FAILURE_METRIC.add(&metrics::CONTEXT, 1, &[]); + Err(report!( + errors::ApiErrorResponse::WebhookAuthenticationFailed + )) + } +} + +#[instrument(skip_all)] +async fn bank_transfer_webhook_flow( + state: SessionState, + req_state: ReqState, + merchant_account: domain::MerchantAccount, + business_profile: diesel_models::business_profile::BusinessProfile, + key_store: domain::MerchantKeyStore, + webhook_details: api::IncomingWebhookDetails, + source_verified: bool, +) -> CustomResult { + let response = if source_verified { + let payment_attempt = get_payment_attempt_from_object_reference_id( + &state, + webhook_details.object_reference_id, + &merchant_account, + ) + .await?; + let payment_id = payment_attempt.payment_id; + let request = api::PaymentsRequest { + payment_id: Some(api_models::payments::PaymentIdType::PaymentIntentId( + payment_id, + )), + payment_token: payment_attempt.payment_token, + ..Default::default() + }; + Box::pin(payments::payments_core::< + api::Authorize, + api::PaymentsResponse, + _, + _, + _, + >( + state.clone(), + req_state, + merchant_account.to_owned(), + key_store.clone(), + payments::PaymentConfirm, + request, + services::api::AuthFlow::Merchant, + payments::CallConnectorAction::Trigger, + None, + HeaderPayload::with_source(common_enums::PaymentSource::Webhook), + )) + .await + } else { + Err(report!( + errors::ApiErrorResponse::WebhookAuthenticationFailed + )) + }; + + match response? { + services::ApplicationResponse::JsonWithHeaders((payments_response, _)) => { + let payment_id = payments_response + .payment_id + .clone() + .get_required_value("payment_id") + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("did not receive payment id from payments core response")?; + + let event_type: Option = payments_response.status.foreign_into(); + let status = payments_response.status; + + // If event is NOT an UnsupportedEvent, trigger Outgoing Webhook + if let Some(outgoing_event_type) = event_type { + let primary_object_created_at = payments_response.created; + super::create_event_and_trigger_outgoing_webhook( + state, + merchant_account, + business_profile, + &key_store, + outgoing_event_type, + enums::EventClass::Payments, + payment_id.clone(), + enums::EventObjectType::PaymentDetails, + api::OutgoingWebhookContent::PaymentDetails(payments_response), + primary_object_created_at, + ) + .await?; + } + + Ok(WebhookResponseTracker::Payment { payment_id, status }) + } + + _ => Err(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("received non-json response from payments core")?, + } +} + +#[inline] +async fn get_payment_id( + db: &dyn StorageInterface, + payment_id: &api::PaymentIdType, + merchant_id: &str, + storage_scheme: enums::MerchantStorageScheme, +) -> errors::RouterResult { + let pay_id = || async { + match payment_id { + api_models::payments::PaymentIdType::PaymentIntentId(ref id) => Ok(id.to_string()), + api_models::payments::PaymentIdType::ConnectorTransactionId(ref id) => db + .find_payment_attempt_by_merchant_id_connector_txn_id( + merchant_id, + id, + storage_scheme, + ) + .await + .map(|p| p.payment_id), + api_models::payments::PaymentIdType::PaymentAttemptId(ref id) => db + .find_payment_attempt_by_attempt_id_merchant_id(id, merchant_id, storage_scheme) + .await + .map(|p| p.payment_id), + api_models::payments::PaymentIdType::PreprocessingId(ref id) => db + .find_payment_attempt_by_preprocessing_id_merchant_id( + id, + merchant_id, + storage_scheme, + ) + .await + .map(|p| p.payment_id), + } + }; + + pay_id() + .await + .to_not_found_response(errors::ApiErrorResponse::PaymentNotFound) +} + +fn get_connector_by_connector_name( + state: &SessionState, + connector_name: &str, + merchant_connector_id: Option, +) -> CustomResult<(&'static (dyn api::Connector + Sync), String), errors::ApiErrorResponse> { + let authentication_connector = + api_models::enums::convert_authentication_connector(connector_name); + #[cfg(feature = "frm")] + { + let frm_connector = api_models::enums::convert_frm_connector(connector_name); + if frm_connector.is_some() { + let frm_connector_data = + api::FraudCheckConnectorData::get_connector_by_name(connector_name)?; + return Ok(( + *frm_connector_data.connector, + frm_connector_data.connector_name.to_string(), + )); + } + } + + let (connector, connector_name) = if authentication_connector.is_some() { + let authentication_connector_data = + api::AuthenticationConnectorData::get_connector_by_name(connector_name)?; + ( + authentication_connector_data.connector, + authentication_connector_data.connector_name.to_string(), + ) + } else { + let connector_data = api::ConnectorData::get_connector_by_name( + &state.conf.connectors, + connector_name, + api::GetToken::Connector, + merchant_connector_id, + ) + .change_context(errors::ApiErrorResponse::InvalidRequestData { + message: "invalid connector name received".to_string(), + }) + .attach_printable("Failed construction of ConnectorData")?; + ( + connector_data.connector, + connector_data.connector_name.to_string(), + ) + }; + Ok((*connector, connector_name)) +} + +/// This function fetches the merchant connector account ( if the url used is /{merchant_connector_id}) +/// if merchant connector id is not passed in the request, then this will return None for mca +async fn fetch_optional_mca_and_connector( + state: &SessionState, + merchant_account: &domain::MerchantAccount, + connector_name_or_mca_id: &str, + key_store: &domain::MerchantKeyStore, +) -> CustomResult< + ( + Option, + &'static (dyn api::Connector + Sync), + String, + ), + errors::ApiErrorResponse, +> { + let db = &state.store; + if connector_name_or_mca_id.starts_with("mca_") { + let mca = db + .find_by_merchant_connector_account_merchant_id_merchant_connector_id( + &merchant_account.merchant_id, + connector_name_or_mca_id, + key_store, + ) + .await + .to_not_found_response(errors::ApiErrorResponse::MerchantConnectorAccountNotFound { + id: connector_name_or_mca_id.to_string(), + }) + .attach_printable( + "error while fetching merchant_connector_account from connector_id", + )?; + let (connector, connector_name) = get_connector_by_connector_name( + state, + &mca.connector_name, + Some(mca.merchant_connector_id.clone()), + )?; + + Ok((Some(mca), connector, connector_name)) + } else { + // Merchant connector account is already being queried, it is safe to set connector id as None + let (connector, connector_name) = + get_connector_by_connector_name(state, connector_name_or_mca_id, None)?; + Ok((None, connector, connector_name)) + } +} diff --git a/crates/router/src/core/webhooks/outgoing.rs b/crates/router/src/core/webhooks/outgoing.rs new file mode 100644 index 000000000000..94f96c452166 --- /dev/null +++ b/crates/router/src/core/webhooks/outgoing.rs @@ -0,0 +1,775 @@ +use api_models::{ + webhook_events::{OutgoingWebhookRequestContent, OutgoingWebhookResponseContent}, + webhooks, +}; +use common_utils::{ext_traits::Encode, request::RequestContent}; +use error_stack::{report, ResultExt}; +use masking::{ExposeInterface, Mask, PeekInterface, Secret}; +use router_env::{ + instrument, + tracing::{self, Instrument}, +}; + +use super::{types, utils, MERCHANT_ID}; +#[cfg(feature = "stripe")] +use crate::compatibility::stripe::webhooks as stripe_webhooks; +use crate::{ + core::{ + errors::{self, CustomResult}, + metrics, + }, + db::StorageInterface, + events::outgoing_webhook_logs::{OutgoingWebhookEvent, OutgoingWebhookEventMetric}, + logger, + routes::{app::SessionStateInfo, metrics::request::add_attributes, SessionState}, + services, + types::{ + api, + domain::{self, types as domain_types}, + storage::{self, enums}, + }, + utils::{OptionExt, ValueExt}, + workflows::outgoing_webhook_retry, +}; + +const OUTGOING_WEBHOOK_TIMEOUT_SECS: u64 = 5; + +#[allow(clippy::too_many_arguments)] +#[instrument(skip_all)] +pub(crate) async fn create_event_and_trigger_outgoing_webhook( + state: SessionState, + merchant_account: domain::MerchantAccount, + business_profile: diesel_models::business_profile::BusinessProfile, + merchant_key_store: &domain::MerchantKeyStore, + event_type: enums::EventType, + event_class: enums::EventClass, + primary_object_id: String, + primary_object_type: enums::EventObjectType, + content: api::OutgoingWebhookContent, + primary_object_created_at: Option, +) -> CustomResult<(), errors::ApiErrorResponse> { + let delivery_attempt = enums::WebhookDeliveryAttempt::InitialAttempt; + let idempotent_event_id = + utils::get_idempotent_event_id(&primary_object_id, event_type, delivery_attempt); + let webhook_url_result = get_webhook_url_from_business_profile(&business_profile); + + if !state.conf.webhooks.outgoing_enabled + || webhook_url_result.is_err() + || webhook_url_result.as_ref().is_ok_and(String::is_empty) + { + logger::debug!( + business_profile_id=%business_profile.profile_id, + %idempotent_event_id, + "Outgoing webhooks are disabled in application configuration, or merchant webhook URL \ + could not be obtained; skipping outgoing webhooks for event" + ); + return Ok(()); + } + + let event_id = utils::generate_event_id(); + let merchant_id = business_profile.merchant_id.clone(); + let now = common_utils::date_time::now(); + + let outgoing_webhook = api::OutgoingWebhook { + merchant_id: merchant_id.clone(), + event_id: event_id.clone(), + event_type, + content: content.clone(), + timestamp: now, + }; + + let request_content = get_outgoing_webhook_request( + &merchant_account, + outgoing_webhook, + business_profile.payment_response_hash_key.as_deref(), + ) + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("Failed to construct outgoing webhook request content")?; + + let new_event = domain::Event { + event_id: event_id.clone(), + event_type, + event_class, + is_webhook_notified: false, + primary_object_id, + primary_object_type, + created_at: now, + merchant_id: Some(business_profile.merchant_id.clone()), + business_profile_id: Some(business_profile.profile_id.clone()), + primary_object_created_at, + idempotent_event_id: Some(idempotent_event_id.clone()), + initial_attempt_id: Some(event_id.clone()), + request: Some( + domain_types::encrypt( + request_content + .encode_to_string_of_json() + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("Failed to encode outgoing webhook request content") + .map(Secret::new)?, + merchant_key_store.key.get_inner().peek(), + ) + .await + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("Failed to encrypt outgoing webhook request content")?, + ), + response: None, + delivery_attempt: Some(delivery_attempt), + }; + + let event_insert_result = state + .store + .insert_event(new_event, merchant_key_store) + .await; + + let event = match event_insert_result { + Ok(event) => Ok(event), + Err(error) => { + if error.current_context().is_db_unique_violation() { + logger::debug!("Event with idempotent ID `{idempotent_event_id}` already exists in the database"); + return Ok(()); + } else { + logger::error!(event_insertion_failure=?error); + Err(error + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("Failed to insert event in events table")) + } + } + }?; + + let process_tracker = add_outgoing_webhook_retry_task_to_process_tracker( + &*state.store, + &business_profile, + &event, + ) + .await + .map_err(|error| { + logger::error!( + ?error, + "Failed to add outgoing webhook retry task to process tracker" + ); + error + }) + .ok(); + + let cloned_key_store = merchant_key_store.clone(); + // Using a tokio spawn here and not arbiter because not all caller of this function + // may have an actix arbiter + tokio::spawn( + async move { + Box::pin(trigger_webhook_and_raise_event( + state, + business_profile, + &cloned_key_store, + event, + request_content, + delivery_attempt, + Some(content), + process_tracker, + )) + .await; + } + .in_current_span(), + ); + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +#[instrument(skip_all)] +pub(crate) async fn trigger_webhook_and_raise_event( + state: SessionState, + business_profile: diesel_models::business_profile::BusinessProfile, + merchant_key_store: &domain::MerchantKeyStore, + event: domain::Event, + request_content: OutgoingWebhookRequestContent, + delivery_attempt: enums::WebhookDeliveryAttempt, + content: Option, + process_tracker: Option, +) { + logger::debug!( + event_id=%event.event_id, + idempotent_event_id=?event.idempotent_event_id, + initial_attempt_id=?event.initial_attempt_id, + "Attempting to send webhook" + ); + + let merchant_id = business_profile.merchant_id.clone(); + let trigger_webhook_result = trigger_webhook_to_merchant( + state.clone(), + business_profile, + merchant_key_store, + event.clone(), + request_content, + delivery_attempt, + process_tracker, + ) + .await; + + raise_webhooks_analytics_event(state, trigger_webhook_result, content, merchant_id, event); +} + +async fn trigger_webhook_to_merchant( + state: SessionState, + business_profile: diesel_models::business_profile::BusinessProfile, + merchant_key_store: &domain::MerchantKeyStore, + event: domain::Event, + request_content: OutgoingWebhookRequestContent, + delivery_attempt: enums::WebhookDeliveryAttempt, + process_tracker: Option, +) -> CustomResult<(), errors::WebhooksFlowError> { + let webhook_url = match ( + get_webhook_url_from_business_profile(&business_profile), + process_tracker.clone(), + ) { + (Ok(webhook_url), _) => Ok(webhook_url), + (Err(error), Some(process_tracker)) => { + if !error + .current_context() + .is_webhook_delivery_retryable_error() + { + logger::debug!("Failed to obtain merchant webhook URL, aborting retries"); + state + .store + .as_scheduler() + .finish_process_with_business_status(process_tracker, "FAILURE".into()) + .await + .change_context( + errors::WebhooksFlowError::OutgoingWebhookProcessTrackerTaskUpdateFailed, + )?; + } + Err(error) + } + (Err(error), None) => Err(error), + }?; + + let event_id = event.event_id; + + let headers = request_content + .headers + .into_iter() + .map(|(name, value)| (name, value.into_masked())) + .collect(); + let request = services::RequestBuilder::new() + .method(services::Method::Post) + .url(&webhook_url) + .attach_default_headers() + .headers(headers) + .set_body(RequestContent::RawBytes( + request_content.body.expose().into_bytes(), + )) + .build(); + + let response = state + .api_client + .send_request(&state, request, Some(OUTGOING_WEBHOOK_TIMEOUT_SECS), false) + .await; + + metrics::WEBHOOK_OUTGOING_COUNT.add( + &metrics::CONTEXT, + 1, + &[metrics::KeyValue::new( + MERCHANT_ID, + business_profile.merchant_id.clone(), + )], + ); + logger::debug!(outgoing_webhook_response=?response); + + let update_event_if_client_error = + |state: SessionState, + merchant_key_store: domain::MerchantKeyStore, + merchant_id: String, + event_id: String, + error_message: String| async move { + let is_webhook_notified = false; + + let response_to_store = OutgoingWebhookResponseContent { + body: None, + headers: None, + status_code: None, + error_message: Some(error_message), + }; + + let event_update = domain::EventUpdate::UpdateResponse { + is_webhook_notified, + response: Some( + domain_types::encrypt( + response_to_store + .encode_to_string_of_json() + .change_context( + errors::WebhooksFlowError::OutgoingWebhookResponseEncodingFailed, + ) + .map(Secret::new)?, + merchant_key_store.key.get_inner().peek(), + ) + .await + .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) + .attach_printable("Failed to encrypt outgoing webhook response content")?, + ), + }; + + state + .store + .update_event_by_merchant_id_event_id( + &merchant_id, + &event_id, + event_update, + &merchant_key_store, + ) + .await + .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) + }; + + let api_client_error_handler = + |state: SessionState, + merchant_key_store: domain::MerchantKeyStore, + merchant_id: String, + event_id: String, + client_error: error_stack::Report, + delivery_attempt: enums::WebhookDeliveryAttempt| async move { + // Not including detailed error message in response information since it contains too + // much of diagnostic information to be exposed to the merchant. + update_event_if_client_error( + state, + merchant_key_store, + merchant_id, + event_id, + "Unable to send request to merchant server".to_string(), + ) + .await?; + + let error = + client_error.change_context(errors::WebhooksFlowError::CallToMerchantFailed); + logger::error!( + ?error, + ?delivery_attempt, + "An error occurred when sending webhook to merchant" + ); + + Ok::<_, error_stack::Report>(()) + }; + let update_event_in_storage = |state: SessionState, + merchant_key_store: domain::MerchantKeyStore, + merchant_id: String, + event_id: String, + response: reqwest::Response| async move { + let status_code = response.status(); + let is_webhook_notified = status_code.is_success(); + + let response_headers = response + .headers() + .iter() + .map(|(name, value)| { + ( + name.as_str().to_owned(), + value + .to_str() + .map(|s| Secret::from(String::from(s))) + .unwrap_or_else(|error| { + logger::warn!( + "Response header {} contains non-UTF-8 characters: {error:?}", + name.as_str() + ); + Secret::from(String::from("Non-UTF-8 header value")) + }), + ) + }) + .collect::>(); + let response_body = response + .text() + .await + .map(Secret::from) + .unwrap_or_else(|error| { + logger::warn!("Response contains non-UTF-8 characters: {error:?}"); + Secret::from(String::from("Non-UTF-8 response body")) + }); + let response_to_store = OutgoingWebhookResponseContent { + body: Some(response_body), + headers: Some(response_headers), + status_code: Some(status_code.as_u16()), + error_message: None, + }; + + let event_update = domain::EventUpdate::UpdateResponse { + is_webhook_notified, + response: Some( + domain_types::encrypt( + response_to_store + .encode_to_string_of_json() + .change_context( + errors::WebhooksFlowError::OutgoingWebhookResponseEncodingFailed, + ) + .map(Secret::new)?, + merchant_key_store.key.get_inner().peek(), + ) + .await + .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) + .attach_printable("Failed to encrypt outgoing webhook response content")?, + ), + }; + state + .store + .update_event_by_merchant_id_event_id( + &merchant_id, + &event_id, + event_update, + &merchant_key_store, + ) + .await + .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) + }; + let increment_webhook_outgoing_received_count = |merchant_id: String| { + metrics::WEBHOOK_OUTGOING_RECEIVED_COUNT.add( + &metrics::CONTEXT, + 1, + &[metrics::KeyValue::new(MERCHANT_ID, merchant_id)], + ) + }; + let success_response_handler = + |state: SessionState, + merchant_id: String, + process_tracker: Option, + business_status: &'static str| async move { + increment_webhook_outgoing_received_count(merchant_id); + + match process_tracker { + Some(process_tracker) => state + .store + .as_scheduler() + .finish_process_with_business_status(process_tracker, business_status.into()) + .await + .change_context( + errors::WebhooksFlowError::OutgoingWebhookProcessTrackerTaskUpdateFailed, + ), + None => Ok(()), + } + }; + let error_response_handler = |merchant_id: String, + delivery_attempt: enums::WebhookDeliveryAttempt, + status_code: u16, + log_message: &'static str| { + metrics::WEBHOOK_OUTGOING_NOT_RECEIVED_COUNT.add( + &metrics::CONTEXT, + 1, + &[metrics::KeyValue::new(MERCHANT_ID, merchant_id)], + ); + + let error = report!(errors::WebhooksFlowError::NotReceivedByMerchant); + logger::warn!(?error, ?delivery_attempt, ?status_code, %log_message); + }; + + match delivery_attempt { + enums::WebhookDeliveryAttempt::InitialAttempt => match response { + Err(client_error) => { + api_client_error_handler( + state.clone(), + merchant_key_store.clone(), + business_profile.merchant_id.clone(), + event_id.clone(), + client_error, + delivery_attempt, + ) + .await? + } + Ok(response) => { + let status_code = response.status(); + let _updated_event = update_event_in_storage( + state.clone(), + merchant_key_store.clone(), + business_profile.merchant_id.clone(), + event_id.clone(), + response, + ) + .await?; + + if status_code.is_success() { + success_response_handler( + state.clone(), + business_profile.merchant_id, + process_tracker, + "INITIAL_DELIVERY_ATTEMPT_SUCCESSFUL", + ) + .await?; + } else { + error_response_handler( + business_profile.merchant_id, + delivery_attempt, + status_code.as_u16(), + "Ignoring error when sending webhook to merchant", + ); + } + } + }, + enums::WebhookDeliveryAttempt::AutomaticRetry => { + let process_tracker = process_tracker + .get_required_value("process_tracker") + .change_context(errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed) + .attach_printable("`process_tracker` is unavailable in automatic retry flow")?; + match response { + Err(client_error) => { + api_client_error_handler( + state.clone(), + merchant_key_store.clone(), + business_profile.merchant_id.clone(), + event_id.clone(), + client_error, + delivery_attempt, + ) + .await?; + // Schedule a retry attempt for webhook delivery + outgoing_webhook_retry::retry_webhook_delivery_task( + &*state.store, + &business_profile.merchant_id, + process_tracker, + ) + .await + .change_context( + errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed, + )?; + } + Ok(response) => { + let status_code = response.status(); + let _updated_event = update_event_in_storage( + state.clone(), + merchant_key_store.clone(), + business_profile.merchant_id.clone(), + event_id.clone(), + response, + ) + .await?; + + if status_code.is_success() { + success_response_handler( + state.clone(), + business_profile.merchant_id, + Some(process_tracker), + "COMPLETED_BY_PT", + ) + .await?; + } else { + error_response_handler( + business_profile.merchant_id.clone(), + delivery_attempt, + status_code.as_u16(), + "An error occurred when sending webhook to merchant", + ); + // Schedule a retry attempt for webhook delivery + outgoing_webhook_retry::retry_webhook_delivery_task( + &*state.store, + &business_profile.merchant_id, + process_tracker, + ) + .await + .change_context( + errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed, + )?; + } + } + } + } + enums::WebhookDeliveryAttempt::ManualRetry => match response { + Err(client_error) => { + api_client_error_handler( + state.clone(), + merchant_key_store.clone(), + business_profile.merchant_id.clone(), + event_id.clone(), + client_error, + delivery_attempt, + ) + .await? + } + Ok(response) => { + let status_code = response.status(); + let _updated_event = update_event_in_storage( + state.clone(), + merchant_key_store.clone(), + business_profile.merchant_id.clone(), + event_id.clone(), + response, + ) + .await?; + + if status_code.is_success() { + increment_webhook_outgoing_received_count(business_profile.merchant_id.clone()); + } else { + error_response_handler( + business_profile.merchant_id, + delivery_attempt, + status_code.as_u16(), + "Ignoring error when sending webhook to merchant", + ); + } + } + }, + } + + Ok(()) +} + +fn raise_webhooks_analytics_event( + state: SessionState, + trigger_webhook_result: CustomResult<(), errors::WebhooksFlowError>, + content: Option, + merchant_id: String, + event: domain::Event, +) { + let error = if let Err(error) = trigger_webhook_result { + logger::error!(?error, "Failed to send webhook to merchant"); + + serde_json::to_value(error.current_context()) + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .map_err(|error| { + logger::error!(?error, "Failed to serialize outgoing webhook error as JSON"); + error + }) + .ok() + } else { + None + }; + + let outgoing_webhook_event_content = content + .as_ref() + .and_then(api::OutgoingWebhookContent::get_outgoing_webhook_event_content); + let webhook_event = OutgoingWebhookEvent::new( + merchant_id, + event.event_id, + event.event_type, + outgoing_webhook_event_content, + error, + event.initial_attempt_id, + ); + state.event_handler().log_event(&webhook_event); +} + +pub(crate) async fn add_outgoing_webhook_retry_task_to_process_tracker( + db: &dyn StorageInterface, + business_profile: &diesel_models::business_profile::BusinessProfile, + event: &domain::Event, +) -> CustomResult { + let schedule_time = outgoing_webhook_retry::get_webhook_delivery_retry_schedule_time( + db, + &business_profile.merchant_id, + 0, + ) + .await + .ok_or(errors::StorageError::ValueNotFound( + "Process tracker schedule time".into(), // Can raise a better error here + )) + .attach_printable("Failed to obtain initial process tracker schedule time")?; + + let tracking_data = types::OutgoingWebhookTrackingData { + merchant_id: business_profile.merchant_id.clone(), + business_profile_id: business_profile.profile_id.clone(), + event_type: event.event_type, + event_class: event.event_class, + primary_object_id: event.primary_object_id.clone(), + primary_object_type: event.primary_object_type, + initial_attempt_id: event.initial_attempt_id.clone(), + }; + + let runner = storage::ProcessTrackerRunner::OutgoingWebhookRetryWorkflow; + let task = "OUTGOING_WEBHOOK_RETRY"; + let tag = ["OUTGOING_WEBHOOKS"]; + let process_tracker_id = scheduler::utils::get_process_tracker_id( + runner, + task, + &event.event_id, + &business_profile.merchant_id, + ); + let process_tracker_entry = storage::ProcessTrackerNew::new( + process_tracker_id, + task, + runner, + tag, + tracking_data, + schedule_time, + ) + .map_err(errors::StorageError::from)?; + + match db.insert_process(process_tracker_entry).await { + Ok(process_tracker) => { + crate::routes::metrics::TASKS_ADDED_COUNT.add( + &metrics::CONTEXT, + 1, + &[add_attributes("flow", "OutgoingWebhookRetry")], + ); + Ok(process_tracker) + } + Err(error) => { + crate::routes::metrics::TASK_ADDITION_FAILURES_COUNT.add( + &metrics::CONTEXT, + 1, + &[add_attributes("flow", "OutgoingWebhookRetry")], + ); + Err(error) + } + } +} + +fn get_webhook_url_from_business_profile( + business_profile: &diesel_models::business_profile::BusinessProfile, +) -> CustomResult { + let webhook_details_json = business_profile + .webhook_details + .clone() + .get_required_value("webhook_details") + .change_context(errors::WebhooksFlowError::MerchantWebhookDetailsNotFound)?; + + let webhook_details: api::WebhookDetails = + webhook_details_json + .parse_value("WebhookDetails") + .change_context(errors::WebhooksFlowError::MerchantWebhookDetailsNotFound)?; + + webhook_details + .webhook_url + .get_required_value("webhook_url") + .change_context(errors::WebhooksFlowError::MerchantWebhookUrlNotConfigured) + .map(ExposeInterface::expose) +} + +pub(crate) fn get_outgoing_webhook_request( + merchant_account: &domain::MerchantAccount, + outgoing_webhook: api::OutgoingWebhook, + payment_response_hash_key: Option<&str>, +) -> CustomResult { + #[inline] + fn get_outgoing_webhook_request_inner( + outgoing_webhook: api::OutgoingWebhook, + payment_response_hash_key: Option<&str>, + ) -> CustomResult { + let mut headers = vec![( + reqwest::header::CONTENT_TYPE.to_string(), + mime::APPLICATION_JSON.essence_str().into(), + )]; + + let transformed_outgoing_webhook = WebhookType::from(outgoing_webhook); + + let outgoing_webhooks_signature = transformed_outgoing_webhook + .get_outgoing_webhooks_signature(payment_response_hash_key)?; + + if let Some(signature) = outgoing_webhooks_signature.signature { + WebhookType::add_webhook_header(&mut headers, signature) + } + + Ok(OutgoingWebhookRequestContent { + body: outgoing_webhooks_signature.payload, + headers: headers + .into_iter() + .map(|(name, value)| (name, Secret::new(value.into_inner()))) + .collect(), + }) + } + + match merchant_account.get_compatible_connector() { + #[cfg(feature = "stripe")] + Some(api_models::enums::Connector::Stripe) => get_outgoing_webhook_request_inner::< + stripe_webhooks::StripeOutgoingWebhook, + >( + outgoing_webhook, payment_response_hash_key + ), + _ => get_outgoing_webhook_request_inner::( + outgoing_webhook, + payment_response_hash_key, + ), + } +} diff --git a/crates/router/src/core/webhooks/webhook_events.rs b/crates/router/src/core/webhooks/webhook_events.rs index 2cb16a53b3a8..c5ba557a78d9 100644 --- a/crates/router/src/core/webhooks/webhook_events.rs +++ b/crates/router/src/core/webhooks/webhook_events.rs @@ -231,7 +231,7 @@ pub async fn retry_delivery_attempt( .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Failed to parse webhook event request information")?; - Box::pin(super::trigger_webhook_and_raise_event( + Box::pin(super::outgoing::trigger_webhook_and_raise_event( state.clone(), business_profile, &key_store, diff --git a/crates/router/src/routes/webhooks.rs b/crates/router/src/routes/webhooks.rs index fee38dbacd2d..a198fe730fea 100644 --- a/crates/router/src/routes/webhooks.rs +++ b/crates/router/src/routes/webhooks.rs @@ -26,7 +26,7 @@ pub async fn receive_incoming_webhook( &req, (), |state, auth, _, req_state| { - webhooks::webhooks_wrapper::( + webhooks::incoming_webhooks_wrapper::( &flow, state.to_owned(), req_state,