Skip to content
This repository has been archived by the owner on Oct 7, 2024. It is now read-only.

Ability to list and cancel running (and queued) actions of bubuku #64

Merged
merged 3 commits into from
Sep 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions bubuku/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging

import click
import requests
from requests import Response

from bubuku.config import load_config, KafkaProperties, Config
from bubuku.env_provider import EnvProvider
Expand Down Expand Up @@ -100,6 +102,91 @@ def swap_partitions(threshold: int):
RemoteCommandExecutorCheck.register_fatboy_slim(zookeeper, threshold_kb=threshold)


@cli.group(name='actions', help='Work with running actions')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@v-stepanov I don't fully get the question. It's group description if you run bubuku-cli actions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does group description do?

def actions():
pass


@actions.command('list', help='List all the actions on broker(s)')
@click.option('--broker', type=click.STRING,
help='Broker id to list actions on. By default all brokers are enumerated')
def list_actions(broker: str):
table = []
config, env_provider = __prepare_configs()

for broker_id, address in _list_broker_addresses(config, env_provider, broker):
try:
response = requests.get('http://{}:{}/api/controller/queue'.format(address, config.health_port))
except Exception as e:
print('Failed to query information on {} ({})'.format(broker_id, address))
_LOG.error('Failed to query information on {} ({})'.format(broker_id, address), exc_info=e)
continue
line = {
'_broker_id': broker_id,
'_broker_address': address,
}
if response.status_code != 200:
line['error'] = _extract_error(response)
table.append(line)
else:
changes = response.json()
if not changes:
line.update({
'type': None,
'description': None,
'running': None
})
table.append(line)
else:
for change in changes:
line_copy = dict(line)
line_copy.update(change)
table.append(line_copy)
if not table:
print('No brokers found')
else:
_print_table(table)


@actions.command('delete', help='Remove all actions of specified type on broker(s)')
@click.option('--action', type=click.STRING,
help='Action to delete')
@click.option('--broker', type=click.STRING,
help='Broker id to delete actions on. By default actions are deleted on all brokers')
def delete_actions(action: str, broker: str):
if not action:
print('No action specified. Please specify it')
config, env_provider = __prepare_configs()

for broker_id, address in _list_broker_addresses(config, env_provider, broker):
try:
response = requests.delete('http://{}:{}/api/controller/queue/{}'.format(address, config.health_port, action))
except Exception as e:
print('Failed to query information on {} ({})'.format(broker_id, address))
_LOG.error('Failed to query information on {} ({})'.format(broker_id, address), exc_info=e)
continue
if response.status_code not in (200, 204):
print('Failed to delete action from {} ({}): {}'.format(broker, address, _extract_error(response)))
else:
print('Removed action {} from {} ({})'.format(action, broker_id, address))


def _extract_error(response: Response):
try:
return response.json()['message']
except Exception as e:
_LOG.error('Failed to parse response message', exc_info=e)
return response.text()


def _list_broker_addresses(config, env_provider, broker):
with load_exhibitor_proxy(env_provider.get_address_provider(), config.zk_prefix) as zookeeper:
for broker_id in zookeeper.get_broker_ids():
if broker and broker != broker_id:
continue
yield broker_id, zookeeper.get_broker_address(broker_id)


@cli.command('stats', help='Display statistics about brokers')
def show_stats():
config, env_provider = __prepare_configs()
Expand Down
48 changes: 48 additions & 0 deletions bubuku/communicate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import logging
import threading
import time
from queue import Queue, Empty, Full

__COMMAND_QUEUE = Queue()

_LOG = logging.getLogger('bubuku.communicate')


def sleep_and_operate(controller, timeout: float):
cur_time = time.time()
finish = cur_time + (0.1 if timeout <= 0 else timeout)
while cur_time < finish:
try:
command = __COMMAND_QUEUE.get(block=True, timeout=finish - cur_time)
try:
command(controller)
except Exception as e:
_LOG.error('Command finished with error', exc_info=e)
except Empty:
pass
cur_time = time.time()


