diff --git a/engine/apps/auth_token/auth.py b/engine/apps/auth_token/auth.py index bb419eb3af..19374a7b16 100644 --- a/engine/apps/auth_token/auth.py +++ b/engine/apps/auth_token/auth.py @@ -52,7 +52,7 @@ def authenticate(self, request): auth = get_authorization_header(request).decode("utf-8") user, auth_token = self.authenticate_credentials(auth) - if not user_is_authorized(user, [RBACPermission.Permissions.API_KEYS_WRITE]): + if not user.is_active or not user_is_authorized(user, [RBACPermission.Permissions.API_KEYS_WRITE]): raise exceptions.AuthenticationFailed( "Only users with Admin permissions are allowed to perform this action." ) diff --git a/engine/apps/auth_token/tests/test_plugin_auth.py b/engine/apps/auth_token/tests/test_plugin_auth.py index 56fe56974a..d03c40b388 100644 --- a/engine/apps/auth_token/tests/test_plugin_auth.py +++ b/engine/apps/auth_token/tests/test_plugin_auth.py @@ -83,6 +83,26 @@ def test_plugin_authentication_fail(authorization, instance_context): PluginAuthentication().authenticate(request) +@pytest.mark.django_db +def test_plugin_authentication_inactive_user(make_organization, make_user, make_token_for_organization): + organization = make_organization(stack_id=42, org_id=24) + token, token_string = make_token_for_organization(organization) + user = make_user(organization=organization, user_id=12) + # user is set to inactive if deleted via queryset (ie. during sync) + user.is_active = False + user.save() + + headers = { + "HTTP_AUTHORIZATION": token_string, + "HTTP_X-Instance-Context": INSTANCE_CONTEXT, + "HTTP_X-Grafana-Context": '{"UserId": 12}', + } + request = APIRequestFactory().get("/", **headers) + + with pytest.raises(AuthenticationFailed): + PluginAuthentication().authenticate(request) + + @pytest.mark.django_db def test_plugin_authentication_gcom_setup_new_user(make_organization): # Setting gcom_token_org_last_time_synced to now, so it doesn't try to sync with gcom diff --git a/engine/apps/mobile_app/auth.py b/engine/apps/mobile_app/auth.py index 6363f9f254..8a5a7178b9 100644 --- a/engine/apps/mobile_app/auth.py +++ b/engine/apps/mobile_app/auth.py @@ -33,7 +33,7 @@ class MobileAppAuthTokenAuthentication(BaseAuthentication): def authenticate(self, request) -> Optional[Tuple[User, MobileAppAuthToken]]: auth = get_authorization_header(request).decode("utf-8") user, auth_token = self.authenticate_credentials(auth) - if user is None: + if user is None or not user.is_active: return None return user, auth_token diff --git a/engine/apps/mobile_app/tests/test_mobile_app_auth_token.py b/engine/apps/mobile_app/tests/test_mobile_app_auth_token.py index 2f5f2f4de9..34e7d9d011 100644 --- a/engine/apps/mobile_app/tests/test_mobile_app_auth_token.py +++ b/engine/apps/mobile_app/tests/test_mobile_app_auth_token.py @@ -4,6 +4,7 @@ from rest_framework.test import APIClient from apps.mobile_app.models import MobileAppAuthToken +from apps.user_management.models import User @pytest.mark.django_db @@ -76,3 +77,17 @@ def test_mobile_app_auth_token( response = client.get(url, HTTP_AUTHORIZATION=verification_token) assert response.status_code == status.HTTP_404_NOT_FOUND + + +@pytest.mark.django_db +def test_mobile_app_auth_token_deleted_user( + make_organization_and_user_with_mobile_app_auth_token, +): + _, user, auth_token = make_organization_and_user_with_mobile_app_auth_token() + # user is deleted via queryset (ie. setting it to inactive, during sync) + User.objects.filter(id=user.id).delete() + + client = APIClient() + url = reverse("api-internal:alertgroup-list") + response = client.get(url, HTTP_AUTHORIZATION=auth_token) + assert response.status_code == status.HTTP_403_FORBIDDEN diff --git a/engine/apps/public_api/tests/test_alert_groups.py b/engine/apps/public_api/tests/test_alert_groups.py index 9c88fec880..3cfc9c8537 100644 --- a/engine/apps/public_api/tests/test_alert_groups.py +++ b/engine/apps/public_api/tests/test_alert_groups.py @@ -180,6 +180,20 @@ def test_get_alert_groups(alert_group_public_api_setup): assert response.json() == expected_response +@pytest.mark.django_db +def test_get_alert_groups_inactive_user(make_organization_and_user_with_token): + _, user, token = make_organization_and_user_with_token() + # user is set to inactive if deleted via queryset (ie. during sync) + user.is_active = False + user.save() + + client = APIClient() + url = reverse("api-public:alert_groups-list") + response = client.get(url, format="json", HTTP_AUTHORIZATION=token) + + assert response.status_code == status.HTTP_403_FORBIDDEN + + @pytest.mark.django_db def test_get_alert_groups_include_labels(alert_group_public_api_setup, make_alert_group_label_association): token, _, _, _ = alert_group_public_api_setup