diff --git a/ansible_base/lib/utils/views/permissions.py b/ansible_base/lib/utils/views/permissions.py index 784332b9d..a02d93e5a 100644 --- a/ansible_base/lib/utils/views/permissions.py +++ b/ansible_base/lib/utils/views/permissions.py @@ -1,4 +1,4 @@ -from rest_framework.permissions import BasePermission +from rest_framework.permissions import SAFE_METHODS, BasePermission class IsSuperuser(BasePermission): @@ -7,4 +7,20 @@ class IsSuperuser(BasePermission): """ def has_permission(self, request, view): - return bool(request.user and request.user.is_superuser) + return bool(request.user and request.user.is_authenticated and request.user.is_superuser) + + +class IsSuperuserOrAuditor(BasePermission): + """ + Allows write access only to system admin users. + Allows read access only to system auditor users. + """ + + def has_permission(self, request, view): + if not (request.user and request.user.is_authenticated): + return False + if request.user.is_superuser: + return True + if request.method in SAFE_METHODS: + return getattr(request.user, 'is_system_auditor', False) + return False diff --git a/ansible_base/oauth2_provider/migrations/0004_alter_oauth2accesstoken_scope.py b/ansible_base/oauth2_provider/migrations/0004_alter_oauth2accesstoken_scope.py new file mode 100644 index 000000000..08cbae180 --- /dev/null +++ b/ansible_base/oauth2_provider/migrations/0004_alter_oauth2accesstoken_scope.py @@ -0,0 +1,19 @@ +# Generated by Django 4.2.11 on 2024-06-06 22:46 + +import ansible_base.oauth2_provider.models.access_token +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('dab_oauth2_provider', '0003_remove_oauth2application_logo_data'), + ] + + operations = [ + migrations.AlterField( + model_name='oauth2accesstoken', + name='scope', + field=models.CharField(default='write', help_text="Allowed scopes, further restricts user's permissions. Must be a simple space-separated string with allowed scopes ['read', 'write'].", max_length=32, validators=[ansible_base.oauth2_provider.models.access_token.validate_scope]), + ), + ] diff --git a/ansible_base/oauth2_provider/models/access_token.py b/ansible_base/oauth2_provider/models/access_token.py index 175cc2baa..204a3bb33 100644 --- a/ansible_base/oauth2_provider/models/access_token.py +++ b/ansible_base/oauth2_provider/models/access_token.py @@ -1,5 +1,6 @@ import oauth2_provider.models as oauth2_models from django.conf import settings +from django.core.exceptions import ValidationError from django.db import connection, models from django.utils.timezone import now from django.utils.translation import gettext_lazy as _ @@ -10,6 +11,18 @@ from ansible_base.lib.utils.settings import get_setting from ansible_base.oauth2_provider.utils import is_external_account +SCOPES = ['read', 'write'] + + +def validate_scope(value): + given_scopes = value.split(' ') + if not given_scopes: + raise ValidationError(_('Scope must be a simple space-separated string with allowed scopes: %(scopes)s') % {'scopes': ', '.join(SCOPES)}) + for scope in given_scopes: + if scope not in SCOPES: + raise ValidationError(_('Invalid scope: %(scope)s. Must be one of: %(scopes)s') % {'scope': scope, 'scopes': ', '.join(SCOPES)}) + + activitystream = object if 'ansible_base.activitystream' in settings.INSTALLED_APPS: from ansible_base.activitystream.models import AuditableModel @@ -26,11 +39,6 @@ class Meta(oauth2_models.AbstractAccessToken.Meta): ordering = ('id',) swappable = "OAUTH2_PROVIDER_ACCESS_TOKEN_MODEL" - SCOPE_CHOICES = [ - ('read', _('Read')), - ('write', _('Write')), - ] - user = models.ForeignKey( settings.AUTH_USER_MODEL, on_delete=models.CASCADE, @@ -57,11 +65,10 @@ class Meta(oauth2_models.AbstractAccessToken.Meta): editable=False, ) scope = models.CharField( - blank=True, default='write', max_length=32, - choices=SCOPE_CHOICES, help_text=_("Allowed scopes, further restricts user's permissions. Must be a simple space-separated string with allowed scopes ['read', 'write']."), + validators=[validate_scope], ) token = prevent_search( models.CharField( diff --git a/ansible_base/oauth2_provider/serializers/token.py b/ansible_base/oauth2_provider/serializers/token.py index 150437f07..6e6d3128d 100644 --- a/ansible_base/oauth2_provider/serializers/token.py +++ b/ansible_base/oauth2_provider/serializers/token.py @@ -22,7 +22,7 @@ class BaseOAuth2TokenSerializer(CommonModelSerializer): refresh_token = SerializerMethodField() token = SerializerMethodField() - ALLOWED_SCOPES = [x[0] for x in OAuth2AccessToken.SCOPE_CHOICES] + ALLOWED_SCOPES = ['read', 'write'] class Meta: model = OAuth2AccessToken diff --git a/ansible_base/oauth2_provider/views/application.py b/ansible_base/oauth2_provider/views/application.py index 25c5919ef..c5d2f7c77 100644 --- a/ansible_base/oauth2_provider/views/application.py +++ b/ansible_base/oauth2_provider/views/application.py @@ -1,7 +1,7 @@ -from rest_framework import permissions from rest_framework.viewsets import ModelViewSet from ansible_base.lib.utils.views.django_app_api import AnsibleBaseDjangoAppApiView +from ansible_base.lib.utils.views.permissions import IsSuperuserOrAuditor from ansible_base.oauth2_provider.models import OAuth2Application from ansible_base.oauth2_provider.serializers import OAuth2ApplicationSerializer @@ -9,4 +9,4 @@ class OAuth2ApplicationViewSet(AnsibleBaseDjangoAppApiView, ModelViewSet): queryset = OAuth2Application.objects.all() serializer_class = OAuth2ApplicationSerializer - permission_classes = [permissions.IsAuthenticated] + permission_classes = [IsSuperuserOrAuditor] diff --git a/ansible_base/oauth2_provider/views/permissions.py b/ansible_base/oauth2_provider/views/permissions.py new file mode 100644 index 000000000..158196d1f --- /dev/null +++ b/ansible_base/oauth2_provider/views/permissions.py @@ -0,0 +1,37 @@ +from django.conf import settings +from rest_framework.permissions import SAFE_METHODS, BasePermission + + +class OAuth2TokenPermission(BasePermission): + # An app token is a token that has an application attached to it + # A personal access token (PAT) is a token with no application attached to it + # With that in mind: + # - An app token can be read, changed, or deleted if: + # - I am the superuser + # - I am the admin of the organization of the application of the token + # - I am the user of the token + # - An app token can be created if: + # - I have read access to the application (currently this means: I am the superuser) + # - A PAT can be read, changed, or deleted if: + # - I am the superuser + # - I am the user of the token + # - A PAT can be created if: + # - I am a user + + def has_permission(self, request, view): + # Handle PAT and app token creation separately + if request.method == 'POST': + if request.data.get('application'): + # TODO: Change this once ansible/django-ansible-base#424 is fixed + return request.user.is_superuser + return request.user.is_authenticated + + def has_object_permission(self, request, view, obj): + if request.method in SAFE_METHODS and getattr(request.user, 'is_system_auditor', False): + return True + if request.user.is_superuser: + return True + if 'ansible_base.rbac' in settings.INSTALLED_APPS: + if obj.application and obj.application.organization.access_qs(request.user, "change").exists(): + return True + return request.user == obj.user diff --git a/ansible_base/oauth2_provider/views/token.py b/ansible_base/oauth2_provider/views/token.py index 021823797..787b9168c 100644 --- a/ansible_base/oauth2_provider/views/token.py +++ b/ansible_base/oauth2_provider/views/token.py @@ -3,13 +3,13 @@ from django.utils.timezone import now from oauth2_provider import views as oauth_views from oauthlib import oauth2 -from rest_framework import permissions from rest_framework.viewsets import ModelViewSet from ansible_base.lib.utils.settings import get_setting from ansible_base.lib.utils.views.django_app_api import AnsibleBaseDjangoAppApiView from ansible_base.oauth2_provider.models import OAuth2AccessToken, OAuth2RefreshToken from ansible_base.oauth2_provider.serializers import OAuth2TokenSerializer +from ansible_base.oauth2_provider.views.permissions import OAuth2TokenPermission class TokenView(oauth_views.TokenView, AnsibleBaseDjangoAppApiView): @@ -54,4 +54,4 @@ def create_token_response(self, request): class OAuth2TokenViewSet(ModelViewSet, AnsibleBaseDjangoAppApiView): queryset = OAuth2AccessToken.objects.all() serializer_class = OAuth2TokenSerializer - permission_classes = [permissions.IsAuthenticated] + permission_classes = [OAuth2TokenPermission] diff --git a/test_app/tests/oauth2_provider/test_rbac.py b/test_app/tests/oauth2_provider/test_rbac.py new file mode 100644 index 000000000..63bb86091 --- /dev/null +++ b/test_app/tests/oauth2_provider/test_rbac.py @@ -0,0 +1,405 @@ +""" +This module tests RBAC against OAuth2 views and models. +The exact set of expected access comes directly from awx.main.access +and the doc strings of the relevant classes defined there are directly turned +into tests here. + +Some of these tests might be redundant with other tests defined elsewhere. +This is intentional as this module is meant to check every single condition +defined within the AWX spec (as well as additional cases where the access +conditions are not met). +""" + +import pytest +from django.urls import reverse + +from ansible_base.oauth2_provider.models import OAuth2AccessToken + + +@pytest.mark.parametrize( + 'user_case, has_access', + [ + ('superuser', True), + pytest.param('admin_of_app_user_org', True, marks=pytest.mark.xfail(reason="https://github.com/ansible/django-ansible-base/issues/424")), + pytest.param('user_in_org', True, marks=pytest.mark.xfail(reason="https://github.com/ansible/django-ansible-base/issues/424")), + ('user', False), + ('anon', False), + ], +) +@pytest.mark.django_db +def test_oauth2_application_read_change_delete( + request, user_case, has_access, org_member_rd, org_admin_rd, random_user, oauth2_application, organization, unauthenticated_api_client +): + """ + From awx.main.access.OAuth2ApplicationAccess: + + I can read, change or delete OAuth 2 applications when: + - I am a superuser. + - I am the admin of the organization of the user of the application. + - I am a user in the organization of the application. + """ + app = oauth2_application[0] + app.organization = organization + app.user = random_user + app.save() + + expected_read_status = 200 if has_access else 403 + expected_change_status = 200 if has_access else 403 + expected_delete_status = 204 if has_access else 403 + + # Determine the user based on the test case (adding them to organizations, etc. as necessary). + if user_case == 'superuser': + # - I am a superuser. + user = request.getfixturevalue('admin_user') # Nothing to do, just use the admin user + else: + user = request.getfixturevalue('random_user_1') + second_org = request.getfixturevalue('organization_1') + if user_case == 'admin_of_app_user_org': + # - I am the admin of the organization of the user of the application. + second_org.users.add(app.user) # Add the app owner user to the org + second_org.users.add(user) # Add me to the org + second_org.admins.add(user) # And make me an admin of the org + elif user_case == 'user_in_org': + # - I am a user in the organization of the application. + organization.users.add(user) # Add me to the org + elif user_case == 'user': + # Negative case, do nothing, the user is just some random user + pass + elif user_case == 'anon': + user = None + expected_read_status = 401 + expected_change_status = 401 + expected_delete_status = 401 + else: + raise ValueError(f"Invalid user_case: {user_case}") + + if user is not None: + unauthenticated_api_client.force_login(user) + url = reverse("application-detail", args=[app.id]) + + # Read + response = unauthenticated_api_client.get(url) + assert response.status_code == expected_read_status, (response.status_code, response.data) + + # Change + response = unauthenticated_api_client.patch(url, data={'name': 'new name'}) + assert response.status_code == expected_change_status, (response.status_code, response.data) + + # Delete + response = unauthenticated_api_client.delete(url) + assert response.status_code == expected_delete_status, (response.status_code, response.data) + + +@pytest.mark.parametrize( + 'user_case, has_access', + [ + ('superuser', True), + pytest.param('admin_of_app_org', True, marks=pytest.mark.xfail(reason="https://github.com/ansible/django-ansible-base/issues/424")), + ('user_in_org', False), + ('user', False), + ('anon', False), + ], +) +@pytest.mark.django_db +def test_oauth2_application_create( + request, user_case, has_access, org_member_rd, org_admin_rd, random_user, oauth2_application, organization, unauthenticated_api_client +): + """ + From awx.main.access.OAuth2ApplicationAccess: + + I can create OAuth 2 applications when: + - I am a superuser. + - I am the admin of the organization of the application. + """ + app = oauth2_application[0] + app.organization = organization + app.user = random_user + app.save() + + expected_create_status = 201 if has_access else 403 + + # Determine the user based on the test case (adding them to organizations, etc. as necessary). + if user_case == 'superuser': + # - I am a superuser. + user = request.getfixturevalue('admin_user') # Nothing to do, just use the admin user + else: + user = request.getfixturevalue('random_user_1') + if user_case == 'admin_of_app_org': + # - I am the admin of the organization of the application. + organization.users.add(user) # Add me to the org + organization.admins.add(user) # And make me an admin of the org + elif user_case == 'user_in_org': + # - I am a user in the organization of the application. + organization.users.add(user) # Add me to the org + elif user_case == 'user': + # Negative case, do nothing, the user is just some random user + pass + elif user_case == 'anon': + user = None + expected_create_status = 401 + else: + raise ValueError(f"Invalid user_case: {user_case}") + + if user is not None: + unauthenticated_api_client.force_login(user) + url = reverse("application-list") + data = { + 'name': 'new app', + 'organization': organization.id, + 'client_type': 'confidential', + 'authorization_grant_type': 'password', + } + if user is not None: + data['user'] = user.id + response = unauthenticated_api_client.post(url, data=data) + assert response.status_code == expected_create_status, (response.status_code, response.data) + + +@pytest.mark.parametrize( + 'user_case, has_access', + [ + ('superuser', True), + ('admin_of_token_app_org', True), + ('user_of_token', True), + ('user_in_app_org', False), + ('user', False), + ('anon', False), + ], +) +@pytest.mark.django_db +def test_oauth2_application_token_read_change_delete( + request, + user_case, + has_access, + org_member_rd, + org_admin_rd, + random_user, + oauth2_application, + oauth2_user_application_token, + organization, + unauthenticated_api_client, +): + """ + From awx.main.access.OAuth2TokenAccess: + + I can read, change or delete an app token when: + - I am a superuser. + - I am the admin of the organization of the application of the token. + - I am the user of the token. + """ + app = oauth2_application[0] + app.organization = organization + app.user = random_user + app.save() + + expected_read_status = 200 if has_access else 403 + expected_change_status = 200 if has_access else 403 + expected_delete_status = 204 if has_access else 403 + + # Determine the user based on the test case (adding them to organizations, etc. as necessary). + if user_case == 'superuser': + # - I am a superuser. + user = request.getfixturevalue('admin_user') # Nothing to do, just use the admin user + else: + user = request.getfixturevalue('random_user_1') + if user_case == 'admin_of_token_app_org': + # - I am the admin of the organization of the application of the token. + organization.users.add(user) # Add me to the org + organization.admins.add(user) # And make me an admin of the org + elif user_case == 'user_of_token': + # - I am the user of the token. + user = oauth2_user_application_token.user + elif user_case == 'user_in_app_org': + # Negative case, user is in the org but not an admin or the token user + organization.users.add(user) # Add me to the org + elif user_case == 'user': + # Negative case, do nothing, the user is just some random user + pass + elif user_case == 'anon': + user = None + expected_read_status = 401 + expected_change_status = 401 + expected_delete_status = 401 + else: + raise ValueError(f"Invalid user_case: {user_case}") + + if user is not None: + unauthenticated_api_client.force_login(user) + url = reverse("token-detail", args=[oauth2_user_application_token.id]) + + # Read + response = unauthenticated_api_client.get(url) + assert response.status_code == expected_read_status, (response.status_code, response.data) + + # Change + response = unauthenticated_api_client.patch(url, data={'description': 'new description'}) + assert response.status_code == expected_change_status, (response.status_code, response.data) + + # Delete + response = unauthenticated_api_client.delete(url) + assert response.status_code == expected_delete_status, (response.status_code, response.data) + + +@pytest.mark.parametrize( + 'user_case, has_access', + [ + ('superuser', True), + pytest.param('admin_of_app_user_org', True, marks=pytest.mark.xfail(reason="https://github.com/ansible/django-ansible-base/issues/424")), + pytest.param('user_in_app_org', True, marks=pytest.mark.xfail(reason="https://github.com/ansible/django-ansible-base/issues/424")), + ('user', False), + ('anon', False), + ], +) +@pytest.mark.django_db +def test_oauth2_application_token_create( + request, user_case, has_access, org_member_rd, org_admin_rd, random_user, oauth2_application, organization, unauthenticated_api_client +): + """ + From awx.main.access.OAuth2TokenAccess: + + I can create an OAuth2 app token when: + - I have the read permission of the related application. + + --- + + Inheriting from app access: + I can read OAuth 2 applications when: + - I am a superuser. + - I am the admin of the organization of the user of the application. + - I am a user in the organization of the application. + """ + app = oauth2_application[0] + app.organization = organization + app.user = random_user + app.save() + + expected_create_status = 201 if has_access else 403 + + if user_case == 'superuser': + # - I am a superuser. + user = request.getfixturevalue('admin_user') # Nothing to do, just use the admin user + else: + user = request.getfixturevalue('random_user_1') + second_org = request.getfixturevalue('organization_1') + if user_case == 'admin_of_app_user_org': + # - I am the admin of the organization of the user of the application. + second_org.users.add(app.user) # Add the app owner user to the org + second_org.users.add(user) # Add me to the org + second_org.admins.add(user) # And make me an admin of the org + elif user_case == 'user_in_app_org': + # - I am a user in the organization of the application. + organization.users.add(user) # Add me to the org + elif user_case == 'user': + # Negative case, do nothing, the user is just some random user + pass + elif user_case == 'anon': + user = None + expected_create_status = 401 + else: + raise ValueError(f"Invalid user_case: {user_case}") + + if user is not None: + unauthenticated_api_client.force_login(user) + url = reverse("token-list") + + # Create + data = { + 'application': app.id, + 'description': 'new token', + 'scope': 'read write', + } + response = unauthenticated_api_client.post(url, data=data) + assert response.status_code == expected_create_status, (response.status_code, response.data) + + +@pytest.mark.django_db +def test_oauth2_pat_create(request, org_member_rd, org_admin_rd, user, random_user, user_api_client): + """ + From awx.main.access.OAuth2TokenAccess: + + I can create an OAuth2 Personal Access Token when: + - I am a user. But I can only create a PAT for myself. + """ + + url = reverse("token-list") + + # Create PAT + data = { + 'description': 'new PAT', + 'scope': 'read write', + } + response = user_api_client.post(url, data=data) + assert response.status_code == 201 + token_id = response.data['id'] + token = OAuth2AccessToken.objects.get(id=token_id) + assert token.user == user + + # Create PAT with another user + data = { + 'description': 'new PAT that is not mine', + 'scope': 'read write', + 'user': random_user.id, + } + response = user_api_client.post(url, data=data) + # We don't block the request but we force the user to be the requester + assert response.status_code == 201 + token_id = response.data['id'] + token = OAuth2AccessToken.objects.get(id=token_id) + assert token.user == user # not random_user + + +@pytest.mark.parametrize( + 'user_case, has_access', + [ + ('superuser', True), + ('user_of_token', True), + ('user', False), + ('anon', False), + ], +) +@pytest.mark.django_db +def test_oauth2_pat_read_change_delete(request, user_case, has_access, org_member_rd, org_admin_rd, unauthenticated_api_client, oauth2_user_pat): + """ + From awx.main.access.OAuth2TokenAccess: + + I can read, change or delete a personal token when: + - I am the user of the token + - I am the superuser + """ + expected_read_status = 200 if has_access else 403 + expected_change_status = 200 if has_access else 403 + expected_delete_status = 204 if has_access else 403 + + # Determine the user based on the test case (adding them to organizations, etc. as necessary). + if user_case == 'superuser': + # - I am the superuser + user = request.getfixturevalue('admin_user') # Nothing to do, just use the admin user + elif user_case == 'user_of_token': + # - I am the user of the token + user = oauth2_user_pat.user + elif user_case == 'user': + # Negative case, the user is just some random user + user = request.getfixturevalue('random_user') + elif user_case == 'anon': + user = None + expected_read_status = 401 + expected_change_status = 401 + expected_delete_status = 401 + else: + raise ValueError(f"Invalid user_case: {user_case}") + + if user is not None: + unauthenticated_api_client.force_login(user) + url = reverse("token-detail", args=[oauth2_user_pat.id]) + + # Read + response = unauthenticated_api_client.get(url) + assert response.status_code == expected_read_status, (response.status_code, response.data) + + # Change + response = unauthenticated_api_client.patch(url, data={'description': 'new description'}) + assert response.status_code == expected_change_status, (response.status_code, response.data) + + # Delete + response = unauthenticated_api_client.delete(url) + assert response.status_code == expected_delete_status, (response.status_code, response.data) diff --git a/test_app/tests/oauth2_provider/views/test_application.py b/test_app/tests/oauth2_provider/views/test_application.py index c9c311025..902d106b8 100644 --- a/test_app/tests/oauth2_provider/views/test_application.py +++ b/test_app/tests/oauth2_provider/views/test_application.py @@ -10,7 +10,7 @@ "client_fixture,expected_status", [ ("admin_api_client", 200), - ("user_api_client", 200), + pytest.param("user_api_client", 200, marks=pytest.mark.xfail(reason="https://github.com/ansible/django-ansible-base/issues/424")), ("unauthenticated_api_client", 401), ], ) @@ -67,7 +67,7 @@ def test_oauth2_provider_application_related(admin_api_client, oauth2_applicatio "client_fixture,expected_status", [ ("admin_api_client", 200), - ("user_api_client", 200), + pytest.param("user_api_client", 200, marks=pytest.mark.xfail(reason="https://github.com/ansible/django-ansible-base/issues/424")), ("unauthenticated_api_client", 401), ], ) @@ -89,7 +89,7 @@ def test_oauth2_provider_application_detail(request, client_fixture, expected_st "client_fixture,expected_status", [ ("admin_api_client", 201), - ("user_api_client", 201), + pytest.param("user_api_client", 201, marks=pytest.mark.xfail(reason="https://github.com/ansible/django-ansible-base/issues/424")), ("unauthenticated_api_client", 401), ], ) @@ -145,7 +145,7 @@ def test_oauth2_provider_application_validator(admin_api_client): "client_fixture,expected_status", [ ("admin_api_client", 200), - ("user_api_client", 200), + pytest.param("user_api_client", 200, marks=pytest.mark.xfail(reason="https://github.com/ansible/django-ansible-base/issues/424")), ("unauthenticated_api_client", 401), ], ) diff --git a/test_app/tests/oauth2_provider/views/test_token.py b/test_app/tests/oauth2_provider/views/test_token.py index b268943b8..15d1594f2 100644 --- a/test_app/tests/oauth2_provider/views/test_token.py +++ b/test_app/tests/oauth2_provider/views/test_token.py @@ -425,3 +425,33 @@ def test_oauth2_tokens_list_for_user( response = admin_api_client.get(url) assert response.status_code == 200 assert len(response.data['results']) == 6 + + +@pytest.mark.parametrize( + 'given,error', + [ + ('read write', None), + ('read', None), + ('write', None), + ('read write foo', 'Invalid scope: foo'), + ('foo', 'Invalid scope: foo'), + ('', None), # default scope is 'write' + ], +) +@pytest.mark.django_db +def test_oauth2_token_scope_validator(user_api_client, given, error): + """ + Ensure that the scope validator works as expected. + """ + + url = reverse("token-list") + + # Create PAT + data = { + 'description': 'new PAT', + 'scope': given, + } + response = user_api_client.post(url, data=data) + assert response.status_code == 400 if error else 201 + if error: + assert error in str(response.data['scope'][0])