Skip to content

Commit

Permalink
Merge branch 'devel'
Browse files Browse the repository at this point in the history
  • Loading branch information
micafer committed May 12, 2015
2 parents ec22f0d + 757ae73 commit 145e570
Show file tree
Hide file tree
Showing 28 changed files with 1,375 additions and 953 deletions.
176 changes: 114 additions & 62 deletions IM/ConfManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from SSH import AuthenticationException
from recipe import Recipe
from radl.radl import system, contextualize_item
import ServiceRequests

from config import Config

Expand All @@ -45,7 +46,6 @@ class ConfManager(threading.Thread):
""" The file with the ansible steps to configure the master node """
SECOND_STEP_YAML = 'conf-ansible-s2.yml'
""" The file with the ansible steps to configure the second step of the master node """
THREAD_SLEEP_DELAY = 5

def __init__(self, inf, auth, max_ctxt_time = 1e9):
threading.Thread.__init__(self)
Expand All @@ -63,32 +63,31 @@ def check_running_pids(self, vms_configuring):
for step, vm_list in vms_configuring.iteritems():
for vm in vm_list:
if isinstance(vm,VirtualMachine):
if vm.check_ctxt_process():
# Update the info of the VM to check it is in a correct state
vm.update_status(self.auth)
if vm.is_ctxt_process_running():
if step not in res:
res[step] = []
res[step].append(vm)
ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": Ansible process to configure " + str(vm.im_id) + " with PID " + vm.ctxt_pid + " is still running.")
else:
if vm.configured:
ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": Configuration process in VM: " + str(vm.im_id) + " successfully finished.")
else:
ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": Configuration process in VM: " + str(vm.im_id) + " failed.")
# Force to save the data to store the log data
InfrastructureManager.InfrastructureManager.save_data()
ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": Configuration process in VM: " + str(vm.im_id) + " finished.")
# Force to save the data to store the log data
ServiceRequests.IMBaseRequest.create_request(ServiceRequests.IMBaseRequest.SAVE_DATA)
else:
# General Infrastructure tasks
if vm.check_ctxt_process():
if vm.is_ctxt_process_running():
if step not in res:
res[step] = []
res[step].append(vm)
ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": Configuration process of master node is still running.")
ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": Configuration process of master node: " + str(vm.get_ctxt_process_names()) + " is still running.")
else:
if vm.configured:
ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ":Configuration process of master node successfully finished.")
ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": Configuration process of master node successfully finished.")
else:
ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": Configuration process of master node failed.")
# Force to save the data to store the log data
InfrastructureManager.InfrastructureManager.save_data()
ServiceRequests.IMBaseRequest.create_request(ServiceRequests.IMBaseRequest.SAVE_DATA)

return res

Expand Down Expand Up @@ -126,8 +125,8 @@ def check_vm_ips(self, timeout = Config.WAIT_RUNNING_VM_TIMEOUT):

if not success:
ConfManager.logger.warn("Inf ID: " + str(self.inf.id) + ": Error waiting all the VMs to have a correct IP")
wait += self.THREAD_SLEEP_DELAY
time.sleep(self.THREAD_SLEEP_DELAY)
wait += Config.CONFMAMAGER_CHECK_STATE_INTERVAL
time.sleep(Config.CONFMAMAGER_CHECK_STATE_INTERVAL)
else:
self.inf.set_configured(True)

Expand All @@ -148,7 +147,7 @@ def run(self):

# If the queue is empty but there are vms configuring wait and test again
if self.inf.ctxt_tasks.empty() and vms_configuring:
time.sleep(self.THREAD_SLEEP_DELAY)
time.sleep(Config.CONFMAMAGER_CHECK_STATE_INTERVAL)
continue

