Skip to content
This repository has been archived by the owner on Oct 7, 2024. It is now read-only.

Commit

Permalink
#61 Ability to list and cancel current actions
Browse files Browse the repository at this point in the history
  • Loading branch information
antban committed Sep 13, 2016
1 parent c9e5cf8 commit 433e009
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 21 deletions.
48 changes: 48 additions & 0 deletions bubuku/communicate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import logging
import threading
import time
from queue import Queue, Empty, Full

__COMMAND_QUEUE = Queue()

_LOG = logging.getLogger('bubuku.communicates')


def sleep_and_operate(controller, timeout: float):
cur_time = time.time()
finish = cur_time + (0.1 if timeout <= 0 else timeout)
while cur_time < finish:
try:
commnad = __COMMAND_QUEUE.get(block=True, timeout=finish - cur_time)
try:
commnad(controller)
except Exception as e:
_LOG.error('Command finished with error', exc_info=e)
except Empty:
pass
cur_time = time.time()


def execute_on_controller_thread(function, timeout):
condition = threading.Condition()
result = [None, True]

def _execute(controller):
with condition:
if result[1]:
try:
result[0] = function(controller)
finally:
condition.notify()

finish = time.time() + timeout
with condition:
try:
__COMMAND_QUEUE.put(_execute, timeout=timeout)
except Full:
raise TimeoutError('Timeout expired')
if condition.wait(timeout=finish - time.time()):
return result[0]
else:
result[1] = False
raise TimeoutError('Timeout expired')
63 changes: 45 additions & 18 deletions bubuku/controller.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import logging
from time import sleep, time
from time import time

from bubuku.broker import BrokerManager
from bubuku.communicate import sleep_and_operate
from bubuku.env_provider import EnvProvider
from bubuku.zookeeper import BukuExhibitor

