Skip to content

Commit

Permalink
Address feedbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
astrozzc committed Dec 10, 2024
1 parent 23601bc commit 2a5d222
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 56 deletions.
46 changes: 20 additions & 26 deletions rbac/management/principal/cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,16 @@
KEY_LOC = "/opt/rbac/rbac/management/principal/umb_certs/key.pem"
LOCK_ID = 42 # For Keith, with Love

METRIC_STOMP_MESSAGE_TOTAL = "stomp_messages_total"
umb_message_processed_count = Counter(
METRIC_STOMP_MESSAGE_TOTAL,
METRIC_STOMP_MESSAGES_ACK_TOTAL = "stomp_messages_ack_total"
METRIC_STOMP_MESSAGES_NACK_TOTAL = "stomp_messages_nack_total"
stomp_messages_ack_total = Counter(
METRIC_STOMP_MESSAGES_ACK_TOTAL,
"Number of stomp UMB messages processed",
)
stomp_messages_nack_total = Counter(
METRIC_STOMP_MESSAGES_NACK_TOTAL,
"Number of stomp UMB messages that failed to be processed",
)


def clean_tenant_principals(tenant):
Expand Down Expand Up @@ -200,35 +205,24 @@ def process_umb_event(frame, umb_client: Stomp, bootstrap_service: TenantBootstr
logger.info("process_umb_event: Another listener is running. Aborting.")
return False

data_dict = xmltodict.parse(frame.body)
canonical_message = data_dict.get("CanonicalMessage")
if canonical_message:
try:
user = retrieve_user_info(canonical_message)
except Exception as e: # Skip processing and leave the it to be processed later
logger.error("process_umb_event: Error retrieving user info: %s", str(e))
return True
try:
data_dict = xmltodict.parse(frame.body)
canonical_message = data_dict.get("CanonicalMessage")

user = retrieve_user_info(canonical_message)
# By default, only process disabled users.
# If the setting is enabled, process all users.
if not user.is_active or settings.PRINCIPAL_CLEANUP_UPDATE_ENABLED_UMB:
# If Tenant is not already ready, don't ready it
try:
bootstrap_service.update_user(user, ready_tenant=False)
except ValueError as e:
logger.error("process_umb_event: Error updating user: %s", str(e))
capture_exception(e)
umb_client.nack(frame)
umb_message_processed_count.inc()
return True
else:
# Message is malformed.
# Ensure we dont block the entire queue by discarding it.
# TODO: this is not the only way a message can be malformed
pass
bootstrap_service.update_user(user, ready_tenant=False)
umb_client.ack(frame)
stomp_messages_ack_total.inc()
except Exception as e:
logger.error("process_umb_event: Error processing umb message : %s", str(e))
capture_exception(e)
umb_client.nack(frame)
stomp_messages_nack_total.inc()

umb_client.ack(frame)
umb_message_processed_count.inc()
return True


Expand Down
12 changes: 4 additions & 8 deletions rbac/management/tenant_service/v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ def _disable_user_in_tenant(self, user: User):
# Get tenant mapping if present but no need to create if not
tuples_to_remove = []
user_id = user.user_id
error_message = None

if user_id is None:
raise ValueError(f"User {user.username} has no user_id.")
Expand All @@ -222,11 +221,10 @@ def _disable_user_in_tenant(self, user: User):

for group in principal.group.all():
group.principals.remove(principal)
tupple = group.relationship_to_principal(principal)
if tupple is None:
error_message = f"principal {principal.username} has no user id."
else:
tuples_to_remove.append(tupple)
# The user id might be None for the principal so we use user instead
tuple = group.relationship_to_principal(user)
if tuple is None:
raise ValueError(f"relationship_to_principal is None for user {user.username}")

principal.delete()
except Principal.DoesNotExist:
Expand All @@ -240,8 +238,6 @@ def _disable_user_in_tenant(self, user: User):
remove=tuples_to_remove,
)
)
if error_message:
raise ValueError(error_message)

def _get_or_bootstrap_tenant(
self, org_id: str, ready: bool, account_number: Optional[str] = None
Expand Down
97 changes: 75 additions & 22 deletions tests/management/principal/test_cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@
from management.policy.model import Policy
from management.principal.cleaner import LOCK_ID, clean_tenant_principals
from management.principal.model import Principal
from management.principal.cleaner import process_principal_events_from_umb, METRIC_STOMP_MESSAGE_TOTAL
from management.principal.cleaner import (
process_principal_events_from_umb,
METRIC_STOMP_MESSAGES_ACK_TOTAL,
METRIC_STOMP_MESSAGES_NACK_TOTAL,
)
from management.principal.proxy import external_principal_to_user
from management.tenant_mapping.model import TenantMapping
from management.tenant_service import get_tenant_bootstrap_service
Expand Down Expand Up @@ -308,11 +312,11 @@ def setUp(self):
@patch("management.principal.cleaner.UMB_CLIENT")
def test_principal_cleanup_none(self, client_mock):
"""Test that we can run a principal clean up with no messages."""
before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGE_TOTAL)
before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
client_mock.canRead.return_value = False
process_principal_events_from_umb()

after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGE_TOTAL)
after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
self.assertTrue(before == after)
client_mock.receiveFrame.assert_not_called()
client_mock.disconnect.assert_called_once()
Expand All @@ -334,14 +338,14 @@ def test_cleanup_principal_in_or_not_in_group(self, client_mock, cache_class, pr
self.group.principals.add(self.principal)
self.group.save()

before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGE_TOTAL)
before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
client_mock.canRead.side_effect = [True, False]
client_mock.receiveFrame.return_value = MagicMock(body=FRAME_BODY)
cache_mock = MagicMock()
cache_class.return_value = cache_mock
process_principal_events_from_umb()

