From 14db7255f4b85bcb7e6939fdfe41a2083e49e173 Mon Sep 17 00:00:00 2001 From: pcmxgti <16561338+pcmxgti@users.noreply.github.com> Date: Mon, 13 Nov 2023 22:33:21 -0500 Subject: [PATCH] Improve DUO testability (#139) * Make duo_api_post into a pass-through call * Improve DUO tests, and clean up methods for easier testing. * Clean up method names for readability * Adjust API calls and tests so that calls to DUO work --------- Co-authored-by: sevignyj <41591249+sevignyj@users.noreply.github.com> --- tests/unit/test_duo.py | 303 +++++++++++++++++++++++++++++++++++++-- tests/unit/test_okta.py | 14 +- tokendito/duo.py | 214 ++++++++++++++------------- tokendito/http_client.py | 4 +- tokendito/okta.py | 4 +- 5 files changed, 405 insertions(+), 134 deletions(-) diff --git a/tests/unit/test_duo.py b/tests/unit/test_duo.py index 6eb9ec9d..a0ee32d5 100644 --- a/tests/unit/test_duo.py +++ b/tests/unit/test_duo.py @@ -3,20 +3,26 @@ """Unit tests, and local fixtures for DUO module.""" from unittest.mock import Mock +import pytest +from tokendito.http_client import HTTP_client -def test_set_passcode(mocker): + +def test_get_passcode(mocker): """Check if numerical passcode can handle leading zero values.""" from tokendito import duo mocker.patch("tokendito.user.tty_assertion", return_value=True) mocker.patch("tokendito.user.input", return_value="0123456") - assert duo.set_passcode({"factor": "passcode"}) == "0123456" + assert duo.get_passcode({"factor": "passcode"}) == "0123456" + assert duo.get_passcode({"factor": "PassCode"}) == "0123456" + assert duo.get_passcode({"factor": "push"}) is None + assert duo.get_passcode("pytest") is None -def test_prepare_duo_info(): +def test_prepare_info(): """Test behaviour empty return duo info.""" from tokendito.config import config - from tokendito.duo import prepare_duo_info + from tokendito.duo import prepare_info selected_okta_factor = { "_embedded": { @@ -49,13 +55,17 @@ def test_prepare_duo_info(): "sid": "", "version": "3.7", } - assert prepare_duo_info(selected_okta_factor) == expected_duo_info + assert prepare_info(selected_okta_factor) == expected_duo_info + + with pytest.raises(SystemExit) as err: + prepare_info({"badresponse": "FAIL"}) + assert err.value.code == 1 -def test_get_duo_sid(mocker): +def test_get_sid(mocker): """Check if got sid correct.""" from tokendito.config import config - from tokendito.duo import get_duo_sid + from tokendito.duo import get_sid test_duo_info = { "okta_factor": "okta_factor", @@ -74,19 +84,290 @@ def test_get_duo_sid(mocker): duo_api_response = Mock() duo_api_response.url = test_url - mocker.patch("tokendito.duo.duo_api_post", return_value=duo_api_response) + mocker.patch("tokendito.duo.api_post", return_value=duo_api_response) - duo_sid_info, duo_auth_response = get_duo_sid(test_duo_info) + duo_sid_info, duo_auth_response = get_sid(test_duo_info) assert duo_sid_info["sid"] == "testval" assert duo_auth_response.url == test_url + mocker.patch("tokendito.duo.api_post", return_value="FAIL") + with pytest.raises(SystemExit) as err: + get_sid(test_duo_info) + assert err.value.code == 2 + def test_get_mfa_response(): """Test if mfa verify correctly.""" from tokendito.duo import get_mfa_response mfa_result = Mock() - mfa_result.json = Mock(return_value={"response": "test_response"}) - assert get_mfa_response(mfa_result) == "test_response" + # Test if response is correct + assert get_mfa_response({"response": "test_value"}) == "test_value" + + # Test if response is incorrect + mfa_result = Mock(return_value={"badresponse": "FAIL"}) + with pytest.raises(SystemExit) as err: + get_mfa_response(mfa_result) + assert err.value.code == 1 + + # Test no key available + with pytest.raises(SystemExit) as err: + get_mfa_response({"pytest": "FAIL"}) + assert err.value.code == 1 + + # Test generic failure + with pytest.raises(SystemExit) as err: + get_mfa_response(Mock(return_value="FAIL")) + assert err.value.code == 1 + + +def test_api_post(mocker): + """Test if duo api post correctly.""" + from tokendito.duo import api_post + + mock_post = mocker.patch("requests.Session.post") + mock_resp = mocker.Mock() + mock_resp.status_code = 201 + mock_resp.json.return_value = {"status": "pytest"} + mock_post.return_value = mock_resp + + response = api_post("https://pytest/") + assert response == {"status": "pytest"} + + +def test_get_devices(mocker): + """Test that we can get a list of devices.""" + from tokendito.duo import get_devices + + mock_resp = mocker.Mock() + mock_resp.status_code = 200 + mock_resp.content = "" + + # Test generic failure or empty response + with pytest.raises(SystemExit) as err: + get_devices(mock_resp) + assert err.value.code == 2 + + # Test no devices in list + mock_resp.content = """ + + """ + assert get_devices(mock_resp) == [] + + # Test devices in list + mock_resp.content = """ + +
+ """ + assert get_devices(mock_resp) == [ + {"device": "pytest_device - pytest_device_name", "factor": "factor_type"} + ] + + +def test_parse_mfa_challenge(): + """Test parsing the response to the challenge.""" + from tokendito.duo import parse_mfa_challenge + + mfa_challenge = Mock() + + # Test successful challenge + assert parse_mfa_challenge({"stat": "OK", "response": {"txid": "pytest"}}) == "pytest" + + # Test error + mfa_challenge.json = Mock(return_value={"stat": "OK", "response": "error"}) + with pytest.raises(SystemExit) as err: + parse_mfa_challenge(mfa_challenge) + assert err.value.code == 1 + + # Test no key in returned content + with pytest.raises(SystemExit) as err: + parse_mfa_challenge({"pyest": "OK", "badresponse": "error"}) + assert err.value.code == 1 + + # Test no response in returned content + mfa_challenge.json = Mock(return_value={"stat": "OK", "badresponse": "error"}) + with pytest.raises(SystemExit) as err: + parse_mfa_challenge(mfa_challenge) + assert err.value.code == 1 + + # Test failure + with pytest.raises(SystemExit) as err: + parse_mfa_challenge({"stat": "fail", "response": {"txid": "pytest_error"}}) + assert err.value.code == 1 + + # Test API failure + mfa_challenge.json = Mock(return_value={"stat": "fail", "response": {"txid": "error"}}) + with pytest.raises(SystemExit) as err: + parse_mfa_challenge(mfa_challenge) + assert err.value.code == 1 + + +def test_mfa_challenge(mocker): + """TODO: Test MFA challenge.""" + from tokendito.duo import mfa_challenge + + with pytest.raises(SystemExit) as err: + mfa_challenge(None, None, None) + assert err.value.code == 2 + + duo_info = { + "okta_factor": "okta_factor", + "factor_id": 1234, + "state_token": 12345, + "okta_callback_url": "http://test.okta.href", + "tx": "pytest_tx", + "tile_sig": "pytest_tile_sig", + "parent": "pytest_parent", + "host": "pytest_host", + "sid": "pytest_sid", + "version": "3.7", + } + passcode = "pytest_passcode" + mfa_option = {"factor": "pytest_factor", "device": "pytest_device - pytest_device_name"} + + mocker.patch( + "tokendito.duo.api_post", return_value={"stat": "OK", "response": {"txid": "pytest_txid"}} + ) + + txid = mfa_challenge(duo_info, mfa_option, passcode) + assert txid == "pytest_txid" + + +def test_parse_challenge(): + """Test that we can parse a challenge.""" + from tokendito.duo import parse_challenge + + verify_mfa = {"status": "SUCCESS", "result": "SUCCESS", "reason": "pytest"} + assert parse_challenge(verify_mfa, None) == ("success", "pytest") + + verify_mfa = {"status": "UNKNOWN", "reason": "UNKNOWN"} + challenge_result = {"result": "PYTEST"} + assert parse_challenge(verify_mfa, challenge_result) == (challenge_result, "UNKNOWN") + + +@pytest.mark.parametrize( + "return_value,side_effect,expected", + [ + (("success", "pytest"), None, "pytest"), + ((None, None), [(None, None), ("success", "pytest")], "pytest"), + (("failure", "pytest"), None, SystemExit), + ], +) +def test_mfa_verify(mocker, return_value, side_effect, expected): + """Test MFA challenge completion. + + side_effect is utilized to return different values on different iterations. + """ + from tokendito.duo import mfa_verify + + mocker.patch.object(HTTP_client, "post", return_value=None) + mocker.patch("time.sleep", return_value=None) + mocker.patch("tokendito.duo.get_mfa_response", return_value="pytest") + mocker.patch( + "tokendito.duo.parse_challenge", return_value=return_value, side_effect=side_effect + ) + + duo_info = {"host": "pytest_host", "sid": "pytest_sid"} + txid = "pytest_txid" + + if expected == SystemExit: + # Test failure as exit condition + with pytest.raises(expected) as err: + mfa_verify(duo_info, txid) + assert err.value.code == 2 + else: + # Test success, failure, and iterated calls + assert mfa_verify(duo_info, txid) == expected + + +def test_factor_callback(mocker): + """Test submitting factor to callback API.""" + from tokendito.duo import factor_callback + + duo_info = {"host": "pytest_host", "sid": "pytest_sid", "tile_sig": "pytest_tile_sig"} + verify_mfa = {"result_url": "/pytest_result_url"} + + # Test successful retrieval of the cookie + duo_api_response = { + "stat": "OK", + "response": {"txid": "pytest_txid", "cookie": "pytest_cookie"}, + } + mocker.patch("tokendito.duo.api_post", return_value=duo_api_response) + sig_response = factor_callback(duo_info, verify_mfa) + assert sig_response == "pytest_cookie:pytest_tile_sig" + + # Test bad data passed in + duo_api_response = "FAIL" + mocker.patch("tokendito.duo.api_post", return_value=duo_api_response) + with pytest.raises(SystemExit) as err: + factor_callback(duo_info, verify_mfa) + assert err.value.code == 2 + + # Test bad data passed in + duo_api_response = {"stat": "FAIL", "response": {"cookie": "pytest_cookie"}} + duo_info = {"host": "pytest", "sid": "pytest"} + mocker.patch("tokendito.duo.api_post", return_value=duo_api_response) + with pytest.raises(SystemExit) as err: + factor_callback(duo_info, verify_mfa) + assert err.value.code == 2 + + +def test_authenticate(mocker): + """Test end to end authentication.""" + from tokendito.duo import authenticate + + mocker.patch( + "tokendito.duo.get_sid", + return_value=( + { + "sid": "pytest", + "host": "pytest", + "state_token": "pytest", + "factor_id": "pytest", + "okta_callback_url": "pytest", + }, + "pytest", + ), + ) + # We mock a lot of functions here, but we're really just testing that the data can flow, + # and that it can be parsed correctly to be sent to the API endpoint. + mocker.patch("tokendito.duo.get_devices", return_value=[{"device": "pytest - device"}]) + mocker.patch("tokendito.user.select_preferred_mfa_index", return_value=0) + mocker.patch("tokendito.user.input", return_value="0123456") + mocker.patch("tokendito.duo.mfa_challenge", return_value="txid_pytest") + mocker.patch("tokendito.duo.mfa_verify", return_value={"result_url": "/pytest_result_url"}) + mocker.patch("tokendito.duo.api_post", return_value=None) + mocker.patch("tokendito.duo.factor_callback", return_value="pytest_cookie:pytest_tile_sig") + selected_okta_factor = { + "_embedded": { + "factor": { + "_embedded": { + "verification": { + "_links": { + "complete": {"href": "http://test.okta.href"}, + "script": {"href": "python-v3.7"}, + }, + "signature": "fdsafdsa:fdsfdfds:fdsfdsfds", + "host": "test_host", + } + }, + "id": 1234, + } + }, + "stateToken": 12345, + } + + res = authenticate(selected_okta_factor) + assert { + "id": "pytest", + "sig_response": "pytest_cookie:pytest_tile_sig", + "stateToken": "pytest", + } == res diff --git a/tests/unit/test_okta.py b/tests/unit/test_okta.py index fef61c32..f9e9ce02 100644 --- a/tests/unit/test_okta.py +++ b/tests/unit/test_okta.py @@ -89,19 +89,15 @@ def test_mfa_provider_type( mock_response = {"sessionToken": session_token} mocker.patch.object(HTTP_client, "post", return_value=mock_response) - mocker.patch("tokendito.duo.duo_api_post", return_value=None) + mocker.patch("tokendito.duo.api_post", return_value=None) payload = {"x": "y", "t": "z"} - callback_url = "https://www.acme.org" selected_mfa_option = 1 mfa_challenge_url = 1 primary_auth = 1 pytest_config = Config() - mocker.patch( - "tokendito.duo.authenticate_duo", - return_value=(payload, sample_headers, callback_url), - ) + mocker.patch("tokendito.duo.authenticate", return_value=payload) mocker.patch("tokendito.okta.push_approval", return_value={"sessionToken": session_token}) mocker.patch("tokendito.okta.totp_approval", return_value={"sessionToken": session_token}) @@ -128,7 +124,6 @@ def test_bad_mfa_provider_type(mocker, sample_headers): pytest_config = Config() payload = {"x": "y", "t": "z"} - callback_url = "https://www.acme.org" selected_mfa_option = 1 mfa_challenge_url = 1 primary_auth = 1 @@ -140,10 +135,7 @@ def test_bad_mfa_provider_type(mocker, sample_headers): mock_response = Mock() mock_response.json.return_value = mfa_verify - mocker.patch( - "tokendito.duo.authenticate_duo", - return_value=(payload, sample_headers, callback_url), - ) + mocker.patch("tokendito.duo.authenticate", return_value=payload) mocker.patch.object(HTTP_client, "post", return_value=mock_response) mocker.patch("tokendito.okta.totp_approval", return_value=mfa_verify) diff --git a/tokendito/duo.py b/tokendito/duo.py index 68caece0..ecc3804e 100644 --- a/tokendito/duo.py +++ b/tokendito/duo.py @@ -8,40 +8,44 @@ from urllib.parse import unquote from urllib.parse import urlparse +import bs4 from bs4 import BeautifulSoup -import requests from tokendito import user from tokendito.config import config +from tokendito.http_client import HTTP_client logger = logging.getLogger(__name__) -def prepare_duo_info(selected_okta_factor): +def prepare_info(selected_okta_factor): """Aggregate most of the parameters needed throughout the Duo authentication process. :param selected_okta_factor: dict response describing Duo factor in Okta. :return duo_info: dict of parameters for Duo """ duo_info = {} - okta_factor = selected_okta_factor["_embedded"]["factor"]["_embedded"]["verification"] - duo_info["okta_factor"] = okta_factor - duo_info["factor_id"] = selected_okta_factor["_embedded"]["factor"]["id"] - - duo_info["state_token"] = selected_okta_factor["stateToken"] - duo_info["okta_callback_url"] = okta_factor["_links"]["complete"]["href"] - duo_info["tx"] = okta_factor["signature"].split(":")[0] - duo_info["tile_sig"] = okta_factor["signature"].split(":")[1] - duo_info["parent"] = f"{config.okta['org']}/signin/verify/duo/web" - duo_info["host"] = okta_factor["host"] - duo_info["sid"] = "" - - version = okta_factor["_links"]["script"]["href"].split("-v")[1] - duo_info["version"] = version.strip(".js") - + try: + okta_factor = selected_okta_factor["_embedded"]["factor"]["_embedded"]["verification"] + duo_info["okta_factor"] = okta_factor + duo_info["factor_id"] = selected_okta_factor["_embedded"]["factor"]["id"] + + duo_info["state_token"] = selected_okta_factor["stateToken"] + duo_info["okta_callback_url"] = okta_factor["_links"]["complete"]["href"] + duo_info["tx"] = okta_factor["signature"].split(":")[0] + duo_info["tile_sig"] = okta_factor["signature"].split(":")[1] + duo_info["parent"] = f"{config.okta['org']}/signin/verify/duo/web" + duo_info["host"] = okta_factor["host"] + duo_info["sid"] = "" + + version = okta_factor["_links"]["script"]["href"].split("-v")[1] + duo_info["version"] = version.strip(".js") + except KeyError as missing_key: + logger.error(f"There was an issue parsing the Okta factor. Please try again: {missing_key}") + sys.exit(1) return duo_info -def duo_api_post(url, params=None, headers=None, payload=None): +def api_post(url, params=None, headers=None, payload=None, return_json=True): """Error handling and response parsing wrapper for Duo POSTs. :param url: The URL being connected to. @@ -50,33 +54,13 @@ def duo_api_post(url, params=None, headers=None, payload=None): :param payload: Request body. :return response: Response to the API request. """ - try: - response = requests.request("POST", url, params=params, headers=headers, data=payload) - except Exception as request_issue: - logger.error(f"There was an error connecting to the Duo API: {request_issue}") - sys.exit(1) - - json_message = None - try: - json_message = response.json() - except ValueError: - logger.debug(f"Non-json response from Duo API: {response}") - - if response.status_code != 200: - logger.error(f"Your Duo authentication has failed with status {response.status_code}.") - if json_message and json_message["stat"].lower() != "ok": - logger.error(f"{response.status_code}, {json_message['message']}") - else: - logger.error( - "Please re-run the program with parameter" - ' "--loglevel debug" to see more information.' - ) - sys.exit(2) - + response = HTTP_client.post( + url, params=params, headers=headers, data=payload, return_json=return_json + ) return response -def get_duo_sid(duo_info): +def get_sid(duo_info): """Perform the initial Duo authentication request to obtain the SID. The SID is referenced throughout the authentication process for Duo. @@ -89,7 +73,7 @@ def get_duo_sid(duo_info): url = f"https://{duo_info['host']}/frame/web/v1/auth" logger.debug(f"Calling Duo {urlparse(url).path} with params {params.keys()}") - duo_auth_response = duo_api_post(url, params=params) + duo_auth_response = api_post(url, params=params, return_json=False) try: duo_auth_redirect = urlparse(f"{unquote(duo_auth_response.url)}").query @@ -98,10 +82,10 @@ def get_duo_sid(duo_info): logger.error(f"There was an error getting your SID. Please try again: {sid_error}") sys.exit(2) - return duo_info, duo_auth_response + return (duo_info, duo_auth_response) -def get_duo_devices(duo_auth): +def get_devices(duo_auth): """Parse Duo auth response to extract user's MFA options. The /frame/web/v1/auth API returns an html page that lists @@ -113,35 +97,39 @@ def get_duo_devices(duo_auth): :return factor_options: list of dict objects describing each MFA option. """ soup = BeautifulSoup(duo_auth.content, "html.parser") + devices = [] - device_soup = soup.find("select", {"name": "device"}).findAll("option") # type: ignore - devices = [f"{d['value']} - {d.text}" for d in device_soup] + device_soup = soup.find("select", {"name": "device"}) + if type(device_soup) is bs4.element.Tag: + options = device_soup.findAll("option") + devices = [f"{d['value']} - {d.text}" for d in options] if not devices: logger.error("Please configure devices for your Duo MFA and retry.") sys.exit(2) factor_options = [] + factors = [] for device in devices: options = soup.find("fieldset", {"data-device-index": device.split(" - ")[0]}) - factors = options.findAll("input", {"name": "factor"}) # type: ignore (PEP 561) + if type(options) is bs4.element.Tag: + factors = options.findAll("input", {"name": "factor"}) for factor in factors: factor_option = {"device": device, "factor": factor["value"]} factor_options.append(factor_option) return factor_options -def parse_duo_mfa_challenge(mfa_challenge): +def parse_mfa_challenge(mfa_challenge): """Gracefully parse Duo MFA challenge response. :param mfa_challenge: Duo API response for MFA challenge. :return txid: Duo transaction ID. """ try: - mfa_challenge = mfa_challenge.json() mfa_status = mfa_challenge["stat"] txid = mfa_challenge["response"]["txid"] - except ValueError as value_error: - logger.error(f"The Duo API returned a non-json response: {value_error}") + except (TypeError, ValueError, AttributeError) as err: + logger.error(f"The Duo API returned a non-json response: {err}") sys.exit(1) except KeyError as key_error: logger.error(f"The Duo API response is missing a required parameter: {key_error}") @@ -149,12 +137,12 @@ def parse_duo_mfa_challenge(mfa_challenge): sys.exit(1) if mfa_status == "fail": - logger.error(f"Your Duo authentication has failed: {mfa_challenge['message']}") + logger.error(f"Your Duo authentication has failed: {mfa_challenge}") sys.exit(1) return txid -def duo_mfa_challenge(duo_info, mfa_option, passcode): +def mfa_challenge(duo_info, mfa_option, passcode): """Poke Duo to challenge the selected factor. After the user has selected their device and factor of choice, @@ -165,23 +153,29 @@ def duo_mfa_challenge(duo_info, mfa_option, passcode): :param mfa_option: the user's selected second factor. :return txid: Duo transaction ID used to track this auth attempt. """ - url = f"https://{duo_info['host']}/frame/prompt" - device = mfa_option["device"].split(" - ")[0] - mfa_data = { - "factor": mfa_option["factor"], - "device": device, - "sid": duo_info["sid"], - "out_of_date": False, - "days_out_of_date": 0, - "days_to_block": None, - "async": True, - } + mfa_data = {} + try: + url = f"https://{duo_info['host']}/frame/prompt" + device = mfa_option["device"].split(" - ")[0] + mfa_data = { + "factor": mfa_option["factor"], + "device": device, + "sid": duo_info["sid"], + "out_of_date": False, + "days_out_of_date": 0, + "days_to_block": None, + "async": True, + } + except Exception as key_error: + logger.error(f"Unable to parse Duo MFA data: {key_error}") + sys.exit(2) + if passcode: mfa_data["passcode"] = passcode - mfa_challenge = duo_api_post(url, payload=mfa_data) - txid = parse_duo_mfa_challenge(mfa_challenge) + mfa_challenge = api_post(url, payload=mfa_data) + txid = parse_mfa_challenge(mfa_challenge) - logger.debug("Sent MFA Challenge and obtained Duo transaction ID.") + logger.debug(f"Sent MFA Challenge and obtained Duo transaction ID {txid}") return txid @@ -192,9 +186,13 @@ def get_mfa_response(mfa_result): :return verify_mfa: json response from mfa api """ try: - verify_mfa = mfa_result.json()["response"] - except Exception as parse_error: - logger.error(f"There was an error parsing the mfa challenge result: {parse_error}") + verify_mfa = mfa_result["response"] + except KeyError as key_error: + logger.error(f"The mfa challenge response is missing a required parameter: {key_error}") + logger.debug(mfa_result) + sys.exit(1) + except Exception as other_error: + logger.error(f"There was an error getting the mfa challenge result: {other_error}") sys.exit(1) return verify_mfa @@ -219,10 +217,10 @@ def parse_challenge(verify_mfa, challenge_result): challenge_result = verify_mfa["result"].lower() logger.debug(f"Challenge result is {challenge_result}") - return challenge_result, challenge_reason + return (challenge_result, challenge_reason) -def duo_mfa_verify(duo_info, txid): +def mfa_verify(duo_info, txid): """Verify MFA challenge completion. After the user has received the MFA challenge, query the Duo API @@ -238,13 +236,11 @@ def duo_mfa_verify(duo_info, txid): while True: logger.debug("Waiting for MFA challenge response") - mfa_result = duo_api_post(url, payload=challenged_mfa) + mfa_result = api_post(url, payload=challenged_mfa) verify_mfa = get_mfa_response(mfa_result) - challenge_result, challenge_reason = parse_challenge(verify_mfa, challenge_result) + (challenge_result, challenge_reason) = parse_challenge(verify_mfa, challenge_result) - if challenge_result is None: - continue - elif challenge_result == "success": + if challenge_result == "success": logger.debug("Successful MFA challenge received") break elif challenge_result == "failure": @@ -257,7 +253,7 @@ def duo_mfa_verify(duo_info, txid): return verify_mfa -def duo_factor_callback(duo_info, verify_mfa): +def factor_callback(duo_info, verify_mfa): """Inform factor callback api of successful challenge. This request seems to inform this factor's callback url @@ -268,23 +264,25 @@ def duo_factor_callback(duo_info, verify_mfa): :return sig_response: required to sign final Duo callback request. """ factor_callback_url = f"https://{duo_info['host']}{verify_mfa['result_url']}" - factor_callback = duo_api_post(factor_callback_url, payload={"sid": duo_info["sid"]}) + factor_callback = api_post(factor_callback_url, payload={"sid": duo_info["sid"]}) - try: - sig_response = f"{factor_callback.json()['response']['cookie']}:{duo_info['tile_sig']}" - except Exception as sig_error: - logger.error( - "There was an error getting your application signature " - f"from Duo: {json.dumps(sig_error)}" - ) + if type(factor_callback) is not dict: + logger.error("There was an error getting your application signature. Please try again.") + logger.debug(f"Response from DUO: {factor_callback}") sys.exit(2) - logger.debug("Completed factor callback.") + try: + sig_response = f"{factor_callback['response']['cookie']}:{duo_info['tile_sig']}" + except (KeyError, TypeError) as err: + logger.error("There was an error getting your application signature. Please try again.") + logger.debug(f"Response from DUO: {err}") + sys.exit(2) + logger.debug(f"Completed callback to {factor_callback_url} with sig_response {sig_response}") return sig_response -def set_passcode(mfa_option): - """Set totp passcode. +def get_passcode(mfa_option): + """Get totp passcode. If the user has selected the passcode option, collect their TOTP. @@ -292,13 +290,17 @@ def set_passcode(mfa_option): :return passcode: passcode value from user """ passcode = None - if mfa_option["factor"].lower() == "passcode": - user.print("Type your TOTP and press Enter:") - passcode = user.get_input() + + try: + if mfa_option["factor"].lower() == "passcode": + user.print("Type your TOTP and press Enter:") + passcode = user.get_input() + except (TypeError, KeyError): + pass return passcode -def authenticate_duo(selected_okta_factor): +def authenticate(selected_okta_factor): """Accomplish MFA via Duo. This is the main function that coordinates the Duo @@ -307,29 +309,24 @@ def authenticate_duo(selected_okta_factor): :param selected_okta_factor: Duo factor information retrieved from Okta. :return payload: required payload for Okta callback - :return headers: required headers for Okta callback """ - try: - duo_info = prepare_duo_info(selected_okta_factor) - except KeyError as missing_key: - logger.error(f"There was an issue parsing the Okta factor. Please try again: {missing_key}") - sys.exit(1) + duo_info = prepare_info(selected_okta_factor) # Collect devices, factors, auth params for Duo - duo_info, duo_auth_response = get_duo_sid(duo_info) - factor_options = get_duo_devices(duo_auth_response) + (duo_info, duo_auth_response) = get_sid(duo_info) + factor_options = get_devices(duo_auth_response) mfa_index = user.select_preferred_mfa_index( factor_options, factor_key="factor", subfactor_key="device" ) mfa_option = factor_options[mfa_index] logger.debug(f"Selected MFA is [{mfa_option}]") - passcode = set_passcode(mfa_option) + passcode = get_passcode(mfa_option) - txid = duo_mfa_challenge(duo_info, mfa_option, passcode) - verify_mfa = duo_mfa_verify(duo_info, txid) + txid = mfa_challenge(duo_info, mfa_option, passcode) + verify_mfa = mfa_verify(duo_info, txid) # Make factor callback to Duo - sig_response = duo_factor_callback(duo_info, verify_mfa) + sig_response = factor_callback(duo_info, verify_mfa) # Prepare for Okta callback payload = { @@ -337,6 +334,7 @@ def authenticate_duo(selected_okta_factor): "sig_response": sig_response, "stateToken": duo_info["state_token"], } - headers = {"content-type": "application/json", "accept": "application/json"} - return payload, headers, duo_info["okta_callback_url"] + # Send Okta callback and return payload + api_post(duo_info["okta_callback_url"], payload=payload, return_json=False) + return payload diff --git a/tokendito/http_client.py b/tokendito/http_client.py index bd8f1cd2..51455aae 100644 --- a/tokendito/http_client.py +++ b/tokendito/http_client.py @@ -49,10 +49,10 @@ def get(self, url, params=None, headers=None): logger.error(f"The get request to {url} failed with {err}") sys.exit(1) - def post(self, url, data=None, json=None, headers=None, return_json=False): + def post(self, url, data=None, json=None, headers=None, params=None, return_json=False): """Perform a POST request.""" try: - response = self.session.post(url, data=data, json=json, headers=headers) + response = self.session.post(url, data=data, json=json, params=params, headers=headers) response.raise_for_status() if return_json is True: try: diff --git a/tokendito/okta.py b/tokendito/okta.py index 10e94ad5..a73a0887 100644 --- a/tokendito/okta.py +++ b/tokendito/okta.py @@ -493,8 +493,8 @@ def mfa_provider_type( factor_type = selected_factor.get("_embedded", {}).get("factor", {}).get("factorType", None) if mfa_provider == "DUO": - payload, headers, callback_url = duo.authenticate_duo(selected_factor) - duo.duo_api_post(callback_url, payload=payload) + mfa_verify = duo.authenticate(selected_factor) + headers = {"content-type": "application/json", "accept": "application/json"} mfa_verify = HTTP_client.post( mfa_challenge_url, json=payload, headers=headers, return_json=True )