From 3ba6eae2461c92a5b304e2530f5786bc8c5e3450 Mon Sep 17 00:00:00 2001 From: Jean-Pierre Sevigny <41591249+sevignyj@users.noreply.github.com> Date: Wed, 16 Aug 2023 10:36:47 -0400 Subject: [PATCH] oie implementation draft 1 --- .gitignore | 1 + tokendito/__main__.py | 2 +- tokendito/okta.py | 149 ++++++++++++++++++++++++++++++++++++++--- tokendito/tokendito.py | 6 +- tokendito/tool.py | 70 ------------------- tokendito/user.py | 64 +++++++++++++++++- 6 files changed, 206 insertions(+), 86 deletions(-) delete mode 100644 tokendito/tool.py diff --git a/.gitignore b/.gitignore index 2954c5dd..6c6ba063 100644 --- a/.gitignore +++ b/.gitignore @@ -91,6 +91,7 @@ venv/ ENV/ env.bak/ venv.bak/ +.vscode/ # Spyder project settings .spyderproject diff --git a/tokendito/__main__.py b/tokendito/__main__.py index 68fa0fba..ebe335b2 100755 --- a/tokendito/__main__.py +++ b/tokendito/__main__.py @@ -12,7 +12,7 @@ def main(args=None): # needed for console script path = os.path.dirname(os.path.dirname(__file__)) sys.path[0:0] = [path] - from tokendito.tool import cli + from tokendito.user import cli try: return cli(args) diff --git a/tokendito/okta.py b/tokendito/okta.py index cc0ef1c6..9570533e 100644 --- a/tokendito/okta.py +++ b/tokendito/okta.py @@ -82,17 +82,40 @@ def api_error_code_parser(status=None): return message +def get_auth_pipeline(url=None): + """Get auth pipeline version.""" + logger.debug(f"get_auth_pipeline({url})") + headers = {"accept": "application/json"} + # https://developer.okta.com/docs/api/openapi/okta-management/management/tag/OrgSetting/ + url = f"{url}/.well-known/okta-organization" + + response = user.request_wrapper("GET", url, headers=headers) + try: + ret = response.json() + except (KeyError, ValueError) as e: + logger.error(f"Failed to parse type in {url}:{str(e)}") + logger.debug(f"Response: {response.text}") + sys.exit(1) + logger.debug(f"we have {ret}") + auth_pipeline = ret.get("pipeline", None) + if auth_pipeline != "idx" and auth_pipeline != "v1": + logger.error(f"unsupported auth pipeline version {auth_pipeline}") + sys.exit(1) + logger.debug(f"Pipeline is of type {auth_pipeline}") + return auth_pipeline + + def get_auth_properties(userid=None, url=None): - """Make a call to the webfinger endpoint. + """Make a call to the webfinger endpoint to get the auth properties metadata :param userid: User for which we are requesting an auth endpoint. :param url: Site where we are looking up the user. :returns: dictionary with authentication properties. """ payload = {"resource": f"okta:acct:{userid}", "rel": "okta:idp"} + # payload = {"resource": f"okta:acct:{userid}"} headers = {"accept": "application/jrd+json"} url = f"{url}/.well-known/webfinger" - logger.debug(f"Looking up auth endpoint for {userid} in {url}") response = user.request_wrapper("GET", url, headers=headers, params=payload) @@ -214,17 +237,42 @@ def get_session_token(config, primary_auth, headers): return session_token +def get_authorization_endpoint (server_metadata): + """ + get entpoint where authorization is done + todo handle errors and perhaps provide default values + :param server_metadata: json string containing server metadata + :rerurn: endpoint where to authorize + """ + return server_metadata['authorization_endpoint'] + +def oie_auth(config): + server_metadata = get_authorization_server_info(config.okta["org"]) + # get url where to authorize + authorization_endpoint = get_authorization_endpoint(server_metadata) + # Authorization Code flow (see + # https://developer.okta.com/docs/guides/implement-grant-type/authcode/main/#about-the-authorization-code-grant + # ) + authn_code = authorization_code_request(authorization_endpoint) + authn_token_endpoint = get_token_endpoint(server_metadata) + auth_sid = get_oauth_token(authn_token_endpoint, authn_code) + + return auth_sid -def authenticate(config): - """Authenticate user. + +def idp_auth(config): + """authenticate and authorize with the idp :param config: Config object :return: session ID cookie. """ + auth_properties = get_auth_properties(userid=config.okta["username"], url=config.okta["org"]) + if "type" not in auth_properties: logger.error("Okta auth failed: unknown type.") sys.exit(1) + sid = None if is_local_auth(auth_properties): @@ -235,14 +283,96 @@ def authenticate(config): else: logger.error(f"{auth_properties['type']} login via IdP Discovery is not curretly supported") sys.exit(1) + + if oie_enabled(config.okta["org"]): + sid = oie_auth(config) + return sid +def get_authorization_server_info(url=None): + """Get OAuth token from Okta. + + :param url: URL of the Okta OAuth token endpoint + :return: OAuth token + """ + url = f"{url}/.well-known/oauth-authorization-server" + headers = {"accept": "application/json"} + response = user.request_wrapper("GET", url, headers=headers) + logger.info(f"Authorization Server info: {response.json()}") + return response.json() + + +def get_authorization_token(url=None): + """Get OAuth token from Okta. + + :param url: URL of the Okta OAuth token endpoint + :return: OAuth token + """ + auth_server_info = get_authorization_server_info(url) + # auth_server_url = auth_server_info["authorization_endpoint"] + token_url = auth_server_info["token_endpoint"] + # headers = {"accept": "application/json"} + headers = {"accept": "text/html,application/xhtml+xml,application/xml"} + response = user.request_wrapper("GET", token_url, headers=headers) + breakpoint() + return response.json() + + +def idp_authz(config, authenticated_sid): + """oauth2 authorization + :param: config object, and authenticated session id""" + + # https://developer.okta.com/docs/guides/implement-grant-type/authcode/main/#grant-type-flow + + return authz_sid + + +def oie_enabled(url): + """ + Determines if OIE is enabled. + :pamam url: okta org url + :return: True if OIE is enabled, False otherwise + """ + if get_auth_pipeline(url) == "idx": # oie + return True + else: + return False + + +def authorization_code_request(url): + """First step of Authorization Code Flow + """ + url+= oath2/v1/authorized + client_id + + + +def local_auth(config): + """Authenticate and authorize user on local okta instance. + + :param config: Config object + :return: auth session ID cookie. + """ + + url = config.okta["org"] + + authenticated_sid = idp_authn(config) + + if oie_enabled(url): + auth_sid = idp_authz(config, authenticated_sid) + else: + # if we didnt do authorization, so the auth sid is the authencation sid only. + auth_sid = authenticated_sid + + user.add_sensitive_value_to_be_masked(auth_sid) + return auth_sid + + def is_local_auth(auth_properties): - """Check whether authentication happens locally. + """Check whether authentication happens on the current instance. :param auth_properties: auth_properties dict - :return: True for local auth, False otherwise. + :return: True if this is the place to authenticate, False otherwise. """ try: if auth_properties["type"] == "OKTA": @@ -266,12 +396,13 @@ def is_saml2_auth(auth_properties): return False -def local_auth(config): - """Authenticate local user with okta credential. +def idp_authn(config): + """Authenticate with okta. :param config: Config object :return: MFA session with options """ + logger.debug(f"idp_authn({config}") session_token = None headers = {"content-type": "application/json", "accept": "application/json"} payload = {"username": config.okta["username"], "password": config.okta["password"]} @@ -303,7 +434,7 @@ def saml2_auth(config, auth_properties): logger.info(f"Authentication is being redirected to {saml2_config.okta['org']}.") # Try to authenticate using the new configuration. This could cause # recursive calls, which allows for IdP chaining. - session_cookies = authenticate(saml2_config) + session_cookies = idp_authn(saml2_config) # Once we are authenticated, send the SAML request to the IdP. # This call requires session cookies. diff --git a/tokendito/tokendito.py b/tokendito/tokendito.py index 26c04fbb..07e867b5 100755 --- a/tokendito/tokendito.py +++ b/tokendito/tokendito.py @@ -1,7 +1,8 @@ #!/usr/bin/env python # vim: set filetype=python ts=4 sw=4 # -*- coding: utf-8 -*- -"""tokendito cli entry point.""" +"""tokendito entry point.""" +import logging import sys @@ -12,8 +13,7 @@ def main(args=None): # needed for console script path = os.path.dirname(os.path.dirname(__file__)) sys.path[0:0] = [path] - from tokendito.tool import cli - + from tokendito.user import cli return cli(args) diff --git a/tokendito/tool.py b/tokendito/tool.py deleted file mode 100644 index 26125579..00000000 --- a/tokendito/tool.py +++ /dev/null @@ -1,70 +0,0 @@ -# vim: set filetype=python ts=4 sw=4 -# -*- coding: utf-8 -*- -"""This module retrieves AWS credentials after authenticating with Okta.""" -import logging -import sys - -from tokendito import aws -from tokendito import config -from tokendito import okta -from tokendito import user - -logger = logging.getLogger(__name__) - - -def cli(args): - """Tokendito retrieves AWS credentials after authenticating with Okta.""" - args = user.parse_cli_args(args) - - # Early logging, in case the user requests debugging via env/CLI - user.setup_early_logging(args) - - # Set some required initial values - user.process_options(args) - - # Late logging (default) - user.setup_logging(config.user) - - # Validate configuration - message = user.validate_configuration(config) - if message: - quiet_msg = "" - if config.user["quiet"] is not False: - quiet_msg = " to run in quiet mode" - logger.error( - f"Could not validate configuration{quiet_msg}: {'. '.join(message)}. " - "Please check your settings, and try again." - ) - sys.exit(1) - - # Authenticate to okta - session_cookies = okta.authenticate(config) - - if config.okta["tile"]: - tile_label = "" - config.okta["tile"] = (config.okta["tile"], tile_label) - else: - config.okta["tile"] = user.discover_tiles(config.okta["org"], session_cookies) - - # Authenticate to AWS roles - auth_tiles = aws.authenticate_to_roles(config.okta["tile"], cookies=session_cookies) - - (role_response, role_name) = aws.select_assumeable_role(auth_tiles) - - identity = aws.assert_credentials(role_response=role_response) - if "Arn" not in identity and "UserId" not in identity: - logger.error( - f"There was an error retrieving and verifying AWS credentials: {role_response}" - ) - sys.exit(1) - - user.set_profile_name(config, role_name) - - user.set_local_credentials( - response=role_response, - role=config.aws["profile"], - region=config.aws["region"], - output=config.aws["output"], - ) - - user.display_selected_role(profile_name=config.aws["profile"], role_response=role_response) diff --git a/tokendito/user.py b/tokendito/user.py index 9fff2fb8..944f1e98 100644 --- a/tokendito/user.py +++ b/tokendito/user.py @@ -21,10 +21,11 @@ from bs4 import __version__ as __bs4_version__ # type: ignore (bs4 does not have PEP 561 support) from bs4 import BeautifulSoup import requests -from tokendito import __version__ +from tokendito import config from tokendito import aws +from tokendito import okta +from tokendito import __version__ from tokendito import Config -from tokendito import config as config # Unfortunately, readline is only available in non-Windows systems. There is no substitution. try: @@ -34,9 +35,64 @@ logger = logging.getLogger(__name__) - mask_items = [] +def cli(args): + """Tokendito retrieves AWS credentials after authenticating with Okta.""" + args = parse_cli_args(args) + + # Early logging, in case the user requests debugging via env/CLI + setup_early_logging(args) + + # Set some required initial values + process_options(args) + + # Late logging (default) + setup_logging(config.user) + + # Validate configuration + message = validate_configuration(config) + if message: + quiet_msg = "" + if config.user["quiet"] is not False: + quiet_msg = " to run in quiet mode" + logger.error( + f"Could not validate configuration{quiet_msg}: {'. '.join(message)}. " + "Please check your settings, and try again." + ) + sys.exit(1) + + # get authentication and authorization cookies from okta + session_cookies = okta.idp_auth(config) + + if config.okta["tile"]: + tile_label = "" + config.okta["tile"] = (config.okta["tile"], tile_label) + else: + config.okta["tile"] = discover_tiles(config.okta["org"], session_cookies) + + # Authenticate to AWS roles + auth_tiles = aws.authenticate_to_roles(config.okta["tile"], cookies=session_cookies) + + (role_response, role_name) = aws.select_assumeable_role(auth_tiles) + + identity = aws.assert_credentials(role_response=role_response) + if "Arn" not in identity and "UserId" not in identity: + logger.error( + f"There was an error retrieving and verifying AWS credentials: {role_response}" + ) + sys.exit(1) + + set_profile_name(config, role_name) + + set_local_credentials( + response=role_response, + role=config.aws["profile"], + region=config.aws["region"], + output=config.aws["output"], + ) + + display_selected_role(profile_name=config.aws["profile"], role_response=role_response) class MaskLoggerSecret(logging.Filter): """Masks secrets in logger messages.""" @@ -54,6 +110,8 @@ def filter(self, record): return True + + def parse_cli_args(args): """Parse command line arguments.