Skip to content

Commit

Permalink
Merge branch 'devel'
Browse files Browse the repository at this point in the history
  • Loading branch information
micafer committed May 25, 2015
2 parents 7f485fb + 7b92340 commit f211f56
Show file tree
Hide file tree
Showing 23 changed files with 327 additions and 148 deletions.
16 changes: 12 additions & 4 deletions IM/ConfManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import shutil
import json
import copy
from StringIO import StringIO

from IM.ansible.ansible_launcher import AnsibleThread

Expand Down Expand Up @@ -114,6 +115,12 @@ def check_vm_ips(self, timeout = Config.WAIT_RUNNING_VM_TIMEOUT):
# If the IP is not Available try to update the info
vm.update_status(self.auth)

# If the VM is not in a "running" state, return false
if vm.state in [VirtualMachine.OFF, VirtualMachine.FAILED, VirtualMachine.STOPPED]:
ConfManager.logger.warn("Inf ID: " + str(self.inf.id) + ": Error waiting all the VMs to have a correct IP. VM ID: " + str(vm.id) + " is not running.")
self.inf.set_configured(False)
return False

if vm.hasPublicNet():
ip = vm.getPublicIP()
else:
Expand Down Expand Up @@ -894,16 +901,17 @@ def call_ansible(self, tmp_dir, inventory, playbook, ssh):
os.symlink(os.path.abspath(Config.RECIPES_DIR + "/utils"), tmp_dir + "/utils")

ConfManager.logger.debug("Inf ID: " + str(self.inf.id) + ": " + 'Lanzamos ansible.')
t = AnsibleThread(tmp_dir + "/" + playbook, None, 2, gen_pk_file, ssh.password, 1, tmp_dir + "/" + inventory, ssh.username)
output = StringIO()
t = AnsibleThread(output, tmp_dir + "/" + playbook, None, 2, gen_pk_file, ssh.password, 1, tmp_dir + "/" + inventory, ssh.username)
t.daemon = True
t.start()
t.join()
(return_code, output, _) = t.results
(return_code, _) = t.results

if return_code == 0:
return (True, output)
return (True, output.getvalue())
else:
return (False, output)
return (False, output.getvalue())

def add_ansible_header(self, host, os):
"""
Expand Down
28 changes: 26 additions & 2 deletions IM/InfrastructureInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class InfrastructureInfo:
logger = logging.getLogger('InfrastructureManager')
"""Logger object."""

FAKE_SYSTEM = "F0000__FAKE_SYSTEM__"

def __init__(self):
self._lock = threading.Lock()
"""Threading Lock to avoid concurrency problems."""
Expand Down Expand Up @@ -239,17 +241,39 @@ def complete_radl(self, radl):
radl.add(aspect.clone(), "replace")

# Add fake deploys to indicate the cloud provider associated to a private network.
FAKE_SYSTEM, system_counter = "F0000__FAKE_SYSTEM__%s", 0
system_counter = 0
for n in radl.networks:
if n.id in self.private_networks:
system_id = FAKE_SYSTEM % system_counter
system_id = self.FAKE_SYSTEM + str(system_counter)
system_counter += 1
radl.add(system(system_id, [Feature("net_interface.0.connection", "=", n.id)]))
radl.add(deploy(system_id, 0, self.private_networks[n.id]))

# Check the RADL
radl.check();

def get_radl(self):
"""
Get the RADL of this Infrastructure
"""
# remove the F0000__FAKE_SYSTEM__ deploys
# TODO: Do in a better way
radl = self.radl.clone()
deploys = []
for deploy in radl.deploys:
if not deploy.id.startswith(self.FAKE_SYSTEM):
deploys.append(deploy)
radl.deploys = deploys

# remove the F0000__FAKE_SYSTEM__ deploys
# TODO: Do in a better way
systems = []
for system in radl.systems:
if not system.name.startswith(self.FAKE_SYSTEM):
systems.append(system)
radl.systems = systems

return radl
def select_vm_master(self):
"""
Select the VM master of the infrastructure.
Expand Down
22 changes: 9 additions & 13 deletions IM/InfrastructureManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,18 +702,9 @@ def GetInfrastructureRADL(inf_id, auth):

