Skip to content

Commit

Permalink
Merge pull request RedHatInsights#1153 from petracihalova/group_princ…
Browse files Browse the repository at this point in the history
…ipals_username_only

[RHCLOUD-33738] Group principals username only
  • Loading branch information
petracihalova authored Aug 9, 2024
2 parents 55b6bd8 + 4b24346 commit d045b40
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 16 deletions.
18 changes: 12 additions & 6 deletions rbac/management/group/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,13 +787,14 @@ def principals(self, request: Request, uuid: Optional[UUID] = None):
# Get the "principal username" parameter.
options[PRINCIPAL_USERNAME_KEY] = request.query_params.get(PRINCIPAL_USERNAME_KEY)

# Validate the token only if username_only is false (default value)
if username_only == "false":
token_validator = ITSSOTokenValidator()
request.user.bearer_token = token_validator.validate_token(
request=request,
additional_scopes_to_validate=set[ScopeClaims]([ScopeClaims.SERVICE_ACCOUNTS_CLAIM]),
)
# Fetch the group's service accounts.
token_validator = ITSSOTokenValidator()
request.user.bearer_token = token_validator.validate_token(
request=request,
additional_scopes_to_validate=set[ScopeClaims]([ScopeClaims.SERVICE_ACCOUNTS_CLAIM]),
)

it_service = ITService()
try:
service_accounts = it_service.get_service_accounts_group(
Expand All @@ -813,6 +814,11 @@ def principals(self, request: Request, uuid: Optional[UUID] = None):
},
)

if username_only == "true":
resp = Response(status=200, data=service_accounts)
page = self.paginate_queryset(resp.data)
return self.get_paginated_response(page)

# Prettify the output payload and return it.
page = self.paginate_queryset(service_accounts)
serializer = ServiceAccountSerializer(page, many=True)
Expand Down
21 changes: 14 additions & 7 deletions rbac/management/principal/it_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,12 @@ def get_service_accounts(self, user: User, options: dict[str, Any] = {}) -> Tupl

def get_service_accounts_group(self, group: Group, user: User, options: dict[str, Any] = {}) -> list[dict]:
"""Get the service accounts for the given group."""
# We might want to bypass calls to the IT service on ephemeral or test environments.
username_only: str = options.get("username_only", "false")
# We might want to bypass calls to the IT service
# - on ephemeral or test environments
# - when query param username_only == 'true'
it_service_accounts: list[dict[str, Union[str, int]]] = []
if not settings.IT_BYPASS_IT_CALLS:
if not settings.IT_BYPASS_IT_CALLS and username_only == "false":
it_service_accounts = self.request_service_accounts(bearer_token=user.bearer_token)

# Fetch the service accounts from the group.
Expand Down Expand Up @@ -357,11 +360,15 @@ def get_service_accounts_group(self, group: Group, user: User, options: dict[str
for sap in group_service_account_principals:
sap_dict[sap.service_account_id] = sap

# Filter the incoming service accounts. Also, transform them to the payload we will
# be returning.
service_accounts: list[dict] = self._merge_principals_it_service_accounts(
service_account_principals=sap_dict, it_service_accounts=it_service_accounts, options=options
)
service_accounts: list[dict] = []
if username_only == "true":
# Grab the service account usernames
service_accounts = [{"username": sa.username} for sa in group_service_account_principals]
else:
# Filter the incoming service accounts. Also, transform them to the payload we will be returning.
service_accounts = self._merge_principals_it_service_accounts(
service_account_principals=sap_dict, it_service_accounts=it_service_accounts, options=options
)

# If either the description or name filters were specified, we need to only keep the service accounts that
# match that criteria.
Expand Down
24 changes: 23 additions & 1 deletion tests/management/group/test_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def setUp(self):
self.service_accounts = []
for uuid in self.sa_client_ids:
principal = Principal(
username="service_account-" + uuid,
username="service-account-" + uuid,
tenant=self.tenant,
type="service-account",
service_account_id=uuid,
Expand Down Expand Up @@ -2158,6 +2158,28 @@ def test_get_group_service_account_success(self, mock_request):
self.assertEqual(sa.get("type"), "service-account")
self.assertEqual(sa.get("username"), mock_sa["username"])

def test_get_group_service_account_username_only_success(self):
"""
Test that getting the service account usernames from a group returns successfully
with 'username_only' query parameter.
"""
# We expected only usernames from RBAC database
expected_usernames = [f"service-account-{sa}" for sa in self.sa_client_ids]

sa_type_param = "principal_type=service-account"
username_only_param = "username_only=true"
url = f"{reverse('group-principals', kwargs={'uuid': self.group.uuid})}?{sa_type_param}&{username_only_param}"
client = APIClient()
response = client.get(url, **self.headers)

self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIsInstance(response.data.get("data"), list)
self.assertEqual(int(response.data.get("meta").get("count")), 3)
self.assertEqual(len(response.data.get("data")), 3)

for sa in response.data.get("data"):
self.assertIn(sa["username"], expected_usernames)

@override_settings(IT_BYPASS_TOKEN_VALIDATION=True)
@patch("management.principal.it_service.ITService.request_service_accounts")
def test_get_group_service_account_empty_response(self, mock_request):
Expand Down
39 changes: 37 additions & 2 deletions tests/management/principal/test_it_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,10 @@ def test_is_service_account_valid_not_matching_result_from_it(self, request_serv

@mock.patch("management.principal.it_service.ITService.request_service_accounts")
def test_is_service_account_valid_multiple_results_from_it(self, request_service_accounts: mock.Mock):
"""Test that the function under retunrs False when IT returns multiple service accounts for a single client ID."""
"""
Test that the function under test returns False
when IT returns multiple service accounts for a single client ID.
"""
request_service_accounts.return_value = [{}, {}]
user = User()
user.bearer_token = "mocked-bt"
Expand Down Expand Up @@ -935,6 +938,36 @@ def test_get_service_accounts_group_bypass_it_calls(self):
created_database_sa_principals=sa_principals_should_be_in_group, function_result=result
)

def test_get_service_accounts_group_username_only(self):
"""
Test the function returns the list of service account usernames with query
parameter username_only='true'.
"""
group_a, _ = self._create_two_rbac_groups_with_service_accounts()
expected_usernames = [sa.username for sa in group_a.principals.all()]

user = User()
user.account = self.tenant.account_id
user.org_id = self.tenant.org_id

# Call the function under test.
options = {"username_only": "true"}
result = self.it_service.get_service_accounts_group(group=group_a, user=user, options=options)

self.assertEqual(
2,
len(result),
"only two service accounts were added to the group, and a different number of them is present",
)

for item in result:
self.assertIn(
item["username"],
expected_usernames,
f'the service account with username \'{item["username"]}\' is not present in the list of expected '
f"usernames '{expected_usernames}'",
)

@mock.patch("management.principal.it_service.ITService.request_service_accounts")
def test_get_service_accounts_group_filter_by_username(self, request_service_accounts: mock.Mock):
"""Test the function under test returns the filtered service accounts by username from the given group"""
Expand Down Expand Up @@ -1191,7 +1224,9 @@ def test_generate_service_accounts_report_in_group_zero_matches(self):
)

def test_generate_service_accounts_report_in_group_mixed_results(self):
"""Test that the function under test is able to correctly flag the sevice accounts when there are mixed results"""
"""
Test that the function under test is able to correctly flag the service accounts when there are mixed results
"""
# Create a group and associate principals to it.
group = Group(name="it-service-group", platform_default=False, system=False, tenant=self.tenant)
group.save()
Expand Down

0 comments on commit d045b40

Please sign in to comment.