diff --git a/config/development.toml b/config/development.toml index 80f4d2b8eaa5..af26d91446f8 100644 --- a/config/development.toml +++ b/config/development.toml @@ -560,6 +560,7 @@ delay_between_retries_in_milliseconds = 500 [kv_config] ttl = 900 # 15 * 60 seconds +soft_kill = false [frm] enabled = true diff --git a/config/docker_compose.toml b/config/docker_compose.toml index b9b25e4666fd..c13c436a19c4 100644 --- a/config/docker_compose.toml +++ b/config/docker_compose.toml @@ -450,6 +450,7 @@ queue_strategy = "Fifo" [kv_config] ttl = 900 # 15 * 60 seconds +soft_kill = false [frm] enabled = true diff --git a/crates/api_models/src/admin.rs b/crates/api_models/src/admin.rs index 93cf7574d988..03256a51ec79 100644 --- a/crates/api_models/src/admin.rs +++ b/crates/api_models/src/admin.rs @@ -826,6 +826,23 @@ pub struct ToggleKVRequest { pub kv_enabled: bool, } +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct ToggleAllKVRequest { + /// Status of KV for the specific merchant + #[schema(example = true)] + pub kv_enabled: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct ToggleAllKVResponse { + ///Total number of updated merchants + #[schema(example = 20)] + pub total_updated: usize, + /// Status of KV for the specific merchant + #[schema(example = true)] + pub kv_enabled: bool, +} + #[derive(Debug, Clone, Default, Eq, PartialEq, serde::Deserialize, serde::Serialize, ToSchema)] pub struct MerchantConnectorDetailsWrap { /// Creds Identifier is to uniquely identify the credentials. Do not send any sensitive info in this field. And do not send the string "null". diff --git a/crates/api_models/src/events.rs b/crates/api_models/src/events.rs index c9ae775fed87..9c26576e77b5 100644 --- a/crates/api_models/src/events.rs +++ b/crates/api_models/src/events.rs @@ -60,6 +60,8 @@ impl_misc_api_event_type!( RevokeApiKeyResponse, ToggleKVResponse, ToggleKVRequest, + ToggleAllKVRequest, + ToggleAllKVResponse, MerchantAccountDeleteResponse, MerchantAccountUpdate, CardInfoResponse, diff --git a/crates/diesel_models/src/customers.rs b/crates/diesel_models/src/customers.rs index fcabc8879b3f..0d1657136e02 100644 --- a/crates/diesel_models/src/customers.rs +++ b/crates/diesel_models/src/customers.rs @@ -1,3 +1,4 @@ +use common_enums::MerchantStorageScheme; use common_utils::pii; use diesel::{AsChangeset, Identifiable, Insertable, Queryable}; use time::PrimitiveDateTime; @@ -21,6 +22,13 @@ pub struct CustomerNew { pub created_at: PrimitiveDateTime, pub modified_at: PrimitiveDateTime, pub address_id: Option, + pub updated_by: Option, +} + +impl CustomerNew { + pub fn update_storage_scheme(&mut self, storage_scheme: MerchantStorageScheme) { + self.updated_by = Some(storage_scheme.to_string()); + } } impl From for Customer { @@ -40,6 +48,7 @@ impl From for Customer { modified_at: customer_new.modified_at, address_id: customer_new.address_id, default_payment_method_id: None, + updated_by: customer_new.updated_by, } } } @@ -61,6 +70,7 @@ pub struct Customer { pub modified_at: PrimitiveDateTime, pub address_id: Option, pub default_payment_method_id: Option, + pub updated_by: Option, } #[derive( @@ -84,6 +94,7 @@ pub struct CustomerUpdateInternal { pub connector_customer: Option, pub address_id: Option, pub default_payment_method_id: Option>, + pub updated_by: Option, } impl CustomerUpdateInternal { diff --git a/crates/diesel_models/src/mandate.rs b/crates/diesel_models/src/mandate.rs index 39c43a4a178b..65576b08f237 100644 --- a/crates/diesel_models/src/mandate.rs +++ b/crates/diesel_models/src/mandate.rs @@ -1,3 +1,4 @@ +use common_enums::MerchantStorageScheme; use common_utils::pii; use diesel::{AsChangeset, Identifiable, Insertable, Queryable}; use masking::Secret; @@ -32,6 +33,7 @@ pub struct Mandate { pub connector_mandate_ids: Option, pub original_payment_id: Option, pub merchant_connector_id: Option, + pub updated_by: Option, } #[derive( @@ -69,6 +71,13 @@ pub struct MandateNew { pub connector_mandate_ids: Option, pub original_payment_id: Option, pub merchant_connector_id: Option, + pub updated_by: Option, +} + +impl MandateNew { + pub fn update_storage_scheme(&mut self, storage_scheme: MerchantStorageScheme) { + self.updated_by = Some(storage_scheme.to_string()); + } } #[derive(Debug)] @@ -90,6 +99,17 @@ pub enum MandateUpdate { }, } +impl MandateUpdate { + pub fn convert_to_mandate_update( + self, + storage_scheme: MerchantStorageScheme, + ) -> MandateUpdateInternal { + let mut updated_object = MandateUpdateInternal::from(self); + updated_object.updated_by = Some(storage_scheme.to_string()); + updated_object + } +} + #[derive(Clone, Eq, PartialEq, Copy, Debug, Default, serde::Serialize, serde::Deserialize)] pub struct SingleUseMandate { pub amount: i64, @@ -113,6 +133,7 @@ pub struct MandateUpdateInternal { connector_mandate_id: Option, payment_method_id: Option, original_payment_id: Option, + updated_by: Option, } impl From for MandateUpdateInternal { @@ -125,6 +146,7 @@ impl From for MandateUpdateInternal { connector_mandate_id: None, payment_method_id: None, original_payment_id: None, + updated_by: None, }, MandateUpdate::CaptureAmountUpdate { amount_captured } => Self { mandate_status: None, @@ -133,6 +155,7 @@ impl From for MandateUpdateInternal { connector_mandate_id: None, payment_method_id: None, original_payment_id: None, + updated_by: None, }, MandateUpdate::ConnectorReferenceUpdate { connector_mandate_ids, @@ -165,6 +188,7 @@ impl MandateUpdateInternal { connector_mandate_id, payment_method_id, original_payment_id, + updated_by, } = self; Mandate { @@ -174,6 +198,7 @@ impl MandateUpdateInternal { connector_mandate_id: connector_mandate_id.map_or(source.connector_mandate_id, Some), payment_method_id: payment_method_id.unwrap_or(source.payment_method_id), original_payment_id: original_payment_id.map_or(source.original_payment_id, Some), + updated_by: updated_by.map_or(source.updated_by, Some), ..source } } @@ -208,6 +233,7 @@ impl From<&MandateNew> for Mandate { connector_mandate_ids: mandate_new.connector_mandate_ids.clone(), original_payment_id: mandate_new.original_payment_id.clone(), merchant_connector_id: mandate_new.merchant_connector_id.clone(), + updated_by: mandate_new.updated_by.clone(), } } } diff --git a/crates/diesel_models/src/payment_method.rs b/crates/diesel_models/src/payment_method.rs index e71841743145..99e0972e3daa 100644 --- a/crates/diesel_models/src/payment_method.rs +++ b/crates/diesel_models/src/payment_method.rs @@ -1,3 +1,4 @@ +use common_enums::MerchantStorageScheme; use common_utils::pii; use diesel::{AsChangeset, Identifiable, Insertable, Queryable}; use masking::Secret; @@ -41,6 +42,7 @@ pub struct PaymentMethod { pub network_transaction_id: Option, pub client_secret: Option, pub payment_method_billing_address: Option, + pub updated_by: Option, } #[derive( @@ -77,6 +79,13 @@ pub struct PaymentMethodNew { pub network_transaction_id: Option, pub client_secret: Option, pub payment_method_billing_address: Option, + pub updated_by: Option, +} + +impl PaymentMethodNew { + pub fn update_storage_scheme(&mut self, storage_scheme: MerchantStorageScheme) { + self.updated_by = Some(storage_scheme.to_string()); + } } #[derive(Debug, Eq, PartialEq, Deserialize, Serialize)] @@ -116,6 +125,17 @@ pub enum PaymentMethodUpdate { }, } +impl PaymentMethodUpdate { + pub fn convert_to_payment_method_update( + self, + storage_scheme: MerchantStorageScheme, + ) -> PaymentMethodUpdateInternal { + let mut update_internal: PaymentMethodUpdateInternal = self.into(); + update_internal.updated_by = Some(storage_scheme.to_string()); + update_internal + } +} + #[derive( Clone, Debug, Default, AsChangeset, router_derive::DebugAsDisplay, Serialize, Deserialize, )] @@ -131,6 +151,7 @@ pub struct PaymentMethodUpdateInternal { connector_mandate_details: Option, payment_method_type: Option, payment_method_issuer: Option, + updated_by: Option, } impl PaymentMethodUpdateInternal { @@ -148,6 +169,7 @@ impl PaymentMethodUpdateInternal { network_transaction_id, status, connector_mandate_details, + updated_by, .. } = self; @@ -160,6 +182,7 @@ impl PaymentMethodUpdateInternal { status: status.unwrap_or(source.status), connector_mandate_details: connector_mandate_details .map_or(source.connector_mandate_details, Some), + updated_by: updated_by.map_or(source.updated_by, Some), ..source } } @@ -179,6 +202,7 @@ impl From for PaymentMethodUpdateInternal { connector_mandate_details: None, payment_method_issuer: None, payment_method_type: None, + updated_by: None, }, PaymentMethodUpdate::PaymentMethodDataUpdate { payment_method_data, @@ -193,6 +217,7 @@ impl From for PaymentMethodUpdateInternal { connector_mandate_details: None, payment_method_issuer: None, payment_method_type: None, + updated_by: None, }, PaymentMethodUpdate::LastUsedUpdate { last_used_at } => Self { metadata: None, @@ -205,6 +230,7 @@ impl From for PaymentMethodUpdateInternal { connector_mandate_details: None, payment_method_issuer: None, payment_method_type: None, + updated_by: None, }, PaymentMethodUpdate::NetworkTransactionIdAndStatusUpdate { network_transaction_id, @@ -220,6 +246,7 @@ impl From for PaymentMethodUpdateInternal { connector_mandate_details: None, payment_method_issuer: None, payment_method_type: None, + updated_by: None, }, PaymentMethodUpdate::StatusUpdate { status } => Self { metadata: None, @@ -232,6 +259,7 @@ impl From for PaymentMethodUpdateInternal { connector_mandate_details: None, payment_method_issuer: None, payment_method_type: None, + updated_by: None, }, PaymentMethodUpdate::AdditionalDataUpdate { payment_method_data, @@ -251,6 +279,7 @@ impl From for PaymentMethodUpdateInternal { connector_mandate_details: None, payment_method_issuer, payment_method_type, + updated_by: None, }, PaymentMethodUpdate::ConnectorMandateDetailsUpdate { connector_mandate_details, @@ -265,6 +294,7 @@ impl From for PaymentMethodUpdateInternal { network_transaction_id: None, payment_method_issuer: None, payment_method_type: None, + updated_by: None, }, } } @@ -305,6 +335,7 @@ impl From<&PaymentMethodNew> for PaymentMethod { payment_method_billing_address: payment_method_new .payment_method_billing_address .clone(), + updated_by: payment_method_new.updated_by.clone(), } } } diff --git a/crates/diesel_models/src/query/merchant_account.rs b/crates/diesel_models/src/query/merchant_account.rs index 1d4eef752060..dd2f284305be 100644 --- a/crates/diesel_models/src/query/merchant_account.rs +++ b/crates/diesel_models/src/query/merchant_account.rs @@ -123,4 +123,16 @@ impl MerchantAccount { ) .await } + + pub async fn update_all_merchant_accounts( + conn: &PgPooledConn, + merchant_account: MerchantAccountUpdateInternal, + ) -> StorageResult> { + generics::generic_update_with_results::<::Table, _, _, _>( + conn, + dsl::merchant_id.ne_all(vec![""]), + merchant_account, + ) + .await + } } diff --git a/crates/diesel_models/src/schema.rs b/crates/diesel_models/src/schema.rs index e63c14eef156..c819ce4de1e7 100644 --- a/crates/diesel_models/src/schema.rs +++ b/crates/diesel_models/src/schema.rs @@ -295,6 +295,8 @@ diesel::table! { address_id -> Nullable, #[max_length = 64] default_payment_method_id -> Nullable, + #[max_length = 64] + updated_by -> Nullable, } } @@ -590,6 +592,8 @@ diesel::table! { original_payment_id -> Nullable, #[max_length = 32] merchant_connector_id -> Nullable, + #[max_length = 64] + updated_by -> Nullable, } } @@ -928,6 +932,8 @@ diesel::table! { #[max_length = 128] client_secret -> Nullable, payment_method_billing_address -> Nullable, + #[max_length = 64] + updated_by -> Nullable, } } diff --git a/crates/router/src/configs/defaults.rs b/crates/router/src/configs/defaults.rs index 667bc90fe45f..213492ac0d93 100644 --- a/crates/router/src/configs/defaults.rs +++ b/crates/router/src/configs/defaults.rs @@ -123,7 +123,10 @@ impl Default for super::settings::DrainerSettings { #[cfg(feature = "kv_store")] impl Default for super::settings::KvConfig { fn default() -> Self { - Self { ttl: 900 } + Self { + ttl: 900, + soft_kill: Some(false), + } } } diff --git a/crates/router/src/configs/settings.rs b/crates/router/src/configs/settings.rs index 2dfae47bff13..9564a759ccd8 100644 --- a/crates/router/src/configs/settings.rs +++ b/crates/router/src/configs/settings.rs @@ -18,7 +18,7 @@ use external_services::{ }, }; use hyperswitch_interfaces::secrets_interface::secret_state::{ - SecretState, SecretStateContainer, SecuredSecret, + RawSecret, SecretState, SecretStateContainer, SecuredSecret, }; use masking::Secret; use redis_interface::RedisSettings; @@ -138,6 +138,7 @@ pub struct Frm { #[derive(Debug, Deserialize, Clone)] pub struct KvConfig { pub ttl: u32, + pub soft_kill: Option, } #[derive(Debug, Deserialize, Clone, Default)] @@ -759,6 +760,18 @@ impl Settings { } } +impl Settings { + #[cfg(feature = "kv_store")] + pub fn is_kv_soft_kill_mode(&self) -> bool { + self.kv_config.soft_kill.unwrap_or(false) + } + + #[cfg(not(feature = "kv_store"))] + pub fn is_kv_soft_kill_mode(&self) -> bool { + false + } +} + #[cfg(feature = "payouts")] #[derive(Debug, Deserialize, Clone, Default)] pub struct Payouts { diff --git a/crates/router/src/core/admin.rs b/crates/router/src/core/admin.rs index 288de66de07c..4c2ebb516e63 100644 --- a/crates/router/src/core/admin.rs +++ b/crates/router/src/core/admin.rs @@ -1382,6 +1382,13 @@ pub async fn kv_for_merchant( Ok(merchant_account) } (true, MerchantStorageScheme::PostgresOnly) => { + if state.conf.as_ref().is_kv_soft_kill_mode() { + Err(errors::ApiErrorResponse::InvalidRequestData { + message: "Kv cannot be enabled when application is in soft_kill_mode" + .to_owned(), + })? + } + db.update_merchant( merchant_account, storage::MerchantAccountUpdate::StorageSchemeUpdate { @@ -1420,6 +1427,36 @@ pub async fn kv_for_merchant( )) } +pub async fn toggle_kv_for_all_merchants( + state: AppState, + enable: bool, +) -> RouterResponse { + let db = state.store.as_ref(); + let storage_scheme = if enable { + MerchantStorageScheme::RedisKv + } else { + MerchantStorageScheme::PostgresOnly + }; + + let total_update = db + .update_all_merchant_account(storage::MerchantAccountUpdate::StorageSchemeUpdate { + storage_scheme, + }) + .await + .map_err(|error| { + error + .change_context(errors::ApiErrorResponse::InternalServerError) + .attach_printable("Failed to switch merchant_storage_scheme for all merchants") + })?; + + Ok(service_api::ApplicationResponse::Json( + api_models::admin::ToggleAllKVResponse { + total_updated: total_update, + kv_enabled: enable, + }, + )) +} + pub async fn check_merchant_account_kv_status( state: AppState, merchant_id: String, diff --git a/crates/router/src/core/customers.rs b/crates/router/src/core/customers.rs index 3b10b408f9e6..0f3d60cccd79 100644 --- a/crates/router/src/core/customers.rs +++ b/crates/router/src/core/customers.rs @@ -116,6 +116,7 @@ pub async fn create_customer( created_at: common_utils::date_time::now(), modified_at: common_utils::date_time::now(), default_payment_method_id: None, + updated_by: None, }) } .await diff --git a/crates/router/src/core/payment_methods/cards.rs b/crates/router/src/core/payment_methods/cards.rs index 741a0e246b65..4b6aa3f58b2b 100644 --- a/crates/router/src/core/payment_methods/cards.rs +++ b/crates/router/src/core/payment_methods/cards.rs @@ -140,6 +140,7 @@ pub async fn create_payment_method( last_modified: current_time, last_used_at: current_time, payment_method_billing_address, + updated_by: None, }, storage_scheme, ) diff --git a/crates/router/src/core/payments/helpers.rs b/crates/router/src/core/payments/helpers.rs index 38acf574a7c6..c1b48a361391 100644 --- a/crates/router/src/core/payments/helpers.rs +++ b/crates/router/src/core/payments/helpers.rs @@ -1608,6 +1608,7 @@ pub async fn create_customer_if_not_exist<'a, F: Clone, R>( connector_customer: None, address_id: None, default_payment_method_id: None, + updated_by: None, }, ) } diff --git a/crates/router/src/core/payouts/helpers.rs b/crates/router/src/core/payouts/helpers.rs index c22f74bab4a8..f356e5f05a14 100644 --- a/crates/router/src/core/payouts/helpers.rs +++ b/crates/router/src/core/payouts/helpers.rs @@ -626,6 +626,7 @@ pub async fn get_or_create_customer_details( modified_at: common_utils::date_time::now(), address_id: None, default_payment_method_id: None, + updated_by: None, }; Ok(Some( diff --git a/crates/router/src/core/pm_auth.rs b/crates/router/src/core/pm_auth.rs index 8d30a79df19e..f184e166e22d 100644 --- a/crates/router/src/core/pm_auth.rs +++ b/crates/router/src/core/pm_auth.rs @@ -467,6 +467,7 @@ async fn store_bank_details_in_payment_methods( network_transaction_id: None, client_secret: None, payment_method_billing_address: None, + updated_by: None, }; new_entries.push(pm_new); diff --git a/crates/router/src/db/address.rs b/crates/router/src/db/address.rs index c93e33b14718..5a6b2606d0c1 100644 --- a/crates/router/src/db/address.rs +++ b/crates/router/src/db/address.rs @@ -275,7 +275,9 @@ mod storage { use error_stack::{report, ResultExt}; use redis_interface::HsetnxReply; use router_env::{instrument, tracing}; - use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}; + use storage_impl::redis::kv_store::{ + decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey, + }; use super::AddressInterface; use crate::{ @@ -332,6 +334,9 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; + let storage_scheme = + decide_storage_scheme::<_, storage_types::Address>(self, storage_scheme, Op::Find) + .await; let address = match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -394,6 +399,18 @@ mod storage { let address = Conversion::convert(this) .await .change_context(errors::StorageError::EncryptionError)?; + let merchant_id = address.merchant_id.clone(); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id: &merchant_id, + payment_id: &payment_id, + }; + let field = format!("add_{}", address.address_id); + let storage_scheme = decide_storage_scheme::<_, storage_types::Address>( + self, + storage_scheme, + Op::Update(key.clone(), &field, Some(address.updated_by.as_str())), + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { address @@ -409,12 +426,6 @@ mod storage { .await } MerchantStorageScheme::RedisKv => { - let merchant_id = address.merchant_id.clone(); - let key = PartitionKey::MerchantIdPaymentId { - merchant_id: &merchant_id, - payment_id: &payment_id, - }; - let field = format!("add_{}", address.address_id); let updated_address = AddressUpdateInternal::from(address_update.clone()) .create_address(address.clone()); let redis_value = serde_json::to_string(&updated_address) @@ -466,6 +477,12 @@ mod storage { .await .change_context(errors::StorageError::EncryptionError)?; let merchant_id = address_new.merchant_id.clone(); + let storage_scheme = decide_storage_scheme::<_, storage_types::Address>( + self, + storage_scheme, + Op::Insert, + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { let conn = connection::pg_connection_write(self).await?; diff --git a/crates/router/src/db/customers.rs b/crates/router/src/db/customers.rs index 00fb7e4a08f7..5f8dbe69f803 100644 --- a/crates/router/src/db/customers.rs +++ b/crates/router/src/db/customers.rs @@ -75,7 +75,9 @@ mod storage { use futures::future::try_join_all; use masking::PeekInterface; use router_env::{instrument, tracing}; - use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}; + use storage_impl::redis::kv_store::{ + decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey, + }; use super::CustomerInterface; use crate::{ @@ -116,7 +118,9 @@ mod storage { .await .map_err(|err| report!(errors::StorageError::from(err))) }; - + let storage_scheme = + decide_storage_scheme::<_, diesel_models::Customer>(self, storage_scheme, Op::Find) + .await; let maybe_customer = match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -184,15 +188,20 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; - + let key = PartitionKey::MerchantIdCustomerId { + merchant_id: merchant_id.as_str(), + customer_id: customer_id.as_str(), + }; + let field = format!("cust_{}", customer_id); + let storage_scheme = decide_storage_scheme::<_, diesel_models::Customer>( + self, + storage_scheme, + Op::Update(key.clone(), &field, customer.updated_by.as_deref()), + ) + .await; let updated_object = match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdCustomerId { - merchant_id: merchant_id.as_str(), - customer_id: customer_id.as_str(), - }; - let field = format!("cust_{}", customer_id); let updated_customer = diesel_models::CustomerUpdateInternal::from(customer_update.clone()) .apply_changeset(customer.clone()); @@ -250,7 +259,9 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; - + let storage_scheme = + decide_storage_scheme::<_, diesel_models::Customer>(self, storage_scheme, Op::Find) + .await; let customer = match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -324,11 +335,17 @@ mod storage { ) -> CustomResult { let customer_id = customer_data.customer_id.clone(); let merchant_id = customer_data.merchant_id.clone(); - let new_customer = customer_data + let mut new_customer = customer_data .construct_new() .await .change_context(errors::StorageError::EncryptionError)?; - + let storage_scheme = decide_storage_scheme::<_, diesel_models::Customer>( + self, + storage_scheme, + Op::Insert, + ) + .await; + new_customer.update_storage_scheme(storage_scheme); let create_customer = match storage_scheme { MerchantStorageScheme::PostgresOnly => { let conn = connection::pg_connection_write(self).await?; diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index bfe0592fd3da..88579892d2f2 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -828,6 +828,15 @@ impl MerchantAccountInterface for KafkaStore { .await } + async fn update_all_merchant_account( + &self, + merchant_account: storage::MerchantAccountUpdate, + ) -> CustomResult { + self.diesel_store + .update_all_merchant_account(merchant_account) + .await + } + async fn find_merchant_account_by_publishable_key( &self, publishable_key: &str, diff --git a/crates/router/src/db/mandate.rs b/crates/router/src/db/mandate.rs index 1751657e3f9c..c9995d502c0d 100644 --- a/crates/router/src/db/mandate.rs +++ b/crates/router/src/db/mandate.rs @@ -57,7 +57,9 @@ mod storage { use error_stack::{report, ResultExt}; use redis_interface::HsetnxReply; use router_env::{instrument, tracing}; - use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}; + use storage_impl::redis::kv_store::{ + decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey, + }; use super::MandateInterface; use crate::{ @@ -88,7 +90,9 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; - + let storage_scheme = + decide_storage_scheme::<_, diesel_models::Mandate>(self, storage_scheme, Op::Find) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -132,7 +136,9 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; - + let storage_scheme = + decide_storage_scheme::<_, diesel_models::Mandate>(self, storage_scheme, Op::Find) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -187,24 +193,29 @@ mod storage { storage_scheme: MerchantStorageScheme, ) -> CustomResult { let conn = connection::pg_connection_write(self).await?; - + let key = PartitionKey::MerchantIdMandateId { + merchant_id, + mandate_id, + }; + let field = format!("mandate_{}", mandate_id); + let storage_scheme = decide_storage_scheme::<_, diesel_models::Mandate>( + self, + storage_scheme, + Op::Update(key.clone(), &field, mandate.updated_by.as_deref()), + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { storage_types::Mandate::update_by_merchant_id_mandate_id( &conn, merchant_id, mandate_id, - storage_types::MandateUpdateInternal::from(mandate_update), + mandate_update.convert_to_mandate_update(storage_scheme), ) .await .map_err(|error| report!(errors::StorageError::from(error))) } MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdMandateId { - merchant_id, - mandate_id, - }; - let field = format!("mandate_{}", mandate_id); let key_str = key.to_string(); if let diesel_models::MandateUpdate::ConnectorMandateIdUpdate { @@ -223,7 +234,7 @@ mod storage { .await?; } - let m_update = diesel_models::MandateUpdateInternal::from(mandate_update); + let m_update = mandate_update.convert_to_mandate_update(storage_scheme); let updated_mandate = m_update.clone().apply_changeset(mandate.clone()); let redis_value = serde_json::to_string(&updated_mandate) @@ -271,11 +282,17 @@ mod storage { #[instrument(skip_all)] async fn insert_mandate( &self, - mandate: storage_types::MandateNew, + mut mandate: storage_types::MandateNew, storage_scheme: MerchantStorageScheme, ) -> CustomResult { let conn = connection::pg_connection_write(self).await?; - + let storage_scheme = decide_storage_scheme::<_, diesel_models::Mandate>( + self, + storage_scheme, + Op::Insert, + ) + .await; + mandate.update_storage_scheme(storage_scheme); match storage_scheme { MerchantStorageScheme::PostgresOnly => mandate .insert(&conn) @@ -516,32 +533,9 @@ impl MandateInterface for MockDb { .find(|mandate| mandate.merchant_id == merchant_id && mandate.mandate_id == mandate_id) { Some(mandate) => { - match mandate_update { - storage_types::MandateUpdate::StatusUpdate { mandate_status } => { - mandate.mandate_status = mandate_status; - } - storage_types::MandateUpdate::CaptureAmountUpdate { amount_captured } => { - mandate.amount_captured = amount_captured; - } - storage_types::MandateUpdate::ConnectorReferenceUpdate { - connector_mandate_ids, - } => { - mandate.connector_mandate_ids = connector_mandate_ids; - } - - diesel_models::MandateUpdate::ConnectorMandateIdUpdate { - connector_mandate_id, - connector_mandate_ids, - payment_method_id, - original_payment_id, - } => { - mandate.connector_mandate_ids = connector_mandate_ids; - mandate.connector_mandate_id = connector_mandate_id; - mandate.payment_method_id = payment_method_id; - mandate.original_payment_id = original_payment_id - } - } - Ok(mandate.clone()) + let m_update = diesel_models::MandateUpdateInternal::from(mandate_update); + let updated_mandate = m_update.clone().apply_changeset(mandate.clone()); + Ok(updated_mandate) } None => { Err(errors::StorageError::ValueNotFound("mandate not found".to_string()).into()) @@ -634,6 +628,7 @@ impl MandateInterface for MockDb { metadata: mandate_new.metadata, connector_mandate_ids: mandate_new.connector_mandate_ids, merchant_connector_id: mandate_new.merchant_connector_id, + updated_by: mandate_new.updated_by, }; mandates.push(mandate.clone()); Ok(mandate) diff --git a/crates/router/src/db/merchant_account.rs b/crates/router/src/db/merchant_account.rs index 08d0c2790473..c323ca4abb15 100644 --- a/crates/router/src/db/merchant_account.rs +++ b/crates/router/src/db/merchant_account.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use common_utils::ext_traits::AsyncExt; +use diesel_models::MerchantAccountUpdateInternal; use error_stack::{report, ResultExt}; use router_env::{instrument, tracing}; #[cfg(feature = "accounts_cache")] @@ -40,6 +41,11 @@ where merchant_key_store: &domain::MerchantKeyStore, ) -> CustomResult; + async fn update_all_merchant_account( + &self, + merchant_account: storage::MerchantAccountUpdate, + ) -> CustomResult; + async fn update_merchant( &self, this: domain::MerchantAccount, @@ -354,6 +360,38 @@ impl MerchantAccountInterface for Store { Ok(merchant_accounts) } + + async fn update_all_merchant_account( + &self, + merchant_account: storage::MerchantAccountUpdate, + ) -> CustomResult { + let conn = connection::pg_connection_read(self).await?; + + let db_func = || async { + storage::MerchantAccount::update_all_merchant_accounts( + &conn, + MerchantAccountUpdateInternal::from(merchant_account), + ) + .await + .map_err(|error| report!(errors::StorageError::from(error))) + }; + + let total; + #[cfg(not(feature = "accounts_cache"))] + { + let ma = db_func().await?; + total = ma.len(); + } + + #[cfg(feature = "accounts_cache")] + { + let ma = db_func().await?; + publish_and_redact_all_merchant_account_cache(self, &ma).await?; + total = ma.len(); + } + + Ok(total) + } } #[async_trait::async_trait] @@ -433,6 +471,13 @@ impl MerchantAccountInterface for MockDb { Err(errors::StorageError::MockDbError)? } + async fn update_all_merchant_account( + &self, + _merchant_account_update: storage::MerchantAccountUpdate, + ) -> CustomResult { + Err(errors::StorageError::MockDbError)? + } + async fn delete_merchant_account_by_merchant_id( &self, _merchant_id: &str, @@ -477,3 +522,22 @@ async fn publish_and_redact_merchant_account_cache( super::cache::publish_into_redact_channel(store, cache_keys).await?; Ok(()) } + +#[cfg(feature = "accounts_cache")] +async fn publish_and_redact_all_merchant_account_cache( + store: &dyn super::StorageInterface, + merchant_accounts: &[storage::MerchantAccount], +) -> CustomResult<(), errors::StorageError> { + let merchant_ids = merchant_accounts.iter().map(|m| m.merchant_id.clone()); + let publishable_keys = merchant_accounts + .iter() + .filter_map(|m| m.publishable_key.clone()); + + let cache_keys: Vec> = merchant_ids + .chain(publishable_keys) + .map(|s| CacheKind::Accounts(s.into())) + .collect(); + + super::cache::publish_into_redact_channel(store, cache_keys).await?; + Ok(()) +} diff --git a/crates/router/src/db/payment_method.rs b/crates/router/src/db/payment_method.rs index 7c987c38d5b3..dc685ae49d32 100644 --- a/crates/router/src/db/payment_method.rs +++ b/crates/router/src/db/payment_method.rs @@ -71,7 +71,9 @@ mod storage { use error_stack::{report, ResultExt}; use redis_interface::HsetnxReply; use router_env::{instrument, tracing}; - use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}; + use storage_impl::redis::kv_store::{ + decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey, + }; use super::PaymentMethodInterface; use crate::{ @@ -97,7 +99,12 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; - + let storage_scheme = decide_storage_scheme::<_, storage_types::PaymentMethod>( + self, + storage_scheme, + Op::Find, + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -141,7 +148,12 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; - + let storage_scheme = decide_storage_scheme::<_, storage_types::PaymentMethod>( + self, + storage_scheme, + Op::Find, + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -194,9 +206,16 @@ mod storage { #[instrument(skip_all)] async fn insert_payment_method( &self, - payment_method_new: storage_types::PaymentMethodNew, + mut payment_method_new: storage_types::PaymentMethodNew, storage_scheme: MerchantStorageScheme, ) -> CustomResult { + let storage_scheme = decide_storage_scheme::<_, storage_types::PaymentMethod>( + self, + storage_scheme, + Op::Insert, + ) + .await; + payment_method_new.update_storage_scheme(storage_scheme); match storage_scheme { MerchantStorageScheme::PostgresOnly => { let conn = connection::pg_connection_write(self).await?; @@ -278,25 +297,35 @@ mod storage { payment_method_update: storage_types::PaymentMethodUpdate, storage_scheme: MerchantStorageScheme, ) -> CustomResult { + let merchant_id = payment_method.merchant_id.clone(); + let customer_id = payment_method.customer_id.clone(); + let key = PartitionKey::MerchantIdCustomerId { + merchant_id: &merchant_id, + customer_id: &customer_id, + }; + let field = format!("payment_method_id_{}", payment_method.payment_method_id); + let storage_scheme = decide_storage_scheme::<_, storage_types::PaymentMethod>( + self, + storage_scheme, + Op::Update(key.clone(), &field, payment_method.updated_by.as_deref()), + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { let conn = connection::pg_connection_write(self).await?; payment_method - .update_with_payment_method_id(&conn, payment_method_update.into()) + .update_with_payment_method_id( + &conn, + payment_method_update.convert_to_payment_method_update(storage_scheme), + ) .await .map_err(|error| report!(errors::StorageError::from(error))) } MerchantStorageScheme::RedisKv => { - let merchant_id = payment_method.merchant_id.clone(); - let customer_id = payment_method.customer_id.clone(); - let key = PartitionKey::MerchantIdCustomerId { - merchant_id: &merchant_id, - customer_id: &customer_id, - }; let key_str = key.to_string(); - let field = format!("payment_method_id_{}", payment_method.payment_method_id); - let p_update: PaymentMethodUpdateInternal = payment_method_update.into(); + let p_update: PaymentMethodUpdateInternal = + payment_method_update.convert_to_payment_method_update(storage_scheme); let updated_payment_method = p_update.clone().apply_changeset(payment_method.clone()); @@ -663,6 +692,7 @@ impl PaymentMethodInterface for MockDb { client_secret: payment_method_new.client_secret, network_transaction_id: payment_method_new.network_transaction_id, payment_method_billing_address: payment_method_new.payment_method_billing_address, + updated_by: payment_method_new.updated_by, }; payment_methods.push(payment_method.clone()); Ok(payment_method) diff --git a/crates/router/src/db/refund.rs b/crates/router/src/db/refund.rs index df1130b7af67..3b24dd3960e7 100644 --- a/crates/router/src/db/refund.rs +++ b/crates/router/src/db/refund.rs @@ -275,7 +275,9 @@ mod storage { use error_stack::{report, ResultExt}; use redis_interface::HsetnxReply; use router_env::{instrument, tracing}; - use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}; + use storage_impl::redis::kv_store::{ + decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey, + }; use super::RefundInterface; use crate::{ @@ -305,6 +307,9 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; + let storage_scheme = + decide_storage_scheme::<_, storage_types::Refund>(self, storage_scheme, Op::Find) + .await; match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => database_call().await, enums::MerchantStorageScheme::RedisKv => { @@ -341,6 +346,9 @@ mod storage { new: storage_types::RefundNew, storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { + let storage_scheme = + decide_storage_scheme::<_, storage_types::Refund>(self, storage_scheme, Op::Insert) + .await; match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => { let conn = connection::pg_connection_write(self).await?; @@ -485,6 +493,9 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; + let storage_scheme = + decide_storage_scheme::<_, storage_types::Refund>(self, storage_scheme, Op::Find) + .await; match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => database_call().await, enums::MerchantStorageScheme::RedisKv => { @@ -526,6 +537,19 @@ mod storage { refund: storage_types::RefundUpdate, storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { + let merchant_id = this.merchant_id.clone(); + let payment_id = this.payment_id.clone(); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id: &merchant_id, + payment_id: &payment_id, + }; + let field = format!("pa_{}_ref_{}", &this.attempt_id, &this.refund_id); + let storage_scheme = decide_storage_scheme::<_, storage_types::Refund>( + self, + storage_scheme, + Op::Update(key.clone(), &field, Some(&this.updated_by)), + ) + .await; match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => { let conn = connection::pg_connection_write(self).await?; @@ -534,14 +558,7 @@ mod storage { .map_err(|error| report!(errors::StorageError::from(error))) } enums::MerchantStorageScheme::RedisKv => { - let merchant_id = this.merchant_id.clone(); - let payment_id = this.payment_id.clone(); - let key = PartitionKey::MerchantIdPaymentId { - merchant_id: &merchant_id, - payment_id: &payment_id, - }; let key_str = key.to_string(); - let field = format!("pa_{}_ref_{}", &this.attempt_id, &this.refund_id); let updated_refund = refund.clone().apply_changeset(this.clone()); let redis_value = updated_refund @@ -588,6 +605,9 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; + let storage_scheme = + decide_storage_scheme::<_, storage_types::Refund>(self, storage_scheme, Op::Find) + .await; match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => database_call().await, enums::MerchantStorageScheme::RedisKv => { @@ -637,6 +657,9 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; + let storage_scheme = + decide_storage_scheme::<_, storage_types::Refund>(self, storage_scheme, Op::Find) + .await; match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => database_call().await, enums::MerchantStorageScheme::RedisKv => { @@ -685,6 +708,9 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; + let storage_scheme = + decide_storage_scheme::<_, storage_types::Refund>(self, storage_scheme, Op::Find) + .await; match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => database_call().await, enums::MerchantStorageScheme::RedisKv => { diff --git a/crates/router/src/db/reverse_lookup.rs b/crates/router/src/db/reverse_lookup.rs index 121ec06ec1f9..fcf38ca420d0 100644 --- a/crates/router/src/db/reverse_lookup.rs +++ b/crates/router/src/db/reverse_lookup.rs @@ -69,7 +69,9 @@ mod storage { use error_stack::{report, ResultExt}; use redis_interface::SetnxReply; use router_env::{instrument, tracing}; - use storage_impl::redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}; + use storage_impl::redis::kv_store::{ + decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey, + }; use super::{ReverseLookupInterface, Store}; use crate::{ @@ -91,6 +93,8 @@ mod storage { new: ReverseLookupNew, storage_scheme: enums::MerchantStorageScheme, ) -> CustomResult { + let storage_scheme = + decide_storage_scheme::<_, ReverseLookup>(self, storage_scheme, Op::Insert).await; match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => { let conn = connection::pg_connection_write(self).await?; @@ -150,7 +154,8 @@ mod storage { .await .map_err(|error| report!(errors::StorageError::from(error))) }; - + let storage_scheme = + decide_storage_scheme::<_, ReverseLookup>(self, storage_scheme, Op::Find).await; match storage_scheme { enums::MerchantStorageScheme::PostgresOnly => database_call().await, enums::MerchantStorageScheme::RedisKv => { diff --git a/crates/router/src/routes/admin.rs b/crates/router/src/routes/admin.rs index d9cadf002c1b..2d1b77460a5f 100644 --- a/crates/router/src/routes/admin.rs +++ b/crates/router/src/routes/admin.rs @@ -458,6 +458,31 @@ pub async fn merchant_account_toggle_kv( ) .await } + +/// Merchant Account - Toggle KV +/// +/// Toggle KV mode for all Merchant Accounts +#[instrument(skip_all)] +pub async fn merchant_account_toggle_all_kv( + state: web::Data, + req: HttpRequest, + json_payload: web::Json, +) -> HttpResponse { + let flow = Flow::ConfigKeyUpdate; + let payload = json_payload.into_inner(); + + api::server_wrap( + flow, + state, + &req, + payload, + |state, _, payload, _| toggle_kv_for_all_merchants(state, payload.kv_enabled), + &auth::AdminApiAuth, + api_locking::LockAction::NotApplicable, + ) + .await +} + #[instrument(skip_all, fields(flow = ?Flow::BusinessProfileCreate))] pub async fn business_profile_create( state: web::Data, diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index 21cc994381eb..f4eded24be6b 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -866,6 +866,7 @@ impl MerchantAccount { .route(web::post().to(merchant_account_toggle_kv)) .route(web::get().to(merchant_account_kv_status)), ) + .service(web::resource("/kv").route(web::post().to(merchant_account_toggle_all_kv))) .service( web::resource("/{id}") .route(web::get().to(retrieve_merchant_account)) diff --git a/crates/router/src/services.rs b/crates/router/src/services.rs index 5af325a46f76..6d9d657e1085 100644 --- a/crates/router/src/services.rs +++ b/crates/router/src/services.rs @@ -79,6 +79,7 @@ pub async fn get_store( config.drainer.stream_name.clone(), config.drainer.num_partitions, config.kv_config.ttl, + config.kv_config.soft_kill, ); Ok(store) diff --git a/crates/router/src/types/api/admin.rs b/crates/router/src/types/api/admin.rs index fcbd97d5cb13..8fc3ebb721f2 100644 --- a/crates/router/src/types/api/admin.rs +++ b/crates/router/src/types/api/admin.rs @@ -3,7 +3,8 @@ pub use api_models::admin::{ MerchantAccountDeleteResponse, MerchantAccountResponse, MerchantAccountUpdate, MerchantConnectorCreate, MerchantConnectorDeleteResponse, MerchantConnectorDetails, MerchantConnectorDetailsWrap, MerchantConnectorId, MerchantConnectorResponse, MerchantDetails, - MerchantId, PaymentMethodsEnabled, ToggleKVRequest, ToggleKVResponse, WebhookDetails, + MerchantId, PaymentMethodsEnabled, ToggleAllKVRequest, ToggleAllKVResponse, ToggleKVRequest, + ToggleKVResponse, WebhookDetails, }; use common_utils::ext_traits::{Encode, ValueExt}; use error_stack::ResultExt; diff --git a/crates/router/src/types/domain/customer.rs b/crates/router/src/types/domain/customer.rs index 139cd1057790..0e8ac32b4af3 100644 --- a/crates/router/src/types/domain/customer.rs +++ b/crates/router/src/types/domain/customer.rs @@ -23,6 +23,7 @@ pub struct Customer { pub connector_customer: Option, pub address_id: Option, pub default_payment_method_id: Option, + pub updated_by: Option, } #[async_trait::async_trait] @@ -47,6 +48,7 @@ impl super::behaviour::Conversion for Customer { connector_customer: self.connector_customer, address_id: self.address_id, default_payment_method_id: self.default_payment_method_id, + updated_by: self.updated_by, }) } @@ -75,6 +77,7 @@ impl super::behaviour::Conversion for Customer { connector_customer: item.connector_customer, address_id: item.address_id, default_payment_method_id: item.default_payment_method_id, + updated_by: item.updated_by, }) } .await @@ -98,6 +101,7 @@ impl super::behaviour::Conversion for Customer { modified_at: now, connector_customer: self.connector_customer, address_id: self.address_id, + updated_by: self.updated_by, }) } } diff --git a/crates/storage_impl/src/lib.rs b/crates/storage_impl/src/lib.rs index 7a3ef30bef3a..0963cd2b41e2 100644 --- a/crates/storage_impl/src/lib.rs +++ b/crates/storage_impl/src/lib.rs @@ -151,6 +151,7 @@ pub struct KVRouterStore { drainer_num_partitions: u8, ttl_for_kv: u32, pub request_id: Option, + soft_kill_mode: bool, } #[async_trait::async_trait] @@ -159,14 +160,16 @@ where RouterStore: DatabaseStore, T: DatabaseStore, { - type Config = (RouterStore, String, u8, u32); + type Config = (RouterStore, String, u8, u32, Option); async fn new(config: Self::Config, _test_transaction: bool) -> StorageResult { - let (router_store, drainer_stream_name, drainer_num_partitions, ttl_for_kv) = config; + let (router_store, drainer_stream_name, drainer_num_partitions, ttl_for_kv, soft_kill_mode) = + config; Ok(Self::from_store( router_store, drainer_stream_name, drainer_num_partitions, ttl_for_kv, + soft_kill_mode, )) } fn get_master_pool(&self) -> &PgPool { @@ -191,6 +194,7 @@ impl KVRouterStore { drainer_stream_name: String, drainer_num_partitions: u8, ttl_for_kv: u32, + soft_kill: Option, ) -> Self { let request_id = store.request_id.clone(); @@ -200,6 +204,7 @@ impl KVRouterStore { drainer_num_partitions, ttl_for_kv, request_id, + soft_kill_mode: soft_kill.unwrap_or(false), } } diff --git a/crates/storage_impl/src/lookup.rs b/crates/storage_impl/src/lookup.rs index f8daabf2e9eb..67b8635aba4a 100644 --- a/crates/storage_impl/src/lookup.rs +++ b/crates/storage_impl/src/lookup.rs @@ -12,7 +12,7 @@ use redis_interface::SetnxReply; use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, - redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}, + redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey}, utils::{self, try_redis_get_else_try_database_get}, DatabaseStore, KVRouterStore, RouterStore, }; @@ -71,6 +71,8 @@ impl ReverseLookupInterface for KVRouterStore { new: DieselReverseLookupNew, storage_scheme: storage_enums::MerchantStorageScheme, ) -> CustomResult { + let storage_scheme = + decide_storage_scheme::<_, DieselReverseLookup>(self, storage_scheme, Op::Insert).await; match storage_scheme { storage_enums::MerchantStorageScheme::PostgresOnly => { self.router_store @@ -124,6 +126,8 @@ impl ReverseLookupInterface for KVRouterStore { .get_lookup_by_lookup_id(id, storage_scheme) .await }; + let storage_scheme = + decide_storage_scheme::<_, DieselReverseLookup>(self, storage_scheme, Op::Find).await; match storage_scheme { storage_enums::MerchantStorageScheme::PostgresOnly => database_call().await, storage_enums::MerchantStorageScheme::RedisKv => { diff --git a/crates/storage_impl/src/metrics.rs b/crates/storage_impl/src/metrics.rs index 29bca2a007b7..2f22d578133e 100644 --- a/crates/storage_impl/src/metrics.rs +++ b/crates/storage_impl/src/metrics.rs @@ -10,3 +10,4 @@ counter_metric!(KV_OPERATION_SUCCESSFUL, GLOBAL_METER); counter_metric!(KV_OPERATION_FAILED, GLOBAL_METER); counter_metric!(KV_PUSHED_TO_DRAINER, GLOBAL_METER); counter_metric!(KV_FAILED_TO_PUSH_TO_DRAINER, GLOBAL_METER); +counter_metric!(KV_SOFT_KILL_ACTIVE_UPDATE, GLOBAL_METER); diff --git a/crates/storage_impl/src/payments/payment_attempt.rs b/crates/storage_impl/src/payments/payment_attempt.rs index d090f646da32..88387d7c1afd 100644 --- a/crates/storage_impl/src/payments/payment_attempt.rs +++ b/crates/storage_impl/src/payments/payment_attempt.rs @@ -31,7 +31,7 @@ use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, lookup::ReverseLookupInterface, - redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}, + redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey}, utils::{pg_connection_read, pg_connection_write, try_redis_get_else_try_database_get}, DataModelExt, DatabaseStore, KVRouterStore, RouterStore, }; @@ -333,6 +333,9 @@ impl PaymentAttemptInterface for KVRouterStore { payment_attempt: PaymentAttemptNew, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Insert) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -468,6 +471,17 @@ impl PaymentAttemptInterface for KVRouterStore { payment_attempt: PaymentAttemptUpdate, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let key = PartitionKey::MerchantIdPaymentId { + merchant_id: &this.merchant_id, + payment_id: &this.payment_id, + }; + let field = format!("pa_{}", this.attempt_id); + let storage_scheme = decide_storage_scheme::<_, DieselPaymentAttempt>( + self, + storage_scheme, + Op::Update(key.clone(), &field, Some(&this.updated_by)), + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -475,10 +489,6 @@ impl PaymentAttemptInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdPaymentId { - merchant_id: &this.merchant_id, - payment_id: &this.payment_id, - }; let key_str = key.to_string(); let old_connector_transaction_id = &this.connector_transaction_id; let old_preprocessing_id = &this.preprocessing_step_id; @@ -491,7 +501,6 @@ impl PaymentAttemptInterface for KVRouterStore { // Check for database presence as well Maybe use a read replica here ? let redis_value = serde_json::to_string(&updated_attempt) .change_context(errors::StorageError::KVError)?; - let field = format!("pa_{}", updated_attempt.attempt_id); let redis_entry = kv::TypedSql { op: kv::DBOperation::Update { @@ -586,6 +595,8 @@ impl PaymentAttemptInterface for KVRouterStore { merchant_id: &str, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -643,6 +654,8 @@ impl PaymentAttemptInterface for KVRouterStore { storage_scheme, ) }; + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -695,6 +708,8 @@ impl PaymentAttemptInterface for KVRouterStore { storage_scheme, ) }; + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -742,6 +757,8 @@ impl PaymentAttemptInterface for KVRouterStore { connector_txn_id: &str, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -802,6 +819,8 @@ impl PaymentAttemptInterface for KVRouterStore { attempt_id: &str, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -848,6 +867,8 @@ impl PaymentAttemptInterface for KVRouterStore { merchant_id: &str, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -907,6 +928,8 @@ impl PaymentAttemptInterface for KVRouterStore { merchant_id: &str, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -966,6 +989,8 @@ impl PaymentAttemptInterface for KVRouterStore { payment_id: &str, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result, errors::StorageError> { + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentAttempt>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store diff --git a/crates/storage_impl/src/payments/payment_intent.rs b/crates/storage_impl/src/payments/payment_intent.rs index 2cbcbaaf01e6..ee1fdea64f2c 100644 --- a/crates/storage_impl/src/payments/payment_intent.rs +++ b/crates/storage_impl/src/payments/payment_intent.rs @@ -43,7 +43,7 @@ use crate::connection; use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, - redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}, + redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey}, utils::{self, pg_connection_read, pg_connection_write}, DataModelExt, DatabaseStore, KVRouterStore, }; @@ -55,6 +55,15 @@ impl PaymentIntentInterface for KVRouterStore { new: PaymentIntentNew, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let merchant_id = new.merchant_id.clone(); + let payment_id = new.payment_id.clone(); + let field = format!("pi_{}", new.payment_id); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id: &merchant_id, + payment_id: &payment_id, + }; + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentIntent>(self, storage_scheme, Op::Insert).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -63,14 +72,7 @@ impl PaymentIntentInterface for KVRouterStore { } MerchantStorageScheme::RedisKv => { - let merchant_id = new.merchant_id.clone(); - let payment_id = new.payment_id.clone(); - let key = PartitionKey::MerchantIdPaymentId { - merchant_id: &merchant_id, - payment_id: &payment_id, - }; let key_str = key.to_string(); - let field = format!("pi_{}", new.payment_id); let created_intent = PaymentIntent { id: 0i32, payment_id: new.payment_id.clone(), @@ -154,6 +156,19 @@ impl PaymentIntentInterface for KVRouterStore { payment_intent_update: PaymentIntentUpdate, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let merchant_id = this.merchant_id.clone(); + let payment_id = this.payment_id.clone(); + let key = PartitionKey::MerchantIdPaymentId { + merchant_id: &merchant_id, + payment_id: &payment_id, + }; + let field = format!("pi_{}", this.payment_id); + let storage_scheme = decide_storage_scheme::<_, DieselPaymentIntent>( + self, + storage_scheme, + Op::Update(key.clone(), &field, Some(&this.updated_by)), + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -161,14 +176,7 @@ impl PaymentIntentInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let merchant_id = this.merchant_id.clone(); - let payment_id = this.payment_id.clone(); - let key = PartitionKey::MerchantIdPaymentId { - merchant_id: &merchant_id, - payment_id: &payment_id, - }; let key_str = key.to_string(); - let field = format!("pi_{}", this.payment_id); let diesel_intent_update = payment_intent_update.to_storage_model(); let origin_diesel_intent = this.to_storage_model(); @@ -224,6 +232,8 @@ impl PaymentIntentInterface for KVRouterStore { er.change_context(new_err) }) }; + let storage_scheme = + decide_storage_scheme::<_, DieselPaymentIntent>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, diff --git a/crates/storage_impl/src/payouts/payout_attempt.rs b/crates/storage_impl/src/payouts/payout_attempt.rs index 6a62832e1bf9..91c1669db30f 100644 --- a/crates/storage_impl/src/payouts/payout_attempt.rs +++ b/crates/storage_impl/src/payouts/payout_attempt.rs @@ -29,7 +29,7 @@ use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, lookup::ReverseLookupInterface, - redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}, + redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey}, utils::{self, pg_connection_read, pg_connection_write}, DataModelExt, DatabaseStore, KVRouterStore, }; @@ -43,6 +43,8 @@ impl PayoutAttemptInterface for KVRouterStore { payouts: &Payouts, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let storage_scheme = + decide_storage_scheme::<_, DieselPayoutAttempt>(self, storage_scheme, Op::Insert).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -136,6 +138,17 @@ impl PayoutAttemptInterface for KVRouterStore { payouts: &Payouts, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let key = PartitionKey::MerchantIdPayoutAttemptId { + merchant_id: &this.merchant_id, + payout_attempt_id: &this.payout_id, + }; + let field = format!("poa_{}", this.payout_attempt_id); + let storage_scheme = decide_storage_scheme::<_, DieselPayoutAttempt>( + self, + storage_scheme, + Op::Update(key.clone(), &field, None), + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -143,12 +156,7 @@ impl PayoutAttemptInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdPayoutAttemptId { - merchant_id: &this.merchant_id, - payout_attempt_id: &this.payout_id, - }; let key_str = key.to_string(); - let field = format!("poa_{}", this.payout_attempt_id); let diesel_payout_update = payout_update.to_storage_model(); let origin_diesel_payout = this.clone().to_storage_model(); @@ -195,6 +203,8 @@ impl PayoutAttemptInterface for KVRouterStore { payout_attempt_id: &str, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let storage_scheme = + decide_storage_scheme::<_, DieselPayoutAttempt>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store diff --git a/crates/storage_impl/src/payouts/payouts.rs b/crates/storage_impl/src/payouts/payouts.rs index 19921121a94d..92835dee59cf 100644 --- a/crates/storage_impl/src/payouts/payouts.rs +++ b/crates/storage_impl/src/payouts/payouts.rs @@ -38,7 +38,7 @@ use crate::connection; use crate::{ diesel_error_to_data_error, errors::RedisErrorExt, - redis::kv_store::{kv_wrapper, KvOperation, PartitionKey}, + redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey}, utils::{self, pg_connection_read, pg_connection_write}, DataModelExt, DatabaseStore, KVRouterStore, }; @@ -51,6 +51,8 @@ impl PayoutsInterface for KVRouterStore { new: PayoutsNew, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let storage_scheme = + decide_storage_scheme::<_, DieselPayouts>(self, storage_scheme, Op::Insert).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store.insert_payout(new, storage_scheme).await @@ -128,6 +130,17 @@ impl PayoutsInterface for KVRouterStore { payout_attempt: &PayoutAttempt, storage_scheme: MerchantStorageScheme, ) -> error_stack::Result { + let key = PartitionKey::MerchantIdPayoutId { + merchant_id: &this.merchant_id, + payout_id: &this.payout_id, + }; + let field = format!("po_{}", this.payout_id); + let storage_scheme = decide_storage_scheme::<_, DieselPayouts>( + self, + storage_scheme, + Op::Update(key.clone(), &field, None), + ) + .await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { self.router_store @@ -135,12 +148,7 @@ impl PayoutsInterface for KVRouterStore { .await } MerchantStorageScheme::RedisKv => { - let key = PartitionKey::MerchantIdPayoutId { - merchant_id: &this.merchant_id, - payout_id: &this.payout_id, - }; let key_str = key.to_string(); - let field = format!("po_{}", this.payout_id); let diesel_payout_update = payout_update.to_storage_model(); let origin_diesel_payout = this.clone().to_storage_model(); @@ -194,6 +202,8 @@ impl PayoutsInterface for KVRouterStore { er.change_context(new_err) }) }; + let storage_scheme = + decide_storage_scheme::<_, DieselPayouts>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => database_call().await, MerchantStorageScheme::RedisKv => { @@ -236,6 +246,8 @@ impl PayoutsInterface for KVRouterStore { er.change_context(new_err) }) }; + let storage_scheme = + decide_storage_scheme::<_, DieselPayouts>(self, storage_scheme, Op::Find).await; match storage_scheme { MerchantStorageScheme::PostgresOnly => { let maybe_payouts = database_call().await?; diff --git a/crates/storage_impl/src/redis/kv_store.rs b/crates/storage_impl/src/redis/kv_store.rs index 7853763f32f9..c39e67eb841b 100644 --- a/crates/storage_impl/src/redis/kv_store.rs +++ b/crates/storage_impl/src/redis/kv_store.rs @@ -1,6 +1,7 @@ use std::{fmt::Debug, sync::Arc}; use common_utils::errors::CustomResult; +use diesel_models::enums::MerchantStorageScheme; use error_stack::report; use redis_interface::errors::RedisError; use router_derive::TryGetEnumVariant; @@ -20,6 +21,7 @@ pub trait KvStorePartition { } #[allow(unused)] +#[derive(Clone)] pub enum PartitionKey<'a> { MerchantIdPaymentId { merchant_id: &'a str, @@ -235,3 +237,65 @@ where err }) } + +pub enum Op<'a> { + Insert, + Update(PartitionKey<'a>, &'a str, Option<&'a str>), + Find, +} + +impl<'a> std::fmt::Display for Op<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Op::Insert => f.write_str("insert"), + Op::Find => f.write_str("find"), + Op::Update(p_key, _, updated_by) => { + f.write_str(&format!("update_{} for updated_by_{:?}", p_key, updated_by)) + } + } + } +} + +pub async fn decide_storage_scheme<'a, T, D>( + store: &KVRouterStore, + storage_scheme: MerchantStorageScheme, + operation: Op<'a>, +) -> MerchantStorageScheme +where + D: de::DeserializeOwned + + serde::Serialize + + Debug + + KvStorePartition + + UniqueConstraints + + Sync, + T: crate::database::store::DatabaseStore, +{ + if store.soft_kill_mode { + let ops = operation.to_string(); + let updated_scheme = match operation { + Op::Insert => MerchantStorageScheme::PostgresOnly, + Op::Find => MerchantStorageScheme::RedisKv, + Op::Update(_, _, Some("postgres_only")) => MerchantStorageScheme::PostgresOnly, + Op::Update(partition_key, field, Some(_updated_by)) => { + match kv_wrapper::(store, KvOperation::::HGet(field), partition_key) + .await + { + Ok(_) => { + metrics::KV_SOFT_KILL_ACTIVE_UPDATE.add(&metrics::CONTEXT, 1, &[]); + MerchantStorageScheme::RedisKv + } + Err(_) => MerchantStorageScheme::PostgresOnly, + } + } + + Op::Update(_, _, None) => MerchantStorageScheme::PostgresOnly, + }; + + let type_name = std::any::type_name::(); + logger::info!(soft_kill_mode = "decide_storage_scheme", decided_scheme = %updated_scheme, configured_scheme = %storage_scheme,entity = %type_name, operation = %ops); + + updated_scheme + } else { + storage_scheme + } +} diff --git a/migrations/2024-05-14-092623_add_updated_by_column/down.sql b/migrations/2024-05-14-092623_add_updated_by_column/down.sql new file mode 100644 index 000000000000..436a57dade3f --- /dev/null +++ b/migrations/2024-05-14-092623_add_updated_by_column/down.sql @@ -0,0 +1,6 @@ +-- This file should undo anything in `up.sql` +ALTER TABLE payment_methods DROP COLUMN IF EXISTS updated_by; + +ALTER TABLE mandate DROP COLUMN IF EXISTS updated_by; + +ALTER TABLE customers DROP COLUMN IF EXISTS updated_by; \ No newline at end of file diff --git a/migrations/2024-05-14-092623_add_updated_by_column/up.sql b/migrations/2024-05-14-092623_add_updated_by_column/up.sql new file mode 100644 index 000000000000..869f52cbfe9a --- /dev/null +++ b/migrations/2024-05-14-092623_add_updated_by_column/up.sql @@ -0,0 +1,6 @@ +-- Your SQL goes here +ALTER TABLE payment_methods ADD COLUMN IF NOT EXISTS updated_by VARCHAR(64); + +ALTER TABLE mandate ADD COLUMN IF NOT EXISTS updated_by VARCHAR(64); + +ALTER TABLE customers ADD COLUMN IF NOT EXISTS updated_by VARCHAR(64); \ No newline at end of file