From 8872fc6663854e426cd1d3c9ccab6cae7e55d020 Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Thu, 30 May 2024 13:19:48 +0200 Subject: [PATCH 01/27] F OpenNebula/one-aiops#70: opennebula backend basic skeleton --- lithops/constants.py | 6 +- lithops/serverless/backends/one/__init__.py | 3 + lithops/serverless/backends/one/config.py | 93 +++++++++++++++++++ lithops/serverless/backends/one/one.py | 49 ++++++++++ .../serverless/backends/one/one_config.yaml | 9 ++ 5 files changed, 158 insertions(+), 2 deletions(-) create mode 100644 lithops/serverless/backends/one/__init__.py create mode 100644 lithops/serverless/backends/one/config.py create mode 100644 lithops/serverless/backends/one/one.py create mode 100644 lithops/serverless/backends/one/one_config.yaml diff --git a/lithops/constants.py b/lithops/constants.py index 0009c90ae..da1807da9 100644 --- a/lithops/constants.py +++ b/lithops/constants.py @@ -110,7 +110,8 @@ 'azure_containers', 'aliyun_fc', 'oracle_f', - 'k8s' + 'k8s', + 'one' ] STANDALONE_BACKENDS = [ @@ -140,6 +141,7 @@ 'azure_vms', 'aws_batch', 'k8s', - 'code_engine' + 'one', + 'code_engine', 'vm' ] diff --git a/lithops/serverless/backends/one/__init__.py b/lithops/serverless/backends/one/__init__.py new file mode 100644 index 000000000..3d05453b1 --- /dev/null +++ b/lithops/serverless/backends/one/__init__.py @@ -0,0 +1,3 @@ +from .one import OpenNebula as ServerlessBackend + +__all__ = ['ServerlessBackend'] diff --git a/lithops/serverless/backends/one/config.py b/lithops/serverless/backends/one/config.py new file mode 100644 index 000000000..58fd17880 --- /dev/null +++ b/lithops/serverless/backends/one/config.py @@ -0,0 +1,93 @@ +import os +import json + +from lithops.serverless.backends.k8s.config import ( + DEFAULT_CONFIG_KEYS, + DEFAULT_GROUP, + DEFAULT_VERSION, + MASTER_NAME, + MASTER_PORT, + DOCKERFILE_DEFAULT, + JOB_DEFAULT, + POD, + load_config as original_load_config +) + +DEFAULT_ONEKE_CONFIG = """ +{ + "name": "OneKE/1", + "networks_values": [ + {"Public": {"id": "0"}}, + {"Private": {"id": "1"}} + ], + "custom_attrs_values": { + "ONEAPP_VROUTER_ETH0_VIP0": "", + "ONEAPP_VROUTER_ETH1_VIP0": "", + + "ONEAPP_RKE2_SUPERVISOR_EP": "ep0.eth0.vr:9345", + "ONEAPP_K8S_CONTROL_PLANE_EP": "ep0.eth0.vr:6443", + "ONEAPP_K8S_EXTRA_SANS": "localhost,127.0.0.1,ep0.eth0.vr,${vnf.TEMPLATE.CONTEXT.ETH0_IP},k8s.yourdomain.it", + + "ONEAPP_K8S_MULTUS_ENABLED": "NO", + "ONEAPP_K8S_MULTUS_CONFIG": "", + "ONEAPP_K8S_CNI_PLUGIN": "cilium", + "ONEAPP_K8S_CNI_CONFIG": "", + "ONEAPP_K8S_CILIUM_RANGE": "", + + "ONEAPP_K8S_METALLB_ENABLED": "NO", + "ONEAPP_K8S_METALLB_CONFIG": "", + "ONEAPP_K8S_METALLB_RANGE": "", + + "ONEAPP_K8S_LONGHORN_ENABLED": "YES", + "ONEAPP_STORAGE_DEVICE": "/dev/vdb", + "ONEAPP_STORAGE_FILESYSTEM": "xfs", + + "ONEAPP_K8S_TRAEFIK_ENABLED": "YES", + "ONEAPP_VNF_HAPROXY_INTERFACES": "eth0", + "ONEAPP_VNF_HAPROXY_REFRESH_RATE": "30", + "ONEAPP_VNF_HAPROXY_LB0_PORT": "9345", + "ONEAPP_VNF_HAPROXY_LB1_PORT": "6443", + "ONEAPP_VNF_HAPROXY_LB2_PORT": "443", + "ONEAPP_VNF_HAPROXY_LB3_PORT": "80", + + "ONEAPP_VNF_DNS_ENABLED": "YES", + "ONEAPP_VNF_DNS_INTERFACES": "eth1", + "ONEAPP_VNF_DNS_NAMESERVERS": "1.1.1.1,8.8.8.8", + "ONEAPP_VNF_NAT4_ENABLED": "YES", + "ONEAPP_VNF_NAT4_INTERFACES_OUT": "eth0", + "ONEAPP_VNF_ROUTER4_ENABLED": "YES", + "ONEAPP_VNF_ROUTER4_INTERFACES": "eth0,eth1" + } +} +""" + +DEFAULT_PRIVATE_VNET = """ +NAME = "private-oneke" +VN_MAD = "bridge" +AUTOMATIC_VLAN_ID = "YES" +AR = [TYPE = "IP4", IP = "192.168.150.0", SIZE = "51"] +""" + +DEFAULT_CONFIG_KEYS = 
{ + 'public_vnet_id': -1, + 'private_vnet_id': -1, + 'oneke_config': DEFAULT_ONEKE_CONFIG, + #TODO: Add kube.config file +} + +FH_ZIP_LOCATION = os.path.join(os.getcwd(), 'lithops_one.zip') + +# Overwrite default Dockerfile +DOCKERFILE_DEFAULT = "\n".join(DOCKERFILE_DEFAULT.split('\n')[:-2]) + """ +COPY lithops_one.zip . +RUN unzip lithops_one.zip && rm lithops_one.zip +""" + +def load_config(config_data): + if 'oneke_config' in config_data: + try: + with open(config_data['oneke_config'], 'r') as f: + config_data['oneke_config'] = json.load(f) + except (IOError, json.JSONDecodeError) as err: + raise Exception(f"Error reading OneKE config file: {err}") + original_load_config(config_data) \ No newline at end of file diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py new file mode 100644 index 000000000..469dd7749 --- /dev/null +++ b/lithops/serverless/backends/one/one.py @@ -0,0 +1,49 @@ +from ..k8s.k8s import KubernetesBackend + +class OpenNebula(KubernetesBackend): + """ + A wrap-up around OpenNebula backend. + """ + def __init__(self, one_config, internal_storage): + # TODO: check One_KE is deployed + # (if not) initialize OpenNebula One_KE & wait for it to be ready + + # Overwrite config values + self.name = 'one' + + super().__init__(one_config, internal_storage) + + + def invoke(self, docker_image_name, runtime_memory, job_payload): + super().invoke(docker_image_name, runtime_memory, job_payload) + + + def clear(self, job_keys=None): + # First, we clean Kubernetes jobs + super().clear(all) + + # TODO: if all are deteleted -> suspend OneKE VMs (scale down) and + # delete them after X minutes + pass + + + def _check_oneke(self): + # CASE1: client has created their own OneKE cluster + # CASE2: OneKE cluster was created by lithops (with or without JSON file) + pass + + + def _instantiate_oneke(self): + # TODO: check OneKE JSON is passed (if not use default) + + # TODO: check networks (public/private vnets) + + # TODO: instantiate OneKE + pass + + + def _wait_for_oneke(self): + # TODO: wait for all the VMs + + # TODO: look onegate connectivity + pass \ No newline at end of file diff --git a/lithops/serverless/backends/one/one_config.yaml b/lithops/serverless/backends/one/one_config.yaml new file mode 100644 index 000000000..eb930f25c --- /dev/null +++ b/lithops/serverless/backends/one/one_config.yaml @@ -0,0 +1,9 @@ +lithops: + backend: k8s +one: + # PATH to OneKE JSON config + oneke_config: + # ID for Public vnet (if not passed: service default network or create a new one) + public_vnet_id: + # ID for Private vnet (if not passed: create a new one) + private_vnet_id: \ No newline at end of file From 9cd053f0e3234d6fa54a8540fba3789feaa114c3 Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Thu, 30 May 2024 13:36:54 +0200 Subject: [PATCH 02/27] F OpenNebula/one-aiops#70: basic opennebula backend skeleton --- lithops/serverless/backends/one/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lithops/serverless/backends/one/config.py b/lithops/serverless/backends/one/config.py index 58fd17880..0b6c291ce 100644 --- a/lithops/serverless/backends/one/config.py +++ b/lithops/serverless/backends/one/config.py @@ -72,7 +72,7 @@ 'public_vnet_id': -1, 'private_vnet_id': -1, 'oneke_config': DEFAULT_ONEKE_CONFIG, - #TODO: Add kube.config file + # TODO: Add kube.config file } FH_ZIP_LOCATION = os.path.join(os.getcwd(), 'lithops_one.zip') From 5ca5fac8f2b234009705224bd21ef87028d6d289 Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Tue, 4 Jun 
2024 11:38:44 +0200 Subject: [PATCH 03/27] F OpenNebula/one-aiops#70: refactor skeleton --- lithops/serverless/backends/one/config.py | 59 +++++++++++++------ .../serverless/backends/one/one_config.yaml | 14 +++-- 2 files changed, 48 insertions(+), 25 deletions(-) diff --git a/lithops/serverless/backends/one/config.py b/lithops/serverless/backends/one/config.py index 0b6c291ce..9c52fe4dd 100644 --- a/lithops/serverless/backends/one/config.py +++ b/lithops/serverless/backends/one/config.py @@ -1,18 +1,17 @@ import os import json + from lithops.serverless.backends.k8s.config import ( - DEFAULT_CONFIG_KEYS, - DEFAULT_GROUP, - DEFAULT_VERSION, - MASTER_NAME, - MASTER_PORT, DOCKERFILE_DEFAULT, - JOB_DEFAULT, - POD, load_config as original_load_config ) + +class OneConfigError(Exception): + pass + + DEFAULT_ONEKE_CONFIG = """ { "name": "OneKE/1", @@ -61,6 +60,13 @@ } """ + +MANDATORY_CONFIG_KEYS = { + "public_network_id", + "private_network_id" +} + + DEFAULT_PRIVATE_VNET = """ NAME = "private-oneke" VN_MAD = "bridge" @@ -68,26 +74,41 @@ AR = [TYPE = "IP4", IP = "192.168.150.0", SIZE = "51"] """ -DEFAULT_CONFIG_KEYS = { - 'public_vnet_id': -1, - 'private_vnet_id': -1, - 'oneke_config': DEFAULT_ONEKE_CONFIG, - # TODO: Add kube.config file -} FH_ZIP_LOCATION = os.path.join(os.getcwd(), 'lithops_one.zip') + # Overwrite default Dockerfile DOCKERFILE_DEFAULT = "\n".join(DOCKERFILE_DEFAULT.split('\n')[:-2]) + """ COPY lithops_one.zip . RUN unzip lithops_one.zip && rm lithops_one.zip """ + def load_config(config_data): - if 'oneke_config' in config_data: - try: - with open(config_data['oneke_config'], 'r') as f: - config_data['oneke_config'] = json.load(f) - except (IOError, json.JSONDecodeError) as err: - raise Exception(f"Error reading OneKE config file: {err}") + if 'oneke_config' in config_data['one']: + oneke_config = config_data['one']['oneke_config'] + + # Validate mandatory params + for key in MANDATORY_CONFIG_KEYS: + if key not in oneke_config: + raise OneConfigError(f"'{key}' is missing in 'oneke_config'") + public_network_id = oneke_config['public_network_id'] + private_network_id = oneke_config['private_network_id'] + + # Optional params: name + name = oneke_config.get('name', 'OneKE for lithops') + + oneke_update = { + "name": name, + "networks_values": [ + {"Public": {"id": str(public_network_id)}}, + {"Private": {"id": str(private_network_id)}} + ] + } + + # Override oneke_config with valid JSON to update the service + config_data['one']['oneke_config'] = json.dumps(oneke_update) + + # Load k8s default config original_load_config(config_data) \ No newline at end of file diff --git a/lithops/serverless/backends/one/one_config.yaml b/lithops/serverless/backends/one/one_config.yaml index eb930f25c..beccd57a8 100644 --- a/lithops/serverless/backends/one/one_config.yaml +++ b/lithops/serverless/backends/one/one_config.yaml @@ -1,9 +1,11 @@ lithops: - backend: k8s + backend: one one: - # PATH to OneKE JSON config + service_id: # oneke service id (means OneKE is already deployed) + service_template_id: # oneke_tempalte_id (client has downloaded before) + oneconfig_path: # PATH to OneKE JSON config + oneke_config: - # ID for Public vnet (if not passed: service default network or create a new one) - public_vnet_id: - # ID for Private vnet (if not passed: create a new one) - private_vnet_id: \ No newline at end of file + public_vnet_id: # ID for Public vnet + private_vnet_id: # ID for Private vnet (if not passed: create a new one) + delete: # if set to True, OneKE VMs will be deleted after all the 
jobs are finished \ No newline at end of file From 7d2885cb08ee508a84c19a4d6a06bf924fda0959 Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Wed, 5 Jun 2024 13:11:55 +0200 Subject: [PATCH 04/27] F OpenNebula/one-aiops#70: basic initialization --- lithops/serverless/backends/one/one.py | 42 ++++++++++++++++++++------ 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index 469dd7749..5b72bcab5 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -1,12 +1,35 @@ from ..k8s.k8s import KubernetesBackend +import logging +import urllib3 +import oneflow + +logger = logging.getLogger(__name__) +urllib3.disable_warnings() + + +class OneError(Exception): + pass + + class OpenNebula(KubernetesBackend): """ A wrap-up around OpenNebula backend. """ def __init__(self, one_config, internal_storage): - # TODO: check One_KE is deployed - # (if not) initialize OpenNebula One_KE & wait for it to be ready + logger.debug("Initializing OpenNebula backend") + self.client = oneflow.OneFlowClient() + + # template_id: instantiate OneKE + if 'template_id' in one_config: + service_id = self._instantiate_oneke(one_config['template_id'], one_config['oneke_config']) + self._wait_for_oneke(service_id) + # service_id: check deployed OneKE is available + elif 'service_id' in one_config: + self._check_oneke(one_config['service_id']) + else: + raise OneError(f"OpenNebula backend must contain 'template_id' or 'service_id'") + # Overwrite config values self.name = 'one' @@ -27,22 +50,21 @@ def clear(self, job_keys=None): pass - def _check_oneke(self): + def _check_oneke(self, service_id): # CASE1: client has created their own OneKE cluster # CASE2: OneKE cluster was created by lithops (with or without JSON file) pass - def _instantiate_oneke(self): - # TODO: check OneKE JSON is passed (if not use default) - - # TODO: check networks (public/private vnets) - - # TODO: instantiate OneKE + def _instantiate_oneke(self, template_id, oneke_config): + # TODO: create private network if not passed + _json = self.client.templatepool[template_id].instantiate(oneke_config) + logger.info("JSON: {}".format(_json)) + # Get service_id from JSON pass - def _wait_for_oneke(self): + def _wait_for_oneke(self, service_id): # TODO: wait for all the VMs # TODO: look onegate connectivity From e4848cd81bfcf6f8aecb19ecb42ba036402e306c Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Thu, 6 Jun 2024 10:32:50 +0200 Subject: [PATCH 05/27] F OpenNebula/one-aiops#70: add optionals parameters to instantiate OneKE --- lithops/serverless/backends/one/config.py | 53 +++++++++++++++++-- lithops/serverless/backends/one/one.py | 20 ++++++- .../serverless/backends/one/one_config.yaml | 4 +- 3 files changed, 68 insertions(+), 9 deletions(-) diff --git a/lithops/serverless/backends/one/config.py b/lithops/serverless/backends/one/config.py index 9c52fe4dd..d73d7f5c1 100644 --- a/lithops/serverless/backends/one/config.py +++ b/lithops/serverless/backends/one/config.py @@ -3,6 +3,7 @@ from lithops.serverless.backends.k8s.config import ( + DEFAULT_CONFIG_KEYS, DOCKERFILE_DEFAULT, load_config as original_load_config ) @@ -67,6 +68,40 @@ class OneConfigError(Exception): } +OPTIONAL_CONFIG_KEYS = { + "ONEAPP_VROUTER_ETH0_VIP0": "", + "ONEAPP_VROUTER_ETH1_VIP0": "", + "ONEAPP_RKE2_SUPERVISOR_EP": "ep0.eth0.vr:9345", + "ONEAPP_K8S_CONTROL_PLANE_EP": "ep0.eth0.vr:6443", + "ONEAPP_K8S_EXTRA_SANS": 
"localhost,127.0.0.1,ep0.eth0.vr,${vnf.TEMPLATE.CONTEXT.ETH0_IP},k8s.yourdomain.it", + "ONEAPP_K8S_MULTUS_ENABLED": "NO", + "ONEAPP_K8S_MULTUS_CONFIG": "", + "ONEAPP_K8S_CNI_PLUGIN": "cilium", + "ONEAPP_K8S_CNI_CONFIG": "", + "ONEAPP_K8S_CILIUM_RANGE": "", + "ONEAPP_K8S_METALLB_ENABLED": "NO", + "ONEAPP_K8S_METALLB_CONFIG": "", + "ONEAPP_K8S_METALLB_RANGE": "", + "ONEAPP_K8S_LONGHORN_ENABLED": "YES", + "ONEAPP_STORAGE_DEVICE": "/dev/vdb", + "ONEAPP_STORAGE_FILESYSTEM": "xfs", + "ONEAPP_K8S_TRAEFIK_ENABLED": "YES", + "ONEAPP_VNF_HAPROXY_INTERFACES": "eth0", + "ONEAPP_VNF_HAPROXY_REFRESH_RATE": "30", + "ONEAPP_VNF_HAPROXY_LB0_PORT": "9345", + "ONEAPP_VNF_HAPROXY_LB1_PORT": "6443", + "ONEAPP_VNF_HAPROXY_LB2_PORT": "443", + "ONEAPP_VNF_HAPROXY_LB3_PORT": "80", + "ONEAPP_VNF_DNS_ENABLED": "YES", + "ONEAPP_VNF_DNS_INTERFACES": "eth1", + "ONEAPP_VNF_DNS_NAMESERVERS": "1.1.1.1,8.8.8.8", + "ONEAPP_VNF_NAT4_ENABLED": "YES", + "ONEAPP_VNF_NAT4_INTERFACES_OUT": "eth0", + "ONEAPP_VNF_ROUTER4_ENABLED": "YES", + "ONEAPP_VNF_ROUTER4_INTERFACES": "eth0,eth1" +} + + DEFAULT_PRIVATE_VNET = """ NAME = "private-oneke" VN_MAD = "bridge" @@ -88,27 +123,35 @@ class OneConfigError(Exception): def load_config(config_data): if 'oneke_config' in config_data['one']: oneke_config = config_data['one']['oneke_config'] - + # Validate mandatory params for key in MANDATORY_CONFIG_KEYS: if key not in oneke_config: raise OneConfigError(f"'{key}' is missing in 'oneke_config'") public_network_id = oneke_config['public_network_id'] private_network_id = oneke_config['private_network_id'] - - # Optional params: name + + # Optional params name = oneke_config.get('name', 'OneKE for lithops') + custom_attrs_values = {key: oneke_config.get(key, default_value) + for key, default_value in OPTIONAL_CONFIG_KEYS.items()} oneke_update = { "name": name, "networks_values": [ {"Public": {"id": str(public_network_id)}}, {"Private": {"id": str(private_network_id)}} - ] + ], + "custom_attrs_values": custom_attrs_values } - # Override oneke_config with valid JSON to update the service + # Override oneke_config with a valid JSON to update the service config_data['one']['oneke_config'] = json.dumps(oneke_update) + # TODO: change me + for key in DEFAULT_CONFIG_KEYS: + if key not in config_data['one']: + config_data['one'][key] = DEFAULT_CONFIG_KEYS[key] + # Load k8s default config original_load_config(config_data) \ No newline at end of file diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index 5b72bcab5..293f2c434 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -1,5 +1,7 @@ from ..k8s.k8s import KubernetesBackend +import os +import json import logging import urllib3 import oneflow @@ -22,6 +24,7 @@ def __init__(self, one_config, internal_storage): # template_id: instantiate OneKE if 'template_id' in one_config: + logger.info(one_config['template_id']) service_id = self._instantiate_oneke(one_config['template_id'], one_config['oneke_config']) self._wait_for_oneke(service_id) # service_id: check deployed OneKE is available @@ -58,9 +61,22 @@ def _check_oneke(self, service_id): def _instantiate_oneke(self, template_id, oneke_config): # TODO: create private network if not passed - _json = self.client.templatepool[template_id].instantiate(oneke_config) - logger.info("JSON: {}".format(_json)) + logger.info(oneke_config) + + #tmp_file_path = '/tmp/oneke_config.json' + #with open(tmp_file_path, 'w') as f: + #json.dump(oneke_config, f) + + # Pass the temporary 
file path to the update() function + _json = self.client.templatepool[template_id].instantiate(json_str=oneke_config) + + # Remove the temporary file after use + #os.remove(tmp_file_path) + + # Get service_id from JSON + logger.info("JSON: {}".format(_json)) + pass diff --git a/lithops/serverless/backends/one/one_config.yaml b/lithops/serverless/backends/one/one_config.yaml index beccd57a8..4124fb532 100644 --- a/lithops/serverless/backends/one/one_config.yaml +++ b/lithops/serverless/backends/one/one_config.yaml @@ -6,6 +6,6 @@ one: oneconfig_path: # PATH to OneKE JSON config oneke_config: - public_vnet_id: # ID for Public vnet - private_vnet_id: # ID for Private vnet (if not passed: create a new one) + public_network_id: # ID for Public vnet + private_network_id: # ID for Private vnet (if not passed: create a new one) delete: # if set to True, OneKE VMs will be deleted after all the jobs are finished \ No newline at end of file From 007005060364b18dc17201566766579767e5fcff Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Thu, 6 Jun 2024 11:09:18 +0200 Subject: [PATCH 06/27] F OpenNebula/one-aiops#70: instantiate OneKE basic version --- lithops/serverless/backends/one/one.py | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index 293f2c434..990bc52b4 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -61,23 +61,15 @@ def _check_oneke(self, service_id): def _instantiate_oneke(self, template_id, oneke_config): # TODO: create private network if not passed - logger.info(oneke_config) - - #tmp_file_path = '/tmp/oneke_config.json' - #with open(tmp_file_path, 'w') as f: - #json.dump(oneke_config, f) # Pass the temporary file path to the update() function - _json = self.client.templatepool[template_id].instantiate(json_str=oneke_config) - - # Remove the temporary file after use - #os.remove(tmp_file_path) - + oneke_json = json.loads(oneke_config) + _json = self.client.templatepool[template_id].instantiate(json_str=oneke_json) # Get service_id from JSON - logger.info("JSON: {}".format(_json)) - - pass + service_id = list(_json.keys())[0] + logger.info("OneKE service ID: {}".format(service_id)) + return service_id def _wait_for_oneke(self, service_id): From 422c81fcadb394c83decb1c921dbc64682be29b8 Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Thu, 6 Jun 2024 13:01:09 +0200 Subject: [PATCH 07/27] F OpenNebula/one-aiops#70: add pyone and start wait for OneKE --- lithops/serverless/backends/one/one.py | 42 +++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index 990bc52b4..4aa60ed37 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -1,10 +1,12 @@ from ..k8s.k8s import KubernetesBackend +import oneflow +import pyone + import os import json import logging import urllib3 -import oneflow logger = logging.getLogger(__name__) urllib3.disable_warnings() @@ -13,6 +15,22 @@ class OneError(Exception): pass +def _config_one(): + env = os.environ + + # Reading the `one_auth` file. + # The `one_auth` file path is given in the environment variable + # `ONE_AUTH` if exists, otherwise it is in `$HOME/.one/one_auth`. 
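+    # For example, a `one_auth` file typically holds a single line of
+    # "user:password" credentials (e.g. `oneadmin:opennebula`); that line
+    # becomes the session string passed to pyone.OneServer() below.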
+ auth_path = env.get('ONE_AUTH') or os.path.expanduser('~/.one/one_auth') + with open(auth_path, mode='r') as auth_file: + credentials = auth_file.readlines()[0].strip() + + # Reading environment variables. + # Environment variable `ONESERVER_URL` superseeds the default URL. + url = env.get('ONESERVER_URL', 'http://localhost:2633/RPC2') + + return pyone.OneServer(url, session=credentials) + class OpenNebula(KubernetesBackend): """ @@ -20,11 +38,15 @@ class OpenNebula(KubernetesBackend): """ def __init__(self, one_config, internal_storage): logger.debug("Initializing OpenNebula backend") + + logger.debug("Initializing Oneflow python client") self.client = oneflow.OneFlowClient() + logger.debug("Initializing OpenNebula python client") + self.one = _config_one() + # template_id: instantiate OneKE if 'template_id' in one_config: - logger.info(one_config['template_id']) service_id = self._instantiate_oneke(one_config['template_id'], one_config['oneke_config']) self._wait_for_oneke(service_id) # service_id: check deployed OneKE is available @@ -68,12 +90,24 @@ def _instantiate_oneke(self, template_id, oneke_config): # Get service_id from JSON service_id = list(_json.keys())[0] - logger.info("OneKE service ID: {}".format(service_id)) + logger.debug("OneKE service ID: {}".format(service_id)) return service_id def _wait_for_oneke(self, service_id): - # TODO: wait for all the VMs + _service_json = self.client.servicepool[service_id].info() + logger.debug(_service_json) + logs = _service_json[service_id]['TEMPLATE']['BODY'].get('log', []) + if logs: + last_log = logs[-1] + logger.debug(last_log) + state = last_log['message'].split(':')[-1].strip() + if state == 'FAILED_DEPLOYING': + raise OneError(f"OneKE deployment has failed") + if state == 'RUNNING': + logger.info("OneKE is Running") + logger.debug("Deployment state: {}".format(state)) + # TODO: look onegate connectivity pass \ No newline at end of file From 013b38be73b6c4e1f9733f99dfb27c0cc10aafdc Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Thu, 6 Jun 2024 13:52:03 +0200 Subject: [PATCH 08/27] F OpenNebula/one-aiops#70: wait for OneKE implemented --- lithops/serverless/backends/one/one.py | 42 +++++++++++++++----------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index 4aa60ed37..c65fb5296 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -5,6 +5,7 @@ import os import json +import time import logging import urllib3 @@ -94,20 +95,27 @@ def _instantiate_oneke(self, template_id, oneke_config): return service_id - def _wait_for_oneke(self, service_id): - _service_json = self.client.servicepool[service_id].info() - logger.debug(_service_json) - logs = _service_json[service_id]['TEMPLATE']['BODY'].get('log', []) - if logs: - last_log = logs[-1] - logger.debug(last_log) - state = last_log['message'].split(':')[-1].strip() - if state == 'FAILED_DEPLOYING': - raise OneError(f"OneKE deployment has failed") - if state == 'RUNNING': - logger.info("OneKE is Running") - logger.debug("Deployment state: {}".format(state)) - - - # TODO: look onegate connectivity - pass \ No newline at end of file + def _wait_for_oneke(self, service_id, timeout=600): + start_time = time.time() + minutes_timeout = int(timeout/60) + logger.debug("Initializing OneKE service. 
Be patient, this process can take up to {} minutes".format(minutes_timeout)) + while True: + _service_json = self.client.servicepool[service_id].info() + logs = _service_json[service_id]['TEMPLATE']['BODY'].get('log', []) + if logs: + last_log = logs[-1] + logger.debug(last_log) + state = last_log['message'].split(':')[-1].strip() + # Check OneKE deployment status + if state == 'FAILED_DEPLOYING': + raise OneError("OneKE deployment has failed") + if state == 'RUNNING': + logger.debug("OneKE is running") + break + + # Check timeout + elapsed_time = time.time() - start_time + if elapsed_time > timeout: + raise OneError("Deployment timed out after {} seconds. You can try again once OneKE is in RUNNING state with the service_id option.".format(timeout)) + + time.sleep(10) \ No newline at end of file From 76c17c4c3700c801be0e3d3c0dfa42ca75d849ac Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Fri, 7 Jun 2024 10:52:35 +0200 Subject: [PATCH 09/27] F OpenNebula/one-aiops#70: get kubecfg from master node --- lithops/serverless/backends/one/config.py | 57 +++---------------- lithops/serverless/backends/one/one.py | 36 ++++++++++-- .../serverless/backends/one/one_config.yaml | 3 +- 3 files changed, 39 insertions(+), 57 deletions(-) diff --git a/lithops/serverless/backends/one/config.py b/lithops/serverless/backends/one/config.py index d73d7f5c1..f54e43ed5 100644 --- a/lithops/serverless/backends/one/config.py +++ b/lithops/serverless/backends/one/config.py @@ -13,55 +13,6 @@ class OneConfigError(Exception): pass -DEFAULT_ONEKE_CONFIG = """ -{ - "name": "OneKE/1", - "networks_values": [ - {"Public": {"id": "0"}}, - {"Private": {"id": "1"}} - ], - "custom_attrs_values": { - "ONEAPP_VROUTER_ETH0_VIP0": "", - "ONEAPP_VROUTER_ETH1_VIP0": "", - - "ONEAPP_RKE2_SUPERVISOR_EP": "ep0.eth0.vr:9345", - "ONEAPP_K8S_CONTROL_PLANE_EP": "ep0.eth0.vr:6443", - "ONEAPP_K8S_EXTRA_SANS": "localhost,127.0.0.1,ep0.eth0.vr,${vnf.TEMPLATE.CONTEXT.ETH0_IP},k8s.yourdomain.it", - - "ONEAPP_K8S_MULTUS_ENABLED": "NO", - "ONEAPP_K8S_MULTUS_CONFIG": "", - "ONEAPP_K8S_CNI_PLUGIN": "cilium", - "ONEAPP_K8S_CNI_CONFIG": "", - "ONEAPP_K8S_CILIUM_RANGE": "", - - "ONEAPP_K8S_METALLB_ENABLED": "NO", - "ONEAPP_K8S_METALLB_CONFIG": "", - "ONEAPP_K8S_METALLB_RANGE": "", - - "ONEAPP_K8S_LONGHORN_ENABLED": "YES", - "ONEAPP_STORAGE_DEVICE": "/dev/vdb", - "ONEAPP_STORAGE_FILESYSTEM": "xfs", - - "ONEAPP_K8S_TRAEFIK_ENABLED": "YES", - "ONEAPP_VNF_HAPROXY_INTERFACES": "eth0", - "ONEAPP_VNF_HAPROXY_REFRESH_RATE": "30", - "ONEAPP_VNF_HAPROXY_LB0_PORT": "9345", - "ONEAPP_VNF_HAPROXY_LB1_PORT": "6443", - "ONEAPP_VNF_HAPROXY_LB2_PORT": "443", - "ONEAPP_VNF_HAPROXY_LB3_PORT": "80", - - "ONEAPP_VNF_DNS_ENABLED": "YES", - "ONEAPP_VNF_DNS_INTERFACES": "eth1", - "ONEAPP_VNF_DNS_NAMESERVERS": "1.1.1.1,8.8.8.8", - "ONEAPP_VNF_NAT4_ENABLED": "YES", - "ONEAPP_VNF_NAT4_INTERFACES_OUT": "eth0", - "ONEAPP_VNF_ROUTER4_ENABLED": "YES", - "ONEAPP_VNF_ROUTER4_INTERFACES": "eth0,eth1" - } -} -""" - - MANDATORY_CONFIG_KEYS = { "public_network_id", "private_network_id" @@ -120,6 +71,13 @@ class OneConfigError(Exception): """ +# Add OpenNebula defaults +DEFAULT_CONFIG_KEYS = { + 'timeout': 600, + 'kubcfg_path': '/tmp/kube_config' +} + + def load_config(config_data): if 'oneke_config' in config_data['one']: oneke_config = config_data['one']['oneke_config'] @@ -144,7 +102,6 @@ def load_config(config_data): ], "custom_attrs_values": custom_attrs_values } - # Override oneke_config with a valid JSON to update the service config_data['one']['oneke_config'] = 
json.dumps(oneke_update) diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index c65fb5296..f95e47f6a 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -6,6 +6,7 @@ import os import json import time +import base64 import logging import urllib3 @@ -48,18 +49,23 @@ def __init__(self, one_config, internal_storage): # template_id: instantiate OneKE if 'template_id' in one_config: - service_id = self._instantiate_oneke(one_config['template_id'], one_config['oneke_config']) - self._wait_for_oneke(service_id) + one_config['service_id'] = self._instantiate_oneke(one_config['template_id'], one_config['oneke_config']) + self._wait_for_oneke(one_config['service_id'], one_config['timeout']) # service_id: check deployed OneKE is available elif 'service_id' in one_config: self._check_oneke(one_config['service_id']) else: raise OneError(f"OpenNebula backend must contain 'template_id' or 'service_id'") + # Get and Save kubeconfig from OneKE + kubecfg = self._get_kube_config(one_config['service_id']) + with open(one_config['oneconfig_path'], 'w') as file: + file.write(kubecfg) # Overwrite config values self.name = 'one' - + self.kubecfg_path = one_config['oneconfig_path'] + super().__init__(one_config, internal_storage) @@ -95,7 +101,7 @@ def _instantiate_oneke(self, template_id, oneke_config): return service_id - def _wait_for_oneke(self, service_id, timeout=600): + def _wait_for_oneke(self, service_id, timeout): start_time = time.time() minutes_timeout = int(timeout/60) logger.debug("Initializing OneKE service. Be patient, this process can take up to {} minutes".format(minutes_timeout)) @@ -104,7 +110,7 @@ def _wait_for_oneke(self, service_id, timeout=600): logs = _service_json[service_id]['TEMPLATE']['BODY'].get('log', []) if logs: last_log = logs[-1] - logger.debug(last_log) + logger.debug("Last log: {}".format(last_log)) state = last_log['message'].split(':')[-1].strip() # Check OneKE deployment status if state == 'FAILED_DEPLOYING': @@ -118,4 +124,22 @@ def _wait_for_oneke(self, service_id, timeout=600): if elapsed_time > timeout: raise OneError("Deployment timed out after {} seconds. You can try again once OneKE is in RUNNING state with the service_id option.".format(timeout)) - time.sleep(10) \ No newline at end of file + time.sleep(10) + + + def _get_kube_config(self, service_id): + # Get master VM ID + _service_json = self.client.servicepool[service_id].info() + master_vm_id = next( + (role['nodes'][0]['vm_info']['VM']['ID'] for role in _service_json[str(service_id)]['TEMPLATE']['BODY']['roles'] + if role['name'] == 'master'), + None + ) + if master_vm_id is None: + raise OneError("Master VM ID not found. 
Please change the name of the master node to 'master' and try again.") + # Get kubeconfig + vm = self.one.vm.info(int(master_vm_id)) + encoded_kubeconfig = vm.USER_TEMPLATE.get('ONEKE_KUBECONFIG') + decoded_kubeconfig = base64.b64decode(encoded_kubeconfig).decode('utf-8') + logger.debug("OpneNebula OneKE Kubeconfig: {}".format(decoded_kubeconfig)) + return decoded_kubeconfig \ No newline at end of file diff --git a/lithops/serverless/backends/one/one_config.yaml b/lithops/serverless/backends/one/one_config.yaml index 4124fb532..741a44d08 100644 --- a/lithops/serverless/backends/one/one_config.yaml +++ b/lithops/serverless/backends/one/one_config.yaml @@ -4,7 +4,8 @@ one: service_id: # oneke service id (means OneKE is already deployed) service_template_id: # oneke_tempalte_id (client has downloaded before) oneconfig_path: # PATH to OneKE JSON config - + timeout: # time to wait for OneKE to be ready + kubcfg_path': # PATH were kubeconfig will be stored oneke_config: public_network_id: # ID for Public vnet private_network_id: # ID for Private vnet (if not passed: create a new one) From b3256fd27f8fe10a2f83a692dcf21954c2707d76 Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Fri, 7 Jun 2024 11:05:29 +0200 Subject: [PATCH 10/27] F OpenNebula/one-aiops#70: fix kubecfg path --- lithops/serverless/backends/one/config.py | 6 +++--- lithops/serverless/backends/one/one.py | 10 +++++----- lithops/serverless/backends/one/one_config.yaml | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/lithops/serverless/backends/one/config.py b/lithops/serverless/backends/one/config.py index f54e43ed5..9d20bb9e0 100644 --- a/lithops/serverless/backends/one/config.py +++ b/lithops/serverless/backends/one/config.py @@ -72,10 +72,10 @@ class OneConfigError(Exception): # Add OpenNebula defaults -DEFAULT_CONFIG_KEYS = { +DEFAULT_CONFIG_KEYS.update({ 'timeout': 600, - 'kubcfg_path': '/tmp/kube_config' -} + 'kubecfg_path': '/tmp/kube_config', +}) def load_config(config_data): diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index f95e47f6a..4ebb64caa 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -47,7 +47,7 @@ def __init__(self, one_config, internal_storage): logger.debug("Initializing OpenNebula python client") self.one = _config_one() - # template_id: instantiate OneKE + # template_id: instantiate master node if 'template_id' in one_config: one_config['service_id'] = self._instantiate_oneke(one_config['template_id'], one_config['oneke_config']) self._wait_for_oneke(one_config['service_id'], one_config['timeout']) @@ -59,13 +59,13 @@ def __init__(self, one_config, internal_storage): # Get and Save kubeconfig from OneKE kubecfg = self._get_kube_config(one_config['service_id']) - with open(one_config['oneconfig_path'], 'w') as file: + with open(one_config['kubecfg_path'], 'w') as file: file.write(kubecfg) # Overwrite config values self.name = 'one' - self.kubecfg_path = one_config['oneconfig_path'] - + self.kubecfg_path = one_config['kubecfg_path'] + super().__init__(one_config, internal_storage) @@ -91,7 +91,7 @@ def _check_oneke(self, service_id): def _instantiate_oneke(self, template_id, oneke_config): # TODO: create private network if not passed - # Pass the temporary file path to the update() function + # Instantiate OneKE oneke_json = json.loads(oneke_config) _json = self.client.templatepool[template_id].instantiate(json_str=oneke_json) diff --git a/lithops/serverless/backends/one/one_config.yaml 
b/lithops/serverless/backends/one/one_config.yaml index 741a44d08..fe1ac57a3 100644 --- a/lithops/serverless/backends/one/one_config.yaml +++ b/lithops/serverless/backends/one/one_config.yaml @@ -3,9 +3,9 @@ lithops: one: service_id: # oneke service id (means OneKE is already deployed) service_template_id: # oneke_tempalte_id (client has downloaded before) - oneconfig_path: # PATH to OneKE JSON config + oneke_config_path: # PATH to OneKE JSON config timeout: # time to wait for OneKE to be ready - kubcfg_path': # PATH were kubeconfig will be stored + kubecfg_path': # PATH were kubeconfig will be stored oneke_config: public_network_id: # ID for Public vnet private_network_id: # ID for Private vnet (if not passed: create a new one) From f1ef645bb28396101e6118d14e9a21e7e0ed2a32 Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Fri, 7 Jun 2024 12:29:40 +0200 Subject: [PATCH 11/27] F OpenNebula/one-aiops#70: method _chek_oneke and add new params --- lithops/serverless/backends/one/config.py | 88 +++++++++++++++++++ lithops/serverless/backends/one/one.py | 59 +++++++++---- .../serverless/backends/one/one_config.yaml | 11 ++- 3 files changed, 138 insertions(+), 20 deletions(-) diff --git a/lithops/serverless/backends/one/config.py b/lithops/serverless/backends/one/config.py index 9d20bb9e0..d536c2d85 100644 --- a/lithops/serverless/backends/one/config.py +++ b/lithops/serverless/backends/one/config.py @@ -71,10 +71,98 @@ class OneConfigError(Exception): """ +STATE = { + 0: "INIT", + 1: "PENDING", + 2: "HOLD", + 3: "ACTIVE", + 4: "STOPPED", + 5: "SUSPENDED", + 6: "DONE", + 8: "POWEROFF", + 9: "UNDEPLOYED", + 10: "CLONING", + 11: "CLONING_FAILURE" +} + + +LCM_STATE = { + 0: "LCM_INIT", + 1: "PROLOG", + 2: "BOOT", + 3: "RUNNING", + 4: "MIGRATE", + 5: "SAVE_STOP", + 6: "SAVE_SUSPEND", + 7: "SAVE_MIGRATE", + 8: "PROLOG_MIGRATE", + 9: "PROLOG_RESUME", + 10: "EPILOG_STOP", + 11: "EPILOG", + 12: "SHUTDOWN", + 15: "CLEANUP_RESUBMIT", + 16: "UNKNOWN", + 17: "HOTPLUG", + 18: "SHUTDOWN_POWEROFF", + 19: "BOOT_UNKNOWN", + 20: "BOOT_POWEROFF", + 21: "BOOT_SUSPENDED", + 22: "BOOT_STOPPED", + 23: "CLEANUP_DELETE", + 24: "HOTPLUG_SNAPSHOT", + 25: "HOTPLUG_NIC", + 26: "HOTPLUG_SAVEAS", + 27: "HOTPLUG_SAVEAS_POWEROFF", + 28: "HOTPLUG_SAVEAS_SUSPENDED", + 29: "SHUTDOWN_UNDEPLOY", + 30: "EPILOG_UNDEPLOY", + 31: "PROLOG_UNDEPLOY", + 32: "BOOT_UNDEPLOY", + 33: "HOTPLUG_PROLOG_POWEROFF", + 34: "HOTPLUG_EPILOG_POWEROFF", + 35: "BOOT_MIGRATE", + 36: "BOOT_FAILURE", + 37: "BOOT_MIGRATE_FAILURE", + 38: "PROLOG_MIGRATE_FAILURE", + 39: "PROLOG_FAILURE", + 40: "EPILOG_FAILURE", + 41: "EPILOG_STOP_FAILURE", + 42: "EPILOG_UNDEPLOY_FAILURE", + 43: "PROLOG_MIGRATE_POWEROFF", + 44: "PROLOG_MIGRATE_POWEROFF_FAILURE", + 45: "PROLOG_MIGRATE_SUSPEND", + 46: "PROLOG_MIGRATE_SUSPEND_FAILURE", + 47: "BOOT_UNDEPLOY_FAILURE", + 48: "BOOT_STOPPED_FAILURE", + 49: "PROLOG_RESUME_FAILURE", + 50: "PROLOG_UNDEPLOY_FAILURE", + 51: "DISK_SNAPSHOT_POWEROFF", + 52: "DISK_SNAPSHOT_REVERT_POWEROFF", + 53: "DISK_SNAPSHOT_DELETE_POWEROFF", + 54: "DISK_SNAPSHOT_SUSPENDED", + 55: "DISK_SNAPSHOT_REVERT_SUSPENDED", + 56: "DISK_SNAPSHOT_DELETE_SUSPENDED", + 57: "DISK_SNAPSHOT", + 59: "DISK_SNAPSHOT_DELETE", + 60: "PROLOG_MIGRATE_UNKNOWN", + 61: "PROLOG_MIGRATE_UNKNOWN_FAILURE", + 62: "DISK_RESIZE", + 63: "DISK_RESIZE_POWEROFF", + 64: "DISK_RESIZE_UNDEPLOYED", + 65: "HOTPLUG_NIC_POWEROFF", + 66: "HOTPLUG_RESIZE", + 67: "HOTPLUG_SAVEAS_UNDEPLOYED", + 68: "HOTPLUG_SAVEAS_STOPPED", + 69: "BACKUP", + 70: "BACKUP_POWEROFF" +} + + # Add OpenNebula defaults 
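+# Note: DEFAULT_CONFIG_KEYS is imported from the k8s backend config, so the
+# update() below layers the OpenNebula-specific defaults on top of the k8s
+# ones; load_config() then applies these only for keys the user has not
+# already set in config_data.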
DEFAULT_CONFIG_KEYS.update({ 'timeout': 600, 'kubecfg_path': '/tmp/kube_config', + 'oneke_config_path': None, }) diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index 4ebb64caa..f24682bb1 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -1,4 +1,5 @@ from ..k8s.k8s import KubernetesBackend +from .config import STATE, LCM_STATE import oneflow import pyone @@ -49,14 +50,16 @@ def __init__(self, one_config, internal_storage): # template_id: instantiate master node if 'template_id' in one_config: - one_config['service_id'] = self._instantiate_oneke(one_config['template_id'], one_config['oneke_config']) - self._wait_for_oneke(one_config['service_id'], one_config['timeout']) - # service_id: check deployed OneKE is available + one_config['service_id'] = self._instantiate_oneke(one_config['template_id'], one_config['oneke_config'], one_config['oneke_config_path']) + self._wait_for_oneke(one_config['service_id'], one_config['timeout']) elif 'service_id' in one_config: - self._check_oneke(one_config['service_id']) + pass else: - raise OneError(f"OpenNebula backend must contain 'template_id' or 'service_id'") - + raise OneError("OpenNebula backend must contain 'template_id' or 'service_id'") + + # Check OneKE status + self._check_oneke(one_config['service_id']) + # Get and Save kubeconfig from OneKE kubecfg = self._get_kube_config(one_config['service_id']) with open(one_config['kubecfg_path'], 'w') as file: @@ -75,25 +78,48 @@ def invoke(self, docker_image_name, runtime_memory, job_payload): def clear(self, job_keys=None): # First, we clean Kubernetes jobs - super().clear(all) + super().clear(job_keys) # TODO: if all are deteleted -> suspend OneKE VMs (scale down) and # delete them after X minutes pass - + def _check_oneke(self, service_id): - # CASE1: client has created their own OneKE cluster - # CASE2: OneKE cluster was created by lithops (with or without JSON file) + logger.debug("Checking OpenNebula OneKE service status") + # Check service status + _service_json = self.client.servicepool[service_id].info() + last_log = _service_json[str(service_id)service_id]['TEMPLATE']['BODY'].get('log', [])[-1] + state = last_log['message'].split(':')[-1].strip() + if state != 'RUNNING': + raise OneError(f"OpenNebula OneKE service is not running: {state}") + # Check VMs status + vm_ids = { + node['vm_info']['VM']['ID'] + for role in _service_json[str(service_id)]['TEMPLATE']['BODY']['roles'] + for node in role['nodes'] + } + for vm_id in vm_ids: + vm = self.one.vm.info(int(vm_id)) + state = vm.STATE + lcm_state = vm.LCM_STATE + if state != 3 or lcm_state != 3: + state_desc = STATE.get(state, "UNKNOWN_STATE") + lcm_state_desc = LCM_STATE.get(lcm_state, "UNKNOWN_LCM_STATE") + raise OneError(f"VM {vm_id} fails validation: STATE={state_desc} (code {state}), LCM_STATE={lcm_state_desc} (code {lcm_state})") pass - def _instantiate_oneke(self, template_id, oneke_config): + def _instantiate_oneke(self, template_id, oneke_config, oneke_config_path): # TODO: create private network if not passed - # Instantiate OneKE - oneke_json = json.loads(oneke_config) - _json = self.client.templatepool[template_id].instantiate(json_str=oneke_json) + # Instantiate OneKE (with JSON or oneke_config parameters) + logger.debug("Instantiating OpenNebula OneKE service") + if oneke_config_path is not None: + _json = self.client.templatepool[template_id].instantiate(path=oneke_config_path) + else: + oneke_json = json.loads(oneke_config) + _json = 
self.client.templatepool[template_id].instantiate(json_str=oneke_json) # Get service_id from JSON service_id = list(_json.keys())[0] @@ -107,7 +133,7 @@ def _wait_for_oneke(self, service_id, timeout): logger.debug("Initializing OneKE service. Be patient, this process can take up to {} minutes".format(minutes_timeout)) while True: _service_json = self.client.servicepool[service_id].info() - logs = _service_json[service_id]['TEMPLATE']['BODY'].get('log', []) + logs = _service_json[str(service_id)]['TEMPLATE']['BODY'].get('log', []) if logs: last_log = logs[-1] logger.debug("Last log: {}".format(last_log)) @@ -125,6 +151,7 @@ raise OneError("Deployment timed out after {} seconds. You can try again once OneKE is in RUNNING state with the service_id option.".format(timeout)) time.sleep(10) + logger.debug("OneKE service is running after {} seconds".format(int(elapsed_time))) def _get_kube_config(self, service_id): @@ -141,5 +168,5 @@ def _get_kube_config(self, service_id): vm = self.one.vm.info(int(master_vm_id)) encoded_kubeconfig = vm.USER_TEMPLATE.get('ONEKE_KUBECONFIG') decoded_kubeconfig = base64.b64decode(encoded_kubeconfig).decode('utf-8') - logger.debug("OpneNebula OneKE Kubeconfig: {}".format(decoded_kubeconfig)) + logger.debug("OpenNebula OneKE Kubeconfig: {}".format(decoded_kubeconfig)) return decoded_kubeconfig \ No newline at end of file diff --git a/lithops/serverless/backends/one/one_config.yaml b/lithops/serverless/backends/one/one_config.yaml index fe1ac57a3..e501e409c 100644 --- a/lithops/serverless/backends/one/one_config.yaml +++ b/lithops/serverless/backends/one/one_config.yaml @@ -3,10 +3,13 @@ lithops: one: service_id: # oneke service id (means OneKE is already deployed) service_template_id: # oneke_tempalte_id (client has downloaded before) - oneke_config_path: # PATH to OneKE JSON config - timeout: # time to wait for OneKE to be ready - kubecfg_path': # PATH were kubeconfig will be stored + oneke_config: public_network_id: # ID for Public vnet private_network_id: # ID for Private vnet (if not passed: create a new one) + + delete: # if set to True, OneKE VMs will be deleted after all the jobs are finished + + oneke_config_path: # PATH to OneKE JSON config + timeout: # time to wait for OneKE to be ready + kubecfg_path: # PATH where kubeconfig will be stored \ No newline at end of file From 9df5c931270e9e1ebca3025f67fb6227db3b003a Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Fri, 7 Jun 2024 12:33:36 +0200 Subject: [PATCH 12/27] F OpenNebula/one-aiops#70: minor fix --- lithops/serverless/backends/one/one.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index f24682bb1..eeb4b73cc 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -89,7 +89,7 @@ def _check_oneke(self, service_id): logger.debug("Checking OpenNebula OneKE service status") # Check service status _service_json = self.client.servicepool[service_id].info() - last_log = _service_json[str(service_id)service_id]['TEMPLATE']['BODY'].get('log', [])[-1] + last_log = _service_json[str(service_id)]['TEMPLATE']['BODY'].get('log', [])[-1] state = last_log['message'].split(':')[-1].strip() if state != 'RUNNING': raise OneError(f"OpenNebula OneKE service is not running: {state}")
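Note on the structure that the two patches above index into: the OneFlow
`servicepool[service_id].info()` document is assumed throughout this series to
look roughly like the sketch below (keys inferred from the accesses in
`_check_oneke()` and `_get_kube_config()`, not from the authoritative OneFlow
schema). The pool is keyed by the stringified service ID, which is why the
lookups need `str(service_id)`:

    {
        "<service_id>": {
            "TEMPLATE": {
                "BODY": {
                    "log":   [{"message": "... New state: RUNNING"}],
                    "roles": [{"name": "master",
                               "nodes": [{"vm_info": {"VM": {"ID": "42"}}}]}]
                }
            }
        }
    }
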
From d5d91c4af52f95139d0d83d5e4af00e00a4ddbe9 Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Tue, 11 Jun 2024 12:50:52 +0200 Subject: [PATCH 13/27] F OpenNebula/one-aiops#70: ONE_AUTH can directly contain the credentials and fix typos --- lithops/serverless/backends/one/config.py | 1 + lithops/serverless/backends/one/one.py | 32 ++++++++++++----------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/lithops/serverless/backends/one/config.py b/lithops/serverless/backends/one/config.py index d536c2d85..2c0e3aab8 100644 --- a/lithops/serverless/backends/one/config.py +++ b/lithops/serverless/backends/one/config.py @@ -163,6 +163,7 @@ class OneConfigError(Exception): 'timeout': 600, 'kubecfg_path': '/tmp/kube_config', 'oneke_config_path': None, + 'delete': False, }) diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index eeb4b73cc..fcb1fe920 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -21,12 +21,15 @@ class OneError(Exception): def _config_one(): env = os.environ - # Reading the `one_auth` file. - # The `one_auth` file path is given in the environment variable - # `ONE_AUTH` if exists, otherwise it is in `$HOME/.one/one_auth`. + # Reading the `one_auth` file or environment variable. + # `ONE_AUTH` can be a file path or the credentials, otherwise + # the default credentials in `$HOME/.one/one_auth` are used. + one_auth = env.get('ONE_AUTH') or os.path.expanduser('~/.one/one_auth') + if os.path.isfile(one_auth): + with open(one_auth, mode='r') as auth_file: + credentials = auth_file.readlines()[0].strip() + else: + credentials = one_auth.strip() # Reading environment variables. # Environment variable `ONESERVER_URL` supersedes the default URL.
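+    # Illustrative examples (placeholder values, not defaults of this backend):
+    #   ONE_AUTH=/var/lib/one/.one/one_auth     -> credentials are read from the file
+    #   ONE_AUTH=oneadmin:opennebula            -> the value is used directly
+    #   ONESERVER_URL=http://frontend:2633/RPC2 -> replaces http://localhost:2633/RPC2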
@@ -48,14 +51,14 @@ def __init__(self, one_config, internal_storage): logger.debug("Initializing OpenNebula python client") self.one = _config_one() - # template_id: instantiate master node - if 'template_id' in one_config: - one_config['service_id'] = self._instantiate_oneke(one_config['template_id'], one_config['oneke_config'], one_config['oneke_config_path']) + # service_template_id: instantiate master node + if 'service_template_id' in one_config: + one_config['service_id'] = self._instantiate_oneke(one_config['service_template_id'], one_config['oneke_config'], one_config['oneke_config_path']) self._wait_for_oneke(one_config['service_id'], one_config['timeout']) elif 'service_id' in one_config: pass else: - raise OneError("OpenNebula backend must contain 'template_id' or 'service_id'") + raise OneError("OpenNebula backend must contain 'service_template_id' or 'service_id'") # Check OneKE status self._check_oneke(one_config['service_id']) @@ -73,13 +76,12 @@ def __init__(self, one_config, internal_storage): def invoke(self, docker_image_name, runtime_memory, job_payload): + # TODO: add dynamic scaling logic super().invoke(docker_image_name, runtime_memory, job_payload) def clear(self, job_keys=None): - # First, we clean Kubernetes jobs super().clear(job_keys) - # TODO: if all are deteleted -> suspend OneKE VMs (scale down) and # delete them after X minutes pass @@ -110,16 +112,16 @@ def _check_oneke(self, service_id): pass - def _instantiate_oneke(self, template_id, oneke_config, oneke_config_path): + def _instantiate_oneke(self, service_template_id, oneke_config, oneke_config_path): # TODO: create private network if not passed # Instantiate OneKE (with JSON or oneke_config parameters) logger.debug("Instantiating OpenNebula OneKE service") if oneke_config_path is not None: - _json = self.client.templatepool[template_id].instantiate(path=oneke_config_path) + _json = self.client.templatepool[service_template_id].instantiate(path=oneke_config_path) else: oneke_json = json.loads(oneke_config) - _json = self.client.templatepool[template_id].instantiate(json_str=oneke_json) + _json = self.client.templatepool[service_template_id].instantiate(json_str=oneke_json) # Get service_id from JSON service_id = list(_json.keys())[0] From a61e458bd5402702fdcd2b5ed4e1bcee4f1fb62d Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Fri, 21 Jun 2024 12:27:21 +0200 Subject: [PATCH 14/27] F OpenNebula/one-aiops#70: fix service status + refactor code --- lithops/serverless/backends/one/one.py | 119 ++++++++++++++++--------- 1 file changed, 77 insertions(+), 42 deletions(-) diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index fcb1fe920..f578dbd6c 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -43,22 +43,26 @@ class OpenNebula(KubernetesBackend): A wrap-up around OpenNebula backend. 
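+
+    On initialization, the backend resolves a OneKE service (instantiating it
+    from a service template when only 'service_template_id' is given), waits
+    until the service reports 'RUNNING', stores its kubeconfig locally, and
+    then delegates job execution to the parent KubernetesBackend.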
""" def __init__(self, one_config, internal_storage): - logger.debug("Initializing OpenNebula backend") + logger.info("Initializing OpenNebula backend") logger.debug("Initializing Oneflow python client") self.client = oneflow.OneFlowClient() logger.debug("Initializing OpenNebula python client") - self.one = _config_one() + self.pyone = _config_one() # service_template_id: instantiate master node if 'service_template_id' in one_config: - one_config['service_id'] = self._instantiate_oneke(one_config['service_template_id'], one_config['oneke_config'], one_config['oneke_config_path']) + one_config['service_id'] = self._instantiate_oneke( + one_config['service_template_id'], + one_config['oneke_config'], + one_config['oneke_config_path'] + ) self._wait_for_oneke(one_config['service_id'], one_config['timeout']) - elif 'service_id' in one_config: - pass - else: - raise OneError("OpenNebula backend must contain 'service_template_id' or 'service_id'") + elif 'service_id' not in one_config: + raise OneError( + "OpenNebula backend must contain 'service_template_id' or 'service_id'" + ) # Check OneKE status self._check_oneke(one_config['service_id']) @@ -76,10 +80,14 @@ def __init__(self, one_config, internal_storage): def invoke(self, docker_image_name, runtime_memory, job_payload): + # Get current nodes + super()._get_nodes() + logger.info(f"Found {len(self.nodes)} nodes") + logger.info(f"Nodes: {self.nodes}") # TODO: add dynamic scaling logic super().invoke(docker_image_name, runtime_memory, job_payload) - + def clear(self, job_keys=None): super().clear(job_keys) # TODO: if all are deteleted -> suspend OneKE VMs (scale down) and @@ -88,35 +96,31 @@ def clear(self, job_keys=None): def _check_oneke(self, service_id): - logger.debug("Checking OpenNebula OneKE service status") + logger.info("Checking OpenNebula OneKE service status") + # Check service status _service_json = self.client.servicepool[service_id].info() - last_log = _service_json[str(service_id)]['TEMPLATE']['BODY'].get('log', [])[-1] - state = last_log['message'].split(':')[-1].strip() + logs = _service_json[str(service_id)]['TEMPLATE']['BODY'].get('log', []) + state = self._get_latest_state(logs) + if state is None: + raise OneError("No state found in logs") if state != 'RUNNING': - raise OneError(f"OpenNebula OneKE service is not running: {state}") + raise OneError(f"OpenNebula OneKE service is not 'RUNNING': {state}") + # Check VMs status vm_ids = { node['vm_info']['VM']['ID'] for role in _service_json[str(service_id)]['TEMPLATE']['BODY']['roles'] for node in role['nodes'] } - for vm_id in vm_ids: - vm = self.one.vm.info(int(vm_id)) - state = vm.STATE - lcm_state = vm.LCM_STATE - if state != 3 or lcm_state != 3: - state_desc = STATE.get(state, "UNKNOWN_STATE") - lcm_state_desc = LCM_STATE.get(lcm_state, "UNKNOWN_LCM_STATE") - raise OneError(f"VM {vm_id} fails validation: STATE={state_desc} (code {state}), LCM_STATE={lcm_state_desc} (code {lcm_state})") - pass + self._check_vms_status(vm_ids) def _instantiate_oneke(self, service_template_id, oneke_config, oneke_config_path): # TODO: create private network if not passed # Instantiate OneKE (with JSON or oneke_config parameters) - logger.debug("Instantiating OpenNebula OneKE service") + logger.info("Instantiating OpenNebula OneKE service") if oneke_config_path is not None: _json = self.client.templatepool[service_template_id].instantiate(path=oneke_config_path) else: @@ -125,50 +129,81 @@ def _instantiate_oneke(self, service_template_id, oneke_config, oneke_config_pat # Get service_id 
from JSON service_id = list(_json.keys())[0] - logger.debug("OneKE service ID: {}".format(service_id)) + logger.info(f"OneKE service ID: {service_id}") return service_id def _wait_for_oneke(self, service_id, timeout): start_time = time.time() minutes_timeout = int(timeout/60) - logger.debug("Initializing OneKE service. Be patient, this process can take up to {} minutes".format(minutes_timeout)) + logger.info( + f"Waiting for OneKE service to become 'RUNNING'. " + f"Be patient, this process can take up to {minutes_timeout} minutes" + ) while True: _service_json = self.client.servicepool[service_id].info() logs = _service_json[str(service_id)]['TEMPLATE']['BODY'].get('log', []) if logs: - last_log = logs[-1] - logger.debug("Last log: {}".format(last_log)) - state = last_log['message'].split(':')[-1].strip() + state = self._get_latest_state(logs) # Check OneKE deployment status if state == 'FAILED_DEPLOYING': raise OneError("OneKE deployment has failed") - if state == 'RUNNING': - logger.debug("OneKE is running") + elif state == 'RUNNING': break - + # Check timeout elapsed_time = time.time() - start_time if elapsed_time > timeout: - raise OneError("Deployment timed out after {} seconds. You can try again once OneKE is in RUNNING state with the service_id option.".format(timeout)) - + raise OneError( + f"Deployment timed out after {timeout} seconds. " + f"You can try again once OneKE is in RUNNING state with the service_id option" + ) time.sleep(10) - logger.debug("OneKE service is running after {} seconds".format(int(elapsed_time))) + logger.info(f"OneKE service is RUNNING after {int(elapsed_time)} seconds") def _get_kube_config(self, service_id): # Get master VM ID + master_vm_id = None _service_json = self.client.servicepool[service_id].info() - master_vm_id = next( - (role['nodes'][0]['vm_info']['VM']['ID'] for role in _service_json[str(service_id)]['TEMPLATE']['BODY']['roles'] - if role['name'] == 'master'), - None - ) + roles = _service_json[str(service_id)]['TEMPLATE']['BODY']['roles'] + for role in roles: + if role['name'] == 'master': + master_vm_id = role['nodes'][0]['vm_info']['VM']['ID'] + break if master_vm_id is None: - raise OneError("Master VM ID not found. Please change the name of the master node to 'master' and try again.") + raise OneError( + "Master VM ID not found. 
" + "Please change the name of the master node to 'master' and try again" + ) # Get kubeconfig - vm = self.one.vm.info(int(master_vm_id)) + vm = self.pyone.vm.info(int(master_vm_id)) encoded_kubeconfig = vm.USER_TEMPLATE.get('ONEKE_KUBECONFIG') decoded_kubeconfig = base64.b64decode(encoded_kubeconfig).decode('utf-8') - logger.debug("OpenNebula OneKE Kubeconfig: {}".format(decoded_kubeconfig)) - return decoded_kubeconfig \ No newline at end of file + logger.debug(f"OpenNebula OneKE Kubeconfig: {decoded_kubeconfig}") + return decoded_kubeconfig + + + def _get_latest_state(self, logs): + for log in reversed(logs): + if 'New state:' in log['message']: + return log['message'].split(':')[-1].strip() + return None + + + def _check_vms_status(self, vm_ids): + if len(vm_ids) == 0: + # TODO: scale up OneKE VMs to default size + raise OneError("No VMs found in OneKE service") + for vm_id in vm_ids: + vm = self.pyone.vm.info(int(vm_id)) + state = vm.STATE + lcm_state = vm.LCM_STATE + if state != 3 or lcm_state != 3: + state_desc = STATE.get(state, "UNKNOWN_STATE") + lcm_state_desc = LCM_STATE.get(lcm_state, "UNKNOWN_LCM_STATE") + raise OneError( + f"VM {vm_id} fails validation: " + f"STATE={state_desc} (code {state}), " + f"LCM_STATE={lcm_state_desc} (code {lcm_state})" + ) \ No newline at end of file From 7983dd7288149373ce785638057c1fdab1761838 Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Fri, 28 Jun 2024 11:06:15 +0200 Subject: [PATCH 15/27] F OpenNebula/one-aiops#70: basic scaling for one backend --- lithops/serverless/backends/one/config.py | 1 + lithops/serverless/backends/one/one.py | 167 ++++++++++++------ .../serverless/backends/one/one_config.yaml | 3 +- 3 files changed, 113 insertions(+), 58 deletions(-) diff --git a/lithops/serverless/backends/one/config.py b/lithops/serverless/backends/one/config.py index 2c0e3aab8..c55000a17 100644 --- a/lithops/serverless/backends/one/config.py +++ b/lithops/serverless/backends/one/config.py @@ -164,6 +164,7 @@ class OneConfigError(Exception): 'kubecfg_path': '/tmp/kube_config', 'oneke_config_path': None, 'delete': False, + 'minimum_nodes': 0 }) diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index f578dbd6c..eb9fc6ef2 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -45,6 +45,11 @@ class OpenNebula(KubernetesBackend): def __init__(self, one_config, internal_storage): logger.info("Initializing OpenNebula backend") + # Overwrite config values + self.name = 'one' + self.timeout = one_config['timeout'] + self.minimum_nodes = one_config['minimum_nodes'] + logger.debug("Initializing Oneflow python client") self.client = oneflow.OneFlowClient() @@ -53,67 +58,66 @@ def __init__(self, one_config, internal_storage): # service_template_id: instantiate master node if 'service_template_id' in one_config: - one_config['service_id'] = self._instantiate_oneke( + self.service_id = self._instantiate_oneke( one_config['service_template_id'], one_config['oneke_config'], one_config['oneke_config_path'] ) - self._wait_for_oneke(one_config['service_id'], one_config['timeout']) - elif 'service_id' not in one_config: + self._wait_for_oneke('RUNNING') + elif 'service_id' in one_config: + self.service_id = one_config['service_id'] + else: raise OneError( "OpenNebula backend must contain 'service_template_id' or 'service_id'" ) - - # Check OneKE status - self._check_oneke(one_config['service_id']) + self._check_oneke() # Get and Save kubeconfig from OneKE - kubecfg = 
self._get_kube_config(one_config['service_id']) + kubecfg = self._get_kube_config() with open(one_config['kubecfg_path'], 'w') as file: file.write(kubecfg) - - # Overwrite config values - self.name = 'one' self.kubecfg_path = one_config['kubecfg_path'] super().__init__(one_config, internal_storage) def invoke(self, docker_image_name, runtime_memory, job_payload): - # Get current nodes super()._get_nodes() - logger.info(f"Found {len(self.nodes)} nodes") - logger.info(f"Nodes: {self.nodes}") - # TODO: add dynamic scaling logic + pods, scale_nodes, chunksize, worker_processes = self._granularity( + job_payload['total_calls'] + ) + + # Scale nodes + # TODO: Add logic to see if it's worth waiting for the cooldown or not + if scale_nodes > len(self.nodes): + self._scale_oneke(self.nodes, scale_nodes) + + # Setup granularity + job_payload['max_workers'] = pods + job_payload['chunksize'] = chunksize + job_payload['worker_processes'] = worker_processes super().invoke(docker_image_name, runtime_memory, job_payload) def clear(self, job_keys=None): super().clear(job_keys) - # TODO: if all are deteleted -> suspend OneKE VMs (scale down) and - # delete them after X minutes - pass + # scale down only if the cooldown time has passed + state = self._get_latest_state() + if state != 'COOLDOWN': + super()._get_nodes() + self._scale_oneke(self.nodes, self.minimum_nodes) - def _check_oneke(self, service_id): + def _check_oneke(self): logger.info("Checking OpenNebula OneKE service status") # Check service status - _service_json = self.client.servicepool[service_id].info() - logs = _service_json[str(service_id)]['TEMPLATE']['BODY'].get('log', []) - state = self._get_latest_state(logs) - if state is None: - raise OneError("No state found in logs") + state = self._get_latest_state() if state != 'RUNNING': raise OneError(f"OpenNebula OneKE service is not 'RUNNING': {state}") # Check VMs status - vm_ids = { - node['vm_info']['VM']['ID'] - for role in _service_json[str(service_id)]['TEMPLATE']['BODY']['roles'] - for node in role['nodes'] - } - self._check_vms_status(vm_ids) + self._check_vms_status() def _instantiate_oneke(self, service_template_id, oneke_config, oneke_config_path): @@ -133,40 +137,40 @@ def _instantiate_oneke(self, service_template_id, oneke_config, oneke_config_pat return service_id - def _wait_for_oneke(self, service_id, timeout): + def _wait_for_oneke(self, state): start_time = time.time() - minutes_timeout = int(timeout/60) + minutes_timeout = int(self.timeout/60) logger.info( - f"Waiting for OneKE service to become 'RUNNING'. " - f"Be patient, this process can take up to {minutes_timeout} minutes" + f"Waiting for OneKE service to become {state}. " + f"Wait time: {minutes_timeout} minutes" ) while True: - _service_json = self.client.servicepool[service_id].info() - logs = _service_json[str(service_id)]['TEMPLATE']['BODY'].get('log', []) - if logs: - state = self._get_latest_state(logs) - # Check OneKE deployment status - if state == 'FAILED_DEPLOYING': - raise OneError("OneKE deployment has failed") - elif state == 'RUNNING': - break - # Check timeout elapsed_time = time.time() - start_time - if elapsed_time > timeout: + if elapsed_time > self.timeout: raise OneError( - f"Deployment timed out after {timeout} seconds. " - f"You can try again once OneKE is in RUNNING state with the service_id option" + f"Can't reach {state} state. OneKE timed out after {self.timeout} seconds. 
" + f"You can try again once OneKE is in `'RUNNING'` state with the `service_id` option" ) - time.sleep(10) - logger.info(f"OneKE service is RUNNING after {int(elapsed_time)} seconds") + # Check OneKE deployment status + current_state = self._get_latest_state() + if current_state == 'FAILED_DEPLOYING': + raise OneError("OneKE deployment has failed") + elif current_state == 'FAILED_SCALING': + raise OneError("OneKE scaling has failed") + elif current_state == state: + break - def _get_kube_config(self, service_id): + time.sleep(5) + logger.info(f"OneKE service is {state} after {int(elapsed_time)} seconds") + + + def _get_kube_config(self): # Get master VM ID master_vm_id = None - _service_json = self.client.servicepool[service_id].info() - roles = _service_json[str(service_id)]['TEMPLATE']['BODY']['roles'] + _service_json = self.client.servicepool[self.service_id].info() + roles = _service_json[str(self.service_id)]['TEMPLATE']['BODY']['roles'] for role in roles: if role['name'] == 'master': master_vm_id = role['nodes'][0]['vm_info']['VM']['ID'] @@ -174,8 +178,9 @@ def _get_kube_config(self, service_id): if master_vm_id is None: raise OneError( "Master VM ID not found. " - "Please change the name of the master node to 'master' and try again" + "Please change the name of the master node role to 'master' and try again" ) + # Get kubeconfig vm = self.pyone.vm.info(int(master_vm_id)) encoded_kubeconfig = vm.USER_TEMPLATE.get('ONEKE_KUBECONFIG') @@ -184,16 +189,23 @@ def _get_kube_config(self, service_id): return decoded_kubeconfig - def _get_latest_state(self, logs): + def _get_latest_state(self): + _service_json = self.client.servicepool[self.service_id].info() + logs = _service_json[str(self.service_id)]['TEMPLATE']['BODY'].get('log', []) for log in reversed(logs): if 'New state:' in log['message']: return log['message'].split(':')[-1].strip() - return None + raise OneError("No state found in logs") - def _check_vms_status(self, vm_ids): + def _check_vms_status(self): + _service_json = self.client.servicepool[self.service_id].info() + vm_ids = { + node['vm_info']['VM']['ID'] + for role in _service_json[str(self.service_id)]['TEMPLATE']['BODY']['roles'] + for node in role['nodes'] + } if len(vm_ids) == 0: - # TODO: scale up OneKE VMs to default size raise OneError("No VMs found in OneKE service") for vm_id in vm_ids: vm = self.pyone.vm.info(int(vm_id)) @@ -206,4 +218,45 @@ def _check_vms_status(self, vm_ids): f"VM {vm_id} fails validation: " f"STATE={state_desc} (code {state}), " f"LCM_STATE={lcm_state_desc} (code {lcm_state})" - ) \ No newline at end of file + ) + + + def _granularity(self, total_functions): + MAX_PODS_PER_NODE = 1 + MAX_FUNCTIONS_PER_POD = 25 + + # Calculate number of WORKERS (PODs) + # TODO: current number of pods depends on node resources + current_pods = len(self.nodes) // MAX_PODS_PER_NODE + req_pods = ( + total_functions + MAX_FUNCTIONS_PER_POD - 1 + ) // MAX_FUNCTIONS_PER_POD + pods = max(req_pods, current_pods) + + # Calculate number of NODES + # TODO: 1 node can have multiple pods + nodes = max(pods, len(self.nodes)) + + # Calculate number of functions executors per WORKER (POD) + # TODO: current number of executors depends on POD resources + worker_processes = 1 + + # Calculate number of functions per WORKER (POD) + # TODO: depends on worker_processes + chunksize = 1 + + logger.info( + f"Pods: {pods}, Nodes: {nodes}, " + f"Chunksize: {chunksize}, Worker Processes: {worker_processes}" + ) + return pods, nodes, chunksize, worker_processes + + + def _scale_oneke(self, 
nodes, scale_nodes): + logger.info(f"Scaling workers from {len(nodes)} to {scale_nodes} nodes") + # Ensure the service can be scaled + state = self._get_latest_state() + if len(self.nodes) == 0 and state == 'COOLDOWN': + self._wait_for_oneke('RUNNING') + self.client.servicepool[self.service_id].role["worker"].scale(int(scale_nodes)) + self._wait_for_oneke('COOLDOWN') diff --git a/lithops/serverless/backends/one/one_config.yaml b/lithops/serverless/backends/one/one_config.yaml index e501e409c..3c09f004b 100644 --- a/lithops/serverless/backends/one/one_config.yaml +++ b/lithops/serverless/backends/one/one_config.yaml @@ -8,7 +8,8 @@ one: public_network_id: # ID for Public vnet private_network_id: # ID for Private vnet (if not passed: create a new one) - delete: # if set to True, OneKE VMs will be deleted after all the jobs are finished + delete: # if set to True, OneKE will be deleted after all the jobs are finished + minimum_nodes: # minimum number of nodes in OneKE oneke_config_path: # PATH to OneKE JSON config timeout: # time to wait for OneKE to be ready From b92b712a31263fd14e5f876f7ceb21a664083784 Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Tue, 2 Jul 2024 13:53:58 +0200 Subject: [PATCH 16/27] F OpenNebula/one-aiops#70: fix missing k8s config key --- lithops/serverless/backends/one/config.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lithops/serverless/backends/one/config.py b/lithops/serverless/backends/one/config.py index c55000a17..156aef9dd 100644 --- a/lithops/serverless/backends/one/config.py +++ b/lithops/serverless/backends/one/config.py @@ -195,10 +195,13 @@ def load_config(config_data): # Override oneke_config with a valid JSON to update the service config_data['one']['oneke_config'] = json.dumps(oneke_update) - # TODO: change me for key in DEFAULT_CONFIG_KEYS: if key not in config_data['one']: config_data['one'][key] = DEFAULT_CONFIG_KEYS[key] + # Ensure 'k8s' key exists and is a dictionary + if 'k8s' not in config_data or config_data['k8s'] is None: + config_data['k8s'] = {} + # Load k8s default config original_load_config(config_data) \ No newline at end of file From 74558988de41088909bfe9f8cba3bd47546a4bab Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Tue, 2 Jul 2024 16:03:56 +0200 Subject: [PATCH 17/27] F OpenNebula/one-aiops#70: add new one params --- lithops/serverless/backends/one/config.py | 2 ++ lithops/serverless/backends/one/one_config.yaml | 6 +++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/lithops/serverless/backends/one/config.py b/lithops/serverless/backends/one/config.py index 156aef9dd..0b39682ce 100644 --- a/lithops/serverless/backends/one/config.py +++ b/lithops/serverless/backends/one/config.py @@ -202,6 +202,8 @@ def load_config(config_data): # Ensure 'k8s' key exists and is a dictionary if 'k8s' not in config_data or config_data['k8s'] is None: config_data['k8s'] = {} + config_data['k8s']['docker_user'] = config_data['one']['docker_user'] + config_data['k8s']['docker_password'] = config_data['one']['docker_password'] # Load k8s default config original_load_config(config_data) \ No newline at end of file diff --git a/lithops/serverless/backends/one/one_config.yaml b/lithops/serverless/backends/one/one_config.yaml index 3c09f004b..915b9cf0b 100644 --- a/lithops/serverless/backends/one/one_config.yaml +++ b/lithops/serverless/backends/one/one_config.yaml @@ -13,4 +13,8 @@ one: oneke_config_path: # PATH to OneKE JSON config timeout: # time to wait for OneKE to be ready - kubecfg_path': # PATH were 
kubeconfig will be stored \ No newline at end of file + kubecfg_path': # PATH were kubeconfig will be stored + + oneflow_url: + oneserver_url: + one_auth: \ No newline at end of file From a166b6f1f88124d692f5ee627559c175ea475b58 Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Tue, 2 Jul 2024 16:19:26 +0200 Subject: [PATCH 18/27] F OpenNebula/one-aiops#70: improve scaling functionality --- lithops/serverless/backends/one/config.py | 4 +- lithops/serverless/backends/one/one.py | 123 +++++++++++++----- .../serverless/backends/one/one_config.yaml | 10 +- 3 files changed, 99 insertions(+), 38 deletions(-) diff --git a/lithops/serverless/backends/one/config.py b/lithops/serverless/backends/one/config.py index 0b39682ce..848c94859 100644 --- a/lithops/serverless/backends/one/config.py +++ b/lithops/serverless/backends/one/config.py @@ -164,7 +164,9 @@ class OneConfigError(Exception): 'kubecfg_path': '/tmp/kube_config', 'oneke_config_path': None, 'delete': False, - 'minimum_nodes': 0 + 'minimum_nodes': 0, + 'maximum_nodes': -1, + 'average_job_execution': 1, }) diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index eb9fc6ef2..795a340ac 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -5,6 +5,7 @@ import pyone import os +import re import json import time import base64 @@ -49,6 +50,13 @@ def __init__(self, one_config, internal_storage): self.name = 'one' self.timeout = one_config['timeout'] self.minimum_nodes = one_config['minimum_nodes'] + self.maximum_nodes = one_config['maximum_nodes'] + self.average_job_execution = one_config['average_job_execution'] + + # Export environment variables + os.environ['ONEFLOW_URL'] = one_config['oneflow_url'] + os.environ['ONESERVER_URL'] = one_config['oneserver_url'] + os.environ['ONE_AUTH'] = one_config['one_auth'] logger.debug("Initializing Oneflow python client") self.client = oneflow.OneFlowClient() @@ -82,8 +90,8 @@ def __init__(self, one_config, internal_storage): def invoke(self, docker_image_name, runtime_memory, job_payload): - super()._get_nodes() - pods, scale_nodes, chunksize, worker_processes = self._granularity( + self._get_nodes() + scale_nodes, pods, chunksize, worker_processes = self._granularity( job_payload['total_calls'] ) @@ -101,12 +109,9 @@ def invoke(self, docker_image_name, runtime_memory, job_payload): def clear(self, job_keys=None): super().clear(job_keys) - # scale down only if the cooldown time has passed - state = self._get_latest_state() - if state != 'COOLDOWN': - super()._get_nodes() - self._scale_oneke(self.nodes, self.minimum_nodes) - + super()._get_nodes() + self._scale_oneke(self.nodes, self.minimum_nodes) + def _check_oneke(self): logger.info("Checking OpenNebula OneKE service status") @@ -222,41 +227,91 @@ def _check_vms_status(self): def _granularity(self, total_functions): - MAX_PODS_PER_NODE = 1 - MAX_FUNCTIONS_PER_POD = 25 - - # Calculate number of WORKERS (PODs) - # TODO: current number of pods depends on node resources - current_pods = len(self.nodes) // MAX_PODS_PER_NODE - req_pods = ( - total_functions + MAX_FUNCTIONS_PER_POD - 1 - ) // MAX_FUNCTIONS_PER_POD - pods = max(req_pods, current_pods) - - # Calculate number of NODES - # TODO: 1 node can have multiple pods - nodes = max(pods, len(self.nodes)) - - # Calculate number of functions executors per WORKER (POD) - # TODO: current number of executors depends on POD resources - worker_processes = 1 + # Set by the user, otherwise calculated based on OpenNebula available 
Resources + MAX_NODES = 3 + max_nodes= MAX_NODES if self.maximum_nodes is -1 else self.maximum_nodes + # TODO: get info from VM template + cpus_per_new_node=2 + # TODO: monitor Scaling to set this value + first_node_creation_time=90 + additional_node_creation_time=20 + + current_nodes = len(self.nodes) + total_cpus_available = sum(node['total_cpu'] - node['used_cpu'] for node in self.nodes) + current_pods = total_cpus_available + + if total_cpus_available > 0: + estimated_time_no_scaling = (total_functions / total_cpus_available) * self.average_job_execution + else: + estimated_time_no_scaling = float('inf') - # Calculate number of functions per WORKER (POD) - # TODO: depends on worker_processes - chunksize = 1 + best_time = estimated_time_no_scaling + best_nodes_needed = 0 + for additional_nodes in range(1, max_nodes - current_nodes + 1): + new_total_cpus_available = total_cpus_available + (additional_nodes * cpus_per_new_node) + estimated_time_with_scaling = (total_functions / new_total_cpus_available) * self.average_job_execution + + if current_nodes == 0: + total_creation_time = first_node_creation_time + (additional_nodes - 1) * additional_node_creation_time + else: + total_creation_time = additional_node_creation_time * additional_nodes + + total_estimated_time_with_scaling = estimated_time_with_scaling + total_creation_time + + if total_estimated_time_with_scaling < best_time: + best_time = total_estimated_time_with_scaling + best_nodes_needed = additional_nodes + current_pods = total_cpus_available + new_total_cpus_available + + + nodes = current_nodes + best_nodes_needed + pods = current_pods logger.info( - f"Pods: {pods}, Nodes: {nodes}, " - f"Chunksize: {chunksize}, Worker Processes: {worker_processes}" + f"Nodes: {nodes}, Pods: {pods}, Chunksize: 1, Worker Processes: 1" ) - return pods, nodes, chunksize, worker_processes + return nodes, pods, 1, 1 def _scale_oneke(self, nodes, scale_nodes): logger.info(f"Scaling workers from {len(nodes)} to {scale_nodes} nodes") # Ensure the service can be scaled state = self._get_latest_state() - if len(self.nodes) == 0 and state == 'COOLDOWN': - self._wait_for_oneke('RUNNING') + if state == 'COOLDOWN': + if len(self.nodes) == 0: + self._wait_for_oneke('RUNNING') + else: + logger.info("OneKE service is in 'COOLDOWN' state and does not need to be scaled") + return self.client.servicepool[self.service_id].role["worker"].scale(int(scale_nodes)) self._wait_for_oneke('COOLDOWN') + + + def _get_nodes(self): + self.nodes = [] + list_all_nodes = self.core_api.list_node() + for node in list_all_nodes.items: + if node.spec.taints: + continue + + total_cpu = node.status.allocatable['cpu'] + used_cpu = node.status.capacity['cpu'] + total_memory = node.status.allocatable['memory'] + used_memory = node.status.capacity['memory'] + + if 'm' in total_cpu: + total_cpu = int(re.search(r'\d+', total_cpu).group()) / 1000 + if 'm' in used_cpu: + used_cpu = int(re.search(r'\d+', used_cpu).group()) / 1000 + if 'Ki' in total_memory: + total_memory = int(re.search(r'\d+', total_memory).group()) / 1024 + if 'Ki' in used_memory: + used_memory = int(re.search(r'\d+', used_memory).group()) / 1024 + + self.nodes.append({ + "name": node.metadata.name, + "total_cpu": total_cpu, + "total_memory": total_memory, + "used_cpu": used_cpu, + "used_memory": used_memory + }) \ No newline at end of file diff --git a/lithops/serverless/backends/one/one_config.yaml b/lithops/serverless/backends/one/one_config.yaml index 915b9cf0b..31976949f 100644 --- 
a/lithops/serverless/backends/one/one_config.yaml +++ b/lithops/serverless/backends/one/one_config.yaml @@ -8,13 +8,17 @@ one: public_network_id: # ID for Public vnet private_network_id: # ID for Private vnet (if not passed: create a new one) + oneflow_url: + oneserver_url: + one_auth: + delete: # if set to True, OneKE will be deleted after all the jobs are finished minimum_nodes: # minimum number of nodes in OneKE + maximum_nodes: # maximum number of nodes in OneKE oneke_config_path: # PATH to OneKE JSON config timeout: # time to wait for OneKE to be ready kubecfg_path': # PATH were kubeconfig will be stored - oneflow_url: - oneserver_url: - one_auth: \ No newline at end of file + average_job_execution: # average job execution time + auto_scale: # if set to True, OneKE will be scaled automatically \ No newline at end of file From 2ebae22cf1374b6cea7f7e4df1f593bdda14a10e Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Tue, 2 Jul 2024 18:01:41 +0200 Subject: [PATCH 19/27] F OpenNebula/one-aiops#70: fix scaling functionality --- lithops/serverless/backends/one/one.py | 70 +++++++++----------------- 1 file changed, 24 insertions(+), 46 deletions(-) diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index 795a340ac..76aa9c22b 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -90,13 +90,12 @@ def __init__(self, one_config, internal_storage): def invoke(self, docker_image_name, runtime_memory, job_payload): - self._get_nodes() + super()._get_nodes() scale_nodes, pods, chunksize, worker_processes = self._granularity( job_payload['total_calls'] ) # Scale nodes - # TODO: Add logic to see if it's worth waiting for the cooldown or not if scale_nodes > len(self.nodes): self._scale_oneke(self.nodes, scale_nodes) @@ -229,50 +228,59 @@ def _check_vms_status(self): def _granularity(self, total_functions): # Set by the user, otherwise calculated based on OpenNebula available Resources MAX_NODES = 3 - max_nodes= MAX_NODES if self.maximum_nodes is -1 else self.maximum_nodes + max_nodes = MAX_NODES if self.maximum_nodes == -1 else self.maximum_nodes # TODO: get info from VM template - cpus_per_new_node=2 + cpus_per_new_node = 2 # TODO: monitor Scaling to set this value - first_node_creation_time=90 - additional_node_creation_time=20 + first_node_creation_time = 90 + additional_node_creation_time = 20 current_nodes = len(self.nodes) - total_cpus_available = sum(node['total_cpu'] - node['used_cpu'] for node in self.nodes) + total_cpus_available = int(sum(float(node['cpu']) for node in self.nodes)) current_pods = total_cpus_available if total_cpus_available > 0: estimated_time_no_scaling = (total_functions / total_cpus_available) * self.average_job_execution else: - estimated_time_no_scaling = float('inf') + estimated_time_no_scaling = float('inf') best_time = estimated_time_no_scaling best_nodes_needed = 0 + estimated_execution_time = float('inf') for additional_nodes in range(1, max_nodes - current_nodes + 1): new_total_cpus_available = total_cpus_available + (additional_nodes * cpus_per_new_node) estimated_time_with_scaling = (total_functions / new_total_cpus_available) * self.average_job_execution - if current_nodes == 0: - total_creation_time = first_node_creation_time + (additional_nodes - 1) * additional_node_creation_time + if current_nodes == 0 and additional_nodes == 1: + total_creation_time = first_node_creation_time + elif current_nodes > 0 and additional_nodes == 1: + total_creation_time = 
additional_node_creation_time else: - total_creation_time = additional_node_creation_time * additional_nodes - + total_creation_time = first_node_creation_time + (additional_nodes - 1) * additional_node_creation_time if current_nodes == 0 else additional_node_creation_time * additional_nodes + total_estimated_time_with_scaling = estimated_time_with_scaling + total_creation_time - if total_estimated_time_with_scaling < best_time: + if total_estimated_time_with_scaling < best_time and new_total_cpus_available <= total_functions: best_time = total_estimated_time_with_scaling best_nodes_needed = additional_nodes - current_pods = total_cpus_available + new_total_cpus_available + current_pods = new_total_cpus_available + estimated_execution_time = estimated_time_with_scaling nodes = current_nodes + best_nodes_needed - pods = current_pods + pods = min(total_functions, current_pods) + logger.info( f"Nodes: {nodes}, Pods: {pods}, Chunksize: 1, Worker Processes: 1" ) + logger.info( + f"Estimated Execution Time (without creation): {estimated_execution_time:.2f} seconds" + ) return nodes, pods, 1, 1 + def _scale_oneke(self, nodes, scale_nodes): logger.info(f"Scaling workers from {len(nodes)} to {scale_nodes} nodes") # Ensure the service can be scaled @@ -284,34 +292,4 @@ def _scale_oneke(self, nodes, scale_nodes): logger.info("OneKE service is in 'COOLDOWN' state and does not need to be scaled") return self.client.servicepool[self.service_id].role["worker"].scale(int(scale_nodes)) - self._wait_for_oneke('COOLDOWN') - - - def _get_nodes(self): - self.nodes = [] - list_all_nodes = self.core_api.list_node() - for node in list_all_nodes.items: - if node.spec.taints: - continue - - total_cpu = node.status.allocatable['cpu'] - used_cpu = node.status.capacity['cpu'] - total_memory = node.status.allocatable['memory'] - used_memory = node.status.capacity['memory'] - - if 'm' in total_cpu: - total_cpu = int(re.search(r'\d+', total_cpu).group()) / 1000 - if 'm' in used_cpu: - used_cpu = int(re.search(r'\d+', used_cpu).group()) / 1000 - if 'Ki' in total_memory: - total_memory = int(re.search(r'\d+', total_memory).group()) / 1024 - if 'Ki' in used_memory: - used_memory = int(re.search(r'\d+', used_memory).group()) / 1024 - - self.nodes.append({ - "name": node.metadata.name, - "total_cpu": total_cpu, - "total_memory": total_memory, - "used_cpu": used_cpu, - "used_memory": used_memory - }) \ No newline at end of file + self._wait_for_oneke('COOLDOWN') \ No newline at end of file From f9234e3741ed854fb8e04dfeee27c179af9313fc Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Wed, 3 Jul 2024 18:50:37 +0200 Subject: [PATCH 20/27] F OpenNebula/one-aiops#70: get host and vm_template resources + refactor --- lithops/serverless/backends/one/one.py | 84 ++++++++++++++++++-------- 1 file changed, 60 insertions(+), 24 deletions(-) diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index 76aa9c22b..0e278550a 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -226,39 +226,32 @@ def _check_vms_status(self): def _granularity(self, total_functions): - # Set by the user, otherwise calculated based on OpenNebula available Resources - MAX_NODES = 3 - max_nodes = MAX_NODES if self.maximum_nodes == -1 else self.maximum_nodes - # TODO: get info from VM template - cpus_per_new_node = 2 - # TODO: monitor Scaling to set this value - first_node_creation_time = 90 - additional_node_creation_time = 20 - + _host_cpu, _host_mem = self._one_resources() + 
_node_cpu, _node_mem = self._node_resources() + # OpenNebula available resources + max_nodes_cpu = int(_host_cpu / _node_cpu) + max_nodes_mem = int(_host_mem / _node_mem) + # OneKE current available resources current_nodes = len(self.nodes) total_cpus_available = int(sum(float(node['cpu']) for node in self.nodes)) - current_pods = total_cpus_available - if total_cpus_available > 0: estimated_time_no_scaling = (total_functions / total_cpus_available) * self.average_job_execution else: estimated_time_no_scaling = float('inf') + + # Set by the user, otherwise calculated based on OpenNebula available Resources + max_nodes = min(max_nodes_cpu, max_nodes_mem) + current_nodes + total_nodes = max_nodes if self.maximum_nodes == -1 else self.maximum_nodes best_time = estimated_time_no_scaling best_nodes_needed = 0 - estimated_execution_time = float('inf') + estimated_execution_time = (total_functions / total_cpus_available) * self.average_job_execution + current_pods = total_cpus_available - for additional_nodes in range(1, max_nodes - current_nodes + 1): - new_total_cpus_available = total_cpus_available + (additional_nodes * cpus_per_new_node) + for additional_nodes in range(1, total_nodes - current_nodes + 1): + new_total_cpus_available = total_cpus_available + (additional_nodes * int(_node_cpu)) estimated_time_with_scaling = (total_functions / new_total_cpus_available) * self.average_job_execution - - if current_nodes == 0 and additional_nodes == 1: - total_creation_time = first_node_creation_time - elif current_nodes > 0 and additional_nodes == 1: - total_creation_time = additional_node_creation_time - else: - total_creation_time = first_node_creation_time + (additional_nodes - 1) * additional_node_creation_time if current_nodes == 0 else additional_node_creation_time * additional_nodes - + total_creation_time = self._get_total_creation_time(additional_nodes) total_estimated_time_with_scaling = estimated_time_with_scaling + total_creation_time if total_estimated_time_with_scaling < best_time and new_total_cpus_available <= total_functions: @@ -267,7 +260,6 @@ def _granularity(self, total_functions): current_pods = new_total_cpus_available estimated_execution_time = estimated_time_with_scaling - nodes = current_nodes + best_nodes_needed pods = min(total_functions, current_pods) @@ -292,4 +284,48 @@ def _scale_oneke(self, nodes, scale_nodes): logger.info("OneKE service is in 'COOLDOWN' state and does not need to be scaled") return self.client.servicepool[self.service_id].role["worker"].scale(int(scale_nodes)) - self._wait_for_oneke('COOLDOWN') \ No newline at end of file + self._wait_for_oneke('COOLDOWN') + + + def _one_resources(self): + hostpool = self.pyone.hostpool.info() + host = hostpool.HOST[0] + + total_cpu = host.HOST_SHARE.TOTAL_CPU + used_cpu = host.HOST_SHARE.CPU_USAGE + + total_memory = host.HOST_SHARE.TOTAL_MEM + used_memory = host.HOST_SHARE.MEM_USAGE + + one_cpu = (total_cpu - used_cpu)/100 + one_memory = (total_memory - used_memory)/1000 + logger.info( + f"Available CPU: {one_cpu}, Available Memory: {one_memory}" + ) + return one_cpu, one_memory + + + def _node_resources(self): + _service_json = self.client.servicepool[self.service_id].info() + _service_roles = _service_json[str(self.service_id)]['TEMPLATE']['BODY']['roles'] + + for role in _service_roles: + if role['name'] == 'worker': + vm_template_id = role['vm_template'] + break + + vm_template = self.pyone.template.info(int(vm_template_id)).TEMPLATE + template_cpu = float(vm_template['CPU']) + template_memory = 
float(vm_template['MEMORY']) + logger.info(f"Template CPU: {template_cpu}, Template Memory: {template_memory}") + return template_cpu, template_memory + + + def _get_total_creation_time(self, additional_nodes): + # TODO: monitor Scaling to set this value + first_node_creation_time = 90 + additional_node_creation_time = 20 + + if additional_nodes == 1: + return first_node_creation_time + return first_node_creation_time + (additional_nodes - 1) * additional_node_creation_time \ No newline at end of file From a5b9f13c2650993e06680b85b694c8714b097dde Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Thu, 4 Jul 2024 10:13:20 +0200 Subject: [PATCH 21/27] F OpenNebula/one-aiops#70: get runtime meta if oneke has 0 workers --- lithops/serverless/backends/one/one.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index 0e278550a..daf58ffc3 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -234,16 +234,14 @@ def _granularity(self, total_functions): # OneKE current available resources current_nodes = len(self.nodes) total_cpus_available = int(sum(float(node['cpu']) for node in self.nodes)) - if total_cpus_available > 0: - estimated_time_no_scaling = (total_functions / total_cpus_available) * self.average_job_execution - else: - estimated_time_no_scaling = float('inf') - # Set by the user, otherwise calculated based on OpenNebula available Resources max_nodes = min(max_nodes_cpu, max_nodes_mem) + current_nodes total_nodes = max_nodes if self.maximum_nodes == -1 else self.maximum_nodes - best_time = estimated_time_no_scaling + if total_cpus_available > 0: + best_time = (total_functions / total_cpus_available) * self.average_job_execution + else: + best_time = float('inf') best_nodes_needed = 0 estimated_execution_time = (total_functions / total_cpus_available) * self.average_job_execution current_pods = total_cpus_available @@ -328,4 +326,11 @@ def _get_total_creation_time(self, additional_nodes): if additional_nodes == 1: return first_node_creation_time - return first_node_creation_time + (additional_nodes - 1) * additional_node_creation_time \ No newline at end of file + return first_node_creation_time + (additional_nodes - 1) * additional_node_creation_time + + + def _generate_runtime_meta(self, docker_image_name): + super()._get_nodes() + if len(self.nodes) == 0: + self._scale_oneke(self.nodes, 1) + super()._generate_runtime_meta(docker_image_name) \ No newline at end of file From 7625f121a0ace1ed49730f22f9132174783c3246 Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Wed, 10 Jul 2024 12:16:36 +0200 Subject: [PATCH 22/27] F OpenNebula/one-aiops#70: fix divide by 0 & runtime metadata --- lithops/serverless/backends/one/one.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index daf58ffc3..3c785c98e 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -87,7 +87,7 @@ def __init__(self, one_config, internal_storage): self.kubecfg_path = one_config['kubecfg_path'] super().__init__(one_config, internal_storage) - + def invoke(self, docker_image_name, runtime_memory, job_payload): super()._get_nodes() @@ -103,7 +103,7 @@ def invoke(self, docker_image_name, runtime_memory, job_payload): job_payload['max_workers'] = pods job_payload['chunksize'] = chunksize 
job_payload['worker_processes'] = worker_processes - super().invoke(docker_image_name, runtime_memory, job_payload) + return super().invoke(docker_image_name, runtime_memory, job_payload) def clear(self, job_keys=None): @@ -237,20 +237,23 @@ def _granularity(self, total_functions): # Set by the user, otherwise calculated based on OpenNebula available Resources max_nodes = min(max_nodes_cpu, max_nodes_mem) + current_nodes total_nodes = max_nodes if self.maximum_nodes == -1 else self.maximum_nodes - + if total_cpus_available > 0: best_time = (total_functions / total_cpus_available) * self.average_job_execution else: best_time = float('inf') best_nodes_needed = 0 - estimated_execution_time = (total_functions / total_cpus_available) * self.average_job_execution + estimated_execution_time = best_time current_pods = total_cpus_available for additional_nodes in range(1, total_nodes - current_nodes + 1): new_total_cpus_available = total_cpus_available + (additional_nodes * int(_node_cpu)) - estimated_time_with_scaling = (total_functions / new_total_cpus_available) * self.average_job_execution - total_creation_time = self._get_total_creation_time(additional_nodes) - total_estimated_time_with_scaling = estimated_time_with_scaling + total_creation_time + if new_total_cpus_available > 0: + estimated_time_with_scaling = (total_functions / new_total_cpus_available) * self.average_job_execution + total_creation_time = self._get_total_creation_time(additional_nodes) + total_estimated_time_with_scaling = estimated_time_with_scaling + total_creation_time + else: + total_estimated_time_with_scaling = float('inf') if total_estimated_time_with_scaling < best_time and new_total_cpus_available <= total_functions: best_time = total_estimated_time_with_scaling @@ -270,7 +273,6 @@ def _granularity(self, total_functions): return nodes, pods, 1, 1 - def _scale_oneke(self, nodes, scale_nodes): logger.info(f"Scaling workers from {len(nodes)} to {scale_nodes} nodes") # Ensure the service can be scaled @@ -329,8 +331,8 @@ def _get_total_creation_time(self, additional_nodes): return first_node_creation_time + (additional_nodes - 1) * additional_node_creation_time - def _generate_runtime_meta(self, docker_image_name): + def deploy_runtime(self, docker_image_name, memory, timeout): super()._get_nodes() if len(self.nodes) == 0: self._scale_oneke(self.nodes, 1) - super()._generate_runtime_meta(docker_image_name) \ No newline at end of file + return super().deploy_runtime(docker_image_name, memory, timeout) \ No newline at end of file From 9ea2f09ede4ff8d804ccdbbead67a49a75c662cf Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Wed, 10 Jul 2024 13:58:46 +0200 Subject: [PATCH 23/27] F OpenNebula/one-aiops#70: duplicate clears and not detected kubernetes nodes --- lithops/serverless/backends/one/one.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index 3c785c98e..ccf6f3605 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -52,6 +52,7 @@ def __init__(self, one_config, internal_storage): self.minimum_nodes = one_config['minimum_nodes'] self.maximum_nodes = one_config['maximum_nodes'] self.average_job_execution = one_config['average_job_execution'] + self.job_clean = set() # Export environment variables os.environ['ONEFLOW_URL'] = one_config['oneflow_url'] @@ -96,6 +97,11 @@ def invoke(self, docker_image_name, runtime_memory, job_payload): ) # Scale nodes + if 
scale_nodes == 0 and len(self.nodes) == 0: + raise OneError( + f"No nodes available and can't scale. Ensure nodes are active, detected by " + f"Kubernetes, and have enough resources to scale." + ) if scale_nodes > len(self.nodes): self._scale_oneke(self.nodes, scale_nodes) @@ -107,9 +113,13 @@ def invoke(self, docker_image_name, runtime_memory, job_payload): def clear(self, job_keys=None): - super().clear(job_keys) - super()._get_nodes() - self._scale_oneke(self.nodes, self.minimum_nodes) + if job_keys: + new_keys = [key for key in job_keys if key not in self.job_clean] + if new_keys: + self.job_clean.update(new_keys) + super().clear(job_keys) + super()._get_nodes() + self._scale_oneke(self.nodes, self.minimum_nodes) def _check_oneke(self): From 90e12b10f0399f90f90e8ac3931106444f2ced53 Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Wed, 24 Jul 2024 14:57:57 +0200 Subject: [PATCH 24/27] F OpenNebula/one-aiops#70: add scale on demand, pod resources, fix calculations + refactor --- lithops/serverless/backends/one/config.py | 18 +--- lithops/serverless/backends/one/one.py | 95 +++++++++++-------- .../serverless/backends/one/one_config.yaml | 23 ++--- 3 files changed, 67 insertions(+), 69 deletions(-) diff --git a/lithops/serverless/backends/one/config.py b/lithops/serverless/backends/one/config.py index 848c94859..3d9bbe559 100644 --- a/lithops/serverless/backends/one/config.py +++ b/lithops/serverless/backends/one/config.py @@ -4,7 +4,6 @@ from lithops.serverless.backends.k8s.config import ( DEFAULT_CONFIG_KEYS, - DOCKERFILE_DEFAULT, load_config as original_load_config ) @@ -18,7 +17,6 @@ class OneConfigError(Exception): "private_network_id" } - OPTIONAL_CONFIG_KEYS = { "ONEAPP_VROUTER_ETH0_VIP0": "", "ONEAPP_VROUTER_ETH1_VIP0": "", @@ -52,7 +50,6 @@ class OneConfigError(Exception): "ONEAPP_VNF_ROUTER4_INTERFACES": "eth0,eth1" } - DEFAULT_PRIVATE_VNET = """ NAME = "private-oneke" VN_MAD = "bridge" @@ -60,17 +57,6 @@ class OneConfigError(Exception): AR = [TYPE = "IP4", IP = "192.168.150.0", SIZE = "51"] """ - -FH_ZIP_LOCATION = os.path.join(os.getcwd(), 'lithops_one.zip') - - -# Overwrite default Dockerfile -DOCKERFILE_DEFAULT = "\n".join(DOCKERFILE_DEFAULT.split('\n')[:-2]) + """ -COPY lithops_one.zip . 
-RUN unzip lithops_one.zip && rm lithops_one.zip -""" - - STATE = { 0: "INIT", 1: "PENDING", @@ -85,7 +71,6 @@ class OneConfigError(Exception): 11: "CLONING_FAILURE" } - LCM_STATE = { 0: "LCM_INIT", 1: "PROLOG", @@ -157,7 +142,6 @@ class OneConfigError(Exception): 70: "BACKUP_POWEROFF" } - # Add OpenNebula defaults DEFAULT_CONFIG_KEYS.update({ 'timeout': 600, @@ -167,6 +151,7 @@ class OneConfigError(Exception): 'minimum_nodes': 0, 'maximum_nodes': -1, 'average_job_execution': 1, + 'auto_scale': 'all', }) @@ -197,6 +182,7 @@ def load_config(config_data): # Override oneke_config with a valid JSON to update the service config_data['one']['oneke_config'] = json.dumps(oneke_update) + # Load default config for key in DEFAULT_CONFIG_KEYS: if key not in config_data['one']: config_data['one'][key] = DEFAULT_CONFIG_KEYS[key] diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index ccf6f3605..e5a662347 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -49,9 +49,12 @@ def __init__(self, one_config, internal_storage): # Overwrite config values self.name = 'one' self.timeout = one_config['timeout'] + self.auto_scale = one_config['auto_scale'] self.minimum_nodes = one_config['minimum_nodes'] self.maximum_nodes = one_config['maximum_nodes'] - self.average_job_execution = one_config['average_job_execution'] + self.runtime_cpu = float(one_config['runtime_cpu']) + self.runtime_memory = float(one_config['runtime_memory']) + self.average_job_execution = float(one_config['average_job_execution']) self.job_clean = set() # Export environment variables @@ -102,7 +105,7 @@ def invoke(self, docker_image_name, runtime_memory, job_payload): f"No nodes available and can't scale. Ensure nodes are active, detected by " f"Kubernetes, and have enough resources to scale." 
) - if scale_nodes > len(self.nodes): + if scale_nodes > len(self.nodes) and self.auto_scale in {'all', 'up'}: self._scale_oneke(self.nodes, scale_nodes) # Setup granularity @@ -113,13 +116,19 @@ def invoke(self, docker_image_name, runtime_memory, job_payload): def clear(self, job_keys=None): - if job_keys: - new_keys = [key for key in job_keys if key not in self.job_clean] - if new_keys: - self.job_clean.update(new_keys) - super().clear(job_keys) - super()._get_nodes() - self._scale_oneke(self.nodes, self.minimum_nodes) + if not job_keys: + return + + new_keys = [key for key in job_keys if key not in self.job_clean] + if not new_keys: + return + + self.job_clean.update(new_keys) + super().clear(job_keys) + super()._get_nodes() + + if self.auto_scale in {'all', 'down'}: + self._scale_oneke(self.nodes, self.minimum_nodes) def _check_oneke(self): @@ -243,43 +252,36 @@ def _granularity(self, total_functions): max_nodes_mem = int(_host_mem / _node_mem) # OneKE current available resources current_nodes = len(self.nodes) - total_cpus_available = int(sum(float(node['cpu']) for node in self.nodes)) + total_pods_cpu = sum(int(float(node['cpu']) // self.runtime_cpu) for node in self.nodes) + total_pods_mem = sum( + int(self._parse_unit(node['memory']) // self.runtime_memory) + for node in self.nodes + ) + current_pods = min(total_pods_cpu, total_pods_mem) # Set by the user, otherwise calculated based on OpenNebula available Resources max_nodes = min(max_nodes_cpu, max_nodes_mem) + current_nodes total_nodes = max_nodes if self.maximum_nodes == -1 else self.maximum_nodes - - if total_cpus_available > 0: - best_time = (total_functions / total_cpus_available) * self.average_job_execution - else: - best_time = float('inf') - best_nodes_needed = 0 - estimated_execution_time = best_time - current_pods = total_cpus_available - + # Calculate the best time with scaling + best_time = (total_functions / current_pods) * self.average_job_execution if current_pods > 0 else float('inf') for additional_nodes in range(1, total_nodes - current_nodes + 1): - new_total_cpus_available = total_cpus_available + (additional_nodes * int(_node_cpu)) - if new_total_cpus_available > 0: - estimated_time_with_scaling = (total_functions / new_total_cpus_available) * self.average_job_execution + new_pods = min(int(_node_cpu // self.runtime_cpu), int(_node_mem // self.runtime_memory)) + if new_pods > 0 and (current_pods <= total_functions): + estimated_time_with_scaling = (total_functions / (current_pods+new_pods)) * self.average_job_execution total_creation_time = self._get_total_creation_time(additional_nodes) total_estimated_time_with_scaling = estimated_time_with_scaling + total_creation_time - else: - total_estimated_time_with_scaling = float('inf') - - if total_estimated_time_with_scaling < best_time and new_total_cpus_available <= total_functions: - best_time = total_estimated_time_with_scaling - best_nodes_needed = additional_nodes - current_pods = new_total_cpus_available - estimated_execution_time = estimated_time_with_scaling - - nodes = current_nodes + best_nodes_needed + if total_estimated_time_with_scaling < best_time: + best_time = total_estimated_time_with_scaling + current_nodes += 1 + new_pods_cpu = int(_node_cpu // self.runtime_cpu) + new_pods_mem = int(_node_mem // self.runtime_memory) + current_pods += min(new_pods_cpu, new_pods_mem) + + nodes = current_nodes pods = min(total_functions, current_pods) logger.info( f"Nodes: {nodes}, Pods: {pods}, Chunksize: 1, Worker Processes: 1" ) - logger.info( - f"Estimated 
Execution Time (without creation): {estimated_execution_time:.2f} seconds"
-        )
         return nodes, pods, 1, 1
 
 
@@ -332,13 +334,22 @@ def _get_total_creation_time(self, additional_nodes):
-        # TODO: monitor Scaling to set this value
-        first_node_creation_time = 90
-        additional_node_creation_time = 20
-
-        if additional_nodes == 1:
-            return first_node_creation_time
-        return first_node_creation_time + (additional_nodes - 1) * additional_node_creation_time
+        # First node creation time is 90 seconds
+        # Additional nodes add ~30 seconds in total, regardless of how many
+        return 90 if additional_nodes == 1 else 120
+
+
+    def _parse_unit(self, unit_str):
+        unit = unit_str[-2:]
+        value = float(unit_str[:-2])
+        if unit == 'Ki':
+            return value
+        elif unit == 'Mi':
+            return value * 1024
+        elif unit == 'Gi':
+            return value * 1024 * 1024
+        else:
+            raise ValueError(f"Unsupported unit: {unit_str}")
 
 
     def deploy_runtime(self, docker_image_name, memory, timeout):
diff --git a/lithops/serverless/backends/one/one_config.yaml b/lithops/serverless/backends/one/one_config.yaml
index 31976949f..304b6dd93 100644
--- a/lithops/serverless/backends/one/one_config.yaml
+++ b/lithops/serverless/backends/one/one_config.yaml
@@ -2,23 +2,24 @@ lithops:
     backend: one
 
 one:
     service_id: # oneke service id (means OneKE is already deployed)
-    service_template_id: # oneke_tempalte_id (client has downloaded before)
+    service_template_id: # oneke_template_id (client has only downloaded the template)
 
     oneke_config:
         public_network_id: # ID for Public vnet
-        private_network_id: # ID for Private vnet (if not passed: create a new one)
+        private_network_id: # ID for Private vnet
 
-    oneflow_url:
-    oneserver_url:
-    one_auth:
+    oneflow_url: # OneFlow server URL
+    oneserver_url: # Oned server URL
+    one_auth: # OpenNebula credentials
 
-    delete: # if set to True, OneKE will be deleted after all the jobs are finished
-    minimum_nodes: # minimum number of nodes in OneKE
+    runtime_cpu: # number of vCPUs per pod (default: 1)
+    runtime_memory: # amount of memory per pod (default: 512MiB)
+    minimum_nodes: # minimum number of nodes in OneKE (default: 0)
     maximum_nodes: # maximum number of nodes in OneKE
 
     oneke_config_path: # PATH to OneKE JSON config
-    timeout: # time to wait for OneKE to be ready
-    kubecfg_path': # PATH were kubeconfig will be stored
+    timeout: # time to wait for OneKE to be ready (default: 600s)
+    kubecfg_path: # PATH where kubeconfig will be stored (default: `/tmp/kube_config`)
 
-    average_job_execution: # average job execution time
-    auto_scale: # if set to True, OneKE will be scaled automatically
+    average_job_execution: # average job execution time (default: 1s)
+    auto_scale: # options: `'all'` (default), `'up'`, `'down'`, `'none'`
\ No newline at end of file

From a5d7a71dbceada807452d3deb30cb8baff64439d Mon Sep 17 00:00:00 2001
From: MarioRobres
Date: Thu, 25 Jul 2024 11:19:37 +0200
Subject: [PATCH 25/27] F OpenNebula/one-aiops#70: set node and vcpu limit

---
 lithops/serverless/backends/one/one.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py
index e5a662347..abc67da1e 100644
--- a/lithops/serverless/backends/one/one.py
+++ b/lithops/serverless/backends/one/one.py
@@ -252,7 +252,7 @@ def _granularity(self, total_functions):
         max_nodes_mem = int(_host_mem / _node_mem)
         # OneKE current available resources
         current_nodes = len(self.nodes)
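The two-line change below holds one vCPU per node back from the pod budget. A self-contained check of that capacity math (a sketch with made-up values; note that `_parse_unit` above normalizes memory to KiB, so `runtime_memory` is expressed in KiB here for the units to line up):

    # Standalone sketch of the per-node pod capacity calculation (illustrative values)
    def parse_unit(mem_str):
        # mirrors _parse_unit above: normalize Ki/Mi/Gi suffixes to KiB
        factors = {'Ki': 1, 'Mi': 1024, 'Gi': 1024 * 1024}
        return float(mem_str[:-2]) * factors[mem_str[-2:]]

    node = {'cpu': '4', 'memory': '8Gi'}   # hypothetical worker node
    runtime_cpu = 1.0                      # vCPUs requested per pod
    runtime_memory = 512 * 1024            # 512 MiB per pod, in KiB

    pods_by_cpu = int((float(node['cpu']) - 1) // runtime_cpu)   # one vCPU held back
    pods_by_mem = int(parse_unit(node['memory']) // runtime_memory)
    print(min(pods_by_cpu, pods_by_mem))   # -> 3 pods fit on this node

-        total_pods_cpu = sum(int(float(node['cpu']) // self.runtime_cpu) for node in self.nodes)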
+        total_pods_cpu = sum(int((float(node['cpu'])-1) // self.runtime_cpu) for node in self.nodes)
         total_pods_mem = sum(
             int(self._parse_unit(node['memory']) // self.runtime_memory)
             for node in self.nodes
         )
         current_pods = min(total_pods_cpu, total_pods_mem)
         # Set by the user, otherwise calculated based on OpenNebula available Resources
         max_nodes = min(max_nodes_cpu, max_nodes_mem) + current_nodes
-        total_nodes = max_nodes if self.maximum_nodes == -1 else self.maximum_nodes
+        total_nodes = max_nodes if self.maximum_nodes == -1 else min(self.maximum_nodes, max_nodes)
         # Calculate the best time with scaling
         best_time = (total_functions / current_pods) * self.average_job_execution if current_pods > 0 else float('inf')
         for additional_nodes in range(1, total_nodes - current_nodes + 1):

From 198170bed24a4e4d776c8473cf1d2a3233162b9c Mon Sep 17 00:00:00 2001
From: MarioRobres
Date: Fri, 26 Jul 2024 10:08:41 +0200
Subject: [PATCH 26/27] F OpenNebula/one-aiops#70: wait until Kubernetes
 detects all worker nodes

---
 lithops/serverless/backends/one/one.py | 20 ++++++++++++++++++--
 1 file changed, 18 insertions(+), 2 deletions(-)

diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py
index abc67da1e..7a136c2b7 100644
--- a/lithops/serverless/backends/one/one.py
+++ b/lithops/serverless/backends/one/one.py
@@ -94,12 +94,17 @@ def __init__(self, one_config, internal_storage):

     def invoke(self, docker_image_name, runtime_memory, job_payload):
+        # Wait for nodes to become available in Kubernetes
+        vms = len(self._get_vm_workers())
         super()._get_nodes()
+        while len(self.nodes) < vms:
+            time.sleep(1)
+            super()._get_nodes()
+
+        # Scale nodes
         scale_nodes, pods, chunksize, worker_processes = self._granularity(
             job_payload['total_calls']
         )
-
-        # Scale nodes
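The wait loop added above polls with no upper bound, so a worker VM that never registers with Kubernetes would hang invoke() indefinitely. For comparison, a bounded variant is sketched here (not part of the patch; the deadline value is an assumption mirroring the backend's own timeout default):

    import time

    def wait_for_nodes(get_nodes, expected, timeout=600, poll=1):
        # Sketch: same polling idea as the loop above, but with a deadline
        deadline = time.monotonic() + timeout
        nodes = get_nodes()
        while len(nodes) < expected:
            if time.monotonic() > deadline:
                raise TimeoutError(f"only {len(nodes)}/{expected} nodes registered")
            time.sleep(poll)
            nodes = get_nodes()
        return nodes

         if scale_nodes == 0 and len(self.nodes) == 0:
             raise OneError(
                 f"No nodes available and can't scale. 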
Ensure nodes are active, detected by " @@ -244,6 +249,17 @@ def _check_vms_status(self): ) + def _get_vm_workers(self): + workers_ids = set() + _service_json = self.client.servicepool[self.service_id].info() + roles = _service_json[str(self.service_id)]['TEMPLATE']['BODY']['roles'] + for role in roles: + if role['name'] == 'worker': + for node in role['nodes']: + workers_ids.add(node['vm_info']['VM']['ID']) + return workers_ids + + def _granularity(self, total_functions): _host_cpu, _host_mem = self._one_resources() _node_cpu, _node_mem = self._node_resources() From 5eb427e7a3f37293846eca50d1854fe464082885 Mon Sep 17 00:00:00 2001 From: MarioRobres Date: Mon, 29 Jul 2024 08:41:36 +0200 Subject: [PATCH 27/27] F OpenNebula/one-aiops#70: wait for nodes after scaling --- lithops/serverless/backends/one/one.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py index 7a136c2b7..b289e64ab 100644 --- a/lithops/serverless/backends/one/one.py +++ b/lithops/serverless/backends/one/one.py @@ -5,7 +5,6 @@ import pyone import os -import re import json import time import base64 @@ -95,11 +94,8 @@ def __init__(self, one_config, internal_storage): def invoke(self, docker_image_name, runtime_memory, job_payload): # Wait for nodes to become available in Kubernetes - vms = len(self._get_vm_workers()) - super()._get_nodes() - while len(self.nodes) < vms: - time.sleep(1) - super()._get_nodes() + vm_workers = len(self._get_vm_workers()) + self._wait_kubernetes_nodes(vm_workers) # Scale nodes scale_nodes, pods, chunksize, worker_processes = self._granularity( @@ -112,6 +108,7 @@ def invoke(self, docker_image_name, runtime_memory, job_payload): ) if scale_nodes > len(self.nodes) and self.auto_scale in {'all', 'up'}: self._scale_oneke(self.nodes, scale_nodes) + self._wait_kubernetes_nodes(scale_nodes) # Setup granularity job_payload['max_workers'] = pods @@ -372,4 +369,11 @@ def deploy_runtime(self, docker_image_name, memory, timeout): super()._get_nodes() if len(self.nodes) == 0: self._scale_oneke(self.nodes, 1) - return super().deploy_runtime(docker_image_name, memory, timeout) \ No newline at end of file + return super().deploy_runtime(docker_image_name, memory, timeout) + + + def _wait_kubernetes_nodes(self, total): + super()._get_nodes() + while len(self.nodes) < total: + time.sleep(1) + super()._get_nodes() \ No newline at end of file
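
With the series applied end to end, the backend can be driven from Lithops roughly as follows. This is a sketch: the IDs, URLs and credentials are placeholders, the key names follow one_config.yaml above, and storage configuration is omitted (Lithops falls back to its configured default).

    # Hypothetical end-to-end usage of the 'one' backend (all values are placeholders)
    import lithops

    config = {
        'lithops': {'backend': 'one'},
        'one': {
            'service_template_id': 42,                       # OneKE template to instantiate
            'oneflow_url': 'http://oneflow.example:2474',
            'oneserver_url': 'http://oned.example:2633/RPC2',
            'one_auth': 'oneadmin:password',
            'minimum_nodes': 0,                              # scale to zero when idle
            'maximum_nodes': 4,                              # cap for _granularity()
            'auto_scale': 'all',                             # scale both up and down
            'average_job_execution': 2,                      # seconds, tunes the estimator
        },
    }

    def square(x):
        return x * x

    fexec = lithops.FunctionExecutor(config=config)
    futures = fexec.map(square, range(100))
    print(sum(fexec.get_result(futures)))

On the first call the backend instantiates or reuses the OneKE service, waits for the RUNNING state, sizes the worker pool with _granularity(), and scales back down to minimum_nodes once clear() runs, per the patches above.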