Skip to content

Commit

Permalink
Add --ensure-temporary and fix for IMDS/ECS
Browse files Browse the repository at this point in the history
  • Loading branch information
benkehoe committed Jul 26, 2023
1 parent 054b59e commit 08197f9
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 26 deletions.
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
>
> If you want to inject refreshable credentials into a locally-run container, [`imds-credential-server`](https://github.com/benkehoe/imds-credential-server) is a more focused solution for that.
There are a number of other projects that extract AWS credentials and/or inject them into programs, but all the ones I've seen use the CLI's cache files directly, rather than leveraging botocore's ability to retrieve and refresh credentials.
There are a number of other projects that extract AWS credentials and/or inject them into programs, but all the ones I've seen use the CLI's cache files directly, rather than leveraging boto3's ability to retrieve and refresh credentials.
So I wrote this to do that.

[botocore (the underlying Python SDK library)](https://botocore.amazonaws.com/v1/documentation/api/latest/index.html) has added support for loading credentials cached by [`aws sso login`](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/sso/login.html) as of [version 1.17.0](https://github.com/boto/botocore/blob/develop/CHANGELOG.rst#1170).
`aws-export-credentials` now requires botocore >= 1.17.0, and so supports AWS SSO credentials as well.
[boto3 (the AWS Python SDK)](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html) has added support for loading credentials cached by [`aws sso login`](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/sso/login.html) as of [botocore version 1.17.0](https://github.com/boto/botocore/blob/develop/CHANGELOG.rst#1170).
`aws-export-credentials` now requires boto3 >= 1.14.0, and so supports AWS SSO credentials as well.
If all you want is AWS SSO support for an SDK other than Python, Go, or JavaScript (v3), take a look at [aws-sso-util](https://github.com/benkehoe/aws-sso-util#adding-aws-sso-support-to-aws-sdks), which can help you configure your profiles with a [credential process](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-sourcing-external.html) that doesn't require the credential injection process that `aws-export-credentials` does.

## Quickstart
Expand Down Expand Up @@ -37,7 +37,7 @@ You can also download the Python file directly [here](https://raw.githubusercont

## Usage
### Profile
Profiles work like in the AWS CLI (since it uses botocore); it will pick up the `AWS_PROFILE`
Profiles work like in the AWS CLI (since it uses boto3); it will pick up the `AWS_PROFILE`
or `AWS_DEFAULT_PROFILE` env vars, but the `--profile` argument takes precedence.

### JSON
Expand Down Expand Up @@ -132,6 +132,12 @@ You can change this value using the `--cache-expiration-buffer` argument, which

You can force the cache to refresh using `--refresh`.

## Temporary credentials
IAM Users and the account root have long-term credentials, which carry higher risk than temporary credentials.
The [STS.GetSessionToken](https://docs.aws.amazon.com/STS/latest/APIReference/API_GetSessionToken.html) API call exists to provide temporary credentials based on these long-term credentials.
If you pass the `--ensure-temporary` flag, it will use this API to get temporary credentials to export for an IAM User or account root.
This process is always used for `--imds` and `--container`.

# Role assumption
In general, it's better to do role assumption by using profiles in `~/.aws/config` like this:

Expand Down
60 changes: 39 additions & 21 deletions aws_export_credentials/aws_export_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import secrets
import subprocess

from botocore.session import Session
from boto3 import Session

__version__ = '0.16.0'

Expand Down Expand Up @@ -71,26 +71,50 @@ def parse_imds_arg(value):
return host, port

def serialize_date(dt):
if isinstance(dt, str):
return dt
return dt.strftime(TIME_FORMAT)

def deserialize_date(dt_str):
return datetime.strptime(dt_str, TIME_FORMAT).replace(tzinfo=timezone.utc)

def get_credentials(session):
def get_credentials(session, ensure_temporary=False, ensure_expiration=False):
session_credentials = session.get_credentials()
if not session_credentials:
return None

read_only_credentials = session_credentials.get_frozen_credentials()

if ensure_temporary and not read_only_credentials.token:
return get_temporary_credentials(session)

expiration = None

if hasattr(session_credentials, '_expiry_time') and session_credentials._expiry_time:
if isinstance(session_credentials._expiry_time, datetime):
expiration = session_credentials._expiry_time
else:
LOGGER.debug("Expiration in session credentials is of type {}, not datetime".format(type(expiration)))

if not expiration and ensure_expiration:
# provide an expiration, even if it's wrong
expiration = datetime.now(tz=timezone.utc) + timedelta(hours=1)

credentials = convert_creds(read_only_credentials, expiration)
return credentials

def get_temporary_credentials(session):
sts_client = session.client('sts')
response = sts_client.get_session_token()
response_creds = response['Credentials']

return Credentials(
AccessKeyId=response_creds['AccessKeyId'],
SecretAccessKey=response_creds['SecretAccessKey'],
SessionToken=response_creds['SessionToken'],
Expiration=response_creds['Expiration']
)

def main():
parser = argparse.ArgumentParser(description=DESCRIPTION)

Expand All @@ -105,6 +129,7 @@ def main():
group.add_argument('--container', nargs=2, metavar=('HOST_PORT', 'TOKEN'), help="Start an ECS-compatible server on [HOST:]PORT, requires TOKEN for auth")
group.add_argument('--imds', metavar='HOST_PORT', help="Start an IMDSv2 server on [HOST:]PORT")

parser.add_argument('--ensure-temporary', action='store_true', help='Get temporary credentials for IAM and root users')
parser.add_argument('--pretty', action='store_true', help='For --json, pretty-print')

parser.add_argument('--version', action='store_true')
Expand Down Expand Up @@ -158,12 +183,12 @@ def main():
credentials = load_cache(args.cache_file, args.cache_expiration_buffer)

if credentials and args.credentials_file_profile:
session = Session(profile=args.profile)
session = Session(profile_name=args.profile)
elif not credentials:
try:
session = Session(profile=args.profile)
session = Session(profile_name=args.profile)

credentials = get_credentials(session)
credentials = get_credentials(session, ensure_temporary=args.ensure_temporary)

if not credentials:
print('Unable to locate credentials.', file=sys.stderr)
Expand Down Expand Up @@ -193,10 +218,10 @@ def main():
if credentials.Expiration:
env['AWS_CREDENTIAL_EXPIRATION'] = serialize_date(credentials.Expiration)

region_name = session.get_config_variable('region')
region_name = session.region_name
if region_name:
env['AWS_DEFAULT_REGION'] = region_name

command = ' '.join(shlex.quote(arg) for arg in args.exec)
result = subprocess.run(command, shell=True, env=env)
sys.exit(result.returncode)
Expand Down Expand Up @@ -385,15 +410,13 @@ def do_GET(self):
body = {"Error": {"Code": "InvalidToken", "Message": "The provided token was invalid"}}
else:
self.send_response(HTTPStatus.OK)
credentials = get_credentials(self._session)
credentials = get_credentials(self._session, ensure_temporary=True, ensure_expiration=True)
body = {
'AccessKeyId': credentials.AccessKeyId,
'SecretAccessKey': credentials.SecretAccessKey,
'Token': credentials.SessionToken,
'Expiration': serialize_date(credentials.Expiration)
}
if credentials.SessionToken:
body['Token'] = credentials.SessionToken
if credentials.Expiration:
body['Expiration'] = serialize_date(credentials.Expiration)

body_bytes = json.dumps(body).encode('utf-8')

Expand All @@ -407,7 +430,7 @@ class IMDSRequestHandler(BaseHTTPRequestHandler):
def __init__(self, request, client_address, server, token, session):
self._token = token
self._session = session
self._sts_client = self._session.create_client("sts")
self._sts_client = self._session.client("sts")
self._role_name = None
super().__init__(request, client_address, server)

Expand Down Expand Up @@ -494,18 +517,13 @@ def do_GET(self):
"The role name is incorrect"
)
else:
credentials = get_credentials(self._session)
credentials = get_credentials(self._session, ensure_temporary=True, ensure_expiration=True)
body = {
'AccessKeyId': credentials.AccessKeyId,
'SecretAccessKey': credentials.SecretAccessKey,
'Token': credentials.SessionToken,
'Expiration': serialize_date(credentials.Expiration)
}
if credentials.SessionToken:
body['Token'] = credentials.SessionToken
if credentials.Expiration:
body['Expiration'] = serialize_date(credentials.Expiration)
else:
# An expiration is required
body['Expiration'] = serialize_date(datetime.now(tz=timezone.utc) + timedelta(minutes=60))
return self.send_ok("application/json", body)
return self.send_error(
HTTPStatus.NOT_FOUND,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ aws-export-credentials = 'aws_export_credentials:main'

[tool.poetry.dependencies]
python = "^3.7"
botocore = ">=1.17"
boto3 = ">=1.14"

[tool.poetry.dev-dependencies]

Expand Down

0 comments on commit 08197f9

Please sign in to comment.