Skip to content

Commit

Permalink
Merge pull request RedHatInsights#1374 from astrozzc/fix
Browse files Browse the repository at this point in the history
RHCLOUD-36669: Use user_id in case it is not available from principal
  • Loading branch information
astrozzc authored Dec 10, 2024
2 parents b890db6 + 2a5d222 commit 1c201cb
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 24 deletions.
38 changes: 20 additions & 18 deletions rbac/management/principal/cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from management.tenant_service.tenant_service import TenantBootstrapService
from prometheus_client import Counter
from rest_framework import status
from sentry_sdk import capture_exception
from stompest.config import StompConfig
from stompest.error import StompConnectionError
from stompest.protocol import StompSpec
Expand All @@ -46,11 +47,16 @@
KEY_LOC = "/opt/rbac/rbac/management/principal/umb_certs/key.pem"
LOCK_ID = 42 # For Keith, with Love

METRIC_STOMP_MESSAGE_TOTAL = "stomp_messages_total"
umb_message_processed_count = Counter(
METRIC_STOMP_MESSAGE_TOTAL,
METRIC_STOMP_MESSAGES_ACK_TOTAL = "stomp_messages_ack_total"
METRIC_STOMP_MESSAGES_NACK_TOTAL = "stomp_messages_nack_total"
stomp_messages_ack_total = Counter(
METRIC_STOMP_MESSAGES_ACK_TOTAL,
"Number of stomp UMB messages processed",
)
stomp_messages_nack_total = Counter(
METRIC_STOMP_MESSAGES_NACK_TOTAL,
"Number of stomp UMB messages that failed to be processed",
)


def clean_tenant_principals(tenant):
Expand Down Expand Up @@ -199,28 +205,24 @@ def process_umb_event(frame, umb_client: Stomp, bootstrap_service: TenantBootstr
logger.info("process_umb_event: Another listener is running. Aborting.")
return False

data_dict = xmltodict.parse(frame.body)
canonical_message = data_dict.get("CanonicalMessage")
if canonical_message:
try:
user = retrieve_user_info(canonical_message)
except Exception as e: # Skip processing and leave the it to be processed later
logger.error("process_umb_event: Error retrieving user info: %s", str(e))
return True
try:
data_dict = xmltodict.parse(frame.body)
canonical_message = data_dict.get("CanonicalMessage")

user = retrieve_user_info(canonical_message)
# By default, only process disabled users.
# If the setting is enabled, process all users.
if not user.is_active or settings.PRINCIPAL_CLEANUP_UPDATE_ENABLED_UMB:
# If Tenant is not already ready, don't ready it
bootstrap_service.update_user(user, ready_tenant=False)
else:
# Message is malformed.
# Ensure we dont block the entire queue by discarding it.
# TODO: this is not the only way a message can be malformed
pass
umb_client.ack(frame)
stomp_messages_ack_total.inc()
except Exception as e:
logger.error("process_umb_event: Error processing umb message : %s", str(e))
capture_exception(e)
umb_client.nack(frame)
stomp_messages_nack_total.inc()

umb_client.ack(frame)
umb_message_processed_count.inc()
return True


Expand Down
5 changes: 4 additions & 1 deletion rbac/management/tenant_service/v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ def _disable_user_in_tenant(self, user: User):

for group in principal.group.all():
group.principals.remove(principal)
tuples_to_remove.append(group.relationship_to_principal(principal))
# The user id might be None for the principal so we use user instead
tuple = group.relationship_to_principal(user)
if tuple is None:
raise ValueError(f"relationship_to_principal is None for user {user.username}")

principal.delete()
except Principal.DoesNotExist:
Expand Down
119 changes: 114 additions & 5 deletions tests/management/principal/test_cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@
from management.policy.model import Policy
from management.principal.cleaner import LOCK_ID, clean_tenant_principals
from management.principal.model import Principal
from management.principal.cleaner import process_principal_events_from_umb, METRIC_STOMP_MESSAGE_TOTAL
from management.principal.cleaner import (
process_principal_events_from_umb,
METRIC_STOMP_MESSAGES_ACK_TOTAL,
METRIC_STOMP_MESSAGES_NACK_TOTAL,
)
from management.principal.proxy import external_principal_to_user
from management.tenant_mapping.model import TenantMapping
from management.tenant_service import get_tenant_bootstrap_service
Expand Down Expand Up @@ -308,11 +312,11 @@ def setUp(self):
@patch("management.principal.cleaner.UMB_CLIENT")
def test_principal_cleanup_none(self, client_mock):
"""Test that we can run a principal clean up with no messages."""
before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGE_TOTAL)
before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
client_mock.canRead.return_value = False
process_principal_events_from_umb()

after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGE_TOTAL)
after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
self.assertTrue(before == after)
client_mock.receiveFrame.assert_not_called()
client_mock.disconnect.assert_called_once()
Expand All @@ -334,14 +338,14 @@ def test_cleanup_principal_in_or_not_in_group(self, client_mock, cache_class, pr
self.group.principals.add(self.principal)
self.group.save()

before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGE_TOTAL)
before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
client_mock.canRead.side_effect = [True, False]
client_mock.receiveFrame.return_value = MagicMock(body=FRAME_BODY)
cache_mock = MagicMock()
cache_class.return_value = cache_mock
process_principal_events_from_umb()

