Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(merchant_account): Add list multiple merchants in MerchantAccountInterface #3220

Merged
merged 10 commits into from
Jan 5, 2024
20 changes: 20 additions & 0 deletions crates/diesel_models/src/query/merchant_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,24 @@ impl MerchantAccount {
)
.await
}

#[instrument(skip_all)]
pub async fn list_multiple_merchant_accounts(
conn: &PgPooledConn,
merchant_ids: Vec<String>,
) -> StorageResult<Vec<Self>> {
generics::generic_filter::<
<Self as HasTable>::Table,
_,
<<Self as HasTable>::Table as Table>::PrimaryKey,
_,
>(
conn,
dsl::merchant_id.eq_any(merchant_ids),
None,
None,
None,
)
.await
}
}
20 changes: 20 additions & 0 deletions crates/diesel_models/src/query/merchant_key_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,24 @@ impl MerchantKeyStore {
)
.await
}

#[instrument(skip(conn))]
pub async fn list_multiple_key_stores(
conn: &PgPooledConn,
merchant_ids: Vec<String>,
) -> StorageResult<Vec<Self>> {
generics::generic_filter::<
<Self as HasTable>::Table,
_,
<<Self as HasTable>::Table as diesel::Table>::PrimaryKey,
_,
>(
conn,
dsl::merchant_id.eq_any(merchant_ids),
None,
None,
None,
)
.await
}
}
21 changes: 21 additions & 0 deletions crates/router/src/db/kafka_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,16 @@ impl MerchantAccountInterface for KafkaStore {
.delete_merchant_account_by_merchant_id(merchant_id)
.await
}

#[cfg(feature = "olap")]
async fn list_multiple_merchant_accounts(
&self,
merchant_ids: Vec<String>,
) -> CustomResult<Vec<domain::MerchantAccount>, errors::StorageError> {
self.diesel_store
.list_multiple_merchant_accounts(merchant_ids)
.await
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -1615,6 +1625,17 @@ impl MerchantKeyStoreInterface for KafkaStore {
.delete_merchant_key_store_by_merchant_id(merchant_id)
.await
}

#[cfg(feature = "olap")]
async fn list_multiple_key_stores(
&self,
merchant_ids: Vec<String>,
key: &Secret<Vec<u8>>,
) -> CustomResult<Vec<domain::MerchantKeyStore>, errors::StorageError> {
self.diesel_store
.list_multiple_key_stores(merchant_ids, key)
.await
}
}

#[async_trait::async_trait]
Expand Down
68 changes: 68 additions & 0 deletions crates/router/src/db/merchant_account.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#[cfg(feature = "olap")]
use std::collections::HashMap;

use common_utils::ext_traits::AsyncExt;
use error_stack::{IntoReport, ResultExt};
#[cfg(feature = "accounts_cache")]
Expand Down Expand Up @@ -65,6 +68,12 @@ where
&self,
merchant_id: &str,
) -> CustomResult<bool, errors::StorageError>;

#[cfg(feature = "olap")]
async fn list_multiple_merchant_accounts(
&self,
merchant_ids: Vec<String>,
) -> CustomResult<Vec<domain::MerchantAccount>, errors::StorageError>;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -294,6 +303,57 @@ impl MerchantAccountInterface for Store {

Ok(is_deleted)
}

#[cfg(feature = "olap")]
async fn list_multiple_merchant_accounts(
&self,
merchant_ids: Vec<String>,
) -> CustomResult<Vec<domain::MerchantAccount>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;

let encrypted_merchant_accounts =
storage::MerchantAccount::list_multiple_merchant_accounts(&conn, merchant_ids)
.await
.map_err(Into::into)
.into_report()?;

let db_master_key = self.get_master_key().to_vec().into();

let merchant_key_stores = self
.list_multiple_key_stores(
encrypted_merchant_accounts
.iter()
.map(|merchant_account| &merchant_account.merchant_id)
.cloned()
.collect(),
&db_master_key,
)
.await?;

let key_stores_by_id: HashMap<_, _> = merchant_key_stores
.iter()
.map(|key_store| (key_store.merchant_id.to_owned(), key_store))
.collect();

let merchant_accounts =
futures::future::try_join_all(encrypted_merchant_accounts.into_iter().map(
|merchant_account| async {
let key_store = key_stores_by_id.get(&merchant_account.merchant_id).ok_or(
errors::StorageError::ValueNotFound(format!(
"merchant_key_store with merchant_id = {}",
merchant_account.merchant_id
)),
)?;
merchant_account
.convert(key_store.key.get_inner())
.await
.change_context(errors::StorageError::DecryptionError)
},
))
.await?;

Ok(merchant_accounts)
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -392,6 +452,14 @@ impl MerchantAccountInterface for MockDb {
) -> CustomResult<Vec<domain::MerchantAccount>, errors::StorageError> {
Err(errors::StorageError::MockDbError)?
}

#[cfg(feature = "olap")]
async fn list_multiple_merchant_accounts(
&self,
_merchant_ids: Vec<String>,
) -> CustomResult<Vec<domain::MerchantAccount>, errors::StorageError> {
Err(errors::StorageError::MockDbError)?
}
}

#[cfg(feature = "accounts_cache")]
Expand Down
56 changes: 56 additions & 0 deletions crates/router/src/db/merchant_key_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ pub trait MerchantKeyStoreInterface {
&self,
merchant_id: &str,
) -> CustomResult<bool, errors::StorageError>;

#[cfg(feature = "olap")]
async fn list_multiple_key_stores(
&self,
merchant_ids: Vec<String>,
key: &Secret<Vec<u8>>,
) -> CustomResult<Vec<domain::MerchantKeyStore>, errors::StorageError>;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -128,6 +135,33 @@ impl MerchantKeyStoreInterface for Store {
.await
}
}

#[cfg(feature = "olap")]
async fn list_multiple_key_stores(
&self,
merchant_ids: Vec<String>,
key: &Secret<Vec<u8>>,
) -> CustomResult<Vec<domain::MerchantKeyStore>, errors::StorageError> {
let fetch_func = || async {
let conn = connection::pg_connection_read(self).await?;

diesel_models::merchant_key_store::MerchantKeyStore::list_multiple_key_stores(
&conn,
merchant_ids,
)
.await
.map_err(Into::into)
.into_report()
};

futures::future::try_join_all(fetch_func().await?.into_iter().map(|key_store| async {
key_store
.convert(key)
.await
.change_context(errors::StorageError::DecryptionError)
}))
.await
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -194,6 +228,28 @@ impl MerchantKeyStoreInterface for MockDb {
merchant_key_stores.remove(index);
Ok(true)
}

#[cfg(feature = "olap")]
async fn list_multiple_key_stores(
&self,
merchant_ids: Vec<String>,
key: &Secret<Vec<u8>>,
) -> CustomResult<Vec<domain::MerchantKeyStore>, errors::StorageError> {
let merchant_key_stores = self.merchant_key_store.lock().await;
futures::future::try_join_all(
merchant_key_stores
.iter()
.filter(|merchant_key| merchant_ids.contains(&merchant_key.merchant_id))
.map(|merchant_key| async {
merchant_key
.to_owned()
.convert(key)
.await
.change_context(errors::StorageError::DecryptionError)
}),
)
.await
}
}

#[cfg(test)]
Expand Down
Loading