From 89dafe48166e57fdced845a490769226265276d1 Mon Sep 17 00:00:00 2001 From: Laurin Brandner Date: Tue, 26 Apr 2022 15:00:40 +0200 Subject: [PATCH] Linting 3 --- sebs/aws/aws.py | 113 +++++++------ sebs/aws/config.py | 75 +++++---- sebs/aws/function.py | 4 +- sebs/aws/generator.py | 31 ++-- sebs/aws/s3.py | 12 +- sebs/aws/triggers.py | 27 ++-- sebs/aws/workflow.py | 10 +- sebs/azure/azure.py | 138 +++++++++------- sebs/azure/blob_storage.py | 8 +- sebs/azure/cli.py | 4 +- sebs/azure/config.py | 64 ++++++-- sebs/azure/function_app.py | 4 +- sebs/azure/triggers.py | 4 +- sebs/cache.py | 46 ++++-- sebs/code_package.py | 51 ++++-- sebs/config.py | 28 ++-- sebs/experiments/environment.py | 16 +- sebs/experiments/eviction_model.py | 24 ++- sebs/experiments/invocation_overhead.py | 62 +++++-- sebs/experiments/network_ping_pong.py | 16 +- sebs/experiments/perf_cost.py | 38 +++-- sebs/experiments/result.py | 8 +- sebs/faas/benchmark.py | 42 +++-- sebs/faas/config.py | 8 +- sebs/faas/fsm.py | 54 ++----- sebs/faas/storage.py | 20 ++- sebs/faas/system.py | 44 +++-- sebs/gcp/config.py | 42 +++-- sebs/gcp/function.py | 4 +- sebs/gcp/gcp.py | 204 ++++++++++++++---------- sebs/gcp/generator.py | 89 ++++------- sebs/gcp/storage.py | 12 +- sebs/gcp/triggers.py | 14 +- sebs/gcp/workflow.py | 4 +- sebs/local/config.py | 4 +- sebs/local/deployment.py | 6 +- sebs/local/function.py | 7 +- sebs/local/local.py | 26 ++- sebs/local/storage.py | 21 ++- sebs/regression.py | 30 +++- sebs/sebs.py | 8 +- sebs/utils.py | 21 ++- 42 files changed, 885 insertions(+), 558 deletions(-) diff --git a/sebs/aws/aws.py b/sebs/aws/aws.py index 65c8dd8d..eab47b33 100644 --- a/sebs/aws/aws.py +++ b/sebs/aws/aws.py @@ -138,7 +138,9 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage: benchmark: benchmark name """ - def package_code(self, code_package: CodePackage, directory: str, is_workflow: bool) -> Tuple[str, int]: + def package_code( + self, code_package: CodePackage, directory: str, is_workflow: bool + ) -> Tuple[str, int]: CONFIG_FILES = { "python": ["handler.py", "requirements.txt", ".python_packages"], "nodejs": ["handler.js", "package.json", "node_modules"], @@ -152,8 +154,12 @@ def package_code(self, code_package: CodePackage, directory: str, is_workflow: b file = os.path.join(directory, file) shutil.move(file, function_dir) - handler_path = os.path.join(directory, CONFIG_FILES[code_package.language_name][0]) - replace_string_in_file(handler_path, "{{REDIS_HOST}}", f"\"{self.config.redis_host}\"") + handler_path = os.path.join( + directory, CONFIG_FILES[code_package.language_name][0] + ) + replace_string_in_file( + handler_path, "{{REDIS_HOST}}", f'"{self.config.redis_host}"' + ) # For python, add an __init__ file if code_package.language_name == "python": @@ -163,13 +169,15 @@ def package_code(self, code_package: CodePackage, directory: str, is_workflow: b # FIXME: use zipfile # create zip with hidden directory but without parent directory - execute("zip -qu -r9 {}.zip * .".format(code_package.name), - shell=True, cwd=directory) + execute( + "zip -qu -r9 {}.zip * .".format(code_package.name), + shell=True, + cwd=directory, + ) benchmark_archive = "{}.zip".format(os.path.join(directory, code_package.name)) self.logging.info("Created {} archive".format(benchmark_archive)) - bytes_size = os.path.getsize( - os.path.join(directory, benchmark_archive)) + bytes_size = os.path.getsize(os.path.join(directory, benchmark_archive)) mbytes = bytes_size / 1024.0 / 1024.0 self.logging.info("Zip archive size {:2f} MB".format(mbytes)) @@ -195,10 +203,13 @@ def wait_for_function(self, func_name: str): if backoff_delay > 60: self.logging.error( - f"Function {func_name} stuck in state {state} after 60s") + f"Function {func_name} stuck in state {state} after 60s" + ) break - def create_function(self, code_package: CodePackage, func_name: str) -> "LambdaFunction": + def create_function( + self, code_package: CodePackage, func_name: str + ) -> "LambdaFunction": package = code_package.code_location benchmark = code_package.name language = code_package.language_name @@ -215,8 +226,7 @@ def create_function(self, code_package: CodePackage, func_name: str) -> "LambdaF try: ret = self.lambda_client.get_function(FunctionName=func_name) self.logging.info( - "Function {} exists on AWS, retrieve configuration.".format( - func_name) + "Function {} exists on AWS, retrieve configuration.".format(func_name) ) # Here we assume a single Lambda role lambda_function = LambdaFunction( @@ -233,8 +243,7 @@ def create_function(self, code_package: CodePackage, func_name: str) -> "LambdaF lambda_function.updated_code = True # TODO: get configuration of REST API except self.lambda_client.exceptions.ResourceNotFoundException: - self.logging.info( - "Creating function {} from {}".format(func_name, package)) + self.logging.info("Creating function {} from {}".format(func_name, package)) # AWS Lambda limit on zip deployment size # Limit to 50 MB @@ -250,9 +259,9 @@ def create_function(self, code_package: CodePackage, func_name: str) -> "LambdaF code_bucket, idx = storage_client.add_input_bucket(benchmark) storage_client.upload(code_bucket, package, code_package_name) self.logging.info( - "Uploading function {} code to {}".format(func_name, code_bucket)) - code_config = {"S3Bucket": code_bucket, - "S3Key": code_package_name} + "Uploading function {} code to {}".format(func_name, code_bucket) + ) + code_config = {"S3Bucket": code_bucket, "S3Key": code_package_name} ret = self.lambda_client.create_function( FunctionName=func_name, Runtime="{}{}".format(language, language_runtime), @@ -318,7 +327,8 @@ def update_function(self, function: Function, code_package: CodePackage): if code_size < 50 * 1024 * 1024: with open(package, "rb") as code_body: self.lambda_client.update_function_code( - FunctionName=name, ZipFile=code_body.read()) + FunctionName=name, ZipFile=code_body.read() + ) # Upload code package to S3, then update else: code_package_name = os.path.basename(package) @@ -338,15 +348,16 @@ def update_function(self, function: Function, code_package: CodePackage): ) self.logging.info("Published new function code") - def create_function_trigger(self, func: Function, trigger_type: Trigger.TriggerType) -> Trigger: + def create_function_trigger( + self, func: Function, trigger_type: Trigger.TriggerType + ) -> Trigger: from sebs.aws.triggers import HTTPTrigger function = cast(LambdaFunction, func) if trigger_type == Trigger.TriggerType.HTTP: api_name = "{}-http-api".format(function.name) - http_api = self.config.resources.http_api( - api_name, function, self.session) + http_api = self.config.resources.http_api(api_name, function, self.session) # https://aws.amazon.com/blogs/compute/announcing-http-apis-for-amazon-api-gateway/ # but this is wrong - source arn must be {api-arn}/*/* self.get_lambda_client().add_permission( @@ -368,21 +379,24 @@ def create_function_trigger(self, func: Function, trigger_type: Trigger.TriggerT self.cache_client.update_benchmark(function) return trigger - def create_workflow(self, code_package: CodePackage, workflow_name: str) -> "SFNWorkflow": + def create_workflow( + self, code_package: CodePackage, workflow_name: str + ) -> "SFNWorkflow": workflow_name = AWS.format_resource_name(workflow_name) # Make sure we have a valid workflow benchmark - definition_path = os.path.join( - code_package.path, "definition.json") + definition_path = os.path.join(code_package.path, "definition.json") if not os.path.exists(definition_path): - raise ValueError( - f"No workflow definition found for {workflow_name}") + raise ValueError(f"No workflow definition found for {workflow_name}") # First we create a lambda function for each code file code_files = list(code_package.get_code_files(include_config=False)) func_names = [os.path.splitext(os.path.basename(p))[0] for p in code_files] - funcs = [self.create_function(code_package, workflow_name+"___"+fn) for fn in func_names] + funcs = [ + self.create_function(code_package, workflow_name + "___" + fn) + for fn in func_names + ] # Generate workflow definition.json gen = SFNGenerator({n: f.arn for (n, f) in zip(func_names, funcs)}) @@ -401,30 +415,28 @@ def create_workflow(self, code_package: CodePackage, workflow_name: str) -> "SFN ) self.logging.info( - "Creating workflow {} from {}".format(workflow_name, package)) + "Creating workflow {} from {}".format(workflow_name, package) + ) workflow = SFNWorkflow( workflow_name, funcs, code_package.name, ret["stateMachineArn"], - code_package.hash + code_package.hash, ) except self.sfn_client.exceptions.StateMachineAlreadyExists as e: arn = re.search("'([^']*)'", str(e)).group()[1:-1] self.logging.info( "Workflow {} exists on AWS, retrieve configuration.".format( - workflow_name) + workflow_name + ) ) # Here we assume a single Lambda role workflow = SFNWorkflow( - workflow_name, - funcs, - code_package.name, - arn, - code_package.hash + workflow_name, funcs, code_package.name, arn, code_package.hash ) self.update_workflow(workflow, code_package) @@ -443,16 +455,17 @@ def update_workflow(self, workflow: Workflow, code_package: CodePackage): workflow = cast(SFNWorkflow, workflow) # Make sure we have a valid workflow benchmark - definition_path = os.path.join( - code_package.path, "definition.json") + definition_path = os.path.join(code_package.path, "definition.json") if not os.path.exists(definition_path): - raise ValueError( - f"No workflow definition found for {workflow.name}") + raise ValueError(f"No workflow definition found for {workflow.name}") # Create or update lambda function for each code file code_files = list(code_package.get_code_files(include_config=False)) func_names = [os.path.splitext(os.path.basename(p))[0] for p in code_files] - funcs = [self.create_function(code_package, workflow.name+"___"+fn) for fn in func_names] + funcs = [ + self.create_function(code_package, workflow.name + "___" + fn) + for fn in func_names + ] # Generate workflow definition.json gen = SFNGenerator({n: f.arn for (n, f) in zip(func_names, funcs)}) @@ -467,7 +480,9 @@ def update_workflow(self, workflow: Workflow, code_package: CodePackage): workflow.functions = funcs self.logging.info("Published new workflow code") - def create_workflow_trigger(self, workflow: Workflow, trigger_type: Trigger.TriggerType) -> Trigger: + def create_workflow_trigger( + self, workflow: Workflow, trigger_type: Trigger.TriggerType + ) -> Trigger: workflow = cast(SFNWorkflow, workflow) if trigger_type == Trigger.TriggerType.HTTP: @@ -547,12 +562,12 @@ def parse_aws_report( return request_id output = requests[request_id] output.request_id = request_id - output.provider_times.execution = int( - float(aws_vals["Duration"]) * 1000) + output.provider_times.execution = int(float(aws_vals["Duration"]) * 1000) output.stats.memory_used = float(aws_vals["Max Memory Used"]) if "Init Duration" in aws_vals: output.provider_times.initialization = int( - float(aws_vals["Init Duration"]) * 1000) + float(aws_vals["Init Duration"]) * 1000 + ) output.billing.billed_time = int(aws_vals["Billed Duration"]) output.billing.memory = int(aws_vals["Memory Size"]) output.billing.gb_seconds = output.billing.billed_time * output.billing.memory @@ -586,14 +601,12 @@ def get_invocation_error(self, function_name: str, start_time: int, end_time: in time.sleep(5) response = self.logs_client.get_query_results(queryId=query_id) if len(response["results"]) == 0: - self.logging.info( - "AWS logs are not yet available, repeat after 15s...") + self.logging.info("AWS logs are not yet available, repeat after 15s...") time.sleep(15) response = None else: break - self.logging.error( - f"Invocation error for AWS Lambda function {function_name}") + self.logging.error(f"Invocation error for AWS Lambda function {function_name}") for message in response["results"]: for value in message: if value["field"] == "@message": @@ -640,8 +653,7 @@ def download_metrics( for val in results: for result_part in val: if result_part["field"] == "@message": - request_id = AWS.parse_aws_report( - result_part["value"], requests) + request_id = AWS.parse_aws_report(result_part["value"], requests) if request_id in requests: results_processed += 1 requests_ids.remove(request_id) @@ -656,8 +668,7 @@ def _enforce_cold_start(self, function: Function): FunctionName=func.name, Timeout=func.timeout, MemorySize=func.memory, - Environment={"Variables": { - "ForceColdStart": str(self.cold_start_counter)}}, + Environment={"Variables": {"ForceColdStart": str(self.cold_start_counter)}}, ) def enforce_cold_start(self, functions: List[Function], code_package: CodePackage): diff --git a/sebs/aws/config.py b/sebs/aws/config.py index 40c359d5..53c8bd67 100644 --- a/sebs/aws/config.py +++ b/sebs/aws/config.py @@ -39,7 +39,9 @@ def initialize(dct: dict) -> Credentials: return AWSCredentials(dct["access_key"], dct["secret_key"]) @staticmethod - def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Credentials: + def deserialize( + config: dict, cache: Cache, handlers: LoggingHandlers + ) -> Credentials: # FIXME: update return types of both functions to avoid cast # needs 3.7+ to support annotations @@ -47,15 +49,17 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Creden ret: AWSCredentials # Load cached values if cached_config and "credentials" in cached_config: - ret = cast(AWSCredentials, AWSCredentials.initialize( - cached_config["credentials"])) + ret = cast( + AWSCredentials, AWSCredentials.initialize(cached_config["credentials"]) + ) ret.logging_handlers = handlers ret.logging.info("Using cached credentials for AWS") else: # Check for new config if "credentials" in config: - ret = cast(AWSCredentials, AWSCredentials.initialize( - config["credentials"])) + ret = cast( + AWSCredentials, AWSCredentials.initialize(config["credentials"]) + ) elif "AWS_ACCESS_KEY_ID" in os.environ: ret = AWSCredentials( os.environ["AWS_ACCESS_KEY_ID"], os.environ["AWS_SECRET_ACCESS_KEY"] @@ -66,16 +70,17 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Creden "up environmental variables AWS_ACCESS_KEY_ID and " "AWS_SECRET_ACCESS_KEY" ) - ret.logging.info( - "No cached credentials for AWS found, initialize!") + ret.logging.info("No cached credentials for AWS found, initialize!") ret.logging_handlers = handlers return ret def update_cache(self, cache: Cache): - cache.update_config(val=self.access_key, keys=[ - "aws", "credentials", "access_key"]) - cache.update_config(val=self.secret_key, keys=[ - "aws", "credentials", "secret_key"]) + cache.update_config( + val=self.access_key, keys=["aws", "credentials", "access_key"] + ) + cache.update_config( + val=self.secret_key, keys=["aws", "credentials", "secret_key"] + ) def serialize(self) -> dict: out = {"access_key": self.access_key, "secret_key": self.secret_key} @@ -127,10 +132,7 @@ def lambda_role(self, boto3_session: boto3.session.Session) -> str: "Sid": "", "Effect": "Allow", "Principal": { - "Service": [ - "lambda.amazonaws.com", - "states.amazonaws.com" - ] + "Service": ["lambda.amazonaws.com", "states.amazonaws.com"] }, "Action": "sts:AssumeRole", } @@ -142,13 +144,12 @@ def lambda_role(self, boto3_session: boto3.session.Session) -> str: "arn:aws:iam::aws:policy/CloudWatchLogsFullAccess", "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", "arn:aws:iam::aws:policy/service-role/AWSLambdaRole", - "arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess" + "arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess", ] try: out = iam_client.get_role(RoleName=role_name) self._lambda_role = out["Role"]["Arn"] - self.logging.info( - f"AWS: Selected {self._lambda_role} IAM role") + self.logging.info(f"AWS: Selected {self._lambda_role} IAM role") except iam_client.exceptions.NoSuchEntityException: out = iam_client.create_role( RoleName=role_name, @@ -162,8 +163,7 @@ def lambda_role(self, boto3_session: boto3.session.Session) -> str: time.sleep(10) # Attach basic AWS Lambda and S3 policies. for policy in attached_policies: - iam_client.attach_role_policy( - RoleName=role_name, PolicyArn=policy) + iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy) return self._lambda_role def http_api( @@ -215,16 +215,20 @@ def initialize(dct: dict) -> Resources: def serialize(self) -> dict: out = { "lambda-role": self._lambda_role, - "http-apis": {key: value.serialize() for (key, value) in self._http_apis.items()}, + "http-apis": { + key: value.serialize() for (key, value) in self._http_apis.items() + }, } return out def update_cache(self, cache: Cache): - cache.update_config(val=self._lambda_role, keys=[ - "aws", "resources", "lambda-role"]) + cache.update_config( + val=self._lambda_role, keys=["aws", "resources", "lambda-role"] + ) for name, api in self._http_apis.items(): - cache.update_config(val=api.serialize(), keys=[ - "aws", "resources", "http-apis", name]) + cache.update_config( + val=api.serialize(), keys=["aws", "resources", "http-apis", name] + ) @staticmethod def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resources: @@ -233,18 +237,19 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resour ret: AWSResources # Load cached values if cached_config and "resources" in cached_config: - ret = cast(AWSResources, AWSResources.initialize( - cached_config["resources"])) + ret = cast( + AWSResources, AWSResources.initialize(cached_config["resources"]) + ) ret.logging_handlers = handlers ret.logging.info("Using cached resources for AWS") else: # Check for new config if "resources" in config: - ret = cast(AWSResources, AWSResources.initialize( - config["resources"])) + ret = cast(AWSResources, AWSResources.initialize(config["resources"])) ret.logging_handlers = handlers ret.logging.info( - "No cached resources for AWS found, using user configuration.") + "No cached resources for AWS found, using user configuration." + ) else: ret = AWSResources(lambda_role="") ret.logging_handlers = handlers @@ -288,9 +293,11 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config cached_config = cache.get_config("aws") # FIXME: use future annotations (see sebs/faas/system) credentials = cast( - AWSCredentials, AWSCredentials.deserialize(config, cache, handlers)) - resources = cast(AWSResources, AWSResources.deserialize( - config, cache, handlers)) + AWSCredentials, AWSCredentials.deserialize(config, cache, handlers) + ) + resources = cast( + AWSResources, AWSResources.deserialize(config, cache, handlers) + ) config_obj = AWSConfig(credentials, resources) config_obj.logging_handlers = handlers # Load cached values @@ -322,6 +329,6 @@ def serialize(self) -> dict: "region": self._region, "credentials": self._credentials.serialize(), "resources": self._resources.serialize(), - "redis_host": self._redis_host + "redis_host": self._redis_host, } return out diff --git a/sebs/aws/function.py b/sebs/aws/function.py index 20816745..a3d77d54 100644 --- a/sebs/aws/function.py +++ b/sebs/aws/function.py @@ -59,7 +59,9 @@ def deserialize(cached_config: dict) -> "LambdaFunction": for trigger in cached_config["triggers"]: trigger_type = cast( Trigger, - {"Library": FunctionLibraryTrigger, "HTTP": HTTPTrigger}.get(trigger["type"]), + {"Library": FunctionLibraryTrigger, "HTTP": HTTPTrigger}.get( + trigger["type"] + ), ) assert trigger_type, "Unknown trigger type {}".format(trigger["type"]) ret.add_trigger(trigger_type.deserialize(trigger)) diff --git a/sebs/aws/generator.py b/sebs/aws/generator.py index 1a736a98..d00cfe2b 100644 --- a/sebs/aws/generator.py +++ b/sebs/aws/generator.py @@ -5,27 +5,22 @@ class SFNGenerator(Generator): - def __init__(self, func_arns: Dict[str, str]): super().__init__() self._func_arns = func_arns - def postprocess(self, states: List[State], payloads: List[dict]) -> dict: payloads = super().postprocess(states, payloads) definition = { "Comment": "SeBS auto-generated benchmark", "StartAt": self.root.name, - "States": payloads + "States": payloads, } return definition def encode_task(self, state: Task) -> Union[dict, List[dict]]: - payload = { - "Type": "Task", - "Resource": self._func_arns[state.func_name] - } + payload = {"Type": "Task", "Resource": self._func_arns[state.func_name]} if state.next: payload["Next"] = state.next @@ -36,11 +31,7 @@ def encode_task(self, state: Task) -> Union[dict, List[dict]]: def encode_switch(self, state: Switch) -> Union[dict, List[dict]]: choises = [self._encode_case(c) for c in state.cases] - return { - "Type": "Choice", - "Choices": choises, - "Default": state.default - } + return {"Type": "Choice", "Choices": choises, "Default": state.default} def _encode_case(self, case: Switch.Case) -> dict: type = "Numeric" if isinstance(case.val, numbers.Number) else "String" @@ -49,30 +40,26 @@ def _encode_case(self, case: Switch.Case) -> dict: "<=": "LessThanEquals", "==": "Equals", ">=": "GreaterThanEquals", - ">": "GreaterThan" + ">": "GreaterThan", } cond = type + comp[case.op] - return { - "Variable": "$." + case.var, - cond: case.val, - "Next": case.next - } + return {"Variable": "$." + case.var, cond: case.val, "Next": case.next} def encode_map(self, state: Map) -> Union[dict, List[dict]]: payload = { "Type": "Map", - "ItemsPath": "$."+state.array, + "ItemsPath": "$." + state.array, "Iterator": { "StartAt": "func", "States": { "func": { "Type": "Task", "Resource": self._func_arns[state.func_name], - "End": True + "End": True, } - } - } + }, + }, } if state.next: diff --git a/sebs/aws/s3.py b/sebs/aws/s3.py index e47bd77f..72560717 100644 --- a/sebs/aws/s3.py +++ b/sebs/aws/s3.py @@ -49,7 +49,9 @@ def _create_bucket(self, name: str, buckets: List[str] = []): for bucket_name in buckets: if name in bucket_name: self.logging.info( - "Bucket {} for {} already exists, skipping.".format(bucket_name, name) + "Bucket {} for {} already exists, skipping.".format( + bucket_name, name + ) ) return bucket_name random_name = str(uuid.uuid4())[0:16] @@ -66,7 +68,9 @@ def _create_bucket(self, name: str, buckets: List[str] = []): self.client.create_bucket(Bucket=bucket_name) self.logging.info("Created bucket {}".format(bucket_name)) except self.client.exceptions.BucketAlreadyExists as e: - self.logging.error(f"The bucket {bucket_name} exists already in region {self.region}!") + self.logging.error( + f"The bucket {bucket_name} exists already in region {self.region}!" + ) raise e except self.client.exceptions.ClientError as e: self.logging.error( @@ -114,7 +118,9 @@ def list_bucket(self, bucket_name: str): def list_buckets(self, bucket_name: str) -> List[str]: s3_buckets = self.client.list_buckets()["Buckets"] - return [bucket["Name"] for bucket in s3_buckets if bucket_name in bucket["Name"]] + return [ + bucket["Name"] for bucket in s3_buckets if bucket_name in bucket["Name"] + ] def clean_bucket(self, bucket: str): objects = self.client.list_objects_v2(Bucket=bucket) diff --git a/sebs/aws/triggers.py b/sebs/aws/triggers.py index 53a47db6..e368a641 100644 --- a/sebs/aws/triggers.py +++ b/sebs/aws/triggers.py @@ -50,22 +50,21 @@ def sync_invoke(self, payload: dict) -> ExecutionResult: serialized_payload = json.dumps(payload).encode("utf-8") client = self.deployment_client.get_lambda_client() begin = datetime.datetime.now() - ret = client.invoke(FunctionName=self.name, - Payload=serialized_payload, LogType="Tail") + ret = client.invoke( + FunctionName=self.name, Payload=serialized_payload, LogType="Tail" + ) end = datetime.datetime.now() aws_result = ExecutionResult.from_times(begin, end) aws_result.request_id = ret["ResponseMetadata"]["RequestId"] if ret["StatusCode"] != 200: self.logging.error("Invocation of {} failed!".format(self.name)) - self.logging.error("Input: {}".format( - serialized_payload.decode("utf-8"))) + self.logging.error("Input: {}".format(serialized_payload.decode("utf-8"))) aws_result.stats.failure = True return aws_result if "FunctionError" in ret: self.logging.error("Invocation of {} failed!".format(self.name)) - self.logging.error("Input: {}".format( - serialized_payload.decode("utf-8"))) + self.logging.error("Input: {}".format(serialized_payload.decode("utf-8"))) aws_result.stats.failure = True return aws_result self.logging.debug(f"Invoke of function {self.name} was successful") @@ -79,8 +78,7 @@ def sync_invoke(self, payload: dict) -> ExecutionResult: if isinstance(function_output["body"], dict): aws_result.parse_benchmark_output(function_output["body"]) else: - aws_result.parse_benchmark_output( - json.loads(function_output["body"])) + aws_result.parse_benchmark_output(json.loads(function_output["body"])) return aws_result def async_invoke(self, payload: dict): @@ -95,10 +93,8 @@ def async_invoke(self, payload: dict): LogType="Tail", ) if ret["StatusCode"] != 202: - self.logging.error( - "Async invocation of {} failed!".format(self.name)) - self.logging.error("Input: {}".format( - serialized_payload.decode("utf-8"))) + self.logging.error("Async invocation of {} failed!".format(self.name)) + self.logging.error("Input: {}".format(serialized_payload.decode("utf-8"))) raise RuntimeError() return ret @@ -111,7 +107,8 @@ def sync_invoke(self, payload: dict) -> ExecutionResult: client = self.deployment_client.get_sfn_client() begin = datetime.datetime.now() ret = client.start_execution( - stateMachineArn=self.name, input=json.dumps(payload)) + stateMachineArn=self.name, input=json.dumps(payload) + ) end = datetime.datetime.now() aws_result = ExecutionResult.from_times(begin, end) @@ -121,7 +118,7 @@ def sync_invoke(self, payload: dict) -> ExecutionResult: # Wait for execution to finish, then print results. execution_finished = False backoff_delay = 1 # Start wait with delay of 1 second - while (not execution_finished): + while not execution_finished: execution = client.describe_execution(executionArn=execution_arn) status = execution["status"] execution_finished = status != "RUNNING" @@ -141,7 +138,7 @@ def sync_invoke(self, payload: dict) -> ExecutionResult: def async_invoke(self, payload: dict): - raise NotImplementedError('Async invocation is not implemented') + raise NotImplementedError("Async invocation is not implemented") class HTTPTrigger(Trigger): diff --git a/sebs/aws/workflow.py b/sebs/aws/workflow.py index 09debdce..3489a469 100644 --- a/sebs/aws/workflow.py +++ b/sebs/aws/workflow.py @@ -12,7 +12,7 @@ def __init__( functions: List[LambdaFunction], benchmark: str, arn: str, - code_package_hash: str + code_package_hash: str, ): super().__init__(benchmark, name, code_package_hash) self.functions = functions @@ -26,7 +26,7 @@ def serialize(self) -> dict: return { **super().serialize(), "functions": [f.serialize() for f in self.functions], - "arn": self.arn + "arn": self.arn, } @staticmethod @@ -40,12 +40,14 @@ def deserialize(cached_config: dict) -> "SFNWorkflow": funcs, cached_config["code_package"], cached_config["arn"], - cached_config["hash"] + cached_config["hash"], ) for trigger in cached_config["triggers"]: trigger_type = cast( Trigger, - {"Library": WorkflowLibraryTrigger, "HTTP": HTTPTrigger}.get(trigger["type"]), + {"Library": WorkflowLibraryTrigger, "HTTP": HTTPTrigger}.get( + trigger["type"] + ), ) assert trigger_type, "Unknown trigger type {}".format(trigger["type"]) ret.add_trigger(trigger_type.deserialize(trigger)) diff --git a/sebs/azure/azure.py b/sebs/azure/azure.py index e740b2d0..4ac8fed3 100644 --- a/sebs/azure/azure.py +++ b/sebs/azure/azure.py @@ -103,7 +103,8 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage: self.config.region, self.cache_client, self.config.resources.data_storage_account( - self.cli_instance).connection_string, + self.cli_instance + ).connection_string, replace_existing=replace_existing, ) self.storage.logging_handlers = self.logging_handlers @@ -119,7 +120,9 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage: # - function.json # host.json # requirements.txt/package.json - def package_code(self, code_package: CodePackage, directory: str, is_workflow: bool) -> Tuple[str, int]: + def package_code( + self, code_package: CodePackage, directory: str, is_workflow: bool + ) -> Tuple[str, int]: # In previous step we ran a Docker container which installed packages # Python packages are in .python_packages because this is expected by Azure @@ -130,7 +133,7 @@ def package_code(self, code_package: CodePackage, directory: str, is_workflow: b } WRAPPER_FILES = { "python": ["handler.py", "storage.py", "fsm.py"], - "nodejs": ["handler.js", "storage.js"] + "nodejs": ["handler.js", "storage.js"], } file_type = FILES[code_package.language_name] package_config = CONFIG_FILES[code_package.language_name] @@ -141,11 +144,9 @@ def package_code(self, code_package: CodePackage, directory: str, is_workflow: b os.rename(main_path, os.path.join(directory, "main.py")) # Make sure we have a valid workflow benchmark - src_path = os.path.join( - code_package.path, "definition.json") + src_path = os.path.join(code_package.path, "definition.json") if not os.path.exists(src_path): - raise ValueError( - f"No workflow definition found in {directory}") + raise ValueError(f"No workflow definition found in {directory}") dst_path = os.path.join(directory, "definition.json") shutil.copy2(src_path, dst_path) @@ -154,10 +155,15 @@ def package_code(self, code_package: CodePackage, directory: str, is_workflow: b # TODO: extension to other triggers than HTTP main_bindings = [ - {"name": "req", "type": "httpTrigger", "direction": "in", - "authLevel": "function", "methods": ["post"]}, + { + "name": "req", + "type": "httpTrigger", + "direction": "in", + "authLevel": "function", + "methods": ["post"], + }, {"name": "starter", "type": "durableClient", "direction": "in"}, - {"name": "$return", "type": "http", "direction": "out"} + {"name": "$return", "type": "http", "direction": "out"}, ] activity_bindings = [ {"name": "event", "type": "activityTrigger", "direction": "in"}, @@ -167,10 +173,7 @@ def package_code(self, code_package: CodePackage, directory: str, is_workflow: b ] if is_workflow: - bindings = { - "main": main_bindings, - "run_workflow": orchestrator_bindings - } + bindings = {"main": main_bindings, "run_workflow": orchestrator_bindings} else: bindings = {"function": main_bindings} @@ -196,13 +199,17 @@ def package_code(self, code_package: CodePackage, directory: str, is_workflow: b payload = { "bindings": bindings.get(name, activity_bindings), "scriptFile": script_file, - "disabled": False + "disabled": False, } dst_json = os.path.join(os.path.dirname(dst_file), "function.json") json.dump(payload, open(dst_json, "w"), indent=2) - handler_path = os.path.join(directory, WRAPPER_FILES[code_package.language_name][0]) - replace_string_in_file(handler_path, "{{REDIS_HOST}}", f"\"{self.config.redis_host}\"") + handler_path = os.path.join( + directory, WRAPPER_FILES[code_package.language_name][0] + ) + replace_string_in_file( + handler_path, "{{REDIS_HOST}}", f'"{self.config.redis_host}"' + ) # copy every wrapper file to respective function dirs for wrapper_file in wrapper_files: @@ -217,15 +224,17 @@ def package_code(self, code_package: CodePackage, directory: str, is_workflow: b "version": "2.0", "extensionBundle": { "id": "Microsoft.Azure.Functions.ExtensionBundle", - "version": "[2.*, 3.0.0)" + "version": "[2.*, 3.0.0)", }, } - json.dump(host_json, open( - os.path.join(directory, "host.json"), "w"), indent=2) + json.dump(host_json, open(os.path.join(directory, "host.json"), "w"), indent=2) code_size = CodePackage.directory_size(directory) - execute("zip -qu -r9 {}.zip * .".format(code_package.name), - shell=True, cwd=directory) + execute( + "zip -qu -r9 {}.zip * .".format(code_package.name), + shell=True, + cwd=directory, + ) return directory, code_size def publish_benchmark( @@ -236,8 +245,7 @@ def publish_benchmark( ) -> str: success = False url = "" - self.logging.info( - "Attempting publish of {}".format(benchmark.name)) + self.logging.info("Attempting publish of {}".format(benchmark.name)) while not success: try: ret = self.cli_instance.execute( @@ -255,7 +263,8 @@ def publish_benchmark( break if url == "": raise RuntimeError( - "Couldnt find URL in {}".format(ret.decode("utf-8"))) + "Couldnt find URL in {}".format(ret.decode("utf-8")) + ) success = True except RuntimeError as e: error = str(e) @@ -292,13 +301,13 @@ def update_benchmark(self, benchmark: Benchmark, code_package: CodePackage): url = self.publish_benchmark(benchmark, code_package, True) trigger = HTTPTrigger( - url, self.config.resources.data_storage_account(self.cli_instance)) + url, self.config.resources.data_storage_account(self.cli_instance) + ) trigger.logging_handlers = self.logging_handlers benchmark.add_trigger(trigger) def _mount_function_code(self, code_package: CodePackage): - self.cli_instance.upload_package( - code_package.code_location, "/mnt/function/") + self.cli_instance.upload_package(code_package.code_location, "/mnt/function/") def default_benchmark_name(self, code_package: CodePackage) -> str: """ @@ -316,11 +325,13 @@ def default_benchmark_name(self, code_package: CodePackage) -> str: return func_name B = TypeVar("B", bound=FunctionApp) - def create_benchmark(self, code_package: CodePackage, name: str, benchmark_cls: B) -> B: + + def create_benchmark( + self, code_package: CodePackage, name: str, benchmark_cls: B + ) -> B: language = code_package.language_name language_runtime = code_package.language_version - resource_group = self.config.resources.resource_group( - self.cli_instance) + resource_group = self.config.resources.resource_group(self.cli_instance) region = self.config.region config = { @@ -344,17 +355,18 @@ def create_benchmark(self, code_package: CodePackage, name: str, benchmark_cls: for setting in json.loads(ret.decode()): if setting["name"] == "AzureWebJobsStorage": connection_string = setting["value"] - elems = [z for y in connection_string.split( - ";") for z in y.split("=")] + elems = [ + z for y in connection_string.split(";") for z in y.split("=") + ] account_name = elems[elems.index("AccountName") + 1] function_storage_account = AzureResources.Storage.from_cache( account_name, connection_string ) - self.logging.info( - "Azure: Selected {} function app".format(name)) + self.logging.info("Azure: Selected {} function app".format(name)) except RuntimeError: function_storage_account = self.config.resources.add_storage_account( - self.cli_instance) + self.cli_instance + ) config["storage_account"] = function_storage_account.account_name # FIXME: only Linux type is supported while True: @@ -369,8 +381,7 @@ def create_benchmark(self, code_package: CodePackage, name: str, benchmark_cls: " --name {name} --storage-account {storage_account}" ).format(**config) ) - self.logging.info( - "Azure: Created function app {}".format(name)) + self.logging.info("Azure: Created function app {}".format(name)) break except RuntimeError as e: # Azure does not allow some concurrent operations @@ -396,25 +407,29 @@ def create_benchmark(self, code_package: CodePackage, name: str, benchmark_cls: def cached_benchmark(self, benchmark: Benchmark): data_storage_account = self.config.resources.data_storage_account( - self.cli_instance) + self.cli_instance + ) for trigger in benchmark.triggers_all(): azure_trigger = cast(AzureTrigger, trigger) azure_trigger.logging_handlers = self.logging_handlers azure_trigger.data_storage_account = data_storage_account - def create_function(self, code_package: CodePackage, func_name: str) -> AzureFunction: + def create_function( + self, code_package: CodePackage, func_name: str + ) -> AzureFunction: return self.create_benchmark(code_package, func_name, AzureFunction) def update_function(self, function: Function, code_package: CodePackage): self.update_benchmark(function, code_package) - def create_workflow(self, code_package: CodePackage, workflow_name: str) -> AzureWorkflow: + def create_workflow( + self, code_package: CodePackage, workflow_name: str + ) -> AzureWorkflow: return self.create_benchmark(code_package, workflow_name, AzureWorkflow) def update_workflow(self, workflow: Workflow, code_package: CodePackage): self.update_benchmark(workflow, code_package) - """ Prepare Azure resources to store experiment results. Allocate one container. @@ -424,8 +439,7 @@ def update_workflow(self, workflow: Workflow, code_package: CodePackage): """ def prepare_experiment(self, benchmark: str): - logs_container = self.storage.add_output_bucket( - benchmark, suffix="logs") + logs_container = self.storage.add_output_bucket(benchmark, suffix="logs") return logs_container def download_metrics( @@ -437,16 +451,16 @@ def download_metrics( metrics: Dict[str, dict], ): - resource_group = self.config.resources.resource_group( - self.cli_instance) + resource_group = self.config.resources.resource_group(self.cli_instance) # Avoid warnings in the next step ret = self.cli_instance.execute( - "az feature register --name AIWorkspacePreview " "--namespace microsoft.insights" + "az feature register --name AIWorkspacePreview " + "--namespace microsoft.insights" ) app_id_query = self.cli_instance.execute( - ("az monitor app-insights component show " "--app {} --resource-group {}").format( - function_name, resource_group - ) + ( + "az monitor app-insights component show " "--app {} --resource-group {}" + ).format(function_name, resource_group) ).decode("utf-8") application_id = json.loads(app_id_query)["appId"] @@ -457,8 +471,9 @@ def download_metrics( start_time_str = datetime.datetime.fromtimestamp(start_time).strftime( "%Y-%m-%d %H:%M:%S.%f" ) - end_time_str = datetime.datetime.fromtimestamp( - end_time + 1).strftime("%Y-%m-%d %H:%M:%S") + end_time_str = datetime.datetime.fromtimestamp(end_time + 1).strftime( + "%Y-%m-%d %H:%M:%S" + ) from tzlocal import get_localzone timezone_str = datetime.datetime.now(get_localzone()).strftime("%z") @@ -498,7 +513,8 @@ def download_metrics( func_exec_time = request[-1] invocations_processed.add(invocation_id) requests[invocation_id].provider_times.execution = int( - float(func_exec_time) * 1000) + float(func_exec_time) * 1000 + ) self.logging.info( f"Azure: Found time metrics for {len(invocations_processed)} " f"out of {len(requests.keys())} invocations." @@ -506,15 +522,15 @@ def download_metrics( if len(invocations_processed) < len(requests.keys()): time.sleep(5) self.logging.info( - f"Missing the requests: {invocations_to_process - invocations_processed}") + f"Missing the requests: {invocations_to_process - invocations_processed}" + ) # TODO: query performance counters for mem def _enforce_cold_start(self, function: Function, code_package: CodePackage): fname = function.name - resource_group = self.config.resources.resource_group( - self.cli_instance) + resource_group = self.config.resources.resource_group(self.cli_instance) self.cli_instance.execute( f"az functionapp config appsettings set --name {fname} " @@ -537,10 +553,12 @@ def enforce_cold_start(self, functions: List[Function], code_package: CodePackag It is automatically created for each function. """ - def create_function_trigger(self, function: Function, - trigger_type: Trigger.TriggerType) -> Trigger: + def create_function_trigger( + self, function: Function, trigger_type: Trigger.TriggerType + ) -> Trigger: raise NotImplementedError() - def create_workflow_trigger(self, workflow: Workflow, - trigger_type: Trigger.TriggerType) -> Trigger: + def create_workflow_trigger( + self, workflow: Workflow, trigger_type: Trigger.TriggerType + ) -> Trigger: raise NotImplementedError() diff --git a/sebs/azure/blob_storage.py b/sebs/azure/blob_storage.py index cad108a8..e87d8d75 100644 --- a/sebs/azure/blob_storage.py +++ b/sebs/azure/blob_storage.py @@ -16,7 +16,9 @@ def typename() -> str: def deployment_name(): return "azure" - def __init__(self, region: str, cache_client: Cache, conn_string: str, replace_existing: bool): + def __init__( + self, region: str, cache_client: Cache, conn_string: str, replace_existing: bool + ): super().__init__(region, cache_client, replace_existing) self.client = BlobServiceClient.from_connection_string(conn_string) @@ -27,7 +29,9 @@ def __init__(self, region: str, cache_client: Cache, conn_string: str, replace_e def _create_bucket(self, name: str, containers: List[str] = []) -> str: for c in containers: if name in c: - self.logging.info("Container {} for {} already exists, skipping.".format(c, name)) + self.logging.info( + "Container {} for {} already exists, skipping.".format(c, name) + ) return c random_name = str(uuid.uuid4())[0:16] name = "{}-{}".format(name, random_name) diff --git a/sebs/azure/cli.py b/sebs/azure/cli.py index f98226e4..9d15eeb0 100644 --- a/sebs/azure/cli.py +++ b/sebs/azure/cli.py @@ -17,7 +17,9 @@ def __init__(self, system_config: SeBSConfig, docker_client: docker.client): except docker.errors.ImageNotFound: try: logging.info( - "Docker pull of image {repo}:{image}".format(repo=repo_name, image=image_name) + "Docker pull of image {repo}:{image}".format( + repo=repo_name, image=image_name + ) ) docker_client.images.pull(repo_name, image_name) except docker.errors.APIError: diff --git a/sebs/azure/config.py b/sebs/azure/config.py index 23b5936d..a5bb3277 100644 --- a/sebs/azure/config.py +++ b/sebs/azure/config.py @@ -40,7 +40,9 @@ def initialize(dct: dict) -> Credentials: return AzureCredentials(dct["appId"], dct["tenant"], dct["password"]) @staticmethod - def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Credentials: + def deserialize( + config: dict, cache: Cache, handlers: LoggingHandlers + ) -> Credentials: # FIXME: update return types of both functions to avoid cast # needs 3.7+ to support annotations @@ -94,12 +96,18 @@ def __init__(self, account_name: str, connection_string: str): # FIXME: 3.7+ migration with future annotations @staticmethod - def from_cache(account_name: str, connection_string: str) -> "AzureResources.Storage": - assert connection_string, "Empty connection string for account {}".format(account_name) + def from_cache( + account_name: str, connection_string: str + ) -> "AzureResources.Storage": + assert connection_string, "Empty connection string for account {}".format( + account_name + ) return AzureResources.Storage(account_name, connection_string) @staticmethod - def from_allocation(account_name: str, cli_instance: AzureCLI) -> "AzureResources.Storage": + def from_allocation( + account_name: str, cli_instance: AzureCLI + ) -> "AzureResources.Storage": connection_string = AzureResources.Storage.query_connection_string( account_name, cli_instance ) @@ -113,7 +121,9 @@ def from_allocation(account_name: str, cli_instance: AzureCLI) -> "AzureResource @staticmethod def query_connection_string(account_name: str, cli_instance: AzureCLI) -> str: ret = cli_instance.execute( - "az storage account show-connection-string --name {}".format(account_name) + "az storage account show-connection-string --name {}".format( + account_name + ) ) ret = json.loads(ret.decode("utf-8")) connection_string = ret["connectionString"] @@ -124,7 +134,9 @@ def serialize(self) -> dict: @staticmethod def deserialize(obj: dict) -> "AzureResources.Storage": - return AzureResources.Storage.from_cache(obj["account_name"], obj["connection_string"]) + return AzureResources.Storage.from_cache( + obj["account_name"], obj["connection_string"] + ) # FIXME: 3.7 Python, future annotations def __init__( @@ -195,12 +207,16 @@ def add_storage_account(self, cli_instance: AzureCLI) -> "AzureResources.Storage does NOT add the account to any resource collection. """ - def _create_storage_account(self, cli_instance: AzureCLI) -> "AzureResources.Storage": + def _create_storage_account( + self, cli_instance: AzureCLI + ) -> "AzureResources.Storage": sku = "Standard_LRS" # Create account. Only alphanumeric characters are allowed uuid_name = str(uuid.uuid1())[0:8] account_name = "sebsstorage{}".format(uuid_name) - self.logging.info("Starting allocation of storage account {}.".format(account_name)) + self.logging.info( + "Starting allocation of storage account {}.".format(account_name) + ) cli_instance.execute( ( "az storage account create --name {0} --location {1} " @@ -233,7 +249,9 @@ def initialize(dct: dict) -> Resources: storage_accounts=[ AzureResources.Storage.deserialize(x) for x in dct["storage_accounts"] ], - data_storage_account=AzureResources.Storage.deserialize(dct["data_storage_account"]), + data_storage_account=AzureResources.Storage.deserialize( + dct["data_storage_account"] + ), ) def serialize(self) -> dict: @@ -252,15 +270,25 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resour cached_config = cache.get_config("azure") ret: AzureResources # Load cached values - if cached_config and "resources" in cached_config and len(cached_config["resources"]) > 0: + if ( + cached_config + and "resources" in cached_config + and len(cached_config["resources"]) > 0 + ): logging.info("Using cached resources for Azure") - ret = cast(AzureResources, AzureResources.initialize(cached_config["resources"])) + ret = cast( + AzureResources, AzureResources.initialize(cached_config["resources"]) + ) else: # Check for new config if "resources" in config: - ret = cast(AzureResources, AzureResources.initialize(config["resources"])) + ret = cast( + AzureResources, AzureResources.initialize(config["resources"]) + ) ret.logging_handlers = handlers - ret.logging.info("No cached resources for Azure found, using user configuration.") + ret.logging.info( + "No cached resources for Azure found, using user configuration." + ) else: ret = AzureResources() ret.logging_handlers = handlers @@ -311,8 +339,12 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config cached_config = cache.get_config("azure") # FIXME: use future annotations (see sebs/faas/system) - credentials = cast(AzureCredentials, AzureCredentials.deserialize(config, cache, handlers)) - resources = cast(AzureResources, AzureResources.deserialize(config, cache, handlers)) + credentials = cast( + AzureCredentials, AzureCredentials.deserialize(config, cache, handlers) + ) + resources = cast( + AzureResources, AzureResources.deserialize(config, cache, handlers) + ) config_obj = AzureConfig(credentials, resources) config_obj.logging_handlers = handlers # Load cached values @@ -346,6 +378,6 @@ def serialize(self) -> dict: "resources_id": self.resources_id, "credentials": self._credentials.serialize(), "resources": self._resources.serialize(), - "redis_host": self._redis_host + "redis_host": self._redis_host, } return out diff --git a/sebs/azure/function_app.py b/sebs/azure/function_app.py index bf86df8e..7667ca0c 100644 --- a/sebs/azure/function_app.py +++ b/sebs/azure/function_app.py @@ -35,8 +35,10 @@ def deserialize(cached_config: dict) -> Function: ret.add_trigger(trigger_type.deserialize(trigger)) return ret + class AzureFunction(FunctionApp): pass + class AzureWorkflow(FunctionApp): - pass \ No newline at end of file + pass diff --git a/sebs/azure/triggers.py b/sebs/azure/triggers.py index a0c8bfdc..9376a71f 100644 --- a/sebs/azure/triggers.py +++ b/sebs/azure/triggers.py @@ -21,7 +21,9 @@ def data_storage_account(self, data_storage_account: AzureResources.Storage): class HTTPTrigger(AzureTrigger): - def __init__(self, url: str, data_storage_account: Optional[AzureResources.Storage] = None): + def __init__( + self, url: str, data_storage_account: Optional[AzureResources.Storage] = None + ): super().__init__(data_storage_account) self.url = url diff --git a/sebs/cache.py b/sebs/cache.py index 42adc8b6..9b17c4b6 100644 --- a/sebs/cache.py +++ b/sebs/cache.py @@ -60,7 +60,9 @@ def typename() -> str: def load_config(self): with self._lock: for cloud in ["azure", "aws", "gcp"]: - cloud_config_file = os.path.join(self.cache_dir, "{}.json".format(cloud)) + cloud_config_file = os.path.join( + self.cache_dir, "{}.json".format(cloud) + ) if os.path.exists(cloud_config_file): self.cached_config[cloud] = json.load(open(cloud_config_file, "r")) @@ -88,8 +90,12 @@ def shutdown(self): if self.config_updated: for cloud in ["azure", "aws", "gcp"]: if cloud in self.cached_config: - cloud_config_file = os.path.join(self.cache_dir, "{}.json".format(cloud)) - self.logging.info("Update cached config {}".format(cloud_config_file)) + cloud_config_file = os.path.join( + self.cache_dir, "{}.json".format(cloud) + ) + self.logging.info( + "Update cached config {}".format(cloud_config_file) + ) with open(cloud_config_file, "w") as out: json.dump(self.cached_config[cloud], out, indent=2) @@ -149,7 +155,11 @@ def get_benchmarks( def get_storage_config(self, deployment: str, benchmark: str): cfg = self.get_benchmark_config(deployment, benchmark) - return cfg["storage"] if cfg and "storage" in cfg and not self.ignore_storage else None + return ( + cfg["storage"] + if cfg and "storage" in cfg and not self.ignore_storage + else None + ) def update_storage(self, deployment: str, benchmark: str, config: dict): if self.ignore_storage: @@ -162,7 +172,9 @@ def update_storage(self, deployment: str, benchmark: str, config: dict): with open(os.path.join(benchmark_dir, "config.json"), "w") as fp: json.dump(cached_config, fp, indent=2) - def add_code_package(self, deployment_name: str, language_name: str, code_package: "CodePackage"): + def add_code_package( + self, deployment_name: str, language_name: str, code_package: "CodePackage" + ): with self._lock: language = code_package.language_name benchmark_dir = os.path.join(self.cache_dir, code_package.name) @@ -242,8 +254,12 @@ def update_code_package( with open(os.path.join(benchmark_dir, "config.json"), "r") as fp: config = json.load(fp) date = str(datetime.datetime.now()) - config[deployment_name][language]["code_package"]["date"]["modified"] = date - config[deployment_name][language]["code_package"]["hash"] = code_package.hash + config[deployment_name][language]["code_package"]["date"][ + "modified" + ] = date + config[deployment_name][language]["code_package"][ + "hash" + ] = code_package.hash with open(os.path.join(benchmark_dir, "config.json"), "w") as fp: json.dump(config, fp, indent=2) else: @@ -275,12 +291,16 @@ def add_benchmark( cache_config = os.path.join(benchmark_dir, "config.json") if os.path.exists(cache_config): - benchmarks_config: Dict[str, Any] = {benchmark.name: {**benchmark.serialize()}} + benchmarks_config: Dict[str, Any] = { + benchmark.name: {**benchmark.serialize()} + } with open(cache_config, "r") as fp: cached_config = json.load(fp) if "benchmarks" not in cached_config[deployment_name][language]: - cached_config[deployment_name][language]["benchmarks"] = benchmarks_config + cached_config[deployment_name][language][ + "benchmarks" + ] = benchmarks_config else: cached_config[deployment_name][language]["benchmarks"].update( benchmarks_config @@ -290,7 +310,9 @@ def add_benchmark( json.dump(config, fp, indent=2) else: raise RuntimeError( - "Can't cache benchmark {} for a non-existing code package!".format(benchmark.name) + "Can't cache benchmark {} for a non-existing code package!".format( + benchmark.name + ) ) def update_benchmark(self, benchmark: "Benchmark"): @@ -317,5 +339,7 @@ def update_benchmark(self, benchmark: "Benchmark"): json.dump(cached_config, fp, indent=2) else: raise RuntimeError( - "Can't cache benchmark {} for a non-existing code package!".format(benchmark.name) + "Can't cache benchmark {} for a non-existing code package!".format( + benchmark.name + ) ) diff --git a/sebs/code_package.py b/sebs/code_package.py index 23a85b3a..5f5cdb56 100644 --- a/sebs/code_package.py +++ b/sebs/code_package.py @@ -129,7 +129,9 @@ def language_version(self): @property # noqa: A003 def hash(self): path = os.path.join(self.path, self.language_name) - self._hash_value = CodePackage.hash_directory(path, self._deployment_name, self.language_name) + self._hash_value = CodePackage.hash_directory( + path, self._deployment_name, self.language_name + ) return self._hash_value @hash.setter # noqa: A003 @@ -165,7 +167,9 @@ def __init__( if self.language not in self.config.languages: raise RuntimeError( - "Benchmark {} not available for language {}".format(self.name, self.language) + "Benchmark {} not available for language {}".format( + self.name, self.language + ) ) self._cache_client = cache_client self._docker_client = docker_client @@ -300,7 +304,7 @@ def add_deployment_package_python(self, output_dir): if len(packages): with open(os.path.join(output_dir, "requirements.txt"), "a") as out: for package in packages: - out.write(package+"\n") + out.write(package + "\n") def add_deployment_package_nodejs(self, output_dir): # modify package.json @@ -363,11 +367,15 @@ def install_dependencies(self, output_dir): ) self._docker_client.images.pull(repo_name, image_name) except docker.errors.APIError: - raise RuntimeError("Docker pull of image {} failed!".format(image_name)) + raise RuntimeError( + "Docker pull of image {} failed!".format(image_name) + ) # Create set of mounted volumes unless Docker volumes are disabled if not self._experiment_config.check_flag("docker_copy_build_files"): - volumes = {os.path.abspath(output_dir): {"bind": "/mnt/function", "mode": "rw"}} + volumes = { + os.path.abspath(output_dir): {"bind": "/mnt/function", "mode": "rw"} + } package_script = os.path.abspath( os.path.join(self._path, self.language_name, "package.sh") ) @@ -385,11 +393,15 @@ def install_dependencies(self, output_dir): try: self.logging.info( "Docker build of benchmark dependencies in container " - "of image {repo}:{image}".format(repo=repo_name, image=image_name) + "of image {repo}:{image}".format( + repo=repo_name, image=image_name + ) ) uid = os.getuid() # Standard, simplest build - if not self._experiment_config.check_flag("docker_copy_build_files"): + if not self._experiment_config.check_flag( + "docker_copy_build_files" + ): self.logging.info( "Docker mount of benchmark code from path {path}".format( path=os.path.abspath(output_dir) @@ -425,7 +437,9 @@ def install_dependencies(self, output_dir): "Send benchmark code from path {path} to " "Docker instance".format(path=os.path.abspath(output_dir)) ) - tar_archive = os.path.join(output_dir, os.path.pardir, "function.tar") + tar_archive = os.path.join( + output_dir, os.path.pardir, "function.tar" + ) with tarfile.open(tar_archive, "w") as tar: for f in os.listdir(output_dir): tar.add(os.path.join(output_dir, f), arcname=f) @@ -467,8 +481,9 @@ def recalculate_code_size(self): return self._code_size def build( - self, deployment_build_step: Callable[["CodePackage", str, bool], Tuple[str, int]], - is_workflow: bool + self, + deployment_build_step: Callable[["CodePackage", str, bool], Tuple[str, int]], + is_workflow: bool, ) -> Tuple[bool, str]: # Skip build if files are up to date and user didn't enforce rebuild @@ -514,9 +529,13 @@ def build( # package already exists if self.is_cached: - self._cache_client.update_code_package(self._deployment_name, self.language_name, self) + self._cache_client.update_code_package( + self._deployment_name, self.language_name, self + ) else: - self._cache_client.add_code_package(self._deployment_name, self.language_name, self) + self._cache_client.add_code_package( + self._deployment_name, self.language_name, self + ) self.query_cache() return True, self._code_location @@ -556,7 +575,9 @@ def code_package_modify(self, filename: str, data: bytes): if self.is_archive(): self._update_zip(self.code_location, filename, data) new_size = self.recompute_size() / 1024.0 / 1024.0 - self.logging.info(f"Modified zip package {self.code_location}, new size {new_size} MB") + self.logging.info( + f"Modified zip package {self.code_location}, new size {new_size} MB" + ) else: raise NotImplementedError() @@ -629,7 +650,9 @@ def load_benchmark_input(path: str) -> CodePackageModuleInterface: # Look for input generator file in the directory containing benchmark import importlib.machinery - loader = importlib.machinery.SourceFileLoader("input", os.path.join(path, "input.py")) + loader = importlib.machinery.SourceFileLoader( + "input", os.path.join(path, "input.py") + ) spec = importlib.util.spec_from_loader(loader.name, loader) assert spec mod = importlib.util.module_from_spec(spec) diff --git a/sebs/config.py b/sebs/config.py index fd7f66aa..d238dedd 100644 --- a/sebs/config.py +++ b/sebs/config.py @@ -12,23 +12,31 @@ def __init__(self): def docker_repository(self) -> str: return self._system_config["general"]["docker_repository"] - def deployment_packages(self, deployment_name: str, language_name: str) -> Dict[str, str]: - return self._system_config[deployment_name]["languages"][language_name]["deployment"][ - "packages" - ] + def deployment_packages( + self, deployment_name: str, language_name: str + ) -> Dict[str, str]: + return self._system_config[deployment_name]["languages"][language_name][ + "deployment" + ]["packages"] def deployment_files(self, deployment_name: str, language_name: str) -> List[str]: - return self._system_config[deployment_name]["languages"][language_name]["deployment"][ - "files" - ] + return self._system_config[deployment_name]["languages"][language_name][ + "deployment" + ]["files"] def docker_image_types(self, deployment_name: str, language_name: str) -> List[str]: - return self._system_config[deployment_name]["languages"][language_name]["images"] + return self._system_config[deployment_name]["languages"][language_name][ + "images" + ] - def supported_language_versions(self, deployment_name: str, language_name: str) -> List[str]: + def supported_language_versions( + self, deployment_name: str, language_name: str + ) -> List[str]: return self._system_config[deployment_name]["languages"][language_name][ "base_images" ].keys() def username(self, deployment_name: str, language_name: str) -> str: - return self._system_config[deployment_name]["languages"][language_name]["username"] + return self._system_config[deployment_name]["languages"][language_name][ + "username" + ] diff --git a/sebs/experiments/environment.py b/sebs/experiments/environment.py index 86576f11..29bf608b 100644 --- a/sebs/experiments/environment.py +++ b/sebs/experiments/environment.py @@ -13,7 +13,9 @@ class ExperimentEnvironment: def __init__(self): # find CPU mapping - ret = execute('cat /proc/cpuinfo | grep -e "processor" -e "core id"', shell=True) + ret = execute( + 'cat /proc/cpuinfo | grep -e "processor" -e "core id"', shell=True + ) # skip empty line at the end mapping = [int(x.split(":")[1]) for x in ret.split("\n") if x] @@ -47,7 +49,9 @@ def __init__(self): raise NotImplementedError() # Assume all CPU use the same - scaling_governor_path = "/sys/devices/system/cpu/cpu{cpu_id}/cpufreq/scaling_driver" + scaling_governor_path = ( + "/sys/devices/system/cpu/cpu{cpu_id}/cpufreq/scaling_driver" + ) governor = execute("cat {path}".format(path=scaling_governor_path)) if governor == "intel_pstate": self._governor = governor @@ -62,7 +66,9 @@ def write_cpu_status(self, cores: List[int], status: int): for logical_core in logical_cores[1:]: path = cpu_status_path.format(cpu_id=logical_core["core"]) execute( - cmd="echo {status} | sudo tee {path}".format(status=status, path=path), + cmd="echo {status} | sudo tee {path}".format( + status=status, path=path + ), shell=True, ) @@ -101,7 +107,9 @@ def set_frequency(self, max_freq: int): def unset_frequency(self): path = "/sys/devices/system/cpu/intel_pstate/min_perf_pct" - execute("echo {freq} | sudo tee {path}".format(freq=self._prev_min_freq, path=path)) + execute( + "echo {freq} | sudo tee {path}".format(freq=self._prev_min_freq, path=path) + ) def setup_benchmarking(self, cores: List[int]): self.disable_boost(cores) diff --git a/sebs/experiments/eviction_model.py b/sebs/experiments/eviction_model.py index 4d55c66c..839b6856 100644 --- a/sebs/experiments/eviction_model.py +++ b/sebs/experiments/eviction_model.py @@ -95,10 +95,14 @@ def accept_replies(port: int, invocations: int): s.close() @staticmethod - def execute_instance(sleep_time: int, pid: int, tid: int, func: Function, payload: dict): + def execute_instance( + sleep_time: int, pid: int, tid: int, func: Function, payload: dict + ): try: - print(f"Process {pid} Thread {tid} Invoke function {func.name} with {payload} now!") + print( + f"Process {pid} Thread {tid} Invoke function {func.name} with {payload} now!" + ) begin = datetime.now() res = func.triggers(Trigger.TriggerType.HTTP)[0].sync_invoke(payload) end = datetime.now() @@ -111,7 +115,9 @@ def execute_instance(sleep_time: int, pid: int, tid: int, func: Function, payloa logging.error(f"First Invocation Failed at function {func.name}, {e}") raise RuntimeError() - time_spent = float(datetime.now().strftime("%s.%f")) - float(end.strftime("%s.%f")) + time_spent = float(datetime.now().strftime("%s.%f")) - float( + end.strftime("%s.%f") + ) seconds_sleep = sleep_time - time_spent print(f"PID {pid} TID {tid} with time {time}, sleep {seconds_sleep}") time.sleep(seconds_sleep) @@ -198,7 +204,9 @@ def prepare(self, sebs_client: "SeBS", deployment_client: FaaSSystem): # if self._benchmark.functions and fname in self._benchmark.functions: # self.logging.info(f"Skip {fname}, exists already.") # continue - self.functions.append(deployment_client.get_function(self._benchmark, func_name=fname)) + self.functions.append( + deployment_client.get_function(self._benchmark, func_name=fname) + ) def run(self): @@ -215,7 +223,9 @@ def run(self): # function_names = self.functions_names[invocation_idx :: self.function_copies_per_time] # flake8 issue # https://github.com/PyCQA/pycodestyle/issues/373 - functions = self.functions[invocation_idx :: self.function_copies_per_time] # noqa + functions = self.functions[ + invocation_idx :: self.function_copies_per_time + ] # noqa results = {} # Disable logging - otherwise we have RLock that can't get be pickled @@ -257,7 +267,9 @@ def run(self): """ for j in range(0, threads): servers_results.append( - pool.apply_async(EvictionModel.accept_replies, args=(port + j, invocations)) + pool.apply_async( + EvictionModel.accept_replies, args=(port + j, invocations) + ) ) """ diff --git a/sebs/experiments/invocation_overhead.py b/sebs/experiments/invocation_overhead.py index 11bbe403..0cbbdd8c 100644 --- a/sebs/experiments/invocation_overhead.py +++ b/sebs/experiments/invocation_overhead.py @@ -15,7 +15,9 @@ class CodePackageSize: - def __init__(self, deployment_client: FaaSSystem, benchmark: CodePackage, settings: dict): + def __init__( + self, deployment_client: FaaSSystem, benchmark: CodePackage, settings: dict + ): import math from numpy import linspace @@ -26,7 +28,9 @@ def __init__(self, deployment_client: FaaSSystem, benchmark: CodePackage, settin ) from sebs.utils import find_package_code - self._benchmark_path = find_package_code("030.clock-synchronization", "benchmarks") + self._benchmark_path = find_package_code( + "030.clock-synchronization", "benchmarks" + ) self._benchmark = benchmark random.seed(1410) @@ -89,7 +93,9 @@ def prepare(self, sebs_client: "SeBS", deployment_client: FaaSSystem): self._trigger = triggers[0] self._storage = deployment_client.get_storage(replace_existing=True) - self.benchmark_input = self._benchmark.prepare_input(storage=self._storage, size="test") + self.benchmark_input = self._benchmark.prepare_input( + storage=self._storage, size="test" + ) self._out_dir = os.path.join( sebs_client.output_dir, "invocation-overhead", self.settings["type"] ) @@ -107,7 +113,9 @@ def run(self): N = self.settings["N"] if self.settings["type"] == "code": - experiment = CodePackageSize(self._deployment_client, self._benchmark, self.settings) + experiment = CodePackageSize( + self._deployment_client, self._benchmark, self.settings + ) else: experiment = PayloadSize(self.settings) @@ -143,9 +151,13 @@ def run(self): for i in range(repetitions): succesful = False while not succesful: - self.logging.info(f"Starting with {size} bytes, repetition {i}") + self.logging.info( + f"Starting with {size} bytes, repetition {i}" + ) if result_type == "cold": - self._deployment_client.enforce_cold_start([self._function]) + self._deployment_client.enforce_cold_start( + [self._function] + ) time.sleep(1) row = self.receive_datagrams(input_benchmark, N, 12000, ip) if result_type == "cold": @@ -160,7 +172,9 @@ def run(self): succesful = True time.sleep(5) - self._storage.download_bucket(self.benchmark_input["output-bucket"], self._out_dir) + self._storage.download_bucket( + self.benchmark_input["output-bucket"], self._out_dir + ) def process( self, @@ -175,7 +189,9 @@ def process( full_data: Dict[str, pd.Dataframe] = {} for f in glob.glob( - os.path.join(directory, "invocation-overhead", self.settings["type"], "*.csv") + os.path.join( + directory, "invocation-overhead", self.settings["type"], "*.csv" + ) ): if "result.csv" in f or "result-processed.csv" in f: @@ -188,13 +204,18 @@ def process( else: full_data[request_id] = data df = pd.concat(full_data.values()).reset_index(drop=True) - df["rtt"] = (df["server_rcv"] - df["client_send"]) + (df["client_rcv"] - df["server_send"]) + df["rtt"] = (df["server_rcv"] - df["client_send"]) + ( + df["client_rcv"] - df["server_send"] + ) df["clock_drift"] = ( - (df["client_send"] - df["server_rcv"]) + (df["client_rcv"] - df["server_send"]) + (df["client_send"] - df["server_rcv"]) + + (df["client_rcv"] - df["server_send"]) ) / 2 with open( - os.path.join(directory, "invocation-overhead", self.settings["type"], "result.csv") + os.path.join( + directory, "invocation-overhead", self.settings["type"], "result.csv" + ) ) as csvfile: with open( os.path.join( @@ -226,15 +247,23 @@ def process( request_id = row[-1] clock_drift = df[df["id"] == request_id]["clock_drift"].mean() clock_drift_std = df[df["id"] == request_id]["clock_drift"].std() - invocation_time = float(row[5]) - float(row[4]) - float(row[3]) + clock_drift - writer.writerow(row + [clock_drift, clock_drift_std, invocation_time]) + invocation_time = ( + float(row[5]) - float(row[4]) - float(row[3]) + clock_drift + ) + writer.writerow( + row + [clock_drift, clock_drift_std, invocation_time] + ) - def receive_datagrams(self, input_benchmark: dict, repetitions: int, port: int, ip: str): + def receive_datagrams( + self, input_benchmark: dict, repetitions: int, port: int, ip: str + ): import socket input_benchmark["server-port"] = port - self.logging.info(f"Starting invocation with {repetitions} repetitions on port {port}") + self.logging.info( + f"Starting invocation with {repetitions} repetitions on port {port}" + ) socket.setdefaulttimeout(4) server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) server_socket.bind(("", port)) @@ -260,7 +289,8 @@ def receive_datagrams(self, input_benchmark: dict, repetitions: int, port: int, # stop after 5 attempts if j == 5: self.logging.error( - "Failing after 5 unsuccesfull attempts to " "communicate with the function!" + "Failing after 5 unsuccesfull attempts to " + "communicate with the function!" ) break # check if function invocation failed, and if yes: raise the exception diff --git a/sebs/experiments/network_ping_pong.py b/sebs/experiments/network_ping_pong.py index a95506de..b9a767d3 100644 --- a/sebs/experiments/network_ping_pong.py +++ b/sebs/experiments/network_ping_pong.py @@ -30,7 +30,9 @@ def prepare(self, sebs_client: "SeBS", deployment_client: FaaSSystem): ) self._function = deployment_client.get_function(benchmark) self._storage = deployment_client.get_storage(replace_existing=True) - self.benchmark_input = benchmark.prepare_input(storage=self._storage, size="test") + self.benchmark_input = benchmark.prepare_input( + storage=self._storage, size="test" + ) self._out_dir = os.path.join(sebs_client.output_dir, "network-ping-pong") if not os.path.exists(self._out_dir): # shutil.rmtree(self._out_dir) @@ -59,7 +61,9 @@ def run(self): # give functions time to finish and upload result time.sleep(5) - self._storage.download_bucket(self.benchmark_input["output-bucket"], self._out_dir) + self._storage.download_bucket( + self.benchmark_input["output-bucket"], self._out_dir + ) def process(self, directory: str): @@ -73,7 +77,9 @@ def process(self, directory: str): else: full_data[request_id] = data df = pd.concat(full_data.values()).reset_index(drop=True) - df["rtt"] = (df["server_rcv"] - df["client_send"]) + (df["client_rcv"] - df["server_send"]) + df["rtt"] = (df["server_rcv"] - df["client_send"]) + ( + df["client_rcv"] - df["server_send"] + ) print("Rows: ", df.shape[0]) print("Mean: ", df["rtt"].mean()) print("STD: ", df["rtt"].std()) @@ -101,7 +107,9 @@ def receive_datagrams(self, repetitions: int, port: int, ip: str): "repetitions": repetitions, **self.benchmark_input, } - self._function.triggers(Trigger.TriggerType.HTTP)[0].async_invoke(input_benchmark) + self._function.triggers(Trigger.TriggerType.HTTP)[0].async_invoke( + input_benchmark + ) begin = datetime.now() times = [] diff --git a/sebs/experiments/perf_cost.py b/sebs/experiments/perf_cost.py index 2457c0e0..7c41003e 100644 --- a/sebs/experiments/perf_cost.py +++ b/sebs/experiments/perf_cost.py @@ -48,7 +48,9 @@ def prepare(self, sebs_client: "SeBS", deployment_client: FaaSSystem): ) self._function = deployment_client.get_function(self._benchmark) # prepare benchmark input - self._storage = deployment_client.get_storage(replace_existing=self.config.update_storage) + self._storage = deployment_client.get_storage( + replace_existing=self.config.update_storage + ) self._benchmark_input = self._benchmark.prepare_input( storage=self._storage, size=settings["input-size"] ) @@ -82,7 +84,9 @@ def run(self): self._function.memory = memory self._deployment_client.update_function(self._function, self._benchmark) self._sebs_client.cache_client.update_function(self._function) - self.run_configuration(settings, settings["repetitions"], suffix=str(memory)) + self.run_configuration( + settings, settings["repetitions"], suffix=str(memory) + ) def compute_statistics(self, times: List[float]): @@ -152,7 +156,10 @@ def _run_configuration( first_iteration = True while samples_gathered < repetitions: - if run_type == PerfCost.RunType.COLD or run_type == PerfCost.RunType.BURST: + if ( + run_type == PerfCost.RunType.COLD + or run_type == PerfCost.RunType.BURST + ): self._deployment_client.enforce_cold_start( [self._function], self._benchmark ) @@ -173,8 +180,12 @@ def _run_configuration( ret = res.get() if first_iteration: continue - if (run_type == PerfCost.RunType.COLD and not ret.stats.cold_start) or ( - run_type == PerfCost.RunType.WARM and ret.stats.cold_start + if ( + run_type == PerfCost.RunType.COLD + and not ret.stats.cold_start + ) or ( + run_type == PerfCost.RunType.WARM + and ret.stats.cold_start ): self.logging.info( f"Invocation {ret.request_id} " @@ -258,7 +269,9 @@ def run_configuration(self, settings: dict, repetitions: int, suffix: str = ""): PerfCost.RunType.SEQUENTIAL, settings, 1, repetitions, suffix ) else: - raise RuntimeError(f"Unknown experiment type {experiment_type} for Perf-Cost!") + raise RuntimeError( + f"Unknown experiment type {experiment_type} for Perf-Cost!" + ) def process( self, @@ -305,7 +318,9 @@ def process( else: if os.path.exists( - os.path.join(directory, "perf-cost", f"{name}-processed{extension}") + os.path.join( + directory, "perf-cost", f"{name}-processed{extension}" + ) ): self.logging.info(f"Skipping already processed {f}") continue @@ -349,12 +364,17 @@ def process( name, extension = os.path.splitext(f) with open( - os.path.join(directory, "perf-cost", f"{name}-processed{extension}"), + os.path.join( + directory, "perf-cost", f"{name}-processed{extension}" + ), "w", ) as out_f: out_f.write( serialize( - {**json.loads(serialize(experiments)), "statistics": statistics} + { + **json.loads(serialize(experiments)), + "statistics": statistics, + } ) ) for func in experiments.functions(): diff --git a/sebs/experiments/result.py b/sebs/experiments/result.py index 5087b904..3357ace5 100644 --- a/sebs/experiments/result.py +++ b/sebs/experiments/result.py @@ -61,7 +61,9 @@ def metrics(self, func: str) -> dict: return self._metrics[func] @staticmethod - def deserialize(cached_config: dict, cache: Cache, handlers: LoggingHandlers) -> "Result": + def deserialize( + cached_config: dict, cache: Cache, handlers: LoggingHandlers + ) -> "Result": invocations: Dict[str, dict] = {} for func, func_invocations in cached_config["_invocations"].items(): invocations[func] = {} @@ -69,7 +71,9 @@ def deserialize(cached_config: dict, cache: Cache, handlers: LoggingHandlers) -> invocations[func][invoc_id] = ExecutionResult.deserialize(invoc) ret = Result( ExperimentConfig.deserialize(cached_config["config"]["experiments"]), - DeploymentConfig.deserialize(cached_config["config"]["deployment"], cache, handlers), + DeploymentConfig.deserialize( + cached_config["config"]["deployment"], cache, handlers + ), invocations, # FIXME: compatibility with old results cached_config["metrics"] if "metrics" in cached_config else {}, diff --git a/sebs/faas/benchmark.py b/sebs/faas/benchmark.py index 891a9924..60458495 100644 --- a/sebs/faas/benchmark.py +++ b/sebs/faas/benchmark.py @@ -131,12 +131,15 @@ def __init__(self): self.billing = ExecutionBilling() @staticmethod - def from_times(client_time_begin: datetime, client_time_end: datetime) -> "ExecutionResult": + def from_times( + client_time_begin: datetime, client_time_end: datetime + ) -> "ExecutionResult": ret = ExecutionResult() ret.times.client_begin = client_time_begin ret.times.client_end = client_time_end ret.times.client = int( - (client_time_end - client_time_begin) / timedelta(microseconds=1)) + (client_time_end - client_time_begin) / timedelta(microseconds=1) + ) return ret def parse_benchmark_output(self, output: dict): @@ -153,8 +156,7 @@ def parse_benchmark_output(self, output: dict): def parse_benchmark_execution(self, execution: Execution): self.output = json.loads(execution.result) self.times.benchmark = int( - (execution.start_time - execution.end_time) - / timedelta(microseconds=1) + (execution.start_time - execution.end_time) / timedelta(microseconds=1) ) @staticmethod @@ -162,8 +164,7 @@ def deserialize(cached_config: dict) -> "ExecutionResult": ret = ExecutionResult() ret.times = ExecutionTimes.deserialize(cached_config["times"]) ret.billing = ExecutionBilling.deserialize(cached_config["billing"]) - ret.provider_times = ProviderTimes.deserialize( - cached_config["provider_times"]) + ret.provider_times = ProviderTimes.deserialize(cached_config["provider_times"]) ret.stats = ExecutionStats.deserialize(cached_config["stats"]) ret.request_id = cached_config["request_id"] ret.output = cached_config["output"] @@ -215,10 +216,12 @@ def _http_invoke(self, payload: dict, url: str) -> ExecutionResult: if status_code != 200: self.logging.error( - "Invocation on URL {} failed with status code {}!".format(url, status_code)) + "Invocation on URL {} failed with status code {}!".format( + url, status_code + ) + ) self.logging.error("Output: {}".format(output)) - raise RuntimeError( - f"Failed invocation of function! Output: {output}") + raise RuntimeError(f"Failed invocation of function! Output: {output}") self.logging.debug("Invoke of function was successful") result = ExecutionResult.from_times(begin, end) @@ -230,10 +233,14 @@ def _http_invoke(self, payload: dict, url: str) -> ExecutionResult: return result except json.decoder.JSONDecodeError: self.logging.error( - "Invocation on URL {} failed with status code {}!".format(url, status_code)) + "Invocation on URL {} failed with status code {}!".format( + url, status_code + ) + ) self.logging.error("Output: {}".format(data.getvalue().decode())) raise RuntimeError( - f"Failed invocation of function! Output: {data.getvalue().decode()}") + f"Failed invocation of function! Output: {data.getvalue().decode()}" + ) # FIXME: 3.7+, future annotations @staticmethod @@ -300,7 +307,11 @@ def updated_code(self, val: bool): self._updated_code = val def triggers_all(self) -> List[Trigger]: - return [trig for trigger_type, triggers in self._triggers.items() for trig in triggers] + return [ + trig + for trigger_type, triggers in self._triggers.items() + for trig in triggers + ] def triggers(self, trigger_type: Trigger.TriggerType) -> List[Trigger]: try: @@ -320,7 +331,9 @@ def serialize(self) -> dict: "hash": self._code_package_hash, "code_package": self._code_package, "triggers": [ - obj.serialize() for t_type, triggers in self._triggers.items() for obj in triggers + obj.serialize() + for t_type, triggers in self._triggers.items() + for obj in triggers ], } @@ -333,5 +346,6 @@ def deserialize(cached_config: dict) -> "Function": class Function(Benchmark): pass + class Workflow(Benchmark): - pass \ No newline at end of file + pass diff --git a/sebs/faas/config.py b/sebs/faas/config.py index 55730e88..4eb349fe 100644 --- a/sebs/faas/config.py +++ b/sebs/faas/config.py @@ -29,7 +29,9 @@ def __init__(self): @staticmethod @abstractmethod - def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> "Credentials": + def deserialize( + config: dict, cache: Cache, handlers: LoggingHandlers + ) -> "Credentials": pass """ @@ -60,7 +62,9 @@ def __init__(self): @staticmethod @abstractmethod - def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> "Resources": + def deserialize( + config: dict, cache: Cache, handlers: LoggingHandlers + ) -> "Resources": pass """ diff --git a/sebs/faas/fsm.py b/sebs/faas/fsm.py index fb46c1ef..896b1f36 100644 --- a/sebs/faas/fsm.py +++ b/sebs/faas/fsm.py @@ -5,7 +5,6 @@ class State(ABC): - def __init__(self, name: str): self.name = name @@ -16,32 +15,19 @@ def deserialize(name: str, payload: dict) -> "State": class Task(State): - - def __init__(self, - name: str, - func_name: str, - next: Optional[str]): + def __init__(self, name: str, func_name: str, next: Optional[str]): self.name = name self.func_name = func_name self.next = next @classmethod def deserialize(cls, name: str, payload: dict) -> "Task": - return cls( - name=name, - func_name=payload["func_name"], - next=payload.get("next") - ) + return cls(name=name, func_name=payload["func_name"], next=payload.get("next")) class Switch(State): - class Case: - def __init__(self, - var: str, - op: str, - val: str, - next: str): + def __init__(self, var: str, op: str, val: str, next: str): self.var = var self.op = op self.val = val @@ -51,10 +37,7 @@ def __init__(self, def deserialize(payload: dict) -> "Switch.Case": return Switch.Case(**payload) - def __init__(self, - name: str, - cases: List[Case], - default: Optional[str]): + def __init__(self, name: str, cases: List[Case], default: Optional[str]): self.name = name self.cases = cases self.default = default @@ -63,20 +46,11 @@ def __init__(self, def deserialize(cls, name: str, payload: dict) -> "Switch": cases = [Switch.Case.deserialize(c) for c in payload["cases"]] - return cls( - name=name, - cases=cases, - default=payload["default"] - ) + return cls(name=name, cases=cases, default=payload["default"]) class Map(State): - - def __init__(self, - name: str, - func_name: str, - array: str, - next: Optional[str]): + def __init__(self, name: str, func_name: str, array: str, next: Optional[str]): self.name = name self.func_name = func_name self.array = array @@ -88,19 +62,14 @@ def deserialize(cls, name: str, payload: dict) -> "Map": name=name, func_name=payload["func_name"], array=payload["array"], - next=payload.get("next") + next=payload.get("next"), ) -_STATE_TYPES = { - "task": Task, - "switch": Switch, - "map": Map -} +_STATE_TYPES = {"task": Task, "switch": Switch, "map": Map} class Generator(ABC): - def __init__(self, export_func: Callable[[dict], str] = json.dumps): self._export_func = export_func @@ -108,8 +77,9 @@ def parse(self, path: str): with open(path) as f: definition = json.load(f) - self.states = {n: State.deserialize(n, s) - for n, s in definition["states"].items()} + self.states = { + n: State.deserialize(n, s) for n, s in definition["states"].items() + } self.root = self.states[definition["root"]] def generate(self) -> str: @@ -151,4 +121,4 @@ def encode_switch(self, state: Switch) -> Union[dict, List[dict]]: @abstractmethod def encode_map(self, state: Map) -> Union[dict, List[dict]]: - pass \ No newline at end of file + pass diff --git a/sebs/faas/storage.py b/sebs/faas/storage.py index e54812e2..77f8fd80 100644 --- a/sebs/faas/storage.py +++ b/sebs/faas/storage.py @@ -168,7 +168,9 @@ def clean_bucket(self, bucket_name: str): def allocate_buckets(self, benchmark: str, requested_buckets: Tuple[int, int]): # Load cached information - cached_buckets = self.cache_client.get_storage_config(self.deployment_name(), benchmark) + cached_buckets = self.cache_client.get_storage_config( + self.deployment_name(), benchmark + ) if cached_buckets: self.input_buckets = cached_buckets["buckets"]["input"] for bucket in self.input_buckets: @@ -177,19 +179,27 @@ def allocate_buckets(self, benchmark: str, requested_buckets: Tuple[int, int]): # for bucket in self.output_buckets: # self.clean_bucket(bucket) self.cached = True - self.logging.info("Using cached storage input buckets {}".format(self.input_buckets)) - self.logging.info("Using cached storage output buckets {}".format(self.output_buckets)) + self.logging.info( + "Using cached storage input buckets {}".format(self.input_buckets) + ) + self.logging.info( + "Using cached storage output buckets {}".format(self.output_buckets) + ) return buckets = self.list_buckets(self.correct_name(benchmark)) for i in range(0, requested_buckets[0]): self.input_buckets.append( - self._create_bucket(self.correct_name("{}-{}-input".format(benchmark, i)), buckets) + self._create_bucket( + self.correct_name("{}-{}-input".format(benchmark, i)), buckets + ) ) self.input_buckets_files.append(self.list_bucket(self.input_buckets[-1])) for i in range(0, requested_buckets[1]): self.output_buckets.append( - self._create_bucket(self.correct_name("{}-{}-output".format(benchmark, i)), buckets) + self._create_bucket( + self.correct_name("{}-{}-output".format(benchmark, i)), buckets + ) ) self.save_storage(benchmark) diff --git a/sebs/faas/system.py b/sebs/faas/system.py index 6b345208..24274fba 100644 --- a/sebs/faas/system.py +++ b/sebs/faas/system.py @@ -109,7 +109,9 @@ def get_storage(self, replace_existing: bool) -> PersistentStorage: """ @abstractmethod - def package_code(self, code_package: CodePackage, directory: str, is_workflow: bool) -> Tuple[str, int]: + def package_code( + self, code_package: CodePackage, directory: str, is_workflow: bool + ) -> Tuple[str, int]: pass @abstractmethod @@ -117,7 +119,9 @@ def create_function(self, code_package: CodePackage, func_name: str) -> Function pass @abstractmethod - def create_workflow(self, code_package: CodePackage, workflow_name: str) -> Workflow: + def create_workflow( + self, code_package: CodePackage, workflow_name: str + ) -> Workflow: pass @abstractmethod @@ -141,9 +145,14 @@ def update_function(self, function: Function, code_package: CodePackage): """ - def get_function(self, code_package: CodePackage, func_name: Optional[str] = None) -> Function: - if code_package.language_version not in self.system_config.supported_language_versions( - self.name(), code_package.language_name + def get_function( + self, code_package: CodePackage, func_name: Optional[str] = None + ) -> Function: + if ( + code_package.language_version + not in self.system_config.supported_language_versions( + self.name(), code_package.language_name + ) ): raise Exception( "Unsupported {language} version {version} in {system}!".format( @@ -189,7 +198,8 @@ def get_function(self, code_package: CodePackage, func_name: Optional[str] = Non self.cached_benchmark(function) self.logging.info( "Using cached function {fname} in {loc}".format( - fname=func_name, loc=code_location) + fname=func_name, loc=code_location + ) ) # is the function up-to-date? if function.code_package_hash != code_package.hash or rebuilt: @@ -215,9 +225,14 @@ def get_function(self, code_package: CodePackage, func_name: Optional[str] = Non def update_workflow(self, workflow: Workflow, code_package: CodePackage): pass - def get_workflow(self, code_package: CodePackage, workflow_name: Optional[str] = None): - if code_package.language_version not in self.system_config.supported_language_versions( - self.name(), code_package.language_name + def get_workflow( + self, code_package: CodePackage, workflow_name: Optional[str] = None + ): + if ( + code_package.language_version + not in self.system_config.supported_language_versions( + self.name(), code_package.language_name + ) ): raise Exception( "Unsupported {language} version {version} in {system}!".format( @@ -263,7 +278,8 @@ def get_workflow(self, code_package: CodePackage, workflow_name: Optional[str] = self.cached_benchmark(workflow) self.logging.info( "Using cached workflow {workflow_name} in {loc}".format( - workflow_name=workflow_name, loc=code_location) + workflow_name=workflow_name, loc=code_location + ) ) # is the function up-to-date? if workflow.code_package_hash != code_package.hash or rebuilt: @@ -313,11 +329,15 @@ def create_trigger(self, obj, trigger_type: Trigger.TriggerType) -> Trigger: raise TypeError("Cannot create trigger for {obj}") @abstractmethod - def create_function_trigger(self, function: Function, trigger_type: Trigger.TriggerType) -> Trigger: + def create_function_trigger( + self, function: Function, trigger_type: Trigger.TriggerType + ) -> Trigger: pass @abstractmethod - def create_workflow_trigger(self, workflow: Workflow, trigger_type: Trigger.TriggerType) -> Trigger: + def create_workflow_trigger( + self, workflow: Workflow, trigger_type: Trigger.TriggerType + ) -> Trigger: pass # @abstractmethod diff --git a/sebs/gcp/config.py b/sebs/gcp/config.py index 11453376..ed448e02 100644 --- a/sebs/gcp/config.py +++ b/sebs/gcp/config.py @@ -36,7 +36,9 @@ def initialize(gcp_credentials: str) -> Credentials: return GCPCredentials(gcp_credentials) @staticmethod - def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Credentials: + def deserialize( + config: dict, cache: Cache, handlers: LoggingHandlers + ) -> Credentials: cached_config = cache.get_config("gcp") ret: GCPCredentials # Load cached values but only if they are non-empty @@ -55,7 +57,9 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Creden else: # Check for new config if "credentials" in config and config["credentials"]: - ret = cast(GCPCredentials, GCPCredentials.initialize(config["credentials"])) + ret = cast( + GCPCredentials, GCPCredentials.initialize(config["credentials"]) + ) os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = ret.gcp_credentials # Look for default GCP credentials elif "GOOGLE_APPLICATION_CREDENTIALS" in os.environ: @@ -89,7 +93,9 @@ def serialize(self) -> dict: return out def update_cache(self, cache: Cache): - cache.update_config(val=self.gcp_credentials, keys=["gcp", "credentials", "keys_json"]) + cache.update_config( + val=self.gcp_credentials, keys=["gcp", "credentials", "keys_json"] + ) """ @@ -117,17 +123,23 @@ def serialize(self) -> dict: return {} @staticmethod - def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> "Resources": + def deserialize( + config: dict, cache: Cache, handlers: LoggingHandlers + ) -> "Resources": cached_config = cache.get_config("gcp") ret: GCPResources if cached_config and "resources" in cached_config: - ret = cast(GCPResources, GCPResources.initialize(cached_config["resources"])) + ret = cast( + GCPResources, GCPResources.initialize(cached_config["resources"]) + ) ret.logging_handlers = handlers ret.logging.info("Using cached resources for GCP") else: ret = cast(GCPResources, GCPResources.initialize(config)) ret.logging_handlers = handlers - ret.logging.info("No cached resources for GCP found, using user configuration.") + ret.logging.info( + "No cached resources for GCP found, using user configuration." + ) return ret def update_cache(self, cache: Cache): @@ -172,8 +184,12 @@ def redis_host(self) -> str: @staticmethod def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> "Config": cached_config = cache.get_config("gcp") - credentials = cast(GCPCredentials, GCPCredentials.deserialize(config, cache, handlers)) - resources = cast(GCPResources, GCPResources.deserialize(config, cache, handlers)) + credentials = cast( + GCPCredentials, GCPCredentials.deserialize(config, cache, handlers) + ) + resources = cast( + GCPResources, GCPResources.deserialize(config, cache, handlers) + ) config_obj = GCPConfig(credentials, resources) config_obj.logging_handlers = handlers if cached_config: @@ -186,7 +202,8 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> "Confi if "project_name" not in config or not config["project_name"]: if "GCP_PROJECT_NAME" in os.environ: GCPConfig.initialize( - config_obj, {**config, "project_name": os.environ["GCP_PROJECT_NAME"]} + config_obj, + {**config, "project_name": os.environ["GCP_PROJECT_NAME"]}, ) else: raise RuntimeError( @@ -208,7 +225,10 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> "Confi old_value = getattr(config_obj, config_key) # ignore empty values - if getattr(config_obj, config_key) != config[config_key] and config[config_key]: + if ( + getattr(config_obj, config_key) != config[config_key] + and config[config_key] + ): config_obj.logging.info( f"Updating cached key {config_key} with {old_value} " f"to user-provided value {config[config_key]}." @@ -232,7 +252,7 @@ def serialize(self) -> dict: "region": self._region, "credentials": self._credentials.serialize(), "resources": self._resources.serialize(), - "redis_host": self._redis_host + "redis_host": self._redis_host, } return out diff --git a/sebs/gcp/function.py b/sebs/gcp/function.py index 317781cf..1a70fe4c 100644 --- a/sebs/gcp/function.py +++ b/sebs/gcp/function.py @@ -47,7 +47,9 @@ def deserialize(cached_config: dict) -> "GCPFunction": for trigger in cached_config["triggers"]: trigger_type = cast( Trigger, - {"Library": FunctionLibraryTrigger, "HTTP": HTTPTrigger}.get(trigger["type"]), + {"Library": FunctionLibraryTrigger, "HTTP": HTTPTrigger}.get( + trigger["type"] + ), ) assert trigger_type, "Unknown trigger type {}".format(trigger["type"]) ret.add_trigger(trigger_type.deserialize(trigger)) diff --git a/sebs/gcp/gcp.py b/sebs/gcp/gcp.py index 27640e00..76a76dca 100644 --- a/sebs/gcp/gcp.py +++ b/sebs/gcp/gcp.py @@ -78,8 +78,7 @@ def workflow_type() -> "Type[Workflow]": """ def initialize(self, config: Dict[str, str] = {}): - self.function_client = build( - "cloudfunctions", "v1", cache_discovery=False) + self.function_client = build("cloudfunctions", "v1", cache_discovery=False) self.workflow_client = build("workflows", "v1", cache_discovery=False) self.get_storage() @@ -105,7 +104,8 @@ def get_storage( ) -> PersistentStorage: if not self.storage: self.storage = GCPStorage( - self.config.region, self.cache_client, replace_existing) + self.config.region, self.cache_client, replace_existing + ) self.storage.logging_handlers = self.logging_handlers else: self.storage.replace_existing = replace_existing @@ -143,7 +143,9 @@ def format_function_name(func_name: str) -> str: :return: path to packaged code and its size """ - def package_code(self, code_package: CodePackage, directory: str, is_workflow: bool) -> Tuple[str, int]: + def package_code( + self, code_package: CodePackage, directory: str, is_workflow: bool + ) -> Tuple[str, int]: CONFIG_FILES = { "python": ["handler.py", ".python_packages"], "nodejs": ["handler.js", "node_modules"], @@ -167,7 +169,9 @@ def package_code(self, code_package: CodePackage, directory: str, is_workflow: b new_path = os.path.join(directory, new_name) shutil.move(old_path, new_path) - replace_string_in_file(new_path, "{{REDIS_HOST}}", f"\"{self.config.redis_host}\"") + replace_string_in_file( + new_path, "{{REDIS_HOST}}", f'"{self.config.redis_host}"' + ) """ zip the whole directroy (the zip-file gets uploaded to gcp later) @@ -193,7 +197,9 @@ def package_code(self, code_package: CodePackage, directory: str, is_workflow: b return os.path.join(directory, "{}.zip".format(code_package.name)), bytes_size - def create_function(self, code_package: CodePackage, func_name: str) -> "GCPFunction": + def create_function( + self, code_package: CodePackage, func_name: str + ) -> "GCPFunction": package = code_package.code_location benchmark = code_package.name @@ -209,12 +215,16 @@ def create_function(self, code_package: CodePackage, func_name: str) -> "GCPFunc code_bucket, idx = storage_client.add_input_bucket(benchmark) storage_client.upload(code_bucket, package, code_package_name) self.logging.info( - "Uploading function {} code to {}".format(func_name, code_bucket)) + "Uploading function {} code to {}".format(func_name, code_bucket) + ) - full_func_name = GCP.get_full_function_name( - project_name, location, func_name) - get_req = self.function_client.projects( - ).locations().functions().get(name=full_func_name) + full_func_name = GCP.get_full_function_name(project_name, location, func_name) + get_req = ( + self.function_client.projects() + .locations() + .functions() + .get(name=full_func_name) + ) try: get_req.execute() except HttpError: @@ -227,12 +237,16 @@ def create_function(self, code_package: CodePackage, func_name: str) -> "GCPFunc body={ "name": full_func_name, "entryPoint": "handler", - "runtime": code_package.language_name + language_runtime.replace(".", ""), + "runtime": code_package.language_name + + language_runtime.replace(".", ""), "availableMemoryMb": memory, "timeout": str(timeout) + "s", "httpsTrigger": {}, "ingressSettings": "ALLOW_ALL", - "sourceArchiveUrl": "gs://" + code_bucket + "/" + code_package_name, + "sourceArchiveUrl": "gs://" + + code_bucket + + "/" + + code_package_name, }, ) ) @@ -247,8 +261,10 @@ def create_function(self, code_package: CodePackage, func_name: str) -> "GCPFunc body={ "policy": { "bindings": [ - {"role": "roles/cloudfunctions.invoker", - "members": ["allUsers"]} + { + "role": "roles/cloudfunctions.invoker", + "members": ["allUsers"], + } ] } }, @@ -256,7 +272,8 @@ def create_function(self, code_package: CodePackage, func_name: str) -> "GCPFunc ) allow_unauthenticated_req.execute() self.logging.info( - f"Function {func_name} accepts now unauthenticated invocations!") + f"Function {func_name} accepts now unauthenticated invocations!" + ) function = GCPFunction( func_name, benchmark, code_package.hash, timeout, memory, code_bucket @@ -264,7 +281,8 @@ def create_function(self, code_package: CodePackage, func_name: str) -> "GCPFunc else: # if result is not empty, then function does exists self.logging.info( - "Function {} exists on GCP, update the instance.".format(func_name)) + "Function {} exists on GCP, update the instance.".format(func_name) + ) function = GCPFunction( name=func_name, @@ -285,8 +303,9 @@ def create_function(self, code_package: CodePackage, func_name: str) -> "GCPFunc return function - def create_function_trigger(self, function: Function, - trigger_type: Trigger.TriggerType) -> Trigger: + def create_function_trigger( + self, function: Function, trigger_type: Trigger.TriggerType + ) -> Trigger: from sebs.gcp.triggers import HTTPTrigger if trigger_type == Trigger.TriggerType.HTTP: @@ -294,11 +313,14 @@ def create_function_trigger(self, function: Function, location = self.config.region project_name = self.config.project_name full_func_name = GCP.get_full_function_name( - project_name, location, function.name) - self.logging.info( - f"Function {function.name} - waiting for deployment...") + project_name, location, function.name + ) + self.logging.info(f"Function {function.name} - waiting for deployment...") our_function_req = ( - self.function_client.projects().locations().functions().get(name=full_func_name) + self.function_client.projects() + .locations() + .functions() + .get(name=full_func_name) ) deployed = False @@ -339,8 +361,7 @@ def update_function(self, function: Function, code_package: CodePackage): bucket = function.code_bucket(code_package.name, storage) storage.upload(bucket, code_package.code_location, code_package_name) - self.logging.info( - f"Uploaded new code package to {bucket}/{code_package_name}") + self.logging.info(f"Uploaded new code package to {bucket}/{code_package_name}") full_func_name = GCP.get_full_function_name( self.config.project_name, self.config.region, function.name ) @@ -353,7 +374,8 @@ def update_function(self, function: Function, code_package: CodePackage): body={ "name": full_func_name, "entryPoint": "handler", - "runtime": code_package.language_name + language_runtime.replace(".", ""), + "runtime": code_package.language_name + + language_runtime.replace(".", ""), "availableMemoryMb": function.memory, "timeout": str(function.timeout) + "s", "httpsTrigger": {}, @@ -374,7 +396,9 @@ def update_function(self, function: Function, code_package: CodePackage): def get_full_function_name(project_name: str, location: str, func_name: str): return f"projects/{project_name}/locations/{location}/functions/{func_name}" - def create_workflow(self, code_package: CodePackage, workflow_name: str) -> "GCPWorkflow": + def create_workflow( + self, code_package: CodePackage, workflow_name: str + ) -> "GCPWorkflow": benchmark = code_package.name timeout = code_package.config.timeout memory = code_package.config.memory @@ -383,20 +407,20 @@ def create_workflow(self, code_package: CodePackage, workflow_name: str) -> "GCP project_name = self.config.project_name # Make sure we have a valid workflow benchmark - definition_path = os.path.join( - code_package.path, "definition.json") + definition_path = os.path.join(code_package.path, "definition.json") if not os.path.exists(definition_path): - raise ValueError( - f"No workflow definition found for {workflow_name}") + raise ValueError(f"No workflow definition found for {workflow_name}") # First we create a function for each code file - prefix = workflow_name+"___" + prefix = workflow_name + "___" code_files = list(code_package.get_code_files(include_config=False)) func_names = [os.path.splitext(os.path.basename(p))[0] for p in code_files] - funcs = [self.create_function(code_package, prefix+fn) for fn in func_names] + funcs = [self.create_function(code_package, prefix + fn) for fn in func_names] # generate workflow definition.json - urls = [self.create_function_trigger(f, Trigger.TriggerType.HTTP).url for f in funcs] + urls = [ + self.create_function_trigger(f, Trigger.TriggerType.HTTP).url for f in funcs + ] func_triggers = {n: u for (n, u) in zip(func_names, urls)} gen = GCPGenerator(workflow_name, func_triggers) @@ -407,7 +431,8 @@ def create_workflow(self, code_package: CodePackage, workflow_name: str) -> "GCP parent = GCP.get_location(project_name, location) for map_id, map_def in gen.generate_maps(): full_workflow_name = GCP.get_full_workflow_name( - project_name, location, map_id) + project_name, location, map_id + ) create_req = ( self.workflow_client.projects() .locations() @@ -418,16 +443,21 @@ def create_workflow(self, code_package: CodePackage, workflow_name: str) -> "GCP body={ "name": full_workflow_name, "sourceContents": map_def, - } + }, ) ) ret = create_req.execute() self.logging.info(f"Map workflow {map_id} has been created!") full_workflow_name = GCP.get_full_workflow_name( - project_name, location, workflow_name) - get_req = self.workflow_client.projects().locations( - ).workflows().get(name=full_workflow_name) + project_name, location, workflow_name + ) + get_req = ( + self.workflow_client.projects() + .locations() + .workflows() + .get(name=full_workflow_name) + ) try: get_req.execute() @@ -442,19 +472,26 @@ def create_workflow(self, code_package: CodePackage, workflow_name: str) -> "GCP body={ "name": full_workflow_name, "sourceContents": definition, - } + }, ) ) ret = create_req.execute() self.logging.info(f"Workflow {workflow_name} has been created!") workflow = GCPWorkflow( - workflow_name, funcs, benchmark, code_package.hash, timeout, memory, code_bucket + workflow_name, + funcs, + benchmark, + code_package.hash, + timeout, + memory, + code_bucket, ) else: # if result is not empty, then function does exists self.logging.info( - "Workflow {} exists on GCP, update the instance.".format(workflow_name)) + "Workflow {} exists on GCP, update the instance.".format(workflow_name) + ) workflow = GCPWorkflow( name=workflow_name, @@ -476,13 +513,13 @@ def create_workflow(self, code_package: CodePackage, workflow_name: str) -> "GCP return workflow - def create_workflow_trigger(self, workflow: Workflow, - trigger_type: Trigger.TriggerType) -> Trigger: + def create_workflow_trigger( + self, workflow: Workflow, trigger_type: Trigger.TriggerType + ) -> Trigger: from sebs.gcp.triggers import WorkflowLibraryTrigger if trigger_type == Trigger.TriggerType.HTTP: - raise NotImplementedError( - 'Cannot create http triggers for workflows.') + raise NotImplementedError("Cannot create http triggers for workflows.") else: trigger = WorkflowLibraryTrigger(workflow.name, self) @@ -495,20 +532,20 @@ def update_workflow(self, workflow: Workflow, code_package: CodePackage): workflow = cast(GCPWorkflow, workflow) # Make sure we have a valid workflow benchmark - definition_path = os.path.join( - code_package.path, "definition.json") + definition_path = os.path.join(code_package.path, "definition.json") if not os.path.exists(definition_path): - raise ValueError( - f"No workflow definition found for {workflow.name}") + raise ValueError(f"No workflow definition found for {workflow.name}") # First we create a function for each code file - prefix = workflow.name+"___" + prefix = workflow.name + "___" code_files = list(code_package.get_code_files(include_config=False)) func_names = [os.path.splitext(os.path.basename(p))[0] for p in code_files] - funcs = [self.create_function(code_package, prefix+fn) for fn in func_names] + funcs = [self.create_function(code_package, prefix + fn) for fn in func_names] # Generate workflow definition.json - urls = [self.create_function_trigger(f, Trigger.TriggerType.HTTP).url for f in funcs] + urls = [ + self.create_function_trigger(f, Trigger.TriggerType.HTTP).url for f in funcs + ] func_triggers = {n: u for (n, u) in zip(func_names, urls)} gen = GCPGenerator(workflow.name, func_triggers) gen.parse(definition_path) @@ -527,7 +564,7 @@ def update_workflow(self, workflow: Workflow, code_package: CodePackage): body={ "name": full_workflow_name, "sourceContents": map_def, - } + }, ) ) ret = patch_req.execute() @@ -542,10 +579,7 @@ def update_workflow(self, workflow: Workflow, code_package: CodePackage): .workflows() .patch( name=full_workflow_name, - body={ - "name": full_workflow_name, - "sourceContents": definition - }, + body={"name": full_workflow_name, "sourceContents": definition}, ) ) req.execute() @@ -564,7 +598,12 @@ def shutdown(self) -> None: super().shutdown() def download_metrics( - self, function_name: str, start_time: int, end_time: int, requests: dict, metrics: dict + self, + function_name: str, + start_time: int, + end_time: int, + requests: dict, + metrics: dict, ): from google.api_core import exceptions @@ -577,8 +616,7 @@ def wrapper(gen): except StopIteration: break except exceptions.ResourceExhausted: - self.logging.info( - "Google Cloud resources exhausted, sleeping 30s") + self.logging.info("Google Cloud resources exhausted, sleeping 30s") sleep(30) """ @@ -591,7 +629,8 @@ def wrapper(gen): logging_client = gcp_logging.Client() logger = logging_client.logger( - "cloudfunctions.googleapis.com%2Fcloud-functions") + "cloudfunctions.googleapis.com%2Fcloud-functions" + ) """ GCP accepts only single date format: 'YYYY-MM-DDTHH:MM:SSZ'. @@ -633,8 +672,9 @@ def wrapper(gen): assert regex_result exec_time = regex_result.group().split()[0] # convert into microseconds - requests[execution_id].provider_times.execution = int( - exec_time) * 1000 + requests[execution_id].provider_times.execution = ( + int(exec_time) * 1000 + ) invocations_processed += 1 self.logging.info( f"GCP: Received {entries} entries, found time metrics for {invocations_processed} " @@ -648,8 +688,7 @@ def wrapper(gen): """ # Set expected metrics here - available_metrics = ["execution_times", - "user_memory_bytes", "network_egress"] + available_metrics = ["execution_times", "user_memory_bytes", "network_egress"] client = monitoring_v3.MetricServiceClient() project_name = client.common_project_path(self.config.project_name) @@ -671,7 +710,8 @@ def wrapper(gen): list_request = monitoring_v3.ListTimeSeriesRequest( name=project_name, filter='metric.type = "cloudfunctions.googleapis.com/function/{}"'.format( - metric), + metric + ), interval=interval, ) @@ -699,8 +739,9 @@ def _enforce_cold_start(self, function: Function): .patch( name=name, updateMask="environmentVariables", - body={"environmentVariables": { - "cold_start": str(self.cold_start_counter)}}, + body={ + "environmentVariables": {"cold_start": str(self.cold_start_counter)} + }, ) ) res = req.execute() @@ -723,8 +764,7 @@ def enforce_cold_start(self, functions: List[Function], code_package: CodePackag if not self.is_deployed(func.name, versionId): undeployed_functions.append((versionId, func)) deployed = len(new_versions) - len(undeployed_functions) - self.logging.info( - f"Redeployed {deployed} out of {len(new_versions)}") + self.logging.info(f"Redeployed {deployed} out of {len(new_versions)}") if deployed == len(new_versions): deployment_done = True break @@ -734,7 +774,9 @@ def enforce_cold_start(self, functions: List[Function], code_package: CodePackag self.cold_start_counter += 1 - def get_functions(self, code_package: CodePackage, function_names: List[str]) -> List["Function"]: + def get_functions( + self, code_package: CodePackage, function_names: List[str] + ) -> List["Function"]: functions: List["Function"] = [] undeployed_functions_before = [] @@ -750,10 +792,10 @@ def get_functions(self, code_package: CodePackage, function_names: List[str]) -> for func in undeployed_functions_before: if not self.is_deployed(func.name): undeployed_functions.append(func) - deployed = len(undeployed_functions_before) - \ - len(undeployed_functions) + deployed = len(undeployed_functions_before) - len(undeployed_functions) self.logging.info( - f"Deployed {deployed} out of {len(undeployed_functions_before)}") + f"Deployed {deployed} out of {len(undeployed_functions_before)}" + ) if deployed == len(undeployed_functions_before): deployment_done = True break @@ -766,7 +808,8 @@ def get_functions(self, code_package: CodePackage, function_names: List[str]) -> def is_deployed(self, func_name: str, versionId: int = -1) -> bool: name = GCP.get_full_function_name( - self.config.project_name, self.config.region, func_name) + self.config.project_name, self.config.region, func_name + ) function_client = self.get_function_client() status_req = function_client.projects().locations().functions().get(name=name) status_res = status_req.execute() @@ -777,7 +820,8 @@ def is_deployed(self, func_name: str, versionId: int = -1) -> bool: def deployment_version(self, func: Function) -> int: name = GCP.get_full_function_name( - self.config.project_name, self.config.region, func.name) + self.config.project_name, self.config.region, func.name + ) function_client = self.get_function_client() status_req = function_client.projects().locations().functions().get(name=name) status_res = status_req.execute() @@ -813,8 +857,7 @@ def helper_zip(base_directory: str, path: str, archive: zipfile.ZipFile): GCP.helper_zip(base_directory, directory, archive) else: if directory != archive.filename: # prevent form including itself - archive.write(directory, os.path.relpath( - directory, base_directory)) + archive.write(directory, os.path.relpath(directory, base_directory)) """ https://gist.github.com/felixSchl/d38b455df8bf83a78d3d @@ -829,8 +872,7 @@ def helper_zip(base_directory: str, path: str, archive: zipfile.ZipFile): @staticmethod def recursive_zip(directory: str, archname: str): - archive = zipfile.ZipFile( - archname, "w", zipfile.ZIP_DEFLATED, compresslevel=9) + archive = zipfile.ZipFile(archname, "w", zipfile.ZIP_DEFLATED, compresslevel=9) if os.path.isdir(directory): GCP.helper_zip(directory, directory, archive) else: diff --git a/sebs/gcp/generator.py b/sebs/gcp/generator.py index 83ef9f14..0dde4a6a 100644 --- a/sebs/gcp/generator.py +++ b/sebs/gcp/generator.py @@ -5,7 +5,6 @@ class GCPGenerator(Generator): - def __init__(self, workflow_name: str, func_triggers: Dict[str, str]): super().__init__() self._workflow_name = workflow_name @@ -13,22 +12,9 @@ def __init__(self, workflow_name: str, func_triggers: Dict[str, str]): self._map_funcs = dict() def postprocess(self, states: List[State], payloads: List[dict]) -> dict: - payloads.append({ - "final": { - "return": [ - "${res}" - ] - } - }) + payloads.append({"final": {"return": ["${res}"]}}) - definition = { - "main" : { - "params": [ - "res" - ], - "steps": payloads - } - } + definition = {"main": {"params": ["res"], "steps": payloads}} return definition @@ -39,38 +25,24 @@ def encode_task(self, state: Task) -> Union[dict, List[dict]]: { state.name: { "call": "http.post", - "args": { - "url": url, - "body": "${res}" - }, - "result": "res" + "args": {"url": url, "body": "${res}"}, + "result": "res", } }, - { - "assign_res_"+state.name: { - "assign": [ - { - "res": "${res.body}" - } - ] - } - } + {"assign_res_" + state.name: {"assign": [{"res": "${res.body}"}]}}, ] def encode_switch(self, state: Switch) -> Union[dict, List[dict]]: return { state.name: { "switch": [self._encode_case(c) for c in state.cases], - "next": state.default + "next": state.default, } } def _encode_case(self, case: Switch.Case) -> dict: cond = "res." + case.var + " " + case.op + " " + str(case.val) - return { - "condition": "${"+cond+"}", - "next": case.next - } + return {"condition": "${" + cond + "}", "next": case.next} def encode_map(self, state: Map) -> Union[dict, List[dict]]: id = self._workflow_name + "_" + "map" + str(uuid.uuid4())[0:8] @@ -79,37 +51,30 @@ def encode_map(self, state: Map) -> Union[dict, List[dict]]: return { state.name: { "call": "experimental.executions.map", - "args": { - "workflow_id": id, - "arguments": "${res." + state.array + "}" - }, - "result": "res" + "args": {"workflow_id": id, "arguments": "${res." + state.array + "}"}, + "result": "res", } } - def generate_maps(self): for workflow_id, url in self._map_funcs.items(): - yield (workflow_id, self._export_func({ - "main" : { - "params": ["elem"], - "steps": [ - { - "map": { - "call": "http.post", - "args": { - "url": url, - "body": "${elem}" + yield ( + workflow_id, + self._export_func( + { + "main": { + "params": ["elem"], + "steps": [ + { + "map": { + "call": "http.post", + "args": {"url": url, "body": "${elem}"}, + "result": "elem", + } }, - "result": "elem" - } - }, - { - "ret": { - "return": "${elem.body}" - } + {"ret": {"return": "${elem.body}"}}, + ], } - ] - } - })) - + } + ), + ) diff --git a/sebs/gcp/storage.py b/sebs/gcp/storage.py index 8202cd0e..9b8503da 100644 --- a/sebs/gcp/storage.py +++ b/sebs/gcp/storage.py @@ -48,7 +48,9 @@ def _create_bucket(self, name, buckets: List[str] = []): logging.info("Created bucket {}".format(bucket_name)) return bucket_name else: - logging.info("Bucket {} for {} already exists, skipping.".format(bucket_name, name)) + logging.info( + "Bucket {} for {} already exists, skipping.".format(bucket_name, name) + ) return bucket_name def download(self, bucket_name: str, key: str, filepath: str) -> None: @@ -61,7 +63,9 @@ def upload(self, bucket_name: str, filepath: str, key: str): logging.info("Upload {} to {}".format(filepath, bucket_name)) bucket_instance = self.client.bucket(bucket_name) blob = bucket_instance.blob(key, chunk_size=4 * 1024 * 1024) - gcp_storage.blob._MAX_MULTIPART_SIZE = 5 * 1024 * 1024 # workaround for connection timeout + gcp_storage.blob._MAX_MULTIPART_SIZE = ( + 5 * 1024 * 1024 + ) # workaround for connection timeout blob.upload_from_filename(filepath) def list_bucket(self, bucket_name: str) -> List[str]: @@ -96,7 +100,9 @@ def uploader_func(self, bucket_idx: int, key: str, filepath: str) -> None: if not self.replace_existing: for blob in self.input_buckets_files[bucket_idx]: if key == blob: - logging.info("Skipping upload of {} to {}".format(filepath, bucket_name)) + logging.info( + "Skipping upload of {} to {}".format(filepath, bucket_name) + ) return bucket_name = self.input_buckets[bucket_idx] self.upload(bucket_name, filepath, key) diff --git a/sebs/gcp/triggers.py b/sebs/gcp/triggers.py index 88dc9e12..fe000b8e 100644 --- a/sebs/gcp/triggers.py +++ b/sebs/gcp/triggers.py @@ -61,7 +61,8 @@ def sync_invoke(self, payload: dict) -> ExecutionResult: # GCP's fixed style for a function name config = self.deployment_client.config full_func_name = ( - f"projects/{config.project_name}/locations/" f"{config.region}/functions/{self.name}" + f"projects/{config.project_name}/locations/" + f"{config.region}/functions/{self.name}" ) function_client = self.deployment_client.get_function_client() req = ( @@ -103,14 +104,16 @@ def sync_invoke(self, payload: dict) -> ExecutionResult: # GCP's fixed style for a function name config = self.deployment_client.config full_workflow_name = GCP.get_full_workflow_name( - config.project_name, config.region, self.name) + config.project_name, config.region, self.name + ) execution_client = ExecutionsClient() execution = Execution(argument=json.dumps(payload)) begin = datetime.datetime.now() res = execution_client.create_execution( - parent=full_workflow_name, execution=execution) + parent=full_workflow_name, execution=execution + ) end = datetime.datetime.now() gcp_result = ExecutionResult.from_times(begin, end) @@ -118,9 +121,8 @@ def sync_invoke(self, payload: dict) -> ExecutionResult: # Wait for execution to finish, then print results. execution_finished = False backoff_delay = 1 # Start wait with delay of 1 second - while (not execution_finished): - execution = execution_client.get_execution( - request={"name": res.name}) + while not execution_finished: + execution = execution_client.get_execution(request={"name": res.name}) execution_finished = execution.state != Execution.State.ACTIVE # If we haven't seen the result yet, wait a second. diff --git a/sebs/gcp/workflow.py b/sebs/gcp/workflow.py index f1846bc4..d598a16e 100644 --- a/sebs/gcp/workflow.py +++ b/sebs/gcp/workflow.py @@ -53,7 +53,9 @@ def deserialize(cached_config: dict) -> "GCPWorkflow": for trigger in cached_config["triggers"]: trigger_type = cast( Trigger, - {"Library": WorkflowLibraryTrigger, "HTTP": HTTPTrigger}.get(trigger["type"]), + {"Library": WorkflowLibraryTrigger, "HTTP": HTTPTrigger}.get( + trigger["type"] + ), ) assert trigger_type, "Unknown trigger type {}".format(trigger["type"]) ret.add_trigger(trigger_type.deserialize(trigger)) diff --git a/sebs/local/config.py b/sebs/local/config.py index 3c5e18ec..9a33c93a 100644 --- a/sebs/local/config.py +++ b/sebs/local/config.py @@ -8,7 +8,9 @@ def serialize(self) -> dict: return {} @staticmethod - def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Credentials: + def deserialize( + config: dict, cache: Cache, handlers: LoggingHandlers + ) -> Credentials: return LocalCredentials() diff --git a/sebs/local/deployment.py b/sebs/local/deployment.py index d3f0e4b7..66a1d50c 100644 --- a/sebs/local/deployment.py +++ b/sebs/local/deployment.py @@ -26,7 +26,11 @@ def serialize(self, path: str): with open(path, "w") as out: out.write( serialize( - {"functions": self._functions, "storage": self._storage, "inputs": self._inputs} + { + "functions": self._functions, + "storage": self._storage, + "inputs": self._inputs, + } ) ) diff --git a/sebs/local/function.py b/sebs/local/function.py index 1516390b..397efabb 100644 --- a/sebs/local/function.py +++ b/sebs/local/function.py @@ -37,7 +37,12 @@ def deserialize(cls, obj: dict) -> Trigger: class LocalFunction(Function): def __init__( - self, docker_container, port: int, name: str, benchmark: str, code_package_hash: str + self, + docker_container, + port: int, + name: str, + benchmark: str, + code_package_hash: str, ): super().__init__(benchmark, name, code_package_hash) self._instance = docker_container diff --git a/sebs/local/local.py b/sebs/local/local.py index e8b06c5c..5f38ffc0 100644 --- a/sebs/local/local.py +++ b/sebs/local/local.py @@ -119,7 +119,9 @@ def shutdown(self): benchmark: benchmark name """ - def package_code(self, code_package: CodePackage, directory: str, is_workflow: bool) -> Tuple[str, int]: + def package_code( + self, code_package: CodePackage, directory: str, is_workflow: bool + ) -> Tuple[str, int]: CONFIG_FILES = { "python": ["handler.py", "requirements.txt", ".python_packages"], @@ -140,10 +142,13 @@ def package_code(self, code_package: CodePackage, directory: str, is_workflow: b return directory, bytes_size - def create_function(self, code_package: CodePackage, func_name: str) -> "LocalFunction": + def create_function( + self, code_package: CodePackage, func_name: str + ) -> "LocalFunction": home_dir = os.path.join( - "/home", self._system_config.username(self.name(), code_package.language_name) + "/home", + self._system_config.username(self.name(), code_package.language_name), ) container_name = "{}:run.local.{}.{}".format( self._system_config.docker_repository(), @@ -161,7 +166,10 @@ def create_function(self, code_package: CodePackage, func_name: str) -> "LocalFu image=container_name, command=f"python3 server.py {self.DEFAULT_PORT}", volumes={ - code_package.code_location: {"bind": os.path.join(home_dir, "code"), "mode": "ro"} + code_package.code_location: { + "bind": os.path.join(home_dir, "code"), + "mode": "ro", + } }, environment=environment, # FIXME: make CPUs configurable @@ -180,7 +188,11 @@ def create_function(self, code_package: CodePackage, func_name: str) -> "LocalFu # tty=True, ) func = LocalFunction( - container, self.DEFAULT_PORT, func_name, code_package.name, code_package.hash + container, + self.DEFAULT_PORT, + func_name, + code_package.name, + code_package.hash, ) self.logging.info( f"Started {func_name} function at container {container.id} , running on {func._url}" @@ -199,7 +211,9 @@ def update_function(self, function: Function, code_package: CodePackage): There's only one trigger - HTTP. """ - def create_trigger(self, func: Function, trigger_type: Trigger.TriggerType) -> Trigger: + def create_trigger( + self, func: Function, trigger_type: Trigger.TriggerType + ) -> Trigger: from sebs.local.function import HTTPTrigger function = cast(LocalFunction, func) diff --git a/sebs/local/storage.py b/sebs/local/storage.py index c34f4c0d..2caaab3c 100644 --- a/sebs/local/storage.py +++ b/sebs/local/storage.py @@ -23,7 +23,9 @@ def deployment_name(): # the location does not matter MINIO_REGION = "us-east-1" - def __init__(self, docker_client: docker.client, cache_client: Cache, replace_existing: bool): + def __init__( + self, docker_client: docker.client, cache_client: Cache, replace_existing: bool + ): super().__init__(self.MINIO_REGION, cache_client, replace_existing) self._docker_client = docker_client self._port = 9000 @@ -54,7 +56,9 @@ def start(self): self.logging.error("Starting Minio storage failed! Reason: {}".format(e)) raise RuntimeError("Starting Minio storage unsuccesful") except Exception as e: - self.logging.error("Starting Minio storage failed! Unknown error: {}".format(e)) + self.logging.error( + "Starting Minio storage failed! Unknown error: {}".format(e) + ) raise RuntimeError("Starting Minio storage unsuccesful") def configure_connection(self): @@ -83,14 +87,19 @@ def stop(self): def get_connection(self): return minio.Minio( - self._url, access_key=self._access_key, secret_key=self._secret_key, secure=False + self._url, + access_key=self._access_key, + secret_key=self._secret_key, + secure=False, ) def _create_bucket(self, name: str, buckets: List[str] = []): for bucket_name in buckets: if name in bucket_name: self.logging.info( - "Bucket {} for {} already exists, skipping.".format(bucket_name, name) + "Bucket {} for {} already exists, skipping.".format( + bucket_name, name + ) ) return bucket_name # minio has limit of bucket name to 16 characters @@ -137,7 +146,9 @@ def clean_bucket(self, bucket: str): ) errors = self.connection.remove_objects(bucket, delete_object_list) for error in errors: - self.logging.error("Error when deleting object from bucket {}: {}!", bucket, error) + self.logging.error( + "Error when deleting object from bucket {}: {}!", bucket, error + ) def correct_name(self, name: str) -> str: return name diff --git a/sebs/regression.py b/sebs/regression.py index 9c7e5bf8..e75b002e 100644 --- a/sebs/regression.py +++ b/sebs/regression.py @@ -40,7 +40,9 @@ def test(self): f"Begin regression test of {benchmark_name} on {deployment_client.name()}, " f"region: {deployment_client.config.region}." ) - experiment_config = self.client.get_experiment_config(self.experiment_config) + experiment_config = self.client.get_experiment_config( + self.experiment_config + ) benchmark = self.client.get_benchmark( benchmark_name, deployment_client, experiment_config ) @@ -72,7 +74,9 @@ def test(self): failure = True print(f"{benchmark_name} fail on trigger: {trigger_type}") else: - print(f"{benchmark_name} success on trigger: {trigger_type}") + print( + f"{benchmark_name} success on trigger: {trigger_type}" + ) except RuntimeError: failure = True print(f"{benchmark_name} fail on trigger: {trigger_type}") @@ -162,7 +166,9 @@ def __init__(self): # no way to directly access test instance from here def status(self, *args, **kwargs): - self.all_correct = self.all_correct and (kwargs["test_status"] in ["inprogress", "success"]) + self.all_correct = self.all_correct and ( + kwargs["test_status"] in ["inprogress", "success"] + ) test_name = kwargs["test_id"].split("_")[-1] if not kwargs["test_status"]: test_id = kwargs["test_id"] @@ -172,7 +178,11 @@ def status(self, *args, **kwargs): elif kwargs["test_status"] == "fail": print("\n-------------\n") print("{0[test_id]}: {0[test_status]}".format(kwargs)) - print("{0[test_id]}: {1}".format(kwargs, self.output[kwargs["test_id"]].decode())) + print( + "{0[test_id]}: {1}".format( + kwargs, self.output[kwargs["test_id"]].decode() + ) + ) print("\n-------------\n") self.failures.add(test_name) elif kwargs["test_status"] == "success": @@ -194,7 +204,9 @@ def regression_suite( suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(AWSTestSequence)) if "azure" in providers: assert "azure" in cloud_config - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(AzureTestSequence)) + suite.addTest( + unittest.defaultTestLoader.loadTestsFromTestCase(AzureTestSequence) + ) if "gcp" in providers: assert "gcp" in cloud_config suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(GCPTestSequence)) @@ -211,7 +223,9 @@ def regression_suite( print(f"Select test {test_name}") else: print(f"Skip test {test_name}") - concurrent_suite = testtools.ConcurrentStreamTestSuite(lambda: ((test, None) for test in tests)) + concurrent_suite = testtools.ConcurrentStreamTestSuite( + lambda: ((test, None) for test in tests) + ) result = TracingStreamResult() result.startTestRun() concurrent_suite.run(result) @@ -220,7 +234,9 @@ def regression_suite( for suc in result.success: print(f"- {suc}") if len(result.failures): - print(f"Failures when executing {len(result.failures)} out of {len(tests)} functions") + print( + f"Failures when executing {len(result.failures)} out of {len(tests)} functions" + ) for failure in result.failures: print(f"- {failure}") return not result.all_correct diff --git a/sebs/sebs.py b/sebs/sebs.py index e19ae9cd..37edcb09 100644 --- a/sebs/sebs.py +++ b/sebs/sebs.py @@ -35,7 +35,9 @@ def verbose(self) -> bool: def logging_filename(self) -> Optional[str]: return self._logging_filename - def generate_logging_handlers(self, logging_filename: Optional[str] = None) -> LoggingHandlers: + def generate_logging_handlers( + self, logging_filename: Optional[str] = None + ) -> LoggingHandlers: filename = logging_filename if logging_filename else self.logging_filename if filename in self._handlers: return self._handlers[filename] @@ -137,7 +139,9 @@ def get_experiment( } if experiment_type not in implementations: raise RuntimeError(f"Experiment {experiment_type} not supported!") - experiment = implementations[experiment_type](self.get_experiment_config(config)) + experiment = implementations[experiment_type]( + self.get_experiment_config(config) + ) experiment.logging_handlers = self.generate_logging_handlers( logging_filename=logging_filename ) diff --git a/sebs/utils.py b/sebs/utils.py index 16f6dea2..cf3b9cf5 100644 --- a/sebs/utils.py +++ b/sebs/utils.py @@ -98,20 +98,21 @@ def replace_string_in_file(path: str, from_str: str, to_str: str): def connect_to_redis_cache(host: str): - redis = Redis(host=host, - port=6379, - decode_responses=True, - socket_connect_timeout=10) + redis = Redis( + host=host, port=6379, decode_responses=True, socket_connect_timeout=10 + ) redis.ping() return redis -def download_measurements(redis: Redis, workflow_name: str, after: float, **static_args): +def download_measurements( + redis: Redis, workflow_name: str, after: float, **static_args +): payloads = [] for key in redis.scan_iter(match=f"{workflow_name}/*"): - assert key[:len(workflow_name)] == workflow_name + assert key[: len(workflow_name)] == workflow_name payload = redis.get(key) redis.delete(key) @@ -183,7 +184,9 @@ def find_package_code(benchmark: str, path: str): def global_logging(): logging_format = "%(asctime)s,%(msecs)d %(levelname)s %(name)s: %(message)s" logging_date_format = "%H:%M:%S" - logging.basicConfig(format=logging_format, datefmt=logging_date_format, level=logging.INFO) + logging.basicConfig( + format=logging_format, datefmt=logging_date_format, level=logging.INFO + ) class LoggingHandlers: @@ -191,7 +194,9 @@ def __init__(self, verbose: bool = False, filename: Optional[str] = None): logging_format = "%(asctime)s,%(msecs)d %(levelname)s %(name)s: %(message)s" logging_date_format = "%H:%M:%S" formatter = logging.Formatter(logging_format, logging_date_format) - self.handlers: List[Union[logging.FileHandler, logging.StreamHandler[TextIO]]] = [] + self.handlers: List[ + Union[logging.FileHandler, logging.StreamHandler[TextIO]] + ] = [] # Add stdout output if verbose: