Skip to content

Commit

Permalink
Merge pull request RedHatInsights#1026 from MikelAlejoBR/RHCLOUD-3092…
Browse files Browse the repository at this point in the history
…6-feature-tests-request-service-accounts

RHCLOUD-30926 | feature: tests for the IT Service class
  • Loading branch information
petracihalova authored Mar 6, 2024
2 parents 6b9c805 + 441da53 commit a8f22a2
Show file tree
Hide file tree
Showing 2 changed files with 1,030 additions and 23 deletions.
54 changes: 35 additions & 19 deletions rbac/management/principal/it_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import logging
import time
import uuid
from typing import Optional, Tuple, Union
from typing import Any, Optional, Tuple, Union

import requests
from django.conf import settings
Expand Down Expand Up @@ -105,6 +105,7 @@ def request_service_accounts(self, bearer_token: str, client_ids: Optional[list[

# Attempt fetching all the service accounts for the tenant.
try:
# Define some sane initial values.
offset = 0
limit = 100

Expand All @@ -115,7 +116,15 @@ def request_service_accounts(self, bearer_token: str, client_ids: Optional[list[
if client_ids:
parameters["clientId"] = client_ids

while offset == 0 or offset == limit:
continue_fetching: bool = True
while continue_fetching:
# Recreate the parameters dictionary every time since otherwise the "assert_has_calls" statement of the
# tests only sees the last value for the offset when attempting to fetch multiple pages.
parameters = {"first": offset, "max": limit}
if client_ids:
parameters["clientId"] = client_ids

# Call IT.
response = requests.get(
url=self.it_url,
headers={"Authorization": f"Bearer {bearer_token}"},
Expand All @@ -140,14 +149,15 @@ def request_service_accounts(self, bearer_token: str, client_ids: Optional[list[
# Extract the body contents.
body_contents = response.json()

# Recalculate the offset to decide whether to get more service accounts or not. If the offset is zero,
# it means that there were no service accounts in IT for the tenant.
offset = offset + len(body_contents)
if offset == 0:
break

# Merge the previously received service accounts with the new ones.
received_service_accounts = received_service_accounts + body_contents

# Reassess if we need to keep fetching pages from IT. They don't return page metadata, so we need to
# keep looping until the incoming body is an empty array.
continue_fetching = limit == len(body_contents)
if continue_fetching:
offset = offset + len(body_contents)

except requests.exceptions.ConnectionError as exception:
LOGGER.error(
"Unable to connect to IT to fetch the service accounts. Attempted URL %s with error: %s",
Expand All @@ -158,7 +168,7 @@ def request_service_accounts(self, bearer_token: str, client_ids: Optional[list[
# Increment the error count.
it_request_error.labels(error="connection-error").inc()

# Raise the exception again to return a proper response to the client
# Raise the exception again to return a proper response to the client.
raise exception
except requests.exceptions.Timeout as exception:
LOGGER.error(
Expand All @@ -170,6 +180,9 @@ def request_service_accounts(self, bearer_token: str, client_ids: Optional[list[
# Increment the error count.
it_request_error.labels(error="timeout").inc()

# Raise the exception again to return a proper response to the client.
raise exception

# Transform the incoming payload into our model's service accounts.
service_accounts: list[dict] = []
for incoming_service_account in received_service_accounts:
Expand Down Expand Up @@ -214,7 +227,7 @@ def _is_service_account_valid(self, user: User, client_id: str) -> bool:

return False

def get_service_accounts(self, user: User, options: dict = {}) -> Tuple[list[dict], int]:
def get_service_accounts(self, user: User, options: dict[str, Any] = {}) -> Tuple[list[dict], int]:
"""Request and returns the service accounts for the given tenant."""
# We might want to bypass calls to the IT service on ephemeral or test environments.
it_service_accounts: list[dict] = []
Expand Down Expand Up @@ -275,7 +288,7 @@ def get_service_accounts(self, user: User, options: dict = {}) -> Tuple[list[dic
)

# Put the service accounts in a dict by for a quicker search.
sap_dict: dict[str, dict] = {}
sap_dict: dict[str, Principal] = {}
for sap in service_account_principals:
sap_dict[sap.service_account_id] = sap

Expand All @@ -297,10 +310,10 @@ def get_service_accounts(self, user: User, options: dict = {}) -> Tuple[list[dic

return service_accounts, count

def get_service_accounts_group(self, group: Group, user: User, options: dict = {}) -> list[dict]:
def get_service_accounts_group(self, group: Group, user: User, options: dict[str, Any] = {}) -> list[dict]:
"""Get the service accounts for the given group."""
# We might want to bypass calls to the IT service on ephemeral or test environments.
it_service_accounts: list[dict] = []
it_service_accounts: list[dict[str, Union[str, int]]] = []
if not settings.IT_BYPASS_IT_CALLS:
it_service_accounts = self.request_service_accounts(bearer_token=user.bearer_token)

Expand Down Expand Up @@ -333,7 +346,7 @@ def get_service_accounts_group(self, group: Group, user: User, options: dict = {
)

# Put the service accounts in a dict by for a quicker search.
sap_dict: dict[str, dict] = {}
sap_dict: dict[str, Principal] = {}
for sap in group_service_account_principals:
sap_dict[sap.service_account_id] = sap

Expand Down Expand Up @@ -420,9 +433,9 @@ def generate_service_accounts_report_in_group(self, group: Group, client_ids: se

return result

def _transform_incoming_payload(self, service_account_from_it_service: dict) -> dict:
def _transform_incoming_payload(self, service_account_from_it_service: dict) -> dict[str, Any]:
"""Transform the incoming service account from IT into a dict which fits our response structure."""
service_account: dict = {}
service_account: dict[str, Any] = {}

client_id = service_account_from_it_service.get("clientId")
name = service_account_from_it_service.get("name")
Expand Down Expand Up @@ -451,9 +464,12 @@ def _transform_incoming_payload(self, service_account_from_it_service: dict) ->
return service_account

def _merge_principals_it_service_accounts(
self, service_account_principals: dict[str, dict], it_service_accounts: list[dict], options: dict
self, service_account_principals: dict[str, Principal], it_service_accounts: list[dict], options: dict
) -> list[dict]:
"""Merge the database principals with the service account principals and return the response payload."""
"""Merge the database principals with the service account principals and return the response payload.
We only return the service accounts which we have references for in our database.
"""
service_accounts: list[dict] = []

# If the "username_only" parameter was set, we should only return that for the user.
Expand All @@ -471,7 +487,7 @@ def _merge_principals_it_service_accounts(

service_accounts.append(it_service_account)
# If we cannot find a requested service account to IT in the database, we simply
# skip them.
# skip it.
except KeyError:
continue

Expand Down
Loading

0 comments on commit a8f22a2

Please sign in to comment.