Skip to content

Commit

Permalink
Merge pull request RedHatInsights#1052 from MikelAlejoBR/RHCLOUD-3127…
Browse files Browse the repository at this point in the history
…5-fix-requiring-tokens-for-access

RHCLOUD-31275 | fix requiring tokens for access
  • Loading branch information
petracihalova authored Mar 1, 2024
2 parents d3ae831 + fc67ae4 commit 2bf3144
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 307 deletions.
60 changes: 33 additions & 27 deletions rbac/api/common/exception_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,25 @@ def _generate_errors_from_dict(data, **kwargs):
return errors


def _generate_error_data_payload_response(detail: str, context, http_status_code: int) -> dict:
"""Generate the payload for the "data" parameter of the response."""
data = {
"errors": [
{
"detail": detail,
"status": str(http_status_code),
}
]
}

# Some exceptions might be raised from places that are not views.
view = context.get("view")
if view:
data["errors"][0]["source"] = view.basename

return data


def custom_exception_handler(exc, context):
"""Create custom response for exceptions."""
response = exception_handler(exc, context)
Expand All @@ -87,43 +106,30 @@ def custom_exception_handler(exc, context):
)
elif isinstance(exc, InvalidTokenError):
response = Response(
data={
"errors": [
{
"detail": "Invalid token provided.",
"status": str(status.HTTP_401_UNAUTHORIZED),
}
]
},
data=_generate_error_data_payload_response(
detail="Invalid token provided.", context=context, http_status_code=status.HTTP_401_UNAUTHORIZED
),
content_type="application/json",
status=status.HTTP_401_UNAUTHORIZED,
)
elif isinstance(exc, MissingAuthorizationError):
response = Response(
data={
"errors": [
{
"detail": "A Bearer token in an authorization header is required when"
" performing service account operations.",
"source": context.get("view").basename,
"status": str(status.HTTP_401_UNAUTHORIZED),
}
]
},
data=_generate_error_data_payload_response(
detail="A Bearer token in an authorization header is required when performing service account"
" operations.",
context=context,
http_status_code=status.HTTP_401_UNAUTHORIZED,
),
content_type="application/json",
status=status.HTTP_401_UNAUTHORIZED,
)
elif isinstance(exc, UnableMeetPrerequisitesError):
response = Response(
data={
"errors": [
{
"detail": "Unable to validate the provided token.",
"source": context.get("view").basename,
"status": str(status.HTTP_500_INTERNAL_SERVER_ERROR),
}
]
},
data=_generate_error_data_payload_response(
detail="Unable to validate the provided token.",
context=context,
http_status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
),
content_type="application/json",
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
Expand Down
63 changes: 30 additions & 33 deletions rbac/management/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
from django.conf import settings
from django.core.exceptions import PermissionDenied
from django.utils.translation import gettext as _
from management.authorization.scope_claims import ScopeClaims
from management.authorization.token_validator import ITSSOTokenValidator
from management.models import Access, Group, Policy, Principal, Role
from management.permissions.principal_access import PrincipalAccessPermission
from management.principal.it_service import ITService
Expand Down Expand Up @@ -72,45 +70,44 @@ def get_principal_from_request(request):
def get_principal(
username: str, request: Request, verify_principal: bool = True, from_query: bool = False
) -> Principal:
"""Get principals from username."""
# First check if principal exist on our side,
# if not call BOP to check if user exist in the account.
tenant: Tenant = request.tenant
user = request.user
"""Get principals from username.
# Get the Bearer Token validated to get Service Accounts from IT
if ITService.is_username_service_account(username=username):
token_validator = ITSSOTokenValidator()
user.bearer_token = token_validator.validate_token(
request=request, additional_scopes_to_validate=set[ScopeClaims]([ScopeClaims.SERVICE_ACCOUNTS_CLAIM])
)
The service account usernames are not being validated for now because that would require for the clients to send
tokens with the "api.iam.service_accounts" scope. That conflicts with the following two use cases:
- A user sends a request to the /access/?username=service-account-<uuid> endpoint.
- A service account makes a request to /access.
In these two cases, due to how "get_principal_from_request" works, we would need to validate the service account
against IT, but the only way to do it is by using a bearer token that we can only obtain from the incoming request.
Until RBAC is given some other means to validate the service accounts, we are skipping that validation. Also, this
does not affect the other endpoints where we need to validate service accounts —listed below just in case—, because
there the bearer token with that claim is a must to be able to manage the service accounts. The endpoints that are
fine are:
- GET /principals/?type=service-account
- GET /groups/{uuid}/principals/?principal_type=service-account
- POST /groups/{uuid}/principals/ with a service account payload
- DELETE /groups/{uuid}/principals/?service-account=<uuid>.
"""
# First check if principal exist on our side, if not call BOP to check if user exist in the account.
tenant: Tenant = request.tenant
is_username_service_account = ITService.is_username_service_account(username)

try:
# If the username was provided through a query we must verify if it exists in the corresponding services first.
if from_query:
if ITService.is_username_service_account(username=username):
it_service = ITService()
if not it_service.is_service_account_valid_by_username(user=user, service_account_username=username):
raise serializers.ValidationError(
{"detail": f"No data found for service account with username '{username}'"}
)
else:
verify_principal_with_proxy(username, request, verify_principal=verify_principal)
if from_query and not is_username_service_account:
verify_principal_with_proxy(username=username, request=request, verify_principal=verify_principal)

principal = Principal.objects.get(username__iexact=username, tenant=tenant)
except Principal.DoesNotExist:
# If the "from query" parameter was specified, the username was validated above, so there is no need to
# validate it again.
if not from_query:
if ITService.is_username_service_account(username):
it_service = ITService()
if not it_service.is_service_account_valid_by_username(user=user, service_account_username=username):
raise serializers.ValidationError(
{"detail": f"No data found for service account with username '{username}'"}
)
else:
verify_principal_with_proxy(username, request, verify_principal=verify_principal)

if ITService.is_username_service_account(username=username):
if not from_query and not is_username_service_account:
verify_principal_with_proxy(username=username, request=request, verify_principal=verify_principal)

if is_username_service_account:
client_id: uuid.UUID = ITService.extract_client_id_service_account_username(username)

principal, _ = Principal.objects.get_or_create(
Expand Down
91 changes: 83 additions & 8 deletions tests/api/common/test_exception_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
from rest_framework.views import Response
from unittest.mock import Mock

from api.common.exception_handler import _generate_errors_from_dict, custom_exception_handler
from api.common.exception_handler import custom_exception_handler
from api.common.exception_handler import _generate_errors_from_dict, _generate_error_data_payload_response


class ExceptionHandlerTest(TestCase):
Expand Down Expand Up @@ -92,8 +93,14 @@ def test_generate_errors_from_dict(self):

def test_invalid_token_exception_handled(self):
"""Test that an "invalid token" exception gets properly handled."""
# Mock the view and the context.
mocked_view = Mock()
mocked_view.basename = "some-view-handler"

context = {"view": mocked_view}

# Call the function under test.
response: Response = custom_exception_handler(exc=InvalidTokenError(), context=Mock())
response: Response = custom_exception_handler(exc=InvalidTokenError(), context=context)

# Assert that the correct response was returned for the exception.
self.assertEqual(
Expand All @@ -108,6 +115,12 @@ def test_invalid_token_exception_handled(self):
"unexpected error message in the response for the 'InvalidTokenError' exception handling",
)

self.assertEqual(
mocked_view.basename,
str(response.data.get("errors")[0].get("source")),
"unexpected source view in the response for the 'MissingAuthorizationError' exception handling",
)

def test_missing_authorization_exception_handled(self):
"""Test that a "missing authorization" exception gets properly handled."""
# Mock the view and the context.
Expand Down Expand Up @@ -147,23 +160,85 @@ def test_unable_meet_prerequisites_exception_handled(self):
context = {"view": mocked_view}

# Call the function under test.
response: Response = custom_exception_handler(exc=MissingAuthorizationError(), context=context)
response: Response = custom_exception_handler(exc=UnableMeetPrerequisitesError(), context=context)

# Assert that the correct response was returned for the exception.
self.assertEqual(
status.HTTP_401_UNAUTHORIZED,
status.HTTP_500_INTERNAL_SERVER_ERROR,
response.status_code,
"unexpected status code in the response for the 'MissingAuthorizationError' exception handling",
"unexpected status code in the response for the 'UnableMeetPrerequisitesError' exception handling",
)

self.assertEqual(
"A Bearer token in an authorization header is required when performing service account operations.",
"Unable to validate the provided token.",
str(response.data.get("errors")[0].get("detail")),
"unexpected error message in the response for the 'MissingAuthorizationError' exception handling",
"unexpected error message in the response for the 'UnableMeetPrerequisitesError' exception handling",
)

self.assertEqual(
mocked_view.basename,
str(response.data.get("errors")[0].get("source")),
"unexpected source view in the response for the 'MissingAuthorizationError' exception handling",
"unexpected source view in the response for the 'UnableMeetPrerequisitesError' exception handling",
)

def test_generate_error_data_payload_with_view_response(self):
"""Tests that the function under test generates the data payload correctly when a view is passed in the context."""
# Prepare a payload with a view in the context.
detail = "some error message"
mocked_view = Mock()
mocked_view.basename = "some-view-handler"
context = {"view": mocked_view}
http_status_code = status.HTTP_200_OK

# Call the function under test.
result = _generate_error_data_payload_response(
detail=detail, context=context, http_status_code=http_status_code
)

# Assert that the correct structure is returned.
errors = result.get("errors")
if not errors:
self.fail(f"the errors array was not present in the payload: {result}")

if len(errors) != 1:
self.fail(f"only one error was expected in the payload: {result}")

only_error = errors[0]

self.assertEqual(detail, only_error.get("detail"), f"unexpected detail message in the payload: {result}")

self.assertEqual(
mocked_view.basename, only_error.get("source"), f"unexpected detail message in the payload: {result}"
)

self.assertEqual(
str(http_status_code), only_error.get("status"), f"unexpected status code in the payload: {result}"
)

def test_generate_error_data_payload_without_view_response(self):
"""Tests that the function under test generates the data payload correctly when no view is passed in the context."""
# Prepare a payload without a view in the context.
detail = "some error message"
context = {}
http_status_code = status.HTTP_200_OK

# Call the function under test.
result = _generate_error_data_payload_response(
detail=detail, context=context, http_status_code=http_status_code
)

# Assert that the correct structure is returned.
errors = result.get("errors")
if not errors:
self.fail(f"the errors array was not present in the payload: {result}")

if len(errors) != 1:
self.fail(f"only one error was expected in the payload: {result}")

only_error = errors[0]

self.assertEqual(detail, only_error.get("detail"), f"unexpected detail message in the payload: {result}")

self.assertEqual(
str(http_status_code), only_error.get("status"), f"unexpected status code in the payload: {result}"
)
Loading

0 comments on commit 2bf3144

Please sign in to comment.