diff --git a/crates/diesel_models/src/query/merchant_account.rs b/crates/diesel_models/src/query/merchant_account.rs index ef9a4165d6..ba20f2e366 100644 --- a/crates/diesel_models/src/query/merchant_account.rs +++ b/crates/diesel_models/src/query/merchant_account.rs @@ -110,4 +110,24 @@ impl MerchantAccount { ) .await } + + #[instrument(skip_all)] + pub async fn list_multiple_merchant_accounts( + conn: &PgPooledConn, + merchant_ids: Vec, + ) -> StorageResult> { + generics::generic_filter::< + ::Table, + _, + <::Table as Table>::PrimaryKey, + _, + >( + conn, + dsl::merchant_id.eq_any(merchant_ids), + None, + None, + None, + ) + .await + } } diff --git a/crates/diesel_models/src/query/merchant_key_store.rs b/crates/diesel_models/src/query/merchant_key_store.rs index 27ec3be9fc..0e2ec1ddad 100644 --- a/crates/diesel_models/src/query/merchant_key_store.rs +++ b/crates/diesel_models/src/query/merchant_key_store.rs @@ -39,4 +39,24 @@ impl MerchantKeyStore { ) .await } + + #[instrument(skip(conn))] + pub async fn list_multiple_key_stores( + conn: &PgPooledConn, + merchant_ids: Vec, + ) -> StorageResult> { + generics::generic_filter::< + ::Table, + _, + <::Table as diesel::Table>::PrimaryKey, + _, + >( + conn, + dsl::merchant_id.eq_any(merchant_ids), + None, + None, + None, + ) + .await + } } diff --git a/crates/router/src/db/kafka_store.rs b/crates/router/src/db/kafka_store.rs index 1184992a8f..19a83088a0 100644 --- a/crates/router/src/db/kafka_store.rs +++ b/crates/router/src/db/kafka_store.rs @@ -662,6 +662,16 @@ impl MerchantAccountInterface for KafkaStore { .delete_merchant_account_by_merchant_id(merchant_id) .await } + + #[cfg(feature = "olap")] + async fn list_multiple_merchant_accounts( + &self, + merchant_ids: Vec, + ) -> CustomResult, errors::StorageError> { + self.diesel_store + .list_multiple_merchant_accounts(merchant_ids) + .await + } } #[async_trait::async_trait] @@ -1615,6 +1625,17 @@ impl MerchantKeyStoreInterface for KafkaStore { .delete_merchant_key_store_by_merchant_id(merchant_id) .await } + + #[cfg(feature = "olap")] + async fn list_multiple_key_stores( + &self, + merchant_ids: Vec, + key: &Secret>, + ) -> CustomResult, errors::StorageError> { + self.diesel_store + .list_multiple_key_stores(merchant_ids, key) + .await + } } #[async_trait::async_trait] diff --git a/crates/router/src/db/merchant_account.rs b/crates/router/src/db/merchant_account.rs index 0d3ce99b94..70d417c036 100644 --- a/crates/router/src/db/merchant_account.rs +++ b/crates/router/src/db/merchant_account.rs @@ -1,3 +1,6 @@ +#[cfg(feature = "olap")] +use std::collections::HashMap; + use common_utils::ext_traits::AsyncExt; use error_stack::{IntoReport, ResultExt}; #[cfg(feature = "accounts_cache")] @@ -65,6 +68,12 @@ where &self, merchant_id: &str, ) -> CustomResult; + + #[cfg(feature = "olap")] + async fn list_multiple_merchant_accounts( + &self, + merchant_ids: Vec, + ) -> CustomResult, errors::StorageError>; } #[async_trait::async_trait] @@ -294,6 +303,57 @@ impl MerchantAccountInterface for Store { Ok(is_deleted) } + + #[cfg(feature = "olap")] + async fn list_multiple_merchant_accounts( + &self, + merchant_ids: Vec, + ) -> CustomResult, errors::StorageError> { + let conn = connection::pg_connection_read(self).await?; + + let encrypted_merchant_accounts = + storage::MerchantAccount::list_multiple_merchant_accounts(&conn, merchant_ids) + .await + .map_err(Into::into) + .into_report()?; + + let db_master_key = self.get_master_key().to_vec().into(); + + let merchant_key_stores = self + .list_multiple_key_stores( + encrypted_merchant_accounts + .iter() + .map(|merchant_account| &merchant_account.merchant_id) + .cloned() + .collect(), + &db_master_key, + ) + .await?; + + let key_stores_by_id: HashMap<_, _> = merchant_key_stores + .iter() + .map(|key_store| (key_store.merchant_id.to_owned(), key_store)) + .collect(); + + let merchant_accounts = + futures::future::try_join_all(encrypted_merchant_accounts.into_iter().map( + |merchant_account| async { + let key_store = key_stores_by_id.get(&merchant_account.merchant_id).ok_or( + errors::StorageError::ValueNotFound(format!( + "merchant_key_store with merchant_id = {}", + merchant_account.merchant_id + )), + )?; + merchant_account + .convert(key_store.key.get_inner()) + .await + .change_context(errors::StorageError::DecryptionError) + }, + )) + .await?; + + Ok(merchant_accounts) + } } #[async_trait::async_trait] @@ -392,6 +452,14 @@ impl MerchantAccountInterface for MockDb { ) -> CustomResult, errors::StorageError> { Err(errors::StorageError::MockDbError)? } + + #[cfg(feature = "olap")] + async fn list_multiple_merchant_accounts( + &self, + _merchant_ids: Vec, + ) -> CustomResult, errors::StorageError> { + Err(errors::StorageError::MockDbError)? + } } #[cfg(feature = "accounts_cache")] diff --git a/crates/router/src/db/merchant_key_store.rs b/crates/router/src/db/merchant_key_store.rs index 970e2b7703..630b833afd 100644 --- a/crates/router/src/db/merchant_key_store.rs +++ b/crates/router/src/db/merchant_key_store.rs @@ -32,6 +32,13 @@ pub trait MerchantKeyStoreInterface { &self, merchant_id: &str, ) -> CustomResult; + + #[cfg(feature = "olap")] + async fn list_multiple_key_stores( + &self, + merchant_ids: Vec, + key: &Secret>, + ) -> CustomResult, errors::StorageError>; } #[async_trait::async_trait] @@ -128,6 +135,33 @@ impl MerchantKeyStoreInterface for Store { .await } } + + #[cfg(feature = "olap")] + async fn list_multiple_key_stores( + &self, + merchant_ids: Vec, + key: &Secret>, + ) -> CustomResult, errors::StorageError> { + let fetch_func = || async { + let conn = connection::pg_connection_read(self).await?; + + diesel_models::merchant_key_store::MerchantKeyStore::list_multiple_key_stores( + &conn, + merchant_ids, + ) + .await + .map_err(Into::into) + .into_report() + }; + + futures::future::try_join_all(fetch_func().await?.into_iter().map(|key_store| async { + key_store + .convert(key) + .await + .change_context(errors::StorageError::DecryptionError) + })) + .await + } } #[async_trait::async_trait] @@ -194,6 +228,28 @@ impl MerchantKeyStoreInterface for MockDb { merchant_key_stores.remove(index); Ok(true) } + + #[cfg(feature = "olap")] + async fn list_multiple_key_stores( + &self, + merchant_ids: Vec, + key: &Secret>, + ) -> CustomResult, errors::StorageError> { + let merchant_key_stores = self.merchant_key_store.lock().await; + futures::future::try_join_all( + merchant_key_stores + .iter() + .filter(|merchant_key| merchant_ids.contains(&merchant_key.merchant_id)) + .map(|merchant_key| async { + merchant_key + .to_owned() + .convert(key) + .await + .change_context(errors::StorageError::DecryptionError) + }), + ) + .await + } } #[cfg(test)]