sel_inf = InfrastructureManager.get_infrastructure(inf_id, auth)

InfrastructureManager.logger.info("RADL obtained successfully")
# remove the F0000__FAKE_SYSTEM__ deploys
# TODO: Do in a better way
radl = sel_inf.radl.clone()
deploys = []
for deploy in radl.deploys:
if not deploy.id.startswith("F0000__FAKE_SYSTEM_"):
deploys.append(deploy)
radl.deploys = deploys

InfrastructureManager.logger.debug(str(radl))
return str(sel_inf.radl)
radl = str(sel_inf.get_radl())
InfrastructureManager.logger.debug(radl)
return radl

@staticmethod
def GetInfrastructureInfo(inf_id, auth):
Expand Down Expand Up @@ -755,7 +746,12 @@ def GetInfrastructureContMsg(inf_id, auth):
InfrastructureManager.logger.info("Getting cont msg of the inf: " + str(inf_id))

sel_inf = InfrastructureManager.get_infrastructure(inf_id, auth)
res = sel_inf.cont_out + "\n\n".join([vm.cont_out for vm in sel_inf.get_vm_list() if vm.cont_out])
res = sel_inf.cont_out

for vm in sel_inf.get_vm_list():
if vm.cont_out:
res += "VM " + str(vm.id) + ":\n" + vm.cont_out + "\n"
res += "***************************************************************************\n"

InfrastructureManager.logger.debug(res)
return res
Expand Down
16 changes: 16 additions & 0 deletions IM/SSH.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,19 @@ def execute_timeout(self, command, timeout, retry = 1, kill_command = None):
return res

raise TimeOutException("Error: Timeout")

