Skip to content

Commit

Permalink
feat: update RBAC permissioning to support grafana-irm-app (#5149)
Browse files Browse the repository at this point in the history
# What this PR does

Closes grafana/irm#31 (and supersedes
#4784)

Main changes:
- updates `apps.api.permissions.user_is_authorized` to check the value
of `organization.is_grafana_irm_enabled`. If it is, we check for the
presence of `grafana-irm-app` prefixed RBAC permissions rather than
`grafana-oncall-app`
- cleans-up `engine/apps/api/tests/test_permissions.py` (bulk of the
changes in the PR)
- converts `apps.user_management.models.User.build_permissions_query` to
a `UserQuerySet` method instead
  - means we can now do things like this instead:
  ```python3

User.objects.filter_by_permission(RBACPermission.Permissions.NOTIFICATIONS_READ,
organization)
  ```

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] Added the relevant release notes label (see labels prefixed w/
`release:`). These labels dictate how your PR will
    show up in the autogenerated release notes.
  • Loading branch information
joeyorlando authored Oct 10, 2024
1 parent f35f66a commit 04ab676
Show file tree
Hide file tree
Showing 13 changed files with 879 additions and 380 deletions.
75 changes: 59 additions & 16 deletions engine/apps/api/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from common.utils import getattrd

if typing.TYPE_CHECKING:
from apps.user_management.models import User
from apps.user_management.models import Organization, User

RBAC_PERMISSIONS_ATTR = "rbac_permissions"
RBAC_OBJECT_PERMISSIONS_ATTR = "rbac_object_permissions"
Expand Down Expand Up @@ -50,6 +50,12 @@ class GrafanaAPIPermission(typing.TypedDict):
action: str


class GrafanaAPIPermissions:
@classmethod
def construct_permissions(cls, actions: typing.List[str]) -> typing.List[GrafanaAPIPermission]:
return [GrafanaAPIPermission(action=action) for action in actions]


class Resources(enum.Enum):
ALERT_GROUPS = "alert-groups"
INTEGRATIONS = "integrations"
Expand Down Expand Up @@ -103,6 +109,9 @@ def __init__(
self.value = f"{prefix}.{resource.value}:{action.value}"
self.fallback_role = fallback_role

def user_has_permission(self, user: "User") -> bool:
return user_is_authorized(user, [self])


LegacyAccessControlCompatiblePermissions = typing.List[LegacyAccessControlCompatiblePermission]
RBACPermissionsAttribute = typing.Dict[str, LegacyAccessControlCompatiblePermissions]
Expand All @@ -126,6 +135,36 @@ def get_most_authorized_role(permissions: LegacyAccessControlCompatiblePermissio
return min({p.fallback_role for p in permissions}, key=lambda r: r.value)


def convert_oncall_permission_to_irm(permission: LegacyAccessControlCompatiblePermission) -> str:
return permission.value.replace(PluginID.ONCALL, PluginID.IRM)


def get_required_permission_values(
organization: "Organization", required_permissions: LegacyAccessControlCompatiblePermissions
) -> typing.List[str]:
"""
This function returns a list of required permission values, taking into account whether or not the organization
is using the IRM plugin.
If the IRM plugin is being used, we substitue `grafana-oncall-app` with `grafana-irm-app`
as the RBAC permission prefix.
"""
permission_values = []

for permission in required_permissions:
permission_value = permission.value
if permission_value.startswith(PluginID.ONCALL) and organization.is_grafana_irm_enabled:
permission_values.append(convert_oncall_permission_to_irm(permission))
else:
permission_values.append(permission_value)

return permission_values


def user_has_minimum_required_basic_role(user: "User", required_basic_role: LegacyAccessControlRole) -> bool:
return user.role <= required_basic_role.value


def user_is_authorized(user: "User", required_permissions: LegacyAccessControlCompatiblePermissions) -> bool:
"""
This function checks whether `user` has all necessary permissions specified in `required_permissions`.
Expand All @@ -134,11 +173,12 @@ def user_is_authorized(user: "User", required_permissions: LegacyAccessControlCo
`user` - The user to check permissions for
`required_permissions` - A list of permissions that a user must have to be considered authorized
"""
if user.organization.is_rbac_permissions_enabled:
organization = user.organization
if organization.is_rbac_permissions_enabled:
user_permissions = [u["action"] for u in user.permissions]
required_permission_values = [p.value for p in required_permissions]
required_permission_values = get_required_permission_values(organization, required_permissions)
return all(permission in user_permissions for permission in required_permission_values)
return user.role <= get_most_authorized_role(required_permissions).value
return user_has_minimum_required_basic_role(user, get_most_authorized_role(required_permissions))


class RBACPermission(permissions.BasePermission):
Expand Down Expand Up @@ -310,20 +350,23 @@ def has_object_permission(self, request: AuthenticatedRequest, view: ViewSetOrAP


ALL_PERMISSION_NAMES = [perm for perm in dir(RBACPermission.Permissions) if not perm.startswith("_")]
ALL_PERMISSION_CLASSES = [
ALL_PERMISSION_CLASSES: LegacyAccessControlCompatiblePermissions = [
getattr(RBACPermission.Permissions, permission_name) for permission_name in ALL_PERMISSION_NAMES
]
ALL_PERMISSION_CHOICES = [
(permission_class.value, permission_name)
for permission_class, permission_name in zip(ALL_PERMISSION_CLASSES, ALL_PERMISSION_NAMES)
]


def get_permission_from_permission_string(perm: str) -> typing.Optional[LegacyAccessControlCompatiblePermission]:
for permission_class in ALL_PERMISSION_CLASSES:
if permission_class.value == perm:
return permission_class
return None
ALL_PERMISSION_CHOICES: typing.List[typing.Tuple[str, str]] = []
for permission_class, permission_name in zip(ALL_PERMISSION_CLASSES, ALL_PERMISSION_NAMES):
ALL_PERMISSION_CHOICES += [
(permission_class.value, permission_name),
(convert_oncall_permission_to_irm(permission_class), permission_name),
]
ALL_PERMISSION_NAME_TO_CLASS_MAP: typing.Dict[str, LegacyAccessControlCompatiblePermission] = {}
for permission_class in ALL_PERMISSION_CLASSES:
ALL_PERMISSION_NAME_TO_CLASS_MAP.update(
{
permission_class.value: permission_class,
convert_oncall_permission_to_irm(permission_class): permission_class,
}
)


class IsOwner(permissions.BasePermission):
Expand Down
Loading

0 comments on commit 04ab676

Please sign in to comment.