diff --git a/IM/ConfManager.py b/IM/ConfManager.py index efc8033ce..5181a281c 100644 --- a/IM/ConfManager.py +++ b/IM/ConfManager.py @@ -47,10 +47,12 @@ class ConfManager(threading.Thread): """ The file with the ansible steps to configure the second step of the the master node """ THREAD_SLEEP_DELAY = 5 - def __init__(self, inf, auth): + def __init__(self, inf, auth, max_ctxt_time = 1e9): threading.Thread.__init__(self) self.inf = inf self.auth = auth + self.init_time = time.time() + self.max_ctxt_time = max_ctxt_time self._stop = False def check_running_pids(self, vms_configuring): @@ -104,16 +106,18 @@ def check_vm_ips(self, timeout = Config.WAIT_RUNNING_VM_TIMEOUT): while not success and wait < timeout: success = True for vm in self.inf.get_vm_list(): - ip = vm.getPublicIP() - if not ip: + if vm.hasPublicNet(): + ip = vm.getPublicIP() + else: ip = vm.getPrivateIP() if not ip: # If the IP is not Available try to update the info vm.update_status(self.auth) - ip = vm.getPublicIP() - if not ip: + if vm.hasPublicNet(): + ip = vm.getPublicIP() + else: ip = vm.getPrivateIP() if not ip: @@ -121,7 +125,7 @@ def check_vm_ips(self, timeout = Config.WAIT_RUNNING_VM_TIMEOUT): break if not success: - ConfManager.logger.warn("Inf ID: " + str(self.inf.id) + ": Error waiting all the VMs to have an IP") + ConfManager.logger.warn("Inf ID: " + str(self.inf.id) + ": Error waiting for all the VMs to have a correct IP") wait += self.THREAD_SLEEP_DELAY time.sleep(self.THREAD_SLEEP_DELAY) else: @@ -136,6 +140,10 @@ def run(self): vms_configuring = {} while not self._stop: + if self.init_time + self.max_ctxt_time < time.time(): + ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": Max contextualization time passed. Exiting thread.") + return + vms_configuring = self.check_running_pids(vms_configuring) # If the queue is empty but there are vms configuring wait and test again @@ -268,12 +276,13 @@ def generate_inventory(self, tmp_dir): ifaces_im_vars = '' for i in range(vm.getNumNetworkIfaces()): iface_ip = vm.getIfaceIP(i) - ifaces_im_vars += ' IM_NODE_NET_' + str(i) + '_IP=' + iface_ip - if vm.getRequestedNameIface(i): - (nodename, nodedom) = vm.getRequestedNameIface(i, default_domain = Config.DEFAULT_DOMAIN) - ifaces_im_vars += ' IM_NODE_NET_' + str(i) + '_HOSTNAME=' + nodename - ifaces_im_vars += ' IM_NODE_NET_' + str(i) + '_DOMAIN=' + nodedom - ifaces_im_vars += ' IM_NODE_NET_' + str(i) + '_FQDN=' + nodename + "." + nodedom + if iface_ip: + ifaces_im_vars += ' IM_NODE_NET_' + str(i) + '_IP=' + iface_ip + if vm.getRequestedNameIface(i): + (nodename, nodedom) = vm.getRequestedNameIface(i, default_domain = Config.DEFAULT_DOMAIN) + ifaces_im_vars += ' IM_NODE_NET_' + str(i) + '_HOSTNAME=' + nodename + ifaces_im_vars += ' IM_NODE_NET_' + str(i) + '_DOMAIN=' + nodedom + ifaces_im_vars += ' IM_NODE_NET_' + str(i) + '_FQDN=' + nodename + "." + nodedom # first try to use the public IP ip = vm.getPublicIP()
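The run() loop above now bounds the total contextualization time with a plain deadline check. A minimal standalone sketch of the same pattern (class and names hypothetical, not part of the patch):

import time
import threading

class DeadlineThread(threading.Thread):
    def __init__(self, max_time):
        threading.Thread.__init__(self)
        self.init_time = time.time()
        self.max_time = max_time
        self._stop = False

    def run(self):
        while not self._stop:
            # leave the thread once the deadline fixed at creation expires
            if self.init_time + self.max_time < time.time():
                return
            # ... one unit of work per iteration ...
            time.sleep(5)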
+ nodedom + " " + nodename + "\r\n") + else: + ConfManager.logger.warn("Inf ID: " + str(self.inf.id) + ": Net interface " + str(i) + " request a name, but it does not have an IP.") # first try to use the public IP ip = vm.getPublicIP() @@ -488,8 +500,8 @@ def configure_master(self): def wait_master(self): """ - - selecciona la VM master - - espera a que arranque y este accesible por SSH + - Select the master VM + - Wait it to boot and has the SSH port open """ # First assure that ansible is installed in the master if not self.inf.vm_master or self.inf.vm_master.destroy: @@ -547,6 +559,7 @@ def wait_master(self): self.inf.set_configured(True) except: + ConfManager.logger.exception("Inf ID: " + str(self.inf.id) + ": Error waiting the master VM to be running") self.inf.set_configured(False) else: self.inf.set_configured(True) @@ -748,16 +761,11 @@ def wait_vm_ssh_acccess(self, vm, timeout): wait += delay time.sleep(delay) else: - ip = vm.getPrivateIP() - if ip != None: - ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": " + 'VM ' + str(vm.id) + ' with private IP: ' + ip) - return False - else: - ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": " + 'VM ' + str(vm.id) + ' with no IP') - # Update the VM info and wait to have a valid IP - wait += delay - time.sleep(delay) - vm.update_status(self.auth) + ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": " + 'VM ' + str(vm.id) + ' with no IP') + # Update the VM info and wait to have a valid public IP + wait += delay + time.sleep(delay) + vm.update_status(self.auth) # Timeout, return False return False @@ -769,34 +777,41 @@ def change_master_credentials(self, ssh): Arguments: - ssh(:py:class:`IM.SSH`): Object with the authentication data to access the master VM. """ - creds = self.inf.vm_master.getCredentialValues() - (user, _, _, _) = creds - new_creds = self.inf.vm_master.getCredentialValues(new=True) - if len(list(set(new_creds))) > 1 or list(set(new_creds))[0] != None: - change_creds = False - if cmp(new_creds,creds) != 0: - (_, new_passwd, new_public_key, new_private_key) = new_creds - if new_passwd: - ConfManager.logger.info("Changing password to master VM") - (out, err, code) = ssh.execute('sudo bash -c \'echo "' + user + ':' + new_passwd + '" | /usr/sbin/chpasswd && echo "OK"\' 2> /dev/null') - - if code == 0: - change_creds = True - ssh.password = new_passwd - else: - ConfManager.logger.error("Error changing password to master VM. " + out + err) + change_creds = False + try: + creds = self.inf.vm_master.getCredentialValues() + (user, passwd, _, _) = creds + new_creds = self.inf.vm_master.getCredentialValues(new=True) + if len(list(set(new_creds))) > 1 or list(set(new_creds))[0] != None: + change_creds = False + if cmp(new_creds,creds) != 0: + (_, new_passwd, new_public_key, new_private_key) = new_creds + # only change to the new password if there are a previous passwd value + if passwd and new_passwd: + ConfManager.logger.info("Changing password to master VM") + (out, err, code) = ssh.execute('sudo bash -c \'echo "' + user + ':' + new_passwd + '" | /usr/sbin/chpasswd && echo "OK"\' 2> /dev/null') + + if code == 0: + change_creds = True + ssh.password = new_passwd + else: + ConfManager.logger.error("Error changing password to master VM. 
" + out + err) + + if new_public_key and new_private_key: + ConfManager.logger.info("Changing public key to master VM") + (out, err, code) = ssh.execute('echo ' + new_public_key + ' >> .ssh/authorized_keys') + if code != 0: + ConfManager.logger.error("Error changing public key to master VM. " + out + err) + else: + change_creds = True + ssh.private_key = new_private_key - if new_public_key and new_private_key: - ConfManager.logger.info("Changing public key to master VM") - (out, err, code) = ssh.execute('echo ' + new_public_key + ' >> .ssh/authorized_keys') - if code != 0: - ConfManager.logger.error("Error changing public key to master VM. " + out + err) - else: - change_creds = True - ssh.private_key = new_private_key + if change_creds: + self.inf.vm_master.info.systems[0].updateNewCredentialValues() + except: + ConfManager.logger.exception("Error changing credentials to master VM.") - if change_creds: - self.inf.vm_master.info.systems[0].updateNewCredentialValues() + return change_creds def call_ansible(self, tmp_dir, inventory, playbook, ssh): """ diff --git a/IM/InfrastructureInfo.py b/IM/InfrastructureInfo.py index 926da31de..c64fea758 100644 --- a/IM/InfrastructureInfo.py +++ b/IM/InfrastructureInfo.py @@ -160,8 +160,10 @@ def get_vm(self, str_vm_id): """ Get the VM with the specified ID (if it is not destroyed) """ - - vm_id = int(str_vm_id) + try: + vm_id = int(str_vm_id) + except: + raise IncorrectVMException() if vm_id >= 0 and vm_id < len(self.vm_list): vm = self.vm_list[vm_id] if not vm.destroy: @@ -347,6 +349,10 @@ def Contextualize(self, auth): # get the contextualize steps specified in the RADL, or use the default value contextualizes = self.radl.contextualize.get_contextualize_items_by_step({1:ctxts}) + max_ctxt_time = self.radl.contextualize.max_time + if not max_ctxt_time: + max_ctxt_time = Config.MAX_CONTEXTUALIZATION_TIME + ctxt_task = [] ctxt_task.append((-2,0,self,['wait_master', 'check_vm_ips'])) ctxt_task.append((-1,0,self,['configure_master', 'generate_playbooks_and_hosts'])) @@ -372,5 +378,7 @@ def Contextualize(self, auth): self.add_ctxt_tasks(ctxt_task) if self.cm is None or not self.cm.isAlive(): - self.cm = ConfManager.ConfManager(self,auth) + self.cm = ConfManager.ConfManager(self,auth,max_ctxt_time) self.cm.start() + else: + self.cm.init_time = time.time() diff --git a/IM/InfrastructureManager.py b/IM/InfrastructureManager.py index 030b486d8..69b9dc577 100755 --- a/IM/InfrastructureManager.py +++ b/IM/InfrastructureManager.py @@ -21,7 +21,6 @@ import threading import string import random -#from multiprocessing.pool import ThreadPool from VMRC import VMRC from CloudInfo import CloudInfo @@ -36,6 +35,9 @@ from config import Config +if Config.MAX_SIMULTANEOUS_LAUNCHES > 1: + from multiprocessing.pool import ThreadPool + class IncorrectInfrastructureException(Exception): """ Invalid infrastructure ID or access not granted. """ @@ -174,7 +176,7 @@ def _launch_group(sel_inf, deploy_group, deploys_group_cloud_list, cloud_list, c not cancel_deployment): concrete_system = concrete_systems[cloud_id][deploy.id][0] if not concrete_system: - InfrastructureManager.logger.error("Error, no concrete system to deploy: " + deploy.id + ". Check if a correct image is being used") + InfrastructureManager.logger.error("Error, no concrete system to deploy: " + deploy.id + " in cloud: " + cloud_id + ". Check if a correct image is being used") exceptions.append("Error, no concrete system to deploy: " + deploy.id + ". 
Check if a correct image is being used") break @@ -215,7 +217,10 @@ def _launch_group(sel_inf, deploy_group, deploys_group_cloud_list, cloud_list, c if cancel_deployment or all_ok: break if not all_ok and not cancel_deployment: - cancel_deployment.append(Exception("All machines could not be launched: %s" % exceptions)) + msg = "" + for i, e in enumerate(exceptions): + msg += "Attempt " + str(i+1) + ": " + str(e) + "\n" + cancel_deployment.append(Exception("All machines could not be launched: \n%s" % msg)) @staticmethod def get_infrastructure(inf_id, auth): @@ -363,6 +368,7 @@ def AddResource(inf_id, radl_data, auth, context = True, failed_clouds = []): # If any deploy is defined, only update definitions. if not radl.deploys: sel_inf.update_radl(radl, []) + InfrastructureManager.logger.debug("Infrastructure without any deploy. Exiting.") return [] for system in radl.systems: @@ -454,7 +460,7 @@ def AddResource(inf_id, radl_data, auth, context = True, failed_clouds = []): scored_clouds = [ (cloud_id, 1) for cloud_id, _ in cloud_list0 ] ordered_cloud_list = [ c.id for c in CloudInfo.get_cloud_list(auth) ] - # reverse the list to use the reverse order inthe sort function + # reverse the list to use the reverse order in the sort function ordered_cloud_list.reverse() # Order the clouds first by the score and then using the cloud order in the auth data sorted_scored_clouds = sorted(scored_clouds, key=lambda x: (x[1], ordered_cloud_list.index(x[0])), reverse=True) @@ -464,15 +470,17 @@ def AddResource(inf_id, radl_data, auth, context = True, failed_clouds = []): deployed_vm = {} cancel_deployment = [] try: - #pool = ThreadPool(processes=Config.MAX_SIMULTANEOUS_LAUNCHES) - #pool.map( - # lambda ds: InfrastructureManager._launch_group(sel_inf - # ds, deploys_group_cloud_list[id(ds)], cloud_list, concrete_systems, - # radl, auth, deployed_vm, cancel_deployment), deploy_groups) - for ds in deploy_groups: - InfrastructureManager._launch_group(sel_inf, - ds, deploys_group_cloud_list[id(ds)], cloud_list, concrete_systems, - radl, auth, deployed_vm, cancel_deployment) + if Config.MAX_SIMULTANEOUS_LAUNCHES > 1: + pool = ThreadPool(processes=Config.MAX_SIMULTANEOUS_LAUNCHES) + pool.map( + lambda ds: InfrastructureManager._launch_group(sel_inf, + ds, deploys_group_cloud_list[id(ds)], cloud_list, concrete_systems, + radl, auth, deployed_vm, cancel_deployment), deploy_groups) + else: + for ds in deploy_groups: + InfrastructureManager._launch_group(sel_inf, + ds, deploys_group_cloud_list[id(ds)], cloud_list, concrete_systems, + radl, auth, deployed_vm, cancel_deployment) except Exception, e: # Please, avoid exception to arrive to this level, because some virtual # machine may lost. @@ -492,7 +500,10 @@ def AddResource(inf_id, radl_data, auth, context = True, failed_clouds = []): # If error, all deployed virtual machine will be undeployed. 
for vm in new_vms: vm.finalize(auth) - raise Exception("Some deploys did not proceed successfully: %s" % cancel_deployment) + msg = "" + for e in cancel_deployment: + msg += str(e) + "\n" + raise Exception("Some deploys did not proceed successfully: %s" % msg) for vm in new_vms: @@ -661,7 +672,7 @@ def AlterVM(inf_id, vm_id, radl_data, auth): exception = None try: - (success, alter_res) = vm.alter(vm, radl, auth) + (success, alter_res) = vm.alter(radl, auth) except Exception, e: exception = e InfrastructureManager.save_data() @@ -692,7 +703,16 @@ def GetInfrastructureRADL(inf_id, auth): sel_inf = InfrastructureManager.get_infrastructure(inf_id, auth) InfrastructureManager.logger.info("RADL obtained successfully") - InfrastructureManager.logger.debug(str(sel_inf.radl)) + # remove the F0000__FAKE_SYSTEM__ deploys + # TODO: Do in a better way + radl = sel_inf.radl.clone() + deploys = [] + for deploy in radl.deploys: + if not deploy.id.startswith("F0000__FAKE_SYSTEM_"): + deploys.append(deploy) + radl.deploys = deploys + + InfrastructureManager.logger.debug(str(radl)) - return str(sel_inf.radl) + return str(radl) @staticmethod @@ -735,7 +755,7 @@ def GetInfrastructureContMsg(inf_id, auth): InfrastructureManager.logger.info("Getting cont msg of the inf: " + str(inf_id)) sel_inf = InfrastructureManager.get_infrastructure(inf_id, auth) - res = sel_inf.cont_out + "\n\n".join([vm.cont_out for vm in sel_inf.get_vm_list()]) + res = sel_inf.cont_out + "\n\n".join([vm.cont_out for vm in sel_inf.get_vm_list() if vm.cont_out]) InfrastructureManager.logger.debug(res) return res @@ -769,7 +789,10 @@ def StopInfrastructure(inf_id, auth): exceptions.append(msg) if exceptions: - raise Exception("Error stopping the infrastructure: %s" % "\n".join(exceptions)) + msg = "" + for e in exceptions: + msg += str(e) + "\n" + raise Exception("Error stopping the infrastructure: %s" % msg) InfrastructureManager.logger.info("Infrastructure successfully stopped") return "" @@ -803,7 +826,10 @@ def StartInfrastructure(inf_id, auth): exceptions.append(msg) if exceptions: - raise Exception("Error starting the infrastructure: %s" % "\n".join(exceptions)) + msg = "" + for e in exceptions: + msg += str(e) + "\n" + raise Exception("Error starting the infrastructure: %s" % msg) InfrastructureManager.logger.info("Infrastructure successfully restarted") return "" @@ -815,12 +841,24 @@ def remove_old_inf(): with InfrastructureManager._lock: items_to_delete = [] for infId, inf in InfrastructureManager.infrastructure_list.items(): - if inf.deleted and len(InfrastructureManager.infrastructure_list) - infId >= Config.MAX_INF_STORED: + if inf.deleted and InfrastructureManager.global_inf_id - infId >= Config.MAX_INF_STORED: items_to_delete.append(infId) for item in items_to_delete: del InfrastructureManager.infrastructure_list[item] + @staticmethod + def _delete_vm(vm, auth, exceptions): + try: + success = False + InfrastructureManager.logger.debug("Finalizing the VM 
id: " + str(vm.id)) - (success, msg) = vm.finalize(auth) - except Exception, e: - msg = str(e) - if not success: - InfrastructureManager.logger.info("The VM cannot be finalized") - exceptions.append(msg) + + if Config.MAX_SIMULTANEOUS_LAUNCHES > 1: + pool = ThreadPool(processes=Config.MAX_SIMULTANEOUS_LAUNCHES) + pool.map( + lambda vm: InfrastructureManager._delete_vm(vm, auth, exceptions), + reversed(sel_inf.get_vm_list()) + ) + else: + # If IM server is the first VM, then it will be the last destroyed + for vm in reversed(sel_inf.get_vm_list()): + InfrastructureManager._delete_vm(vm, auth, exceptions) if exceptions: - raise Exception("Error destroying the infrastructure: %s" % "\n".join(exceptions)) + msg = "" + for e in exceptions: + msg += str(e) + "\n" + raise Exception("Error destroying the infrastructure: \n%s" % msg) sel_inf.delete() InfrastructureManager.remove_old_inf() @@ -907,6 +948,9 @@ def CreateInfrastructure(radl, auth): # First check if it is configured to check the users from a list if not InfrastructureManager.check_im_user(auth.getAuthInfo("InfrastructureManager")): raise UnauthorizedUserException() + + if not auth.getAuthInfo("InfrastructureManager"): + raise Exception("No credentials provided for the InfrastructureManager") # Create a new infrastructure inf = InfrastructureInfo.InfrastructureInfo() @@ -955,7 +999,7 @@ def ExportInfrastructure(inf_id, delete, auth_data): auth = Authentication(auth_data) sel_inf = InfrastructureManager.get_infrastructure(inf_id, auth) - str_inf = pickle.dumps(sel_inf) + str_inf = pickle.dumps(sel_inf, 2) InfrastructureManager.logger.info("Exporting infrastructure id: " + str(sel_inf.id)) if delete: sel_inf.deleted = True @@ -998,8 +1042,8 @@ def save_data(): if not InfrastructureManager._exiting: try: data_file = open(Config.DATA_FILE, 'wb') - pickle.dump(InfrastructureManager.global_inf_id, data_file) - pickle.dump(InfrastructureManager.infrastructure_list, data_file) + pickle.dump(InfrastructureManager.global_inf_id, data_file, 2) + pickle.dump(InfrastructureManager.infrastructure_list, data_file, 2) data_file.close() except Exception, ex: InfrastructureManager.logger.exception("ERROR saving data to the file: " + Config.DATA_FILE + ". 
Changes not stored!!") diff --git a/IM/__init__.py b/IM/__init__.py index b8bc4311c..29e5122aa 100644 --- a/IM/__init__.py +++ b/IM/__init__.py @@ -16,6 +16,6 @@ __all__ = ['auth','bottle','CloudManager','config','ConfManager','db','ganglia','HTTPHeaderTransport','ImageManager','InfrastructureInfo','InfrastructureManager','parsetab','radl','recipe','request','REST','SSH','timedcall','uriparse','VMRC','xmlobject'] -__version__ = '1.2.0' +__version__ = '1.2.1' __author__ = 'Miguel Caballer' diff --git a/IM/ansible/ansible_launcher.py b/IM/ansible/ansible_launcher.py index 091641b41..7263320b7 100755 --- a/IM/ansible/ansible_launcher.py +++ b/IM/ansible/ansible_launcher.py @@ -19,7 +19,7 @@ # Miguel Caballer: file based on the ansible-playbook - +import time import os import threading from StringIO import StringIO @@ -54,13 +54,6 @@ def launch_playbook(playbook_file, host, passwd, threads, pk_file = None, retrie options, _ = parser.parse_args([]) - if inventory_file: - inventory = ansible.inventory.Inventory(inventory_file) - else: - inventory = ansible.inventory.Inventory(options.inventory) - - inventory.subset(host) - sshpass = None sudopass = None options.sudo_user = options.sudo_user or C.DEFAULT_SUDO_USER @@ -85,9 +78,16 @@ def launch_playbook(playbook_file, host, passwd, threads, pk_file = None, retrie return_code = 4 hosts_with_errors = [] while return_code != 0 and num_retries < retries: + time.sleep(5*num_retries) num_retries += 1 return_code = 0 + if inventory_file: + inventory = ansible.inventory.Inventory(inventory_file) + else: + inventory = ansible.inventory.Inventory(options.inventory) + + inventory.subset(host) # let inventory know which playbooks are using so it can know the basedirs inventory.set_playbook_basedir(os.path.dirname(playbook_file)) @@ -118,10 +118,9 @@ def launch_playbook(playbook_file, host, passwd, threads, pk_file = None, retrie diff=options.diff ) - failed_hosts = [] - unreachable_hosts = [] - try: + failed_hosts = [] + unreachable_hosts = [] pb.run() diff --git a/IM/auth.py b/IM/auth.py index d2726f55a..240042d8f 100644 --- a/IM/auth.py +++ b/IM/auth.py @@ -51,6 +51,7 @@ def getAuthInfo(self, auth_type): for auth in self.auth_list: if auth['type'] == auth_type: res.append(auth) + break return res def getAuthInfoByID(self, auth_id): @@ -84,11 +85,13 @@ def compare(self, other_auth, auth_type): for auth in self.auth_list: if auth['type'] == auth_type: auth_with_type = auth + break other_auth_with_type = None for auth in other_auth.auth_list: if auth['type'] == auth_type: other_auth_with_type = auth + break if auth_with_type != None and other_auth_with_type != None: if len(auth_with_type) != len(other_auth_with_type): diff --git a/IM/config.py b/IM/config.py index 46ff66594..95a4eca9e 100644 --- a/IM/config.py +++ b/IM/config.py @@ -18,6 +18,21 @@ import os import logging +def parse_options(config, section_name, config_class): + options = config.options(section_name) + for option in options: + option = option.upper() + if option in config_class.__dict__ and not option.startswith("__"): + if isinstance(config_class.__dict__[option], bool): + config_class.__dict__[option] = config.getboolean(section_name, option) + elif isinstance(config_class.__dict__[option], int): + config_class.__dict__[option] = config.getint(section_name, option) + else: + config_class.__dict__[option] = config.get(section_name, option) + else: + logger = logging.getLogger('InfrastructureManager') + logger.warn("Unknown option in the IM config file. 
Ignoring it: " + option) + class Config: DEFAULT_VM_MEMORY = 512 @@ -59,21 +74,18 @@ class Config: VM_INFO_UPDATE_FREQUENCY = 10 REMOTE_CONF_DIR = "/tmp/.im" MAX_SSH_ERRORS = 5 + PRIVATE_NET_AS_PUBLIC = '' config = ConfigParser.ConfigParser() config.read([Config.IM_PATH + '/../im.cfg', Config.IM_PATH + '/../etc/im.cfg', '/etc/im/im.cfg']) section_name = "im" -options = config.options(section_name) -for option in options: - option = option.upper() - if option in Config.__dict__ and not option.startswith("__"): - if isinstance(Config.__dict__[option], bool): - Config.__dict__[option] = config.getboolean(section_name, option) - elif isinstance(Config.__dict__[option], int): - Config.__dict__[option] = config.getint(section_name, option) - else: - Config.__dict__[option] = config.get(section_name, option) - else: - logger = logging.getLogger('InfrastructureManager') - logger.warn("Unknown option in the IM config file. Ignoring it: " + option) \ No newline at end of file +if config.has_section(section_name): + parse_options(config, section_name, Config) + +class ConfigOpenNebula: + TEMPLATE_CONTEXT = '' + TEMPLATE_OTHER = 'GRAPHICS = [type="vnc",listen="0.0.0.0"]' + +if config.has_section("OpenNebula"): + parse_options(config, 'OpenNebula', ConfigOpenNebula) \ No newline at end of file diff --git a/IM/radl/radl.py b/IM/radl/radl.py index bc99ba18d..eb8c4bdfd 100644 --- a/IM/radl/radl.py +++ b/IM/radl/radl.py @@ -17,6 +17,7 @@ import copy import socket,struct from distutils.version import LooseVersion +from IM.config import Config def UnitToValue(unit): """Return the value of an unit.""" @@ -636,7 +637,9 @@ def check(self, radl): class network(Features, Aspect): """Store a RADL ``network``.""" - private_net_masks = ["10.0.0.0/8","172.16.0.0/12","192.168.0.0/16","169.254.0.0/16"] + private_net_masks = ["10.0.0.0/8","172.16.0.0/12","192.168.0.0/16","169.254.0.0/16","100.64.0.0/10","192.0.0.0/24","198.18.0.0/15"] + if Config.PRIVATE_NET_AS_PUBLIC in private_net_masks: + private_net_masks.remove(Config.PRIVATE_NET_AS_PUBLIC) def __init__(self, name, features=None, reference=False, line=None): self.id = name @@ -680,7 +683,7 @@ def check(self, radl): SIMPLE_FEATURES = { "outbound": (str, ["YES", "NO"]), - "outports": (str, check_outports_format), + "outports": (str, check_outports_format) } self.check_simple(SIMPLE_FEATURES, radl) diff --git a/IM/xmlobject.py b/IM/xmlobject.py index f4b9d0460..899f591bc 100644 --- a/IM/xmlobject.py +++ b/IM/xmlobject.py @@ -19,6 +19,9 @@ import os class XMLObject: + """ + Class to easily parse XML documents + """ tuples = {} tuples_lists = {} attributes = [] @@ -38,20 +41,20 @@ def to_xml(self, node_name = None): res += ">\n" - for tag, className in self.__class__.tuples.items(): + for tag, _ in self.__class__.tuples.items(): if self.__dict__[tag] != None: res += self.__dict__[tag].to_xml(tag) - for tag, className in self.__class__.tuples_lists.items(): + for tag, _ in self.__class__.tuples_lists.items(): if self.__dict__[tag] != None: - list = self.__dict__[tag] - for obj in list: + obj_list = self.__dict__[tag] + for obj in obj_list: res += obj.to_xml(tag) for tag in self.__class__.values_lists: if self.__dict__[tag] != None: - list = self.__dict__[tag] - for value in list: + obj_list = self.__dict__[tag] + for value in obj_list: if value != None and len(str(value)) > 0: res += "<" + tag + ">" + value + "\n" @@ -102,22 +105,25 @@ def __setattr__(self, name, value): self.__dict__[name] = value def __init__(self, input_str): - if os.path.isfile(input_str): - f = 
diff --git a/IM/xmlobject.py b/IM/xmlobject.py index f4b9d0460..899f591bc 100644 --- a/IM/xmlobject.py +++ b/IM/xmlobject.py @@ -19,6 +19,9 @@ import os class XMLObject: + """ + Class to easily parse XML documents + """ tuples = {} tuples_lists = {} attributes = [] @@ -38,20 +41,20 @@ def to_xml(self, node_name = None): res += ">\n" - for tag, className in self.__class__.tuples.items(): + for tag, _ in self.__class__.tuples.items(): if self.__dict__[tag] != None: res += self.__dict__[tag].to_xml(tag) - for tag, className in self.__class__.tuples_lists.items(): + for tag, _ in self.__class__.tuples_lists.items(): if self.__dict__[tag] != None: - list = self.__dict__[tag] - for obj in list: + obj_list = self.__dict__[tag] + for obj in obj_list: res += obj.to_xml(tag) for tag in self.__class__.values_lists: if self.__dict__[tag] != None: - list = self.__dict__[tag] - for value in list: + obj_list = self.__dict__[tag] + for value in obj_list: if value != None and len(str(value)) > 0: res += "<" + tag + ">" + value + "\n" @@ -102,22 +105,25 @@ def __setattr__(self, name, value): self.__dict__[name] = value def __init__(self, input_str): - if os.path.isfile(input_str): - f = open(input_str) - xml_str = "" - for line in f.readlines(): - xml_str += line + if isinstance(input_str, xml.dom.minidom.Element): + dom = input_str else: - xml_str = input_str - - dom = xml.dom.minidom.parseString(xml_str) + if os.path.isfile(input_str): + f = open(input_str) + xml_str = "" + for line in f.readlines(): + xml_str += line + else: + xml_str = input_str + + dom = xml.dom.minidom.parseString(xml_str).documentElement for tag, className in self.__class__.tuples.items(): - objs = self.getChildByTagName(dom.documentElement, tag) + objs = self.getChildByTagName(dom, tag) if (len(objs) > 0): - newObj = className(objs[0].toxml()) + newObj = className(objs[0]) try: - dom.childNodes[0].removeChild(objs[0]) + dom.removeChild(objs[0]) except: pass else: @@ -125,11 +131,11 @@ def __init__(self, input_str): for tag, className in self.__class__.tuples_lists.items(): - objs = self.getChildByTagName(dom.documentElement, tag) + objs = self.getChildByTagName(dom, tag) obj_list = [] for obj in objs: - newObj = className(obj.toxml()) - dom.childNodes[0].removeChild(obj) + newObj = className(obj) + dom.removeChild(obj) obj_list.append(newObj) self.__setattr__(tag, obj_list) @@ -140,14 +146,14 @@ def __init__(self, input_str): value = XMLObject.handleField(tag, dom) if (value is None): value = self.noneval - if (tag in self.__class__.numeric): + if (tag in self.__class__.numeric and value is not None): try: value = float(value) if (value == int(value)): value = int(value) except: - logging.error("se esperaba un valor numerico para %s y se encontro %s" % (tag, value)) + logging.error("Incorrect type for %s: it must be numeric but it is %s" % (tag, value)) self.__setattr__(tag, value) for tag in self.__class__.attributes: - self.__setattr__(tag, dom.documentElement.getAttribute(tag)) + self.__setattr__(tag, dom.getAttribute(tag)) diff --git a/INSTALL b/INSTALL index 1e0d680e0..80335e10e 100644 --- a/INSTALL +++ b/INSTALL @@ -2,7 +2,7 @@ 1.1 REQUISITES -IM is based on python, so Python 2.4 or higher runtime and standard library must +IM is based on python, so Python 2.6 or higher runtime and standard library must be installed in the system. If you will use pip to install the IM, all the requisites will be installed. diff --git a/README.md b/README.md index f34b27647..51e164f5d 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ of the functionality of the platform: [YouTube IM channel](https://www.youtube.c 1.1 REQUISITES -------------- -IM is based on Python, so Python 2.4 or higher runtime and standard library must +IM is based on Python, so Python 2.6 or higher runtime and standard library must be installed in the system. If you use pip to install the IM, all the requisites will be installed. diff --git a/changelog b/changelog index 14cd5c577..7dafa3c84 100644 --- a/changelog +++ b/changelog @@ -71,3 +71,18 @@ IM 1.2.0 * Add GetInfrastructureContMsg function to the API * Add GetVMContMsg function to the API * Add new state 'unconfigured' + +IM 1.2.1 + * Add SSH keygen function to GCE connector + * Add PRIVATE_NET_AS_PUBLIC configuration variable + * Add MAX_SIMULTANEOUS_LAUNCHES configuration variable + * Add Azure connector + * Update EC2 instance features + * Update documentation to specify Python version 2.6 + * Add provider_ids as network parameter in RADL + * Add support for VPC in EC2 connector + * Implement AlterVM in OpenNebula and EC2 connectors + * Add DeployedNode connector to enable the configuration of nodes not deployed with the IM (i.e. 
physical nodes) + * Minor bugfixes in InfrastructureManager class + * Improve error messages + * Bugfixes in OpenNebula connector diff --git a/connectors/Azure.py b/connectors/Azure.py new file mode 100644 index 000000000..b77c28b3f --- /dev/null +++ b/connectors/Azure.py @@ -0,0 +1,903 @@ +# IM - Infrastructure Manager +# Copyright (C) 2011 - GRyCAP - Universitat Politecnica de Valencia +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import base64 +import httplib +import time +import os +import tempfile +from IM.xmlobject import XMLObject +from IM.uriparse import uriparse +from IM.VirtualMachine import VirtualMachine +from CloudConnector import CloudConnector +from IM.radl.radl import UserPassCredential, Feature +from IM.config import Config + +# Set of classes to parse the output of the REST API +class Endpoint(XMLObject): + values = ['Name', 'Vip', 'PublicPort', 'LocalPort', 'Protocol'] + +class InstanceEndpoints(XMLObject): + tuples_lists = { 'InstanceEndpoint': Endpoint } + +class DataVirtualHardDisk(XMLObject): + values = ['DiskName', 'Lun'] + +class DataVirtualHardDisks(XMLObject): + tuples_lists = { 'DataVirtualHardDisk': DataVirtualHardDisk } + +class Role(XMLObject): + values = ['RoleName', 'RoleType'] + tuples = { 'DataVirtualHardDisks': DataVirtualHardDisks } + +class RoleList(XMLObject): + tuples_lists = { 'Role': Role } + +class RoleInstance(XMLObject): + values = ['RoleName', 'InstanceStatus', 'InstanceSize', 'InstanceName', 'IpAddress', 'PowerState'] + tuples = { 'InstanceEndpoints': InstanceEndpoints } + +class RoleInstanceList(XMLObject): + tuples_lists = { 'RoleInstance': RoleInstance } + +class Deployment(XMLObject): + tuples = { 'RoleInstanceList': RoleInstanceList, 'RoleList': RoleList } + values = ['Name', 'Status', 'Url'] + +class StorageServiceProperties(XMLObject): + values = ['Description', 'Location', 'Label', 'Status', 'GeoReplicationEnabled', 'CreationTime', 'GeoPrimaryRegion', 'GeoSecondaryRegion'] + +class StorageService(XMLObject): + values = ['Url', 'ServiceName'] + tuples = { 'StorageServiceProperties': StorageServiceProperties } + +class InstanceTypeInfo: + """ + Information about the instance type + + Args: + - name(str, optional): name of the type of the instance + - cpu_arch(list of str, optional): cpu architectures supported + - num_cpu(int, optional): number of cpus + - cores_per_cpu(int, optional): number of cores per cpu + - mem(int, optional): amount of memory + - price(int, optional): price per hour + - disks(int, optional): number of disks + - disk_space(int, optional): size of the disks + """ + def __init__(self, name = "", cpu_arch = ["i386"], num_cpu = 1, cores_per_cpu = 1, mem = 0, price = 0, disks = 0, disk_space = 0): + self.name = name + self.num_cpu = num_cpu + self.cores_per_cpu = cores_per_cpu + self.mem = mem + self.cpu_arch = cpu_arch + self.price = price + self.disks = disks + self.disk_space = disk_space +
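The XMLObject subclasses above turn the REST responses into plain attribute access. An illustrative parse, assuming the XMLObject base resolves each name in values from the child element with the same tag, as IM/xmlobject.py does (sample XML invented, trimmed to the fields declared in Deployment):

sample = ("<Deployment>"
          "<Name>IM-42</Name>"
          "<Status>Running</Status>"
          "<Url>http://example.invalid</Url>"
          "</Deployment>")
dep = Deployment(sample)
print dep.Name, dep.Status  # -> IM-42 Running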
+class AzureCloudConnector(CloudConnector): + """ + Cloud Launcher to the Azure platform + Using the Service Management REST API Reference: + https://msdn.microsoft.com/en-us/library/azure/ee460799.aspx + """ + + type = "Azure" + """str with the name of the provider.""" + INSTANCE_TYPE = 'ExtraSmall' + """Default instance type.""" + AZURE_SERVER = "management.core.windows.net" + """Address of the server with the Service Management REST API.""" + AZURE_PORT = 443 + """Port of the server with the Service Management REST API.""" + STORAGE_NAME = "infmanager" + """Name of the storage account the IM will create""" + DEFAULT_LOCATION = "West Europe" + """Default location to use""" + ROLE_NAME = "IMVMRole" + """Name of the Role""" + + DEPLOY_STATE_MAP = { + 'Running': VirtualMachine.RUNNING, + 'Suspended': VirtualMachine.STOPPED, + 'SuspendedTransitioning': VirtualMachine.STOPPED, + 'RunningTransitioning': VirtualMachine.RUNNING, + 'Starting': VirtualMachine.PENDING, + 'Suspending': VirtualMachine.STOPPED, + 'Deploying': VirtualMachine.PENDING, + 'Deleting': VirtualMachine.OFF, + } + + ROLE_STATE_MAP = { + 'Starting': VirtualMachine.PENDING, + 'Started': VirtualMachine.RUNNING, + 'Stopping': VirtualMachine.STOPPED, + 'Stopped': VirtualMachine.STOPPED, + 'Unknown': VirtualMachine.UNKNOWN + } + + def concreteSystem(self, radl_system, auth_data): + if radl_system.getValue("disk.0.image.url"): + url = uriparse(radl_system.getValue("disk.0.image.url")) + protocol = url[0] + if protocol == "azr": + res_system = radl_system.clone() + instance_type = self.get_instance_type(res_system) + if not instance_type: + self.logger.error("Error launching the VM, no instance type available for the requirements.") + self.logger.debug(res_system) + return [] + else: + res_system.addFeature(Feature("cpu.count", "=", instance_type.num_cpu * instance_type.cores_per_cpu), conflict="other", missing="other") + res_system.addFeature(Feature("memory.size", "=", instance_type.mem, 'M'), conflict="other", missing="other") + if instance_type.disks > 0: + res_system.addFeature(Feature("disks.free_size", "=", instance_type.disks * instance_type.disk_space, 'G'), conflict="other", missing="other") + for i in range(1,instance_type.disks+1): + res_system.addFeature(Feature("disk.%d.free_size" % i, "=", instance_type.disk_space, 'G'), conflict="other", missing="other") + res_system.addFeature(Feature("price", "=", instance_type.price), conflict="me", missing="other") + + res_system.addFeature(Feature("instance_type", "=", instance_type.name), conflict="other", missing="other") + + res_system.addFeature(Feature("provider.type", "=", self.type), conflict="other", missing="other") + + username = res_system.getValue('disk.0.os.credentials.username') + if not username: + res_system.setValue('disk.0.os.credentials.username','azureuser') + + res_system.updateNewCredentialValues() + + return [res_system] + else: + return [] + else: + return [radl_system.clone()] + + def gen_input_endpoints(self, radl): + """ + Generate the InputEndpoints part of the XML of the VM creation + using the outports field of the RADL network + """ + # SSH port must be always available + res = """ + + + 22 + SSH + 22 + TCP + """ + + public_net = None + for net in radl.networks: + if net.isPublic(): + public_net = net + + if public_net: + outports = public_net.getOutPorts() + if outports: + for remote_port,remote_protocol,local_port,local_protocol in outports: + if local_port != 22: + protocol = remote_protocol + if remote_protocol != local_protocol: + self.logger.warn("Different protocols used in outports, ignoring local port 
protocol!") + + res += """ + + %d + Port %d + %d + %s + """ % (local_port, local_port, remote_port, protocol.upper()) + + res += "\n " + return res + + def gen_configuration_set(self, hostname, system): + """ + Gen the ConfigurationSet part of the XML of the VM creation + """ + # Allways use the new credentials + system.updateNewCredentialValues() + credentials = system.getCredentials() + + if system.getValue("disk.0.os.name") == "windows": + ConfigurationSet = ''' + + WindowsProvisioningConfiguration + %s + %s + %s + true + false +''' % (hostname, credentials.password, credentials.username) + else: + if isinstance(credentials, UserPassCredential): + ConfigurationSet = ''' + + LinuxProvisioningConfiguration + %s + %s + %s + false + ''' % (hostname, credentials.username, credentials.password) + else: + ConfigurationSet = ''' + + LinuxProvisioningConfiguration + %s + %s + %s + true + + + + %s + /home/%s/.ssh/authorized_keys + + + + + %s + /home/%s/.ssh/id_rsa + + + + ''' % (hostname, credentials.username, "Pass+Not-Used1", + credentials.public_key, credentials.username, + credentials.public_key, credentials.username) + + return ConfigurationSet + + def gen_data_disks(self, system, storage_account): + """ + Gen the DataVirtualHardDisks part of the XML of the VM creation + """ + + disks = "" + cont = 1 + while system.getValue("disk." + str(cont) + ".size") and system.getValue("disk." + str(cont) + ".device"): + disk_size = system.getFeature("disk." + str(cont) + ".size").getValue('G') + + disk_name = "datadisk-1-" + str(int(time.time()*100)) + disks += ''' + + + ReadWrite + %d + %d + https://%s.blob.core.windows.net/vhds/%s.vhd + + ''' % (cont, int(disk_size), storage_account, disk_name) + + cont +=1 + + return disks + + def get_azure_vm_create_xml(self, vm, storage_account, radl, num): + """ + Generate the XML to create the VM + """ + system = radl.systems[0] + name = system.getValue("disk.0.image.name") + if not name: + name = "userimage" + url = uriparse(system.getValue("disk.0.image.url")) + + label = name + " IM created VM" + (hostname, _) = vm.getRequestedName(default_hostname = Config.DEFAULT_VM_NAME, default_domain = Config.DEFAULT_DOMAIN) + + if not hostname: + hostname = "AzureNode" + + SourceImageName = url[1] + MediaLink = "https://%s.blob.core.windows.net/vhds/%s.vhd" % (storage_account, SourceImageName) + instance_type = self.get_instance_type(system) + + DataVirtualHardDisks = self.gen_data_disks(system, storage_account) + ConfigurationSet = self.gen_configuration_set(hostname, system) + InputEndpoints = self.gen_input_endpoints(radl) + + res = ''' + + %s + Production + + + + %s + + PersistentVMRole + + %s + + NetworkConfiguration + %s + + + %s + + %s + %s + + %s + + + + ''' % (vm.id, label, self.ROLE_NAME, ConfigurationSet, InputEndpoints, + DataVirtualHardDisks, MediaLink, SourceImageName, instance_type.name) + + self.logger.debug("Azure VM Create XML: " + res) + + return res + + def get_user_subscription_id(self, auth_data): + """ + Get the Azure subscription ID from the auth data + """ + auth = auth_data.getAuthInfo(AzureCloudConnector.type) + if auth and 'username' in auth[0]: + return auth[0]['username'] + else: + return None + + def get_user_cert_data(self, auth_data): + """ + Get the Azure private_key and public_key files from the auth data + """ + auth = auth_data.getAuthInfo(AzureCloudConnector.type) + if auth and 'public_key' in auth[0] and 'private_key' in auth[0]: + certificate = auth[0]['public_key'] + fd, cert_file = tempfile.mkstemp() + os.write(fd, certificate) 
+ def get_user_cert_data(self, auth_data): + """ + Get the Azure private_key and public_key files from the auth data + """ + auth = auth_data.getAuthInfo(AzureCloudConnector.type) + if auth and 'public_key' in auth[0] and 'private_key' in auth[0]: + certificate = auth[0]['public_key'] + fd, cert_file = tempfile.mkstemp() + os.write(fd, certificate) + os.close(fd) + os.chmod(cert_file,0644) + + private_key = auth[0]['private_key'] + fd, key_file = tempfile.mkstemp() + os.write(fd, private_key) + os.close(fd) + os.chmod(key_file,0600) + + return (cert_file, key_file) + else: + return None + + def create_service(self, subscription_id, cert_file, key_file, region): + """ + Create an Azure Cloud Service and return the name + """ + service_name = "IM-" + str(int(time.time()*100)) + self.logger.info("Creating the service " + service_name + " in region: " + region) + + try: + + conn = httplib.HTTPSConnection(self.AZURE_SERVER, self.AZURE_PORT, key_file=key_file, cert_file=cert_file) + uri = "https://%s/%s/services/hostedservices" % (self.AZURE_SERVER,subscription_id) + service_create_xml = ''' + + %s + + Service %s created by the IM + %s + + ''' % (service_name, base64.b64encode(service_name), service_name, region ) + conn.request('POST', uri, body = service_create_xml, headers = {'x-ms-version' : '2013-03-01', 'Content-Type' : 'application/xml'}) + resp = conn.getresponse() + output = resp.read() + conn.close() + except Exception: + self.logger.exception("Error creating the service") + return None + + if resp.status != 201: + self.logger.error("Error creating the service: Error code: " + str(resp.status) + ". Msg: " + output) + return None + + return service_name
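Every call in this connector follows the same certificate-authenticated REST pattern; a minimal standalone sketch (function name hypothetical):

import httplib

def azure_request(method, subscription_id, path, cert_file, key_file, body=None):
    # authentication is carried by the client certificate of the HTTPS
    # connection; the x-ms-version header selects the API version
    conn = httplib.HTTPSConnection("management.core.windows.net", 443,
                                   cert_file=cert_file, key_file=key_file)
    conn.request(method, "/%s%s" % (subscription_id, path), body=body,
                 headers={'x-ms-version': '2013-03-01'})
    resp = conn.getresponse()
    output = resp.read()
    conn.close()
    return resp.status, output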
Msg: " + output) + + request_id = resp.getheader('x-ms-request-id') + + # Call to GET OPERATION STATUS until 200 (OK) + success = self.wait_operation_status(request_id, subscription_id, cert_file, key_file) + + if success: + return (True, "") + else: + return (False, "Error waiting the VM termination") + + + def wait_operation_status(self, request_id, subscription_id, cert_file, key_file, req_status = 200, delay = 2, timeout = 60): + """ + Wait for the operation "request_id" to finish in the specified state + """ + self.logger.info("Wait the operation: " + request_id + " reach the state " + str(req_status)) + status = 0 + wait = 0 + while status != req_status and wait < timeout: + time.sleep(delay) + wait += delay + try: + conn = httplib.HTTPSConnection(self.AZURE_SERVER, self.AZURE_PORT, cert_file=cert_file, key_file=key_file) + uri = "/%s/operations/%s" % (subscription_id, request_id) + conn.request('GET', uri, headers = {'x-ms-version' : '2013-03-01'}) + resp = conn.getresponse() + status = resp.status + conn.close() + self.logger.debug("Operation state: " + str(status)) + except Exception: + self.logger.exception("Error getting the operation state: " + request_id) + + if status == req_status: + return True + else: + self.logger.exception("Error waiting the operation") + return False + + def create_storage_account(self, storage_account, subscription_id, cert_file, key_file, region, timeout = 120): + """ + Create an storage account with the name specified in "storage_account" + """ + self.logger.info("Creating the storage account " + storage_account) + try: + conn = httplib.HTTPSConnection(self.AZURE_SERVER, self.AZURE_PORT, cert_file=cert_file, key_file=key_file) + uri = "/%s/services/storageservices" % subscription_id + storage_create_xml = ''' + + %s + Storage %s created by the IM + + %s + false + + + AccountCreatedBy + RestAPI + + + + ''' % (storage_account, storage_account, base64.b64encode(storage_account), region) + conn.request('POST', uri, body = storage_create_xml, headers = {'x-ms-version' : '2013-03-01', 'Content-Type' : 'application/xml'}) + resp = conn.getresponse() + output = resp.read() + conn.close() + except Exception: + self.logger.exception("Error creating the storage account") + return None + + if resp.status != 202: + self.logger.error("Error creating the storage account: Error code " + str(resp.status) + ". 
Msg: " + output) + return None + + request_id = resp.getheader('x-ms-request-id') + + # Call to GET OPERATION STATUS until 200 (OK) + success = self.wait_operation_status(request_id, subscription_id, cert_file, key_file) + + # Wait the storage to be "Created" + status = None + delay = 2 + wait = 0 + while status != "Created" and wait < timeout: + storage = self.get_storage_account(storage_account, subscription_id, cert_file, key_file) + if storage: + status = storage.Status + if status != "Created": + time.sleep(delay) + wait += delay + + if success: + return storage_account + else: + self.logger.exception("Error creating the storage account") + self.delete_storage_account(storage_account, subscription_id, cert_file, key_file) + return None + + def delete_storage_account(self, storage_account, subscription_id, cert_file, key_file): + """ + Delete an storage account with the name specified in "storage_account" + """ + try: + conn = httplib.HTTPSConnection(self.AZURE_SERVER, self.AZURE_PORT, cert_file=cert_file, key_file=key_file) + uri = "/%s/services/storageservices/%s" % (subscription_id, storage_account) + conn.request('DELETE', uri, headers = {'x-ms-version' : '2013-03-01'}) + resp = conn.getresponse() + output = resp.read() + conn.close() + except Exception: + self.logger.exception("Error deleting the storage account") + return False + + if resp.status != 200: + self.logger.error("Error deleting the storage account: Error Code " + str(resp.status) + ". Msg: " + output) + return False + + return True + + def get_storage_account(self, storage_account, subscription_id, cert_file, key_file): + """ + Get the information about the Storage Account named "storage_account" or None if it does not exist + """ + try: + conn = httplib.HTTPSConnection(self.AZURE_SERVER, self.AZURE_PORT, cert_file=cert_file, key_file=key_file) + uri = "/%s/services/storageservices/%s" % (subscription_id, storage_account) + conn.request('GET', uri, headers = {'x-ms-version' : '2013-03-01'}) + resp = conn.getresponse() + output = resp.read() + conn.close() + if resp.status == 200: + storage_info = StorageService(output) + return storage_info.StorageServiceProperties + elif resp.status == 404: + self.logger.debug("Storage " + storage_account + " does not exist") + return None + else: + self.logger.warn("Error checking the storage account " + storage_account + ". 
Msg: " + output) + return None + except Exception: + self.logger.exception("Error checking the storage account") + return None + + def launch(self, inf, radl, requested_radl, num_vm, auth_data): + subscription_id = self.get_user_subscription_id(auth_data) + auth = self.get_user_cert_data(auth_data) + + if auth is None or subscription_id is None: + return [(False, "Incorrect auth data")] + else: + cert_file, key_file = auth + + region = self.DEFAULT_LOCATION + if radl.systems[0].getValue('availability_zone'): + region = radl.systems[0].getValue('availability_zone') + else: + radl.systems[0].setValue('availability_zone', region) + + res = [] + i = 0 + while i < num_vm: + try: + # Create storage account + storage_account = self.get_storage_account(self.STORAGE_NAME, subscription_id, cert_file, key_file) + if not storage_account: + storage_account_name = self.create_storage_account(self.STORAGE_NAME, subscription_id, cert_file, key_file, region) + if storage_account_name is None: + res.append((False, "Error creating the storage account")) + else: + storage_account_name = self.STORAGE_NAME + # if the user has specified the region + if radl.systems[0].getValue('availability_zone'): + # Check that the region of the storage account is the same of the service + if region != storage_account.GeoPrimaryRegion: + res.append((False, "Error creating the service. The specified region")) + else: + # Otherwise use the storage account region + region = storage_account.GeoPrimaryRegion + + # and the service + service_name = self.create_service(subscription_id, cert_file, key_file, region) + if service_name is None: + res.append((False, "Error creating the service")) + break + + self.logger.debug("Creating the VM with id: " + service_name) + + # Create the VM to get the nodename + vm = VirtualMachine(inf, service_name, self.cloud, radl, requested_radl) + + # Generate the XML to create the VM + vm_create_xml = self.get_azure_vm_create_xml(vm, storage_account_name, radl, i) + + if vm_create_xml == None: + self.delete_service(service_name, subscription_id, cert_file, key_file) + res.append((False, "Incorrect image or auth data")) + + conn = httplib.HTTPSConnection(self.AZURE_SERVER, self.AZURE_PORT, cert_file=cert_file, key_file=key_file) + uri = "/%s/services/hostedservices/%s/deployments" % (subscription_id, service_name) + conn.request('POST', uri, body = vm_create_xml, headers = {'x-ms-version' : '2013-03-01', 'Content-Type' : 'application/xml'}) + resp = conn.getresponse() + output = resp.read() + conn.close() + + if resp.status != 202: + self.delete_service(service_name, subscription_id, cert_file, key_file) + self.logger.error("Error creating the VM: Error Code " + str(resp.status) + ". Msg: " + output) + res.append((False, "Error creating the VM: Error Code " + str(resp.status) + ". 
Msg: " + output)) + else: + #Call the GET OPERATION STATUS until sea 200 (OK) + request_id = resp.getheader('x-ms-request-id') + success = self.wait_operation_status(request_id, subscription_id, cert_file, key_file) + if success: + res.append((True, vm)) + else: + self.logger.exception("Error waiting the VM creation") + res.append((False, "Error waiting the VM creation")) + + except Exception, ex: + self.logger.exception("Error creating the VM") + res.append((False, "Error creating the VM: " + str(ex))) + finally: + # delete tmp files with certificates + os.unlink(cert_file) + os.unlink(key_file) + + i += 1 + return res + + def get_instance_type(self, system): + """ + Get the name of the instance type to launch to EC2 + + Arguments: + - radl(str): RADL document with the requirements of the VM to get the instance type + Returns: a str with the name of the instance type to launch to EC2 + """ + instance_type_name = system.getValue('instance_type') + + cpu = system.getValue('cpu.count') + cpu_op = system.getFeature('cpu.count').getLogOperator() + arch = system.getValue('cpu.arch') + memory = system.getFeature('memory.size').getValue('M') + memory_op = system.getFeature('memory.size').getLogOperator() + disk_free = 0 + disk_free_op = ">=" + if system.getValue('disks.free_size'): + disk_free = system.getFeature('disks.free_size').getValue('G') + disk_free_op = system.getFeature('memory.size').getLogOperator() + + instace_types = self.get_all_instance_types() + + res = None + for instace_type in instace_types: + # get the instance type with the lowest price + if res is None or (instace_type.price <= res.price): + str_compare = "arch in instace_type.cpu_arch " + str_compare += " and instace_type.cores_per_cpu * instace_type.num_cpu " + cpu_op + " cpu " + str_compare += " and instace_type.mem " + memory_op + " memory " + str_compare += " and instace_type.disks * instace_type.disk_space " + disk_free_op + " disk_free" + + #if arch in instace_type.cpu_arch and instace_type.cores_per_cpu * instace_type.num_cpu >= cpu and instace_type.mem >= memory and instace_type.cpu_perf >= performance and instace_type.disks * instace_type.disk_space >= disk_free: + if eval(str_compare): + if not instance_type_name or instace_type.name == instance_type_name: + res = instace_type + + if res is None: + self.get_instance_type_by_name(self.INSTANCE_TYPE) + else: + return res + + def updateVMInfo(self, vm, auth_data): + self.logger.debug("Get the VM info with the id: " + vm.id) + auth = self.get_user_cert_data(auth_data) + subscription_id = self.get_user_subscription_id(auth_data) + + if auth is None or subscription_id is None: + return [(False, "Incorrect auth data")] + else: + cert_file, key_file = auth + + service_name = vm.id + + try: + conn = httplib.HTTPSConnection(self.AZURE_SERVER, self.AZURE_PORT, cert_file=cert_file, key_file=key_file) + uri = "/%s/services/hostedservices/%s/deployments/%s" % (subscription_id, service_name, service_name) + conn.request('GET', uri, headers = {'x-ms-version' : '2013-03-01'}) + resp = conn.getresponse() + output = resp.read() + conn.close() + except Exception, ex: + # delete tmp files with certificates + os.unlink(cert_file) + os.unlink(key_file) + self.logger.exception("Error getting the VM info: " + vm.id) + return (False, "Error getting the VM info: " + vm.id + ". " + str(ex)) + + if resp.status != 200: + self.logger.error("Error getting the VM info: " + vm.id + ". Error Code: " + str(resp.status) + ". 
Msg: " + output) + # delete tmp files with certificates + os.unlink(cert_file) + os.unlink(key_file) + return (False, "Error getting the VM info: " + vm.id + ". Error Code: " + str(resp.status) + ". Msg: " + output) + else: + # delete tmp files with certificates + os.unlink(cert_file) + os.unlink(key_file) + + self.logger.debug("VM info: " + vm.id + " obtained.") + self.logger.debug(output) + vm_info = Deployment(output) + + self.logger.debug("The VM state is: " + vm_info.Status) + + vm.state = self.get_vm_state(vm_info) + + # Update IP info + self.setIPs(vm,vm_info) + return (True, vm) + + def get_vm_state(self, vm_info): + """ + Return the state of the VM using the vm info in format "Deployment" + """ + try: + # If the deploy is running check the state of the RoleInstance + if vm_info.Status == "Running": + return self.ROLE_STATE_MAP.get(vm_info.RoleInstanceList.RoleInstance[0].PowerState, VirtualMachine.UNKNOWN) + else: + return self.DEPLOY_STATE_MAP.get(vm_info.Status, VirtualMachine.UNKNOWN) + except: + return self.DEPLOY_STATE_MAP.get(vm_info.Status, VirtualMachine.UNKNOWN) + + def setIPs(self, vm, vm_info): + """ + Set the information about the IPs of the VM + """ + private_ips = [] + public_ips = [] + + try: + role_instance = vm_info.RoleInstanceList.RoleInstance[0] + except: + return + try: + private_ips.append(role_instance.IpAddress) + except: + pass + try: + public_ips.append(role_instance.InstanceEndpoints.InstanceEndpoint[0].Vip) + except: + pass + + vm.setIps(public_ips, private_ips) + + def finalize(self, vm, auth_data): + self.logger.debug("Terminate VM: " + vm.id) + subscription_id = self.get_user_subscription_id(auth_data) + auth = self.get_user_cert_data(auth_data) + + if auth is None or subscription_id is None: + return (False, "Incorrect auth data") + else: + cert_file, key_file = auth + + service_name = vm.id + + # Delete the service + res = self.delete_service(service_name, subscription_id, cert_file, key_file) + + # delete tmp files with certificates + os.unlink(cert_file) + os.unlink(key_file) + + return res + + def call_role_operation(self, op, vm, auth_data): + """ + Call to the specified operation "op" to a Role + """ + subscription_id = self.get_user_subscription_id(auth_data) + auth = self.get_user_cert_data(auth_data) + + if auth is None or subscription_id is None: + return (False, "Incorrect auth data") + else: + cert_file, key_file = auth + + service_name = vm.id + + try: + conn = httplib.HTTPSConnection(self.AZURE_SERVER, self.AZURE_PORT, cert_file=cert_file, key_file=key_file) + uri = "/%s/services/hostedservices/%s/deployments/%s/roleinstances/%s/Operations" % (subscription_id, service_name, service_name, self.ROLE_NAME) + + conn.request('POST', uri, body = op, headers = {'x-ms-version' : '2013-06-01', 'Content-Type' : 'application/xml'}) + resp = conn.getresponse() + output = resp.read() + conn.close() + except Exception, ex: + # delete tmp files with certificates + os.unlink(cert_file) + os.unlink(key_file) + self.logger.exception("Error calling role operation") + return (False, "Error calling role operation: " + str(ex)) + + if resp.status != 202: + # delete tmp files with certificates + os.unlink(cert_file) + os.unlink(key_file) + self.logger.error("Error calling role operation: Error Code " + str(resp.status) + ". Msg: " + output) + return (False, "Error calling role operation: Error Code " + str(resp.status) + ". 
Msg: " + output) + + request_id = resp.getheader('x-ms-request-id') + + # Call to GET OPERATION STATUS until 200 (OK) + success = self.wait_operation_status(request_id, subscription_id, cert_file, key_file) + + if success: + return (True, "") + else: + return (False, "Error waiting the VM role operation") + + return (True, "") + + def stop(self, vm, auth_data): + self.logger.debug("Stop VM: " + vm.id) + + op = """ + ShutdownRoleOperation + StoppedDeallocated +""" + return self.call_role_operation(op, vm, auth_data) + + def start(self, vm, auth_data): + self.logger.debug("Start VM: " + vm.id) + + op = """ + StartRoleOperation +""" + return self.call_role_operation(op, vm, auth_data) + + @staticmethod + def get_all_instance_types(): + list = [] + + xsmall = InstanceTypeInfo("ExtraSmall", ["x86_64"], 1, 1, 768, 0.0135, 1, 20) + list.append(xsmall) + small = InstanceTypeInfo("Small", ["x86_64"], 1, 1, 1792, 0.0574, 1, 40) + list.append(small) + medium = InstanceTypeInfo("Medium", ["x86_64"], 1, 2, 3584, 0.1147, 1, 60) + list.append(medium) + large = InstanceTypeInfo("Large", ["x86_64"], 1, 4, 7168, 0.229, 1, 120) + list.append(large) + xlarge = InstanceTypeInfo("Extra Large", ["x86_64"], 1, 8, 14336, 0.4588, 1, 240) + list.append(xlarge) + a5 = InstanceTypeInfo("A5", ["x86_64"], 1, 2, 14336, 0.2458, 1, 135) + list.append(a5) + a6 = InstanceTypeInfo("A6", ["x86_64"], 1, 4, 28672, 0.4916, 1, 285) + list.append(a6) + a7 = InstanceTypeInfo("A7", ["x86_64"], 1, 8, 57344, 0.9831, 1, 605) + list.append(a7) + + + return list + + def get_instance_type_by_name(self, name): + """ + Get the Azure instance type with the specified name + + Returns: an :py:class:`InstanceTypeInfo` or None if the type is not found + """ + for inst_type in self.get_all_instance_types(): + if inst_type.name == name: + return inst_type + return None diff --git a/connectors/CloudConnector.py b/connectors/CloudConnector.py index 6ecac9fc4..9df15b8b4 100644 --- a/connectors/CloudConnector.py +++ b/connectors/CloudConnector.py @@ -1,4 +1,7 @@ import logging +import subprocess +import shutil +import tempfile class CloudConnector: """ @@ -118,4 +121,33 @@ def stop(self, vm, auth_data): """ - raise NotImplementedError( "Should have implemented this" ) \ No newline at end of file + raise NotImplementedError( "Should have implemented this" ) + + def keygen(self): + """ + Generates a keypair using the ssh-keygen command and returns a tuple (public, private) + """ + tmp_dir = tempfile.mkdtemp() + pk_file = tmp_dir + "/im-ssh-key" + command = 'ssh-keygen -t rsa -b 2048 -q -N "" -f ' + pk_file + p=subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + (out, err) = p.communicate() + if p.returncode!=0: + shutil.rmtree(tmp_dir, ignore_errors=True) + self.logger.error("Error executing ssh-keygen: " + out + err) + return (None, None) + else: + public = None + private = None + try: + with open(pk_file) as f: private = f.read() + except: + self.logger.exception("Error reading private_key file.") + + try: + with open(pk_file + ".pub") as f: public = f.read() + except: + self.logger.exception("Error reading public_key file.") + + shutil.rmtree(tmp_dir, ignore_errors=True) + return (public, private) \ No newline at end of file diff --git a/connectors/DeployedNode.py b/connectors/DeployedNode.py new file mode 100644 index 000000000..03d2d12c8 --- /dev/null +++ b/connectors/DeployedNode.py @@ -0,0 +1,69 @@ +# IM - Infrastructure Manager +# Copyright (C) 2011 - GRyCAP - Universitat Politecnica de Valencia +# +# This 
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import time
+from IM.VirtualMachine import VirtualMachine
+from CloudConnector import CloudConnector
+
+
+class DeployedNodeCloudConnector(CloudConnector):
+ """
+ Cloud Launcher to manage existing running nodes.
+ The connector does nothing, but it enables the integration of these nodes into the contextualization.
+ """
+
+ type = "DeployedNode"
+ """str with the name of the provider."""
+
+ def concreteSystem(self, radl_system, auth_data):
+ # we must check that the RADL has this information:
+ # at least one IP, a username, and a password or a private_key
+ ip = radl_system.getValue("net_interface.0.ip")
+ user = radl_system.getValue("disk.0.os.credentials.username")
+ passwd = radl_system.getValue("disk.0.os.credentials.password")
+ priv_key = radl_system.getValue("disk.0.os.credentials.private_key")
+
+ if ip and user and (passwd or priv_key):
+ res_system = radl_system.clone()
+ return [res_system]
+ else:
+ return []
+
+ def updateVMInfo(self, vm, auth_data):
+ return (True, vm)
+
+ def launch(self, inf, radl, requested_radl, num_vm, auth_data):
+ res = []
+ for _ in range(num_vm):
+ now = str(int(time.time()*100))
+ vm = VirtualMachine(inf, now, self.cloud, requested_radl, requested_radl)
+ vm.info.systems[0].setValue('provider.type', self.type)
+ vm.state = VirtualMachine.RUNNING
+ res.append((True, vm))
+
+ return res
+
+ def finalize(self, vm, auth_data):
+ return (True, "")
+
+ def stop(self, vm, auth_data):
+ return (False, "Operation not supported")
+
+ def start(self, vm, auth_data):
+ return (False, "Operation not supported")
+
+ def alterVM(self, vm, radl, auth_data):
+ return (False, "Not supported")
diff --git a/connectors/EC2.py b/connectors/EC2.py
index 1dfa0d4d7..add65c618 100644
--- a/connectors/EC2.py
+++ b/connectors/EC2.py
@@ -18,6 +18,7 @@
 import base64
 from IM.uriparse import uriparse
 import boto.ec2
+import boto.vpc
 import os
 from IM.VirtualMachine import VirtualMachine
 from CloudConnector import CloudConnector
@@ -61,6 +62,15 @@ class EC2CloudConnector(CloudConnector):
 INSTANCE_TYPE = 't1.micro'
 """str with the name of the default instance type to launch."""
 
+ VM_STATE_MAP = {
+ 'pending': VirtualMachine.PENDING,
+ 'running': VirtualMachine.RUNNING,
+ 'stopped': VirtualMachine.STOPPED,
+ 'stopping': VirtualMachine.RUNNING,
+ 'shutting-down': VirtualMachine.OFF,
+ 'terminated': VirtualMachine.OFF
+ }
+ """Dictionary mapping the EC2 VM states to the IM states."""
 
 def concreteSystem(self, radl_system, auth_data):
 if radl_system.getValue("disk.0.image.url"):
@@ -78,36 +88,27 @@ def concreteSystem(self, radl_system, auth_data):
 self.logger.debug(res_system)
 return []
 else:
- res_system.addFeature(Feature("cpu.count", "=", instance_type.num_cpu * instance_type.cores_per_cpu), conflict="other", missing="other")
- res_system.addFeature(Feature("memory.size", "=", instance_type.mem, 'M'), conflict="other", missing="other")
- if instance_type.disks > 0:
-
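
For reference, a minimal RADL description that DeployedNodeCloudConnector.concreteSystem accepts could look like the following (the IP and credentials are illustrative):

    network net ()
    system node (
        net_interface.0.connection = 'net' and
        net_interface.0.ip = '10.0.1.10' and
        disk.0.os.credentials.username = 'user' and
        disk.0.os.credentials.password = 'somepass'
    )
    deploy node 1
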
res_system.addFeature(Feature("disks.free_size", "=", instance_type.disks * instance_type.disk_space, 'G'), conflict="other", missing="other") - for i in range(1,instance_type.disks+1): - res_system.addFeature(Feature("disk.%d.free_size" % i, "=", instance_type.disk_space, 'G'), conflict="other", missing="other") - res_system.addFeature(Feature("cpu.performance", "=", instance_type.cpu_perf, 'ECU'), conflict="other", missing="other") - res_system.addFeature(Feature("price", "=", instance_type.price), conflict="me", missing="other") - - res_system.addFeature(Feature("instance_type", "=", instance_type.name), conflict="other", missing="other") - - res_system.addFeature(Feature("provider.type", "=", self.type), conflict="other", missing="other") - + self.update_system_info_from_instance(res_system, instance_type) return [res_system] else: return [] else: return [radl_system.clone()] - # Set the EC2 credentials - def set_ec2_credentials(self, key_id, access_key): + def update_system_info_from_instance(self, system, instance_type): """ - Set the EC2 credentials as environment values - - Arguments: - - key_id(str): AWS_ACCESS_KEY_ID value. - - access_key(str): AWS_SECRET_ACCESS_KEY value. + Update the features of the system with the information of the instance_type """ - os.environ['AWS_ACCESS_KEY_ID'] = key_id - os.environ['AWS_SECRET_ACCESS_KEY'] = access_key + system.addFeature(Feature("cpu.count", "=", instance_type.num_cpu * instance_type.cores_per_cpu), conflict="other", missing="other") + system.addFeature(Feature("memory.size", "=", instance_type.mem, 'M'), conflict="other", missing="other") + if instance_type.disks > 0: + system.addFeature(Feature("disks.free_size", "=", instance_type.disks * instance_type.disk_space, 'G'), conflict="other", missing="other") + for i in range(1,instance_type.disks+1): + system.addFeature(Feature("disk.%d.free_size" % i, "=", instance_type.disk_space, 'G'), conflict="other", missing="other") + system.addFeature(Feature("cpu.performance", "=", instance_type.cpu_perf, 'ECU'), conflict="other", missing="other") + system.addFeature(Feature("price", "=", instance_type.price), conflict="me", missing="other") + + system.addFeature(Feature("instance_type", "=", instance_type.name), conflict="other", missing="other") # Get the EC2 connection object def get_connection(self, region_name, auth_data): @@ -123,22 +124,17 @@ def get_connection(self, region_name, auth_data): try: auth = auth_data.getAuthInfo(EC2CloudConnector.type) if auth and 'username' in auth[0] and 'password' in auth[0]: - self.set_ec2_credentials(auth[0]['username'], auth[0]['password']) + region = boto.ec2.get_region(region_name) + if region: + return boto.vpc.VPCConnection(aws_access_key_id=auth[0]['username'], aws_secret_access_key=auth[0]['password'], region=region) + else: + raise Exception("Incorrect region name: " + region_name) else: self.logger.error("Incorrect auth data") return None - region = None - regions = boto.ec2.regions() - - for r in regions: - if r.name == region_name: - region = r - if region != None: - conn = region.connect() - except Exception, e: - self.logger.error("Error getting the region " + region_name + ": ") - self.logger.error(e) + except Exception: + self.logger.exception("Error getting the region " + region_name + ": ") return None return conn @@ -211,21 +207,79 @@ def get_instance_type(self, radl): else: return res - def create_security_group(self, conn, inf, radl): - res = "default" + @staticmethod + def set_net_provider_id(radl, vpc, subnet): + """ + Set the 
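
The region handling that replaces set_ec2_credentials can be condensed into a small sketch using the boto 2.x API (the function name is illustrative):

    import boto.ec2
    import boto.vpc

    def connect(region_name, access_key, secret_key):
        region = boto.ec2.get_region(region_name)
        if not region:
            raise Exception("Incorrect region name: " + region_name)
        # VPCConnection extends the plain EC2 connection, so a single client
        # object serves both EC2-classic and VPC operations
        return boto.vpc.VPCConnection(aws_access_key_id=access_key,
                                      aws_secret_access_key=secret_key,
                                      region=region)
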
provider ID on all the nets of the system
+ """
+ system = radl.systems[0]
+ for i in range(system.getNumNetworkIfaces()):
+ net_id = system.getValue('net_interface.' + str(i) + '.connection')
+ net = radl.get_network_by_id(net_id)
+ if net:
+ net.setValue('provider_id', vpc + "." + subnet)
+
+ @staticmethod
+ def get_net_provider_id(radl):
+ """
+ Get the provider ID of the first net that has specified it
+ Returns: The net provider ID or None if not defined
+ """
+ provider_id = None
+ system = radl.systems[0]
+ for i in range(system.getNumNetworkIfaces()):
+ net_id = system.getValue('net_interface.' + str(i) + '.connection')
+ net = radl.get_network_by_id(net_id)
+
+ if net:
+ provider_id = net.getValue('provider_id')
+ break
+
+ if provider_id:
+ parts = provider_id.split(".")
+ if len(parts) == 2 and parts[0].startswith("vpc-") and parts[1].startswith("subnet-"):
+ # TODO: check that the VPC and the subnet exist
+ return parts[0], parts[1]
+ else:
+ raise Exception("Incorrect provider_id value: " + provider_id + ". It must be <vpc-id>.<subnet-id>.")
+ else:
+ return None
+
+ @staticmethod
+ def _get_security_group(conn, sg_name):
 try:
 sg = None
-
- sg_name = "im-" + str(inf.uuid)
 for elem in conn.get_all_security_groups():
 if elem.name == sg_name:
 sg = elem
 break
+ return sg
+ except Exception:
+ return None
+
+ def create_security_group(self, conn, inf, radl, vpc = None):
+ res = None
+ try:
+ sg_name = "im-" + str(inf.uuid)
+ sg = self._get_security_group(conn, sg_name)
 
+ if not sg:
 self.logger.debug("Creating security group: " + sg_name)
- sg = conn.create_security_group(sg_name, "Security group created by the IM")
+ try:
+ sg = conn.create_security_group(sg_name, "Security group created by the IM", vpc_id = vpc)
+ except Exception, crex:
+ # First check if the SG does exist
+ sg = self._get_security_group(conn, sg_name)
+ if not sg:
+ # if not, raise the exception
+ raise crex
+ else:
+ self.logger.debug("Security group: " + sg_name + " already created.")
 
- res = sg_name
+ if vpc:
+ res = [sg.id]
+ else:
+ res = [sg.name]
 
 public_net = None
 for net in radl.networks:
@@ -239,20 +293,26 @@ def create_security_group(self, conn, inf, radl):
 if local_port != 22 and local_port != 5099:
 protocol = remote_protocol
 if remote_protocol != local_protocol:
- self.logger.warn("Diferent protocols used in outports ignoring local port protocol!")
+ self.logger.warn("Different protocols used in outports; ignoring the local port protocol!")
 sg.authorize(protocol, remote_port, local_port, '0.0.0.0/0')
 
- sg.authorize('tcp', 22, 22, '0.0.0.0/0')
- sg.authorize('tcp', 5099, 5099, '0.0.0.0/0')
-
- # open all the ports for the VMs in the security group
- sg.authorize('tcp', 0, 65535, src_group=sg)
- sg.authorize('udp', 0, 65535, src_group=sg)
- sg.authorize('icmp', 0, 65535, src_group=sg)
+ try:
+ sg.authorize('tcp', 22, 22, '0.0.0.0/0')
+ sg.authorize('tcp', 5099, 5099, '0.0.0.0/0')
+
+ # open all the ports for the VMs in the security group
+ sg.authorize('tcp', 0, 65535, src_group=sg)
+ sg.authorize('udp', 0, 65535, src_group=sg)
+ #sg.authorize('icmp', 0, 65535, src_group=sg)
+ except Exception, addex:
+ self.logger.warn("Exception adding SG rules. 
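
The accepted provider_id format is easiest to see in isolation (a hypothetical helper with the same validation as get_net_provider_id above):

    def parse_provider_id(provider_id):
        parts = provider_id.split(".")
        if len(parts) == 2 and parts[0].startswith("vpc-") and parts[1].startswith("subnet-"):
            return parts[0], parts[1]
        raise Exception("Incorrect provider_id value: " + provider_id +
                        ". It must be <vpc-id>.<subnet-id>.")

    # parse_provider_id("vpc-0a1b2c3d.subnet-9f8e7d6c") -> ("vpc-0a1b2c3d", "subnet-9f8e7d6c")
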
Probably the rules exists:" + str(addex)) + pass - except Exception: + except Exception, ex: self.logger.exception("Error Creating the Security group") + if vpc: + raise Exception("Error Creating the Security group: " + str(ex)) pass return res @@ -292,6 +352,23 @@ def create_keypair(self, system, conn): return (created, keypair_name) + def get_default_subnet(self, conn): + """ + Get the default VPC and the first subnet + """ + vpc_id = None + subnet_id = None + + for vpc in conn.get_all_vpcs(): + if vpc.is_default: + vpc_id = vpc.id + for subnet in conn.get_all_subnets({"vpcId":vpc_id}): + subnet_id = subnet.id + break + break + + return vpc_id, subnet_id + def launch(self, inf, radl, requested_radl, num_vm, auth_data): system = radl.systems[0] @@ -329,15 +406,39 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data): res.append((False, "Error getting correct block_device name from AMI: " + str(ami))) return res + # Create the security group for the VMs + provider_id = self.get_net_provider_id(radl) + if provider_id: + vpc, subnet = provider_id + sg_names = None + sg_ids = self.create_security_group(conn, inf, radl, vpc) + if not sg_ids: + vpc = None + subnet = None + sg_ids = None + sg_names = ['default'] + else: + # Check the default VPC and get the first subnet with a connection with a gateway + # If there are no default VPC, use EC2-classic + vpc, subnet = self.get_default_subnet(conn) + if vpc: + self.set_net_provider_id(radl, vpc, subnet) + sg_names = None + sg_ids = self.create_security_group(conn, inf, radl, vpc) + else: + sg_ids = None + sg_names = self.create_security_group(conn, inf, radl, vpc) + if not sg_names: + sg_names = ['default'] + + # Now create the keypair (created_keypair, keypair_name) = self.create_keypair(system, conn) if not keypair_name: self.logger.error("Error managing the keypair.") for i in range(num_vm): res.append((False, "Error managing the keypair.")) return res - - # Create the security group for the VMs - sg_name = self.create_security_group(conn, inf, radl) + all_failed = True i = 0 @@ -384,7 +485,7 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data): # Force to use magnetic volumes bdm = boto.ec2.blockdevicemapping.BlockDeviceMapping(conn) bdm[block_device_name] = boto.ec2.blockdevicemapping.BlockDeviceType(volume_type="standard") - request = conn.request_spot_instances(price=price, image_id=image.id, count=1, type='one-time', instance_type=instance_type.name, placement=availability_zone, key_name=keypair_name, security_groups=[sg_name], block_device_map=bdm) + request = conn.request_spot_instances(price=price, image_id=image.id, count=1, type='one-time', instance_type=instance_type.name, placement=availability_zone, key_name=keypair_name, security_groups=sg_names, security_group_ids=sg_ids, block_device_map=bdm, subnet_id=subnet) if request: ec2_vm_id = region_name + ";" + request[0].id @@ -413,12 +514,13 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data): # Force to use magnetic volumes bdm = boto.ec2.blockdevicemapping.BlockDeviceMapping(conn) bdm[block_device_name] = boto.ec2.blockdevicemapping.BlockDeviceType(volume_type="standard") - reservation = image.run(min_count=1,max_count=1,key_name=keypair_name,instance_type=instance_type.name,security_groups=[sg_name],placement=placement,block_device_map=bdm) + # Check if the user has specified the net provider id + reservation = 
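
The network selection at launch time reduces to the following decision chain; this is a sketch under boto 2.x, with helper names assumed from the methods above:

    def pick_network(conn, radl):
        # 1) an explicit provider_id in the RADL wins
        provider_id = EC2CloudConnector.get_net_provider_id(radl)
        if provider_id:
            return provider_id
        # 2) otherwise use the account's default VPC and its first subnet
        for vpc in conn.get_all_vpcs():
            if vpc.is_default:
                subnets = conn.get_all_subnets(filters={"vpcId": vpc.id})
                if subnets:
                    return vpc.id, subnets[0].id
        # 3) no default VPC: fall back to EC2-classic (security group names)
        return None, None
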
image.run(min_count=1,max_count=1,key_name=keypair_name,instance_type=instance_type.name,security_groups=sg_names,security_group_ids=sg_ids,placement=placement,block_device_map=bdm,subnet_id=subnet) if len(reservation.instances) == 1: instance = reservation.instances[0] ec2_vm_id = region_name + ";" + instance.id - + self.logger.debug("RADL:") self.logger.debug(system) @@ -441,8 +543,10 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data): if all_failed: if created_keypair: conn.delete_key_pair(keypair_name) - if sg_name != 'default': - conn.delete_security_group(sg_name) + if sg_ids: + conn.delete_security_group(group_id = sg_ids[0]) + if sg_names and sg_names[0] != 'default': + conn.delete_security_group(sg_names[0]) return res @@ -483,8 +587,9 @@ def attach_volumes(self, instance, vm): """ try: if instance.state == 'running' and not "volumes" in vm.__dict__.keys(): + # Flag to se that this VM has created (or is creating) the volumes + vm.volumes = True conn = instance.connection - vm.volumes = [] cont = 1 while vm.info.systems[0].getValue("disk." + str(cont) + ".size") and vm.info.systems[0].getValue("disk." + str(cont) + ".device"): disk_size = vm.info.systems[0].getFeature("disk." + str(cont) + ".size").getValue('G') @@ -492,47 +597,51 @@ def attach_volumes(self, instance, vm): self.logger.debug("Creating a %d GB volume for the disk %d" % (int(disk_size), cont)) volume = self.create_volume(conn, int(disk_size), instance.placement) if volume: - vm.volumes.append(volume.id) self.logger.debug("Attach the volume ID " + str(volume.id)) conn.attach_volume(volume.id, instance.id, "/dev/" + disk_device) cont += 1 except Exception: self.logger.exception("Error creating or attaching the volume to the instance") - def delete_volumes(self, conn, vm, timeout = 240): + def delete_volumes(self, conn, volumes, instance_id, timeout = 240): """ - Delete the volumes of a VM + Delete the volumes specified in the volumes list Arguments: - conn(:py:class:`boto.ec2.connection`): object to connect to EC2 API. - - vm(:py:class:`IM.VirtualMachine`): VM information. + - volumes(list of strings): Volume IDs to delete. - timeout(int): Time needed to delete the volume. """ - if "volumes" in vm.__dict__.keys() and vm.volumes: - instance_id = vm.id.split(";")[1] - for volume_id in vm.volumes: - cont = 0 - deleted = False - while not deleted and cont < timeout: - cont += 5 - try: - curr_vol = conn.get_all_volumes([volume_id])[0] - if str(curr_vol.attachment_state()) == "attached": - self.logger.debug("Detaching the volume " + volume_id + " from the instance " + instance_id) - conn.detach_volume(volume_id, instance_id, force=True) - elif curr_vol.attachment_state() is None: - self.logger.debug("Removing the volume " + volume_id) - conn.delete_volume(volume_id) - deleted = True - else: - self.logger.debug("State: " + str(curr_vol.attachment_state())) - except Exception: - self.logger.exception("Error removing the volume.") + for volume_id in volumes: + cont = 0 + deleted = False + while not deleted and cont < timeout: + cont += 5 + try: + curr_vol = conn.get_all_volumes([volume_id])[0] + except: + self.logger.warn("The volume " + volume_id + " does not exist. It cannot be removed. 
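
The retry loop of the rewritten delete_volumes is the interesting part: detach while the volume is attached, delete once it is free, and treat a missing volume as already deleted. A standalone sketch (boto 2.x):

    import time

    def delete_volume(conn, volume_id, instance_id, timeout=240):
        waited = 0
        while waited < timeout:
            try:
                vol = conn.get_all_volumes([volume_id])[0]
            except Exception:
                return True                    # volume already gone
            state = vol.attachment_state()     # 'attached', 'detaching' or None
            if state == "attached":
                conn.detach_volume(volume_id, instance_id, force=True)
            elif state is None:
                conn.delete_volume(volume_id)
                return True
            time.sleep(5)
            waited += 5
        return False
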
Ignore it.") + deleted = True + break + try: + curr_vol = conn.get_all_volumes([volume_id])[0] + if str(curr_vol.attachment_state()) == "attached": + self.logger.debug("Detaching the volume " + volume_id + " from the instance " + instance_id) + conn.detach_volume(volume_id, instance_id, force=True) + elif curr_vol.attachment_state() is None: + self.logger.debug("Removing the volume " + volume_id) + conn.delete_volume(volume_id) + deleted = True + else: + self.logger.debug("State: " + str(curr_vol.attachment_state())) + except Exception, ex: + self.logger.warn("Error removing the volume: " + str(ex)) + if not deleted: time.sleep(5) - - if not deleted: - self.logger.error("Error removing the volume " + volume_id) + + if not deleted: + self.logger.error("Error removing the volume " + volume_id) # Get the EC2 instance object with the specified ID def get_instance_by_id(self, instance_id, region_name, auth_data): @@ -567,7 +676,9 @@ def add_elastic_ip(self, vm, instance, fixed_ip = None): - fixed_ip(str, optional): specifies a fixed IP to add to the instance. Returns: a :py:class:`boto.ec2.address.Address` added or None if some problem occur. """ - if vm.state == VirtualMachine.RUNNING: + if vm.state == VirtualMachine.RUNNING and not "elastic_ip" in vm.__dict__.keys(): + # Flag to set that this VM has created (or is creating) the elastic IPs + vm.elastic_ip = True try: pub_address = None self.logger.debug("Add an Elastic IP") @@ -582,10 +693,15 @@ def add_elastic_ip(self, vm, instance, fixed_ip = None): self.logger.warn("Setting a fixed IP NOT ALLOCATED! (" + fixed_ip + "). Ignore it.") return None else: - pub_address = instance.connection.allocate_address() + provider_id = self.get_net_provider_id(vm.info) + if provider_id: + pub_address = instance.connection.allocate_address(domain="vpc") + instance.connection.associate_address(instance.id, allocation_id=pub_address.allocation_id) + else: + pub_address = instance.connection.allocate_address() + instance.connection.associate_address(instance.id, pub_address.public_ip) self.logger.debug(pub_address) - pub_address.associate(instance.id) return pub_address except Exception: self.logger.exception("Error adding an Elastic IP to VM ID: " + str(vm.id)) @@ -738,30 +854,15 @@ def updateVMInfo(self, vm, auth_data): vm.info.systems[0].setValue("virtual_system_type", "'" + instance.virtualization_type + "'") vm.info.systems[0].setValue("availability_zone", "'" + instance.placement + "'") - if instance.state == 'pending': - res_state = VirtualMachine.PENDING - elif instance.state == 'running': - res_state = VirtualMachine.RUNNING - elif instance.state == 'stopped': - res_state = VirtualMachine.STOPPED - elif instance.state == 'stopping': - res_state = VirtualMachine.RUNNING - elif instance.state == 'shutting-down': - res_state = VirtualMachine.OFF - elif instance.state == 'terminated': - res_state = VirtualMachine.OFF - else: - res_state = VirtualMachine.UNKNOWN - - vm.state = res_state + vm.state = self.VM_STATE_MAP.get(instance.state, VirtualMachine.UNKNOWN) self.setIPsFromInstance(vm, instance) self.attach_volumes(instance, vm) try: vm.info.systems[0].setValue('launch_time', int(time.mktime(time.strptime(instance.launch_time[:19],'%Y-%m-%dT%H:%M:%S')))) - except: - self.logger.exception("Error setting the launch_time of the instance") + except Exception, ex: + self.logger.warn("Error setting the launch_time of the instance. 
Probably the instance is not running:" + str(ex)) else: vm.state = VirtualMachine.OFF @@ -795,9 +896,13 @@ def finalize(self, vm, auth_data): conn = self.get_connection(region_name, auth_data) # Terminate the instance + volumes = [] instance = self.get_instance_by_id(instance_id, region_name, auth_data) if (instance != None): instance.update() + # Get the volumnes to delete + for volume in instance.block_device_mapping.values(): + volumes.append(volume.volume_id) instance.terminate() public_key = vm.getRequestedSystem().getValue('disk.0.os.credentials.public_key') @@ -812,7 +917,7 @@ def finalize(self, vm, auth_data): self.cancel_spot_requests(conn, vm) # Delete the EBS volumes - self.delete_volumes(conn, vm) + self.delete_volumes(conn, volumes, instance.id) # Delete the SG if this is the last VM self.delete_security_group(conn, vm.inf) @@ -827,12 +932,8 @@ def delete_security_group(self, conn, inf, timeout = 90): - conn(:py:class:`boto.ec2.connection`): object to connect to EC2 API. - inf(:py:class:`IM.InfrastructureInfo`): Infrastructure information. """ - sg_name = "im-" + str(id(inf)) - sg = None - for elem in conn.get_all_security_groups(): - if elem.name == sg_name: - sg = elem - break + sg_name = "im-" + str(inf.uuid) + sg = self._get_security_group(conn, sg_name) if sg: some_vm_running = False @@ -856,10 +957,33 @@ def delete_security_group(self, conn, inf, timeout = 90): if all_vms_terminated: self.logger.debug("Remove the SG: " + sg_name) - sg.delete() + try: + sg.revoke('tcp', 0, 65535, src_group=sg) + sg.revoke('udp', 0, 65535, src_group=sg) + time.sleep(2) + except Exception, ex: + self.logger.warn("Error revoking self rules: " + str(ex)) + + deleted = False + while not deleted and cont < timeout: + time.sleep(5) + cont += 5 + try: + sg.delete() + deleted = True + except Exception, ex: + # Check if it has been deleted yet + sg = self._get_security_group(conn, sg_name) + if not sg: + self.logger.debug("Error deleting the SG. But it does not exist. Ignore. " + str(ex)) + deleted = True + else: + self.logger.exception("Error deleting the SG.") else: # If there are more than 1, we skip this step self.logger.debug("There are active instances. 
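
Deleting the security group now revokes its self-referencing rules first, since AWS refuses to delete a group that is still referenced, and then retries the delete. A condensed sketch (boto 2.x):

    import time

    def delete_sg(sg, timeout=90):
        try:
            sg.revoke('tcp', 0, 65535, src_group=sg)
            sg.revoke('udp', 0, 65535, src_group=sg)
        except Exception:
            pass                   # the rules may already be gone
        waited = 0
        while waited < timeout:
            try:
                sg.delete()
                return True
            except Exception:
                time.sleep(5)
                waited += 5
        return False
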
Not removing the SG") + else: + self.logger.warn("No Security Group with name: " + sg_name) def stop(self, vm, auth_data): @@ -883,9 +1007,52 @@ def start(self, vm, auth_data): instance.start() return (True, "") + + def waitStop(self, instance, timeout = 60): + """ + Wait a instance to be stopped + """ + instance.stop() + wait = 0 + powered_off = False + while wait < timeout and not powered_off: + instance.update() + + powered_off = instance.state == 'stopped' + if not powered_off: + time.sleep(2) + wait += 2 + + return powered_off def alterVM(self, vm, radl, auth_data): - return (False, "Not supported") + region_name = vm.id.split(";")[0] + instance_id = vm.id.split(";")[1] + + # Terminate the instance + instance = self.get_instance_by_id(instance_id, region_name, auth_data) + if instance: + instance.update() + else: + return (False, "The instance has not been found") + + success = True + if radl.systems: + radl.systems[0].applyFeatures(vm.requested_radl.systems[0],conflict="me", missing="me") + instance_type = self.get_instance_type(radl.systems[0]) + + if instance.instance_type != instance_type.name: + self.waitStop(instance) + success = instance.modify_attribute('instanceType', instance_type.name) + if success: + self.update_system_info_from_instance(vm.info.systems[0], instance_type) + instance.start() + + if success: + return (success, self.updateVMInfo(vm, auth_data)) + else: + return (success, "Unknown Error") + def get_all_instance_types(self): """ @@ -906,20 +1073,20 @@ def get_all_instance_types(self): t2_medium = InstanceTypeInfo("t2.medium", ["i386", "x86_64"], 2, 1, 4096, 0.052, 0.5) list.append(t2_medium) - m1_small = InstanceTypeInfo("m1.small", ["i386", "x86_64"], 1, 1, 1740, 0.0071, 1, 1, 160) + m1_small = InstanceTypeInfo("m1.small", ["i386", "x86_64"], 1, 1, 1740, 0.0171, 1, 1, 160) list.append(m1_small) - m1_medium = InstanceTypeInfo("m1.medium", ["i386", "x86_64"], 1, 1, 3840, 0.0081, 2, 1, 410) + m1_medium = InstanceTypeInfo("m1.medium", ["i386", "x86_64"], 1, 1, 3840, 0.0331, 2, 1, 410) list.append(m1_medium) - m1_large = InstanceTypeInfo("m1.large", ["x86_64"], 1, 2, 7680, 0.0161, 4, 2, 420) + m1_large = InstanceTypeInfo("m1.large", ["x86_64"], 1, 2, 7680, 0.0661, 4, 2, 420) list.append(m1_large) - m1_xlarge = InstanceTypeInfo("m1.xlarge", ["x86_64"], 1, 4, 15360, 0.0321, 8, 4, 420) + m1_xlarge = InstanceTypeInfo("m1.xlarge", ["x86_64"], 1, 4, 15360, 0.1321, 8, 4, 420) list.append(m1_xlarge) - m2_xlarge = InstanceTypeInfo("m2.xlarge", ["x86_64"], 1, 2, 17510, 0.0161, 6.5, 1, 420) + m2_xlarge = InstanceTypeInfo("m2.xlarge", ["x86_64"], 1, 2, 17510, 0.0701, 6.5, 1, 420) list.append(m2_xlarge) - m2_2xlarge = InstanceTypeInfo("m2.2xlarge", ["x86_64"], 1, 4, 35020, 0.0321, 13, 1, 850) + m2_2xlarge = InstanceTypeInfo("m2.2xlarge", ["x86_64"], 1, 4, 35020, 0.1401, 13, 1, 850) list.append(m2_2xlarge) - m2_4xlarge = InstanceTypeInfo("m2.4xlarge", ["x86_64"], 1, 4, 70041, 0.075, 13, 2, 840) + m2_4xlarge = InstanceTypeInfo("m2.4xlarge", ["x86_64"], 1, 4, 70041, 0.2801, 13, 2, 840) list.append(m2_4xlarge) m3_medium = InstanceTypeInfo("m3.medium", ["x86_64"], 1, 1, 3840, 0.07, 3, 1, 4) @@ -931,15 +1098,15 @@ def get_all_instance_types(self): m3_2xlarge = InstanceTypeInfo("m3.2xlarge", ["x86_64"], 1, 8, 30720, 0.56, 26, 2, 80) list.append(m3_2xlarge) - c1_medium = InstanceTypeInfo("c1.medium", ["i386", "x86_64"], 1, 2, 1740, 0.0161, 5, 1, 350) + c1_medium = InstanceTypeInfo("c1.medium", ["i386", "x86_64"], 1, 2, 1740, 0.05, 5, 1, 350) list.append(c1_medium) - c1_xlarge = 
InstanceTypeInfo("c1.xlarge", ["x86_64"], 1, 8, 7680, 0.0641, 20, 4, 420) + c1_xlarge = InstanceTypeInfo("c1.xlarge", ["x86_64"], 1, 8, 7680, 0.2, 20, 4, 420) list.append(c1_xlarge) - cc2_8xlarge = InstanceTypeInfo("cc2.8xlarge", ["x86_64"], 2, 8, 61952, 0.2562, 88, 4, 840) + cc2_8xlarge = InstanceTypeInfo("cc2.8xlarge", ["x86_64"], 2, 8, 61952, 0.4281, 88, 4, 840) list.append(cc2_8xlarge) - cr1_8xlarge = InstanceTypeInfo("cr1.8xlarge", ["x86_64"], 2, 8, 249856, 0.3666, 88, 2, 120) + cr1_8xlarge = InstanceTypeInfo("cr1.8xlarge", ["x86_64"], 2, 8, 249856, 0.2687, 88, 2, 120) list.append(cr1_8xlarge) c3_large = InstanceTypeInfo("c3.large", ["x86_64"], 2, 1, 3840, 0.105, 7, 2, 16) @@ -973,6 +1140,20 @@ def get_all_instance_types(self): i2_8xlarge = InstanceTypeInfo("i2.8xlarge", ["x86_64"], 32, 1, 249856, 6.82, 104, 8, 800) list.append(i2_8xlarge) + hs1_8xlarge = InstanceTypeInfo("hs1.8xlarge", ["x86_64"], 16, 1, 119808, 4.6, 35, 24, 2048) + list.append(hs1_8xlarge) + + c4_large = InstanceTypeInfo("c4.large", ["x86_64"], 2, 1, 3840, 0.116, 8, 1, 0) + list.append(c4_large) + c4_xlarge = InstanceTypeInfo("c4.xlarge", ["x86_64"], 4, 1, 7680, 0.232, 16, 1, 0) + list.append(c4_xlarge) + c4_2xlarge = InstanceTypeInfo("c4.2xlarge", ["x86_64"], 8, 1, 15360, 0.464, 31, 1, 0) + list.append(c4_2xlarge) + c4_4xlarge = InstanceTypeInfo("c4.4xlarge", ["x86_64"], 16, 1, 30720, 0.928, 62, 1, 0) + list.append(c4_4xlarge) + c4_8xlarge = InstanceTypeInfo("c4.8xlarge", ["x86_64"], 36, 1, 61952, 1.856, 132, 1, 0) + list.append(c4_8xlarge) + return list def get_instance_type_by_name(self, name): diff --git a/connectors/GCE.py b/connectors/GCE.py index 766cd57a3..5d31ad8f4 100644 --- a/connectors/GCE.py +++ b/connectors/GCE.py @@ -31,6 +31,7 @@ class GCECloudConnector(CloudConnector): type = "GCE" """str with the name of the provider.""" + DEFAULT_ZONE = "us-central1" def get_driver(self, auth_data): """ @@ -50,7 +51,8 @@ def get_driver(self, auth_data): return driver else: self.logger.error("No correct auth data has been specified to GCE: username, password and project") - return None + self.logger.debug(auth) + raise Exception("No correct auth data has been specified to GCE: username, password and project") def concreteSystem(self, radl_system, auth_data): @@ -66,13 +68,15 @@ def concreteSystem(self, radl_system, auth_data): region = res_system.getValue('availability_zone') else: region, _ = self.get_image_data(res_system.getValue("disk.0.image.url")) - region = res_system.setValue('availability_zone', region) instance_type = self.get_instance_type(driver.list_sizes(region), res_system) + if not instance_type: + return [] + username = res_system.getValue('disk.0.os.credentials.username') if not username: - res_system.setValue('disk.0.os.credentials.username','root') + res_system.setValue('disk.0.os.credentials.username','gceuser') res_system.addFeature(Feature("memory.size", "=", instance_type.ram, 'M'), conflict="other", missing="other") if instance_type.disk: res_system.addFeature(Feature("disk.0.free_size", "=", instance_type.disk , 'G'), conflict="other", missing="other") @@ -88,6 +92,37 @@ def concreteSystem(self, radl_system, auth_data): else: return [radl_system.clone()] + @staticmethod + def set_net_provider_id(radl, net_name): + """ + Set the provider ID on all the nets of the system + """ + system = radl.systems[0] + for i in range(system.getNumNetworkIfaces()): + net_id = system.getValue('net_interface.' 
+ str(i) + '.connection') + net = radl.get_network_by_id(net_id) + if net: + net.setValue('provider_id', net_name) + + @staticmethod + def get_net_provider_id(radl): + """ + Get the provider ID of the first net that has specified it + Returns: The net provider ID or None if not defined + """ + provider_id = None + system = radl.systems[0] + for i in range(system.getNumNetworkIfaces()): + net_id = system.getValue('net_interface.' + str(i) + '.connection') + net = radl.get_network_by_id(net_id) + + if net: + provider_id = net.getValue('provider_id') + break; + + # TODO: check that the net exist in GCE + return provider_id + def get_instance_type(self, sizes, radl): """ Get the name of the instance type to launch to LibCloud @@ -139,20 +174,36 @@ def request_external_ip(self, radl): else: return None - # el path sera algo asi: gce://us-central1/debian-7 + # The path must be: gce://us-central1/debian-7 or gce://debian-7 def get_image_data(self, path): """ Get the region and the image name from an URL of a VMI Arguments: - - path(str): URL of a VMI (some like this: gce://us-central1/debian-7) + - path(str): URL of a VMI (some like this: gce://us-central1/debian-7 or gce://debian-7) Returns: a tuple (region, image_name) with the region and the AMI ID """ - region = uriparse(path)[1] - image_name = uriparse(path)[2][1:] + uri = uriparse(path) + if uri[2]: + region = uri[1] + image_name = uri[2][1:] + else: + # If the image do not specify the zone, use the default one + region = self.DEFAULT_ZONE + image_name = uri[1] return (region, image_name) + def get_default_net(self, driver): + """ + Get the first net + """ + nets = driver.ex_list_networks() + if nets: + return nets[0].name + else: + return None + def launch(self, inf, radl, requested_radl, num_vm, auth_data): driver = self.get_driver(auth_data) @@ -161,7 +212,7 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data): image = driver.ex_get_image(image_id) if not image: - return [(False, "Incorrect image name") for i in range(num_vm)] + return [(False, "Incorrect image name") for _ in range(num_vm)] if system.getValue('availability_zone'): region = system.getValue('availability_zone') @@ -186,11 +237,30 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data): username = system.getValue('disk.0.os.credentials.username') private = system.getValue('disk.0.os.credentials.private_key') public = system.getValue('disk.0.os.credentials.public_key') + + if not public or not private: + # We must generate them + self.logger.debug("No keys. 
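
Both accepted image URL forms are handled by the same parse, mirroring get_image_data above:

    from IM.uriparse import uriparse

    DEFAULT_ZONE = "us-central1"

    def image_data(path):
        uri = uriparse(path)
        if uri[2]:                        # gce://us-central1/debian-7
            return uri[1], uri[2][1:]
        return DEFAULT_ZONE, uri[1]       # gce://debian-7: use the default zone
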
Generating key pair.") + (public, private) = self.keygen() + system.setValue('disk.0.os.credentials.private_key', private) + if private and public: #metadata = {"sshKeys": "root:ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC9i2KyVMk3Cz/rm9pCoIioFm/gMT0EvhobP5PFZnva+WxFeiH41j4shAim/+reyyUgC+hDpo9Pf6ZzvbOOCaWoGzgdEYtItixKmxE3wWoTUXZW4Lwks69+aKS2BXnOPm5z7BV6F72GVc9r7mlq/Xpd9e2EcDa5WyA6ilnBTVnMgWHOgEjQ+AEChswDELF3DSkXmLtQsWup+kVQmktwmC6+4sPztALwhUJiK1jJ+wshPCuJw0nY7t4Keybm2b/A3nLxDlLbJZay0kV70nlwAYSmTa+HcUkbPqgL0UNVlgW2/rdSNo8RSmoF1pFdXb+zii3YCFUnAC2l2FDmxUhRp0bT root@host"} metadata = {"sshKeys": username + ":" + public} args['ex_metadata'] = metadata self.logger.debug("Setting ssh for user: " + username) + self.logger.debug(metadata) + + net_provider_id = self.get_net_provider_id(radl) + if net_provider_id: + args['ex_network'] = net_provider_id + else: + net_name = self.get_default_net(driver) + if net_name: + args['ex_network'] = net_name + self.set_net_provider_id(radl, net_name) + else: + self.set_net_provider_id(radl, "default") res = [] i = 0 @@ -348,6 +418,9 @@ def updateVMInfo(self, vm, auth_data): vm.state = res_state + if 'zone' in node.extra: + vm.info.systems[0].setValue('availability_zone', node.extra['zone'].name) + vm.setIps(node.public_ips, node.private_ips) self.attach_volumes(vm,node) else: diff --git a/connectors/OCCI.py b/connectors/OCCI.py index f527d303a..aaad85b68 100644 --- a/connectors/OCCI.py +++ b/connectors/OCCI.py @@ -16,8 +16,6 @@ from ssl import SSLError import json -import subprocess -import shutil import os import re import base64 @@ -236,34 +234,6 @@ def updateVMInfo(self, vm, auth_data): self.logger.exception("Error connecting with OCCI server") return (False, "Error connecting with OCCI server: " + str(ex)) - def keygen(self): - """ - Generates a keypair using the ssh-keygen command and returns a tuple (public, private) - """ - tmp_dir = tempfile.mkdtemp() - pk_file = tmp_dir + "/occi-key" - command = 'ssh-keygen -t rsa -b 2048 -q -N "" -f ' + pk_file - p=subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) - (out, err) = p.communicate() - if p.returncode!=0: - shutil.rmtree(tmp_dir, ignore_errors=True) - self.logger.error("Error executing ssh-keygen: " + out + err) - return (None, None) - else: - public = None - private = None - try: - with open(pk_file) as f: private = f.read() - except: - self.logger.exception("Error reading private_key file.") - - try: - with open(pk_file + ".pub") as f: public = f.read() - except: - self.logger.exception("Error reading public_key file.") - - shutil.rmtree(tmp_dir, ignore_errors=True) - return (public, private) def gen_cloud_config(self, public_key, user = 'cloudadm'): """ @@ -440,9 +410,12 @@ def launch(self, inf, radl, requested_radl, num_vm, auth_data): if 'location' in resp.msg.dict: occi_vm_id = os.path.basename(resp.msg.dict['location']) else: - occi_vm_id = os.path.basename(output) - vm = VirtualMachine(inf, occi_vm_id, self.cloud, radl, requested_radl) - res.append((True, vm)) + occi_vm_id = os.path.basename(output) + if occi_vm_id: + vm = VirtualMachine(inf, occi_vm_id, self.cloud, radl, requested_radl) + res.append((True, vm)) + else: + res.append((False, 'Unknown Error launching the VM.')) except Exception, ex: self.logger.exception("Error connecting with OCCI server") @@ -548,6 +521,7 @@ def get_keystone_uri(occi, auth_data): conn = occi.get_http_connection(auth_data) conn.request('HEAD', "/-/", headers = headers) resp = conn.getresponse() + occi.delete_proxy(conn) www_auth_head = 
resp.getheader('Www-Authenticate') if www_auth_head and www_auth_head.startswith('Keystone uri'): return www_auth_head.split('=')[1].replace("'","") @@ -582,6 +556,7 @@ def get_keystone_token(occi, keystone_uri, auth): conn.endheaders(body) resp = conn.getresponse() + occi.delete_proxy(conn) # format: -> "{\"access\": {\"token\": {\"issued_at\": \"2014-12-29T17:10:49.609894\", \"expires\": \"2014-12-30T17:10:49Z\", \"id\": \"c861ab413e844d12a61d09b23dc4fb9c\"}, \"serviceCatalog\": [], \"user\": {\"username\": \"/DC=es/DC=irisgrid/O=upv/CN=miguel-caballer\", \"roles_links\": [], \"id\": \"475ce4978fb042e49ce0391de9bab49b\", \"roles\": [], \"name\": \"/DC=es/DC=irisgrid/O=upv/CN=miguel-caballer\"}, \"metadata\": {\"is_admin\": 0, \"roles\": []}}}" output = json.loads(resp.read()) @@ -610,6 +585,7 @@ def get_keystone_token(occi, keystone_uri, auth): conn.endheaders(body) resp = conn.getresponse() + occi.delete_proxy(conn) # format: -> "{\"access\": {\"token\": {\"issued_at\": \"2014-12-29T17:10:49.609894\", \"expires\": \"2014-12-30T17:10:49Z\", \"id\": \"c861ab413e844d12a61d09b23dc4fb9c\"}, \"serviceCatalog\": [], \"user\": {\"username\": \"/DC=es/DC=irisgrid/O=upv/CN=miguel-caballer\", \"roles_links\": [], \"id\": \"475ce4978fb042e49ce0391de9bab49b\", \"roles\": [], \"name\": \"/DC=es/DC=irisgrid/O=upv/CN=miguel-caballer\"}, \"metadata\": {\"is_admin\": 0, \"roles\": []}}}" output = json.loads(resp.read()) diff --git a/connectors/OpenNebula.py b/connectors/OpenNebula.py index 2ea6e0994..f7431a7d6 100644 --- a/connectors/OpenNebula.py +++ b/connectors/OpenNebula.py @@ -16,14 +16,14 @@ import hashlib import xmlrpclib +import time from IM.xmlobject import XMLObject from IM.uriparse import uriparse from IM.VirtualMachine import VirtualMachine from CloudConnector import CloudConnector -from IM.radl.radl import network - -from IM.radl.radl import Feature +from IM.radl.radl import network, Feature +from IM.config import ConfigOpenNebula # clases para parsear el resultado de las llamadas a la API de ONE class NIC(XMLObject): @@ -57,7 +57,8 @@ class VM(XMLObject): STATE_SUSPENDED=5 STATE_DONE=6 STATE_FAILED=7 - STATE_STR = {'0': 'init', '1': 'pending', '2': 'hold', '3': 'active', '4': 'stopped', '5': 'suspended', '6': 'done', '7': 'failed' } + STATE_POWEROFF=8 + STATE_STR = {'0': 'init', '1': 'pending', '2': 'hold', '3': 'active', '4': 'stopped', '5': 'suspended', '6': 'done', '7': 'failed' , '8': 'poweroff' } LCM_STATE_STR={'0':'init','1':'prologing','2':'booting','3':'running','4':'migrating','5':'saving (stop)','6':'saving (suspend)','7':'saving (migrate)', '8':'prologing (migration)', '9':'prologing (resume)', '10': 'epilog (stop)','11':'epilog', '12':'cancel','13':'failure','14':'delete','15':'unknown'} values = [ 'ID','UID','NAME','LAST_POLL','STATE','LCM_STATE','DEPLOY_ID','MEMORY','CPU','NET_TX','NET_RX', 'STIME','ETIME' ] # tuples = { 'TEMPLATE': TEMPLATE, 'HISTORY': HISTORY } @@ -78,13 +79,13 @@ class RANGE(XMLObject): values = [ 'IP_START', 'IP_END' ] class AR(XMLObject): - values = [ 'IP', 'MAC', 'TYPE', 'ALLOCATED', 'GLOBAL_PREFIX', 'AR_ID' ] + values = [ 'IP', 'MAC', 'TYPE', 'ALLOCATED', 'GLOBAL_PREFIX', 'AR_ID', 'SIZE', 'USED_LEASES' ] class AR_POOL(XMLObject): tuples_lists = { 'AR': AR } class VNET(XMLObject): - values = [ 'ID', 'UID', 'GID', 'UNAME', 'GNAME', 'NAME', 'TYPE', 'BRIDGE', 'PUBLIC' ] + values = [ 'ID', 'UID', 'GID', 'UNAME', 'GNAME', 'NAME', 'TYPE', 'BRIDGE', 'PUBLIC', 'USED_LEASES', 'TOTAL_LEASES' ] tuples = { 'TEMPLATE': TEMPLATE_VNET, 'LEASES': LEASES, 'RANGE': RANGE, 
'AR_POOL':AR_POOL } class VNET_POOL(XMLObject): @@ -210,9 +211,6 @@ def updateVMInfo(self, vm, auth_data): res_state = VirtualMachine.OFF vm.state = res_state - # currently only update the memory data, as it is the only one that can be changed - vm.info.systems[0].setValue('memory.size', int(res_vm.TEMPLATE.MEMORY), "M") - # Update network data self.setIPsFromTemplate(vm,res_vm.TEMPLATE) @@ -367,8 +365,8 @@ def getONETemplate(self, radl, auth_data): %s - GRAPHICS = [type="vnc",listen="0.0.0.0", keymap="es"] - ''' % (name, cpu, cpu, memory, arch, disks) + %s + ''' % (name, cpu, cpu, memory, arch, disks, ConfigOpenNebula.TEMPLATE_OTHER) res += self.get_networks_template(radl, auth_data) @@ -376,12 +374,16 @@ def getONETemplate(self, radl, auth_data): # It is supported since 3.8 version, (the VM must be prepared with the ONE contextualization script) private = system.getValue('disk.0.os.credentials.private_key') public = system.getValue('disk.0.os.credentials.public_key') - if private and public: - res += ''' - CONTEXT = [ - SSH_PUBLIC_KEY = "%s" - ] - ''' % public + + if (private and public) or ConfigOpenNebula.TEMPLATE_CONTEXT: + res += 'CONTEXT = [' + if private and public: + res += 'SSH_PUBLIC_KEY = "%s"' % public + if ConfigOpenNebula.TEMPLATE_CONTEXT: + if private and public: + res += ", " + res += ConfigOpenNebula.TEMPLATE_CONTEXT + res += ']' self.logger.debug("Template: " + res) @@ -417,18 +419,38 @@ def getONEVersion(self, auth_data): self.logger.debug("OpenNebula version: " + version) return version - def free_address(self, addres_range): + def free_range(self, range, total_leases): """ Check if there are at least one address free Arguments: - - leases(:py:class:`AR`): List of AddressRange of a ONE network. + - range(:py:class:`AR_POOL`): a Range of a ONE network. + - total_leases(str): Number of used leases Returns: bool, True if there are at least one lease free or False otherwise """ - for ar in addres_range: - if not ar.ALLOCATED: - return True + start = long(''.join(["%02X" % long(i) for i in range.IP_START.split('.')]), 16) + end = long(''.join(["%02X" % long(i) for i in range.IP_END.split('.')]), 16) + if end - start > int(total_leases): + return True + return False + + def free_address(self, addres_pool, used_leases): + """ + Check if there are at least one address free + + Arguments: + - address_pool(:py:class:`AR_POOL`): List of AddressRange of a ONE network. + - used_leases(str): Number of used leases + + Returns: bool, True if there are at least one lease free or False otherwise + """ + size = 0 + for ar in addres_pool.AR: + size += int(ar.SIZE) + + if size > int(used_leases): + return True return False def free_leases(self, leases): @@ -482,12 +504,17 @@ def getONENetworks(self, auth_data): elif net.TEMPLATE.LEASES and len(net.TEMPLATE.LEASES) > 0: ip = net.TEMPLATE.LEASES[0].IP elif net.AR_POOL and net.AR_POOL.AR and len(net.AR_POOL.AR) > 0: - # This is the case for one 4.8 - if self.free_address(net.AR_POOL.AR): + # This is the case for one 4.8 and later + if self.free_address(net.AR_POOL, net.USED_LEASES): ip = net.AR_POOL.AR[0].IP else: self.logger.warn("The network with IPs like: " + net.AR_POOL.AR[0].IP + " does not have free leases") - continue + continue + elif net.RANGE and net.RANGE.IP_START: + if self.free_range(net.RANGE, net.TOTAL_LEASES): + ip = net.RANGE.IP_START + else: + self.logger.warn("The network with IPs like: " + net.RANGE.IP_START + " does not have free leases") else: self.logger.warn("IP information is not in the VNET POOL. 
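
The lease accounting in free_range converts dotted quads to integers by writing each octet as two hex digits; the arithmetic is easier to verify in isolation (Python 2, as in the rest of the code base):

    def ip_to_long(ip):
        return long(''.join(["%02X" % long(i) for i in ip.split('.')]), 16)

    def range_has_free_lease(ip_start, ip_end, used_leases):
        # a range is usable while its size exceeds the leases already used
        return ip_to_long(ip_end) - ip_to_long(ip_start) > int(used_leases)

    # range_has_free_lease('192.168.0.10', '192.168.0.20', '5') -> True
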
Use the vn.info") info_res = server.one.vn.info(session_id, int(net.ID)) @@ -498,7 +525,11 @@ def getONENetworks(self, auth_data): (success, info, err_code) = info_res else: self.logger.warn("Error in the one.vn.info return value. Ignoring network: " + net.NAME) - break + continue + + if not success: + self.logger.warn("Error in the one.vn.info function: " + info + ". Ignoring network: " + net.NAME) + continue net = VNET(info) @@ -509,7 +540,10 @@ def getONENetworks(self, auth_data): self.logger.warn("The network with IPs like: " + net.LEASES.LEASE[0].IP + " does not have free leases") break elif net.RANGE and net.RANGE.IP_START: - ip = net.RANGE.IP_START + if self.free_range(net.RANGE, net.TOTAL_LEASES): + ip = net.RANGE.IP_START + else: + self.logger.warn("The network with IPs like: " + net.RANGE.IP_START + " does not have free leases") else: self.logger.error("Unknown type of network") return (None, None) @@ -535,14 +569,24 @@ def map_radl_one_networks(self, radl_nets, one_nets): used_nets = [] last_net = None for radl_net in radl_nets: - for (net_name, net_id, is_public) in one_nets: - if net_id not in used_nets and radl_net.isPublic() == is_public : - res[radl_net.id] = (net_name, net_id, is_public) - used_nets.append(net_id) - last_net = (net_name, net_id, is_public) - break - if radl_net.id not in res: - res[radl_net.id] = last_net + # First check if the user has specified a provider ID + net_provider_id = radl_net.getValue('provider_id') + if net_provider_id: + for (net_name, net_id, is_public) in one_nets: + # If the name is the same and have the same "publicity" value + if net_name == net_provider_id and radl_net.isPublic() == is_public: + res[radl_net.id] = (net_name, net_id, is_public) + used_nets.append(net_id) + break + else: + for (net_name, net_id, is_public) in one_nets: + if net_id not in used_nets and radl_net.isPublic() == is_public: + res[radl_net.id] = (net_name, net_id, is_public) + used_nets.append(net_id) + last_net = (net_name, net_id, is_public) + break + if radl_net.id not in res: + res[radl_net.id] = last_net # In case of there are no private network, use public ones for non mapped networks used_nets = [] @@ -590,6 +634,7 @@ def get_networks_template(self, radl, auth_data): # get the one network info if nets[network]: (net_name, net_id, is_public) = nets[network] + radl.get_network_by_id(network).setValue('provider_id', str(net_name)) else: self.logger.error("No ONE network found for network: " + network) raise Exception("No ONE network found for network: " + network) @@ -612,21 +657,67 @@ def get_networks_template(self, radl, auth_data): return res - def checkSetMem(self): + def checkResize(self): """ - Check if the one.vm.setmem function appears in the ONE server + Check if the one.vm.resize function appears in the ONE server - Returns: bool, True if the one.vm.setmem function appears in the ONE server or false otherwise + Returns: bool, True if the one.vm.resize function appears in the ONE server or false otherwise """ server_url = "http://%s:%d/RPC2" % (self.cloud.server, self.cloud.port) server = xmlrpclib.ServerProxy(server_url,allow_none=True) methods = server.system.listMethods() - if "one.vm.setmem" in methods: + if "one.vm.resize" in methods: return True else: return False + def poweroff(self, vm, auth_data, timeout = 30): + """ + Poweroff the VM and waits for it to be in poweredoff state + """ + server_url = "http://%s:%d/RPC2" % (self.cloud.server, self.cloud.port) + server = xmlrpclib.ServerProxy(server_url,allow_none=True) + session_id = 
self.getSessionID(auth_data)
+ if session_id == None:
+ return (False, "Incorrect auth data")
+ func_res = server.one.vm.action(session_id, 'poweroff', int(vm.id))
+
+ if len(func_res) == 1:
+ success = True
+ err = vm.id
+ elif len(func_res) == 2:
+ (success, err) = func_res
+ elif len(func_res) == 3:
+ (success, err, err_code) = func_res
+ else:
+ return (False, "Error in the one.vm.action return value")
+
+ if not success:
+ return (success, err)
+
+ wait = 0
+ powered_off = False
+ while wait < timeout and not powered_off:
+ func_res = server.one.vm.info(session_id, int(vm.id))
+ if len(func_res) == 2:
+ (success, res_info) = func_res
+ elif len(func_res) == 3:
+ (success, res_info, err_code) = func_res
+ else:
+ return (False, "Error in the one.vm.info return value")
+
+ res_vm = VM(res_info)
+ powered_off = res_vm.STATE == 8
+ if not powered_off:
+ time.sleep(2)
+ wait += 2
+
+ if powered_off:
+ return (True, "")
+ else:
+ return (False, "Error waiting for the VM to be powered off")
+
 def alterVM(self, vm, radl, auth_data):
 server_url = "http://%s:%d/RPC2" % (self.cloud.server, self.cloud.port)
 server = xmlrpclib.ServerProxy(server_url,allow_none=True)
@@ -634,12 +725,41 @@ def alterVM(self, vm, radl, auth_data):
 if session_id == None:
 return (False, "Incorrect auth data")
 
- if self.checkSetMem():
- new_mem = radl.getValue('memory.size')
- (success, info, err_code) = server.one.vm.setmem(str(vm.id), int(new_mem))
+ if self.checkResize():
+ if not radl.systems:
+ return (True, "")
+ system = radl.systems[0]
+
+ cpu = vm.info.systems[0].getValue('cpu.count')
+ memory = vm.info.systems[0].getFeature('memory.size').getValue('M')
+ new_cpu = system.getValue('cpu.count')
+ new_memory = system.getFeature('memory.size').getValue('M')
+
+ new_temp = ""
+ if new_cpu and new_cpu != cpu:
+ new_temp += "CPU = %s\n" % new_cpu
+ new_temp += "VCPU = %s\n" % new_cpu
+ if new_memory and new_memory != memory:
+ new_temp += "MEMORY = %s\n" % new_memory
+
+ self.logger.debug("New Template: " + new_temp)
+ if new_temp:
+ # First we must power off the VM
+ (success, info) = self.poweroff(vm, auth_data)
+ if not success:
+ return (success, info)
+ (success, info, err_code) = server.one.vm.resize(session_id, int(vm.id), new_temp, False)
+ self.start(vm, auth_data)
+ else:
+ return (True, self.updateVMInfo(vm, auth_data))
+
 if success:
- return self.updateVMInfo(vm, auth_data)
+ if new_cpu:
+ vm.info.systems[0].setValue('cpu.count', new_cpu)
+ if new_memory:
+ vm.info.systems[0].addFeature(Feature("memory.size", "=", new_memory, 'M'), conflict="other", missing="other")
+ return (success, self.updateVMInfo(vm, auth_data))
 else:
 return (success, info)
 else:
diff --git a/connectors/__init__.py b/connectors/__init__.py
index 982761c24..8f4529c07 100644
--- a/connectors/__init__.py
+++ b/connectors/__init__.py
@@ -15,4 +15,4 @@
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
-__all__ = ['CloudConnector','EC2','OCCI','OpenNebula','OpenStack','LibVirt','LibCloud','Docker','GCE','FogBow'] +__all__ = ['CloudConnector','EC2','OCCI','OpenNebula','OpenStack','LibVirt','LibCloud','Docker','GCE','FogBow', 'Azure', 'DeployedNode'] diff --git a/contextualization/conf-ansible.yml b/contextualization/conf-ansible.yml index 5a163f33e..9db9574c5 100644 --- a/contextualization/conf-ansible.yml +++ b/contextualization/conf-ansible.yml @@ -4,13 +4,23 @@ vars: ANSIBLE_VERSION: 1.7.2 tasks: - - name: Apt-get update - apt: update_cache=yes cache_valid_time=604800 - when: ansible_os_family == "Debian" - - name: Install libselinux-python in RH action: yum pkg=libselinux-python state=installed when: ansible_os_family == "RedHat" + + # Disable IPv6 + - lineinfile: dest=/etc/sysctl.conf regexp="{{ item }}" line="{{ item }} = 1" + with_items: + - 'net.ipv6.conf.all.disable_ipv6' + - 'net.ipv6.conf.default.disable_ipv6' + - 'net.ipv6.conf.lo.disable_ipv6' + ignore_errors: yes + - command: sysctl -p + ignore_errors: yes + + - name: Apt-get update + apt: update_cache=yes + when: ansible_os_family == "Debian" - name: EPEL #template: src=utils/templates/epel.repo dest=/etc/yum.repos.d/epel.repo diff --git a/contextualization/ctxt_agent.py b/contextualization/ctxt_agent.py index cf8369e4f..9039bdb16 100755 --- a/contextualization/ctxt_agent.py +++ b/contextualization/ctxt_agent.py @@ -32,7 +32,9 @@ # This value enables to retry the playbooks to avoid some SSH connectivity problems # The minimum value is 1. This value will be in the data file generated by the ConfManager PLAYBOOK_RETRIES = 1 +INTERNAL_PLAYBOOK_RETRIES = 1 +PK_FILE = "/tmp/ansible_key" def wait_ssh_access(vm): """ @@ -40,30 +42,45 @@ def wait_ssh_access(vm): """ delay = 10 wait = 0 + success = False + res = None while wait < SSH_WAIT_TIMEOUT: logger.debug("Testing SSH access to VM: " + vm['ip']) wait += delay - success = False try: ssh_client = SSH(vm['ip'], vm['user'], vm['passwd'], vm['private_key'], vm['ssh_port']) success = ssh_client.test_connectivity() + res = 'init' except AuthenticationException: - # If the process of changing credentials has finished in the VM, we must use the new ones + try_ansible_key = True if 'new_passwd' in vm: + try_ansible_key = False + # If the process of changing credentials has finished in the VM, we must use the new ones logger.warn("Error connecting with SSH with initial credentials with: " + vm['ip'] + ". Try to use new ones.") try: ssh_client = SSH(vm['ip'], vm['user'], vm['new_passwd'], vm['private_key'], vm['ssh_port']) success = ssh_client.test_connectivity() + res = "new" + except AuthenticationException: + try_ansible_key = True + + if try_ansible_key: + # In some very special cases the last two cases fail, so check if the ansible key works + logger.warn("Error connecting with SSH with initial credentials with: " + vm['ip'] + ". 
Try to ansible_key.") + try: + ssh_client = SSH(vm['ip'], vm['user'], None, PK_FILE, vm['ssh_port']) + success = ssh_client.test_connectivity() + res = 'pk_file' except: logger.exception("Error connecting with SSH with: " + vm['ip']) success = False if success: - return True + return res else: time.sleep(delay) - return False + return None def run_command(command, timeout = None, poll_delay = 5): """ @@ -171,12 +188,11 @@ def removeRequiretty(vm): def contextualize_vm(general_conf_data, vm_conf_data): res_data = {} - pk_file = "/tmp/ansible_key" logger.info('Generate and copy the ssh key') # If the file exists, do not create it again - if not os.path.isfile(pk_file): - out = run_command('ssh-keygen -t rsa -C ' + getpass.getuser() + ' -q -N "" -f ' + pk_file) + if not os.path.isfile(PK_FILE): + out = run_command('ssh-keygen -t rsa -C ' + getpass.getuser() + ' -q -N "" -f ' + PK_FILE) logger.debug(out) # Check that we can SSH access the node @@ -191,45 +207,75 @@ def contextualize_vm(general_conf_data, vm_conf_data): return res_data for task in vm_conf_data['tasks']: - logger.debug('Launch task: ' + task) - playbook = general_conf_data['conf_dir'] + "/" + task + "_task_all.yml" - inventory_file = general_conf_data['conf_dir'] + "/hosts" - - if task == "basic": - # This is always the fist step, so put the SSH test, the requiretty removal and change password here - for vm in general_conf_data['vms']: - logger.info("Waiting SSH access to VM: " + vm['ip']) - if not wait_ssh_access(vm): - logger.error("Error Waiting SSH access to VM: " + vm['ip']) - res_data['SSH_WAIT'] = False - res_data['OK'] = False - return res_data - else: - res_data['SSH_WAIT'] = True - logger.info("SSH access to VM: " + vm['ip']+ " Open!") + task_ok = False + num_retries = 0 + while not task_ok and num_retries < PLAYBOOK_RETRIES: + num_retries += 1 + logger.debug('Launch task: ' + task) + playbook = general_conf_data['conf_dir'] + "/" + task + "_task_all.yml" + inventory_file = general_conf_data['conf_dir'] + "/hosts" - # First remove requiretty in the node - success = removeRequiretty(ctxt_vm) - if success: - logger.info("Requiretty successfully removed") + if task == "basic": + # This is always the fist step, so put the SSH test, the requiretty removal and change password here + for vm in general_conf_data['vms']: + logger.info("Waiting SSH access to VM: " + vm['ip']) + ssh_res = wait_ssh_access(vm) + logger.debug("SSH test result: " + ssh_res) + if vm['id'] == vm_conf_data['id']: + cred_used = ssh_res + if not ssh_res: + logger.error("Error Waiting SSH access to VM: " + vm['ip']) + res_data['SSH_WAIT'] = False + res_data['OK'] = False + return res_data + else: + res_data['SSH_WAIT'] = True + logger.info("SSH access to VM: " + vm['ip']+ " Open!") + + # First remove requiretty in the node + success = removeRequiretty(ctxt_vm) + if success: + logger.info("Requiretty successfully removed") + else: + logger.error("Error removing Requiretty") + # Check if we must chage user credentials + # Do not change it on the master. 
+ change_creds = False
+ if not ctxt_vm['master']:
+ change_creds = changeVMCredentials(ctxt_vm)
+ res_data['CHANGE_CREDS'] = change_creds
+
+ # The basic task uses the credentials of the VM stored in ctxt_vm
+ pk_file = None
+ if cred_used == "pk_file":
+ pk_file = PK_FILE
+ ansible_thread = LaunchAnsiblePlaybook(playbook, ctxt_vm, 2, inventory_file, pk_file, INTERNAL_PLAYBOOK_RETRIES, change_creds)
 else:
- logger.error("Error removing Requiretty")
- # Check if we must chage user credentials
- # Do not change it on the master. It must be changed only by the ConfManager
- change_creds = False
- if not ctxt_vm['master']:
- change_creds = changeVMCredentials(ctxt_vm)
- res_data['CHANGE_CREDS'] = change_creds
+ # In some strange cases the pk_file disappears, so test it and re-run the basic recipe if needed
+ success = False
+ try:
+ ssh_client = SSH(ctxt_vm['ip'], ctxt_vm['user'], None, PK_FILE, ctxt_vm['ssh_port'])
+ success = ssh_client.test_connectivity()
+ except:
+ success = False
+
+ if not success:
+ logger.warn("Error connecting with SSH using the ansible key with: " + ctxt_vm['ip'] + ". Call the basic playbook again.")
+ basic_playbook = general_conf_data['conf_dir'] + "/basic_task_all.yml"
+ ansible_thread = LaunchAnsiblePlaybook(basic_playbook, ctxt_vm, 2, inventory_file, None, INTERNAL_PLAYBOOK_RETRIES, True)
+ ansible_thread.join()
+
+ # In the other tasks the pk_file can be used
+ ansible_thread = LaunchAnsiblePlaybook(playbook, ctxt_vm, 2, inventory_file, PK_FILE, INTERNAL_PLAYBOOK_RETRIES, True)
 
- # The basic task uses the credentials of VM stored in ctxt_vm
- ansible_thread = LaunchAnsiblePlaybook(playbook, ctxt_vm, 2, inventory_file, None, PLAYBOOK_RETRIES, change_creds)
- else:
- # in the other tasks pk_file can be used
- ansible_thread = LaunchAnsiblePlaybook(playbook, ctxt_vm, 2, inventory_file, pk_file, PLAYBOOK_RETRIES, True)
-
- (success, _) = wait_thread(ansible_thread)
- res_data[task] = success
- if not success:
+ (task_ok, _) = wait_thread(ansible_thread)
+ if not task_ok:
+ logger.warn("ERROR executing task %s: (%s/%s)" % (task, num_retries, PLAYBOOK_RETRIES))
+ else:
+ logger.info('Task %s finished successfully' % task)
+
+ res_data[task] = task_ok
+ if not task_ok:
 res_data['OK'] = False
 return res_data
diff --git a/doc/source/manual.rst b/doc/source/manual.rst
index 654cc12fc..4c7abd166 100644
--- a/doc/source/manual.rst
+++ b/doc/source/manual.rst
@@ -5,7 +5,7 @@ IM Service Installation
 Prerequisites
 -------------
 
-IM needs at least Python 2.4 to run, as well as the next libraries:
+IM needs at least Python 2.6 to run, as well as the following libraries:
 
 * `PLY <http://www.dabeaz.com/ply/>`_, Python Lex & Yacc library for python.
 * `paramiko <http://www.paramiko.org/>`_, ssh2 protocol library for python.
@@ -64,7 +64,7 @@ Installation
 ------------
 
 From Pip
-^^^^^^^^^^^
+^^^^^^^^
 
 You only have to call the install command of the pip tool with the IM package::
 
@@ -129,7 +129,8 @@ Alternatively, it can be done manually::
 
 IM reads the configuration from :file:`$IM_PATH/etc/im.cfg`, and if it is not
 available, does so from ``/etc/im/im.cfg``. There is a template of :file:`im.cfg`
-at the directory :file:`etc` on the tarball. The options are explained next.
+at the directory :file:`etc` on the tarball. The IM reads the values of the ``im``
+section. The options are explained next.
 
 .. _options-basic:
 
@@ -162,7 +163,11 @@ Basic Options
 ]
 }
 
+.. confval:: MAX_SIMULTANEOUS_LAUNCHES
+
+ Maximum number of simultaneous VM launch operations.
+ The default value is ``1``.
+
diff --git a/doc/source/mimic/static/grycap.css b/doc/source/mimic/static/grycap.css
index ee7a6144b..d03214ae7 100644
--- a/doc/source/mimic/static/grycap.css
+++ b/doc/source/mimic/static/grycap.css
@@ -249,3 +249,11 @@ div.hole {
 #content input.button {
     width: 100px;
 }
+.figure {
+    text-align: center;
+}
+#content p.caption {
+    text-align: center;
+    font-weight: bold;
+}
+
diff --git a/doc/source/radl.rst b/doc/source/radl.rst
index 60b2f34d9..bc01386f6 100644
--- a/doc/source/radl.rst
+++ b/doc/source/radl.rst
@@ -54,7 +54,8 @@ The sentences under the keyword ``contextualize`` indicate the recipes that
 will be executed during the deployment of the virtual machine.
 
 The ``deploy`` keyword is a request to deploy a number of virtual machines.
-Some identity of a cloud provider can be specified.
+If the identifier of a cloud provider is specified, the VM will be deployed
+in the Cloud provider with that ``id``.
 
 Use Cases
 ---------
@@ -136,6 +137,21 @@ The supported features are:
    If ``yes``, IPs will be public, and if ``no``, they will be private.
    The default value is ``no``.
 
+``outports = ``
+   Indicate the ports to be opened in the VM by the Cloud provider system.
+   Valid formats:
+
+   * 8899/tcp-8899/tcp,22/tcp-22/tcp
+   * 8899/tcp-8899,22/tcp-22
+   * 8899-8899,22-22
+   * 8899/tcp,22/udp
+   * 8899,22
+
+   The default value is ``''``.
+
+``provider_id = ``
+   Indicate the name of the network in a specific Cloud provider.
+   The default value is ``''``.
 
 System Features
 ---------------
@@ -182,6 +198,12 @@ machine. The supported features are:
    string contains ``#N#`` they are replaced by a number that is distinct for
    every virtual machine deployed with this ``system`` description.
 
+``availability_zone``
+   Set the availability zone or region where this VM will be launched.
+
+``instance_type``
+   Set the instance type name of this VM.
+
 ``disk..``
    Features under this prefix refer to virtual storage devices attached to
    the virtual machine. ``disk.0`` refers to system boot device.
@@ -190,8 +212,11 @@ machine. The supported features are:
    Set the source of the disk image. The URI designates the cloud provider:
 
    * ``one://:/``, for OpenNebula;
-   * ``ost://:/``, for OpenStack; and
-   * ``aws:///``, for Amazon Web Service.
+   * ``ost://:/``, for OpenStack;
+   * ``aws:///``, for Amazon Web Service;
+   * ``gce:///``, for Google Cloud;
+   * ``azr://``, for Microsoft Azure; and
+   * ``/``, for the FedCloud OCCI connector.
 
    Either ``disk.0.image.url`` or ``disk.0.image.name`` must be set.
 
@@ -249,6 +274,25 @@ machine. The supported features are:
    can be installed during the contextualization of the virtual machine if
    it is not installed.
 
+Parametric Values
+-----------------
+
+RADL documents can use parametric values to be requested to the user at launch
+time. This makes it easy to launch different infrastructures without modifying
+the RADL document, changing only a set of values at launch time.
+
+These values are specified with the following syntax::
+
+   @input.@
+
+In the following example the user will be asked to specify the ``CPUs`` and the
+``NumNodes`` variables (in the CLI and in the Web Interface)::
+
+   system node (
+   cpu.count = @input.CPUs@ and
+   memory.size >= 512M
+   )
+   deploy node @input.NumNodes@
+
 Configure Recipes
 -----------------
@@ -295,6 +339,7 @@ can be accessed by the recipes and have information about the virtual machine.
 
 ``IM__PATH``
    The path to an installed application required by the virtual machine.
 
+
 Including roles of Ansible Galaxy
 ---------------------------------
@@ -325,7 +370,6 @@ documentation. In the particular case of the "micafer.hadoop" role is the follow
       @end
    )
 
-
 Examples
 --------
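Since the ``outports`` feature documented above packs several notations into one string, a short parsing sketch may help. The parse_outports helper and its (remote_port, protocol, local_port) output form are hypothetical illustrations, not the IM connector code; the sketch assumes a missing protocol defaults to tcp and a missing local port mirrors the remote one:

    def parse_outports(outports):
        # Parse a string like '8899/tcp-8899,22/tcp-22' into a list of
        # (remote_port, protocol, local_port) tuples.
        def split_port(port_str, default_proto='tcp'):
            if '/' in port_str:
                num, proto = port_str.split('/')
                return int(num), proto
            return int(port_str), default_proto

        res = []
        if not outports:
            return res
        for entry in outports.split(','):
            # Each entry is either 'remote' or 'remote-local'
            parts = entry.split('-')
            r_port, r_proto = split_port(parts[0])
            l_port, _ = split_port(parts[-1], r_proto)
            res.append((r_port, r_proto, l_port))
        return res

    print(parse_outports("8899/tcp-8899,22/tcp-22"))  # [(8899, 'tcp', 8899), (22, 'tcp', 22)]
    print(parse_outports("8899,22"))                  # [(8899, 'tcp', 8899), (22, 'tcp', 22)]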
diff --git a/doc/source/videos.rst b/doc/source/videos.rst
index ce09dd88b..35632c756 100644
--- a/doc/source/videos.rst
+++ b/doc/source/videos.rst
@@ -4,7 +4,7 @@ IM Videos
 There are an Infrastructure Manager youtube channel with a set of videos with
 demos of the functionality of the platform.
 
-Currently there are two videos available, but soon more videos will be uploaded:
+Currently there are three videos available, but soon more videos will be uploaded:
 
 The first one shows how to use the IM web interface to launch a Hadoop Cluster
 with a single click in a OpenNebula on-premise cloud platform and in Amazon EC2.
@@ -13,6 +13,9 @@ The second video shows a demo of how to create a cluster with a single click usi
 IM web interface with the EC3 tool. It also shows how CLUES works to dinamically
 manage the size of the cluster automatically.
 
+The third one shows how to use the IM web interface to access `EGI FedCloud `_
+sites using the OCCI plugin, also showing how to launch a Hadoop Cluster with a
+single click.
+
 `YouTube IM channel `_
 
diff --git a/etc/im.cfg b/etc/im.cfg
index 4aec4e95d..e1d592ec9 100644
--- a/etc/im.cfg
+++ b/etc/im.cfg
@@ -26,6 +26,8 @@ DATA_FILE = /etc/im/inf.dat
 # IM user DB. To restrict the users that can access the IM service.
 # Comment it or set a blank value to disable user check.
 USER_DB = 
+# Maximum number of simultaneous VM launch/delete operations
+MAX_SIMULTANEOUS_LAUNCHES = 1
 
 # Max number of retries launching a VM
 MAX_VM_FAILS = 3
@@ -77,4 +79,14 @@ GET_GANGLIA_INFO = False
 GANGLIA_INFO_UPDATE_FREQUENCY = 30
 
 # Number of retries of the Ansible playbooks in case of failure
-PLAYBOOK_RETRIES = 3
\ No newline at end of file
+PLAYBOOK_RETRIES = 3
+
+[OpenNebula]
+# OpenNebula connector configuration values
+
+# Text to add to the CONTEXT section of the ONE template (except SSH_PUBLIC_KEY)
+TEMPLATE_CONTEXT = 
+# Text to add to the ONE template, other than NAME, CPU, VCPU, MEMORY, OS, DISK and CONTEXT
+TEMPLATE_OTHER = GRAPHICS = [type="vnc",listen="0.0.0.0", keymap="es"]
+
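A minimal sketch of how the sectioned configuration above can be read with the standard ConfigParser module (illustrative only; the real IM Config class may read it differently), assuming an im.cfg that contains the ``im`` and ``OpenNebula`` sections shown:

    import ConfigParser  # named 'configparser' on Python 3

    config = ConfigParser.ConfigParser()
    config.read('/etc/im/im.cfg')

    # General options live in the 'im' section
    if config.has_option('im', 'MAX_SIMULTANEOUS_LAUNCHES'):
        max_launches = config.getint('im', 'MAX_SIMULTANEOUS_LAUNCHES')
    else:
        max_launches = 1

    # Connector-specific options live in their own section
    template_context = ''
    if config.has_section('OpenNebula'):
        template_context = config.get('OpenNebula', 'TEMPLATE_CONTEXT')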
diff --git a/im_service.py b/im_service.py
index 14d908c08..bd42cfd7b 100755
--- a/im_service.py
+++ b/im_service.py
@@ -16,10 +16,15 @@
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see .
 
+import sys
+
+if sys.version_info < (2, 6):
+    print "Must use python 2.6 or greater"
+    sys.exit(1)
+
 import logging
 import os
 import signal
-import sys
 
 from IM.request import Request, AsyncRequest, AsyncXMLRPCServer, get_system_queue
 from IM.InfrastructureManager import InfrastructureManager
@@ -326,7 +331,7 @@ def launch_daemon():
 
     InfrastructureManager.logger.info('************ Start Infrastructure Manager daemon (v.%s) ************' % version)
 
-    # Launch the API XMLRPC thread 
+    # Launch the API XMLRPC thread
     server.serve_forever_in_thread()
 
     if Config.ACTIVATE_REST:
@@ -348,22 +353,27 @@ def config_logging():
     fileh = logging.handlers.RotatingFileHandler(filename=Config.LOG_FILE, maxBytes=Config.LOG_FILE_MAX_SIZE, backupCount=3)
     formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
     fileh.setFormatter(formatter)
+
+    try:
+        log_level = getattr(logging, Config.LOG_LEVEL)
+    except:
+        log_level = logging.DEBUG
 
     logging.RootLogger.propagate = 0
     logging.root.setLevel(logging.ERROR)
 
     log = logging.getLogger('ConfManager')
-    log.setLevel(Config.LOG_LEVEL)
+    log.setLevel(log_level)
     log.propagate = 0
     log.addHandler(fileh)
 
     log = logging.getLogger('CloudConnector')
-    log.setLevel(Config.LOG_LEVEL)
+    log.setLevel(log_level)
     log.propagate = 0
     log.addHandler(fileh)
 
     log = logging.getLogger('InfrastructureManager')
-    log.setLevel(Config.LOG_LEVEL)
+    log.setLevel(log_level)
     log.propagate = 0
     log.addHandler(fileh)
 
diff --git a/setup.py b/setup.py
index 7d60dae45..e4d5bb526 100644
--- a/setup.py
+++ b/setup.py
@@ -39,5 +39,5 @@
       long_description="IM is a tool that ease the access and the usability of IaaS clouds by automating the VMI selection, deployment, configuration, software installation, monitoring and update of Virtual Appliances. It supports APIs from a large number of virtual platforms, making user applications cloud-agnostic. In addition it integrates a contextualization system to enable the installation and configuration of all the user required applications providing the user with a fully functional infrastructure.",
      description="IM is a tool to manage virtual infrastructures on Cloud deployments",
      platforms=["any"],
-      install_requires=["ansible >= 1.4","paramiko >= 1.14","PyYAML","SOAPpy","boto >= 2.29","apache-libcloud >= 0.15","ply"]
+      install_requires=["ansible >= 1.4","paramiko >= 1.14","PyYAML","SOAPpy","boto >= 2.29","apache-libcloud >= 0.16","ply"]
 )
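The test change below forces Config.MAX_SIMULTANEOUS_LAUNCHES = 2 before the manager is imported, so the thread-pool code path is loaded. As a rough sketch of what a bounded parallel launch looks like (assuming the standard multiprocessing.pool.ThreadPool; launch_vm and vm_specs are placeholders, not IM APIs):

    from multiprocessing.pool import ThreadPool

    MAX_SIMULTANEOUS_LAUNCHES = 2  # the value the test forces

    def launch_vm(vm_spec):
        # Placeholder for the real per-VM launch call
        return "launched %s" % vm_spec

    vm_specs = ["vm1", "vm2", "vm3", "vm4"]

    if MAX_SIMULTANEOUS_LAUNCHES > 1:
        # At most MAX_SIMULTANEOUS_LAUNCHES launches run at the same time
        pool = ThreadPool(processes=MAX_SIMULTANEOUS_LAUNCHES)
        results = pool.map(launch_vm, vm_specs)
        pool.close()
    else:
        # Sequential fallback when the pool is disabled
        results = [launch_vm(v) for v in vm_specs]

    print(results)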
diff --git a/test/test_im_logic.py b/test/test_im_logic.py
index a39fe47b3..1dc3e93d0 100755
--- a/test/test_im_logic.py
+++ b/test/test_im_logic.py
@@ -21,11 +21,15 @@ import sys
 
 from mock import Mock
 
+from IM.config import Config
+# To load the ThreadPool class
+Config.MAX_SIMULTANEOUS_LAUNCHES = 2
+
 from IM.VirtualMachine import VirtualMachine
 from IM.InfrastructureManager import InfrastructureManager as IM
 from IM.auth import Authentication
 from IM.radl.radl import RADL, system, deploy, Feature, SoftFeatures
-from IM.config import Config
+from IM.CloudInfo import CloudInfo
 from connectors.CloudConnector import CloudConnector
 
 class TestIM(unittest.TestCase):
@@ -37,6 +41,9 @@ def setUp(self):
         IM._reinit()
         # Patch save_data
         IM.save_data = staticmethod(lambda: None)
+
+    def tearDown(self):
+        IM.stop()
 
     @staticmethod
     def getAuth(im_users=[], vmrc_users=[], clouds=[]):
@@ -55,7 +62,9 @@ def register_cloudconnector(self, name, cloud_connector):
     def gen_launch_res(self, inf, radl, requested_radl, num_vm, auth_data):
         res = []
         for i in range(num_vm):
-            vm = VirtualMachine(inf, "1234", None, radl, requested_radl)
+            cloud = CloudInfo()
+            cloud.type = "DeployedNode"
+            vm = VirtualMachine(inf, "1234", cloud, radl, requested_radl)
             # create the mock for the vm finalize function
             vm.finalize = Mock(return_value=(True, vm))
             res.append((True, vm))
@@ -100,7 +109,7 @@ def test_inf_addresources_without_credentials(self):
 
         with self.assertRaises(Exception) as ex:
             IM.AddResource(infId, str(radl), auth0)
-        self.assertIn("IncorrectVMCrecentialsException", ex.exception.message)
+        self.assertIn("No username", ex.exception.message)
 
         IM.DestroyInfrastructure(infId, auth0)
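For readers unfamiliar with the stubbing pattern in gen_launch_res above, a condensed, self-contained illustration with the same mock library (FakeVM and the helper are hypothetical stand-ins, not IM classes):

    from mock import Mock

    class FakeVM(object):
        # Stand-in for VirtualMachine with only what the example needs
        def __init__(self, vm_id):
            self.id = vm_id

    def gen_launch_res(num_vm):
        # Build (success, vm) tuples the way the test's launch stub does
        res = []
        for i in range(num_vm):
            vm = FakeVM(str(i))
            # Replace the real finalize with a mock that reports success
            vm.finalize = Mock(return_value=(True, vm))
            res.append((True, vm))
        return res

    for success, vm in gen_launch_res(2):
        assert success
        assert vm.finalize() == (True, vm)  # the mock records this call
        assert vm.finalize.called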