From 6bc7f7af6c7015e44e7fc68d7bfb6ffe443b9a16 Mon Sep 17 00:00:00 2001 From: Matias Bordese Date: Fri, 13 Dec 2024 18:11:52 -0300 Subject: [PATCH] fix: validate grafana URL value during organization auth check (#5365) Related to https://github.com/grafana/irm/issues/538 --- engine/apps/auth_token/auth.py | 20 ++++++++------ .../auth_token/tests/test_grafana_auth.py | 27 ++++++++++++++++--- 2 files changed, 35 insertions(+), 12 deletions(-) diff --git a/engine/apps/auth_token/auth.py b/engine/apps/auth_token/auth.py index cac5e2743..36ea8d82c 100644 --- a/engine/apps/auth_token/auth.py +++ b/engine/apps/auth_token/auth.py @@ -16,6 +16,7 @@ from apps.user_management.models import User from apps.user_management.models.organization import Organization from apps.user_management.sync import get_or_create_user +from common.utils import validate_url from settings.base import SELF_HOSTED_SETTINGS from .constants import GOOGLE_OAUTH2_AUTH_TOKEN_NAME, SCHEDULE_EXPORT_TOKEN_NAME, SLACK_AUTH_TOKEN_NAME @@ -370,14 +371,17 @@ def authenticate(self, request): def get_organization(self, request, auth): grafana_url = request.headers.get(X_GRAFANA_URL) if grafana_url: - organization = Organization.objects.filter(grafana_url=grafana_url).first() - if not organization: - # trigger a request to sync the organization - # (ignore response since we can get a 400 if sync was already triggered; - # if organization exists, we are good) - setup_organization(grafana_url, auth) - organization = Organization.objects.filter(grafana_url=grafana_url).first() - return organization + url = validate_url(grafana_url) + if url is not None: + url = url.rstrip("/") + organization = Organization.objects.filter(grafana_url=url).first() + if not organization: + # trigger a request to sync the organization + # (ignore response since we can get a 400 if sync was already triggered; + # if organization exists, we are good) + setup_organization(url, auth) + organization = Organization.objects.filter(grafana_url=url).first() + return organization if settings.LICENSE == settings.CLOUD_LICENSE_NAME: instance_id = request.headers.get(X_GRAFANA_INSTANCE_ID) diff --git a/engine/apps/auth_token/tests/test_grafana_auth.py b/engine/apps/auth_token/tests/test_grafana_auth.py index 9774156bf..8da611a0f 100644 --- a/engine/apps/auth_token/tests/test_grafana_auth.py +++ b/engine/apps/auth_token/tests/test_grafana_auth.py @@ -98,7 +98,7 @@ def test_grafana_authentication_missing_org(): @pytest.mark.django_db @httpretty.activate(verbose=True, allow_net_connect=False) -def test_grafana_authentication_invalid_grafana_url(): +def test_grafana_authentication_no_org_grafana_url(): grafana_url = "http://grafana.test" token = f"{ServiceAccountToken.GRAFANA_SA_PREFIX}xyz" headers = { @@ -115,6 +115,23 @@ def test_grafana_authentication_invalid_grafana_url(): assert exc.value.detail == "Organization not found." +@pytest.mark.parametrize("grafana_url", ["null;", "foo", ""]) +@pytest.mark.django_db +@httpretty.activate(verbose=True, allow_net_connect=False) +def test_grafana_authentication_invalid_grafana_url(grafana_url): + token = f"{ServiceAccountToken.GRAFANA_SA_PREFIX}xyz" + headers = { + "HTTP_AUTHORIZATION": token, + "HTTP_X_GRAFANA_URL": grafana_url, # no org for this URL + } + request = APIRequestFactory().get("/", **headers) + + # NOTE: no sync requests are made in this case + with pytest.raises(exceptions.AuthenticationFailed) as exc: + GrafanaServiceAccountAuthentication().authenticate(request) + assert exc.value.detail == "Organization not found." + + @pytest.mark.django_db @httpretty.activate(verbose=True, allow_net_connect=False) def test_grafana_authentication_permissions_call_fails(make_organization): @@ -144,10 +161,12 @@ def test_grafana_authentication_permissions_call_fails(make_organization): @pytest.mark.django_db +@pytest.mark.parametrize("grafana_url", ["http://grafana.test", "http://grafana.test/"]) @httpretty.activate(verbose=True, allow_net_connect=False) def test_grafana_authentication_existing_token( - make_organization, make_service_account_for_organization, make_token_for_service_account + make_organization, make_service_account_for_organization, make_token_for_service_account, grafana_url ): + # org grafana_url is consistently stored without trailing slash organization = make_organization(grafana_url="http://grafana.test") service_account = make_service_account_for_organization(organization) token_string = "glsa_the-token" @@ -155,11 +174,11 @@ def test_grafana_authentication_existing_token( headers = { "HTTP_AUTHORIZATION": token_string, - "HTTP_X_GRAFANA_URL": organization.grafana_url, + "HTTP_X_GRAFANA_URL": grafana_url, # trailing slash is ignored } request = APIRequestFactory().get("/", **headers) - # setup Grafana API responses + # setup Grafana API responses (use URL without trailing slash) setup_service_account_api_mocks(organization.grafana_url, {"some-perm": "value"}) user, auth_token = GrafanaServiceAccountAuthentication().authenticate(request)