Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Building support for local deployment #142

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sebs/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ def update_storage(self, deployment: str, benchmark: str, config: dict):
with self._lock:
with open(os.path.join(benchmark_dir, "config.json"), "r") as fp:
cached_config = json.load(fp)
cached_config[deployment] = {}
cached_config[deployment]["storage"] = config
with open(os.path.join(benchmark_dir, "config.json"), "w") as fp:
json.dump(cached_config, fp, indent=2)
Expand Down
40 changes: 20 additions & 20 deletions sebs/experiments/perf_cost.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,22 @@ def prepare(self, sebs_client: "SeBS", deployment_client: FaaSSystem):
self._benchmark = sebs_client.get_benchmark(
settings["benchmark"], deployment_client, self.config
)
self._function = deployment_client.get_function(self._benchmark)
self._functions = deployment_client.get_function(self._benchmark, 3)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The allocation of multiple function instances should happen in the deployment.

For example, AWS/GCP/Azure will create instances for us automatically. In Local, you need to allocate them as requested by spawning more Docker containers hosting the function.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Furthermore, the number 3 should not be hardcoded anywhere.

Copy link
Author

@Rajiv2605 Rajiv2605 Mar 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I had done this for testing purposes but missed removing it before pushing. I will remove it.

# prepare benchmark input
self._storage = deployment_client.get_storage(replace_existing=self.config.update_storage)
self._benchmark_input = self._benchmark.prepare_input(
storage=self._storage, size=settings["input-size"]
)
for i in range(len(self._functions)):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned above - we should have one function instance, and the Local instance and its triggers will allocate many Docker containers.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine this seems more complex than anticipated since cloud platforms expose an HTTP trigger, and the local deployment would have a different HTTP address for each instance. We don't want to have to change every experiment - we want to hide this complexity behind the trigger.

The simplest solution would be to implement a new sebs.Local.ScalableHTTPTrigger (or something similar) that would allocate/deallocate container instances, and redirect invocations to the proper HTTPTrigger.


# add HTTP trigger
triggers = self._function.triggers(Trigger.TriggerType.HTTP)
if len(triggers) == 0:
self._trigger = deployment_client.create_trigger(
self._function, Trigger.TriggerType.HTTP
)
else:
self._trigger = triggers[0]
# add HTTP trigger
triggers = self._functions[i].triggers(Trigger.TriggerType.HTTP)
if len(triggers) == 0:
self._trigger = deployment_client.create_trigger(
self._functions[i], Trigger.TriggerType.HTTP
)
else:
self._trigger = triggers[0]

self._out_dir = os.path.join(sebs_client.output_dir, "perf-cost")
if not os.path.exists(self._out_dir):
Expand All @@ -77,15 +78,15 @@ def run(self):
if len(memory_sizes) == 0:
self.logging.info("Begin experiment")
self.run_configuration(settings, settings["repetitions"])
for memory in memory_sizes:
self.logging.info(f"Begin experiment on memory size {memory}")
self._function.memory = memory
self._deployment_client.update_function(self._function, self._benchmark)
self._sebs_client.cache_client.update_function(self._function)
self.run_configuration(settings, settings["repetitions"], suffix=str(memory))
for i in range(len(self._functions)):
for memory in memory_sizes:
self.logging.info(f"Begin experiment on memory size {memory}")
self._functions[i].memory = memory
self._deployment_client.update_function(self._functions[i], self._benchmark)
self._sebs_client.cache_client.update_function(self._functions[i])
self.run_configuration(settings, settings["repetitions"], suffix=str(memory))

def compute_statistics(self, times: List[float]):

