@actions.command('delete', help='Remove all actions of specified type on broker(s)')
@click.option('--action', type=click.STRING,
              help='Action to delete')
@click.option('--broker', type=click.STRING,
              help='Broker id to delete actions on. By default actions are deleted on all brokers')
def delete_actions(action: str, broker: str):
    """Ask broker(s) to drop every queued action of the given type.

    Sends an HTTP DELETE to ``/api/controller/queue/<action>`` on the health
    port of every broker yielded by ``_list_broker_addresses`` and prints one
    line per broker describing the outcome.

    :param action: type of the actions to remove; nothing is sent when empty.
    :param broker: broker id to restrict the call to; all brokers when empty.
    """
    if not action:
        print('No action specified. Please specify it')
        # Stop here: otherwise a DELETE for the literal action "None" would
        # be issued against every broker.
        return
    config, env_provider = __prepare_configs()

    for broker_id, address in _list_broker_addresses(config, env_provider, broker):
        try:
            response = requests.delete(
                'http://{}:{}/api/controller/queue/{}'.format(address, config.health_port, action))
        except Exception as e:
            # One unreachable broker must not abort the whole run.
            print('Failed to query information on {} ({})'.format(broker_id, address))
            _LOG.error('Failed to query information on {} ({})'.format(broker_id, address), exc_info=e)
            continue
        if response.status_code not in (200, 204):
            # Report the broker that actually failed (broker_id), not the
            # optional --broker filter, which is None when all brokers are used.
            print('Failed to delete action from {} ({}): {}'.format(
                broker_id, address, _extract_error(response)))
        else:
            print('Removed action {} from {} ({})'.format(action, broker_id, address))


def _extract_error(response: Response):
    """Return a printable error description for a failed HTTP response.

    Prefers the ``message`` field of the JSON body. When the body is not JSON
    or carries no such field, the problem is logged and the raw body text is
    returned instead.
    """
    try:
        return response.json()['message']
    except Exception as e:
        _LOG.error('Failed to parse response message', exc_info=e)
        # Response.text is a property, not a method; calling it raises TypeError.
        return response.text
def cancel_changes(self, name):
    """Drop every change queued under ``name`` on this controller.

    :param name: change type whose queued changes should be removed.
    :return: number of changes that were queued under ``name``
        (0 when there were none, in which case nothing is touched).
    """
    pending = self.changes.get(name)
    if not pending:
        return 0
    cancelled = len(pending)
    with self.zk.lock(self.provider_id):
        # get_running_changes() maps a change name to the provider id that
        # registered it. Only release the registration when it is ours:
        # a change that is merely queued here may be running on another
        # broker, and its registration must stay intact.
        if self.zk.get_running_changes().get(name) == self.provider_id:
            self.zk.unregister_change(name)
    del self.changes[name]
    return cancelled
def loop(self, change_on_init=None):
    """Run the controller until it is stopped and no changes remain.

    The provider id is resolved here rather than in ``__init__``. After that
    the optional ``change_on_init`` is queued and the loop alternates
    ``make_step()`` with ``sleep_and_operate``, which waits for up to
    ``timeout`` seconds while serving commands queued for the controller
    thread.

    :param change_on_init: change to enqueue before the first step, if any.
    """
    self.provider_id = self.env_provider.get_id()
    if change_on_init:
        self._add_change_to_queue(change_on_init)
    while self.running or self.changes:
        self.make_step()

        if self.changes:
            # Pending changes are re-evaluated at a fixed short interval.
            timeout = 0.5
        else:
            # Sleep until the earliest check is due. With no checks
            # registered, min() of an empty sequence would raise ValueError
            # and kill the loop, so fall back to the same short interval.
            timeout = min((check.time_till_check() for check in self.checks), default=0.5)
        sleep_and_operate(self, timeout)
def do_DELETE(self):
    """Handle ``DELETE /api/controller/queue/<name>``.

    Responds with 404 for paths outside ``_API_CONTROLLER`` or for an
    unknown action, and with 400 when no change name follows ``queue``.
    Otherwise the removal is executed on the controller thread and its
    result is sent back with status 200; ``wrap_controller_execution``
    turns a ``TimeoutError`` into an error response.
    """
    if not self.path.startswith(_API_CONTROLLER):
        return self._send_response({'message': 'Path {} is not supported'.format(self.path)}, 404)
    action = self.path[len(_API_CONTROLLER):].split('/')
    if action[0] == 'queue':
        # 'queue/' splits into ['queue', '']: an empty name is as much a
        # missing argument as no second segment at all.
        if len(action) < 2 or not action[1]:
            return self._send_response({'message': 'No second argument provided!'}, 400)
        self.wrap_controller_execution(
            lambda: self._send_response(execute_on_controller_thread(
                partial(delete_from_controller_queue, action[1]), _CONTROLLER_TIMEOUT), 200))
    else:
        return self._send_response({'message': 'Action {} is not supported'.format(action[0])}, 404)
def delete(self, *args, **kwargs):
    """Delete a node through the retrying client, tolerating a missing node.

    All arguments are forwarded to ``self.client.delete`` by way of
    ``self.client.retry``.

    :return: whatever the retried delete returns, or ``None`` when it
        raises ``NoNodeError``.
    """
    self.hosts_cache.touch()
    try:
        outcome = self.client.retry(self.client.delete, *args, **kwargs)
    except NoNodeError:
        # Nothing to remove: treat as success without a result.
        outcome = None
    return outcome
controller.make_step() assert [0, 0, 0] == running_count assert not current_changes