def sftp_remove(self, path):
""" Delete a file, if possible.
Arguments:
- path: Name of the file in the remote server to delete.
Returns: True if the file is deleted or False if it exists.
"""
client = self.connect()
transport = client.get_transport()
sftp = paramiko.SFTPClient.from_transport(transport)
res = sftp.remove(path)
sftp.close()
transport.close()
return res
66 changes: 47 additions & 19 deletions IM/VirtualMachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __init__(self, inf, cloud_id, cloud, info, requested_radl, cloud_connector =
"""Last update of the VM info"""
self.destroy = False
"""Flag to specify that this VM has been destroyed"""
self.state = self.UNKNOWN
self.state = self.PENDING
"""VM State"""
self.inf = inf
"""Infrastructure which this VM is part of"""
Expand Down Expand Up @@ -404,7 +404,7 @@ def update_status(self, auth):
# If we have problems to update the VM info too much time, set to unknown
if now - self.last_update > Config.VM_INFO_UPDATE_ERROR_GRACE_PERIOD:
new_state = VirtualMachine.UNKNOWN
VirtualMachine.logger.WARN("Grace period to update VM info passed. Set state to 'unknown'")
VirtualMachine.logger.warn("Grace period to update VM info passed. Set state to 'unknown'")
else:
if state not in [VirtualMachine.RUNNING, VirtualMachine.CONFIGURED, VirtualMachine.UNCONFIGURED]:
new_state = state
Expand Down Expand Up @@ -522,11 +522,18 @@ def check_ctxt_process(self):
self.ctxt_pid = None
self.configured = False

ip = self.getPublicIP()
if not ip:
ip = ip = self.getPrivateIP()
remote_dir = Config.REMOTE_CONF_DIR + "/" + ip + "_" + str(self.getSSHPort())

initial_count_out = self.cont_out
wait = 0
while self.ctxt_pid:
if self.ctxt_pid != self.WAIT_TO_PID:
ssh = self.inf.vm_master.get_ssh()

if self.state in [VirtualMachine.OFF, VirtualMachine.FAILED]:
if self.state in [VirtualMachine.OFF, VirtualMachine.FAILED, VirtualMachine.STOPPED]:
try:
ssh.execute("kill -9 " + str(self.ctxt_pid))
except:
Expand All @@ -551,12 +558,24 @@ def check_ctxt_process(self):
if exit_status != 0:
self.ctxt_pid = None
# The process has finished, get the outputs
ip = self.getPublicIP()
if not ip:
ip = ip = self.getPrivateIP()
remote_dir = Config.REMOTE_CONF_DIR + "/" + ip + "_" + str(self.getSSHPort())
self.get_ctxt_output(remote_dir)
ctxt_log = self.get_ctxt_log(remote_dir, True)
self.get_ctxt_output(remote_dir, True)
if ctxt_log:
self.cont_out = initial_count_out + ctxt_log
else:
self.cont_out = initial_count_out + "Error getting contextualization process log."
else:
# Get the log of the process to update the cont_out dynamically
if Config.UPDATE_CTXT_LOG_INTERVAL > 0 and wait > Config.UPDATE_CTXT_LOG_INTERVAL:
wait = 0
VirtualMachine.logger.debug("Get the log of the ctxt process with pid: "+ str(self.ctxt_pid))
ctxt_log = self.get_ctxt_log(remote_dir)
self.cont_out = initial_count_out + ctxt_log
# The process is still running, wait
time.sleep(Config.CHECK_CTXT_PROCESS_INTERVAL)
wait += Config.CHECK_CTXT_PROCESS_INTERVAL
else:
# We are waiting the PID, sleep
time.sleep(Config.CHECK_CTXT_PROCESS_INTERVAL)

return self.ctxt_pid
Expand All @@ -572,31 +591,40 @@ def is_configured(self):
# Otherwise return the value of configured
return self.configured

def get_ctxt_output(self, remote_dir):
def get_ctxt_log(self, remote_dir, delete = False):
ssh = self.inf.vm_master.get_ssh()
tmp_dir = tempfile.mkdtemp()

# Donwload the contextualization agent log
conf_out = ""

# Download the contextualization agent log
try:
# Get the messages of the contextualization process
ssh.sftp_get(remote_dir + '/ctxt_agent.log', tmp_dir + '/ctxt_agent.log')
with open(tmp_dir + '/ctxt_agent.log') as f: conf_out = f.read()

# Remove problematic chars
conf_out = filter(lambda x: x in string.printable, conf_out)
self.cont_out += conf_out.encode("ascii", "replace")

ssh.execute("rm -rf " + remote_dir + '/ctxt_agent.log')
except Exception, ex:
VirtualMachine.logger.exception("Error getting contextualization process output")
conf_out = filter(lambda x: x in string.printable, conf_out).encode("ascii", "replace")
if delete:
ssh.sftp_remove(remote_dir + '/ctxt_agent.log')
except Exception:
VirtualMachine.logger.exception("Error getting contextualization process log")
self.configured = False
self.cont_out += "Error getting contextualization process output: " + str(ex)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)

return conf_out

def get_ctxt_output(self, remote_dir, delete = False):
ssh = self.inf.vm_master.get_ssh()
tmp_dir = tempfile.mkdtemp()

# Donwload the contextualization agent log
# Download the contextualization agent log
try:
# Get the JSON output of the ctxt_agent
ssh.sftp_get(remote_dir + '/ctxt_agent.out', tmp_dir + '/ctxt_agent.out')
with open(tmp_dir + '/ctxt_agent.out') as f: ctxt_agent_out = json.load(f)
if delete:
ssh.sftp_remove(remote_dir + '/ctxt_agent.out')
# And process it
self.process_ctxt_agent_out(ctxt_agent_out)
except Exception, ex:
Expand Down
2 changes: 1 addition & 1 deletion IM/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@


__all__ = ['auth','bottle','CloudManager','config','ConfManager','db','ganglia','HTTPHeaderTransport','ImageManager','InfrastructureInfo','InfrastructureManager','parsetab','radl','recipe','request','REST', 'ServiceRequests','SSH','timedcall','uriparse','VMRC','xmlobject']
__version__ = '1.2.3'
__version__ = '1.2.4'
__author__ = 'Miguel Caballer'

6 changes: 5 additions & 1 deletion IM/ansible/ansible_callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@
import getpass
import fnmatch
import datetime
import logging

def display(msg, color=None, stderr=False, screen_only=False, log_only=False, runner=None, output=sys.stdout):
if not log_only:
msg2 = msg
print >>output, msg2
if isinstance(output, logging.Logger):
output.info(msg2)
else:
print >>output, msg2

