Skip to content
This repository has been archived by the owner on Oct 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #54 from zalando-incubator/I53
Browse files Browse the repository at this point in the history
Bubuku must live even after catching exceptions, closes #53
  • Loading branch information
antban authored Aug 19, 2016
2 parents 75c5012 + 7d2a13e commit a4e6c55
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 46 deletions.
31 changes: 21 additions & 10 deletions bubuku/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,29 @@ class LeaderElectionInProgress(Exception):
pass


class KafkaProcessHolder(object):
    """Keeps a reference to the kafka subprocess across daemon-loop restarts.

    The holder outlives a single run of the daemon loop, so a kafka process
    started before a crash is still reachable after re-initialization.
    """

    def __init__(self):
        self.process = None  # nothing is running until set() is called

    def get(self):
        # May return None when kafka has not been started (or was terminated).
        return self.process

    def set(self, process):
        # Passing None clears the holder.
        self.process = process


class BrokerManager(object):
def __init__(self, kafka_dir: str, exhibitor: BukuExhibitor, id_manager: BrokerIdGenerator,
kafka_properties: KafkaProperties):
def __init__(self, process_holder: KafkaProcessHolder, kafka_dir: str, exhibitor: BukuExhibitor,
             id_manager: BrokerIdGenerator, kafka_properties: KafkaProperties):
    """Wire the broker manager to its collaborators.

    :param process_holder: Shared holder of the kafka subprocess; state lives
        here (not on this object) so it survives daemon-loop restarts.
    :param kafka_dir: Root directory of the kafka installation.
    :param exhibitor: Zookeeper proxy used for leadership/registration checks.
    :param id_manager: Policy object that generates/tracks the broker id.
    :param kafka_properties: Parsed server.properties wrapper.
    """
    self.kafka_dir = kafka_dir
    self.id_manager = id_manager
    self.exhibitor = exhibitor
    self.kafka_properties = kafka_properties
    # NOTE(review): the pre-refactor `self.process = None` line is dropped —
    # process state is owned by the holder now.
    self.process_holder = process_holder
    # Seconds to wait for the broker id to appear after starting kafka.
    self.wait_timeout = 5 * 60

def is_running_and_registered(self):
    """True only when a kafka process is held AND its broker id is registered."""
    if self.process_holder.get():
        return self.id_manager.is_registered()
    return False

Expand All @@ -47,14 +58,14 @@ def has_leadership(self):
return not self._is_leadership_transferred(dead_broker_ids=[broker_id])

def _terminate_process(self):
    """Terminate the running kafka process (if any) and clear the holder.

    Best-effort: failures while terminating/waiting are logged, but the
    holder is always cleared so a subsequent start can proceed.
    """
    # Hoist the lookup: the original called process_holder.get() three times,
    # which is noisy and racy if the holder changes between calls.
    process = self.process_holder.get()
    if process is not None:
        try:
            process.terminate()
            process.wait()  # reap the child to avoid leaving a zombie
        except Exception as e:
            _LOG.error('Failed to wait for termination of kafka process', exc_info=e)
        finally:
            self.process_holder.set(None)