(step, prio, vm, tasks) = self.inf.ctxt_tasks.get()
Expand All @@ -167,7 +166,7 @@ def run(self):
# If there are any process running of last step, wait
if last_step in vms_configuring and len(vms_configuring[last_step]) > 0:
ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": Waiting processes of step " + str(last_step) + " to finish.")
time.sleep(self.THREAD_SLEEP_DELAY)
time.sleep(Config.CONFMAMAGER_CHECK_STATE_INTERVAL)
else:
# if not, update the step, to go ahead with the new step
ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": Step " + str(last_step) + " finished. Go to step: " + str(step))
Expand All @@ -180,72 +179,99 @@ def run(self):
elif vm.ctxt_pid:
ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": VM ID " + str(vm.im_id) + " has running processes, wait.")
# If there are, add the tasks again to the queue
# Set the priority to a higher number to decrease the proprity enabling to select other items of the queue before
# Set the priority to a higher number to decrease the priority enabling to select other items of the queue before
self.inf.add_ctxt_tasks([(step, prio+1, vm, tasks)])
# Sleep to check this later
time.sleep(self.THREAD_SLEEP_DELAY)
time.sleep(Config.CONFMAMAGER_CHECK_STATE_INTERVAL)
else:
# If not, launch it
try:
# Mark this VM as configuring
vm.configured = None
vm.ctxt_pid = self.launch_ctxt_agent(vm, tasks)
if step not in vms_configuring:
vms_configuring[step] = []
vms_configuring[step].append(vm)
# Force to save the data to store the log data
InfrastructureManager.InfrastructureManager.save_data()
except:
ConfManager.logger.exception("Inf ID: " + str(self.inf.id) + ": Error launching ctxt agent on VM: " + str(vm.im_id))
# Set this VM as configuration failed
vm.configured = False
# Mark this VM as configuring
vm.configured = None
# Launch the ctxt_agent using a thread
t = threading.Thread(name="launch_ctxt_agent_" + str(vm.id), target=eval("self.launch_ctxt_agent"),args=(vm, tasks))
t.daemon = True
t.start()
vm.inf.conf_threads.append(t)
if step not in vms_configuring:
vms_configuring[step] = []
vms_configuring[step].append(vm.inf)
# Add the VM to the list of configuring vms
vms_configuring[step].append(vm)
# Set the "special pid" to wait until the real pid is assigned
vm.ctxt_pid = VirtualMachine.WAIT_TO_PID
# Force to save the data to store the log data
ServiceRequests.IMBaseRequest.create_request(ServiceRequests.IMBaseRequest.SAVE_DATA)
else:
# Launch the Infrastructure tasks
vm.configured = None
for task in tasks:
t = threading.Thread(target=eval("self." + task))
t = threading.Thread(name=task ,target=eval("self." + task))
t.daemon = True
t.start()
vm.conf_threads.append(t)
if step not in vms_configuring:
vms_configuring[step] = []
vms_configuring[step].append(vm)
# Force to save the data to store the log data
InfrastructureManager.InfrastructureManager.save_data()
ServiceRequests.IMBaseRequest.create_request(ServiceRequests.IMBaseRequest.SAVE_DATA)


last_step = step

def launch_ctxt_agent(self, vm, tasks):
ip = vm.getPublicIP()
if not ip:
ip = vm.getPrivateIP()
remote_dir = Config.REMOTE_CONF_DIR + "/" + ip + "_" + str(vm.getSSHPort())
tmp_dir = tempfile.mkdtemp()

ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": Create the configuration file for the contextualization agent")
conf_file = tmp_dir + "/config.cfg"
self.create_vm_conf_file(conf_file, vm.im_id, tasks, remote_dir)
def launch_ctxt_agent(self, vm, tasks, max_retries = 3):
"""
Launch the ctxt agent to configure the specified tasks in the specified VM
"""
pid = None
retries = 0
while not pid and retries < max_retries:
retries += 1
try:
ip = vm.getPublicIP()
if not ip:
ip = vm.getPrivateIP()
remote_dir = Config.REMOTE_CONF_DIR + "/" + ip + "_" + str(vm.getSSHPort())
tmp_dir = tempfile.mkdtemp()

ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": Copy the contextualization agent config file")

# Copy the contextualization agent config file
ssh = self.inf.vm_master.get_ssh()
ssh.sftp_mkdir(remote_dir)
ssh.sftp_put(conf_file, remote_dir + "/" + os.path.basename(conf_file))
ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": Create the configuration file for the contextualization agent")
conf_file = tmp_dir + "/config.cfg"
self.create_vm_conf_file(conf_file, vm.im_id, tasks, remote_dir)

ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": Copy the contextualization agent config file")

shutil.rmtree(tmp_dir, ignore_errors=True)

(pid, _, _) = ssh.execute("nohup python_ansible " + Config.REMOTE_CONF_DIR + "/ctxt_agent.py "
+ Config.REMOTE_CONF_DIR + "/general_info.cfg "
+ remote_dir + "/" + os.path.basename(conf_file)
+ " > " + remote_dir + "/stdout" + " 2> " + remote_dir + "/stderr < /dev/null & echo -n $!")
# Copy the contextualization agent config file
ssh = self.inf.vm_master.get_ssh()
ssh.sftp_mkdir(remote_dir)
ssh.sftp_put(conf_file, remote_dir + "/" + os.path.basename(conf_file))

