From 433e009a91251853a6c24b63af7dc0016b564675 Mon Sep 17 00:00:00 2001 From: antban Date: Tue, 13 Sep 2016 09:44:52 +0200 Subject: [PATCH] #61 Ability to list and cancel current actions --- bubuku/communicate.py | 48 +++++++++++++++++++++++++++ bubuku/controller.py | 63 +++++++++++++++++++++++++----------- bubuku/health.py | 49 ++++++++++++++++++++++++++-- bubuku/zookeeper/__init__.py | 5 ++- 4 files changed, 144 insertions(+), 21 deletions(-) create mode 100644 bubuku/communicate.py diff --git a/bubuku/communicate.py b/bubuku/communicate.py new file mode 100644 index 0000000..16c7123 --- /dev/null +++ b/bubuku/communicate.py @@ -0,0 +1,48 @@ +import logging +import threading +import time +from queue import Queue, Empty, Full + +__COMMAND_QUEUE = Queue() + +_LOG = logging.getLogger('bubuku.communicates') + + +def sleep_and_operate(controller, timeout: float): + cur_time = time.time() + finish = cur_time + (0.1 if timeout <= 0 else timeout) + while cur_time < finish: + try: + commnad = __COMMAND_QUEUE.get(block=True, timeout=finish - cur_time) + try: + commnad(controller) + except Exception as e: + _LOG.error('Command finished with error', exc_info=e) + except Empty: + pass + cur_time = time.time() + + +def execute_on_controller_thread(function, timeout): + condition = threading.Condition() + result = [None, True] + + def _execute(controller): + with condition: + if result[1]: + try: + result[0] = function(controller) + finally: + condition.notify() + + finish = time.time() + timeout + with condition: + try: + __COMMAND_QUEUE.put(_execute, timeout=timeout) + except Full: + raise TimeoutError('Timeout expired') + if condition.wait(timeout=finish - time.time()): + return result[0] + else: + result[1] = False + raise TimeoutError('Timeout expired') diff --git a/bubuku/controller.py b/bubuku/controller.py index 47c1a1d..ac2707c 100644 --- a/bubuku/controller.py +++ b/bubuku/controller.py @@ -1,7 +1,8 @@ import logging -from time import sleep, time +from time import time from bubuku.broker import BrokerManager +from bubuku.communicate import sleep_and_operate from bubuku.env_provider import EnvProvider from bubuku.zookeeper import BukuExhibitor @@ -56,16 +57,43 @@ def __init__(self, broker_manager: BrokerManager, zk: BukuExhibitor, env_provide self.checks = [] self.changes = {} # Holds mapping from change name to array of pending changes self.running = True + self.provider_id = None # provider id must not be requested on initialization + + def enumerate_changes(self): + with self.zk.lock(self.provider_id): + running_changes = self.zk.get_running_changes() + + result = [] + for name, change_list in self.changes.items(): + running = running_changes.get(name) == self.provider_id + first = True + for change in change_list: + result.append({ + 'type': name, + 'description': str(change), + 'running': bool(first and running) + }) + first = False + return result + + def cancel_changes(self, name): + result = len(self.changes.get(name, {})) + if result: + with self.zk.lock(self.provider_id): + self.zk.unregister_change(name) + + del self.changes[name] + return result def add_check(self, check): _LOG.info('Adding check {}'.format(str(check))) self.checks.append(check) - def _register_running_changes(self, provider_id: str) -> dict: + def _register_running_changes(self) -> dict: if not self.changes: return {} # Do not take lock if there are no changes to register _LOG.debug('Taking lock for processing') - with self.zk.lock(provider_id): + with self.zk.lock(self.provider_id): _LOG.debug('Lock is taken') # Get list of current running changes running_changes = self.zk.get_running_changes() @@ -75,23 +103,23 @@ def _register_running_changes(self, provider_id: str) -> dict: for name, change_list in self.changes.items(): # Only first change is able to run first_change = change_list[0] - if first_change.can_run(_exclude_self(provider_id, name, running_changes)): + if first_change.can_run(_exclude_self(self.provider_id, name, running_changes)): if name not in running_changes: - self.zk.register_change(name, provider_id) - running_changes[name] = provider_id + self.zk.register_change(name, self.provider_id) + running_changes[name] = self.provider_id else: _LOG.info('Change {} is waiting for others: {}'.format(name, running_changes)) return running_changes - def _run_changes(self, running_changes: dict, provider_id: str) -> list: + def _run_changes(self, running_changes: dict) -> list: changes_to_remove = [] for name, change_list in self.changes.items(): - if name in running_changes and running_changes[name] == provider_id: + if name in running_changes and running_changes[name] == self.provider_id: change = change_list[0] _LOG.info('Executing action {} step'.format(change)) if self.running or change.can_run_at_exit(): try: - if not change.run(_exclude_self(provider_id, change.get_name(), running_changes)): + if not change.run(_exclude_self(self.provider_id, change.get_name(), running_changes)): _LOG.info('Action {} completed'.format(change)) changes_to_remove.append(change.get_name()) else: @@ -119,24 +147,23 @@ def _release_changes_lock(self, changes_to_remove): self.zk.unregister_change(name) def loop(self, change_on_init=None): - provider_id = self.env_provider.get_id() + self.provider_id = self.env_provider.get_id() if change_on_init: self._add_change_to_queue(change_on_init) while self.running or self.changes: - self.make_step(provider_id) + self.make_step() if self.changes: - sleep(0.5) + timeout = 0.5 else: - min_time_till_check = min([check.time_till_check() for check in self.checks]) - if min_time_till_check > 0: - sleep(min_time_till_check) + timeout = min([check.time_till_check() for check in self.checks]) + sleep_and_operate(self, timeout) - def make_step(self, provider_id): + def make_step(self): # register running changes - running_changes = self._register_running_changes(provider_id) + running_changes = self._register_running_changes() # apply changes without holding lock - changes_to_remove = self._run_changes(running_changes, provider_id) + changes_to_remove = self._run_changes(running_changes) # remove processed actions self._release_changes_lock(changes_to_remove) if self.running: diff --git a/bubuku/health.py b/bubuku/health.py index e80278f..3a98a51 100644 --- a/bubuku/health.py +++ b/bubuku/health.py @@ -1,13 +1,30 @@ import json import logging import threading +from functools import partial from http.server import BaseHTTPRequestHandler, HTTPServer +from bubuku.communicate import execute_on_controller_thread +from bubuku.controller import Controller from bubuku.utils import CmdHelper +_CONTROLLER_TIMEOUT = 5 + +_API_CONTROLLER = '/api/controller/' + _LOG = logging.getLogger('bubuku.health') +def load_controller_queue(controller: Controller): + return controller.enumerate_changes() + + +def delete_from_controller_queue(name: str, controller: Controller): + return { + 'count': controller.cancel_changes(name) + } + + class _Handler(BaseHTTPRequestHandler): cmd_helper = None @@ -15,11 +32,39 @@ def do_GET(self): if self.path in ('/api/disk_stats', '/api/disk_stats/'): used_kb, free_kb = self.cmd_helper.get_disk_stats() self._send_response({'free_kb': free_kb, 'used_kb': used_kb}) + elif self.path.startswith(_API_CONTROLLER): + self.wrap_controller_execution(lambda: self._run_controller_action(self.path[len(_API_CONTROLLER):])) else: self._send_response({'status': 'OK'}) - def _send_response(self, json_): - self.send_response(200) + def wrap_controller_execution(self, call): + try: + call() + except TimeoutError as e: + _LOG.error('Failed to rum action because of timeouts', exc_info=e) + self._send_response({'code': 'timeout', 'message': 'Timeout occurred'}, 500) + + def do_DELETE(self): + if not self.path.startswith(_API_CONTROLLER): + return self._send_response({'message': 'Path {} is not supported'.format(self.path)}, 404) + action = self.path[len(_API_CONTROLLER):].split('/') + if action[0] == 'queue': + if len(action) < 2: + return self._send_response({'message': 'No second argument provided!'}, 400) + self.wrap_controller_execution( + lambda: self._send_response(execute_on_controller_thread( + partial(delete_from_controller_queue, action[1]), _CONTROLLER_TIMEOUT), 200)) + else: + return self._send_response({'message': 'Action {} is not supported'.format(action[0])}, 404) + + def _run_controller_action(self, action): + if action.split('/')[0] == 'queue': + return execute_on_controller_thread(load_controller_queue, _CONTROLLER_TIMEOUT) + else: + return self._send_response({'message': 'Action {} is not supported'.format(action)}, 404) + + def _send_response(self, json_, status_code=200): + self.send_response(status_code) self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(json.dumps(json_).encode('utf-8')) diff --git a/bubuku/zookeeper/__init__.py b/bubuku/zookeeper/__init__.py index 59e9afd..7bd01d8 100644 --- a/bubuku/zookeeper/__init__.py +++ b/bubuku/zookeeper/__init__.py @@ -144,7 +144,10 @@ def create(self, *args, **kwargs): def delete(self, *args, **kwargs): self.hosts_cache.touch() - return self.client.retry(self.client.delete, *args, **kwargs) + try: + return self.client.retry(self.client.delete, *args, **kwargs) + except NoNodeError: + pass def get_children(self, *params): self.hosts_cache.touch()