Commit: Linting 3
lbrndnr committed Apr 26, 2022
1 parent 9ea8d39 commit 89dafe4
Showing 42 changed files with 885 additions and 558 deletions.
sebs/aws/aws.py: 113 changes (62 additions & 51 deletions)
@@ -138,7 +138,9 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage:
         benchmark: benchmark name
         """

-    def package_code(self, code_package: CodePackage, directory: str, is_workflow: bool) -> Tuple[str, int]:
+    def package_code(
+        self, code_package: CodePackage, directory: str, is_workflow: bool
+    ) -> Tuple[str, int]:
         CONFIG_FILES = {
             "python": ["handler.py", "requirements.txt", ".python_packages"],
             "nodejs": ["handler.js", "package.json", "node_modules"],
@@ -152,8 +154,12 @@ def package_code(self, code_package: CodePackage, directory: str, is_workflow: bool) -> Tuple[str, int]:
             file = os.path.join(directory, file)
             shutil.move(file, function_dir)

-        handler_path = os.path.join(directory, CONFIG_FILES[code_package.language_name][0])
-        replace_string_in_file(handler_path, "{{REDIS_HOST}}", f"\"{self.config.redis_host}\"")
+        handler_path = os.path.join(
+            directory, CONFIG_FILES[code_package.language_name][0]
+        )
+        replace_string_in_file(
+            handler_path, "{{REDIS_HOST}}", f'"{self.config.redis_host}"'
+        )

         # For python, add an __init__ file
         if code_package.language_name == "python":
@@ -163,13 +169,15 @@ def package_code(self, code_package: CodePackage, directory: str, is_workflow: bool) -> Tuple[str, int]:

         # FIXME: use zipfile
         # create zip with hidden directory but without parent directory
-        execute("zip -qu -r9 {}.zip * .".format(code_package.name),
-                shell=True, cwd=directory)
+        execute(
+            "zip -qu -r9 {}.zip * .".format(code_package.name),
+            shell=True,
+            cwd=directory,
+        )
         benchmark_archive = "{}.zip".format(os.path.join(directory, code_package.name))
         self.logging.info("Created {} archive".format(benchmark_archive))

-        bytes_size = os.path.getsize(
-            os.path.join(directory, benchmark_archive))
+        bytes_size = os.path.getsize(os.path.join(directory, benchmark_archive))
         mbytes = bytes_size / 1024.0 / 1024.0
         self.logging.info("Zip archive size {:2f} MB".format(mbytes))

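Note: the FIXME above suggests replacing the zip shell-out with Python's zipfile module. A minimal sketch of such a helper (hypothetical, not part of this commit; unlike `zip -qu` it rewrites the archive rather than updating it in place):

import os
import zipfile


def zip_directory(directory: str, archive_name: str) -> str:
    # Zip the contents of `directory` (including hidden files) without
    # the parent directory itself, mirroring `zip -qu -r9 {name}.zip * .`.
    archive_path = os.path.join(directory, archive_name + ".zip")
    with zipfile.ZipFile(
        archive_path, "w", zipfile.ZIP_DEFLATED, compresslevel=9
    ) as zf:
        for root, _, files in os.walk(directory):
            for name in files:
                full_path = os.path.join(root, name)
                if os.path.abspath(full_path) == os.path.abspath(archive_path):
                    continue  # skip the archive we are writing
                zf.write(full_path, os.path.relpath(full_path, directory))
    return archive_path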
@@ -195,10 +203,13 @@ def wait_for_function(self, func_name: str):

             if backoff_delay > 60:
                 self.logging.error(
-                    f"Function {func_name} stuck in state {state} after 60s")
+                    f"Function {func_name} stuck in state {state} after 60s"
+                )
                 break

-    def create_function(self, code_package: CodePackage, func_name: str) -> "LambdaFunction":
+    def create_function(
+        self, code_package: CodePackage, func_name: str
+    ) -> "LambdaFunction":
         package = code_package.code_location
         benchmark = code_package.name
         language = code_package.language_name
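For context, the backoff check at the top of this hunk belongs to a polling loop that waits for the function to leave the "Pending" state. A standalone sketch of that pattern (the surrounding loop structure is assumed; the real method lives on the AWS class and uses self.lambda_client):

import time

import boto3


def wait_for_function(func_name: str, lambda_client=None):
    # Poll the function state, doubling the delay each round and
    # giving up once the backoff exceeds 60 seconds.
    client = lambda_client or boto3.client("lambda")
    backoff_delay = 1
    while True:
        state = client.get_function(FunctionName=func_name)["Configuration"]["State"]
        if state != "Pending":
            break
        time.sleep(backoff_delay)
        backoff_delay *= 2
        if backoff_delay > 60:
            print(f"Function {func_name} stuck in state {state} after 60s")
            break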
@@ -215,8 +226,7 @@ def create_function(self, code_package: CodePackage, func_name: str) -> "LambdaFunction":
         try:
             ret = self.lambda_client.get_function(FunctionName=func_name)
             self.logging.info(
-                "Function {} exists on AWS, retrieve configuration.".format(
-                    func_name)
+                "Function {} exists on AWS, retrieve configuration.".format(func_name)
             )
             # Here we assume a single Lambda role
             lambda_function = LambdaFunction(
@@ -233,8 +243,7 @@ def create_function(self, code_package: CodePackage, func_name: str) -> "LambdaFunction":
             lambda_function.updated_code = True
             # TODO: get configuration of REST API
         except self.lambda_client.exceptions.ResourceNotFoundException:
-            self.logging.info(
-                "Creating function {} from {}".format(func_name, package))
+            self.logging.info("Creating function {} from {}".format(func_name, package))

             # AWS Lambda limit on zip deployment size
             # Limit to 50 MB
@@ -250,9 +259,9 @@ def create_function(self, code_package: CodePackage, func_name: str) -> "LambdaFunction":
                 code_bucket, idx = storage_client.add_input_bucket(benchmark)
                 storage_client.upload(code_bucket, package, code_package_name)
                 self.logging.info(
-                    "Uploading function {} code to {}".format(func_name, code_bucket))
-                code_config = {"S3Bucket": code_bucket,
-                               "S3Key": code_package_name}
+                    "Uploading function {} code to {}".format(func_name, code_bucket)
+                )
+                code_config = {"S3Bucket": code_bucket, "S3Key": code_package_name}
             ret = self.lambda_client.create_function(
                 FunctionName=func_name,
                 Runtime="{}{}".format(language, language_runtime),
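The branch around this hunk applies Lambda's 50 MB limit for direct zip uploads, falling back to S3 for larger packages. A self-contained boto3 sketch of that decision (bucket, role, and function names are hypothetical):

import os

import boto3

lambda_client = boto3.client("lambda")
package = "benchmark.zip"  # hypothetical path to the deployment package

if os.path.getsize(package) < 50 * 1024 * 1024:
    # Small enough: inline the zip bytes directly.
    with open(package, "rb") as f:
        code_config = {"ZipFile": f.read()}
else:
    # Too large: stage the package in S3 and reference it.
    s3 = boto3.client("s3")
    s3.upload_file(package, "my-code-bucket", os.path.basename(package))
    code_config = {"S3Bucket": "my-code-bucket", "S3Key": os.path.basename(package)}

lambda_client.create_function(
    FunctionName="my-function",  # hypothetical
    Runtime="python3.9",
    Handler="handler.handler",
    Role="arn:aws:iam::123456789012:role/lambda-role",  # hypothetical
    Code=code_config,
)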
@@ -318,7 +327,8 @@ def update_function(self, function: Function, code_package: CodePackage):
         if code_size < 50 * 1024 * 1024:
             with open(package, "rb") as code_body:
                 self.lambda_client.update_function_code(
-                    FunctionName=name, ZipFile=code_body.read())
+                    FunctionName=name, ZipFile=code_body.read()
+                )
         # Upload code package to S3, then update
         else:
             code_package_name = os.path.basename(package)
@@ -338,15 +348,16 @@ def update_function(self, function: Function, code_package: CodePackage):
         )
         self.logging.info("Published new function code")

-    def create_function_trigger(self, func: Function, trigger_type: Trigger.TriggerType) -> Trigger:
+    def create_function_trigger(
+        self, func: Function, trigger_type: Trigger.TriggerType
+    ) -> Trigger:
         from sebs.aws.triggers import HTTPTrigger

         function = cast(LambdaFunction, func)

         if trigger_type == Trigger.TriggerType.HTTP:
             api_name = "{}-http-api".format(function.name)
-            http_api = self.config.resources.http_api(
-                api_name, function, self.session)
+            http_api = self.config.resources.http_api(api_name, function, self.session)
             # https://aws.amazon.com/blogs/compute/announcing-http-apis-for-amazon-api-gateway/
             # but this is wrong - source arn must be {api-arn}/*/*
             self.get_lambda_client().add_permission(
@@ -368,21 +379,24 @@ def create_function_trigger(self, func: Function, trigger_type: Trigger.TriggerType) -> Trigger:
         self.cache_client.update_benchmark(function)
         return trigger

-    def create_workflow(self, code_package: CodePackage, workflow_name: str) -> "SFNWorkflow":
+    def create_workflow(
+        self, code_package: CodePackage, workflow_name: str
+    ) -> "SFNWorkflow":

         workflow_name = AWS.format_resource_name(workflow_name)

         # Make sure we have a valid workflow benchmark
-        definition_path = os.path.join(
-            code_package.path, "definition.json")
+        definition_path = os.path.join(code_package.path, "definition.json")
         if not os.path.exists(definition_path):
-            raise ValueError(
-                f"No workflow definition found for {workflow_name}")
+            raise ValueError(f"No workflow definition found for {workflow_name}")

         # First we create a lambda function for each code file
         code_files = list(code_package.get_code_files(include_config=False))
         func_names = [os.path.splitext(os.path.basename(p))[0] for p in code_files]
-        funcs = [self.create_function(code_package, workflow_name+"___"+fn) for fn in func_names]
+        funcs = [
+            self.create_function(code_package, workflow_name + "___" + fn)
+            for fn in func_names
+        ]

         # Generate workflow definition.json
         gen = SFNGenerator({n: f.arn for (n, f) in zip(func_names, funcs)})
@@ -401,30 +415,28 @@ def create_workflow(self, code_package: CodePackage, workflow_name: str) -> "SFNWorkflow":
             )

             self.logging.info(
-                "Creating workflow {} from {}".format(workflow_name, package))
+                "Creating workflow {} from {}".format(workflow_name, package)
+            )

             workflow = SFNWorkflow(
                 workflow_name,
                 funcs,
                 code_package.name,
                 ret["stateMachineArn"],
-                code_package.hash
+                code_package.hash,
             )
         except self.sfn_client.exceptions.StateMachineAlreadyExists as e:
             arn = re.search("'([^']*)'", str(e)).group()[1:-1]

             self.logging.info(
                 "Workflow {} exists on AWS, retrieve configuration.".format(
-                    workflow_name)
+                    workflow_name
+                )
             )

             # Here we assume a single Lambda role
             workflow = SFNWorkflow(
-                workflow_name,
-                funcs,
-                code_package.name,
-                arn,
-                code_package.hash
+                workflow_name, funcs, code_package.name, arn, code_package.hash
             )

         self.update_workflow(workflow, code_package)
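SFNGenerator's implementation is not part of this diff; conceptually it substitutes the freshly created Lambda ARNs into the Amazon States Language definition. A minimal sketch of that substitution for a flat state machine (names and ARN are hypothetical, and real definitions may nest states inside Map or Parallel):

import json

# Hypothetical mapping from code-file name to created Lambda ARN.
func_arns = {"process": "arn:aws:lambda:us-east-1:123456789012:function:wf___process"}

with open("definition.json") as f:
    definition = json.load(f)

# Replace placeholder resources in Task states with real ARNs.
for state in definition.get("States", {}).values():
    if state.get("Type") == "Task" and state.get("Resource") in func_arns:
        state["Resource"] = func_arns[state["Resource"]]

definition_str = json.dumps(definition)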
@@ -443,16 +455,17 @@ def update_workflow(self, workflow: Workflow, code_package: CodePackage):
         workflow = cast(SFNWorkflow, workflow)

         # Make sure we have a valid workflow benchmark
-        definition_path = os.path.join(
-            code_package.path, "definition.json")
+        definition_path = os.path.join(code_package.path, "definition.json")
         if not os.path.exists(definition_path):
-            raise ValueError(
-                f"No workflow definition found for {workflow.name}")
+            raise ValueError(f"No workflow definition found for {workflow.name}")

         # Create or update lambda function for each code file
         code_files = list(code_package.get_code_files(include_config=False))
         func_names = [os.path.splitext(os.path.basename(p))[0] for p in code_files]
-        funcs = [self.create_function(code_package, workflow.name+"___"+fn) for fn in func_names]
+        funcs = [
+            self.create_function(code_package, workflow.name + "___" + fn)
+            for fn in func_names
+        ]

         # Generate workflow definition.json
         gen = SFNGenerator({n: f.arn for (n, f) in zip(func_names, funcs)})
@@ -467,7 +480,9 @@ def update_workflow(self, workflow: Workflow, code_package: CodePackage):
         workflow.functions = funcs
         self.logging.info("Published new workflow code")

-    def create_workflow_trigger(self, workflow: Workflow, trigger_type: Trigger.TriggerType) -> Trigger:
+    def create_workflow_trigger(
+        self, workflow: Workflow, trigger_type: Trigger.TriggerType
+    ) -> Trigger:
         workflow = cast(SFNWorkflow, workflow)

         if trigger_type == Trigger.TriggerType.HTTP:
@@ -547,12 +562,12 @@ def parse_aws_report(
             return request_id
         output = requests[request_id]
         output.request_id = request_id
-        output.provider_times.execution = int(
-            float(aws_vals["Duration"]) * 1000)
+        output.provider_times.execution = int(float(aws_vals["Duration"]) * 1000)
         output.stats.memory_used = float(aws_vals["Max Memory Used"])
         if "Init Duration" in aws_vals:
             output.provider_times.initialization = int(
-                float(aws_vals["Init Duration"]) * 1000)
+                float(aws_vals["Init Duration"]) * 1000
+            )
         output.billing.billed_time = int(aws_vals["Billed Duration"])
         output.billing.memory = int(aws_vals["Memory Size"])
         output.billing.gb_seconds = output.billing.billed_time * output.billing.memory
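The aws_vals fields consumed here come from the per-invocation REPORT line that Lambda writes to CloudWatch Logs. A rough sketch of extracting them from a representative REPORT line (request id and values fabricated), assuming units are stripped as the int()/float() casts above imply:

# A representative REPORT log line: tab-separated "key: value unit" pairs.
line = (
    "REPORT RequestId: 11111111-2222-3333-4444-555555555555\t"
    "Duration: 123.45 ms\tBilled Duration: 124 ms\tMemory Size: 256 MB\t"
    "Max Memory Used: 61 MB\tInit Duration: 202.35 ms"
)

aws_vals = {}
for field in line.split("\t"):
    key, _, value = field.partition(":")
    if value:
        # Keep only the leading token, dropping the "ms"/"MB" unit.
        aws_vals[key.strip()] = value.split()[0]

execution_us = int(float(aws_vals["Duration"]) * 1000)  # 123450 microseconds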
@@ -586,14 +601,12 @@ def get_invocation_error(self, function_name: str, start_time: int, end_time: int):
                 time.sleep(5)
                 response = self.logs_client.get_query_results(queryId=query_id)
                 if len(response["results"]) == 0:
-                    self.logging.info(
-                        "AWS logs are not yet available, repeat after 15s...")
+                    self.logging.info("AWS logs are not yet available, repeat after 15s...")
                     time.sleep(15)
                     response = None
                 else:
                     break
-        self.logging.error(
-            f"Invocation error for AWS Lambda function {function_name}")
+        self.logging.error(f"Invocation error for AWS Lambda function {function_name}")
         for message in response["results"]:
             for value in message:
                 if value["field"] == "@message":
@@ -640,8 +653,7 @@ def download_metrics(
         for val in results:
             for result_part in val:
                 if result_part["field"] == "@message":
-                    request_id = AWS.parse_aws_report(
-                        result_part["value"], requests)
+                    request_id = AWS.parse_aws_report(result_part["value"], requests)
                     if request_id in requests:
                         results_processed += 1
                         requests_ids.remove(request_id)
@@ -656,8 +668,7 @@ def _enforce_cold_start(self, function: Function):
             FunctionName=func.name,
             Timeout=func.timeout,
             MemorySize=func.memory,
-            Environment={"Variables": {
-                "ForceColdStart": str(self.cold_start_counter)}},
+            Environment={"Variables": {"ForceColdStart": str(self.cold_start_counter)}},
         )

     def enforce_cold_start(self, functions: List[Function], code_package: CodePackage):
