Skip to content
This repository has been archived by the owner on Oct 7, 2024. It is now read-only.

Commit

Permalink
I67 Fix rebalancing bug with no partitions left
Browse files Browse the repository at this point in the history
  • Loading branch information
antban committed Oct 6, 2016
1 parent acda8a5 commit e5dfb62
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 26 deletions.
11 changes: 3 additions & 8 deletions bubuku/features/rebalance/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,12 @@ class BrokerDescription(object):
'_broker_id',
'_leaders',
'_replicas',
'_topic_cardinality',
)

def __init__(self, broker_id: int):
self._broker_id = broker_id
self._leaders = _TopicPartitions()
self._replicas = _TopicPartitions()
self._topic_cardinality = None

@property
def broker_id(self):
Expand Down Expand Up @@ -153,14 +151,11 @@ def list_partitions(self, topic: str, replica: bool):
def list_replicas(self):
return self._replicas.iterate_items()

@property
def topic_cardinality(self):
def calculate_topic_cardinality(self):
"""
Calculates (or returns cached) 'topic to leader count' dictionary on this broker.
Calculates 'topic to leader count' dictionary on this broker.
For example, topic t0 have partitions 0, 1, 2, 3. If leaders for partitions 0, 3 are located on this broker
than return value will contain mapping t0->2 (there are 2 leaders for topic t0 on this broker)
:return: Dictionary with leaders count per topic for this broker.
"""
if self._topic_cardinality is None:
self._topic_cardinality = self._leaders.calculate_cardinality()
return self._topic_cardinality
return self._leaders.calculate_cardinality()
55 changes: 37 additions & 18 deletions bubuku/features/rebalance/change.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,28 @@ class DistributionMap(object):
Topic distribution map. Used to correctly balance leadership across brokers. Internal collection of candidates is a
dict with reflection of (source_broker, target_broker) -> sorted_list_of_topics_and weights.
"""
__slots__ = [
'_candidates',
'_candidates_cardinality'
]

def __init__(self, brokers: iter):
self.candidates = {}
self._candidates = {}
self._candidates_cardinality = {broker: broker.calculate_topic_cardinality() for broker in brokers}
for source_broker in brokers:
if not source_broker.have_extra_leaders():
continue
for target_broker in brokers:
if not target_broker.have_less_leaders():
continue
weight_list = []
for topic in source_broker.topic_cardinality.keys():
delta = source_broker.topic_cardinality[topic] - target_broker.topic_cardinality.get(topic, 0)
for topic in self._candidates_cardinality[source_broker].keys():
delta = self._candidates_cardinality[source_broker][topic] - \
self._candidates_cardinality[target_broker].get(topic, 0)
weight_list.append((topic, delta))
# Now the first element is the first to rebalance.
if weight_list:
self.candidates[(source_broker, target_broker)] = sorted(
self._candidates[(source_broker, target_broker)] = sorted(
weight_list, key=lambda x: x[1], reverse=True)

def take_move_pair(self) -> tuple:
Expand All @@ -48,25 +54,36 @@ def take_move_pair(self) -> tuple:
:return: tuple source_broker, target_broker, topic_name
"""
top_candidate = None
for broker_pair, weight_list in self.candidates.items():
for broker_pair, weight_list in self._candidates.items():
if not top_candidate:
top_candidate = broker_pair
else:
if weight_list[0][1] > self.candidates[top_candidate][0][1]:
if weight_list[0][1] > self._candidates[top_candidate][0][1]:
top_candidate = broker_pair
# Taking best topic to move
topic, _ = self.candidates[top_candidate][0]
# Updating cardinality map
for broker_pair, weight_list in self.candidates.items():
if top_candidate == broker_pair:
DistributionMap._rearrange_topic_weights(weight_list, topic, -2)
elif top_candidate[0] == broker_pair[0] or top_candidate[1] == broker_pair[1]: # src or dst matches
DistributionMap._rearrange_topic_weights(weight_list, topic, -1)
topic, _ = self._candidates[top_candidate][0]
self._candidates_cardinality[top_candidate[0]][topic] -= 1
topic_exhausted = self._candidates_cardinality[top_candidate[0]][topic] == 0
if topic_exhausted:
for broker_pair, weight_list in self._candidates.items():
if top_candidate == broker_pair:
DistributionMap._rearrange_topic_weights(weight_list, topic, None)
elif top_candidate[0] == broker_pair[0]:
DistributionMap._rearrange_topic_weights(weight_list, topic, None)
elif top_candidate[1] == broker_pair[1]:
DistributionMap._rearrange_topic_weights(weight_list, topic, -1)
else:
# Updating cardinality map
for broker_pair, weight_list in self._candidates.items():
if top_candidate == broker_pair:
DistributionMap._rearrange_topic_weights(weight_list, topic, -2)
elif top_candidate[0] == broker_pair[0] or top_candidate[1] == broker_pair[1]: # src or dst matches
DistributionMap._rearrange_topic_weights(weight_list, topic, -1)
# Get broker objects
return top_candidate[0], top_candidate[1], topic

@staticmethod
def _rearrange_topic_weights(array: list, topic: str, cardinality_change: int):
def _rearrange_topic_weights(array: list, topic: str, cardinality_change):
old_cardinality = None
for item in array:
if item[0] == topic:
Expand All @@ -75,18 +92,20 @@ def _rearrange_topic_weights(array: list, topic: str, cardinality_change: int):
break
if old_cardinality is None:
return
if cardinality_change is None:
return
new_cardinality = old_cardinality + cardinality_change
idx = len(array)
for i in range(idx - 1, -1, -1):
idx = 0
for i in range(len(array) - 1, -1, -1):
if array[i][1] >= new_cardinality:
idx = i + 1
break
array.insert(idx, (topic, new_cardinality))

def cleanup(self):
for bp in [bp for bp, wl in self.candidates.items()
for bp in [bp for bp, wl in self._candidates.items()
if not bp[0].have_extra_leaders() or not bp[1].have_less_leaders()]:
del self.candidates[bp]
del self._candidates[bp]


class OptimizedRebalanceChange(BaseRebalanceChange):
Expand Down
13 changes: 13 additions & 0 deletions tests/test_rebalance.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,16 @@ def test_rebalance_invoked_on_broker_list_change(self):
zk.get_broker_ids.return_value = ['1', '2', '4']
assert check.check() is not None
assert check.check() is None

def test_leader_partition_limit(self):
distribution = {
('t0', '0'): ['1', '2'],
('t0', '1'): ['1', '2'],
('t0', '2'): ['1', '2'],
('t1', '2'): ['1', '2'],
}
_, zk = _create_zk_for_topics(distribution, ['2', '3'])
o = OptimizedRebalanceChange(zk, ['2', '3'])
while o.run([]):
pass
_verify_balanced(['2', '3'], distribution)

0 comments on commit e5dfb62

Please sign in to comment.