after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGE_TOTAL)
after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
client_mock.receiveFrame.assert_called_once()
client_mock.disconnect.assert_called_once()
client_mock.ack.assert_called_once()
Expand Down Expand Up @@ -899,22 +903,21 @@ def assertTenantBootstrappedByOrgId(self, org_id: str):
)
@patch("management.group.model.AccessCache")
@patch("management.principal.cleaner.UMB_CLIENT")
def test_cleanup_disabled_principal_in_or_not_in_group(self, client_mock, cache_class, proxy_mock):
"""Run a principal clean up on a tenant with a principal either in or not in a group."""
def test_disable_principal_which_is_in_or_not_in_group(self, client_mock, cache_class, proxy_mock):
"""Process a umb message to disable a principal which is either in or not in a group."""
principal_name = "principal-test"
self.principal = Principal(username=principal_name, tenant=self.tenant, user_id="56780000")
self.principal.save()
self.principal = Principal.objects.create(username=principal_name, tenant=self.tenant, user_id="56780000")
self.group.principals.add(self.principal)
self.group.save()

before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGE_TOTAL)
before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
client_mock.canRead.side_effect = [True, False]
client_mock.receiveFrame.return_value = MagicMock(body=FRAME_BODY)
cache_mock = MagicMock()
cache_class.return_value = cache_mock
process_principal_events_from_umb()

after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGE_TOTAL)
after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
client_mock.receiveFrame.assert_called_once()
client_mock.disconnect.assert_called_once()
client_mock.ack.assert_called_once()
Expand All @@ -924,27 +927,77 @@ def test_cleanup_disabled_principal_in_or_not_in_group(self, client_mock, cache_
cache_mock.delete_policy.assert_called_once_with(self.principal.uuid)
self.assertTrue(before + 1 == after)

# when principal does not have user id set
# When principal not in group
self.principal = Principal(username=principal_name, tenant=self.tenant, user_id="56780000")
self.principal.save()
client_mock.canRead.side_effect = [True, False]
client_mock.ack.reset_mock()
process_principal_events_from_umb()
self.assertFalse(Principal.objects.filter(username=principal_name).exists())
client_mock.ack.assert_called_once()

@patch(
"management.principal.proxy.PrincipalProxy._request_principals",
return_value={
"status_code": status.HTTP_200_OK,
"data": [],
},
)
@patch("management.group.model.AccessCache")
@patch("management.principal.cleaner.UMB_CLIENT")
def test_disable_principal_without_user_id_in_group(self, client_mock, cache_class, proxy_mock):
"""Process a umb message to disable a principal which does not have user id."""
principal_name = "principal-test"
principal = Principal.objects.create(username=principal_name, tenant=self.tenant)
principal.save()
self.group.principals.add(principal)
self.group.save()
client_mock.canRead.side_effect = [True, False]
client_mock.ack.reset_mock()

before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
client_mock.canRead.side_effect = [True, False]
client_mock.receiveFrame.return_value = MagicMock(body=FRAME_BODY)
cache_mock = MagicMock()
cache_class.return_value = cache_mock
process_principal_events_from_umb()

client_mock.nack.assert_called_once()
after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
client_mock.receiveFrame.assert_called_once()
client_mock.disconnect.assert_called_once()
client_mock.ack.assert_called_once()
self.assertFalse(Principal.objects.filter(username=principal_name).exists())
client_mock.ack.assert_not_called()
self.group.refresh_from_db()
self.assertFalse(self.group.principals.all())
cache_mock.delete_policy.assert_called_once_with(principal.uuid)
self.assertTrue(before + 1 == after)

# When principal not in group
self.principal = Principal(username=principal_name, tenant=self.tenant, user_id="56780000")
self.principal.save()
@patch("management.principal.cleaner.retrieve_user_info")
@patch("management.principal.cleaner.UMB_CLIENT")
def test_failure_processing_message(self, client_mock, retrieve_user_mock):
"""Test."""
principal_name = "principal-test"
principal = Principal.objects.create(username=principal_name, tenant=self.tenant)
principal.save()
self.group.principals.add(principal)
self.group.save()

before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_NACK_TOTAL)
ack_before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
client_mock.canRead.side_effect = [True, False]
client_mock.ack.reset_mock()
client_mock.receiveFrame.return_value = MagicMock(body=FRAME_BODY)
retrieve_user_mock.side_effect = Exception("Something went wrong")
process_principal_events_from_umb()
self.assertFalse(Principal.objects.filter(username=principal_name).exists())
client_mock.ack.assert_called_once()

after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_NACK_TOTAL)
ack_after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
client_mock.receiveFrame.assert_called_once()
client_mock.disconnect.assert_called_once()
client_mock.nack.assert_called_once()
client_mock.ack.assert_not_called()
self.assertTrue(Principal.objects.filter(username=principal_name).exists())
self.group.refresh_from_db()
self.assertTrue(self.group.principals.all())
self.assertEqual(before + 1, after)
self.assertEqual(ack_before, ack_after)


@override_settings(V2_BOOTSTRAP_TENANT=False)
Expand Down

0 comments on commit 2a5d222

Please sign in to comment.