From c2863c92d62fcffc8e4c4edb52d91eca7ac3bbc3 Mon Sep 17 00:00:00 2001 From: Mikel Alejo Barcina Ribera Date: Thu, 22 Feb 2024 17:07:47 +0100 Subject: [PATCH 01/32] feature: principals with User Access Admin role should manage groups The service accounts and user principals with the User Access Administrator role should be able to create, modify and delete groups. RHCLOUd-30986 --- rbac/management/group/view.py | 10 +- rbac/management/permissions/group_access.py | 16 +- tests/management/group/test_view.py | 639 +++++++++++++++++- .../permissions/test_group_access.py | 108 ++- 4 files changed, 761 insertions(+), 12 deletions(-) diff --git a/rbac/management/group/view.py b/rbac/management/group/view.py index d783783f..f2acfc2e 100644 --- a/rbac/management/group/view.py +++ b/rbac/management/group/view.py @@ -590,7 +590,11 @@ def principals(self, request: Request, uuid: Optional[UUID] = None): account = self.request.user.account org_id = self.request.user.org_id if request.method == "POST": + # Make sure that system groups are kept unmodified. self.protect_system_groups("add principals") + + # Only organization administrators are allowed to add principals to groups with the "User Access + # Administrator" role. if not request.user.admin: for role in group.roles_with_access(): for access in role.access.all(): @@ -598,6 +602,7 @@ def principals(self, request: Request, uuid: Optional[UUID] = None): key = "add_principals" message = "Non-admin users may not add principals to Groups with RBAC permissions." raise serializers.ValidationError({key: _(message)}) + serializer = GroupPrincipalInputSerializer(data=request.data) # Serialize the payload and validate that it is correct. @@ -875,7 +880,10 @@ def principals(self, request: Request, uuid: Optional[UUID] = None): }, ) - return Response(status=status.HTTP_204_NO_CONTENT) + # Create a default and successful response object. If no user principals are to be removed below, this + # response will be returned. Else, it will be overriden with whichever response the user removal + # generates. + response = Response(status=status.HTTP_204_NO_CONTENT) # Remove the users from the group too. if USERNAMES_KEY in request.query_params: diff --git a/rbac/management/permissions/group_access.py b/rbac/management/permissions/group_access.py index 33c7f2e3..7310a958 100644 --- a/rbac/management/permissions/group_access.py +++ b/rbac/management/permissions/group_access.py @@ -18,14 +18,18 @@ from django.urls import reverse from management.permissions.utils import is_scope_principal from rest_framework import permissions +from rest_framework.request import Request from rbac.env import ENVIRONMENT +# Allowed methods to be able to modify principals from a group. +ALLOWED_METHODS = ["DELETE", "POST"] + class GroupAccessPermission(permissions.BasePermission): """Determines if a user has access to Group APIs.""" - def has_permission(self, request, view): + def has_permission(self, request: Request, view): """Check permission based on the defined access.""" if ENVIRONMENT.get_value("ALLOW_ANY", default=False, cast=bool): return True @@ -43,6 +47,16 @@ def has_permission(self, request, view): return True else: group_write = request.user.access.get("group", {}).get("write", []) + + # In the case that group principals are trying to be modified, check that the user or the service account + # has the proper "User Access Administrator" rights. + if request.method in ALLOWED_METHODS and view.basename == "group" and view.action == "principals": + principal_write = request.user.access.get("principal", {}).get("write", []) + if group_write and principal_write: + return True + else: + return False + if group_write: return True diff --git a/tests/management/group/test_view.py b/tests/management/group/test_view.py index bf016758..bd706c99 100644 --- a/tests/management/group/test_view.py +++ b/tests/management/group/test_view.py @@ -16,7 +16,7 @@ # """Test the group viewset.""" import random -from unittest.mock import call, patch, ANY +from unittest.mock import call, patch, ANY, Mock from uuid import uuid4 from django.db import transaction @@ -30,7 +30,7 @@ from api.models import Tenant, User from management.cache import TenantCache from management.group.serializer import GroupInputSerializer -from management.models import Group, Principal, Policy, Role, ExtRoleRelation, ExtTenant +from management.models import Access, Group, Permission, Principal, Policy, Role, ExtRoleRelation, ExtTenant from tests.core.test_kafka import copy_call_args from tests.identity_request import IdentityRequest @@ -2643,6 +2643,59 @@ def tearDown(self): Role.objects.all().delete() Policy.objects.all().delete() + def _create_group_with_user_access_administrator_role(self, tenant: Tenant) -> Group: + """Create a group with an associated User Access Administrator role.""" + # Replicate a "User Access Administrator" permission. + rbac_user_access_admin_permission = Permission() + rbac_user_access_admin_permission.application = "rbac" + rbac_user_access_admin_permission.permission = "rbac:*:*" + rbac_user_access_admin_permission.resource_type = "*" + rbac_user_access_admin_permission.tenant = tenant + rbac_user_access_admin_permission.verb = "*" + rbac_user_access_admin_permission.save() + + # Replicate a "User Access Administrator" role. + user_access_admin_role = Role() + user_access_admin_role.admin_default = True + user_access_admin_role.description = "Grants a non-org admin full access to configure and manage user access to services hosted on console.redhat.com. This role can only be viewed and assigned by Organization Administrators." + user_access_admin_role.display_name = "User Access administrator" + user_access_admin_role.name = "User Access administrator" + user_access_admin_role.platform_default = False + user_access_admin_role.system = True + user_access_admin_role.tenant = tenant + user_access_admin_role.version = 3 + user_access_admin_role.save() + + # Associate the role and the permission. + access = Access() + access.permission = rbac_user_access_admin_permission + access.role = user_access_admin_role + access.tenant = tenant + access.save() + + # Create the group that will be associated with the role. + user_access_admin_group = Group() + user_access_admin_group.admin_default = False + user_access_admin_group.description = "A group with the User Access Administrator associated role" + user_access_admin_group.name = "user-access-admin-group" + user_access_admin_group.platform_default = False + user_access_admin_group.system = False + user_access_admin_group.tenant = tenant + user_access_admin_group.save() + + # Create a policy which will be associated with the role. + user_access_admin_policy = Policy() + user_access_admin_policy.group = user_access_admin_group + user_access_admin_policy.name = "User Access Administrator policy for regular-group" + user_access_admin_policy.system = True + user_access_admin_policy.tenant = tenant + user_access_admin_policy.save() + + user_access_admin_policy.roles.add(user_access_admin_role) + user_access_admin_policy.save() + + return user_access_admin_group + def test_nonadmin_RonR_list(self): """Test that a nonadmin user can list groups in tenant""" url = "{}?application={}".format(reverse("group-list"), "rbac") @@ -2679,3 +2732,585 @@ def test_group_role_filter_as_non_admin(self): client = APIClient() response = client.get(url, **self.headers) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + @override_settings(IT_BYPASS_TOKEN_VALIDATION=True) + @override_settings(IT_BYPASS_IT_CALLS=True) + @patch("management.principal.proxy.PrincipalProxy.request_filtered_principals") + def test_group_service_account_with_user_administrator_role_add_principals( + self, request_filtered_principals: Mock + ): + """Test that a service account with the User Access administrator role can manage principals in a group + + The way the test is performed is the following: + - Creates a group with a User Access Administrator role associated to it. + - Creates a regular group which we will use for the tests. + - Sends a request with an unprivileged service account to attempt to add principals to the regular group. + - Creates a service account principal and adds it to the User Access Administrator group. + - Sends a request with the privileged user and asserts that the principals were added to the regular group. + """ + # Create the customer data we will be using for the tenant. + customer_data = self._create_customer_data() + + # Create a tenant. + user_access_admin_tenant = Tenant() + user_access_admin_tenant.account_id = customer_data["account_id"] + user_access_admin_tenant.org_id = customer_data["org_id"] + user_access_admin_tenant.ready = True + user_access_admin_tenant.tenant_name = "new-tenant" + user_access_admin_tenant.save() + + user_access_admin_group = self._create_group_with_user_access_administrator_role( + tenant=user_access_admin_tenant + ) + + # Create a regular group that we will use to attempt to add principals to. + regular_group = Group() + regular_group.admin_default = False + regular_group.description = "Just a regular group" + regular_group.name = "regular-group" + regular_group.platform_default = False + regular_group.system = False + regular_group.tenant = user_access_admin_tenant + regular_group.save() + + # Generate the URL for adding principals to the regular group. + group_principals_url = reverse("group-principals", kwargs={"uuid": regular_group.uuid}) + api_client = APIClient() + + # Create a new principal to be added to the regular group. + new_principal = Principal() + new_principal.cross_account = False + new_principal.tenant = user_access_admin_tenant + new_principal.type = "user" + new_principal.username = "fake-red-hat-user" + new_principal.save() + + # Create a new service account to be added to the regular group. + new_sa_principal = Principal() + new_sa_principal.service_account_id = uuid4() + new_sa_principal.tenant = user_access_admin_tenant + new_sa_principal.type = "service-account" + new_sa_principal.username = f"service-account-{new_sa_principal.service_account_id}" + new_sa_principal.save() + + # Create the test data to add a service account and a regular user to the group. + test_data = { + "principals": [ + {"clientID": new_sa_principal.service_account_id, "type": "service-account"}, + {"username": new_principal.username}, + ] + } + + # Make sure that before calling the endpoint the group has no members. + self.assertEqual( + 0, + regular_group.principals.count(), + "the regular group should not have any principals associated with it", + ) + + # Create the request context for an unprivileged user. + privileged_request_context = self._create_request_context( + customer_data=customer_data, + user_data=None, + service_account_data=self._create_service_account_data(), + is_org_admin=False, + ) + + # Call the endpoint under test with an unprivileged user. + response = api_client.post( + group_principals_url, test_data, format="json", **privileged_request_context["request"].META + ) + + # Assert that the user does not have permission to perform this operation. + self.assertEqual( + status.HTTP_403_FORBIDDEN, + response.status_code, + "unexpected status code when an unprivileged user attempts to add principals to a group", + ) + + # Assert that no principals were added to the group. + self.assertEqual( + 0, + regular_group.principals.count(), + "after a failed request to add a principal, the regular group should still have zero associated principals", + ) + + # Add the user access admin principal to the user access admin group. + service_account_data = self._create_service_account_data() + + user_access_admin_sa_principal = Principal() + user_access_admin_sa_principal.cross_account = False + user_access_admin_sa_principal.service_account_id = service_account_data["client_id"] + user_access_admin_sa_principal.tenant = user_access_admin_tenant + user_access_admin_sa_principal.type = "service-account" + user_access_admin_sa_principal.username = service_account_data["username"] + user_access_admin_sa_principal.save() + + user_access_admin_group.principals.add(user_access_admin_sa_principal) + + # Simulate that the proxy validates the specified user principal correctly. + request_filtered_principals.return_value = {"data": [{"username": new_principal.username}]} + + # Create the request context for privileged user. + privileged_request_context = self._create_request_context( + customer_data=customer_data, user_data=None, service_account_data=service_account_data, is_org_admin=False + ) + + # Call the endpoint under test with the user with "User Access Administrator" permissions. + response_two = api_client.post( + group_principals_url, test_data, format="json", **privileged_request_context["request"].META + ) + + # Assert that the response is the expected one. + self.assertEqual( + status.HTTP_200_OK, + response_two.status_code, + "unexpected status code when a user with the User Access Administrator role attempts to manage" + " principals on a group", + ) + + # Assert that both the service account and the user principal were added to the group. + self.assertEqual( + 2, + regular_group.principals.count(), + "after adding a principal to the regular group, the added service account and the user principal" + " should be present", + ) + + # Double check that the principals are just a service account and a regular user principal. + for principal in regular_group.principals.all(): + if principal.type == "service-account": + self.assertEqual( + new_sa_principal.uuid, + principal.uuid, + "the created service account principal in the regular group is not the same as the one we expected", + ) + else: + self.assertEqual( + new_principal.uuid, + principal.uuid, + "the created user principal in the regular group is not the same as the one we expected", + ) + + @override_settings(IT_BYPASS_TOKEN_VALIDATION=True) + @override_settings(IT_BYPASS_IT_CALLS=True) + def test_group_service_account_with_user_administrator_role_remove_principals(self): + """Test that a service account with the User Access administrator role can manage principals in a group + + The way the test is performed is the following: + - Creates a group with a User Access Administrator role associated to it. + - Creates a regular group which we will use for the tests. + - Adds a couple of principals to the regular group. + - Sends a request with an unprivileged service account to attempt to remove principals from the regular group. + - Sends a request with the privileged service account and asserts that the principals were added to the regular group. + """ + # Create the customer data we will be using for the tenant. + customer_data = self._create_customer_data() + + # Create a tenant. + user_access_admin_tenant = Tenant() + user_access_admin_tenant.account_id = customer_data["account_id"] + user_access_admin_tenant.org_id = customer_data["org_id"] + user_access_admin_tenant.ready = True + user_access_admin_tenant.tenant_name = "new-tenant" + user_access_admin_tenant.save() + + user_access_admin_group = self._create_group_with_user_access_administrator_role( + tenant=user_access_admin_tenant + ) + + # Create a regular group that we will use to attempt to remove principals from. + regular_group = Group() + regular_group.admin_default = False + regular_group.description = "Just a regular group" + regular_group.name = "regular-group" + regular_group.platform_default = False + regular_group.system = False + regular_group.tenant = user_access_admin_tenant + regular_group.save() + + # Add a service account principal to the group... + service_account_principal_to_delete = Principal() + service_account_principal_to_delete.service_account_id = uuid4() + service_account_principal_to_delete.tenant = user_access_admin_tenant + service_account_principal_to_delete.type = "service-account" + service_account_principal_to_delete.username = ( + f"service-account-{service_account_principal_to_delete.service_account_id}" + ) + service_account_principal_to_delete.save() + + regular_group.principals.add(service_account_principal_to_delete) + + # ... and a user principal one. + user_principal_to_remove = Principal() + user_principal_to_remove.cross_account = False + user_principal_to_remove.tenant = user_access_admin_tenant + user_principal_to_remove.type = "user" + user_principal_to_remove.username = "fake-red-hat-user" + user_principal_to_remove.save() + + regular_group.principals.add(user_principal_to_remove) + + # Generate the URL for removing the principals from the regular group. + group_principals_url = reverse("group-principals", kwargs={"uuid": regular_group.uuid}) + group_principals_url = f"{group_principals_url}?service-accounts={service_account_principal_to_delete.username}&usernames={user_principal_to_remove.username}" + api_client = APIClient() + + # Make sure that before calling the endpoint the group has the two principals. + self.assertEqual( + 2, + regular_group.principals.count(), + "the regular group should have two principals before the deletion attempt", + ) + + # Create the request context for an unprivileged user. + privileged_request_context = self._create_request_context( + customer_data=customer_data, + user_data=None, + service_account_data=self._create_service_account_data(), + is_org_admin=False, + ) + + # Call the endpoint under test with an unprivileged user. + response = api_client.delete( + path=group_principals_url, data=None, format="json", **privileged_request_context["request"].META + ) + + # Assert that the user does not have permission to perform this operation. + self.assertEqual( + status.HTTP_403_FORBIDDEN, + response.status_code, + "unexpected status code when an unprivileged service account attempts to remove principals from a group", + ) + + # Assert that no principals were removed from the group. + self.assertEqual( + 2, + regular_group.principals.count(), + "after a failed request to remove principals, the regular group should still two associated principals", + ) + + # Add the service account access admin principal to the user access admin group. + service_account_data = self._create_service_account_data() + + user_access_admin_sa_principal = Principal() + user_access_admin_sa_principal.cross_account = False + user_access_admin_sa_principal.service_account_id = service_account_data["client_id"] + user_access_admin_sa_principal.tenant = user_access_admin_tenant + user_access_admin_sa_principal.type = "service-account" + user_access_admin_sa_principal.username = service_account_data["username"] + user_access_admin_sa_principal.save() + + user_access_admin_group.principals.add(user_access_admin_sa_principal) + + # Create the request context for privileged user. + privileged_request_context = self._create_request_context( + customer_data=customer_data, user_data=None, service_account_data=service_account_data, is_org_admin=False + ) + + # Call the endpoint under test with the user with "User Access Administrator" permissions. + response_two = api_client.delete( + path=group_principals_url, format="json", **privileged_request_context["request"].META + ) + + # Assert that the response is the expected one. + self.assertEqual( + status.HTTP_204_NO_CONTENT, + response_two.status_code, + "unexpected status code when a service account with the User Access Administrator role attempts to delete" + " principals from a group", + ) + + # Assert that both the service account and the user principal were added to the group. + self.assertEqual( + 0, + regular_group.principals.count(), + "after removing the principals from the regular group, the added service account and the user" + " principal should no longer be present", + ) + + @override_settings(IT_BYPASS_TOKEN_VALIDATION=True) + @override_settings(IT_BYPASS_IT_CALLS=True) + @patch("management.principal.proxy.PrincipalProxy.request_filtered_principals") + def test_group_user_with_user_administrator_role_add_principals(self, request_filtered_principals: Mock): + """Test that a user with the User Access administrator role can manage principals in a group + + The way the test is performed is the following: + - Creates a group with a User Access Administrator role associated to it. + - Creates a regular group which we will use for the tests. + - Sends a request with an unprivileged user to attempt to add principals to the regular group. + - Creates a user principal and adds it to the User Access Administrator group. + - Sends a request with the privileged user and asserts that the principals were added to the regular group. + """ + # Create the customer data we will be using for the tenant. + customer_data = self._create_customer_data() + + # Create a tenant. + user_access_admin_tenant = Tenant() + user_access_admin_tenant.account_id = customer_data["account_id"] + user_access_admin_tenant.org_id = customer_data["org_id"] + user_access_admin_tenant.ready = True + user_access_admin_tenant.tenant_name = "new-tenant" + user_access_admin_tenant.save() + + user_access_admin_group = self._create_group_with_user_access_administrator_role( + tenant=user_access_admin_tenant + ) + + # Create a regular group that we will use to attempt to add principals to. + regular_group = Group() + regular_group.admin_default = False + regular_group.description = "Just a regular group" + regular_group.name = "regular-group" + regular_group.platform_default = False + regular_group.system = False + regular_group.tenant = user_access_admin_tenant + regular_group.save() + + # Generate the URL for adding principals to the regular group. + group_principals_url = reverse("group-principals", kwargs={"uuid": regular_group.uuid}) + api_client = APIClient() + + # Create a new principal to be added to the regular group. + new_principal = Principal() + new_principal.cross_account = False + new_principal.tenant = user_access_admin_tenant + new_principal.type = "user" + new_principal.username = "fake-red-hat-user" + new_principal.save() + + # Create a new service account to be added to the regular group. + new_sa_principal = Principal() + new_sa_principal.service_account_id = uuid4() + new_sa_principal.tenant = user_access_admin_tenant + new_sa_principal.type = "service-account" + new_sa_principal.username = f"service-account-{new_sa_principal.service_account_id}" + new_sa_principal.save() + + # Create the test data to add a service account and a regular user to the group. + test_data = { + "principals": [ + {"clientID": new_sa_principal.service_account_id, "type": "service-account"}, + {"username": new_principal.username}, + ] + } + + # Make sure that before calling the endpoint the group has no members. + self.assertEqual( + 0, + regular_group.principals.count(), + "the regular group should not have any principals associated with it", + ) + + # Create the request context for an unprivileged user. + privileged_request_context = self._create_request_context( + customer_data=customer_data, user_data=self._create_user_data(), is_org_admin=False + ) + + # Call the endpoint under test with an unprivileged user. + response = api_client.post( + path=group_principals_url, data=test_data, format="json", **privileged_request_context["request"].META + ) + + # Assert that the user does not have permission to perform this operation. + self.assertEqual( + status.HTTP_403_FORBIDDEN, + response.status_code, + "unexpected status code when an unprivileged user attempts to add principals to a group", + ) + + # Assert that no principals were added to the group. + self.assertEqual( + 0, + regular_group.principals.count(), + "after a failed request to add a principal, the regular group should still have zero associated principals", + ) + + # Add the user access admin principal to the user access admin group. + user_data = self._create_user_data() + + user_access_admin_principal = Principal() + user_access_admin_principal.cross_account = False + user_access_admin_principal.tenant = user_access_admin_tenant + user_access_admin_principal.type = "user" + user_access_admin_principal.username = user_data["username"] + user_access_admin_principal.save() + + user_access_admin_group.principals.add(user_access_admin_principal) + + # Simulate that the proxy validates the specified user principal correctly. + request_filtered_principals.return_value = {"data": [{"username": new_principal.username}]} + + # Create the request context for privileged user. + privileged_request_context = self._create_request_context( + customer_data=customer_data, user_data=user_data, is_org_admin=False + ) + + # Call the endpoint under test with the user with "User Access Administrator" permissions. + response_two = api_client.post( + path=group_principals_url, data=test_data, format="json", **privileged_request_context["request"].META + ) + + # Assert that the response is the expected one. + self.assertEqual( + status.HTTP_200_OK, + response_two.status_code, + "unexpected status code when a user with the User Access Administrator role attempts to add" + " principals to a group", + ) + + # Assert that both the service account and the user principal were added to the group. + self.assertEqual( + 2, + regular_group.principals.count(), + "after adding a principal to the regular group, the added service account and the user principal" + " should be present", + ) + + # Double check that the principals are just a service account and a regular user principal. + for principal in regular_group.principals.all(): + if principal.type == "service-account": + self.assertEqual( + new_sa_principal.uuid, + principal.uuid, + "the created service account principal in the regular group is not the same as the one we expected", + ) + else: + self.assertEqual( + new_principal.uuid, + principal.uuid, + "the created user principal in the regular group is not the same as the one we expected", + ) + + @override_settings(IT_BYPASS_TOKEN_VALIDATION=True) + @override_settings(IT_BYPASS_IT_CALLS=True) + def test_group_user_with_user_administrator_role_remove_principals(self): + """Test that a user with the User Access administrator role can manage principals in a group + + The way the test is performed is the following: + - Creates a group with a User Access Administrator role associated to it. + - Creates a regular group which we will use for the tests. + - Adds a couple of principals to the regular group. + - Sends a request with an unprivileged user to attempt to remove principals from the regular group. + - Sends a request with the privileged user and asserts that the principals were added to the regular group. + """ + # Create the customer data we will be using for the tenant. + customer_data = self._create_customer_data() + + # Create a tenant. + user_access_admin_tenant = Tenant() + user_access_admin_tenant.account_id = customer_data["account_id"] + user_access_admin_tenant.org_id = customer_data["org_id"] + user_access_admin_tenant.ready = True + user_access_admin_tenant.tenant_name = "new-tenant" + user_access_admin_tenant.save() + + user_access_admin_group = self._create_group_with_user_access_administrator_role( + tenant=user_access_admin_tenant + ) + + # Create a regular group that we will use to attempt to remove principals from. + regular_group = Group() + regular_group.admin_default = False + regular_group.description = "Just a regular group" + regular_group.name = "regular-group" + regular_group.platform_default = False + regular_group.system = False + regular_group.tenant = user_access_admin_tenant + regular_group.save() + + # Add a service account principal to the group... + service_account_principal_to_delete = Principal() + service_account_principal_to_delete.service_account_id = uuid4() + service_account_principal_to_delete.tenant = user_access_admin_tenant + service_account_principal_to_delete.type = "service-account" + service_account_principal_to_delete.username = ( + f"service-account-{service_account_principal_to_delete.service_account_id}" + ) + service_account_principal_to_delete.save() + + regular_group.principals.add(service_account_principal_to_delete) + + # ... and a user principal one. + user_principal_to_remove = Principal() + user_principal_to_remove.cross_account = False + user_principal_to_remove.tenant = user_access_admin_tenant + user_principal_to_remove.type = "user" + user_principal_to_remove.username = "fake-red-hat-user" + user_principal_to_remove.save() + + regular_group.principals.add(user_principal_to_remove) + + # Generate the URL for removing the principals from the regular group. + group_principals_url = reverse("group-principals", kwargs={"uuid": regular_group.uuid}) + group_principals_url = f"{group_principals_url}?service-accounts={service_account_principal_to_delete.username}&usernames={user_principal_to_remove.username}" + api_client = APIClient() + + # Make sure that before calling the endpoint the group has the two principals. + self.assertEqual( + 2, + regular_group.principals.count(), + "the regular group should have two principals before the deletion attempt", + ) + + # Create the request context for an unprivileged user. + privileged_request_context = self._create_request_context( + customer_data=customer_data, user_data=self._create_user_data(), is_org_admin=False + ) + + # Call the endpoint under test with an unprivileged user. + response = api_client.delete( + path=group_principals_url, data=None, format="json", **privileged_request_context["request"].META + ) + + # Assert that the user does not have permission to perform this operation. + self.assertEqual( + status.HTTP_403_FORBIDDEN, + response.status_code, + "unexpected status code when an unprivileged user attempts to remove principals from a group", + ) + + # Assert that no principals were removed from the group. + self.assertEqual( + 2, + regular_group.principals.count(), + "after a failed request to remove principals, the regular group should still two associated principals", + ) + + # Add the user access admin principal to the user access admin group. + user_data = self._create_user_data() + + user_access_admin_principal = Principal() + user_access_admin_principal.cross_account = False + user_access_admin_principal.tenant = user_access_admin_tenant + user_access_admin_principal.type = "user" + user_access_admin_principal.username = user_data["username"] + user_access_admin_principal.save() + + user_access_admin_group.principals.add(user_access_admin_principal) + + # Create the request context for privileged user. + privileged_request_context = self._create_request_context( + customer_data=customer_data, user_data=user_data, is_org_admin=False + ) + + # Call the endpoint under test with the user with "User Access Administrator" permissions. + response_two = api_client.delete( + path=group_principals_url, format="json", **privileged_request_context["request"].META + ) + + # Assert that the response is the expected one. + self.assertEqual( + status.HTTP_204_NO_CONTENT, + response_two.status_code, + "unexpected status code when a user with the User Access Administrator role attempts to delete" + " principals from a group", + ) + + # Assert that both the service account and the user principal were added to the group. + self.assertEqual( + 0, + regular_group.principals.count(), + "after removing the principals from the regular group, the added service account and the user" + " principal should no longer be present", + ) diff --git a/tests/management/permissions/test_group_access.py b/tests/management/permissions/test_group_access.py index c7a888da..2db6ab9e 100644 --- a/tests/management/permissions/test_group_access.py +++ b/tests/management/permissions/test_group_access.py @@ -27,12 +27,17 @@ class GroupAccessPermissionTest(TestCase): """Test the group access permission.""" + def setUp(self): + self.mocked_view = Mock() + self.mocked_view.action = "mocked-action" + self.mocked_view.basename = "mocked-view" + def test_has_perm_admin(self): """Test that an admin user can execute.""" user = Mock(spec=User, admin=True) req = Mock(user=user) accessPerm = GroupAccessPermission() - result = accessPerm.has_permission(request=req, view=None) + result = accessPerm.has_permission(request=req, view=self.mocked_view) self.assertTrue(result) def test_no_perm_not_admin_get(self): @@ -45,7 +50,7 @@ def test_no_perm_not_admin_get(self): user = Mock(spec=User, admin=False, access=access, identity_header={}) req = Mock(user=user, method="GET") accessPerm = GroupAccessPermission() - result = accessPerm.has_permission(request=req, view=None) + result = accessPerm.has_permission(request=req, view=self.mocked_view) self.assertFalse(result) def test_no_perm_not_admin_post(self): @@ -58,7 +63,7 @@ def test_no_perm_not_admin_post(self): user = Mock(spec=User, admin=False, access=access, identity_header={}) req = Mock(user=user, method="POST") accessPerm = GroupAccessPermission() - result = accessPerm.has_permission(request=req, view=None) + result = accessPerm.has_permission(request=req, view=self.mocked_view) self.assertFalse(result) def test_has_perm_not_admin_post(self): @@ -71,7 +76,7 @@ def test_has_perm_not_admin_post(self): user = Mock(spec=User, admin=False, access=access, identity_header={}) req = Mock(user=user, method="POST") accessPerm = GroupAccessPermission() - result = accessPerm.has_permission(request=req, view=None) + result = accessPerm.has_permission(request=req, view=self.mocked_view) self.assertFalse(result) def test_has_perm_not_admin_post_success(self): @@ -84,7 +89,7 @@ def test_has_perm_not_admin_post_success(self): user = Mock(spec=User, admin=False, access=access, identity_header={}) req = Mock(user=user, method="POST") accessPerm = GroupAccessPermission() - result = accessPerm.has_permission(request=req, view=None) + result = accessPerm.has_permission(request=req, view=self.mocked_view) self.assertTrue(result) def test_has_perm_not_admin_get(self): @@ -97,7 +102,7 @@ def test_has_perm_not_admin_get(self): user = Mock(spec=User, admin=False, access=access, identity_header={}) req = Mock(user=user, method="GET", query_params={}) accessPerm = GroupAccessPermission() - result = accessPerm.has_permission(request=req, view=None) + result = accessPerm.has_permission(request=req, view=self.mocked_view) self.assertTrue(result) def test_no_perm_not_admin_get_own_groups(self): @@ -111,7 +116,7 @@ def test_no_perm_not_admin_get_own_groups(self): user = Mock(spec=User, admin=False, access=access, username="test_user") req = Mock(user=user, method="GET", query_params={"username": "test_user"}) accessPerm = GroupAccessPermission() - result = accessPerm.has_permission(request=req, view=None) + result = accessPerm.has_permission(request=req, view=self.mocked_view) self.assertTrue(result) def test_no_perm_not_admin_get_others_groups(self): @@ -125,5 +130,92 @@ def test_no_perm_not_admin_get_others_groups(self): user = Mock(spec=User, admin=False, access=access, username="test_user") req = Mock(user=user, method="GET", query_params={"username": "test_user2"}) accessPerm = GroupAccessPermission() - result = accessPerm.has_permission(request=req, view=None) + result = accessPerm.has_permission(request=req, view=self.mocked_view) self.assertFalse(result) + + def test_perm_not_admin_user_admin_role_modify_principals(self): + """Test that a user with a User Administrator Role can manage the principals of a group.""" + # Mock the user's access. + access = { + "group": {"read": ["*"], "write": ["*"]}, + "principal": {"read": ["*"], "write": ["*"]}, + "role": {"read": [], "write": []}, + "policy": {"read": [], "write": []}, + } + + # Mock the request. + user = Mock(spec=User, admin=False, access=access, username="test_user") + + for method in ["DELETE", "POST"]: + request = Mock(user=user, method=method) + + # Mock the view to make it seem like we are about to manage principals from the group. + mocked_view = Mock() + mocked_view.action = "principals" + mocked_view.basename = "group" + + # Call the function under test. + group_access_permission = GroupAccessPermission() + + self.assertTrue( + group_access_permission.has_permission(request=request, view=mocked_view), + f"a user with a User Administrator role should be able to manage principals using method '{method}'", + ) + + def test_perm_not_admin_no_group_write_not_allowed_modify_principals(self): + """Test that a user which does not have "write" permissions for a group cannot manage principals.""" + # Mock the user's access. + access = { + "group": {"read": ["*"], "write": []}, + "principal": {"read": ["*"], "write": ["*"]}, + "role": {"read": [], "write": []}, + "policy": {"read": [], "write": []}, + } + + # Mock the request. + user = Mock(spec=User, admin=False, access=access, username="test_user") + + for method in ["DELETE", "POST"]: + request = Mock(user=user, method="POST") + + # Mock the view to make it seem like we are about to manage principals from the group. + mocked_view = Mock() + mocked_view.action = "principals" + mocked_view.basename = "group" + + # Call the function under test. + group_access_permission = GroupAccessPermission() + + self.assertFalse( + group_access_permission.has_permission(request=request, view=mocked_view), + f"a user without group \"write\" permissions should not be allowed to manage principals using method '{method}'", + ) + + def test_perm_not_admin_no_principal_write_not_allowed_modify_principals(self): + """Test that a user which does not have "write" permissions for a principal cannot manage principals.""" + # Mock the user's access. + access = { + "group": {"read": ["*"], "write": ["*"]}, + "principal": {"read": ["*"], "write": []}, + "role": {"read": [], "write": []}, + "policy": {"read": [], "write": []}, + } + + # Mock the request. + user = Mock(spec=User, admin=False, access=access, username="test_user") + for method in ["DELETE", "POST"]: + request = Mock(user=user, method="POST") + + # Mock the view to make it seem like we are about to manage principals from the group. + mocked_view = Mock() + mocked_view.action = "principals" + mocked_view.basename = "group" + + # Call the function under test. + group_access_permission = GroupAccessPermission() + + self.assertFalse( + group_access_permission.has_permission(request=request, view=mocked_view), + f'a user without principal "write" permissions should not be allowed to manage principals using method' + f" '{method}'", + ) From 2e4f01d024fbb2b0d13c2c4e4bbc9523eb39fd47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Mon, 11 Mar 2024 11:54:19 +0100 Subject: [PATCH 02/32] improved error message for insufficient permissions adding a principal into a group --- rbac/management/group/view.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rbac/management/group/view.py b/rbac/management/group/view.py index f2acfc2e..0cf6f48c 100644 --- a/rbac/management/group/view.py +++ b/rbac/management/group/view.py @@ -600,7 +600,7 @@ def principals(self, request: Request, uuid: Optional[UUID] = None): for access in role.access.all(): if access.permission_application() == "rbac": key = "add_principals" - message = "Non-admin users may not add principals to Groups with RBAC permissions." + message = "Non-admin users may not add principals to Groups with RBAC Admin permissions." raise serializers.ValidationError({key: _(message)}) serializer = GroupPrincipalInputSerializer(data=request.data) From 6fc9bea7c29f3a34bc97b1cab9d41b9f0f7e1d5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Mon, 11 Mar 2024 11:55:00 +0100 Subject: [PATCH 03/32] typo fix in the rbac/management/group/view.py --- rbac/management/group/view.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rbac/management/group/view.py b/rbac/management/group/view.py index 0cf6f48c..4be2a2ab 100644 --- a/rbac/management/group/view.py +++ b/rbac/management/group/view.py @@ -881,7 +881,7 @@ def principals(self, request: Request, uuid: Optional[UUID] = None): ) # Create a default and successful response object. If no user principals are to be removed below, this - # response will be returned. Else, it will be overriden with whichever response the user removal + # response will be returned. Else, it will be overridden with whichever response the user removal # generates. response = Response(status=status.HTTP_204_NO_CONTENT) From 7a74b173ce94e46f2b78f6e580da2249b90e4df0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Mon, 11 Mar 2024 13:02:11 +0100 Subject: [PATCH 04/32] renamed the local rbac admin role into 'User Access Administrator' this will solve inconsistency between the name for this role used in stage/prod environents and in local environment --- rbac/management/role/definitions/rbac_local_test.json | 2 +- tests/management/group/test_definer.py | 4 ++-- tests/management/role/test_definer.py | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/rbac/management/role/definitions/rbac_local_test.json b/rbac/management/role/definitions/rbac_local_test.json index 2213201f..179055bd 100644 --- a/rbac/management/role/definitions/rbac_local_test.json +++ b/rbac/management/role/definitions/rbac_local_test.json @@ -1,7 +1,7 @@ { "roles": [ { - "name": "RBAC Administrator Local Test", + "name": "User Access administrator", "description": "Grants a non-org admin full access to configure and manage user access to services hosted on cloud.redhat.com. This role can only be viewed and assigned by Organization Administrators.", "system": true, "version": 3, diff --git a/tests/management/group/test_definer.py b/tests/management/group/test_definer.py index 4f2c54f7..5ca2825c 100644 --- a/tests/management/group/test_definer.py +++ b/tests/management/group/test_definer.py @@ -75,7 +75,7 @@ def test_default_group_seeding_reassign_roles(self, send_kafka_message): new_platform_role = Role.objects.create( name="new_platform_role", platform_default=True, tenant=self.public_tenant ) - role_to_remove = Role.objects.get(name="RBAC Administrator Local Test") + role_to_remove = Role.objects.get(name="User Access administrator") with self.settings(NOTIFICATIONS_RH_ENABLED=True, NOTIFICATIONS_ENABLED=True): try: seed_group() @@ -147,7 +147,7 @@ def test_default_group_seeding_reassign_roles(self, send_kafka_message): def modify_default_group(self, system=True): """Add a role to the default group and/or change the system flag""" group = Group.objects.get(platform_default=True) - roles = Role.objects.filter(name="RBAC Administrator Local Test") + roles = Role.objects.filter(name="User Access administrator") add_roles(group, roles, self.public_tenant) group.system = system diff --git a/tests/management/role/test_definer.py b/tests/management/role/test_definer.py index 92d87339..cd2d4c6a 100644 --- a/tests/management/role/test_definer.py +++ b/tests/management/role/test_definer.py @@ -47,7 +47,7 @@ def test_role_create(self, send_kafka_message): org_id = None self.assertTrue(len(roles)) - self.assertFalse(Role.objects.get(name="RBAC Administrator Local Test").platform_default) + self.assertFalse(Role.objects.get(name="User Access administrator").platform_default) kafka_mock.assert_any_call( settings.NOTIFICATIONS_TOPIC, @@ -109,7 +109,7 @@ def test_role_update_platform_default_role(self, send_kafka_message): self.try_seed_roles() # Update non platform default role - non_platform_role_to_update = Role.objects.get(name="RBAC Administrator Local Test") + non_platform_role_to_update = Role.objects.get(name="User Access administrator") non_platform_role_to_update.version = 0 access = non_platform_role_to_update.access.first() access.permission = Permission.objects.get(permission="rbac:principal:read") @@ -201,7 +201,7 @@ def try_seed_roles(self): # Update relation to point to a new role. Seed again would update relation back. ext_relation = ExtRoleRelation.objects.first() origin_role = ext_relation.role - ext_relation.role = Role.objects.get(name="RBAC Administrator Local Test") + ext_relation.role = Role.objects.get(name="User Access administrator") ext_relation.save() origin_role.version = 1 origin_role.save() From 6d45be371b3563912508cfa3b85479f1eb7a3bcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Mon, 11 Mar 2024 13:05:37 +0100 Subject: [PATCH 05/32] removed 'user_has_permission_act_on_service_account' this function is not needed anymore, the check will be processed in the same place as user based principals to manipulate with both types in the same way --- rbac/management/group/view.py | 62 ----------------------------------- 1 file changed, 62 deletions(-) diff --git a/rbac/management/group/view.py b/rbac/management/group/view.py index 4be2a2ab..f82dad23 100644 --- a/rbac/management/group/view.py +++ b/rbac/management/group/view.py @@ -22,7 +22,6 @@ import requests from django.conf import settings -from django.db import connection from django.db import transaction from django.db.models import Q from django.db.models.aggregates import Count @@ -435,8 +434,6 @@ def add_service_accounts( # Fetch the service account from our database to add it to the group. If it doesn't exist, we create # it. for specified_sa in service_accounts: - self.user_has_permission_act_on_service_account(user=user, service_account=specified_sa) - client_id = specified_sa["clientID"] try: principal = Principal.objects.get( @@ -1122,62 +1119,3 @@ def remove_service_accounts( ) for username in service_accounts: group_principal_change_notification_handler(self.request.user, group, username, "removed") - - def user_has_permission_act_on_service_account(self, user: User, service_account: dict = {}): - """Check if the user has permission to create or delete the service account. - - Only org admins, users with the "User Access administrator" or the owner of the service account can create or - remove service accounts. - """ - if settings.IT_BYPASS_PERMISSIONS_MODIFY_SERVICE_ACCOUNTS: - return - - # Is the user an organization administrator? - is_organization_admin: bool = user.admin - - # Is the user the owner of the service account? - is_user_owner: bool = False - - owner = service_account.get("owner") - if owner: - is_user_owner = user.username == owner - - # Check if the user has the "User Access administrator" permission. Leaving the RAW query here - username: str = user.username # type: ignore - query = ( - "SELECT EXISTS ( " - "SELECT " - "1 " - "FROM " - '"management_principal" AS "mp" ' - "INNER JOIN " - '"management_group_principals" AS "mgp" ON "mgp"."principal_id" = "mp"."id" ' - "INNER JOIN " - '"management_policy" AS "mpolicy" ON "mpolicy"."group_id" = "mgp"."group_id" ' - "INNER JOIN " - '"management_policy_roles" AS "mpr" ON "mpr"."policy_id" = "mpolicy"."id" ' - "INNER JOIN " - '"management_role" AS "mr" ON "mr"."id" = "mpr"."role_id" ' - "WHERE " - '"mp"."username" = %s ' - "AND " - "mr.\"name\" = 'User Access administrator' " - "LIMIT 1 " - ') AS "user_has_user_access_administrator_permission"' - ) - - with connection.cursor() as cursor: - cursor.execute(query, [username]) - - row: tuple = cursor.fetchone() - - user_has_user_access_administrator_permission = row[0] - - if (not is_organization_admin) and (not user_has_user_access_administrator_permission) and (not is_user_owner): - logger.debug( - f"User {user} was denied altering service account {service_account} due to insufficient privileges." - ) - - raise InsufficientPrivilegesError( - f"Unable to alter service account {service_account} due to insufficient privileges." - ) From d896c7545ffb507e2b7eeb6eaaea9e62c28e6f54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Mon, 11 Mar 2024 13:08:02 +0100 Subject: [PATCH 06/32] updated logic for add principal into group that contains User Access Administrator role we need to check if the group where we want to add new principal contains role 'User Access Administrator' and in that case we must disallow this action because only Organisation Administrator can do this before we checked only if the cole contains 'rbac' under applications but this was applicable to oher groups as 'User Access principal viewer' too it was therefore necessary to make this condition more precise --- rbac/management/group/view.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/rbac/management/group/view.py b/rbac/management/group/view.py index f82dad23..67a9a220 100644 --- a/rbac/management/group/view.py +++ b/rbac/management/group/view.py @@ -84,6 +84,7 @@ VALID_PRINCIPAL_ORDER_FIELDS = ["username"] VALID_PRINCIPAL_TYPE_VALUE = ["service-account", "user"] VALID_ROLE_ROLE_DISCRIMINATOR = ["all", "any"] +USER_ACCESS_ADMINISTRATOR_ROLE_KEY = "User Access administrator" logger = logging.getLogger(__name__) # pylint: disable=invalid-name @@ -594,11 +595,13 @@ def principals(self, request: Request, uuid: Optional[UUID] = None): # Administrator" role. if not request.user.admin: for role in group.roles_with_access(): - for access in role.access.all(): - if access.permission_application() == "rbac": - key = "add_principals" - message = "Non-admin users may not add principals to Groups with RBAC Admin permissions." - raise serializers.ValidationError({key: _(message)}) + if role.name == USER_ACCESS_ADMINISTRATOR_ROLE_KEY: + key = "add_principals" + message = ( + "Non-admin users may not add principals to Groups with " + f"'{USER_ACCESS_ADMINISTRATOR_ROLE_KEY}' role." + ) + raise serializers.ValidationError({key: _(message)}) serializer = GroupPrincipalInputSerializer(data=request.data) From 78c7197a9a71d071b5a6c1e34fd254e2fca6123c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Mon, 11 Mar 2024 13:22:57 +0100 Subject: [PATCH 07/32] the non org admins cannot remove principal from group with User Access Administrator role --- rbac/management/group/view.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/rbac/management/group/view.py b/rbac/management/group/view.py index 67a9a220..5afd5b9c 100644 --- a/rbac/management/group/view.py +++ b/rbac/management/group/view.py @@ -833,6 +833,18 @@ def principals(self, request: Request, uuid: Optional[UUID] = None): else: self.protect_system_groups("remove principals") + # Only organization administrators are allowed to remove principals from groups with the + # "User Access Administrator" role. + if not request.user.admin: + for role in group.roles_with_access(): + if role.name == USER_ACCESS_ADMINISTRATOR_ROLE_KEY: + key = "remove_principals" + message = ( + "Non-admin users may not remove principals from Groups with " + f"'{USER_ACCESS_ADMINISTRATOR_ROLE_KEY}' role." + ) + raise serializers.ValidationError({key: _(message)}) + if SERVICE_ACCOUNTS_KEY not in request.query_params and USERNAMES_KEY not in request.query_params: key = "detail" message = "Query parameter {} or {} is required.".format(SERVICE_ACCOUNTS_KEY, USERNAMES_KEY) From dc1a98ed4836abae16d63abe69a9230075238912 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Mon, 11 Mar 2024 13:38:04 +0100 Subject: [PATCH 08/32] update http method to PUT for update a group endpoint --- rbac/management/group/view.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rbac/management/group/view.py b/rbac/management/group/view.py index 5afd5b9c..fa87fb0c 100644 --- a/rbac/management/group/view.py +++ b/rbac/management/group/view.py @@ -345,7 +345,7 @@ def destroy(self, request, *args, **kwargs): def update(self, request, *args, **kwargs): """Update a group. - @api {post} /api/v1/groups/:uuid Update a group + @api {put} /api/v1/groups/:uuid Update a group @apiName updateGroup @apiGroup Group @apiVersion 1.0.0 From a01898e439b86b9539bc221480189f147c4fa4e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Mon, 11 Mar 2024 13:38:54 +0100 Subject: [PATCH 09/32] the non org admins cannot update a group with User Access Administrator role --- rbac/management/group/view.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/rbac/management/group/view.py b/rbac/management/group/view.py index fa87fb0c..ea506799 100644 --- a/rbac/management/group/view.py +++ b/rbac/management/group/view.py @@ -366,6 +366,18 @@ def update(self, request, *args, **kwargs): """ validate_uuid(kwargs.get("uuid"), "group uuid validation") self.protect_system_groups("update") + + group = self.get_object() + # Only organization administrators are allowed to update a groups with the "User Access Administrator" role. + if not request.user.admin: + for role in group.roles_with_access(): + if role.name == USER_ACCESS_ADMINISTRATOR_ROLE_KEY: + key = "update_group" + message = ( + f"Non-admin users may not update a group with '{USER_ACCESS_ADMINISTRATOR_ROLE_KEY}' role." + ) + raise serializers.ValidationError({key: _(message)}) + return super().update(request=request, args=args, kwargs=kwargs) def add_principals(self, group, principals, account=None, org_id=None): From 3d81a9b9ec3f6c5ea5624a4cb23d7bf9be49db84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Mon, 11 Mar 2024 13:39:14 +0100 Subject: [PATCH 10/32] the non org admins cannot remove a group with User Access Administrator role --- rbac/management/group/view.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/rbac/management/group/view.py b/rbac/management/group/view.py index ea506799..1e7d22ef 100644 --- a/rbac/management/group/view.py +++ b/rbac/management/group/view.py @@ -337,6 +337,17 @@ def destroy(self, request, *args, **kwargs): validate_uuid(kwargs.get("uuid"), "group uuid validation") self.protect_system_groups("delete") group = self.get_object() + + # Only organization administrators are allowed to remove a groups with the "User Access Administrator" role. + if not request.user.admin: + for role in group.roles_with_access(): + if role.name == USER_ACCESS_ADMINISTRATOR_ROLE_KEY: + key = "remove_group" + message = ( + f"Non-admin users may not remove a group with '{USER_ACCESS_ADMINISTRATOR_ROLE_KEY}' role." + ) + raise serializers.ValidationError({key: _(message)}) + response = super().destroy(request=request, args=args, kwargs=kwargs) if response.status_code == status.HTTP_204_NO_CONTENT: group_obj_change_notification_handler(request.user, group, "deleted") From 6c2f8beb0a936976a3a29ecab7f6e2414226a654 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Mon, 11 Mar 2024 13:55:31 +0100 Subject: [PATCH 11/32] the non org admins cannot add a role 'User Access Administrator' into groups --- rbac/management/group/definer.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/rbac/management/group/definer.py b/rbac/management/group/definer.py index 393787b3..15b92dd7 100644 --- a/rbac/management/group/definer.py +++ b/rbac/management/group/definer.py @@ -36,6 +36,7 @@ from api.models import Tenant +USER_ACCESS_ADMINISTRATOR_ROLE_KEY = "User Access administrator" logger = logging.getLogger(__name__) # pylint: disable=invalid-name @@ -133,12 +134,11 @@ def add_roles(group, roles_or_role_ids, tenant, user=None): Q(tenant=tenant) | Q(tenant=Tenant.objects.get(tenant_name="public")), name__in=role_names ) for role in roles: - accesses = role.access.all() - for access in accesses: - if access.permission_application() == "rbac" and user and not user.admin: - key = "add-roles" - message = f"Non-admin users cannot add RBAC role {role.display_name} to groups." - raise serializers.ValidationError({key: _(message)}) + # Only Organization administrators are allowed to add 'User Access Administrator' role into a group. + if role.name == USER_ACCESS_ADMINISTRATOR_ROLE_KEY and user and not user.admin: + key = "add-roles" + message = f"Non-admin users cannot add '{USER_ACCESS_ADMINISTRATOR_ROLE_KEY}' role to groups." + raise serializers.ValidationError({key: _(message)}) # Only add the role if it was not attached if not system_policy.roles.filter(pk=role.pk).exists(): system_policy.roles.add(role) From a0ee3cc21745cc6aed569181fe6a42cfb62399f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Mon, 11 Mar 2024 14:01:49 +0100 Subject: [PATCH 12/32] import USER_ACCESS_ADMINISTRATOR_ROLE_KEY to have this constant only once in the code --- rbac/management/group/view.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/rbac/management/group/view.py b/rbac/management/group/view.py index 1e7d22ef..31e0db56 100644 --- a/rbac/management/group/view.py +++ b/rbac/management/group/view.py @@ -30,7 +30,12 @@ from management.authorization.scope_claims import ScopeClaims from management.authorization.token_validator import ITSSOTokenValidator from management.filters import CommonFilters -from management.group.definer import add_roles, remove_roles, set_system_flag_before_update +from management.group.definer import ( + USER_ACCESS_ADMINISTRATOR_ROLE_KEY, + add_roles, + remove_roles, + set_system_flag_before_update, +) from management.group.model import Group from management.group.serializer import ( GroupInputSerializer, @@ -84,7 +89,6 @@ VALID_PRINCIPAL_ORDER_FIELDS = ["username"] VALID_PRINCIPAL_TYPE_VALUE = ["service-account", "user"] VALID_ROLE_ROLE_DISCRIMINATOR = ["all", "any"] -USER_ACCESS_ADMINISTRATOR_ROLE_KEY = "User Access administrator" logger = logging.getLogger(__name__) # pylint: disable=invalid-name From d5890bad5d7471a9133378a03c8cdf38498237d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Mon, 11 Mar 2024 14:20:23 +0100 Subject: [PATCH 13/32] the non org admins cannot add a role into group with User Access Administrator role --- rbac/management/group/view.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/rbac/management/group/view.py b/rbac/management/group/view.py index 31e0db56..95dc3dba 100644 --- a/rbac/management/group/view.py +++ b/rbac/management/group/view.py @@ -1026,6 +1026,19 @@ def roles(self, request, uuid=None, principals=None): group = self.get_object() if request.method == "POST": self.protect_default_admin_group_roles(group) + + # Only organization administrators are allowed to add a role into a groups + # with the "User Access Administrator" role. + if not request.user.admin: + for role in group.roles_with_access(): + if role.name == USER_ACCESS_ADMINISTRATOR_ROLE_KEY: + key = "add_role" + message = ( + "Non-admin users may not add a role into a group with " + f"'{USER_ACCESS_ADMINISTRATOR_ROLE_KEY}' role." + ) + raise serializers.ValidationError({key: _(message)}) + serializer = GroupRoleSerializerIn(data=request.data) if serializer.is_valid(raise_exception=True): roles = request.data.pop(ROLES_KEY, []) From 75f22420a2b3da27507a6b959bebfb592bd28918 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Mon, 11 Mar 2024 14:54:29 +0100 Subject: [PATCH 14/32] the non org admins cannot remove a role into group with User Access Administrator role --- rbac/management/group/view.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/rbac/management/group/view.py b/rbac/management/group/view.py index 95dc3dba..195ab4b7 100644 --- a/rbac/management/group/view.py +++ b/rbac/management/group/view.py @@ -1052,6 +1052,19 @@ def roles(self, request, uuid=None, principals=None): return self.get_paginated_response(serializer.data) else: self.protect_default_admin_group_roles(group) + + # Only organization administrators are allowed to remove a role from a group + # with the "User Access Administrator" role. + if not request.user.admin: + for role in group.roles_with_access(): + if role.name == USER_ACCESS_ADMINISTRATOR_ROLE_KEY: + key = "remove_role" + message = ( + "Non-admin users may not remove a role from a group with " + f"'{USER_ACCESS_ADMINISTRATOR_ROLE_KEY}' role." + ) + raise serializers.ValidationError({key: _(message)}) + if ROLES_KEY not in request.query_params: key = "detail" message = "Query parameter {} is required.".format(ROLES_KEY) From 2b8f6cca6e0825861df3c72cbcb8e83efd2978b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Mon, 11 Mar 2024 15:36:47 +0100 Subject: [PATCH 15/32] moved the logic for disallowing group with User Access Administrator role from being updated into a method --- rbac/management/group/view.py | 76 +++++++++-------------------------- 1 file changed, 19 insertions(+), 57 deletions(-) diff --git a/rbac/management/group/view.py b/rbac/management/group/view.py index 195ab4b7..6c6d0a94 100644 --- a/rbac/management/group/view.py +++ b/rbac/management/group/view.py @@ -209,6 +209,19 @@ def protect_default_admin_group_roles(self, group): error = {"group": [_("Default admin access cannot be modified.")]} raise serializers.ValidationError(error) + def protect_group_with_user_access_admin_role(self, roles, source_key): + """Disallow group with 'User Access administrator' role from being updated.""" + # Only organization administrators are allowed to create, modify, or delete a group + # that is assigned the User Access administrator role. + for role in roles: + if role.name == USER_ACCESS_ADMINISTRATOR_ROLE_KEY: + key = source_key + message = ( + "Non-admin users are not allowed to create, modify, or delete a group that is assigned the " + f"'{USER_ACCESS_ADMINISTRATOR_ROLE_KEY}' role." + ) + raise serializers.ValidationError({key: _(message)}) + def create(self, request, *args, **kwargs): """Create a group. @@ -341,16 +354,8 @@ def destroy(self, request, *args, **kwargs): validate_uuid(kwargs.get("uuid"), "group uuid validation") self.protect_system_groups("delete") group = self.get_object() - - # Only organization administrators are allowed to remove a groups with the "User Access Administrator" role. if not request.user.admin: - for role in group.roles_with_access(): - if role.name == USER_ACCESS_ADMINISTRATOR_ROLE_KEY: - key = "remove_group" - message = ( - f"Non-admin users may not remove a group with '{USER_ACCESS_ADMINISTRATOR_ROLE_KEY}' role." - ) - raise serializers.ValidationError({key: _(message)}) + self.protect_group_with_user_access_admin_role(group.roles_with_access(), "remove_group") response = super().destroy(request=request, args=args, kwargs=kwargs) if response.status_code == status.HTTP_204_NO_CONTENT: @@ -383,15 +388,8 @@ def update(self, request, *args, **kwargs): self.protect_system_groups("update") group = self.get_object() - # Only organization administrators are allowed to update a groups with the "User Access Administrator" role. if not request.user.admin: - for role in group.roles_with_access(): - if role.name == USER_ACCESS_ADMINISTRATOR_ROLE_KEY: - key = "update_group" - message = ( - f"Non-admin users may not update a group with '{USER_ACCESS_ADMINISTRATOR_ROLE_KEY}' role." - ) - raise serializers.ValidationError({key: _(message)}) + self.protect_group_with_user_access_admin_role(group.roles_with_access(), "update_group") return super().update(request=request, args=args, kwargs=kwargs) @@ -618,17 +616,8 @@ def principals(self, request: Request, uuid: Optional[UUID] = None): # Make sure that system groups are kept unmodified. self.protect_system_groups("add principals") - # Only organization administrators are allowed to add principals to groups with the "User Access - # Administrator" role. if not request.user.admin: - for role in group.roles_with_access(): - if role.name == USER_ACCESS_ADMINISTRATOR_ROLE_KEY: - key = "add_principals" - message = ( - "Non-admin users may not add principals to Groups with " - f"'{USER_ACCESS_ADMINISTRATOR_ROLE_KEY}' role." - ) - raise serializers.ValidationError({key: _(message)}) + self.protect_group_with_user_access_admin_role(group.roles_with_access(), "add_principals") serializer = GroupPrincipalInputSerializer(data=request.data) @@ -860,17 +849,8 @@ def principals(self, request: Request, uuid: Optional[UUID] = None): else: self.protect_system_groups("remove principals") - # Only organization administrators are allowed to remove principals from groups with the - # "User Access Administrator" role. if not request.user.admin: - for role in group.roles_with_access(): - if role.name == USER_ACCESS_ADMINISTRATOR_ROLE_KEY: - key = "remove_principals" - message = ( - "Non-admin users may not remove principals from Groups with " - f"'{USER_ACCESS_ADMINISTRATOR_ROLE_KEY}' role." - ) - raise serializers.ValidationError({key: _(message)}) + self.protect_group_with_user_access_admin_role(group.roles_with_access(), "remove_principals") if SERVICE_ACCOUNTS_KEY not in request.query_params and USERNAMES_KEY not in request.query_params: key = "detail" @@ -1027,17 +1007,8 @@ def roles(self, request, uuid=None, principals=None): if request.method == "POST": self.protect_default_admin_group_roles(group) - # Only organization administrators are allowed to add a role into a groups - # with the "User Access Administrator" role. if not request.user.admin: - for role in group.roles_with_access(): - if role.name == USER_ACCESS_ADMINISTRATOR_ROLE_KEY: - key = "add_role" - message = ( - "Non-admin users may not add a role into a group with " - f"'{USER_ACCESS_ADMINISTRATOR_ROLE_KEY}' role." - ) - raise serializers.ValidationError({key: _(message)}) + self.protect_group_with_user_access_admin_role(group.roles_with_access(), "add_role") serializer = GroupRoleSerializerIn(data=request.data) if serializer.is_valid(raise_exception=True): @@ -1053,17 +1024,8 @@ def roles(self, request, uuid=None, principals=None): else: self.protect_default_admin_group_roles(group) - # Only organization administrators are allowed to remove a role from a group - # with the "User Access Administrator" role. if not request.user.admin: - for role in group.roles_with_access(): - if role.name == USER_ACCESS_ADMINISTRATOR_ROLE_KEY: - key = "remove_role" - message = ( - "Non-admin users may not remove a role from a group with " - f"'{USER_ACCESS_ADMINISTRATOR_ROLE_KEY}' role." - ) - raise serializers.ValidationError({key: _(message)}) + self.protect_group_with_user_access_admin_role(group.roles_with_access(), "remove_role") if ROLES_KEY not in request.query_params: key = "detail" From 6da73abf6098418756a57f103d558806e1392620 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Wed, 13 Mar 2024 11:50:58 +0100 Subject: [PATCH 16/32] add type annotation in the '_create_request_context' function --- tests/identity_request.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/identity_request.py b/tests/identity_request.py index b74bea61..71bcbd4a 100644 --- a/tests/identity_request.py +++ b/tests/identity_request.py @@ -80,7 +80,7 @@ def _create_service_account_data(cls) -> dict[str, str]: def _create_request_context( cls, customer_data: dict[str, str], - user_data: dict[str, str], + user_data: dict[str, str] = None, is_org_admin: bool = True, is_internal: bool = False, cross_account: bool = False, From 4d98526770c7f22b2a9b132856655937b572824b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Wed, 13 Mar 2024 11:52:28 +0100 Subject: [PATCH 17/32] make the '_create_group_with_user_access_administrator_role' as static method --- tests/management/group/test_view.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/management/group/test_view.py b/tests/management/group/test_view.py index bd706c99..01df5533 100644 --- a/tests/management/group/test_view.py +++ b/tests/management/group/test_view.py @@ -2643,7 +2643,8 @@ def tearDown(self): Role.objects.all().delete() Policy.objects.all().delete() - def _create_group_with_user_access_administrator_role(self, tenant: Tenant) -> Group: + @staticmethod + def _create_group_with_user_access_administrator_role(tenant: Tenant) -> Group: """Create a group with an associated User Access Administrator role.""" # Replicate a "User Access Administrator" permission. rbac_user_access_admin_permission = Permission() From 89debe78cce2e16c62640ad090a971d9fa22089f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Wed, 13 Mar 2024 11:53:09 +0100 Subject: [PATCH 18/32] add fixtures needed for test with non org admin headers --- tests/management/group/test_view.py | 56 +++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/tests/management/group/test_view.py b/tests/management/group/test_view.py index 01df5533..3cf36f03 100644 --- a/tests/management/group/test_view.py +++ b/tests/management/group/test_view.py @@ -2636,6 +2636,62 @@ def setUp(self): ) self.role.save() + # Create 2 non org admin principals and 1 org admin + # 1. user based principal + self.user_based_principal = Principal(username="user_based_principal", tenant=self.tenant) + self.user_based_principal.save() + + customer_data = { + "account_id": self.tenant.account_id, + "tenant_name": self.tenant.tenant_name, + "org_id": self.tenant.org_id, + } + + request_context_user_based_principal = self._create_request_context( + customer_data=customer_data, + user_data={"username": self.user_based_principal.username, "email": "test@email.com"}, + is_org_admin=False, + ) + self.headers_user_based_principal = request_context_user_based_principal["request"].META + + # 2. service account based principal + service_account_data = self._create_service_account_data() + self.service_account_principal = Principal( + username=service_account_data["username"], + tenant=self.tenant, + type="service-account", + service_account_id=service_account_data["client_id"], + ) + self.service_account_principal.save() + + request_context_service_account_principal = self._create_request_context( + customer_data=customer_data, + service_account_data=service_account_data, + is_org_admin=False, + ) + self.headers_service_account_principal = request_context_service_account_principal["request"].META + + # 3 org admin principal in the tenant + self.org_admin = Principal(username="org_admin", tenant=self.tenant) + self.org_admin.save() + + request_context_org_admin = self._create_request_context( + customer_data=customer_data, + user_data={"username": self.org_admin.username, "email": "test@email.com"}, + is_org_admin=True, + ) + self.headers_org_admin = request_context_org_admin["request"].META + + # Error messages + self.no_permission_err_message = "You do not have permission to perform this action." + self.user_access_admin_group_err_message = ( + "Non-admin users are not allowed to create, modify, or delete a " + "group that is assigned the 'User Access administrator' role." + ) + self.user_access_admin_role_err_message = ( + "Non-admin users cannot add 'User Access administrator' role to groups." + ) + def tearDown(self): """Tear down group view tests.""" Group.objects.all().delete() From 70f70d14f2e4c4807d2222628ed001662c703bb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Wed, 13 Mar 2024 11:56:00 +0100 Subject: [PATCH 19/32] add tests for non org admin and POST /groups/ --- tests/management/group/test_view.py | 31 +++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tests/management/group/test_view.py b/tests/management/group/test_view.py index 3cf36f03..d99b27f6 100644 --- a/tests/management/group/test_view.py +++ b/tests/management/group/test_view.py @@ -3371,3 +3371,34 @@ def test_group_user_with_user_administrator_role_remove_principals(self): "after removing the principals from the regular group, the added service account and the user" " principal should no longer be present", ) + + def test_create_group_without_User_Access_Admin_fail(self): + """Test that non org admin without 'User Access administrator' role cannot create a group.""" + url = reverse("group-list") + client = APIClient() + + request_body = {"name": "New group name"} + response = client.post(url, request_body, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + response = client.post(url, request_body, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + def test_create_group_with_User_Access_Admin_success(self): + """Test that non org admin with 'User Access administrator' role can create a group.""" + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + url = reverse("group-list") + client = APIClient() + + request_body = {"name": "New group created by user based principal"} + response = client.post(url, request_body, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + + request_body = {"name": "New group created by service account based principal"} + response = client.post(url, request_body, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_201_CREATED) From 504ad4444cc83bba9315f654d0d00834c4434ff8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Wed, 13 Mar 2024 11:57:01 +0100 Subject: [PATCH 20/32] add tests for non org admin and PUT /groups/{uuid}/ --- tests/management/group/test_view.py | 82 +++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/tests/management/group/test_view.py b/tests/management/group/test_view.py index d99b27f6..22ea52b1 100644 --- a/tests/management/group/test_view.py +++ b/tests/management/group/test_view.py @@ -3402,3 +3402,85 @@ def test_create_group_with_User_Access_Admin_success(self): request_body = {"name": "New group created by service account based principal"} response = client.post(url, request_body, format="json", **self.headers_service_account_principal) self.assertEqual(response.status_code, status.HTTP_201_CREATED) + + def test_update_group_without_User_Access_Admin_fail(self): + """Test that non org admin without 'User Access administrator' role cannot update a group.""" + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + + url = reverse("group-detail", kwargs={"uuid": test_group.uuid}) + request_body = {"name": "new name"} + client = APIClient() + + response = client.put(url, request_body, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + response = client.put(url, request_body, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + def test_update_group_with_User_Access_Admin_success(self): + """ + Test that non org admin with 'User Access administrator' role can update a group + without 'User Access administrator' role. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + + url = reverse("group-detail", kwargs={"uuid": test_group.uuid}) + client = APIClient() + + new_name_user = "New name - user based principal" + request_body = {"name": new_name_user} + response = client.put(url, request_body, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.json()["name"], new_name_user) + + new_name_sa = "New name - service account principal" + request_body = {"name": new_name_sa} + response = client.put(url, request_body, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.json()["name"], new_name_sa) + + def test_update_group_with_User_Access_Admin_fail(self): + """ + Test that non org admin with 'User Access administrator' role cannot update a group + with 'User Access administrator' role. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + # Create another group with 'User Access administrator' role we will try to update + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + + user_access_admin_role = group_with_UA_admin.roles()[0] + request_body = {"roles": [user_access_admin_role.uuid]} + + url = reverse("group-roles", kwargs={"uuid": test_group.uuid}) + client = APIClient() + response = client.post(url, request_body, format="json", **self.headers_org_admin) + # Role 'User Access administrator' added successfully into test group + self.assertEqual(response.status_code, status.HTTP_200_OK) + + # Try to update a group with 'User Access administrator' + url = reverse("group-detail", kwargs={"uuid": test_group.uuid}) + request_body = {"name": "new name"} + + response = client.put(url, request_body, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_group_err_message) + + response = client.put(url, request_body, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_group_err_message) + + # Only Org Admin can update a group with 'User Access administrator' + response = client.put(url, request_body, format="json", **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_200_OK) From dae82ffd4cc99e3ae059af3fb482540348ce5c20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Wed, 13 Mar 2024 11:57:53 +0100 Subject: [PATCH 21/32] add tests for non org admin and DEL /groups/{uuid}/ --- tests/management/group/test_view.py | 78 +++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/tests/management/group/test_view.py b/tests/management/group/test_view.py index 22ea52b1..e685542d 100644 --- a/tests/management/group/test_view.py +++ b/tests/management/group/test_view.py @@ -3484,3 +3484,81 @@ def test_update_group_with_User_Access_Admin_fail(self): # Only Org Admin can update a group with 'User Access administrator' response = client.put(url, request_body, format="json", **self.headers_org_admin) self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_remove_group_without_User_Access_Admin_fail(self): + """Test that non org admin without 'User Access administrator' role cannot remove a group.""" + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + + url = reverse("group-detail", kwargs={"uuid": test_group.uuid}) + client = APIClient() + + response = client.delete(url, **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + response = client.delete(url, **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + def test_remove_group_with_User_Access_Admin_success(self): + """ + Test that non org admin with 'User Access administrator' role can remove a group + without 'User Access administrator' role. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + + client = APIClient() + + url = reverse("group-detail", kwargs={"uuid": test_group.uuid}) + response = client.delete(url, **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + + url = reverse("group-detail", kwargs={"uuid": test_group.uuid}) + response = client.delete(url, **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + def test_remove_group_with_User_Access_Admin_fail(self): + """ + Test that non org admin with 'User Access administrator' role cannot update a group + with 'User Access administrator' role. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + # Create another group with 'User Access administrator' role we will try to remove + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + + user_access_admin_role = group_with_UA_admin.roles()[0] + request_body = {"roles": [user_access_admin_role.uuid]} + + url = reverse("group-roles", kwargs={"uuid": test_group.uuid}) + client = APIClient() + response = client.post(url, request_body, format="json", **self.headers_org_admin) + # Role 'User Access administrator' added successfully into test group + self.assertEqual(response.status_code, status.HTTP_200_OK) + + # Try to remove a group with 'User Access administrator' + url = reverse("group-detail", kwargs={"uuid": test_group.uuid}) + + response = client.delete(url, **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_group_err_message) + + response = client.delete(url, **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_group_err_message) + + # Only Org Admin can remove a group with 'User Access administrator' + response = client.delete(url, **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) From 99f7159625531322a5316ea1511dce98e26c558b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Wed, 13 Mar 2024 11:59:06 +0100 Subject: [PATCH 22/32] add tests for non org admin and GET /groups/{uuid}/principals/ --- tests/management/group/test_view.py | 140 ++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) diff --git a/tests/management/group/test_view.py b/tests/management/group/test_view.py index e685542d..c9018a45 100644 --- a/tests/management/group/test_view.py +++ b/tests/management/group/test_view.py @@ -3562,3 +3562,143 @@ def test_remove_group_with_User_Access_Admin_fail(self): # Only Org Admin can remove a group with 'User Access administrator' response = client.delete(url, **self.headers_org_admin) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + def test_list_user_based_principals_in_group_without_User_Access_Admin_fail(self): + """ + Test that non org admin without 'User Access administrator' role cannot list + user based principals in group. + """ + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + test_group.principals.add(self.principal) + test_group.save() + + url = reverse("group-principals", kwargs={"uuid": test_group.uuid}) + client = APIClient() + + response = client.get(url, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + response = client.get(url, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + # Check that principal is in the db + self.assertIsNotNone(Principal.objects.get(username=self.principal.username)) + + def test_list_service_account_principals_in_group_without_User_Access_Admin_fail(self): + """ + Test that non org admin without 'User Access administrator' role cannot list + service account based principals in group. + """ + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + service_account_data = self._create_service_account_data() + sa_principal = Principal( + username=service_account_data["username"], + tenant=self.tenant, + type="service-account", + service_account_id=service_account_data["client_id"], + ) + sa_principal.save() + test_group.principals.add(sa_principal) + test_group.save() + + url = reverse("group-principals", kwargs={"uuid": test_group.uuid}) + "?principal_type=service-account" + client = APIClient() + + response = client.get(url, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + response = client.get(url, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + # Check that principal is in the db + self.assertIsNotNone(Principal.objects.get(username=sa_principal.username)) + + @patch( + "management.principal.proxy.PrincipalProxy.request_filtered_principals", + return_value={"status_code": 200, "data": []}, + ) + def test_list_user_based_principals_in_group_with_User_Access_Admin_success(self, mock_request): + """ + Test that non org admin with 'User Access administrator' role can list + user based principals in group. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + test_principal = Principal(username="test-principal", tenant=self.tenant) + test_principal.save() + test_group.principals.add(test_principal) + test_group.save() + + # Set the return value for the mock + mock_request.return_value["data"] = [{"username": test_principal.username}] + + url = reverse("group-principals", kwargs={"uuid": test_group.uuid}) + client = APIClient() + + response = client.get(url, **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data.get("data")), 1) + + response = client.get(url, **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data.get("data")), 1) + + @override_settings(IT_BYPASS_TOKEN_VALIDATION=True) + @patch("management.principal.it_service.ITService.request_service_accounts") + def test_list_service_account_principals_in_group_with_User_Access_Admin_success(self, mock_request): + """ + Test that non org admin with 'User Access administrator' role can list + service account based principals in group. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + service_account_data = self._create_service_account_data() + sa_principal = Principal( + username=service_account_data["username"], + tenant=self.tenant, + type="service-account", + service_account_id=service_account_data["client_id"], + ) + sa_principal.save() + test_group.principals.add(sa_principal) + test_group.save() + + # Set the return value for the mock + sa_uuid = sa_principal.service_account_id + mocked_values = [ + { + "clientID": sa_uuid, + "name": f"Service Account name", + "description": f"Service Account description", + "owner": "jsmith", + "username": "service_account-" + sa_uuid, + "time_created": 1706784741, + "type": "service-account", + } + ] + mock_request.return_value = mocked_values + + url = reverse("group-principals", kwargs={"uuid": test_group.uuid}) + "?principal_type=service-account" + client = APIClient() + + response = client.get(url, **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data.get("data")), 1) + + response = client.get(url, **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data.get("data")), 1) From 9183be31d9768527e95fc27baa8ee2b22e923f5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Wed, 13 Mar 2024 12:00:51 +0100 Subject: [PATCH 23/32] add tests for non org admin and POST /groups/{uuid}/principals/ --- tests/management/group/test_view.py | 284 ++++++++++++++++++++++++++++ 1 file changed, 284 insertions(+) diff --git a/tests/management/group/test_view.py b/tests/management/group/test_view.py index c9018a45..9fe20579 100644 --- a/tests/management/group/test_view.py +++ b/tests/management/group/test_view.py @@ -3702,3 +3702,287 @@ def test_list_service_account_principals_in_group_with_User_Access_Admin_success response = client.get(url, **self.headers_service_account_principal) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(len(response.data.get("data")), 1) + + @patch( + "management.principal.proxy.PrincipalProxy.request_filtered_principals", + return_value={"status_code": 200, "data": []}, + ) + def test_add_user_based_principal_in_group_without_User_Access_Admin_fail(self, mock_request): + """ + Test that non org admin without 'User Access administrator' role cannot add + user based principal into a group without 'User Access administrator' role. + """ + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + + test_principal = Principal(username="test-principal", tenant=self.tenant) + test_principal.save() + + # Set the return value for the mock + mock_request.return_value["data"] = [{"username": test_principal.username}] + + url = reverse("group-principals", kwargs={"uuid": test_group.uuid}) + client = APIClient() + + request_body = {"principals": [{"username": test_principal.username}]} + + response = client.post(url, request_body, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + response = client.post(url, request_body, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + # Org Admin can add this principal into a group + response = client.post(url, request_body, format="json", **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + @override_settings(IT_BYPASS_TOKEN_VALIDATION=True) + @patch("management.principal.it_service.ITService.request_service_accounts") + def test_add_service_account_principal_in_group_without_User_Access_Admin_fail(self, mock_request): + """ + Test that non org admin without 'User Access administrator' role cannot add + service account based principal into a group without 'User Access administrator' role. + """ + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + + service_account_data = self._create_service_account_data() + sa_principal = Principal( + username=service_account_data["username"], + tenant=self.tenant, + type="service-account", + service_account_id=service_account_data["client_id"], + ) + sa_principal.save() + + # Set the return value for the mock + sa_uuid = sa_principal.service_account_id + mocked_values = [ + { + "clientID": sa_uuid, + "name": f"Service Account name", + "description": f"Service Account description", + "owner": "jsmith", + "username": "service_account-" + sa_uuid, + "time_created": 1706784741, + "type": "service-account", + } + ] + mock_request.return_value = mocked_values + + url = reverse("group-principals", kwargs={"uuid": test_group.uuid}) + client = APIClient() + + request_body = {"principals": [{"clientID": sa_uuid, "type": "service-account"}]} + + response = client.post(url, request_body, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + response = client.post(url, request_body, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + # Org Admin can add this principal into a group + response = client.post(url, request_body, format="json", **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + @patch( + "management.principal.proxy.PrincipalProxy.request_filtered_principals", + return_value={"status_code": 200, "data": []}, + ) + def test_add_user_based_principal_in_group_with_User_Access_Admin_success(self, mock_request): + """ + Test that non org admin with 'User Access administrator' role can add + user based principal into a group without 'User Access administrator' role. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + # Create a group and a principal + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + + test_principal = Principal(username="test-principal", tenant=self.tenant) + test_principal.save() + + # Set the return value for the mock + mock_request.return_value["data"] = [{"username": test_principal.username}] + + url = reverse("group-principals", kwargs={"uuid": test_group.uuid}) + client = APIClient() + + request_body = {"principals": [{"username": test_principal.username}]} + + response = client.post(url, request_body, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + response = client.post(url, request_body, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + @override_settings(IT_BYPASS_TOKEN_VALIDATION=True) + @patch("management.principal.it_service.ITService.request_service_accounts") + def test_add_service_account_principal_in_group_with_User_Access_Admin_success(self, mock_request): + """ + Test that non org admin with 'User Access administrator' role can add + service account based principal into a group without 'User Access administrator' role. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + # Create a group and a principal + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + + service_account_data = self._create_service_account_data() + sa_principal = Principal( + username=service_account_data["username"], + tenant=self.tenant, + type="service-account", + service_account_id=service_account_data["client_id"], + ) + sa_principal.save() + + # Set the return value for the mock + sa_uuid = sa_principal.service_account_id + mocked_values = [ + { + "clientID": sa_uuid, + "name": f"Service Account name", + "description": f"Service Account description", + "owner": "jsmith", + "username": "service_account-" + sa_uuid, + "time_created": 1706784741, + "type": "service-account", + } + ] + mock_request.return_value = mocked_values + + url = reverse("group-principals", kwargs={"uuid": test_group.uuid}) + client = APIClient() + + request_body = {"principals": [{"clientID": sa_uuid, "type": "service-account"}]} + + response = client.post(url, request_body, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + response = client.post(url, request_body, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + @patch( + "management.principal.proxy.PrincipalProxy.request_filtered_principals", + return_value={"status_code": 200, "data": []}, + ) + def test_add_user_based_principal_in_group_with_User_Access_Admin_fail(self, mock_request): + """ + Test that non org admin with 'User Access administrator' role cannot add + user based principal into a group with 'User Access administrator' role. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + # Create a group with 'User Access administrator' role and a principal + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + + user_access_admin_role = group_with_UA_admin.roles()[0] + request_body = {"roles": [user_access_admin_role.uuid]} + + url = reverse("group-roles", kwargs={"uuid": test_group.uuid}) + client = APIClient() + response = client.post(url, request_body, format="json", **self.headers_org_admin) + # Role 'User Access administrator' added successfully into test group + self.assertEqual(response.status_code, status.HTTP_200_OK) + + test_principal = Principal(username="test-principal", tenant=self.tenant) + test_principal.save() + + # Set the return value for the mock + mock_request.return_value["data"] = [{"username": test_principal.username}] + + url = reverse("group-principals", kwargs={"uuid": test_group.uuid}) + client = APIClient() + + request_body = {"principals": [{"username": test_principal.username}]} + + response = client.post(url, request_body, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_group_err_message) + + response = client.post(url, request_body, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_group_err_message) + + # Only Org Admin can add a principal into a group with 'User Access administrator' role + response = client.post(url, request_body, format="json", **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + @override_settings(IT_BYPASS_TOKEN_VALIDATION=True) + @patch("management.principal.it_service.ITService.request_service_accounts") + def test_add_service_account_principal_in_group_with_User_Access_Admin_fail(self, mock_request): + """ + Test that non org admin with 'User Access administrator' role cannot add + service account based principal into a group with 'User Access administrator' role. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + # Create a group with 'User Access administrator' role and a principal + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + + user_access_admin_role = group_with_UA_admin.roles()[0] + request_body = {"roles": [user_access_admin_role.uuid]} + + url = reverse("group-roles", kwargs={"uuid": test_group.uuid}) + client = APIClient() + response = client.post(url, request_body, format="json", **self.headers_org_admin) + # Role 'User Access administrator' added successfully into test group + self.assertEqual(response.status_code, status.HTTP_200_OK) + + service_account_data = self._create_service_account_data() + sa_principal = Principal( + username=service_account_data["username"], + tenant=self.tenant, + type="service-account", + service_account_id=service_account_data["client_id"], + ) + sa_principal.save() + + # Set the return value for the mock + sa_uuid = sa_principal.service_account_id + mocked_values = [ + { + "clientID": sa_uuid, + "name": f"Service Account name", + "description": f"Service Account description", + "owner": "jsmith", + "username": "service_account-" + sa_uuid, + "time_created": 1706784741, + "type": "service-account", + } + ] + mock_request.return_value = mocked_values + + url = reverse("group-principals", kwargs={"uuid": test_group.uuid}) + client = APIClient() + + request_body = {"principals": [{"clientID": sa_uuid, "type": "service-account"}]} + + response = client.post(url, request_body, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_group_err_message) + + response = client.post(url, request_body, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_group_err_message) + + # Only Org Admin can add a principal into a group with 'User Access administrator' role + response = client.post(url, request_body, format="json", **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_200_OK) From 7de5b0a26ec8dfdeced27ce53f15f7948100a4a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Wed, 13 Mar 2024 12:02:06 +0100 Subject: [PATCH 24/32] add tests for non org admin and DEL /groups/{uuid}/principals/ --- tests/management/group/test_view.py | 227 ++++++++++++++++++++++++++++ 1 file changed, 227 insertions(+) diff --git a/tests/management/group/test_view.py b/tests/management/group/test_view.py index 9fe20579..61bf2bbb 100644 --- a/tests/management/group/test_view.py +++ b/tests/management/group/test_view.py @@ -3986,3 +3986,230 @@ def test_add_service_account_principal_in_group_with_User_Access_Admin_fail(self # Only Org Admin can add a principal into a group with 'User Access administrator' role response = client.post(url, request_body, format="json", **self.headers_org_admin) self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_remove_user_based_principal_from_group_without_User_Access_Admin_fail(self): + """ + Test that non org admin without 'User Access administrator' role cannot remove + user based principal from a group without 'User Access administrator' role. + """ + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + test_principal = Principal(username="test-principal", tenant=self.tenant) + test_principal.save() + test_group.principals.add(test_principal) + test_group.save() + + url = reverse("group-principals", kwargs={"uuid": test_group.uuid}) + f"?usernames={test_principal.username}" + client = APIClient() + + response = client.delete(url, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + response = client.delete(url, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + # Org Admin can remove a principal from a group + response = client.delete(url, format="json", **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + @override_settings(IT_BYPASS_TOKEN_VALIDATION=True) + def test_remove_service_account_principal_from_group_without_User_Access_Admin_fail(self): + """ + Test that non org admin without 'User Access administrator' role cannot remove + service account based principal from a group without 'User Access administrator' role. + """ + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + + service_account_data = self._create_service_account_data() + sa_principal = Principal( + username=service_account_data["username"], + tenant=self.tenant, + type="service-account", + service_account_id=service_account_data["client_id"], + ) + sa_principal.save() + test_group.principals.add(sa_principal) + test_group.save() + + url = ( + reverse("group-principals", kwargs={"uuid": test_group.uuid}) + + f"?service-accounts={sa_principal.username}" + ) + client = APIClient() + + response = client.delete(url, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + response = client.delete(url, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + # Org Admin can remove a principal from a group + response = client.delete(url, format="json", **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + def test_remove_user_based_principal_from_group_with_User_Access_Admin_success(self): + """ + Test that non org admin with 'User Access administrator' role can remove + user based principal from a group without 'User Access administrator' role. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + test_principal = Principal(username="test-principal", tenant=self.tenant) + test_principal.save() + test_group.principals.add(test_principal) + test_group.save() + + url = reverse("group-principals", kwargs={"uuid": test_group.uuid}) + f"?usernames={test_principal.username}" + client = APIClient() + + response = client.delete(url, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + # Add once removed principal into group + test_group.principals.add(test_principal) + test_group.save() + + response = client.delete(url, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + @override_settings(IT_BYPASS_TOKEN_VALIDATION=True) + def test_remove_service_account_principal_from_group_with_User_Access_Admin_success(self): + """ + Test that non org admin with 'User Access administrator' role can remove + service account based principal from a group without 'User Access administrator' role. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + service_account_data = self._create_service_account_data() + sa_principal = Principal( + username=service_account_data["username"], + tenant=self.tenant, + type="service-account", + service_account_id=service_account_data["client_id"], + ) + sa_principal.save() + test_group.principals.add(sa_principal) + test_group.save() + + url = ( + reverse("group-principals", kwargs={"uuid": test_group.uuid}) + + f"?service-accounts={sa_principal.username}" + ) + client = APIClient() + + response = client.delete(url, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + # Add once removed principal into group + test_group.principals.add(sa_principal) + test_group.save() + + response = client.delete(url, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + def test_remove_user_based_principal_from_group_with_User_Access_Admin_fail(self): + """ + Test that non org admin with 'User Access administrator' role cannot remove + user based principal from a group with 'User Access administrator' role. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + # Create a group with 'User Access administrator' role and a principal + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + + user_access_admin_role = group_with_UA_admin.roles()[0] + request_body = {"roles": [user_access_admin_role.uuid]} + + url = reverse("group-roles", kwargs={"uuid": test_group.uuid}) + client = APIClient() + response = client.post(url, request_body, format="json", **self.headers_org_admin) + # Role 'User Access administrator' added successfully into test group + self.assertEqual(response.status_code, status.HTTP_200_OK) + + test_principal = Principal(username="test-principal", tenant=self.tenant) + test_principal.save() + test_group.principals.add(test_principal) + test_group.save() + + url = reverse("group-principals", kwargs={"uuid": test_group.uuid}) + f"?usernames={test_principal.username}" + client = APIClient() + + response = client.delete(url, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_group_err_message) + + response = client.delete(url, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_group_err_message) + + # Only Org admin can remove a principal from a group with 'User Access administrator' role + response = client.delete(url, format="json", **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + @override_settings(IT_BYPASS_TOKEN_VALIDATION=True) + def test_remove_service_account_principal_from_group_with_User_Access_Admin_fail(self): + """ + Test that non org admin with 'User Access administrator' role cannot remove + service account based principal from a group with 'User Access administrator' role. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + # Create a group with 'User Access administrator' role and a principal + test_group = Group(name="test group", tenant=self.tenant) + test_group.save() + + user_access_admin_role = group_with_UA_admin.roles()[0] + request_body = {"roles": [user_access_admin_role.uuid]} + + url = reverse("group-roles", kwargs={"uuid": test_group.uuid}) + client = APIClient() + response = client.post(url, request_body, format="json", **self.headers_org_admin) + # Role 'User Access administrator' added successfully into test group + self.assertEqual(response.status_code, status.HTTP_200_OK) + + service_account_data = self._create_service_account_data() + sa_principal = Principal( + username=service_account_data["username"], + tenant=self.tenant, + type="service-account", + service_account_id=service_account_data["client_id"], + ) + sa_principal.save() + test_group.principals.add(sa_principal) + test_group.save() + + url = ( + reverse("group-principals", kwargs={"uuid": test_group.uuid}) + + f"?service-accounts={sa_principal.username}" + ) + client = APIClient() + + response = client.delete(url, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_group_err_message) + + response = client.delete(url, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_group_err_message) + + # Only Org admin can remove a principal from a group with 'User Access administrator' role + response = client.delete(url, format="json", **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) From 3c50a431973d636858de16025094f22427366610 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Wed, 13 Mar 2024 12:02:50 +0100 Subject: [PATCH 25/32] add tests for non org admin and GET /groups/{uuid}/roles/ --- tests/management/group/test_view.py | 56 +++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/tests/management/group/test_view.py b/tests/management/group/test_view.py index 61bf2bbb..5e5b87b1 100644 --- a/tests/management/group/test_view.py +++ b/tests/management/group/test_view.py @@ -4213,3 +4213,59 @@ def test_remove_service_account_principal_from_group_with_User_Access_Admin_fail # Only Org admin can remove a principal from a group with 'User Access administrator' role response = client.delete(url, format="json", **self.headers_org_admin) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + def test_list_roles_in_group_without_User_Access_Admin_fail(self): + """Test that non org admin without 'User Access administrator' role cannot list roles in the group.""" + # Create a group, policy and role we need for the test + group = Group.objects.create(name="test group", tenant=self.tenant) + policy = Policy.objects.create(name="policy for test group", tenant=self.tenant) + role = Role.objects.create( + name="test role", description="test role description", system=False, tenant=self.tenant + ) + policy.roles.add(role) + group.policies.add(policy) + + url = reverse("group-roles", kwargs={"uuid": group.uuid}) + client = APIClient() + + response = client.get(url, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + response = client.get(url, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + # Check that role exists in the database + self.assertIsNotNone(Role.objects.get(uuid=role.uuid)) + + # Check that org admin can list the roles under group + response = client.get(url, format="json", **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data.get("data")), 1) + + def test_list_roles_in_group_with_User_Access_Admin_success(self): + """Test that non org admin with 'User Access administrator' role can list roles in the group.""" + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + # Create a group, policy and role we need for the test + group = Group.objects.create(name="test group", tenant=self.tenant) + policy = Policy.objects.create(name="policy for test group", tenant=self.tenant) + role = Role.objects.create( + name="test role", description="test role description", system=False, tenant=self.tenant + ) + policy.roles.add(role) + group.policies.add(policy) + + url = reverse("group-roles", kwargs={"uuid": group.uuid}) + client = APIClient() + + response = client.get(url, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data.get("data")), 1) + + response = client.get(url, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data.get("data")), 1) From 3af55335e1253c3980c8497e94041e013d77ac5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Wed, 13 Mar 2024 12:03:35 +0100 Subject: [PATCH 26/32] add tests for non org admin and POST /groups/{uuid}/roles/ --- tests/management/group/test_view.py | 123 ++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/tests/management/group/test_view.py b/tests/management/group/test_view.py index 5e5b87b1..10ab0cb8 100644 --- a/tests/management/group/test_view.py +++ b/tests/management/group/test_view.py @@ -4269,3 +4269,126 @@ def test_list_roles_in_group_with_User_Access_Admin_success(self): response = client.get(url, format="json", **self.headers_service_account_principal) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(len(response.data.get("data")), 1) + + def test_add_role_in_group_without_User_Access_Admin_fail(self): + """ + Test that non org admin without 'User Access administrator' role cannot add new role + (different from 'User Access administrator') to the group without 'User Access administrator' role. + """ + # Create a group and role we need for the test + group = Group.objects.create(name="test group", tenant=self.tenant) + role = Role.objects.create( + name="test role", description="test role description", system=False, tenant=self.tenant + ) + + request_body = {"roles": [role.uuid]} + url = reverse("group-roles", kwargs={"uuid": group.uuid}) + client = APIClient() + + response = client.post(url, request_body, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + response = client.post(url, request_body, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + # Check that org admin can add a role in group + response = client.post(url, request_body, format="json", **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data.get("data")), 1) + self.assertEqual(response.data.get("data")[0].get("uuid"), str(role.uuid)) + + def test_add_role_in_group_with_User_Access_Admin_success(self): + """ + Test that non org admin with 'User Access administrator' role can add new role + (different from 'User Access administrator') to the group without 'User Access administrator' role. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + # Create a group and role we need for the test + group = Group.objects.create(name="test group", tenant=self.tenant) + role = Role.objects.create( + name="test role", description="test role description", system=False, tenant=self.tenant + ) + + request_body = {"roles": [role.uuid]} + url = reverse("group-roles", kwargs={"uuid": group.uuid}) + client = APIClient() + + response = client.post(url, request_body, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data.get("data")), 1) + self.assertEqual(response.data.get("data")[0].get("uuid"), str(role.uuid)) + + response = client.post(url, request_body, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data.get("data")), 1) + self.assertEqual(response.data.get("data")[0].get("uuid"), str(role.uuid)) + + def test_add_User_Access_Admin_role_in_group_with_User_Access_Admin_fail(self): + """ + Test that non org admin with 'User Access administrator' role cannot add the 'User Access + administrator' role to the group. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + # Create a group we need for the test + group = Group.objects.create(name="test group", tenant=self.tenant) + user_access_admin_role = group_with_UA_admin.roles()[0] + + request_body = {"roles": [user_access_admin_role.uuid]} + url = reverse("group-roles", kwargs={"uuid": group.uuid}) + client = APIClient() + + response = client.post(url, request_body, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_role_err_message) + + response = client.post(url, request_body, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_role_err_message) + + # Only Org Admin can add 'User Access administrator' role into a group + response = client.post(url, request_body, format="json", **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data.get("data")), 1) + self.assertEqual(response.data.get("data")[0].get("uuid"), str(user_access_admin_role.uuid)) + + def test_add_role_in_group_with_User_Access_Admin_fail(self): + """ + Test that non org admin with 'User Access administrator' role cannot add new role + (different from 'User Access administrator') to the group with 'User Access administrator' role. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + # Create a role we need for the test + role = Role.objects.create( + name="test role", description="test role description", system=False, tenant=self.tenant + ) + + request_body = {"roles": [role.uuid]} + url = reverse("group-roles", kwargs={"uuid": group_with_UA_admin.uuid}) + client = APIClient() + + response = client.post(url, request_body, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_group_err_message) + + response = client.post(url, request_body, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_group_err_message) + + # Only Org Admin can add a role to a group with 'User Access administrator' role + response = client.post(url, request_body, format="json", **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data.get("data")), 2) + + role_uuid_from_response = [item.get("uuid") for item in response.data.get("data")] + self.assertIn(str(role.uuid), role_uuid_from_response) From bd0c26223757d03cea0cc0f95d9f84da93260ae7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Wed, 13 Mar 2024 12:04:05 +0100 Subject: [PATCH 27/32] add tests for non org admin and DEL /groups/{uuid}/roles/ --- tests/management/group/test_view.py | 125 ++++++++++++++++++++++++++++ 1 file changed, 125 insertions(+) diff --git a/tests/management/group/test_view.py b/tests/management/group/test_view.py index 10ab0cb8..42130091 100644 --- a/tests/management/group/test_view.py +++ b/tests/management/group/test_view.py @@ -4392,3 +4392,128 @@ def test_add_role_in_group_with_User_Access_Admin_fail(self): role_uuid_from_response = [item.get("uuid") for item in response.data.get("data")] self.assertIn(str(role.uuid), role_uuid_from_response) + + def test_remove_role_from_group_without_User_Access_Admin_fail(self): + """Test that non org admin without 'User Access administrator' role cannot remove a role from the group.""" + # Create a group, policy and role we need for the test + group = Group.objects.create(name="test group", tenant=self.tenant) + policy = Policy.objects.create(name="policy for test group", tenant=self.tenant) + role = Role.objects.create( + name="test role", description="test role description", system=False, tenant=self.tenant + ) + policy.roles.add(role) + group.policies.add(policy) + + url = reverse("group-roles", kwargs={"uuid": group.uuid}) + f"?roles={role.uuid}" + client = APIClient() + + response = client.delete(url, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + response = client.delete(url, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + # Check that org admin can remove the roles from the group + response = client.delete(url, format="json", **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + # Check that the group doesn't contain the role anymore + self.assertTrue(len(group.roles()) == 0) + + def test_remove_role_from_group_with_User_Access_Admin_success(self): + """Test that non org admin with 'User Access administrator' role can remove a role from the group.""" + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + # Create a group, policy and role we need for the test + group = Group.objects.create(name="test group", tenant=self.tenant) + policy = Policy.objects.create(name="policy for test group", tenant=self.tenant) + role = Role.objects.create( + name="test role", description="test role description", system=False, tenant=self.tenant + ) + policy.roles.add(role) + group.policies.add(policy) + + url = reverse("group-roles", kwargs={"uuid": group.uuid}) + f"?roles={role.uuid}" + client = APIClient() + + response = client.delete(url, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + # Add the role again to the group for the test with service account in headers + group.policies.add(policy) + + response = client.delete(url, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + def test_remove_role_from_group_with_User_Access_Admin_fail(self): + """ + Test that non org admin with 'User Access administrator' role cannot remove a role + (different from 'User Access administrator') from the group with 'User Access administrator' role. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + # Add new role to the group with 'User Access administrator' role + role = Role.objects.create( + name="test role", description="test role description", system=False, tenant=self.tenant + ) + request_body = {"roles": [role.uuid]} + url = reverse("group-roles", kwargs={"uuid": group_with_UA_admin.uuid}) + client = APIClient() + response = client.post(url, request_body, format="json", **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + # Test that non org admin with 'User Access administrator' role cannot remove a role + url = reverse("group-roles", kwargs={"uuid": group_with_UA_admin.uuid}) + f"?roles={role.uuid}" + + response = client.delete(url, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_group_err_message) + + response = client.delete(url, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_group_err_message) + + # Only Org Admin can remove role like this + response = client.delete(url, format="json", **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + def test_remove_User_Access_Admin_role_from_group_fail(self): + """ + Test that non org admin with 'User Access administrator' role cannot remove the 'User Access administrator' + role from the group. + """ + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + # Create a group with 'User Access administrator' role + group = Group.objects.create(name="test group", tenant=self.tenant) + + user_access_admin_role = group_with_UA_admin.roles()[0] + request_body = {"roles": [user_access_admin_role.uuid]} + + url = reverse("group-roles", kwargs={"uuid": group.uuid}) + client = APIClient() + response = client.post(url, request_body, format="json", **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + # Test that non org admin with 'User Access administrator' role cannot remove a role + url = reverse("group-roles", kwargs={"uuid": group.uuid}) + f"?roles={user_access_admin_role.uuid}" + + response = client.delete(url, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_group_err_message) + + response = client.delete(url, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.user_access_admin_group_err_message) + + # Only Org Admin can remove role like this + response = client.delete(url, format="json", **self.headers_org_admin) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) From 0292fef8ea830f1e04e3badb8f225d892fc1f2c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Wed, 13 Mar 2024 12:12:00 +0100 Subject: [PATCH 28/32] add tests for non org admin and GET /groups/ --- tests/management/group/test_view.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/management/group/test_view.py b/tests/management/group/test_view.py index 42130091..6df96f1b 100644 --- a/tests/management/group/test_view.py +++ b/tests/management/group/test_view.py @@ -4517,3 +4517,29 @@ def test_remove_User_Access_Admin_role_from_group_fail(self): # Only Org Admin can remove role like this response = client.delete(url, format="json", **self.headers_org_admin) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + def test_list_groups_without_User_Access_Admin_success(self): + """Test that non org admin without 'User Access administrator' role can list groups.""" + url = reverse("group-list") + client = APIClient() + + response = client.get(url, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + response = client.get(url, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_list_groups_with_User_Access_Admin_success(self): + """Test that non org admin with 'User Access administrator' role can list groups.""" + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + url = reverse("group-list") + client = APIClient() + + response = client.get(url, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + response = client.get(url, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) From f3f641ae4eface1ca779a935817c1fa6b36cbb21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Wed, 13 Mar 2024 12:36:45 +0100 Subject: [PATCH 29/32] add tests for non org admin and GET /groups/{uuid}/ --- tests/management/group/test_view.py | 40 +++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/tests/management/group/test_view.py b/tests/management/group/test_view.py index 6df96f1b..5a9f44ee 100644 --- a/tests/management/group/test_view.py +++ b/tests/management/group/test_view.py @@ -4543,3 +4543,43 @@ def test_list_groups_with_User_Access_Admin_success(self): response = client.get(url, format="json", **self.headers_service_account_principal) self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_read_group_without_User_Access_Admin_fail(self): + """Test that non org admin without 'User Access administrator' role cannot read a group.""" + group = Group.objects.create(name="test group", tenant=self.tenant) + + url = reverse("group-detail", kwargs={"uuid": group.uuid}) + client = APIClient() + + response = client.get(url, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + response = client.get(url, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data.get("errors")[0].get("detail"), self.no_permission_err_message) + + # Org Admin can read detail of a group + response = client.get(url, format="json", **self.headers_org_admin) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data.get("uuid"), str(group.uuid)) + + def test_read_group_with_User_Access_Admin_success(self): + """Test that non org admin with 'User Access administrator' role can read a group.""" + # Create a group with 'User Access administrator' role and add principals we use in headers + group_with_UA_admin = self._create_group_with_user_access_administrator_role(self.tenant) + group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) + + group = Group.objects.create(name="test group", tenant=self.tenant) + + url = reverse("group-detail", kwargs={"uuid": group.uuid}) + client = APIClient() + + response = client.get(url, format="json", **self.headers_user_based_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data.get("uuid"), str(group.uuid)) + + response = client.get(url, format="json", **self.headers_service_account_principal) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data.get("uuid"), str(group.uuid)) From fd9475f2a10becdc50948b3a330c3909f599e79e Mon Sep 17 00:00:00 2001 From: Mikel Alejo Barcina Ribera Date: Wed, 13 Mar 2024 16:01:39 +0100 Subject: [PATCH 30/32] refactor: rename comments and variables accordingly Some of the comments and variable names do not match what they are representing. RHCLOUD-30986 --- tests/management/group/test_view.py | 53 +++++++++++++++-------------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/tests/management/group/test_view.py b/tests/management/group/test_view.py index 5a9f44ee..11da965e 100644 --- a/tests/management/group/test_view.py +++ b/tests/management/group/test_view.py @@ -2803,7 +2803,8 @@ def test_group_service_account_with_user_administrator_role_add_principals( - Creates a regular group which we will use for the tests. - Sends a request with an unprivileged service account to attempt to add principals to the regular group. - Creates a service account principal and adds it to the User Access Administrator group. - - Sends a request with the privileged user and asserts that the principals were added to the regular group. + - Sends a request with the privileged service account and asserts that the principals were added to the regular + group. """ # Create the customer data we will be using for the tenant. customer_data = self._create_customer_data() @@ -2865,24 +2866,24 @@ def test_group_service_account_with_user_administrator_role_add_principals( "the regular group should not have any principals associated with it", ) - # Create the request context for an unprivileged user. - privileged_request_context = self._create_request_context( + # Create the request context for an unprivileged service account. + unprivileged_request_context = self._create_request_context( customer_data=customer_data, user_data=None, service_account_data=self._create_service_account_data(), is_org_admin=False, ) - # Call the endpoint under test with an unprivileged user. + # Call the endpoint under test with an unprivileged service account. response = api_client.post( - group_principals_url, test_data, format="json", **privileged_request_context["request"].META + group_principals_url, test_data, format="json", **unprivileged_request_context["request"].META ) - # Assert that the user does not have permission to perform this operation. + # Assert that the service account does not have permission to perform this operation. self.assertEqual( status.HTTP_403_FORBIDDEN, response.status_code, - "unexpected status code when an unprivileged user attempts to add principals to a group", + "unexpected status code when an unprivileged service account attempts to add principals to a group", ) # Assert that no principals were added to the group. @@ -2892,7 +2893,7 @@ def test_group_service_account_with_user_administrator_role_add_principals( "after a failed request to add a principal, the regular group should still have zero associated principals", ) - # Add the user access admin principal to the user access admin group. + # Add the user access admin service account principal to the user access admin group. service_account_data = self._create_service_account_data() user_access_admin_sa_principal = Principal() @@ -2908,7 +2909,7 @@ def test_group_service_account_with_user_administrator_role_add_principals( # Simulate that the proxy validates the specified user principal correctly. request_filtered_principals.return_value = {"data": [{"username": new_principal.username}]} - # Create the request context for privileged user. + # Create the request context for privileged service account. privileged_request_context = self._create_request_context( customer_data=customer_data, user_data=None, service_account_data=service_account_data, is_org_admin=False ) @@ -2922,8 +2923,8 @@ def test_group_service_account_with_user_administrator_role_add_principals( self.assertEqual( status.HTTP_200_OK, response_two.status_code, - "unexpected status code when a user with the User Access Administrator role attempts to manage" - " principals on a group", + "unexpected status code when a service account with the User Access Administrator role attempts to" + " manage principals on a group", ) # Assert that both the service account and the user principal were added to the group. @@ -2952,7 +2953,7 @@ def test_group_service_account_with_user_administrator_role_add_principals( @override_settings(IT_BYPASS_TOKEN_VALIDATION=True) @override_settings(IT_BYPASS_IT_CALLS=True) def test_group_service_account_with_user_administrator_role_remove_principals(self): - """Test that a service account with the User Access administrator role can manage principals in a group + """Test that a service account with the User Access administrator role can manage principals in a group. The way the test is performed is the following: - Creates a group with a User Access Administrator role associated to it. @@ -3020,20 +3021,20 @@ def test_group_service_account_with_user_administrator_role_remove_principals(se "the regular group should have two principals before the deletion attempt", ) - # Create the request context for an unprivileged user. - privileged_request_context = self._create_request_context( + # Create the request context for an unprivileged service account. + unprivileged_request_context = self._create_request_context( customer_data=customer_data, user_data=None, service_account_data=self._create_service_account_data(), is_org_admin=False, ) - # Call the endpoint under test with an unprivileged user. + # Call the endpoint under test with an unprivileged service account. response = api_client.delete( - path=group_principals_url, data=None, format="json", **privileged_request_context["request"].META + path=group_principals_url, data=None, format="json", **unprivileged_request_context["request"].META ) - # Assert that the user does not have permission to perform this operation. + # Assert that the service account does not have permission to perform this operation. self.assertEqual( status.HTTP_403_FORBIDDEN, response.status_code, @@ -3060,12 +3061,12 @@ def test_group_service_account_with_user_administrator_role_remove_principals(se user_access_admin_group.principals.add(user_access_admin_sa_principal) - # Create the request context for privileged user. + # Create the request context for privileged service account. privileged_request_context = self._create_request_context( customer_data=customer_data, user_data=None, service_account_data=service_account_data, is_org_admin=False ) - # Call the endpoint under test with the user with "User Access Administrator" permissions. + # Call the endpoint under test with the service account with "User Access Administrator" permissions. response_two = api_client.delete( path=group_principals_url, format="json", **privileged_request_context["request"].META ) @@ -3078,7 +3079,7 @@ def test_group_service_account_with_user_administrator_role_remove_principals(se " principals from a group", ) - # Assert that both the service account and the user principal were added to the group. + # Assert that both the service account and the user principal were removed from the group. self.assertEqual( 0, regular_group.principals.count(), @@ -3160,13 +3161,13 @@ def test_group_user_with_user_administrator_role_add_principals(self, request_fi ) # Create the request context for an unprivileged user. - privileged_request_context = self._create_request_context( + unprivileged_request_context = self._create_request_context( customer_data=customer_data, user_data=self._create_user_data(), is_org_admin=False ) # Call the endpoint under test with an unprivileged user. response = api_client.post( - path=group_principals_url, data=test_data, format="json", **privileged_request_context["request"].META + path=group_principals_url, data=test_data, format="json", **unprivileged_request_context["request"].META ) # Assert that the user does not have permission to perform this operation. @@ -3198,7 +3199,7 @@ def test_group_user_with_user_administrator_role_add_principals(self, request_fi # Simulate that the proxy validates the specified user principal correctly. request_filtered_principals.return_value = {"data": [{"username": new_principal.username}]} - # Create the request context for privileged user. + # Create the request context for the privileged user. privileged_request_context = self._create_request_context( customer_data=customer_data, user_data=user_data, is_org_admin=False ) @@ -3311,13 +3312,13 @@ def test_group_user_with_user_administrator_role_remove_principals(self): ) # Create the request context for an unprivileged user. - privileged_request_context = self._create_request_context( + unprivileged_request_context = self._create_request_context( customer_data=customer_data, user_data=self._create_user_data(), is_org_admin=False ) # Call the endpoint under test with an unprivileged user. response = api_client.delete( - path=group_principals_url, data=None, format="json", **privileged_request_context["request"].META + path=group_principals_url, data=None, format="json", **unprivileged_request_context["request"].META ) # Assert that the user does not have permission to perform this operation. @@ -3346,7 +3347,7 @@ def test_group_user_with_user_administrator_role_remove_principals(self): user_access_admin_group.principals.add(user_access_admin_principal) - # Create the request context for privileged user. + # Create the request context for the privileged user. privileged_request_context = self._create_request_context( customer_data=customer_data, user_data=user_data, is_org_admin=False ) From 4079dea8b48a9a3c147f232e1d1323d3b50cdacd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petra=20C=CC=8Ci=CC=81halova=CC=81?= Date: Wed, 13 Mar 2024 16:35:21 +0100 Subject: [PATCH 31/32] typo fix in the 'test_remove_group_with_User_Access_Admin_fail' docstring --- tests/management/group/test_view.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/management/group/test_view.py b/tests/management/group/test_view.py index 11da965e..82304acd 100644 --- a/tests/management/group/test_view.py +++ b/tests/management/group/test_view.py @@ -3529,7 +3529,7 @@ def test_remove_group_with_User_Access_Admin_success(self): def test_remove_group_with_User_Access_Admin_fail(self): """ - Test that non org admin with 'User Access administrator' role cannot update a group + Test that non org admin with 'User Access administrator' role cannot remove a group with 'User Access administrator' role. """ # Create a group with 'User Access administrator' role and add principals we use in headers From 3c10703d4054cac472fbfb6648e7c4bbce1238cd Mon Sep 17 00:00:00 2001 From: Mikel Alejo Date: Fri, 15 Mar 2024 09:38:50 +0100 Subject: [PATCH 32/32] Update tests/management/group/test_view.py Co-authored-by: Petra Cihalova <89980168+petracihalova@users.noreply.github.com> --- tests/management/group/test_view.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/management/group/test_view.py b/tests/management/group/test_view.py index 82304acd..891ad0a8 100644 --- a/tests/management/group/test_view.py +++ b/tests/management/group/test_view.py @@ -3935,8 +3935,7 @@ def test_add_service_account_principal_in_group_with_User_Access_Admin_fail(self group_with_UA_admin.principals.add(self.user_based_principal, self.service_account_principal) # Create a group with 'User Access administrator' role and a principal - test_group = Group(name="test group", tenant=self.tenant) - test_group.save() + test_group = Group.objects.create(name="test group", tenant=self.tenant) user_access_admin_role = group_with_UA_admin.roles()[0] request_body = {"roles": [user_access_admin_role.uuid]}