Skip to content

Commit

Permalink
Implement Dynamic Port Mapping for Function Containers for Local Depl…
Browse files Browse the repository at this point in the history
…oyment (#199)

* rebase from master

* implement port mapping for function containers for local deployments on non-linux platforms

* undo accidental push (configuration files)

* use is_linux utility function in sebs/storage/minio.py
  • Loading branch information
Kaleab-git authored Nov 2, 2024
1 parent 655f644 commit df3a54f
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 38 deletions.
8 changes: 7 additions & 1 deletion sebs.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def parse_common_params(

sebs_client = sebs.SeBS(cache, output_dir, verbose, logging_filename)
output_dir = sebs.utils.create_output(output_dir, preserve_out, verbose)

sebs_client.logging.info("Created experiment output at {}".format(output_dir))

# CLI overrides JSON options
Expand All @@ -140,6 +140,9 @@ def parse_common_params(
update_nested_dict(config_obj, ["experiments", "update_code"], update_code)
update_nested_dict(config_obj, ["experiments", "update_storage"], update_storage)

# set the path the configuration was loaded from
update_nested_dict(config_obj, ["deployment", "local", "path"], config)

if storage_configuration:
cfg = json.load(open(storage_configuration, 'r'))
update_nested_dict(config_obj, ["deployment", deployment, "storage"], cfg)
Expand Down Expand Up @@ -453,6 +456,9 @@ def start(benchmark, benchmark_input_size, output, deployments, storage_configur
# Disable shutdown of storage only after we succed
# Otherwise we want to clean up as much as possible
deployment_client.shutdown_storage = False

deployment_client.config.serialize()

result.serialize(output)
sebs_client.logging.info(f"Save results to {os.path.abspath(output)}")

Expand Down
1 change: 1 addition & 0 deletions sebs/aws/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ def http_api(
self.logging.info(f"Using cached HTTP API {api_name}")
return http_api

# FIXME: python3.7+ future annotations
@staticmethod
def initialize(res: Resources, dct: dict):

Expand Down
30 changes: 28 additions & 2 deletions sebs/local/config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import json

from typing import cast, Optional

from sebs.cache import Cache
from sebs.faas.config import Config, Credentials, Resources
from sebs.storage.minio import MinioConfig
from sebs.utils import LoggingHandlers
from sebs.utils import serialize, LoggingHandlers


class LocalCredentials(Credentials):
Expand All @@ -23,15 +25,28 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Creden

class LocalResources(Resources):
def __init__(self, storage_cfg: Optional[MinioConfig] = None):
self._path: str = ""
super().__init__(name="local")
self._storage = storage_cfg
self._allocated_ports = set()

@property
def storage_config(self) -> Optional[MinioConfig]:
return self._storage

@property
def path(self) -> str:
return self._path

@property
def allocated_ports(self) -> set:
return self._allocated_ports

def serialize(self) -> dict:
return {}
out = {
"allocated_ports": list(self._allocated_ports)
}
return out

@staticmethod
def initialize(res: Resources, cfg: dict):
Expand All @@ -40,10 +55,15 @@ def initialize(res: Resources, cfg: dict):
@staticmethod
def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resources:
ret = LocalResources()
ret._path = config["path"]
# Check for new config
if "storage" in config:
ret._storage = MinioConfig.deserialize(config["storage"])
ret.logging.info("Using user-provided configuration of storage for local containers.")

if "allocated_ports" in config:
ret._allocated_ports = set(config["allocated_ports"])

return ret


Expand Down Expand Up @@ -84,6 +104,12 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config
return config_obj

def serialize(self) -> dict:
with open(self.resources.path, "r+") as out:
config = json.load(out)
config["deployment"]["local"].update(self.resources.serialize())
out.seek(0)
out.write(serialize(config))

return {}

def update_cache(self, cache: Cache):
Expand Down
25 changes: 15 additions & 10 deletions sebs/local/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
from typing import Optional

from sebs.utils import is_linux
from sebs.faas.function import ExecutionResult, Function, FunctionConfig, Trigger


Expand Down Expand Up @@ -53,17 +54,21 @@ def __init__(
self._instance.reload()
networks = self._instance.attrs["NetworkSettings"]["Networks"]
self._port = port
self._url = "{IPAddress}:{Port}".format(
IPAddress=networks["bridge"]["IPAddress"], Port=port
)
if not self._url:
self.logging.error(
f"Couldn't read the IP address of container from attributes "
f"{json.dumps(self._instance.attrs, indent=2)}"
)
raise RuntimeError(
f"Incorrect detection of IP address for container with id {self._instance_id}"

if is_linux():
self._url = "{IPAddress}:{Port}".format(
IPAddress=networks["bridge"]["IPAddress"], Port=port
)
if not self._url:
self.logging.error(
f"Couldn't read the IP address of container from attributes "
f"{json.dumps(self._instance.attrs, indent=2)}"
)
raise RuntimeError(
f"Incorrect detection of IP address for container with id {self._instance_id}"
)
else:
self._url = f"localhost:{port}"

self._measurement_pid = measurement_pid

Expand Down
80 changes: 57 additions & 23 deletions sebs/local/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
import time
from typing import cast, Dict, List, Optional, Type, Tuple # noqa
import subprocess
import socket

import docker

from sebs.cache import Cache
from sebs.config import SeBSConfig
from sebs.utils import LoggingHandlers
from sebs.utils import LoggingHandlers, is_linux
from sebs.local.config import LocalConfig
from sebs.local.storage import Minio
from sebs.local.function import LocalFunction
Expand Down Expand Up @@ -174,27 +175,60 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunc
self.name(), code_package.language_name
),
}
container = self._docker_client.containers.run(
image=container_name,
command=f"/bin/bash /sebs/run_server.sh {self.DEFAULT_PORT}",
volumes={code_package.code_location: {"bind": "/function", "mode": "ro"}},
environment=environment,
# FIXME: make CPUs configurable
# FIXME: configure memory
# FIXME: configure timeout
# cpuset_cpus=cpuset,
# required to access perf counters
# alternative: use custom seccomp profile
privileged=True,
security_opt=["seccomp:unconfined"],
network_mode="bridge",
# somehow removal of containers prevents checkpointing from working?
remove=self.remove_containers,
stdout=True,
stderr=True,
detach=True,
# tty=True,
)

# FIXME: make CPUs configurable
# FIXME: configure memory
# FIXME: configure timeout
# cpuset_cpus=cpuset,
# required to access perf counters
# alternative: use custom seccomp profile
container_kwargs = {
"image": container_name,
"command": f"/bin/bash /sebs/run_server.sh {self.DEFAULT_PORT}",
"volumes": {code_package.code_location: {"bind": "/function", "mode": "ro"}},
"environment": environment,
"privileged": True,
"security_opt": ["seccomp:unconfined"],
"network_mode": "bridge",
"remove": self.remove_containers,
"stdout": True,
"stderr": True,
"detach": True,
# "tty": True,
}

# If SeBS is running on non-linux platforms, container port must be mapped to host port to make it reachable
# Check if the system is NOT Linux or that it is WSL
port = self.DEFAULT_PORT
if not is_linux():
port_found = False
for p in range(self.DEFAULT_PORT, self.DEFAULT_PORT + 1000):
# check no container has been deployed on docker's port p
if p not in self.config.resources.allocated_ports:
# check if port p on the host is free
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
s.bind(("127.0.0.1", p))
# The port is available
port = p
port_found = True
self.config.resources.allocated_ports.add(p)
break
except socket.error as e:
# The port is already in use
continue

if not port_found:
raise RuntimeError(
f"Failed to allocate port for container: No ports available between "
f"{self.DEFAULT_PORT} and {self.DEFAULT_PORT + 999}"
)

container_kwargs["command"] = f"/bin/bash /sebs/run_server.sh {port}"
container_kwargs["ports"] = {f'{port}/tcp': port}

container = self._docker_client.containers.run(**container_kwargs)

pid: Optional[int] = None
if self.measurements_enabled and self._memory_measurement_path is not None:
Expand All @@ -216,7 +250,7 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunc
function_cfg = FunctionConfig.from_benchmark(code_package)
func = LocalFunction(
container,
self.DEFAULT_PORT,
port,
func_name,
code_package.benchmark,
code_package.hash,
Expand Down
2 changes: 1 addition & 1 deletion sebs/openwhisk/openwhisk.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def build_base_image(
else:
registry_name = "Docker Hub"

# Check if we the image is already in the registry.
# Check if the image is already in the registry.
# cached package, rebuild not enforced -> check for new one
if is_cached:
if self.find_image(repository_name, image_tag):
Expand Down
3 changes: 2 additions & 1 deletion sebs/storage/minio.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from sebs.faas.config import Resources
from sebs.faas.storage import PersistentStorage
from sebs.storage.config import MinioConfig
from sebs.utils import is_linux


class Minio(PersistentStorage):
Expand Down Expand Up @@ -108,7 +109,7 @@ def configure_connection(self):
self._storage_container.reload()

# Check if the system is Linux and that it's not WSL
if platform.system() == "Linux" and "microsoft" not in platform.release().lower():
if is_linux():
networks = self._storage_container.attrs["NetworkSettings"]["Networks"]
self._cfg.address = "{IPAddress}:{Port}".format(
IPAddress=networks["bridge"]["IPAddress"], Port=9000
Expand Down
4 changes: 4 additions & 0 deletions sebs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import uuid
import click
import datetime
import platform

from typing import List, Optional

Expand Down Expand Up @@ -251,6 +252,9 @@ def logging_handlers(self, handlers: LoggingHandlers):
def has_platform(name: str) -> bool:
return os.environ.get(f"SEBS_WITH_{name.upper()}", "False").lower() == "true"

# Check if the system is Linux and that it's not WSL
def is_linux() -> bool:
return platform.system() == "Linux" and "microsoft" not in platform.release().lower()

def catch_interrupt():

Expand Down

0 comments on commit df3a54f

Please sign in to comment.