diff --git a/bubuku/cli.py b/bubuku/cli.py index 6601854..1a991c6 100644 --- a/bubuku/cli.py +++ b/bubuku/cli.py @@ -5,6 +5,7 @@ from bubuku.amazon import Amazon from bubuku.config import load_config, KafkaProperties, Config from bubuku.features.remote_exec import RemoteCommandExecutorCheck +from bubuku.features.swap_partitions import load_swap_data from bubuku.id_generator import get_broker_id_policy from bubuku.zookeeper import load_exhibitor_proxy, BukuExhibitor from bubuku.zookeeper.exhibior import AWSExhibitorAddressProvider @@ -58,5 +59,23 @@ def rebalance_partitions(broker: str): RemoteCommandExecutorCheck.register_rebalance(zookeeper, broker_id) +@cli.command('swap_fat_slim') +@click.option('--threshold', type=click.INT, default="100000", show_default=True, help="Threshold in kb to run swap") +def swap_partitions(threshold: int): + _, __, zookeeper = __prepare_configs() + RemoteCommandExecutorCheck.register_fatboy_slim(zookeeper, threshold_kb=threshold) + + +@cli.command('stats') +def show_stats(): + config, amazon, zookeeper = __prepare_configs() + slim_broker_id, fat_broker_id, calculated_gap, stats_data = load_swap_data(zookeeper, config.health_port, 0) + print('Broker list (broker_id, ip_address, free_space_kb, used_space_kb):') + for broker_id, data in stats_data.items(): + data_stats = data.get('disk') + print('{}\t{}\t{}\t{}'.format(broker_id, data.get('host'), data_stats.get('free_kb'), data.get('used_kb'))) + print('Calculated gap between {}(fat) and {}(slim) is {} kb'.format(fat_broker_id, slim_broker_id, calculated_gap)) + + if __name__ == '__main__': cli() diff --git a/bubuku/config.py b/bubuku/config.py index c974646..fb696c6 100644 --- a/bubuku/config.py +++ b/bubuku/config.py @@ -52,7 +52,8 @@ def dump(self): def load_config() -> Config: zk_prefix = os.getenv('ZOOKEEPER_PREFIX', '/') - features = {key: {} for key in os.getenv('BUKU_FEATURES', '').lower().split(',')} + features_str = os.getenv('BUKU_FEATURES', '').lower() + features = {key: {} for key in features_str.split(',')} if features_str else {} if "balance_data_size" in features: features["balance_data_size"]["diff_threshold_mb"] = int(os.getenv('FREE_SPACE_DIFF_THRESHOLD_MB', '50000')) diff --git a/bubuku/features/remote_exec.py b/bubuku/features/remote_exec.py index 4437706..f27031f 100644 --- a/bubuku/features/remote_exec.py +++ b/bubuku/features/remote_exec.py @@ -4,16 +4,18 @@ from bubuku.controller import Check, Change from bubuku.features.rebalance import RebalanceChange from bubuku.features.restart_on_zk_change import RestartBrokerChange +from bubuku.features.swap_partitions import SwapPartitionsChange, load_swap_data from bubuku.zookeeper import BukuExhibitor _LOG = logging.getLogger('bubuku.features.remote_exec') class RemoteCommandExecutorCheck(Check): - def __init__(self, zk: BukuExhibitor, broker_manager: BrokerManager): + def __init__(self, zk: BukuExhibitor, broker_manager: BrokerManager, api_port): super().__init__() self.zk = zk self.broker_manager = broker_manager + self.api_port = api_port def check(self) -> Change: with self.zk.lock(): @@ -25,8 +27,14 @@ def check(self) -> Change: return None if data['name'] == 'restart': return RestartBrokerChange(self.zk, self.broker_manager, lambda: False) - if data['name'] == 'rebalance': + elif data['name'] == 'rebalance': return RebalanceChange(self.zk, self.zk.get_broker_ids()) + elif data['name'] == 'fatboyslim': + try: + return SwapPartitionsChange(self.zk, + lambda x: load_swap_data(x, self.api_port, int(data['threshold_kb']))) + except Exception as e: + _LOG.error('Failed to create swap change for {}'.format(data), exc_info=e) return None def __str__(self): @@ -46,3 +54,8 @@ def register_rebalance(zk: BukuExhibitor, broker_id: str): zk.register_action({'name': 'rebalance'}, broker_id=broker_id) else: zk.register_action({'name': 'rebalance'}) + + @staticmethod + def register_fatboy_slim(zk: BukuExhibitor, threshold_kb: int): + with zk.lock(): + zk.register_action({'name': 'fatboyslim', 'threshold_kb': threshold_kb}) diff --git a/bubuku/features/swap_partitions.py b/bubuku/features/swap_partitions.py index 6c48620..35d3e14 100644 --- a/bubuku/features/swap_partitions.py +++ b/bubuku/features/swap_partitions.py @@ -153,7 +153,7 @@ def _load_disk_stats(zk: BukuExhibitor, api_port: int): return result -def load_swap_data(zk: BukuExhibitor, api_port: int, gap: int) -> (str, str, int): +def load_swap_data(zk: BukuExhibitor, api_port: int, gap: int) -> (str, str, int, dict): """ Finds brokers that could be used for gap of size gap :param zk: Bubuku exhibitor diff --git a/tests/test_daemon.py b/tests/test_daemon.py index ffbc303..c0b6564 100644 --- a/tests/test_daemon.py +++ b/tests/test_daemon.py @@ -21,7 +21,7 @@ def test_load_restart_on_exhibitor(): controller = TestController() - apply_features({'restart_on_exhibitor': {}}, controller, exhibitor, broker, None, None) + apply_features(-1, {'restart_on_exhibitor': {}}, controller, exhibitor, broker, None, None) assert len(controller.checks) == 1 check = controller.checks[0] @@ -36,7 +36,7 @@ def test_rebalance_on_start(): controller = TestController() - apply_features({'rebalance_on_start': {}}, controller, exhibitor, broker, None, None) + apply_features(-1, {'rebalance_on_start': {}}, controller, exhibitor, broker, None, None) assert len(controller.checks) == 1 check = controller.checks[0] @@ -52,7 +52,7 @@ def test_rebalance_on_broker_list_change(): controller = TestController() - apply_features({'rebalance_on_brokers_change': {}}, controller, exhibitor, broker, None, None) + apply_features(-1, {'rebalance_on_brokers_change': {}}, controller, exhibitor, broker, None, None) assert len(controller.checks) == 1 check = controller.checks[0] @@ -70,7 +70,7 @@ def test_graceful_terminate(): controller = TestController() - apply_features({'graceful_terminate': {}}, controller, None, broker, None, None) + apply_features(-1, {'graceful_terminate': {}}, controller, None, broker, None, None) assert len(controller.checks) == 0 @@ -86,6 +86,6 @@ def test_use_ip_address(): amazon = MagicMock() amazon.get_own_ip = MagicMock(return_value='172.31.146.57') - apply_features({'use_ip_address': {}}, None, None, None, props, amazon) + apply_features(-1, {'use_ip_address': {}}, None, None, None, props, amazon) assert props.get_property('advertised.host.name') == '172.31.146.57'