diff --git a/rbac/management/principal/cleaner.py b/rbac/management/principal/cleaner.py index 85d226a3..d39ffff4 100644 --- a/rbac/management/principal/cleaner.py +++ b/rbac/management/principal/cleaner.py @@ -31,6 +31,7 @@ from management.tenant_service.tenant_service import TenantBootstrapService from prometheus_client import Counter from rest_framework import status +from sentry_sdk import capture_exception from stompest.config import StompConfig from stompest.error import StompConnectionError from stompest.protocol import StompSpec @@ -46,11 +47,16 @@ KEY_LOC = "/opt/rbac/rbac/management/principal/umb_certs/key.pem" LOCK_ID = 42 # For Keith, with Love -METRIC_STOMP_MESSAGE_TOTAL = "stomp_messages_total" -umb_message_processed_count = Counter( - METRIC_STOMP_MESSAGE_TOTAL, +METRIC_STOMP_MESSAGES_ACK_TOTAL = "stomp_messages_ack_total" +METRIC_STOMP_MESSAGES_NACK_TOTAL = "stomp_messages_nack_total" +stomp_messages_ack_total = Counter( + METRIC_STOMP_MESSAGES_ACK_TOTAL, "Number of stomp UMB messages processed", ) +stomp_messages_nack_total = Counter( + METRIC_STOMP_MESSAGES_NACK_TOTAL, + "Number of stomp UMB messages that failed to be processed", +) def clean_tenant_principals(tenant): @@ -199,28 +205,24 @@ def process_umb_event(frame, umb_client: Stomp, bootstrap_service: TenantBootstr logger.info("process_umb_event: Another listener is running. Aborting.") return False - data_dict = xmltodict.parse(frame.body) - canonical_message = data_dict.get("CanonicalMessage") - if canonical_message: - try: - user = retrieve_user_info(canonical_message) - except Exception as e: # Skip processing and leave the it to be processed later - logger.error("process_umb_event: Error retrieving user info: %s", str(e)) - return True + try: + data_dict = xmltodict.parse(frame.body) + canonical_message = data_dict.get("CanonicalMessage") + user = retrieve_user_info(canonical_message) # By default, only process disabled users. # If the setting is enabled, process all users. if not user.is_active or settings.PRINCIPAL_CLEANUP_UPDATE_ENABLED_UMB: # If Tenant is not already ready, don't ready it bootstrap_service.update_user(user, ready_tenant=False) - else: - # Message is malformed. - # Ensure we dont block the entire queue by discarding it. - # TODO: this is not the only way a message can be malformed - pass + umb_client.ack(frame) + stomp_messages_ack_total.inc() + except Exception as e: + logger.error("process_umb_event: Error processing umb message : %s", str(e)) + capture_exception(e) + umb_client.nack(frame) + stomp_messages_nack_total.inc() - umb_client.ack(frame) - umb_message_processed_count.inc() return True diff --git a/rbac/management/tenant_service/v2.py b/rbac/management/tenant_service/v2.py index 91b74699..22a26469 100644 --- a/rbac/management/tenant_service/v2.py +++ b/rbac/management/tenant_service/v2.py @@ -221,7 +221,10 @@ def _disable_user_in_tenant(self, user: User): for group in principal.group.all(): group.principals.remove(principal) - tuples_to_remove.append(group.relationship_to_principal(principal)) + # The user id might be None for the principal so we use user instead + tuple = group.relationship_to_principal(user) + if tuple is None: + raise ValueError(f"relationship_to_principal is None for user {user.username}") principal.delete() except Principal.DoesNotExist: diff --git a/tests/management/principal/test_cleaner.py b/tests/management/principal/test_cleaner.py index d08eab77..fceef06e 100644 --- a/tests/management/principal/test_cleaner.py +++ b/tests/management/principal/test_cleaner.py @@ -31,7 +31,11 @@ from management.policy.model import Policy from management.principal.cleaner import LOCK_ID, clean_tenant_principals from management.principal.model import Principal -from management.principal.cleaner import process_principal_events_from_umb, METRIC_STOMP_MESSAGE_TOTAL +from management.principal.cleaner import ( + process_principal_events_from_umb, + METRIC_STOMP_MESSAGES_ACK_TOTAL, + METRIC_STOMP_MESSAGES_NACK_TOTAL, +) from management.principal.proxy import external_principal_to_user from management.tenant_mapping.model import TenantMapping from management.tenant_service import get_tenant_bootstrap_service @@ -308,11 +312,11 @@ def setUp(self): @patch("management.principal.cleaner.UMB_CLIENT") def test_principal_cleanup_none(self, client_mock): """Test that we can run a principal clean up with no messages.""" - before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGE_TOTAL) + before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL) client_mock.canRead.return_value = False process_principal_events_from_umb() - after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGE_TOTAL) + after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL) self.assertTrue(before == after) client_mock.receiveFrame.assert_not_called() client_mock.disconnect.assert_called_once() @@ -334,14 +338,14 @@ def test_cleanup_principal_in_or_not_in_group(self, client_mock, cache_class, pr self.group.principals.add(self.principal) self.group.save() - before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGE_TOTAL) + before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL) client_mock.canRead.side_effect = [True, False] client_mock.receiveFrame.return_value = MagicMock(body=FRAME_BODY) cache_mock = MagicMock() cache_class.return_value = cache_mock process_principal_events_from_umb() - after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGE_TOTAL) + after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL) client_mock.receiveFrame.assert_called_once() client_mock.disconnect.assert_called_once() client_mock.ack.assert_called_once() @@ -890,6 +894,111 @@ def assertTenantBootstrappedByOrgId(self, org_id: str): ), ) + @patch( + "management.principal.proxy.PrincipalProxy._request_principals", + return_value={ + "status_code": status.HTTP_200_OK, + "data": [], + }, + ) + @patch("management.group.model.AccessCache") + @patch("management.principal.cleaner.UMB_CLIENT") + def test_disable_principal_which_is_in_or_not_in_group(self, client_mock, cache_class, proxy_mock): + """Process a umb message to disable a principal which is either in or not in a group.""" + principal_name = "principal-test" + self.principal = Principal.objects.create(username=principal_name, tenant=self.tenant, user_id="56780000") + self.group.principals.add(self.principal) + self.group.save() + + before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL) + client_mock.canRead.side_effect = [True, False] + client_mock.receiveFrame.return_value = MagicMock(body=FRAME_BODY) + cache_mock = MagicMock() + cache_class.return_value = cache_mock + process_principal_events_from_umb() + + after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL) + client_mock.receiveFrame.assert_called_once() + client_mock.disconnect.assert_called_once() + client_mock.ack.assert_called_once() + self.assertFalse(Principal.objects.filter(username=principal_name).exists()) + self.group.refresh_from_db() + self.assertFalse(self.group.principals.all()) + cache_mock.delete_policy.assert_called_once_with(self.principal.uuid) + self.assertTrue(before + 1 == after) + + # When principal not in group + self.principal = Principal(username=principal_name, tenant=self.tenant, user_id="56780000") + self.principal.save() + client_mock.canRead.side_effect = [True, False] + client_mock.ack.reset_mock() + process_principal_events_from_umb() + self.assertFalse(Principal.objects.filter(username=principal_name).exists()) + client_mock.ack.assert_called_once() + + @patch( + "management.principal.proxy.PrincipalProxy._request_principals", + return_value={ + "status_code": status.HTTP_200_OK, + "data": [], + }, + ) + @patch("management.group.model.AccessCache") + @patch("management.principal.cleaner.UMB_CLIENT") + def test_disable_principal_without_user_id_in_group(self, client_mock, cache_class, proxy_mock): + """Process a umb message to disable a principal which does not have user id.""" + principal_name = "principal-test" + principal = Principal.objects.create(username=principal_name, tenant=self.tenant) + principal.save() + self.group.principals.add(principal) + self.group.save() + + before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL) + client_mock.canRead.side_effect = [True, False] + client_mock.receiveFrame.return_value = MagicMock(body=FRAME_BODY) + cache_mock = MagicMock() + cache_class.return_value = cache_mock + process_principal_events_from_umb() + + after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL) + client_mock.receiveFrame.assert_called_once() + client_mock.disconnect.assert_called_once() + client_mock.ack.assert_called_once() + self.assertFalse(Principal.objects.filter(username=principal_name).exists()) + self.group.refresh_from_db() + self.assertFalse(self.group.principals.all()) + cache_mock.delete_policy.assert_called_once_with(principal.uuid) + self.assertTrue(before + 1 == after) + + @patch("management.principal.cleaner.retrieve_user_info") + @patch("management.principal.cleaner.UMB_CLIENT") + def test_failure_processing_message(self, client_mock, retrieve_user_mock): + """Test.""" + principal_name = "principal-test" + principal = Principal.objects.create(username=principal_name, tenant=self.tenant) + principal.save() + self.group.principals.add(principal) + self.group.save() + + before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_NACK_TOTAL) + ack_before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL) + client_mock.canRead.side_effect = [True, False] + client_mock.receiveFrame.return_value = MagicMock(body=FRAME_BODY) + retrieve_user_mock.side_effect = Exception("Something went wrong") + process_principal_events_from_umb() + + after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_NACK_TOTAL) + ack_after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL) + client_mock.receiveFrame.assert_called_once() + client_mock.disconnect.assert_called_once() + client_mock.nack.assert_called_once() + client_mock.ack.assert_not_called() + self.assertTrue(Principal.objects.filter(username=principal_name).exists()) + self.group.refresh_from_db() + self.assertTrue(self.group.principals.all()) + self.assertEqual(before + 1, after) + self.assertEqual(ack_before, ack_after) + @override_settings(V2_BOOTSTRAP_TENANT=False) class PrincipalUMBTestsWithV1TenantBootstrap(IdentityRequest):