shutil.rmtree(tmp_dir, ignore_errors=True)

ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": Ansible process to configure " + str(vm.im_id) + " launched with pid: " + pid)
(pid, _, _) = ssh.execute("nohup python_ansible " + Config.REMOTE_CONF_DIR + "/ctxt_agent.py "
+ Config.REMOTE_CONF_DIR + "/general_info.cfg "
+ remote_dir + "/" + os.path.basename(conf_file)
+ " > " + remote_dir + "/stdout" + " 2> " + remote_dir + "/stderr < /dev/null & echo -n $!")

ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": Ansible process to configure " + str(vm.im_id) + " launched with pid: " + pid)

vm.ctxt_pid = pid
vm.launch_check_ctxt_process()
except:
pid = None
ConfManager.logger.exception("Inf ID: " + str(self.inf.id) + ": Error (%d/%d) launching the ansible process to configure %s" % (retries, max_retries, str(vm.im_id)))
time.sleep(retries*2)

# If the process is not correctly launched the configuration of this VM fails
if pid is None:
vm.ctxt_pid = None
vm.configured = False
vm.cont_out = "Error launching the contextualization agent to configure the VM. Check the SSH connection."

return pid
return pid

def generate_inventory(self, tmp_dir):
"""
Generate the ansible inventory file
"""
ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": create the ansible configuration file")
res_filename = "hosts"
ansible_file = tmp_dir + "/" + res_filename
Expand Down Expand Up @@ -296,7 +322,10 @@ def generate_inventory(self, tmp_dir):
(nodename, nodedom) = vm.getRequestedName(default_domain = Config.DEFAULT_DOMAIN)

node_line = ip + ":" + str(vm.getSSHPort())
node_line += ' IM_NODE_HOSTNAME=' + nodename

if vm.id == self.inf.vm_master.id:
node_line += ' ansible_connection=local'

node_line += ' IM_NODE_HOSTNAME=' + nodename
node_line += ' IM_NODE_FQDN=' + nodename + "." + nodedom
node_line += ' IM_NODE_DOMAIN=' + nodedom
Expand Down Expand Up @@ -330,6 +359,9 @@ def generate_inventory(self, tmp_dir):
return res_filename

def generate_etc_hosts(self, tmp_dir):
"""
Generate the /etc/hosts file for the infrastructure
"""
res_filename = "etc_hosts"
hosts_file = tmp_dir + "/" + res_filename
hosts_out = open(hosts_file, 'w')
Expand Down Expand Up @@ -363,6 +395,9 @@ def generate_etc_hosts(self, tmp_dir):
return res_filename

def generate_basic_playbook(self, tmp_dir):
"""
Generate the basic playbook to be launched in all the VMs
"""
recipe_files = []
pk_file = "/tmp/ansible_key"
shutil.copy(Config.CONTEXTUALIZATION_DIR + "/basic.yml", tmp_dir + "/basic_task_all.yml")
Expand All @@ -376,6 +411,11 @@ def generate_basic_playbook(self, tmp_dir):
return recipe_files

def generate_main_playbook(self, vm, group, tmp_dir):
"""
Generate the main playbook to be launched in all the VMs.
This playbook basically installs the apps specified in the RADL
(as apps not in the configure section)
"""
recipe_files = []
# Get the info about the apps from the recipes DB
_, recipes = Recipe.getInfoApps(vm.getAppsToInstall())
Expand Down Expand Up @@ -424,6 +464,9 @@ def generate_main_playbook(self, vm, group, tmp_dir):
return recipe_files

def generate_playbook(self, vm, ctxt_elem, tmp_dir):
"""
Generate the playbook for the specified configure section
"""
recipe_files = []

conf_filename = tmp_dir + "/" + ctxt_elem.configure + "_" + ctxt_elem.system + "_task.yml"
Expand All @@ -444,6 +487,12 @@ def generate_playbook(self, vm, ctxt_elem, tmp_dir):
return recipe_files

