Skip to content
This repository has been archived by the owner on Oct 7, 2024. It is now read-only.

Commit

Permalink
#45 Add remote execution
Browse files Browse the repository at this point in the history
  • Loading branch information
antban committed Aug 16, 2016
1 parent cda0086 commit bcefa5a
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 9 deletions.
19 changes: 19 additions & 0 deletions bubuku/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from bubuku.amazon import Amazon
from bubuku.config import load_config, KafkaProperties, Config
from bubuku.features.remote_exec import RemoteCommandExecutorCheck
from bubuku.features.swap_partitions import load_swap_data
from bubuku.id_generator import get_broker_id_policy
from bubuku.zookeeper import load_exhibitor_proxy, BukuExhibitor
from bubuku.zookeeper.exhibior import AWSExhibitorAddressProvider
Expand Down Expand Up @@ -58,5 +59,23 @@ def rebalance_partitions(broker: str):
RemoteCommandExecutorCheck.register_rebalance(zookeeper, broker_id)


@cli.command('swap_fat_slim')
@click.option('--threshold', type=click.INT, default="100000", show_default=True, help="Threshold in kb to run swap")
def swap_partitions(threshold: int):
_, __, zookeeper = __prepare_configs()
RemoteCommandExecutorCheck.register_fatboy_slim(zookeeper, threshold_kb=threshold)


@cli.command('stats')
def show_stats():
config, amazon, zookeeper = __prepare_configs()
slim_broker_id, fat_broker_id, calculated_gap, stats_data = load_swap_data(zookeeper, config.health_port, 0)
print('Broker list (broker_id, ip_address, free_space_kb, used_space_kb):')
for broker_id, data in stats_data.items():
data_stats = data.get('disk')
print('{}\t{}\t{}\t{}'.format(broker_id, data.get('host'), data_stats.get('free_kb'), data.get('used_kb')))
print('Calculated gap between {}(fat) and {}(slim) is {} kb'.format(fat_broker_id, slim_broker_id, calculated_gap))


if __name__ == '__main__':
cli()
3 changes: 2 additions & 1 deletion bubuku/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ def dump(self):
def load_config() -> Config:
zk_prefix = os.getenv('ZOOKEEPER_PREFIX', '/')

features = {key: {} for key in os.getenv('BUKU_FEATURES', '').lower().split(',')}
features_str = os.getenv('BUKU_FEATURES', '').lower()
features = {key: {} for key in features_str.split(',')} if features_str else {}
if "balance_data_size" in features:
features["balance_data_size"]["diff_threshold_mb"] = int(os.getenv('FREE_SPACE_DIFF_THRESHOLD_MB', '50000'))

Expand Down
17 changes: 15 additions & 2 deletions bubuku/features/remote_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@
from bubuku.controller import Check, Change
from bubuku.features.rebalance import RebalanceChange
from bubuku.features.restart_on_zk_change import RestartBrokerChange
from bubuku.features.swap_partitions import SwapPartitionsChange, load_swap_data
from bubuku.zookeeper import BukuExhibitor

_LOG = logging.getLogger('bubuku.features.remote_exec')


class RemoteCommandExecutorCheck(Check):
def __init__(self, zk: BukuExhibitor, broker_manager: BrokerManager):
def __init__(self, zk: BukuExhibitor, broker_manager: BrokerManager, api_port):
super().__init__()
self.zk = zk
self.broker_manager = broker_manager
self.api_port = api_port

def check(self) -> Change:
with self.zk.lock():
Expand All @@ -25,8 +27,14 @@ def check(self) -> Change:
return None
if data['name'] == 'restart':
return RestartBrokerChange(self.zk, self.broker_manager, lambda: False)
if data['name'] == 'rebalance':
elif data['name'] == 'rebalance':
return RebalanceChange(self.zk, self.zk.get_broker_ids())
elif data['name'] == 'fatboyslim':
try:
return SwapPartitionsChange(self.zk,
lambda x: load_swap_data(x, self.api_port, int(data['threshold_kb'])))
except Exception as e:
_LOG.error('Failed to create swap change for {}'.format(data), exc_info=e)
return None

def __str__(self):
Expand All @@ -46,3 +54,8 @@ def register_rebalance(zk: BukuExhibitor, broker_id: str):
zk.register_action({'name': 'rebalance'}, broker_id=broker_id)
else:
zk.register_action({'name': 'rebalance'})

@staticmethod
def register_fatboy_slim(zk: BukuExhibitor, threshold_kb: int):
with zk.lock():
zk.register_action({'name': 'fatboyslim', 'threshold_kb': threshold_kb})
2 changes: 1 addition & 1 deletion bubuku/features/swap_partitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def _load_disk_stats(zk: BukuExhibitor, api_port: int):
return result


def load_swap_data(zk: BukuExhibitor, api_port: int, gap: int) -> (str, str, int):
def load_swap_data(zk: BukuExhibitor, api_port: int, gap: int) -> (str, str, int, dict):
"""
Finds brokers that could be used for gap of size gap
:param zk: Bubuku exhibitor
Expand Down
10 changes: 5 additions & 5 deletions tests/test_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def test_load_restart_on_exhibitor():

controller = TestController()

apply_features({'restart_on_exhibitor': {}}, controller, exhibitor, broker, None, None)
apply_features(-1, {'restart_on_exhibitor': {}}, controller, exhibitor, broker, None, None)

assert len(controller.checks) == 1
check = controller.checks[0]
Expand All @@ -36,7 +36,7 @@ def test_rebalance_on_start():

controller = TestController()

apply_features({'rebalance_on_start': {}}, controller, exhibitor, broker, None, None)
apply_features(-1, {'rebalance_on_start': {}}, controller, exhibitor, broker, None, None)

assert len(controller.checks) == 1
check = controller.checks[0]
Expand All @@ -52,7 +52,7 @@ def test_rebalance_on_broker_list_change():

controller = TestController()

apply_features({'rebalance_on_brokers_change': {}}, controller, exhibitor, broker, None, None)
apply_features(-1, {'rebalance_on_brokers_change': {}}, controller, exhibitor, broker, None, None)

assert len(controller.checks) == 1
check = controller.checks[0]
Expand All @@ -70,7 +70,7 @@ def test_graceful_terminate():

controller = TestController()

apply_features({'graceful_terminate': {}}, controller, None, broker, None, None)
apply_features(-1, {'graceful_terminate': {}}, controller, None, broker, None, None)

assert len(controller.checks) == 0

Expand All @@ -86,6 +86,6 @@ def test_use_ip_address():
amazon = MagicMock()
amazon.get_own_ip = MagicMock(return_value='172.31.146.57')

apply_features({'use_ip_address': {}}, None, None, None, props, amazon)
apply_features(-1, {'use_ip_address': {}}, None, None, None, props, amazon)

assert props.get_property('advertised.host.name') == '172.31.146.57'

0 comments on commit bcefa5a

Please sign in to comment.