Skip to content

Commit

Permalink
Merge pull request RedHatInsights#1038 from MikelAlejoBR/RHCLOUD-3098…
Browse files Browse the repository at this point in the history
…6-service-accounts-same-as-user-principals

RHCLOUD-30986 | feature: principals with User Access Admin role should manage groups
  • Loading branch information
petracihalova authored Mar 22, 2024
2 parents 0d135a6 + 3c10703 commit bb0adcd
Show file tree
Hide file tree
Showing 9 changed files with 2,081 additions and 95 deletions.
12 changes: 6 additions & 6 deletions rbac/management/group/definer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

from api.models import Tenant

USER_ACCESS_ADMINISTRATOR_ROLE_KEY = "User Access administrator"
logger = logging.getLogger(__name__) # pylint: disable=invalid-name


Expand Down Expand Up @@ -133,12 +134,11 @@ def add_roles(group, roles_or_role_ids, tenant, user=None):
Q(tenant=tenant) | Q(tenant=Tenant.objects.get(tenant_name="public")), name__in=role_names
)
for role in roles:
accesses = role.access.all()
for access in accesses:
if access.permission_application() == "rbac" and user and not user.admin:
key = "add-roles"
message = f"Non-admin users cannot add RBAC role {role.display_name} to groups."
raise serializers.ValidationError({key: _(message)})
# Only Organization administrators are allowed to add 'User Access Administrator' role into a group.
if role.name == USER_ACCESS_ADMINISTRATOR_ROLE_KEY and user and not user.admin:
key = "add-roles"
message = f"Non-admin users cannot add '{USER_ACCESS_ADMINISTRATOR_ROLE_KEY}' role to groups."
raise serializers.ValidationError({key: _(message)})
# Only add the role if it was not attached
if not system_policy.roles.filter(pk=role.pk).exists():
system_policy.roles.add(role)
Expand Down
118 changes: 47 additions & 71 deletions rbac/management/group/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import requests
from django.conf import settings
from django.db import connection
from django.db import transaction
from django.db.models import Q
from django.db.models.aggregates import Count
Expand All @@ -31,7 +30,12 @@
from management.authorization.scope_claims import ScopeClaims
from management.authorization.token_validator import ITSSOTokenValidator
from management.filters import CommonFilters
from management.group.definer import add_roles, remove_roles, set_system_flag_before_update
from management.group.definer import (
USER_ACCESS_ADMINISTRATOR_ROLE_KEY,
add_roles,
remove_roles,
set_system_flag_before_update,
)
from management.group.model import Group
from management.group.serializer import (
GroupInputSerializer,
Expand Down Expand Up @@ -205,6 +209,19 @@ def protect_default_admin_group_roles(self, group):
error = {"group": [_("Default admin access cannot be modified.")]}
raise serializers.ValidationError(error)

def protect_group_with_user_access_admin_role(self, roles, source_key):
"""Disallow group with 'User Access administrator' role from being updated."""
# Only organization administrators are allowed to create, modify, or delete a group
# that is assigned the User Access administrator role.
for role in roles:
if role.name == USER_ACCESS_ADMINISTRATOR_ROLE_KEY:
key = source_key
message = (
"Non-admin users are not allowed to create, modify, or delete a group that is assigned the "
f"'{USER_ACCESS_ADMINISTRATOR_ROLE_KEY}' role."
)
raise serializers.ValidationError({key: _(message)})

def create(self, request, *args, **kwargs):
"""Create a group.
Expand Down Expand Up @@ -337,6 +354,9 @@ def destroy(self, request, *args, **kwargs):
validate_uuid(kwargs.get("uuid"), "group uuid validation")
self.protect_system_groups("delete")
group = self.get_object()
if not request.user.admin:
self.protect_group_with_user_access_admin_role(group.roles_with_access(), "remove_group")

response = super().destroy(request=request, args=args, kwargs=kwargs)
if response.status_code == status.HTTP_204_NO_CONTENT:
group_obj_change_notification_handler(request.user, group, "deleted")
Expand All @@ -345,7 +365,7 @@ def destroy(self, request, *args, **kwargs):
def update(self, request, *args, **kwargs):
"""Update a group.
@api {post} /api/v1/groups/:uuid Update a group
@api {put} /api/v1/groups/:uuid Update a group
@apiName updateGroup
@apiGroup Group
@apiVersion 1.0.0
Expand All @@ -366,6 +386,11 @@ def update(self, request, *args, **kwargs):
"""
validate_uuid(kwargs.get("uuid"), "group uuid validation")
self.protect_system_groups("update")

group = self.get_object()
if not request.user.admin:
self.protect_group_with_user_access_admin_role(group.roles_with_access(), "update_group")

return super().update(request=request, args=args, kwargs=kwargs)

def add_principals(self, group, principals, account=None, org_id=None):
Expand Down Expand Up @@ -435,8 +460,6 @@ def add_service_accounts(
# Fetch the service account from our database to add it to the group. If it doesn't exist, we create
# it.
for specified_sa in service_accounts:
self.user_has_permission_act_on_service_account(user=user, service_account=specified_sa)

client_id = specified_sa["clientID"]
try:
principal = Principal.objects.get(
Expand Down Expand Up @@ -590,14 +613,12 @@ def principals(self, request: Request, uuid: Optional[UUID] = None):
account = self.request.user.account
org_id = self.request.user.org_id
if request.method == "POST":
# Make sure that system groups are kept unmodified.
self.protect_system_groups("add principals")

if not request.user.admin:
for role in group.roles_with_access():
for access in role.access.all():
if access.permission_application() == "rbac":
key = "add_principals"
message = "Non-admin users may not add principals to Groups with RBAC permissions."
raise serializers.ValidationError({key: _(message)})
self.protect_group_with_user_access_admin_role(group.roles_with_access(), "add_principals")

serializer = GroupPrincipalInputSerializer(data=request.data)

# Serialize the payload and validate that it is correct.
Expand Down Expand Up @@ -828,6 +849,9 @@ def principals(self, request: Request, uuid: Optional[UUID] = None):
else:
self.protect_system_groups("remove principals")

if not request.user.admin:
self.protect_group_with_user_access_admin_role(group.roles_with_access(), "remove_principals")

if SERVICE_ACCOUNTS_KEY not in request.query_params and USERNAMES_KEY not in request.query_params:
key = "detail"
message = "Query parameter {} or {} is required.".format(SERVICE_ACCOUNTS_KEY, USERNAMES_KEY)
Expand Down Expand Up @@ -875,7 +899,10 @@ def principals(self, request: Request, uuid: Optional[UUID] = None):
},
)

return Response(status=status.HTTP_204_NO_CONTENT)
# Create a default and successful response object. If no user principals are to be removed below, this
# response will be returned. Else, it will be overridden with whichever response the user removal
# generates.
response = Response(status=status.HTTP_204_NO_CONTENT)

# Remove the users from the group too.
if USERNAMES_KEY in request.query_params:
Expand Down Expand Up @@ -979,6 +1006,10 @@ def roles(self, request, uuid=None, principals=None):
group = self.get_object()
if request.method == "POST":
self.protect_default_admin_group_roles(group)

if not request.user.admin:
self.protect_group_with_user_access_admin_role(group.roles_with_access(), "add_role")

serializer = GroupRoleSerializerIn(data=request.data)
if serializer.is_valid(raise_exception=True):
roles = request.data.pop(ROLES_KEY, [])
Expand All @@ -992,6 +1023,10 @@ def roles(self, request, uuid=None, principals=None):
return self.get_paginated_response(serializer.data)
else:
self.protect_default_admin_group_roles(group)

if not request.user.admin:
self.protect_group_with_user_access_admin_role(group.roles_with_access(), "remove_role")

if ROLES_KEY not in request.query_params:
key = "detail"
message = "Query parameter {} is required.".format(ROLES_KEY)
Expand Down Expand Up @@ -1114,62 +1149,3 @@ def remove_service_accounts(
)
for username in service_accounts:
group_principal_change_notification_handler(self.request.user, group, username, "removed")

def user_has_permission_act_on_service_account(self, user: User, service_account: dict = {}):
"""Check if the user has permission to create or delete the service account.
Only org admins, users with the "User Access administrator" or the owner of the service account can create or
remove service accounts.
"""
if settings.IT_BYPASS_PERMISSIONS_MODIFY_SERVICE_ACCOUNTS:
return

# Is the user an organization administrator?
is_organization_admin: bool = user.admin

# Is the user the owner of the service account?
is_user_owner: bool = False

owner = service_account.get("owner")
if owner:
is_user_owner = user.username == owner

# Check if the user has the "User Access administrator" permission. Leaving the RAW query here
username: str = user.username # type: ignore
query = (
"SELECT EXISTS ( "
"SELECT "
"1 "
"FROM "
'"management_principal" AS "mp" '
"INNER JOIN "
'"management_group_principals" AS "mgp" ON "mgp"."principal_id" = "mp"."id" '
"INNER JOIN "
'"management_policy" AS "mpolicy" ON "mpolicy"."group_id" = "mgp"."group_id" '
"INNER JOIN "
'"management_policy_roles" AS "mpr" ON "mpr"."policy_id" = "mpolicy"."id" '
"INNER JOIN "
'"management_role" AS "mr" ON "mr"."id" = "mpr"."role_id" '
"WHERE "
'"mp"."username" = %s '
"AND "
"mr.\"name\" = 'User Access administrator' "
"LIMIT 1 "
') AS "user_has_user_access_administrator_permission"'
)

with connection.cursor() as cursor:
cursor.execute(query, [username])

row: tuple = cursor.fetchone()

user_has_user_access_administrator_permission = row[0]

if (not is_organization_admin) and (not user_has_user_access_administrator_permission) and (not is_user_owner):
logger.debug(
f"User {user} was denied altering service account {service_account} due to insufficient privileges."
)

raise InsufficientPrivilegesError(
f"Unable to alter service account {service_account} due to insufficient privileges."
)
16 changes: 15 additions & 1 deletion rbac/management/permissions/group_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
from django.urls import reverse
from management.permissions.utils import is_scope_principal
from rest_framework import permissions
from rest_framework.request import Request

from rbac.env import ENVIRONMENT

# Allowed methods to be able to modify principals from a group.
ALLOWED_METHODS = ["DELETE", "POST"]


class GroupAccessPermission(permissions.BasePermission):
"""Determines if a user has access to Group APIs."""

def has_permission(self, request, view):
def has_permission(self, request: Request, view):
"""Check permission based on the defined access."""
if ENVIRONMENT.get_value("ALLOW_ANY", default=False, cast=bool):
return True
Expand All @@ -43,6 +47,16 @@ def has_permission(self, request, view):
return True
else:
group_write = request.user.access.get("group", {}).get("write", [])

# In the case that group principals are trying to be modified, check that the user or the service account
# has the proper "User Access Administrator" rights.
if request.method in ALLOWED_METHODS and view.basename == "group" and view.action == "principals":
principal_write = request.user.access.get("principal", {}).get("write", [])
if group_write and principal_write:
return True
else:
return False

if group_write:
return True

Expand Down
2 changes: 1 addition & 1 deletion rbac/management/role/definitions/rbac_local_test.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"roles": [
{
"name": "RBAC Administrator Local Test",
"name": "User Access administrator",
"description": "Grants a non-org admin full access to configure and manage user access to services hosted on cloud.redhat.com. This role can only be viewed and assigned by Organization Administrators.",
"system": true,
"version": 3,
Expand Down
2 changes: 1 addition & 1 deletion tests/identity_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def _create_service_account_data(cls) -> dict[str, str]:
def _create_request_context(
cls,
customer_data: dict[str, str],
user_data: dict[str, str],
user_data: dict[str, str] = None,
is_org_admin: bool = True,
is_internal: bool = False,
cross_account: bool = False,
Expand Down
4 changes: 2 additions & 2 deletions tests/management/group/test_definer.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_default_group_seeding_reassign_roles(self, send_kafka_message):
new_platform_role = Role.objects.create(
name="new_platform_role", platform_default=True, tenant=self.public_tenant
)
role_to_remove = Role.objects.get(name="RBAC Administrator Local Test")
role_to_remove = Role.objects.get(name="User Access administrator")
with self.settings(NOTIFICATIONS_RH_ENABLED=True, NOTIFICATIONS_ENABLED=True):
try:
seed_group()
Expand Down Expand Up @@ -147,7 +147,7 @@ def test_default_group_seeding_reassign_roles(self, send_kafka_message):
def modify_default_group(self, system=True):
"""Add a role to the default group and/or change the system flag"""
group = Group.objects.get(platform_default=True)
roles = Role.objects.filter(name="RBAC Administrator Local Test")
roles = Role.objects.filter(name="User Access administrator")
add_roles(group, roles, self.public_tenant)

group.system = system
Expand Down
Loading

0 comments on commit bb0adcd

Please sign in to comment.