Passing AWS profile credentials to Dask Gateway workers #331
-
We've had several people show up on our Qhub from different agencies, bringing with them their own cloud credentials, for example, an AWS keypair that allows their IAM role to write to a specified S3 bucket. They can use those credentials from their notebook sessions, but how best to communicate these credentials, set in the user's environment, to the Dask Gateway workers?
-
I see that @jkellndorfer uses this approach:

import configparser
import os

def set_aws_credentials(cfile=os.path.join(os.environ['HOME'], 'shared', '.aws', 'credentials'),
                        profile_name='default', region_name='us-east-1',
                        endpoint='s3.amazonaws.com', verbose=False):
    '''Set the AWS credential environment variables from the given profile in the credentials file'''
    cp = configparser.ConfigParser()
    cp.read(cfile)
    os.environ['AWS_ACCESS_KEY_ID'] = cp[profile_name]['aws_access_key_id']
    os.environ['AWS_SECRET_ACCESS_KEY'] = cp[profile_name]['aws_secret_access_key']
    os.environ['AWS_PROFILE'] = profile_name
    os.environ['AWS_DEFAULT_PROFILE'] = profile_name
    os.environ['AWS_S3_REGION'] = region_name
    os.environ['AWS_S3_ENDPOINT'] = endpoint
    os.environ['AWS_DEFAULT_REGION'] = region_name
    if verbose:
        print('export {}={}'.format('AWS_ACCESS_KEY_ID', cp[profile_name]['aws_access_key_id']))
        print('export {}={}'.format('AWS_SECRET_ACCESS_KEY', cp[profile_name]['aws_secret_access_key']))

Is this the best method?
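For reference, a call for a non-default profile would look something like this ('esip-qhub' here is just an example profile name):

set_aws_credentials(profile_name='esip-qhub', verbose=True)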
-
So on an older version of Qhub we had multiple sets of AWS credentials, and we had a couple of ways of handling this. But that was using dask_kubernetes and not dask_gateway, so I'm not sure whether those approaches are relevant here.
-
@kcpevey how are we handling credentials on our client projects?
-
Okay, this is embarrassing, as I totally forgot I already raised this as an issue a few months ago. From that issue, it looks like it's possible to just pass the environment variables via a newer Dask Gateway feature. Since our Dask Gateway version doesn't yet allow that on our Qhub, we can use the approach by @jkellndorfer, or here's another approach, which copies the credential files from the user's home directory to the workers:

import os

from dask.distributed import WorkerPlugin


class UploadFile(WorkerPlugin):
    """A WorkerPlugin to upload a local file to workers.

    Parameters
    ----------
    filepath: str
        A path to the file to upload

    Examples
    --------
    >>> client.register_worker_plugin(UploadFile(".env"))
    """
    def __init__(self, filepath):
        """Initialize the plugin by reading in the data from the given file."""
        self.filename = os.path.basename(filepath)
        self.dirname = os.path.dirname(filepath)
        with open(filepath, "rb") as f:
            self.data = f.read()

    async def setup(self, worker):
        # Recreate the directory on the worker (skipped for bare filenames like ".env")
        if self.dirname:
            os.makedirs(self.dirname, exist_ok=True)
            os.chdir(self.dirname)
        with open(self.filename, "wb+") as f:
            f.write(self.data)
        return os.listdir()

and then you can copy the credentials by doing:

client.register_worker_plugin(UploadFile('/home/jovyan/.aws/credentials'))
client.register_worker_plugin(UploadFile('/home/jovyan/.aws/config'))
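A quick sanity check that the files actually landed on the workers (this assumes the workers also see /home/jovyan as the home directory):

import os

# returns a dict mapping each worker's address to the directory listing on that worker
client.run(os.listdir, '/home/jovyan/.aws')

Note that register_worker_plugin also runs the plugin's setup on any workers that join the cluster later, so adaptively added workers get the files too.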
-
@rsignell-usgs, here is a variation we have used, a worker plugin that can upload files and/or run a small script on each worker:

import asyncio
import os
import uuid

from dask.distributed import WorkerPlugin


class InitWorker(WorkerPlugin):
    name = "init_worker"

    def __init__(self, filepath=None, script=None):
        self.data = {}
        if filepath:
            if isinstance(filepath, str):
                filepath = [filepath]
            for file_ in filepath:
                with open(file_, "rb") as f:
                    filename = os.path.basename(file_)
                    self.data[filename] = f.read()
        if script:
            filename = f"{uuid.uuid1()}.py"
            self.data[filename] = script

    async def setup(self, worker):
        responses = await asyncio.gather(
            *[
                worker.upload_file(
                    comm=None, filename=filename, data=data, load=True
                )
                for filename, data in self.data.items()
            ]
        )
        assert all(
            len(data) == r["nbytes"]
            for r, data in zip(responses, self.data.values())
        )

The magic happens with worker.upload_file(..., load=True): the uploaded .py file is imported on the worker, which is what executes the script. Here is a snippet of the script we are passing to the plugin:

script = f"""
import os
os.environ["AWS_ACCESS_KEY_ID"] = "{os.getenv("AWS_ACCESS_KEY_ID")}"
os.environ["AWS_SECRET_ACCESS_KEY"] = "{os.getenv("AWS_SECRET_ACCESS_KEY")}"
"""

And then to register the plugin:

plugin = InitWorker(script=script)
client.register_worker_plugin(plugin)

I am sure there are more elegant ways to do this and that this could be cleaned up and updated, but it shows that the approach works and provides flexibility.
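To confirm the variables actually made it into the worker environments, a quick check (any small function works with client.run):

import os

# each worker returns True if the key is now present in its environment
client.run(lambda: 'AWS_ACCESS_KEY_ID' in os.environ)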
-
@brl0, very nice. So now I first use the approach of @jkellndorfer to set the env vars from the config file, then set them for the workers using your approach. So the whole thing looks like this:

import configparser
import os

def set_aws_credentials(cfile=os.path.join(os.environ['HOME'], '.aws', 'credentials'),
                        profile_name='default', region_name='us-east-1',
                        endpoint='s3.amazonaws.com', verbose=False):
    '''Set the AWS credential environment variables from the given profile in the credentials file'''
    cp = configparser.ConfigParser()
    cp.read(cfile)
    os.environ['AWS_ACCESS_KEY_ID'] = cp[profile_name]['aws_access_key_id']
    os.environ['AWS_SECRET_ACCESS_KEY'] = cp[profile_name]['aws_secret_access_key']
    os.environ['AWS_PROFILE'] = profile_name
    os.environ['AWS_DEFAULT_PROFILE'] = profile_name
    os.environ['AWS_S3_REGION'] = region_name
    os.environ['AWS_S3_ENDPOINT'] = endpoint
    os.environ['AWS_DEFAULT_REGION'] = region_name
    if verbose:
        print('export {}={}'.format('AWS_ACCESS_KEY_ID', cp[profile_name]['aws_access_key_id']))
        print('export {}={}'.format('AWS_SECRET_ACCESS_KEY', cp[profile_name]['aws_secret_access_key']))

set_aws_credentials(profile_name='esip-qhub')

import asyncio
import os
import uuid

from dask.distributed import WorkerPlugin


class InitWorker(WorkerPlugin):
    name = "init_worker"

    def __init__(self, filepath=None, script=None):
        self.data = {}
        if filepath:
            if isinstance(filepath, str):
                filepath = [filepath]
            for file_ in filepath:
                with open(file_, "rb") as f:
                    filename = os.path.basename(file_)
                    self.data[filename] = f.read()
        if script:
            filename = f"{uuid.uuid1()}.py"
            self.data[filename] = script

    async def setup(self, worker):
        responses = await asyncio.gather(
            *[
                worker.upload_file(
                    comm=None, filename=filename, data=data, load=True
                )
                for filename, data in self.data.items()
            ]
        )
        assert all(
            len(data) == r["nbytes"]
            for r, data in zip(responses, self.data.values())
        )

script = f"""
import os
os.environ["AWS_ACCESS_KEY_ID"] = "{os.getenv("AWS_ACCESS_KEY_ID")}"
os.environ["AWS_SECRET_ACCESS_KEY"] = "{os.getenv("AWS_SECRET_ACCESS_KEY")}"
os.environ["AWS_DEFAULT_REGION"] = "{os.getenv("AWS_DEFAULT_REGION")}"
"""

plugin = InitWorker(script=script)
client.register_worker_plugin(plugin)
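One thing the recipe above takes for granted is the client object itself; on Qhub that typically comes from Dask Gateway, roughly like this (a sketch, since cluster options and profiles depend on the deployment):

from dask_gateway import Gateway

gateway = Gateway()              # picks up the deployment's default gateway address and auth
cluster = gateway.new_cluster()  # cluster options could be passed here
client = cluster.get_client()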
-
@dharhas just tipped me off that with #600, we now have a much easier way to pass environment variables to Dask workers. Hurrah! @jkellndorfer, we need to upgrade our ESIP Qhub to v0.3.12 to get this. Question for @costrouc, @tylerpotts, @aktech, @danlester or @brl0: can we just update the part of our config containing the Qhub version to upgrade from 0.3.11 to 0.3.12 and let GitHub Actions do their magic? Or do we need to back up user data, redeploy, recover user data, etc.?
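For anyone wondering what that easier way might look like from the client side, here is a hedged sketch. It assumes the new capability is exposed through Dask Gateway cluster options and that the option is named environment_vars; both are assumptions, so check gateway.cluster_options() on your deployment:

import os

from dask_gateway import Gateway

gateway = Gateway()
options = gateway.cluster_options()  # inspect which options the deployment actually exposes
# hypothetical option name -- verify it exists on your gateway before relying on it
options.environment_vars = {
    'AWS_ACCESS_KEY_ID': os.environ['AWS_ACCESS_KEY_ID'],
    'AWS_SECRET_ACCESS_KEY': os.environ['AWS_SECRET_ACCESS_KEY'],
}
cluster = gateway.new_cluster(options)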
-
Tried upgrading to 0.3.12 on a development deployment and failed.