Skip to content

Commit

Permalink
feat: Add support for step up authentication (#140)
Browse files Browse the repository at this point in the history
* feat: Add support for step up authentication

When "Extra Verification" is required in authenticate_to_roles, make
an additional request to '/api/v1/authn' using the current state token.

Use '/login/sessionCookieRedirect' in authenticate_to_roles to ensure
the the 'HTTP_client' cookies have the values needed for step up
authentication and also the device token.

Add optional support for device token across session so that step up
authentication does not require multiple MFA requests.

Add device token options to 'config.py'.

Add getting and setting the device token to 'http_client.py'.

Add storing the device token to the ini file to 'user.py'.

Add the device token options to 'docs/README.md'.

* fix: Address linting issues

* chore: Revert version change

* fix: Clear out MFA response after using it
  • Loading branch information
ruhulio authored Nov 14, 2023
1 parent d3e2e35 commit fa7b00b
Show file tree
Hide file tree
Showing 11 changed files with 215 additions and 7 deletions.
4 changes: 3 additions & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ tokendito --profile engineer
usage: tokendito [-h] [--version] [--configure] [--username OKTA_USERNAME] [--password OKTA_PASSWORD] [--profile USER_CONFIG_PROFILE] [--config-file USER_CONFIG_FILE]
[--loglevel {DEBUG,INFO,WARN,ERROR}] [--log-output-file USER_LOG_OUTPUT_FILE] [--aws-config-file AWS_CONFIG_FILE] [--aws-output AWS_OUTPUT]
[--aws-profile AWS_PROFILE] [--aws-region AWS_REGION] [--aws-role-arn AWS_ROLE_ARN] [--aws-shared-credentials-file AWS_SHARED_CREDENTIALS_FILE]
[--okta-org OKTA_ORG | --okta-tile OKTA_TILE] [--okta-mfa OKTA_MFA] [--okta-mfa-response OKTA_MFA_RESPONSE] [--quiet]
[--okta-org OKTA_ORG | --okta-tile OKTA_TILE] [--okta-mfa OKTA_MFA] [--okta-mfa-response OKTA_MFA_RESPONSE] [--use-device-token] [--quiet]
Gets an STS token to use with the AWS CLI and SDK.
Expand Down Expand Up @@ -112,6 +112,7 @@ options:
--okta-mfa OKTA_MFA Sets the MFA method
--okta-mfa-response OKTA_MFA_RESPONSE
Sets the MFA response to a challenge
--use-device-token Use device token across sessions
--quiet Suppress output
```

Expand Down Expand Up @@ -153,6 +154,7 @@ The following table lists the environment variable and user configuration entry
| `--okta-tile` | `TOKENDITO_OKTA_TILE` | `okta_tile` |
| `--okta-mfa` | `TOKENDITO_OKTA_MFA` | `okta_mfa` |
| `--okta-mfa-response` | `TOKENDITO_OKTA_MFA_RESPONSE` | `okta_mfa_response` |
| `--use-device-token` | `TOKENDITO_USER_USE_DEVICE_TOKEN` | `user_use_device_token` |
| `--quiet` | `TOKENDITO_USER_QUIET` | `quiet` |

# Configuration file location
Expand Down
8 changes: 7 additions & 1 deletion tests/unit/test_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def test_select_assumeable_role_no_tiles():
def test_authenticate_to_roles(status_code, monkeypatch):
"""Test if function return correct response."""
from tokendito.aws import authenticate_to_roles
from tokendito.config import Config
import tokendito.http_client as http_client

# Create a mock response object
Expand All @@ -104,7 +105,12 @@ def test_authenticate_to_roles(status_code, monkeypatch):
# Use monkeypatch to replace the HTTP_client.get method with the mock
monkeypatch.setattr(http_client.HTTP_client, "get", lambda *args, **kwargs: mock_response)

pytest_config = Config(
okta={
"org": "https://acme.okta.org/",
}
)
cookies = {"some_cookie": "some_value"}

with pytest.raises(SystemExit):
authenticate_to_roles([("http://test.url.com", "")], cookies)
authenticate_to_roles(pytest_config, [("http://test.url.com", "")], cookies)
19 changes: 19 additions & 0 deletions tests/unit/test_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,22 @@ def test_post_logging_on_exception(client, mocker):
with pytest.raises(SystemExit):
client.post("http://test.com", json={"key": "value"})
mock_logger.assert_called()


def test_get_device_token(client):
"""Test getting device token from the session."""
device_token = "test-device-token"
cookies = {"DT": device_token}
client.set_cookies(cookies)

# Check if the device token is set correctly in the session
assert client.get_device_token() == device_token


def test_set_device_token(client):
"""Test setting device token in the session."""
device_token = "test-device-token"
client.set_device_token("http://test.com", device_token)

# Check if the device token is set correctly in the session
assert client.session.cookies.get("DT") == device_token
49 changes: 49 additions & 0 deletions tests/unit/test_okta.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,55 @@ def test_authenticate(mocker):
assert okta.authenticate(pytest_config) == error


def test_step_up_authenticate(mocker):
"""Test set up authenticate method."""
from tokendito import okta
from tokendito.config import Config
from tokendito.http_client import HTTP_client

pytest_config = Config(
okta={
"username": "pytest",
"org": "https://acme.okta.org/",
}
)

state_token = "test-state-token"

# Test missing auth type
mocker.patch("tokendito.okta.get_auth_properties", return_value={})
assert okta.step_up_authenticate(pytest_config, state_token) is False

# Test unsupported auth type
mocker.patch("tokendito.okta.get_auth_properties", return_value={"type": "SAML2"})
assert okta.step_up_authenticate(pytest_config, state_token) is False

# Test supported auth type...
mocker.patch("tokendito.okta.get_auth_properties", return_value={"type": "OKTA"})

# ...with SUCCESS status
mock_response_data = {"status": "SUCCESS"}
mocker.patch.object(HTTP_client, "post", return_value=mock_response_data)

assert okta.step_up_authenticate(pytest_config, state_token) is True

# ...with MFA_REQUIRED status
mock_response_data = {"status": "MFA_REQUIRED"}
mocker.patch.object(HTTP_client, "post", return_value=mock_response_data)
patched_mfa_challenge = mocker.patch.object(
okta, "mfa_challenge", return_value="test-session-token"
)

assert okta.step_up_authenticate(pytest_config, state_token) is True
assert patched_mfa_challenge.call_count == 1

# ...with unknown status
mock_response_data = {"status": "unknown"}
mocker.patch.object(HTTP_client, "post", return_value=mock_response_data)

assert okta.step_up_authenticate(pytest_config, state_token) is False


def test_local_auth(mocker):
"""Test local auth method."""
from tokendito import okta
Expand Down
20 changes: 20 additions & 0 deletions tests/unit/test_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,26 @@ def test_update_configuration(tmpdir):
assert ret.okta["mfa"] == "pytest"


def test_update_device_token(tmpdir):
"""Test writing and reading device token to a configuration file."""
from tokendito import user
from tokendito.config import Config

path = tmpdir.mkdir("pytest").join("pytest_tokendito.ini")

device_token = "test-device-token"

pytest_config = Config(
okta={"device_token": device_token},
user={"config_file": path, "config_profile": "pytest"},
)

# Write out a config file via configure() and ensure it's functional
user.update_device_token(pytest_config)
ret = user.process_ini_file(path, "pytest")
assert ret.okta["device_token"] == device_token


def test_process_ini_file(tmpdir):
"""Test whether ini config elements are set correctly.
Expand Down
15 changes: 13 additions & 2 deletions tokendito/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def get_output_types():
return ["json", "text", "csv", "yaml", "yaml-stream"]


def authenticate_to_roles(urls, cookies=None):
def authenticate_to_roles(config, urls, cookies=None):
"""Authenticate AWS user with saml.
:param urls: list of tuples or tuple, with tiles info
Expand All @@ -67,11 +67,22 @@ def authenticate_to_roles(urls, cookies=None):
logger.info(f"Discovering roles in {tile_count} tile{plural}.")
for url, label in url_list:
response = HTTP_client.get(url) # Use the HTTPClient's get method

session_url = config.okta["org"] + "/login/sessionCookieRedirect"
params = {"token": cookies.get("sessionToken"), "redirectUrl": url}

response = HTTP_client.get(session_url, params=params)

saml_response_string = response.text

saml_xml = okta.extract_saml_response(saml_response_string)
if not saml_xml:
if "Extra Verification" in saml_response_string:
state_token = okta.extract_state_token(saml_response_string)
if "Extra Verification" in saml_response_string and state_token:
logger.info(f"Step-Up authentication required for {url}.")
if okta.step_up_authenticate(config, state_token):
return authenticate_to_roles(config, urls, cookies)

logger.error("Step-Up Authentication required, but not supported.")
elif "App Access Locked" in saml_response_string:
logger.error(
Expand Down
2 changes: 2 additions & 0 deletions tokendito/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class Config(object):
encoding=_default_encoding,
loglevel="INFO",
log_output_file="",
use_device_token=False,
mask_items=[],
quiet=False,
),
Expand All @@ -47,6 +48,7 @@ class Config(object):
mfa_response=None,
tile=None,
org=None,
device_token=None,
),
)

Expand Down
20 changes: 20 additions & 0 deletions tokendito/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import logging
import sys
from urllib.parse import urlparse

import requests
from tokendito import __title__
Expand Down Expand Up @@ -75,5 +76,24 @@ def reset(self):
self.session.headers = requests.utils.default_headers()
self.session.headers.update({"User-Agent": user_agent})

def get_device_token(self):
"""Get the device token from the current session cookies.
:return: Device token or None
"""
return self.session.cookies.get("DT", None)

def set_device_token(self, org_url, device_token):
"""Set the device token in the current session cookies.
:param org_url: The organization URL
:param device_token: The device token
:return: None
"""
if not device_token:
return

self.session.cookies.set("DT", device_token, domain=urlparse(org_url).netloc, path="/")


HTTP_client = HTTPClient()
34 changes: 33 additions & 1 deletion tokendito/okta.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,35 @@ def authenticate(config):
return sid


def step_up_authenticate(config, state_token):
"""Try to step up authenticate the user. Only supported for local auth.
:param config: Configuration object
:param state_token: The state token
:return: True if step up authentication was successful; False otherwise
"""
auth_properties = get_auth_properties(userid=config.okta["username"], url=config.okta["org"])
if "type" not in auth_properties or not is_local_auth(auth_properties):
return False

headers = {"content-type": "application/json", "accept": "application/json"}
payload = {"stateToken": state_token}

auth = HTTP_client.post(
f"{config.okta['org']}/api/v1/authn", json=payload, headers=headers, return_json=True
)

status = auth.get("status", None)
if status == "SUCCESS":
return True
elif status == "MFA_REQUIRED":
mfa_challenge(config, headers, auth)
return True

logger.error("Okta auth failed: unknown status for step up authentication.")
return False


def is_local_auth(auth_properties):
"""Check whether authentication happens locally.
Expand Down Expand Up @@ -429,7 +458,7 @@ def extract_state_token(html):
state_token = None
pattern = re.compile(r"var stateToken = '(?P<stateToken>.*)';", re.MULTILINE)

script = soup.find("script", text=pattern)
script = soup.find("script", string=pattern)
if type(script) is bs4.element.Tag:
match = pattern.search(script.text)
if match:
Expand Down Expand Up @@ -605,6 +634,9 @@ def totp_approval(config, selected_mfa_option, headers, mfa_challenge_url, paylo
user.add_sensitive_value_to_be_masked(mfa_verify["sessionToken"])
logger.debug(f"mfa_verify [{json.dumps(mfa_verify)}]")

# Clear out any MFA response since it is no longer valid
config.okta["mfa_response"] = None

return mfa_verify


Expand Down
18 changes: 17 additions & 1 deletion tokendito/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ def cli(args):
)
sys.exit(1)

if config.user["use_device_token"]:
device_token = config.okta["device_token"]
if device_token:
HTTP_client.set_device_token(config.okta["org"], device_token)
else:
logger.warning(
f"Device token unavailable for config profile {args.user_config_profile}. "
"May see multiple MFA requests this time."
)

# Authenticate to okta
session_cookies = okta.authenticate(config)

Expand All @@ -50,7 +60,7 @@ def cli(args):
config.okta["tile"] = user.discover_tiles(config.okta["org"])

# Authenticate to AWS roles
auth_tiles = aws.authenticate_to_roles(config.okta["tile"], cookies=session_cookies)
auth_tiles = aws.authenticate_to_roles(config, config.okta["tile"], cookies=session_cookies)

(role_response, role_name) = aws.select_assumeable_role(auth_tiles)

Expand All @@ -70,4 +80,10 @@ def cli(args):
output=config.aws["output"],
)

device_token = HTTP_client.get_device_token()
if config.user["use_device_token"] and device_token:
logger.info(f"Saving device token to config profile {args.user_config_profile}")
config.okta["device_token"] = device_token
user.update_device_token(config)

user.display_selected_role(profile_name=config.aws["profile"], role_response=role_response)
33 changes: 32 additions & 1 deletion tokendito/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ def parse_cli_args(args):
help="Sets the MFA response to a challenge. You "
"can also use the TOKENDITO_OKTA_MFA_RESPONSE environment variable.",
)
parser.add_argument(
"--use-device-token",
dest="user_use_device_token",
action="store_true",
default=False,
help="Use device token across sessions",
)
parser.add_argument(
"--quiet",
dest="user_quiet",
Expand Down Expand Up @@ -911,6 +918,27 @@ def update_configuration(config):
logger.info(f"Updated {ini_file} with profile {profile}")


def update_device_token(config):
"""Update configuration file on local system with device token.
:param config: the current configuration
:return: None
"""
logger.debug("Update configuration file on local system with device token.")
ini_file = config.user["config_file"]
profile = config.user["config_profile"]

contents = {}
# Copy relevant parts of the configuration into an dictionary that
# will be written out to disk
if "device_token" in config.okta and config.okta["device_token"] is not None:
contents["okta_device_token"] = config.okta["device_token"]

logger.debug(f"Adding {contents} to config file.")
update_ini(profile=profile, ini_file=ini_file, **contents)
logger.info(f"Updated {ini_file} with profile {profile}")


def set_local_credentials(response={}, role="default", region="us-east-1", output="json"):
"""Write to local files to insert credentials.
Expand Down Expand Up @@ -1227,8 +1255,11 @@ def request_cookies(url, session_token):
add_sensitive_value_to_be_masked(sess_id)

# create cookies with sid 'sid'.
domain = urlparse(url).netloc

cookies = requests.cookies.RequestsCookieJar()
cookies.set("sid", sess_id, domain=urlparse(url).netloc, path="/")
cookies.set("sid", sess_id, domain=domain, path="/")
cookies.set("sessionToken", session_token, domain=domain, path="/")

# Log the session cookies.
logger.debug(f"Received session cookies: {cookies}")
Expand Down

0 comments on commit fa7b00b

Please sign in to comment.