From aae9ce3a2b5517e1cf57c3c8fca1d031cf2a6156 Mon Sep 17 00:00:00 2001 From: antban Date: Tue, 13 Sep 2016 11:22:33 +0200 Subject: [PATCH] #61 Extend cli to be able to list actions and remove them --- bubuku/cli.py | 87 ++++++++++++++++++++++++++++++++++++++++ bubuku/communicate.py | 2 +- bubuku/health.py | 2 +- tests/test_controller.py | 24 +++++------ 4 files changed, 101 insertions(+), 14 deletions(-) diff --git a/bubuku/cli.py b/bubuku/cli.py index 2e054b4..5f8b67d 100644 --- a/bubuku/cli.py +++ b/bubuku/cli.py @@ -1,6 +1,8 @@ import logging import click +import requests +from requests import Response from bubuku.config import load_config, KafkaProperties, Config from bubuku.env_provider import EnvProvider @@ -100,6 +102,91 @@ def swap_partitions(threshold: int): RemoteCommandExecutorCheck.register_fatboy_slim(zookeeper, threshold_kb=threshold) +@cli.group(name='actions', help='Work with running actions') +def actions(): + pass + + +@actions.command('list', help='List all the actions on broker(s)') +@click.option('--broker', type=click.STRING, + help='Broker id to list actions on. By default all brokers are enumerated') +def list_actions(broker: str): + table = [] + config, env_provider = __prepare_configs() + + for broker_id, address in _list_broker_addresses(config, env_provider, broker): + try: + response = requests.get('http://{}:{}/api/controller/queue'.format(address, config.health_port)) + except Exception as e: + print('Failed to query information on {} ({})'.format(broker_id, address)) + _LOG.error('Failed to query information on {} ({})'.format(broker_id, address), exc_info=e) + continue + line = { + '_broker_id': broker_id, + '_broker_address': address, + } + if response.status_code != 200: + line['error'] = _extract_error(response) + table.append(line) + else: + changes = response.json() + if not changes: + line.update({ + 'type': None, + 'description': None, + 'running': None + }) + table.append(line) + else: + for change in changes: + line_copy = dict(line) + line_copy.update(change) + table.append(line_copy) + if not table: + print('No brokers found') + else: + _print_table(table) + + +@actions.command('delete', help='List all the actions on broker(s)') +@click.option('--action', type=click.STRING, + help='Action to delete') +@click.option('--broker', type=click.STRING, + help='Broker id to list actions on. By default all brokers are enumerated') +def delete_actions(action: str, broker: str): + if not action: + print('No action specified. Please specify it') + config, env_provider = __prepare_configs() + + for broker_id, address in _list_broker_addresses(config, env_provider, broker): + try: + response = requests.delete('http://{}:{}/api/controller/queue/{}'.format(address, config.health_port, action)) + except Exception as e: + print('Failed to query information on {} ({})'.format(broker_id, address)) + _LOG.error('Failed to query information on {} ({})'.format(broker_id, address), exc_info=e) + continue + if response.status_code not in (200, 204): + print('Failed to delete action from {} ({}): {}'.format(broker, address, _extract_error(response))) + else: + print('Removed action {} from {} ({})'.format(action, broker_id, address)) + + +def _extract_error(response: Response): + try: + return response.json()['message'] + except Exception as e: + _LOG.error('Failed to parse response message', exc_info=e) + return response.text() + + +def _list_broker_addresses(config, env_provider, broker): + with load_exhibitor_proxy(env_provider.get_address_provider(), config.zk_prefix) as zookeeper: + for broker_id in zookeeper.get_broker_ids(): + if broker and broker != broker_id: + continue + yield broker_id, zookeeper.get_broker_address(broker_id) + + @cli.command('stats', help='Display statistics about brokers') def show_stats(): config, env_provider = __prepare_configs() diff --git a/bubuku/communicate.py b/bubuku/communicate.py index 16c7123..234a8f8 100644 --- a/bubuku/communicate.py +++ b/bubuku/communicate.py @@ -5,7 +5,7 @@ __COMMAND_QUEUE = Queue() -_LOG = logging.getLogger('bubuku.communicates') +_LOG = logging.getLogger('bubuku.communicate') def sleep_and_operate(controller, timeout: float): diff --git a/bubuku/health.py b/bubuku/health.py index 3a98a51..76956fa 100644 --- a/bubuku/health.py +++ b/bubuku/health.py @@ -59,7 +59,7 @@ def do_DELETE(self): def _run_controller_action(self, action): if action.split('/')[0] == 'queue': - return execute_on_controller_thread(load_controller_queue, _CONTROLLER_TIMEOUT) + return self._send_response(execute_on_controller_thread(load_controller_queue, _CONTROLLER_TIMEOUT), 200) else: return self._send_response({'message': 'Action {} is not supported'.format(action)}, 404) diff --git a/tests/test_controller.py b/tests/test_controller.py index 3b4497e..118e794 100644 --- a/tests/test_controller.py +++ b/tests/test_controller.py @@ -41,40 +41,40 @@ def check(self): return FakeChange(self.changes_issued - 1) current_changes = {} - ip = 'fake' zk = MagicMock() zk.get_running_changes.return_value = current_changes zk.register_change = lambda x, y: current_changes.update({x: y}) zk.unregister_change = lambda x: current_changes.pop(x) controller = Controller(MagicMock(), zk, MagicMock()) + controller.provider_id = 'fake' controller.add_check(FakeCheck()) assert [3, 3, 3] == running_count - controller.make_step(ip) + controller.make_step() assert not current_changes assert [3, 3, 3] == running_count - controller.make_step(ip) + controller.make_step() assert current_changes assert [2, 3, 3] == running_count - controller.make_step(ip) + controller.make_step() assert [1, 3, 3] == running_count - controller.make_step(ip) + controller.make_step() assert [0, 3, 3] == running_count - controller.make_step(ip) + controller.make_step() assert [0, 2, 3] == running_count - controller.make_step(ip) + controller.make_step() assert [0, 1, 3] == running_count - controller.make_step(ip) + controller.make_step() assert [0, 0, 3] == running_count - controller.make_step(ip) + controller.make_step() assert [0, 0, 2] == running_count - controller.make_step(ip) + controller.make_step() assert [0, 0, 1] == running_count assert current_changes - controller.make_step(ip) + controller.make_step() assert [0, 0, 0] == running_count assert not current_changes - controller.make_step(ip) + controller.make_step() assert [0, 0, 0] == running_count assert not current_changes