From 098bb94b55d04fba743e92035799823330919b10 Mon Sep 17 00:00:00 2001 From: micafer Date: Thu, 26 Nov 2020 09:07:15 +0100 Subject: [PATCH 1/7] Fix/Iprove Kube conn --- IM/connectors/Kubernetes.py | 137 +++++++++++++++++++++++------ test/unit/connectors/Kubernetes.py | 10 ++- 2 files changed, 117 insertions(+), 30 deletions(-) diff --git a/IM/connectors/Kubernetes.py b/IM/connectors/Kubernetes.py index 7a8e9856c..1256fb8fe 100644 --- a/IM/connectors/Kubernetes.py +++ b/IM/connectors/Kubernetes.py @@ -17,13 +17,14 @@ import base64 import json import requests +from netaddr import IPNetwork, IPAddress try: from urlparse import urlparse except ImportError: from urllib.parse import urlparse from IM.VirtualMachine import VirtualMachine from .CloudConnector import CloudConnector -from radl.radl import Feature +from radl.radl import Feature, outport from IM.config import Config @@ -34,7 +35,7 @@ class KubernetesCloudConnector(CloudConnector): type = "Kubernetes" - _port_base_num = 35000 + _port_base_num = 30000 """ Base number to assign SSH port on Kubernetes node.""" _port_counter = 0 """ Counter to assign SSH port on Kubernetes node.""" @@ -51,6 +52,10 @@ class KubernetesCloudConnector(CloudConnector): } """Dictionary with a map with the Kubernetes POD states to the IM states.""" + def __init__(self, cloud_info, inf): + self.apiVersion = None + CloudConnector.__init__(self, cloud_info, inf) + def create_request(self, method, url, auth_data, headers=None, body=None): auth_header = self.get_auth_header(auth_data) if auth_header: @@ -93,6 +98,9 @@ def get_api_version(self, auth_data): """ Return the API version to use to connect with kubernetes API server """ + if self.apiVersion: + return self.apiVersion + version = self._apiVersions[0] try: @@ -102,11 +110,11 @@ def get_api_version(self, auth_data): output = json.loads(resp.text) for v in self._apiVersions: if v in output["versions"]: + self.apiVersion = v return v except Exception: - self.log_exception( - "Error connecting with Kubernetes API server") + self.log_exception("Error connecting with Kubernetes API server") self.log_warn("Error getting a compatible API version. Setting the default one.") self.log_debug("Using %s API version." % version) @@ -133,6 +141,7 @@ def _delete_volume_claim(self, namespace, vc_name, auth_data): try: apiVersion = self.get_api_version(auth_data) + self.log_debug("Deleting PVC: %s/%s" % (namespace, vc_name)) uri = "/api/" + apiVersion + "/namespaces/" + namespace + "/persistentvolumeclaims/" + vc_name resp = self.create_request('DELETE', uri, auth_data) @@ -202,6 +211,7 @@ def _create_volumes(self, apiVersion, namespace, system, pod_name, auth_data, pe claim_data['spec'] = {'accessModes': ['ReadWriteOnce'], 'resources': { 'requests': {'storage': disk_size}}} + self.log_debug("Creating PVC: %s/%s" % (namespace, name)) success = self._create_volume_claim(claim_data, auth_data) if success: res.append((name, disk_device, disk_size, disk_mount_path, persistent)) @@ -214,20 +224,45 @@ def _create_volumes(self, apiVersion, namespace, system, pod_name, auth_data, pe return res - def _generate_pod_data(self, apiVersion, namespace, name, outports, system, ssh_port, volumes): + def _generate_service_data(self, apiVersion, namespace, name, outports, ssh_port): + service_data = {'apiVersion': apiVersion, 'kind': 'Service'} + service_data['metadata'] = { + 'name': name, + 'namespace': namespace, + 'labels': {'name': name} + } + + ports = [{'port': 22, 'targetPort': 22, 'protocol': 'TCP', 'nodePort': ssh_port, 'name': 'ssh'}] + if outports: + for outport in outports: + if outport.is_range(): + self.log_warn("Port range not allowed in Kubernetes connector. Ignoring.") + elif outport.get_local_port() != 22: + ports.append({'port': outport.get_local_port(), 'protocol': outport.get_protocol().upper(), + 'targetPort': outport.get_local_port(), 'nodePort': outport.get_remote_port(), + 'name': 'port%s' % outport.get_local_port()}) + + service_data['spec'] = { + 'type': 'NodePort', + 'ports': ports, + 'selector': {'name': name} + } + + return service_data + + def _generate_pod_data(self, apiVersion, namespace, name, outports, system, volumes): cpu = str(system.getValue('cpu.count')) memory = "%s" % system.getFeature('memory.size').getValue('B') # The URI has this format: docker://image_name image_name = system.getValue("disk.0.image.url")[9:] - ports = [{'containerPort': 22, 'protocol': 'TCP', 'hostPort': ssh_port}] + ports = [{'containerPort': 22, 'protocol': 'TCP'}] if outports: for outport in outports: if outport.is_range(): self.log_warn("Port range not allowed in Kubernetes connector. Ignoring.") elif outport.get_local_port() != 22: - ports.append({'containerPort': outport.get_local_port(), 'protocol': outport.get_protocol().upper( - ), 'hostPort': outport.get_remote_port()}) + ports.append({'containerPort': outport.get_local_port(), 'protocol': outport.get_protocol().upper()}) pod_data = {'apiVersion': apiVersion, 'kind': 'Pod'} pod_data['metadata'] = { @@ -239,15 +274,17 @@ def _generate_pod_data(self, apiVersion, namespace, name, outports, system, ssh_ command += " ; " command += "apt-get update && apt-get install -y openssh-server python" command += " ; " + command += "apk add --no-cache openssh-server python" + command += " ; " command += "mkdir /var/run/sshd" command += " ; " - command += "sed -i '/PermitRootLogin/c\PermitRootLogin yes' /etc/ssh/sshd_config" + command += "sed -i '/PermitRootLogin/c\\PermitRootLogin yes' /etc/ssh/sshd_config" command += " ; " command += "ssh-keygen -t rsa -f /etc/ssh/ssh_host_rsa_key -N ''" command += " ; " command += "echo 'root:" + self._root_password + "' | chpasswd" command += " ; " - command += "sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd" + command += "sed 's@session\\s*required\\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd" command += " ; " command += " /usr/sbin/sshd -D" containers = [{ @@ -309,6 +346,7 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data): with inf._lock: resp = self.create_request('GET', uri + "/" + namespace, auth_data, headers) if resp.status_code != 200: + self.log_debug("Creating Namespace: %s" % namespace) namespace_data = {'apiVersion': apiVersion, 'kind': 'Namespace', 'metadata': {'name': namespace}} body = json.dumps(namespace_data) @@ -334,12 +372,10 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data): # Do not use the Persistent volumes yet volumes = self._create_volumes(apiVersion, namespace, system, pod_name, auth_data) - ssh_port = (KubernetesCloudConnector._port_base_num + KubernetesCloudConnector._port_counter) % 65535 - KubernetesCloudConnector._port_counter += 1 - pod_data = self._generate_pod_data(apiVersion, namespace, pod_name, outports, - system, ssh_port, volumes) + pod_data = self._generate_pod_data(apiVersion, namespace, pod_name, outports, system, volumes) body = json.dumps(pod_data) + self.log_debug("Creating POD: %s/%s" % (namespace, pod_name)) uri = "/api/" + apiVersion + "/namespaces/" + namespace + "/pods" resp = self.create_request('POST', uri, auth_data, headers, body) @@ -350,15 +386,31 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data): except Exception: self.log_exception("Error deleting volumes.") else: + ssh_port = vm.getSSHPort() + if ssh_port == 22: + ssh_port = (KubernetesCloudConnector._port_base_num + KubernetesCloudConnector._port_counter) % 65535 + KubernetesCloudConnector._port_counter += 1 + + try: + service_data = self._generate_service_data(apiVersion, namespace, pod_name, outports, ssh_port) + body = json.dumps(service_data) + self.log_debug("Creating Service: %s/%s" % (namespace, pod_name)) + uri = "/api/" + apiVersion + "/namespaces/" + namespace + "/services" + svc_resp = self.create_request('POST', uri, auth_data, headers, body) + if svc_resp.status_code != 201: + self.error_messages += "Error creating service to access pod %s" % pod_name + self.log_warn("Error creating service.") + except Exception: + self.error_messages += "Error creating service to access pod %s" % pod_name + self.log_exception("Error creating service.") + output = json.loads(resp.text) vm.id = output["metadata"]["name"] # Set SSH port in the RADL info of the VM vm.setSSHPort(ssh_port) # Set the default user and password to access the container - vm.info.systems[0].setValue( - 'disk.0.os.credentials.username', 'root') - vm.info.systems[0].setValue( - 'disk.0.os.credentials.password', self._root_password) + vm.info.systems[0].setValue('disk.0.os.credentials.username', 'root') + vm.info.systems[0].setValue('disk.0.os.credentials.password', self._root_password) vm.info.systems[0].setValue('instance_id', str(vm.id)) vm.info.systems[0].setValue('instance_name', str(vm.id)) @@ -387,16 +439,14 @@ def _get_pod(self, vm, auth_data): return (False, resp.status_code, resp.text) except Exception as ex: - self.log_exception( - "Error connecting with Kubernetes API server") + self.log_exception("Error connecting with Kubernetes API server") return (False, None, "Error connecting with Kubernetes API server: " + str(ex)) def updateVMInfo(self, vm, auth_data): success, status, output = self._get_pod(vm, auth_data) if success: output = json.loads(output) - vm.state = self.VM_STATE_MAP.get( - output["status"]["phase"], VirtualMachine.UNKNOWN) + vm.state = self.VM_STATE_MAP.get(output["status"]["phase"], VirtualMachine.UNKNOWN) # Update the network info self.setIPs(vm, output) @@ -405,8 +455,7 @@ def updateVMInfo(self, vm, auth_data): self.log_error("Error getting info about the POD: code: %s, msg: %s" % (status, output)) return (False, "Error getting info about the POD: code: %s, msg: %s" % (status, output)) - @staticmethod - def setIPs(vm, pod_info): + def setIPs(self, vm, pod_info): """ Adapt the RADL information of the VM to the real IPs assigned by the cloud provider @@ -418,24 +467,33 @@ def setIPs(vm, pod_info): public_ips = [] private_ips = [] if 'hostIP' in pod_info["status"]: - public_ips = [str(pod_info["status"]["hostIP"])] + host_ip = str(pod_info["status"]["hostIP"]) + is_private = any([IPAddress(host_ip) in IPNetwork(mask) for mask in Config.PRIVATE_NET_MASKS]) + if is_private: + public_ips = [self.cloud.server] + else: + public_ips = [host_ip] if 'podIP' in pod_info["status"]: private_ips = [str(pod_info["status"]["podIP"])] vm.setIps(public_ips, private_ips) def finalize(self, vm, last, auth_data): + msg = "" if vm.id: success, status, output = self._get_pod(vm, auth_data) if success: if status == 404: self.log_warn("Trying to remove a non existing POD id: %s" % vm.id) - return (True, vm.id) else: pod_data = json.loads(output) self._delete_volume_claims(pod_data, auth_data) + success, msg = self._delete_pod(vm, auth_data) + if not success: + self.log_error("Error deleting Pod %s: %s" % (vm.id, msg)) + return False, "Error deleting Pod %s: %s" % (vm.id, msg) - success = self._delete_pod(vm, auth_data) + success, msg = self._delete_service(vm, auth_data) else: self.log_warn("No VM ID. Ignoring") success = True @@ -443,11 +501,12 @@ def finalize(self, vm, last, auth_data): if last: self._delete_namespace(vm, auth_data) - return success + return success, msg def _delete_namespace(self, vm, auth_data): apiVersion = self.get_api_version(auth_data) headers = {'Content-Type': 'application/json'} + self.log_debug("Deleting Namespace: %s" % vm.inf.id) uri = "/api/" + apiVersion + "/namespaces/" + vm.inf.id resp = self.create_request('DELETE', uri, auth_data, headers) if resp.status_code != 200: @@ -455,11 +514,33 @@ def _delete_namespace(self, vm, auth_data): return False return True + def _delete_service(self, vm, auth_data): + try: + namespace = vm.inf.id + service_name = vm.id + + self.log_debug("Deleting Service: %s/%s" % (namespace, service_name)) + apiVersion = self.get_api_version(auth_data) + uri = "/api/" + apiVersion + "/namespaces/" + namespace + "/services/" + service_name + resp = self.create_request('DELETE', uri, auth_data) + + if resp.status_code == 404: + self.log_warn("Trying to remove a non existing Service id: " + service_name) + return (True, service_name) + elif resp.status_code != 200: + return (False, "Error deleting the Service: " + resp.text) + else: + return (True, service_name) + except Exception: + self.log_exception("Error connecting with Kubernetes API server") + return (False, "Error connecting with Kubernetes API server") + def _delete_pod(self, vm, auth_data): try: namespace = vm.inf.id pod_name = vm.id + self.log_debug("Deleting POD: %s/%s" % (namespace, pod_name)) apiVersion = self.get_api_version(auth_data) uri = "/api/" + apiVersion + "/namespaces/" + namespace + "/pods/" + pod_name resp = self.create_request('DELETE', uri, auth_data) diff --git a/test/unit/connectors/Kubernetes.py b/test/unit/connectors/Kubernetes.py index 0288e3edd..534802374 100755 --- a/test/unit/connectors/Kubernetes.py +++ b/test/unit/connectors/Kubernetes.py @@ -93,12 +93,16 @@ def get_response(self, method, url, verify, headers, data): if url.endswith("/pods"): resp.status_code = 201 resp.text = '{"metadata": {"namespace":"namespace", "name": "name"}}' - if url.endswith("/namespaces"): + elif url.endswith("/services"): + resp.status_code = 201 + elif url.endswith("/namespaces"): resp.status_code = 201 elif method == "DELETE": if url.endswith("/pods/1"): resp.status_code = 200 - if url.endswith("/namespaces/namespace"): + elif url.endswith("/services/1"): + resp.status_code = 200 + elif url.endswith("/namespaces/namespace"): resp.status_code = 200 elif "persistentvolumeclaims" in url: resp.status_code = 200 @@ -171,6 +175,8 @@ def test_30_updateVMInfo(self, requests): success, vm = kube_cloud.updateVMInfo(vm, auth) self.assertTrue(success, msg="ERROR: updating VM info.") + self.assertEqual(vm.info.systems[0].getValue("net_interface.0.ip"), "158.42.1.1") + self.assertEqual(vm.info.systems[0].getValue("net_interface.1.ip"), "10.0.0.1") self.assertNotIn("ERROR", self.log.getvalue(), msg="ERROR found in log: %s" % self.log.getvalue()) @patch('requests.request') From 64c65dfd10fa6b6dd1dc2287ef709fe81eceadc1 Mon Sep 17 00:00:00 2001 From: micafer Date: Thu, 26 Nov 2020 09:17:53 +0100 Subject: [PATCH 2/7] Fix/Iprove Kube conn --- IM/connectors/Kubernetes.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/IM/connectors/Kubernetes.py b/IM/connectors/Kubernetes.py index 1256fb8fe..bad1cb4d6 100644 --- a/IM/connectors/Kubernetes.py +++ b/IM/connectors/Kubernetes.py @@ -75,9 +75,7 @@ def get_auth_header(self, auth_data): url = urlparse(self.cloud.server) auths = auth_data.getAuthInfo(self.type, url[1]) if not auths: - self.log_error( - "No correct auth data has been specified to Kubernetes.") - return None + raise Exception("No correct auth data has been specified to Kubernetes.") else: auth = auths[0] @@ -91,6 +89,8 @@ def get_auth_header(self, auth_data): elif 'token' in auth: token = auth['token'] auth_header = {'Authorization': 'Bearer ' + token} + else: + raise Exception("No correct auth data has been specified to Kubernetes: username and password or token.") return auth_header From 7e387659b195c76372f7908e9ae1b8cb9dc2567c Mon Sep 17 00:00:00 2001 From: micafer Date: Thu, 26 Nov 2020 13:01:37 +0100 Subject: [PATCH 3/7] Fix/Iprove Kube conn and alpine images --- IM/ConfManager.py | 2 +- IM/connectors/Kubernetes.py | 8 ++++---- contextualization/conf-ansible.yml | 29 +++++++++++++++++++++++++++++ test/unit/connectors/Kubernetes.py | 15 ++++++++++----- 4 files changed, 44 insertions(+), 10 deletions(-) diff --git a/IM/ConfManager.py b/IM/ConfManager.py index 30efea6fc..26ae61fbf 100644 --- a/IM/ConfManager.py +++ b/IM/ConfManager.py @@ -1191,7 +1191,7 @@ def change_master_credentials(self, ssh): # passwd value if passwd and new_passwd: self.log_info("Changing password to master VM") - (out, err, code) = ssh.execute('echo "' + passwd + '" | sudo -S bash -c \'echo "' + + (out, err, code) = ssh.execute('echo "' + passwd + '" | sudo -S sh -c \'echo "' + user + ':' + new_passwd + '" | /usr/sbin/chpasswd && echo "OK"\' 2> /dev/null') diff --git a/IM/connectors/Kubernetes.py b/IM/connectors/Kubernetes.py index bad1cb4d6..cd5b0211f 100644 --- a/IM/connectors/Kubernetes.py +++ b/IM/connectors/Kubernetes.py @@ -270,11 +270,11 @@ def _generate_pod_data(self, apiVersion, namespace, name, outports, system, volu 'namespace': namespace, 'labels': {'name': name} } - command = "yum install -y openssh-server python" + command = "yum install -y openssh-server python sudo" command += " ; " - command += "apt-get update && apt-get install -y openssh-server python" + command += "apt-get update && apt-get install -y openssh-server python sudo" command += " ; " - command += "apk add --no-cache openssh-server python" + command += "apk add --no-cache openssh-server python2 sudo" command += " ; " command += "mkdir /var/run/sshd" command += " ; " @@ -290,7 +290,7 @@ def _generate_pod_data(self, apiVersion, namespace, name, outports, system, volu containers = [{ 'name': name, 'image': image_name, - 'command': ["/bin/bash", "-c", command], + 'command': ["/bin/sh", "-c", command], 'imagePullPolicy': 'IfNotPresent', 'ports': ports, 'resources': {'limits': {'cpu': cpu, 'memory': memory}} diff --git a/contextualization/conf-ansible.yml b/contextualization/conf-ansible.yml index 44898237f..973458774 100644 --- a/contextualization/conf-ansible.yml +++ b/contextualization/conf-ansible.yml @@ -100,6 +100,35 @@ zypper: name=python-pip,python-setuptools,gcc,python-devel,wget,libffi-devel,openssl-devel,python-cryptography,make state=present when: ansible_os_family == "Suse" + - name: Apk install requirements Alpine + package: name=py-setuptools,sshpass,openssh-client,unzip,gcc,libffi-dev,openssl-dev,musl-dev,make,wget,python2-dev + when: ansible_os_family == "Alpine" + + - name: Apk install pip in Alpine 3.11- + package: name=py-pip + when: ansible_os_family == "Alpine" and ansible_distribution_version is version('3.12', '<') + + - name: Apk install pip in Alpine 3.12+ + shell: wget https://bootstrap.pypa.io/get-pip.py && python2 get-pip.py warn=false creates=/usr/local/bin/pip2 chdir=/tmp + when: ansible_os_family == "Alpine" and ansible_distribution_version is version('3.12', '>=') + + - name: Move original ps in Alpine + command: mv /bin/ps /sbin/ps creates=/sbin/ps + when: ansible_os_family == "Alpine" + + - name: Install ps with -p support in Alpine + get_url: + url: https://gist.githubusercontent.com/micafer/f74de4dc21a636df30d51202cbeee475/raw/388945406b9e9d225a0c7e95b97fc2515f1a17ef/ps_opt_p_enabled_for_alpine.sh + dest: /bin/ps + mode: 0755 + owner: root + group: root + when: ansible_os_family == "Alpine" + + - name: Pip install cffi Alpine + pip: name=wheel + when: ansible_os_family == "Alpine" + - name: Install pip and setuptools Py3 package: name=python3-setuptools,python3-pip when: ansible_python.version.major > 2 diff --git a/test/unit/connectors/Kubernetes.py b/test/unit/connectors/Kubernetes.py index 534802374..e4e6d68a3 100755 --- a/test/unit/connectors/Kubernetes.py +++ b/test/unit/connectors/Kubernetes.py @@ -68,7 +68,8 @@ def test_10_concrete(self): radl = radl_parse.parse_radl(radl_data) radl_system = radl.systems[0] - auth = Authentication([{'id': 'fogbow', 'type': 'Kubernetes', 'host': 'http://server.com:8080'}]) + auth = Authentication([{'id': 'kube', 'type': 'Kubernetes', + 'host': 'http://server.com:8080', 'token': 'token'}]) kube_cloud = self.get_kube_cloud() concrete = kube_cloud.concreteSystem(radl_system, auth) @@ -135,7 +136,8 @@ def test_20_launch(self, save_data, requests): radl = radl_parse.parse_radl(radl_data) radl.check() - auth = Authentication([{'id': 'fogbow', 'type': 'Kubernetes', 'host': 'http://server.com:8080'}]) + auth = Authentication([{'id': 'kube', 'type': 'Kubernetes', + 'host': 'http://server.com:8080', 'token': 'token'}]) kube_cloud = self.get_kube_cloud() requests.side_effect = self.get_response @@ -163,7 +165,8 @@ def test_30_updateVMInfo(self, requests): radl = radl_parse.parse_radl(radl_data) radl.check() - auth = Authentication([{'id': 'fogbow', 'type': 'Kubernetes', 'host': 'http://server.com:8080'}]) + auth = Authentication([{'id': 'kube', 'type': 'Kubernetes', + 'host': 'http://server.com:8080', 'token': 'token'}]) kube_cloud = self.get_kube_cloud() inf = MagicMock() @@ -203,7 +206,8 @@ def test_55_alter(self, requests): )""" new_radl = radl_parse.parse_radl(new_radl_data) - auth = Authentication([{'id': 'fogbow', 'type': 'Kubernetes', 'host': 'http://server.com:8080'}]) + auth = Authentication([{'id': 'kube', 'type': 'Kubernetes', + 'host': 'http://server.com:8080', 'token': 'token'}]) kube_cloud = self.get_kube_cloud() inf = MagicMock() @@ -219,7 +223,8 @@ def test_55_alter(self, requests): @patch('requests.request') def test_60_finalize(self, requests): - auth = Authentication([{'id': 'fogbow', 'type': 'Kubernetes', 'host': 'http://server.com:8080'}]) + auth = Authentication([{'id': 'kube', 'type': 'Kubernetes', + 'host': 'http://server.com:8080', 'token': 'token'}]) kube_cloud = self.get_kube_cloud() inf = MagicMock() From cf8b3e2f9fb7b422cb2a7a1bd181eff69993032b Mon Sep 17 00:00:00 2001 From: micafer Date: Thu, 26 Nov 2020 13:03:27 +0100 Subject: [PATCH 4/7] Fix/Iprove Kube conn --- IM/connectors/Kubernetes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/IM/connectors/Kubernetes.py b/IM/connectors/Kubernetes.py index cd5b0211f..a26637c99 100644 --- a/IM/connectors/Kubernetes.py +++ b/IM/connectors/Kubernetes.py @@ -24,7 +24,7 @@ from urllib.parse import urlparse from IM.VirtualMachine import VirtualMachine from .CloudConnector import CloudConnector -from radl.radl import Feature, outport +from radl.radl import Feature from IM.config import Config From 0eb8037be9f1b7260aecc5120a8329973852bb3f Mon Sep 17 00:00:00 2001 From: micafer Date: Thu, 26 Nov 2020 13:45:41 +0100 Subject: [PATCH 5/7] Fix/Iprove Kube conn --- IM/connectors/Kubernetes.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/IM/connectors/Kubernetes.py b/IM/connectors/Kubernetes.py index a26637c99..c036b438d 100644 --- a/IM/connectors/Kubernetes.py +++ b/IM/connectors/Kubernetes.py @@ -56,6 +56,13 @@ def __init__(self, cloud_info, inf): self.apiVersion = None CloudConnector.__init__(self, cloud_info, inf) + @staticmethod + def _get_port(): + KubernetesCloudConnector._port_counter += 1 + KubernetesCloudConnector._port_counter %= 35535 + port = KubernetesCloudConnector._port_base_num + KubernetesCloudConnector._port_counter + return port + def create_request(self, method, url, auth_data, headers=None, body=None): auth_header = self.get_auth_header(auth_data) if auth_header: @@ -238,8 +245,13 @@ def _generate_service_data(self, apiVersion, namespace, name, outports, ssh_port if outport.is_range(): self.log_warn("Port range not allowed in Kubernetes connector. Ignoring.") elif outport.get_local_port() != 22: + + remote_port = outport.get_remote_port() + if outport.get_local_port() == remote_port: + remote_port = self._get_port() + ports.append({'port': outport.get_local_port(), 'protocol': outport.get_protocol().upper(), - 'targetPort': outport.get_local_port(), 'nodePort': outport.get_remote_port(), + 'targetPort': outport.get_local_port(), 'nodePort': remote_port, 'name': 'port%s' % outport.get_local_port()}) service_data['spec'] = { @@ -388,8 +400,7 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data): else: ssh_port = vm.getSSHPort() if ssh_port == 22: - ssh_port = (KubernetesCloudConnector._port_base_num + KubernetesCloudConnector._port_counter) % 65535 - KubernetesCloudConnector._port_counter += 1 + ssh_port = self._get_port() try: service_data = self._generate_service_data(apiVersion, namespace, pod_name, outports, ssh_port) @@ -398,8 +409,8 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data): uri = "/api/" + apiVersion + "/namespaces/" + namespace + "/services" svc_resp = self.create_request('POST', uri, auth_data, headers, body) if svc_resp.status_code != 201: - self.error_messages += "Error creating service to access pod %s" % pod_name - self.log_warn("Error creating service.") + self.error_messages += "Error creating service to access pod %s: %s" % (pod_name, svc_resp.text) + self.log_warn("Error creating service: %s" % svc_resp.text) except Exception: self.error_messages += "Error creating service to access pod %s" % pod_name self.log_exception("Error creating service.") @@ -509,7 +520,9 @@ def _delete_namespace(self, vm, auth_data): self.log_debug("Deleting Namespace: %s" % vm.inf.id) uri = "/api/" + apiVersion + "/namespaces/" + vm.inf.id resp = self.create_request('DELETE', uri, auth_data, headers) - if resp.status_code != 200: + if resp.status_code == 404: + self.log_warn("Trying to remove a non existing Namespace id: " + vm.inf.id) + elif resp.status_code != 200: self.log_error("Error deleting Namespace") return False return True From 13f7479e45f5a5ca3f6d1d9cd2dbae2a10b348ee Mon Sep 17 00:00:00 2001 From: micafer Date: Thu, 26 Nov 2020 16:51:20 +0100 Subject: [PATCH 6/7] Fix/Improve Kube conn --- IM/connectors/Kubernetes.py | 88 +++++++++++++----------------- test/unit/connectors/Kubernetes.py | 2 +- 2 files changed, 38 insertions(+), 52 deletions(-) diff --git a/IM/connectors/Kubernetes.py b/IM/connectors/Kubernetes.py index c036b438d..17ebd680d 100644 --- a/IM/connectors/Kubernetes.py +++ b/IM/connectors/Kubernetes.py @@ -56,6 +56,10 @@ def __init__(self, cloud_info, inf): self.apiVersion = None CloudConnector.__init__(self, cloud_info, inf) + def _get_api_url(self, auth_data, namespace, path): + apiVersion = self.get_api_version(auth_data) + return "/api/" + apiVersion + "/namespaces/" + namespace + path + @staticmethod def _get_port(): KubernetesCloudConnector._port_counter += 1 @@ -70,8 +74,12 @@ def create_request(self, method, url, auth_data, headers=None, body=None): headers = {} headers.update(auth_header) + if body and isinstance(body, dict): + data = json.dumps(body) + else: + data = body url = "%s://%s:%d%s%s" % (self.cloud.protocol, self.cloud.server, self.cloud.port, self.cloud.path, url) - resp = requests.request(method, url, verify=self.verify_ssl, headers=headers, data=body) + resp = requests.request(method, url, verify=self.verify_ssl, headers=headers, data=data) return resp @@ -146,10 +154,8 @@ def concrete_system(self, radl_system, str_url, auth_data): def _delete_volume_claim(self, namespace, vc_name, auth_data): try: - apiVersion = self.get_api_version(auth_data) - self.log_debug("Deleting PVC: %s/%s" % (namespace, vc_name)) - uri = "/api/" + apiVersion + "/namespaces/" + namespace + "/persistentvolumeclaims/" + vc_name + uri = self._get_api_url(auth_data, namespace, "/persistentvolumeclaims/" + vc_name) resp = self.create_request('DELETE', uri, auth_data) if resp.status_code == 404: @@ -175,14 +181,9 @@ def _delete_volume_claims(self, pod_data, auth_data): def _create_volume_claim(self, claim_data, auth_data): try: - apiVersion = self.get_api_version(auth_data) - headers = {'Content-Type': 'application/json'} - uri = ("/api/" + apiVersion + "/namespaces/" + - claim_data['metadata']['namespace'] + - "/persistentvolumeclaims") - body = json.dumps(claim_data) - resp = self.create_request('POST', uri, auth_data, headers, body) + uri = self._get_api_url(auth_data, claim_data['metadata']['namespace'], "/persistentvolumeclaims") + resp = self.create_request('POST', uri, auth_data, headers, claim_data) output = str(resp.text) if resp.status_code != 201: @@ -191,11 +192,10 @@ def _create_volume_claim(self, claim_data, auth_data): else: return True except Exception: - self.log_exception( - "Error connecting with Kubernetes API server") + self.log_exception("Error connecting with Kubernetes API server") return False - def _create_volumes(self, apiVersion, namespace, system, pod_name, auth_data, persistent=False): + def _create_volumes(self, namespace, system, pod_name, auth_data, persistent=False): res = [] cont = 1 while (system.getValue("disk." + str(cont) + ".size") and @@ -213,7 +213,7 @@ def _create_volumes(self, apiVersion, namespace, system, pod_name, auth_data, pe name = "%s-%d" % (pod_name, cont) if persistent: - claim_data = {'apiVersion': apiVersion, 'kind': 'PersistentVolumeClaim'} + claim_data = {'apiVersion': 'v1', 'kind': 'PersistentVolumeClaim'} claim_data['metadata'] = {'name': name, 'namespace': namespace} claim_data['spec'] = {'accessModes': ['ReadWriteOnce'], 'resources': { 'requests': {'storage': disk_size}}} @@ -231,8 +231,8 @@ def _create_volumes(self, apiVersion, namespace, system, pod_name, auth_data, pe return res - def _generate_service_data(self, apiVersion, namespace, name, outports, ssh_port): - service_data = {'apiVersion': apiVersion, 'kind': 'Service'} + def _generate_service_data(self, namespace, name, outports, ssh_port): + service_data = {'apiVersion': 'v1', 'kind': 'Service'} service_data['metadata'] = { 'name': name, 'namespace': namespace, @@ -262,7 +262,7 @@ def _generate_service_data(self, apiVersion, namespace, name, outports, ssh_port return service_data - def _generate_pod_data(self, apiVersion, namespace, name, outports, system, volumes): + def _generate_pod_data(self, namespace, name, outports, system, volumes): cpu = str(system.getValue('cpu.count')) memory = "%s" % system.getFeature('memory.size').getValue('B') # The URI has this format: docker://image_name @@ -276,7 +276,7 @@ def _generate_pod_data(self, apiVersion, namespace, name, outports, system, volu elif outport.get_local_port() != 22: ports.append({'containerPort': outport.get_local_port(), 'protocol': outport.get_protocol().upper()}) - pod_data = {'apiVersion': apiVersion, 'kind': 'Pod'} + pod_data = {'apiVersion': 'v1', 'kind': 'Pod'} pod_data['metadata'] = { 'name': name, 'namespace': namespace, @@ -348,21 +348,18 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data): if public_net: outports = public_net.getOutPorts() - apiVersion = self.get_api_version(auth_data) - res = [] # First create the namespace for the infrastructure namespace = inf.id headers = {'Content-Type': 'application/json'} - uri = "/api/" + apiVersion + "/namespaces" + uri = self._get_api_url(auth_data, "", "") with inf._lock: - resp = self.create_request('GET', uri + "/" + namespace, auth_data, headers) + resp = self.create_request('GET', uri + namespace, auth_data, headers) if resp.status_code != 200: self.log_debug("Creating Namespace: %s" % namespace) - namespace_data = {'apiVersion': apiVersion, 'kind': 'Namespace', + namespace_data = {'apiVersion': 'v1', 'kind': 'Namespace', 'metadata': {'name': namespace}} - body = json.dumps(namespace_data) - resp = self.create_request('POST', uri, auth_data, headers, body) + resp = self.create_request('POST', uri, auth_data, headers, namespace_data) if resp.status_code != 201: for _ in range(num_vm): @@ -382,14 +379,13 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data): pod_name = nodename # Do not use the Persistent volumes yet - volumes = self._create_volumes(apiVersion, namespace, system, pod_name, auth_data) + volumes = self._create_volumes(namespace, system, pod_name, auth_data) - pod_data = self._generate_pod_data(apiVersion, namespace, pod_name, outports, system, volumes) - body = json.dumps(pod_data) + pod_data = self._generate_pod_data(namespace, pod_name, outports, system, volumes) self.log_debug("Creating POD: %s/%s" % (namespace, pod_name)) - uri = "/api/" + apiVersion + "/namespaces/" + namespace + "/pods" - resp = self.create_request('POST', uri, auth_data, headers, body) + uri = self._get_api_url(auth_data, namespace, '/pods') + resp = self.create_request('POST', uri, auth_data, headers, pod_data) if resp.status_code != 201: res.append((False, "Error creating the Container: " + resp.text)) @@ -403,11 +399,10 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data): ssh_port = self._get_port() try: - service_data = self._generate_service_data(apiVersion, namespace, pod_name, outports, ssh_port) - body = json.dumps(service_data) + service_data = self._generate_service_data(namespace, pod_name, outports, ssh_port) self.log_debug("Creating Service: %s/%s" % (namespace, pod_name)) - uri = "/api/" + apiVersion + "/namespaces/" + namespace + "/services" - svc_resp = self.create_request('POST', uri, auth_data, headers, body) + uri = self._get_api_url(auth_data, namespace, '/services') + svc_resp = self.create_request('POST', uri, auth_data, headers, service_data) if svc_resp.status_code != 201: self.error_messages += "Error creating service to access pod %s: %s" % (pod_name, svc_resp.text) self.log_warn("Error creating service: %s" % svc_resp.text) @@ -439,9 +434,7 @@ def _get_pod(self, vm, auth_data): namespace = vm.inf.id pod_name = vm.id - apiVersion = self.get_api_version(auth_data) - - uri = "/api/" + apiVersion + "/namespaces/" + namespace + "/pods/" + pod_name + uri = self._get_api_url(auth_data, namespace, "/pods/" + pod_name) resp = self.create_request('GET', uri, auth_data) if resp.status_code == 200: @@ -515,11 +508,9 @@ def finalize(self, vm, last, auth_data): return success, msg def _delete_namespace(self, vm, auth_data): - apiVersion = self.get_api_version(auth_data) - headers = {'Content-Type': 'application/json'} self.log_debug("Deleting Namespace: %s" % vm.inf.id) - uri = "/api/" + apiVersion + "/namespaces/" + vm.inf.id - resp = self.create_request('DELETE', uri, auth_data, headers) + uri = self._get_api_url(auth_data, vm.inf.id, '') + resp = self.create_request('DELETE', uri, auth_data) if resp.status_code == 404: self.log_warn("Trying to remove a non existing Namespace id: " + vm.inf.id) elif resp.status_code != 200: @@ -533,8 +524,7 @@ def _delete_service(self, vm, auth_data): service_name = vm.id self.log_debug("Deleting Service: %s/%s" % (namespace, service_name)) - apiVersion = self.get_api_version(auth_data) - uri = "/api/" + apiVersion + "/namespaces/" + namespace + "/services/" + service_name + uri = self._get_api_url(auth_data, namespace, "/services/" + service_name) resp = self.create_request('DELETE', uri, auth_data) if resp.status_code == 404: @@ -554,8 +544,7 @@ def _delete_pod(self, vm, auth_data): pod_name = vm.id self.log_debug("Deleting POD: %s/%s" % (namespace, pod_name)) - apiVersion = self.get_api_version(auth_data) - uri = "/api/" + apiVersion + "/namespaces/" + namespace + "/pods/" + pod_name + uri = self._get_api_url(auth_data, namespace, "/pods/" + pod_name) resp = self.create_request('DELETE', uri, auth_data) if resp.status_code == 404: @@ -583,8 +572,6 @@ def alterVM(self, vm, radl, auth_data): # But kubernetes does not permit cpu to be updated yet system = radl.systems[0] - apiVersion = self.get_api_version(auth_data) - try: pod_data = [] @@ -613,9 +600,8 @@ def alterVM(self, vm, radl, auth_data): pod_name = vm.id headers = {'Content-Type': 'application/json-patch+json'} - uri = "/api/" + apiVersion + "/namespaces/" + namespace + "/pods/" + pod_name - body = json.dumps(pod_data) - resp = self.create_request('PATCH', uri, auth_data, headers, body) + uri = self._get_api_url(auth_data, namespace, "/pods/" + pod_name) + resp = self.create_request('PATCH', uri, auth_data, headers, pod_data) if resp.status_code != 201: return (False, "Error updating the Pod: " + resp.text) diff --git a/test/unit/connectors/Kubernetes.py b/test/unit/connectors/Kubernetes.py index e4e6d68a3..98dd7486a 100755 --- a/test/unit/connectors/Kubernetes.py +++ b/test/unit/connectors/Kubernetes.py @@ -96,7 +96,7 @@ def get_response(self, method, url, verify, headers, data): resp.text = '{"metadata": {"namespace":"namespace", "name": "name"}}' elif url.endswith("/services"): resp.status_code = 201 - elif url.endswith("/namespaces"): + elif url.endswith("/namespaces/"): resp.status_code = 201 elif method == "DELETE": if url.endswith("/pods/1"): From ebe0a019f92375ab42b0d307f267811552aeac56 Mon Sep 17 00:00:00 2001 From: micafer Date: Thu, 26 Nov 2020 17:05:40 +0100 Subject: [PATCH 7/7] Fix style issues --- IM/connectors/Kubernetes.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/IM/connectors/Kubernetes.py b/IM/connectors/Kubernetes.py index 17ebd680d..8088d8f82 100644 --- a/IM/connectors/Kubernetes.py +++ b/IM/connectors/Kubernetes.py @@ -274,7 +274,8 @@ def _generate_pod_data(self, namespace, name, outports, system, volumes): if outport.is_range(): self.log_warn("Port range not allowed in Kubernetes connector. Ignoring.") elif outport.get_local_port() != 22: - ports.append({'containerPort': outport.get_local_port(), 'protocol': outport.get_protocol().upper()}) + ports.append({'containerPort': outport.get_local_port(), + 'protocol': outport.get_protocol().upper()}) pod_data = {'apiVersion': 'v1', 'kind': 'Pod'} pod_data['metadata'] = { @@ -296,7 +297,8 @@ def _generate_pod_data(self, namespace, name, outports, system, volumes): command += " ; " command += "echo 'root:" + self._root_password + "' | chpasswd" command += " ; " - command += "sed 's@session\\s*required\\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd" + command += ("sed 's@session\\s*required\\s*pam_loginuid.so@session" + " optional pam_loginuid.so@g' -i /etc/pam.d/sshd") command += " ; " command += " /usr/sbin/sshd -D" containers = [{ @@ -404,7 +406,7 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data): uri = self._get_api_url(auth_data, namespace, '/services') svc_resp = self.create_request('POST', uri, auth_data, headers, service_data) if svc_resp.status_code != 201: - self.error_messages += "Error creating service to access pod %s: %s" % (pod_name, svc_resp.text) + self.error_messages += "Error creating service for pod %s: %s" % (pod_name, svc_resp.text) self.log_warn("Error creating service: %s" % svc_resp.text) except Exception: self.error_messages += "Error creating service to access pod %s" % pod_name