diff --git a/rbac/internal/integration_views.py b/rbac/internal/integration_views.py index 0381963ea..ceed97925 100644 --- a/rbac/internal/integration_views.py +++ b/rbac/internal/integration_views.py @@ -16,46 +16,43 @@ # """View for OCM group/role API.""" -from audioop import reverse -import datetime -import json + import logging -import pytz -from django.conf import settings -from django.db import transaction -from django.db.migrations.recorder import MigrationRecorder -from django.http import Http404, HttpResponse +from django.http import HttpResponseBadRequest from django.shortcuts import redirect, reverse -from management import views from management.cache import TenantCache -from management.models import Group, Role - - -from api.models import Tenant logger = logging.getLogger(__name__) TENANTS = TenantCache() + def groups(request, account_number): + """Formant and pass internal groups request to /groups/ API.""" username = request.GET.get("username") if username: base_url = reverse("group-list") - url = f'{base_url}?principals={username}' + url = f"{base_url}?principals={username}" return redirect(url) else: - return Http404 + return HttpResponseBadRequest("Username must be supplied.") + def groups_for_principal(request, account_number, username): + """Format and pass internal groups for principal request to /groups/ API.""" base_url = reverse("group-list") - url = f'{base_url}?principals={username}' + url = f"{base_url}?principals={username}" return redirect(url) + def roles_from_group(request, account_number, uuid): + """Pass internal /groups//roles/ request to /groups/ API.""" return redirect("group-roles", uuid=uuid) -def roles_for_group(request, account_number, username, uuid): - base_url = reverse("group-roles", kwargs={'uuid': uuid}) - url = f'{base_url}?principals={username}' - return redirect(url) \ No newline at end of file + +def roles_for_group_principal(request, account_number, username, uuid): + """Pass internal /principal//groups//roles/ request to /groups/ API.""" + base_url = reverse("group-roles", kwargs={"uuid": uuid}) + url = f"{base_url}?principals={username}" + return redirect(url) diff --git a/rbac/internal/middleware.py b/rbac/internal/middleware.py index 663f1def3..fb9a6f8df 100644 --- a/rbac/internal/middleware.py +++ b/rbac/internal/middleware.py @@ -27,7 +27,7 @@ from django.utils.deprecation import MiddlewareMixin from api.common import RH_IDENTITY_HEADER -from api.models import User, Tenant +from api.models import Tenant, User from api.serializers import extract_header from rbac.middleware import IdentityHeaderMiddleware @@ -61,7 +61,8 @@ def process_request(self, request): logger.error("Malformed X-RH-Identity header.") return HttpResponseForbidden() - if "integration" in resolve(request.path).url_name: + target = resolve(request.path) + if target and "integration" in target: return IdentityHeaderMiddleware.process_request(self, request) request.user = user @@ -71,4 +72,7 @@ def process_response(self, request, response): return response def get_tenant(self, request): - request.tenant = get_object_or_404(Tenant, tenant_name=self.tenant_re.match(request.path_info).group('tenant_id')) \ No newline at end of file + """Ensure internal requests carry proper tenant id.""" + request.tenant = get_object_or_404( + Tenant, tenant_name=self.tenant_re.match(request.path_info).group("tenant_id") + ) diff --git a/rbac/internal/urls.py b/rbac/internal/urls.py index 4a9006bbd..c1a9b10ef 100644 --- a/rbac/internal/urls.py +++ b/rbac/internal/urls.py @@ -27,9 +27,21 @@ path("api/tenant/", views.list_tenants), path("api/tenant//", views.tenant_view), path("api/tenant//groups/", integration_views.groups, name="integration-groups"), - path("api/tenant//groups//roles/", integration_views.roles_from_group, name="integration-group-roles"), - path("api/tenant//principal//groups/", integration_views.groups_for_principal, name="integration-princ-groups"), - path("api/tenant//principal//groups//roles/", integration_views.roles_for_group, name="integration-princ-roles"), + path( + "api/tenant//groups//roles/", + integration_views.roles_from_group, + name="integration-group-roles", + ), + path( + "api/tenant//principal//groups/", + integration_views.groups_for_principal, + name="integration-princ-groups", + ), + path( + "api/tenant//principal//groups//roles/", + integration_views.roles_for_group_principal, + name="integration-princ-roles", + ), path("api/migrations/run/", views.run_migrations), path("api/migrations/progress/", views.migration_progress), path("api/seeds/run/", views.run_seeds), diff --git a/rbac/management/group/view.py b/rbac/management/group/view.py index d18b47c61..26f274190 100644 --- a/rbac/management/group/view.py +++ b/rbac/management/group/view.py @@ -106,7 +106,7 @@ def principal_filter(self, queryset, field, values): principals = [value.lower() for value in values.split(",")] for principal in principals: - queryset = queryset.filter(principals__username__icontains=principal) + queryset = queryset.filter(principals__username__iexact=principal) return queryset diff --git a/tests/identity_request.py b/tests/identity_request.py index fe4c78519..df537c424 100644 --- a/tests/identity_request.py +++ b/tests/identity_request.py @@ -130,7 +130,7 @@ def _build_identity(cls, user_data, account, org_id, is_org_admin, is_internal): if is_internal: identity["identity"]["type"] = "Associate" - identity["identity"]["associate"] = {"email": user_data["email"]} + identity["identity"]["associate"] = identity.get("identity").get("user") identity["identity"]["user"]["is_internal"] = True else: identity["identity"]["type"] = "User" diff --git a/tests/internal/test_integration_views.py b/tests/internal/test_integration_views.py new file mode 100644 index 000000000..e74c9eba9 --- /dev/null +++ b/tests/internal/test_integration_views.py @@ -0,0 +1,281 @@ +# +# Copyright 2019 Red Hat, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +"""Test the internal viewset.""" +import uuid +from rest_framework import status +from rest_framework.test import APIClient +from unittest.mock import patch + +from api.models import User +from management.models import Group, Policy, Principal, Role +from tests.identity_request import IdentityRequest + + +class IntegrationViewsTests(IdentityRequest): + """Test the integration views.""" + + def setUp(self): + """Set up the integration view tests.""" + test_roles = ["Role Admin", "Role A", "Role B"] + test_principals = ["user_admin", "user_a", "user_b"] + + super().setUp() + self.client = APIClient() + self.customer = self.customer_data + self.internal_request_context = self._create_request_context( + self.customer, self.user_data, create_customer=False, is_internal=True, create_tenant=False + ) + + self.request = self.internal_request_context["request"] + user = User() + user.username = self.user_data["username"] + user.account = self.customer_data["account_id"] + self.request.user = user + + for username in test_principals: + principal = Principal.objects.create(username=username, tenant=self.tenant) + principal.save() + + for role_name in test_roles: + role = Role.objects.create( + name=role_name, description="A role for a group.", system=True, tenant=self.tenant + ) + role.save() + + group = Group(name="Group Admin", system=True, tenant=self.tenant) + group.save() + group.principals.add(Principal.objects.get(username="user_admin")) + policy = Policy.objects.create(name="Admin Policy", group=group, tenant=self.tenant) + policy.roles.add(Role.objects.get(name="Role Admin")) + policy.save() + group.policies.add(policy) + group.save() + + group = Group(name="Group All", system=True, tenant=self.tenant) + group.save() + for principal in test_principals: + group.principals.add(Principal.objects.get(username=principal)) + policy = Policy.objects.create(name="All Policy", group=group, tenant=self.tenant) + for role in test_roles: + policy.roles.add(Role.objects.get(name=role)) + group.policies.add(policy) + group.save() + + group = Group(name="Group A", system=True, tenant=self.tenant) + group.save() + group.principals.add(Principal.objects.get(username="user_admin")) + group.principals.add(Principal.objects.get(username="user_a")) + policy = Policy.objects.create(name="A Policy", group=group, tenant=self.tenant) + policy.roles.add(Role.objects.get(name="Role A")) + group.policies.add(policy) + group.save + + def tearDown(self): + """Tear down internal viewset tests.""" + Group.objects.all().delete() + Role.objects.all().delete() + Policy.objects.all().delete() + + def test_groups_valid_account(self): + """Test that a request to /tenant//groups/?username= from an internal account works.""" + response = self.client.get( + f"/_private/api/tenant/{self.tenant.tenant_name}/groups/?username=user_admin", + **self.request.META, + follow=True, + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + # Expecting ["Group All", "Group A", "Group Admin"] + self.assertEqual(response.data.get("meta").get("count"), 3) + + def test_groups_invalid_account(self): + """Test that a /tenant//groups/?username= request from an external account fails.""" + external_request_context = self._create_request_context( + self.customer, self.user_data, create_customer=False, is_internal=False, create_tenant=False + ) + request = external_request_context["request"] + response = self.client.get( + f"/_private/api/tenant/{self.tenant.tenant_name}/groups/?username=user_a", **request.META, follow=True + ) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_groups_user_filter(self): + """Test that only the groups a user is a member of are returned for a /tenant//groups/?username= request.""" + response = self.client.get( + f"/_private/api/tenant/{self.tenant.tenant_name}/groups/?username=user_a", **self.request.META, follow=True + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + # Expecting ["Group All", "Group A"] + self.assertEqual(response.data.get("meta").get("count"), 2) + + def test_groups_nonexistent_user(self): + """Test that a request for groups of a nonexistent user returns 0.""" + response = self.client.get( + f"/_private/api/tenant/{self.tenant.tenant_name}/groups/?username=user_x", **self.request.META, follow=True + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data.get("meta").get("count"), 0) + + def test_groups_empty_user(self): + """Test that a request for groups without a ?username param returns as a bad request.""" + response = self.client.get( + f"/_private/api/tenant/{self.tenant.tenant_name}/groups/", **self.request.META, follow=True + ) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + def test_groups_for_principal_valid_account(self): + """Test that a request to /tenant//principal//groups/ from an internal account works.""" + response = self.client.get( + f"/_private/api/tenant/{self.tenant.tenant_name}/principal/user_admin/groups/", + **self.request.META, + follow=True, + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + # Expecting ["Group All", "Group A", "Group Admin"] + self.assertEqual(response.data.get("meta").get("count"), 3) + + def test_groups_for_principal_invalid_account(self): + """Test that a /tenant//principal/groups/ request from an external account fails.""" + external_request_context = self._create_request_context( + self.customer, self.user_data, create_customer=False, is_internal=False, create_tenant=False + ) + request = external_request_context["request"] + response = self.client.get( + f"/_private/api/tenant/{self.tenant.tenant_name}/principal/user_a/groups/", **request.META, follow=True + ) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_groups_for_principal_filter(self): + """Test that only the groups a user is a member of are returned for a /tenant//principal/user_a/groups/ request.""" + response = self.client.get( + f"/_private/api/tenant/{self.tenant.tenant_name}/groups/?username=user_a", **self.request.META, follow=True + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + # Expecting ["Group All", "Group A"] + self.assertEqual(response.data.get("meta").get("count"), 2) + + def test_groups_for_principal_nonexsistant_user(self): + """Test that only the groups a user is a member of are returned for a /tenant//principal/user_x/groups/ request.""" + response = self.client.get( + f"/_private/api/tenant/{self.tenant.tenant_name}/groups/?username=user_x", **self.request.META, follow=True + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data.get("meta").get("count"), 0) + + def test_roles_from_group_valid(self): + """Test that a valid request to /tenant//groups//roles/ from an internal account works.""" + group_all_uuid = Group.objects.get(name="Group All").uuid + response = self.client.get( + f"/_private/api/tenant/{self.tenant.tenant_name}/groups/{group_all_uuid}/roles/", + **self.request.META, + follow=True, + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + # Expecting ["Role Admin", "Role A", "Role B"] + self.assertEqual(response.data.get("meta").get("count"), 3) + + def test_roles_from_group_invalid_uuid(self): + """Test that a request to /tenant//groups//roles/ with an invalid uuid fails.""" + response = self.client.get( + f"/_private/api/tenant/{self.tenant.tenant_name}/groups/{uuid.uuid4}/roles/", + **self.request.META, + follow=True, + ) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + def test_roles_from_group_invalid_account(self): + """Test that a /tenant//groups//roles/ request from an external account fails.""" + group_all_uuid = Group.objects.get(name="Group All").uuid + external_request_context = self._create_request_context( + self.customer, self.user_data, create_customer=False, is_internal=False, create_tenant=False + ) + request = external_request_context["request"] + response = self.client.get( + f"/_private/api/tenant/{self.tenant.tenant_name}/groups/{group_all_uuid}/roles/", + **request.META, + follow=True, + ) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_roles_from_group_filter(self): + """Test that a valid request to /tenant//groups//roles/ from an internal properly filters groups.""" + group_a_uuid = Group.objects.get(name="Group A").uuid + response = self.client.get( + f"/_private/api/tenant/{self.tenant.tenant_name}/groups/{group_a_uuid}/roles/", + **self.request.META, + follow=True, + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + # Expecting ["Role A"] + self.assertEqual(response.data.get("meta").get("count"), 1) + + def test_roles_for_group_principal_valid(self): + """Test that a valid request to /tenant//principal/user_admin/groups//roles/ from an internal account works.""" + group_all_uuid = Group.objects.get(name="Group All").uuid + response = self.client.get( + f"/_private/api/tenant/{self.tenant.tenant_name}/principal/user_admin/groups/{group_all_uuid}/roles/", + **self.request.META, + follow=True, + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + # Expecting ["Role Admin", "Role A", "Role B"] + self.assertEqual(response.data.get("meta").get("count"), 3) + + def test_roles_for_group_principal_invalid_account(self): + """Test that a valid request to /tenant//principal/user_admin/groups//roles/ from an external account fails.""" + group_all_uuid = Group.objects.get(name="Group All").uuid + external_request_context = self._create_request_context( + self.customer, self.user_data, create_customer=False, is_internal=False, create_tenant=False + ) + request = external_request_context["request"] + response = self.client.get( + f"/_private/api/tenant/{self.tenant.tenant_name}/principal/user_admin/groups/{group_all_uuid}/roles/", + **request.META, + follow=True, + ) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_roles_for_group_principal_invalid_uuid(self): + """Test that a request to /tenant//principal/user_admin/groups//roles/ with an invalid uuid fails.""" + group_all_uuid = Group.objects.get(name="Group All").uuid + response = self.client.get( + f"/_private/api/tenant/{self.tenant.tenant_name}/principal/user_admin/groups/{uuid.uuid4}/roles/", + **self.request.META, + follow=True, + ) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + def test_roles_for_group_principal_filter(self): + """Test that a valid request to /tenant//principal/user_admin/groups//roles/ filters properly.""" + # user_a in Group A + group_a_uuid = Group.objects.get(name="Group A").uuid + response = self.client.get( + f"/_private/api/tenant/{self.tenant.tenant_name}/principal/user_a/groups/{group_a_uuid}/roles/", + **self.request.META, + follow=True, + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + # Expecting ["Role A"] + self.assertEqual(response.data.get("meta").get("count"), 1) + + # user_b not in Group A + group_a_uuid = Group.objects.get(name="Group A").uuid + response = self.client.get( + f"/_private/api/tenant/{self.tenant.tenant_name}/principal/user_b/groups/{group_a_uuid}/roles/", + **self.request.META, + follow=True, + ) + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) diff --git a/tests/rbac/test_middleware.py b/tests/rbac/test_middleware.py index f915c3fa8..e4319368e 100644 --- a/tests/rbac/test_middleware.py +++ b/tests/rbac/test_middleware.py @@ -239,8 +239,8 @@ def test_race_condition_customer(self): mock_request.user = User() mock_request.user.username = self.user_data["username"] mock_request.user.account = self.customer_data["account_id"] - orig_cust = IdentityHeaderMiddleware().get_tenant(None, None, mock_request) - dup_cust = IdentityHeaderMiddleware().get_tenant(None, None, mock_request) + orig_cust = IdentityHeaderMiddleware().get_tenant(mock_request) + dup_cust = IdentityHeaderMiddleware().get_tenant(mock_request) self.assertEqual(orig_cust, dup_cust) def test_tenant_process_without_org_id(self):