Skip to content

Commit

Permalink
Merge pull request RedHatInsights#1246 from RedHatInsights/test-seed-…
Browse files Browse the repository at this point in the history
…roles

RHCLOUD-35303: Add tests to seed_roles() for verifying seeding events are replicated
  • Loading branch information
lennysgarage authored Oct 21, 2024
2 parents af85308 + 0189116 commit b3c3d3d
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 4 deletions.
11 changes: 8 additions & 3 deletions rbac/management/role/definer.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,12 @@ def _make_role(data, dual_write_handler):
if role.display_name != display_name:
role.display_name = display_name
role.save()
dual_write_handler.replicate_new_system_role(role)
logger.info("Created system role %s.", name)
role_obj_change_notification_handler(role, "created")
else:
if role.version != defaults["version"]:
dual_write_handler.prepare_for_update(role)
Role.objects.filter(name=name).update(**defaults, display_name=display_name, modified=timezone.now())
dual_write_handler.replicate_update_system_role(role)
logger.info("Updated system role %s.", name)
role.access.all().delete()
role_obj_change_notification_handler(role, "updated")
Expand All @@ -101,13 +99,20 @@ def _make_role(data, dual_write_handler):
if access_list: # Allow external roles to have none access object
for access_item in access_list:
resource_def_list = access_item.pop("resourceDefinitions", [])
permission, created = Permission.objects.get_or_create(**access_item, tenant=public_tenant)
permission, _ = Permission.objects.get_or_create(**access_item, tenant=public_tenant)

access_obj = Access.objects.create(permission=permission, role=role, tenant=public_tenant)
for resource_def_item in resource_def_list:
ResourceDefinition.objects.create(**resource_def_item, access=access_obj, tenant=public_tenant)

_add_ext_relation_if_it_exists(data.get("external"), role)

if created:
dual_write_handler.replicate_new_system_role(role)
else:
if role.version != defaults["version"]:
dual_write_handler.replicate_update_system_role(role)

return role


Expand Down
116 changes: 115 additions & 1 deletion tests/management/role/test_definer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
#
"""Test the role definer."""
from django.conf import settings
from unittest.mock import ANY, call, patch
from unittest.mock import ANY, call, patch, mock_open
from management.role.definer import seed_roles, seed_permissions
from api.models import Tenant
from tests.core.test_kafka import copy_call_args
from tests.identity_request import IdentityRequest
from management.models import Access, ExtRoleRelation, Permission, ResourceDefinition, Role
from management.relation_replicator.relation_replicator import ReplicationEvent, ReplicationEventType


class RoleDefinerTests(IdentityRequest):
Expand Down Expand Up @@ -240,3 +241,116 @@ def test_try_seed_permissions_update_description(self):
self.assertEqual(permission.first().description, "Approval local test templates read.")
# Previous string verb still works
self.assertEqual(Permission.objects.filter(permission="inventory:*:*").count(), 1)

def is_create_event(self, relation: str, evt: ReplicationEvent) -> bool:
return evt.event_type == ReplicationEventType.CREATE_SYSTEM_ROLE and any(
t.relation == relation for t in evt.add
)

def is_remove_event(self, relation: str, evt: ReplicationEvent) -> bool:
return evt.event_type == ReplicationEventType.DELETE_SYSTEM_ROLE and any(
t.relation == relation for t in evt.remove
)

def is_update_event(self, relation: str, evt: ReplicationEvent) -> bool:
return evt.event_type == ReplicationEventType.UPDATE_SYSTEM_ROLE and any(
t.relation == relation for t in evt.add
)

@patch("management.relation_replicator.outbox_replicator.OutboxReplicator.replicate")
def test_seed_roles_new_role(self, mock_replicate):
seed_roles()
self.assertTrue(
any(self.is_create_event("inventory_hosts_read", args[0]) for args, _ in mock_replicate.call_args_list)
)

@patch("management.relation_replicator.outbox_replicator.OutboxReplicator.replicate")
@patch("management.role.definer.destructive_ok")
@patch("builtins.open", new_callable=mock_open, read_data='{"roles": []}')
@patch("os.listdir")
@patch("os.path.isfile")
def test_seed_roles_delete_role(
self,
mock_isfile,
mock_listdir,
mock_open,
mock_destructive_ok,
mock_replicate,
):
mock_destructive_ok.return_value = True
# mock files
mock_listdir.return_value = ["role.json"]
mock_isfile.return_value = True

# create a role in the database that's not in config
role_to_delete = Role.objects.create(name="dummy_role_delete", system=True, tenant=self.public_tenant)
permission, _ = Permission.objects.get_or_create(permission="inventory:hosts:read", tenant=self.public_tenant)
_ = Access.objects.create(permission=permission, role=role_to_delete, tenant=self.public_tenant)

role_to_delete.save()

seed_roles()

self.assertTrue(
any(self.is_remove_event("inventory_hosts_read", args[0]) for args, _ in mock_replicate.call_args_list)
)

# verify role was deleted from the database
self.assertFalse(Role.objects.filter(id=role_to_delete.id).exists())

@patch("management.relation_replicator.outbox_replicator.OutboxReplicator.replicate")
@patch(
"builtins.open",
new_callable=mock_open,
read_data='{"roles": [{"name": "dummy_role_update", "system": true, "version": 3, "access": [{"permission": "dummy:hosts:read"}]}]}',
)
@patch("os.listdir")
@patch("os.path.isfile")
def test_seed_roles_update_role(
self,
mock_isfile,
mock_listdir,
mock_open,
mock_replicate,
):
# mock files
mock_listdir.return_value = ["role.json"]
mock_isfile.return_value = True

# create a role in the database that exists in config
Role.objects.create(name="dummy_role_update", system=True, version=1, tenant=self.public_tenant)

seed_roles()

self.assertTrue(
any(self.is_update_event("dummy_hosts_read", args[0]) for args, _ in mock_replicate.call_args_list)
)

@patch("management.relation_replicator.outbox_replicator.OutboxReplicator.replicate")
def test_seed_roles_create_and_delete_role(
self,
mock_replicate,
):
# seed to create role
seed_roles()
self.assertTrue(
any(self.is_create_event("inventory_hosts_read", args[0]) for args, _ in mock_replicate.call_args_list)
)

# seed to remove role
with (
patch("os.path.isfile") as mock_isfile,
patch("os.listdir") as mock_listdir,
patch("builtins.open", mock_open(read_data='{"roles": []}')) as mock_file,
patch("management.role.definer.destructive_ok") as mock_destructive_ok,
):
# mock files
mock_destructive_ok.return_value = True
mock_listdir.return_value = ["role.json"]
mock_isfile.return_value = True

seed_roles()

self.assertTrue(
any(self.is_remove_event("inventory_hosts_read", args[0]) for args, _ in mock_replicate.call_args_list)
)

0 comments on commit b3c3d3d

Please sign in to comment.