Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor updates to PR #199 #226

Merged
merged 3 commits into from
Nov 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions sebs.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ def start(benchmark, benchmark_input_size, output, deployments, storage_configur
"""

(config, output_dir, logging_filename, sebs_client, deployment_client) = parse_common_params(
ignore_cache=True, update_code=False, update_storage=False,
update_code=False, update_storage=False,
deployment="local", storage_configuration=storage_configuration, **kwargs
)
deployment_client = cast(sebs.local.Local, deployment_client)
Expand Down Expand Up @@ -457,8 +457,6 @@ def start(benchmark, benchmark_input_size, output, deployments, storage_configur
# Otherwise we want to clean up as much as possible
deployment_client.shutdown_storage = False

deployment_client.config.serialize()

result.serialize(output)
sebs_client.logging.info(f"Save results to {os.path.abspath(output)}")

Expand All @@ -475,8 +473,16 @@ def stop(input_json, output_json, **kwargs):
sebs.utils.global_logging()

logging.info(f"Stopping deployment from {os.path.abspath(input_json)}")
(config, output_dir, logging_filename, sebs_client, deployment_client) = parse_common_params(
update_code=False, update_storage=False,
deployment="local", **kwargs
)

deployment_client.res

deployment = sebs.local.Deployment.deserialize(input_json, None)
deployment.shutdown(output_json)

logging.info(f"Stopped deployment from {os.path.abspath(input_json)}")


Expand Down
4 changes: 2 additions & 2 deletions sebs/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def typename() -> str:

def load_config(self):
with self._lock:
for cloud in ["azure", "aws", "gcp", "openwhisk"]:
for cloud in ["azure", "aws", "gcp", "openwhisk", "local"]:
cloud_config_file = os.path.join(self.cache_dir, "{}.json".format(cloud))
if os.path.exists(cloud_config_file):
self.cached_config[cloud] = json.load(open(cloud_config_file, "r"))
Expand All @@ -86,7 +86,7 @@ def unlock(self):

def shutdown(self):
if self.config_updated:
for cloud in ["azure", "aws", "gcp", "openwhisk"]:
for cloud in ["azure", "aws", "gcp", "openwhisk", "local"]:
if cloud in self.cached_config:
cloud_config_file = os.path.join(self.cache_dir, "{}.json".format(cloud))
self.logging.info("Update cached config {}".format(cloud_config_file))
Expand Down
69 changes: 40 additions & 29 deletions sebs/local/config.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import json

from typing import cast, Optional
from typing import cast, Optional, Set

from sebs.cache import Cache
from sebs.faas.config import Config, Credentials, Resources
from sebs.storage.minio import MinioConfig
from sebs.utils import serialize, LoggingHandlers
from sebs.utils import LoggingHandlers


class LocalCredentials(Credentials):
Expand All @@ -28,41 +26,59 @@ def __init__(self, storage_cfg: Optional[MinioConfig] = None):
self._path: str = ""
super().__init__(name="local")
self._storage = storage_cfg
self._allocated_ports = set()
self._allocated_ports: Set[int] = set()

@property
def storage_config(self) -> Optional[MinioConfig]:
return self._storage

@property
def path(self) -> str:
return self._path

@property
def allocated_ports(self) -> set:
return self._allocated_ports

def serialize(self) -> dict:
out = {
"allocated_ports": list(self._allocated_ports)
}
out: dict = {}
out["allocated_ports"] = list(self._allocated_ports)
if self._storage is not None:
out["storage"] = self._storage.serialize()
return out

@staticmethod
def initialize(res: Resources, cfg: dict):
pass
def initialize(res: Resources, config: dict):

@staticmethod
def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resources:
ret = LocalResources()
ret._path = config["path"]
resources = cast(LocalResources, res)
# Check for new config
if "storage" in config:
ret._storage = MinioConfig.deserialize(config["storage"])
ret.logging.info("Using user-provided configuration of storage for local containers.")
resources._storage = MinioConfig.deserialize(config["storage"])
resources.logging.info(
"Using user-provided configuration of storage for local containers."
)

if "allocated_ports" in config:
ret._allocated_ports = set(config["allocated_ports"])
resources._allocated_ports = set(config["allocated_ports"])

def update_cache(self, cache: Cache):
super().update_cache(cache)
cache.update_config(
val=list(self._allocated_ports), keys=["local", "resources", "allocated_ports"]
)
if self._storage is not None:
self._storage.update_cache(["local", "resources", "storage"], cache)

@staticmethod
def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resources:
ret = LocalResources()

cached_config = cache.get_config("local")
# Load cached values
if cached_config and "resources" in cached_config:
LocalResources.initialize(ret, cached_config["resources"])
ret.logging_handlers = handlers
ret.logging.info("Using cached resources for Local")
else:
# Check for new config
ret.logging_handlers = handlers
LocalResources.initialize(ret, config)

return ret

Expand Down Expand Up @@ -104,13 +120,8 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config
return config_obj

def serialize(self) -> dict:
with open(self.resources.path, "r+") as out:
config = json.load(out)
config["deployment"]["local"].update(self.resources.serialize())
out.seek(0)
out.write(serialize(config))

return {}
out = {"name": "local", "region": self._region, "resources": self._resources.serialize()}
return out

def update_cache(self, cache: Cache):
pass
self.resources.update_cache(cache)
13 changes: 5 additions & 8 deletions sebs/local/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,8 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage:
self.storage.replace_existing = replace_existing
return self.storage

"""
Shut down minio storage instance.
"""

def shutdown(self):
pass
super().shutdown()

"""
It would be sufficient to just pack the code and ship it as zip to AWS.
Expand Down Expand Up @@ -197,7 +193,8 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunc
# "tty": True,
}

# If SeBS is running on non-linux platforms, container port must be mapped to host port to make it reachable
# If SeBS is running on non-linux platforms,
# container port must be mapped to host port to make it reachable
# Check if the system is NOT Linux or that it is WSL
port = self.DEFAULT_PORT
if not is_linux():
Expand All @@ -215,7 +212,7 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunc
port_found = True
self.config.resources.allocated_ports.add(p)
break
except socket.error as e:
except socket.error:
# The port is already in use
continue

Expand All @@ -226,7 +223,7 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunc
)

container_kwargs["command"] = f"/bin/bash /sebs/run_server.sh {port}"
container_kwargs["ports"] = {f'{port}/tcp': port}
container_kwargs["ports"] = {f"{port}/tcp": port}

container = self._docker_client.containers.run(**container_kwargs)

Expand Down
1 change: 0 additions & 1 deletion sebs/storage/minio.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
import secrets
import uuid
import platform
from typing import List, Optional, Type, TypeVar

import docker
Expand Down
2 changes: 2 additions & 0 deletions sebs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,12 @@ def logging_handlers(self, handlers: LoggingHandlers):
def has_platform(name: str) -> bool:
return os.environ.get(f"SEBS_WITH_{name.upper()}", "False").lower() == "true"


# Check if the system is Linux and that it's not WSL
def is_linux() -> bool:
return platform.system() == "Linux" and "microsoft" not in platform.release().lower()


def catch_interrupt():

import signal
Expand Down