def _wait_for_zk_absence(self):
try:
Expand All @@ -71,7 +82,7 @@ def start_kafka_process(self, zookeeper_address):
:param zookeeper_address: Address to use for kafka
:raise LeaderElectionInProgress: raised when broker can not be started because leader election is in progress
"""
if not self.process:
if not self.process_holder.get():
if not self._is_leadership_transferred(active_broker_ids=self.exhibitor.get_broker_ids()):
raise LeaderElectionInProgress()

Expand All @@ -88,7 +99,7 @@ def start_kafka_process(self, zookeeper_address):
self.kafka_properties.dump()

_LOG.info('Starting kafka process')
self.process = self._open_process()
self.process_holder.set(self._open_process())

_LOG.info('Waiting for kafka to start up with timeout {} seconds'.format(self.wait_timeout))
if not self.id_manager.wait_for_broker_id_presence(self.wait_timeout):
Expand Down
4 changes: 3 additions & 1 deletion bubuku/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class KafkaProperties(object):
def __init__(self, template: str, kafka_settings: str):
self.lines = []
self.settings_file = kafka_settings
_LOG.info('Loading template properties from {}'.format(self.settings_file))
_LOG.info('Loading template properties from {}'.format(template))
with open(template, 'r') as f:
for l in f.readlines():
self.lines.append(_make_clean_line(l))
Expand Down Expand Up @@ -72,5 +72,7 @@ def _make_clean_line(l: str) -> str:
result = l.strip()
if result.startswith('#') or not result:
return result
if '=' not in result:
return ''
n, v = result.split('=', 1)
return '{}={}'.format(n.strip(), v)
5 changes: 3 additions & 2 deletions bubuku/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,10 @@ def _release_changes_lock(self, changes_to_remove):
for name in changes_to_remove:
self.zk.unregister_change(name)

def loop(self):
def loop(self, change_on_init=None):
ip = self.amazon.get_own_ip()

if change_on_init:
self._add_change_to_queue(change_on_init)
while self.running or self.changes:
self.make_step(ip)

Expand Down
60 changes: 36 additions & 24 deletions bubuku/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@

from bubuku import health
from bubuku.amazon import Amazon
from bubuku.broker import BrokerManager
from bubuku.config import load_config, KafkaProperties
from bubuku.broker import BrokerManager, KafkaProcessHolder
from bubuku.config import load_config, KafkaProperties, Config
from bubuku.controller import Controller
from bubuku.features.data_size_stats import GenerateDataSizeStatistics
from bubuku.features.rebalance import RebalanceOnStartCheck, RebalanceOnBrokerListChange
from bubuku.features.remote_exec import RemoteCommandExecutorCheck
from bubuku.features.restart_if_dead import CheckBrokerStopped
from bubuku.features.restart_on_zk_change import CheckExhibitorAddressChanged
from bubuku.features.restart_on_zk_change import CheckExhibitorAddressChanged, RestartBrokerChange
from bubuku.features.swap_partitions import CheckBrokersDiskImbalance
from bubuku.features.terminate import register_terminate_on_interrupt
from bubuku.id_generator import get_broker_id_policy
Expand Down Expand Up @@ -43,42 +43,54 @@ def apply_features(api_port, features: dict, controller: Controller, buku_proxy:
_LOG.error('Using of unsupported feature "{}", skipping it'.format(feature))


def run_daemon_loop(config: Config, process_holder: KafkaProcessHolder, cmd_helper: CmdHelper, restart_on_init: bool):
    """Build all components and run the controller loop until it exits.

    :param config: Loaded daemon configuration.
    :param process_holder: Holder that survives restarts of this function, so
        an already-running kafka process is not lost when the loop is re-run.
    :param cmd_helper: Helper for executing shell commands (disk stats etc.).
    :param restart_on_init: When True, schedule a broker restart as the first
        change after initialization (used after a crash while kafka kept running).
    """
    _LOG.info("Using configuration: {}".format(config))
    kafka_properties = KafkaProperties(config.kafka_settings_template,
                                       '{}/config/server.properties'.format(config.kafka_dir))

    amazon = Amazon()
    address_provider = AWSExhibitorAddressProvider(amazon, config.zk_stack_name)

    _LOG.info("Loading exhibitor configuration")
    # The proxy is used as a context manager so the zookeeper connection is
    # closed even when the loop dies with an exception.
    with load_exhibitor_proxy(address_provider, config.zk_prefix) as buku_proxy:
        _LOG.info("Loading broker_id policy")
        broker_id_manager = get_broker_id_policy(config.id_policy, buku_proxy, kafka_properties, amazon)

        _LOG.info("Building broker manager")
        broker = BrokerManager(process_holder, config.kafka_dir, buku_proxy, broker_id_manager, kafka_properties)

        _LOG.info("Creating controller")
        controller = Controller(broker, buku_proxy, amazon)

        controller.add_check(CheckBrokerStopped(broker, buku_proxy))
        controller.add_check(RemoteCommandExecutorCheck(buku_proxy, broker, config.health_port))
        controller.add_check(GenerateDataSizeStatistics(buku_proxy, broker, cmd_helper,
                                                        kafka_properties.get_property("log.dirs").split(",")))
        apply_features(config.health_port, config.features, controller, buku_proxy, broker, kafka_properties, amazon)

        _LOG.info('Starting main controller loop')
        controller.loop(RestartBrokerChange(buku_proxy, broker, lambda: False) if restart_on_init else None)

def main():
    """Daemon entry point.

    Starts the health server once, then keeps re-running the controller loop
    so the daemon survives unexpected exceptions during initialization or
    operation.
    """
    logging.basicConfig(level=getattr(logging, 'INFO', None))

    config = load_config()
    _LOG.info("Using configuration: {}".format(config))
    # Shared across loop restarts so a kafka process started before a crash
    # is still tracked after re-initialization.
    process_holder = KafkaProcessHolder()

    _LOG.info('Starting health server')
    cmd_helper = CmdHelper()
    health.start_server(config.health_port, cmd_helper)

    restart_on_init = False
    while True:
        try:
            run_daemon_loop(config, process_holder, cmd_helper, restart_on_init)
            break
        except Exception:
            # Was a bare `except:`, which also traps KeyboardInterrupt and
            # SystemExit and therefore blocks clean shutdown; only real
            # exceptions should trigger a restart of the loop.
            _LOG.error("WOW! Almost died! Will try to restart from the begin. "
                       "After initialization will be complete, will try to restart", exc_info=True)
            if process_holder.get():
                # Kafka is still running against possibly-stale state; ask the
                # next loop iteration to restart the broker once initialized.
                restart_on_init = True


if __name__ == '__main__':
Expand Down
4 changes: 0 additions & 4 deletions bubuku/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,3 @@ def _thread_func():
_LOG.info('Starting health server on port {}'.format(port))
t.start()
return t


if __name__ == '__main__':
start_server(8080, CmdHelper())
12 changes: 12 additions & 0 deletions bubuku/zookeeper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ def _update_hosts(self, value):
self.client.set_hosts(self.conn_str)
self.client.start()

def terminate(self):
    """Stop the underlying zookeeper client connection, if one was created."""
    zk_client = self.client
    if zk_client:
        zk_client.stop()

def session_listener(self, state):
    # Connection-state callback hook; intentionally a no-op — session state
    # changes are ignored here (presumably handled by the client's own retry
    # machinery — TODO confirm).
    pass

Expand Down Expand Up @@ -161,6 +165,14 @@ def __init__(self, exhibitor: _ZookeeperProxy, async=True):
except NodeExistsError:
pass

def __enter__(self):
    """Context-manager entry: log and return self so `with load_exhibitor_proxy(...) as p` works."""
    _LOG.info('Entered safe exhibitor space')
    return self

def __exit__(self, exc_type, exc_val, exc_tb):
    """Context-manager exit: always close the zookeeper connection.

    Returns None (falsy), so any in-flight exception propagates to the caller.
    """
    # BUG FIX: the original called _LOG.info() with no message argument,
    # which raises TypeError on every exit from the context manager.
    _LOG.info('Leaving safe exhibitor space')
    self.exhibitor.terminate()

def is_broker_registered(self, broker_id):
try:
_, stat = self.exhibitor.get('/brokers/ids/{}'.format(broker_id))
Expand Down
6 changes: 4 additions & 2 deletions bubuku/zookeeper/exhibior.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class AWSExhibitorAddressProvider(AddressListProvider):
def __init__(self, amazon: Amazon, zk_stack_name: str):
    """Address provider that resolves exhibitor hosts via an AWS load balancer.

    :param amazon: AWS accessor used to look up addresses by LB name.
    :param zk_stack_name: Name of the zookeeper stack / load balancer.
    """
    self.amazon = amazon
    self.zk_stack_name = zk_stack_name
    # NOTE(review): the pre-change eager `self.query_from_amazon()` call is
    # dropped — the list starts empty and addresses are queried lazily, so
    # construction no longer performs network I/O.
    self.exhibitors = []

def get_latest_address(self) -> (list, int):
json_ = self._query_exhibitors(self.exhibitors)
Expand All @@ -27,6 +27,8 @@ def get_latest_address(self) -> (list, int):
return None

def _query_exhibitors(self, exhibitors):
if not exhibitors:
return None
random.shuffle(exhibitors)
for host in exhibitors:
url = 'http://{}:{}{}'.format(host, 8181, '/exhibitor/v1/cluster/list')
Expand All @@ -38,4 +40,4 @@ def _query_exhibitors(self, exhibitors):
return None

def query_from_amazon(self):
    """Resolve host addresses from the AWS load balancer named after the ZK stack."""
    lb_name = self.zk_stack_name
    return self.amazon.get_addresses_by_lb_name(lb_name)
6 changes: 3 additions & 3 deletions tests/test_broker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from unittest.mock import MagicMock

from bubuku.broker import BrokerManager, LeaderElectionInProgress
from bubuku.broker import BrokerManager, LeaderElectionInProgress, KafkaProcessHolder
from test_config import build_test_properties

zk_fake_host = 'zk_host:8181/path'
Expand Down Expand Up @@ -30,7 +30,7 @@ def _load_states():
kafka_props = build_test_properties()
kafka_props.set_property('unclean.leader.election.enable', 'true')

manager = FakeProcessManager('kafka_dir', exhibitor, id_manager, kafka_props)
manager = FakeProcessManager(KafkaProcessHolder(), 'kafka_dir', exhibitor, id_manager, kafka_props)

assert not manager.has_leadership()

Expand All @@ -48,7 +48,7 @@ def __prepare_for_start_fail(broker_ids, leader, isr):
id_manager.get_broker_id = lambda: '1'
kafka_props = build_test_properties()

broker = FakeProcessManager('kafka_dir', exhibitor, id_manager, kafka_props)
broker = FakeProcessManager(KafkaProcessHolder(), 'kafka_dir', exhibitor, id_manager, kafka_props)

kafka_props.set_property('unclean.leader.election.enable', 'false')
return kafka_props, broker
Expand Down

0 comments on commit a4e6c55

Please sign in to comment.