Skip to content

Commit

Permalink
Clean up azure platform
Browse files Browse the repository at this point in the history
  • Loading branch information
lbrndnr committed Apr 26, 2022
1 parent 25ddf37 commit 2038313
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 153 deletions.
2 changes: 1 addition & 1 deletion sebs/azure/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .azure import Azure # noqa
from .function import AzureFunction # noqa
from .function_app import AzureFunction, AzureWorkflow # noqa
from .config import AzureConfig # noqa
from .blob_storage import BlobStorage # noqa
148 changes: 33 additions & 115 deletions sebs/azure/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@
import io
import shutil
import time
from typing import cast, Dict, List, Optional, Set, Tuple, Type # noqa
from typing import cast, Dict, List, Optional, Set, Tuple, Type, TypeVar # noqa

import docker
import pandas as pd
from azure.storage.blob import BlobServiceClient

from sebs.azure.blob_storage import BlobStorage
from sebs.azure.cli import AzureCLI
from sebs.azure.function import AzureFunction
from sebs.azure.workflow import AzureWorkflow
from sebs.azure.function_app import FunctionApp, AzureFunction, AzureWorkflow
from sebs.azure.config import AzureConfig, AzureResources
from sebs.azure.triggers import AzureTrigger, HTTPTrigger
from sebs.code_package import CodePackage
Expand Down Expand Up @@ -232,32 +231,25 @@ def package_code(self, code_package: CodePackage, directory: str, is_workflow: b
shell=True, cwd=directory)
return directory, code_size

def publish_function(
def publish_benchmark(
self,
function: Function,
benchmark: Benchmark,
code_package: CodePackage,
repeat_on_failure: bool = False,
) -> str:
success = False
url = ""
self.logging.info(
"Attempting publish of function {}".format(function.name))
"Attempting publish of {}".format(benchmark.name))
while not success:
try:
ret = self.cli_instance.execute(
"bash -c 'cd /mnt/function "
"&& func azure functionapp publish {} --{} --no-build'".format(
function.name, self.AZURE_RUNTIMES[code_package.language_name]
benchmark.name, self.AZURE_RUNTIMES[code_package.language_name]
)
)
# ret = self.cli_instance.execute(
# "bash -c 'cd /mnt/function "
# "&& az functionapp deployment source config-zip "
# "--src {}.zip -g {} -n {} --build-remote false '".format(
# code_package.name, resource_group, function.name
# )
# )
# print(ret)

