Skip to content

Commit

Permalink
Merge pull request RedHatInsights#1025 from MikelAlejoBR/RHCLOUD-2964…
Browse files Browse the repository at this point in the history
…1-users-should-see-service-account-group-info

RHCLOUD-29641 | RHCLOUD-29668 | RHCLOUD-29631 | feature: users should see service account group info
  • Loading branch information
petracihalova authored Feb 20, 2024
2 parents 6245f0c + 901cd49 commit a041745
Show file tree
Hide file tree
Showing 6 changed files with 673 additions and 31 deletions.
97 changes: 89 additions & 8 deletions rbac/management/principal/it_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"""Class to manage interactions with the IT service accounts service."""
import logging
import time
from typing import Tuple
import uuid
from typing import Optional, Tuple, Union

import requests
from django.conf import settings
Expand Down Expand Up @@ -72,24 +73,33 @@ def limit_offset_validation(offset, limit):
class ITService:
"""A class to handle interactions with the IT service."""

# Instance variable for the class.
_instance = None

def __new__(cls, *args, **kwargs):
"""Create a single instance of the class."""
if cls._instance is None:
cls._instance = super().__new__(cls, *args, **kwargs)

return cls._instance

def __init__(self):
"""Establish IT connection information."""
self.host = settings.IT_SERVICE_HOST
self.base_path = settings.IT_SERVICE_BASE_PATH
self.port = settings.IT_SERVICE_PORT
self.protocol = settings.IT_SERVICE_PROTOCOL_SCHEME
self.it_request_timeout = settings.IT_SERVICE_TIMEOUT_SECONDS
self.it_url = f"{self.protocol}://{self.host}:{self.port}{self.base_path}{IT_PATH_GET_SERVICE_ACCOUNTS}"

@it_request_all_service_accounts_time_tracking.time()
def request_service_accounts(self, bearer_token: str) -> list[dict]:
def request_service_accounts(self, bearer_token: str, client_ids: Optional[list[str]] = None) -> list[dict]:
"""Request the service accounts for a tenant and returns the entire list that IT has."""
# We cannot talk to IT if we don't have a bearer token.
if not bearer_token:
raise MissingAuthorizationError()

received_service_accounts: list[dict] = []
# Prepare the URL to fetch the service accounts.
url = f"{self.protocol}://{self.host}:{self.port}{self.base_path}{IT_PATH_GET_SERVICE_ACCOUNTS}"

# Attempt fetching all the service accounts for the tenant.
try:
Expand All @@ -98,11 +108,16 @@ def request_service_accounts(self, bearer_token: str) -> list[dict]:

# If the offset is zero, that means that we need to call the service at least once to get the first
# service accounts. If it equals the limit, that means that there are more pages to fetch.
parameters: dict[str, Union[int, list[str]]] = {"first": offset, "max": limit}
# If we were given client IDs to filter the collection with, do it!
if client_ids:
parameters["clientId"] = client_ids

while offset == 0 or offset == limit:
response = requests.get(
url=url,
url=self.it_url,
headers={"Authorization": f"Bearer {bearer_token}"},
params={"first": offset, "max": limit},
params=parameters,
timeout=self.it_request_timeout,
)

Expand Down Expand Up @@ -134,7 +149,7 @@ def request_service_accounts(self, bearer_token: str) -> list[dict]:
except requests.exceptions.ConnectionError as exception:
LOGGER.error(
"Unable to connect to IT to fetch the service accounts. Attempted URL %s with error: %s",
url,
self.it_url,
exception,
)

Expand All @@ -146,7 +161,7 @@ def request_service_accounts(self, bearer_token: str) -> list[dict]:
except requests.exceptions.Timeout as exception:
LOGGER.error(
"The connection to IT timed out when trying to fetch service accounts. Attempted URL %s with error: %s",
url,
self.it_url,
exception,
)

Expand All @@ -160,6 +175,49 @@ def request_service_accounts(self, bearer_token: str) -> list[dict]:

return service_accounts

def is_service_account_valid_by_client_id(self, user: User, service_account_client_id: str) -> bool:
"""Check if the specified service account is valid."""
if settings.IT_BYPASS_IT_CALLS:
return False

return self._is_service_account_valid(user=user, client_id=service_account_client_id)

def is_service_account_valid_by_username(self, user: User, service_account_username: str) -> bool:
"""Check if the specified service account is valid."""
# The usernames for the service accounts usually come in the form "service-account-${CLIENT_ID}". We don't need
# the prefix of the username to query IT, since they only accept client IDs to filter collections.
if settings.IT_BYPASS_IT_CALLS:
return True