class AggregateStats(object):
''' holds stats about per-host activity during playbook runs '''
Expand Down
18 changes: 8 additions & 10 deletions IM/ansible/ansible_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import time
import os
import threading
from StringIO import StringIO
import ansible.playbook
import ansible.inventory
import ansible.constants as C
Expand All @@ -38,7 +37,7 @@ def colorize(lead, num, color):
def hostcolor(host, stats, color=True):
return "%-26s" % host

def launch_playbook(playbook_file, host, passwd, threads, pk_file = None, retries = 1, inventory_file=None, user=None, extra_vars={}):
def launch_playbook(output, playbook_file, host, passwd, threads, pk_file = None, retries = 1, inventory_file=None, user=None, extra_vars={}):
''' run ansible-playbook operations '''

# create parser for CLI options
Expand Down Expand Up @@ -70,7 +69,6 @@ def launch_playbook(playbook_file, host, passwd, threads, pk_file = None, retrie
if not os.path.isfile(playbook_file):
raise errors.AnsibleError("the playbook: %s does not appear to be a file" % playbook_file)

output = StringIO()
num_retries = 0
return_code = 4
hosts_with_errors = []
Expand Down Expand Up @@ -161,14 +159,14 @@ def launch_playbook(playbook_file, host, passwd, threads, pk_file = None, retrie
if return_code != 0:
display("ERROR executing playbook (%s/%s)" % (num_retries, retries), color='red', output=output)

return (return_code, output.getvalue(), hosts_with_errors)
return (return_code, hosts_with_errors)


class AnsibleThread(threading.Thread):
"""
Class to call the ansible playbooks in a Thread
"""
def __init__(self, playbook_file, host = None, threads = 1, pk_file = None, passwd = None, retries = 1, inventory_file=None, user=None, extra_vars={}):
def __init__(self, output, playbook_file, host = None, threads = 1, pk_file = None, passwd = None, retries = 1, inventory_file=None, user=None, extra_vars={}):
threading.Thread.__init__(self)

self.playbook_file = playbook_file
Expand All @@ -180,12 +178,12 @@ def __init__(self, playbook_file, host = None, threads = 1, pk_file = None, pass
self.inventory_file = inventory_file
self.user = user
self.extra_vars=extra_vars
self.results = (None, None, None)
self.output = output
self.results = (None, None)

def run(self):
try:
self.results = launch_playbook(self.playbook_file, self.host, self.passwd, self.threads, self.pk_file, self.retries, self.inventory_file, self.user, self.extra_vars)
self.results = launch_playbook(self.output, self.playbook_file, self.host, self.passwd, self.threads, self.pk_file, self.retries, self.inventory_file, self.user, self.extra_vars)
except errors.AnsibleError, e:
output = StringIO()
display("ERROR: %s" % e, color='red', stderr=True, output=output)
self.results = (1, output.getvalue(), [])
display("ERROR: %s" % e, color='red', stderr=True, output=self.output)
self.results = (1, [])
10 changes: 7 additions & 3 deletions IM/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, auth_data):
else:
self.auth_list = auth_data

def getAuthInfo(self, auth_type):
def getAuthInfo(self, auth_type, host = None):
"""
Get the auth data of the specified type
Expand All @@ -50,8 +50,12 @@ def getAuthInfo(self, auth_type):
res = []
for auth in self.auth_list:
if auth['type'] == auth_type:
res.append(auth)
break
if host:
if 'host' in auth and auth['host'].find(host) != -1:
res.append(auth)
else:
res.append(auth)

return res

def getAuthInfoByID(self, auth_id):
Expand Down
1 change: 1 addition & 0 deletions IM/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class Config:
PRIVATE_NET_AS_PUBLIC = ''
CHECK_CTXT_PROCESS_INTERVAL = 5
CONFMAMAGER_CHECK_STATE_INTERVAL = 5
UPDATE_CTXT_LOG_INTERVAL = 20

config = ConfigParser.ConfigParser()
config.read([Config.IM_PATH + '/../im.cfg', Config.IM_PATH + '/../etc/im.cfg', '/etc/im/im.cfg'])
Expand Down
Loading

0 comments on commit f211f56

Please sign in to comment.