def configure_master(self):
"""
Perform all the tasks to configure the master VM.
* Change the password
* Install ansible
* Copy the contextualization agent files
"""
success = True
if not self.inf.ansible_configured:
try:
Expand Down Expand Up @@ -491,7 +540,7 @@ def configure_master(self):
self.inf.ansible_configured = True
self.inf.set_configured(True)
# Force to save the data to store the log data
InfrastructureManager.InfrastructureManager.save_data()
ServiceRequests.IMBaseRequest.create_request(ServiceRequests.IMBaseRequest.SAVE_DATA)
else:
self.inf.ansible_configured = False
self.inf.set_configured(False)
Expand Down Expand Up @@ -555,7 +604,7 @@ def wait_master(self):
self.change_master_credentials(ssh)

# Force to save the data to store the log data
InfrastructureManager.InfrastructureManager.save_data()
ServiceRequests.IMBaseRequest.create_request(ServiceRequests.IMBaseRequest.SAVE_DATA)

self.inf.set_configured(True)
except:
Expand All @@ -567,6 +616,9 @@ def wait_master(self):
return success

def generate_playbooks_and_hosts(self):
"""
Generate all the files needed in the contextualization, playbooks, /etc/hosts, inventory
"""
try:
tmp_dir = tempfile.mkdtemp()
remote_dir = Config.REMOTE_CONF_DIR
Expand Down Expand Up @@ -995,7 +1047,7 @@ def create_general_conf_file(self, conf_file, vm_list):
conf_data['conf_dir'] = Config.REMOTE_CONF_DIR

conf_out = open(conf_file, 'w')
ConfManager.logger.debug("Ctxt agent configuration file: " + json.dumps(conf_data))
ConfManager.logger.debug("Ctxt agent general configuration file: " + json.dumps(conf_data))
json.dump(conf_data, conf_out, indent=2)
conf_out.close()

Expand All @@ -1010,7 +1062,7 @@ def create_vm_conf_file(self, conf_file, vm_id, tasks, remote_dir):
conf_data['remote_dir'] = remote_dir

conf_out = open(conf_file, 'w')
ConfManager.logger.debug("Ctxt agent configuration file: " + json.dumps(conf_data))
ConfManager.logger.debug("Ctxt agent vm configuration file: " + json.dumps(conf_data))
json.dump(conf_data, conf_out, indent=2)
conf_out.close()

Expand Down
11 changes: 9 additions & 2 deletions IM/InfrastructureInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,11 @@ def add_ctxt_tasks(self, ctxt_tasks):

for elem in to_add:
self.ctxt_tasks.put(elem)

def get_ctxt_process_names(self):
return [t.name for t in self.conf_threads]

def check_ctxt_process(self):
def is_ctxt_process_running(self):
all_finished = True
for t in self.conf_threads:
if t.isAlive():
Expand Down Expand Up @@ -373,7 +376,11 @@ def Contextualize(self, auth):
tasks[ctxt_num].append(ctxt_elem.configure + "_" + ctxt_elem.system)

for step in tasks.keys():
ctxt_task.append((step,0,vm,tasks[step]))
priority = 0
# Give a higher priority to the new VMs so that the ctxt process is launched on them first
if vm.configured is None:
priority = -1
ctxt_task.append((step,priority,vm,tasks[step]))

self.add_ctxt_tasks(ctxt_task)

Expand Down
6 changes: 3 additions & 3 deletions IM/InfrastructureManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,7 @@ def ExportInfrastructure(inf_id, delete, auth_data):
auth = Authentication(auth_data)

sel_inf = InfrastructureManager.get_infrastructure(inf_id, auth)
str_inf = pickle.dumps(sel_inf, 2)
str_inf = pickle.dumps(sel_inf)
InfrastructureManager.logger.info("Exporting infrastructure id: " + str(sel_inf.id))
if delete:
sel_inf.deleted = True
Expand Down Expand Up @@ -1042,8 +1042,8 @@ def save_data():
if not InfrastructureManager._exiting:
try:
data_file = open(Config.DATA_FILE, 'wb')
pickle.dump(InfrastructureManager.global_inf_id, data_file, 2)
pickle.dump(InfrastructureManager.infrastructure_list, data_file, 2)
pickle.dump(InfrastructureManager.global_inf_id, data_file)
pickle.dump(InfrastructureManager.infrastructure_list, data_file)
data_file.close()
except Exception, ex:
InfrastructureManager.logger.exception("ERROR saving data to the file: " + Config.DATA_FILE + ". Changes not stored!!")
Expand Down
Loading

0 comments on commit 145e570

Please sign in to comment.