diff --git a/tests/unit/test_duo.py b/tests/unit/test_duo.py index efb95b07..a0ee32d5 100644 --- a/tests/unit/test_duo.py +++ b/tests/unit/test_duo.py @@ -104,15 +104,19 @@ def test_get_mfa_response(): mfa_result = Mock() # Test if response is correct - mfa_result.json = Mock(return_value={"response": "test_value"}) - assert get_mfa_response(mfa_result) == "test_value" + assert get_mfa_response({"response": "test_value"}) == "test_value" # Test if response is incorrect - mfa_result.json = Mock(return_value={"badresponse": "FAIL"}) + mfa_result = Mock(return_value={"badresponse": "FAIL"}) with pytest.raises(SystemExit) as err: get_mfa_response(mfa_result) assert err.value.code == 1 + # Test no key available + with pytest.raises(SystemExit) as err: + get_mfa_response({"pytest": "FAIL"}) + assert err.value.code == 1 + # Test generic failure with pytest.raises(SystemExit) as err: get_mfa_response(Mock(return_value="FAIL")) @@ -175,8 +179,7 @@ def test_parse_mfa_challenge(): mfa_challenge = Mock() # Test successful challenge - mfa_challenge.json = Mock(return_value={"stat": "OK", "response": {"txid": "pytest"}}) - assert parse_mfa_challenge(mfa_challenge) == "pytest" + assert parse_mfa_challenge({"stat": "OK", "response": {"txid": "pytest"}}) == "pytest" # Test error mfa_challenge.json = Mock(return_value={"stat": "OK", "response": "error"}) @@ -184,12 +187,22 @@ def test_parse_mfa_challenge(): parse_mfa_challenge(mfa_challenge) assert err.value.code == 1 + # Test no key in returned content + with pytest.raises(SystemExit) as err: + parse_mfa_challenge({"pyest": "OK", "badresponse": "error"}) + assert err.value.code == 1 + # Test no response in returned content mfa_challenge.json = Mock(return_value={"stat": "OK", "badresponse": "error"}) with pytest.raises(SystemExit) as err: parse_mfa_challenge(mfa_challenge) assert err.value.code == 1 + # Test failure + with pytest.raises(SystemExit) as err: + parse_mfa_challenge({"stat": "fail", "response": {"txid": "pytest_error"}}) + assert err.value.code == 1 + # Test API failure mfa_challenge.json = Mock(return_value={"stat": "fail", "response": {"txid": "error"}}) with pytest.raises(SystemExit) as err: @@ -220,10 +233,9 @@ def test_mfa_challenge(mocker): passcode = "pytest_passcode" mfa_option = {"factor": "pytest_factor", "device": "pytest_device - pytest_device_name"} - duo_api_response = mocker.Mock() - duo_api_response.json.return_value = {"stat": "OK", "response": {"txid": "pytest_txid"}} - - mocker.patch("tokendito.duo.api_post", return_value=duo_api_response) + mocker.patch( + "tokendito.duo.api_post", return_value={"stat": "OK", "response": {"txid": "pytest_txid"}} + ) txid = mfa_challenge(duo_info, mfa_option, passcode) assert txid == "pytest_txid" @@ -283,19 +295,26 @@ def test_factor_callback(mocker): duo_info = {"host": "pytest_host", "sid": "pytest_sid", "tile_sig": "pytest_tile_sig"} verify_mfa = {"result_url": "/pytest_result_url"} - duo_api_response = mocker.Mock() - duo_api_response.json.return_value = { + # Test successful retrieval of the cookie + duo_api_response = { "stat": "OK", "response": {"txid": "pytest_txid", "cookie": "pytest_cookie"}, } mocker.patch("tokendito.duo.api_post", return_value=duo_api_response) - - # Test successful retrieval of the cookie sig_response = factor_callback(duo_info, verify_mfa) assert sig_response == "pytest_cookie:pytest_tile_sig" - # Test failure to retrieve the cookie - duo_api_response.json.return_value = {"stat": "FAIL", "response": "pytest_error"} + # Test bad data passed in + duo_api_response = "FAIL" + mocker.patch("tokendito.duo.api_post", return_value=duo_api_response) + with pytest.raises(SystemExit) as err: + factor_callback(duo_info, verify_mfa) + assert err.value.code == 2 + + # Test bad data passed in + duo_api_response = {"stat": "FAIL", "response": {"cookie": "pytest_cookie"}} + duo_info = {"host": "pytest", "sid": "pytest"} + mocker.patch("tokendito.duo.api_post", return_value=duo_api_response) with pytest.raises(SystemExit) as err: factor_callback(duo_info, verify_mfa) assert err.value.code == 2 diff --git a/tokendito/duo.py b/tokendito/duo.py index 3853dcbd..ecc3804e 100644 --- a/tokendito/duo.py +++ b/tokendito/duo.py @@ -45,7 +45,7 @@ def prepare_info(selected_okta_factor): return duo_info -def api_post(url, params=None, headers=None, payload=None): +def api_post(url, params=None, headers=None, payload=None, return_json=True): """Error handling and response parsing wrapper for Duo POSTs. :param url: The URL being connected to. @@ -54,7 +54,9 @@ def api_post(url, params=None, headers=None, payload=None): :param payload: Request body. :return response: Response to the API request. """ - response = HTTP_client.post(url, params=params, headers=headers, data=payload, return_json=True) + response = HTTP_client.post( + url, params=params, headers=headers, data=payload, return_json=return_json + ) return response @@ -71,7 +73,7 @@ def get_sid(duo_info): url = f"https://{duo_info['host']}/frame/web/v1/auth" logger.debug(f"Calling Duo {urlparse(url).path} with params {params.keys()}") - duo_auth_response = api_post(url, params=params) + duo_auth_response = api_post(url, params=params, return_json=False) try: duo_auth_redirect = urlparse(f"{unquote(duo_auth_response.url)}").query @@ -124,10 +126,9 @@ def parse_mfa_challenge(mfa_challenge): :return txid: Duo transaction ID. """ try: - mfa_challenge = mfa_challenge.json() mfa_status = mfa_challenge["stat"] txid = mfa_challenge["response"]["txid"] - except (TypeError, ValueError) as err: + except (TypeError, ValueError, AttributeError) as err: logger.error(f"The Duo API returned a non-json response: {err}") sys.exit(1) except KeyError as key_error: @@ -185,10 +186,10 @@ def get_mfa_response(mfa_result): :return verify_mfa: json response from mfa api """ try: - verify_mfa = mfa_result.json()["response"] + verify_mfa = mfa_result["response"] except KeyError as key_error: logger.error(f"The mfa challenge response is missing a required parameter: {key_error}") - logger.debug(json.dumps(mfa_result.json())) + logger.debug(mfa_result) sys.exit(1) except Exception as other_error: logger.error(f"There was an error getting the mfa challenge result: {other_error}") @@ -265,13 +266,17 @@ def factor_callback(duo_info, verify_mfa): factor_callback_url = f"https://{duo_info['host']}{verify_mfa['result_url']}" factor_callback = api_post(factor_callback_url, payload={"sid": duo_info["sid"]}) - try: - sig_response = f"{factor_callback.json()['response']['cookie']}:{duo_info['tile_sig']}" - except Exception as sig_error: + if type(factor_callback) is not dict: logger.error("There was an error getting your application signature. Please try again.") - logger.debug(f"from Duo: {sig_error}") + logger.debug(f"Response from DUO: {factor_callback}") sys.exit(2) + try: + sig_response = f"{factor_callback['response']['cookie']}:{duo_info['tile_sig']}" + except (KeyError, TypeError) as err: + logger.error("There was an error getting your application signature. Please try again.") + logger.debug(f"Response from DUO: {err}") + sys.exit(2) logger.debug(f"Completed callback to {factor_callback_url} with sig_response {sig_response}") return sig_response @@ -331,5 +336,5 @@ def authenticate(selected_okta_factor): } # Send Okta callback and return payload - api_post(duo_info["okta_callback_url"], payload=payload) + api_post(duo_info["okta_callback_url"], payload=payload, return_json=False) return payload