diff --git a/bubuku/features/rebalance/broker.py b/bubuku/features/rebalance/broker.py index cdca121..4cac9f5 100644 --- a/bubuku/features/rebalance/broker.py +++ b/bubuku/features/rebalance/broker.py @@ -62,14 +62,12 @@ class BrokerDescription(object): '_broker_id', '_leaders', '_replicas', - '_topic_cardinality', ) def __init__(self, broker_id: int): self._broker_id = broker_id self._leaders = _TopicPartitions() self._replicas = _TopicPartitions() - self._topic_cardinality = None @property def broker_id(self): @@ -153,14 +151,11 @@ def list_partitions(self, topic: str, replica: bool): def list_replicas(self): return self._replicas.iterate_items() - @property - def topic_cardinality(self): + def calculate_topic_cardinality(self): """ - Calculates (or returns cached) 'topic to leader count' dictionary on this broker. + Calculates 'topic to leader count' dictionary on this broker. For example, topic t0 have partitions 0, 1, 2, 3. If leaders for partitions 0, 3 are located on this broker than return value will contain mapping t0->2 (there are 2 leaders for topic t0 on this broker) :return: Dictionary with leaders count per topic for this broker. """ - if self._topic_cardinality is None: - self._topic_cardinality = self._leaders.calculate_cardinality() - return self._topic_cardinality + return self._leaders.calculate_cardinality() diff --git a/bubuku/features/rebalance/change.py b/bubuku/features/rebalance/change.py index a55c1d2..04cad4d 100644 --- a/bubuku/features/rebalance/change.py +++ b/bubuku/features/rebalance/change.py @@ -23,9 +23,14 @@ class DistributionMap(object): Topic distribution map. Used to correctly balance leadership across brokers. Internal collection of candidates is a dict with reflection of (source_broker, target_broker) -> sorted_list_of_topics_and weights. """ + __slots__ = [ + '_candidates', + '_candidates_cardinality' + ] def __init__(self, brokers: iter): - self.candidates = {} + self._candidates = {} + self._candidates_cardinality = {broker: broker.calculate_topic_cardinality() for broker in brokers} for source_broker in brokers: if not source_broker.have_extra_leaders(): continue @@ -33,12 +38,13 @@ def __init__(self, brokers: iter): if not target_broker.have_less_leaders(): continue weight_list = [] - for topic in source_broker.topic_cardinality.keys(): - delta = source_broker.topic_cardinality[topic] - target_broker.topic_cardinality.get(topic, 0) + for topic in self._candidates_cardinality[source_broker].keys(): + delta = self._candidates_cardinality[source_broker][topic] - \ + self._candidates_cardinality[target_broker].get(topic, 0) weight_list.append((topic, delta)) # Now the first element is the first to rebalance. if weight_list: - self.candidates[(source_broker, target_broker)] = sorted( + self._candidates[(source_broker, target_broker)] = sorted( weight_list, key=lambda x: x[1], reverse=True) def take_move_pair(self) -> tuple: @@ -48,25 +54,36 @@ def take_move_pair(self) -> tuple: :return: tuple source_broker, target_broker, topic_name """ top_candidate = None - for broker_pair, weight_list in self.candidates.items(): + for broker_pair, weight_list in self._candidates.items(): if not top_candidate: top_candidate = broker_pair else: - if weight_list[0][1] > self.candidates[top_candidate][0][1]: + if weight_list[0][1] > self._candidates[top_candidate][0][1]: top_candidate = broker_pair # Taking best topic to move - topic, _ = self.candidates[top_candidate][0] - # Updating cardinality map - for broker_pair, weight_list in self.candidates.items(): - if top_candidate == broker_pair: - DistributionMap._rearrange_topic_weights(weight_list, topic, -2) - elif top_candidate[0] == broker_pair[0] or top_candidate[1] == broker_pair[1]: # src or dst matches - DistributionMap._rearrange_topic_weights(weight_list, topic, -1) + topic, _ = self._candidates[top_candidate][0] + self._candidates_cardinality[top_candidate[0]][topic] -= 1 + topic_exhausted = self._candidates_cardinality[top_candidate[0]][topic] == 0 + if topic_exhausted: + for broker_pair, weight_list in self._candidates.items(): + if top_candidate == broker_pair: + DistributionMap._rearrange_topic_weights(weight_list, topic, None) + elif top_candidate[0] == broker_pair[0]: + DistributionMap._rearrange_topic_weights(weight_list, topic, None) + elif top_candidate[1] == broker_pair[1]: + DistributionMap._rearrange_topic_weights(weight_list, topic, -1) + else: + # Updating cardinality map + for broker_pair, weight_list in self._candidates.items(): + if top_candidate == broker_pair: + DistributionMap._rearrange_topic_weights(weight_list, topic, -2) + elif top_candidate[0] == broker_pair[0] or top_candidate[1] == broker_pair[1]: # src or dst matches + DistributionMap._rearrange_topic_weights(weight_list, topic, -1) # Get broker objects return top_candidate[0], top_candidate[1], topic @staticmethod - def _rearrange_topic_weights(array: list, topic: str, cardinality_change: int): + def _rearrange_topic_weights(array: list, topic: str, cardinality_change): old_cardinality = None for item in array: if item[0] == topic: @@ -75,18 +92,20 @@ def _rearrange_topic_weights(array: list, topic: str, cardinality_change: int): break if old_cardinality is None: return + if cardinality_change is None: + return new_cardinality = old_cardinality + cardinality_change - idx = len(array) - for i in range(idx - 1, -1, -1): + idx = 0 + for i in range(len(array) - 1, -1, -1): if array[i][1] >= new_cardinality: idx = i + 1 break array.insert(idx, (topic, new_cardinality)) def cleanup(self): - for bp in [bp for bp, wl in self.candidates.items() + for bp in [bp for bp, wl in self._candidates.items() if not bp[0].have_extra_leaders() or not bp[1].have_less_leaders()]: - del self.candidates[bp] + del self._candidates[bp] class OptimizedRebalanceChange(BaseRebalanceChange): diff --git a/tests/test_rebalance.py b/tests/test_rebalance.py index caa336d..0a3e442 100644 --- a/tests/test_rebalance.py +++ b/tests/test_rebalance.py @@ -194,3 +194,16 @@ def test_rebalance_invoked_on_broker_list_change(self): zk.get_broker_ids.return_value = ['1', '2', '4'] assert check.check() is not None assert check.check() is None + + def test_leader_partition_limit(self): + distribution = { + ('t0', '0'): ['1', '2'], + ('t0', '1'): ['1', '2'], + ('t0', '2'): ['1', '2'], + ('t1', '2'): ['1', '2'], + } + _, zk = _create_zk_for_topics(distribution, ['2', '3']) + o = OptimizedRebalanceChange(zk, ['2', '3']) + while o.run([]): + pass + _verify_balanced(['2', '3'], distribution)