From 1d2ae792de0f800224ec1497152d7c02f3d31148 Mon Sep 17 00:00:00 2001 From: micafer Date: Fri, 16 Nov 2018 08:54:11 +0100 Subject: [PATCH 01/33] Implements: #76 --- cluesplugins/im.py | 1157 ++++++++++++++++++++++---------------------- 1 file changed, 573 insertions(+), 584 deletions(-) diff --git a/cluesplugins/im.py b/cluesplugins/im.py index 69b3ac5..5e5372c 100644 --- a/cluesplugins/im.py +++ b/cluesplugins/im.py @@ -21,7 +21,7 @@ @author: micafer ''' -import xmlrpclib +import requests from uuid import uuid1 import re @@ -52,588 +52,577 @@ class VirtualMachine: class powermanager(PowerManager): - class VM_Node: - def __init__(self, vm_id, radl, ec3_additional_vm): - self.vm_id = vm_id - self.radl = radl - self.timestamp_recovered = self.timestamp_created = self.timestamp_seen = cpyutils.eventloop.now() - self.ec3_additional_vm = ec3_additional_vm - self.last_state = None - - def seen(self): - self.timestamp_seen = cpyutils.eventloop.now() - # _LOGGER.debug("seen %s" % self.vm_id) - - def recovered(self): - self.timestamp_recovered = cpyutils.eventloop.now() - - def update(self, vm_id, radl): - self.vm_id = vm_id - self.radl = radl - - def __init__(self): - # - # NOTE: This fragment provides the support for global config files. It is a bit awful. - # I do not like it because it is like having global vars. But it is managed in - # this way for the sake of using configuration files - # - config_im = cpyutils.config.Configuration( - "IM VIRTUAL CLUSTER", - { - "IM_VIRTUAL_CLUSTER_XMLRPC": "http://localhost:8899", - "IM_VIRTUAL_CLUSTER_XMLRCP_SSL_CA_CERTS": "", - "IM_VIRTUAL_CLUSTER_XMLRCP_SSL": False, - "IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE": "/usr/local/ec3/auth.dat", - "IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS": 30, - "IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS": 30, - "IM_VIRTUAL_CLUSTER_DB_CONNECTION_STRING": "sqlite:///var/lib/clues2/clues.db" - } - ) - - self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE = config_im.IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE - self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS = config_im.IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS - self._IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS = config_im.IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS - self._IM_VIRTUAL_CLUSTER_XMLRCP_SSL = config_im.IM_VIRTUAL_CLUSTER_XMLRCP_SSL - self._IM_VIRTUAL_CLUSTER_XMLRPC = config_im.IM_VIRTUAL_CLUSTER_XMLRPC - self._IM_VIRTUAL_CLUSTER_XMLRCP_SSL_CA_CERTS = config_im.IM_VIRTUAL_CLUSTER_XMLRCP_SSL_CA_CERTS - - self._db = cpyutils.db.DB.create_from_string(config_im.IM_VIRTUAL_CLUSTER_DB_CONNECTION_STRING) - self._create_db() - - # Structure for the recovery of nodes - self._mvs_seen = {} - self._golden_images = self._load_golden_images() - self._stopped_vms = self._load_stopped_vms() - self._inf_id = None - - def _create_db(self): - try: - result, _, _ = self._db.sql_query("CREATE TABLE IF NOT EXISTS im_golden_images(ec3_class varchar(255) " - "PRIMARY KEY, image varchar(255), password varchar(255))", True) - result, _, _ = self._db.sql_query("CREATE TABLE IF NOT EXISTS im_stopped_vms(node_name varchar(255) " - "PRIMARY KEY, vm_id varchar(255))", True) - except: - _LOGGER.exception( - "Error creating IM plugin DB. 
The data persistence will not work!") - result = False - return result - - def _store_golden_image(self, ec3_class, image, password): - try: - self._db.sql_query("INSERT into im_golden_images values ('%s','%s','%s')" % (ec3_class, + class VM_Node: + def __init__(self, vm_id, radl, ec3_additional_vm): + self.vm_id = vm_id + self.radl = radl + self.timestamp_recovered = self.timestamp_created = self.timestamp_seen = cpyutils.eventloop.now() + self.ec3_additional_vm = ec3_additional_vm + self.last_state = None + + def seen(self): + self.timestamp_seen = cpyutils.eventloop.now() + # _LOGGER.debug("seen %s" % self.vm_id) + + def recovered(self): + self.timestamp_recovered = cpyutils.eventloop.now() + + def update(self, vm_id, radl): + self.vm_id = vm_id + self.radl = radl + + def __init__(self): + # + # NOTE: This fragment provides the support for global config files. It is a bit awful. + # I do not like it because it is like having global vars. But it is managed in + # this way for the sake of using configuration files + # + config_im = cpyutils.config.Configuration( + "IM VIRTUAL CLUSTER", + { + "IM_VIRTUAL_CLUSTER_REST_API": "http://localhost:8800", + "IM_VIRTUAL_CLUSTER_REST_SSL_CA_CERTS": "", + "IM_VIRTUAL_CLUSTER_REST_SSL": False, + "IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE": "/usr/local/ec3/auth.dat", + "IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS": 30, + "IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS": 30, + "IM_VIRTUAL_CLUSTER_DB_CONNECTION_STRING": "sqlite:///var/lib/clues2/clues.db" + } + ) + + self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE = config_im.IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE + self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS = config_im.IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS + self._IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS = config_im.IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS + self._IM_VIRTUAL_CLUSTER_REST_SSL = config_im.IM_VIRTUAL_CLUSTER_REST_SSL + self._IM_VIRTUAL_CLUSTER_REST_API = config_im.IM_VIRTUAL_CLUSTER_REST_API + self._IM_VIRTUAL_CLUSTER_REST_SSL_CA_CERTS = config_im.IM_VIRTUAL_CLUSTER_REST_SSL_CA_CERTS + + self._db = cpyutils.db.DB.create_from_string(config_im.IM_VIRTUAL_CLUSTER_DB_CONNECTION_STRING) + self._create_db() + + # Structure for the recovery of nodes + self._mvs_seen = {} + self._golden_images = self._load_golden_images() + self._stopped_vms = self._load_stopped_vms() + self._inf_id = None + + def _create_db(self): + try: + result, _, _ = self._db.sql_query("CREATE TABLE IF NOT EXISTS im_golden_images(ec3_class varchar(255) " + "PRIMARY KEY, image varchar(255), password varchar(255))", True) + result, _, _ = self._db.sql_query("CREATE TABLE IF NOT EXISTS im_stopped_vms(node_name varchar(255) " + "PRIMARY KEY, vm_id varchar(255))", True) + except: + _LOGGER.exception( + "Error creating IM plugin DB. 
The data persistence will not work!") + result = False + return result + + def _store_golden_image(self, ec3_class, image, password): + try: + self._db.sql_query("INSERT into im_golden_images values ('%s','%s','%s')" % (ec3_class, image, - password), True) - except: - _LOGGER.exception("Error trying to save IM golden image data.") - - def _store_stopped_vm(self, node_name, vm_id): - try: - self._db.sql_query("INSERT OR REPLACE into im_stopped_vms values ('%s','%s')" % (node_name, vm_id), True) - except: - _LOGGER.exception("Error trying to save IM stopped VMs data.") - - def _delete_stopped_vm(self, node_name): - try: - del self._stopped_vms[node_name] - self._db.sql_query("DELETE FROM im_stopped_vms where node_name = '%s'" % node_name, True) - except: - _LOGGER.exception("Error trying to delete IM stopped VMs data.") - - def _load_stopped_vms(self): - res = {} - try: - result, _, rows = self._db.sql_query("select * from im_stopped_vms") - if result: - for nname, vm_id in rows: - res[nname] = vm_id - else: - _LOGGER.error("Error trying to load IM stopped VMs data.") - except: - _LOGGER.exception("Error trying to load IM stopped VMs data.") - - return res - - def _load_golden_images(self): - res = {} - try: - result, _, rows = self._db.sql_query("select * from im_golden_images") - if result: - for (ec3_class, image, password) in rows: - res[ec3_class] = image, password - else: - _LOGGER.error("Error trying to load IM golden images data.") - except: - _LOGGER.exception("Error trying to load IM golden images data.") - - return res - - def _get_inf_id(self): - if self._inf_id is not None: - return self._inf_id - else: - server = self._get_server() - auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) - (success, inf_list) = server.GetInfrastructureList(auth_data) - if success: - if len(inf_list) > 0: - _LOGGER.debug("The IM Inf ID is %s" % inf_list[0]) - self._inf_id = inf_list[0] - return inf_list[0] - else: - _LOGGER.error("Error getting infrastructure list: No infrastructure!.") - else: - _LOGGER.error("Error getting infrastructure list: %s" % inf_list) - return None - - def _get_server(self): - if self._IM_VIRTUAL_CLUSTER_XMLRCP_SSL: - from springpython.remoting.xmlrpc import SSLClient - return SSLClient(self._IM_VIRTUAL_CLUSTER_XMLRPC, self._IM_VIRTUAL_CLUSTER_XMLRCP_SSL_CA_CERTS) - else: - return xmlrpclib.ServerProxy(self._IM_VIRTUAL_CLUSTER_XMLRPC,allow_none=True) - - # From IM.auth - @staticmethod - def _read_auth_data(filename): - if isinstance(filename, list): - lines = filename - else: - auth_file = open(filename, 'r') - lines = auth_file.readlines() - auth_file.close() - - res = [] - - for line in lines: - line = line.strip() - if len(line) > 0 and not line.startswith("#"): - auth = {} - tokens = line.split(";") - for token in tokens: - key_value = token.split(" = ") - if len(key_value) != 2: - break; - else: - value = key_value[1].strip().replace("\\n","\n") - # Enable to specify a filename and set the contents of it - if value.startswith("file(") and value.endswith(")"): - filename = value[5:len(value)-1] - try: - value_file = open(filename, 'r') - value = value_file.read() - value_file.close() - except: - pass - auth[key_value[0].strip()] = value - res.append(auth) - - return res - - def _get_system(self, vm_info, radl_all, nname): - - # First try to check if the user has specified the ec3_node_pattern - # it must be a regular expression to match with nname + password), True) + except: + _LOGGER.exception("Error trying to save IM golden image data.") + + 
def _store_stopped_vm(self, node_name, vm_id): + try: + self._db.sql_query("INSERT OR REPLACE into im_stopped_vms values ('%s','%s')" % (node_name, vm_id), True) + except: + _LOGGER.exception("Error trying to save IM stopped VMs data.") + + def _delete_stopped_vm(self, node_name): + try: + del self._stopped_vms[node_name] + self._db.sql_query("DELETE FROM im_stopped_vms where node_name = '%s'" % node_name, True) + except: + _LOGGER.exception("Error trying to delete IM stopped VMs data.") + + def _load_stopped_vms(self): + res = {} + try: + result, _, rows = self._db.sql_query("select * from im_stopped_vms") + if result: + for nname, vm_id in rows: + res[nname] = vm_id + else: + _LOGGER.error("Error trying to load IM stopped VMs data.") + except: + _LOGGER.exception("Error trying to load IM stopped VMs data.") + + return res + + def _load_golden_images(self): + res = {} + try: + result, _, rows = self._db.sql_query("select * from im_golden_images") + if result: + for (ec3_class, image, password) in rows: + res[ec3_class] = image, password + else: + _LOGGER.error("Error trying to load IM golden images data.") + except: + _LOGGER.exception("Error trying to load IM golden images data.") + + return res + + def _get_inf_id(self): + if self._inf_id is not None: + return self._inf_id + else: + auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) + headers = {"Authorization": auth_data, "Accept": "application/json"} + url = "%s/infrastructures" % self._IM_VIRTUAL_CLUSTER_REST_API + resp = requests.request("GET", url, verify=False, headers=headers) + + if resp.status_code == 200: + inf_list = resp.json()['uri-list'] + if len(inf_list) > 0: + _LOGGER.debug("The IM Inf ID is %s" % inf_list[0]) + self._inf_id = inf_list[0]['uri'] + return self._inf_id + else: + _LOGGER.error("Error getting infrastructure list: No infrastructure!.") + else: + _LOGGER.error("Error getting infrastructure list: %s. %s." % (resp.reason, resp.text)) + return None + + # From IM.auth + @staticmethod + def _read_auth_data(filename): + if isinstance(filename, list): + res = "\\n".join(filename) + else: + auth_file = open(filename, 'r') + res = auth_file.read().replace('\n', '\\n') + auth_file.close() + return res + + def _get_system(self, vm_info, radl_all, nname): + + # First try to check if the user has specified the ec3_node_pattern + # it must be a regular expression to match with nname # for example: vnode-[1,2,3,4,5] - for system in radl_all.systems: - if system.getValue("ec3_node_pattern"): - if re.match(system.getValue("ec3_node_pattern"), nname): - return system.name - - # Start with the system named "wn" - current_system = "wn" - while current_system: - system_orig = vm_info[current_system]["radl"] - ec3_max_instances = system_orig.getValue("ec3_max_instances", -1) - if ec3_max_instances < 0: - ec3_max_instances = 99999999 - if vm_info[current_system]["count"] < ec3_max_instances: - return current_system - else: - # we must change the system to the next one - current_system = system_orig.getValue("ec3_if_fail", '') - if not current_system: - _LOGGER.error("Error: we need more instances but ec3_if_fail of system %s is empty" % system_orig.name) - return None - - def _get_radl(self, nname): - inf_id = self._get_inf_id() - if not inf_id: - _LOGGER.error("Error getting RADL. 
No infrastructure ID!!") - return None - server = self._get_server() - auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) - (success, vm_ids) = server.GetInfrastructureInfo(inf_id, auth_data) - - # Get all the info from RADL - # Especial features in system: - #- 'ec3_max_instances': maximum number of nodes with this system configuration; a negative value is like no constrain; default value is -1. - #- 'ec3_destroy_interval': some cloud providers pay a certain amount of time in advance, like AWS EC2. The node will be destroyed only when it is idle at the end of the interval expressed by this option in seconds. The default value is 0. - #- 'ec3_destroy_safe': seconds before the deadline set by \'ec3_destroy_interval\' that the node can be destroyed; the default value is 0. - #- 'ec3_if_fail': name of the next system configuration to try after this fails a launch or the number of instances saturates; the default value is ''. - - vm_info = {} - if success: - # The first one is always the front-end node - for vm_id in vm_ids[1:]: - (success, radl_data) = server.GetVMInfo(inf_id, vm_id, auth_data) - if success: - radl = radl_parse.parse_radl(radl_data) - ec3_class = radl.systems[0].getValue("ec3_class") - if ec3_class not in vm_info: - vm_info[ec3_class] = {} - vm_info[ec3_class]['count'] = 0 - vm_info[ec3_class]['count'] += 1 - else: - _LOGGER.error("Error getting VM info: %s" % radl_data) - else: - _LOGGER.error("Error getting infrastructure info: %s" % vm_ids) - - (success, radl_data) = server.GetInfrastructureRADL(inf_id, auth_data) - if success: - radl_all = radl_parse.parse_radl(radl_data) - else: - _LOGGER.error("Error getting infrastructure RADL: %s" % radl_data) - return None - - # Get info from the original RADL - for system in radl_all.systems: - if system.name not in vm_info: - vm_info[system.name] = {} - vm_info[system.name]['count'] = 0 - vm_info[system.name]['radl'] = system - - current_system = self._get_system(vm_info, radl_all, nname) - if current_system: - # launch this system type - new_radl = "" - for net in radl_all.networks: - new_radl += "network " + net.id + "\n" - - system_orig = vm_info[current_system]["radl"] - system_orig.name = nname - system_orig.setValue("net_interface.0.dns_name", str(nname)) - system_orig.setValue("ec3_class", current_system) - if current_system in self._golden_images: - image, password = self._golden_images[current_system] - system_orig.setValue("disk.0.image.url", image) - _LOGGER.debug("A golden image for %s node is stored, using it: %s" % (current_system, image)) - if password: - system_orig.setValue("disk.0.os.credentials.password", password) - new_radl += str(system_orig) + "\n" - - for configure in radl_all.configures: - if configure.name == current_system: - configure.name = nname - new_radl += str(configure) + "\n" - - new_radl += "deploy " + nname + " 1" - - return new_radl - else: - _LOGGER.error("Error generating infrastructure RADL") - return None - - def _get_vms(self): - inf_id = self._get_inf_id() - if not inf_id: - _LOGGER.error("ERROR getting infrastructure info: No infrastructure ID!!") - return self._mvs_seen - now = cpyutils.eventloop.now() - server = self._get_server() - auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) - (success, vm_ids) = server.GetInfrastructureInfo(inf_id, auth_data) - if not success: - _LOGGER.error("ERROR getting infrastructure info: %s" % vm_ids) - else: - # The first one is always the front-end node - for vm_id in vm_ids[1:]: - clues_node_name = None - 
ec3_additional_vm = None - try: - (success, radl_data) = server.GetVMInfo(inf_id, vm_id, auth_data) - if success: - radl = radl_parse.parse_radl(radl_data) - clues_node_name = radl.systems[0].getValue('net_interface.0.dns_name') - ec3_additional_vm = radl.systems[0].getValue('ec3_additional_vm') - state = radl.systems[0].getValue('state') - else: - _LOGGER.error("ERROR getting VM info: %s" % vm_id) - except TypeError: - success = False - reload(radl_parse) - _LOGGER.exception("ERROR getting VM info: %s. Trying to reload radl_parse module." % vm_id) - except: - success = False - _LOGGER.exception("ERROR getting VM info: %s" % vm_id) - - if clues_node_name and state not in [VirtualMachine.STOPPED]: - # Create or update VM info - if clues_node_name not in self._mvs_seen: - self._mvs_seen[clues_node_name] = self.VM_Node(vm_id, radl, ec3_additional_vm) - else: - if self._mvs_seen[clues_node_name].vm_id != vm_id: - # this must not happen ... - _LOGGER.warning("Node %s in VM with id %s now have a new ID: %s" % (clues_node_name, self._mvs_seen[clues_node_name].vm_id, vm_id)) - if ec3_additional_vm: - _LOGGER.debug("Node %s is an additional not recovering it." % clues_node_name) - else: - self.power_off(clues_node_name) - self._mvs_seen[clues_node_name].update(vm_id, radl) - - enabled = True - node_found = self._clues_daemon.get_node(clues_node_name) - if node_found: - enabled = node_found.enabled - - self._mvs_seen[clues_node_name].seen() - last_state = self._mvs_seen[clues_node_name].last_state - self._mvs_seen[clues_node_name].last_state = state - - if state in [VirtualMachine.FAILED, VirtualMachine.UNCONFIGURED]: - # This VM is in "terminal" state remove it from the infrastructure - _LOGGER.error("Node %s in VM with id %s is in state: %s" % (clues_node_name, vm_id, state)) - - if state == VirtualMachine.UNCONFIGURED: - # in case of unconfigured show the log to make easier debug - # but only the first time - if last_state != VirtualMachine.UNCONFIGURED: - (success, contmsg) = server.GetVMContMsg(inf_id, vm_id, auth_data) - _LOGGER.debug("Contextualization msg: %s" % contmsg) - # check if node is disabled and do not recover it - if enabled: - if ec3_additional_vm: - _LOGGER.debug("Node %s is an additional not recovering it." % clues_node_name) - else: - self.recover(clues_node_name, node_found) - else: - _LOGGER.debug("Node %s is disabled not recovering it." % clues_node_name) - else: - if ec3_additional_vm: - _LOGGER.debug("Node %s is an additional not recovering it." % clues_node_name) - else: - self.recover(clues_node_name, node_found) - elif state == VirtualMachine.OFF: - _LOGGER.warning("Node %s in VM with id %s is in state: %s" % (clues_node_name, vm_id, state)) - if clues_node_name in self._stopped_vms: - _LOGGER.info("Node %s in the list of Stopped nodes. Remove VM with id %s." % (clues_node_name, vm_id)) - self.recover(clues_node_name, node_found) - # Otherwise Do not terminate this VM, let's wait to lifecycle to check if it must be terminated - elif state == VirtualMachine.UNKNOWN: - # Do not terminate this VM, let's wait to lifecycle to check if it must be terminated - _LOGGER.warning("Node %s in VM with id %s is in state: %s" % (clues_node_name, vm_id, state)) - else: - if state not in [VirtualMachine.STOPPED]: - _LOGGER.warning("VM with id %s does not have dns_name specified." % vm_id) - else: - continue - #_LOGGER.debug("Node %s with VM with id %s is stopped." 
% (clues_node_name, vm_id)) - - # from the nodes that we have powered on, check which of them are still running - for nname, node in self._mvs_seen.items(): - if (now - node.timestamp_seen) > self._IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS: - _LOGGER.debug("vm %s is not seen for a while... let's forget it" % nname) - del self._mvs_seen[nname] - - return self._mvs_seen - - def _recover_ids(self, vms): - for vm in vms: - self.recover(vm) - - def power_on(self, nname): - success = None - try: - vms = self._get_vms() - - inf_id = self._get_inf_id() - if not inf_id: - _LOGGER.error("ERROR launching %s node: No infrastructure ID!!" % nname) - return False, nname - - if nname in vms: - _LOGGER.warning("Trying to launch an existing node %s. Ignoring it." % nname) - return False, nname - - ec3_reuse_nodes = False - if len(self._stopped_vms) > 0: - if self._stopped_vms.get(nname): - ec3_reuse_nodes = True - vm_id = self._stopped_vms.get(nname) - - server = self._get_server() - auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) - if ec3_reuse_nodes: - (success, vms_id) = server.StartVM(inf_id, vm_id, auth_data) - else: - radl_data = self._get_radl(nname) - if radl_data: - _LOGGER.debug("RADL to launch/restart node %s: %s" % (nname, radl_data)) - (success, vms_id) = server.AddResource(inf_id, radl_data, auth_data) - else: - _LOGGER.error("RADL to launch node %s is empty!!" % nname) - return False, nname - except: - success = False - _LOGGER.exception("Error launching/restarting node %s " % nname) - return False, nname - - if success: - _LOGGER.debug("Node %s successfully created/restarted" % nname) - else: - _LOGGER.error("ERROR launching node %s: %s" % (nname, vms_id)) - - return success, nname - - def power_off(self, nname, destroy=False): - _LOGGER.debug("Powering off/stopping %s" % nname) - try: - inf_id = self._get_inf_id() - if not inf_id: - _LOGGER.error("ERROR deleting %s node: No infrastructure ID!!" % nname) - return False, nname - - server = self._get_server() - success = False - - if nname in self._mvs_seen: - vm = self._mvs_seen[nname] - ec3_destroy_interval = vm.radl.systems[0].getValue('ec3_destroy_interval', 0) - ec3_destroy_safe = vm.radl.systems[0].getValue('ec3_destroy_safe', 0) - ec3_reuse_nodes = vm.radl.systems[0].getValue('ec3_reuse_nodes', 0) - - poweroff = True - if ec3_destroy_interval > 0: - poweroff = False - live_time = cpyutils.eventloop.now() - vm.timestamp_created - remaining_paid_time = ec3_destroy_interval - live_time % ec3_destroy_interval - _LOGGER.debug("Remaining_paid_time = %d for node %s" % (int(remaining_paid_time), nname)) - if remaining_paid_time < ec3_destroy_safe: - poweroff = True - - if poweroff: - auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) - if ec3_reuse_nodes and not destroy: - (success, vm_ids) = server.StopVM(inf_id, vm.vm_id, auth_data) - self._stopped_vms[nname] = vm.vm_id - self._store_stopped_vm(nname, vm.vm_id) - if not success: - _LOGGER.error("ERROR stopping node: %s: %s" % (nname,vm_ids)) - elif vm_ids == 0: - _LOGGER.error("ERROR stopping node: %s. No VM has been stopped." % nname) - else: - if nname in self._stopped_vms: - self._delete_stopped_vm(nname) - - (success, vm_ids) = server.RemoveResource(inf_id, vm.vm_id, auth_data) - if not success: - _LOGGER.error("ERROR deleting node: %s: %s" % (nname,vm_ids)) - elif vm_ids == 0: - _LOGGER.error("ERROR deleting node: %s. No VM has been deleted." 
% nname) - else: - _LOGGER.debug("Not powering off/stopping node %s" % nname) - success = False - else: - _LOGGER.warning("There is not any VM associated to node %s (are IM credentials compatible to the VM?)" % nname) - except: - _LOGGER.exception("Error powering off/stopping node %s " % nname) - success = False - - return success, nname - - def lifecycle(self): - try: - monitoring_info = self._clues_daemon.get_monitoring_info() - now = cpyutils.eventloop.now() - - vms = self._get_vms() - - recover = [] - # To store the name of the nodes to use it in the third case - node_names = [] - - # Two cases: (1) a VM that is on in the monitoring info, but it is not seen in IM; and (2) a VM that is off in the monitoring info, but it is seen in IM - for node in monitoring_info.nodelist: - node_names.append(node.name) - - if node.name in vms and node.state in [Node.IDLE, Node.USED]: - if node.name in vms: - vm = vms[node.name] - state = vm.radl.systems[0].getValue('state') - # user request use of golden images and the image is fully configured - if vm.radl.systems[0].getValue("ec3_golden_images"): - if state == VirtualMachine.CONFIGURED: - ec3_class = vm.radl.systems[0].getValue("ec3_class") - # check if the image is in the list of saved images - if ec3_class not in self._golden_images: - # if not save it - self._save_golden_image(vm) - else: - _LOGGER.debug("node %s is idle/used but it is not yet configured. Do not save golden image." % (node.name)) - else: - # This may happen because it is launched manually not setting the dns_name of the node - _LOGGER.warning("node %s is detected by the monitoring system, but there is not any VM associated to it (are IM credentials compatible to the VM?)" % node.name) - - if node.enabled: - if node.state in [Node.OFF, Node.OFF_ERR, Node.UNKNOWN]: - if self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS > 0: - if node.name in vms: - vm = vms[node.name] - time_off = now - node.timestamp_state - time_recovered = now - vm.timestamp_recovered - _LOGGER.warning("node %s has a VM running but it is OFF or UNKNOWN in the monitoring system since %d seconds" % (node.name, time_off)) - if time_off > self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS: - if time_recovered > self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS: - _LOGGER.warning("Trying to recover it (state: %s)" % node.state) - vm.recovered() - recover.append(node.name) - else: - _LOGGER.debug("node %s has been recently recovered %d seconds ago. Do not recover it yet." % (node.name, time_recovered)) - else: - if node.name not in vms: - # This may happen because it is launched by hand using other credentials than those for the user used for IM (and he cannot manage the VMS) - _LOGGER.warning("node %s is detected by the monitoring system, but there is not any VM associated to it (are IM credentials compatible to the VM?)" % node.name) - - # A third case: a VM that it is seen in IM but does not correspond to any node in the monitoring info - # This is a strange case but we assure not to have uncontrolled VMs - for name in vms: - vm = vms[name] - if name not in node_names and not vm.ec3_additional_vm: - _LOGGER.warning("VM with name %s is detected by the IM but it does not exist in the monitoring system... 
(%s) recovering it.)" % (name, node_names)) - vm.recovered() - recover.append(name) - - self._recover_ids(recover) - except: - _LOGGER.exception("Error executing lifecycle of IM PowerManager.") - - return PowerManager.lifecycle(self) - - def recover(self, nname, node=None): - success, nname = self.power_off(nname, True) - if success: - if node: - node.mark_poweredoff() - node.set_state(Node.OFF_ERR) - return Node.OFF - - return False - - def _save_golden_image(self, vm): - success = False - _LOGGER.debug("Saving golden image for VM id: " + vm.vm_id) - try: - server = self._get_server() - auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) - image_name = "im-%s" % str(uuid1()) - (success, new_image) = server.CreateDiskSnapshot(self._get_inf_id(), vm.vm_id, 0, image_name, True, auth_data) - if success: - ec3_class = vm.radl.systems[0].getValue("ec3_class") - password = vm.radl.systems[0].getValue("disk.0.os.credentials.password") - self._golden_images[ec3_class] = new_image, password - # Save it to DB for persistence - self._store_golden_image(ec3_class, new_image, password) - else: - _LOGGER.error("Error saving golden image: %s." % new_image) - except: - _LOGGER.exception("Error saving golden image.") - return success + for system in radl_all.systems: + if system.getValue("ec3_node_pattern"): + if re.match(system.getValue("ec3_node_pattern"), nname): + return system.name + + # Start with the system named "wn" + current_system = "wn" + while current_system: + system_orig = vm_info[current_system]["radl"] + ec3_max_instances = system_orig.getValue("ec3_max_instances", -1) + if ec3_max_instances < 0: + ec3_max_instances = 99999999 + if vm_info[current_system]["count"] < ec3_max_instances: + return current_system + else: + # we must change the system to the next one + current_system = system_orig.getValue("ec3_if_fail", '') + if not current_system: + _LOGGER.error("Error: we need more instances but ec3_if_fail of system %s is empty" % system_orig.name) + return None + + def _get_radl(self, nname): + inf_id = self._get_inf_id() + if not inf_id: + _LOGGER.error("Error getting RADL. No infrastructure ID!!") + return None + auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) + + headers = {"Authorization": auth_data, "Accept": "application/json"} + resp = requests.request("GET", inf_id, verify=False, headers=headers) + + # Get all the info from RADL + # Especial features in system: + #- 'ec3_max_instances': maximum number of nodes with this system configuration; a negative value is like no constrain; default value is -1. + #- 'ec3_destroy_interval': some cloud providers pay a certain amount of time in advance, like AWS EC2. The node will be destroyed only when it is idle at the end of the interval expressed by this option in seconds. The default value is 0. + #- 'ec3_destroy_safe': seconds before the deadline set by \'ec3_destroy_interval\' that the node can be destroyed; the default value is 0. + #- 'ec3_if_fail': name of the next system configuration to try after this fails a launch or the number of instances saturates; the default value is ''. 
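        # Illustrative sketch only, with placeholder names and values (not taken from
        # this patch): a worker-node system combining these ec3_* attributes could be
        # declared in RADL roughly as:
        #
        #   system wn (
        #       ec3_max_instances = 10 and
        #       ec3_destroy_interval = 3600 and
        #       ec3_destroy_safe = 300 and
        #       ec3_if_fail = 'wn_spot' and
        #       cpu.count >= 1 and
        #       memory.size >= 1024m and
        #       disk.0.image.url = 'one://cloud.example.org/123'
        #   )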
+ + vm_info = {} + if resp.status_code == 200: + vm_ids = [vm_id['uri'] for vm_id in resp.json()["uri-list"]] + # The first one is always the front-end node + for vm_id in vm_ids[1:]: + headers = {"Authorization": auth_data, "Accept": "text/*"} + resp = requests.request("GET", vm_id, verify=False, headers=headers) + if resp.status_code == 200: + radl = radl_parse.parse_radl(resp.text) + ec3_class = radl.systems[0].getValue("ec3_class") + if ec3_class not in vm_info: + vm_info[ec3_class] = {} + vm_info[ec3_class]['count'] = 0 + vm_info[ec3_class]['count'] += 1 + else: + _LOGGER.error("Error getting VM info: %s. %s." % (resp.reason, resp.text)) + else: + _LOGGER.error("Error getting infrastructure info: %s. %s." % (resp.reason, resp.text)) + + headers = {"Authorization": auth_data, "Accept": "text/*"} + resp = requests.request("GET", "%s/radl" % inf_id, verify=False, headers=headers) + if resp.status_code == 200: + radl_all = radl_parse.parse_radl(resp.text) + else: + _LOGGER.error("Error getting infrastructure RADL: %s. %s." % (resp.reason, resp.text)) + return None + + # Get info from the original RADL + for system in radl_all.systems: + if system.name not in vm_info: + vm_info[system.name] = {} + vm_info[system.name]['count'] = 0 + vm_info[system.name]['radl'] = system + + current_system = self._get_system(vm_info, radl_all, nname) + if current_system: + # launch this system type + new_radl = "" + for net in radl_all.networks: + new_radl += "network " + net.id + "\n" + + system_orig = vm_info[current_system]["radl"] + system_orig.name = nname + system_orig.setValue("net_interface.0.dns_name", str(nname)) + system_orig.setValue("ec3_class", current_system) + if current_system in self._golden_images: + image, password = self._golden_images[current_system] + system_orig.setValue("disk.0.image.url", image) + _LOGGER.debug("A golden image for %s node is stored, using it: %s" % (current_system, image)) + if password: + system_orig.setValue("disk.0.os.credentials.password", password) + new_radl += str(system_orig) + "\n" + + for configure in radl_all.configures: + if configure.name == current_system: + configure.name = nname + new_radl += str(configure) + "\n" + + new_radl += "deploy " + nname + " 1" + + return new_radl + else: + _LOGGER.error("Error generating infrastructure RADL") + return None + + def _get_vms(self): + inf_id = self._get_inf_id() + if not inf_id: + _LOGGER.error("ERROR getting infrastructure info: No infrastructure ID!!") + return self._mvs_seen + now = cpyutils.eventloop.now() + auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) + headers = {"Authorization": auth_data, "Accept": "application/json"} + resp = requests.request("GET", inf_id, verify=False, headers=headers) + + if resp.status_code != 200: + _LOGGER.error("Error getting infrastructure info: %s. %s." % (resp.reason, resp.text)) + else: + vm_ids = [vm_id['uri'] for vm_id in resp.json()["uri-list"]] + # The first one is always the front-end node + for vm_id in vm_ids[1:]: + clues_node_name = None + ec3_additional_vm = None + try: + headers = {"Authorization": auth_data, "Accept": "text/*"} + resp = requests.request("GET", vm_id, verify=False, headers=headers) + + if resp.status_code == 200: + radl = radl_parse.parse_radl(resp.text) + clues_node_name = radl.systems[0].getValue('net_interface.0.dns_name') + ec3_additional_vm = radl.systems[0].getValue('ec3_additional_vm') + state = radl.systems[0].getValue('state') + else: + _LOGGER.error("Error getting VM info: %s. %s." 
% (resp.reason, resp.text)) + except TypeError: + success = False + reload(radl_parse) + _LOGGER.exception("ERROR getting VM info: %s. Trying to reload radl_parse module." % vm_id) + except: + success = False + _LOGGER.exception("ERROR getting VM info: %s" % vm_id) + + if clues_node_name and state not in [VirtualMachine.STOPPED]: + # Create or update VM info + if clues_node_name not in self._mvs_seen: + self._mvs_seen[clues_node_name] = self.VM_Node(vm_id, radl, ec3_additional_vm) + else: + if self._mvs_seen[clues_node_name].vm_id != vm_id: + # this must not happen ... + _LOGGER.warning("Node %s in VM with id %s now have a new ID: %s" % (clues_node_name, self._mvs_seen[clues_node_name].vm_id, vm_id)) + if ec3_additional_vm: + _LOGGER.debug("Node %s is an additional not recovering it." % clues_node_name) + else: + self.power_off(clues_node_name) + self._mvs_seen[clues_node_name].update(vm_id, radl) + + enabled = True + node_found = self._clues_daemon.get_node(clues_node_name) + if node_found: + enabled = node_found.enabled + + self._mvs_seen[clues_node_name].seen() + last_state = self._mvs_seen[clues_node_name].last_state + self._mvs_seen[clues_node_name].last_state = state + + if state in [VirtualMachine.FAILED, VirtualMachine.UNCONFIGURED]: + # This VM is in "terminal" state remove it from the infrastructure + _LOGGER.error("Node %s in VM with id %s is in state: %s" % (clues_node_name, vm_id, state)) + + if state == VirtualMachine.UNCONFIGURED: + # in case of unconfigured show the log to make easier debug + # but only the first time + if last_state != VirtualMachine.UNCONFIGURED: + headers = {"Authorization": auth_data, "Accept": "text/*"} + resp = requests.request("GET", "%s/contmsg" % vm_id, verify=False, headers=headers) + _LOGGER.debug("Contextualization msg: %s" % resp.text) + # check if node is disabled and do not recover it + if enabled: + if ec3_additional_vm: + _LOGGER.debug("Node %s is an additional not recovering it." % clues_node_name) + else: + self.recover(clues_node_name, node_found) + else: + _LOGGER.debug("Node %s is disabled not recovering it." % clues_node_name) + else: + if ec3_additional_vm: + _LOGGER.debug("Node %s is an additional not recovering it." % clues_node_name) + else: + self.recover(clues_node_name, node_found) + elif state == VirtualMachine.OFF: + _LOGGER.warning("Node %s in VM with id %s is in state: %s" % (clues_node_name, vm_id, state)) + if clues_node_name in self._stopped_vms: + _LOGGER.info("Node %s in the list of Stopped nodes. Remove VM with id %s." % (clues_node_name, vm_id)) + self.recover(clues_node_name, node_found) + # Otherwise Do not terminate this VM, let's wait to lifecycle to check if it must be terminated + elif state == VirtualMachine.UNKNOWN: + # Do not terminate this VM, let's wait to lifecycle to check if it must be terminated + _LOGGER.warning("Node %s in VM with id %s is in state: %s" % (clues_node_name, vm_id, state)) + else: + if state not in [VirtualMachine.STOPPED]: + _LOGGER.warning("VM with id %s does not have dns_name specified." % vm_id) + else: + continue + #_LOGGER.debug("Node %s with VM with id %s is stopped." % (clues_node_name, vm_id)) + + # from the nodes that we have powered on, check which of them are still running + for nname, node in self._mvs_seen.items(): + if (now - node.timestamp_seen) > self._IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS: + _LOGGER.debug("vm %s is not seen for a while... 
let's forget it" % nname) + del self._mvs_seen[nname] + + return self._mvs_seen + + def _recover_ids(self, vms): + for vm in vms: + self.recover(vm) + + def power_on(self, nname): + success = None + try: + vms = self._get_vms() + + inf_id = self._get_inf_id() + if not inf_id: + _LOGGER.error("ERROR launching %s node: No infrastructure ID!!" % nname) + return False, nname + + if nname in vms: + _LOGGER.warning("Trying to launch an existing node %s. Ignoring it." % nname) + return False, nname + + ec3_reuse_nodes = False + if len(self._stopped_vms) > 0: + if self._stopped_vms.get(nname): + ec3_reuse_nodes = True + vm_id = self._stopped_vms.get(nname) + + auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) + if ec3_reuse_nodes: + headers = {"Authorization": auth_data} + resp = requests.request("PUT", "%s/start" % vm_id, verify=False, headers=headers) + success = resp.status_code == 200 + if not success: + _LOGGER.error("Error starting VM: %s. %s." % (resp.reason, resp.text)) + else: + radl_data = self._get_radl(nname) + if radl_data: + _LOGGER.debug("RADL to launch/restart node %s: %s" % (nname, radl_data)) + headers = {"Authorization": auth_data, "Content-Type": "text/plain", "Accept": "application/json"} + resp = requests.request("POST", inf_id, verify=False, headers=headers, data=radl_data) + success = resp.status_code == 200 + if success: + vms_id = [vm_id['uri'] for vm_id in resp.json()["uri-list"]] + else: + _LOGGER.error("Error adding resources %s. %s." % (resp.reason, resp.text)) + return False, nname + else: + _LOGGER.error("RADL to launch node %s is empty!!" % nname) + return False, nname + except: + success = False + _LOGGER.exception("Error launching/restarting node %s " % nname) + return False, nname + + if success: + _LOGGER.debug("Node %s successfully created/restarted" % nname) + + return success, nname + + def power_off(self, nname, destroy=False): + _LOGGER.debug("Powering off/stopping %s" % nname) + try: + inf_id = self._get_inf_id() + if not inf_id: + _LOGGER.error("ERROR deleting %s node: No infrastructure ID!!" % nname) + return False, nname + + success = False + + if nname in self._mvs_seen: + vm = self._mvs_seen[nname] + ec3_destroy_interval = vm.radl.systems[0].getValue('ec3_destroy_interval', 0) + ec3_destroy_safe = vm.radl.systems[0].getValue('ec3_destroy_safe', 0) + ec3_reuse_nodes = vm.radl.systems[0].getValue('ec3_reuse_nodes', 0) + + poweroff = True + if ec3_destroy_interval > 0: + poweroff = False + live_time = cpyutils.eventloop.now() - vm.timestamp_created + remaining_paid_time = ec3_destroy_interval - live_time % ec3_destroy_interval + _LOGGER.debug("Remaining_paid_time = %d for node %s" % (int(remaining_paid_time), nname)) + if remaining_paid_time < ec3_destroy_safe: + poweroff = True + + if poweroff: + auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) + if ec3_reuse_nodes and not destroy: + headers = {"Authorization": auth_data} + resp = requests.request("PUT", "%s/stop" % vm.vm_id, verify=False, headers=headers) + success = resp.status_code == 200 + self._stopped_vms[nname] = vm.vm_id + self._store_stopped_vm(nname, vm.vm_id) + if not success: + _LOGGER.error("ERROR stopping node %s: %s. %s." 
% (nname, resp.reason, resp.text)) + else: + if nname in self._stopped_vms: + self._delete_stopped_vm(nname) + + headers = {"Authorization": auth_data} + resp = requests.request("DELETE", vm.vm_id, verify=False, headers=headers) + success = resp.status_code == 200 + if not success: + _LOGGER.error("ERROR deleting node %s: %s. %s." % (nname, resp.reason, resp.text)) + else: + _LOGGER.debug("Not powering off/stopping node %s" % nname) + success = False + else: + _LOGGER.warning("There is not any VM associated to node %s (are IM credentials compatible to the VM?)" % nname) + except: + _LOGGER.exception("Error powering off/stopping node %s " % nname) + success = False + + return success, nname + + def lifecycle(self): + try: + monitoring_info = self._clues_daemon.get_monitoring_info() + now = cpyutils.eventloop.now() + + vms = self._get_vms() + + recover = [] + # To store the name of the nodes to use it in the third case + node_names = [] + + # Two cases: (1) a VM that is on in the monitoring info, but it is not seen in IM; and (2) a VM that is off in the monitoring info, but it is seen in IM + for node in monitoring_info.nodelist: + node_names.append(node.name) + + if node.name in vms and node.state in [Node.IDLE, Node.USED]: + if node.name in vms: + vm = vms[node.name] + state = vm.radl.systems[0].getValue('state') + # user request use of golden images and the image is fully configured + if vm.radl.systems[0].getValue("ec3_golden_images"): + if state == VirtualMachine.CONFIGURED: + ec3_class = vm.radl.systems[0].getValue("ec3_class") + # check if the image is in the list of saved images + if ec3_class not in self._golden_images: + # if not save it + self._save_golden_image(vm) + else: + _LOGGER.debug("node %s is idle/used but it is not yet configured. Do not save golden image." % (node.name)) + else: + # This may happen because it is launched manually not setting the dns_name of the node + _LOGGER.warning("node %s is detected by the monitoring system, but there is not any VM associated to it (are IM credentials compatible to the VM?)" % node.name) + + if node.enabled: + if node.state in [Node.OFF, Node.OFF_ERR, Node.UNKNOWN]: + if self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS > 0: + if node.name in vms: + vm = vms[node.name] + time_off = now - node.timestamp_state + time_recovered = now - vm.timestamp_recovered + _LOGGER.warning("node %s has a VM running but it is OFF or UNKNOWN in the monitoring system since %d seconds" % (node.name, time_off)) + if time_off > self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS: + if time_recovered > self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS: + _LOGGER.warning("Trying to recover it (state: %s)" % node.state) + vm.recovered() + recover.append(node.name) + else: + _LOGGER.debug("node %s has been recently recovered %d seconds ago. Do not recover it yet." 
% (node.name, time_recovered)) + else: + if node.name not in vms: + # This may happen because it is launched by hand using other credentials than those for the user used for IM (and he cannot manage the VMS) + _LOGGER.warning("node %s is detected by the monitoring system, but there is not any VM associated to it (are IM credentials compatible to the VM?)" % node.name) + + # A third case: a VM that it is seen in IM but does not correspond to any node in the monitoring info + # This is a strange case but we assure not to have uncontrolled VMs + for name in vms: + vm = vms[name] + if name not in node_names and not vm.ec3_additional_vm: + _LOGGER.warning("VM with name %s is detected by the IM but it does not exist in the monitoring system... (%s) recovering it.)" % (name, node_names)) + vm.recovered() + recover.append(name) + + self._recover_ids(recover) + except: + _LOGGER.exception("Error executing lifecycle of IM PowerManager.") + + return PowerManager.lifecycle(self) + + def recover(self, nname, node=None): + success, nname = self.power_off(nname, True) + if success: + if node: + node.mark_poweredoff() + node.set_state(Node.OFF_ERR) + return Node.OFF + + return False + + def _save_golden_image(self, vm): + success = False + _LOGGER.debug("Saving golden image for VM id: " + vm.vm_id) + try: + auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) + image_name = "im-%s" % str(uuid1()) + headers = {"Authorization": auth_data} + resp = requests.request("PUT", "%s/disks/0/snapshot?image_name=%s&auto_delete=1" % (vm.vm_id, image_name), + verify=False, headers=headers) + + if resp.status_code == 200: + new_image = resp.text + ec3_class = vm.radl.systems[0].getValue("ec3_class") + password = vm.radl.systems[0].getValue("disk.0.os.credentials.password") + self._golden_images[ec3_class] = new_image, password + # Save it to DB for persistence + self._store_golden_image(ec3_class, new_image, password) + else: + _LOGGER.error("Error saving golden image: %s. %s." 
% (resp.reason, resp.text)) + except: + _LOGGER.exception("Error saving golden image.") + return success From 55d9a232ed0e8e41cf28a3d940853b2ab4a1c1cd Mon Sep 17 00:00:00 2001 From: micafer Date: Wed, 21 Nov 2018 09:54:20 +0100 Subject: [PATCH 02/33] Add TOSCA support --- cluesplugins/im.py | 91 +++++++++++++++++++++++++------- etc/conf.d/plugin-im.cfg-example | 7 +-- 2 files changed, 77 insertions(+), 21 deletions(-) diff --git a/cluesplugins/im.py b/cluesplugins/im.py index 5e5372c..2fd039e 100644 --- a/cluesplugins/im.py +++ b/cluesplugins/im.py @@ -24,6 +24,7 @@ import requests from uuid import uuid1 import re +import yaml from radl import radl_parse @@ -80,6 +81,7 @@ def __init__(self): config_im = cpyutils.config.Configuration( "IM VIRTUAL CLUSTER", { + "IM_VIRTUAL_CLUSTER_TOSCA": False, "IM_VIRTUAL_CLUSTER_REST_API": "http://localhost:8800", "IM_VIRTUAL_CLUSTER_REST_SSL_CA_CERTS": "", "IM_VIRTUAL_CLUSTER_REST_SSL": False, @@ -90,6 +92,7 @@ def __init__(self): } ) + self._IM_VIRTUAL_CLUSTER_TOSCA = config_im.IM_VIRTUAL_CLUSTER_TOSCA self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE = config_im.IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS = config_im.IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS self._IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS = config_im.IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS @@ -225,15 +228,55 @@ def _get_system(self, vm_info, radl_all, nname): _LOGGER.error("Error: we need more instances but ec3_if_fail of system %s is empty" % system_orig.name) return None + def _find_wn_nodetemplate_name(self, template): + try: + for name, node in template['topology_template']['node_templates'].items(): + if node['type'].startswith("tosca.nodes.indigo.LRMS.WorkerNode"): + for req in node['requirements']: + if 'host' in req: + return req['host'] + except Exception: + _LOGGER.exception("Error trying to get the WN template.") + + return None + + def _get_template(self, nnode): + inf_id = self._get_inf_id() + auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) + + count = 0 + vm_ids = self._get_vm_ids(inf_id, auth_data) + if len(vm_ids) > 1: + count = len(vm_ids) - 1 + + headers = {"Authorization": auth_data, "Accept": "application/json"} + resp = requests.request("GET", "%s/tosca" % inf_id, verify=False, headers=headers) + + if resp.status_code != 200: + _LOGGER.error("ERROR getting infrastructure template: %s" % str(resp.text)) + return None + else: + templateo = yaml.load(resp.json()["tosca"]) + node_name = self._find_wn_nodetemplate_name(templateo) + node_template = templateo['topology_template']['node_templates'][node_name] + + node_template['capabilities']['scalable']['properties']['count'] = count + 1 + # Put the dns name + if 'endpoint' not in node_template['capabilities']: + node_template['capabilities']['endpoint'] = {} + if 'properties' not in node_template['capabilities']['endpoint']: + node_template['capabilities']['endpoint']['properties'] = {} + node_template['capabilities']['endpoint']['properties']['dns_name'] = nnode + + return yaml.dump(templateo) + def _get_radl(self, nname): inf_id = self._get_inf_id() if not inf_id: _LOGGER.error("Error getting RADL. 
No infrastructure ID!!") return None auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) - - headers = {"Authorization": auth_data, "Accept": "application/json"} - resp = requests.request("GET", inf_id, verify=False, headers=headers) + vm_ids = self._get_vm_ids(inf_id, auth_data) # Get all the info from RADL # Especial features in system: @@ -243,8 +286,7 @@ def _get_radl(self, nname): #- 'ec3_if_fail': name of the next system configuration to try after this fails a launch or the number of instances saturates; the default value is ''. vm_info = {} - if resp.status_code == 200: - vm_ids = [vm_id['uri'] for vm_id in resp.json()["uri-list"]] + if vm_ids: # The first one is always the front-end node for vm_id in vm_ids[1:]: headers = {"Authorization": auth_data, "Accept": "text/*"} @@ -258,8 +300,6 @@ def _get_radl(self, nname): vm_info[ec3_class]['count'] += 1 else: _LOGGER.error("Error getting VM info: %s. %s." % (resp.reason, resp.text)) - else: - _LOGGER.error("Error getting infrastructure info: %s. %s." % (resp.reason, resp.text)) headers = {"Authorization": auth_data, "Accept": "text/*"} resp = requests.request("GET", "%s/radl" % inf_id, verify=False, headers=headers) @@ -306,6 +346,16 @@ def _get_radl(self, nname): else: _LOGGER.error("Error generating infrastructure RADL") return None + + def _get_vm_ids(self, inf_id, auth_data): + headers = {"Authorization": auth_data, "Accept": "application/json"} + resp = requests.request("GET", inf_id, verify=False, headers=headers) + + if resp.status_code != 200: + _LOGGER.error("Error getting infrastructure info: %s. %s." % (resp.reason, resp.text)) + return [] + else: + return [vm_id['uri'] for vm_id in resp.json()["uri-list"]] def _get_vms(self): inf_id = self._get_inf_id() @@ -314,13 +364,9 @@ def _get_vms(self): return self._mvs_seen now = cpyutils.eventloop.now() auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) - headers = {"Authorization": auth_data, "Accept": "application/json"} - resp = requests.request("GET", inf_id, verify=False, headers=headers) + vm_ids = self._get_vm_ids(inf_id, auth_data) - if resp.status_code != 200: - _LOGGER.error("Error getting infrastructure info: %s. %s." % (resp.reason, resp.text)) - else: - vm_ids = [vm_id['uri'] for vm_id in resp.json()["uri-list"]] + if vm_ids: # The first one is always the front-end node for vm_id in vm_ids[1:]: clues_node_name = None @@ -447,11 +493,20 @@ def power_on(self, nname): if not success: _LOGGER.error("Error starting VM: %s. %s." 
% (resp.reason, resp.text)) else: - radl_data = self._get_radl(nname) - if radl_data: - _LOGGER.debug("RADL to launch/restart node %s: %s" % (nname, radl_data)) - headers = {"Authorization": auth_data, "Content-Type": "text/plain", "Accept": "application/json"} - resp = requests.request("POST", inf_id, verify=False, headers=headers, data=radl_data) + if self._IM_VIRTUAL_CLUSTER_TOSCA: + data = self._get_template(nname) + else: + data = self._get_radl(nname) + + if data: + headers = {"Authorization": auth_data, "Accept": "application/json"} + if self._IM_VIRTUAL_CLUSTER_TOSCA: + _LOGGER.debug("TOSCA to launch/restart node %s: %s" % (nname, data)) + headers["Content-Type"] = "text/yaml" + else: + _LOGGER.debug("RADL to launch/restart node %s: %s" % (nname, data)) + headers["Content-Type"] = "text/plain" + resp = requests.request("POST", inf_id, verify=False, headers=headers, data=data) success = resp.status_code == 200 if success: vms_id = [vm_id['uri'] for vm_id in resp.json()["uri-list"]] diff --git a/etc/conf.d/plugin-im.cfg-example b/etc/conf.d/plugin-im.cfg-example index 8af42fd..e665860 100644 --- a/etc/conf.d/plugin-im.cfg-example +++ b/etc/conf.d/plugin-im.cfg-example @@ -7,9 +7,10 @@ # ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [IM VIRTUAL CLUSTER] -IM_VIRTUAL_CLUSTER_XMLRPC=http://localhost:8899 -IM_VIRTUAL_CLUSTER_XMLRCP_SSL_CA_CERTS='' -IM_VIRTUAL_CLUSTER_XMLRCP_SSL=False +IM_VIRTUAL_CLUSTER_TOSCA=False +IM_VIRTUAL_CLUSTER_REST_API=http://localhost:8800 +IM_VIRTUAL_CLUSTER_REST_SSL_CA_CERTS='' +IM_VIRTUAL_CLUSTER_REST_SSL=False IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE=/usr/local/ec3/auth.dat IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS=30 IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS=30 From c46c834c682107da0ea848eea715f2922867398f Mon Sep 17 00:00:00 2001 From: Miguel Caballer Date: Tue, 27 Nov 2018 11:00:06 +0100 Subject: [PATCH 03/33] Update im.py --- cluesplugins/im.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluesplugins/im.py b/cluesplugins/im.py index 2fd039e..93cde54 100644 --- a/cluesplugins/im.py +++ b/cluesplugins/im.py @@ -268,7 +268,7 @@ def _get_template(self, nnode): node_template['capabilities']['endpoint']['properties'] = {} node_template['capabilities']['endpoint']['properties']['dns_name'] = nnode - return yaml.dump(templateo) + return yaml.safe_dump(templateo) def _get_radl(self, nname): inf_id = self._get_inf_id() From 4c46d085f4ebf4fe44409d95575fbb06cb2c3ebd Mon Sep 17 00:00:00 2001 From: micafer Date: Tue, 12 Nov 2019 17:30:32 +0100 Subject: [PATCH 04/33] Fix error with #N# in dns_name --- cluesplugins/im.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cluesplugins/im.py b/cluesplugins/im.py index 93cde54..c32a3d2 100644 --- a/cluesplugins/im.py +++ b/cluesplugins/im.py @@ -378,6 +378,8 @@ def _get_vms(self): if resp.status_code == 200: radl = radl_parse.parse_radl(resp.text) clues_node_name = radl.systems[0].getValue('net_interface.0.dns_name') + if '#N#' in clues_node_name: + clues_node_name = clues_node_name.replace('#N#', vm_id) ec3_additional_vm = radl.systems[0].getValue('ec3_additional_vm') state = radl.systems[0].getValue('state') else: From 8c84befe8d76522dfa8b4f2ed8635ed71f0e49a7 Mon Sep 17 00:00:00 2001 From: micafer Date: Mon, 17 Feb 2020 12:49:01 +0100 Subject: [PATCH 05/33] Fix errors with tabs --- cluesplugins/im.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cluesplugins/im.py 
b/cluesplugins/im.py index c32a3d2..41702cf 100644 --- a/cluesplugins/im.py +++ b/cluesplugins/im.py @@ -378,8 +378,8 @@ def _get_vms(self): if resp.status_code == 200: radl = radl_parse.parse_radl(resp.text) clues_node_name = radl.systems[0].getValue('net_interface.0.dns_name') - if '#N#' in clues_node_name: - clues_node_name = clues_node_name.replace('#N#', vm_id) + if '#N#' in clues_node_name: + clues_node_name = clues_node_name.replace('#N#', vm_id) ec3_additional_vm = radl.systems[0].getValue('ec3_additional_vm') state = radl.systems[0].getValue('state') else: From c512d57a71c89e2072d5fef6079e10fbb2904e51 Mon Sep 17 00:00:00 2001 From: micafer Date: Tue, 4 May 2021 14:01:03 +0200 Subject: [PATCH 06/33] Add inf id parameter --- cluesplugins/im.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cluesplugins/im.py b/cluesplugins/im.py index 41702cf..9c4a12a 100644 --- a/cluesplugins/im.py +++ b/cluesplugins/im.py @@ -88,7 +88,8 @@ def __init__(self): "IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE": "/usr/local/ec3/auth.dat", "IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS": 30, "IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS": 30, - "IM_VIRTUAL_CLUSTER_DB_CONNECTION_STRING": "sqlite:///var/lib/clues2/clues.db" + "IM_VIRTUAL_CLUSTER_DB_CONNECTION_STRING": "sqlite:///var/lib/clues2/clues.db", + "_IM_VIRTUAL_CLUSTER_INF_ID": None } ) @@ -99,6 +100,7 @@ def __init__(self): self._IM_VIRTUAL_CLUSTER_REST_SSL = config_im.IM_VIRTUAL_CLUSTER_REST_SSL self._IM_VIRTUAL_CLUSTER_REST_API = config_im.IM_VIRTUAL_CLUSTER_REST_API self._IM_VIRTUAL_CLUSTER_REST_SSL_CA_CERTS = config_im.IM_VIRTUAL_CLUSTER_REST_SSL_CA_CERTS + self._IM_VIRTUAL_CLUSTER_INF_ID = config_im.IM_VIRTUAL_CLUSTER_INF_ID self._db = cpyutils.db.DB.create_from_string(config_im.IM_VIRTUAL_CLUSTER_DB_CONNECTION_STRING) self._create_db() @@ -107,7 +109,7 @@ def __init__(self): self._mvs_seen = {} self._golden_images = self._load_golden_images() self._stopped_vms = self._load_stopped_vms() - self._inf_id = None + self._inf_id = self._IM_VIRTUAL_CLUSTER_INF_ID def _create_db(self): try: From 02ef0a9b911abc15f3d8e163c80eb8d5bee03be7 Mon Sep 17 00:00:00 2001 From: micafer Date: Tue, 4 May 2021 14:04:07 +0200 Subject: [PATCH 07/33] Add inf id parameter --- cluesplugins/im.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cluesplugins/im.py b/cluesplugins/im.py index 9c4a12a..2e69a93 100644 --- a/cluesplugins/im.py +++ b/cluesplugins/im.py @@ -89,7 +89,7 @@ def __init__(self): "IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS": 30, "IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS": 30, "IM_VIRTUAL_CLUSTER_DB_CONNECTION_STRING": "sqlite:///var/lib/clues2/clues.db", - "_IM_VIRTUAL_CLUSTER_INF_ID": None + "IM_VIRTUAL_CLUSTER_INF_ID": None } ) @@ -100,7 +100,6 @@ def __init__(self): self._IM_VIRTUAL_CLUSTER_REST_SSL = config_im.IM_VIRTUAL_CLUSTER_REST_SSL self._IM_VIRTUAL_CLUSTER_REST_API = config_im.IM_VIRTUAL_CLUSTER_REST_API self._IM_VIRTUAL_CLUSTER_REST_SSL_CA_CERTS = config_im.IM_VIRTUAL_CLUSTER_REST_SSL_CA_CERTS - self._IM_VIRTUAL_CLUSTER_INF_ID = config_im.IM_VIRTUAL_CLUSTER_INF_ID self._db = cpyutils.db.DB.create_from_string(config_im.IM_VIRTUAL_CLUSTER_DB_CONNECTION_STRING) self._create_db() @@ -109,7 +108,7 @@ def __init__(self): self._mvs_seen = {} self._golden_images = self._load_golden_images() self._stopped_vms = self._load_stopped_vms() - self._inf_id = self._IM_VIRTUAL_CLUSTER_INF_ID + self._inf_id = config_im.IM_VIRTUAL_CLUSTER_INF_ID def _create_db(self): try: From 26b0e95cbaaab222db735064ba75c709f48adfaf Mon Sep 17 00:00:00 2001 From: 
Miguel Caballer Date: Thu, 6 May 2021 10:00:09 +0200 Subject: [PATCH 08/33] Fix error --- cluesplugins/im.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cluesplugins/im.py b/cluesplugins/im.py index 2e69a93..9a8befb 100644 --- a/cluesplugins/im.py +++ b/cluesplugins/im.py @@ -457,7 +457,8 @@ def _get_vms(self): #_LOGGER.debug("Node %s with VM with id %s is stopped." % (clues_node_name, vm_id)) # from the nodes that we have powered on, check which of them are still running - for nname, node in self._mvs_seen.items(): + vms = dict(self._mvs_seen) + for nname, node in vms.items(): if (now - node.timestamp_seen) > self._IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS: _LOGGER.debug("vm %s is not seen for a while... let's forget it" % nname) del self._mvs_seen[nname] From d0e5c73043b96e5e93c45bc200c287c8bed07e96 Mon Sep 17 00:00:00 2001 From: Miguel Caballer Date: Fri, 3 Sep 2021 09:43:32 +0200 Subject: [PATCH 09/33] Fix typo --- etc/conf.d/plugin-kubernetes.cfg-example | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/etc/conf.d/plugin-kubernetes.cfg-example b/etc/conf.d/plugin-kubernetes.cfg-example index ecdc16a..bc2417c 100644 --- a/etc/conf.d/plugin-kubernetes.cfg-example +++ b/etc/conf.d/plugin-kubernetes.cfg-example @@ -7,9 +7,9 @@ [KUBERNETES] KUBERNETES_SERVER=http://localhost:8080 -UBERNETES_PODS_API_URL_PATH=/api/v1/pods +KUBERNETES_PODS_API_URL_PATH=/api/v1/pods KUBERNETES_NODES_API_URL_PATH=/api/v1/nodes KUBERNETES_NODE_MEMORY=1073741824 KUBERNETES_NODE_SLOTS=1 KUBERNETES_NODE_PODS=110 -#KUBERNETES_TOKEN=some_token \ No newline at end of file +#KUBERNETES_TOKEN=some_token From 8ad6d46bcd022b4d72eb17ff85ed1fa98b59cbbc Mon Sep 17 00:00:00 2001 From: Miguel Caballer Date: Wed, 8 Sep 2021 12:01:45 +0200 Subject: [PATCH 10/33] Enable to set inf_id without the whole url --- cluesplugins/im.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cluesplugins/im.py b/cluesplugins/im.py index 9a8befb..1944b13 100644 --- a/cluesplugins/im.py +++ b/cluesplugins/im.py @@ -109,6 +109,9 @@ def __init__(self): self._golden_images = self._load_golden_images() self._stopped_vms = self._load_stopped_vms() self._inf_id = config_im.IM_VIRTUAL_CLUSTER_INF_ID + # If the ID is not the whoule URL, complete it + if self._inf_id and not self._inf_id.startswith("http"): + self._inf_id = "%s/infrastructures/%s" % (self._IM_VIRTUAL_CLUSTER_REST_API, self._inf_id) def _create_db(self): try: From bf2ee122bb5df4280cdd7a86c3c0c36e6e7007eb Mon Sep 17 00:00:00 2001 From: micafer Date: Tue, 14 Sep 2021 10:07:28 +0200 Subject: [PATCH 11/33] Enable contextualize --- cluesplugins/im.py | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/cluesplugins/im.py b/cluesplugins/im.py index 2e69a93..0cdab6a 100644 --- a/cluesplugins/im.py +++ b/cluesplugins/im.py @@ -27,6 +27,7 @@ import yaml from radl import radl_parse +from radl.radl import contextualize, contextualize_item import cpyutils.db import cpyutils.config @@ -336,10 +337,31 @@ def _get_radl(self, nname): system_orig.setValue("disk.0.os.credentials.password", password) new_radl += str(system_orig) + "\n" - for configure in radl_all.configures: - if configure.name == current_system: - configure.name = nname - new_radl += str(configure) + "\n" + if radl_all.contextualize: + node_citems = [] + node_configures = [] + for citem in radl_all.contextualize.items.values(): + if citem.system == current_system: + node_configures.append(citem.configure) + node_citems.append(contextualize_item(nname, + 
citem.configure, + citem.num)) + for configure in radl_all.configures: + if configure.name in node_configures: + new_radl += str(configure) + "\n" + new_radl += str(contextualize(node_citems)) + "\n" + else: + node_configure = None + for configure in radl_all.configures: + if not node_configure and configure.name == 'wn': + node_configure = configure + if configure.name == current_system: + node_configure = configure + break + + if node_configure: + node_configure.name = nname + new_radl += str(node_configure) + "\n" new_radl += "deploy " + nname + " 1" From f042af60f44ae063c60388391033567595a040d1 Mon Sep 17 00:00:00 2001 From: Miguel Caballer Date: Tue, 21 Sep 2021 12:33:02 +0200 Subject: [PATCH 12/33] Fix problem with log --- clueslib/hooks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clueslib/hooks.py b/clueslib/hooks.py index 5a5827d..c378d67 100644 --- a/clueslib/hooks.py +++ b/clueslib/hooks.py @@ -20,7 +20,7 @@ import os.path from .configlib import _CONFIGURATION_HOOKS -cpyutils.log.Log.setup() +#cpyutils.log.Log.setup() _LOGGER = cpyutils.log.Log("HOOKS") class HookSystem: @@ -108,4 +108,4 @@ def request(self, request): try: HOOKS except: - HOOKS = HookSystem() \ No newline at end of file + HOOKS = HookSystem() From b07ac8a81763f0046fd2cfaae7ba2e2284b1be45 Mon Sep 17 00:00:00 2001 From: micafer Date: Tue, 21 Sep 2021 12:33:56 +0200 Subject: [PATCH 13/33] Fix issues --- cluesplugins/im.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cluesplugins/im.py b/cluesplugins/im.py index 79b9acf..048e2be 100644 --- a/cluesplugins/im.py +++ b/cluesplugins/im.py @@ -590,6 +590,8 @@ def power_off(self, nname, destroy=False): self._store_stopped_vm(nname, vm.vm_id) if not success: _LOGGER.error("ERROR stopping node %s: %s. %s." % (nname, resp.reason, resp.text)) + else: + _LOGGER.debug("Node %s successfully stopped." % nname) else: if nname in self._stopped_vms: self._delete_stopped_vm(nname) @@ -599,6 +601,8 @@ def power_off(self, nname, destroy=False): success = resp.status_code == 200 if not success: _LOGGER.error("ERROR deleting node %s: %s. %s." % (nname, resp.reason, resp.text)) + else: + _LOGGER.debug("Node %s successfully deleted." 
% nname) else: _LOGGER.debug("Not powering off/stopping node %s" % nname) success = False @@ -648,7 +652,7 @@ def lifecycle(self): if self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS > 0: if node.name in vms: vm = vms[node.name] - time_off = now - node.timestamp_state + time_off = now - max(node.timestamp_state, vm.timestamp_created) time_recovered = now - vm.timestamp_recovered _LOGGER.warning("node %s has a VM running but it is OFF or UNKNOWN in the monitoring system since %d seconds" % (node.name, time_off)) if time_off > self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS: From 8c329f281b56ba3ce66d686b65923cdec70d91dc Mon Sep 17 00:00:00 2001 From: Miguel Caballer Date: Fri, 28 Jan 2022 13:24:26 +0100 Subject: [PATCH 14/33] use setuptools --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 413a0b9..cef0e26 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ #!/usr/bin/env python from version import VERSION -from distutils.core import setup -from distutils.command.install import install +from setuptools import setup +from setuptools.command.install import install import distutils.archive_util import os From ff6517b25a09420551220faef2121198e3d6523c Mon Sep 17 00:00:00 2001 From: Miguel Caballer Date: Fri, 28 Jan 2022 13:57:51 +0100 Subject: [PATCH 15/33] Update setup.py --- setup.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/setup.py b/setup.py index cef0e26..d69aba4 100644 --- a/setup.py +++ b/setup.py @@ -31,6 +31,10 @@ # oneuser chgrp clues oneadmin # cp conf.d/plugin-one.cfg-example conf.d/plugin-ipmi.cfg-example /etc/clues2/conf.d/ +# Avoid using wheel as it does not copy data_files to / dir +if 'bdist_wheel' in sys.argv: + raise RuntimeError("This setup.py does not support wheels") + class my_install(install): def touch(self, fname): if os.path.exists(fname): From 08a94a2eea25d7203de69bd67d7ef1e3906e08fd Mon Sep 17 00:00:00 2001 From: Miguel Caballer Date: Thu, 3 Feb 2022 15:34:55 +0100 Subject: [PATCH 16/33] Update setup.py --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index d69aba4..182c5d2 100644 --- a/setup.py +++ b/setup.py @@ -4,6 +4,7 @@ from setuptools.command.install import install import distutils.archive_util import os +import sys # apt-get install python-mysqldb # tar xfz cluesonebindings-0.28.tar.gz From 2547d1a494f4dcc21af057a4e5c767553d1e6f9e Mon Sep 17 00:00:00 2001 From: micafer Date: Thu, 3 Feb 2022 17:14:58 +0100 Subject: [PATCH 17/33] Fix tests --- cluesplugins/im.py | 3 +- test/test_im.py | 186 +++++++++++++++++++++++++-------------------- 2 files changed, 107 insertions(+), 82 deletions(-) diff --git a/cluesplugins/im.py b/cluesplugins/im.py index 048e2be..e5136f3 100644 --- a/cluesplugins/im.py +++ b/cluesplugins/im.py @@ -25,6 +25,7 @@ from uuid import uuid1 import re import yaml +import os from radl import radl_parse from radl.radl import contextualize, contextualize_item @@ -405,7 +406,7 @@ def _get_vms(self): radl = radl_parse.parse_radl(resp.text) clues_node_name = radl.systems[0].getValue('net_interface.0.dns_name') if '#N#' in clues_node_name: - clues_node_name = clues_node_name.replace('#N#', vm_id) + clues_node_name = clues_node_name.replace('#N#', os.path.basename(vm_id)) ec3_additional_vm = radl.systems[0].getValue('ec3_additional_vm') state = radl.systems[0].getValue('state') else: diff --git a/test/test_im.py b/test/test_im.py index 9769837..f628b92 100644 --- a/test/test_im.py +++ b/test/test_im.py @@ -39,20 +39,18 @@ def __init__(self, *args): def 
test_read_auth_data(self): res = powermanager._read_auth_data(os.path.join(self.TESTS_PATH, 'test-files/auth.dat')) - self.assertEqual(res[0], {'type': 'InfrastructureManager', 'username': 'user', 'password': 'pass'}) - self.assertEqual(res[1], {'host': 'server:2633', - 'id': 'one', - 'password': 'pass', - 'type': 'OpenNebula', - 'username': 'user'}) + lines = res.split('\\n') + self.assertEqual(lines[0], 'type = InfrastructureManager; username = user; password = pass') + self.assertEqual(lines[1], 'id = one; type = OpenNebula; host = server:2633; username = user; password = pass') @patch("cluesplugins.im.powermanager._read_auth_data") - @patch("cluesplugins.im.powermanager._get_server") @patch("cpyutils.db.DB.create_from_string") - def test_get_inf_id(self, createdb, get_server, read_auth): - server = MagicMock() - server.GetInfrastructureList.return_value = (True, ['infid1']) - get_server.return_value = server + @patch("requests.request") + def test_get_inf_id(self, request, createdb, read_auth): + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = {"uri-list": [{"uri": "http://server.com/infid1"}]} + request.return_value = resp read_auth.return_value = {'type': 'InfrastructureManager', 'username': 'user', 'password': 'pass'} @@ -62,27 +60,54 @@ def test_get_inf_id(self, createdb, get_server, read_auth): test_im = powermanager() res = test_im._get_inf_id() - self.assertEqual(res, "infid1") + self.assertEqual(res, "http://server.com/infid1") @patch("cluesplugins.im.powermanager.recover") @patch("cluesplugins.im.powermanager._read_auth_data") @patch("cluesplugins.im.powermanager._get_inf_id") - @patch("cluesplugins.im.powermanager._get_server") @patch("cpyutils.db.DB.create_from_string") @patch("cpyutils.eventloop.now") - def test_get_vms(self, now, createdb, get_server, get_inf_id, read_auth, recover): - get_inf_id.return_value = "infid" - read_auth.return_value = {'type': 'InfrastructureManager', 'username': 'user', 'password': 'pass'} + @patch("requests.request") + def test_get_vms(self, request, now, createdb, get_inf_id, read_auth, recover): + get_inf_id.return_value = "http://server.com/infid" + read_auth.return_value = "type = InfrastructureManager; username = user; password = pass" now.return_value = 100 - server = MagicMock() - server.GetInfrastructureInfo.return_value = (True, ['0', '1']) - radl = """system wn ( + inf_info = MagicMock() + inf_info.status_code = 200 + inf_info.json.return_value = {"uri-list": [{"uri": "http://server.com/infid/vms/0"}, + {"uri": "http://server.com/infid/vms/1"}]} + vm_info = MagicMock() + vm_info.status_code = 200 + vm_info.text = """system wn ( net_interface.0.dns_name = 'node-#N#' and state = 'configured' )""" - server.GetVMInfo.return_value = (True, radl) - get_server.return_value = server + + vm_info2 = MagicMock() + vm_info2.status_code = 200 + vm_info2.text = """system wn ( + net_interface.0.dns_name = 'node-#N#' and + state = 'unconfigured' + )""" + + ctxt_out = MagicMock() + ctxt_out.status_code = 200 + ctxt_out.text = "ERROR!" 
+ + vm_info3 = MagicMock() + vm_info3.status_code = 200 + vm_info3.text = """system wn ( + net_interface.0.dns_name = 'node-#N#' and + state = 'unconfigured' and + ec3_additional_vm = 'true' + )""" + + request.side_effect = [inf_info, vm_info, + inf_info, vm_info, + inf_info, vm_info2, ctxt_out, + inf_info, vm_info3, + inf_info, vm_info3] db = MagicMock() db.sql_query.return_value = True, "", [] @@ -96,8 +121,9 @@ def test_get_vms(self, now, createdb, get_server, get_inf_id, read_auth, recover node.state = Node.IDLE test_im._clues_daemon.get_node.return_value = node res = test_im._get_vms() + self.assertEqual(len(res), 1) - self.assertEqual(res['node-1'].vm_id, '1') + self.assertEqual(res['node-1'].vm_id, 'http://server.com/infid/vms/1') self.assertEqual(res['node-1'].last_state, "configured") self.assertEqual(res['node-1'].timestamp_seen, 100) self.assertEqual(res['node-1'].timestamp_created, 100) @@ -108,12 +134,6 @@ def test_get_vms(self, now, createdb, get_server, get_inf_id, read_auth, recover self.assertEqual(res2['node-1'].timestamp_seen, 200) # Test the node is unconfigured - radl = """system wn ( - net_interface.0.dns_name = 'node-#N#' and - state = 'unconfigured' - )""" - server.GetVMInfo.return_value = (True, radl) - server.GetVMContMsg.return_value = (True, "ERROR!") res = test_im._get_vms() # Recover must be called self.assertEqual(recover.call_count, 1) @@ -125,38 +145,49 @@ def test_get_vms(self, now, createdb, get_server, get_inf_id, read_auth, recover self.assertEqual(recover.call_count, 1) node.enabled = True - radl = """system wn ( - net_interface.0.dns_name = 'node-#N#' and - state = 'unconfigured' and - ec3_additional_vm = 'true' - )""" - server.GetVMInfo.return_value = (True, radl) res = test_im._get_vms() # Recover must NOT be called again in this case self.assertEqual(recover.call_count, 1) @patch("cluesplugins.im.powermanager._read_auth_data") @patch("cluesplugins.im.powermanager._get_inf_id") - @patch("cluesplugins.im.powermanager._get_server") @patch("cpyutils.db.DB.create_from_string") @patch("cpyutils.eventloop.now") - def test_get_radl(self, now, createdb, get_server, get_inf_id, read_auth): - get_inf_id.return_value = "infid" - read_auth.return_value = {'type': 'InfrastructureManager', 'username': 'user', 'password': 'pass'} + @patch("requests.request") + def test_get_radl(self, request, now, createdb, get_inf_id, read_auth): + get_inf_id.return_value = "http://server.com/infid" + read_auth.return_value = "type = InfrastructureManager; username = user; password = pass" now.return_value = 100 - server = MagicMock() - server.GetInfrastructureInfo.return_value = (True, ['0', '1']) - radl = """system wn ( + inf_info = MagicMock() + inf_info.status_code = 200 + inf_info.json.return_value = {"uri-list": [{"uri": "http://server.com/infid/vms/0"}, + {"uri": "http://server.com/infid/vms/1"}]} + vm_info = MagicMock() + vm_info.status_code = 200 + vm_info.text = """system wn ( net_interface.0.dns_name = 'node-#N#' and - ec3_class = 'wn' + state = 'unconfigured' and + ec3_additional_vm = 'true' )""" - server.GetVMInfo.return_value = (True, radl) - infra_radl = """system wn ( + + radl_info = MagicMock() + radl_info.status_code = 200 + radl_info.text = """system wn ( net_interface.0.dns_name = 'node-#N#' )""" - server.GetInfrastructureRADL.return_value = (True, infra_radl) - get_server.return_value = server + + radl_info2 = MagicMock() + radl_info2.status_code = 200 + radl_info2.text = """system wn ( + net_interface.0.dns_name = 'node-#N#' + ) + contextualize ( + system wn 
configure wn + )""" + + request.side_effect = [inf_info, vm_info, radl_info, + inf_info, vm_info, radl_info2] db = MagicMock() db.sql_query.return_value = True, "", [] @@ -172,13 +203,6 @@ def test_get_radl(self, now, createdb, get_server, get_inf_id, read_auth): self.assertEqual(radl_res.deploys[0].id, 'node-2') self.assertEqual(radl_res.deploys[0].vm_number, 1) - infra_radl = """system wn ( - net_interface.0.dns_name = 'node-#N#' - ) - contextualize ( - system wn configure wn - )""" - server.GetInfrastructureRADL.return_value = (True, infra_radl) res = test_im._get_radl('node-2') radl_res = parse_radl(res) self.assertEqual(radl_res.contextualize.items[('node-2', 'wn')].system, 'node-2') @@ -188,15 +212,15 @@ def test_get_radl(self, now, createdb, get_server, get_inf_id, read_auth): @patch("cluesplugins.im.powermanager._get_vms") @patch("cluesplugins.im.powermanager._read_auth_data") @patch("cluesplugins.im.powermanager._get_inf_id") - @patch("cluesplugins.im.powermanager._get_server") @patch("cpyutils.db.DB.create_from_string") - def test_power_on(self, createdb, get_server, get_inf_id, read_auth, get_vms, get_radl): - get_inf_id.return_value = "infid" - read_auth.return_value = {'type': 'InfrastructureManager', 'username': 'user', 'password': 'pass'} + @patch("requests.request") + def test_power_on(self, request, createdb, get_inf_id, read_auth, get_vms, get_radl): + get_inf_id.return_value = "http://server.com/infid" + read_auth.return_value = "type = InfrastructureManager; username = user; password = pass" - server = MagicMock() - server.AddResource.return_value = (True, ['2']) - get_server.return_value = server + resp = MagicMock() + resp.status_code = 200 + request.return_value = resp db = MagicMock() db.sql_query.return_value = True, "", [] @@ -217,19 +241,18 @@ def test_power_on(self, createdb, get_server, get_inf_id, read_auth, get_vms, ge self.assertTrue(res) @patch("cluesplugins.im.powermanager._get_inf_id") - @patch("cluesplugins.im.powermanager._get_server") @patch("cluesplugins.im.powermanager._read_auth_data") @patch("cpyutils.db.DB.create_from_string") @patch("cpyutils.eventloop.now") - def test_power_off(self, now, createdb, read_auth, get_server, get_inf_id): - get_inf_id.return_value = "infid" - read_auth.return_value = {'type': 'InfrastructureManager', 'username': 'user', 'password': 'pass'} + @patch("requests.request") + def test_power_off(self, request, now, createdb, read_auth, get_inf_id): + get_inf_id.return_value = "http://server.com/infid" + read_auth.return_value = "type = InfrastructureManager; username = user; password = pass" now.return_value = 100 - server = MagicMock() - server.StopVM.return_value = (True, ['2']) - server.RemoveResource.return_value = (True, ['2']) - get_server.return_value = server + resp = MagicMock() + resp.status_code = 200 + request.side_effect = [resp, resp, resp, resp] db = MagicMock() db.sql_query.return_value = True, "", [] @@ -237,7 +260,7 @@ def test_power_off(self, now, createdb, read_auth, get_server, get_inf_id): test_im = powermanager() vm = MagicMock() - vm.vm_id = "vmid" + vm.vm_id = "http://server.com/infid/vms/1" radl = """system node-1 ( net_interface.0.dns_name = 'node-#N#' )""" @@ -246,8 +269,7 @@ def test_power_off(self, now, createdb, read_auth, get_server, get_inf_id): res, nname = test_im.power_off('node-1') self.assertTrue(res) self.assertEqual(nname, 'node-1') - self.assertEqual(server.RemoveResource.call_count, 1) - self.assertEqual(server.StopVM.call_count, 0) + self.assertEqual(request.call_args_list[0][0], 
('DELETE', 'http://server.com/infid/vms/1')) radl = """system node-1 ( net_interface.0.dns_name = 'node-#N#' and @@ -257,27 +279,27 @@ def test_power_off(self, now, createdb, read_auth, get_server, get_inf_id): res, nname = test_im.power_off('node-1') self.assertTrue(res) self.assertEqual(nname, 'node-1') - self.assertEqual(server.StopVM.call_count, 1) - self.assertEqual(server.RemoveResource.call_count, 1) + self.assertEqual(request.call_args_list[1][0], ('PUT', 'http://server.com/infid/vms/1/stop')) @patch("cluesplugins.im.uuid1") @patch("cluesplugins.im.powermanager._get_inf_id") - @patch("cluesplugins.im.powermanager._get_server") @patch("cluesplugins.im.powermanager._read_auth_data") @patch("cluesplugins.im.powermanager._get_vms") @patch("cpyutils.db.DB.create_from_string") @patch("cpyutils.eventloop.now") - def test_lifecycle(self, now, createdb, get_vms, read_auth, get_server, get_inf_id, uuid1): + @patch("requests.request") + def test_lifecycle(self, request, now, createdb, get_vms, read_auth, get_inf_id, uuid1): now.return_value = 100 vm = MagicMock() - vm.vm_id = "vmid" + vm.vm_id = "http://server.com/infid/vms/1" radl = """system node-1 ( net_interface.0.dns_name = 'node-#N#' and state = 'configured' )""" vm.radl = parse_radl(radl) vm.timestamp_recovered = 0 + vm.timestamp_created = 0 get_vms.return_value = {'node-1': vm} db = MagicMock() @@ -318,17 +340,19 @@ def test_lifecycle(self, now, createdb, get_vms, read_auth, get_server, get_inf_ auth = {'type': 'InfrastructureManager', 'username': 'user', 'password': 'pass'} read_auth.return_value = auth get_inf_id.return_value = "infid" - server = MagicMock() - server.CreateDiskSnapshot.return_value = (True, 'image') - get_server.return_value = server + + resp = MagicMock() + resp.status_code = 200 + resp.text = "image" + request.return_value = resp + uuid1.return_value = "uuid" test_im._store_golden_image = MagicMock() test_im.lifecycle() self.assertEqual(vm.recovered.call_count, 1) - self.assertEqual(server.CreateDiskSnapshot.call_count, 1) - self.assertEqual(server.CreateDiskSnapshot.call_args_list[0][0], ('infid', 'vmid', 0, 'im-uuid', True, auth)) + self.assertEqual(request.call_args_list[0][0], ('PUT', 'http://server.com/infid/vms/1/disks/0/snapshot?image_name=im-uuid&auto_delete=1')) self.assertEqual(test_im._store_golden_image.call_args_list[0][0], ('wn', 'image', 'pass')) From 29d0c8a70f5f2269b68f528cb1f67e7e0f0feda4 Mon Sep 17 00:00:00 2001 From: Miguel Caballer Date: Mon, 7 Feb 2022 13:57:40 +0100 Subject: [PATCH 18/33] Update setup.py --- setup.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/setup.py b/setup.py index 182c5d2..cef0e26 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,6 @@ from setuptools.command.install import install import distutils.archive_util import os -import sys # apt-get install python-mysqldb # tar xfz cluesonebindings-0.28.tar.gz @@ -32,10 +31,6 @@ # oneuser chgrp clues oneadmin # cp conf.d/plugin-one.cfg-example conf.d/plugin-ipmi.cfg-example /etc/clues2/conf.d/ -# Avoid using wheel as it does not copy data_files to / dir -if 'bdist_wheel' in sys.argv: - raise RuntimeError("This setup.py does not support wheels") - class my_install(install): def touch(self, fname): if os.path.exists(fname): From ec2d84f028d30728420228cf6f8668513b16f92f Mon Sep 17 00:00:00 2001 From: micafer Date: Mon, 7 Feb 2022 16:23:58 +0100 Subject: [PATCH 19/33] Add POW_ON state --- clueslib/schedulers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clueslib/schedulers.py b/clueslib/schedulers.py 
index e4f6792..e8c5ba7 100644 --- a/clueslib/schedulers.py +++ b/clueslib/schedulers.py @@ -589,7 +589,7 @@ def schedule(self, requests_queue, monitoring_info, candidates_on, candidates_of nodes_that_can_be_poweron_on.append((node_slots_free, node.name)) else: slots_free += node_slots_free - if node.state == Node.IDLE: + if node.state in [Node.IDLE, Node.POW_ON]: nodes_free += 1 elif node.state in [ Node.OFF ]: From ca52d6a129957d5cee60b8f192f2e0e7961b1365 Mon Sep 17 00:00:00 2001 From: micafer Date: Tue, 8 Feb 2022 12:23:54 +0100 Subject: [PATCH 20/33] Add slurm test and fix parse_scontrol --- cluesplugins/slurm.py | 138 +++++++++++++++++--------------- test/test-files/slurm_jobs.txt | 2 + test/test-files/slurm_nodes.txt | 2 + test/test_slurm.py | 96 ++++++++++++++++++++++ 4 files changed, 175 insertions(+), 63 deletions(-) create mode 100644 test/test-files/slurm_jobs.txt create mode 100644 test/test-files/slurm_nodes.txt create mode 100644 test/test_slurm.py diff --git a/cluesplugins/slurm.py b/cluesplugins/slurm.py index d0e61b9..3d1b025 100644 --- a/cluesplugins/slurm.py +++ b/cluesplugins/slurm.py @@ -20,6 +20,7 @@ import clueslib.helpers from cpyutils.runcommand import runcommand +from clueslib.platform import LRMS from clueslib.node import NodeInfo from cpyutils.evaluate import TypedClass, TypedList import collections @@ -68,16 +69,27 @@ def parse_scontrol(out): for line in out.split("\n"): line = line.strip() if not line: continue - d = {}; r.append(d); s = False - for k in [ j for i in line.split("=") for j in i.rsplit(" ", 1) ]: - if s: d[f] = k - else: f = k - s = not s + d = {} + while line: + item = "" + while "=" not in item: + split_val = line.rsplit(" ", 1) + # in the last case split_val only has 1 elem + elem = split_val[-1] + line = split_val[0] if len(split_val) == 2 else "" + if "=" not in item and item: + item = "%s %s" % (elem, item) + else: + item += elem + k,v = item.split("=", 1) + d[k] = v.strip() + r.append(d) return r + # TODO: consider states in the second line of slurm # Function that translates the slurm node state into a valid clues2 node state -def infer_clues_node_state(self, state): +def infer_clues_node_state(state): # SLURM node states: "NoResp", "ALLOC", "ALLOCATED", "COMPLETING", "DOWN", "DRAIN", "ERROR, "FAIL", "FAILING", "FUTURE" "IDLE", # "MAINT", "MIXED", "PERFCTRS/NPC", "RESERVED", "POWER_DOWN", "POWER_UP", "RESUME" or "UNDRAIN". 
# CLUES2 node states: ERROR, UNKNOWN, IDLE, USED, OFF @@ -111,60 +123,7 @@ def infer_clues_job_state(state): return res_state -# Function that recovers the partitions of a node -# A node can be in several queues: SLURM has supported configuring nodes in more than one partition since version 0.7.0 -def get_partition(self, node_name): - - '''Exit example of scontrol show partitions: - PartitionName=wn - AllowGroups=ALL AllowAccounts=ALL AllowQos=ALL - AllocNodes=ALL Default=NO - DefaultTime=NONE DisableRootJobs=NO GraceTime=0 Hidden=NO - MaxNodes=UNLIMITED MaxTime=UNLIMITED MinNodes=1 LLN=NO MaxCPUsPerNode=UNLIMITED - Nodes=wn[0-4] - Priority=1 RootOnly=NO ReqResv=NO Shared=NO PreemptMode=OFF - State=UP TotalCPUs=5 TotalNodes=5 SelectTypeParameters=N/A - DefMemPerNode=UNLIMITED MaxMemPerNode=UNLIMITED''' - - res_queue = [] - exit = "" - - try: - success, out = runcommand(self._partition) - if not success: - _LOGGER.error("could not obtain information about SLURM partitions %s (command rc != 0)" % self._server_ip) - return None - else: - exit = parse_scontrol(out) - except: - _LOGGER.error("could not obtain information about SLURM partitions %s (%s)" % (self._server_ip, exit)) - return None - - if exit: - for key in exit: - nodes = str(key["Nodes"]) - if nodes == node_name: - #nodes is like wn1 - res_queue.append(key["PartitionName"]) - else: - #nodes is like wnone-[0-1] - pos1 = nodes.find("[") - pos2 = nodes.find("]") - pos3 = nodes.find("-", pos1) - if pos1 > -1 and pos2 > -1 and pos3 > -1: - num1 = int(nodes[pos1+1:pos3]) - num2 = int(nodes[pos3+1:pos2]) - name = nodes[:pos1] - while num1 <= num2: - nodename = name + str(num1) - if nodename == node_name: - res_queue.append(key["PartitionName"]) - break; - num1 = num1 + 1 - - return res_queue - -class lrms(clueslib.platform.LRMS): +class lrms(LRMS): def __init__(self, SLURM_SERVER = None, SLURM_PARTITION_COMMAND = None, SLURM_NODES_COMMAND = None, SLURM_JOBS_COMMAND = None): import cpyutils.config @@ -184,6 +143,59 @@ def __init__(self, SLURM_SERVER = None, SLURM_PARTITION_COMMAND = None, SLURM_NO self._jobs = clueslib.helpers.val_default(SLURM_JOBS_COMMAND, config_slurm.SLURM_JOBS_COMMAND) clueslib.platform.LRMS.__init__(self, "SLURM_%s" % self._server_ip) + # Function that recovers the partitions of a node + # A node can be in several queues: SLURM has supported configuring nodes in more than one partition since version 0.7.0 + def _get_partition(self, node_name): + + '''Exit example of scontrol show partitions: + PartitionName=wn + AllowGroups=ALL AllowAccounts=ALL AllowQos=ALL + AllocNodes=ALL Default=NO + DefaultTime=NONE DisableRootJobs=NO GraceTime=0 Hidden=NO + MaxNodes=UNLIMITED MaxTime=UNLIMITED MinNodes=1 LLN=NO MaxCPUsPerNode=UNLIMITED + Nodes=wn[0-4] + Priority=1 RootOnly=NO ReqResv=NO Shared=NO PreemptMode=OFF + State=UP TotalCPUs=5 TotalNodes=5 SelectTypeParameters=N/A + DefMemPerNode=UNLIMITED MaxMemPerNode=UNLIMITED''' + + res_queue = [] + exit = "" + + try: + success, out = runcommand(self._partition) + if not success: + _LOGGER.error("could not obtain information about SLURM partitions %s (command rc != 0)" % self._server_ip) + return None + else: + exit = parse_scontrol(out) + except Exception as ex: + _LOGGER.error("could not obtain information about SLURM partitions %s (%s)" % (self._server_ip, exit)) + return None + + if exit: + for key in exit: + nodes = str(key["Nodes"]) + if nodes == node_name: + #nodes is like wn1 + res_queue.append(key["PartitionName"]) + else: + #nodes is like wnone-[0-1] + pos1 = 
nodes.find("[") + pos2 = nodes.find("]") + pos3 = nodes.find("-", pos1) + if pos1 > -1 and pos2 > -1 and pos3 > -1: + num1 = int(nodes[pos1+1:pos3]) + num2 = int(nodes[pos3+1:pos2]) + name = nodes[:pos1] + while num1 <= num2: + nodename = name + str(num1) + if nodename == node_name: + res_queue.append(key["PartitionName"]) + break; + num1 = num1 + 1 + + return res_queue + def get_nodeinfolist(self): nodeinfolist = collections.OrderedDict() @@ -206,7 +218,7 @@ def get_nodeinfolist(self): return None else: exit = parse_scontrol(out) - except: + except Exception as ex: _LOGGER.error("could not obtain information about SLURM nodes %s (%s)" % (self._server_ip, exit)) return None @@ -219,9 +231,9 @@ def get_nodeinfolist(self): #NOTE: memory is in GB memory_total = _translate_mem_value(key["RealMemory"] + ".GB") memory_free = _translate_mem_value(key["RealMemory"] + ".GB") - _translate_mem_value(key["AllocMem"] + ".GB") - state = infer_clues_node_state(self, str(key["State"])) + state = infer_clues_node_state(str(key["State"])) keywords = {} - queues = get_partition(self, name) + queues = self._get_partition(name) keywords['hostname'] = TypedClass.auto(name) if queues: keywords['queues'] = TypedList([TypedClass.auto(q) for q in queues]) diff --git a/test/test-files/slurm_jobs.txt b/test/test-files/slurm_jobs.txt new file mode 100644 index 0000000..d29c7c0 --- /dev/null +++ b/test/test-files/slurm_jobs.txt @@ -0,0 +1,2 @@ +JobId=2 JobName=test.sh UserId=cloudadm(1000) GroupId=cloudadm(1000) MCS_label=N/A Priority=4294901759 Nice=0 Account=(null) QOS=(null) JobState=RUNNING Reason=None Dependency=(null) Requeue=0 Restarts=0 BatchFlag=1 Reboot=0 ExitCode=0:0 RunTime=00:00:03 TimeLimit=UNLIMITED TimeMin=N/A SubmitTime=2022-02-08T08:06:13 EligibleTime=2022-02-08T08:06:13 AccrueTime=2022-02-08T08:06:13 StartTime=2022-02-08T08:06:14 EndTime=Unknown Deadline=N/A SuspendTime=None SecsPreSuspend=0 LastSchedEval=2022-02-08T08:06:14 Partition=debug AllocNode:Sid=slurmserver:28190 ReqNodeList=(null) ExcNodeList=(null) NodeList=vnode-1 BatchHost=vnode-1 NumNodes=1 NumCPUs=1 NumTasks=1 CPUs/Task=1 ReqB:S:C:T=0:0:*:* TRES=cpu=1,node=1,billing=1 Socks/Node=* NtasksPerN:B:S:C=0:0:*:* CoreSpec=* MinCPUsNode=1 MinMemoryNode=1024 MinTmpDiskNode=0 Features=(null) DelayBoot=00:00:00 OverSubscribe=NO Contiguous=0 Licenses=(null) Network=(null) Command=/opt/cloudadm/test.sh WorkDir=/opt/cloudadm StdErr=/opt/cloudadm/slurm-2.out StdIn=/dev/null StdOut=/opt/cloudadm/slurm-2.out Power= MailUser=cloudadm MailType=NONE +JobId=3 JobName=test.sh UserId=cloudadm(1000) GroupId=cloudadm(1000) MCS_label=N/A Priority=4294901759 Nice=0 Account=(null) QOS=(null) JobState=PENDING Reason=None Dependency=(null) Requeue=0 Restarts=0 BatchFlag=1 Reboot=0 ExitCode=0:0 RunTime=00:00:03 TimeLimit=UNLIMITED TimeMin=N/A SubmitTime=2022-02-08T08:06:13 EligibleTime=2022-02-08T08:06:13 AccrueTime=2022-02-08T08:06:13 StartTime=2022-02-08T08:06:14 EndTime=Unknown Deadline=N/A SuspendTime=None SecsPreSuspend=0 LastSchedEval=2022-02-08T08:06:14 Partition=debug AllocNode:Sid=slurmserver:28190 ReqNodeList=(null) ExcNodeList=(null) NodeList=vnode-1 BatchHost=vnode-1 NumNodes=1 NumCPUs=1 NumTasks=1 CPUs/Task=1 ReqB:S:C:T=0:0:*:* TRES=cpu=1,node=1,billing=1 Socks/Node=* NtasksPerN:B:S:C=0:0:*:* CoreSpec=* MinCPUsNode=1 MinMemoryNode=0 MinTmpDiskNode=0 Features=(null) DelayBoot=00:00:00 OverSubscribe=NO Contiguous=0 Licenses=(null) Network=(null) Command=/opt/cloudadm/test.sh WorkDir=/opt/cloudadm StdErr=/opt/cloudadm/slurm-3.out StdIn=/dev/null 
StdOut=/opt/cloudadm/slurm-3.out Power= MailUser=cloudadm MailType=NONE diff --git a/test/test-files/slurm_nodes.txt b/test/test-files/slurm_nodes.txt new file mode 100644 index 0000000..a63d71f --- /dev/null +++ b/test/test-files/slurm_nodes.txt @@ -0,0 +1,2 @@ +NodeName=vnode-1 Arch=x86_64 CoresPerSocket=1 CPUAlloc=0 CPUTot=1 CPULoad=0.67 AvailableFeatures=(null) ActiveFeatures=(null) Gres=(null) NodeAddr=vnode-1 NodeHostName=vnode-1 Version=20.02.7 OS=Linux 5.4.0-73-generic #82-Ubuntu SMP Wed Apr 14 17:39:42 UTC 2021 RealMemory=1 AllocMem=0 FreeMem=111 Sockets=1 Boards=1 State=IDLE ThreadsPerCore=1 TmpDisk=0 Weight=1 Owner=N/A MCS_label=N/A Partitions=debug BootTime=2022-02-08T07:52:32 SlurmdStartTime=2022-02-08T08:04:14 CfgTRES=cpu=1,mem=1M,billing=1 AllocTRES= CapWatts=n/a CurrentWatts=0 AveWatts=0 ExtSensorsJoules=n/s ExtSensorsWatts=0 ExtSensorsTemp=n/s +NodeName=vnode-2 Arch=x86_64 CoresPerSocket=1 CPUAlloc=0 CPUTot=1 CPULoad=0.67 AvailableFeatures=(null) ActiveFeatures=(null) Gres=(null) NodeAddr=vnode-2 NodeHostName=vnode-2 Version=20.02.7 OS=Linux 5.4.0-73-generic #82-Ubuntu SMP Wed Apr 14 17:39:42 UTC 2021 RealMemory=1 AllocMem=0 FreeMem=111 Sockets=1 Boards=1 State=DOWN ThreadsPerCore=1 TmpDisk=0 Weight=1 Owner=N/A MCS_label=N/A Partitions=debug BootTime=2022-02-08T07:52:32 SlurmdStartTime=2022-02-08T08:04:14 CfgTRES=cpu=1,mem=1M,billing=1 AllocTRES= CapWatts=n/a CurrentWatts=0 AveWatts=0 ExtSensorsJoules=n/s ExtSensorsWatts=0 ExtSensorsTemp=n/s diff --git a/test/test_slurm.py b/test/test_slurm.py new file mode 100644 index 0000000..e0f9196 --- /dev/null +++ b/test/test_slurm.py @@ -0,0 +1,96 @@ +import unittest +import os +from mock import MagicMock, patch +import sys + +sys.path.append("..") +sys.path.append(".") + +from cpyutils.evaluate import TypedClass, TypedList +from clueslib.node import NodeInfo +from clueslib.request import Request +from cluesplugins import slurm + + +def read_file(file_name): + tests_path = os.path.dirname(os.path.abspath(__file__)) + abs_file_path = os.path.join(tests_path, file_name) + return open(abs_file_path, 'r').read() + + +class TestSLURMPlugin(unittest.TestCase): + + def __init__(self, *args): + """Init test class.""" + unittest.TestCase.__init__(self, *args) + + def test_infer_clues_node_state(self): + self.assertEqual(slurm.infer_clues_node_state('IDLE'), NodeInfo.IDLE) + self.assertEqual(slurm.infer_clues_node_state('FAIL'), NodeInfo.ERROR) + self.assertEqual(slurm.infer_clues_node_state('FAILING'), NodeInfo.ERROR) + self.assertEqual(slurm.infer_clues_node_state('ERROR'), NodeInfo.ERROR) + self.assertEqual(slurm.infer_clues_node_state('NoResp'), NodeInfo.ERROR) + self.assertEqual(slurm.infer_clues_node_state('DOWN'), NodeInfo.OFF) + self.assertEqual(slurm.infer_clues_node_state('DRAIN'), NodeInfo.OFF) + self.assertEqual(slurm.infer_clues_node_state('MAINT'), NodeInfo.OFF) + self.assertEqual(slurm.infer_clues_node_state('ALLOCATED'), NodeInfo.USED) + self.assertEqual(slurm.infer_clues_node_state('ALLOC'), NodeInfo.USED) + self.assertEqual(slurm.infer_clues_node_state('COMPLETING'), NodeInfo.USED) + self.assertEqual(slurm.infer_clues_node_state('MIXED'), NodeInfo.USED) + + def test_infer_clues_job_state(self): + self.assertEqual(slurm.infer_clues_job_state('PENDING'), Request.PENDING) + self.assertEqual(slurm.infer_clues_job_state('RUNNING'), Request.ATTENDED) + self.assertEqual(slurm.infer_clues_job_state('COMPLETED'), Request.ATTENDED) + + @patch('cluesplugins.slurm.runcommand') + def test_get_partition(self, runcommand): + lrms = 
slurm.lrms() + runcommand.return_value = True, read_file('./test-files/slurm_partitions.txt').encode() + self.assertEqual(lrms._get_partition('vnode-1'), ['debug']) + + def test_init_lrms_empty(self): + lrms = slurm.lrms() + self.assertEqual(lrms._server_ip, 'slurmserverpublic') + + def test_init_lrms(self): + lrms = slurm.lrms('test_ip') + self.assertEqual(lrms._server_ip, 'test_ip') + + @patch('cluesplugins.slurm.runcommand') + def test_get_nodeinfolist(self, runcommand): + lrms = slurm.lrms() + runcommand.side_effect = [(True, read_file('./test-files/slurm_nodes.txt').encode()), + (True, read_file('./test-files/slurm_partitions.txt').encode()), + (True, read_file('./test-files/slurm_partitions.txt').encode())] + node_info = lrms.get_nodeinfolist() + self.assertEqual(len(node_info), 2) + self.assertEqual(node_info['vnode-1'].name, 'vnode-1') + self.assertEqual(node_info['vnode-1'].slots_count, 1) + self.assertEqual(node_info['vnode-1'].slots_free, 1) + self.assertEqual(node_info['vnode-1'].memory_total, 1073741824.0) + self.assertEqual(node_info['vnode-1'].memory_free, 1073741824.0) + self.assertEqual(node_info['vnode-1'].state, NodeInfo.IDLE) + self.assertEqual(len(node_info['vnode-1'].keywords), 2) + self.assertEqual(node_info['vnode-1'].keywords['hostname'], TypedClass.auto('vnode-1')) + self.assertEqual(node_info['vnode-1'].keywords['queues'], TypedList([TypedClass.auto('debug')])) + self.assertEqual(node_info['vnode-2'].name, 'vnode-2') + self.assertEqual(len(node_info['vnode-2'].keywords), 2) + self.assertEqual(node_info['vnode-2'].keywords['queues'], TypedList([TypedClass.auto('debug')])) + self.assertEqual(node_info['vnode-2'].state, NodeInfo.OFF) + + @patch('cluesplugins.slurm.runcommand') + def test_get_jobinfolist(self, runcommand): + lrms = slurm.lrms() + runcommand.return_value = True, read_file('./test-files/slurm_jobs.txt').encode() + jobs_info = lrms.get_jobinfolist() + self.assertEqual(len(jobs_info), 2) + self.assertEqual(jobs_info[0].job_id, '2') + self.assertEqual(jobs_info[0].job_nodes_ids, ['vnode-1']) + self.assertEqual(jobs_info[0].resources.resources.slots, 1) + self.assertEqual(jobs_info[0].resources.resources.memory, 1073741824.0) + self.assertEqual(jobs_info[0].resources.resources.requests, ['"debug" in queues']) + self.assertEqual(jobs_info[1].job_id, '3') + +if __name__ == '__main__': + unittest.main() From 0cc539cf64cc4ab69a97d02c44bd209d308c5546 Mon Sep 17 00:00:00 2001 From: micafer Date: Tue, 8 Feb 2022 13:01:29 +0100 Subject: [PATCH 21/33] Add new IM test --- cluesplugins/im.py | 2 +- test/test-files/tosca.yml | 197 ++++++++++++++++++++++++++++++++++++++ test/test_im.py | 32 +++++++ 3 files changed, 230 insertions(+), 1 deletion(-) create mode 100644 test/test-files/tosca.yml diff --git a/cluesplugins/im.py b/cluesplugins/im.py index e5136f3..e3b82c5 100644 --- a/cluesplugins/im.py +++ b/cluesplugins/im.py @@ -262,7 +262,7 @@ def _get_template(self, nnode): _LOGGER.error("ERROR getting infrastructure template: %s" % str(resp.text)) return None else: - templateo = yaml.load(resp.json()["tosca"]) + templateo = yaml.safe_load(resp.json()["tosca"]) node_name = self._find_wn_nodetemplate_name(templateo) node_template = templateo['topology_template']['node_templates'][node_name] diff --git a/test/test-files/tosca.yml b/test/test-files/tosca.yml new file mode 100644 index 0000000..57916a1 --- /dev/null +++ b/test/test-files/tosca.yml @@ -0,0 +1,197 @@ +tosca_definitions_version: tosca_simple_yaml_1_0 + +imports: + - ec3_custom_types: 
https://raw.githubusercontent.com/grycap/ec3/tosca/tosca/custom_types.yaml + +metadata: + display_name: Launch a Kubernetes Virtual Cluster + icon: images/kubernetes.png + order: 5 + +description: TOSCA template for launching a Kubernetes Virtual Cluster. + +topology_template: + inputs: + wn_num: + type: integer + description: Number of WNs in the cluster + default: 1 + required: yes + fe_cpus: + type: integer + description: Number of CPUs for the front-end node + default: 2 + required: yes + constraints: + - valid_values: [ 2, 4, 8, 16, 32, 64 ] + fe_mem: + type: scalar-unit.size + description: Amount of Memory for the front-end node + default: 4 GB + required: yes + constraints: + - valid_values: [ 4 GB, 8 GB, 16 GB, 32 GB, 64 GB, 128 GB, 256 GB, 512 GB ] + fe_instance_type: + type: string + description: Flavor name of the front-end node + default: "" + wn_cpus: + type: integer + description: Number of CPUs for the WNs + default: 2 + required: yes + constraints: + - valid_values: [ 2, 4, 8, 16, 32, 64 ] + wn_mem: + type: scalar-unit.size + description: Amount of Memory for the WNs + default: 4 GB + required: yes + constraints: + - valid_values: [ 4 GB, 8 GB, 16 GB, 32 GB, 64 GB, 128 GB, 256 GB, 512 GB ] + wn_instance_type: + type: string + description: Flavor name for the WNs + default: "" + disk_size: + type: scalar-unit.size + description: Size of the disk to be attached to the FE instance + default: 10 GB + constraints: + - valid_values: [ 10 GB, 20 GB, 50 GB, 100 GB, 200 GB, 500 GB, 1 TB, 2 TB ] + volume_id: + type: string + description: "Or URL of the disk to be attached to the instance (format: ost://api.cloud.ifca.es/" + default: "" + + admin_token: + type: string + description: Access Token for the Kubernetes admin user + default: not_very_secret_token + kube_version: + type: string + description: Version of Kubernetes to install + default: "1.21.9" + constraints: + - valid_values: [ "1.21.9", "1.22.6", "1.23.2", "1.20.15", "1.19.16" ] + cri_runtime: + type: string + description: CRI Runtime to use with Kubernetes + default: "docker" + constraints: + - valid_values: [ docker, containerd ] + install_kubeapps: + type: boolean + description: Flag to set the kubeapps UI to be installed + default: true + constraints: + - valid_values: [ true, false ] + kube_nvidia_support: + type: boolean + description: Flag to add NVIDIA support + default: false + constraints: + - valid_values: [ false, true ] + kube_cert_manager: + type: boolean + description: Flag to install Cert-Manager + default: false + constraints: + - valid_values: [ false, true ] + kube_cert_user_email: + type: string + description: Email to be used in the Let's Encrypt issuer + default: "jhondoe@server.com" + + node_templates: + + lrms_front_end: + type: tosca.nodes.indigo.LRMS.FrontEnd.Kubernetes + capabilities: + endpoint: + properties: + ports: + http_port: + protocol: tcp + source: 80 + https_port: + protocol: tcp + source: 443 + kube_port: + protocol: tcp + source: 6443 + properties: + admin_username: kubeuser + install_nfs_client: true + admin_token: { get_input: admin_token } + install_kubeapps: { get_input: install_kubeapps } + version: { get_input: kube_version } + nvidia_support: { get_input: kube_nvidia_support } + cert_manager: { get_input: kube_cert_manager } + cert_user_email: { get_input: kube_cert_user_email } + cri_runtime: { get_input: cri_runtime } + requirements: + - host: front + + front: + type: tosca.nodes.indigo.Compute + capabilities: + endpoint: + properties: + dns_name: kubeserver + network_name: 
PUBLIC + host: + properties: + num_cpus: { get_input: fe_cpus } + mem_size: { get_input: fe_mem } + instance_type: { get_input: fe_instance_type } + os: + properties: + distribution: ubuntu + type: linux + requirements: + - local_storage: + node: fe_block_storage + relationship: + type: AttachesTo + properties: + location: /pv + + fe_block_storage: + type: tosca.nodes.BlockStorage + properties: + size: { get_input: disk_size } + volume_id: { get_input: volume_id } + + wn_node: + type: tosca.nodes.indigo.LRMS.WorkerNode.Kubernetes + properties: + front_end_ip: { get_attribute: [ front, private_address, 0 ] } + version: { get_input: kube_version } + nvidia_support: { get_input: kube_nvidia_support } + cri_runtime: { get_input: cri_runtime } + requirements: + - host: wn + + wn: + type: tosca.nodes.indigo.Compute + capabilities: + scalable: + properties: + count: 1 + host: + properties: + num_cpus: 1 + mem_size: 2 GB + os: + properties: + distribution: ubuntu + type: linux + + outputs: + dashboard_endpoint: + value: { concat: [ 'https://', get_attribute: [ front, public_address, 0 ], '/dashboard/' ] } + api_endpoint: + value: { concat: [ 'https://', get_attribute: [ front, public_address, 0 ], ':6443' ] } + kubeapps_endpoint: + value: { concat: [ 'https://', get_attribute: [ front, public_address, 0 ], '/kubeapps/' ] } diff --git a/test/test_im.py b/test/test_im.py index f628b92..63e9424 100644 --- a/test/test_im.py +++ b/test/test_im.py @@ -18,6 +18,7 @@ import unittest import sys import os +import yaml from mock.mock import MagicMock, patch from radl.radl_parse import parse_radl @@ -355,6 +356,37 @@ def test_lifecycle(self, request, now, createdb, get_vms, read_auth, get_inf_id, self.assertEqual(request.call_args_list[0][0], ('PUT', 'http://server.com/infid/vms/1/disks/0/snapshot?image_name=im-uuid&auto_delete=1')) self.assertEqual(test_im._store_golden_image.call_args_list[0][0], ('wn', 'image', 'pass')) + @patch("cluesplugins.im.powermanager._read_auth_data") + @patch("cluesplugins.im.powermanager._get_inf_id") + @patch("cpyutils.db.DB.create_from_string") + @patch("requests.request") + def test_get_template(self, request, createdb, get_inf_id, read_auth): + get_inf_id.return_value = "http://server.com/infid" + read_auth.return_value = "type = InfrastructureManager; username = user; password = pass" + + inf_info = MagicMock() + inf_info.status_code = 200 + inf_info.json.return_value = {"uri-list": [{"uri": "http://server.com/infid/vms/0"}, + {"uri": "http://server.com/infid/vms/1"}]} + tosca_info = MagicMock() + tosca_info.status_code = 200 + with open(os.path.join(self.TESTS_PATH, 'test-files/tosca.yml')) as f: + tosca_info.json.return_value = {"tosca": f.read()} + + request.side_effect = [inf_info, tosca_info] + + db = MagicMock() + db.sql_query.return_value = True, "", [] + createdb.return_value = db + + test_im = powermanager() + res = test_im._get_template('node-2') + + tosca_res = yaml.safe_load(res) + node_template = tosca_res['topology_template']['node_templates']['wn'] + self.assertEqual(node_template['capabilities']['scalable']['properties']['count'], 2) + self.assertEqual(node_template['capabilities']['endpoint']['properties']['dns_name'], 'node-2') + if __name__ == "__main__": unittest.main() From 4c250f18c31e2323c02f0d60adabeff2893dd03e Mon Sep 17 00:00:00 2001 From: micafer Date: Tue, 8 Feb 2022 13:01:41 +0100 Subject: [PATCH 22/33] Add missing slurm test file --- test/test-files/slurm_partitions.txt | 1 + 1 file changed, 1 insertion(+) create mode 100644 
test/test-files/slurm_partitions.txt diff --git a/test/test-files/slurm_partitions.txt b/test/test-files/slurm_partitions.txt new file mode 100644 index 0000000..b5ff557 --- /dev/null +++ b/test/test-files/slurm_partitions.txt @@ -0,0 +1 @@ +PartitionName=debug AllowGroups=ALL AllowAccounts=ALL AllowQos=ALL AllocNodes=ALL Default=YES QoS=N/A DefaultTime=NONE DisableRootJobs=NO ExclusiveUser=NO GraceTime=0 Hidden=NO MaxNodes=UNLIMITED MaxTime=UNLIMITED MinNodes=0 LLN=NO MaxCPUsPerNode=UNLIMITED Nodes=vnode-[1-2] PriorityJobFactor=1 PriorityTier=1 RootOnly=NO ReqResv=NO OverSubscribe=NO OverTimeLimit=NONE PreemptMode=OFF State=UP TotalCPUs=1 TotalNodes=1 SelectTypeParameters=NONE JobDefaults=(null) DefMemPerNode=UNLIMITED MaxMemPerNode=UNLIMITED From c0e2da73155e7087b62f4f074749a21781794e75 Mon Sep 17 00:00:00 2001 From: Miguel Caballer Date: Mon, 4 Jul 2022 08:35:05 +0200 Subject: [PATCH 23/33] Fix py38 issues --- addons/slurm/clues-slurm-wrapper | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/addons/slurm/clues-slurm-wrapper b/addons/slurm/clues-slurm-wrapper index 98579a0..c8b1177 100755 --- a/addons/slurm/clues-slurm-wrapper +++ b/addons/slurm/clues-slurm-wrapper @@ -21,12 +21,23 @@ import sys, os, os.path, stat, pwd, grp, time, string, tempfile import re, logging import subprocess import cpyutils -import platform -if platform.linux_distribution( supported_dists=('debian', 'redhat'),full_distribution_name=0)[0] == "redhat": - SBATCH_COMMAND= "/usr/bin/sbatch.o" -else: - SBATCH_COMMAND= "/usr/local/bin/sbatch.o" +try: + import distro + if "rhel" in distro.like(): + SBATCH_COMMAND= "/usr/bin/sbatch.o" + else: + SBATCH_COMMAND= "/usr/local/bin/sbatch.o" +except: + try: + import platform + + if platform.linux_distribution( supported_dists=('debian', 'redhat'),full_distribution_name=0)[0] == "redhat": + SBATCH_COMMAND= "/usr/bin/sbatch.o" + else: + SBATCH_COMMAND= "/usr/local/bin/sbatch.o" + except: + SBATCH_COMMAND= "/usr/local/bin/sbatch.o" # Method to execute bash commands def run_command(command): @@ -197,7 +208,7 @@ def new_job(cpus_per_task, mem, nodes, queue): print("Could not connect to CLUES server %s (please, check if it is running)" % clues.configcli.config_client.CLUES_XMLRPC) sys.exit() - if queue is not " ": + if queue != " ": req_str = '"' + queue + '" in queues' else: req_str = " " From e364a1f5181005bcbadbf2f6d0b59d3e99027b9a Mon Sep 17 00:00:00 2001 From: Miguel Caballer Date: Mon, 4 Jul 2022 09:07:42 +0200 Subject: [PATCH 24/33] Update clues-slurm-wrapper --- addons/slurm/clues-slurm-wrapper | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/addons/slurm/clues-slurm-wrapper b/addons/slurm/clues-slurm-wrapper index c8b1177..ca155fa 100755 --- a/addons/slurm/clues-slurm-wrapper +++ b/addons/slurm/clues-slurm-wrapper @@ -46,7 +46,7 @@ def run_command(command): (out, err) = p.communicate() if p.returncode != 0: raise Exception("return code: %d\nError output: %s" % (p.returncode, err)) - return out + return str(out).replace("\\n", "\n") except Exception as e: raise Exception("Error executing '%s': %s" % (" ".join(command), str(e))) From 88ae61833ff32aecdc57d9b0c0a85bbdca6be2a7 Mon Sep 17 00:00:00 2001 From: micafer Date: Thu, 15 Sep 2022 11:07:54 +0200 Subject: [PATCH 25/33] Add kube-flannel ns --- cluesplugins/kubernetes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cluesplugins/kubernetes.py b/cluesplugins/kubernetes.py index a15173a..07f20f5 100644 --- a/cluesplugins/kubernetes.py +++ 
b/cluesplugins/kubernetes.py @@ -140,7 +140,7 @@ def _get_node_used_resources(self, nodename, pods_data): # do not count the number of pods in case finished jobs if pod["status"]["phase"] not in ["Succeeded", "Failed"]: # do not count the number of pods in case of system ones - if pod["metadata"]["namespace"] == "kube-system": + if pod["metadata"]["namespace"] in ["kube-system", "kube-flannel"]: system_pods += 1 used_pods += 1 cpus, memory, ngpus, agpus, sgx = self._get_pod_cpus_and_memory(pod) @@ -317,7 +317,7 @@ def get_jobinfolist(self): pods_data = self._create_request('GET', self._pods_api_url_path, self.auth_data) if pods_data: for pod in pods_data["items"]: - if pod["metadata"]["namespace"] != "kube-system": + if pod["metadata"]["namespace"] not in ["kube-system", "kube-flannel"]: job_id = pod["metadata"]["uid"] state = pod["status"]["phase"] # Pending, Running, Succeeded, Failed or Unknown hostIP = None From 4d7388624901531c50418218f13493306eb4464c Mon Sep 17 00:00:00 2001 From: micafer Date: Thu, 13 Oct 2022 09:22:34 +0200 Subject: [PATCH 26/33] Ignore daemonsets --- cluesplugins/kubernetes.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cluesplugins/kubernetes.py b/cluesplugins/kubernetes.py index 3060dcc..f466f78 100644 --- a/cluesplugins/kubernetes.py +++ b/cluesplugins/kubernetes.py @@ -140,7 +140,9 @@ def _get_node_used_resources(self, nodename, pods_data): # do not count the number of pods in case finished jobs if pod["status"]["phase"] not in ["Succeeded", "Failed"]: # do not count the number of pods in case of system ones - if pod["metadata"]["namespace"] in ["kube-system", "kube-flannel"]: + # nor in case of DaemonSets + if (pod["metadata"]["namespace"] in ["kube-system", "kube-flannel"] or + pod["metadata"]["ownerReferences"] and pod["metadata"]["ownerReferences"][0]["kind"] == "DaemonSet"): system_pods += 1 used_pods += 1 cpus, memory, ngpus, agpus, sgx = self._get_pod_cpus_and_memory(pod) From 8986994e3188ee7fdb958565bd469cc3e1f5b0cb Mon Sep 17 00:00:00 2001 From: micafer Date: Thu, 13 Oct 2022 12:28:04 +0200 Subject: [PATCH 27/33] fix error --- cluesplugins/kubernetes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluesplugins/kubernetes.py b/cluesplugins/kubernetes.py index f466f78..9148c80 100644 --- a/cluesplugins/kubernetes.py +++ b/cluesplugins/kubernetes.py @@ -142,7 +142,7 @@ def _get_node_used_resources(self, nodename, pods_data): # do not count the number of pods in case of system ones # nor in case of DaemonSets if (pod["metadata"]["namespace"] in ["kube-system", "kube-flannel"] or - pod["metadata"]["ownerReferences"] and pod["metadata"]["ownerReferences"][0]["kind"] == "DaemonSet"): + "ownerReferences" in pod["metadata"] and pod["metadata"]["ownerReferences"] and pod["metadata"]["ownerReferences"][0]["kind"] == "DaemonSet"): system_pods += 1 used_pods += 1 cpus, memory, ngpus, agpus, sgx = self._get_pod_cpus_and_memory(pod) From da6c77613f1689b6cdaea9b8fd523f48e10b8297 Mon Sep 17 00:00:00 2001 From: micafer Date: Thu, 13 Oct 2022 15:40:15 +0200 Subject: [PATCH 28/33] Improve mem config parse --- cluesplugins/kubernetes.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/cluesplugins/kubernetes.py b/cluesplugins/kubernetes.py index 9148c80..3fb9b36 100644 --- a/cluesplugins/kubernetes.py +++ b/cluesplugins/kubernetes.py @@ -16,6 +16,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . 
+from cmath import isnan import collections import requests import base64 @@ -83,7 +84,7 @@ def __init__(self, KUBERNETES_SERVER=None, KUBERNETES_PODS_API_URL_PATH=None, "KUBERNETES_PODS_API_URL_PATH": "/api/v1/pods", "KUBERNETES_NODES_API_URL_PATH": "/api/v1/nodes", "KUBERNETES_TOKEN": None, - "KUBERNETES_NODE_MEMORY": 1073741824, + "KUBERNETES_NODE_MEMORY": "1 GB", "KUBERNETES_NODE_SLOTS": 1, "KUBERNETES_NODE_PODS": 110, } @@ -95,7 +96,8 @@ def __init__(self, KUBERNETES_SERVER=None, KUBERNETES_PODS_API_URL_PATH=None, self._nodes_api_url_path = Helpers.val_default(KUBERNETES_NODES_API_URL_PATH, config_kube.KUBERNETES_NODES_API_URL_PATH) token = Helpers.val_default(KUBERNETES_TOKEN, config_kube.KUBERNETES_TOKEN) - self._node_memory = Helpers.val_default(KUBERNETES_NODE_MEMORY, config_kube.KUBERNETES_NODE_MEMORY) + self._node_memory = self._get_memory_in_bytes(Helpers.val_default(KUBERNETES_NODE_MEMORY, + config_kube.KUBERNETES_NODE_MEMORY)) self._node_slots = Helpers.val_default(KUBERNETES_NODE_SLOTS, config_kube.KUBERNETES_NODE_SLOTS) self._node_pods = Helpers.val_default(KUBERNETES_NODE_PODS, config_kube.KUBERNETES_NODE_PODS) @@ -106,8 +108,10 @@ def __init__(self, KUBERNETES_SERVER=None, KUBERNETES_PODS_API_URL_PATH=None, LRMS.__init__(self, "KUBERNETES_%s" % self._server_url) def _get_memory_in_bytes(self, str_memory): + if (isinstance(str_memory, (int, float))): + return str_memory str_memory = str_memory.lower() - if str_memory.strip()[-2:] in ['mi', 'gi', 'ki', 'ti']: + if str_memory.strip()[-2:] in ['mi', 'mb', 'gi', 'gb', 'ki', 'kb', 'ti', 'tb']: unit = str_memory.strip()[-2:][0] memory = int(str_memory.strip()[:-2]) elif str_memory.strip()[-1:] in ['m', 'g', 'k', 't']: From 417e03d547a8a7ed930f71e7c97a57a23bef034f Mon Sep 17 00:00:00 2001 From: Miguel Caballer Date: Fri, 11 Mar 2022 11:38:39 +0100 Subject: [PATCH 29/33] Improve behavior with NoSchedule taint --- cluesplugins/kubernetes.py | 27 +++++++++++++++------------ test/test-files/nodes.json | 6 ++++++ test/test_kubernetes.py | 9 ++++++--- 3 files changed, 27 insertions(+), 15 deletions(-) diff --git a/cluesplugins/kubernetes.py b/cluesplugins/kubernetes.py index 3fb9b36..b520d0a 100644 --- a/cluesplugins/kubernetes.py +++ b/cluesplugins/kubernetes.py @@ -184,15 +184,10 @@ def get_nodeinfolist(self): if 'sgx.k8s.io/sgx' in node["status"]["allocatable"]: sgx = int(node["status"]["allocatable"]["sgx.k8s.io/sgx"]) - skip_node = False - # Get Taints - if 'taints' in node["spec"] and node["spec"]['taints']: - for taint in node["spec"]['taints']: - if taint['effect'] in ["NoSchedule", "PreferNoSchedule", "NoExecute"]: - skip_node = True - _LOGGER.debug("Node %s is tainted with %s, skiping." % (name, taint['effect'])) - - if not skip_node: + # Skip master node + if "node-role.kubernetes.io/master" in node["metadata"]["labels"]: + _LOGGER.debug("Node %s seems to be master node, skiping." 
From 417e03d547a8a7ed930f71e7c97a57a23bef034f Mon Sep 17 00:00:00 2001
From: Miguel Caballer
Date: Fri, 11 Mar 2022 11:38:39 +0100
Subject: [PATCH 29/33] Improve behavior with NoSchedule taint

---
 cluesplugins/kubernetes.py | 27 +++++++++++++++------------
 test/test-files/nodes.json |  6 ++++++
 test/test_kubernetes.py    |  9 ++++++---
 3 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/cluesplugins/kubernetes.py b/cluesplugins/kubernetes.py
index 3fb9b36..b520d0a 100644
--- a/cluesplugins/kubernetes.py
+++ b/cluesplugins/kubernetes.py
@@ -184,15 +184,10 @@ def get_nodeinfolist(self):
             if 'sgx.k8s.io/sgx' in node["status"]["allocatable"]:
                 sgx = int(node["status"]["allocatable"]["sgx.k8s.io/sgx"])

-            skip_node = False
-            # Get Taints
-            if 'taints' in node["spec"] and node["spec"]['taints']:
-                for taint in node["spec"]['taints']:
-                    if taint['effect'] in ["NoSchedule", "PreferNoSchedule", "NoExecute"]:
-                        skip_node = True
-                        _LOGGER.debug("Node %s is tainted with %s, skiping." % (name, taint['effect']))
-
-            if not skip_node:
+            # Skip master node
+            if "node-role.kubernetes.io/master" in node["metadata"]["labels"]:
+                _LOGGER.debug("Node %s seems to be master node, skiping." % name)
+            else:
                 used_mem, used_cpus, used_agpus, used_ngpus, used_sgx, used_pods, system_pods = \
                     self._get_node_used_resources(name, pods_data)

@@ -210,7 +205,14 @@ def get_nodeinfolist(self):
                     is_ready = False

                 keywords = {'pods_free': TypedNumber(pods_free),
-                            'nodeName': TypedClass(name, TypedClass.STRING)}
+                            'nodeName': TypedClass(name, TypedClass.STRING),
+                            'schedule': TypedNumber(1)}
+
+                # Get Taints
+                if 'taints' in node["spec"] and node["spec"]['taints']:
+                    for taint in node["spec"]['taints']:
+                        if taint['effect'] in ["NoSchedule", "NoExecute"]:
+                            keywords['schedule'] = TypedNumber(0)

                 if agpus_free:
                     keywords['amd_gpu'] = TypedNumber(agpus_free)
@@ -240,7 +242,8 @@ def get_nodeinfolist(self):
             name = vnode["name"]
             if name not in nodeinfolist:
                 keywords = {'pods_free': TypedNumber(self._node_pods),
-                            'nodeName': TypedClass(name, TypedClass.STRING)}
+                            'nodeName': TypedClass(name, TypedClass.STRING),
+                            'schedule': TypedNumber(1)}

                 cpus = self._node_slots
                 if "cpu" in vnode:
@@ -340,7 +343,7 @@ def get_jobinfolist(self):

                 cpus, memory, ngpus, agpus, sgx = self._get_pod_cpus_and_memory(pod)

-                req_str = '(pods_free > 0)'
+                req_str = '(pods_free > 0) && (schedule = 1)'
                 if 'nodeName' in pod["spec"] and pod["spec"]["nodeName"]:
                     req_str += ' && (nodeName = "%s")' % pod["spec"]["nodeName"]
                 if ngpus:
diff --git a/test/test-files/nodes.json b/test/test-files/nodes.json
index e3311f3..8b5a290 100644
--- a/test/test-files/nodes.json
+++ b/test/test-files/nodes.json
@@ -339,6 +339,12 @@
         "podCIDR": "10.244.1.0/24",
         "podCIDRs": [
             "10.244.1.0/24"
+        ],
+        "taints": [
+            {
+                "effect": "NoSchedule",
+                "key": "wn-3"
+            }
         ]
     },
     "status": {
diff --git a/test/test_kubernetes.py b/test/test_kubernetes.py
index f501dab..1f1cc9d 100644
--- a/test/test_kubernetes.py
+++ b/test/test_kubernetes.py
@@ -57,10 +57,11 @@ def test_get_nodeinfolist(self, request):
         self.assertEqual(nodes['gpuwn-1.localdomain'].memory_total, 16713633792)
         self.assertEqual(nodes['gpuwn-1.localdomain'].memory_free, 16713633792)
         self.assertEqual(nodes['gpuwn-1.localdomain'].state, Node.IDLE)
-        self.assertEqual(len(nodes['gpuwn-1.localdomain'].keywords), 8)
+        self.assertEqual(len(nodes['gpuwn-1.localdomain'].keywords), 9)
         self.assertEqual(nodes['gpuwn-1.localdomain'].keywords["pods_free"].value, 110)
         self.assertEqual(nodes['gpuwn-1.localdomain'].keywords["nodeName"].value, 'gpuwn-1.localdomain')
         self.assertEqual(nodes['gpuwn-1.localdomain'].keywords["nvidia_gpu"].value, 2)
+        self.assertEqual(nodes['gpuwn-1.localdomain'].keywords["schedule"].value, 1)

         self.assertEqual(nodes['wn4.localdomain'].slots_count, 2)
         self.assertEqual(nodes['wn4.localdomain'].memory_total, 1024)
@@ -71,6 +72,7 @@ def test_get_nodeinfolist(self, request):
         self.assertEqual(nodes['wn5.localdomain'].keywords["sgx_epc_size"].value, 128)

         self.assertEqual(nodes['wn-3.localdomain'].keywords["sgx"].value, 1)
+        self.assertEqual(nodes['wn-3.localdomain'].keywords["schedule"].value, 0)

         os.unlink(kube.VNODE_FILE)

@@ -88,8 +90,9 @@ def test_get_jobinfolist(self, request):
         self.assertEqual(jobs[1].state, Request.SERVED)
         self.assertEqual(jobs[1].resources.resources.slots, 0.25)
         self.assertEqual(jobs[1].resources.resources.memory, 134217728)
-        self.assertEqual(jobs[1].resources.resources.requests, [('(pods_free > 0) && (nodeName = "wn-2.localdomain")' +
-                                                                 ' && (nvidia_gpu >= 1) && (sgx >= 1)')])
+        self.assertEqual(jobs[1].resources.resources.requests, [('(pods_free > 0) && (schedule = 1) && ' +
+                                                                 '(nodeName = "wn-2.localdomain") && ' +
+                                                                 '(nvidia_gpu >= 1) && (sgx >= 1)')])


 if __name__ == "__main__":

From c09d10bc6abb4d7ef8b9d577cf2072d815fcdeac Mon Sep 17 00:00:00 2001
From: micafer
Date: Thu, 13 Oct 2022 16:01:01 +0200
Subject: [PATCH 30/33] Add control-plane node

---
 cluesplugins/kubernetes.py | 3 ++-
 test/test_kubernetes.py    | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/cluesplugins/kubernetes.py b/cluesplugins/kubernetes.py
index b520d0a..5c0fdb9 100644
--- a/cluesplugins/kubernetes.py
+++ b/cluesplugins/kubernetes.py
@@ -185,7 +185,8 @@ def get_nodeinfolist(self):
                 sgx = int(node["status"]["allocatable"]["sgx.k8s.io/sgx"])

             # Skip master node
-            if "node-role.kubernetes.io/master" in node["metadata"]["labels"]:
+            if ("node-role.kubernetes.io/master" in node["metadata"]["labels"] or
+                    "node-role.kubernetes.io/control-plane" in node["metadata"]["labels"]):
                 _LOGGER.debug("Node %s seems to be master node, skiping." % name)
             else:
                 used_mem, used_cpus, used_agpus, used_ngpus, used_sgx, used_pods, system_pods = \
diff --git a/test/test_kubernetes.py b/test/test_kubernetes.py
index 1f1cc9d..4388441 100644
--- a/test/test_kubernetes.py
+++ b/test/test_kubernetes.py
@@ -91,7 +91,7 @@ def test_get_jobinfolist(self, request):
         self.assertEqual(jobs[1].resources.resources.slots, 0.25)
         self.assertEqual(jobs[1].resources.resources.memory, 134217728)
         self.assertEqual(jobs[1].resources.resources.requests, [('(pods_free > 0) && (schedule = 1) && ' +
-                                                                 '(nodeName = "wn-2.localdomain") && ' +
+                                                                 '(nodeName = "wn-2.localdomain") && ' +
                                                                  '(nvidia_gpu >= 1) && (sgx >= 1)')])



From dddbce7b362c641512758d4a04b4ce6ed7f46ee7 Mon Sep 17 00:00:00 2001
From: Miguel Caballer
Date: Fri, 14 Oct 2022 08:19:31 +0200
Subject: [PATCH 31/33] Minor change

---
 cluesplugins/kubernetes.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/cluesplugins/kubernetes.py b/cluesplugins/kubernetes.py
index 41df886..f951564 100644
--- a/cluesplugins/kubernetes.py
+++ b/cluesplugins/kubernetes.py
@@ -16,7 +16,6 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.

-from cmath import isnan
 import collections
 import requests
 import base64
From 7541b82bb38085ea1a987956e602dc3842cd7aec Mon Sep 17 00:00:00 2001
From: Miguel Caballer
Date: Mon, 13 Mar 2023 09:42:15 +0100
Subject: [PATCH 32/33] Update kubernetes.py

---
 cluesplugins/kubernetes.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/cluesplugins/kubernetes.py b/cluesplugins/kubernetes.py
index f951564..c3d4d26 100644
--- a/cluesplugins/kubernetes.py
+++ b/cluesplugins/kubernetes.py
@@ -343,9 +343,9 @@ def get_jobinfolist(self):

                 cpus, memory, ngpus, agpus, sgx = self._get_pod_cpus_and_memory(pod)

-                req_str = '(pods_free > 0) && (schedule = 1)'
+                req_str = '(pods_free > 0) && (schedule == 1)'
                 if 'nodeName' in pod["spec"] and pod["spec"]["nodeName"]:
-                    req_str += ' && (nodeName = "%s")' % pod["spec"]["nodeName"]
+                    req_str += ' && (nodeName == "%s")' % pod["spec"]["nodeName"]
                 if ngpus:
                     req_str += ' && (nvidia_gpu >= %d)' % ngpus
                 if agpus:

From 758faafaacc68a435da0096238c48c4f590a2ee9 Mon Sep 17 00:00:00 2001
From: Miguel Caballer
Date: Tue, 14 Mar 2023 09:23:36 +0100
Subject: [PATCH 33/33] Update test_kubernetes.py

fix test
---
 test/test_kubernetes.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/test/test_kubernetes.py b/test/test_kubernetes.py
index 4388441..d1c25ef 100644
--- a/test/test_kubernetes.py
+++ b/test/test_kubernetes.py
@@ -90,8 +90,8 @@ def test_get_jobinfolist(self, request):
         self.assertEqual(jobs[1].state, Request.SERVED)
         self.assertEqual(jobs[1].resources.resources.slots, 0.25)
         self.assertEqual(jobs[1].resources.resources.memory, 134217728)
-        self.assertEqual(jobs[1].resources.resources.requests, [('(pods_free > 0) && (schedule = 1) && ' +
-                                                                 '(nodeName = "wn-2.localdomain") && ' +
+        self.assertEqual(jobs[1].resources.resources.requests, [('(pods_free > 0) && (schedule == 1) && ' +
+                                                                 '(nodeName == "wn-2.localdomain") && ' +
                                                                  '(nvidia_gpu >= 1) && (sgx >= 1)')])
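
Note: taken together, patches 29 to 33 stop dropping tainted nodes, publish a schedule keyword per node (1 by default, 0 when a NoSchedule or NoExecute taint is present), and attach a requirement string that uses the '==' comparison operator to every pod request. The two sketches below summarize that behavior; schedule_keyword and build_request are illustrative names using plain Python values instead of the CLUES TypedNumber/TypedClass wrappers, and the amd_gpu clause is an assumption since only the nvidia_gpu and sgx clauses appear in the diffs.

    # Sketch: derive the 'schedule' keyword from one node object returned by /api/v1/nodes.
    # 1 means new pods may be placed on the node, 0 means keep the node visible but do not
    # schedule on it (this is what the wn-3 taint added to test-files/nodes.json exercises).
    def schedule_keyword(node):
        schedule = 1
        for taint in node.get("spec", {}).get("taints") or []:
            if taint.get("effect") in ("NoSchedule", "NoExecute"):
                schedule = 0
        return schedule

    # Sketch: assemble the CLUES requirement expression for one pod (a dict from /api/v1/pods),
    # mirroring the final form checked in test_get_jobinfolist.
    def build_request(pod, ngpus=0, agpus=0, sgx=0):
        req = '(pods_free > 0) && (schedule == 1)'
        node_name = pod.get("spec", {}).get("nodeName")
        if node_name:
            req += ' && (nodeName == "%s")' % node_name
        if ngpus:
            req += ' && (nvidia_gpu >= %d)' % ngpus
        if agpus:
            req += ' && (amd_gpu >= %d)' % agpus   # assumed clause, not shown in the diffs
        if sgx:
            req += ' && (sgx >= %d)' % sgx
        return req

    # For a pod pinned to wn-2.localdomain that asks for one NVIDIA GPU and SGX this yields
    # '(pods_free > 0) && (schedule == 1) && (nodeName == "wn-2.localdomain") && (nvidia_gpu >= 1) && (sgx >= 1)',
    # the exact string asserted by the updated test.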