mean, median, std, cv = basic_stats(times)
self.logging.info(f"Mean {mean} [ms], median {median} [ms], std {std}, CV {cv}")
for alpha in [0.95, 0.99]:
Expand Down Expand Up @@ -154,9 +155,8 @@ def _run_configuration(

if run_type == PerfCost.RunType.COLD or run_type == PerfCost.RunType.BURST:
self._deployment_client.enforce_cold_start(
[self._function], self._benchmark
self._functions, self._benchmark
)

time.sleep(5)

results = []
Expand All @@ -179,7 +179,8 @@ def _run_configuration(
elif run_type == PerfCost.RunType.WARM and ret.stats.cold_start:
self.logging.info(f"Invocation {ret.request_id} is cold!")
else:
result.add_invocation(self._function, ret)
for i in range(len(self._functions)):
result.add_invocation(self._functions[i], ret)
colds_count += ret.stats.cold_start
client_times.append(ret.times.client / 1000.0)
samples_gathered += 1
Expand Down Expand Up @@ -224,7 +225,6 @@ def _run_configuration(
)

def run_configuration(self, settings: dict, repetitions: int, suffix: str = ""):

for experiment_type in settings["experiments"]:
if experiment_type == "cold":
self._run_configuration(
Expand Down
27 changes: 15 additions & 12 deletions sebs/faas/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def package_code(
pass

@abstractmethod
def create_function(self, code_package: Benchmark, func_name: str) -> Function:
def create_function(self, code_package: Benchmark, func_name: str, num: int) -> Function:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't change this API

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had added the num parameter to know how many containers to dispatch.

pass

@abstractmethod
Expand All @@ -139,7 +139,7 @@ def update_function(self, function: Function, code_package: Benchmark):

"""

def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) -> Function:
def get_function(self, code_package: Benchmark, num: int, func_name: Optional[str] = None) -> List[Function]:

if code_package.language_version not in self.system_config.supported_language_versions(
self.name(), code_package.language_name
Expand Down Expand Up @@ -171,15 +171,16 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None)
else "function {} not found in cache.".format(func_name)
)
self.logging.info("Creating new function! Reason: " + msg)
function = self.create_function(code_package, func_name)
self.cache_client.add_function(
deployment_name=self.name(),
language_name=code_package.language_name,
code_package=code_package,
function=function,
)
code_package.query_cache()
return function
function_list = self.create_function(code_package, func_name, num)
for function in function_list:
self.cache_client.add_function(
deployment_name=self.name(),
language_name=code_package.language_name,
code_package=code_package,
function=function,
)
code_package.query_cache()
return function_list
else:
# retrieve function
cached_function = functions[func_name]
Expand Down Expand Up @@ -221,7 +222,9 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None)
code_package.query_cache()
else:
self.logging.info(f"Cached function {func_name} is up to date.")
return function
function_list = []
function_list.append(function)
return function_list

@abstractmethod
def update_function_configuration(self, cached_function: Function, benchmark: Benchmark):
Expand Down
1 change: 1 addition & 0 deletions sebs/local/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def deserialize(cached_config: dict) -> "LocalFunction":
)
except docker.errors.NotFound:
raise RuntimeError(f"Cached container {instance_id} not available anymore!")
# clear cache
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FIXME?


def stop(self):
self.logging.info(f"Stopping function container {self._instance_id}")
Expand Down
134 changes: 73 additions & 61 deletions sebs/local/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage:
"""

def shutdown(self):
    """Tear down local deployment resources.

    Stops the storage container, but only if storage was actually
    started (the attribute exists) and the configuration requests
    teardown on shutdown.
    """
    if hasattr(self, "storage") and self.config.shutdownStorage:
        self.storage.stop()

"""
It would be sufficient to just pack the code and ship it as zip to AWS.
Expand Down Expand Up @@ -151,8 +152,8 @@ def package_code(

return directory, bytes_size

def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunction":

def create_function(self, code_package: Benchmark, func_name: str, num: int) -> "LocalFunction":
func_list = []
container_name = "{}:run.local.{}.{}".format(
self._system_config.docker_repository(),
code_package.language_name,
Expand All @@ -170,66 +171,72 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunc
self.name(), code_package.language_name
),
}
container = self._docker_client.containers.run(
image=container_name,
command=f"/bin/bash /sebs/run_server.sh {self.DEFAULT_PORT}",
volumes={code_package.code_location: {"bind": "/function", "mode": "ro"}},
environment=environment,
# FIXME: make CPUs configurable
# FIXME: configure memory
# FIXME: configure timeout
# cpuset_cpus=cpuset,
# required to access perf counters
# alternative: use custom seccomp profile
privileged=True,
security_opt=["seccomp:unconfined"],
network_mode="bridge",
# somehow removal of containers prevents checkpointing from working?
remove=self.remove_containers,
stdout=True,
stderr=True,
detach=True,
# tty=True,
)

pid: Optional[int] = None
if self.measurements_enabled and self._memory_measurement_path is not None:
# launch subprocess to measure memory
proc = subprocess.Popen(
[
"python3",
"./sebs/local/measureMem.py",
"--container-id",
container.id,
"--measure-interval",
str(self._measure_interval),
"--measurement-file",
self._memory_measurement_path,
]
for i in range(num):
container = self._docker_client.containers.run(
image=container_name,
name=func_name+f"_{i}",
command=f"/bin/bash /sebs/run_server.sh {self.DEFAULT_PORT}",
volumes={code_package.code_location: {"bind": "/function", "mode": "ro"}},
environment=environment,
# FIXME: make CPUs configurable
# FIXME: configure memory
# FIXME: configure timeout
# cpuset_cpus=cpuset,
# required to access perf counters
# alternative: use custom seccomp profile
privileged=True,
security_opt=["seccomp:unconfined"],
network_mode="bridge",
# somehow removal of containers prevents checkpointing from working?
remove=self.remove_containers,
stdout=True,
stderr=True,
detach=True,
# tty=True,
)
pid = proc.pid

function_cfg = FunctionConfig.from_benchmark(code_package)
func = LocalFunction(
container,
self.DEFAULT_PORT,
func_name,
code_package.benchmark,
code_package.hash,
function_cfg,
pid,
)
self.logging.info(
f"Started {func_name} function at container {container.id} , running on {func._url}"
)
return func

"""
FIXME: restart Docker?
"""
pid: Optional[int] = None
if self.measurements_enabled and self._memory_measurement_path is not None:
# launch subprocess to measure memory
proc = subprocess.Popen(
[
"python3",
"./sebs/local/measureMem.py",
"--container-id",
container.id,
"--measure-interval",
str(self._measure_interval),
"--measurement-file",
self._memory_measurement_path,
]
)
pid = proc.pid

function_cfg = FunctionConfig.from_benchmark(code_package)
func = LocalFunction(
container,
self.DEFAULT_PORT,
func_name,
code_package.benchmark,
code_package.hash,
function_cfg,
pid,
)
func_list.append(func)
self.logging.info(
f"Started {func_name} function at container {container.id} , running on {func._url}"
)
return func_list

def update_function(self, function: Function, code_package: Benchmark):
    """Redeploy a local function with an updated code package.

    Kills every Docker container belonging to this function, then starts
    the same number of fresh containers running the new code.
    """
    # Containers are named "<function-name>_<i>" (see create_function), so a
    # container belongs to this function when the function name is a
    # substring of the container name — NOT the other way around. The
    # original check `ctr.name in function.name` never matched anything.
    count = 0
    for ctr in self._docker_client.containers.list():
        if function.name in ctr.name:
            count += 1
            ctr.kill()
    # deploy new containers with the updated function code
    self.create_function(code_package, function.name, count)

"""
For local functions, we don't need to do anything for a cached function.
Expand All @@ -251,7 +258,8 @@ def create_trigger(self, func: Function, trigger_type: Trigger.TriggerType) -> T
return trigger

def cached_function(self, function: Function):
    """Restore runtime-only state on a function loaded from cache.

    Logging handlers are not serialized into the cache, so re-attach the
    deployment's handlers to every library trigger of the function.
    """
    for trigger in function.triggers(Trigger.TriggerType.LIBRARY):
        trigger.logging_handlers = self.logging_handlers

def update_function_configuration(self, function: Function, code_package: Benchmark):
self.logging.error("Updating function configuration of local deployment is not supported")
Expand All @@ -268,7 +276,11 @@ def download_metrics(
pass

def enforce_cold_start(self, functions: List[Function], code_package: Benchmark):
    """Force cold starts by killing the containers hosting these functions.

    The next invocation must spawn fresh containers, which emulates a
    cold start on the local deployment. `code_package` is unused here but
    kept for interface compatibility with other deployments.
    """
    # Containers are named "<function-name>_<i>" (see create_function), so
    # match by checking that a function name is contained in the container
    # name. The original `ctr.name in fn_names[i]` test was reversed and
    # never killed anything.
    names = [fn.name for fn in functions]
    for ctr in self._docker_client.containers.list():
        if any(name in ctr.name for name in names):
            ctr.kill()

@staticmethod
def default_function_name(code_package: Benchmark) -> str:
Expand Down