diff --git a/rbac/api/common/exception_handler.py b/rbac/api/common/exception_handler.py index a79e51554..dae74d9c1 100644 --- a/rbac/api/common/exception_handler.py +++ b/rbac/api/common/exception_handler.py @@ -61,6 +61,25 @@ def _generate_errors_from_dict(data, **kwargs): return errors +def _generate_error_data_payload_response(detail: str, context, http_status_code: int) -> dict: + """Generate the payload for the "data" parameter of the response.""" + data = { + "errors": [ + { + "detail": detail, + "status": str(http_status_code), + } + ] + } + + # Some exceptions might be raised from places that are not views. + view = context.get("view") + if view: + data["errors"][0]["source"] = view.basename + + return data + + def custom_exception_handler(exc, context): """Create custom response for exceptions.""" response = exception_handler(exc, context) @@ -87,43 +106,30 @@ def custom_exception_handler(exc, context): ) elif isinstance(exc, InvalidTokenError): response = Response( - data={ - "errors": [ - { - "detail": "Invalid token provided.", - "status": str(status.HTTP_401_UNAUTHORIZED), - } - ] - }, + data=_generate_error_data_payload_response( + detail="Invalid token provided.", context=context, http_status_code=status.HTTP_401_UNAUTHORIZED + ), content_type="application/json", status=status.HTTP_401_UNAUTHORIZED, ) elif isinstance(exc, MissingAuthorizationError): response = Response( - data={ - "errors": [ - { - "detail": "A Bearer token in an authorization header is required when" - " performing service account operations.", - "source": context.get("view").basename, - "status": str(status.HTTP_401_UNAUTHORIZED), - } - ] - }, + data=_generate_error_data_payload_response( + detail="A Bearer token in an authorization header is required when performing service account" + " operations.", + context=context, + http_status_code=status.HTTP_401_UNAUTHORIZED, + ), content_type="application/json", status=status.HTTP_401_UNAUTHORIZED, ) elif isinstance(exc, UnableMeetPrerequisitesError): response = Response( - data={ - "errors": [ - { - "detail": "Unable to validate the provided token.", - "source": context.get("view").basename, - "status": str(status.HTTP_500_INTERNAL_SERVER_ERROR), - } - ] - }, + data=_generate_error_data_payload_response( + detail="Unable to validate the provided token.", + context=context, + http_status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + ), content_type="application/json", status=status.HTTP_500_INTERNAL_SERVER_ERROR, ) diff --git a/rbac/management/utils.py b/rbac/management/utils.py index 65e940a88..b35cd22b5 100644 --- a/rbac/management/utils.py +++ b/rbac/management/utils.py @@ -23,8 +23,6 @@ from django.conf import settings from django.core.exceptions import PermissionDenied from django.utils.translation import gettext as _ -from management.authorization.scope_claims import ScopeClaims -from management.authorization.token_validator import ITSSOTokenValidator from management.models import Access, Group, Policy, Principal, Role from management.permissions.principal_access import PrincipalAccessPermission from management.principal.it_service import ITService @@ -72,45 +70,44 @@ def get_principal_from_request(request): def get_principal( username: str, request: Request, verify_principal: bool = True, from_query: bool = False ) -> Principal: - """Get principals from username.""" - # First check if principal exist on our side, - # if not call BOP to check if user exist in the account. - tenant: Tenant = request.tenant - user = request.user + """Get principals from username. - # Get the Bearer Token validated to get Service Accounts from IT - if ITService.is_username_service_account(username=username): - token_validator = ITSSOTokenValidator() - user.bearer_token = token_validator.validate_token( - request=request, additional_scopes_to_validate=set[ScopeClaims]([ScopeClaims.SERVICE_ACCOUNTS_CLAIM]) - ) + The service account usernames are not being validated for now because that would require for the clients to send + tokens with the "api.iam.service_accounts" scope. That conflicts with the following two use cases: + + - A user sends a request to the /access/?username=service-account- endpoint. + - A service account makes a request to /access. + + In these two cases, due to how "get_principal_from_request" works, we would need to validate the service account + against IT, but the only way to do it is by using a bearer token that we can only obtain from the incoming request. + + Until RBAC is given some other means to validate the service accounts, we are skipping that validation. Also, this + does not affect the other endpoints where we need to validate service accounts —listed below just in case—, because + there the bearer token with that claim is a must to be able to manage the service accounts. The endpoints that are + fine are: + + - GET /principals/?type=service-account + - GET /groups/{uuid}/principals/?principal_type=service-account + - POST /groups/{uuid}/principals/ with a service account payload + - DELETE /groups/{uuid}/principals/?service-account=. + """ + # First check if principal exist on our side, if not call BOP to check if user exist in the account. + tenant: Tenant = request.tenant + is_username_service_account = ITService.is_username_service_account(username) try: # If the username was provided through a query we must verify if it exists in the corresponding services first. - if from_query: - if ITService.is_username_service_account(username=username): - it_service = ITService() - if not it_service.is_service_account_valid_by_username(user=user, service_account_username=username): - raise serializers.ValidationError( - {"detail": f"No data found for service account with username '{username}'"} - ) - else: - verify_principal_with_proxy(username, request, verify_principal=verify_principal) + if from_query and not is_username_service_account: + verify_principal_with_proxy(username=username, request=request, verify_principal=verify_principal) + principal = Principal.objects.get(username__iexact=username, tenant=tenant) except Principal.DoesNotExist: # If the "from query" parameter was specified, the username was validated above, so there is no need to # validate it again. - if not from_query: - if ITService.is_username_service_account(username): - it_service = ITService() - if not it_service.is_service_account_valid_by_username(user=user, service_account_username=username): - raise serializers.ValidationError( - {"detail": f"No data found for service account with username '{username}'"} - ) - else: - verify_principal_with_proxy(username, request, verify_principal=verify_principal) - - if ITService.is_username_service_account(username=username): + if not from_query and not is_username_service_account: + verify_principal_with_proxy(username=username, request=request, verify_principal=verify_principal) + + if is_username_service_account: client_id: uuid.UUID = ITService.extract_client_id_service_account_username(username) principal, _ = Principal.objects.get_or_create( diff --git a/tests/api/common/test_exception_handler.py b/tests/api/common/test_exception_handler.py index eced79be5..854fdfc8c 100644 --- a/tests/api/common/test_exception_handler.py +++ b/tests/api/common/test_exception_handler.py @@ -24,7 +24,8 @@ from rest_framework.views import Response from unittest.mock import Mock -from api.common.exception_handler import _generate_errors_from_dict, custom_exception_handler +from api.common.exception_handler import custom_exception_handler +from api.common.exception_handler import _generate_errors_from_dict, _generate_error_data_payload_response class ExceptionHandlerTest(TestCase): @@ -92,8 +93,14 @@ def test_generate_errors_from_dict(self): def test_invalid_token_exception_handled(self): """Test that an "invalid token" exception gets properly handled.""" + # Mock the view and the context. + mocked_view = Mock() + mocked_view.basename = "some-view-handler" + + context = {"view": mocked_view} + # Call the function under test. - response: Response = custom_exception_handler(exc=InvalidTokenError(), context=Mock()) + response: Response = custom_exception_handler(exc=InvalidTokenError(), context=context) # Assert that the correct response was returned for the exception. self.assertEqual( @@ -108,6 +115,12 @@ def test_invalid_token_exception_handled(self): "unexpected error message in the response for the 'InvalidTokenError' exception handling", ) + self.assertEqual( + mocked_view.basename, + str(response.data.get("errors")[0].get("source")), + "unexpected source view in the response for the 'MissingAuthorizationError' exception handling", + ) + def test_missing_authorization_exception_handled(self): """Test that a "missing authorization" exception gets properly handled.""" # Mock the view and the context. @@ -147,23 +160,85 @@ def test_unable_meet_prerequisites_exception_handled(self): context = {"view": mocked_view} # Call the function under test. - response: Response = custom_exception_handler(exc=MissingAuthorizationError(), context=context) + response: Response = custom_exception_handler(exc=UnableMeetPrerequisitesError(), context=context) # Assert that the correct response was returned for the exception. self.assertEqual( - status.HTTP_401_UNAUTHORIZED, + status.HTTP_500_INTERNAL_SERVER_ERROR, response.status_code, - "unexpected status code in the response for the 'MissingAuthorizationError' exception handling", + "unexpected status code in the response for the 'UnableMeetPrerequisitesError' exception handling", ) self.assertEqual( - "A Bearer token in an authorization header is required when performing service account operations.", + "Unable to validate the provided token.", str(response.data.get("errors")[0].get("detail")), - "unexpected error message in the response for the 'MissingAuthorizationError' exception handling", + "unexpected error message in the response for the 'UnableMeetPrerequisitesError' exception handling", ) self.assertEqual( mocked_view.basename, str(response.data.get("errors")[0].get("source")), - "unexpected source view in the response for the 'MissingAuthorizationError' exception handling", + "unexpected source view in the response for the 'UnableMeetPrerequisitesError' exception handling", + ) + + def test_generate_error_data_payload_with_view_response(self): + """Tests that the function under test generates the data payload correctly when a view is passed in the context.""" + # Prepare a payload with a view in the context. + detail = "some error message" + mocked_view = Mock() + mocked_view.basename = "some-view-handler" + context = {"view": mocked_view} + http_status_code = status.HTTP_200_OK + + # Call the function under test. + result = _generate_error_data_payload_response( + detail=detail, context=context, http_status_code=http_status_code + ) + + # Assert that the correct structure is returned. + errors = result.get("errors") + if not errors: + self.fail(f"the errors array was not present in the payload: {result}") + + if len(errors) != 1: + self.fail(f"only one error was expected in the payload: {result}") + + only_error = errors[0] + + self.assertEqual(detail, only_error.get("detail"), f"unexpected detail message in the payload: {result}") + + self.assertEqual( + mocked_view.basename, only_error.get("source"), f"unexpected detail message in the payload: {result}" + ) + + self.assertEqual( + str(http_status_code), only_error.get("status"), f"unexpected status code in the payload: {result}" + ) + + def test_generate_error_data_payload_without_view_response(self): + """Tests that the function under test generates the data payload correctly when no view is passed in the context.""" + # Prepare a payload without a view in the context. + detail = "some error message" + context = {} + http_status_code = status.HTTP_200_OK + + # Call the function under test. + result = _generate_error_data_payload_response( + detail=detail, context=context, http_status_code=http_status_code + ) + + # Assert that the correct structure is returned. + errors = result.get("errors") + if not errors: + self.fail(f"the errors array was not present in the payload: {result}") + + if len(errors) != 1: + self.fail(f"only one error was expected in the payload: {result}") + + only_error = errors[0] + + self.assertEqual(detail, only_error.get("detail"), f"unexpected detail message in the payload: {result}") + + self.assertEqual( + str(http_status_code), only_error.get("status"), f"unexpected status code in the payload: {result}" ) diff --git a/tests/management/test_utils.py b/tests/management/test_utils.py index bf58d0d90..d6bad005f 100644 --- a/tests/management/test_utils.py +++ b/tests/management/test_utils.py @@ -161,263 +161,69 @@ def test_get_principal_created(self, mocked): self.assertEqual(created_service_account.type, "user") self.assertEqual(created_service_account.username, user.username) - @mock.patch("management.authorization.token_validator.ITSSOTokenValidator.validate_token") - @mock.patch("management.principal.it_service.ITService.is_service_account_valid_by_username") @mock.patch("management.utils.verify_principal_with_proxy") - def test_get_principal_service_account_created( - self, mocked: Mock, is_service_account_valid_by_username: Mock, validate_token: Mock - ): - """Test that when a service account principal does not exist in the database, it gets created.""" - # Build a non-existent service account. - user = User() - user.client_id = uuid.uuid4() - user.is_service_account = True - user.username = f"service-account-{user.client_id}" + def test_get_username_principal_not_service_account_validated(self, verify_principal_with_proxy: Mock): + """Test that the username gets validated only when it is not a service account username.""" + # Create a database principal in order to make the function under test get it from the database, and to make + # sure that we are calling the first "verify_principal_with_proxy" call and not the one in the exception + # handling block. + database_principal = Principal() + database_principal.cross_account = False + database_principal.tenant = self.tenant + database_principal.type = "user" + database_principal.username = "clearly-not-a-service-account-db" + database_principal.save() request = mock.Mock() - request.user = user request.tenant = self.tenant - request.query_params = {} - - # Make sure the service account gets flagged as valid so that it gets persisted in the database. - is_service_account_valid_by_username.return_value = True - - # Attempt to fetch the service account principal from the database. Since it does not exist, it should create - # one. - get_principal(username=user.username, request=request) - - # Assert that the service account was properly created in the database. - created_service_account = Principal.objects.get(username=user.username) - self.assertEqual(created_service_account.service_account_id, str(user.client_id)) - self.assertEqual(created_service_account.type, "service-account") - self.assertEqual(created_service_account.username, user.username) - - @mock.patch("management.principal.it_service.ITService.is_service_account_valid_by_username") - def test_get_principal_from_query_service_account_validated_once_principal_exists( - self, is_service_account_valid_by_username: Mock - ): - """Test that when specifying the "from query" parameter the service account is validated""" - # Create a service account principal in the database, which will be fetched by the function under test. - client_id = uuid.uuid4() - username = f"service-account-{client_id}" - - created_principal = Principal.objects.create( - username=username, tenant=self.tenant, type=SERVICE_ACCOUNT_KEY, service_account_id=client_id - ) - - # Simulate that a bearer token was given, and that it was correctly validated. - validate_token.return_value = "mocked-bearer-token" - - # Simulate that the IT service says the service account's username is valid. - is_service_account_valid_by_username.return_value = True - - # Mock the request with the required bits. - request = Mock() - request.tenant = self.tenant - request.user = User() - - # Call the function under test. - returned_principal = get_principal( - username=username, request=request, verify_principal=True, from_query=True - ) - - self.assertEqual( - created_principal, - returned_principal, - "the service account principal we created and the returned principal should be the same", - ) - - @mock.patch("management.authorization.token_validator.ITSSOTokenValidator.validate_token") - @mock.patch("management.principal.it_service.ITService.is_service_account_valid_by_username") - def test_get_principal_from_query_service_account_not_validated_validation_error( - self, is_service_account_valid_by_username: Mock, validate_token: Mock - ): - """Test that when the service account username cannot be validated, a validation error is raised""" - # Create a service account principal in the database, which will be fetched by the function under test. - client_id = uuid.uuid4() - username = f"service-account-{client_id}" - - # Simulate that a bearer token was given, and that it was correctly validated. - validate_token.return_value = "mocked-bearer-token" - # Simulate that the IT service says the service account's username is valid. - is_service_account_valid_by_username.return_value = False + # Call the function under test with the database principal's username and the "from_query" flag as "True", so + # that we execute the "verify_principal_with_proxy" function from the "try" block, and not the "except" one. + fetched_result = get_principal(username=database_principal.username, request=request, from_query=True) - # Mock the request with the required bits. - request = Mock() - request.tenant = self.tenant - request.user = User() - - # Call the function under test. - try: - get_principal(username=username, request=request, verify_principal=True, from_query=True) - self.fail("expected a validation exception, none gotten") - except ValidationError as ve: - self.assertEqual( - f"No data found for service account with username '{username}'", - str(ve.detail.get("detail")), - "unexpected exception message", - ) - - # Assert that the validation function gets called once. - is_service_account_valid_by_username.assert_called_with(user=request.user, service_account_username=username) - - @mock.patch("management.authorization.token_validator.ITSSOTokenValidator.validate_token") - @mock.patch("management.principal.it_service.ITService.is_service_account_valid_by_username") - def test_get_principal_from_query_service_account_validated_principal_exists( - self, is_service_account_valid_by_username: Mock, validate_token: Mock - ): - """Test that when specifying the "from query" parameter the service account is validated""" - # Create a service account principal in the database, which will be fetched by the function under test. - client_id = uuid.uuid4() - username = f"service-account-{client_id}" - - created_principal = Principal.objects.create( - username=username, tenant=self.tenant, type=SERVICE_ACCOUNT_KEY, service_account_id=client_id + # Verify that the principal validation function got called. + verify_principal_with_proxy.assert_called_with( + username=database_principal.username, request=request, verify_principal=True ) - # Simulate that a bearer token was given, and that it was correctly validated. - validate_token.return_value = "mocked-bearer-token" - - # Simulate that the IT service says the service account's username is valid. - is_service_account_valid_by_username.return_value = True - - # Mock the request with the required bits. - request = Mock() - request.tenant = self.tenant - request.user = User() - - # Call the function under test. - returned_principal = get_principal(username=username, request=request, verify_principal=True, from_query=True) - - # Assert that the validation function gets called once. - is_service_account_valid_by_username.assert_called_with(user=request.user, service_account_username=username) - - # Assert that the returned principal is the same one as the one created for the test. self.assertEqual( - created_principal, - returned_principal, - "the service account principal we created and the returned principal should be the same", + database_principal.uuid, + fetched_result.uuid, + "this flags that the fetched principal is not the one created to avoid executing the code in the" + " exception handling block", ) - @mock.patch("management.authorization.token_validator.ITSSOTokenValidator.validate_token") - @mock.patch("management.principal.it_service.ITService.is_service_account_valid_by_username") - def test_get_principal_from_query_service_account_validated_principal_not_exists( - self, is_service_account_valid_by_username: Mock, validate_token: Mock - ): - """Test that when specifying the "from query" parameter the service account is validated just once""" - # Create a service account principal in the database, which will be fetched by the function under test. - client_id = uuid.uuid4() - username = f"service-account-{client_id}" - - # Simulate that a bearer token was given, and that it was correctly validated. - validate_token.return_value = "mocked-bearer-token" - - # Simulate that the IT service says the service account's username is valid. - is_service_account_valid_by_username.return_value = True - - # Mock the request with the required bits. - user = User() - user.client_id = str(uuid.uuid4()) - user.is_service_account = True - user.username = f"service-account-{user.client_id}" - - request = Mock() - request.tenant = self.tenant - request.user = user + # Call the function under test again which should create the new principal, and run the + # "verify_principal_with_proxy" method from the "except" block, and not from the "try" block. + username = "clearly-not-another-service-account" - # Call the function under test. - returned_principal: Principal = get_principal( - username=username, request=request, verify_principal=True, from_query=True - ) + created_result = get_principal(username=username, request=request, from_query=False) - # Assert that the validation function gets called once. - self.assertEqual( - 1, - is_service_account_valid_by_username.call_count, - "the verification of the service account should have only be made once", - ) - is_service_account_valid_by_username.assert_called_with(user=request.user, service_account_username=username) + # Verify that the principal validation function got called. + verify_principal_with_proxy.assert_called_with(username=username, request=request, verify_principal=True) - # Assert that the created principal has the data from the username. - self.assertEqual( - username, - returned_principal.username, - "the username of the created service account principal does not match the given username to the function", - ) - self.assertEqual( - self.tenant, - returned_principal.tenant, - "the created service account's principal tenant does not match the one specified", - ) - self.assertEqual( - SERVICE_ACCOUNT_KEY, - returned_principal.type, - "the type of the created service account principal is not correct", - ) - self.assertEqual( - client_id, - returned_principal.service_account_id, - "the client ID of the created service account principal does not match the one given to the function under test", + self.assertNotEqual( + database_principal.uuid, + created_result.uuid, + "this flags that the specified username is from the database principal created in the test, but in" + "this case we were expecting a new principal to be created", ) - @mock.patch("management.authorization.token_validator.ITSSOTokenValidator.validate_token") - @mock.patch("management.principal.it_service.ITService.is_service_account_valid_by_username") - def test_get_principal_service_account_validated_principal_not_exists( - self, is_service_account_valid_by_username: Mock, validate_token: Mock - ): - """Test that when the "from query" parameter is missing the service account is validated just once""" - # Create a service account principal in the database, which will be fetched by the function under test. + def test_get_principal_service_account_created(self): + """Test that when a service account principal does not exist in the database, it gets created.""" client_id = uuid.uuid4() - username = f"service-account-{client_id}" - - # Simulate that a bearer token was given, and that it was correctly validated. - validate_token.return_value = "mocked-bearer-token" - - # Simulate that the IT service says the service account's username is valid. - is_service_account_valid_by_username.return_value = True + service_account_username = f"service-account-{client_id}" - # Mock the request with the required bits. - user = User() - user.client_id = str(uuid.uuid4()) - user.is_service_account = True - user.username = f"service-account-{user.client_id}" - - request = Mock() + request = mock.Mock() request.tenant = self.tenant - request.user = user - - # Call the function under test. - returned_principal: Principal = get_principal( - username=username, request=request, verify_principal=True, from_query=False - ) + request.query_params = {} - # Assert that the validation function gets called once. - self.assertEqual( - 1, - is_service_account_valid_by_username.call_count, - "the verification of the service account should have only be made once", - ) - is_service_account_valid_by_username.assert_called_with(user=request.user, service_account_username=username) + # Attempt to fetch the service account principal from the database. Since it does not exist, it should create + # one. + get_principal(username=service_account_username, request=request) - # Assert that the created principal has the data from the username. - self.assertEqual( - username, - returned_principal.username, - "the username of the created service account principal does not match the given username to the function", - ) - self.assertEqual( - self.tenant, - returned_principal.tenant, - "the created service account's principal tenant does not match the one specified", - ) - self.assertEqual( - SERVICE_ACCOUNT_KEY, - returned_principal.type, - "the type of the created service account principal is not correct", - ) - self.assertEqual( - client_id, - returned_principal.service_account_id, - "the client ID of the created service account principal does not match the one given to the function under test", - ) + # Assert that the service account was properly created in the database. + created_service_account = Principal.objects.get(username=service_account_username) + self.assertEqual(created_service_account.service_account_id, str(client_id)) + self.assertEqual(created_service_account.type, "service-account") + self.assertEqual(created_service_account.username, service_account_username)