diff --git a/crates/diesel_models/src/process_tracker.rs b/crates/diesel_models/src/process_tracker.rs index 135c8e2b0554..342ad8453af2 100644 --- a/crates/diesel_models/src/process_tracker.rs +++ b/crates/diesel_models/src/process_tracker.rs @@ -208,6 +208,7 @@ pub enum ProcessTrackerRunner { ApiKeyExpiryWorkflow, OutgoingWebhookRetryWorkflow, AttachPayoutAccountWorkflow, + PaymentMethodStatusUpdateWorkflow, } #[cfg(test)] diff --git a/crates/router/src/bin/scheduler.rs b/crates/router/src/bin/scheduler.rs index 167e7a6b6ede..1afc09e5a6a1 100644 --- a/crates/router/src/bin/scheduler.rs +++ b/crates/router/src/bin/scheduler.rs @@ -310,6 +310,9 @@ impl ProcessTrackerWorkflows for WorkflowRunner { ) } } + storage::ProcessTrackerRunner::PaymentMethodStatusUpdateWorkflow => Ok(Box::new( + workflows::payment_method_status_update::PaymentMethodStatusUpdateWorkflow, + )), } }; diff --git a/crates/router/src/core/payment_methods.rs b/crates/router/src/core/payment_methods.rs index f0aa0480d09c..a4d4f38bfaf0 100644 --- a/crates/router/src/core/payment_methods.rs +++ b/crates/router/src/core/payment_methods.rs @@ -8,11 +8,18 @@ use api_models::payments::CardToken; #[cfg(feature = "payouts")] pub use api_models::{enums::PayoutConnectors, payouts as payout_types}; use diesel_models::enums; +use error_stack::ResultExt; use hyperswitch_domain_models::payments::{payment_attempt::PaymentAttempt, PaymentIntent}; use router_env::{instrument, tracing}; use crate::{ - core::{errors::RouterResult, payments::helpers, pm_auth as core_pm_auth}, + consts, + core::{ + errors::{self, RouterResult}, + payments::helpers, + pm_auth as core_pm_auth, + }, + db, routes::SessionState, types::{ api::{self, payments}, @@ -20,6 +27,9 @@ use crate::{ }, }; +const PAYMENT_METHOD_STATUS_UPDATE_TASK: &str = "PAYMENT_METHOD_STATUS_UPDATE"; +const PAYMENT_METHOD_STATUS_TAG: &str = "PAYMENT_METHOD_STATUS"; + #[instrument(skip_all)] pub async fn retrieve_payment_method( pm_data: &Option, @@ -94,6 +104,66 @@ pub async fn retrieve_payment_method( } } +fn generate_task_id_for_payment_method_status_update_workflow( + key_id: &str, + runner: &storage::ProcessTrackerRunner, + task: &str, +) -> String { + format!("{runner}_{task}_{key_id}") +} + +pub async fn add_payment_method_status_update_task( + db: &dyn db::StorageInterface, + payment_method: &diesel_models::PaymentMethod, + prev_status: enums::PaymentMethodStatus, + curr_status: enums::PaymentMethodStatus, + merchant_id: &str, +) -> Result<(), errors::ProcessTrackerError> { + let created_at = payment_method.created_at; + let schedule_time = + created_at.saturating_add(time::Duration::seconds(consts::DEFAULT_SESSION_EXPIRY)); + + let tracking_data = storage::PaymentMethodStatusTrackingData { + payment_method_id: payment_method.payment_method_id.clone(), + prev_status, + curr_status, + merchant_id: merchant_id.to_string(), + }; + + let runner = storage::ProcessTrackerRunner::PaymentMethodStatusUpdateWorkflow; + let task = PAYMENT_METHOD_STATUS_UPDATE_TASK; + let tag = [PAYMENT_METHOD_STATUS_TAG]; + + let process_tracker_id = generate_task_id_for_payment_method_status_update_workflow( + payment_method.payment_method_id.as_str(), + &runner, + task, + ); + let process_tracker_entry = storage::ProcessTrackerNew::new( + process_tracker_id, + task, + runner, + tag, + tracking_data, + schedule_time, + ) + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to construct PAYMENT_METHOD_STATUS_UPDATE process tracker task")?; + + db + .insert_process(process_tracker_entry) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable_lazy(|| { + format!( + "Failed while inserting PAYMENT_METHOD_STATUS_UPDATE reminder to process_tracker for payment_method_id: {}", + payment_method.payment_method_id.clone() + ) + })?; + + Ok(()) +} + #[instrument(skip_all)] pub async fn retrieve_payment_method_with_token( state: &SessionState, diff --git a/crates/router/src/core/payment_methods/cards.rs b/crates/router/src/core/payment_methods/cards.rs index fad3e6d918d4..ff4517a17d92 100644 --- a/crates/router/src/core/payment_methods/cards.rs +++ b/crates/router/src/core/payment_methods/cards.rs @@ -45,7 +45,9 @@ use crate::{ configs::settings, core::{ errors::{self, StorageErrorExt}, - payment_methods::{transformers as payment_methods, vault}, + payment_methods::{ + add_payment_method_status_update_task, transformers as payment_methods, vault, + }, payments::{ helpers, routing::{self, SessionFlowRoutingInput}, @@ -297,6 +299,21 @@ pub async fn get_client_secret_or_add_payment_method( ) .await?; + if res.status == enums::PaymentMethodStatus::AwaitingData { + add_payment_method_status_update_task( + db, + &res, + enums::PaymentMethodStatus::AwaitingData, + enums::PaymentMethodStatus::Inactive, + merchant_id, + ) + .await + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable( + "Failed to add payment method status update task in process tracker", + )?; + } + Ok(services::api::ApplicationResponse::Json( api::PaymentMethodResponse::foreign_from(res), )) @@ -357,7 +374,7 @@ pub async fn add_payment_method_data( .attach_printable("Unable to find payment method")?; if payment_method.status != enums::PaymentMethodStatus::AwaitingData { - return Err((errors::ApiErrorResponse::DuplicatePaymentMethod).into()); + return Err((errors::ApiErrorResponse::ClientSecretExpired).into()); } let customer_id = payment_method.customer_id.clone(); diff --git a/crates/router/src/types/storage/payment_method.rs b/crates/router/src/types/storage/payment_method.rs index d9f96ef482c1..308aa3625626 100644 --- a/crates/router/src/types/storage/payment_method.rs +++ b/crates/router/src/types/storage/payment_method.rs @@ -110,3 +110,11 @@ impl DerefMut for PaymentsMandateReference { &mut self.0 } } + +#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)] +pub struct PaymentMethodStatusTrackingData { + pub payment_method_id: String, + pub prev_status: enums::PaymentMethodStatus, + pub curr_status: enums::PaymentMethodStatus, + pub merchant_id: String, +} diff --git a/crates/router/src/workflows.rs b/crates/router/src/workflows.rs index 7b29ded51851..2f858c59809a 100644 --- a/crates/router/src/workflows.rs +++ b/crates/router/src/workflows.rs @@ -3,6 +3,7 @@ pub mod api_key_expiry; #[cfg(feature = "payouts")] pub mod attach_payout_account_workflow; pub mod outgoing_webhook_retry; +pub mod payment_method_status_update; pub mod payment_sync; pub mod refund_router; pub mod tokenized_data; diff --git a/crates/router/src/workflows/payment_method_status_update.rs b/crates/router/src/workflows/payment_method_status_update.rs new file mode 100644 index 000000000000..b8e57360d90f --- /dev/null +++ b/crates/router/src/workflows/payment_method_status_update.rs @@ -0,0 +1,107 @@ +use common_utils::ext_traits::ValueExt; +use scheduler::{ + consumer::types::process_data, utils as pt_utils, workflows::ProcessTrackerWorkflow, +}; + +use crate::{ + errors, + logger::error, + routes::SessionState, + types::storage::{self, PaymentMethodStatusTrackingData}, +}; + +pub struct PaymentMethodStatusUpdateWorkflow; + +#[async_trait::async_trait] +impl ProcessTrackerWorkflow for PaymentMethodStatusUpdateWorkflow { + async fn execute_workflow<'a>( + &'a self, + state: &'a SessionState, + process: storage::ProcessTracker, + ) -> Result<(), errors::ProcessTrackerError> { + let db = &*state.store; + let tracking_data: PaymentMethodStatusTrackingData = process + .tracking_data + .clone() + .parse_value("PaymentMethodStatusTrackingData")?; + + let retry_count = process.retry_count; + let pm_id = tracking_data.payment_method_id; + let prev_pm_status = tracking_data.prev_status; + let curr_pm_status = tracking_data.curr_status; + let merchant_id = tracking_data.merchant_id; + + let key_store = state + .store + .get_merchant_key_store_by_merchant_id( + merchant_id.as_str(), + &state.store.get_master_key().to_vec().into(), + ) + .await?; + + let merchant_account = db + .find_merchant_account_by_merchant_id(&merchant_id, &key_store) + .await?; + + let payment_method = db + .find_payment_method(&pm_id, merchant_account.storage_scheme) + .await?; + + if payment_method.status != prev_pm_status { + return db + .as_scheduler() + .finish_process_with_business_status(process, "PROCESS_ALREADY_COMPLETED") + .await + .map_err(Into::::into); + } + + let pm_update = storage::PaymentMethodUpdate::StatusUpdate { + status: Some(curr_pm_status), + }; + + let res = db + .update_payment_method(payment_method, pm_update, merchant_account.storage_scheme) + .await + .map_err(errors::ProcessTrackerError::EStorageError); + + if let Ok(_pm) = res { + db.as_scheduler() + .finish_process_with_business_status(process, "COMPLETED_BY_PT") + .await?; + } else { + let mapping = process_data::PaymentMethodsPTMapping::default(); + let time_delta = if retry_count == 0 { + Some(mapping.default_mapping.start_after) + } else { + pt_utils::get_delay(retry_count + 1, &mapping.default_mapping.frequencies) + }; + + let schedule_time = pt_utils::get_time_from_delta(time_delta); + + match schedule_time { + Some(s_time) => db + .as_scheduler() + .retry_process(process, s_time) + .await + .map_err(Into::::into)?, + None => db + .as_scheduler() + .finish_process_with_business_status(process, "RETRIES_EXCEEDED") + .await + .map_err(Into::::into)?, + }; + }; + + Ok(()) + } + + async fn error_handler<'a>( + &'a self, + _state: &'a SessionState, + process: storage::ProcessTracker, + _error: errors::ProcessTrackerError, + ) -> errors::CustomResult<(), errors::ProcessTrackerError> { + error!(%process.id, "Failed while executing workflow"); + Ok(()) + } +} diff --git a/crates/scheduler/src/utils.rs b/crates/scheduler/src/utils.rs index 7f073a8e10b9..f3253e0ad343 100644 --- a/crates/scheduler/src/utils.rs +++ b/crates/scheduler/src/utils.rs @@ -342,7 +342,7 @@ pub fn get_outgoing_webhook_retry_schedule_time( } /// Get the delay based on the retry count -fn get_delay<'a>( +pub fn get_delay<'a>( retry_count: i32, frequencies: impl IntoIterator, ) -> Option {