diff --git a/clueslib/hooks.py b/clueslib/hooks.py index 7777abb..82bca0b 100644 --- a/clueslib/hooks.py +++ b/clueslib/hooks.py @@ -21,6 +21,7 @@ from .configlib import _CONFIGURATION_HOOKS # cpyutils.log.Log.setup() + _LOGGER = cpyutils.log.Log("HOOKS") class HookSystem: @@ -108,4 +109,4 @@ def request(self, request): try: HOOKS except: - HOOKS = HookSystem() \ No newline at end of file + HOOKS = HookSystem() diff --git a/cluesplugins/im.py b/cluesplugins/im.py index e668eb3..e3b82c5 100644 --- a/cluesplugins/im.py +++ b/cluesplugins/im.py @@ -21,12 +21,11 @@ @author: micafer ''' -try: - from xmlrpclib import ServerProxy -except ImportError: - from xmlrpc.client import ServerProxy +import requests from uuid import uuid1 import re +import yaml +import os from radl import radl_parse from radl.radl import contextualize, contextualize_item @@ -56,618 +55,663 @@ class VirtualMachine: class powermanager(PowerManager): - class VM_Node: - def __init__(self, vm_id, radl, ec3_additional_vm): - self.vm_id = vm_id - self.radl = radl - self.timestamp_recovered = self.timestamp_created = self.timestamp_seen = cpyutils.eventloop.now() - self.ec3_additional_vm = ec3_additional_vm - self.last_state = None - - def seen(self): - self.timestamp_seen = cpyutils.eventloop.now() - # _LOGGER.debug("seen %s" % self.vm_id) - - def recovered(self): - self.timestamp_recovered = cpyutils.eventloop.now() - - def update(self, vm_id, radl): - self.vm_id = vm_id - self.radl = radl - - def __init__(self): - # - # NOTE: This fragment provides the support for global config files. It is a bit awful. - # I do not like it because it is like having global vars. But it is managed in - # this way for the sake of using configuration files - # - config_im = cpyutils.config.Configuration( - "IM VIRTUAL CLUSTER", - { - "IM_VIRTUAL_CLUSTER_XMLRPC": "http://localhost:8899", - "IM_VIRTUAL_CLUSTER_XMLRCP_SSL_CA_CERTS": "", - "IM_VIRTUAL_CLUSTER_XMLRCP_SSL": False, - "IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE": "/usr/local/ec3/auth.dat", - "IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS": 30, - "IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS": 30, - "IM_VIRTUAL_CLUSTER_DB_CONNECTION_STRING": "sqlite:///var/lib/clues2/clues.db" - } - ) - - self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE = config_im.IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE - self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS = config_im.IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS - self._IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS = config_im.IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS - self._IM_VIRTUAL_CLUSTER_XMLRCP_SSL = config_im.IM_VIRTUAL_CLUSTER_XMLRCP_SSL - self._IM_VIRTUAL_CLUSTER_XMLRPC = config_im.IM_VIRTUAL_CLUSTER_XMLRPC - self._IM_VIRTUAL_CLUSTER_XMLRCP_SSL_CA_CERTS = config_im.IM_VIRTUAL_CLUSTER_XMLRCP_SSL_CA_CERTS - - self._db = cpyutils.db.DB.create_from_string(config_im.IM_VIRTUAL_CLUSTER_DB_CONNECTION_STRING) - self._create_db() - - # Structure for the recovery of nodes - self._mvs_seen = {} - self._golden_images = self._load_golden_images() - self._stopped_vms = self._load_stopped_vms() - self._inf_id = None - - def _create_db(self): - try: - result, _, _ = self._db.sql_query("CREATE TABLE IF NOT EXISTS im_golden_images(ec3_class varchar(255) " - "PRIMARY KEY, image varchar(255), password varchar(255))", True) - result, _, _ = self._db.sql_query("CREATE TABLE IF NOT EXISTS im_stopped_vms(node_name varchar(255) " - "PRIMARY KEY, vm_id varchar(255))", True) - except: - _LOGGER.exception( - "Error creating IM plugin DB. 
The data persistence will not work!") - result = False - return result - - def _store_golden_image(self, ec3_class, image, password): - try: - self._db.sql_query("INSERT into im_golden_images values ('%s','%s','%s')" % (ec3_class, + class VM_Node: + def __init__(self, vm_id, radl, ec3_additional_vm): + self.vm_id = vm_id + self.radl = radl + self.timestamp_recovered = self.timestamp_created = self.timestamp_seen = cpyutils.eventloop.now() + self.ec3_additional_vm = ec3_additional_vm + self.last_state = None + + def seen(self): + self.timestamp_seen = cpyutils.eventloop.now() + # _LOGGER.debug("seen %s" % self.vm_id) + + def recovered(self): + self.timestamp_recovered = cpyutils.eventloop.now() + + def update(self, vm_id, radl): + self.vm_id = vm_id + self.radl = radl + + def __init__(self): + # + # NOTE: This fragment provides the support for global config files. It is a bit awful. + # I do not like it because it is like having global vars. But it is managed in + # this way for the sake of using configuration files + # + config_im = cpyutils.config.Configuration( + "IM VIRTUAL CLUSTER", + { + "IM_VIRTUAL_CLUSTER_TOSCA": False, + "IM_VIRTUAL_CLUSTER_REST_API": "http://localhost:8800", + "IM_VIRTUAL_CLUSTER_REST_SSL_CA_CERTS": "", + "IM_VIRTUAL_CLUSTER_REST_SSL": False, + "IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE": "/usr/local/ec3/auth.dat", + "IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS": 30, + "IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS": 30, + "IM_VIRTUAL_CLUSTER_DB_CONNECTION_STRING": "sqlite:///var/lib/clues2/clues.db", + "IM_VIRTUAL_CLUSTER_INF_ID": None + } + ) + + self._IM_VIRTUAL_CLUSTER_TOSCA = config_im.IM_VIRTUAL_CLUSTER_TOSCA + self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE = config_im.IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE + self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS = config_im.IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS + self._IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS = config_im.IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS + self._IM_VIRTUAL_CLUSTER_REST_SSL = config_im.IM_VIRTUAL_CLUSTER_REST_SSL + self._IM_VIRTUAL_CLUSTER_REST_API = config_im.IM_VIRTUAL_CLUSTER_REST_API + self._IM_VIRTUAL_CLUSTER_REST_SSL_CA_CERTS = config_im.IM_VIRTUAL_CLUSTER_REST_SSL_CA_CERTS + + self._db = cpyutils.db.DB.create_from_string(config_im.IM_VIRTUAL_CLUSTER_DB_CONNECTION_STRING) + self._create_db() + + # Structure for the recovery of nodes + self._mvs_seen = {} + self._golden_images = self._load_golden_images() + self._stopped_vms = self._load_stopped_vms() + self._inf_id = config_im.IM_VIRTUAL_CLUSTER_INF_ID + # If the ID is not the whoule URL, complete it + if self._inf_id and not self._inf_id.startswith("http"): + self._inf_id = "%s/infrastructures/%s" % (self._IM_VIRTUAL_CLUSTER_REST_API, self._inf_id) + + def _create_db(self): + try: + result, _, _ = self._db.sql_query("CREATE TABLE IF NOT EXISTS im_golden_images(ec3_class varchar(255) " + "PRIMARY KEY, image varchar(255), password varchar(255))", True) + result, _, _ = self._db.sql_query("CREATE TABLE IF NOT EXISTS im_stopped_vms(node_name varchar(255) " + "PRIMARY KEY, vm_id varchar(255))", True) + except: + _LOGGER.exception( + "Error creating IM plugin DB. 
The data persistence will not work!") + result = False + return result + + def _store_golden_image(self, ec3_class, image, password): + try: + self._db.sql_query("INSERT into im_golden_images values ('%s','%s','%s')" % (ec3_class, image, - password), True) - except: - _LOGGER.exception("Error trying to save IM golden image data.") - - def _store_stopped_vm(self, node_name, vm_id): - try: - self._db.sql_query("INSERT OR REPLACE into im_stopped_vms values ('%s','%s')" % (node_name, vm_id), True) - except: - _LOGGER.exception("Error trying to save IM stopped VMs data.") - - def _delete_stopped_vm(self, node_name): - try: - del self._stopped_vms[node_name] - self._db.sql_query("DELETE FROM im_stopped_vms where node_name = '%s'" % node_name, True) - except: - _LOGGER.exception("Error trying to delete IM stopped VMs data.") - - def _load_stopped_vms(self): - res = {} - try: - result, _, rows = self._db.sql_query("select * from im_stopped_vms") - if result: - for nname, vm_id in rows: - res[nname] = vm_id - else: - _LOGGER.error("Error trying to load IM stopped VMs data.") - except: - _LOGGER.exception("Error trying to load IM stopped VMs data.") - - return res - - def _load_golden_images(self): - res = {} - try: - result, _, rows = self._db.sql_query("select * from im_golden_images") - if result: - for (ec3_class, image, password) in rows: - res[ec3_class] = image, password - else: - _LOGGER.error("Error trying to load IM golden images data.") - except: - _LOGGER.exception("Error trying to load IM golden images data.") - - return res - - def _get_inf_id(self): - if self._inf_id is not None: - return self._inf_id - else: - server = self._get_server() - auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) - (success, inf_list) = server.GetInfrastructureList(auth_data) - if success: - if len(inf_list) > 0: - _LOGGER.debug("The IM Inf ID is %s" % inf_list[0]) - self._inf_id = inf_list[0] - return inf_list[0] - else: - _LOGGER.error("Error getting infrastructure list: No infrastructure!.") - else: - _LOGGER.error("Error getting infrastructure list: %s" % inf_list) - return None - - def _get_server(self): - if self._IM_VIRTUAL_CLUSTER_XMLRCP_SSL: - from springpython.remoting.xmlrpc import SSLClient - return SSLClient(self._IM_VIRTUAL_CLUSTER_XMLRPC, self._IM_VIRTUAL_CLUSTER_XMLRCP_SSL_CA_CERTS) - else: - return ServerProxy(self._IM_VIRTUAL_CLUSTER_XMLRPC,allow_none=True) - - # From IM.auth - @staticmethod - def _read_auth_data(filename): - if isinstance(filename, list): - lines = filename - else: - auth_file = open(filename, 'r') - lines = auth_file.readlines() - auth_file.close() - - res = [] - - for line in lines: - line = line.strip() - if len(line) > 0 and not line.startswith("#"): - auth = {} - tokens = line.split(";") - for token in tokens: - key_value = token.split(" = ") - if len(key_value) != 2: - break; - else: - value = key_value[1].strip().replace("\\n","\n") - # Enable to specify a filename and set the contents of it - if value.startswith("file(") and value.endswith(")"): - filename = value[5:len(value)-1] - try: - value_file = open(filename, 'r') - value = value_file.read() - value_file.close() - except: - pass - auth[key_value[0].strip()] = value - res.append(auth) - - return res - - def _get_system(self, vm_info, radl_all, nname): - - # First try to check if the user has specified the ec3_node_pattern - # it must be a regular expression to match with nname + password), True) + except: + _LOGGER.exception("Error trying to save IM golden image data.") + + def 
_store_stopped_vm(self, node_name, vm_id): + try: + self._db.sql_query("INSERT OR REPLACE into im_stopped_vms values ('%s','%s')" % (node_name, vm_id), True) + except: + _LOGGER.exception("Error trying to save IM stopped VMs data.") + + def _delete_stopped_vm(self, node_name): + try: + del self._stopped_vms[node_name] + self._db.sql_query("DELETE FROM im_stopped_vms where node_name = '%s'" % node_name, True) + except: + _LOGGER.exception("Error trying to delete IM stopped VMs data.") + + def _load_stopped_vms(self): + res = {} + try: + result, _, rows = self._db.sql_query("select * from im_stopped_vms") + if result: + for nname, vm_id in rows: + res[nname] = vm_id + else: + _LOGGER.error("Error trying to load IM stopped VMs data.") + except: + _LOGGER.exception("Error trying to load IM stopped VMs data.") + + return res + + def _load_golden_images(self): + res = {} + try: + result, _, rows = self._db.sql_query("select * from im_golden_images") + if result: + for (ec3_class, image, password) in rows: + res[ec3_class] = image, password + else: + _LOGGER.error("Error trying to load IM golden images data.") + except: + _LOGGER.exception("Error trying to load IM golden images data.") + + return res + + def _get_inf_id(self): + if self._inf_id is not None: + return self._inf_id + else: + auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) + headers = {"Authorization": auth_data, "Accept": "application/json"} + url = "%s/infrastructures" % self._IM_VIRTUAL_CLUSTER_REST_API + resp = requests.request("GET", url, verify=False, headers=headers) + + if resp.status_code == 200: + inf_list = resp.json()['uri-list'] + if len(inf_list) > 0: + _LOGGER.debug("The IM Inf ID is %s" % inf_list[0]) + self._inf_id = inf_list[0]['uri'] + return self._inf_id + else: + _LOGGER.error("Error getting infrastructure list: No infrastructure!.") + else: + _LOGGER.error("Error getting infrastructure list: %s. %s." % (resp.reason, resp.text)) + return None + + # From IM.auth + @staticmethod + def _read_auth_data(filename): + if isinstance(filename, list): + res = "\\n".join(filename) + else: + auth_file = open(filename, 'r') + res = auth_file.read().replace('\n', '\\n') + auth_file.close() + return res + + def _get_system(self, vm_info, radl_all, nname): + + # First try to check if the user has specified the ec3_node_pattern + # it must be a regular expression to match with nname # for example: vnode-[1,2,3,4,5] - for system in radl_all.systems: - if system.getValue("ec3_node_pattern"): - if re.match(system.getValue("ec3_node_pattern"), nname): - return system.name - - # Start with the system named "wn" - current_system = "wn" - while current_system: - system_orig = vm_info[current_system]["radl"] - ec3_max_instances = system_orig.getValue("ec3_max_instances", -1) - if ec3_max_instances < 0: - ec3_max_instances = 99999999 - if vm_info[current_system]["count"] < ec3_max_instances: - return current_system - else: - # we must change the system to the next one - current_system = system_orig.getValue("ec3_if_fail", '') - if not current_system: - _LOGGER.error("Error: we need more instances but ec3_if_fail of system %s is empty" % system_orig.name) - return None - - def _get_radl(self, nname): - inf_id = self._get_inf_id() - if not inf_id: - _LOGGER.error("Error getting RADL. 
No infrastructure ID!!") - return None - server = self._get_server() - auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) - (success, vm_ids) = server.GetInfrastructureInfo(inf_id, auth_data) - - # Get all the info from RADL - # Especial features in system: - #- 'ec3_max_instances': maximum number of nodes with this system configuration; a negative value is like no constrain; default value is -1. - #- 'ec3_destroy_interval': some cloud providers pay a certain amount of time in advance, like AWS EC2. The node will be destroyed only when it is idle at the end of the interval expressed by this option in seconds. The default value is 0. - #- 'ec3_destroy_safe': seconds before the deadline set by \'ec3_destroy_interval\' that the node can be destroyed; the default value is 0. - #- 'ec3_if_fail': name of the next system configuration to try after this fails a launch or the number of instances saturates; the default value is ''. - - vm_info = {} - if success: - # The first one is always the front-end node - for vm_id in vm_ids[1:]: - (success, radl_data) = server.GetVMInfo(inf_id, vm_id, auth_data) - if success: - radl = radl_parse.parse_radl(radl_data) - ec3_class = radl.systems[0].getValue("ec3_class") - if ec3_class not in vm_info: - vm_info[ec3_class] = {} - vm_info[ec3_class]['count'] = 0 - vm_info[ec3_class]['count'] += 1 - else: - _LOGGER.error("Error getting VM info: %s" % radl_data) - else: - _LOGGER.error("Error getting infrastructure info: %s" % vm_ids) - - (success, radl_data) = server.GetInfrastructureRADL(inf_id, auth_data) - if success: - radl_all = radl_parse.parse_radl(radl_data) - else: - _LOGGER.error("Error getting infrastructure RADL: %s" % radl_data) - return None - - # Get info from the original RADL - for system in radl_all.systems: - if system.name not in vm_info: - vm_info[system.name] = {} - vm_info[system.name]['count'] = 0 - vm_info[system.name]['radl'] = system - - current_system = self._get_system(vm_info, radl_all, nname) - if current_system: - # launch this system type - new_radl = "" - for net in radl_all.networks: - new_radl += "network " + net.id + "\n" - - system_orig = vm_info[current_system]["radl"] - system_orig.name = nname - system_orig.setValue("net_interface.0.dns_name", str(nname)) - system_orig.setValue("ec3_class", current_system) - if current_system in self._golden_images: - image, password = self._golden_images[current_system] - system_orig.setValue("disk.0.image.url", image) - _LOGGER.debug("A golden image for %s node is stored, using it: %s" % (current_system, image)) - if password: - system_orig.setValue("disk.0.os.credentials.password", password) - new_radl += str(system_orig) + "\n" - - if radl_all.contextualize: - node_citems = [] - node_configures = [] - for citem in radl_all.contextualize.items.values(): - if citem.system == current_system: - node_configures.append(citem.configure) - node_citems.append(contextualize_item(nname, - citem.configure, - citem.num)) - for configure in radl_all.configures: - if configure.name in node_configures: - new_radl += str(configure) + "\n" - new_radl += str(contextualize(node_citems)) + "\n" - else: - node_configure = None - for configure in radl_all.configures: - if not node_configure and configure.name == 'wn': - node_configure = configure - if configure.name == current_system: - node_configure = configure - break - - if node_configure: - node_configure.name = nname - new_radl += str(node_configure) + "\n" - - new_radl += "deploy " + nname + " 1" - - return new_radl - else: - 
_LOGGER.error("Error generating infrastructure RADL") - return None - - def _get_vms(self): - inf_id = self._get_inf_id() - if not inf_id: - _LOGGER.error("ERROR getting infrastructure info: No infrastructure ID!!") - return self._mvs_seen - now = cpyutils.eventloop.now() - server = self._get_server() - auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) - (success, vm_ids) = server.GetInfrastructureInfo(inf_id, auth_data) - if not success: - _LOGGER.error("ERROR getting infrastructure info: %s" % vm_ids) - else: - # The first one is always the front-end node - for vm_id in vm_ids[1:]: - clues_node_name = None - ec3_additional_vm = None - try: - (success, radl_data) = server.GetVMInfo(inf_id, vm_id, auth_data) - if success: - radl = radl_parse.parse_radl(radl_data) - clues_node_name = radl.systems[0].getValue('net_interface.0.dns_name') - # In case that the name is not set, use the default value - if clues_node_name is None: - clues_node_name = "vnode-#N#" - if '#N#' in clues_node_name: - clues_node_name = clues_node_name.replace('#N#', vm_id) - ec3_additional_vm = radl.systems[0].getValue('ec3_additional_vm') - state = radl.systems[0].getValue('state') - else: - _LOGGER.error("ERROR getting VM info: %s" % vm_id) - except TypeError: - success = False - reload(radl_parse) - _LOGGER.exception("ERROR getting VM info: %s. Trying to reload radl_parse module." % vm_id) - except: - success = False - _LOGGER.exception("ERROR getting VM info: %s" % vm_id) - - if clues_node_name and state not in [VirtualMachine.STOPPED]: - # Create or update VM info - if clues_node_name not in self._mvs_seen: - self._mvs_seen[clues_node_name] = self.VM_Node(vm_id, radl, ec3_additional_vm) - else: - if self._mvs_seen[clues_node_name].vm_id != vm_id: - # this must not happen ... - _LOGGER.warning("Node %s in VM with id %s now have a new ID: %s" % (clues_node_name, self._mvs_seen[clues_node_name].vm_id, vm_id)) - if ec3_additional_vm: - _LOGGER.debug("Node %s is an additional not recovering it." % clues_node_name) - else: - self.power_off(clues_node_name) - self._mvs_seen[clues_node_name].update(vm_id, radl) - - enabled = True - node_found = self._clues_daemon.get_node(clues_node_name) - if node_found: - enabled = node_found.enabled - - self._mvs_seen[clues_node_name].seen() - last_state = self._mvs_seen[clues_node_name].last_state - self._mvs_seen[clues_node_name].last_state = state - - if state in [VirtualMachine.FAILED, VirtualMachine.UNCONFIGURED]: - # This VM is in "terminal" state remove it from the infrastructure - _LOGGER.error("Node %s in VM with id %s is in state: %s" % (clues_node_name, vm_id, state)) - - if state == VirtualMachine.UNCONFIGURED: - # in case of unconfigured show the log to make easier debug - # but only the first time - if last_state != VirtualMachine.UNCONFIGURED: - (success, contmsg) = server.GetVMContMsg(inf_id, vm_id, auth_data) - _LOGGER.debug("Contextualization msg: %s" % contmsg) - # check if node is disabled and do not recover it - if enabled: - if ec3_additional_vm: - _LOGGER.debug("Node %s is an additional not recovering it." % clues_node_name) - elif node_found and node_found.state == Node.USED: - _LOGGER.warning("Node %s is in use, not recovering it." % clues_node_name) - else: - self.recover(clues_node_name, node_found) - else: - _LOGGER.debug("Node %s is disabled not recovering it." % clues_node_name) - else: - if ec3_additional_vm: - _LOGGER.debug("Node %s is an additional not recovering it." 
% clues_node_name) - else: - self.recover(clues_node_name, node_found) - elif state == VirtualMachine.OFF: - _LOGGER.warning("Node %s in VM with id %s is in state: %s" % (clues_node_name, vm_id, state)) - if clues_node_name in self._stopped_vms: - _LOGGER.info("Node %s in the list of Stopped nodes. Remove VM with id %s." % (clues_node_name, vm_id)) - self.recover(clues_node_name, node_found) - # Otherwise Do not terminate this VM, let's wait to lifecycle to check if it must be terminated - elif state == VirtualMachine.UNKNOWN: - # Do not terminate this VM, let's wait to lifecycle to check if it must be terminated - _LOGGER.warning("Node %s in VM with id %s is in state: %s" % (clues_node_name, vm_id, state)) - else: - if state not in [VirtualMachine.STOPPED]: - _LOGGER.warning("VM with id %s does not have dns_name specified." % vm_id) - else: - continue - #_LOGGER.debug("Node %s with VM with id %s is stopped." % (clues_node_name, vm_id)) - - # from the nodes that we have powered on, check which of them are still running - vms = self._mvs_seen.items() - for nname, node in vms: - if (now - node.timestamp_seen) > self._IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS: - _LOGGER.debug("vm %s is not seen for a while... let's forget it" % nname) - del self._mvs_seen[nname] - - return self._mvs_seen - - def _recover_ids(self, vms): - for vm in vms: - self.recover(vm) - - def power_on(self, nname): - success = None - try: - vms = self._get_vms() - - inf_id = self._get_inf_id() - if not inf_id: - _LOGGER.error("ERROR launching %s node: No infrastructure ID!!" % nname) - return False, nname - - if nname in vms: - _LOGGER.warning("Trying to launch an existing node %s. Ignoring it." % nname) - return False, nname - - ec3_reuse_nodes = False - if len(self._stopped_vms) > 0: - if self._stopped_vms.get(nname): - ec3_reuse_nodes = True - vm_id = self._stopped_vms.get(nname) - - server = self._get_server() - auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) - if ec3_reuse_nodes: - (success, vms_id) = server.StartVM(inf_id, vm_id, auth_data) - else: - radl_data = self._get_radl(nname) - if radl_data: - _LOGGER.debug("RADL to launch/restart node %s: %s" % (nname, radl_data)) - (success, vms_id) = server.AddResource(inf_id, radl_data, auth_data) - else: - _LOGGER.error("RADL to launch node %s is empty!!" % nname) - return False, nname - except: - success = False - _LOGGER.exception("Error launching/restarting node %s " % nname) - return False, nname - - if success: - _LOGGER.debug("Node %s successfully created/restarted" % nname) - else: - _LOGGER.error("ERROR launching node %s: %s" % (nname, vms_id)) - - return success, nname - - def power_off(self, nname, destroy=False): - _LOGGER.debug("Powering off/stopping %s" % nname) - try: - inf_id = self._get_inf_id() - if not inf_id: - _LOGGER.error("ERROR deleting %s node: No infrastructure ID!!" 
% nname) - return False, nname - - server = self._get_server() - success = False - - if nname in self._mvs_seen: - vm = self._mvs_seen[nname] - ec3_destroy_interval = vm.radl.systems[0].getValue('ec3_destroy_interval', 0) - ec3_destroy_safe = vm.radl.systems[0].getValue('ec3_destroy_safe', 0) - ec3_reuse_nodes = vm.radl.systems[0].getValue('ec3_reuse_nodes', 0) - - poweroff = True - if ec3_destroy_interval > 0: - poweroff = False - live_time = cpyutils.eventloop.now() - vm.timestamp_created - remaining_paid_time = ec3_destroy_interval - live_time % ec3_destroy_interval - _LOGGER.debug("Remaining_paid_time = %d for node %s" % (int(remaining_paid_time), nname)) - if remaining_paid_time < ec3_destroy_safe: - poweroff = True - - if poweroff: - auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) - if ec3_reuse_nodes and not destroy: - (success, vm_ids) = server.StopVM(inf_id, vm.vm_id, auth_data) - self._stopped_vms[nname] = vm.vm_id - self._store_stopped_vm(nname, vm.vm_id) - if not success: - _LOGGER.error("ERROR stopping node: %s: %s" % (nname,vm_ids)) - elif vm_ids == 0: - _LOGGER.error("ERROR stopping node: %s. No VM has been stopped." % nname) - else: - if nname in self._stopped_vms: - self._delete_stopped_vm(nname) - - (success, vm_ids) = server.RemoveResource(inf_id, vm.vm_id, auth_data) - if not success: - _LOGGER.error("ERROR deleting node: %s: %s" % (nname,vm_ids)) - elif vm_ids == 0: - _LOGGER.error("ERROR deleting node: %s. No VM has been deleted." % nname) - else: - _LOGGER.debug("Not powering off/stopping node %s" % nname) - success = False - else: - _LOGGER.warning("There is not any VM associated to node %s (are IM credentials compatible to the VM?)" % nname) - except: - _LOGGER.exception("Error powering off/stopping node %s " % nname) - success = False - - return success, nname - - def lifecycle(self): - try: - monitoring_info = self._clues_daemon.get_monitoring_info() - now = cpyutils.eventloop.now() - - vms = self._get_vms() - - recover = [] - # To store the name of the nodes to use it in the third case - node_names = [] - - # Two cases: (1) a VM that is on in the monitoring info, but it is not seen in IM; and (2) a VM that is off in the monitoring info, but it is seen in IM - for node in monitoring_info.nodelist: - node_names.append(node.name) - - if node.name in vms and node.state in [Node.IDLE, Node.USED]: - if node.name in vms: - vm = vms[node.name] - state = vm.radl.systems[0].getValue('state') - # user request use of golden images and the image is fully configured - if vm.radl.systems[0].getValue("ec3_golden_images"): - if state == VirtualMachine.CONFIGURED: - ec3_class = vm.radl.systems[0].getValue("ec3_class") - # check if the image is in the list of saved images - if ec3_class not in self._golden_images: - # if not save it - self._save_golden_image(vm) - else: - _LOGGER.debug("node %s is idle/used but it is not yet configured. Do not save golden image." 
% (node.name)) - else: - # This may happen because it is launched manually not setting the dns_name of the node - _LOGGER.warning("node %s is detected by the monitoring system, but there is not any VM associated to it (are IM credentials compatible to the VM?)" % node.name) - - if node.enabled: - if node.state in [Node.OFF, Node.OFF_ERR, Node.UNKNOWN]: - if self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS > 0: - if node.name in vms: - vm = vms[node.name] - time_off = now - node.timestamp_state - time_recovered = now - vm.timestamp_recovered - _LOGGER.warning("node %s has a VM running but it is OFF or UNKNOWN in the monitoring system since %d seconds" % (node.name, time_off)) - if time_off > self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS: - if time_recovered > self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS: - _LOGGER.warning("Trying to recover it (state: %s)" % node.state) - vm.recovered() - recover.append(node.name) - else: - _LOGGER.debug("node %s has been recently recovered %d seconds ago. Do not recover it yet." % (node.name, time_recovered)) - else: - if node.name not in vms: - # This may happen because it is launched by hand using other credentials than those for the user used for IM (and he cannot manage the VMS) - _LOGGER.warning("node %s is detected by the monitoring system, but there is not any VM associated to it (are IM credentials compatible to the VM?)" % node.name) - - # A third case: a VM that it is seen in IM but does not correspond to any node in the monitoring info - # This is a strange case but we assure not to have uncontrolled VMs - for name in vms: - vm = vms[name] - full_name = name + ".localdomain" - if name not in node_names and full_name not in node_names and not vm.ec3_additional_vm: - _LOGGER.warning("VM with name %s is detected by the IM but it does not exist in the monitoring system... (%s) recovering it.)" % (name, node_names)) - vm.recovered() - recover.append(name) - - self._recover_ids(recover) - except: - _LOGGER.exception("Error executing lifecycle of IM PowerManager.") - - return PowerManager.lifecycle(self) - - def recover(self, nname, node=None): - success, nname = self.power_off(nname, True) - if success: - if node: - node.mark_poweredoff() - node.set_state(Node.OFF_ERR) - return Node.OFF - - return False - - def _save_golden_image(self, vm): - success = False - _LOGGER.debug("Saving golden image for VM id: " + vm.vm_id) - try: - server = self._get_server() - auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) - image_name = "im-%s" % str(uuid1()) - (success, new_image) = server.CreateDiskSnapshot(self._get_inf_id(), vm.vm_id, 0, image_name, True, auth_data) - if success: - ec3_class = vm.radl.systems[0].getValue("ec3_class") - password = vm.radl.systems[0].getValue("disk.0.os.credentials.password") - self._golden_images[ec3_class] = new_image, password - # Save it to DB for persistence - self._store_golden_image(ec3_class, new_image, password) - else: - _LOGGER.error("Error saving golden image: %s." 
% new_image) - except: - _LOGGER.exception("Error saving golden image.") - return success + for system in radl_all.systems: + if system.getValue("ec3_node_pattern"): + if re.match(system.getValue("ec3_node_pattern"), nname): + return system.name + + # Start with the system named "wn" + current_system = "wn" + while current_system: + system_orig = vm_info[current_system]["radl"] + ec3_max_instances = system_orig.getValue("ec3_max_instances", -1) + if ec3_max_instances < 0: + ec3_max_instances = 99999999 + if vm_info[current_system]["count"] < ec3_max_instances: + return current_system + else: + # we must change the system to the next one + current_system = system_orig.getValue("ec3_if_fail", '') + if not current_system: + _LOGGER.error("Error: we need more instances but ec3_if_fail of system %s is empty" % system_orig.name) + return None + + def _find_wn_nodetemplate_name(self, template): + try: + for name, node in template['topology_template']['node_templates'].items(): + if node['type'].startswith("tosca.nodes.indigo.LRMS.WorkerNode"): + for req in node['requirements']: + if 'host' in req: + return req['host'] + except Exception: + _LOGGER.exception("Error trying to get the WN template.") + + return None + + def _get_template(self, nnode): + inf_id = self._get_inf_id() + auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) + + count = 0 + vm_ids = self._get_vm_ids(inf_id, auth_data) + if len(vm_ids) > 1: + count = len(vm_ids) - 1 + + headers = {"Authorization": auth_data, "Accept": "application/json"} + resp = requests.request("GET", "%s/tosca" % inf_id, verify=False, headers=headers) + + if resp.status_code != 200: + _LOGGER.error("ERROR getting infrastructure template: %s" % str(resp.text)) + return None + else: + templateo = yaml.safe_load(resp.json()["tosca"]) + node_name = self._find_wn_nodetemplate_name(templateo) + node_template = templateo['topology_template']['node_templates'][node_name] + + node_template['capabilities']['scalable']['properties']['count'] = count + 1 + # Put the dns name + if 'endpoint' not in node_template['capabilities']: + node_template['capabilities']['endpoint'] = {} + if 'properties' not in node_template['capabilities']['endpoint']: + node_template['capabilities']['endpoint']['properties'] = {} + node_template['capabilities']['endpoint']['properties']['dns_name'] = nnode + + return yaml.safe_dump(templateo) + + def _get_radl(self, nname): + inf_id = self._get_inf_id() + if not inf_id: + _LOGGER.error("Error getting RADL. No infrastructure ID!!") + return None + auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) + vm_ids = self._get_vm_ids(inf_id, auth_data) + + # Get all the info from RADL + # Especial features in system: + #- 'ec3_max_instances': maximum number of nodes with this system configuration; a negative value is like no constrain; default value is -1. + #- 'ec3_destroy_interval': some cloud providers pay a certain amount of time in advance, like AWS EC2. The node will be destroyed only when it is idle at the end of the interval expressed by this option in seconds. The default value is 0. + #- 'ec3_destroy_safe': seconds before the deadline set by \'ec3_destroy_interval\' that the node can be destroyed; the default value is 0. + #- 'ec3_if_fail': name of the next system configuration to try after this fails a launch or the number of instances saturates; the default value is ''. 
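+        # For illustration only (names and values below are hypothetical, not taken from any
+        # real deployment), a worker-node system honouring these hints could declare in its RADL:
+        #   system wn ( ec3_max_instances = 10 and
+        #               ec3_destroy_interval = 3600 and
+        #               ec3_destroy_safe = 60 and
+        #               ec3_if_fail = 'wn_large' )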
+ + vm_info = {} + if vm_ids: + # The first one is always the front-end node + for vm_id in vm_ids[1:]: + headers = {"Authorization": auth_data, "Accept": "text/*"} + resp = requests.request("GET", vm_id, verify=False, headers=headers) + if resp.status_code == 200: + radl = radl_parse.parse_radl(resp.text) + ec3_class = radl.systems[0].getValue("ec3_class") + if ec3_class not in vm_info: + vm_info[ec3_class] = {} + vm_info[ec3_class]['count'] = 0 + vm_info[ec3_class]['count'] += 1 + else: + _LOGGER.error("Error getting VM info: %s. %s." % (resp.reason, resp.text)) + + headers = {"Authorization": auth_data, "Accept": "text/*"} + resp = requests.request("GET", "%s/radl" % inf_id, verify=False, headers=headers) + if resp.status_code == 200: + radl_all = radl_parse.parse_radl(resp.text) + else: + _LOGGER.error("Error getting infrastructure RADL: %s. %s." % (resp.reason, resp.text)) + return None + + # Get info from the original RADL + for system in radl_all.systems: + if system.name not in vm_info: + vm_info[system.name] = {} + vm_info[system.name]['count'] = 0 + vm_info[system.name]['radl'] = system + + current_system = self._get_system(vm_info, radl_all, nname) + if current_system: + # launch this system type + new_radl = "" + for net in radl_all.networks: + new_radl += "network " + net.id + "\n" + + system_orig = vm_info[current_system]["radl"] + system_orig.name = nname + system_orig.setValue("net_interface.0.dns_name", str(nname)) + system_orig.setValue("ec3_class", current_system) + if current_system in self._golden_images: + image, password = self._golden_images[current_system] + system_orig.setValue("disk.0.image.url", image) + _LOGGER.debug("A golden image for %s node is stored, using it: %s" % (current_system, image)) + if password: + system_orig.setValue("disk.0.os.credentials.password", password) + new_radl += str(system_orig) + "\n" + + if radl_all.contextualize: + node_citems = [] + node_configures = [] + for citem in radl_all.contextualize.items.values(): + if citem.system == current_system: + node_configures.append(citem.configure) + node_citems.append(contextualize_item(nname, + citem.configure, + citem.num)) + for configure in radl_all.configures: + if configure.name in node_configures: + new_radl += str(configure) + "\n" + new_radl += str(contextualize(node_citems)) + "\n" + else: + node_configure = None + for configure in radl_all.configures: + if not node_configure and configure.name == 'wn': + node_configure = configure + if configure.name == current_system: + node_configure = configure + break + + if node_configure: + node_configure.name = nname + new_radl += str(node_configure) + "\n" + + new_radl += "deploy " + nname + " 1" + + return new_radl + else: + _LOGGER.error("Error generating infrastructure RADL") + return None + + def _get_vm_ids(self, inf_id, auth_data): + headers = {"Authorization": auth_data, "Accept": "application/json"} + resp = requests.request("GET", inf_id, verify=False, headers=headers) + + if resp.status_code != 200: + _LOGGER.error("Error getting infrastructure info: %s. %s." 
% (resp.reason, resp.text)) + return [] + else: + return [vm_id['uri'] for vm_id in resp.json()["uri-list"]] + + def _get_vms(self): + inf_id = self._get_inf_id() + if not inf_id: + _LOGGER.error("ERROR getting infrastructure info: No infrastructure ID!!") + return self._mvs_seen + now = cpyutils.eventloop.now() + auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) + vm_ids = self._get_vm_ids(inf_id, auth_data) + + if vm_ids: + # The first one is always the front-end node + for vm_id in vm_ids[1:]: + clues_node_name = None + ec3_additional_vm = None + try: + headers = {"Authorization": auth_data, "Accept": "text/*"} + resp = requests.request("GET", vm_id, verify=False, headers=headers) + + if resp.status_code == 200: + radl = radl_parse.parse_radl(resp.text) + clues_node_name = radl.systems[0].getValue('net_interface.0.dns_name') + if '#N#' in clues_node_name: + clues_node_name = clues_node_name.replace('#N#', os.path.basename(vm_id)) + ec3_additional_vm = radl.systems[0].getValue('ec3_additional_vm') + state = radl.systems[0].getValue('state') + else: + _LOGGER.error("Error getting VM info: %s. %s." % (resp.reason, resp.text)) + except TypeError: + success = False + reload(radl_parse) + _LOGGER.exception("ERROR getting VM info: %s. Trying to reload radl_parse module." % vm_id) + except: + success = False + _LOGGER.exception("ERROR getting VM info: %s" % vm_id) + + if clues_node_name and state not in [VirtualMachine.STOPPED]: + # Create or update VM info + if clues_node_name not in self._mvs_seen: + self._mvs_seen[clues_node_name] = self.VM_Node(vm_id, radl, ec3_additional_vm) + else: + if self._mvs_seen[clues_node_name].vm_id != vm_id: + # this must not happen ... + _LOGGER.warning("Node %s in VM with id %s now have a new ID: %s" % (clues_node_name, self._mvs_seen[clues_node_name].vm_id, vm_id)) + if ec3_additional_vm: + _LOGGER.debug("Node %s is an additional not recovering it." % clues_node_name) + else: + self.power_off(clues_node_name) + self._mvs_seen[clues_node_name].update(vm_id, radl) + + enabled = True + node_found = self._clues_daemon.get_node(clues_node_name) + if node_found: + enabled = node_found.enabled + + self._mvs_seen[clues_node_name].seen() + last_state = self._mvs_seen[clues_node_name].last_state + self._mvs_seen[clues_node_name].last_state = state + + if state in [VirtualMachine.FAILED, VirtualMachine.UNCONFIGURED]: + # This VM is in "terminal" state remove it from the infrastructure + _LOGGER.error("Node %s in VM with id %s is in state: %s" % (clues_node_name, vm_id, state)) + + if state == VirtualMachine.UNCONFIGURED: + # in case of unconfigured show the log to make easier debug + # but only the first time + if last_state != VirtualMachine.UNCONFIGURED: + headers = {"Authorization": auth_data, "Accept": "text/*"} + resp = requests.request("GET", "%s/contmsg" % vm_id, verify=False, headers=headers) + _LOGGER.debug("Contextualization msg: %s" % resp.text) + # check if node is disabled and do not recover it + if enabled: + if ec3_additional_vm: + _LOGGER.debug("Node %s is an additional not recovering it." % clues_node_name) + else: + self.recover(clues_node_name, node_found) + else: + _LOGGER.debug("Node %s is disabled not recovering it." % clues_node_name) + else: + if ec3_additional_vm: + _LOGGER.debug("Node %s is an additional not recovering it." 
% clues_node_name) + else: + self.recover(clues_node_name, node_found) + elif state == VirtualMachine.OFF: + _LOGGER.warning("Node %s in VM with id %s is in state: %s" % (clues_node_name, vm_id, state)) + if clues_node_name in self._stopped_vms: + _LOGGER.info("Node %s in the list of Stopped nodes. Remove VM with id %s." % (clues_node_name, vm_id)) + self.recover(clues_node_name, node_found) + # Otherwise Do not terminate this VM, let's wait to lifecycle to check if it must be terminated + elif state == VirtualMachine.UNKNOWN: + # Do not terminate this VM, let's wait to lifecycle to check if it must be terminated + _LOGGER.warning("Node %s in VM with id %s is in state: %s" % (clues_node_name, vm_id, state)) + else: + if state not in [VirtualMachine.STOPPED]: + _LOGGER.warning("VM with id %s does not have dns_name specified." % vm_id) + else: + continue + #_LOGGER.debug("Node %s with VM with id %s is stopped." % (clues_node_name, vm_id)) + + # from the nodes that we have powered on, check which of them are still running + vms = dict(self._mvs_seen) + for nname, node in vms.items(): + if (now - node.timestamp_seen) > self._IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS: + _LOGGER.debug("vm %s is not seen for a while... let's forget it" % nname) + del self._mvs_seen[nname] + + return self._mvs_seen + + def _recover_ids(self, vms): + for vm in vms: + self.recover(vm) + + def power_on(self, nname): + success = None + try: + vms = self._get_vms() + + inf_id = self._get_inf_id() + if not inf_id: + _LOGGER.error("ERROR launching %s node: No infrastructure ID!!" % nname) + return False, nname + + if nname in vms: + _LOGGER.warning("Trying to launch an existing node %s. Ignoring it." % nname) + return False, nname + + ec3_reuse_nodes = False + if len(self._stopped_vms) > 0: + if self._stopped_vms.get(nname): + ec3_reuse_nodes = True + vm_id = self._stopped_vms.get(nname) + + auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) + if ec3_reuse_nodes: + headers = {"Authorization": auth_data} + resp = requests.request("PUT", "%s/start" % vm_id, verify=False, headers=headers) + success = resp.status_code == 200 + if not success: + _LOGGER.error("Error starting VM: %s. %s." % (resp.reason, resp.text)) + else: + if self._IM_VIRTUAL_CLUSTER_TOSCA: + data = self._get_template(nname) + else: + data = self._get_radl(nname) + + if data: + headers = {"Authorization": auth_data, "Accept": "application/json"} + if self._IM_VIRTUAL_CLUSTER_TOSCA: + _LOGGER.debug("TOSCA to launch/restart node %s: %s" % (nname, data)) + headers["Content-Type"] = "text/yaml" + else: + _LOGGER.debug("RADL to launch/restart node %s: %s" % (nname, data)) + headers["Content-Type"] = "text/plain" + resp = requests.request("POST", inf_id, verify=False, headers=headers, data=data) + success = resp.status_code == 200 + if success: + vms_id = [vm_id['uri'] for vm_id in resp.json()["uri-list"]] + else: + _LOGGER.error("Error adding resources %s. %s." % (resp.reason, resp.text)) + return False, nname + else: + _LOGGER.error("RADL to launch node %s is empty!!" % nname) + return False, nname + except: + success = False + _LOGGER.exception("Error launching/restarting node %s " % nname) + return False, nname + + if success: + _LOGGER.debug("Node %s successfully created/restarted" % nname) + + return success, nname + + def power_off(self, nname, destroy=False): + _LOGGER.debug("Powering off/stopping %s" % nname) + try: + inf_id = self._get_inf_id() + if not inf_id: + _LOGGER.error("ERROR deleting %s node: No infrastructure ID!!" 
% nname) + return False, nname + + success = False + + if nname in self._mvs_seen: + vm = self._mvs_seen[nname] + ec3_destroy_interval = vm.radl.systems[0].getValue('ec3_destroy_interval', 0) + ec3_destroy_safe = vm.radl.systems[0].getValue('ec3_destroy_safe', 0) + ec3_reuse_nodes = vm.radl.systems[0].getValue('ec3_reuse_nodes', 0) + + poweroff = True + if ec3_destroy_interval > 0: + poweroff = False + live_time = cpyutils.eventloop.now() - vm.timestamp_created + remaining_paid_time = ec3_destroy_interval - live_time % ec3_destroy_interval + _LOGGER.debug("Remaining_paid_time = %d for node %s" % (int(remaining_paid_time), nname)) + if remaining_paid_time < ec3_destroy_safe: + poweroff = True + + if poweroff: + auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) + if ec3_reuse_nodes and not destroy: + headers = {"Authorization": auth_data} + resp = requests.request("PUT", "%s/stop" % vm.vm_id, verify=False, headers=headers) + success = resp.status_code == 200 + self._stopped_vms[nname] = vm.vm_id + self._store_stopped_vm(nname, vm.vm_id) + if not success: + _LOGGER.error("ERROR stopping node %s: %s. %s." % (nname, resp.reason, resp.text)) + else: + _LOGGER.debug("Node %s successfully stopped." % nname) + else: + if nname in self._stopped_vms: + self._delete_stopped_vm(nname) + + headers = {"Authorization": auth_data} + resp = requests.request("DELETE", vm.vm_id, verify=False, headers=headers) + success = resp.status_code == 200 + if not success: + _LOGGER.error("ERROR deleting node %s: %s. %s." % (nname, resp.reason, resp.text)) + else: + _LOGGER.debug("Node %s successfully deleted." % nname) + else: + _LOGGER.debug("Not powering off/stopping node %s" % nname) + success = False + else: + _LOGGER.warning("There is not any VM associated to node %s (are IM credentials compatible to the VM?)" % nname) + except: + _LOGGER.exception("Error powering off/stopping node %s " % nname) + success = False + + return success, nname + + def lifecycle(self): + try: + monitoring_info = self._clues_daemon.get_monitoring_info() + now = cpyutils.eventloop.now() + + vms = self._get_vms() + + recover = [] + # To store the name of the nodes to use it in the third case + node_names = [] + + # Two cases: (1) a VM that is on in the monitoring info, but it is not seen in IM; and (2) a VM that is off in the monitoring info, but it is seen in IM + for node in monitoring_info.nodelist: + node_names.append(node.name) + + if node.name in vms and node.state in [Node.IDLE, Node.USED]: + if node.name in vms: + vm = vms[node.name] + state = vm.radl.systems[0].getValue('state') + # user request use of golden images and the image is fully configured + if vm.radl.systems[0].getValue("ec3_golden_images"): + if state == VirtualMachine.CONFIGURED: + ec3_class = vm.radl.systems[0].getValue("ec3_class") + # check if the image is in the list of saved images + if ec3_class not in self._golden_images: + # if not save it + self._save_golden_image(vm) + else: + _LOGGER.debug("node %s is idle/used but it is not yet configured. Do not save golden image." 
% (node.name)) + else: + # This may happen because it is launched manually not setting the dns_name of the node + _LOGGER.warning("node %s is detected by the monitoring system, but there is not any VM associated to it (are IM credentials compatible to the VM?)" % node.name) + + if node.enabled: + if node.state in [Node.OFF, Node.OFF_ERR, Node.UNKNOWN]: + if self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS > 0: + if node.name in vms: + vm = vms[node.name] + time_off = now - max(node.timestamp_state, vm.timestamp_created) + time_recovered = now - vm.timestamp_recovered + _LOGGER.warning("node %s has a VM running but it is OFF or UNKNOWN in the monitoring system since %d seconds" % (node.name, time_off)) + if time_off > self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS: + if time_recovered > self._IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS: + _LOGGER.warning("Trying to recover it (state: %s)" % node.state) + vm.recovered() + recover.append(node.name) + else: + _LOGGER.debug("node %s has been recently recovered %d seconds ago. Do not recover it yet." % (node.name, time_recovered)) + else: + if node.name not in vms: + # This may happen because it is launched by hand using other credentials than those for the user used for IM (and he cannot manage the VMS) + _LOGGER.warning("node %s is detected by the monitoring system, but there is not any VM associated to it (are IM credentials compatible to the VM?)" % node.name) + + # A third case: a VM that it is seen in IM but does not correspond to any node in the monitoring info + # This is a strange case but we assure not to have uncontrolled VMs + for name in vms: + vm = vms[name] + if name not in node_names and not vm.ec3_additional_vm: + _LOGGER.warning("VM with name %s is detected by the IM but it does not exist in the monitoring system... (%s) recovering it.)" % (name, node_names)) + vm.recovered() + recover.append(name) + + self._recover_ids(recover) + except: + _LOGGER.exception("Error executing lifecycle of IM PowerManager.") + + return PowerManager.lifecycle(self) + + def recover(self, nname, node=None): + success, nname = self.power_off(nname, True) + if success: + if node: + node.mark_poweredoff() + node.set_state(Node.OFF_ERR) + return Node.OFF + + return False + + def _save_golden_image(self, vm): + success = False + _LOGGER.debug("Saving golden image for VM id: " + vm.vm_id) + try: + auth_data = self._read_auth_data(self._IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE) + image_name = "im-%s" % str(uuid1()) + headers = {"Authorization": auth_data} + resp = requests.request("PUT", "%s/disks/0/snapshot?image_name=%s&auto_delete=1" % (vm.vm_id, image_name), + verify=False, headers=headers) + + if resp.status_code == 200: + new_image = resp.text + ec3_class = vm.radl.systems[0].getValue("ec3_class") + password = vm.radl.systems[0].getValue("disk.0.os.credentials.password") + self._golden_images[ec3_class] = new_image, password + # Save it to DB for persistence + self._store_golden_image(ec3_class, new_image, password) + else: + _LOGGER.error("Error saving golden image: %s. %s." 
% (resp.reason, resp.text)) + except: + _LOGGER.exception("Error saving golden image.") + return success diff --git a/cluesplugins/kubernetes.py b/cluesplugins/kubernetes.py index f5221cb..c3d4d26 100644 --- a/cluesplugins/kubernetes.py +++ b/cluesplugins/kubernetes.py @@ -142,6 +142,7 @@ def _get_node_used_resources(self, nodename, pods_data): if "nodeName" in pod["spec"] and nodename == pod["spec"]["nodeName"]: # do not count the number of pods in case finished jobs if pod["status"]["phase"] not in ["Succeeded", "Failed"]: + # do not count the number of pods in case of system ones # nor in case of DaemonSets if (pod["metadata"]["namespace"] in ["kube-system", "kube-flannel"] or "ownerReferences" in pod["metadata"] and pod["metadata"]["ownerReferences"] and pod["metadata"]["ownerReferences"][0]["kind"] == "DaemonSet"): @@ -183,7 +184,6 @@ def get_nodeinfolist(self): sgx = int(node["status"]["allocatable"]["sgx.k8s.io/sgx"]) # Skip master node - if ("node-role.kubernetes.io/master" in node["metadata"]["labels"] or "node-role.kubernetes.io/control-plane" in node["metadata"]["labels"]): _LOGGER.debug("Node %s seems to be master node, skiping." % name) @@ -343,9 +343,9 @@ def get_jobinfolist(self): cpus, memory, ngpus, agpus, sgx = self._get_pod_cpus_and_memory(pod) - req_str = '(pods_free > 0) && (schedule = 1)' + req_str = '(pods_free > 0) && (schedule == 1)' if 'nodeName' in pod["spec"] and pod["spec"]["nodeName"]: - req_str += ' && (nodeName = "%s")' % pod["spec"]["nodeName"] + req_str += ' && (nodeName == "%s")' % pod["spec"]["nodeName"] if ngpus: req_str += ' && (nvidia_gpu >= %d)' % ngpus if agpus: diff --git a/etc/conf.d/plugin-im.cfg-example b/etc/conf.d/plugin-im.cfg-example index 8af42fd..e665860 100644 --- a/etc/conf.d/plugin-im.cfg-example +++ b/etc/conf.d/plugin-im.cfg-example @@ -7,9 +7,10 @@ # ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [IM VIRTUAL CLUSTER] -IM_VIRTUAL_CLUSTER_XMLRPC=http://localhost:8899 -IM_VIRTUAL_CLUSTER_XMLRCP_SSL_CA_CERTS='' -IM_VIRTUAL_CLUSTER_XMLRCP_SSL=False +IM_VIRTUAL_CLUSTER_TOSCA=False +IM_VIRTUAL_CLUSTER_REST_API=http://localhost:8800 +IM_VIRTUAL_CLUSTER_REST_SSL_CA_CERTS='' +IM_VIRTUAL_CLUSTER_REST_SSL=False IM_VIRTUAL_CLUSTER_AUTH_DATA_FILE=/usr/local/ec3/auth.dat IM_VIRTUAL_CLUSTER_DROP_FAILING_VMS=30 IM_VIRTUAL_CLUSTER_FORGET_MISSING_VMS=30 diff --git a/test/test-files/tosca.yml b/test/test-files/tosca.yml new file mode 100644 index 0000000..57916a1 --- /dev/null +++ b/test/test-files/tosca.yml @@ -0,0 +1,197 @@ +tosca_definitions_version: tosca_simple_yaml_1_0 + +imports: + - ec3_custom_types: https://raw.githubusercontent.com/grycap/ec3/tosca/tosca/custom_types.yaml + +metadata: + display_name: Launch a Kubernetes Virtual Cluster + icon: images/kubernetes.png + order: 5 + +description: TOSCA template for launching a Kubernetes Virtual Cluster. 
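+# Note (test fixture): the plugin's _get_template() locates the node template whose type
+# starts with "tosca.nodes.indigo.LRMS.WorkerNode", follows its 'host' requirement (the
+# 'wn' compute node below), increments capabilities.scalable.properties.count and sets
+# capabilities.endpoint.properties.dns_name to the CLUES node name.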
+ +topology_template: + inputs: + wn_num: + type: integer + description: Number of WNs in the cluster + default: 1 + required: yes + fe_cpus: + type: integer + description: Number of CPUs for the front-end node + default: 2 + required: yes + constraints: + - valid_values: [ 2, 4, 8, 16, 32, 64 ] + fe_mem: + type: scalar-unit.size + description: Amount of Memory for the front-end node + default: 4 GB + required: yes + constraints: + - valid_values: [ 4 GB, 8 GB, 16 GB, 32 GB, 64 GB, 128 GB, 256 GB, 512 GB ] + fe_instance_type: + type: string + description: Flavor name of the front-end node + default: "" + wn_cpus: + type: integer + description: Number of CPUs for the WNs + default: 2 + required: yes + constraints: + - valid_values: [ 2, 4, 8, 16, 32, 64 ] + wn_mem: + type: scalar-unit.size + description: Amount of Memory for the WNs + default: 4 GB + required: yes + constraints: + - valid_values: [ 4 GB, 8 GB, 16 GB, 32 GB, 64 GB, 128 GB, 256 GB, 512 GB ] + wn_instance_type: + type: string + description: Flavor name for the WNs + default: "" + disk_size: + type: scalar-unit.size + description: Size of the disk to be attached to the FE instance + default: 10 GB + constraints: + - valid_values: [ 10 GB, 20 GB, 50 GB, 100 GB, 200 GB, 500 GB, 1 TB, 2 TB ] + volume_id: + type: string + description: "Or URL of the disk to be attached to the instance (format: ost://api.cloud.ifca.es/" + default: "" + + admin_token: + type: string + description: Access Token for the Kubernetes admin user + default: not_very_secret_token + kube_version: + type: string + description: Version of Kubernetes to install + default: "1.21.9" + constraints: + - valid_values: [ "1.21.9", "1.22.6", "1.23.2", "1.20.15", "1.19.16" ] + cri_runtime: + type: string + description: CRI Runtime to use with Kubernetes + default: "docker" + constraints: + - valid_values: [ docker, containerd ] + install_kubeapps: + type: boolean + description: Flag to set the kubeapps UI to be installed + default: true + constraints: + - valid_values: [ true, false ] + kube_nvidia_support: + type: boolean + description: Flag to add NVIDIA support + default: false + constraints: + - valid_values: [ false, true ] + kube_cert_manager: + type: boolean + description: Flag to install Cert-Manager + default: false + constraints: + - valid_values: [ false, true ] + kube_cert_user_email: + type: string + description: Email to be used in the Let's Encrypt issuer + default: "jhondoe@server.com" + + node_templates: + + lrms_front_end: + type: tosca.nodes.indigo.LRMS.FrontEnd.Kubernetes + capabilities: + endpoint: + properties: + ports: + http_port: + protocol: tcp + source: 80 + https_port: + protocol: tcp + source: 443 + kube_port: + protocol: tcp + source: 6443 + properties: + admin_username: kubeuser + install_nfs_client: true + admin_token: { get_input: admin_token } + install_kubeapps: { get_input: install_kubeapps } + version: { get_input: kube_version } + nvidia_support: { get_input: kube_nvidia_support } + cert_manager: { get_input: kube_cert_manager } + cert_user_email: { get_input: kube_cert_user_email } + cri_runtime: { get_input: cri_runtime } + requirements: + - host: front + + front: + type: tosca.nodes.indigo.Compute + capabilities: + endpoint: + properties: + dns_name: kubeserver + network_name: PUBLIC + host: + properties: + num_cpus: { get_input: fe_cpus } + mem_size: { get_input: fe_mem } + instance_type: { get_input: fe_instance_type } + os: + properties: + distribution: ubuntu + type: linux + requirements: + - local_storage: + node: 
fe_block_storage + relationship: + type: AttachesTo + properties: + location: /pv + + fe_block_storage: + type: tosca.nodes.BlockStorage + properties: + size: { get_input: disk_size } + volume_id: { get_input: volume_id } + + wn_node: + type: tosca.nodes.indigo.LRMS.WorkerNode.Kubernetes + properties: + front_end_ip: { get_attribute: [ front, private_address, 0 ] } + version: { get_input: kube_version } + nvidia_support: { get_input: kube_nvidia_support } + cri_runtime: { get_input: cri_runtime } + requirements: + - host: wn + + wn: + type: tosca.nodes.indigo.Compute + capabilities: + scalable: + properties: + count: 1 + host: + properties: + num_cpus: 1 + mem_size: 2 GB + os: + properties: + distribution: ubuntu + type: linux + + outputs: + dashboard_endpoint: + value: { concat: [ 'https://', get_attribute: [ front, public_address, 0 ], '/dashboard/' ] } + api_endpoint: + value: { concat: [ 'https://', get_attribute: [ front, public_address, 0 ], ':6443' ] } + kubeapps_endpoint: + value: { concat: [ 'https://', get_attribute: [ front, public_address, 0 ], '/kubeapps/' ] } diff --git a/test/test_im.py b/test/test_im.py index 9769837..63e9424 100644 --- a/test/test_im.py +++ b/test/test_im.py @@ -18,6 +18,7 @@ import unittest import sys import os +import yaml from mock.mock import MagicMock, patch from radl.radl_parse import parse_radl @@ -39,20 +40,18 @@ def __init__(self, *args): def test_read_auth_data(self): res = powermanager._read_auth_data(os.path.join(self.TESTS_PATH, 'test-files/auth.dat')) - self.assertEqual(res[0], {'type': 'InfrastructureManager', 'username': 'user', 'password': 'pass'}) - self.assertEqual(res[1], {'host': 'server:2633', - 'id': 'one', - 'password': 'pass', - 'type': 'OpenNebula', - 'username': 'user'}) + lines = res.split('\\n') + self.assertEqual(lines[0], 'type = InfrastructureManager; username = user; password = pass') + self.assertEqual(lines[1], 'id = one; type = OpenNebula; host = server:2633; username = user; password = pass') @patch("cluesplugins.im.powermanager._read_auth_data") - @patch("cluesplugins.im.powermanager._get_server") @patch("cpyutils.db.DB.create_from_string") - def test_get_inf_id(self, createdb, get_server, read_auth): - server = MagicMock() - server.GetInfrastructureList.return_value = (True, ['infid1']) - get_server.return_value = server + @patch("requests.request") + def test_get_inf_id(self, request, createdb, read_auth): + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = {"uri-list": [{"uri": "http://server.com/infid1"}]} + request.return_value = resp read_auth.return_value = {'type': 'InfrastructureManager', 'username': 'user', 'password': 'pass'} @@ -62,27 +61,54 @@ def test_get_inf_id(self, createdb, get_server, read_auth): test_im = powermanager() res = test_im._get_inf_id() - self.assertEqual(res, "infid1") + self.assertEqual(res, "http://server.com/infid1") @patch("cluesplugins.im.powermanager.recover") @patch("cluesplugins.im.powermanager._read_auth_data") @patch("cluesplugins.im.powermanager._get_inf_id") - @patch("cluesplugins.im.powermanager._get_server") @patch("cpyutils.db.DB.create_from_string") @patch("cpyutils.eventloop.now") - def test_get_vms(self, now, createdb, get_server, get_inf_id, read_auth, recover): - get_inf_id.return_value = "infid" - read_auth.return_value = {'type': 'InfrastructureManager', 'username': 'user', 'password': 'pass'} + @patch("requests.request") + def test_get_vms(self, request, now, createdb, get_inf_id, read_auth, recover): + get_inf_id.return_value = 
"http://server.com/infid" + read_auth.return_value = "type = InfrastructureManager; username = user; password = pass" now.return_value = 100 - server = MagicMock() - server.GetInfrastructureInfo.return_value = (True, ['0', '1']) - radl = """system wn ( + inf_info = MagicMock() + inf_info.status_code = 200 + inf_info.json.return_value = {"uri-list": [{"uri": "http://server.com/infid/vms/0"}, + {"uri": "http://server.com/infid/vms/1"}]} + vm_info = MagicMock() + vm_info.status_code = 200 + vm_info.text = """system wn ( net_interface.0.dns_name = 'node-#N#' and state = 'configured' )""" - server.GetVMInfo.return_value = (True, radl) - get_server.return_value = server + + vm_info2 = MagicMock() + vm_info2.status_code = 200 + vm_info2.text = """system wn ( + net_interface.0.dns_name = 'node-#N#' and + state = 'unconfigured' + )""" + + ctxt_out = MagicMock() + ctxt_out.status_code = 200 + ctxt_out.text = "ERROR!" + + vm_info3 = MagicMock() + vm_info3.status_code = 200 + vm_info3.text = """system wn ( + net_interface.0.dns_name = 'node-#N#' and + state = 'unconfigured' and + ec3_additional_vm = 'true' + )""" + + request.side_effect = [inf_info, vm_info, + inf_info, vm_info, + inf_info, vm_info2, ctxt_out, + inf_info, vm_info3, + inf_info, vm_info3] db = MagicMock() db.sql_query.return_value = True, "", [] @@ -96,8 +122,9 @@ def test_get_vms(self, now, createdb, get_server, get_inf_id, read_auth, recover node.state = Node.IDLE test_im._clues_daemon.get_node.return_value = node res = test_im._get_vms() + self.assertEqual(len(res), 1) - self.assertEqual(res['node-1'].vm_id, '1') + self.assertEqual(res['node-1'].vm_id, 'http://server.com/infid/vms/1') self.assertEqual(res['node-1'].last_state, "configured") self.assertEqual(res['node-1'].timestamp_seen, 100) self.assertEqual(res['node-1'].timestamp_created, 100) @@ -108,12 +135,6 @@ def test_get_vms(self, now, createdb, get_server, get_inf_id, read_auth, recover self.assertEqual(res2['node-1'].timestamp_seen, 200) # Test the node is unconfigured - radl = """system wn ( - net_interface.0.dns_name = 'node-#N#' and - state = 'unconfigured' - )""" - server.GetVMInfo.return_value = (True, radl) - server.GetVMContMsg.return_value = (True, "ERROR!") res = test_im._get_vms() # Recover must be called self.assertEqual(recover.call_count, 1) @@ -125,38 +146,49 @@ def test_get_vms(self, now, createdb, get_server, get_inf_id, read_auth, recover self.assertEqual(recover.call_count, 1) node.enabled = True - radl = """system wn ( - net_interface.0.dns_name = 'node-#N#' and - state = 'unconfigured' and - ec3_additional_vm = 'true' - )""" - server.GetVMInfo.return_value = (True, radl) res = test_im._get_vms() # Recover must NOT be called again in this case self.assertEqual(recover.call_count, 1) @patch("cluesplugins.im.powermanager._read_auth_data") @patch("cluesplugins.im.powermanager._get_inf_id") - @patch("cluesplugins.im.powermanager._get_server") @patch("cpyutils.db.DB.create_from_string") @patch("cpyutils.eventloop.now") - def test_get_radl(self, now, createdb, get_server, get_inf_id, read_auth): - get_inf_id.return_value = "infid" - read_auth.return_value = {'type': 'InfrastructureManager', 'username': 'user', 'password': 'pass'} + @patch("requests.request") + def test_get_radl(self, request, now, createdb, get_inf_id, read_auth): + get_inf_id.return_value = "http://server.com/infid" + read_auth.return_value = "type = InfrastructureManager; username = user; password = pass" now.return_value = 100 - server = MagicMock() - 
server.GetInfrastructureInfo.return_value = (True, ['0', '1']) - radl = """system wn ( + inf_info = MagicMock() + inf_info.status_code = 200 + inf_info.json.return_value = {"uri-list": [{"uri": "http://server.com/infid/vms/0"}, + {"uri": "http://server.com/infid/vms/1"}]} + vm_info = MagicMock() + vm_info.status_code = 200 + vm_info.text = """system wn ( net_interface.0.dns_name = 'node-#N#' and - ec3_class = 'wn' + state = 'unconfigured' and + ec3_additional_vm = 'true' )""" - server.GetVMInfo.return_value = (True, radl) - infra_radl = """system wn ( + + radl_info = MagicMock() + radl_info.status_code = 200 + radl_info.text = """system wn ( net_interface.0.dns_name = 'node-#N#' )""" - server.GetInfrastructureRADL.return_value = (True, infra_radl) - get_server.return_value = server + + radl_info2 = MagicMock() + radl_info2.status_code = 200 + radl_info2.text = """system wn ( + net_interface.0.dns_name = 'node-#N#' + ) + contextualize ( + system wn configure wn + )""" + + request.side_effect = [inf_info, vm_info, radl_info, + inf_info, vm_info, radl_info2] db = MagicMock() db.sql_query.return_value = True, "", [] @@ -172,13 +204,6 @@ def test_get_radl(self, now, createdb, get_server, get_inf_id, read_auth): self.assertEqual(radl_res.deploys[0].id, 'node-2') self.assertEqual(radl_res.deploys[0].vm_number, 1) - infra_radl = """system wn ( - net_interface.0.dns_name = 'node-#N#' - ) - contextualize ( - system wn configure wn - )""" - server.GetInfrastructureRADL.return_value = (True, infra_radl) res = test_im._get_radl('node-2') radl_res = parse_radl(res) self.assertEqual(radl_res.contextualize.items[('node-2', 'wn')].system, 'node-2') @@ -188,15 +213,15 @@ def test_get_radl(self, now, createdb, get_server, get_inf_id, read_auth): @patch("cluesplugins.im.powermanager._get_vms") @patch("cluesplugins.im.powermanager._read_auth_data") @patch("cluesplugins.im.powermanager._get_inf_id") - @patch("cluesplugins.im.powermanager._get_server") @patch("cpyutils.db.DB.create_from_string") - def test_power_on(self, createdb, get_server, get_inf_id, read_auth, get_vms, get_radl): - get_inf_id.return_value = "infid" - read_auth.return_value = {'type': 'InfrastructureManager', 'username': 'user', 'password': 'pass'} + @patch("requests.request") + def test_power_on(self, request, createdb, get_inf_id, read_auth, get_vms, get_radl): + get_inf_id.return_value = "http://server.com/infid" + read_auth.return_value = "type = InfrastructureManager; username = user; password = pass" - server = MagicMock() - server.AddResource.return_value = (True, ['2']) - get_server.return_value = server + resp = MagicMock() + resp.status_code = 200 + request.return_value = resp db = MagicMock() db.sql_query.return_value = True, "", [] @@ -217,19 +242,18 @@ def test_power_on(self, createdb, get_server, get_inf_id, read_auth, get_vms, ge self.assertTrue(res) @patch("cluesplugins.im.powermanager._get_inf_id") - @patch("cluesplugins.im.powermanager._get_server") @patch("cluesplugins.im.powermanager._read_auth_data") @patch("cpyutils.db.DB.create_from_string") @patch("cpyutils.eventloop.now") - def test_power_off(self, now, createdb, read_auth, get_server, get_inf_id): - get_inf_id.return_value = "infid" - read_auth.return_value = {'type': 'InfrastructureManager', 'username': 'user', 'password': 'pass'} + @patch("requests.request") + def test_power_off(self, request, now, createdb, read_auth, get_inf_id): + get_inf_id.return_value = "http://server.com/infid" + read_auth.return_value = "type = InfrastructureManager; username = user; 
password = pass" now.return_value = 100 - server = MagicMock() - server.StopVM.return_value = (True, ['2']) - server.RemoveResource.return_value = (True, ['2']) - get_server.return_value = server + resp = MagicMock() + resp.status_code = 200 + request.side_effect = [resp, resp, resp, resp] db = MagicMock() db.sql_query.return_value = True, "", [] @@ -237,7 +261,7 @@ def test_power_off(self, now, createdb, read_auth, get_server, get_inf_id): test_im = powermanager() vm = MagicMock() - vm.vm_id = "vmid" + vm.vm_id = "http://server.com/infid/vms/1" radl = """system node-1 ( net_interface.0.dns_name = 'node-#N#' )""" @@ -246,8 +270,7 @@ def test_power_off(self, now, createdb, read_auth, get_server, get_inf_id): res, nname = test_im.power_off('node-1') self.assertTrue(res) self.assertEqual(nname, 'node-1') - self.assertEqual(server.RemoveResource.call_count, 1) - self.assertEqual(server.StopVM.call_count, 0) + self.assertEqual(request.call_args_list[0][0], ('DELETE', 'http://server.com/infid/vms/1')) radl = """system node-1 ( net_interface.0.dns_name = 'node-#N#' and @@ -257,27 +280,27 @@ def test_power_off(self, now, createdb, read_auth, get_server, get_inf_id): res, nname = test_im.power_off('node-1') self.assertTrue(res) self.assertEqual(nname, 'node-1') - self.assertEqual(server.StopVM.call_count, 1) - self.assertEqual(server.RemoveResource.call_count, 1) + self.assertEqual(request.call_args_list[1][0], ('PUT', 'http://server.com/infid/vms/1/stop')) @patch("cluesplugins.im.uuid1") @patch("cluesplugins.im.powermanager._get_inf_id") - @patch("cluesplugins.im.powermanager._get_server") @patch("cluesplugins.im.powermanager._read_auth_data") @patch("cluesplugins.im.powermanager._get_vms") @patch("cpyutils.db.DB.create_from_string") @patch("cpyutils.eventloop.now") - def test_lifecycle(self, now, createdb, get_vms, read_auth, get_server, get_inf_id, uuid1): + @patch("requests.request") + def test_lifecycle(self, request, now, createdb, get_vms, read_auth, get_inf_id, uuid1): now.return_value = 100 vm = MagicMock() - vm.vm_id = "vmid" + vm.vm_id = "http://server.com/infid/vms/1" radl = """system node-1 ( net_interface.0.dns_name = 'node-#N#' and state = 'configured' )""" vm.radl = parse_radl(radl) vm.timestamp_recovered = 0 + vm.timestamp_created = 0 get_vms.return_value = {'node-1': vm} db = MagicMock() @@ -318,19 +341,52 @@ def test_lifecycle(self, now, createdb, get_vms, read_auth, get_server, get_inf_ auth = {'type': 'InfrastructureManager', 'username': 'user', 'password': 'pass'} read_auth.return_value = auth get_inf_id.return_value = "infid" - server = MagicMock() - server.CreateDiskSnapshot.return_value = (True, 'image') - get_server.return_value = server + + resp = MagicMock() + resp.status_code = 200 + resp.text = "image" + request.return_value = resp + uuid1.return_value = "uuid" test_im._store_golden_image = MagicMock() test_im.lifecycle() self.assertEqual(vm.recovered.call_count, 1) - self.assertEqual(server.CreateDiskSnapshot.call_count, 1) - self.assertEqual(server.CreateDiskSnapshot.call_args_list[0][0], ('infid', 'vmid', 0, 'im-uuid', True, auth)) + self.assertEqual(request.call_args_list[0][0], ('PUT', 'http://server.com/infid/vms/1/disks/0/snapshot?image_name=im-uuid&auto_delete=1')) self.assertEqual(test_im._store_golden_image.call_args_list[0][0], ('wn', 'image', 'pass')) + @patch("cluesplugins.im.powermanager._read_auth_data") + @patch("cluesplugins.im.powermanager._get_inf_id") + @patch("cpyutils.db.DB.create_from_string") + @patch("requests.request") + def 
test_get_template(self, request, createdb, get_inf_id, read_auth): + get_inf_id.return_value = "http://server.com/infid" + read_auth.return_value = "type = InfrastructureManager; username = user; password = pass" + + inf_info = MagicMock() + inf_info.status_code = 200 + inf_info.json.return_value = {"uri-list": [{"uri": "http://server.com/infid/vms/0"}, + {"uri": "http://server.com/infid/vms/1"}]} + tosca_info = MagicMock() + tosca_info.status_code = 200 + with open(os.path.join(self.TESTS_PATH, 'test-files/tosca.yml')) as f: + tosca_info.json.return_value = {"tosca": f.read()} + + request.side_effect = [inf_info, tosca_info] + + db = MagicMock() + db.sql_query.return_value = True, "", [] + createdb.return_value = db + + test_im = powermanager() + res = test_im._get_template('node-2') + + tosca_res = yaml.safe_load(res) + node_template = tosca_res['topology_template']['node_templates']['wn'] + self.assertEqual(node_template['capabilities']['scalable']['properties']['count'], 2) + self.assertEqual(node_template['capabilities']['endpoint']['properties']['dns_name'], 'node-2') + if __name__ == "__main__": unittest.main() diff --git a/test/test_kubernetes.py b/test/test_kubernetes.py index 1f1cc9d..d1c25ef 100644 --- a/test/test_kubernetes.py +++ b/test/test_kubernetes.py @@ -90,8 +90,8 @@ def test_get_jobinfolist(self, request): self.assertEqual(jobs[1].state, Request.SERVED) self.assertEqual(jobs[1].resources.resources.slots, 0.25) self.assertEqual(jobs[1].resources.resources.memory, 134217728) - self.assertEqual(jobs[1].resources.resources.requests, [('(pods_free > 0) && (schedule = 1) && ' + - '(nodeName = "wn-2.localdomain") && ' + + self.assertEqual(jobs[1].resources.resources.requests, [('(pods_free > 0) && (schedule == 1) && ' + + '(nodeName == "wn-2.localdomain") && ' + '(nvidia_gpu >= 1) && (sgx >= 1)')])