From 01d459d7e11813fce3a76b8cacf13ab3b20550be Mon Sep 17 00:00:00 2001 From: Ellen-Yi-Dong Date: Wed, 31 Jul 2024 15:17:08 -0700 Subject: [PATCH] Log deletion of role and group include unit tests --- rbac/management/audit_log/model.py | 30 +++++++++++++++++------------ rbac/management/group/view.py | 5 ++++- rbac/management/role/view.py | 5 ++++- tests/management/group/test_view.py | 18 +++++++++++++++++ tests/management/role/test_view.py | 17 ++++++++++++++++ 5 files changed, 61 insertions(+), 14 deletions(-) diff --git a/rbac/management/audit_log/model.py b/rbac/management/audit_log/model.py index d62896746..b77895fcd 100644 --- a/rbac/management/audit_log/model.py +++ b/rbac/management/audit_log/model.py @@ -69,21 +69,13 @@ def get_resource_item(self, r_type, request, *args, **kwargs): """Find related information (eg, name, id, etc...) for each resource item.""" verify_tenant = self.get_tenant_id(request) if r_type == AuditLog.ROLE: - if request.data != {}: - role_object = get_object_or_404(Role, name=request.data["name"], tenant=verify_tenant) - else: - role_object = kwargs["kwargs"] - # retrieve role id and name + role_object = get_object_or_404(Role, name=request.data["name"], tenant=verify_tenant) role_object_id = role_object.id role_object_name = "role: " + role_object.name return role_object_id, role_object_name elif r_type == AuditLog.GROUP: - if request.data != {}: - group_object = get_object_or_404(Group, name=request.data["name"], tenant=verify_tenant) - else: - group_uuid = kwargs["kwargs"]["uuid"] - group_object = get_object_or_404(Group, uuid=group_uuid) + group_object = get_object_or_404(Group, name=request.data["name"], tenant=verify_tenant) group_object_id = group_object.id group_object_name = "group: " + group_object.name return group_object_id, group_object_name @@ -92,15 +84,29 @@ def get_resource_item(self, r_type, request, *args, **kwargs): # TODO: update for permission related items return None - def log_create(self, request, resource, **kwargs): + def log_create(self, request, resource): """Audit Log when a role or a group is created.""" self.principal_username = request.user.username self.resource_type = resource - self.resource_id, resource_name = self.get_resource_item(resource, request, kwargs=kwargs) + self.resource_id, resource_name = self.get_resource_item(resource, request) self.description = "Created " + resource_name self.action = AuditLog.CREATE self.tenant_id = self.get_tenant_id(request) super(AuditLog, self).save() + + def log_delete(self, request, resource, object): + """Audit Log when a role or a group is deleted.""" + self.principal_username = request.user.username + + self.resource_type = resource + self.resource_id = object.id + resource_name = self.resource_type + ": " + object.name + + self.description = "Deleted " + resource_name + + self.action = AuditLog.DELETE + self.tenant_id = self.get_tenant_id(request) + super(AuditLog, self).save() diff --git a/rbac/management/group/view.py b/rbac/management/group/view.py index 44aafa039..bffc9e329 100644 --- a/rbac/management/group/view.py +++ b/rbac/management/group/view.py @@ -255,7 +255,7 @@ def create(self, request, *args, **kwargs): if status.is_success(create_group.status_code): auditlog = AuditLog() - auditlog.log_create(request, AuditLog.GROUP, kwargs=kwargs) + auditlog.log_create(request, AuditLog.GROUP) return create_group @@ -367,6 +367,9 @@ def destroy(self, request, *args, **kwargs): response = super().destroy(request=request, args=args, kwargs=kwargs) if response.status_code == status.HTTP_204_NO_CONTENT: group_obj_change_notification_handler(request.user, group, "deleted") + + auditlog = AuditLog() + auditlog.log_delete(request, AuditLog.GROUP, group) return response def update(self, request, *args, **kwargs): diff --git a/rbac/management/role/view.py b/rbac/management/role/view.py index b4733d21c..3b970635a 100644 --- a/rbac/management/role/view.py +++ b/rbac/management/role/view.py @@ -214,7 +214,7 @@ def create(self, request, *args, **kwargs): if status.is_success(create_role.status_code): auditlog = AuditLog() - auditlog.log_create(request, AuditLog.ROLE, kwargs=kwargs) + auditlog.log_create(request, AuditLog.ROLE) return create_role @@ -332,6 +332,9 @@ def destroy(self, request, *args, **kwargs): response = super().destroy(request=request, args=args, kwargs=kwargs) if response.status_code == status.HTTP_204_NO_CONTENT: role_obj_change_notification_handler(role, "deleted", request.user) + + auditlog = AuditLog() + auditlog.log_delete(request, AuditLog.ROLE, role) return response def partial_update(self, request, *args, **kwargs): diff --git a/tests/management/group/test_view.py b/tests/management/group/test_view.py index 2f1abc214..f50655fb7 100644 --- a/tests/management/group/test_view.py +++ b/tests/management/group/test_view.py @@ -767,6 +767,24 @@ def test_delete_group_success(self, send_kafka_message): org_id = self.customer_data["org_id"] + # test whether correctly added to audit logs + al_url = "/api/v1/auditlogs/" + al_client = APIClient() + al_response = al_client.get(al_url, **self.headers) + retrieve_data = al_response.data.get("data") + al_list = retrieve_data + al_dict = al_list[0] + + al_dict_principal_username = al_dict["principal_username"] + al_dict_description = al_dict["description"] + al_dict_resource = al_dict["resource_type"] + al_dict_action = al_dict["action"] + + self.assertEqual(self.user_data["username"], al_dict_principal_username) + self.assertIsNotNone(al_dict_description) + self.assertEqual(al_dict_resource, "group") + self.assertEqual(al_dict_action, "delete") + # verify the group no longer exists response = client.get(url, **self.headers) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) diff --git a/tests/management/role/test_view.py b/tests/management/role/test_view.py index d388ab8de..c97f52820 100644 --- a/tests/management/role/test_view.py +++ b/tests/management/role/test_view.py @@ -1327,6 +1327,23 @@ def test_delete_role_success(self, send_kafka_message): org_id = self.customer_data["org_id"] self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + # test whether correctly added to audit logs + al_url = "/api/v1/auditlogs/" + al_client = APIClient() + al_response = al_client.get(al_url, **self.headers) + retrieve_data = al_response.data.get("data") + al_list = retrieve_data + al_dict = al_list[1] + + al_dict_principal_username = al_dict["principal_username"] + al_dict_description = al_dict["description"] + al_dict_resource = al_dict["resource_type"] + al_dict_action = al_dict["action"] + + self.assertEqual(self.user_data["username"], al_dict_principal_username) + self.assertIsNotNone(al_dict_description) + self.assertEqual(al_dict_resource, "role") + self.assertEqual(al_dict_action, "delete") send_kafka_message.assert_called_with( settings.NOTIFICATIONS_TOPIC,