Skip to content

Commit

Permalink
Made the tests more reliable
Browse files Browse the repository at this point in the history
  • Loading branch information
dlzhry2nhs committed Dec 6, 2023
1 parent 91d29b1 commit 6e4e75f
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 100 deletions.
Empty file removed tests/auth_utils/__init__.py
Empty file.
36 changes: 0 additions & 36 deletions tests/auth_utils/authorisation_wrapper.py

This file was deleted.

108 changes: 44 additions & 64 deletions tests/test_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
https://github.com/NHSDigital/pytest-nhsd-apim/blob/main/tests/test_examples.py
for more ideas on how to test the authorization of your API.
"""
import time
import json
import os
import pytest
import requests
import uuid
from os import getenv

from auth_utils.authorisation_wrapper import authz_wrapper


def read_json_file(current_file: str, filename: str):
"""
Expand Down Expand Up @@ -58,18 +57,6 @@ def test_wait_for_ping(nhsd_apim_proxy_url):
assert deployed_commit_id == getenv('SOURCE_COMMIT_ID')


@pytest.mark.smoketest
def test_status(nhsd_apim_proxy_url, status_endpoint_auth_headers):
resp = requests.get(
f"{nhsd_apim_proxy_url}/_status", headers=status_endpoint_auth_headers
)
resp_content = resp.json()

assert resp.status_code == 200
assert resp_content.get("commitId") == getenv('SOURCE_COMMIT_ID')
assert resp_content.get("status") == "pass"


@pytest.mark.smoketest
def test_wait_for_status(nhsd_apim_proxy_url, status_endpoint_auth_headers):
retries = 0
Expand All @@ -94,6 +81,18 @@ def test_wait_for_status(nhsd_apim_proxy_url, status_endpoint_auth_headers):
assert deployed_commit_id == getenv('SOURCE_COMMIT_ID')


@pytest.mark.smoketest
def test_status(nhsd_apim_proxy_url, status_endpoint_auth_headers):
resp = requests.get(
f"{nhsd_apim_proxy_url}/_status", headers=status_endpoint_auth_headers
)
resp_content = resp.json()

assert resp.status_code == 200
assert resp_content.get("commitId") == getenv('SOURCE_COMMIT_ID')
assert resp_content.get("status") == "pass"


@pytest.mark.nhsd_apim_authorization({"access": "application", "level": "level0"})
def test_app_level0(nhsd_apim_proxy_url, nhsd_apim_auth_headers):
resp = requests.get(f"{nhsd_apim_proxy_url}", headers=nhsd_apim_auth_headers)
Expand All @@ -107,66 +106,47 @@ def test_app_level3(nhsd_apim_proxy_url, nhsd_apim_auth_headers):


@pytest.mark.nhsd_apim_authorization({"access": "application", "level": "level3"})
@authz_wrapper
def test_events_endpoint_accepts_valid_mds_payload(
nhsd_apim_proxy_url,
nhsd_apim_auth_headers,
pds_change_of_gp_mds_event_mock,
_apigee_app_base_url,
_create_test_app
):
nhsd_apim_auth_headers["X-Correlation-ID"] = f"apim-smoketests-{uuid.uuid4()}"
resp = requests.post(
f"{nhsd_apim_proxy_url}/events",
headers=nhsd_apim_auth_headers,
json=pds_change_of_gp_mds_event_mock
created_test_app_name = _create_test_app["name"]
apigee_update_url = f"{_apigee_app_base_url}/{created_test_app_name}"

key_value_pairs = {
"attributes": [
{
"name": "permissions",
"value": "events:create:pds-change-of-gp-1"
}
]
}

update_response = requests.put(
f"{apigee_update_url}",
json=key_value_pairs,
headers={"Authorization": f"Bearer {os.environ['APIGEE_ACCESS_TOKEN']}"}
)
update_response.raise_for_status()

assert resp.status_code == 200
assert resp.json() == {"id": "236a1d4a-5d69-4fa9-9c7f-e72bf505aa5b"}


@pytest.mark.nhsd_apim_authorization({"access": "application", "level": "level3"})
@authz_wrapper
def test_events_endpoint_rejects_invalid_payload(
nhsd_apim_proxy_url,
nhsd_apim_auth_headers,
pds_change_of_gp_mds_event_mock,
_apigee_app_base_url,
_create_test_app
):
nhsd_apim_auth_headers["X-Correlation-ID"] = f"apim-smoketests-{uuid.uuid4()}"
invalid_payload = pds_change_of_gp_mds_event_mock
invalid_payload["time"] = "202-04-05T17:31:00.000Z"

resp = requests.post(
f"{nhsd_apim_proxy_url}/events",
headers=nhsd_apim_auth_headers,
json=pds_change_of_gp_mds_event_mock
)

assert resp.status_code == 400
assert resp.json() == {"validationErrors": {"time": "Please provide a valid time"}}
retries = 0

while retries < 5:
resp = requests.post(
f"{nhsd_apim_proxy_url}/events",
headers=nhsd_apim_auth_headers,
json=pds_change_of_gp_mds_event_mock
)

@pytest.mark.nhsd_apim_authorization({"access": "application", "level": "level3"})
@authz_wrapper
def test_events_endpoint_returns_unauthorized_error_when_client_sends_unauthorized_event_type(
nhsd_apim_proxy_url,
nhsd_apim_auth_headers,
pds_change_of_gp_mds_event_mock,
_apigee_app_base_url,
_create_test_app
):
nhsd_apim_auth_headers["X-Correlation-ID"] = f"apim-smoketests-{uuid.uuid4()}"
invalid_payload = pds_change_of_gp_mds_event_mock
invalid_payload["type"] = "pds-death-notification-1"

resp = requests.post(
f"{nhsd_apim_proxy_url}/events",
headers=nhsd_apim_auth_headers,
json=pds_change_of_gp_mds_event_mock
)
if resp.status_code == 403:
retries = retries + 1
continue

break

assert resp.status_code == 403
assert resp.json() == {"errors": "User is not authorized to handle the requested data type"}
assert resp.status_code == 200
assert resp.json() == {"id": "236a1d4a-5d69-4fa9-9c7f-e72bf505aa5b"}

0 comments on commit 6e4e75f

Please sign in to comment.