From 2329cf8f7bd0869c9e58e27369afbab678467589 Mon Sep 17 00:00:00 2001 From: Jean-Pierre Sevigny <41591249+sevignyj@users.noreply.github.com> Date: Wed, 16 Aug 2023 10:36:47 -0400 Subject: [PATCH] WIP oie implementation, oauth2 only, authorization code flow. --- .gitignore | 1 + requirements.txt | 1 - tokendito/__init__.py | 16 +- tokendito/__main__.py | 4 +- tokendito/aws.py | 2 +- tokendito/config.py | 1 + tokendito/okta.py | 342 ++++++++++++++++++++++++++++++++++++++--- tokendito/tokendito.py | 6 +- tokendito/tool.py | 70 --------- tokendito/user.py | 98 +++++++++++- 10 files changed, 436 insertions(+), 105 deletions(-) delete mode 100644 tokendito/tool.py diff --git a/.gitignore b/.gitignore index 2954c5dd..6c6ba063 100644 --- a/.gitignore +++ b/.gitignore @@ -91,6 +91,7 @@ venv/ ENV/ env.bak/ venv.bak/ +.vscode/ # Spyder project settings .spyderproject diff --git a/requirements.txt b/requirements.txt index 3ea3a668..7a106b8f 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,4 @@ beautifulsoup4>=4.6.0 botocore>=1.12.36 -certifi>=2022.12.07 # This can be removed when requests updates its requirements from 2017.4.17 to >=2022.12.07 platformdirs>=2.5.4 requests>=2.19.0 diff --git a/tokendito/__init__.py b/tokendito/__init__.py index 20898530..bd651ba6 100644 --- a/tokendito/__init__.py +++ b/tokendito/__init__.py @@ -1,7 +1,10 @@ # vim: set filetype=python ts=4 sw=4 # -*- coding: utf-8 -*- -"""Tokendito module initialization.""" -__version__ = "2.1.2" +"""tokendito module initialization.""" + +from requests import Session + +__version__ = "2.2.0.rc1" __title__ = "tokendito" __description__ = "Get AWS STS tokens from Okta SSO" __long_description_content_type__ = "text/markdown" @@ -9,3 +12,12 @@ __author__ = "tokendito" __author_email__ = "tokendito@dowjones.com" __license__ = "Apache 2.0" + +Session = Session() +Session.headers.update( + { + "User-Agent": f"tokendito/{__version__}", + "content-type": "application/json", + "accept": "application/json", + } +) diff --git a/tokendito/__main__.py b/tokendito/__main__.py index 73cacc9e..99bff005 100755 --- a/tokendito/__main__.py +++ b/tokendito/__main__.py @@ -12,10 +12,10 @@ def main(args=None): # needed for console script path = os.path.dirname(os.path.dirname(__file__)) sys.path[0:0] = [path] - from tokendito.tool import cli + from tokendito.user import cmd_interface try: - return cli(args) + return cmd_interface(args) except KeyboardInterrupt: print("\nInterrupted") sys.exit(1) diff --git a/tokendito/aws.py b/tokendito/aws.py index e5f81ccd..049ffec0 100644 --- a/tokendito/aws.py +++ b/tokendito/aws.py @@ -46,7 +46,7 @@ def get_output_types(): return ["json", "text", "csv", "yaml", "yaml-stream"] -def authenticate_to_roles(urls, cookies=None): +def authenticate_to_roles(urls, cookies): """Authenticate AWS user with saml. :param urls: list of tuples or tuple, with tiles info diff --git a/tokendito/config.py b/tokendito/config.py index eedb3027..c416f890 100644 --- a/tokendito/config.py +++ b/tokendito/config.py @@ -45,6 +45,7 @@ class Config(object): password="", mfa=None, mfa_response=None, + oauth_client_id=None, tile=None, org=None, ), diff --git a/tokendito/okta.py b/tokendito/okta.py index 238b051f..57b75537 100644 --- a/tokendito/okta.py +++ b/tokendito/okta.py @@ -7,10 +7,13 @@ 2. Update Okta Config File """ +import base64 import codecs from copy import deepcopy +import hashlib import json import logging +import os import re import sys import time @@ -19,6 +22,7 @@ from bs4 import BeautifulSoup import requests from tokendito import duo +from tokendito import Session from tokendito import user logger = logging.getLogger(__name__) @@ -73,6 +77,7 @@ def api_error_code_parser(status=None): param status: Response status return message: status message """ + logger.debug(f"api_error_code_parser({status})") if status and status in _status_dict.keys(): message = f"Okta auth failed: {_status_dict[status]}" else: @@ -82,17 +87,41 @@ def api_error_code_parser(status=None): return message +def get_auth_pipeline(url=None): + """Get auth pipeline version.""" + logger.debug(f"get_auth_pipeline({url})") + headers = {"accept": "application/json"} + # https://developer.okta.com/docs/api/openapi/okta-management/management/tag/OrgSetting/ + url = f"{url}/.well-known/okta-organization" + + response = user.request_wrapper("GET", url, headers=headers) + try: + ret = response.json() + except (KeyError, ValueError) as e: + logger.error(f"Failed to parse type in {url}:{str(e)}") + logger.debug(f"Response: {response.text}") + sys.exit(1) + logger.debug(f"we have {ret}") + auth_pipeline = ret.get("pipeline", None) + if auth_pipeline != "idx" and auth_pipeline != "v1": + logger.error(f"unsupported auth pipeline version {auth_pipeline}") + sys.exit(1) + logger.debug(f"Pipeline is of type {auth_pipeline}") + return auth_pipeline + + def get_auth_properties(userid=None, url=None): - """Make a call to the webfinger endpoint. + """Make a call to the webfinger endpoint to get the auth properties metadata. :param userid: User for which we are requesting an auth endpoint. :param url: Site where we are looking up the user. :returns: dictionary with authentication properties. """ + logger.debug(f"get_auth_properies({userid}, {url})") payload = {"resource": f"okta:acct:{userid}", "rel": "okta:idp"} + # payload = {"resource": f"okta:acct:{userid}"} headers = {"accept": "application/jrd+json"} url = f"{url}/.well-known/webfinger" - logger.debug(f"Looking up auth endpoint for {userid} in {url}") response = user.request_wrapper("GET", url, headers=headers, params=payload) @@ -174,7 +203,10 @@ def send_saml_response(saml_response): "SAMLResponse": saml_response["response"], "RelayState": saml_response["relay_state"], } - headers = {"accept": "text/html,application/xhtml+xml,application/xml"} + headers = { + "accept": "text/html,application/xhtml+xml,application/xml", + "Content-Type": "application/x-www-form-urlencoded", + } url = saml_response["post_url"] logger.debug(f"Sending SAML response back to {url}") @@ -195,7 +227,6 @@ def get_session_token(config, primary_auth, headers): :param primary_auth: Primary authentication :return: Session Token from JSON response """ - status = None try: status = primary_auth.get("status", None) except AttributeError: @@ -207,7 +238,7 @@ def get_session_token(config, primary_auth, headers): session_token = mfa_challenge(config, headers, primary_auth) else: logger.debug(f"Error parsing response: {json.dumps(primary_auth)}") - logger.error("Okta auth failed: unknown status.") + logger.error(f"Okta auth failed: unknown status {status}") sys.exit(1) user.add_sensitive_value_to_be_masked(session_token) @@ -215,34 +246,300 @@ def get_session_token(config, primary_auth, headers): return session_token -def authenticate(config): - """Authenticate user. +def get_oauth_token(token_endpoint_url, config, code_verifier, authz_code): + """Get OAuth token from Okta by calling /token endpoint. + + https://developer.okta.com/docs/reference/api/oidc/#token-endpoint + :param url: URL of the Okta OAuth token endpoint + :return: OAuth token + """ + payload = { + "code": f"{authz_code}", + "grant_type": "authorization_code", + "redirect_uri": f"{get_redirect_uri(config)}", + "client_id": f"{get_client_id(config)}", + "code_verifier": f"{code_verifier}", + } + # payload = {"resource": f"okta:acct:{userid}"} + # headers = {"accept": "application/jrd+json"} + # response = user.request_wrapper("GET", token_endpoint_url , headers=headers, params=payload) + + headers = {"accept": "application/json"} + payload = {"code": authz_code} + response = user.request_wrapper("POST", token_endpoint_url, headers=headers, data=payload) + return response.json() + + +def extract_authz_code(response_text): + """Extract authorization code from response text. + + :param response_text: response text from /authorize call + :return: authorization code + """ + authz_code = re.search(r"(?<=code=)[^&]+", response_text).group(0) + return authz_code + + +def get_client_id(config): + """TODO""" + if config.okta["oauth_client_id"] is None: + logger.error("oauth client_id is not set.") + sys.exit(1) + + return config.okta["oauth_client_id"] + + +def get_redirect_uri(config): + """TODO.""" + url = f"{config.okta['org']}/enduser/callback" + return url + + +def get_response_type(): + """TODO.""" + return "code" + + +def get_authorize_scope(): + """TODO + https://developer.okta.com/docs/reference/api/oidc/#access-token-scopes-and-claims + https://developer.okta.com/docs/guides/implement-grant-type/authcodepkce/main/#next-steps + """ + return "profile" + # return """openid profile email + # okta.users.read.self okta.users.manage.self okta.internal.enduser.read + # okta.internal.enduser.manage okta.enduser.dashboard.read + # okta.enduser.dashboard.manage""" + + +def get_oauth_state(): + """generate a random string for state + https://developer.okta.com/docs/guides/implement-grant-type/authcode/main/#flow-specifics + https://developer.okta.com/docs/guides/implement-grant-type/authcodepkce/main/#next-steps + """ + state = hashlib.sha256(os.urandom(1024)).hexdigest() + return state + + +def get_pkce_code_challenge_method(): + """TODO""" + return "S256" + + +def get_pkce_code_challenge(code_verifier=None): + """ + get_pkce_code_challenge + + Base64-URL-encoded string of the SHA256 hash of the code verifier + https://www.oauth.com/oauth2-servers/pkce/authorization-request/ + + :param: code_verifier + :return: code_challenge + """ + + code_challenge = hashlib.sha256(code_verifier.encode("utf-8")).digest() + code_challenge = base64.urlsafe_b64encode(code_challenge).decode("utf-8") + code_challenge = code_challenge.replace("=", "") + return code_challenge + + +def get_pkce_code_verifier(): + """ + to review + """ + # code_verifier = base64.urlsafe_b64encode(os.urandom(32)).decode("utf-8") + # code_verifier = base64.urlsafe_b64encode(os.urandom(50)).rstrip(b'=') + + code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode("utf-8") + code_verifier = re.sub("[^a-zA-Z0-9]+", "", code_verifier) + return code_verifier + + +def pkce_enabled(): + """TODO""" + return True + + +def get_idx_cookies(cookies): + """TODO""" + idx_val = cookies.get_dict("idx") + return idx_val + + +def oauth_authorize_request(config, authz_code_flow_data, session_token): + """implements authorization code request + calls /authorize endpoint with authenticated session_token. + https://developer.okta.com/docs/reference/api/oidc/#_2-okta-as-the-identity-platform-for-your-app-or-api + :param + :return: authorization code, needed for /token call + """ + logger.debug(f"oauth_code_request({config}, {authz_code_flow_data}, {session_token})") + headers = {"accept": "application/json", "content-type": "application/x-www-form-urlencoded"} + + # "scope": authz_code_flow_data["scope"], + payload = { + "client_id": authz_code_flow_data["client_id"], + "sessionToken": session_token, + "redirect_uri": authz_code_flow_data["redirect_uri"], + "response_type": authz_code_flow_data["response_type"], + "state": authz_code_flow_data["state"], + "code_challenge": authz_code_flow_data["code_challenge"], + "code_challenge_method": authz_code_flow_data["code_challenge_method"], + } + response = user.request_wrapper( + "GET", + authz_code_flow_data["authz_endpoint_url"], + headers=headers, + params=payload, + ) + logger.debug(f"response: {response.headers}") + logger.debug(f"Cookies in response: {response.cookies.get_dict()}") + logger.debug(f"Cookies in Session: {Session.cookies.get_dict()}") + + session_cookies = get_idx_cookies(Session.cookies) + + return session_cookies + + +def authorization_code_flow(config, session_token): + # Authorization Code flow (see + # https://developer.okta.com/docs/guides/implement-grant-type/authcode/main/#about-the-authorization-code-grant + # ) + authz_endpoint_url, token_endpoint_url = get_authorization_server_endpoints(config.okta["org"]) + + authz_code_flow_data = { + "client_id": get_client_id(config), + "redirect_uri": get_redirect_uri(config), + "response_type": get_response_type(), + "scope": get_authorize_scope(), + "state": get_oauth_state(), + "authz_endpoint_url": authz_endpoint_url, + "token_endpoint_url": token_endpoint_url, + } + + if pkce_enabled(): + code_verifier = get_pkce_code_verifier() + authz_code_flow_data["code_verifier"] = code_verifier + authz_code_flow_data["code_challenge"] = get_pkce_code_challenge(code_verifier) + authz_code_flow_data["code_challenge_method"] = get_pkce_code_challenge_method() + + logger.debug(f"authorization_code_worflow({config},{session_token})") + # authz_code = oauth_authorize_request(config, authz_code_flow_data, session_token) + oauth_authorize_request(config, authz_code_flow_data, session_token) + + # authz_token = get_oauth_token(config, authz_code_flow_data, authz_code) + # user.add_sensitive_value_to_be_masked(authz_token) + # return authz_token + + +def authorization_code_enabled(config): + """determines if authorization code grant is enabled + todo + """ + return True + + +def get_authorization_server_endpoints(url=None): + """Get /authorize and /token endpoints from Okta + + :param url: URL of the Okta + :return: tuple of URLs of the /authorize and /token endpoints + """ + url = f"{url}/.well-known/oauth-authorization-server" + headers = {"accept": "application/json"} + response = user.request_wrapper("GET", url, headers=headers) + logger.debug(f"Authorization Server info: {response.json()}") + # todo: handle errors. + return response.json()["authorization_endpoint"], response.json()["token_endpoint"] + + +def oie_authorize(config, authn_sid): + logger.debug(f"oie_auth({config}, {authn_sid})") + # get url where to authorize + if authorization_code_enabled(config): + sid = authorization_code_flow(config, authn_sid) + else: + logger.warning("Authorization Code is not enabled, basically skipping oie.") + sid = authn_sid + return sid + + +def idp_auth(config): + """authenticate and authorize with the idp. Authorize heppens if oie is enabled. :param config: Config object :return: session ID cookie. """ + + logger.debug(f"idp_auth({config})") auth_properties = get_auth_properties(userid=config.okta["username"], url=config.okta["org"]) + if "type" not in auth_properties: logger.error("Okta auth failed: unknown type.") sys.exit(1) - sid = None - if is_local_auth(auth_properties): - session_token = local_auth(config) - sid = user.request_cookies(config.okta["org"], session_token) - elif is_saml2_auth(auth_properties): - sid = saml2_auth(config, auth_properties) + session_cookies = None + session_token = None + + if local_authentication_enabled(auth_properties): + session_token = authenticate_locally(config) + if oie_enabled(config.okta["org"]): + session_token = oie_authorize(config, session_token) # todo: this is a JWT + elif is_saml2_authentication(auth_properties): + session_cookies = saml2_authenticate(config, auth_properties) else: logger.error(f"{auth_properties['type']} login via IdP Discovery is not curretly supported") sys.exit(1) - return sid + + if session_token is not None: + user.add_sensitive_value_to_be_masked(session_token) + # todo: change to idx cookies, for ioe + session_cookies = user.request_cookies(config.okta["org"], session_token) + + # todo: the called needs to handle if it's a sesoin cookies or a web token + # renme sessin_cookies to better name.. + return session_cookies + + +def oie_enabled(url): + """ + Determines if OIE is enabled. + :pamam url: okta org url + :return: True if OIE is enabled, False otherwise + """ + if get_auth_pipeline(url) == "idx": # oie + return True + else: + return False + + +def authenticate_locally(config): + """Authenticate user on local okta instance. + + :param config: Config object + :return: auth session ID cookie. + """ + + logger.debug(f"authenticate_locally({config}") + session_token = None + headers = {"content-type": "application/json", "accept": "application/json"} + payload = {"username": config.okta["username"], "password": config.okta["password"]} + + logger.debug(f"Authenticate user to {config.okta['org']}") + logger.debug(f"Sending {headers}, {payload} to {config.okta['org']}") + primary_auth = api_wrapper(f"{config.okta['org']}/api/v1/authn", payload, headers) + + while session_token is None: + session_token = get_session_token(config, primary_auth, headers) + logger.info(f"User has been succesfully authenticated to {config.okta['org']}.") + return session_token -def is_local_auth(auth_properties): - """Check whether authentication happens locally. +def local_authentication_enabled(auth_properties): + """Check whether authentication happens on the current instance. :param auth_properties: auth_properties dict - :return: True for local auth, False otherwise. + :return: True if this is the place to authenticate, False otherwise. """ try: if auth_properties["type"] == "OKTA": @@ -252,7 +549,7 @@ def is_local_auth(auth_properties): return False -def is_saml2_auth(auth_properties): +def is_saml2_authentication(auth_properties): """Check whether authentication happens via SAML2 on a different IdP. :param auth_properties: auth_properties dict @@ -266,12 +563,13 @@ def is_saml2_auth(auth_properties): return False -def local_auth(config): - """Authenticate local user with okta credential. +def idp_authenticate(config): + """Authenticate with okta. :param config: Config object :return: MFA session with options """ + logger.debug(f"idp_authenticate({config}") session_token = None headers = {"content-type": "application/json", "accept": "application/json"} payload = {"username": config.okta["username"], "password": config.okta["password"]} @@ -286,7 +584,7 @@ def local_auth(config): return session_token -def saml2_auth(config, auth_properties): +def saml2_authenticate(config, auth_properties): """SAML2 authentication flow. :param config: Config object @@ -303,7 +601,7 @@ def saml2_auth(config, auth_properties): logger.info(f"Authentication is being redirected to {saml2_config.okta['org']}.") # Try to authenticate using the new configuration. This could cause # recursive calls, which allows for IdP chaining. - session_cookies = authenticate(saml2_config) + session_cookies = idp_auth(saml2_config) # Once we are authenticated, send the SAML request to the IdP. # This call requires session cookies. diff --git a/tokendito/tokendito.py b/tokendito/tokendito.py index aca7de4d..85cc3747 100755 --- a/tokendito/tokendito.py +++ b/tokendito/tokendito.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # vim: set filetype=python ts=4 sw=4 # -*- coding: utf-8 -*- -"""Tokendito cli entry point.""" +"""tokendito entry point.""" import sys @@ -12,9 +12,9 @@ def main(args=None): # needed for console script path = os.path.dirname(os.path.dirname(__file__)) sys.path[0:0] = [path] - from tokendito.tool import cli + from tokendito.user import cmd_interface - return cli(args) + return cmd_interface(args) if __name__ == "__main__": diff --git a/tokendito/tool.py b/tokendito/tool.py deleted file mode 100644 index 35c4334e..00000000 --- a/tokendito/tool.py +++ /dev/null @@ -1,70 +0,0 @@ -# vim: set filetype=python ts=4 sw=4 -# -*- coding: utf-8 -*- -"""CLI operations.""" -import logging -import sys - -from tokendito import aws -from tokendito import okta -from tokendito import user -from tokendito.config import config - -logger = logging.getLogger(__name__) - - -def cli(args): - """Tokendito retrieves AWS credentials after authenticating with Okta.""" - args = user.parse_cli_args(args) - - # Early logging, in case the user requests debugging via env/CLI - user.setup_early_logging(args) - - # Set some required initial values - user.process_options(args) - - # Late logging (default) - user.setup_logging(config.user) - - # Validate configuration - message = user.validate_configuration(config) - if message: - quiet_msg = "" - if config.user["quiet"] is not False: - quiet_msg = " to run in quiet mode" - logger.error( - f"Could not validate configuration{quiet_msg}: {'. '.join(message)}. " - "Please check your settings, and try again." - ) - sys.exit(1) - - # Authenticate to okta - session_cookies = okta.authenticate(config) - - if config.okta["tile"]: - tile_label = "" - config.okta["tile"] = (config.okta["tile"], tile_label) - else: - config.okta["tile"] = user.discover_tiles(config.okta["org"], session_cookies) - - # Authenticate to AWS roles - auth_tiles = aws.authenticate_to_roles(config.okta["tile"], cookies=session_cookies) - - (role_response, role_name) = aws.select_assumeable_role(auth_tiles) - - identity = aws.assert_credentials(role_response=role_response) - if "Arn" not in identity and "UserId" not in identity: - logger.error( - f"There was an error retrieving and verifying AWS credentials: {role_response}" - ) - sys.exit(1) - - user.set_profile_name(config, role_name) - - user.set_local_credentials( - response=role_response, - role=config.aws["profile"], - region=config.aws["region"], - output=config.aws["output"], - ) - - user.display_selected_role(profile_name=config.aws["profile"], role_response=role_response) diff --git a/tokendito/user.py b/tokendito/user.py index 36d91b7c..f90c95aa 100644 --- a/tokendito/user.py +++ b/tokendito/user.py @@ -7,6 +7,9 @@ import configparser from datetime import timezone from getpass import getpass + +# to debug http messages +import http.client as http_client import json import logging import os @@ -23,6 +26,8 @@ import requests from tokendito import __version__ from tokendito import aws +from tokendito import okta +from tokendito import Session from tokendito.config import Config from tokendito.config import config @@ -34,10 +39,67 @@ logger = logging.getLogger(__name__) - mask_items = [] +def cmd_interface(args): + """Tokendito retrieves AWS credentials after authenticating with Okta.""" + args = parse_cli_args(args) + + # Early logging, in case the user requests debugging via env/CLI + setup_early_logging(args) + + # Set some required initial values + process_options(args) + + # Late logging (default) + setup_logging(config.user) + + # Validate configuration + message = validate_configuration(config) + if message: + quiet_msg = "" + if config.user["quiet"] is not False: + quiet_msg = " to run in quiet mode" + logger.error( + f"Could not validate configuration{quiet_msg}: {'. '.join(message)}. " + "Please check your settings, and try again." + ) + sys.exit(1) + + # get authentication and authorization cookies from okta + session_cookies = okta.idp_auth(config) + + if config.okta["tile"]: + tile_label = "" + config.okta["tile"] = (config.okta["tile"], tile_label) + else: + config.okta["tile"] = discover_tiles(config.okta["org"], session_cookies) + + # Authenticate to AWS roles + auth_tiles = aws.authenticate_to_roles(config.okta["tile"], session_cookies) + + (role_response, role_name) = aws.select_assumeable_role(auth_tiles) + + identity = aws.assert_credentials(role_response=role_response) + if "Arn" not in identity and "UserId" not in identity: + logger.error( + f"There was an error retrieving and verifying AWS credentials: {role_response}" + ) + sys.exit(1) + + set_profile_name(config, role_name) + + set_local_credentials( + response=role_response, + role=config.aws["profile"], + region=config.aws["region"], + output=config.aws["output"], + ) + + display_selected_role(profile_name=config.aws["profile"], role_response=role_response) + + class MaskLoggerSecret(logging.Filter): """Masks secrets in logger messages.""" @@ -133,6 +195,12 @@ def parse_cli_args(args): "--okta-tile", help="Okta tile URL to use.", ) + # oauth_client_id + + parser.add_argument( + "--okta-oauth-client-id", + help="Sets the Okta client ID needed in oauth2.", + ) parser.add_argument( "--okta-mfa", help="Sets the MFA method. You " @@ -627,6 +695,7 @@ def process_arguments(args): pattern = re.compile(r"^(.*?)_(.*)") for key, val in vars(args).items(): + logger.debug(f"key is {key} and val is {val}") match = re.search(pattern, key.lower()) if match: if match.group(1) not in get_submodule_names(): @@ -1194,6 +1263,7 @@ def sanitize_config_values(config): config.aws["shared_credentials_file"] = os.path.expanduser( config.aws["shared_credentials_file"] ) + return config @@ -1219,6 +1289,11 @@ def request_cookies(url, session_token): return cookies +def request_oie_cookie(url, idx_id): + """ """ + return "todo" + + def discover_tiles(url, cookies): """ Discover aws tile url on user's okta dashboard. @@ -1267,11 +1342,23 @@ def request_wrapper(method, url, headers=None, **kwargs): :returns: response object """ if headers is None: - headers = {"content-type": "application/json", "accept": "application/json"} + Session.headers.update({"content-type": "application/json", "accept": "application/json"}) + else: + Session.headers.update(headers) + + if logging.getLogger(__name__).level == logging.DEBUG: + # set http log level to debug if we're in debug + # import http.client as httplib - logger.debug(f"Issuing {method} request to {url} with {headers} and {kwargs}") + http_client.HTTPConnection.debuglevel = 1 + requests_log = logging.getLogger("requests.packages.urllib3") + requests_log.setLevel(logging.DEBUG) + requests_log.addFilter(MaskLoggerSecret()) + requests_log.propagate = True + + logger.debug(f"Issuing request = session.request({method}, {url}, {kwargs})") try: - response = requests.request(method=method, url=url, headers=headers, **kwargs) + response = Session.request(method=method, url=url, **kwargs) response.raise_for_status() except requests.exceptions.HTTPError as err: logger.error( @@ -1283,4 +1370,7 @@ def request_wrapper(method, url, headers=None, **kwargs): logger.error(f"The {method} request to {url} failed with {err}") sys.exit(1) + if headers is not None: + Session.headers.update({"content-type": "application/json", "accept": "application/json"}) + return response