From 999c38819126c623c4cb292def5e248f3ae70c60 Mon Sep 17 00:00:00 2001 From: pcmxgti <16561338+pcmxgti@users.noreply.github.com> Date: Wed, 25 Oct 2023 20:33:52 -0400 Subject: [PATCH 1/4] Make duo_api_post into a pass-through call --- tokendito/duo.py | 26 ++------------------------ tokendito/http_client.py | 4 ++-- 2 files changed, 4 insertions(+), 26 deletions(-) diff --git a/tokendito/duo.py b/tokendito/duo.py index 68caece0..0b010e80 100644 --- a/tokendito/duo.py +++ b/tokendito/duo.py @@ -9,9 +9,9 @@ from urllib.parse import urlparse from bs4 import BeautifulSoup -import requests from tokendito import user from tokendito.config import config +from tokendito.http_client import HTTP_client logger = logging.getLogger(__name__) @@ -50,29 +50,7 @@ def duo_api_post(url, params=None, headers=None, payload=None): :param payload: Request body. :return response: Response to the API request. """ - try: - response = requests.request("POST", url, params=params, headers=headers, data=payload) - except Exception as request_issue: - logger.error(f"There was an error connecting to the Duo API: {request_issue}") - sys.exit(1) - - json_message = None - try: - json_message = response.json() - except ValueError: - logger.debug(f"Non-json response from Duo API: {response}") - - if response.status_code != 200: - logger.error(f"Your Duo authentication has failed with status {response.status_code}.") - if json_message and json_message["stat"].lower() != "ok": - logger.error(f"{response.status_code}, {json_message['message']}") - else: - logger.error( - "Please re-run the program with parameter" - ' "--loglevel debug" to see more information.' - ) - sys.exit(2) - + response = HTTP_client.post(url, params=params, headers=headers, data=payload, return_json=True) return response diff --git a/tokendito/http_client.py b/tokendito/http_client.py index cb67acc1..b40f1fae 100644 --- a/tokendito/http_client.py +++ b/tokendito/http_client.py @@ -48,10 +48,10 @@ def get(self, url, params=None, headers=None): logger.error(f"The get request to {url} failed with {err}") sys.exit(1) - def post(self, url, data=None, json=None, headers=None, return_json=False): + def post(self, url, data=None, json=None, headers=None, params=None, return_json=False): """Perform a POST request.""" try: - response = self.session.post(url, data=data, json=json, headers=headers) + response = self.session.post(url, data=data, json=json, params=params, headers=headers) response.raise_for_status() if return_json is True: try: From 83c75409b612371e57630dd4840a0df7906a4620 Mon Sep 17 00:00:00 2001 From: pcmxgti <16561338+pcmxgti@users.noreply.github.com> Date: Wed, 25 Oct 2023 17:33:29 -0400 Subject: [PATCH 2/4] Improve DUO tests, and clean up methods for easier testing. --- tests/unit/test_duo.py | 270 +++++++++++++++++++++++++++++++++++++++- tests/unit/test_okta.py | 12 +- tokendito/duo.py | 137 +++++++++++--------- tokendito/okta.py | 4 +- 4 files changed, 346 insertions(+), 77 deletions(-) diff --git a/tests/unit/test_duo.py b/tests/unit/test_duo.py index 6eb9ec9d..e01c18c3 100644 --- a/tests/unit/test_duo.py +++ b/tests/unit/test_duo.py @@ -3,14 +3,20 @@ """Unit tests, and local fixtures for DUO module.""" from unittest.mock import Mock +import pytest +from tokendito.http_client import HTTP_client -def test_set_passcode(mocker): + +def test_get_passcode(mocker): """Check if numerical passcode can handle leading zero values.""" from tokendito import duo mocker.patch("tokendito.user.tty_assertion", return_value=True) mocker.patch("tokendito.user.input", return_value="0123456") - assert duo.set_passcode({"factor": "passcode"}) == "0123456" + assert duo.get_passcode({"factor": "passcode"}) == "0123456" + assert duo.get_passcode({"factor": "PassCode"}) == "0123456" + assert duo.get_passcode({"factor": "push"}) is None + assert duo.get_passcode("pytest") is None def test_prepare_duo_info(): @@ -51,6 +57,10 @@ def test_prepare_duo_info(): } assert prepare_duo_info(selected_okta_factor) == expected_duo_info + with pytest.raises(SystemExit) as err: + prepare_duo_info({"badresponse": "FAIL"}) + assert err.value.code == 1 + def test_get_duo_sid(mocker): """Check if got sid correct.""" @@ -81,12 +91,264 @@ def test_get_duo_sid(mocker): assert duo_sid_info["sid"] == "testval" assert duo_auth_response.url == test_url + mocker.patch("tokendito.duo.duo_api_post", return_value="FAIL") + with pytest.raises(SystemExit) as err: + get_duo_sid(test_duo_info) + assert err.value.code == 2 + def test_get_mfa_response(): """Test if mfa verify correctly.""" from tokendito.duo import get_mfa_response mfa_result = Mock() - mfa_result.json = Mock(return_value={"response": "test_response"}) - assert get_mfa_response(mfa_result) == "test_response" + # Test if response is correct + mfa_result.json = Mock(return_value={"response": "test_value"}) + assert get_mfa_response(mfa_result) == "test_value" + + # Test if response is incorrect + mfa_result.json = Mock(return_value={"badresponse": "FAIL"}) + with pytest.raises(SystemExit) as err: + get_mfa_response(mfa_result) + assert err.value.code == 1 + + # Test generic failure + with pytest.raises(SystemExit) as err: + get_mfa_response(Mock(return_value="FAIL")) + assert err.value.code == 1 + + +def test_duo_api_post(mocker): + """Test if duo api post correctly.""" + from tokendito.duo import duo_api_post + + mock_post = mocker.patch("requests.Session.post") + mock_resp = mocker.Mock() + mock_resp.status_code = 201 + mock_resp.json.return_value = {"status": "pytest"} + mock_post.return_value = mock_resp + + response = duo_api_post("https://pytest/") + assert response == {"status": "pytest"} + + +def test_get_duo_devices(mocker): + """Test that we can get a list of devices.""" + from tokendito.duo import get_duo_devices + + mock_resp = mocker.Mock() + mock_resp.status_code = 200 + mock_resp.content = "" + + # Test generic failure or empty response + with pytest.raises(SystemExit) as err: + get_duo_devices(mock_resp) + assert err.value.code == 2 + + # Test no devices in list + mock_resp.content = """ + + """ + assert get_duo_devices(mock_resp) == [] + + # Test devices in list + mock_resp.content = """ + +
+ +
+ """ + assert get_duo_devices(mock_resp) == [ + {"device": "pytest_device - pytest_device_name", "factor": "factor_type"} + ] + + +def test_parse_duo_mfa_challenge(): + """Test parsing the response to the challenge.""" + from tokendito.duo import parse_duo_mfa_challenge + + mfa_challenge = Mock() + + # Test successful challenge + mfa_challenge.json = Mock(return_value={"stat": "OK", "response": {"txid": "pytest"}}) + assert parse_duo_mfa_challenge(mfa_challenge) == "pytest" + + # Test error + mfa_challenge.json = Mock(return_value={"stat": "OK", "response": "error"}) + with pytest.raises(SystemExit) as err: + parse_duo_mfa_challenge(mfa_challenge) + assert err.value.code == 1 + + # Test no response in returned content + mfa_challenge.json = Mock(return_value={"stat": "OK", "badresponse": "error"}) + with pytest.raises(SystemExit) as err: + parse_duo_mfa_challenge(mfa_challenge) + assert err.value.code == 1 + + # Test API failure + mfa_challenge.json = Mock(return_value={"stat": "fail", "response": {"txid": "error"}}) + with pytest.raises(SystemExit) as err: + parse_duo_mfa_challenge(mfa_challenge) + assert err.value.code == 1 + + +def test_duo_mfa_challenge(mocker): + """TODO: Test MFA challenge.""" + from tokendito.duo import duo_mfa_challenge + + with pytest.raises(SystemExit) as err: + duo_mfa_challenge(None, None, None) + assert err.value.code == 2 + + duo_info = { + "okta_factor": "okta_factor", + "factor_id": 1234, + "state_token": 12345, + "okta_callback_url": "http://test.okta.href", + "tx": "pytest_tx", + "tile_sig": "pytest_tile_sig", + "parent": "pytest_parent", + "host": "pytest_host", + "sid": "pytest_sid", + "version": "3.7", + } + passcode = "pytest_passcode" + mfa_option = {"factor": "pytest_factor", "device": "pytest_device - pytest_device_name"} + + duo_api_response = mocker.Mock() + duo_api_response.json.return_value = {"stat": "OK", "response": {"txid": "pytest_txid"}} + + mocker.patch("tokendito.duo.duo_api_post", return_value=duo_api_response) + + txid = duo_mfa_challenge(duo_info, mfa_option, passcode) + assert txid == "pytest_txid" + + +def test_parse_challenge(): + """Test that we can parse a challenge.""" + from tokendito.duo import parse_challenge + + verify_mfa = {"status": "SUCCESS", "result": "SUCCESS", "reason": "pytest"} + assert parse_challenge(verify_mfa, None) == ("success", "pytest") + + verify_mfa = {"status": "UNKNOWN", "reason": "UNKNOWN"} + challenge_result = {"result": "PYTEST"} + assert parse_challenge(verify_mfa, challenge_result) == (challenge_result, "UNKNOWN") + + +@pytest.mark.parametrize( + "return_value,side_effect,expected", + [ + (("success", "pytest"), None, "pytest"), + ((None, None), [(None, None), ("success", "pytest")], "pytest"), + (("failure", "pytest"), None, SystemExit), + ], +) +def test_duo_mfa_verify(mocker, return_value, side_effect, expected): + """Test MFA challenge completion. + + side_effect is utilized to return different values on different iterations. + """ + from tokendito.duo import duo_mfa_verify + + mocker.patch.object(HTTP_client, "post", return_value=None) + mocker.patch("time.sleep", return_value=None) + mocker.patch("tokendito.duo.get_mfa_response", return_value="pytest") + mocker.patch( + "tokendito.duo.parse_challenge", return_value=return_value, side_effect=side_effect + ) + + duo_info = {"host": "pytest_host", "sid": "pytest_sid"} + txid = "pytest_txid" + + if expected == SystemExit: + # Test failure as exit condition + with pytest.raises(expected) as err: + duo_mfa_verify(duo_info, txid) + assert err.value.code == 2 + else: + # Test success, failure, and iterated calls + assert duo_mfa_verify(duo_info, txid) == expected + + +def test_duo_factor_callback(mocker): + """Test submitting factor to callback API.""" + from tokendito.duo import duo_factor_callback + + duo_info = {"host": "pytest_host", "sid": "pytest_sid", "tile_sig": "pytest_tile_sig"} + verify_mfa = {"result_url": "/pytest_result_url"} + + duo_api_response = mocker.Mock() + duo_api_response.json.return_value = { + "stat": "OK", + "response": {"txid": "pytest_txid", "cookie": "pytest_cookie"}, + } + mocker.patch("tokendito.duo.duo_api_post", return_value=duo_api_response) + + # Test successful retrieval of the cookie + sig_response = duo_factor_callback(duo_info, verify_mfa) + assert sig_response == "pytest_cookie:pytest_tile_sig" + + # Test failure to retrieve the cookie + duo_api_response.json.return_value = {"stat": "FAIL", "response": "pytest_error"} + with pytest.raises(SystemExit) as err: + duo_factor_callback(duo_info, verify_mfa) + assert err.value.code == 2 + + +def test_authenticate_duo(mocker): + """Test end to end authentication.""" + from tokendito.duo import authenticate_duo + + mocker.patch( + "tokendito.duo.get_duo_sid", + return_value=( + { + "sid": "pytest", + "host": "pytest", + "state_token": "pytest", + "factor_id": "pytest", + "okta_callback_url": "pytest", + }, + "pytest", + ), + ) + # We mock a lot of functions here, but we're really just testing that the data can flow, + # and that it can be parsed correctly to be sent to the API endpoint. + mocker.patch("tokendito.duo.get_duo_devices", return_value=[{"device": "pytest - device"}]) + mocker.patch("tokendito.user.select_preferred_mfa_index", return_value=0) + mocker.patch("tokendito.user.input", return_value="0123456") + mocker.patch("tokendito.duo.duo_mfa_challenge", return_value="txid_pytest") + mocker.patch("tokendito.duo.duo_mfa_verify", return_value={"result_url": "/pytest_result_url"}) + mocker.patch("tokendito.duo.duo_api_post", return_value=None) + mocker.patch("tokendito.duo.duo_factor_callback", return_value="pytest_cookie:pytest_tile_sig") + selected_okta_factor = { + "_embedded": { + "factor": { + "_embedded": { + "verification": { + "_links": { + "complete": {"href": "http://test.okta.href"}, + "script": {"href": "python-v3.7"}, + }, + "signature": "fdsafdsa:fdsfdfds:fdsfdsfds", + "host": "test_host", + } + }, + "id": 1234, + } + }, + "stateToken": 12345, + } + + res = authenticate_duo(selected_okta_factor) + assert { + "id": "pytest", + "sig_response": "pytest_cookie:pytest_tile_sig", + "stateToken": "pytest", + } == res diff --git a/tests/unit/test_okta.py b/tests/unit/test_okta.py index b2769b22..1d322b7d 100644 --- a/tests/unit/test_okta.py +++ b/tests/unit/test_okta.py @@ -92,16 +92,12 @@ def test_mfa_provider_type( mocker.patch("tokendito.duo.duo_api_post", return_value=None) payload = {"x": "y", "t": "z"} - callback_url = "https://www.acme.org" selected_mfa_option = 1 mfa_challenge_url = 1 primary_auth = 1 pytest_config = Config() - mocker.patch( - "tokendito.duo.authenticate_duo", - return_value=(payload, sample_headers, callback_url), - ) + mocker.patch("tokendito.duo.authenticate_duo", return_value=payload) mocker.patch("tokendito.okta.push_approval", return_value={"sessionToken": session_token}) mocker.patch("tokendito.okta.totp_approval", return_value={"sessionToken": session_token}) @@ -128,7 +124,6 @@ def test_bad_mfa_provider_type(mocker, sample_headers): pytest_config = Config() payload = {"x": "y", "t": "z"} - callback_url = "https://www.acme.org" selected_mfa_option = 1 mfa_challenge_url = 1 primary_auth = 1 @@ -140,10 +135,7 @@ def test_bad_mfa_provider_type(mocker, sample_headers): mock_response = Mock() mock_response.json.return_value = mfa_verify - mocker.patch( - "tokendito.duo.authenticate_duo", - return_value=(payload, sample_headers, callback_url), - ) + mocker.patch("tokendito.duo.authenticate_duo", return_value=payload) mocker.patch.object(HTTP_client, "post", return_value=mock_response) mocker.patch("tokendito.okta.totp_approval", return_value=mfa_verify) diff --git a/tokendito/duo.py b/tokendito/duo.py index 0b010e80..ba3b25c2 100644 --- a/tokendito/duo.py +++ b/tokendito/duo.py @@ -8,6 +8,7 @@ from urllib.parse import unquote from urllib.parse import urlparse +import bs4 from bs4 import BeautifulSoup from tokendito import user from tokendito.config import config @@ -23,21 +24,24 @@ def prepare_duo_info(selected_okta_factor): :return duo_info: dict of parameters for Duo """ duo_info = {} - okta_factor = selected_okta_factor["_embedded"]["factor"]["_embedded"]["verification"] - duo_info["okta_factor"] = okta_factor - duo_info["factor_id"] = selected_okta_factor["_embedded"]["factor"]["id"] - - duo_info["state_token"] = selected_okta_factor["stateToken"] - duo_info["okta_callback_url"] = okta_factor["_links"]["complete"]["href"] - duo_info["tx"] = okta_factor["signature"].split(":")[0] - duo_info["tile_sig"] = okta_factor["signature"].split(":")[1] - duo_info["parent"] = f"{config.okta['org']}/signin/verify/duo/web" - duo_info["host"] = okta_factor["host"] - duo_info["sid"] = "" - - version = okta_factor["_links"]["script"]["href"].split("-v")[1] - duo_info["version"] = version.strip(".js") - + try: + okta_factor = selected_okta_factor["_embedded"]["factor"]["_embedded"]["verification"] + duo_info["okta_factor"] = okta_factor + duo_info["factor_id"] = selected_okta_factor["_embedded"]["factor"]["id"] + + duo_info["state_token"] = selected_okta_factor["stateToken"] + duo_info["okta_callback_url"] = okta_factor["_links"]["complete"]["href"] + duo_info["tx"] = okta_factor["signature"].split(":")[0] + duo_info["tile_sig"] = okta_factor["signature"].split(":")[1] + duo_info["parent"] = f"{config.okta['org']}/signin/verify/duo/web" + duo_info["host"] = okta_factor["host"] + duo_info["sid"] = "" + + version = okta_factor["_links"]["script"]["href"].split("-v")[1] + duo_info["version"] = version.strip(".js") + except KeyError as missing_key: + logger.error(f"There was an issue parsing the Okta factor. Please try again: {missing_key}") + sys.exit(1) return duo_info @@ -76,7 +80,7 @@ def get_duo_sid(duo_info): logger.error(f"There was an error getting your SID. Please try again: {sid_error}") sys.exit(2) - return duo_info, duo_auth_response + return (duo_info, duo_auth_response) def get_duo_devices(duo_auth): @@ -91,17 +95,22 @@ def get_duo_devices(duo_auth): :return factor_options: list of dict objects describing each MFA option. """ soup = BeautifulSoup(duo_auth.content, "html.parser") + devices = [] - device_soup = soup.find("select", {"name": "device"}).findAll("option") # type: ignore - devices = [f"{d['value']} - {d.text}" for d in device_soup] + device_soup = soup.find("select", {"name": "device"}) + if type(device_soup) is bs4.element.Tag: + options = device_soup.findAll("option") + devices = [f"{d['value']} - {d.text}" for d in options] if not devices: logger.error("Please configure devices for your Duo MFA and retry.") sys.exit(2) factor_options = [] + factors = [] for device in devices: options = soup.find("fieldset", {"data-device-index": device.split(" - ")[0]}) - factors = options.findAll("input", {"name": "factor"}) # type: ignore (PEP 561) + if type(options) is bs4.element.Tag: + factors = options.findAll("input", {"name": "factor"}) for factor in factors: factor_option = {"device": device, "factor": factor["value"]} factor_options.append(factor_option) @@ -118,8 +127,8 @@ def parse_duo_mfa_challenge(mfa_challenge): mfa_challenge = mfa_challenge.json() mfa_status = mfa_challenge["stat"] txid = mfa_challenge["response"]["txid"] - except ValueError as value_error: - logger.error(f"The Duo API returned a non-json response: {value_error}") + except (TypeError, ValueError) as err: + logger.error(f"The Duo API returned a non-json response: {err}") sys.exit(1) except KeyError as key_error: logger.error(f"The Duo API response is missing a required parameter: {key_error}") @@ -127,7 +136,7 @@ def parse_duo_mfa_challenge(mfa_challenge): sys.exit(1) if mfa_status == "fail": - logger.error(f"Your Duo authentication has failed: {mfa_challenge['message']}") + logger.error(f"Your Duo authentication has failed: {mfa_challenge}") sys.exit(1) return txid @@ -143,23 +152,29 @@ def duo_mfa_challenge(duo_info, mfa_option, passcode): :param mfa_option: the user's selected second factor. :return txid: Duo transaction ID used to track this auth attempt. """ - url = f"https://{duo_info['host']}/frame/prompt" - device = mfa_option["device"].split(" - ")[0] - mfa_data = { - "factor": mfa_option["factor"], - "device": device, - "sid": duo_info["sid"], - "out_of_date": False, - "days_out_of_date": 0, - "days_to_block": None, - "async": True, - } + mfa_data = {} + try: + url = f"https://{duo_info['host']}/frame/prompt" + device = mfa_option["device"].split(" - ")[0] + mfa_data = { + "factor": mfa_option["factor"], + "device": device, + "sid": duo_info["sid"], + "out_of_date": False, + "days_out_of_date": 0, + "days_to_block": None, + "async": True, + } + except Exception as key_error: + logger.error(f"Unable to parse Duo MFA data: {key_error}") + sys.exit(2) + if passcode: mfa_data["passcode"] = passcode mfa_challenge = duo_api_post(url, payload=mfa_data) txid = parse_duo_mfa_challenge(mfa_challenge) - logger.debug("Sent MFA Challenge and obtained Duo transaction ID.") + logger.debug(f"Sent MFA Challenge and obtained Duo transaction ID {txid}") return txid @@ -171,8 +186,12 @@ def get_mfa_response(mfa_result): """ try: verify_mfa = mfa_result.json()["response"] - except Exception as parse_error: - logger.error(f"There was an error parsing the mfa challenge result: {parse_error}") + except KeyError as key_error: + logger.error(f"The mfa challenge response is missing a required parameter: {key_error}") + logger.debug(json.dumps(mfa_result.json())) + sys.exit(1) + except Exception as other_error: + logger.error(f"There was an error getting the mfa challenge result: {other_error}") sys.exit(1) return verify_mfa @@ -197,7 +216,7 @@ def parse_challenge(verify_mfa, challenge_result): challenge_result = verify_mfa["result"].lower() logger.debug(f"Challenge result is {challenge_result}") - return challenge_result, challenge_reason + return (challenge_result, challenge_reason) def duo_mfa_verify(duo_info, txid): @@ -218,11 +237,9 @@ def duo_mfa_verify(duo_info, txid): logger.debug("Waiting for MFA challenge response") mfa_result = duo_api_post(url, payload=challenged_mfa) verify_mfa = get_mfa_response(mfa_result) - challenge_result, challenge_reason = parse_challenge(verify_mfa, challenge_result) + (challenge_result, challenge_reason) = parse_challenge(verify_mfa, challenge_result) - if challenge_result is None: - continue - elif challenge_result == "success": + if challenge_result == "success": logger.debug("Successful MFA challenge received") break elif challenge_result == "failure": @@ -251,18 +268,16 @@ def duo_factor_callback(duo_info, verify_mfa): try: sig_response = f"{factor_callback.json()['response']['cookie']}:{duo_info['tile_sig']}" except Exception as sig_error: - logger.error( - "There was an error getting your application signature " - f"from Duo: {json.dumps(sig_error)}" - ) + logger.error("There was an error getting your application signature. Please try again.") + logger.debug(f"from Duo: {sig_error}") sys.exit(2) - logger.debug("Completed factor callback.") + logger.debug(f"Completed callback to {factor_callback_url} with sig_response {sig_response}") return sig_response -def set_passcode(mfa_option): - """Set totp passcode. +def get_passcode(mfa_option): + """Get totp passcode. If the user has selected the passcode option, collect their TOTP. @@ -270,9 +285,13 @@ def set_passcode(mfa_option): :return passcode: passcode value from user """ passcode = None - if mfa_option["factor"].lower() == "passcode": - user.print("Type your TOTP and press Enter:") - passcode = user.get_input() + + try: + if mfa_option["factor"].lower() == "passcode": + user.print("Type your TOTP and press Enter:") + passcode = user.get_input() + except (TypeError, KeyError): + pass return passcode @@ -285,15 +304,10 @@ def authenticate_duo(selected_okta_factor): :param selected_okta_factor: Duo factor information retrieved from Okta. :return payload: required payload for Okta callback - :return headers: required headers for Okta callback """ - try: - duo_info = prepare_duo_info(selected_okta_factor) - except KeyError as missing_key: - logger.error(f"There was an issue parsing the Okta factor. Please try again: {missing_key}") - sys.exit(1) + duo_info = prepare_duo_info(selected_okta_factor) # Collect devices, factors, auth params for Duo - duo_info, duo_auth_response = get_duo_sid(duo_info) + (duo_info, duo_auth_response) = get_duo_sid(duo_info) factor_options = get_duo_devices(duo_auth_response) mfa_index = user.select_preferred_mfa_index( factor_options, factor_key="factor", subfactor_key="device" @@ -301,7 +315,7 @@ def authenticate_duo(selected_okta_factor): mfa_option = factor_options[mfa_index] logger.debug(f"Selected MFA is [{mfa_option}]") - passcode = set_passcode(mfa_option) + passcode = get_passcode(mfa_option) txid = duo_mfa_challenge(duo_info, mfa_option, passcode) verify_mfa = duo_mfa_verify(duo_info, txid) @@ -315,6 +329,7 @@ def authenticate_duo(selected_okta_factor): "sig_response": sig_response, "stateToken": duo_info["state_token"], } - headers = {"content-type": "application/json", "accept": "application/json"} - return payload, headers, duo_info["okta_callback_url"] + # Send Okta callback and return payload + duo_api_post(duo_info["okta_callback_url"], payload=payload) + return payload diff --git a/tokendito/okta.py b/tokendito/okta.py index e23e5981..9c33b4e4 100644 --- a/tokendito/okta.py +++ b/tokendito/okta.py @@ -464,8 +464,8 @@ def mfa_provider_type( factor_type = selected_factor.get("_embedded", {}).get("factor", {}).get("factorType", None) if mfa_provider == "DUO": - payload, headers, callback_url = duo.authenticate_duo(selected_factor) - duo.duo_api_post(callback_url, payload=payload) + mfa_verify = duo.authenticate_duo(selected_factor) + headers = {"content-type": "application/json", "accept": "application/json"} mfa_verify = HTTP_client.post( mfa_challenge_url, json=payload, headers=headers, return_json=True ) From 7dd7e621b7c091f475d37f5c12c2e4d4e3ce3edf Mon Sep 17 00:00:00 2001 From: pcmxgti <16561338+pcmxgti@users.noreply.github.com> Date: Mon, 30 Oct 2023 09:48:59 -0400 Subject: [PATCH 3/4] Clean up method names for readability --- tests/unit/test_duo.py | 94 ++++++++++++++++++++--------------------- tests/unit/test_okta.py | 6 +-- tokendito/duo.py | 42 +++++++++--------- tokendito/okta.py | 2 +- 4 files changed, 72 insertions(+), 72 deletions(-) diff --git a/tests/unit/test_duo.py b/tests/unit/test_duo.py index e01c18c3..efb95b07 100644 --- a/tests/unit/test_duo.py +++ b/tests/unit/test_duo.py @@ -19,10 +19,10 @@ def test_get_passcode(mocker): assert duo.get_passcode("pytest") is None -def test_prepare_duo_info(): +def test_prepare_info(): """Test behaviour empty return duo info.""" from tokendito.config import config - from tokendito.duo import prepare_duo_info + from tokendito.duo import prepare_info selected_okta_factor = { "_embedded": { @@ -55,17 +55,17 @@ def test_prepare_duo_info(): "sid": "", "version": "3.7", } - assert prepare_duo_info(selected_okta_factor) == expected_duo_info + assert prepare_info(selected_okta_factor) == expected_duo_info with pytest.raises(SystemExit) as err: - prepare_duo_info({"badresponse": "FAIL"}) + prepare_info({"badresponse": "FAIL"}) assert err.value.code == 1 -def test_get_duo_sid(mocker): +def test_get_sid(mocker): """Check if got sid correct.""" from tokendito.config import config - from tokendito.duo import get_duo_sid + from tokendito.duo import get_sid test_duo_info = { "okta_factor": "okta_factor", @@ -84,16 +84,16 @@ def test_get_duo_sid(mocker): duo_api_response = Mock() duo_api_response.url = test_url - mocker.patch("tokendito.duo.duo_api_post", return_value=duo_api_response) + mocker.patch("tokendito.duo.api_post", return_value=duo_api_response) - duo_sid_info, duo_auth_response = get_duo_sid(test_duo_info) + duo_sid_info, duo_auth_response = get_sid(test_duo_info) assert duo_sid_info["sid"] == "testval" assert duo_auth_response.url == test_url - mocker.patch("tokendito.duo.duo_api_post", return_value="FAIL") + mocker.patch("tokendito.duo.api_post", return_value="FAIL") with pytest.raises(SystemExit) as err: - get_duo_sid(test_duo_info) + get_sid(test_duo_info) assert err.value.code == 2 @@ -119,9 +119,9 @@ def test_get_mfa_response(): assert err.value.code == 1 -def test_duo_api_post(mocker): +def test_api_post(mocker): """Test if duo api post correctly.""" - from tokendito.duo import duo_api_post + from tokendito.duo import api_post mock_post = mocker.patch("requests.Session.post") mock_resp = mocker.Mock() @@ -129,13 +129,13 @@ def test_duo_api_post(mocker): mock_resp.json.return_value = {"status": "pytest"} mock_post.return_value = mock_resp - response = duo_api_post("https://pytest/") + response = api_post("https://pytest/") assert response == {"status": "pytest"} -def test_get_duo_devices(mocker): +def test_get_devices(mocker): """Test that we can get a list of devices.""" - from tokendito.duo import get_duo_devices + from tokendito.duo import get_devices mock_resp = mocker.Mock() mock_resp.status_code = 200 @@ -143,7 +143,7 @@ def test_get_duo_devices(mocker): # Test generic failure or empty response with pytest.raises(SystemExit) as err: - get_duo_devices(mock_resp) + get_devices(mock_resp) assert err.value.code == 2 # Test no devices in list @@ -152,7 +152,7 @@ def test_get_duo_devices(mocker): """ - assert get_duo_devices(mock_resp) == [] + assert get_devices(mock_resp) == [] # Test devices in list mock_resp.content = """ @@ -163,46 +163,46 @@ def test_get_duo_devices(mocker): """ - assert get_duo_devices(mock_resp) == [ + assert get_devices(mock_resp) == [ {"device": "pytest_device - pytest_device_name", "factor": "factor_type"} ] -def test_parse_duo_mfa_challenge(): +def test_parse_mfa_challenge(): """Test parsing the response to the challenge.""" - from tokendito.duo import parse_duo_mfa_challenge + from tokendito.duo import parse_mfa_challenge mfa_challenge = Mock() # Test successful challenge mfa_challenge.json = Mock(return_value={"stat": "OK", "response": {"txid": "pytest"}}) - assert parse_duo_mfa_challenge(mfa_challenge) == "pytest" + assert parse_mfa_challenge(mfa_challenge) == "pytest" # Test error mfa_challenge.json = Mock(return_value={"stat": "OK", "response": "error"}) with pytest.raises(SystemExit) as err: - parse_duo_mfa_challenge(mfa_challenge) + parse_mfa_challenge(mfa_challenge) assert err.value.code == 1 # Test no response in returned content mfa_challenge.json = Mock(return_value={"stat": "OK", "badresponse": "error"}) with pytest.raises(SystemExit) as err: - parse_duo_mfa_challenge(mfa_challenge) + parse_mfa_challenge(mfa_challenge) assert err.value.code == 1 # Test API failure mfa_challenge.json = Mock(return_value={"stat": "fail", "response": {"txid": "error"}}) with pytest.raises(SystemExit) as err: - parse_duo_mfa_challenge(mfa_challenge) + parse_mfa_challenge(mfa_challenge) assert err.value.code == 1 -def test_duo_mfa_challenge(mocker): +def test_mfa_challenge(mocker): """TODO: Test MFA challenge.""" - from tokendito.duo import duo_mfa_challenge + from tokendito.duo import mfa_challenge with pytest.raises(SystemExit) as err: - duo_mfa_challenge(None, None, None) + mfa_challenge(None, None, None) assert err.value.code == 2 duo_info = { @@ -223,9 +223,9 @@ def test_duo_mfa_challenge(mocker): duo_api_response = mocker.Mock() duo_api_response.json.return_value = {"stat": "OK", "response": {"txid": "pytest_txid"}} - mocker.patch("tokendito.duo.duo_api_post", return_value=duo_api_response) + mocker.patch("tokendito.duo.api_post", return_value=duo_api_response) - txid = duo_mfa_challenge(duo_info, mfa_option, passcode) + txid = mfa_challenge(duo_info, mfa_option, passcode) assert txid == "pytest_txid" @@ -249,12 +249,12 @@ def test_parse_challenge(): (("failure", "pytest"), None, SystemExit), ], ) -def test_duo_mfa_verify(mocker, return_value, side_effect, expected): +def test_mfa_verify(mocker, return_value, side_effect, expected): """Test MFA challenge completion. side_effect is utilized to return different values on different iterations. """ - from tokendito.duo import duo_mfa_verify + from tokendito.duo import mfa_verify mocker.patch.object(HTTP_client, "post", return_value=None) mocker.patch("time.sleep", return_value=None) @@ -269,16 +269,16 @@ def test_duo_mfa_verify(mocker, return_value, side_effect, expected): if expected == SystemExit: # Test failure as exit condition with pytest.raises(expected) as err: - duo_mfa_verify(duo_info, txid) + mfa_verify(duo_info, txid) assert err.value.code == 2 else: # Test success, failure, and iterated calls - assert duo_mfa_verify(duo_info, txid) == expected + assert mfa_verify(duo_info, txid) == expected -def test_duo_factor_callback(mocker): +def test_factor_callback(mocker): """Test submitting factor to callback API.""" - from tokendito.duo import duo_factor_callback + from tokendito.duo import factor_callback duo_info = {"host": "pytest_host", "sid": "pytest_sid", "tile_sig": "pytest_tile_sig"} verify_mfa = {"result_url": "/pytest_result_url"} @@ -288,25 +288,25 @@ def test_duo_factor_callback(mocker): "stat": "OK", "response": {"txid": "pytest_txid", "cookie": "pytest_cookie"}, } - mocker.patch("tokendito.duo.duo_api_post", return_value=duo_api_response) + mocker.patch("tokendito.duo.api_post", return_value=duo_api_response) # Test successful retrieval of the cookie - sig_response = duo_factor_callback(duo_info, verify_mfa) + sig_response = factor_callback(duo_info, verify_mfa) assert sig_response == "pytest_cookie:pytest_tile_sig" # Test failure to retrieve the cookie duo_api_response.json.return_value = {"stat": "FAIL", "response": "pytest_error"} with pytest.raises(SystemExit) as err: - duo_factor_callback(duo_info, verify_mfa) + factor_callback(duo_info, verify_mfa) assert err.value.code == 2 -def test_authenticate_duo(mocker): +def test_authenticate(mocker): """Test end to end authentication.""" - from tokendito.duo import authenticate_duo + from tokendito.duo import authenticate mocker.patch( - "tokendito.duo.get_duo_sid", + "tokendito.duo.get_sid", return_value=( { "sid": "pytest", @@ -320,13 +320,13 @@ def test_authenticate_duo(mocker): ) # We mock a lot of functions here, but we're really just testing that the data can flow, # and that it can be parsed correctly to be sent to the API endpoint. - mocker.patch("tokendito.duo.get_duo_devices", return_value=[{"device": "pytest - device"}]) + mocker.patch("tokendito.duo.get_devices", return_value=[{"device": "pytest - device"}]) mocker.patch("tokendito.user.select_preferred_mfa_index", return_value=0) mocker.patch("tokendito.user.input", return_value="0123456") - mocker.patch("tokendito.duo.duo_mfa_challenge", return_value="txid_pytest") - mocker.patch("tokendito.duo.duo_mfa_verify", return_value={"result_url": "/pytest_result_url"}) - mocker.patch("tokendito.duo.duo_api_post", return_value=None) - mocker.patch("tokendito.duo.duo_factor_callback", return_value="pytest_cookie:pytest_tile_sig") + mocker.patch("tokendito.duo.mfa_challenge", return_value="txid_pytest") + mocker.patch("tokendito.duo.mfa_verify", return_value={"result_url": "/pytest_result_url"}) + mocker.patch("tokendito.duo.api_post", return_value=None) + mocker.patch("tokendito.duo.factor_callback", return_value="pytest_cookie:pytest_tile_sig") selected_okta_factor = { "_embedded": { "factor": { @@ -346,7 +346,7 @@ def test_authenticate_duo(mocker): "stateToken": 12345, } - res = authenticate_duo(selected_okta_factor) + res = authenticate(selected_okta_factor) assert { "id": "pytest", "sig_response": "pytest_cookie:pytest_tile_sig", diff --git a/tests/unit/test_okta.py b/tests/unit/test_okta.py index 1d322b7d..7162b1d5 100644 --- a/tests/unit/test_okta.py +++ b/tests/unit/test_okta.py @@ -89,7 +89,7 @@ def test_mfa_provider_type( mock_response = {"sessionToken": session_token} mocker.patch.object(HTTP_client, "post", return_value=mock_response) - mocker.patch("tokendito.duo.duo_api_post", return_value=None) + mocker.patch("tokendito.duo.api_post", return_value=None) payload = {"x": "y", "t": "z"} selected_mfa_option = 1 @@ -97,7 +97,7 @@ def test_mfa_provider_type( primary_auth = 1 pytest_config = Config() - mocker.patch("tokendito.duo.authenticate_duo", return_value=payload) + mocker.patch("tokendito.duo.authenticate", return_value=payload) mocker.patch("tokendito.okta.push_approval", return_value={"sessionToken": session_token}) mocker.patch("tokendito.okta.totp_approval", return_value={"sessionToken": session_token}) @@ -135,7 +135,7 @@ def test_bad_mfa_provider_type(mocker, sample_headers): mock_response = Mock() mock_response.json.return_value = mfa_verify - mocker.patch("tokendito.duo.authenticate_duo", return_value=payload) + mocker.patch("tokendito.duo.authenticate", return_value=payload) mocker.patch.object(HTTP_client, "post", return_value=mock_response) mocker.patch("tokendito.okta.totp_approval", return_value=mfa_verify) diff --git a/tokendito/duo.py b/tokendito/duo.py index ba3b25c2..3853dcbd 100644 --- a/tokendito/duo.py +++ b/tokendito/duo.py @@ -17,7 +17,7 @@ logger = logging.getLogger(__name__) -def prepare_duo_info(selected_okta_factor): +def prepare_info(selected_okta_factor): """Aggregate most of the parameters needed throughout the Duo authentication process. :param selected_okta_factor: dict response describing Duo factor in Okta. @@ -45,7 +45,7 @@ def prepare_duo_info(selected_okta_factor): return duo_info -def duo_api_post(url, params=None, headers=None, payload=None): +def api_post(url, params=None, headers=None, payload=None): """Error handling and response parsing wrapper for Duo POSTs. :param url: The URL being connected to. @@ -58,7 +58,7 @@ def duo_api_post(url, params=None, headers=None, payload=None): return response -def get_duo_sid(duo_info): +def get_sid(duo_info): """Perform the initial Duo authentication request to obtain the SID. The SID is referenced throughout the authentication process for Duo. @@ -71,7 +71,7 @@ def get_duo_sid(duo_info): url = f"https://{duo_info['host']}/frame/web/v1/auth" logger.debug(f"Calling Duo {urlparse(url).path} with params {params.keys()}") - duo_auth_response = duo_api_post(url, params=params) + duo_auth_response = api_post(url, params=params) try: duo_auth_redirect = urlparse(f"{unquote(duo_auth_response.url)}").query @@ -83,7 +83,7 @@ def get_duo_sid(duo_info): return (duo_info, duo_auth_response) -def get_duo_devices(duo_auth): +def get_devices(duo_auth): """Parse Duo auth response to extract user's MFA options. The /frame/web/v1/auth API returns an html page that lists @@ -117,7 +117,7 @@ def get_duo_devices(duo_auth): return factor_options -def parse_duo_mfa_challenge(mfa_challenge): +def parse_mfa_challenge(mfa_challenge): """Gracefully parse Duo MFA challenge response. :param mfa_challenge: Duo API response for MFA challenge. @@ -141,7 +141,7 @@ def parse_duo_mfa_challenge(mfa_challenge): return txid -def duo_mfa_challenge(duo_info, mfa_option, passcode): +def mfa_challenge(duo_info, mfa_option, passcode): """Poke Duo to challenge the selected factor. After the user has selected their device and factor of choice, @@ -171,8 +171,8 @@ def duo_mfa_challenge(duo_info, mfa_option, passcode): if passcode: mfa_data["passcode"] = passcode - mfa_challenge = duo_api_post(url, payload=mfa_data) - txid = parse_duo_mfa_challenge(mfa_challenge) + mfa_challenge = api_post(url, payload=mfa_data) + txid = parse_mfa_challenge(mfa_challenge) logger.debug(f"Sent MFA Challenge and obtained Duo transaction ID {txid}") return txid @@ -219,7 +219,7 @@ def parse_challenge(verify_mfa, challenge_result): return (challenge_result, challenge_reason) -def duo_mfa_verify(duo_info, txid): +def mfa_verify(duo_info, txid): """Verify MFA challenge completion. After the user has received the MFA challenge, query the Duo API @@ -235,7 +235,7 @@ def duo_mfa_verify(duo_info, txid): while True: logger.debug("Waiting for MFA challenge response") - mfa_result = duo_api_post(url, payload=challenged_mfa) + mfa_result = api_post(url, payload=challenged_mfa) verify_mfa = get_mfa_response(mfa_result) (challenge_result, challenge_reason) = parse_challenge(verify_mfa, challenge_result) @@ -252,7 +252,7 @@ def duo_mfa_verify(duo_info, txid): return verify_mfa -def duo_factor_callback(duo_info, verify_mfa): +def factor_callback(duo_info, verify_mfa): """Inform factor callback api of successful challenge. This request seems to inform this factor's callback url @@ -263,7 +263,7 @@ def duo_factor_callback(duo_info, verify_mfa): :return sig_response: required to sign final Duo callback request. """ factor_callback_url = f"https://{duo_info['host']}{verify_mfa['result_url']}" - factor_callback = duo_api_post(factor_callback_url, payload={"sid": duo_info["sid"]}) + factor_callback = api_post(factor_callback_url, payload={"sid": duo_info["sid"]}) try: sig_response = f"{factor_callback.json()['response']['cookie']}:{duo_info['tile_sig']}" @@ -295,7 +295,7 @@ def get_passcode(mfa_option): return passcode -def authenticate_duo(selected_okta_factor): +def authenticate(selected_okta_factor): """Accomplish MFA via Duo. This is the main function that coordinates the Duo @@ -305,10 +305,10 @@ def authenticate_duo(selected_okta_factor): :param selected_okta_factor: Duo factor information retrieved from Okta. :return payload: required payload for Okta callback """ - duo_info = prepare_duo_info(selected_okta_factor) + duo_info = prepare_info(selected_okta_factor) # Collect devices, factors, auth params for Duo - (duo_info, duo_auth_response) = get_duo_sid(duo_info) - factor_options = get_duo_devices(duo_auth_response) + (duo_info, duo_auth_response) = get_sid(duo_info) + factor_options = get_devices(duo_auth_response) mfa_index = user.select_preferred_mfa_index( factor_options, factor_key="factor", subfactor_key="device" ) @@ -317,11 +317,11 @@ def authenticate_duo(selected_okta_factor): logger.debug(f"Selected MFA is [{mfa_option}]") passcode = get_passcode(mfa_option) - txid = duo_mfa_challenge(duo_info, mfa_option, passcode) - verify_mfa = duo_mfa_verify(duo_info, txid) + txid = mfa_challenge(duo_info, mfa_option, passcode) + verify_mfa = mfa_verify(duo_info, txid) # Make factor callback to Duo - sig_response = duo_factor_callback(duo_info, verify_mfa) + sig_response = factor_callback(duo_info, verify_mfa) # Prepare for Okta callback payload = { @@ -331,5 +331,5 @@ def authenticate_duo(selected_okta_factor): } # Send Okta callback and return payload - duo_api_post(duo_info["okta_callback_url"], payload=payload) + api_post(duo_info["okta_callback_url"], payload=payload) return payload diff --git a/tokendito/okta.py b/tokendito/okta.py index 9c33b4e4..1fa249bd 100644 --- a/tokendito/okta.py +++ b/tokendito/okta.py @@ -464,7 +464,7 @@ def mfa_provider_type( factor_type = selected_factor.get("_embedded", {}).get("factor", {}).get("factorType", None) if mfa_provider == "DUO": - mfa_verify = duo.authenticate_duo(selected_factor) + mfa_verify = duo.authenticate(selected_factor) headers = {"content-type": "application/json", "accept": "application/json"} mfa_verify = HTTP_client.post( mfa_challenge_url, json=payload, headers=headers, return_json=True From 5a8c7b70e2c184c4a3863b35839f54eac94bf8d9 Mon Sep 17 00:00:00 2001 From: pcmxgti <16561338+pcmxgti@users.noreply.github.com> Date: Wed, 8 Nov 2023 15:28:24 -0500 Subject: [PATCH 4/4] Adjust API calls and tests so that calls to DUO work --- tests/unit/test_duo.py | 49 +++++++++++++++++++++++++++++------------- tokendito/duo.py | 29 ++++++++++++++----------- 2 files changed, 51 insertions(+), 27 deletions(-) diff --git a/tests/unit/test_duo.py b/tests/unit/test_duo.py index efb95b07..a0ee32d5 100644 --- a/tests/unit/test_duo.py +++ b/tests/unit/test_duo.py @@ -104,15 +104,19 @@ def test_get_mfa_response(): mfa_result = Mock() # Test if response is correct - mfa_result.json = Mock(return_value={"response": "test_value"}) - assert get_mfa_response(mfa_result) == "test_value" + assert get_mfa_response({"response": "test_value"}) == "test_value" # Test if response is incorrect - mfa_result.json = Mock(return_value={"badresponse": "FAIL"}) + mfa_result = Mock(return_value={"badresponse": "FAIL"}) with pytest.raises(SystemExit) as err: get_mfa_response(mfa_result) assert err.value.code == 1 + # Test no key available + with pytest.raises(SystemExit) as err: + get_mfa_response({"pytest": "FAIL"}) + assert err.value.code == 1 + # Test generic failure with pytest.raises(SystemExit) as err: get_mfa_response(Mock(return_value="FAIL")) @@ -175,8 +179,7 @@ def test_parse_mfa_challenge(): mfa_challenge = Mock() # Test successful challenge - mfa_challenge.json = Mock(return_value={"stat": "OK", "response": {"txid": "pytest"}}) - assert parse_mfa_challenge(mfa_challenge) == "pytest" + assert parse_mfa_challenge({"stat": "OK", "response": {"txid": "pytest"}}) == "pytest" # Test error mfa_challenge.json = Mock(return_value={"stat": "OK", "response": "error"}) @@ -184,12 +187,22 @@ def test_parse_mfa_challenge(): parse_mfa_challenge(mfa_challenge) assert err.value.code == 1 + # Test no key in returned content + with pytest.raises(SystemExit) as err: + parse_mfa_challenge({"pyest": "OK", "badresponse": "error"}) + assert err.value.code == 1 + # Test no response in returned content mfa_challenge.json = Mock(return_value={"stat": "OK", "badresponse": "error"}) with pytest.raises(SystemExit) as err: parse_mfa_challenge(mfa_challenge) assert err.value.code == 1 + # Test failure + with pytest.raises(SystemExit) as err: + parse_mfa_challenge({"stat": "fail", "response": {"txid": "pytest_error"}}) + assert err.value.code == 1 + # Test API failure mfa_challenge.json = Mock(return_value={"stat": "fail", "response": {"txid": "error"}}) with pytest.raises(SystemExit) as err: @@ -220,10 +233,9 @@ def test_mfa_challenge(mocker): passcode = "pytest_passcode" mfa_option = {"factor": "pytest_factor", "device": "pytest_device - pytest_device_name"} - duo_api_response = mocker.Mock() - duo_api_response.json.return_value = {"stat": "OK", "response": {"txid": "pytest_txid"}} - - mocker.patch("tokendito.duo.api_post", return_value=duo_api_response) + mocker.patch( + "tokendito.duo.api_post", return_value={"stat": "OK", "response": {"txid": "pytest_txid"}} + ) txid = mfa_challenge(duo_info, mfa_option, passcode) assert txid == "pytest_txid" @@ -283,19 +295,26 @@ def test_factor_callback(mocker): duo_info = {"host": "pytest_host", "sid": "pytest_sid", "tile_sig": "pytest_tile_sig"} verify_mfa = {"result_url": "/pytest_result_url"} - duo_api_response = mocker.Mock() - duo_api_response.json.return_value = { + # Test successful retrieval of the cookie + duo_api_response = { "stat": "OK", "response": {"txid": "pytest_txid", "cookie": "pytest_cookie"}, } mocker.patch("tokendito.duo.api_post", return_value=duo_api_response) - - # Test successful retrieval of the cookie sig_response = factor_callback(duo_info, verify_mfa) assert sig_response == "pytest_cookie:pytest_tile_sig" - # Test failure to retrieve the cookie - duo_api_response.json.return_value = {"stat": "FAIL", "response": "pytest_error"} + # Test bad data passed in + duo_api_response = "FAIL" + mocker.patch("tokendito.duo.api_post", return_value=duo_api_response) + with pytest.raises(SystemExit) as err: + factor_callback(duo_info, verify_mfa) + assert err.value.code == 2 + + # Test bad data passed in + duo_api_response = {"stat": "FAIL", "response": {"cookie": "pytest_cookie"}} + duo_info = {"host": "pytest", "sid": "pytest"} + mocker.patch("tokendito.duo.api_post", return_value=duo_api_response) with pytest.raises(SystemExit) as err: factor_callback(duo_info, verify_mfa) assert err.value.code == 2 diff --git a/tokendito/duo.py b/tokendito/duo.py index 3853dcbd..ecc3804e 100644 --- a/tokendito/duo.py +++ b/tokendito/duo.py @@ -45,7 +45,7 @@ def prepare_info(selected_okta_factor): return duo_info -def api_post(url, params=None, headers=None, payload=None): +def api_post(url, params=None, headers=None, payload=None, return_json=True): """Error handling and response parsing wrapper for Duo POSTs. :param url: The URL being connected to. @@ -54,7 +54,9 @@ def api_post(url, params=None, headers=None, payload=None): :param payload: Request body. :return response: Response to the API request. """ - response = HTTP_client.post(url, params=params, headers=headers, data=payload, return_json=True) + response = HTTP_client.post( + url, params=params, headers=headers, data=payload, return_json=return_json + ) return response @@ -71,7 +73,7 @@ def get_sid(duo_info): url = f"https://{duo_info['host']}/frame/web/v1/auth" logger.debug(f"Calling Duo {urlparse(url).path} with params {params.keys()}") - duo_auth_response = api_post(url, params=params) + duo_auth_response = api_post(url, params=params, return_json=False) try: duo_auth_redirect = urlparse(f"{unquote(duo_auth_response.url)}").query @@ -124,10 +126,9 @@ def parse_mfa_challenge(mfa_challenge): :return txid: Duo transaction ID. """ try: - mfa_challenge = mfa_challenge.json() mfa_status = mfa_challenge["stat"] txid = mfa_challenge["response"]["txid"] - except (TypeError, ValueError) as err: + except (TypeError, ValueError, AttributeError) as err: logger.error(f"The Duo API returned a non-json response: {err}") sys.exit(1) except KeyError as key_error: @@ -185,10 +186,10 @@ def get_mfa_response(mfa_result): :return verify_mfa: json response from mfa api """ try: - verify_mfa = mfa_result.json()["response"] + verify_mfa = mfa_result["response"] except KeyError as key_error: logger.error(f"The mfa challenge response is missing a required parameter: {key_error}") - logger.debug(json.dumps(mfa_result.json())) + logger.debug(mfa_result) sys.exit(1) except Exception as other_error: logger.error(f"There was an error getting the mfa challenge result: {other_error}") @@ -265,13 +266,17 @@ def factor_callback(duo_info, verify_mfa): factor_callback_url = f"https://{duo_info['host']}{verify_mfa['result_url']}" factor_callback = api_post(factor_callback_url, payload={"sid": duo_info["sid"]}) - try: - sig_response = f"{factor_callback.json()['response']['cookie']}:{duo_info['tile_sig']}" - except Exception as sig_error: + if type(factor_callback) is not dict: logger.error("There was an error getting your application signature. Please try again.") - logger.debug(f"from Duo: {sig_error}") + logger.debug(f"Response from DUO: {factor_callback}") sys.exit(2) + try: + sig_response = f"{factor_callback['response']['cookie']}:{duo_info['tile_sig']}" + except (KeyError, TypeError) as err: + logger.error("There was an error getting your application signature. Please try again.") + logger.debug(f"Response from DUO: {err}") + sys.exit(2) logger.debug(f"Completed callback to {factor_callback_url} with sig_response {sig_response}") return sig_response @@ -331,5 +336,5 @@ def authenticate(selected_okta_factor): } # Send Okta callback and return payload - api_post(duo_info["okta_callback_url"], payload=payload) + api_post(duo_info["okta_callback_url"], payload=payload, return_json=False) return payload