Skip to content
This repository has been archived by the owner on Oct 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #28 from zalando-incubator/I25
Browse files Browse the repository at this point in the history
Use get_async for bulk operations on zookeeper, closes #28
  • Loading branch information
antban authored Aug 5, 2016
2 parents 643f69e + 9012eb7 commit 69906cf
Show file tree
Hide file tree
Showing 14 changed files with 488 additions and 273 deletions.
39 changes: 18 additions & 21 deletions bubuku/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from bubuku.config import KafkaProperties
from bubuku.id_generator import BrokerIdGenerator
from bubuku.zookeeper import Exhibitor
from bubuku.zookeeper import BukuExhibitor

_LOG = logging.getLogger('bubuku.broker')

Expand All @@ -14,7 +14,7 @@ class LeaderElectionInProgress(Exception):


class BrokerManager(object):
def __init__(self, kafka_dir: str, exhibitor: Exhibitor, id_manager: BrokerIdGenerator,
def __init__(self, kafka_dir: str, exhibitor: BukuExhibitor, id_manager: BrokerIdGenerator,
kafka_properties: KafkaProperties):
self.kafka_dir = kafka_dir
self.id_manager = id_manager
Expand Down Expand Up @@ -72,7 +72,7 @@ def start_kafka_process(self, zookeeper_address):
:raise LeaderElectionInProgress: raised when broker can not be started because leader election is in progress
"""
if not self.process:
if not self._is_leadership_transferred(active_broker_ids=self.exhibitor.get_children('/brokers/ids')):
if not self._is_leadership_transferred(active_broker_ids=self.exhibitor.get_broker_ids()):
raise LeaderElectionInProgress()

broker_id = self.id_manager.get_broker_id()
Expand Down Expand Up @@ -101,25 +101,22 @@ def _is_leadership_transferred(self, active_broker_ids=None, dead_broker_ids=Non
_LOG.info('Checking if leadership is transferred: active_broker_ids={}, dead_broker_ids={}'.format(
active_broker_ids, dead_broker_ids))
if self._is_clean_election():
for topic in self.exhibitor.get_children('/brokers/topics'):
for partition in self.exhibitor.get_children('/brokers/topics/{}/partitions'.format(topic)):
state_str = self.exhibitor.get('/brokers/topics/{}/partitions/{}/state'.format(
topic, partition))[0].decode('utf-8')
state = json.loads(state_str)
leader = str(state['leader'])
if active_broker_ids and leader not in active_broker_ids:
if any(str(x) in active_broker_ids for x in state.get('isr', [])):
_LOG.warn(
'Leadership is not transferred for {} {} ({}, brokers: {})'.format(
topic, partition, json.dumps(state), active_broker_ids))
return False
else:
_LOG.warn('Shit happens! No single isr available for {}, {}, state: {}, '
'skipping check for that'.format(topic, partition, json.dumps(state)))
if dead_broker_ids and leader in dead_broker_ids:
_LOG.warn('Leadership is not transferred for {} {}, {} (dead list: {})'.format(
topic, partition, json.dumps(state), dead_broker_ids))
for topic, partition, state in self.exhibitor.load_partition_states():
leader = str(state['leader'])
if active_broker_ids and leader not in active_broker_ids:
if any(str(x) in active_broker_ids for x in state.get('isr', [])):
_LOG.warn(
'Leadership is not transferred for {} {} ({}, brokers: {})'.format(
topic, partition, json.dumps(state), active_broker_ids))
return False
else:
_LOG.warn('Shit happens! No single isr available for {}, {}, state: {}, '
'skipping check for that'.format(topic, partition, json.dumps(state)))
if dead_broker_ids and leader in dead_broker_ids:
_LOG.warn('Leadership is not transferred for {} {}, {} (dead list: {})'.format(
topic, partition, json.dumps(state), dead_broker_ids))
return False

return True

def _open_process(self):
Expand Down
25 changes: 7 additions & 18 deletions bubuku/controller.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import logging
from time import sleep, time

from kazoo.exceptions import NodeExistsError

from bubuku.amazon import Amazon
from bubuku.broker import BrokerManager
from bubuku.zookeeper import Exhibitor
from bubuku.zookeeper import BukuExhibitor

_LOG = logging.getLogger('bubuku.controller')

Expand Down Expand Up @@ -47,7 +45,7 @@ def _exclude_self(ip, name, running_actions):


class Controller(object):
def __init__(self, broker_manager: BrokerManager, zk: Exhibitor, amazon: Amazon):
def __init__(self, broker_manager: BrokerManager, zk: BukuExhibitor, amazon: Amazon):
self.broker_manager = broker_manager
self.zk = zk
self.amazon = amazon
Expand All @@ -61,13 +59,10 @@ def add_check(self, check):

def _register_running_changes(self, ip: str) -> dict:
_LOG.debug('Taking lock for processing')
with self.zk.take_lock('/bubuku/global_lock', ip):
with self.zk.lock(ip):
_LOG.debug('Lock is taken')
# Get list of current running changes
running_changes = {
change: self.zk.get('/bubuku/changes/{}'.format(change))[0].decode('utf-8')
for change in self.zk.get_children('/bubuku/changes')
}
running_changes = self.zk.get_running_changes()
if running_changes:
_LOG.info("Running changes: {}".format(running_changes))
# Register changes to run
Expand All @@ -76,8 +71,7 @@ def _register_running_changes(self, ip: str) -> dict:
first_change = change_list[0]
if first_change.can_run(_exclude_self(ip, name, running_changes)):
if name not in running_changes:
_LOG.info('Registering change in zk: {}'.format(name))
self.zk.create('/bubuku/changes/{}'.format(name), ip.encode('utf-8'), ephemeral=True)
self.zk.register_change(name, ip)
running_changes[name] = ip
else:
_LOG.info('Change {} is waiting for others: {}'.format(name, running_changes))
Expand Down Expand Up @@ -107,18 +101,13 @@ def _release_changes_lock(self, changes_to_remove):
del self.changes[change_name][0]
if not self.changes[change_name]:
del self.changes[change_name]
with self.zk.take_lock('/bubuku/global_lock'):
with self.zk.lock():
for name in changes_to_remove:
_LOG.info('Removing action {} from locks'.format(name))
self.zk.delete('/bubuku/changes/{}'.format(name), recursive=True)
self.zk.unregister_change(name)

def loop(self):
ip = self.amazon.get_own_ip()

try:
self.zk.create('/bubuku/changes', makepath=True)
except NodeExistsError:
pass
while self.running or self.changes:
self.make_step(ip)

Expand Down
23 changes: 12 additions & 11 deletions bubuku/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,21 @@
from bubuku.features.terminate import register_terminate_on_interrupt
from bubuku.id_generator import get_broker_id_policy
from bubuku.utils import CmdHelper
from bubuku.zookeeper import load_exhibitor, Exhibitor
from bubuku.zookeeper import load_exhibitor_proxy, BukuExhibitor
from bubuku.zookeeper import BukuExhibitor, load_exhibitor_proxy

_LOG = logging.getLogger('bubuku.main')


def apply_features(features: str, controller: Controller, exhibitor: Exhibitor, broker: BrokerManager,
def apply_features(features: str, controller: Controller, buku_proxy: BukuExhibitor, broker: BrokerManager,
kafka_properties: KafkaProperties, amazon: Amazon) -> list:
for feature in set(features.split(',')):
if feature == 'restart_on_exhibitor':
controller.add_check(CheckExhibitorAddressChanged(exhibitor, broker))
controller.add_check(CheckExhibitorAddressChanged(buku_proxy, broker))
elif feature == 'rebalance_on_start':
controller.add_check(RebalanceOnStartCheck(exhibitor, broker))
controller.add_check(RebalanceOnStartCheck(buku_proxy, broker))
elif feature == 'rebalance_on_brokers_change':
controller.add_check(RebalanceOnBrokerListChange(exhibitor, broker))
controller.add_check(RebalanceOnBrokerListChange(buku_proxy, broker))
elif feature == 'rebalance_by_size':
controller.add_check(GenerateDataSizeStatistics(exhibitor, broker, CmdHelper(),
kafka_properties.get_property("log.dirs").split(",")))
Expand All @@ -52,20 +53,20 @@ def main():
amazon = Amazon()

_LOG.info("Loading exhibitor configuration")
exhibitor = load_exhibitor(amazon.get_addresses_by_lb_name(config.zk_stack_name), config.zk_prefix)
buku_proxy = load_exhibitor_proxy(amazon.get_addresses_by_lb_name(config.zk_stack_name), config.zk_prefix)

_LOG.info("Loading broker_id policy")
broker_id_manager = get_broker_id_policy(config.id_policy, exhibitor, kafka_properties, amazon)
broker_id_manager = get_broker_id_policy(config.id_policy, buku_proxy, kafka_properties, amazon)

_LOG.info("Building broker manager")
broker = BrokerManager(config.kafka_dir, exhibitor, broker_id_manager, kafka_properties)
broker = BrokerManager(config.kafka_dir, buku_proxy, broker_id_manager, kafka_properties)

_LOG.info("Creating controller")
controller = Controller(broker, exhibitor, amazon)
controller = Controller(broker, buku_proxy, amazon)

controller.add_check(CheckBrokerStopped(broker, exhibitor))
controller.add_check(CheckBrokerStopped(broker, buku_proxy))

apply_features(config.features, controller, exhibitor, broker, kafka_properties, amazon)
apply_features(config.features, controller, buku_proxy, broker, kafka_properties, amazon)

_LOG.info('Starting health server')
health.start_server(config.health_port)
Expand Down
62 changes: 15 additions & 47 deletions bubuku/features/rebalance.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import json
import logging

from kazoo.exceptions import NodeExistsError, NoNodeError

from bubuku.broker import BrokerManager
from bubuku.controller import Check, Change
from bubuku.zookeeper import Exhibitor
from bubuku.zookeeper import BukuExhibitor

_LOG = logging.getLogger('bubuku.features.rebalance')

Expand Down Expand Up @@ -46,7 +44,7 @@ def _removal_func():


class RebalanceChange(Change):
def __init__(self, zk: Exhibitor, broker_list):
def __init__(self, zk: BukuExhibitor, broker_list):
self.zk = zk
self.broker_ids = broker_list
self.stale_data = {} # partition count to topic data
Expand All @@ -63,14 +61,6 @@ def get_name(self) -> str:
def can_run(self, current_actions):
return all([a not in current_actions for a in ['start', 'restart', 'rebalance', 'stop']])

def load_current_data(self):
result = []
for topic in self.zk.get_children('/brokers/topics'):
data = json.loads(self.zk.get("/brokers/topics/" + topic)[0].decode('utf-8'))
for k, v in data['partitions'].items():
result.append({"topic": topic, "partition": int(k), "replicas": v})
return result

def take_next(self) -> dict:
if self.stale_data:
k = [x for x in self.stale_data.keys()][0]
Expand Down Expand Up @@ -98,14 +88,10 @@ def run(self, current_actions):
_LOG.warning("Rebalance stopped, because other blocking events running: {}".format(current_actions))
return False

try:
rebalance_data = self.zk.get('/admin/reassign_partitions')[0].decode('utf-8')
_LOG.info('Old rebalance is still in progress: {}, waiting'.format(rebalance_data))
if self.zk.is_rebalancing():
return True
except NoNodeError:
pass

new_broker_ids = sorted(self.zk.get_children('/brokers/ids'))
new_broker_ids = self.zk.get_broker_ids()

if new_broker_ids != self.broker_ids:
_LOG.warning("Rebalance stopped because of broker list change from {} to {}".format(self.broker_ids,
Expand All @@ -117,17 +103,19 @@ def run(self, current_actions):
# Load existing data from zookeeper and try to split it for different purposes
if self.shuffled_broker_ids is None:
self.shuffled_broker_ids = {}
for d in self.load_current_data():
replication_factor = len(d['replicas'])
for topic, partition, replicas in self.zk.load_partition_assignment():
replication_factor = len(replicas)
if replication_factor > len(self.broker_ids):
_LOG.warning(
"Will not rebalance partition {} because only {} brokers available".format(d, self.broker_ids))
"Will not rebalance partition {}:{} because only {} brokers available".format(
topic, partition, self.broker_ids))
continue
if replication_factor not in self.shuffled_broker_ids:
self.shuffled_broker_ids[replication_factor] = {
k: [] for k in combine_broker_ids(self.broker_ids, replication_factor)
}
name = _optimise_broker_ids([str(i) for i in d['replicas']])
name = _optimise_broker_ids([str(i) for i in replicas])
d = {"topic": topic, "partition": partition, "replicas": replicas}
if name not in self.shuffled_broker_ids[replication_factor]:
if name not in self.stale_data:
self.stale_data[name] = []
Expand All @@ -149,17 +137,7 @@ def run(self, current_actions):
min_length = min([len(v) for v in self.shuffled_broker_ids[replication_factor].values()])
for k, v in self.shuffled_broker_ids[replication_factor].items():
if len(v) == min_length:
j = {
"version": "1",
"partitions": [
{
"topic": to_move['topic'],
"partition": to_move['partition'],
"replicas": [int(v) for v in k.split(',')],
}
]
}
if self.reallocate(j):
if self.zk.reallocate_partition(to_move['topic'], to_move['partition'], k.split(',')):
_LOG.info("Current allocation: \n{}".format(self.dump_allocations()))
removal_func()
v.append(to_move)
Expand All @@ -169,23 +147,13 @@ def run(self, current_actions):
_LOG.info("Current allocation: \n{}".format(self.dump_allocations()))
return False

def reallocate(self, j: dict):
try:
data = json.dumps(j)
self.zk.create("/admin/reassign_partitions", data.encode('utf-8'))
_LOG.info("Reallocating {}".format(data))
return True
except NodeExistsError:
_LOG.info("Waiting for free reallocation slot, still in progress...")
return False

def dump_allocations(self):
return '\n'.join(
['\n'.join(['{}:{}'.format(x, len(y)) for x, y in v.items()]) for v in self.shuffled_broker_ids.values()])


class RebalanceOnStartCheck(Check):
def __init__(self, zk, broker: BrokerManager):
def __init__(self, zk: BukuExhibitor, broker: BrokerManager):
super().__init__()
self.zk = zk
self.broker = broker
Expand All @@ -198,14 +166,14 @@ def check(self):
return None
_LOG.info("Rebalance on start, triggering rebalance")
self.executed = True
return RebalanceChange(self.zk, sorted(self.zk.get_children('/brokers/ids')))
return RebalanceChange(self.zk, self.zk.get_broker_ids())

def __str__(self):
return 'RebalanceOnStartCheck (executed={})'.format(self.executed)


class RebalanceOnBrokerListChange(Check):
def __init__(self, zk, broker: BrokerManager):
def __init__(self, zk: BukuExhibitor, broker: BrokerManager):
super().__init__()
self.zk = zk
self.broker = broker
Expand All @@ -214,7 +182,7 @@ def __init__(self, zk, broker: BrokerManager):
def check(self):
if not self.broker.is_running_and_registered():
return None
new_list = sorted(self.zk.get_children('/brokers/ids'))
new_list = self.zk.get_broker_ids()
if not new_list == self.old_broker_list:
_LOG.info('Broker list changed from {} to {}, triggering rebalance'.format(self.old_broker_list, new_list))
self.old_broker_list = new_list
Expand Down
22 changes: 5 additions & 17 deletions bubuku/features/rebalance_by_size.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
import json
import logging

from kazoo.exceptions import NodeExistsError

from bubuku.broker import BrokerManager
from bubuku.controller import Check, Change
from bubuku.utils import CmdHelper
from bubuku.zookeeper import Exhibitor
from bubuku.zookeeper import BukuExhibitor

_LOG = logging.getLogger('bubuku.features.rebalance_by_size')


class RebalanceBySizeChange(Change):
def __init__(self, zk: Exhibitor):
def __init__(self, zk: BukuExhibitor):
self.zk = zk

def get_name(self):
Expand All @@ -29,7 +26,7 @@ def can_run_at_exit(self):


class RebalanceBySize(Check):
def __init__(self, zk: Exhibitor, broker: BrokerManager):
def __init__(self, zk: BukuExhibitor, broker: BrokerManager):
super().__init__(check_interval_s=600)
self.zk = zk
self.broker = broker
Expand All @@ -45,7 +42,7 @@ def __is_data_imbalanced(self) -> bool:


class GenerateDataSizeStatistics(Check):
def __init__(self, zk: Exhibitor, broker: BrokerManager, cmd_helper: CmdHelper, kafka_log_dirs):
def __init__(self, zk: BukuExhibitor, broker: BrokerManager, cmd_helper: CmdHelper, kafka_log_dirs):
super().__init__(check_interval_s=1800)
self.zk = zk
self.broker = broker
Expand All @@ -66,7 +63,7 @@ def __generate_stats(self):
topics_stats = self.__get_topics_stats()
disk_stats = self.__get_disk_stats()
stats = {"disk": disk_stats, "topics": topics_stats}
self.__write_stats_to_zk(stats)
self.zk.update_disk_stats(self.broker.id_manager.get_broker_id(), stats)

def __get_topics_stats(self):
topics_stats = {}
Expand Down Expand Up @@ -109,12 +106,3 @@ def __get_disk_stats(self):
total_used += int(used)
total_free += int(free)
return {"used_kb": total_used, "free_kb": total_free}

def __write_stats_to_zk(self, stats):
broker_id = self.broker.id_manager.get_broker_id()
data = json.dumps(stats, sort_keys=True, separators=(',', ':')).encode("utf-8")
path = "/bubuku/size_stats/{}".format(broker_id)
try:
self.zk.create(path, data, ephemeral=True, makepath=True)
except NodeExistsError:
self.zk.set(path, data)
Loading

0 comments on commit 69906cf

Please sign in to comment.