From 04ab6764633baa02f9292b04229d7623400eadcc Mon Sep 17 00:00:00 2001 From: Joey Orlando Date: Thu, 10 Oct 2024 15:02:21 -0400 Subject: [PATCH] feat: update RBAC permissioning to support `grafana-irm-app` (#5149) # What this PR does Closes https://github.com/grafana/irm/issues/31 (and supersedes https://github.com/grafana/oncall/pull/4784) Main changes: - updates `apps.api.permissions.user_is_authorized` to check the value of `organization.is_grafana_irm_enabled`. If it is, we check for the presence of `grafana-irm-app` prefixed RBAC permissions rather than `grafana-oncall-app` - cleans-up `engine/apps/api/tests/test_permissions.py` (bulk of the changes in the PR) - converts `apps.user_management.models.User.build_permissions_query` to a `UserQuerySet` method instead - means we can now do things like this instead: ```python3 User.objects.filter_by_permission(RBACPermission.Permissions.NOTIFICATIONS_READ, organization) ``` ## Checklist - [x] Unit, integration, and e2e (if applicable) tests updated - [x] Documentation added (or `pr:no public docs` PR label added if not required) - [x] Added the relevant release notes label (see labels prefixed w/ `release:`). These labels dictate how your PR will show up in the autogenerated release notes. --- engine/apps/api/permissions.py | 75 +- engine/apps/api/tests/test_permissions.py | 646 ++++++++++++++---- engine/apps/api/tests/test_user.py | 311 +++++---- engine/apps/api/views/user.py | 8 +- engine/apps/auth_token/auth.py | 4 +- engine/apps/grafana_plugin/helpers/client.py | 4 +- .../oss_installation/views/cloud_users.py | 5 +- engine/apps/schedules/ical_utils.py | 26 +- .../apps/schedules/tests/test_ical_utils.py | 4 +- .../apps/slack/models/slack_team_identity.py | 17 +- engine/apps/slack/models/slack_usergroup.py | 9 +- engine/apps/user_management/models/user.py | 74 +- .../apps/user_management/tests/test_user.py | 76 ++- 13 files changed, 879 insertions(+), 380 deletions(-) diff --git a/engine/apps/api/permissions.py b/engine/apps/api/permissions.py index e0120de567..852506a109 100644 --- a/engine/apps/api/permissions.py +++ b/engine/apps/api/permissions.py @@ -13,7 +13,7 @@ from common.utils import getattrd if typing.TYPE_CHECKING: - from apps.user_management.models import User + from apps.user_management.models import Organization, User RBAC_PERMISSIONS_ATTR = "rbac_permissions" RBAC_OBJECT_PERMISSIONS_ATTR = "rbac_object_permissions" @@ -50,6 +50,12 @@ class GrafanaAPIPermission(typing.TypedDict): action: str +class GrafanaAPIPermissions: + @classmethod + def construct_permissions(cls, actions: typing.List[str]) -> typing.List[GrafanaAPIPermission]: + return [GrafanaAPIPermission(action=action) for action in actions] + + class Resources(enum.Enum): ALERT_GROUPS = "alert-groups" INTEGRATIONS = "integrations" @@ -103,6 +109,9 @@ def __init__( self.value = f"{prefix}.{resource.value}:{action.value}" self.fallback_role = fallback_role + def user_has_permission(self, user: "User") -> bool: + return user_is_authorized(user, [self]) + LegacyAccessControlCompatiblePermissions = typing.List[LegacyAccessControlCompatiblePermission] RBACPermissionsAttribute = typing.Dict[str, LegacyAccessControlCompatiblePermissions] @@ -126,6 +135,36 @@ def get_most_authorized_role(permissions: LegacyAccessControlCompatiblePermissio return min({p.fallback_role for p in permissions}, key=lambda r: r.value) +def convert_oncall_permission_to_irm(permission: LegacyAccessControlCompatiblePermission) -> str: + return permission.value.replace(PluginID.ONCALL, PluginID.IRM) + + +def get_required_permission_values( + organization: "Organization", required_permissions: LegacyAccessControlCompatiblePermissions +) -> typing.List[str]: + """ + This function returns a list of required permission values, taking into account whether or not the organization + is using the IRM plugin. + + If the IRM plugin is being used, we substitue `grafana-oncall-app` with `grafana-irm-app` + as the RBAC permission prefix. + """ + permission_values = [] + + for permission in required_permissions: + permission_value = permission.value + if permission_value.startswith(PluginID.ONCALL) and organization.is_grafana_irm_enabled: + permission_values.append(convert_oncall_permission_to_irm(permission)) + else: + permission_values.append(permission_value) + + return permission_values + + +def user_has_minimum_required_basic_role(user: "User", required_basic_role: LegacyAccessControlRole) -> bool: + return user.role <= required_basic_role.value + + def user_is_authorized(user: "User", required_permissions: LegacyAccessControlCompatiblePermissions) -> bool: """ This function checks whether `user` has all necessary permissions specified in `required_permissions`. @@ -134,11 +173,12 @@ def user_is_authorized(user: "User", required_permissions: LegacyAccessControlCo `user` - The user to check permissions for `required_permissions` - A list of permissions that a user must have to be considered authorized """ - if user.organization.is_rbac_permissions_enabled: + organization = user.organization + if organization.is_rbac_permissions_enabled: user_permissions = [u["action"] for u in user.permissions] - required_permission_values = [p.value for p in required_permissions] + required_permission_values = get_required_permission_values(organization, required_permissions) return all(permission in user_permissions for permission in required_permission_values) - return user.role <= get_most_authorized_role(required_permissions).value + return user_has_minimum_required_basic_role(user, get_most_authorized_role(required_permissions)) class RBACPermission(permissions.BasePermission): @@ -310,20 +350,23 @@ def has_object_permission(self, request: AuthenticatedRequest, view: ViewSetOrAP ALL_PERMISSION_NAMES = [perm for perm in dir(RBACPermission.Permissions) if not perm.startswith("_")] -ALL_PERMISSION_CLASSES = [ +ALL_PERMISSION_CLASSES: LegacyAccessControlCompatiblePermissions = [ getattr(RBACPermission.Permissions, permission_name) for permission_name in ALL_PERMISSION_NAMES ] -ALL_PERMISSION_CHOICES = [ - (permission_class.value, permission_name) - for permission_class, permission_name in zip(ALL_PERMISSION_CLASSES, ALL_PERMISSION_NAMES) -] - - -def get_permission_from_permission_string(perm: str) -> typing.Optional[LegacyAccessControlCompatiblePermission]: - for permission_class in ALL_PERMISSION_CLASSES: - if permission_class.value == perm: - return permission_class - return None +ALL_PERMISSION_CHOICES: typing.List[typing.Tuple[str, str]] = [] +for permission_class, permission_name in zip(ALL_PERMISSION_CLASSES, ALL_PERMISSION_NAMES): + ALL_PERMISSION_CHOICES += [ + (permission_class.value, permission_name), + (convert_oncall_permission_to_irm(permission_class), permission_name), + ] +ALL_PERMISSION_NAME_TO_CLASS_MAP: typing.Dict[str, LegacyAccessControlCompatiblePermission] = {} +for permission_class in ALL_PERMISSION_CLASSES: + ALL_PERMISSION_NAME_TO_CLASS_MAP.update( + { + permission_class.value: permission_class, + convert_oncall_permission_to_irm(permission_class): permission_class, + } + ) class IsOwner(permissions.BasePermission): diff --git a/engine/apps/api/tests/test_permissions.py b/engine/apps/api/tests/test_permissions.py index 46bf59fb3f..ab99f5ba01 100644 --- a/engine/apps/api/tests/test_permissions.py +++ b/engine/apps/api/tests/test_permissions.py @@ -4,47 +4,18 @@ from rest_framework.views import APIView from rest_framework.viewsets import ViewSetMixin -from apps.api.permissions import ( - RBAC_PERMISSIONS_ATTR, - GrafanaAPIPermission, - HasRBACPermissions, - IsOwner, - IsOwnerOrHasRBACPermissions, - LegacyAccessControlCompatiblePermission, - LegacyAccessControlRole, - RBACObjectPermissionsAttribute, - RBACPermission, - RBACPermissionsAttribute, - get_most_authorized_role, - get_view_action, - user_is_authorized, -) - - -class MockedOrg: - def __init__(self, org_has_rbac_enabled: bool) -> None: - self.is_rbac_permissions_enabled = org_has_rbac_enabled - - -class MockedUser: - def __init__( - self, - permissions: typing.List[LegacyAccessControlCompatiblePermission], - org_has_rbac_enabled=True, - basic_role: typing.Optional[LegacyAccessControlRole] = None, - ) -> None: - self.permissions = [GrafanaAPIPermission(action=perm.value) for perm in permissions] - self.role = basic_role if basic_role is not None else get_most_authorized_role(permissions) - self.organization = MockedOrg(org_has_rbac_enabled) +from apps.api import permissions +from apps.user_management.models import User +from common.constants.plugin_ids import PluginID class MockedSchedule: - def __init__(self, user: MockedUser) -> None: + def __init__(self, user: User) -> None: self.user = user class MockedRequest: - def __init__(self, user: typing.Optional[MockedUser] = None, method: typing.Optional[str] = None) -> None: + def __init__(self, user: typing.Optional[User] = None, method: typing.Optional[str] = None) -> None: if user: self.user = user if method: @@ -55,8 +26,8 @@ class MockedViewSet(ViewSetMixin): def __init__( self, action: str, - rbac_permissions: typing.Optional[RBACPermissionsAttribute] = None, - rbac_object_permissions: typing.Optional[RBACObjectPermissionsAttribute] = None, + rbac_permissions: typing.Optional[permissions.RBACPermissionsAttribute] = None, + rbac_object_permissions: typing.Optional[permissions.RBACObjectPermissionsAttribute] = None, ) -> None: super().__init__() self.action = action @@ -70,8 +41,8 @@ def __init__( class MockedAPIView(APIView): def __init__( self, - rbac_permissions: typing.Optional[RBACPermissionsAttribute] = None, - rbac_object_permissions: typing.Optional[RBACObjectPermissionsAttribute] = None, + rbac_permissions: typing.Optional[permissions.RBACPermissionsAttribute] = None, + rbac_object_permissions: typing.Optional[permissions.RBACObjectPermissionsAttribute] = None, ) -> None: super().__init__() @@ -81,84 +52,263 @@ def __init__( self.rbac_object_permissions = rbac_object_permissions +class TestLegacyAccessControlCompatiblePermission: + @pytest.mark.parametrize( + "permission_to_test,user_basic_role,is_rbac_permissions_enabled,is_grafana_irm_enabled,expected_result", + [ + # rbac enabled - is_grafana_irm_enabled disabled + ( + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + permissions.LegacyAccessControlRole.VIEWER, + True, + False, + True, + ), + ( + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE, + permissions.LegacyAccessControlRole.VIEWER, + True, + False, + False, + ), + # rbac enabled - is_grafana_irm_enabled enabled + ( + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + permissions.LegacyAccessControlRole.VIEWER, + True, + True, + True, + ), + ( + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE, + permissions.LegacyAccessControlRole.VIEWER, + True, + True, + False, + ), + # rbac disabled (and hence is_grafana_irm_enabled is irrelevant) + ( + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + permissions.LegacyAccessControlRole.VIEWER, + False, + False, + True, + ), + ( + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + permissions.LegacyAccessControlRole.VIEWER, + False, + True, + True, + ), + ( + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE, + permissions.LegacyAccessControlRole.VIEWER, + False, + False, + False, + ), + ( + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE, + permissions.LegacyAccessControlRole.VIEWER, + False, + True, + False, + ), + ], + ) + @pytest.mark.django_db + def test_user_has_permission( + self, + make_organization, + make_user_for_organization, + permission_to_test, + user_basic_role, + is_rbac_permissions_enabled, + is_grafana_irm_enabled, + expected_result, + ): + user_permission = permissions.RBACPermission.Permissions.ALERT_GROUPS_READ + + org = make_organization( + is_rbac_permissions_enabled=is_rbac_permissions_enabled, is_grafana_irm_enabled=is_grafana_irm_enabled + ) + user = make_user_for_organization( + org, + role=user_basic_role, + permissions=permissions.GrafanaAPIPermissions.construct_permissions( + [ + permissions.convert_oncall_permission_to_irm(user_permission) + if is_grafana_irm_enabled + else user_permission.value + ] + ), + ) + + assert permission_to_test.user_has_permission(user) == expected_result + + +@pytest.mark.parametrize( + "user_role,required_basic_role,expected_result", + [ + (permissions.LegacyAccessControlRole.NONE, permissions.LegacyAccessControlRole.NONE, True), + (permissions.LegacyAccessControlRole.NONE, permissions.LegacyAccessControlRole.VIEWER, False), + (permissions.LegacyAccessControlRole.NONE, permissions.LegacyAccessControlRole.EDITOR, False), + (permissions.LegacyAccessControlRole.NONE, permissions.LegacyAccessControlRole.ADMIN, False), + (permissions.LegacyAccessControlRole.VIEWER, permissions.LegacyAccessControlRole.NONE, True), + (permissions.LegacyAccessControlRole.VIEWER, permissions.LegacyAccessControlRole.VIEWER, True), + (permissions.LegacyAccessControlRole.VIEWER, permissions.LegacyAccessControlRole.EDITOR, False), + (permissions.LegacyAccessControlRole.VIEWER, permissions.LegacyAccessControlRole.ADMIN, False), + (permissions.LegacyAccessControlRole.EDITOR, permissions.LegacyAccessControlRole.NONE, True), + (permissions.LegacyAccessControlRole.EDITOR, permissions.LegacyAccessControlRole.VIEWER, True), + (permissions.LegacyAccessControlRole.EDITOR, permissions.LegacyAccessControlRole.EDITOR, True), + (permissions.LegacyAccessControlRole.EDITOR, permissions.LegacyAccessControlRole.ADMIN, False), + (permissions.LegacyAccessControlRole.ADMIN, permissions.LegacyAccessControlRole.NONE, True), + (permissions.LegacyAccessControlRole.ADMIN, permissions.LegacyAccessControlRole.VIEWER, True), + (permissions.LegacyAccessControlRole.ADMIN, permissions.LegacyAccessControlRole.EDITOR, True), + (permissions.LegacyAccessControlRole.ADMIN, permissions.LegacyAccessControlRole.ADMIN, True), + ], +) +@pytest.mark.django_db +def test_user_has_minimum_required_basic_role( + make_organization, + make_user_for_organization, + user_role, + required_basic_role, + expected_result, +): + org = make_organization() + user = make_user_for_organization(org, role=user_role, permissions=[]) + assert permissions.user_has_minimum_required_basic_role(user, required_basic_role) is expected_result + + +@pytest.mark.parametrize("is_grafana_irm_enabled", [True, False]) @pytest.mark.parametrize( - "user_permissions,required_permissions,org_has_rbac_enabled,expected_result", + "user_permissions,required_permissions,is_rbac_permissions_enabled,expected_result", [ ( - [RBACPermission.Permissions.ALERT_GROUPS_READ], - [RBACPermission.Permissions.ALERT_GROUPS_READ], + [permissions.RBACPermission.Permissions.ALERT_GROUPS_READ], + [permissions.RBACPermission.Permissions.ALERT_GROUPS_READ], True, True, ), ( - [RBACPermission.Permissions.ALERT_GROUPS_READ], - [RBACPermission.Permissions.ALERT_GROUPS_READ], + [permissions.RBACPermission.Permissions.ALERT_GROUPS_READ], + [permissions.RBACPermission.Permissions.ALERT_GROUPS_READ], False, True, ), ( - [RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE], - [RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE], + [ + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE, + ], + [ + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE, + ], True, True, ), ( - [RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE], - [RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE], + [ + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE, + ], + [ + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE, + ], False, True, ), ( - [RBACPermission.Permissions.ALERT_GROUPS_WRITE], - [RBACPermission.Permissions.ALERT_GROUPS_READ], + [permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE], + [permissions.RBACPermission.Permissions.ALERT_GROUPS_READ], True, False, ), ( - [RBACPermission.Permissions.ALERT_GROUPS_WRITE], - [RBACPermission.Permissions.ALERT_GROUPS_READ], + [permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE], + [permissions.RBACPermission.Permissions.ALERT_GROUPS_READ], False, True, ), ( - [RBACPermission.Permissions.ALERT_GROUPS_READ], - [RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE], + [permissions.RBACPermission.Permissions.ALERT_GROUPS_READ], + [ + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE, + ], False, False, ), ( - [RBACPermission.Permissions.ALERT_GROUPS_READ], - [RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE], + [permissions.RBACPermission.Permissions.ALERT_GROUPS_READ], + [ + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE, + ], True, False, ), ], ) -def test_user_is_authorized(user_permissions, required_permissions, org_has_rbac_enabled, expected_result) -> None: - user = MockedUser(user_permissions, org_has_rbac_enabled=org_has_rbac_enabled) - assert user_is_authorized(user, required_permissions) == expected_result +@pytest.mark.django_db +def test_user_is_authorized( + make_organization, + make_user_for_organization, + user_permissions, + required_permissions, + is_rbac_permissions_enabled, + is_grafana_irm_enabled, + expected_result, +) -> None: + basic_role = permissions.get_most_authorized_role(user_permissions) + + org = make_organization( + is_rbac_permissions_enabled=is_rbac_permissions_enabled, is_grafana_irm_enabled=is_grafana_irm_enabled + ) + user = make_user_for_organization( + org, + role=basic_role, + permissions=permissions.GrafanaAPIPermissions.construct_permissions( + [ + permissions.convert_oncall_permission_to_irm(perm) if is_grafana_irm_enabled else perm.value + for perm in user_permissions + ] + ), + ) + + assert permissions.user_is_authorized(user, required_permissions) == expected_result @pytest.mark.parametrize( - "permissions,expected_role", + "user_permissions,expected_role", [ - ([RBACPermission.Permissions.ALERT_GROUPS_READ], RBACPermission.Permissions.ALERT_GROUPS_READ.fallback_role), ( - [RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE], - RBACPermission.Permissions.ALERT_GROUPS_WRITE.fallback_role, + [permissions.RBACPermission.Permissions.ALERT_GROUPS_READ], + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ.fallback_role, + ), + ( + [ + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE, + ], + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE.fallback_role, ), ( [ - RBACPermission.Permissions.USER_SETTINGS_READ, - RBACPermission.Permissions.USER_SETTINGS_WRITE, - RBACPermission.Permissions.USER_SETTINGS_ADMIN, + permissions.RBACPermission.Permissions.USER_SETTINGS_READ, + permissions.RBACPermission.Permissions.USER_SETTINGS_WRITE, + permissions.RBACPermission.Permissions.USER_SETTINGS_ADMIN, ], - RBACPermission.Permissions.USER_SETTINGS_ADMIN.fallback_role, + permissions.RBACPermission.Permissions.USER_SETTINGS_ADMIN.fallback_role, ), ], ) -def test_get_most_authorized_role(permissions, expected_role) -> None: - assert get_most_authorized_role(permissions) == expected_role +def test_get_most_authorized_role(user_permissions, expected_role) -> None: + assert permissions.get_most_authorized_role(user_permissions) == expected_role def test_get_view_action(): @@ -170,13 +320,18 @@ def test_get_view_action(): method = "APIVIEW_ACTION" request = MockedRequest(method=method) - assert get_view_action(request, viewset) == viewset_action, "it works with a ViewSet" - assert get_view_action(request, apiview) == method.lower(), "it works with an APIView" + assert permissions.get_view_action(request, viewset) == viewset_action, "it works with a ViewSet" + assert permissions.get_view_action(request, apiview) == method.lower(), "it works with an APIView" class TestRBACPermission: - def test_has_permission_works_on_a_viewset_view(self) -> None: - required_permission = RBACPermission.Permissions.ALERT_GROUPS_READ + @pytest.mark.django_db + def test_has_permission_works_on_a_viewset_view( + self, + make_organization, + make_user_for_organization, + ) -> None: + required_permission = permissions.RBACPermission.Permissions.ALERT_GROUPS_READ action = "hello" viewset = MockedViewSet( @@ -193,26 +348,42 @@ def test_has_permission_works_on_a_viewset_view(self) -> None: }, ) - user_with_permission = MockedUser([required_permission]) - user_without_permission = MockedUser([RBACPermission.Permissions.ALERT_GROUPS_WRITE]) + org = make_organization(is_rbac_permissions_enabled=True) + user_with_permission = make_user_for_organization( + org, + role=permissions.LegacyAccessControlRole.NONE, + permissions=permissions.GrafanaAPIPermissions.construct_permissions([required_permission.value]), + ) + user_without_permission = make_user_for_organization( + org, + role=permissions.LegacyAccessControlRole.NONE, + permissions=permissions.GrafanaAPIPermissions.construct_permissions( + [permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE.value] + ), + ) assert ( - RBACPermission().has_permission(MockedRequest(user_with_permission), viewset) is True + permissions.RBACPermission().has_permission(MockedRequest(user_with_permission), viewset) is True ), "it works on a viewset when the user does have permission" assert ( - RBACPermission().has_permission(MockedRequest(user_without_permission), viewset) is False + permissions.RBACPermission().has_permission(MockedRequest(user_without_permission), viewset) is False ), "it works on a viewset when the user does have permission" assert ( - RBACPermission().has_permission( + permissions.RBACPermission().has_permission( MockedRequest(user_without_permission), viewset_with_no_required_permissions ) is True ), "it works on a viewset when the viewset action does not require permissions" - def test_has_permission_works_on_an_apiview_view(self) -> None: - required_permission = RBACPermission.Permissions.ALERT_GROUPS_READ + @pytest.mark.django_db + def test_has_permission_works_on_an_apiview_view( + self, + make_organization, + make_user_for_organization, + ) -> None: + required_permission = permissions.RBACPermission.Permissions.ALERT_GROUPS_READ method = "hello" apiview = MockedAPIView( @@ -226,61 +397,70 @@ def test_has_permission_works_on_an_apiview_view(self) -> None: } ) - user1 = MockedUser([required_permission]) - user2 = MockedUser([RBACPermission.Permissions.ALERT_GROUPS_WRITE]) + org = make_organization(is_rbac_permissions_enabled=True) + user1 = make_user_for_organization( + org, + role=permissions.LegacyAccessControlRole.NONE, + permissions=permissions.GrafanaAPIPermissions.construct_permissions([required_permission.value]), + ) + user2 = make_user_for_organization( + org, + role=permissions.LegacyAccessControlRole.NONE, + permissions=permissions.GrafanaAPIPermissions.construct_permissions( + [permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE.value] + ), + ) class Request(MockedRequest): - def __init__(self, user: typing.Optional[MockedUser] = None) -> None: + def __init__(self, user: typing.Optional[User] = None) -> None: super().__init__(user, method) assert ( - RBACPermission().has_permission(Request(user1), apiview) is True + permissions.RBACPermission().has_permission(Request(user1), apiview) is True ), "it works on an APIView when the user has permission" assert ( - RBACPermission().has_permission(Request(user2), apiview) is False + permissions.RBACPermission().has_permission(Request(user2), apiview) is False ), "it works on an APIView when the user does not have permission" assert ( - RBACPermission().has_permission(Request(user2), apiview_with_no_permissions) is True + permissions.RBACPermission().has_permission(Request(user2), apiview_with_no_permissions) is True ), "it works on a viewset when the viewset action does not require permissions" def test_has_permission_throws_assertion_error_if_developer_forgets_to_specify_rbac_permissions(self) -> None: action_slash_method = "hello" - error_msg = ( - f"Must define a {RBAC_PERMISSIONS_ATTR} dict on the ViewSet that is consuming the RBACPermission class" - ) + error_msg = f"Must define a {permissions.RBAC_PERMISSIONS_ATTR} dict on the ViewSet that is consuming the RBACPermission class" viewset = MockedViewSet(action_slash_method) apiview = MockedAPIView() with pytest.raises(AssertionError, match=error_msg): - RBACPermission().has_permission(MockedRequest(), viewset) + permissions.RBACPermission().has_permission(MockedRequest(), viewset) with pytest.raises(AssertionError, match=error_msg): - RBACPermission().has_permission(MockedRequest(method=action_slash_method), apiview) + permissions.RBACPermission().has_permission(MockedRequest(method=action_slash_method), apiview) def test_has_permission_throws_assertion_error_if_developer_forgets_to_specify_an_action_in_rbac_permissions( self, ) -> None: action_slash_method = "hello" other_action_rbac_permissions = {"bonjour": []} - error_msg = f"""Each action must be defined within the {RBAC_PERMISSIONS_ATTR} dict on the ViewSet. + error_msg = f"""Each action must be defined within the {permissions.RBAC_PERMISSIONS_ATTR} dict on the ViewSet. \nIf an action requires no permissions, its value should explicitly be set to an empty list""" viewset = MockedViewSet(action_slash_method, other_action_rbac_permissions) apiview = MockedAPIView(rbac_permissions=other_action_rbac_permissions) with pytest.raises(AssertionError, match=error_msg): - RBACPermission().has_permission(MockedRequest(), viewset) + permissions.RBACPermission().has_permission(MockedRequest(), viewset) with pytest.raises(AssertionError, match=error_msg): - RBACPermission().has_permission(MockedRequest(method=action_slash_method), apiview) + permissions.RBACPermission().has_permission(MockedRequest(method=action_slash_method), apiview) def test_has_object_permission_returns_true_if_rbac_object_permissions_not_specified(self) -> None: request = MockedRequest() - assert RBACPermission().has_object_permission(request, MockedAPIView(), None) is True - assert RBACPermission().has_object_permission(request, MockedViewSet("potato"), None) is True + assert permissions.RBACPermission().has_object_permission(request, MockedAPIView(), None) is True + assert permissions.RBACPermission().has_object_permission(request, MockedViewSet("potato"), None) is True def test_has_object_permission_works_if_no_permission_class_specified_for_action(self) -> None: action = "hello" @@ -289,8 +469,8 @@ def test_has_object_permission_works_if_no_permission_class_specified_for_action apiview = MockedAPIView(rbac_object_permissions={}) viewset = MockedViewSet(action, rbac_object_permissions={}) - assert RBACPermission().has_object_permission(request, apiview, None) is True - assert RBACPermission().has_object_permission(request, viewset, None) is True + assert permissions.RBACPermission().has_object_permission(request, apiview, None) is True + assert permissions.RBACPermission().has_object_permission(request, viewset, None) is True def test_has_object_permission_returns_true_if_action_omitted_from_rbac_object_permissions(self) -> None: action1 = "hello" @@ -308,8 +488,8 @@ def has_object_permission(self, _req, _view, _obj) -> None: apiview = MockedAPIView(rbac_object_permissions=rbac_object_permissions) viewset = MockedViewSet(action2, rbac_object_permissions=rbac_object_permissions) - assert RBACPermission().has_object_permission(request, apiview, None) is True - assert RBACPermission().has_object_permission(request, viewset, None) is True + assert permissions.RBACPermission().has_object_permission(request, apiview, None) is True + assert permissions.RBACPermission().has_object_permission(request, viewset, None) is True def test_has_object_permission_works_when_permission_class_specified_for_action(self) -> None: action = "hello" @@ -324,39 +504,66 @@ def has_object_permission(self, _req, _view, _obj) -> None: apiview = MockedAPIView(rbac_object_permissions=rbac_object_permissions) viewset = MockedViewSet(action, rbac_object_permissions=rbac_object_permissions) - assert RBACPermission().has_object_permission(request, apiview, None) == mocked_permission_class_response - assert RBACPermission().has_object_permission(request, viewset, None) == mocked_permission_class_response + assert ( + permissions.RBACPermission().has_object_permission(request, apiview, None) + == mocked_permission_class_response + ) + assert ( + permissions.RBACPermission().has_object_permission(request, viewset, None) + == mocked_permission_class_response + ) class TestIsOwner: - def test_it_works_when_comparing_user_to_object(self) -> None: - user1 = MockedUser([]) - user2 = MockedUser([]) + @pytest.mark.django_db + def test_it_works_when_comparing_user_to_object( + self, + make_organization, + make_user_for_organization, + ) -> None: + org = make_organization(is_rbac_permissions_enabled=True) + user1 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[]) + user2 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[]) + request = MockedRequest(user1) - IsUser = IsOwner() + IsUser = permissions.IsOwner() assert IsUser.has_object_permission(request, None, user1) is True assert IsUser.has_object_permission(request, None, user2) is False - def test_it_works_when_comparing_user_to_ownership_field_object(self) -> None: - user1 = MockedUser([]) - user2 = MockedUser([]) + @pytest.mark.django_db + def test_it_works_when_comparing_user_to_ownership_field_object( + self, + make_organization, + make_user_for_organization, + ) -> None: + org = make_organization(is_rbac_permissions_enabled=True) + user1 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[]) + user2 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[]) + schedule = MockedSchedule(user1) - IsScheduleOwner = IsOwner("user") + IsScheduleOwner = permissions.IsOwner("user") assert IsScheduleOwner.has_object_permission(MockedRequest(user1), None, schedule) is True assert IsScheduleOwner.has_object_permission(MockedRequest(user2), None, schedule) is False - def test_it_works_when_comparing_user_to_nested_ownership_field_object(self) -> None: + @pytest.mark.django_db + def test_it_works_when_comparing_user_to_nested_ownership_field_object( + self, + make_organization, + make_user_for_organization, + ) -> None: class Thingy: def __init__(self, schedule: MockedSchedule) -> None: self.schedule = schedule - user1 = MockedUser([]) - user2 = MockedUser([]) + org = make_organization(is_rbac_permissions_enabled=True) + user1 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[]) + user2 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[]) + schedule = MockedSchedule(user1) thingy = Thingy(schedule) - IsScheduleOwner = IsOwner("schedule.user") + IsScheduleOwner = permissions.IsOwner("schedule.user") assert IsScheduleOwner.has_object_permission(MockedRequest(user1), None, thingy) is True assert IsScheduleOwner.has_object_permission(MockedRequest(user2), None, thingy) is False @@ -366,80 +573,140 @@ def __init__(self, schedule: MockedSchedule) -> None: "user_permissions,required_permissions,expected_result", [ ( - [RBACPermission.Permissions.ALERT_GROUPS_READ], - [RBACPermission.Permissions.ALERT_GROUPS_READ], + [permissions.RBACPermission.Permissions.ALERT_GROUPS_READ], + [permissions.RBACPermission.Permissions.ALERT_GROUPS_READ], True, ), ( - [RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE], - [RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE], + [ + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE, + ], + [ + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE, + ], True, ), ( - [RBACPermission.Permissions.ALERT_GROUPS_WRITE], - [RBACPermission.Permissions.ALERT_GROUPS_READ], + [permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE], + [permissions.RBACPermission.Permissions.ALERT_GROUPS_READ], False, ), ( - [RBACPermission.Permissions.ALERT_GROUPS_READ], - [RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE], + [permissions.RBACPermission.Permissions.ALERT_GROUPS_READ], + [ + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE, + ], False, ), ], ) -def test_HasRBACPermission(user_permissions, required_permissions, expected_result) -> None: - request = MockedRequest(MockedUser(user_permissions)) - assert HasRBACPermissions(required_permissions).has_object_permission(request, None, None) == expected_result +@pytest.mark.django_db +def test_HasRBACPermission( + make_organization, + make_user_for_organization, + user_permissions, + required_permissions, + expected_result, +) -> None: + org = make_organization(is_rbac_permissions_enabled=True) + user = make_user_for_organization( + org, + role=permissions.LegacyAccessControlRole.NONE, + permissions=permissions.GrafanaAPIPermissions.construct_permissions([perm.value for perm in user_permissions]), + ) + + request = MockedRequest(user) + assert ( + permissions.HasRBACPermissions(required_permissions).has_object_permission(request, None, None) + == expected_result + ) class TestIsOwnerOrHasRBACPermissions: - required_permission = RBACPermission.Permissions.SCHEDULES_READ + required_permission = permissions.RBACPermission.Permissions.SCHEDULES_READ required_permissions = [required_permission] + user_permissions = permissions.GrafanaAPIPermissions.construct_permissions( + [perm.value for perm in required_permissions] + ) - def test_it_works_when_user_is_owner_and_does_not_have_permissions(self) -> None: - user1 = MockedUser([]) + @pytest.mark.django_db + def test_it_works_when_user_is_owner_and_does_not_have_permissions( + self, + make_organization, + make_user_for_organization, + ) -> None: + org = make_organization(is_rbac_permissions_enabled=True) + user1 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[]) schedule = MockedSchedule(user1) request = MockedRequest(user1) - PermClass = IsOwnerOrHasRBACPermissions(self.required_permissions) + PermClass = permissions.IsOwnerOrHasRBACPermissions(self.required_permissions) assert PermClass.has_object_permission(request, None, user1) is True - PermClass = IsOwnerOrHasRBACPermissions(self.required_permissions, "user") + PermClass = permissions.IsOwnerOrHasRBACPermissions(self.required_permissions, "user") assert PermClass.has_object_permission(request, None, schedule) is True - def test_it_works_when_user_is_owner_and_has_permissions(self) -> None: - user1 = MockedUser(self.required_permissions) + @pytest.mark.django_db + def test_it_works_when_user_is_owner_and_has_permissions( + self, + make_organization, + make_user_for_organization, + ) -> None: + org = make_organization(is_rbac_permissions_enabled=True) + user1 = make_user_for_organization( + org, role=permissions.LegacyAccessControlRole.NONE, permissions=self.user_permissions + ) schedule = MockedSchedule(user1) request = MockedRequest(user1) - PermClass = IsOwnerOrHasRBACPermissions(self.required_permissions) + PermClass = permissions.IsOwnerOrHasRBACPermissions(self.required_permissions) assert PermClass.has_object_permission(request, None, user1) is True - PermClass = IsOwnerOrHasRBACPermissions(self.required_permissions, "user") + PermClass = permissions.IsOwnerOrHasRBACPermissions(self.required_permissions, "user") assert PermClass.has_object_permission(request, None, schedule) is True - def test_it_works_when_user_is_not_owner_and_does_not_have_permissions(self) -> None: - user1 = MockedUser([]) - user2 = MockedUser([]) + @pytest.mark.django_db + def test_it_works_when_user_is_not_owner_and_does_not_have_permissions( + self, + make_organization, + make_user_for_organization, + ) -> None: + org = make_organization(is_rbac_permissions_enabled=True) + user1 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[]) + user2 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[]) schedule = MockedSchedule(user1) request = MockedRequest(user2) - PermClass = IsOwnerOrHasRBACPermissions(self.required_permissions) + PermClass = permissions.IsOwnerOrHasRBACPermissions(self.required_permissions) assert PermClass.has_object_permission(request, None, user1) is False - PermClass = IsOwnerOrHasRBACPermissions(self.required_permissions, "user") + PermClass = permissions.IsOwnerOrHasRBACPermissions(self.required_permissions, "user") assert PermClass.has_object_permission(request, None, schedule) is False - def test_it_works_when_user_is_not_owner_and_has_permissions(self) -> None: - user1 = MockedUser([]) - user2 = MockedUser(self.required_permissions) + @pytest.mark.django_db + def test_it_works_when_user_is_not_owner_and_has_permissions( + self, + make_organization, + make_user_for_organization, + ) -> None: + org = make_organization(is_rbac_permissions_enabled=True) + user1 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[]) + user2 = make_user_for_organization( + org, role=permissions.LegacyAccessControlRole.NONE, permissions=self.user_permissions + ) + user3 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[]) + schedule = MockedSchedule(user1) request = MockedRequest(user2) + request_user3 = MockedRequest(user3) - PermClass = IsOwnerOrHasRBACPermissions(self.required_permissions) + PermClass = permissions.IsOwnerOrHasRBACPermissions(self.required_permissions) assert PermClass.has_object_permission(request, None, user1) is True - PermClass = IsOwnerOrHasRBACPermissions(self.required_permissions, "user") + PermClass = permissions.IsOwnerOrHasRBACPermissions(self.required_permissions, "user") assert PermClass.has_object_permission(request, None, schedule) is True class Thingy: @@ -447,7 +714,96 @@ def __init__(self, schedule: MockedSchedule) -> None: self.schedule = schedule thingy = Thingy(schedule) - PermClass = IsOwnerOrHasRBACPermissions(self.required_permissions, "schedule.user") + PermClass = permissions.IsOwnerOrHasRBACPermissions(self.required_permissions, "schedule.user") assert PermClass.has_object_permission(request, None, thingy) is True - assert PermClass.has_object_permission(MockedRequest(MockedUser([])), None, thingy) is False + assert PermClass.has_object_permission(request_user3, None, thingy) is False + + +@pytest.mark.parametrize( + "permission,expected", + [ + ( + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + f"{PluginID.IRM}.alert-groups:read", + ), + ( + permissions.RBACPermission.Permissions.LABEL_READ, + permissions.RBACPermission.Permissions.LABEL_READ.value, + ), + ], +) +def test_convert_oncall_permission_to_irm(permission, expected) -> None: + assert permissions.convert_oncall_permission_to_irm(permission) == expected + + +@pytest.mark.parametrize( + "is_grafana_irm_enabled,required_permissions,expected_permission_values", + [ + ( + False, + [ + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE, + ], + [ + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ.value, + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE.value, + ], + ), + ( + True, + [ + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE, + ], + [ + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ.value.replace(PluginID.ONCALL, PluginID.IRM), + permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE.value.replace(PluginID.ONCALL, PluginID.IRM), + ], + ), + ( + True, + [ + permissions.RBACPermission.Permissions.LABEL_CREATE, + permissions.RBACPermission.Permissions.LABEL_WRITE, + permissions.RBACPermission.Permissions.LABEL_READ, + ], + [ + permissions.RBACPermission.Permissions.LABEL_CREATE.value, + permissions.RBACPermission.Permissions.LABEL_WRITE.value, + permissions.RBACPermission.Permissions.LABEL_READ.value, + ], + ), + ], +) +@pytest.mark.django_db +def test_get_required_permission_values( + make_organization, + is_grafana_irm_enabled, + required_permissions, + expected_permission_values, +) -> None: + organization = make_organization(is_rbac_permissions_enabled=True, is_grafana_irm_enabled=is_grafana_irm_enabled) + assert permissions.get_required_permission_values(organization, required_permissions) == expected_permission_values + + +@pytest.mark.parametrize( + "perm,expected_permission", + [ + ( + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ.value, + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + ), + ( + "non.existent.permission", + None, + ), + ( + permissions.convert_oncall_permission_to_irm(permissions.RBACPermission.Permissions.ALERT_GROUPS_READ), + permissions.RBACPermission.Permissions.ALERT_GROUPS_READ, + ), + ], +) +def test_all_permission_name_to_class_map(perm, expected_permission) -> None: + assert permissions.ALL_PERMISSION_NAME_TO_CLASS_MAP.get(perm, None) == expected_permission diff --git a/engine/apps/api/tests/test_user.py b/engine/apps/api/tests/test_user.py index b89a0ba340..262989281e 100644 --- a/engine/apps/api/tests/test_user.py +++ b/engine/apps/api/tests/test_user.py @@ -11,7 +11,7 @@ from rest_framework.response import Response from rest_framework.test import APIClient -from apps.api.permissions import GrafanaAPIPermission, LegacyAccessControlRole, RBACPermission +from apps.api import permissions from apps.api.serializers.user import UserHiddenFieldsSerializer from apps.api.views.user import UPCOMING_SHIFTS_DEFAULT_DAYS from apps.base.models import UserNotificationPolicy @@ -231,7 +231,7 @@ def test_list_users( ): organization = make_organization() admin = make_user_for_organization(organization, _verified_phone_number="1234567890") - editor = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR) + editor = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR) _, token = make_token_for_organization(organization) client = APIClient() @@ -316,21 +316,39 @@ def test_list_users( assert response.json() == expected_payload +@pytest.mark.parametrize("is_grafana_irm_enabled", [False, True]) @pytest.mark.django_db def test_list_users_filtered_by_granted_permission( + is_grafana_irm_enabled, make_organization, make_user_for_organization, make_token_for_organization, make_user_auth_headers, ): - perm_to_filter_on = RBACPermission.Permissions.NOTIFICATIONS_READ.value - perms_to_grant = [GrafanaAPIPermission(action=perm_to_filter_on)] + permission = permissions.RBACPermission.Permissions.NOTIFICATIONS_READ + admin_perm_required_to_call_endpoint = permissions.RBACPermission.Permissions.USER_SETTINGS_READ + perm_to_filter_on = ( + permissions.convert_oncall_permission_to_irm(permission) if is_grafana_irm_enabled else permission.value + ) - organization = make_organization() - admin_user = make_user_for_organization(organization) + perms_to_grant = permissions.GrafanaAPIPermissions.construct_permissions([perm_to_filter_on]) + + organization = make_organization(is_grafana_irm_enabled=is_grafana_irm_enabled, is_rbac_permissions_enabled=True) + admin_user = make_user_for_organization( + organization, + # NOTE: need to explicitly grant this permission here because otherwise the permissions granted by the + # make_user_for_organization fixture will only grant the oncall flavour of the permission + permissions=permissions.GrafanaAPIPermissions.construct_permissions( + [ + permissions.convert_oncall_permission_to_irm(admin_perm_required_to_call_endpoint) + if is_grafana_irm_enabled + else admin_perm_required_to_call_endpoint.value + ] + ), + ) user1 = make_user_for_organization(organization, permissions=perms_to_grant) user2 = make_user_for_organization(organization, permissions=perms_to_grant) - user3 = make_user_for_organization(organization, role=LegacyAccessControlRole.VIEWER) + user3 = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.VIEWER) _, token = make_token_for_organization(organization) client = APIClient() @@ -343,7 +361,8 @@ def test_list_users_filtered_by_granted_permission( assert response.status_code == status.HTTP_200_OK returned_user_pks = [u["pk"] for u in response.json()["results"]] - assert admin_user.public_primary_key in returned_user_pks + assert len(returned_user_pks) == 2 + assert user1.public_primary_key in returned_user_pks assert user2.public_primary_key in returned_user_pks assert user3.public_primary_key not in returned_user_pks @@ -481,10 +500,10 @@ def test_notification_chain_verbal( @pytest.mark.parametrize( "role,expected_status", [ - (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), - (LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), - (LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), - (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_user_update_self_permissions( @@ -511,10 +530,10 @@ def test_user_update_self_permissions( @pytest.mark.parametrize( "role,expected_status", [ - (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), - (LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN), - (LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), - (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_user_update_other_permissions( @@ -540,10 +559,10 @@ def test_user_update_other_permissions( @pytest.mark.parametrize( "role,expected_status", [ - (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), - (LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), - (LegacyAccessControlRole.VIEWER, status.HTTP_200_OK), - (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.VIEWER, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_user_list_permissions( @@ -571,10 +590,10 @@ def test_user_list_permissions( @pytest.mark.parametrize( "role,expected_status", [ - (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), - (LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), - (LegacyAccessControlRole.VIEWER, status.HTTP_200_OK), - (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.VIEWER, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_user_detail_self_permissions( @@ -602,10 +621,10 @@ def test_user_detail_self_permissions( @pytest.mark.parametrize( "role,expected_status", [ - (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), - (LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), - (LegacyAccessControlRole.VIEWER, status.HTTP_200_OK), - (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.VIEWER, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_user_detail_other_permissions( @@ -625,7 +644,7 @@ def test_user_detail_other_permissions( assert response.status_code == expected_status # hidden information for editor/viewer available_fields = UserHiddenFieldsSerializer.fields_available_for_all_users + ["hidden_fields"] - if role in (LegacyAccessControlRole.EDITOR, LegacyAccessControlRole.VIEWER): + if role in (permissions.LegacyAccessControlRole.EDITOR, permissions.LegacyAccessControlRole.VIEWER): user_details = response.json() for f_name in user_details: if f_name not in available_fields: @@ -636,10 +655,10 @@ def test_user_detail_other_permissions( @pytest.mark.parametrize( "role,expected_status", [ - (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), - (LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), - (LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), - (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_user_get_own_verification_code( @@ -667,10 +686,10 @@ def test_user_get_own_verification_code( @pytest.mark.parametrize( "role,expected_status", [ - (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), - (LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN), - (LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), - (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_user_get_other_verification_code( @@ -740,10 +759,10 @@ def test_verification_code_provider_exception( @pytest.mark.parametrize( "role,expected_status", [ - (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), - (LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), - (LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), - (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_user_verify_own_phone( @@ -771,10 +790,10 @@ def test_user_verify_own_phone( @pytest.mark.parametrize( "role,expected_status", [ - (LegacyAccessControlRole.ADMIN, status.HTTP_400_BAD_REQUEST), - (LegacyAccessControlRole.EDITOR, status.HTTP_400_BAD_REQUEST), - (LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), - (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.ADMIN, status.HTTP_400_BAD_REQUEST), + (permissions.LegacyAccessControlRole.EDITOR, status.HTTP_400_BAD_REQUEST), + (permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_user_get_own_telegram_verification_code_with_telegram_connected( @@ -802,10 +821,10 @@ def test_user_get_own_telegram_verification_code_with_telegram_connected( @pytest.mark.parametrize( "role,expected_status", [ - (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), - (LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN), - (LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), - (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_user_verify_another_phone( @@ -816,7 +835,7 @@ def test_user_verify_another_phone( expected_status, ): organization, tester, token = make_organization_and_user_with_plugin_token(role) - other_user = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR) + other_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = reverse("api-internal:user-verify-number", kwargs={"pk": other_user.public_primary_key}) @@ -831,10 +850,10 @@ def test_user_verify_another_phone( @pytest.mark.parametrize( "role,expected_status", [ - (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), - (LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), - (LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), - (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_user_get_own_telegram_verification_code( @@ -856,10 +875,10 @@ def test_user_get_own_telegram_verification_code( @pytest.mark.parametrize( "role,expected_status", [ - (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), - (LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN), - (LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), - (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), + (permissions.LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), + (permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_user_get_another_telegram_verification_code( @@ -870,7 +889,7 @@ def test_user_get_another_telegram_verification_code( expected_status, ): organization, tester, token = make_organization_and_user_with_plugin_token(role) - other_user = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR) + other_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = reverse("api-internal:user-get-telegram-verification-code", kwargs={"pk": other_user.public_primary_key}) @@ -886,7 +905,7 @@ def test_admin_can_update_user( make_user_auth_headers, ): organization, first_user, token = make_organization_and_user_with_plugin_token() - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.ADMIN) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.ADMIN) client = APIClient() data = { @@ -903,7 +922,7 @@ def test_admin_can_update_user( @pytest.mark.django_db def test_admin_can_update_himself(make_organization_and_user_with_plugin_token, make_user_auth_headers): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.ADMIN) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.ADMIN) client = APIClient() data = { @@ -921,7 +940,7 @@ def test_admin_can_update_himself(make_organization_and_user_with_plugin_token, @pytest.mark.django_db def test_admin_can_list_users(make_organization_and_user_with_plugin_token, make_user_auth_headers): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.ADMIN) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.ADMIN) client = APIClient() @@ -938,7 +957,7 @@ def test_admin_can_detail_users( make_user_auth_headers, ): organization, first_user, token = make_organization_and_user_with_plugin_token() - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.ADMIN) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.ADMIN) client = APIClient() @@ -955,7 +974,7 @@ def test_admin_can_get_own_verification_code( make_organization_and_user_with_plugin_token, make_user_auth_headers, ): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.ADMIN) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.ADMIN) client = APIClient() url = reverse("api-internal:user-get-verification-code", kwargs={"pk": user.public_primary_key}) @@ -973,7 +992,7 @@ def test_admin_can_get_another_user_verification_code( make_user_auth_headers, ): organization, first_user, token = make_organization_and_user_with_plugin_token() - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.ADMIN) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.ADMIN) client = APIClient() url = reverse("api-internal:user-get-verification-code", kwargs={"pk": first_user.public_primary_key}) @@ -988,7 +1007,7 @@ def test_admin_can_verify_own_phone( make_organization_and_user_with_plugin_token, make_user_auth_headers, ): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.ADMIN) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.ADMIN) client = APIClient() url = reverse("api-internal:user-verify-number", kwargs={"pk": user.public_primary_key}) @@ -1005,7 +1024,7 @@ def test_admin_can_verify_another_user_phone( make_user_auth_headers, ): organization, first_user, token = make_organization_and_user_with_plugin_token() - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.ADMIN) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.ADMIN) client = APIClient() url = reverse("api-internal:user-verify-number", kwargs={"pk": first_user.public_primary_key}) @@ -1018,7 +1037,7 @@ def test_admin_can_verify_another_user_phone( def test_admin_can_get_own_telegram_verification_code( make_organization_and_user_with_plugin_token, make_user_auth_headers ): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.ADMIN) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.ADMIN) client = APIClient() url = reverse("api-internal:user-get-telegram-verification-code", kwargs={"pk": user.public_primary_key}) @@ -1034,7 +1053,7 @@ def test_admin_can_get_another_user_telegram_verification_code( make_user_auth_headers, ): organization, first_user, token = make_organization_and_user_with_plugin_token() - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.ADMIN) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.ADMIN) client = APIClient() url = reverse("api-internal:user-get-telegram-verification-code", kwargs={"pk": first_user.public_primary_key}) @@ -1050,7 +1069,7 @@ def test_admin_can_get_another_user_backend_verification_code( make_user_auth_headers, ): organization, first_user, token = make_organization_and_user_with_plugin_token() - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.ADMIN) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.ADMIN) client = APIClient() url = ( @@ -1069,7 +1088,7 @@ def test_admin_can_unlink_another_user_backend_account( make_user_auth_headers, ): organization, first_user, token = make_organization_and_user_with_plugin_token() - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.ADMIN) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.ADMIN) client = APIClient() url = ( @@ -1092,7 +1111,7 @@ def test_admin_can_unlink_another_user_slack_account( _, token = make_token_for_organization(organization) user, _ = make_user_with_slack_user_identity( - slack_team_identity, organization, slack_id="user_2", role=LegacyAccessControlRole.ADMIN + slack_team_identity, organization, slack_id="user_2", role=permissions.LegacyAccessControlRole.ADMIN ) other_user = make_user_for_organization(organization) @@ -1114,8 +1133,10 @@ def test_user_cant_update_user( make_user_for_organization, make_user_auth_headers, ): - organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR) + organization, first_user, token = make_organization_and_user_with_plugin_token( + role=permissions.LegacyAccessControlRole.EDITOR + ) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() data = { @@ -1132,7 +1153,7 @@ def test_user_cant_update_user( @pytest.mark.django_db def test_user_can_update_themself(make_organization_and_user_with_plugin_token, make_user_auth_headers): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() data = { @@ -1150,7 +1171,7 @@ def test_user_can_update_themself(make_organization_and_user_with_plugin_token, @pytest.mark.django_db def test_user_can_list_users(make_organization_and_user_with_plugin_token, make_user_auth_headers): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() @@ -1164,8 +1185,10 @@ def test_user_can_list_users(make_organization_and_user_with_plugin_token, make_ def test_user_can_detail_users( make_organization_and_user_with_plugin_token, make_user_for_organization, make_user_auth_headers ): - organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR) + organization, first_user, token = make_organization_and_user_with_plugin_token( + role=permissions.LegacyAccessControlRole.EDITOR + ) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = reverse("api-internal:user-detail", kwargs={"pk": first_user.public_primary_key}) @@ -1185,7 +1208,7 @@ def test_user_can_detail_users( def test_user_can_get_own_verification_code( mock_verification_start, make_organization_and_user_with_plugin_token, make_user_auth_headers ): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = reverse("api-internal:user-get-verification-code", kwargs={"pk": user.public_primary_key}) @@ -1202,8 +1225,10 @@ def test_user_cant_get_another_user_verification_code( make_user_for_organization, make_user_auth_headers, ): - organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR) + organization, first_user, token = make_organization_and_user_with_plugin_token( + role=permissions.LegacyAccessControlRole.EDITOR + ) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = reverse("api-internal:user-get-verification-code", kwargs={"pk": first_user.public_primary_key}) @@ -1217,7 +1242,7 @@ def test_user_cant_get_another_user_verification_code( def test_user_can_verify_own_phone( mocked_verification_check, make_organization_and_user_with_plugin_token, make_user_auth_headers ): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = reverse("api-internal:user-verify-number", kwargs={"pk": user.public_primary_key}) @@ -1234,8 +1259,10 @@ def test_user_cant_verify_another_user_phone( make_user_for_organization, make_user_auth_headers, ): - organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR) + organization, first_user, token = make_organization_and_user_with_plugin_token( + role=permissions.LegacyAccessControlRole.EDITOR + ) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = reverse("api-internal:user-verify-number", kwargs={"pk": first_user.public_primary_key}) @@ -1248,7 +1275,7 @@ def test_user_cant_verify_another_user_phone( def test_user_can_get_own_telegram_verification_code( make_organization_and_user_with_plugin_token, make_user_auth_headers ): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = reverse("api-internal:user-get-telegram-verification-code", kwargs={"pk": user.public_primary_key}) @@ -1263,8 +1290,10 @@ def test_user_cant_get_another_user_telegram_verification_code( make_user_for_organization, make_user_auth_headers, ): - organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR) + organization, first_user, token = make_organization_and_user_with_plugin_token( + role=permissions.LegacyAccessControlRole.EDITOR + ) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = reverse("api-internal:user-get-telegram-verification-code", kwargs={"pk": first_user.public_primary_key}) @@ -1277,7 +1306,7 @@ def test_user_cant_get_another_user_telegram_verification_code( def test_user_can_get_own_backend_verification_code( make_organization_and_user_with_plugin_token, make_user_auth_headers ): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = ( @@ -1302,8 +1331,10 @@ def test_user_cant_get_another_user_backend_verification_code( make_user_for_organization, make_user_auth_headers, ): - organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR) + organization, first_user, token = make_organization_and_user_with_plugin_token( + role=permissions.LegacyAccessControlRole.EDITOR + ) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = ( @@ -1324,7 +1355,7 @@ def test_user_can_unlink_own_slack_account( ): organization, slack_team_identity = make_organization_with_slack_team_identity() user, _ = make_user_with_slack_user_identity( - slack_team_identity, organization, slack_id="user_2", role=LegacyAccessControlRole.EDITOR + slack_team_identity, organization, slack_id="user_2", role=permissions.LegacyAccessControlRole.EDITOR ) _, token = make_token_for_organization(organization) @@ -1339,7 +1370,7 @@ def test_user_can_unlink_own_slack_account( @pytest.mark.django_db def test_user_can_unlink_backend_own_account(make_organization_and_user_with_plugin_token, make_user_auth_headers): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = reverse("api-internal:user-unlink-backend", kwargs={"pk": user.public_primary_key}) + "?backend=TESTONLY" @@ -1351,7 +1382,7 @@ def test_user_can_unlink_backend_own_account(make_organization_and_user_with_plu @pytest.mark.django_db def test_user_unlink_backend_invalid_backend_id(make_organization_and_user_with_plugin_token, make_user_auth_headers): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = reverse("api-internal:user-unlink-backend", kwargs={"pk": user.public_primary_key}) + "?backend=INVALID" @@ -1365,7 +1396,7 @@ def test_user_unlink_backend_invalid_backend_id(make_organization_and_user_with_ def test_user_unlink_backend_backend_account_not_found( make_organization_and_user_with_plugin_token, make_user_auth_headers ): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = reverse("api-internal:user-unlink-backend", kwargs={"pk": user.public_primary_key}) + "?backend=TESTONLY" @@ -1385,10 +1416,10 @@ def test_user_cant_unlink_slack_another_user( organization, slack_team_identity = make_organization_with_slack_team_identity() first_user, _ = make_user_with_slack_user_identity( - slack_team_identity, organization, slack_id="user_1", role=LegacyAccessControlRole.EDITOR + slack_team_identity, organization, slack_id="user_1", role=permissions.LegacyAccessControlRole.EDITOR ) second_user, _ = make_user_with_slack_user_identity( - slack_team_identity, organization, slack_id="user_2", role=LegacyAccessControlRole.EDITOR + slack_team_identity, organization, slack_id="user_2", role=permissions.LegacyAccessControlRole.EDITOR ) _, token = make_token_for_organization(organization) @@ -1405,8 +1436,10 @@ def test_user_cant_unlink_slack_another_user( def test_user_cant_unlink_backend_another_user( make_organization_and_user_with_plugin_token, make_user_for_organization, make_user_auth_headers ): - organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR) + organization, first_user, token = make_organization_and_user_with_plugin_token( + role=permissions.LegacyAccessControlRole.EDITOR + ) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = ( @@ -1424,12 +1457,14 @@ def test_user_cant_unlink_backend_another_user( def test_viewer_cant_update_user( make_organization_and_user_with_plugin_token, make_user_for_organization, make_user_auth_headers ): - organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.VIEWER) - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.VIEWER) + organization, first_user, token = make_organization_and_user_with_plugin_token( + role=permissions.LegacyAccessControlRole.VIEWER + ) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.VIEWER) data = { "email": "test@amixr.io", - "role": LegacyAccessControlRole.EDITOR, + "role": permissions.LegacyAccessControlRole.EDITOR, "username": "updated_test_username", "unverified_phone_number": "+1234567890", "slack_login": "", @@ -1444,11 +1479,11 @@ def test_viewer_cant_update_user( @pytest.mark.django_db def test_viewer_cant_update_himself(make_organization_and_user_with_plugin_token, make_user_auth_headers): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.VIEWER) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.VIEWER) data = { "email": "test@amixr.io", - "role": LegacyAccessControlRole.VIEWER, + "role": permissions.LegacyAccessControlRole.VIEWER, "username": "updated_test_username", "unverified_phone_number": "+1234567890", "slack_login": "", @@ -1463,7 +1498,7 @@ def test_viewer_cant_update_himself(make_organization_and_user_with_plugin_token @pytest.mark.django_db def test_viewer_can_list_users(make_organization_and_user_with_plugin_token, make_user_auth_headers): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.VIEWER) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.VIEWER) client = APIClient() url = reverse("api-internal:user-list") @@ -1477,7 +1512,7 @@ def test_viewer_can_list_users(make_organization_and_user_with_plugin_token, mak def test_viewer_cant_get_own_verification_code( mock_verification_start, make_organization_and_user_with_plugin_token, make_user_auth_headers ): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.VIEWER) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.VIEWER) client = APIClient() url = reverse("api-internal:user-get-verification-code", kwargs={"pk": user.public_primary_key}) @@ -1494,8 +1529,10 @@ def test_viewer_cant_get_another_user_verification_code( make_user_for_organization, make_user_auth_headers, ): - organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.VIEWER) + organization, first_user, token = make_organization_and_user_with_plugin_token( + role=permissions.LegacyAccessControlRole.EDITOR + ) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.VIEWER) client = APIClient() url = reverse("api-internal:user-get-verification-code", kwargs={"pk": first_user.public_primary_key}) @@ -1509,7 +1546,7 @@ def test_viewer_cant_get_another_user_verification_code( def test_viewer_cant_verify_own_phone( mocked_verification_check, make_organization_and_user_with_plugin_token, make_user_auth_headers ): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.VIEWER) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.VIEWER) client = APIClient() url = reverse("api-internal:user-verify-number", kwargs={"pk": user.public_primary_key}) @@ -1526,8 +1563,10 @@ def test_viewer_cant_verify_another_user_phone( make_user_for_organization, make_user_auth_headers, ): - organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.VIEWER) + organization, first_user, token = make_organization_and_user_with_plugin_token( + role=permissions.LegacyAccessControlRole.EDITOR + ) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.VIEWER) client = APIClient() url = reverse("api-internal:user-verify-number", kwargs={"pk": first_user.public_primary_key}) @@ -1540,7 +1579,7 @@ def test_viewer_cant_verify_another_user_phone( def test_viewer_cant_get_own_telegram_verification_code( make_organization_and_user_with_plugin_token, make_user_auth_headers ): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.VIEWER) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.VIEWER) client = APIClient() url = reverse("api-internal:user-get-telegram-verification-code", kwargs={"pk": user.public_primary_key}) @@ -1553,8 +1592,10 @@ def test_viewer_cant_get_own_telegram_verification_code( def test_viewer_cant_get_another_user_telegram_verification_code( make_organization_and_user_with_plugin_token, make_user_for_organization, make_user_auth_headers ): - organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.VIEWER) + organization, first_user, token = make_organization_and_user_with_plugin_token( + role=permissions.LegacyAccessControlRole.EDITOR + ) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.VIEWER) client = APIClient() url = reverse("api-internal:user-get-telegram-verification-code", kwargs={"pk": first_user.public_primary_key}) @@ -1567,12 +1608,12 @@ def test_viewer_cant_get_another_user_telegram_verification_code( @pytest.mark.parametrize( "role,expected_status,initial_unverified_number,initial_verified_number", [ - (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK, "+1234567890", None), - (LegacyAccessControlRole.EDITOR, status.HTTP_200_OK, "+1234567890", None), - (LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN, "+1234567890", None), - (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK, None, "+1234567890"), - (LegacyAccessControlRole.EDITOR, status.HTTP_200_OK, None, "+1234567890"), - (LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN, None, "+1234567890"), + (permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK, "+1234567890", None), + (permissions.LegacyAccessControlRole.EDITOR, status.HTTP_200_OK, "+1234567890", None), + (permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN, "+1234567890", None), + (permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK, None, "+1234567890"), + (permissions.LegacyAccessControlRole.EDITOR, status.HTTP_200_OK, None, "+1234567890"), + (permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN, None, "+1234567890"), ], ) def test_forget_own_number( @@ -1613,12 +1654,12 @@ def test_forget_own_number( @pytest.mark.parametrize( "role,expected_status,initial_unverified_number,initial_verified_number", [ - (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK, "+1234567890", None), - (LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN, "+1234567890", None), - (LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN, "+1234567890", None), - (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK, None, "+1234567890"), - (LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN, None, "+1234567890"), - (LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN, None, "+1234567890"), + (permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK, "+1234567890", None), + (permissions.LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN, "+1234567890", None), + (permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN, "+1234567890", None), + (permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK, None, "+1234567890"), + (permissions.LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN, None, "+1234567890"), + (permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN, None, "+1234567890"), ], ) def test_forget_other_number( @@ -1662,8 +1703,10 @@ def test_forget_other_number( def test_viewer_cant_get_another_user_backend_verification_code( make_organization_and_user_with_plugin_token, make_user_for_organization, make_user_auth_headers ): - organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.VIEWER) + organization, first_user, token = make_organization_and_user_with_plugin_token( + role=permissions.LegacyAccessControlRole.EDITOR + ) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.VIEWER) client = APIClient() url = ( @@ -1679,8 +1722,10 @@ def test_viewer_cant_get_another_user_backend_verification_code( def test_viewer_cant_unlink_backend_another_user( make_organization_and_user_with_plugin_token, make_user_for_organization, make_user_auth_headers ): - organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) - second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.VIEWER) + organization, first_user, token = make_organization_and_user_with_plugin_token( + role=permissions.LegacyAccessControlRole.EDITOR + ) + second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.VIEWER) client = APIClient() url = ( @@ -1693,7 +1738,7 @@ def test_viewer_cant_unlink_backend_another_user( @pytest.mark.django_db def test_change_timezone(make_organization_and_user_with_plugin_token, make_user_auth_headers): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = reverse("api-internal:user-detail", kwargs={"pk": user.public_primary_key}) @@ -1709,7 +1754,7 @@ def test_change_timezone(make_organization_and_user_with_plugin_token, make_user @pytest.mark.django_db @pytest.mark.parametrize("timezone", ["", 1, "NotATimezone"]) def test_invalid_timezone(make_organization_and_user_with_plugin_token, make_user_auth_headers, timezone): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = reverse("api-internal:user-detail", kwargs={"pk": user.public_primary_key}) @@ -1722,7 +1767,7 @@ def test_invalid_timezone(make_organization_and_user_with_plugin_token, make_use @pytest.mark.django_db def test_change_working_hours(make_organization_and_user_with_plugin_token, make_user_auth_headers): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = reverse("api-internal:user-detail", kwargs={"pk": user.public_primary_key}) @@ -1758,7 +1803,7 @@ def test_change_working_hours(make_organization_and_user_with_plugin_token, make def test_invalid_working_hours( make_organization_and_user_with_plugin_token, make_user_auth_headers, working_hours_extra ): - _, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) + _, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR) client = APIClient() url = reverse("api-internal:user-detail", kwargs={"pk": user.public_primary_key}) diff --git a/engine/apps/api/views/user.py b/engine/apps/api/views/user.py index d1cd14c788..56c46b35b3 100644 --- a/engine/apps/api/views/user.py +++ b/engine/apps/api/views/user.py @@ -21,10 +21,10 @@ from apps.api.permissions import ( ALL_PERMISSION_CHOICES, + ALL_PERMISSION_NAME_TO_CLASS_MAP, IsOwnerOrHasRBACPermissions, LegacyAccessControlRole, RBACPermission, - get_permission_from_permission_string, user_is_authorized, ) from apps.api.serializers.team import TeamSerializer @@ -185,14 +185,12 @@ class Meta: fields = ["email", "roles", "permission"] def filter_by_permission(self, queryset, name, value): - rbac_permission = get_permission_from_permission_string(value) + rbac_permission = ALL_PERMISSION_NAME_TO_CLASS_MAP.get(value, None) if not rbac_permission: # TODO: maybe raise a 400 here? return queryset - return queryset.filter( - **User.build_permissions_query(rbac_permission, self.request.user.organization), - ) + return queryset.filter_by_permission(rbac_permission, self.request.user.organization) class UserView( diff --git a/engine/apps/auth_token/auth.py b/engine/apps/auth_token/auth.py index 19374a7b16..d86ad6eedd 100644 --- a/engine/apps/auth_token/auth.py +++ b/engine/apps/auth_token/auth.py @@ -9,7 +9,7 @@ from rest_framework.authentication import BaseAuthentication, get_authorization_header from rest_framework.request import Request -from apps.api.permissions import GrafanaAPIPermission, LegacyAccessControlRole, RBACPermission, user_is_authorized +from apps.api.permissions import GrafanaAPIPermissions, LegacyAccessControlRole, RBACPermission, user_is_authorized from apps.grafana_plugin.helpers.gcom import check_token from apps.grafana_plugin.sync_data import SyncPermission, SyncUser from apps.user_management.exceptions import OrganizationDeletedException, OrganizationMovedException @@ -385,7 +385,7 @@ def authenticate_credentials(self, organization, token): name="Grafana Service Account", username="grafana_service_account", role=role, - permissions=[GrafanaAPIPermission(action=key) for key, _ in permissions.items()], + permissions=GrafanaAPIPermissions.construct_permissions(permissions.keys()), ) auth_token = ApiAuthToken(organization=organization, user=user, name="Grafana Service Account") diff --git a/engine/apps/grafana_plugin/helpers/client.py b/engine/apps/grafana_plugin/helpers/client.py index a5c1131c38..f1abc7a0ae 100644 --- a/engine/apps/grafana_plugin/helpers/client.py +++ b/engine/apps/grafana_plugin/helpers/client.py @@ -8,7 +8,7 @@ from django.conf import settings from rest_framework import status -from apps.api.permissions import GrafanaAPIPermission +from apps.api.permissions import GrafanaAPIPermission, GrafanaAPIPermissions from common.constants.plugin_ids import PluginID logger = logging.getLogger(__name__) @@ -238,7 +238,7 @@ def get_users_permissions(self) -> typing.Optional[UserPermissionsDict]: all_users_permissions: UserPermissionsDict = {} for user_id, user_permissions in data.items(): - all_users_permissions[user_id] = [GrafanaAPIPermission(action=key) for key, _ in user_permissions.items()] + all_users_permissions[user_id] = GrafanaAPIPermissions.construct_permissions(user_permissions.keys()) return all_users_permissions diff --git a/engine/apps/oss_installation/views/cloud_users.py b/engine/apps/oss_installation/views/cloud_users.py index 8261a73472..3882b600aa 100644 --- a/engine/apps/oss_installation/views/cloud_users.py +++ b/engine/apps/oss_installation/views/cloud_users.py @@ -37,10 +37,7 @@ class CloudUsersView(CloudUsersPagination, APIView): def get(self, request): organization = request.user.organization - queryset = User.objects.filter( - organization=organization, - **User.build_permissions_query(RBACPermission.Permissions.NOTIFICATIONS_READ, organization), - ) + queryset = User.objects.filter_by_permission(RBACPermission.Permissions.NOTIFICATIONS_READ, organization) if request.user.current_team is not None: queryset = queryset.filter(teams=request.user.current_team).distinct() diff --git a/engine/apps/schedules/ical_utils.py b/engine/apps/schedules/ical_utils.py index 614d08513a..c3917324ec 100644 --- a/engine/apps/schedules/ical_utils.py +++ b/engine/apps/schedules/ical_utils.py @@ -72,9 +72,12 @@ def users_in_ical( organization: "Organization", ) -> typing.List["User"]: """ - This method returns a sequence of `User` objects, filtered by users whose username, or case-insensitive e-mail, + This method returns a list of `User` objects, filtered by users whose username, or case-insensitive e-mail, is present in `usernames_from_ical`. + Additionally, it filters the users by the organization they belong to and checks if they have the required + permission to receive notifications. + Parameters ---------- usernames_from_ical : typing.List[str] @@ -86,18 +89,17 @@ def users_in_ical( emails_from_ical = [username.lower() for username in usernames_from_ical] - users_found_in_ical = organization.users.filter( - (Q(username__in=usernames_from_ical) | Q(email__lower__in=emails_from_ical)) - ).distinct() - - if organization.is_rbac_permissions_enabled: - # it is more efficient to check permissions on the subset of users filtered above - # than performing a regex query for the required permission - users_found_in_ical = [u for u in users_found_in_ical if {"action": required_permission.value} in u.permissions] - else: - users_found_in_ical = users_found_in_ical.filter(role__lte=required_permission.fallback_role.value) + # NOTE: doing a select_related for organization here, since we will be accessing u.organization for each user + # in the required_permission.user_has_permission calls below + users_found_in_ical = ( + organization.users.filter((Q(username__in=usernames_from_ical) | Q(email__lower__in=emails_from_ical))) + .distinct() + .select_related("organization") + ) - return list(users_found_in_ical) + # it is more efficient to check permissions on the subset of users filtered above + # than performing a regex query for the required permission + return [u for u in users_found_in_ical if required_permission.user_has_permission(u)] @timed_lru_cache(timeout=100) diff --git a/engine/apps/schedules/tests/test_ical_utils.py b/engine/apps/schedules/tests/test_ical_utils.py index ddb97ff3da..d3b6d7a463 100644 --- a/engine/apps/schedules/tests/test_ical_utils.py +++ b/engine/apps/schedules/tests/test_ical_utils.py @@ -9,7 +9,7 @@ from django.core.cache import cache from django.utils import timezone -from apps.api.permissions import LegacyAccessControlRole, RBACPermission +from apps.api.permissions import GrafanaAPIPermissions, LegacyAccessControlRole, RBACPermission from apps.schedules.ical_utils import ( get_cached_oncall_users_for_multiple_schedules, get_icalendar_tz_or_utc, @@ -138,7 +138,7 @@ def test_users_in_ical_rbac(make_organization_and_user, make_user_for_organizati # viewer doesn't yet have the required permission, they shouldn't be included assert len(users_in_ical(usernames, organization)) == 0 - viewer.permissions = [{"action": permission.value}] if permission else [] + viewer.permissions = GrafanaAPIPermissions.construct_permissions([permission.value]) if permission else [] viewer.save() assert users_in_ical(usernames, organization) == ([viewer] if included else []) diff --git a/engine/apps/slack/models/slack_team_identity.py b/engine/apps/slack/models/slack_team_identity.py index 79e603a742..ee73c006bb 100644 --- a/engine/apps/slack/models/slack_team_identity.py +++ b/engine/apps/slack/models/slack_team_identity.py @@ -14,18 +14,16 @@ SlackAPIInvalidAuthError, SlackAPITokenError, ) -from apps.user_management.models.user import User +from apps.user_management.models import Organization, User if typing.TYPE_CHECKING: from django.db.models.manager import RelatedManager - from apps.user_management.models import Organization - logger = logging.getLogger(__name__) class SlackTeamIdentity(models.Model): - organizations: "RelatedManager['Organization']" + organizations: "RelatedManager[Organization]" id = models.AutoField(primary_key=True) slack_id = models.CharField(max_length=100) @@ -141,13 +139,12 @@ def app_id(self): def needs_reinstall(self): return settings.UNIFIED_SLACK_APP_ENABLED and not self._unified_slack_app_installed - def get_users_from_slack_conversation_for_organization(self, channel_id, organization): + def get_users_from_slack_conversation_for_organization(self, channel_id: str, organization: Organization): sc = SlackClient(self) - members = self.get_conversation_members(sc, channel_id) - - return organization.users.filter( - slack_user_identity__slack_id__in=members, - **User.build_permissions_query(RBACPermission.Permissions.CHATOPS_WRITE, organization), + return User.objects.filter_by_permission( + RBACPermission.Permissions.CHATOPS_WRITE, + organization, + slack_user_identity__slack_id__in=self.get_conversation_members(sc, channel_id), ) def get_conversation_members(self, slack_client: SlackClient, channel_id: str): diff --git a/engine/apps/slack/models/slack_usergroup.py b/engine/apps/slack/models/slack_usergroup.py index 93d0341a57..b16a10f2b9 100644 --- a/engine/apps/slack/models/slack_usergroup.py +++ b/engine/apps/slack/models/slack_usergroup.py @@ -19,7 +19,7 @@ SlackAPIUsergroupPaidTeamOnlyError, ) from apps.slack.models import SlackTeamIdentity, SlackUserIdentity -from apps.user_management.models import User +from apps.user_management.models import Organization, User from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length if typing.TYPE_CHECKING: @@ -140,10 +140,11 @@ def update_members(self, slack_ids: list[str]) -> None: self.save(update_fields=("members",)) logger.info(f"Saved cached memberlist for slack user group {self.slack_id}, members {slack_ids}") - def get_users_from_members_for_organization(self, organization): - return organization.users.filter( + def get_users_from_members_for_organization(self, organization: Organization): + return User.objects.filter_by_permission( + RBACPermission.Permissions.CHATOPS_WRITE, + organization, slack_user_identity__slack_id__in=self.members, - **User.build_permissions_query(RBACPermission.Permissions.CHATOPS_WRITE, organization), ) @classmethod diff --git a/engine/apps/user_management/models/user.py b/engine/apps/user_management/models/user.py index 278800ec2d..39531372e2 100644 --- a/engine/apps/user_management/models/user.py +++ b/engine/apps/user_management/models/user.py @@ -15,9 +15,11 @@ from emoji import demojize from apps.api.permissions import ( + GrafanaAPIPermissions, LegacyAccessControlCompatiblePermission, LegacyAccessControlRole, RBACPermission, + convert_oncall_permission_to_irm, user_is_authorized, ) from apps.google import utils as google_utils @@ -39,18 +41,6 @@ logger = logging.getLogger(__name__) -class PermissionsQuery(typing.TypedDict): - permissions__contains: typing.Dict - - -class PermissionsRegexQuery(typing.TypedDict): - permissions__regex: str - - -class RoleInQuery(typing.TypedDict): - role__in: typing.List[int] - - def generate_public_primary_key_for_user(): prefix = "U" new_public_primary_key = generate_public_primary_key(prefix) @@ -86,6 +76,44 @@ def filter(self, *args, **kwargs): def filter_with_deleted(self, *args, **kwargs): return super().filter(*args, **kwargs) + def filter_by_permission( + self, permission: LegacyAccessControlCompatiblePermission, organization: "Organization", *args, **kwargs + ): + """ + This method builds a filter query that is compatible with RBAC as well as legacy "basic" role based + authorization. If a permission is provided we simply do a regex search where the permission column + contains the permission value (need to use regex because the JSON contains method is not supported by sqlite). + + Additionally, if `organization.is_grafana_irm_enabled` is True, we convert the permission to the IRM version + when filtering. + + Lastly, if RBAC is not supported for the org, we make the assumption that we are looking for any users with AT + LEAST the fallback role. Ex: if the fallback role were editor than we would get editors and admins. + """ + if organization.is_rbac_permissions_enabled: + permission_value = ( + convert_oncall_permission_to_irm(permission) + if organization.is_grafana_irm_enabled + else permission.value + ) + + # https://stackoverflow.com/a/50251879 + if settings.DATABASE_TYPE == settings.DATABASE_TYPES.SQLITE3: + # contains is not supported on sqlite + # https://docs.djangoproject.com/en/4.2/topics/db/queries/#contains + query = Q(permissions__regex=re.escape(permission_value)) + else: + query = Q(permissions__contains=GrafanaAPIPermissions.construct_permissions([permission_value])) + else: + query = Q(role__lte=permission.fallback_role.value) + + return self.filter( + query, + *args, + **kwargs, + organization=organization, + ) + def delete(self): # is_active = None is used to be able to have multiple deleted users with the same user_id return super().update(is_active=None) @@ -341,28 +369,6 @@ def insight_logs_serialized(self): def insight_logs_metadata(self): return {} - @staticmethod - def build_permissions_query( - permission: LegacyAccessControlCompatiblePermission, organization - ) -> typing.Union[PermissionsQuery, PermissionsRegexQuery, RoleInQuery]: - """ - This method returns a django query filter that is compatible with RBAC - as well as legacy "basic" role based authorization. If a permission is provided we simply do - a regex search where the permission column contains the permission value (need to use regex because - the JSON contains method is not supported by sqlite) - - If RBAC is not supported for the org, we make the assumption that we are looking for any users with AT LEAST - the fallback role. Ex: if the fallback role were editor than we would get editors and admins. - """ - if organization.is_rbac_permissions_enabled: - # https://stackoverflow.com/a/50251879 - if settings.DATABASE_TYPE == settings.DATABASE_TYPES.SQLITE3: - # https://docs.djangoproject.com/en/4.2/topics/db/queries/#contains - return PermissionsRegexQuery(permissions__regex=re.escape(permission.value)) - required_permission = {"action": permission.value} - return PermissionsQuery(permissions__contains=[required_permission]) - return RoleInQuery(role__lte=permission.fallback_role.value) - def get_default_fallback_notification_policy(self) -> "UserNotificationPolicy": from apps.base.models import UserNotificationPolicy diff --git a/engine/apps/user_management/tests/test_user.py b/engine/apps/user_management/tests/test_user.py index 314f7f7f99..769fa44129 100644 --- a/engine/apps/user_management/tests/test_user.py +++ b/engine/apps/user_management/tests/test_user.py @@ -3,7 +3,7 @@ import pytest from django.utils import timezone -from apps.api.permissions import GrafanaAPIPermission, LegacyAccessControlRole, RBACPermission +from apps.api import permissions from apps.google import constants as google_constants from apps.google.models import GoogleOAuth2User from apps.user_management.models import User @@ -15,7 +15,7 @@ def test_self_or_has_user_settings_admin_permission(make_organization, make_user organization = make_organization(is_rbac_permissions_enabled=False) admin = make_user_for_organization(organization) second_admin = make_user_for_organization(organization) - editor = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR) + editor = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR) another_organization = make_organization(is_rbac_permissions_enabled=False) admin_from_another_organization = make_user_for_organization(another_organization) @@ -36,12 +36,14 @@ def test_self_or_has_user_settings_admin_permission(make_organization, make_user organization_with_rbac = make_organization(is_rbac_permissions_enabled=True) user_with_perms = make_user_for_organization( organization_with_rbac, - role=LegacyAccessControlRole.NONE, - permissions=[GrafanaAPIPermission(action=RBACPermission.Permissions.USER_SETTINGS_ADMIN.value)], + role=permissions.LegacyAccessControlRole.NONE, + permissions=permissions.GrafanaAPIPermissions.construct_permissions( + [permissions.RBACPermission.Permissions.USER_SETTINGS_ADMIN.value] + ), ) user_without_perms = make_user_for_organization( organization_with_rbac, - role=LegacyAccessControlRole.NONE, + role=permissions.LegacyAccessControlRole.NONE, permissions=[], ) @@ -69,8 +71,8 @@ def test_self_or_has_user_settings_admin_permission(make_organization, make_user def test_is_admin(make_organization, make_user_for_organization): # RBAC not enabled for org organization = make_organization(is_rbac_permissions_enabled=False) - admin = make_user_for_organization(organization, role=LegacyAccessControlRole.ADMIN) - editor = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR) + admin = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.ADMIN) + editor = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR) assert organization.is_rbac_permissions_enabled is False @@ -81,12 +83,14 @@ def test_is_admin(make_organization, make_user_for_organization): organization_with_rbac = make_organization(is_rbac_permissions_enabled=True) user_with_perms = make_user_for_organization( organization_with_rbac, - role=LegacyAccessControlRole.NONE, - permissions=[GrafanaAPIPermission(action=RBACPermission.Permissions.ADMIN.value)], + role=permissions.LegacyAccessControlRole.NONE, + permissions=permissions.GrafanaAPIPermissions.construct_permissions( + [permissions.RBACPermission.Permissions.ADMIN.value] + ), ) user_without_perms = make_user_for_organization( organization_with_rbac, - role=LegacyAccessControlRole.NONE, + role=permissions.LegacyAccessControlRole.NONE, permissions=[], ) @@ -193,7 +197,7 @@ def test_has_google_oauth2_connected(make_organization_and_user, make_google_oau @pytest.mark.django_db -def test_google_oauth2_token_is_missing_scopes(make_organization_and_user, make_google_oauth2_user_for_user): +def test_google_oauth2_token_is_missing_scopes(make_organization_and_user): initial_granted_scope = "foo bar baz" initial_oauth_response = { "access_token": "access", @@ -288,3 +292,53 @@ def test_reset_google_oauth2_settings(make_organization_and_user): assert GoogleOAuth2User.objects.filter(user=user).exists() is False assert user.google_calendar_settings is None + + +@pytest.mark.django_db +def test_filter_by_permission(make_organization, make_user_for_organization): + """ + Note that there are some conditions in `UserQuerySet.filter_by_permission` that're + specific to which database engine is being used. These cases are tested on CI where + we run the test against sqlite, mysql, and postgresql + """ + permission_to_test = permissions.RBACPermission.Permissions.ALERT_GROUPS_READ + user_permissions = permissions.GrafanaAPIPermissions.construct_permissions([permission_to_test.value]) + irm_permissions = permissions.GrafanaAPIPermissions.construct_permissions( + [permissions.convert_oncall_permission_to_irm(permission_to_test)] + ) + + org1_rbac = make_organization(is_rbac_permissions_enabled=True) + user1 = make_user_for_organization(org1_rbac, permissions=user_permissions) + user2 = make_user_for_organization(org1_rbac, permissions=user_permissions) + _ = make_user_for_organization(org1_rbac, permissions=[]) + + org2_rbac_irm = make_organization(is_rbac_permissions_enabled=True, is_grafana_irm_enabled=True) + user4 = make_user_for_organization(org2_rbac_irm, permissions=irm_permissions) + user5 = make_user_for_organization(org2_rbac_irm, permissions=irm_permissions) + _ = make_user_for_organization(org2_rbac_irm, permissions=[]) + + org3_no_rbac = make_organization(is_rbac_permissions_enabled=False) + user7 = make_user_for_organization(org3_no_rbac, role=permission_to_test.fallback_role) + user8 = make_user_for_organization(org3_no_rbac, role=permission_to_test.fallback_role) + _ = make_user_for_organization(org3_no_rbac, role=permissions.LegacyAccessControlRole.NONE) + + # rbac permissions enabled + users = User.objects.filter_by_permission(permission_to_test, org1_rbac) + + assert len(users) == 2 + assert user1 in users + assert user2 in users + + # rbac permissions + IRM enabled + users = User.objects.filter_by_permission(permission_to_test, org2_rbac_irm) + + assert len(users) == 2 + assert user4 in users + assert user5 in users + + # rbac permissions disabled + users = User.objects.filter_by_permission(permission_to_test, org3_no_rbac) + + assert len(users) == 2 + assert user7 in users + assert user8 in users