Skip to content
This repository has been archived by the owner on Oct 7, 2024. It is now read-only.

Commit

Permalink
#61 Extend cli to be able to list actions and remove them
Browse files Browse the repository at this point in the history
  • Loading branch information
antban committed Sep 13, 2016
1 parent 433e009 commit aae9ce3
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 14 deletions.
87 changes: 87 additions & 0 deletions bubuku/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging

import click
import requests
from requests import Response

from bubuku.config import load_config, KafkaProperties, Config
from bubuku.env_provider import EnvProvider
Expand Down Expand Up @@ -100,6 +102,91 @@ def swap_partitions(threshold: int):
RemoteCommandExecutorCheck.register_fatboy_slim(zookeeper, threshold_kb=threshold)


@cli.group(name='actions', help='Work with running actions')
def actions():
pass


@actions.command('list', help='List all the actions on broker(s)')
@click.option('--broker', type=click.STRING,
help='Broker id to list actions on. By default all brokers are enumerated')
def list_actions(broker: str):
table = []
config, env_provider = __prepare_configs()

for broker_id, address in _list_broker_addresses(config, env_provider, broker):
try:
response = requests.get('http://{}:{}/api/controller/queue'.format(address, config.health_port))
except Exception as e:
print('Failed to query information on {} ({})'.format(broker_id, address))
_LOG.error('Failed to query information on {} ({})'.format(broker_id, address), exc_info=e)
continue
line = {
'_broker_id': broker_id,
'_broker_address': address,
}
if response.status_code != 200:
line['error'] = _extract_error(response)
table.append(line)
else:
changes = response.json()
if not changes:
line.update({
'type': None,
'description': None,
'running': None
})
table.append(line)
else:
for change in changes:
line_copy = dict(line)
line_copy.update(change)
table.append(line_copy)
if not table:
print('No brokers found')
else:
_print_table(table)


@actions.command('delete', help='List all the actions on broker(s)')
@click.option('--action', type=click.STRING,
help='Action to delete')
@click.option('--broker', type=click.STRING,
help='Broker id to list actions on. By default all brokers are enumerated')
def delete_actions(action: str, broker: str):
if not action:
print('No action specified. Please specify it')
config, env_provider = __prepare_configs()

for broker_id, address in _list_broker_addresses(config, env_provider, broker):
try:
response = requests.delete('http://{}:{}/api/controller/queue/{}'.format(address, config.health_port, action))
except Exception as e:
print('Failed to query information on {} ({})'.format(broker_id, address))
_LOG.error('Failed to query information on {} ({})'.format(broker_id, address), exc_info=e)
continue
if response.status_code not in (200, 204):
print('Failed to delete action from {} ({}): {}'.format(broker, address, _extract_error(response)))
else:
print('Removed action {} from {} ({})'.format(action, broker_id, address))


def _extract_error(response: Response):
try:
return response.json()['message']
except Exception as e:
_LOG.error('Failed to parse response message', exc_info=e)
return response.text()


def _list_broker_addresses(config, env_provider, broker):
with load_exhibitor_proxy(env_provider.get_address_provider(), config.zk_prefix) as zookeeper:
for broker_id in zookeeper.get_broker_ids():
if broker and broker != broker_id:
continue
yield broker_id, zookeeper.get_broker_address(broker_id)


@cli.command('stats', help='Display statistics about brokers')
def show_stats():
config, env_provider = __prepare_configs()
Expand Down
2 changes: 1 addition & 1 deletion bubuku/communicate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

__COMMAND_QUEUE = Queue()

_LOG = logging.getLogger('bubuku.communicates')
_LOG = logging.getLogger('bubuku.communicate')


def sleep_and_operate(controller, timeout: float):
Expand Down
2 changes: 1 addition & 1 deletion bubuku/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def do_DELETE(self):

def _run_controller_action(self, action):
if action.split('/')[0] == 'queue':
return execute_on_controller_thread(load_controller_queue, _CONTROLLER_TIMEOUT)
return self._send_response(execute_on_controller_thread(load_controller_queue, _CONTROLLER_TIMEOUT), 200)
else:
return self._send_response({'message': 'Action {} is not supported'.format(action)}, 404)

Expand Down
24 changes: 12 additions & 12 deletions tests/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,40 +41,40 @@ def check(self):
return FakeChange(self.changes_issued - 1)

current_changes = {}
ip = 'fake'
zk = MagicMock()
zk.get_running_changes.return_value = current_changes
zk.register_change = lambda x, y: current_changes.update({x: y})
zk.unregister_change = lambda x: current_changes.pop(x)

controller = Controller(MagicMock(), zk, MagicMock())
controller.provider_id = 'fake'
controller.add_check(FakeCheck())

assert [3, 3, 3] == running_count
controller.make_step(ip)
controller.make_step()
assert not current_changes
assert [3, 3, 3] == running_count
controller.make_step(ip)
controller.make_step()
assert current_changes
assert [2, 3, 3] == running_count
controller.make_step(ip)
controller.make_step()
assert [1, 3, 3] == running_count
controller.make_step(ip)
controller.make_step()
assert [0, 3, 3] == running_count
controller.make_step(ip)
controller.make_step()
assert [0, 2, 3] == running_count
controller.make_step(ip)
controller.make_step()
assert [0, 1, 3] == running_count
controller.make_step(ip)
controller.make_step()
assert [0, 0, 3] == running_count
controller.make_step(ip)
controller.make_step()
assert [0, 0, 2] == running_count
controller.make_step(ip)
controller.make_step()
assert [0, 0, 1] == running_count
assert current_changes
controller.make_step(ip)
controller.make_step()
assert [0, 0, 0] == running_count
assert not current_changes
controller.make_step(ip)
controller.make_step()
assert [0, 0, 0] == running_count
assert not current_changes

0 comments on commit aae9ce3

Please sign in to comment.