Expand Down Expand Up @@ -56,16 +57,43 @@ def __init__(self, broker_manager: BrokerManager, zk: BukuExhibitor, env_provide
self.checks = []
self.changes = {} # Holds mapping from change name to array of pending changes
self.running = True
self.provider_id = None # provider id must not be requested on initialization

def enumerate_changes(self):
with self.zk.lock(self.provider_id):
running_changes = self.zk.get_running_changes()

result = []
for name, change_list in self.changes.items():
running = running_changes.get(name) == self.provider_id
first = True
for change in change_list:
result.append({
'type': name,
'description': str(change),
'running': bool(first and running)
})
first = False
return result

def cancel_changes(self, name):
result = len(self.changes.get(name, {}))
if result:
with self.zk.lock(self.provider_id):
self.zk.unregister_change(name)

del self.changes[name]
return result

def add_check(self, check):
_LOG.info('Adding check {}'.format(str(check)))
self.checks.append(check)

def _register_running_changes(self, provider_id: str) -> dict:
def _register_running_changes(self) -> dict:
if not self.changes:
return {} # Do not take lock if there are no changes to register
_LOG.debug('Taking lock for processing')
with self.zk.lock(provider_id):
with self.zk.lock(self.provider_id):
_LOG.debug('Lock is taken')
# Get list of current running changes
running_changes = self.zk.get_running_changes()
Expand All @@ -75,23 +103,23 @@ def _register_running_changes(self, provider_id: str) -> dict:
for name, change_list in self.changes.items():
# Only first change is able to run
first_change = change_list[0]
if first_change.can_run(_exclude_self(provider_id, name, running_changes)):
if first_change.can_run(_exclude_self(self.provider_id, name, running_changes)):
if name not in running_changes:
self.zk.register_change(name, provider_id)
running_changes[name] = provider_id
self.zk.register_change(name, self.provider_id)
running_changes[name] = self.provider_id
else:
_LOG.info('Change {} is waiting for others: {}'.format(name, running_changes))
return running_changes

def _run_changes(self, running_changes: dict, provider_id: str) -> list:
def _run_changes(self, running_changes: dict) -> list:
changes_to_remove = []
for name, change_list in self.changes.items():
if name in running_changes and running_changes[name] == provider_id:
if name in running_changes and running_changes[name] == self.provider_id:
change = change_list[0]
_LOG.info('Executing action {} step'.format(change))
if self.running or change.can_run_at_exit():
try:
if not change.run(_exclude_self(provider_id, change.get_name(), running_changes)):
if not change.run(_exclude_self(self.provider_id, change.get_name(), running_changes)):
_LOG.info('Action {} completed'.format(change))
changes_to_remove.append(change.get_name())
else:
Expand Down Expand Up @@ -119,24 +147,23 @@ def _release_changes_lock(self, changes_to_remove):
self.zk.unregister_change(name)

def loop(self, change_on_init=None):
provider_id = self.env_provider.get_id()
self.provider_id = self.env_provider.get_id()
if change_on_init:
self._add_change_to_queue(change_on_init)
while self.running or self.changes:
self.make_step(provider_id)
self.make_step()

if self.changes:
sleep(0.5)
timeout = 0.5
else:
min_time_till_check = min([check.time_till_check() for check in self.checks])
if min_time_till_check > 0:
sleep(min_time_till_check)
timeout = min([check.time_till_check() for check in self.checks])
sleep_and_operate(self, timeout)

def make_step(self, provider_id):
def make_step(self):
# register running changes
running_changes = self._register_running_changes(provider_id)
running_changes = self._register_running_changes()
# apply changes without holding lock
changes_to_remove = self._run_changes(running_changes, provider_id)
changes_to_remove = self._run_changes(running_changes)
# remove processed actions
self._release_changes_lock(changes_to_remove)
if self.running:
Expand Down
49 changes: 47 additions & 2 deletions bubuku/health.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,70 @@
import json
import logging
import threading
from functools import partial
from http.server import BaseHTTPRequestHandler, HTTPServer

from bubuku.communicate import execute_on_controller_thread
from bubuku.controller import Controller
from bubuku.utils import CmdHelper

_CONTROLLER_TIMEOUT = 5

_API_CONTROLLER = '/api/controller/'

_LOG = logging.getLogger('bubuku.health')


def load_controller_queue(controller: Controller):
return controller.enumerate_changes()


def delete_from_controller_queue(name: str, controller: Controller):
return {
'count': controller.cancel_changes(name)
}


class _Handler(BaseHTTPRequestHandler):
cmd_helper = None

def do_GET(self):
if self.path in ('/api/disk_stats', '/api/disk_stats/'):
used_kb, free_kb = self.cmd_helper.get_disk_stats()
self._send_response({'free_kb': free_kb, 'used_kb': used_kb})
elif self.path.startswith(_API_CONTROLLER):
self.wrap_controller_execution(lambda: self._run_controller_action(self.path[len(_API_CONTROLLER):]))
else:
self._send_response({'status': 'OK'})

def _send_response(self, json_):
self.send_response(200)
def wrap_controller_execution(self, call):
try:
call()
except TimeoutError as e:
_LOG.error('Failed to rum action because of timeouts', exc_info=e)
self._send_response({'code': 'timeout', 'message': 'Timeout occurred'}, 500)

def do_DELETE(self):
if not self.path.startswith(_API_CONTROLLER):
return self._send_response({'message': 'Path {} is not supported'.format(self.path)}, 404)
action = self.path[len(_API_CONTROLLER):].split('/')
if action[0] == 'queue':
if len(action) < 2:
return self._send_response({'message': 'No second argument provided!'}, 400)
self.wrap_controller_execution(
lambda: self._send_response(execute_on_controller_thread(
partial(delete_from_controller_queue, action[1]), _CONTROLLER_TIMEOUT), 200))
else:
return self._send_response({'message': 'Action {} is not supported'.format(action[0])}, 404)

def _run_controller_action(self, action):
if action.split('/')[0] == 'queue':
return execute_on_controller_thread(load_controller_queue, _CONTROLLER_TIMEOUT)
else:
return self._send_response({'message': 'Action {} is not supported'.format(action)}, 404)

def _send_response(self, json_, status_code=200):
self.send_response(status_code)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(json_).encode('utf-8'))
Expand Down
5 changes: 4 additions & 1 deletion bubuku/zookeeper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ def create(self, *args, **kwargs):

def delete(self, *args, **kwargs):
self.hosts_cache.touch()
return self.client.retry(self.client.delete, *args, **kwargs)
try:
return self.client.retry(self.client.delete, *args, **kwargs)
except NoNodeError:
pass

def get_children(self, *params):
self.hosts_cache.touch()
Expand Down

0 comments on commit 433e009

Please sign in to comment.