From a37c272e8c607f972a5e075216b852bc3c6dd8f6 Mon Sep 17 00:00:00 2001 From: Mikel Alejo Barcina Ribera Date: Wed, 17 Jan 2024 17:33:45 +0100 Subject: [PATCH 1/3] fix: the access endpoint does not work for checking principals' permissions The "access" endpoint should return the permissions for a user for a given application when the user or service account that is requesting them has the proper permissions. However, since the "access" endpoint receives so much traffic, loading permissions for every user that sent a request to the endpoint was becoming much of a burden. Therefore, loading the users' permissions was restricted to a few particular cases. These changes should allow to satisfy the use case we are after without sacrificing the restrictions that were set for the endpoint. RHCLOUD-30043 --- rbac/rbac/middleware.py | 30 +++++++++++++-- tests/rbac/test_middleware.py | 69 +++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 3 deletions(-) diff --git a/rbac/rbac/middleware.py b/rbac/rbac/middleware.py index f2c2c2b8..90f6d1fc 100644 --- a/rbac/rbac/middleware.py +++ b/rbac/rbac/middleware.py @@ -22,8 +22,9 @@ from json.decoder import JSONDecodeError from django.conf import settings +from django.core.handlers.wsgi import WSGIRequest from django.db import IntegrityError -from django.http import Http404, HttpResponse +from django.http import Http404, HttpResponse, QueryDict from django.urls import resolve from django.utils.deprecation import MiddlewareMixin from management.cache import TenantCache @@ -259,7 +260,7 @@ def process_request(self, request): # pylint: disable=R1710 return HttpResponse(json.dumps(payload), content_type="application/json", status=400) if settings.AUTHENTICATE_WITH_ORG_ID: - if not user.admin and not (request.path.endswith("/access/") and request.method == "GET"): + if self.is_allowed_load_user_permissions(request, user): try: tenant = Tenant.objects.filter(org_id=user.org_id).get() except Tenant.DoesNotExist: @@ -268,7 +269,7 @@ def process_request(self, request): # pylint: disable=R1710 user.access = IdentityHeaderMiddleware._get_access_for_user(user.username, tenant) else: - if not user.admin and not (request.path.endswith("/access/") and request.method == "GET"): + if self.is_allowed_load_user_permissions(request, user): try: tenant_name = create_tenant_name(user.account) tenant = Tenant.objects.filter(tenant_name=tenant_name).get() @@ -416,6 +417,29 @@ def process_response(self, request, response): # pylint: disable=no-self-use IdentityHeaderMiddleware.log_request(request, response, is_internal) return response + def is_allowed_load_user_permissions(self, request: WSGIRequest, user: User) -> bool: + """Decide whether RBAC should load the access permissions for the user based on the given request.""" + # Organization administrators will have already all the permissions so there is no need to load permissions for + # them. + if user.admin: + return False + + # The access endpoint gets a lot of traffic, so we need to restrict for which queries we are actually going + # to load the user permissions, since it is a very heavy operation. The following Jira tickets have more + # details: + # + # - RHCLOUD-15394 + # - RHCLOUD-29631 + # + # There is one use case where we need to load the user's permissions: whenever they want to query for their + # or other users' permissions. In that case, we need to know if they're allowed to do so, and for that, we + # need to preload their permissions to check them afterwards in the subsequent permission checkers. + if request.path.endswith("/access/") and request.method == "GET": + query_params: QueryDict = request.GET + return "application" in query_params and "username" in query_params + else: + return True + class DisableCSRF(MiddlewareMixin): # pylint: disable=too-few-public-methods """Middleware to disable CSRF for 3scale usecase.""" diff --git a/tests/rbac/test_middleware.py b/tests/rbac/test_middleware.py index d4c3755e..4618d281 100644 --- a/tests/rbac/test_middleware.py +++ b/tests/rbac/test_middleware.py @@ -19,6 +19,7 @@ import os from unittest.mock import Mock from django.conf import settings +from django.http import QueryDict from django.test import TestCase from django.urls import reverse @@ -273,6 +274,74 @@ def test_tenant_process_without_org_id(self): self.assertEqual(Tenant.objects.filter(tenant_name="test_user").count(), 1) self.assertEqual(Tenant.objects.filter(tenant_name="test_user").first().org_id, None) + def test_is_allowed_load_user_permissions_org_admin(self): + """Tests that the function that determines if user permissions should be loaded returns False for org admins.""" + user = User() + user.admin = True + + middleware = IdentityHeaderMiddleware(get_response=Mock()) + self.assertEqual(middleware.is_allowed_load_user_permissions(Mock(), user), False) + + def test_is_allowed_load_user_permissions_regular_user_non_access_endpoint(self): + """Tests that the function under test returns True for regular users who have requested a path which isn't the access path""" + user = User() + user.admin = False + + request = Mock() + request.path = "/principals/" + + middleware = IdentityHeaderMiddleware(get_response=Mock()) + self.assertEqual(middleware.is_allowed_load_user_permissions(request, user), True) + + def test_is_allowed_load_user_permissions_regular_user_access_non_get_request(self): + """Tests that the function under test returns True for regular users who have requested the access path but with a different HTTP verb than GET""" + user = User() + user.admin = False + + request = Mock() + request.path = "/access/" + + middleware = IdentityHeaderMiddleware(get_response=Mock()) + + invalid_verbs = ["DELETE", "POST", "PATCH"] + for verb in invalid_verbs: + request.method = verb + self.assertEqual(middleware.is_allowed_load_user_permissions(request, user), True) + + def test_is_allowed_load_user_permissions_regular_user_access(self): + """Tests that the function under test returns True for regular users who have requested the access path with the expected query parameters""" + user = User() + user.admin = False + + request = Mock() + request.path = "/access/" + request.method = "GET" + request.GET = QueryDict("application=rbac&username=foo") + middleware = IdentityHeaderMiddleware(get_response=Mock()) + self.assertEqual(middleware.is_allowed_load_user_permissions(request, user), True) + + def test_is_allowed_load_user_permissions_regular_user_access_missing_query_params(self): + """Tests that the function under test returns False for regular users who have requested the access path without the expected query parameters""" + user = User() + user.admin = False + + request = Mock() + request.path = "/access/" + request.method = "GET" + + test_cases: list[QueryDict] = [ + QueryDict("application=rbac"), + QueryDict("username=foo"), + QueryDict("applications=rbac&username=foo"), + QueryDict("application=rbac&usernames=foo"), + ] + + middleware = IdentityHeaderMiddleware(get_response=Mock()) + for test_case in test_cases: + request.GET = test_case + + self.assertEqual(middleware.is_allowed_load_user_permissions(request, user), False) + class ServiceToService(IdentityRequest): """Tests requests without an identity header.""" From a698955baec80c3b7a0d60aaa50e5103fc48574e Mon Sep 17 00:00:00 2001 From: Mikel Alejo Barcina Ribera Date: Mon, 22 Jan 2024 10:15:07 +0100 Subject: [PATCH 2/3] refactor: rename function RHCLOUD-30043 --- rbac/rbac/middleware.py | 6 +++--- tests/rbac/test_middleware.py | 20 ++++++++++---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/rbac/rbac/middleware.py b/rbac/rbac/middleware.py index 90f6d1fc..72eae57f 100644 --- a/rbac/rbac/middleware.py +++ b/rbac/rbac/middleware.py @@ -260,7 +260,7 @@ def process_request(self, request): # pylint: disable=R1710 return HttpResponse(json.dumps(payload), content_type="application/json", status=400) if settings.AUTHENTICATE_WITH_ORG_ID: - if self.is_allowed_load_user_permissions(request, user): + if self.should_load_user_permissions(request, user): try: tenant = Tenant.objects.filter(org_id=user.org_id).get() except Tenant.DoesNotExist: @@ -269,7 +269,7 @@ def process_request(self, request): # pylint: disable=R1710 user.access = IdentityHeaderMiddleware._get_access_for_user(user.username, tenant) else: - if self.is_allowed_load_user_permissions(request, user): + if self.should_load_user_permissions(request, user): try: tenant_name = create_tenant_name(user.account) tenant = Tenant.objects.filter(tenant_name=tenant_name).get() @@ -417,7 +417,7 @@ def process_response(self, request, response): # pylint: disable=no-self-use IdentityHeaderMiddleware.log_request(request, response, is_internal) return response - def is_allowed_load_user_permissions(self, request: WSGIRequest, user: User) -> bool: + def should_load_user_permissions(self, request: WSGIRequest, user: User) -> bool: """Decide whether RBAC should load the access permissions for the user based on the given request.""" # Organization administrators will have already all the permissions so there is no need to load permissions for # them. diff --git a/tests/rbac/test_middleware.py b/tests/rbac/test_middleware.py index 4618d281..49eae020 100644 --- a/tests/rbac/test_middleware.py +++ b/tests/rbac/test_middleware.py @@ -274,15 +274,15 @@ def test_tenant_process_without_org_id(self): self.assertEqual(Tenant.objects.filter(tenant_name="test_user").count(), 1) self.assertEqual(Tenant.objects.filter(tenant_name="test_user").first().org_id, None) - def test_is_allowed_load_user_permissions_org_admin(self): + def test_should_load_user_permissions_org_admin(self): """Tests that the function that determines if user permissions should be loaded returns False for org admins.""" user = User() user.admin = True middleware = IdentityHeaderMiddleware(get_response=Mock()) - self.assertEqual(middleware.is_allowed_load_user_permissions(Mock(), user), False) + self.assertEqual(middleware.should_load_user_permissions(Mock(), user), False) - def test_is_allowed_load_user_permissions_regular_user_non_access_endpoint(self): + def test_should_load_user_permissions_regular_user_non_access_endpoint(self): """Tests that the function under test returns True for regular users who have requested a path which isn't the access path""" user = User() user.admin = False @@ -291,9 +291,9 @@ def test_is_allowed_load_user_permissions_regular_user_non_access_endpoint(self) request.path = "/principals/" middleware = IdentityHeaderMiddleware(get_response=Mock()) - self.assertEqual(middleware.is_allowed_load_user_permissions(request, user), True) + self.assertEqual(middleware.should_load_user_permissions(request, user), True) - def test_is_allowed_load_user_permissions_regular_user_access_non_get_request(self): + def test_should_load_user_permissions_regular_user_access_non_get_request(self): """Tests that the function under test returns True for regular users who have requested the access path but with a different HTTP verb than GET""" user = User() user.admin = False @@ -306,9 +306,9 @@ def test_is_allowed_load_user_permissions_regular_user_access_non_get_request(se invalid_verbs = ["DELETE", "POST", "PATCH"] for verb in invalid_verbs: request.method = verb - self.assertEqual(middleware.is_allowed_load_user_permissions(request, user), True) + self.assertEqual(middleware.should_load_user_permissions(request, user), True) - def test_is_allowed_load_user_permissions_regular_user_access(self): + def test_should_load_user_permissions_regular_user_access(self): """Tests that the function under test returns True for regular users who have requested the access path with the expected query parameters""" user = User() user.admin = False @@ -318,9 +318,9 @@ def test_is_allowed_load_user_permissions_regular_user_access(self): request.method = "GET" request.GET = QueryDict("application=rbac&username=foo") middleware = IdentityHeaderMiddleware(get_response=Mock()) - self.assertEqual(middleware.is_allowed_load_user_permissions(request, user), True) + self.assertEqual(middleware.should_load_user_permissions(request, user), True) - def test_is_allowed_load_user_permissions_regular_user_access_missing_query_params(self): + def test_should_load_user_permissions_regular_user_access_missing_query_params(self): """Tests that the function under test returns False for regular users who have requested the access path without the expected query parameters""" user = User() user.admin = False @@ -340,7 +340,7 @@ def test_is_allowed_load_user_permissions_regular_user_access_missing_query_para for test_case in test_cases: request.GET = test_case - self.assertEqual(middleware.is_allowed_load_user_permissions(request, user), False) + self.assertEqual(middleware.should_load_user_permissions(request, user), False) class ServiceToService(IdentityRequest): From cf3401b92faaa9ab905a2793272c5f915decbd4a Mon Sep 17 00:00:00 2001 From: Mikel Alejo Barcina Ribera Date: Wed, 14 Feb 2024 08:42:16 +0100 Subject: [PATCH 3/3] refactor: apply review suggestions RHCLOUD-30043 --- rbac/rbac/middleware.py | 4 ++-- tests/rbac/test_middleware.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rbac/rbac/middleware.py b/rbac/rbac/middleware.py index 72eae57f..2fd53960 100644 --- a/rbac/rbac/middleware.py +++ b/rbac/rbac/middleware.py @@ -433,10 +433,10 @@ def should_load_user_permissions(self, request: WSGIRequest, user: User) -> bool # # There is one use case where we need to load the user's permissions: whenever they want to query for their # or other users' permissions. In that case, we need to know if they're allowed to do so, and for that, we - # need to preload their permissions to check them afterwards in the subsequent permission checkers. + # need to preload their permissions to check them afterward in the subsequent permission checkers. if request.path.endswith("/access/") and request.method == "GET": query_params: QueryDict = request.GET - return "application" in query_params and "username" in query_params + return "username" in query_params and "application" in query_params else: return True diff --git a/tests/rbac/test_middleware.py b/tests/rbac/test_middleware.py index 49eae020..2290acf4 100644 --- a/tests/rbac/test_middleware.py +++ b/tests/rbac/test_middleware.py @@ -303,8 +303,8 @@ def test_should_load_user_permissions_regular_user_access_non_get_request(self): middleware = IdentityHeaderMiddleware(get_response=Mock()) - invalid_verbs = ["DELETE", "POST", "PATCH"] - for verb in invalid_verbs: + http_verbs = ["DELETE", "PATCH", "POST"] + for verb in http_verbs: request.method = verb self.assertEqual(middleware.should_load_user_permissions(request, user), True)