Commit

[RHCLOUD-37152] Add locks on Roles, CAR and groups in migrator (RedHatInsights#1441)

* Lock group and cross account objects in migrator

* Use dual write handler for role in role migrator

* Extend transaction scope for groups in migration and lock groups

* Add prepare_for_update for role migration

* Don't require running this in maintenance mode

* Extend transaction scope in group principal removals

* Remove unnecessary condition

* Use primary keys for role and group queries
lpichler authored Jan 22, 2025
1 parent 35d0da3 commit 8262ac5
Showing 4 changed files with 143 additions and 119 deletions.
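The thread running through this commit is one locking discipline: open a transaction, re-select the row with select_for_update, run the dual-write replication, and let the commit release the lock. Below is a minimal sketch of that pattern with the Django ORM; it assumes a configured Django project with the Group model imported in the diff, and build_handler is a hypothetical stand-in for constructing the real dual write handler.

from django.db import transaction

from management.models import Group  # model referenced in the migrator diff below


def replicate_group_under_lock(group_pk, build_handler):
    """Sketch only: lock one group row, replicate, release the lock on commit."""
    with transaction.atomic():
        # SELECT ... FOR UPDATE blocks concurrent writers (e.g. the API's
        # dual-write paths) until this transaction commits or rolls back.
        group = Group.objects.select_for_update().get(pk=group_pk)
        handler = build_handler(group)  # hypothetical factory
        handler.replicate()
    # Commit here: the row lock is released before the next object is processed.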
22 changes: 11 additions & 11 deletions rbac/management/group/view.py
@@ -185,10 +185,10 @@ class GroupViewSet(

def get_queryset(self):
"""Obtain queryset for requesting user based on access."""
add_principals_method = self.action == "principals" and self.request.method == "POST"
principals_method = self.action == "principals" and (self.request.method != "GET")
destroy_method = self.action == "destroy"

if add_principals_method or destroy_method:
if principals_method or destroy_method:
# In this case, the group must be locked to prevent principal changes during deletion.
# If not locked, replication to relations may be out of sync due to phantom reads.
# We have to modify the starting queryset to support locking because
@@ -937,19 +937,19 @@ def principals(self, request: Request, uuid: Optional[UUID] = None):
page = self.paginate_queryset(resp.get("data"))
response = self.get_paginated_response(page)
else:
group = self.get_object()
with transaction.atomic():
group = self.get_object()

self.protect_system_groups("remove principals")
self.protect_system_groups("remove principals")

if not request.user.admin:
self.protect_group_with_user_access_admin_role(group.roles_with_access(), "remove_principals")
if not request.user.admin:
self.protect_group_with_user_access_admin_role(group.roles_with_access(), "remove_principals")

if SERVICE_ACCOUNTS_KEY not in request.query_params and USERNAMES_KEY not in request.query_params:
key = "detail"
message = "Query parameter {} or {} is required.".format(SERVICE_ACCOUNTS_KEY, USERNAMES_KEY)
raise serializers.ValidationError({key: _(message)})
if SERVICE_ACCOUNTS_KEY not in request.query_params and USERNAMES_KEY not in request.query_params:
key = "detail"
message = "Query parameter {} or {} is required.".format(SERVICE_ACCOUNTS_KEY, USERNAMES_KEY)
raise serializers.ValidationError({key: _(message)})

with transaction.atomic():
service_accounts_to_remove = []
# Remove the service accounts from the group.
if SERVICE_ACCOUNTS_KEY in request.query_params:
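For context on the locking comment in get_queryset above: DRF's get_object() resolves the instance from get_queryset(), so applying select_for_update() to the starting queryset is what makes the later get_object() call return an already-locked row inside the surrounding transaction.atomic() block. A minimal sketch under that assumption; the class below is illustrative, not the repository's actual viewset.

from management.models import Group


class LockingQuerysetSketch:
    """Illustrative only: why the starting queryset must carry the lock."""

    action = "principals"

    def get_queryset(self):
        queryset = Group.objects.all()
        if self.action in ("principals", "destroy"):
            # get_object() filters this queryset, so the row it returns is
            # selected FOR UPDATE and stays locked for the rest of the
            # enclosing transaction.atomic() block.
            queryset = queryset.select_for_update()
        return queryset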
2 changes: 1 addition & 1 deletion rbac/management/role/relation_api_dual_write_handler.py
@@ -31,7 +31,7 @@
from management.relation_replicator.relation_replicator import ReplicationEvent
from management.relation_replicator.relation_replicator import ReplicationEventType
from management.role.model import BindingMapping, Role
from migration_tool.migrate import migrate_role
from migration_tool.migrate_role import migrate_role
from migration_tool.sharedSystemRolesReplicatedRoleBindings import v1_perm_to_v2_perm
from migration_tool.utils import create_relationship

160 changes: 53 additions & 107 deletions rbac/migration_tool/migrate.py
@@ -16,140 +16,80 @@
"""

import logging
from typing import Iterable

from django.conf import settings
from django.db import transaction
from kessel.relations.v1beta1 import common_pb2
from management.group.relation_api_dual_write_group_handler import RelationApiDualWriteGroupHandler
from management.models import Workspace
from management.models import Group
from management.principal.model import Principal
from management.relation_replicator.logging_replicator import LoggingReplicator
from management.relation_replicator.outbox_replicator import OutboxReplicator
from management.relation_replicator.relation_replicator import (
PartitionKey,
RelationReplicator,
ReplicationEvent,
ReplicationEventType,
)
from management.relation_replicator.relations_api_replicator import RelationsApiReplicator
from management.role.model import BindingMapping, Role
from migration_tool.models import V2rolebinding
from migration_tool.sharedSystemRolesReplicatedRoleBindings import v1_role_to_v2_bindings
from migration_tool.utils import create_relationship
from management.role.model import Role
from management.role.relation_api_dual_write_handler import RelationApiDualWriteHandler

from api.cross_access.relation_api_dual_write_cross_access_handler import RelationApiDualWriteCrossAccessHandler
from api.models import CrossAccountRequest, Tenant

logger = logging.getLogger(__name__) # pylint: disable=invalid-name


def get_kessel_relation_tuples(
v2_role_bindings: Iterable[V2rolebinding],
default_workspace: Workspace,
) -> list[common_pb2.Relationship]:
"""Generate a set of relationships and BindingMappings for the given set of v2 role bindings."""
relationships: list[common_pb2.Relationship] = list()

for v2_role_binding in v2_role_bindings:
relationships.extend(v2_role_binding.as_tuples())

bound_resource = v2_role_binding.resource

# Is this a workspace binding, but not to the root workspace?
# If so, ensure this workspace is a child of the root workspace.
# All other resource-resource or resource-workspace relations
# which may be implied or necessary are intentionally ignored.
# These should come from the apps that own the resource.
if bound_resource.resource_type == ("rbac", "workspace") and not bound_resource.resource_id == str(
default_workspace.id
):
# This is not strictly necessary here and the relation may be a duplicate.
# Once we have more Workspace API / Inventory Group migration progress,
# this block can and probably should be removed.
# One of those APIs will add it themselves.
relationships.append(
create_relationship(
bound_resource.resource_type,
bound_resource.resource_id,
("rbac", "workspace"),
str(default_workspace.id),
"parent",
)
)

return relationships


def migrate_role(
role: Role,
default_workspace: Workspace,
current_bindings: Iterable[BindingMapping] = [],
) -> tuple[list[common_pb2.Relationship], list[BindingMapping]]:
"""
Migrate a role from v1 to v2, returning the tuples and mappings.
The mappings are returned so that we can reconstitute the corresponding tuples for a given role.
This is needed so we can remove those tuples when the role changes if needed.
"""
v2_role_bindings = v1_role_to_v2_bindings(role, default_workspace, current_bindings)
relationships = get_kessel_relation_tuples([m.get_role_binding() for m in v2_role_bindings], default_workspace)
return relationships, v2_role_bindings


def migrate_groups_for_tenant(tenant: Tenant, replicator: RelationReplicator):
"""Generate user relationships and system role assignments for groups in a tenant."""
groups = tenant.group_set.all()
groups = tenant.group_set.only("pk").values("pk")
for group in groups:
principals: list[Principal] = []
system_roles: list[Role] = []
if not group.platform_default:
principals = group.principals.all()
if group.system is False and group.admin_default is False:
system_roles = group.roles().public_tenant_only()
if any(True for _ in system_roles) or any(True for _ in principals):
# The migrator does not generally deal with concurrency control,
# but we require an atomic block due to use of select_for_update in the dual write handler.
with transaction.atomic():
# The migrator deals with concurrency control.
# We need an atomic block because select_for_update is used in the dual write handler,
# and the group must be locked before adding principals to it.
# NOTE: The lock on the group is not necessary when adding system roles to the group,
# as the binding mappings are locked during this process to ensure concurrency control.
# Start of transaction for group operations
with transaction.atomic():
# Requery the group with a lock
group = Group.objects.select_for_update().get(pk=group["pk"])
principals: list[Principal] = []
system_roles: list[Role] = []
if not group.platform_default:
principals = group.principals.all()
if group.system is False and group.admin_default is False:
system_roles = group.roles().public_tenant_only()
if any(True for _ in system_roles) or any(True for _ in principals):
dual_write_handler = RelationApiDualWriteGroupHandler(
group, ReplicationEventType.MIGRATE_TENANT_GROUPS, replicator=replicator
)
# This operation requires a lock on the group, just as in the view;
# see GroupViewSet#get_queryset, which is used when adding principals, for details.
dual_write_handler.generate_relations_to_add_principals(principals)
# A lock on the group is not required to add system roles; only the binding mappings are locked,
# and that locking happens inside the dual_write_handler.
dual_write_handler.generate_relations_to_add_roles(system_roles)
dual_write_handler.replicate()
# End of transaction for group operations, locks are released
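A condensed view of the group loop above: only primary keys are read up front, and each group is re-fetched under its own transaction so the row lock is held just long enough to replicate that one group. process_group below is a hypothetical stand-in for the RelationApiDualWriteGroupHandler calls shown in the diff.

from django.db import transaction

from management.models import Group


def migrate_groups_sketch(tenant, process_group):
    """Sketch only: lock and replicate one group at a time."""
    for row in tenant.group_set.values("pk"):
        with transaction.atomic():
            # Re-select the group FOR UPDATE; the pk-only prefetch above is
            # never used for the actual replication.
            group = Group.objects.select_for_update().get(pk=row["pk"])
            process_group(group)
        # Lock released at commit before the next group is touched.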


def migrate_roles_for_tenant(tenant, exclude_apps, replicator):
"""Migrate all roles for a given tenant."""
default_workspace = Workspace.objects.get(type=Workspace.Types.DEFAULT, tenant=tenant)

roles = tenant.role_set.all()
roles = tenant.role_set.only("pk")
if exclude_apps:
roles = roles.exclude(access__permission__application__in=exclude_apps)

for role in roles:
logger.info(f"Migrating role: {role.name} with UUID {role.uuid}.")

tuples, mappings = migrate_role(role, default_workspace)

# Conflicts are not ignored in order to prevent this from
# accidentally running concurrently with dual-writes.
# If migration should be rerun, then the bindings table should be dropped.
# If changing this to allow updates,
# always ensure writes are paused before running.
# This must always be the case, but this should at least start failing you if you forget.
BindingMapping.objects.bulk_create(mappings, ignore_conflicts=False)

replicator.replicate(
ReplicationEvent(
event_type=ReplicationEventType.MIGRATE_CUSTOM_ROLE,
info={"role_uuid": str(role.uuid)},
partition_key=PartitionKey.byEnvironment(),
add=tuples,
role_pks = roles.values_list("pk", flat=True)
for role in role_pks:
# The migrator deals with concurrency control, and roles need to be locked.
with transaction.atomic():
# Requery and lock role
role = Role.objects.select_for_update().get(pk=role)
logger.info(f"Migrating role: {role.name} with UUID {role.uuid}.")
dual_write_handler = RelationApiDualWriteHandler(
role, ReplicationEventType.MIGRATE_CUSTOM_ROLE, replicator
)
)

dual_write_handler.prepare_for_update()
dual_write_handler.replicate_new_or_updated_role(role)
# End of transaction; the lock on the role is released.
logger.info(f"Migration completed for role: {role.name} with UUID {role.uuid}.")

logger.info(f"Migrated {roles.count()} roles for tenant: {tenant.org_id}")


@@ -171,32 +111,38 @@ def migrate_data_for_tenant(tenant: Tenant, exclude_apps: list, replicator: Rela
logger.info("Finished relations of cross account requests.")


# The migrator does not generally deal with concurrency control,
# but we require an atomic block due to use of select_for_update in the dual write handler.
def migrate_cross_account_requests(tenant: Tenant, replicator: RelationReplicator):
"""Migrate approved account requests."""
cross_account_requests = CrossAccountRequest.objects.filter(status="approved", target_org=tenant.org_id)
for cross_account_request in cross_account_requests:
# The migrator deals with concurrency control.
# We need an atomic block because select_for_update is used in the dual write handler,
# and the cross account request must be locked to add roles.
# Start of transaction for approved cross account request and "add roles" operation
with transaction.atomic():
# Lock cross account request
cross_account_request = CrossAccountRequest.objects.select_for_update().get(pk=cross_account_request.pk)
cross_account_roles = cross_account_request.roles.all()
if any(True for _ in cross_account_roles):
dual_write_handler = RelationApiDualWriteCrossAccessHandler(
cross_account_request, ReplicationEventType.MIGRATE_CROSS_ACCOUNT_REQUEST, replicator
)
# This operation requires a lock on the cross account request, as is done
# in CrossAccountRequestViewSet#get_queryset.
# It also locks the binding mappings, if they exist, for the passed system roles.
dual_write_handler.generate_relations_to_add_roles(cross_account_request.roles.all())
dual_write_handler.replicate()
# End of transaction for the approved cross account request and its "add roles" operation.
# Locks on the cross account request, and possibly on the default workspace, are released.
# The default workspace is locked when the related binding mapping does not exist yet.
# (Given where this runs in the migration, the binding mappings for system roles should already exist,
# as they are tied to the system roles.)
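Cross account requests get the same treatment: each approved request is re-selected under lock, and generate_relations_to_add_roles additionally locks the related binding mappings, per the comments above. A reduced sketch of the per-request body, assuming the imports shown at the top of this file.

from django.db import transaction

from api.cross_access.relation_api_dual_write_cross_access_handler import (
    RelationApiDualWriteCrossAccessHandler,
)
from api.models import CrossAccountRequest
from management.relation_replicator.relation_replicator import ReplicationEventType


def replicate_approved_request_sketch(car_pk, replicator):
    """Sketch only: lock one approved cross account request and replicate its roles."""
    with transaction.atomic():
        car = CrossAccountRequest.objects.select_for_update().get(pk=car_pk)
        handler = RelationApiDualWriteCrossAccessHandler(
            car, ReplicationEventType.MIGRATE_CROSS_ACCOUNT_REQUEST, replicator
        )
        # Also locks the related binding mappings (see comments above).
        handler.generate_relations_to_add_roles(car.roles.all())
        handler.replicate()
    # Commit releases the request lock and any workspace or binding mapping locks.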


def migrate_data(
exclude_apps: list = [], orgs: list = [], write_relationships: str = "False", skip_roles: bool = False
):
"""Migrate all data for all tenants."""
# Only run this in maintenance mode or
# if we don't write relationships (testing out the migration and cleaning up the created binding mappings)
if not settings.READ_ONLY_API_MODE and write_relationships != "False":
logger.fatal("Read-only API mode is required. READ_ONLY_API_MODE must be set to true.")
return

count = 0
tenants = Tenant.objects.filter(ready=True).exclude(tenant_name="public")
replicator = _get_replicator(write_relationships)
78 changes: 78 additions & 0 deletions rbac/migration_tool/migrate_role.py
@@ -0,0 +1,78 @@
"""
Copyright 2019 Red Hat, Inc.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""

from typing import Iterable

from kessel.relations.v1beta1 import common_pb2
from management.role.model import BindingMapping, Role
from management.workspace.model import Workspace
from migration_tool.models import V2rolebinding
from migration_tool.sharedSystemRolesReplicatedRoleBindings import v1_role_to_v2_bindings
from migration_tool.utils import create_relationship


def get_kessel_relation_tuples(
v2_role_bindings: Iterable[V2rolebinding],
default_workspace: Workspace,
) -> list[common_pb2.Relationship]:
"""Generate a set of relationships and BindingMappings for the given set of v2 role bindings."""
relationships: list[common_pb2.Relationship] = list()

for v2_role_binding in v2_role_bindings:
relationships.extend(v2_role_binding.as_tuples())

bound_resource = v2_role_binding.resource

# Is this a workspace binding, but not to the root workspace?
# If so, ensure this workspace is a child of the root workspace.
# All other resource-resource or resource-workspace relations
# which may be implied or necessary are intentionally ignored.
# These should come from the apps that own the resource.
if bound_resource.resource_type == ("rbac", "workspace") and not bound_resource.resource_id == str(
default_workspace.id
):
# This is not strictly necessary here and the relation may be a duplicate.
# Once we have more Workspace API / Inventory Group migration progress,
# this block can and probably should be removed.
# One of those APIs will add it themselves.
relationships.append(
create_relationship(
bound_resource.resource_type,
bound_resource.resource_id,
("rbac", "workspace"),
str(default_workspace.id),
"parent",
)
)

return relationships


def migrate_role(
role: Role,
default_workspace: Workspace,
current_bindings: Iterable[BindingMapping] = [],
) -> tuple[list[common_pb2.Relationship], list[BindingMapping]]:
"""
Migrate a role from v1 to v2, returning the tuples and mappings.
The mappings are returned so that we can reconstitute the corresponding tuples for a given role.
This is needed so we can remove those tuples when the role changes if needed.
"""
v2_role_bindings = v1_role_to_v2_bindings(role, default_workspace, current_bindings)
relationships = get_kessel_relation_tuples([m.get_role_binding() for m in v2_role_bindings], default_workspace)
return relationships, v2_role_bindings
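A hypothetical call site for the relocated helper, to show the shapes involved: given a v1 Role and the tenant's default workspace, migrate_role returns the relationship tuples to replicate plus the BindingMappings that let those tuples be reconstructed later. The default-workspace lookup mirrors the one used in migrate.py above.

from management.role.model import Role
from management.workspace.model import Workspace
from migration_tool.migrate_role import migrate_role


def tuples_for_role_sketch(role: Role):
    """Sketch only: convert one v1 role into v2 tuples and binding mappings."""
    default_workspace = Workspace.objects.get(
        type=Workspace.Types.DEFAULT, tenant=role.tenant
    )
    relationships, mappings = migrate_role(role, default_workspace)
    # `relationships` are kessel Relationship protos; `mappings` are unsaved
    # BindingMapping instances that can later be bulk-created or replicated.
    return relationships, mappings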