url = ""
for line in ret.split(b"\n"):
line = line.decode("utf-8")
Expand All @@ -277,7 +269,7 @@ def publish_function(
time.sleep(30)
self.logging.info(
"Sleep 30 seconds for Azure to register function app {}".format(
function.name
benchmark.name
)
)
# escape loop. we failed!
Expand All @@ -296,11 +288,11 @@ def publish_function(
:return: URL to reach HTTP-triggered function
"""

def update_function(self, function: Function, code_package: CodePackage):
def update_benchmark(self, benchmark: Benchmark, code_package: CodePackage):

# Mount code package in Docker instance
self._mount_function_code(code_package)
url = self.publish_function(function, code_package, True)
url = self.publish_benchmark(res, code_package, True)

trigger = HTTPTrigger(
url, self.config.resources.data_storage_account(self.cli_instance))
Expand All @@ -326,8 +318,8 @@ def default_benchmark_name(self, code_package: CodePackage) -> str:
)
return func_name

def create_function(self, code_package: CodePackage, func_name: str) -> AzureFunction:

B = TypeVar("B", bound=FunctionApp)
def create_benchmark(self, code_package: CodePackage, name: str, benchmark_cls: B) -> B:
language = code_package.language_name
language_runtime = code_package.language_version
resource_group = self.config.resources.resource_group(
Expand All @@ -336,7 +328,7 @@ def create_function(self, code_package: CodePackage, func_name: str) -> AzureFun

config = {
"resource_group": resource_group,
"func_name": func_name,
"name": name,
"region": region,
"runtime": self.AZURE_RUNTIMES[language],
"runtime_version": language_runtime,
Expand All @@ -349,7 +341,7 @@ def create_function(self, code_package: CodePackage, func_name: str) -> AzureFun
(
" az functionapp config appsettings list "
" --resource-group {resource_group} "
" --name {func_name} "
" --name {name} "
).format(**config)
)
for setting in json.loads(ret.decode()):
Expand All @@ -362,7 +354,7 @@ def create_function(self, code_package: CodePackage, func_name: str) -> AzureFun
account_name, connection_string
)
self.logging.info(
"Azure: Selected {} function app".format(func_name))
"Azure: Selected {} function app".format(name))
except RuntimeError:
function_storage_account = self.config.resources.add_storage_account(
self.cli_instance)
Expand All @@ -373,35 +365,36 @@ def create_function(self, code_package: CodePackage, func_name: str) -> AzureFun
# create function app
self.cli_instance.execute(
(
" az functionapp create --resource-group {resource_group} "
" --os-type Linux --consumption-plan-location {region} "
" az functionapp create --functions-version 3 "
" --resource-group {resource_group} --os-type Linux"
" --consumption-plan-location {region} "
" --runtime {runtime} --runtime-version {runtime_version} "
" --name {func_name} --storage-account {storage_account}"
" --name {name} --storage-account {storage_account}"
).format(**config)
)
self.logging.info(
"Azure: Created function app {}".format(func_name))
"Azure: Created function app {}".format(name))
break
except RuntimeError as e:
# Azure does not allow some concurrent operations
if "another operation is in progress" in str(e):
self.logging.info(
f"Repeat {func_name} creation, another operation in progress"
f"Repeat {name} creation, another operation in progress"
)
# Rethrow -> another error
else:
raise
function = AzureFunction(
name=func_name,
benchmark = benchmark_cls(
name=name,
benchmark=code_package.name,
code_hash=code_package.hash,
function_storage=function_storage_account,
)

# update existing function app
self.update_function(function, code_package)
self.update_benchmark(benchmark, code_package)

return function
return benchmark

def cached_benchmark(self, benchmark: Benchmark):

Expand All @@ -412,92 +405,17 @@ def cached_benchmark(self, benchmark: Benchmark):
azure_trigger.logging_handlers = self.logging_handlers
azure_trigger.data_storage_account = data_storage_account

def create_workflow(self, code_package: CodePackage, workflow_name: str) -> AzureFunction:
language = code_package.language_name
language_runtime = code_package.language_version
resource_group = self.config.resources.resource_group(
self.cli_instance)
region = self.config.region

config = {
"resource_group": resource_group,
"workflow_name": workflow_name,
"region": region,
"runtime": self.AZURE_RUNTIMES[language],
"runtime_version": language_runtime,
}

# check if function does not exist
# no API to verify existence
try:
ret = self.cli_instance.execute(
(
" az functionapp config appsettings list "
" --resource-group {resource_group} "
" --name {workflow_name} "
).format(**config)
)
for setting in json.loads(ret.decode()):
if setting["name"] == "AzureWebJobsStorage":
connection_string = setting["value"]
elems = [z for y in connection_string.split(
";") for z in y.split("=")]
account_name = elems[elems.index("AccountName") + 1]
function_storage_account = AzureResources.Storage.from_cache(
account_name, connection_string
)
self.logging.info(
"Azure: Selected {} function app".format(workflow_name))
except RuntimeError:
function_storage_account = self.config.resources.add_storage_account(
self.cli_instance)
config["storage_account"] = function_storage_account.account_name

# FIXME: only Linux type is supported
while True:
try:
# create function app
self.cli_instance.execute(
(
" az functionapp create --resource-group {resource_group} "
" --os-type Linux --consumption-plan-location {region} "
" --runtime {runtime} --runtime-version {runtime_version} "
" --name {workflow_name} --storage-account {storage_account}"
).format(**config)
)
self.logging.info(
"Azure: Created workflow app {}".format(workflow_name))
break
except RuntimeError as e:
# Azure does not allow some concurrent operations
if "another operation is in progress" in str(e):
self.logging.info(
f"Repeat {workflow_name} creation, another operation in progress"
)
# Rethrow -> another error
else:
raise
workflow = AzureWorkflow(
name=workflow_name,
benchmark=code_package.name,
code_hash=code_package.hash,
function_storage=function_storage_account,
)
def create_function(self, code_package: CodePackage, func_name: str) -> AzureFunction:
return self.create_benchmark(code_package, func_name, AzureFunction)

# update existing function app
self.update_function(workflow, code_package)
def update_function(self, function: Function, code_package: CodePackage):
self.update_benchmark(function, code_package)

return workflow
def create_workflow(self, code_package: CodePackage, workflow_name: str) -> AzureWorkflow:
return self.create_benchmark(code_package, workflow_name, AzureWorkflow)

def update_workflow(self, workflow: Workflow, code_package: CodePackage):
# Mount code package in Docker instance
self._mount_function_code(code_package)
url = self.publish_function(workflow, code_package, True)

trigger = HTTPTrigger(
url, self.config.resources.data_storage_account(self.cli_instance))
trigger.logging_handlers = self.logging_handlers
workflow.add_trigger(trigger)
self.update_benchmark(workflow, code_package)


"""
Expand Down Expand Up @@ -607,7 +525,7 @@ def _enforce_cold_start(self, function: Function, code_package: CodePackage):
f" --settings ForceColdStart={self.cold_start_counter}"
)

self.update_function(function, code_package)
self.update_benchmark(function, code_package)

def enforce_cold_start(self, functions: List[Function], code_package: CodePackage):
self.cold_start_counter += 1
Expand Down
8 changes: 7 additions & 1 deletion sebs/azure/function.py → sebs/azure/function_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from sebs.faas.benchmark import Function


class AzureFunction(Function):
class FunctionApp(Function):
def __init__(
self,
name: str,
Expand Down Expand Up @@ -34,3 +34,9 @@ def deserialize(cached_config: dict) -> Function:
assert trigger_type, "Unknown trigger type {}".format(trigger["type"])
ret.add_trigger(trigger_type.deserialize(trigger))
return ret

class AzureFunction(FunctionApp):
pass

class AzureWorkflow(FunctionApp):
pass
36 changes: 0 additions & 36 deletions sebs/azure/workflow.py

This file was deleted.

0 comments on commit 2038313

Please sign in to comment.