if self.is_username_service_account(service_account_username):
client_id = service_account_username.replace("service-account-", "")
else:
client_id = service_account_username

return self._is_service_account_valid(user=user, client_id=client_id)

def _is_service_account_valid(self, user: User, client_id: str) -> bool:
"""Check if the provided client ID can be found in the tenant's organization by calling IT."""
if settings.IT_BYPASS_IT_CALLS:
return True
else:
service_accounts: list[dict] = self.request_service_accounts(
bearer_token=user.bearer_token,
client_ids=[client_id],
)

if len(service_accounts) == 0:
return False
elif len(service_accounts) == 1:
sa = service_accounts[0]
return client_id == sa.get("clientId")
else:
LOGGER.error(
f'unexpected number of service accounts received from IT. Wanted one with client ID "{client_id}",'
f" got {len(service_accounts)}: {service_accounts}"
)
return False

def get_service_accounts(self, user: User, options: dict = {}) -> Tuple[list[dict], int]:
"""Request and returns the service accounts for the given tenant."""
# We might want to bypass calls to the IT service on ephemeral or test environments.
Expand Down Expand Up @@ -320,6 +378,29 @@ def get_service_accounts_group(self, group: Group, user: User, options: dict = {
else:
return service_accounts

@staticmethod
def is_username_service_account(username: str) -> bool:
"""Check if the given username belongs to a service account."""
return username.startswith("service-account-")

@staticmethod
def extract_client_id_service_account_username(username: str) -> uuid.UUID:
"""Extract the client ID from the service account's username."""
# If it has the "service-account" prefix, we just need to strip it and return the rest of the username, which
# contains the client ID. Else, we have just received the client ID.
try:
if ITService.is_username_service_account(username=username):
return uuid.UUID(username.replace("service-account-", ""))
else:
return uuid.UUID(username)
except ValueError:
raise serializers.ValidationError(
{
"detail": "unable to extract the client ID from the service account's username because the"
" provided UUID is invalid"
}
)

def _transform_incoming_payload(self, service_account_from_it_service: dict) -> dict:
"""Transform the incoming service account from IT into a dict which fits our response structure."""
service_account: dict = {}
Expand Down
37 changes: 28 additions & 9 deletions rbac/management/querysets.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
#
"""Queryset helpers for management module."""
from django.conf import settings
from django.db.models import QuerySet
from django.db.models.aggregates import Count
from django.urls import reverse
from django.utils.translation import gettext as _
from management.group.model import Group
from management.permissions.role_access import RoleAccessPermission
from management.policy.model import Policy
from management.principal.it_service import ITService
from management.role.model import Access, Role
from management.utils import (
APPLICATION_KEY,
Expand All @@ -36,6 +38,7 @@
roles_for_principal,
)
from rest_framework import permissions, serializers
from rest_framework.request import Request

from api.models import Tenant, User
from rbac.env import ENVIRONMENT
Expand Down Expand Up @@ -148,7 +151,7 @@ def annotate_roles_with_counts(queryset):
return queryset.annotate(policyCount=Count("policies", distinct=True), accessCount=Count("access", distinct=True))


def get_role_queryset(request):
def get_role_queryset(request) -> QuerySet:
"""Obtain the queryset for roles."""
scope = request.query_params.get(SCOPE_KEY, ACCOUNT_SCOPE)
public_tenant = Tenant.objects.get(tenant_name="public")
Expand Down Expand Up @@ -179,7 +182,7 @@ def get_role_queryset(request):
if settings.BYPASS_BOP_VERIFICATION:
is_org_admin = request.user.admin
else:
is_org_admin = get_admin_from_proxy(username, request)
is_org_admin = _check_user_username_is_org_admin(request=request, username=username)

request.user_from_query = User()
request.user_from_query.username = username
Expand Down Expand Up @@ -233,7 +236,7 @@ def get_policy_queryset(request):
return filter_queryset_by_tenant(Policy.objects.filter(uuid__in=access), request.tenant)


def get_access_queryset(request):
def get_access_queryset(request: Request) -> QuerySet:
"""Obtain the queryset for policies."""
required_parameters = [APPLICATION_KEY]
have_parameters = all(param in request.query_params for param in required_parameters)
Expand All @@ -250,7 +253,7 @@ def get_access_queryset(request):
if not username or settings.BYPASS_BOP_VERIFICATION:
is_org_admin = request.user.admin
else:
is_org_admin = get_admin_from_proxy(username, request)
is_org_admin = _check_user_username_is_org_admin(request=request, username=username)

return get_object_principal_queryset(
request,
Expand Down Expand Up @@ -283,13 +286,14 @@ def get_object_principal_queryset(request, scope, clazz, **kwargs):
return queryset_by_id(objects, clazz, **kwargs)


def _filter_admin_default(request, queryset):
def _filter_admin_default(request: Request, queryset: QuerySet):
"""Filter out admin default groups unless the principal is an org admin."""
username = request.query_params.get("username")
if not username or settings.BYPASS_BOP_VERIFICATION:
is_org_admin = request.user.admin
else:
is_org_admin = get_admin_from_proxy(username, request)
is_org_admin = _check_user_username_is_org_admin(request=request, username=username)

# If the principal is an org admin, make sure they get any and all admin_default groups
if is_org_admin:
public_tenant = Tenant.objects.get(tenant_name="public")
Expand All @@ -302,9 +306,24 @@ def _filter_admin_default(request, queryset):
return queryset


def _filter_default_groups(request, queryset):
def _filter_default_groups(request: Request, queryset: QuerySet) -> QuerySet:
"""Filter out default access group and admin default group."""
username = request.query_params.get("username")
exclude_username = request.query_params.get("exclude_username")
if exclude_username:

if (username and ITService.is_username_service_account(username=username)) or exclude_username:
return queryset.exclude(platform_default=True).exclude(admin_default=True)
return queryset
else:
return queryset


def _check_user_username_is_org_admin(request: Request, username: str) -> bool:
"""Check whether the given username is from a user that is an org admin or not.
Service Accounts are considered to not be organization admins, and regular user principals need to be checked using
the proxy.
"""
if ITService.is_username_service_account(username=username):
return False
else:
return get_admin_from_proxy(request=request, username=username)
47 changes: 36 additions & 11 deletions rbac/management/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""Helper utilities for management module."""
import json
import os
import uuid
from uuid import UUID

from django.conf import settings
Expand All @@ -25,11 +26,12 @@
from django.utils.translation import gettext as _
from management.models import Access, Group, Policy, Principal, Role
from management.permissions.principal_access import PrincipalAccessPermission
from management.principal.it_service import ITService
from management.principal.proxy import PrincipalProxy
from rest_framework import serializers, status
from rest_framework.request import Request

from api.models import CrossAccountRequest, Tenant, User

from api.models import CrossAccountRequest, Tenant

USERNAME_KEY = "username"
APPLICATION_KEY = "application"
Expand Down Expand Up @@ -66,24 +68,47 @@ def get_principal_from_request(request):
return get_principal(username, request, verify_principal=bool(qs_user), from_query=from_query)


def get_principal(username, request, verify_principal=True, from_query=False):
def get_principal(
username: str, request: Request, verify_principal: bool = True, from_query: bool = False
) -> Principal:
"""Get principals from username."""
# First check if principal exist on our side,
# if not call BOP to check if user exist in the account.
tenant = request.tenant
tenant: Tenant = request.tenant
try:
# If the username was provided through a query we must verify if it is an org admin from the BOP
# If the username was provided through a query we must verify if it exists in the corresponding services first.
if from_query:
verify_principal_with_proxy(username, request, verify_principal=verify_principal)
if ITService.is_username_service_account(username=username):
it_service = ITService()
if not it_service.is_service_account_valid_by_username(
user=request.user, service_account_username=username
):
raise serializers.ValidationError(
{"detail": f"No data found for service account with username '{username}'"}
)
else:
verify_principal_with_proxy(username, request, verify_principal=verify_principal)
principal = Principal.objects.get(username__iexact=username, tenant=tenant)
except Principal.DoesNotExist:
verify_principal_with_proxy(username, request, verify_principal=verify_principal)
# If the "from query" parameter was specified, the username was validated above, so there is no need to
# validate it again.
if not from_query:
if ITService.is_username_service_account(username):
it_service = ITService()
if not it_service.is_service_account_valid_by_username(
user=request.user, service_account_username=username
):
raise serializers.ValidationError(
{"detail": f"No data found for service account with username '{username}'"}
)
else:
verify_principal_with_proxy(username, request, verify_principal=verify_principal)

if ITService.is_username_service_account(username=username):
client_id: uuid.UUID = ITService.extract_client_id_service_account_username(username)

# In the case that the user that made the request was a service account, store it accordingly in the database.
user: User = request.user
if user and user.is_service_account:
principal, _ = Principal.objects.get_or_create(
username=user.username, tenant=tenant, type=SERVICE_ACCOUNT_KEY, service_account_id=user.client_id
username=username, tenant=tenant, type=SERVICE_ACCOUNT_KEY, service_account_id=client_id
)
else:
# Avoid possible race condition if the user was created while checking BOP
Expand Down
Loading

0 comments on commit a041745

Please sign in to comment.