def execute_on_controller_thread(function, timeout):
condition = threading.Condition()
result = [None, True]

def _execute(controller):
with condition:
if result[1]:
try:
result[0] = function(controller)
finally:
condition.notify()

finish = time.time() + timeout
with condition:
try:
__COMMAND_QUEUE.put(_execute, timeout=timeout)
except Full:
raise TimeoutError('Timeout expired')
if condition.wait(timeout=finish - time.time()):
return result[0]
else:
result[1] = False
raise TimeoutError('Timeout expired')
63 changes: 45 additions & 18 deletions bubuku/controller.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import logging
from time import sleep, time
from time import time

from bubuku.broker import BrokerManager
from bubuku.communicate import sleep_and_operate
from bubuku.env_provider import EnvProvider
from bubuku.zookeeper import BukuExhibitor

Expand Down Expand Up @@ -56,16 +57,43 @@ def __init__(self, broker_manager: BrokerManager, zk: BukuExhibitor, env_provide
self.checks = []
self.changes = {} # Holds mapping from change name to array of pending changes
self.running = True
self.provider_id = None # provider id must not be requested on initialization

def enumerate_changes(self):
with self.zk.lock(self.provider_id):
running_changes = self.zk.get_running_changes()

result = []
for name, change_list in self.changes.items():
running = running_changes.get(name) == self.provider_id
first = True
for change in change_list:
result.append({
'type': name,
'description': str(change),
'running': bool(first and running)
})
first = False
return result

def cancel_changes(self, name):
result = len(self.changes.get(name, {}))
if result:
with self.zk.lock(self.provider_id):
self.zk.unregister_change(name)

del self.changes[name]
return result

def add_check(self, check):
_LOG.info('Adding check {}'.format(str(check)))
self.checks.append(check)

def _register_running_changes(self, provider_id: str) -> dict:
def _register_running_changes(self) -> dict:
if not self.changes:
return {} # Do not take lock if there are no changes to register
_LOG.debug('Taking lock for processing')
with self.zk.lock(provider_id):
with self.zk.lock(self.provider_id):
_LOG.debug('Lock is taken')
# Get list of current running changes
running_changes = self.zk.get_running_changes()
Expand All @@ -75,23 +103,23 @@ def _register_running_changes(self, provider_id: str) -> dict:
for name, change_list in self.changes.items():
# Only first change is able to run
first_change = change_list[0]
if first_change.can_run(_exclude_self(provider_id, name, running_changes)):
if first_change.can_run(_exclude_self(self.provider_id, name, running_changes)):
if name not in running_changes:
self.zk.register_change(name, provider_id)
running_changes[name] = provider_id
self.zk.register_change(name, self.provider_id)
running_changes[name] = self.provider_id
else:
_LOG.info('Change {} is waiting for others: {}'.format(name, running_changes))
return running_changes

def _run_changes(self, running_changes: dict, provider_id: str) -> list:
def _run_changes(self, running_changes: dict) -> list:
changes_to_remove = []
for name, change_list in self.changes.items():
if name in running_changes and running_changes[name] == provider_id:
if name in running_changes and running_changes[name] == self.provider_id:
change = change_list[0]
_LOG.info('Executing action {} step'.format(change))
if self.running or change.can_run_at_exit():
try:
if not change.run(_exclude_self(provider_id, change.get_name(), running_changes)):
if not change.run(_exclude_self(self.provider_id, change.get_name(), running_changes)):
_LOG.info('Action {} completed'.format(change))
changes_to_remove.append(change.get_name())
else:
Expand Down Expand Up @@ -119,24 +147,23 @@ def _release_changes_lock(self, changes_to_remove):
self.zk.unregister_change(name)

def loop(self, change_on_init=None):
provider_id = self.env_provider.get_id()
self.provider_id = self.env_provider.get_id()
if change_on_init:
self._add_change_to_queue(change_on_init)
while self.running or self.changes:
self.make_step(provider_id)
self.make_step()

if self.changes:
sleep(0.5)
timeout = 0.5
else:
min_time_till_check = min([check.time_till_check() for check in self.checks])
if min_time_till_check > 0:
sleep(min_time_till_check)
timeout = min([check.time_till_check() for check in self.checks])
sleep_and_operate(self, timeout)

def make_step(self, provider_id):
def make_step(self):
# register running changes
running_changes = self._register_running_changes(provider_id)
running_changes = self._register_running_changes()
# apply changes without holding lock
changes_to_remove = self._run_changes(running_changes, provider_id)
changes_to_remove = self._run_changes(running_changes)
# remove processed actions
self._release_changes_lock(changes_to_remove)
if self.running:
Expand Down
49 changes: 47 additions & 2 deletions bubuku/health.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,70 @@
import json
import logging
import threading
from functools import partial
from http.server import BaseHTTPRequestHandler, HTTPServer

from bubuku.communicate import execute_on_controller_thread
from bubuku.controller import Controller
from bubuku.utils import CmdHelper

_CONTROLLER_TIMEOUT = 5

_API_CONTROLLER = '/api/controller/'

_LOG = logging.getLogger('bubuku.health')


def load_controller_queue(controller: Controller):
return controller.enumerate_changes()


def delete_from_controller_queue(name: str, controller: Controller):
return {
'count': controller.cancel_changes(name)
}


class _Handler(BaseHTTPRequestHandler):
cmd_helper = None

def do_GET(self):
if self.path in ('/api/disk_stats', '/api/disk_stats/'):
used_kb, free_kb = self.cmd_helper.get_disk_stats()
self._send_response({'free_kb': free_kb, 'used_kb': used_kb})
elif self.path.startswith(_API_CONTROLLER):
self.wrap_controller_execution(lambda: self._run_controller_action(self.path[len(_API_CONTROLLER):]))
else:
self._send_response({'status': 'OK'})

def _send_response(self, json_):
self.send_response(200)
def wrap_controller_execution(self, call):
try:
call()
except TimeoutError as e:
_LOG.error('Failed to rum action because of timeouts', exc_info=e)
self._send_response({'code': 'timeout', 'message': 'Timeout occurred'}, 500)

def do_DELETE(self):
if not self.path.startswith(_API_CONTROLLER):
return self._send_response({'message': 'Path {} is not supported'.format(self.path)}, 404)
action = self.path[len(_API_CONTROLLER):].split('/')
if action[0] == 'queue':
if len(action) < 2:
return self._send_response({'message': 'No second argument provided!'}, 400)
self.wrap_controller_execution(
lambda: self._send_response(execute_on_controller_thread(
partial(delete_from_controller_queue, action[1]), _CONTROLLER_TIMEOUT), 200))
else:
return self._send_response({'message': 'Action {} is not supported'.format(action[0])}, 404)

def _run_controller_action(self, action):
if action.split('/')[0] == 'queue':
return self._send_response(execute_on_controller_thread(load_controller_queue, _CONTROLLER_TIMEOUT), 200)
else:
return self._send_response({'message': 'Action {} is not supported'.format(action)}, 404)

def _send_response(self, json_, status_code=200):
self.send_response(status_code)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(json_).encode('utf-8'))
Expand Down
5 changes: 4 additions & 1 deletion bubuku/zookeeper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ def create(self, *args, **kwargs):

def delete(self, *args, **kwargs):
self.hosts_cache.touch()
return self.client.retry(self.client.delete, *args, **kwargs)
try:
return self.client.retry(self.client.delete, *args, **kwargs)
except NoNodeError:
pass

def get_children(self, *params):
self.hosts_cache.touch()
Expand Down
Loading