diff --git a/bubuku/features/swap_partitions.py b/bubuku/features/swap_partitions.py index 35d3e14..a8f5f65 100644 --- a/bubuku/features/swap_partitions.py +++ b/bubuku/features/swap_partitions.py @@ -24,14 +24,6 @@ def run(self, current_actions): if self.should_be_paused(current_actions): _LOG.info("Pausing swap partitions change as there are conflicting actions: {}".format(current_actions)) return True - try: - _LOG.info("Running swap partitions change: {}".format(self)) - return self.__run_internal() - except Exception: - _LOG.warn("Error occurred when performing partitions swap change", exc_info=True) - return False - - def __run_internal(self): # if there's a rebalance currently running - postpone current change if self.zk.is_rebalancing(): return True @@ -83,19 +75,19 @@ def __perform_swap(self, rebalance_list): return self.zk.reallocate_partitions(rebalance_list) def __find_all_swap_candidates(self, fat_broker_id: int, slim_broker_id: int, topics_stats: dict) -> dict: - partition_assignment = self.zk.load_partition_assignment() - swap_partition_candidates = {} - for topic, partition, replicas in partition_assignment: + swap_partition_candidates = {fat_broker_id: [], slim_broker_id: []} + for topic, partition, replicas in self.zk.load_partition_assignment(): if topic not in topics_stats or str(partition) not in topics_stats[topic]: continue # we skip this partition as there is not data size stats for it + if replicas[0] in (fat_broker_id, slim_broker_id): + continue # Skip leadership transfer + if fat_broker_id in replicas and slim_broker_id in replicas: continue # we skip this partition as it exists on both involved brokers for broker_id in [slim_broker_id, fat_broker_id]: if broker_id in replicas: - if broker_id not in swap_partition_candidates: - swap_partition_candidates[broker_id] = [] swap_partition_candidates[broker_id].append( TpData(topic, partition, topics_stats[topic][str(partition)], replicas)) return swap_partition_candidates diff --git a/tests/test_partitions_swap.py b/tests/test_partitions_swap.py index 38fe993..884aa01 100644 --- a/tests/test_partitions_swap.py +++ b/tests/test_partitions_swap.py @@ -24,12 +24,12 @@ class TestPartitionsSwap(unittest.TestCase): } test_assignment = [ - ("t1", 1, [333, 111]), + ("t1", 1, [111, 333]), ("t1", 2, [111, 222]), - ("t2", 1, [111, 222]), + ("t2", 1, [222, 111]), ("t2", 2, [222, 333]), ("t3", 1, [333, 111]), - ("t3", 2, [222, 333]), + ("t3", 2, [333, 222]), ] def setUp(self): @@ -57,7 +57,7 @@ def _swap_data_provider(zk): result = swap_change.run([]) assert not result - self.zk.reallocate_partitions.assert_called_with([('t2', 2, [111, 222]), ('t2', 1, [222, 333])]) + self.zk.reallocate_partitions.assert_called_with([('t2', 2, [222, 111]), ('t2', 1, [222, 333])]) def test_swap_partitions_change_not_performed(self): swap_change = SwapPartitionsChange(self.zk, lambda x: load_swap_data(x, -1, 10001)) @@ -77,7 +77,7 @@ def test_swap_partitions_change_postponed(self): # if the write to ZK wasn't possible for some reason, the change should # return True and repeat write to ZK during next trigger by controller assert result - assert swap_change.to_move == [('t2', 2, [111, 222]), ('t2', 1, [222, 333])] + assert swap_change.to_move == [('t2', 2, [222, 111]), ('t2', 1, [222, 333])] def test_swap_partitions_change_postponed_when_rebalancing(self): self.zk.is_rebalancing.return_value = True