after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGE_TOTAL)
after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
client_mock.receiveFrame.assert_called_once()
client_mock.disconnect.assert_called_once()
client_mock.ack.assert_called_once()
Expand Down Expand Up @@ -890,6 +894,111 @@ def assertTenantBootstrappedByOrgId(self, org_id: str):
),
)

@patch(
"management.principal.proxy.PrincipalProxy._request_principals",
return_value={
"status_code": status.HTTP_200_OK,
"data": [],
},
)
@patch("management.group.model.AccessCache")
@patch("management.principal.cleaner.UMB_CLIENT")
def test_disable_principal_which_is_in_or_not_in_group(self, client_mock, cache_class, proxy_mock):
"""Process a umb message to disable a principal which is either in or not in a group."""
principal_name = "principal-test"
self.principal = Principal.objects.create(username=principal_name, tenant=self.tenant, user_id="56780000")
self.group.principals.add(self.principal)
self.group.save()

before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
client_mock.canRead.side_effect = [True, False]
client_mock.receiveFrame.return_value = MagicMock(body=FRAME_BODY)
cache_mock = MagicMock()
cache_class.return_value = cache_mock
process_principal_events_from_umb()

after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
client_mock.receiveFrame.assert_called_once()
client_mock.disconnect.assert_called_once()
client_mock.ack.assert_called_once()
self.assertFalse(Principal.objects.filter(username=principal_name).exists())
self.group.refresh_from_db()
self.assertFalse(self.group.principals.all())
cache_mock.delete_policy.assert_called_once_with(self.principal.uuid)
self.assertTrue(before + 1 == after)

# When principal not in group
self.principal = Principal(username=principal_name, tenant=self.tenant, user_id="56780000")
self.principal.save()
client_mock.canRead.side_effect = [True, False]
client_mock.ack.reset_mock()
process_principal_events_from_umb()
self.assertFalse(Principal.objects.filter(username=principal_name).exists())
client_mock.ack.assert_called_once()

@patch(
"management.principal.proxy.PrincipalProxy._request_principals",
return_value={
"status_code": status.HTTP_200_OK,
"data": [],
},
)
@patch("management.group.model.AccessCache")
@patch("management.principal.cleaner.UMB_CLIENT")
def test_disable_principal_without_user_id_in_group(self, client_mock, cache_class, proxy_mock):
"""Process a umb message to disable a principal which does not have user id."""
principal_name = "principal-test"
principal = Principal.objects.create(username=principal_name, tenant=self.tenant)
principal.save()
self.group.principals.add(principal)
self.group.save()

before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
client_mock.canRead.side_effect = [True, False]
client_mock.receiveFrame.return_value = MagicMock(body=FRAME_BODY)
cache_mock = MagicMock()
cache_class.return_value = cache_mock
process_principal_events_from_umb()

after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
client_mock.receiveFrame.assert_called_once()
client_mock.disconnect.assert_called_once()
client_mock.ack.assert_called_once()
self.assertFalse(Principal.objects.filter(username=principal_name).exists())
self.group.refresh_from_db()
self.assertFalse(self.group.principals.all())
cache_mock.delete_policy.assert_called_once_with(principal.uuid)
self.assertTrue(before + 1 == after)

@patch("management.principal.cleaner.retrieve_user_info")
@patch("management.principal.cleaner.UMB_CLIENT")
def test_failure_processing_message(self, client_mock, retrieve_user_mock):
"""Test."""
principal_name = "principal-test"
principal = Principal.objects.create(username=principal_name, tenant=self.tenant)
principal.save()
self.group.principals.add(principal)
self.group.save()

before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_NACK_TOTAL)
ack_before = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
client_mock.canRead.side_effect = [True, False]
client_mock.receiveFrame.return_value = MagicMock(body=FRAME_BODY)
retrieve_user_mock.side_effect = Exception("Something went wrong")
process_principal_events_from_umb()

after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_NACK_TOTAL)
ack_after = REGISTRY.get_sample_value(METRIC_STOMP_MESSAGES_ACK_TOTAL)
client_mock.receiveFrame.assert_called_once()
client_mock.disconnect.assert_called_once()
client_mock.nack.assert_called_once()
client_mock.ack.assert_not_called()
self.assertTrue(Principal.objects.filter(username=principal_name).exists())
self.group.refresh_from_db()
self.assertTrue(self.group.principals.all())
self.assertEqual(before + 1, after)
self.assertEqual(ack_before, ack_after)


@override_settings(V2_BOOTSTRAP_TENANT=False)
class PrincipalUMBTestsWithV1TenantBootstrap(IdentityRequest):
Expand Down

0 comments on commit 1c201cb

Please sign in to comment.