From 351252304c050d925d6e016a6091e4fac93fbc8f Mon Sep 17 00:00:00 2001 From: antban Date: Thu, 4 Aug 2016 12:23:50 +0200 Subject: [PATCH 01/10] Rebalance is using BukuProxy --- bubuku/daemon.py | 7 ++-- bubuku/features/rebalance.py | 62 +++++++++--------------------------- bubuku/zookeeper.py | 62 +++++++++++++++++++++++++++++++++++- tests/test_daemon.py | 4 +-- tests/test_rebalance.py | 57 ++++++++++++--------------------- 5 files changed, 103 insertions(+), 89 deletions(-) diff --git a/bubuku/daemon.py b/bubuku/daemon.py index 162ae0d..016b2f7 100644 --- a/bubuku/daemon.py +++ b/bubuku/daemon.py @@ -15,20 +15,21 @@ from bubuku.features.terminate import register_terminate_on_interrupt from bubuku.id_generator import get_broker_id_policy from bubuku.utils import CmdHelper -from bubuku.zookeeper import load_exhibitor, Exhibitor +from bubuku.zookeeper import load_exhibitor, Exhibitor, BukuProxy _LOG = logging.getLogger('bubuku.main') def apply_features(features: str, controller: Controller, exhibitor: Exhibitor, broker: BrokerManager, kafka_properties: KafkaProperties, amazon: Amazon) -> list: + buku_proxy = BukuProxy(exhibitor) for feature in set(features.split(',')): if feature == 'restart_on_exhibitor': controller.add_check(CheckExhibitorAddressChanged(exhibitor, broker)) elif feature == 'rebalance_on_start': - controller.add_check(RebalanceOnStartCheck(exhibitor, broker)) + controller.add_check(RebalanceOnStartCheck(buku_proxy, broker)) elif feature == 'rebalance_on_brokers_change': - controller.add_check(RebalanceOnBrokerListChange(exhibitor, broker)) + controller.add_check(RebalanceOnBrokerListChange(buku_proxy, broker)) elif feature == 'rebalance_by_size': controller.add_check(GenerateDataSizeStatistics(exhibitor, broker, CmdHelper(), kafka_properties.get_property("log.dirs").split(","))) diff --git a/bubuku/features/rebalance.py b/bubuku/features/rebalance.py index ee315bc..ebde98e 100644 --- a/bubuku/features/rebalance.py +++ b/bubuku/features/rebalance.py @@ -1,11 +1,9 @@ import json import logging -from kazoo.exceptions import NodeExistsError, NoNodeError - from bubuku.broker import BrokerManager from bubuku.controller import Check, Change -from bubuku.zookeeper import Exhibitor +from bubuku.zookeeper import BukuProxy _LOG = logging.getLogger('bubuku.features.rebalance') @@ -46,7 +44,7 @@ def _removal_func(): class RebalanceChange(Change): - def __init__(self, zk: Exhibitor, broker_list): + def __init__(self, zk: BukuProxy, broker_list): self.zk = zk self.broker_ids = broker_list self.stale_data = {} # partition count to topic data @@ -63,14 +61,6 @@ def get_name(self) -> str: def can_run(self, current_actions): return all([a not in current_actions for a in ['start', 'restart', 'rebalance', 'stop']]) - def load_current_data(self): - result = [] - for topic in self.zk.get_children('/brokers/topics'): - data = json.loads(self.zk.get("/brokers/topics/" + topic)[0].decode('utf-8')) - for k, v in data['partitions'].items(): - result.append({"topic": topic, "partition": int(k), "replicas": v}) - return result - def take_next(self) -> dict: if self.stale_data: k = [x for x in self.stale_data.keys()][0] @@ -98,14 +88,10 @@ def run(self, current_actions): _LOG.warning("Rebalance stopped, because other blocking events running: {}".format(current_actions)) return False - try: - rebalance_data = self.zk.get('/admin/reassign_partitions')[0].decode('utf-8') - _LOG.info('Old rebalance is still in progress: {}, waiting'.format(rebalance_data)) + if self.zk.is_rebalancing(): return True - except NoNodeError: - pass - new_broker_ids = sorted(self.zk.get_children('/brokers/ids')) + new_broker_ids = self.zk.get_broker_ids() if new_broker_ids != self.broker_ids: _LOG.warning("Rebalance stopped because of broker list change from {} to {}".format(self.broker_ids, @@ -117,17 +103,19 @@ def run(self, current_actions): # Load existing data from zookeeper and try to split it for different purposes if self.shuffled_broker_ids is None: self.shuffled_broker_ids = {} - for d in self.load_current_data(): - replication_factor = len(d['replicas']) + for topic, partition, replicas in self.zk.load_partition_assignment(): + replication_factor = len(replicas) if replication_factor > len(self.broker_ids): _LOG.warning( - "Will not rebalance partition {} because only {} brokers available".format(d, self.broker_ids)) + "Will not rebalance partition {}:{} because only {} brokers available".format( + topic, partition, self.broker_ids)) continue if replication_factor not in self.shuffled_broker_ids: self.shuffled_broker_ids[replication_factor] = { k: [] for k in combine_broker_ids(self.broker_ids, replication_factor) } - name = _optimise_broker_ids([str(i) for i in d['replicas']]) + name = _optimise_broker_ids([str(i) for i in replicas]) + d = {"topic": topic, "partition": partition, "replicas": replicas} if name not in self.shuffled_broker_ids[replication_factor]: if name not in self.stale_data: self.stale_data[name] = [] @@ -149,17 +137,7 @@ def run(self, current_actions): min_length = min([len(v) for v in self.shuffled_broker_ids[replication_factor].values()]) for k, v in self.shuffled_broker_ids[replication_factor].items(): if len(v) == min_length: - j = { - "version": "1", - "partitions": [ - { - "topic": to_move['topic'], - "partition": to_move['partition'], - "replicas": [int(v) for v in k.split(',')], - } - ] - } - if self.reallocate(j): + if self.zk.reallocate_partition(to_move['topic'], to_move['partition'], k.split(',')): _LOG.info("Current allocation: \n{}".format(self.dump_allocations())) removal_func() v.append(to_move) @@ -169,23 +147,13 @@ def run(self, current_actions): _LOG.info("Current allocation: \n{}".format(self.dump_allocations())) return False - def reallocate(self, j: dict): - try: - data = json.dumps(j) - self.zk.create("/admin/reassign_partitions", data.encode('utf-8')) - _LOG.info("Reallocating {}".format(data)) - return True - except NodeExistsError: - _LOG.info("Waiting for free reallocation slot, still in progress...") - return False - def dump_allocations(self): return '\n'.join( ['\n'.join(['{}:{}'.format(x, len(y)) for x, y in v.items()]) for v in self.shuffled_broker_ids.values()]) class RebalanceOnStartCheck(Check): - def __init__(self, zk, broker: BrokerManager): + def __init__(self, zk: BukuProxy, broker: BrokerManager): super().__init__() self.zk = zk self.broker = broker @@ -198,14 +166,14 @@ def check(self): return None _LOG.info("Rebalance on start, triggering rebalance") self.executed = True - return RebalanceChange(self.zk, sorted(self.zk.get_children('/brokers/ids'))) + return RebalanceChange(self.zk, self.zk.get_broker_ids()) def __str__(self): return 'RebalanceOnStartCheck (executed={})'.format(self.executed) class RebalanceOnBrokerListChange(Check): - def __init__(self, zk, broker: BrokerManager): + def __init__(self, zk: BukuProxy, broker: BrokerManager): super().__init__() self.zk = zk self.broker = broker @@ -214,7 +182,7 @@ def __init__(self, zk, broker: BrokerManager): def check(self): if not self.broker.is_running_and_registered(): return None - new_list = sorted(self.zk.get_children('/brokers/ids')) + new_list = self.zk.get_broker_ids() if not new_list == self.old_broker_list: _LOG.info('Broker list changed from {} to {}, triggering rebalance'.format(self.old_broker_list, new_list)) self.old_broker_list = new_list diff --git a/bubuku/zookeeper.py b/bubuku/zookeeper.py index d71aed8..ad01e63 100644 --- a/bubuku/zookeeper.py +++ b/bubuku/zookeeper.py @@ -1,10 +1,11 @@ +import json import logging import random import time import requests from kazoo.client import KazooClient -from kazoo.exceptions import NoNodeError +from kazoo.exceptions import NoNodeError, NodeExistsError from requests.exceptions import RequestException _LOG = logging.getLogger('bubuku.exhibitor') @@ -118,5 +119,64 @@ def take_lock(self, *args, **kwargs): _LOG.error('Failed to obtain lock for exhibitor, retrying', exc_info=e) +class BukuProxy(object): + def __init__(self, exhibitor: Exhibitor): + self.exhibitor = exhibitor + + def get_broker_ids(self) -> list: + """ + Gets list of available broker ids + :return: Sorted list of strings - active broker ids. + """ + return sorted(self.exhibitor.get('/brokers/ids')) + + def load_partition_assignment(self) -> list: + """ + Lists all the assignments of partitions to particular broker ids. + :returns list of tuples (topic_name:str, partition:int, replica_list:list(int)), for ex. "test", 0, [1,2,3] + """ + result = [] + for topic in self.exhibitor.get_children('/brokers/topics'): + data = json.loads(self.exhibitor.get("/brokers/topics/" + topic)[0].decode('utf-8')) + for k, v in data['partitions'].items(): + result.append((topic, int(k), v)) + return result + + def reallocate_partition(self, topic: str, partition: object, replicas: list) -> bool: + """ + Reallocates partition to replica list + :param topic: topic to move + :param partition: partition to move (can be str or int) + :param replicas: list of replicas to move to + :return: If reallocation was successful (node for reallocation was created) + """ + j = { + "version": "1", + "partitions": [ + { + "topic": topic, + "partition": int(partition), + "replicas": [int(p) for p in replicas], + } + ] + } + try: + data = json.dumps(j) + self.exhibitor.create("/admin/reassign_partitions", data.encode('utf-8')) + _LOG.info("Reallocating {}".format(data)) + return True + except NodeExistsError: + _LOG.info("Waiting for free reallocation slot, still in progress...") + return False + + def is_rebalancing(self): + try: + rebalance_data = self.exhibitor.get('/admin/reassign_partitions')[0].decode('utf-8') + _LOG.info('Old rebalance is still in progress: {}, waiting'.format(rebalance_data)) + return True + except NoNodeError: + return False + + def load_exhibitor(initial_hosts: list, zookeeper_prefix): return Exhibitor(initial_hosts, 8181, zookeeper_prefix) diff --git a/tests/test_daemon.py b/tests/test_daemon.py index 71c01f8..963c083 100644 --- a/tests/test_daemon.py +++ b/tests/test_daemon.py @@ -41,7 +41,7 @@ def test_rebalance_on_start(): assert len(controller.checks) == 1 check = controller.checks[0] assert type(check) == RebalanceOnStartCheck - assert check.zk == exhibitor + assert check.zk.exhibitor == exhibitor assert check.broker == broker assert not check.executed @@ -57,7 +57,7 @@ def test_rebalance_on_broker_list_change(): assert len(controller.checks) == 1 check = controller.checks[0] assert type(check) == RebalanceOnBrokerListChange - assert check.zk == exhibitor + assert check.zk.exhibitor == exhibitor assert check.broker == broker diff --git a/tests/test_rebalance.py b/tests/test_rebalance.py index 43b9761..19012e5 100644 --- a/tests/test_rebalance.py +++ b/tests/test_rebalance.py @@ -1,11 +1,10 @@ import functools -import json from unittest.mock import MagicMock from kazoo.exceptions import NoNodeError from bubuku.features.rebalance import RebalanceChange, RebalanceOnBrokerListChange, combine_broker_ids -from bubuku.zookeeper import Exhibitor +from bubuku.zookeeper import BukuProxy def test_rebalance_can_run(): @@ -26,37 +25,23 @@ def test_rebalance_get_name(): assert o.get_name() == 'rebalance' -def __create_zk_for_topics(topic_data, broker_ids=None) -> (list, Exhibitor): - def _get_children(path: str): - if path == '/brokers/ids': - if broker_ids: - return broker_ids - return list(set(functools.reduce(lambda x, y: x + y, topic_data.values(), []))) - if path == '/brokers/topics': - return list(set([k[0] for k in topic_data.keys()])) - raise NotImplementedError('get_children {} is not supported'.format(path)) - - def _get(path): - if path.startswith('/brokers/topics/'): - topic = path[len('/brokers/topics/'):] - return json.dumps({'partitions': {k[1]: br for k, br in topic_data.items() if k[0] == topic}}).encode( - 'utf-8'), object() - elif path == '/admin/reassign_partitions': - raise NoNodeError() - raise NotImplementedError('get {} is not supported'.format(path)) - - def _create(path, value: bytes): - if path == '/admin/reassign_partitions': - for item in json.loads(value.decode('utf-8'))['partitions']: - topic_data[(item['topic'], str(item['partition']))] = [str(x) for x in item['replicas']] - return - raise NotImplementedError('set {}, {} is not supported'.format(path, value)) - - a = MagicMock() - a.get_children = _get_children - a.get = _get - a.create = _create - return sorted(list(set(functools.reduce(lambda x, y: x + y, topic_data.values(), [])))), a +def __create_zk_for_topics(topic_data, broker_ids=None) -> (list, BukuProxy): + buku_proxy = MagicMock() + buku_proxy.get_broker_ids.return_value = broker_ids if broker_ids else sorted(list( + set(functools.reduce(lambda x, y: x + y, topic_data.values(), [])))) + + def _load_assignment(): + return [(k[0], int(k[1]), [int(p) for p in v]) for k, v in topic_data.items()] + + buku_proxy.load_partition_assignment = _load_assignment + buku_proxy.is_rebalancing.return_value = False + + def _reassign(topic, partition, replicas): + topic_data[(topic, str(partition))] = [str(x) for x in replicas] + return True + + buku_proxy.reallocate_partition = _reassign + return sorted(list(set(functools.reduce(lambda x, y: x + y, topic_data.values(), [])))), buku_proxy def test_rebalance_on_empty1(): @@ -160,13 +145,13 @@ def test_rebalance_invoked_on_broker_list_change(): zk.get = MagicMock(side_effect=NoNodeError) check = RebalanceOnBrokerListChange(zk, MagicMock()) - zk.get_children = lambda x: ['1', '2', '3'] + zk.get_broker_ids.return_value = ['1', '2', '3'] assert check.check() is not None assert check.check() is None - zk.get_children = lambda x: ['2', '1', '3'] + zk.get_broker_ids.return_value = ['1', '2', '3'] assert check.check() is None - zk.get_children = lambda x: ['2', '1', '4'] + zk.get_broker_ids.return_value = ['1', '2', '4'] assert check.check() is not None assert check.check() is None From 53aa8fe8e95ec34e6d123758a98c6d09865ee2f4 Mon Sep 17 00:00:00 2001 From: antban Date: Thu, 4 Aug 2016 12:50:30 +0200 Subject: [PATCH 02/10] Use BukuProxy in BrokerManager --- bubuku/broker.py | 39 ++++++++-------- bubuku/daemon.py | 12 ++--- bubuku/features/restart_on_zk_change.py | 6 +-- bubuku/zookeeper.py | 33 +++++++++++--- tests/test_broker.py | 60 +++++-------------------- tests/test_daemon.py | 4 +- 6 files changed, 68 insertions(+), 86 deletions(-) diff --git a/bubuku/broker.py b/bubuku/broker.py index 3dcff85..e8c4910 100644 --- a/bubuku/broker.py +++ b/bubuku/broker.py @@ -4,7 +4,7 @@ from bubuku.config import KafkaProperties from bubuku.id_generator import BrokerIdGenerator -from bubuku.zookeeper import Exhibitor +from bubuku.zookeeper import BukuProxy _LOG = logging.getLogger('bubuku.broker') @@ -14,7 +14,7 @@ class LeaderElectionInProgress(Exception): class BrokerManager(object): - def __init__(self, kafka_dir: str, exhibitor: Exhibitor, id_manager: BrokerIdGenerator, + def __init__(self, kafka_dir: str, exhibitor: BukuProxy, id_manager: BrokerIdGenerator, kafka_properties: KafkaProperties): self.kafka_dir = kafka_dir self.id_manager = id_manager @@ -72,7 +72,7 @@ def start_kafka_process(self, zookeeper_address): :raise LeaderElectionInProgress: raised when broker can not be started because leader election is in progress """ if not self.process: - if not self._is_leadership_transferred(active_broker_ids=self.exhibitor.get_children('/brokers/ids')): + if not self._is_leadership_transferred(active_broker_ids=self.exhibitor.get_broker_ids()): raise LeaderElectionInProgress() broker_id = self.id_manager.get_broker_id() @@ -101,25 +101,22 @@ def _is_leadership_transferred(self, active_broker_ids=None, dead_broker_ids=Non _LOG.info('Checking if leadership is transferred: active_broker_ids={}, dead_broker_ids={}'.format( active_broker_ids, dead_broker_ids)) if self._is_clean_election(): - for topic in self.exhibitor.get_children('/brokers/topics'): - for partition in self.exhibitor.get_children('/brokers/topics/{}/partitions'.format(topic)): - state_str = self.exhibitor.get('/brokers/topics/{}/partitions/{}/state'.format( - topic, partition))[0].decode('utf-8') - state = json.loads(state_str) - leader = str(state['leader']) - if active_broker_ids and leader not in active_broker_ids: - if any(str(x) in active_broker_ids for x in state.get('isr', [])): - _LOG.warn( - 'Leadership is not transferred for {} {} ({}, brokers: {})'.format( - topic, partition, json.dumps(state), active_broker_ids)) - return False - else: - _LOG.warn('Shit happens! No single isr available for {}, {}, state: {}, ' - 'skipping check for that'.format(topic, partition, json.dumps(state))) - if dead_broker_ids and leader in dead_broker_ids: - _LOG.warn('Leadership is not transferred for {} {}, {} (dead list: {})'.format( - topic, partition, json.dumps(state), dead_broker_ids)) + for topic, partition, state in self.exhibitor.load_partition_states(): + leader = str(state['leader']) + if active_broker_ids and leader not in active_broker_ids: + if any(str(x) in active_broker_ids for x in state.get('isr', [])): + _LOG.warn( + 'Leadership is not transferred for {} {} ({}, brokers: {})'.format( + topic, partition, json.dumps(state), active_broker_ids)) return False + else: + _LOG.warn('Shit happens! No single isr available for {}, {}, state: {}, ' + 'skipping check for that'.format(topic, partition, json.dumps(state))) + if dead_broker_ids and leader in dead_broker_ids: + _LOG.warn('Leadership is not transferred for {} {}, {} (dead list: {})'.format( + topic, partition, json.dumps(state), dead_broker_ids)) + return False + return True def _open_process(self): diff --git a/bubuku/daemon.py b/bubuku/daemon.py index 016b2f7..dbe0ba7 100644 --- a/bubuku/daemon.py +++ b/bubuku/daemon.py @@ -15,17 +15,16 @@ from bubuku.features.terminate import register_terminate_on_interrupt from bubuku.id_generator import get_broker_id_policy from bubuku.utils import CmdHelper -from bubuku.zookeeper import load_exhibitor, Exhibitor, BukuProxy +from bubuku.zookeeper import load_exhibitor, BukuProxy _LOG = logging.getLogger('bubuku.main') -def apply_features(features: str, controller: Controller, exhibitor: Exhibitor, broker: BrokerManager, +def apply_features(features: str, controller: Controller, buku_proxy: BukuProxy, broker: BrokerManager, kafka_properties: KafkaProperties, amazon: Amazon) -> list: - buku_proxy = BukuProxy(exhibitor) for feature in set(features.split(',')): if feature == 'restart_on_exhibitor': - controller.add_check(CheckExhibitorAddressChanged(exhibitor, broker)) + controller.add_check(CheckExhibitorAddressChanged(buku_proxy, broker)) elif feature == 'rebalance_on_start': controller.add_check(RebalanceOnStartCheck(buku_proxy, broker)) elif feature == 'rebalance_on_brokers_change': @@ -54,19 +53,20 @@ def main(): _LOG.info("Loading exhibitor configuration") exhibitor = load_exhibitor(amazon.get_addresses_by_lb_name(config.zk_stack_name), config.zk_prefix) + buku_proxy = BukuProxy(exhibitor) _LOG.info("Loading broker_id policy") broker_id_manager = get_broker_id_policy(config.id_policy, exhibitor, kafka_properties, amazon) _LOG.info("Building broker manager") - broker = BrokerManager(config.kafka_dir, exhibitor, broker_id_manager, kafka_properties) + broker = BrokerManager(config.kafka_dir, buku_proxy, broker_id_manager, kafka_properties) _LOG.info("Creating controller") controller = Controller(broker, exhibitor, amazon) controller.add_check(CheckBrokerStopped(broker, exhibitor)) - apply_features(config.features, controller, exhibitor, broker, kafka_properties, amazon) + apply_features(config.features, controller, buku_proxy, broker, kafka_properties, amazon) _LOG.info('Starting health server') health.start_server(config.health_port) diff --git a/bubuku/features/restart_on_zk_change.py b/bubuku/features/restart_on_zk_change.py index c805346..c592a19 100644 --- a/bubuku/features/restart_on_zk_change.py +++ b/bubuku/features/restart_on_zk_change.py @@ -2,7 +2,7 @@ from bubuku.broker import BrokerManager from bubuku.controller import Change, Check -from bubuku.zookeeper import Exhibitor +from bubuku.zookeeper import BukuProxy _LOG = logging.getLogger('bubuku.features.restart_on_zk') @@ -11,7 +11,7 @@ class RestartBrokerOnZkChange(Change): - def __init__(self, zk_hosts: str, zk: Exhibitor, broker: BrokerManager): + def __init__(self, zk_hosts: str, zk: BukuProxy, broker: BrokerManager): self.conn_str = zk_hosts self.zk = zk self.broker = broker @@ -53,7 +53,7 @@ def __str__(self): class CheckExhibitorAddressChanged(Check): - def __init__(self, zk: Exhibitor, broker: BrokerManager): + def __init__(self, zk: BukuProxy, broker: BrokerManager): super().__init__() self.zk = zk self.broker = broker diff --git a/bubuku/zookeeper.py b/bubuku/zookeeper.py index ad01e63..cb4f206 100644 --- a/bubuku/zookeeper.py +++ b/bubuku/zookeeper.py @@ -30,17 +30,17 @@ def poll(self): if self._next_poll and self._next_poll > time.time(): return False - json = self._query_exhibitors(self._exhibitors) - if not json: - json = self._query_exhibitors(self._master_exhibitors) + json_ = self._query_exhibitors(self._exhibitors) + if not json_: + json_ = self._query_exhibitors(self._master_exhibitors) - if isinstance(json, dict) and 'servers' in json and 'port' in json: + if isinstance(json_, dict) and 'servers' in json_ and 'port' in json_: self._next_poll = time.time() + self._poll_interval - zookeeper_hosts = ','.join([h + ':' + str(json['port']) for h in sorted(json['servers'])]) + zookeeper_hosts = ','.join([h + ':' + str(json_['port']) for h in sorted(json_['servers'])]) if self._zookeeper_hosts != zookeeper_hosts: _LOG.info('ZooKeeper connection string has changed: %s => %s', self._zookeeper_hosts, zookeeper_hosts) self._zookeeper_hosts = zookeeper_hosts - self._exhibitors = json['servers'] + self._exhibitors = json_['servers'] return True return False @@ -142,6 +142,20 @@ def load_partition_assignment(self) -> list: result.append((topic, int(k), v)) return result + def load_partition_states(self) -> list: + """ + Lists all the current partition states (leaders and isr list) + :return: list of tuples + (topic_name: str, partition: int, state: json from /brokers/topics/{}/partitions/{}/state) + """ + result = [] + for topic in self.exhibitor.get_children('/brokers/topics'): + for partition in self.exhibitor.get_children('/brokers/topics/{}/partitions'.format(topic)): + state = json.loads(self.exhibitor.get('/brokers/topics/{}/partitions/{}/state'.format( + topic, partition))[0].decode('utf-8')) + result.append((topic, int(partition), state)) + return result + def reallocate_partition(self, topic: str, partition: object, replicas: list) -> bool: """ Reallocates partition to replica list @@ -169,6 +183,13 @@ def reallocate_partition(self, topic: str, partition: object, replicas: list) -> _LOG.info("Waiting for free reallocation slot, still in progress...") return False + def get_conn_str(self): + """ + Calculates connection string in format usable by kafka + :return: connection string in form host:port[,host:port[...]]/path + """ + return self.exhibitor.get_conn_str() + def is_rebalancing(self): try: rebalance_data = self.exhibitor.get('/admin/reassign_partitions')[0].decode('utf-8') diff --git a/tests/test_broker.py b/tests/test_broker.py index ec8254b..b09473a 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -1,4 +1,3 @@ -import json from unittest.mock import MagicMock from bubuku.broker import BrokerManager, LeaderElectionInProgress @@ -12,32 +11,17 @@ def _open_process(self): def test_broker_checks_death(): exhibitor = MagicMock() - - def _get_children(path): - if path == '/brokers/topics': - return ['t1', 't2'] - if path == '/brokers/topics/t1/partitions': - return ['0'] - if path == '/brokers/topics/t2/partitions': - return ['1'] - states = [2, 2] - def _get(path): - if path == '/brokers/topics/t1/partitions/0/state': - topic = 1 - elif path == '/brokers/topics/t2/partitions/1/state': - topic = 2 - else: - raise NotImplementedError('Not implemented for path {}'.format(path)) - curstate = states[topic - 1] - states[topic - 1] -= 1 - return json.dumps( - {"leader": 1 if curstate == 2 else 3, "isr": [1, 3] if curstate >= 1 else [3]} - ).encode('utf-8'), 'xxx' - - exhibitor.get_children = _get_children - exhibitor.get = _get + def _load_states(): + for idx in range(0, len(states)): + states[idx] -= 1 + return [ + ('t1', 0, {'leader': states[0], 'isr': [1, 3] if states[0] >= 1 else [3]}), + ('t2', 0, {'leader': states[1], 'isr': [1, 3] if states[1] >= 1 else [3]}) + ] + + exhibitor.load_partition_states = _load_states id_manager = MagicMock() id_manager.get_broker_id = lambda: '1' @@ -49,35 +33,15 @@ def _get(path): assert not manager.has_leadership() kafka_props.set_property('unclean.leader.election.enable', 'false') - for i in range(0, 2): - assert manager.has_leadership(), 'For iteration {}'.format(i) + assert manager.has_leadership() assert not manager.has_leadership() def __prepare_for_start_fail(broker_ids, leader, isr): exhibitor = MagicMock() + exhibitor.get_broker_ids.return_value = broker_ids + exhibitor.load_partition_states.return_value = [('t0', 0, {'leader': int(leader), 'isr': [int(i) for i in isr]})] - def _get_children(path): - if path == '/brokers/ids': - return broker_ids - elif path == '/brokers/topics': - return ['t1'] - elif path == '/brokers/topics/t1/partitions': - return ['0'] - else: - raise NotImplementedError('Not implemented for path {}'.format(path)) - - def _get(path): - if path == '/brokers/topics/t1/partitions/0/state': - return json.dumps({ - 'leader': leader, - 'isr': isr - }).encode('utf-8'), 'zzz' - else: - raise NotImplementedError('Not implemented for path {}'.format(path)) - - exhibitor.get_children = _get_children - exhibitor.get = _get id_manager = MagicMock() id_manager.get_broker_id = lambda: '1' kafka_props = build_test_properties() diff --git a/tests/test_daemon.py b/tests/test_daemon.py index 963c083..71c01f8 100644 --- a/tests/test_daemon.py +++ b/tests/test_daemon.py @@ -41,7 +41,7 @@ def test_rebalance_on_start(): assert len(controller.checks) == 1 check = controller.checks[0] assert type(check) == RebalanceOnStartCheck - assert check.zk.exhibitor == exhibitor + assert check.zk == exhibitor assert check.broker == broker assert not check.executed @@ -57,7 +57,7 @@ def test_rebalance_on_broker_list_change(): assert len(controller.checks) == 1 check = controller.checks[0] assert type(check) == RebalanceOnBrokerListChange - assert check.zk.exhibitor == exhibitor + assert check.zk == exhibitor assert check.broker == broker From f309c7c7ca62ddc8dabb3dca8993291f473aabc9 Mon Sep 17 00:00:00 2001 From: antban Date: Thu, 4 Aug 2016 13:04:10 +0200 Subject: [PATCH 03/10] Use BukuProxy in Controller --- bubuku/controller.py | 25 +++++++------------------ bubuku/daemon.py | 2 +- bubuku/zookeeper.py | 21 +++++++++++++++++++++ tests/test_controller.py | 39 +++------------------------------------ 4 files changed, 32 insertions(+), 55 deletions(-) diff --git a/bubuku/controller.py b/bubuku/controller.py index 0d04747..04c3693 100644 --- a/bubuku/controller.py +++ b/bubuku/controller.py @@ -1,11 +1,9 @@ import logging from time import sleep, time -from kazoo.exceptions import NodeExistsError - from bubuku.amazon import Amazon from bubuku.broker import BrokerManager -from bubuku.zookeeper import Exhibitor +from bubuku.zookeeper import BukuProxy _LOG = logging.getLogger('bubuku.controller') @@ -47,7 +45,7 @@ def _exclude_self(ip, name, running_actions): class Controller(object): - def __init__(self, broker_manager: BrokerManager, zk: Exhibitor, amazon: Amazon): + def __init__(self, broker_manager: BrokerManager, zk: BukuProxy, amazon: Amazon): self.broker_manager = broker_manager self.zk = zk self.amazon = amazon @@ -61,13 +59,10 @@ def add_check(self, check): def _register_running_changes(self, ip: str) -> dict: _LOG.debug('Taking lock for processing') - with self.zk.take_lock('/bubuku/global_lock', ip): + with self.zk.lock(ip): _LOG.debug('Lock is taken') # Get list of current running changes - running_changes = { - change: self.zk.get('/bubuku/changes/{}'.format(change))[0].decode('utf-8') - for change in self.zk.get_children('/bubuku/changes') - } + running_changes = self.zk.get_running_changes() if running_changes: _LOG.info("Running changes: {}".format(running_changes)) # Register changes to run @@ -76,8 +71,7 @@ def _register_running_changes(self, ip: str) -> dict: first_change = change_list[0] if first_change.can_run(_exclude_self(ip, name, running_changes)): if name not in running_changes: - _LOG.info('Registering change in zk: {}'.format(name)) - self.zk.create('/bubuku/changes/{}'.format(name), ip.encode('utf-8'), ephemeral=True) + self.zk.register_change(name, ip) running_changes[name] = ip else: _LOG.info('Change {} is waiting for others: {}'.format(name, running_changes)) @@ -107,18 +101,13 @@ def _release_changes_lock(self, changes_to_remove): del self.changes[change_name][0] if not self.changes[change_name]: del self.changes[change_name] - with self.zk.take_lock('/bubuku/global_lock'): + with self.zk.lock(): for name in changes_to_remove: - _LOG.info('Removing action {} from locks'.format(name)) - self.zk.delete('/bubuku/changes/{}'.format(name), recursive=True) + self.zk.unregister_change(name) def loop(self): ip = self.amazon.get_own_ip() - try: - self.zk.create('/bubuku/changes', makepath=True) - except NodeExistsError: - pass while self.running or self.changes: self.make_step(ip) diff --git a/bubuku/daemon.py b/bubuku/daemon.py index dbe0ba7..a5efd16 100644 --- a/bubuku/daemon.py +++ b/bubuku/daemon.py @@ -62,7 +62,7 @@ def main(): broker = BrokerManager(config.kafka_dir, buku_proxy, broker_id_manager, kafka_properties) _LOG.info("Creating controller") - controller = Controller(broker, exhibitor, amazon) + controller = Controller(broker, buku_proxy, amazon) controller.add_check(CheckBrokerStopped(broker, exhibitor)) diff --git a/bubuku/zookeeper.py b/bubuku/zookeeper.py index cb4f206..7969518 100644 --- a/bubuku/zookeeper.py +++ b/bubuku/zookeeper.py @@ -122,6 +122,10 @@ def take_lock(self, *args, **kwargs): class BukuProxy(object): def __init__(self, exhibitor: Exhibitor): self.exhibitor = exhibitor + try: + self.exhibitor.create('/bubuku/changes', makepath=True) + except NodeExistsError: + pass def get_broker_ids(self) -> list: """ @@ -198,6 +202,23 @@ def is_rebalancing(self): except NoNodeError: return False + def lock(self, lock_data=None): + return self.exhibitor.take_lock('/bubuku/global_lock', lock_data) + + def get_running_changes(self) -> dict: + return { + change: self.exhibitor.get('/bubuku/changes/{}'.format(change))[0].decode('utf-8') + for change in self.exhibitor.get_children('/bubuku/changes') + } + + def register_change(self, name, ip): + _LOG.info('Registering change in zk: {}'.format(name)) + self.exhibitor.create('/bubuku/changes/{}'.format(name), ip.encode('utf-8'), ephemeral=True) + + def unregister_change(self, name): + _LOG.info('Removing change {} from locks'.format(name)) + self.exhibitor.delete('/bubuku/changes/{}'.format(name), recursive=True) + def load_exhibitor(initial_hosts: list, zookeeper_prefix): return Exhibitor(initial_hosts, 8181, zookeeper_prefix) diff --git a/tests/test_controller.py b/tests/test_controller.py index b39575d..d8f93aa 100644 --- a/tests/test_controller.py +++ b/tests/test_controller.py @@ -1,7 +1,5 @@ from unittest.mock import MagicMock -from kazoo.exceptions import NoNodeError, NodeExistsError - from bubuku.controller import Controller, Check, Change @@ -36,41 +34,10 @@ def check(self): current_changes = {} ip = 'fake' zk = MagicMock() + zk.get_running_changes.return_value = current_changes + zk.register_change = lambda x, y: current_changes.update({x: y}) + zk.unregister_change = lambda x: current_changes.pop(x) - def _get_children(path): - if path == '/bubuku/changes': - return current_changes.keys() - else: - raise NotImplementedError() - - def _get(path: str): - if path.startswith('/bubuku/changes/'): - return current_changes.get(path[len('/bubuku/changes/'):]), 'xxx' - raise NoNodeError() - - def _create(path: str, data: bytes, ephemeral=False): - if path.startswith('/bubuku/changes/'): - name = path[len('/bubuku/changes/'):] - if name in current_changes: - raise NodeExistsError() - assert ephemeral - current_changes[name] = data - else: - raise NotImplementedError() - - def _delete(path, **kwargs): - if path.startswith('/bubuku/changes/'): - name = path[len('/bubuku/changes/'):] - if name not in current_changes: - raise NoNodeError() - del current_changes[name] - else: - raise NotImplementedError() - - zk.get_children = _get_children - zk.get = _get - zk.create = _create - zk.delete = _delete controller = Controller(MagicMock(), zk, MagicMock()) controller.add_check(FakeCheck()) From 50bd9349145f6e169db509af89aa943fca7b1801 Mon Sep 17 00:00:00 2001 From: antban Date: Thu, 4 Aug 2016 13:28:52 +0200 Subject: [PATCH 04/10] Use BukuProxy in id_generator --- bubuku/daemon.py | 2 +- bubuku/id_generator.py | 29 +++++++++-------------------- bubuku/zookeeper.py | 7 +++++++ 3 files changed, 17 insertions(+), 21 deletions(-) diff --git a/bubuku/daemon.py b/bubuku/daemon.py index a5efd16..ee410ba 100644 --- a/bubuku/daemon.py +++ b/bubuku/daemon.py @@ -56,7 +56,7 @@ def main(): buku_proxy = BukuProxy(exhibitor) _LOG.info("Loading broker_id policy") - broker_id_manager = get_broker_id_policy(config.id_policy, exhibitor, kafka_properties, amazon) + broker_id_manager = get_broker_id_policy(config.id_policy, buku_proxy, kafka_properties, amazon) _LOG.info("Building broker manager") broker = BrokerManager(config.kafka_dir, buku_proxy, broker_id_manager, kafka_properties) diff --git a/bubuku/id_generator.py b/bubuku/id_generator.py index f0ec0ec..f5d7093 100755 --- a/bubuku/id_generator.py +++ b/bubuku/id_generator.py @@ -5,19 +5,14 @@ import re from time import sleep, time -from kazoo.client import NoNodeError - from bubuku.amazon import Amazon from bubuku.config import KafkaProperties -from bubuku.zookeeper import Exhibitor +from bubuku.zookeeper import BukuProxy _LOG = logging.getLogger('bubuku.id_generator') class BrokerIdGenerator(object): - def __init__(self, zk: Exhibitor): - self.zk = zk - def get_broker_id(self) -> str: raise NotImplementedError('Not implemented') @@ -25,13 +20,6 @@ def wait_for_broker_id_absence(self): while self.is_registered(): sleep(1) - def _is_registered_in_zk(self, id_): - try: - _, stat = self.zk.get('/brokers/ids/{}'.format(id_)) - return stat is not None - except NoNodeError: - return False - def wait_for_broker_id_presence(self, timeout) -> bool: start = time() while not self.is_registered(): @@ -59,8 +47,8 @@ def _create_rfc1918_address_hash(ip: str) -> (str, str): class BrokerIDByIp(BrokerIdGenerator): - def __init__(self, zk: Exhibitor, ip: str, kafka_props: KafkaProperties): - super().__init__(zk) + def __init__(self, zk: BukuProxy, ip: str, kafka_props: KafkaProperties): + self.zk = zk self.broker_id, max_id = _create_rfc1918_address_hash(ip) kafka_props.set_property('reserved.broker.max.id', max_id) _LOG.info('Built broker id {} from ip: {}'.format(self.broker_id, ip)) @@ -71,12 +59,13 @@ def get_broker_id(self): return self.broker_id def is_registered(self): - return self._is_registered_in_zk(self.broker_id) + return self.zk.is_broker_registered(self.broker_id) class BrokerIdAutoAssign(BrokerIdGenerator): - def __init__(self, zk: Exhibitor, kafka_properties: KafkaProperties): - super().__init__(zk) + def __init__(self, zk: BukuProxy, kafka_properties: KafkaProperties): + super().__init__() + self.zk = zk self.kafka_properties = kafka_properties self.broker_id = None @@ -92,11 +81,11 @@ def is_registered(self): for line in lines: match = re.search('broker\.id=(\d+)', line) if match: - return self._is_registered_in_zk(match.group(1)) + return self.zk.is_broker_registered(match.group(1)) return False -def get_broker_id_policy(policy: str, zk: Exhibitor, kafka_props: KafkaProperties, amazon: Amazon) -> BrokerIdGenerator: +def get_broker_id_policy(policy: str, zk: BukuProxy, kafka_props: KafkaProperties, amazon: Amazon) -> BrokerIdGenerator: if policy == 'ip': return BrokerIDByIp(zk, amazon.get_own_ip(), kafka_props) elif policy == 'auto': diff --git a/bubuku/zookeeper.py b/bubuku/zookeeper.py index 7969518..16acee3 100644 --- a/bubuku/zookeeper.py +++ b/bubuku/zookeeper.py @@ -127,6 +127,13 @@ def __init__(self, exhibitor: Exhibitor): except NodeExistsError: pass + def is_broker_registered(self, broker_id): + try: + _, stat = self.exhibitor.get('/brokers/ids/{}'.format(broker_id)) + return stat is not None + except NoNodeError: + return False + def get_broker_ids(self) -> list: """ Gets list of available broker ids From b02ca9688fa7574bd4871ca9d1c61e611441ae3a Mon Sep 17 00:00:00 2001 From: antban Date: Thu, 4 Aug 2016 13:30:53 +0200 Subject: [PATCH 05/10] Use BukuProxy in restart check --- bubuku/daemon.py | 2 +- bubuku/features/restart_if_dead.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/bubuku/daemon.py b/bubuku/daemon.py index ee410ba..d0f4c40 100644 --- a/bubuku/daemon.py +++ b/bubuku/daemon.py @@ -64,7 +64,7 @@ def main(): _LOG.info("Creating controller") controller = Controller(broker, buku_proxy, amazon) - controller.add_check(CheckBrokerStopped(broker, exhibitor)) + controller.add_check(CheckBrokerStopped(broker, buku_proxy)) apply_features(config.features, controller, buku_proxy, broker, kafka_properties, amazon) diff --git a/bubuku/features/restart_if_dead.py b/bubuku/features/restart_if_dead.py index d5ff6e1..e47dbb1 100644 --- a/bubuku/features/restart_if_dead.py +++ b/bubuku/features/restart_if_dead.py @@ -2,13 +2,13 @@ from bubuku.broker import BrokerManager from bubuku.controller import Change, Check -from bubuku.zookeeper import Exhibitor +from bubuku.zookeeper import BukuProxy _LOG = logging.getLogger('bubuku.features.restart_if_dead') class StartBrokerChange(Change): - def __init__(self, broker: BrokerManager, zk: Exhibitor): + def __init__(self, broker: BrokerManager, zk: BukuProxy): self.broker = broker self.zk = zk self.stopped = False @@ -41,7 +41,7 @@ def __str__(self): class CheckBrokerStopped(Check): - def __init__(self, broker: BrokerManager, zk: Exhibitor): + def __init__(self, broker: BrokerManager, zk: BukuProxy): super().__init__() self.broker = broker self.zk = zk From 2ea6569725107f5322072bab64a216519dcba336 Mon Sep 17 00:00:00 2001 From: antban Date: Thu, 4 Aug 2016 13:34:11 +0200 Subject: [PATCH 06/10] Rename buku exhibitor to correct name --- bubuku/broker.py | 4 ++-- bubuku/controller.py | 4 ++-- bubuku/daemon.py | 8 ++++---- bubuku/features/rebalance.py | 8 ++++---- bubuku/features/rebalance_by_size.py | 22 +++++----------------- bubuku/features/restart_if_dead.py | 6 +++--- bubuku/features/restart_on_zk_change.py | 6 +++--- bubuku/id_generator.py | 8 ++++---- bubuku/zookeeper.py | 18 +++++++++++++----- tests/test_rebalance.py | 4 ++-- tests/test_size_stats_collecting.py | 4 +--- 11 files changed, 43 insertions(+), 49 deletions(-) diff --git a/bubuku/broker.py b/bubuku/broker.py index e8c4910..12f8133 100644 --- a/bubuku/broker.py +++ b/bubuku/broker.py @@ -4,7 +4,7 @@ from bubuku.config import KafkaProperties from bubuku.id_generator import BrokerIdGenerator -from bubuku.zookeeper import BukuProxy +from bubuku.zookeeper import BukuExhibitor _LOG = logging.getLogger('bubuku.broker') @@ -14,7 +14,7 @@ class LeaderElectionInProgress(Exception): class BrokerManager(object): - def __init__(self, kafka_dir: str, exhibitor: BukuProxy, id_manager: BrokerIdGenerator, + def __init__(self, kafka_dir: str, exhibitor: BukuExhibitor, id_manager: BrokerIdGenerator, kafka_properties: KafkaProperties): self.kafka_dir = kafka_dir self.id_manager = id_manager diff --git a/bubuku/controller.py b/bubuku/controller.py index 04c3693..f8c6e72 100644 --- a/bubuku/controller.py +++ b/bubuku/controller.py @@ -3,7 +3,7 @@ from bubuku.amazon import Amazon from bubuku.broker import BrokerManager -from bubuku.zookeeper import BukuProxy +from bubuku.zookeeper import BukuExhibitor _LOG = logging.getLogger('bubuku.controller') @@ -45,7 +45,7 @@ def _exclude_self(ip, name, running_actions): class Controller(object): - def __init__(self, broker_manager: BrokerManager, zk: BukuProxy, amazon: Amazon): + def __init__(self, broker_manager: BrokerManager, zk: BukuExhibitor, amazon: Amazon): self.broker_manager = broker_manager self.zk = zk self.amazon = amazon diff --git a/bubuku/daemon.py b/bubuku/daemon.py index d0f4c40..f21164f 100644 --- a/bubuku/daemon.py +++ b/bubuku/daemon.py @@ -15,12 +15,13 @@ from bubuku.features.terminate import register_terminate_on_interrupt from bubuku.id_generator import get_broker_id_policy from bubuku.utils import CmdHelper -from bubuku.zookeeper import load_exhibitor, BukuProxy +from bubuku.zookeeper import load_exhibitor_proxy, BukuExhibitor +from bubuku.zookeeper import BukuExhibitor, load_exhibitor_proxy _LOG = logging.getLogger('bubuku.main') -def apply_features(features: str, controller: Controller, buku_proxy: BukuProxy, broker: BrokerManager, +def apply_features(features: str, controller: Controller, buku_proxy: BukuExhibitor, broker: BrokerManager, kafka_properties: KafkaProperties, amazon: Amazon) -> list: for feature in set(features.split(',')): if feature == 'restart_on_exhibitor': @@ -52,8 +53,7 @@ def main(): amazon = Amazon() _LOG.info("Loading exhibitor configuration") - exhibitor = load_exhibitor(amazon.get_addresses_by_lb_name(config.zk_stack_name), config.zk_prefix) - buku_proxy = BukuProxy(exhibitor) + buku_proxy = load_exhibitor_proxy(amazon.get_addresses_by_lb_name(config.zk_stack_name), config.zk_prefix) _LOG.info("Loading broker_id policy") broker_id_manager = get_broker_id_policy(config.id_policy, buku_proxy, kafka_properties, amazon) diff --git a/bubuku/features/rebalance.py b/bubuku/features/rebalance.py index ebde98e..708b31b 100644 --- a/bubuku/features/rebalance.py +++ b/bubuku/features/rebalance.py @@ -3,7 +3,7 @@ from bubuku.broker import BrokerManager from bubuku.controller import Check, Change -from bubuku.zookeeper import BukuProxy +from bubuku.zookeeper import BukuExhibitor _LOG = logging.getLogger('bubuku.features.rebalance') @@ -44,7 +44,7 @@ def _removal_func(): class RebalanceChange(Change): - def __init__(self, zk: BukuProxy, broker_list): + def __init__(self, zk: BukuExhibitor, broker_list): self.zk = zk self.broker_ids = broker_list self.stale_data = {} # partition count to topic data @@ -153,7 +153,7 @@ def dump_allocations(self): class RebalanceOnStartCheck(Check): - def __init__(self, zk: BukuProxy, broker: BrokerManager): + def __init__(self, zk: BukuExhibitor, broker: BrokerManager): super().__init__() self.zk = zk self.broker = broker @@ -173,7 +173,7 @@ def __str__(self): class RebalanceOnBrokerListChange(Check): - def __init__(self, zk: BukuProxy, broker: BrokerManager): + def __init__(self, zk: BukuExhibitor, broker: BrokerManager): super().__init__() self.zk = zk self.broker = broker diff --git a/bubuku/features/rebalance_by_size.py b/bubuku/features/rebalance_by_size.py index c354948..5f0f6e2 100644 --- a/bubuku/features/rebalance_by_size.py +++ b/bubuku/features/rebalance_by_size.py @@ -1,18 +1,15 @@ -import json import logging -from kazoo.exceptions import NodeExistsError - from bubuku.broker import BrokerManager from bubuku.controller import Check, Change from bubuku.utils import CmdHelper -from bubuku.zookeeper import Exhibitor +from bubuku.zookeeper import BukuExhibitor _LOG = logging.getLogger('bubuku.features.rebalance_by_size') class RebalanceBySizeChange(Change): - def __init__(self, zk: Exhibitor): + def __init__(self, zk: BukuExhibitor): self.zk = zk def get_name(self): @@ -29,7 +26,7 @@ def can_run_at_exit(self): class RebalanceBySize(Check): - def __init__(self, zk: Exhibitor, broker: BrokerManager): + def __init__(self, zk: BukuExhibitor, broker: BrokerManager): super().__init__(check_interval_s=600) self.zk = zk self.broker = broker @@ -45,7 +42,7 @@ def __is_data_imbalanced(self) -> bool: class GenerateDataSizeStatistics(Check): - def __init__(self, zk: Exhibitor, broker: BrokerManager, cmd_helper: CmdHelper, kafka_log_dirs): + def __init__(self, zk: BukuExhibitor, broker: BrokerManager, cmd_helper: CmdHelper, kafka_log_dirs): super().__init__(check_interval_s=1800) self.zk = zk self.broker = broker @@ -66,7 +63,7 @@ def __generate_stats(self): topics_stats = self.__get_topics_stats() disk_stats = self.__get_disk_stats() stats = {"disk": disk_stats, "topics": topics_stats} - self.__write_stats_to_zk(stats) + self.zk.update_disk_stats(self.broker.id_manager.get_broker_id(), stats) def __get_topics_stats(self): topics_stats = {} @@ -109,12 +106,3 @@ def __get_disk_stats(self): total_used += int(used) total_free += int(free) return {"used_kb": total_used, "free_kb": total_free} - - def __write_stats_to_zk(self, stats): - broker_id = self.broker.id_manager.get_broker_id() - data = json.dumps(stats, sort_keys=True, separators=(',', ':')).encode("utf-8") - path = "/bubuku/size_stats/{}".format(broker_id) - try: - self.zk.create(path, data, ephemeral=True, makepath=True) - except NodeExistsError: - self.zk.set(path, data) diff --git a/bubuku/features/restart_if_dead.py b/bubuku/features/restart_if_dead.py index e47dbb1..a714498 100644 --- a/bubuku/features/restart_if_dead.py +++ b/bubuku/features/restart_if_dead.py @@ -2,13 +2,13 @@ from bubuku.broker import BrokerManager from bubuku.controller import Change, Check -from bubuku.zookeeper import BukuProxy +from bubuku.zookeeper import BukuExhibitor _LOG = logging.getLogger('bubuku.features.restart_if_dead') class StartBrokerChange(Change): - def __init__(self, broker: BrokerManager, zk: BukuProxy): + def __init__(self, broker: BrokerManager, zk: BukuExhibitor): self.broker = broker self.zk = zk self.stopped = False @@ -41,7 +41,7 @@ def __str__(self): class CheckBrokerStopped(Check): - def __init__(self, broker: BrokerManager, zk: BukuProxy): + def __init__(self, broker: BrokerManager, zk: BukuExhibitor): super().__init__() self.broker = broker self.zk = zk diff --git a/bubuku/features/restart_on_zk_change.py b/bubuku/features/restart_on_zk_change.py index c592a19..b6b3be2 100644 --- a/bubuku/features/restart_on_zk_change.py +++ b/bubuku/features/restart_on_zk_change.py @@ -2,7 +2,7 @@ from bubuku.broker import BrokerManager from bubuku.controller import Change, Check -from bubuku.zookeeper import BukuProxy +from bubuku.zookeeper import BukuExhibitor _LOG = logging.getLogger('bubuku.features.restart_on_zk') @@ -11,7 +11,7 @@ class RestartBrokerOnZkChange(Change): - def __init__(self, zk_hosts: str, zk: BukuProxy, broker: BrokerManager): + def __init__(self, zk_hosts: str, zk: BukuExhibitor, broker: BrokerManager): self.conn_str = zk_hosts self.zk = zk self.broker = broker @@ -53,7 +53,7 @@ def __str__(self): class CheckExhibitorAddressChanged(Check): - def __init__(self, zk: BukuProxy, broker: BrokerManager): + def __init__(self, zk: BukuExhibitor, broker: BrokerManager): super().__init__() self.zk = zk self.broker = broker diff --git a/bubuku/id_generator.py b/bubuku/id_generator.py index f5d7093..e741a36 100755 --- a/bubuku/id_generator.py +++ b/bubuku/id_generator.py @@ -7,7 +7,7 @@ from bubuku.amazon import Amazon from bubuku.config import KafkaProperties -from bubuku.zookeeper import BukuProxy +from bubuku.zookeeper import BukuExhibitor _LOG = logging.getLogger('bubuku.id_generator') @@ -47,7 +47,7 @@ def _create_rfc1918_address_hash(ip: str) -> (str, str): class BrokerIDByIp(BrokerIdGenerator): - def __init__(self, zk: BukuProxy, ip: str, kafka_props: KafkaProperties): + def __init__(self, zk: BukuExhibitor, ip: str, kafka_props: KafkaProperties): self.zk = zk self.broker_id, max_id = _create_rfc1918_address_hash(ip) kafka_props.set_property('reserved.broker.max.id', max_id) @@ -63,7 +63,7 @@ def is_registered(self): class BrokerIdAutoAssign(BrokerIdGenerator): - def __init__(self, zk: BukuProxy, kafka_properties: KafkaProperties): + def __init__(self, zk: BukuExhibitor, kafka_properties: KafkaProperties): super().__init__() self.zk = zk self.kafka_properties = kafka_properties @@ -85,7 +85,7 @@ def is_registered(self): return False -def get_broker_id_policy(policy: str, zk: BukuProxy, kafka_props: KafkaProperties, amazon: Amazon) -> BrokerIdGenerator: +def get_broker_id_policy(policy: str, zk: BukuExhibitor, kafka_props: KafkaProperties, amazon: Amazon) -> BrokerIdGenerator: if policy == 'ip': return BrokerIDByIp(zk, amazon.get_own_ip(), kafka_props) elif policy == 'auto': diff --git a/bubuku/zookeeper.py b/bubuku/zookeeper.py index 16acee3..87e5f6e 100644 --- a/bubuku/zookeeper.py +++ b/bubuku/zookeeper.py @@ -62,7 +62,7 @@ def zookeeper_hosts(self): return self._zookeeper_hosts -class Exhibitor: +class _Exhibitor: def __init__(self, hosts, port, prefix): self.prefix = prefix self.exhibitor = ExhibitorEnsembleProvider(hosts, port, poll_interval=30) @@ -119,8 +119,8 @@ def take_lock(self, *args, **kwargs): _LOG.error('Failed to obtain lock for exhibitor, retrying', exc_info=e) -class BukuProxy(object): - def __init__(self, exhibitor: Exhibitor): +class BukuExhibitor(object): + def __init__(self, exhibitor: _Exhibitor): self.exhibitor = exhibitor try: self.exhibitor.create('/bubuku/changes', makepath=True) @@ -194,6 +194,14 @@ def reallocate_partition(self, topic: str, partition: object, replicas: list) -> _LOG.info("Waiting for free reallocation slot, still in progress...") return False + def update_disk_stats(self, broker_id: str, data: dict): + data_bytes = json.dumps(data, sort_keys=True).encode('utf-8') + path = '/bubuku/size_stats/{}'.format(broker_id) + try: + self.exhibitor.create(path, data_bytes, ephemeral=True, makepath=True) + except NodeExistsError: + self.exhibitor.set(path, data_bytes) + def get_conn_str(self): """ Calculates connection string in format usable by kafka @@ -227,5 +235,5 @@ def unregister_change(self, name): self.exhibitor.delete('/bubuku/changes/{}'.format(name), recursive=True) -def load_exhibitor(initial_hosts: list, zookeeper_prefix): - return Exhibitor(initial_hosts, 8181, zookeeper_prefix) +def load_exhibitor_proxy(initial_hosts: list, zookeeper_prefix) -> BukuExhibitor: + return BukuExhibitor(_Exhibitor(initial_hosts, 8181, zookeeper_prefix)) diff --git a/tests/test_rebalance.py b/tests/test_rebalance.py index 19012e5..5215e7f 100644 --- a/tests/test_rebalance.py +++ b/tests/test_rebalance.py @@ -4,7 +4,7 @@ from kazoo.exceptions import NoNodeError from bubuku.features.rebalance import RebalanceChange, RebalanceOnBrokerListChange, combine_broker_ids -from bubuku.zookeeper import BukuProxy +from bubuku.zookeeper import BukuExhibitor def test_rebalance_can_run(): @@ -25,7 +25,7 @@ def test_rebalance_get_name(): assert o.get_name() == 'rebalance' -def __create_zk_for_topics(topic_data, broker_ids=None) -> (list, BukuProxy): +def __create_zk_for_topics(topic_data, broker_ids=None) -> (list, BukuExhibitor): buku_proxy = MagicMock() buku_proxy.get_broker_ids.return_value = broker_ids if broker_ids else sorted(list( set(functools.reduce(lambda x, y: x + y, topic_data.values(), [])))) diff --git a/tests/test_size_stats_collecting.py b/tests/test_size_stats_collecting.py index ce85d5a..bc4fa5f 100644 --- a/tests/test_size_stats_collecting.py +++ b/tests/test_size_stats_collecting.py @@ -1,4 +1,3 @@ -import json from unittest.mock import MagicMock from bubuku.features.rebalance_by_size import GenerateDataSizeStatistics @@ -18,8 +17,7 @@ def test_size_stats_collecting(): "my-topic": {"0": 10, "2": 200} } } - expected_data = json.dumps(expected_json, sort_keys=True, separators=(',', ':')).encode("utf-8") - zk.create.assert_called_with("/bubuku/size_stats/dummy_id", expected_data, ephemeral=True, makepath=True) + zk.update_disk_stats.assert_called_with('dummy_id', expected_json) def mock_cmd_helper() -> CmdHelper: From da9e2d4bd9d5f2306c0d8ef8ee39322a90258527 Mon Sep 17 00:00:00 2001 From: antban Date: Thu, 4 Aug 2016 15:18:41 +0200 Subject: [PATCH 07/10] Create tests for BukuExhibitor --- bubuku/zookeeper.py | 2 +- tests/test_exhibitor.py | 159 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 160 insertions(+), 1 deletion(-) create mode 100644 tests/test_exhibitor.py diff --git a/bubuku/zookeeper.py b/bubuku/zookeeper.py index 87e5f6e..7cfde01 100644 --- a/bubuku/zookeeper.py +++ b/bubuku/zookeeper.py @@ -139,7 +139,7 @@ def get_broker_ids(self) -> list: Gets list of available broker ids :return: Sorted list of strings - active broker ids. """ - return sorted(self.exhibitor.get('/brokers/ids')) + return sorted(self.exhibitor.get_children('/brokers/ids')) def load_partition_assignment(self) -> list: """ diff --git a/tests/test_exhibitor.py b/tests/test_exhibitor.py new file mode 100644 index 0000000..6222ee2 --- /dev/null +++ b/tests/test_exhibitor.py @@ -0,0 +1,159 @@ +import json +import re +from unittest.mock import MagicMock + +from kazoo.exceptions import NoNodeError, NodeExistsError + +from bubuku.zookeeper import BukuExhibitor + + +def test_get_broker_ids(): + real_ex = MagicMock() + + def _get_children(path): + if path == '/brokers/ids': + return ['3', '1', '2'] + else: + raise NotImplementedError() + + real_ex.get_children = _get_children + + buku = BukuExhibitor(real_ex) + + assert ['1', '2', '3'] == buku.get_broker_ids() # ensure that return list is sorted + + +def test_is_broker_registered(): + def _get(path): + if path == '/brokers/ids/123': + return '123', object() + elif path == '/brokers/ids/321': + return None, None + else: + raise NoNodeError() + + real_ex = MagicMock() + real_ex.get = _get + buku = BukuExhibitor(real_ex) + + assert buku.is_broker_registered('123') + assert buku.is_broker_registered(123) + assert not buku.is_broker_registered('321') + assert not buku.is_broker_registered(321) + assert not buku.is_broker_registered(333) + assert not buku.is_broker_registered('333') + + +def test_load_partition_assignment(): + real_ex = MagicMock() + + def _get_children(path): + if path == '/brokers/topics': + return ['t01', 't02'] + else: + raise NotImplementedError() + + def _get(path): + if path == '/brokers/topics/t01': + return json.dumps({'partitions': {0: [1, 2, 3], 1: [3, 2, 1]}}).encode('utf-8'), object() + elif path == '/brokers/topics/t02': + return json.dumps({'partitions': {0: [4, 5, 6], 1: [5, 1, 2]}}).encode('utf-8'), object() + else: + raise NotImplementedError() + + real_ex.get = _get + real_ex.get_children = _get_children + + buku_ex = BukuExhibitor(real_ex) + + expected_result = [ + ('t01', 0, [1, 2, 3]), + ('t01', 1, [3, 2, 1]), + ('t02', 0, [4, 5, 6]), + ('t02', 1, [5, 1, 2]), + ] + result = buku_ex.load_partition_assignment() + assert len(expected_result) == len(result) + for e in expected_result: + assert e in result + + +def test_load_partition_states(): + real_ex = MagicMock() + + def _get_children(path): + if path == '/brokers/topics': + return ['t01', 't02'] + elif path == '/brokers/topics/t01/partitions': + return ['0', '1'] + elif path == '/brokers/topics/t02/partitions': + return ['0', '1', '2'] + else: + raise NotImplementedError() + + def _get(path): + matched = re.match('/brokers/topics/(.*)/partitions/(.*)/state', path) + if not matched: + raise NotImplementedError('Not implemented for path {}'.format(path)) + topic = matched.group(1) + partition = matched.group(2) + if topic == 't01' and partition not in ('0', '1'): + raise NotImplementedError() + elif topic == 't02' and partition not in ('0', '1', '2'): + raise NotImplementedError() + elif topic not in ('t01', 't02'): + raise NotImplementedError() + idx = (100 if topic == 't01' else 200) + int(partition) + return json.dumps({'fake_data': idx}).encode('utf-8'), object() + + real_ex.get = _get + real_ex.get_children = _get_children + + buku_ex = BukuExhibitor(real_ex) + + expected_result = [ + ('t01', 0, {'fake_data': 100}), + ('t01', 1, {'fake_data': 101}), + ('t02', 0, {'fake_data': 200}), + ('t02', 1, {'fake_data': 201}), + ('t02', 2, {'fake_data': 202}), + ] + + result = buku_ex.load_partition_states() + assert len(expected_result) == len(result) + for e in expected_result: + assert e in result + + +def test_reallocate_partition(): + call_idx = [0] + + def _create(path, value=None, **kwargs): + if path == '/bubuku/changes': + pass + elif path == '/admin/reassign_partitions': + if call_idx[0] >= 5: + raise NodeExistsError() + call_idx[0] += 1 + j = json.loads(value.decode('utf-8')) + assert j['version'] == '1' + assert len(j['partitions']) == 1 + p = j['partitions'][0] + assert p['topic'] == 't01' + assert p['partition'] == 0 + assert p['replicas'] == [1, 2, 3] + else: + raise NotImplementedError('Not implemented for path {}'.format(path)) + + zk = MagicMock() + zk.create = _create + + buku = BukuExhibitor(zk) + + assert buku.reallocate_partition('t01', 0, ['1', '2', '3']) + assert buku.reallocate_partition('t01', 0, ['1', '2', 3]) + assert buku.reallocate_partition('t01', 0, [1, 2, 3]) + assert buku.reallocate_partition('t01', 0, [1, 2, 3]) + assert buku.reallocate_partition('t01', 0, [1, 2, 3]) + # Node exists + assert not buku.reallocate_partition('t01', 0, [1, 2, 3]) From e98826ceb0799724e36f7e848854695083cb8d7d Mon Sep 17 00:00:00 2001 From: antban Date: Thu, 4 Aug 2016 15:26:33 +0200 Subject: [PATCH 08/10] Move to generators --- bubuku/zookeeper.py | 12 ++++-------- tests/test_exhibitor.py | 4 ++-- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/bubuku/zookeeper.py b/bubuku/zookeeper.py index 7cfde01..8d30bdd 100644 --- a/bubuku/zookeeper.py +++ b/bubuku/zookeeper.py @@ -144,28 +144,24 @@ def get_broker_ids(self) -> list: def load_partition_assignment(self) -> list: """ Lists all the assignments of partitions to particular broker ids. - :returns list of tuples (topic_name:str, partition:int, replica_list:list(int)), for ex. "test", 0, [1,2,3] + :returns generator of tuples (topic_name:str, partition:int, replica_list:list(int)), for ex. "test", 0, [1,2,3] """ - result = [] for topic in self.exhibitor.get_children('/brokers/topics'): data = json.loads(self.exhibitor.get("/brokers/topics/" + topic)[0].decode('utf-8')) for k, v in data['partitions'].items(): - result.append((topic, int(k), v)) - return result + yield (topic, int(k), v) def load_partition_states(self) -> list: """ Lists all the current partition states (leaders and isr list) - :return: list of tuples + :return: generator of tuples (topic_name: str, partition: int, state: json from /brokers/topics/{}/partitions/{}/state) """ - result = [] for topic in self.exhibitor.get_children('/brokers/topics'): for partition in self.exhibitor.get_children('/brokers/topics/{}/partitions'.format(topic)): state = json.loads(self.exhibitor.get('/brokers/topics/{}/partitions/{}/state'.format( topic, partition))[0].decode('utf-8')) - result.append((topic, int(partition), state)) - return result + yield (topic, int(partition), state) def reallocate_partition(self, topic: str, partition: object, replicas: list) -> bool: """ diff --git a/tests/test_exhibitor.py b/tests/test_exhibitor.py index 6222ee2..ade780d 100644 --- a/tests/test_exhibitor.py +++ b/tests/test_exhibitor.py @@ -72,7 +72,7 @@ def _get(path): ('t02', 0, [4, 5, 6]), ('t02', 1, [5, 1, 2]), ] - result = buku_ex.load_partition_assignment() + result = [r for r in buku_ex.load_partition_assignment()] assert len(expected_result) == len(result) for e in expected_result: assert e in result @@ -119,7 +119,7 @@ def _get(path): ('t02', 2, {'fake_data': 202}), ] - result = buku_ex.load_partition_states() + result = [r for r in buku_ex.load_partition_states()] assert len(expected_result) == len(result) for e in expected_result: assert e in result From 9aa2b0ad28f6fff091124e15b7e73c63296883fd Mon Sep 17 00:00:00 2001 From: antban Date: Thu, 4 Aug 2016 17:40:18 +0200 Subject: [PATCH 09/10] Use async request counter - do not send more than 100 commands to zk --- bubuku/zookeeper.py | 79 +++++++++++++++++++++++++++++++++++------ tests/test_exhibitor.py | 52 +++++++++++++++++++++++---- 2 files changed, 114 insertions(+), 17 deletions(-) diff --git a/bubuku/zookeeper.py b/bubuku/zookeeper.py index 8d30bdd..8a0f723 100644 --- a/bubuku/zookeeper.py +++ b/bubuku/zookeeper.py @@ -1,11 +1,12 @@ import json import logging import random +import threading import time import requests from kazoo.client import KazooClient -from kazoo.exceptions import NoNodeError, NodeExistsError +from kazoo.exceptions import NoNodeError, NodeExistsError, ConnectionLossException from requests.exceptions import RequestException _LOG = logging.getLogger('bubuku.exhibitor') @@ -62,9 +63,28 @@ def zookeeper_hosts(self): return self._zookeeper_hosts +class WaitingCounter(object): + def __init__(self, limit=100): + self.limit = limit + self.counter = 0 + self.cv = threading.Condition() + + def increment(self): + with self.cv: + while self.counter >= self.limit: + self.cv.wait() + self.counter += 1 + + def decrement(self): + with self.cv: + self.counter -= 1 + self.cv.notify() + + class _Exhibitor: def __init__(self, hosts, port, prefix): self.prefix = prefix + self.async_counter = WaitingCounter(limit=100) self.exhibitor = ExhibitorEnsembleProvider(hosts, port, poll_interval=30) self.client = KazooClient(hosts=self.exhibitor.zookeeper_hosts + self.prefix, command_retry={ @@ -91,6 +111,17 @@ def get(self, *params): self._poll_exhibitor() return self.client.retry(self.client.get, *params) + def get_async(self, *params): + # Exhibitor is not polled here and it's totally fine! + self.async_counter.increment() + try: + i_async = self.client.get_async(*params) + i_async.rawlink(self.async_counter.decrement) + return i_async + except Exception as e: + self.async_counter.decrement() + raise e + def set(self, *args, **kwargs): self._poll_exhibitor() return self.client.retry(self.client.set, *args, **kwargs) @@ -120,8 +151,9 @@ def take_lock(self, *args, **kwargs): class BukuExhibitor(object): - def __init__(self, exhibitor: _Exhibitor): + def __init__(self, exhibitor: _Exhibitor, async=True): self.exhibitor = exhibitor + self.async = async try: self.exhibitor.create('/bubuku/changes', makepath=True) except NodeExistsError: @@ -146,10 +178,23 @@ def load_partition_assignment(self) -> list: Lists all the assignments of partitions to particular broker ids. :returns generator of tuples (topic_name:str, partition:int, replica_list:list(int)), for ex. "test", 0, [1,2,3] """ - for topic in self.exhibitor.get_children('/brokers/topics'): - data = json.loads(self.exhibitor.get("/brokers/topics/" + topic)[0].decode('utf-8')) - for k, v in data['partitions'].items(): - yield (topic, int(k), v) + if self.async: + results = [(topic, self.exhibitor.get_async('/brokers/topics/{}'.format(topic))) for topic in + self.exhibitor.get_children('/brokers/topics')] + for topic, cb in results: + try: + value, stat = cb.get(block=True) + except ConnectionLossException: + value, stat = self.exhibitor.get('/brokers/topics/{}'.format(topic)) + data = json.loads(value.decode('utf-8')) + for k, v in data['partitions'].items(): + yield (topic, int(k), v) + + else: + for topic in self.exhibitor.get_children('/brokers/topics'): + data = json.loads(self.exhibitor.get('/brokers/topics/{}'.format(topic))[0].decode('utf-8')) + for k, v in data['partitions'].items(): + yield (topic, int(k), v) def load_partition_states(self) -> list: """ @@ -157,11 +202,23 @@ def load_partition_states(self) -> list: :return: generator of tuples (topic_name: str, partition: int, state: json from /brokers/topics/{}/partitions/{}/state) """ - for topic in self.exhibitor.get_children('/brokers/topics'): - for partition in self.exhibitor.get_children('/brokers/topics/{}/partitions'.format(topic)): - state = json.loads(self.exhibitor.get('/brokers/topics/{}/partitions/{}/state'.format( - topic, partition))[0].decode('utf-8')) - yield (topic, int(partition), state) + if self.async: + asyncs = [] + for topic, partition, _ in self.load_partition_assignment(): + asyncs.append((topic, partition, self.exhibitor.get_async( + '/brokers/topics/{}/partitions/{}/state'.format(topic, partition)))) + for topic, partition, async in asyncs: + try: + value, stat = async.get(block=True) + except ConnectionLossException: + value, stat = self.exhibitor.get('/brokers/topics/{}/partitions/{}/state'.format(topic, partition)) + yield (topic, int(partition), json.loads(value.decode('utf-8'))) + else: + for topic in self.exhibitor.get_children('/brokers/topics'): + for partition in self.exhibitor.get_children('/brokers/topics/{}/partitions'.format(topic)): + state = json.loads(self.exhibitor.get('/brokers/topics/{}/partitions/{}/state'.format( + topic, partition))[0].decode('utf-8')) + yield (topic, int(partition), state) def reallocate_partition(self, topic: str, partition: object, replicas: list) -> bool: """ diff --git a/tests/test_exhibitor.py b/tests/test_exhibitor.py index ade780d..a9d4c2c 100644 --- a/tests/test_exhibitor.py +++ b/tests/test_exhibitor.py @@ -44,7 +44,7 @@ def _get(path): assert not buku.is_broker_registered('333') -def test_load_partition_assignment(): +def _test_load_partition_assignment(async: bool): real_ex = MagicMock() def _get_children(path): @@ -61,10 +61,20 @@ def _get(path): else: raise NotImplementedError() + def _get_async(path): + def _get_iresult(block): + assert block + return _get(path) + + mock = MagicMock() + mock.get = _get_iresult + return mock + real_ex.get = _get + real_ex.get_async = _get_async real_ex.get_children = _get_children - buku_ex = BukuExhibitor(real_ex) + buku_ex = BukuExhibitor(real_ex, async) expected_result = [ ('t01', 0, [1, 2, 3]), @@ -78,7 +88,15 @@ def _get(path): assert e in result -def test_load_partition_states(): +def test_load_partition_assignment_sync(): + _test_load_partition_assignment(False) + + +def test_load_partition_assignment_async(): + _test_load_partition_assignment(True) + + +def _test_load_partition_states(async: bool): real_ex = MagicMock() def _get_children(path): @@ -91,10 +109,14 @@ def _get_children(path): else: raise NotImplementedError() - def _get(path): + def _get(path: str): matched = re.match('/brokers/topics/(.*)/partitions/(.*)/state', path) if not matched: - raise NotImplementedError('Not implemented for path {}'.format(path)) + topic = path[len('/brokers/topics/'):] + if topic not in ['t01', 't02']: + raise NotImplementedError('Not implemented for path {}'.format(path)) + cnt = 2 if topic == 't01' else 3 + return json.dumps({'partitions': {x: None for x in range(0, cnt)}}).encode('utf-8'), object() topic = matched.group(1) partition = matched.group(2) if topic == 't01' and partition not in ('0', '1'): @@ -106,10 +128,20 @@ def _get(path): idx = (100 if topic == 't01' else 200) + int(partition) return json.dumps({'fake_data': idx}).encode('utf-8'), object() + def _get_async(path): + def _get_iasync(block): + assert block + return _get(path) + + mock = MagicMock() + mock.get = _get_iasync + return mock + real_ex.get = _get + real_ex.get_async = _get_async real_ex.get_children = _get_children - buku_ex = BukuExhibitor(real_ex) + buku_ex = BukuExhibitor(real_ex, async=async) expected_result = [ ('t01', 0, {'fake_data': 100}), @@ -125,6 +157,14 @@ def _get(path): assert e in result +def test_load_partition_states_sync(): + _test_load_partition_states(False) + + +def test_load_partition_states_async(): + _test_load_partition_states(True) + + def test_reallocate_partition(): call_idx = [0] From 9012eb7b7a481c979a936198bdeb96deea07c34d Mon Sep 17 00:00:00 2001 From: antban Date: Fri, 5 Aug 2016 12:14:36 +0200 Subject: [PATCH 10/10] Fixes after review --- bubuku/zookeeper.py | 2 +- tests/test_exhibitor.py | 38 +++++++++++++++++++------------------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/bubuku/zookeeper.py b/bubuku/zookeeper.py index 8a0f723..c6b2573 100644 --- a/bubuku/zookeeper.py +++ b/bubuku/zookeeper.py @@ -248,7 +248,7 @@ def reallocate_partition(self, topic: str, partition: object, replicas: list) -> return False def update_disk_stats(self, broker_id: str, data: dict): - data_bytes = json.dumps(data, sort_keys=True).encode('utf-8') + data_bytes = json.dumps(data, separators=(',', ':')).encode('utf-8') path = '/bubuku/size_stats/{}'.format(broker_id) try: self.exhibitor.create(path, data_bytes, ephemeral=True, makepath=True) diff --git a/tests/test_exhibitor.py b/tests/test_exhibitor.py index a9d4c2c..806124a 100644 --- a/tests/test_exhibitor.py +++ b/tests/test_exhibitor.py @@ -8,7 +8,7 @@ def test_get_broker_ids(): - real_ex = MagicMock() + exhibitor_mock = MagicMock() def _get_children(path): if path == '/brokers/ids': @@ -16,9 +16,9 @@ def _get_children(path): else: raise NotImplementedError() - real_ex.get_children = _get_children + exhibitor_mock.get_children = _get_children - buku = BukuExhibitor(real_ex) + buku = BukuExhibitor(exhibitor_mock) assert ['1', '2', '3'] == buku.get_broker_ids() # ensure that return list is sorted @@ -32,9 +32,9 @@ def _get(path): else: raise NoNodeError() - real_ex = MagicMock() - real_ex.get = _get - buku = BukuExhibitor(real_ex) + exhibitor_mock = MagicMock() + exhibitor_mock.get = _get + buku = BukuExhibitor(exhibitor_mock) assert buku.is_broker_registered('123') assert buku.is_broker_registered(123) @@ -45,7 +45,7 @@ def _get(path): def _test_load_partition_assignment(async: bool): - real_ex = MagicMock() + exhibitor_mock = MagicMock() def _get_children(path): if path == '/brokers/topics': @@ -70,11 +70,11 @@ def _get_iresult(block): mock.get = _get_iresult return mock - real_ex.get = _get - real_ex.get_async = _get_async - real_ex.get_children = _get_children + exhibitor_mock.get = _get + exhibitor_mock.get_async = _get_async + exhibitor_mock.get_children = _get_children - buku_ex = BukuExhibitor(real_ex, async) + buku_ex = BukuExhibitor(exhibitor_mock, async) expected_result = [ ('t01', 0, [1, 2, 3]), @@ -97,7 +97,7 @@ def test_load_partition_assignment_async(): def _test_load_partition_states(async: bool): - real_ex = MagicMock() + exhibitor_mock = MagicMock() def _get_children(path): if path == '/brokers/topics': @@ -137,11 +137,11 @@ def _get_iasync(block): mock.get = _get_iasync return mock - real_ex.get = _get - real_ex.get_async = _get_async - real_ex.get_children = _get_children + exhibitor_mock.get = _get + exhibitor_mock.get_async = _get_async + exhibitor_mock.get_children = _get_children - buku_ex = BukuExhibitor(real_ex, async=async) + buku_ex = BukuExhibitor(exhibitor_mock, async=async) expected_result = [ ('t01', 0, {'fake_data': 100}), @@ -185,10 +185,10 @@ def _create(path, value=None, **kwargs): else: raise NotImplementedError('Not implemented for path {}'.format(path)) - zk = MagicMock() - zk.create = _create + exhibitor_mock = MagicMock() + exhibitor_mock.create = _create - buku = BukuExhibitor(zk) + buku = BukuExhibitor(exhibitor_mock) assert buku.reallocate_partition('t01', 0, ['1', '2', '3']) assert buku.reallocate_partition('t01', 0, ['1', '2', 3])