Skip to content
This repository has been archived by the owner on Oct 7, 2024. It is now read-only.

Commit

Permalink
#56 Make kafka startup timeout configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
antban committed Sep 7, 2016
1 parent 584387e commit e3f891c
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 17 deletions.
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Bubuku can be configured using environment properties:
- `BUKU_FEATURES` - List of optional bubuku features, see [features](#features) section
- `HEALTH_PORT` - Port for health checks
- `FREE_SPACE_DIFF_THRESHOLD_MB` - Threshold for starting `balance_data_size` feature, if it's enabled
- `STARTUP_TIMEOUT` - The way bubuku manages [time to start for kafka](#startup_timeout).

# Features #

Expand Down Expand Up @@ -89,6 +90,27 @@ Pluggable features are defined in configuration and are disabled by default. Lis
- `balance_data_size` - Swap partitions one by one if the imbalance in data size between brokers is bigger than
`FREE_SPACE_DIFF_THRESHOLD_MB` megabytes.

## <a name="startup_timeout"></a> Timeouts for startup
Each time bubuku tries to start kafka, it uses a special startup timeout. This means that if the kafka broker id
is not found in the zookeeper node `/brokers/ids/{id}` within this timeout, the kafka process will be forcibly killed,
the startup timeout will be increased, and the startup will be retried.

There are two ways to increase the timeout - linear and progressive. Linear adds the same amount of time after each
failed start. Progressive adds an amount of time that is proportional to the current timeout. The configuration for
that is provided by the `STARTUP_TIMEOUT` parameter.
```
# Linear timeout configuration
# initial timeout=300 seconds, after each failed start increase by 60 seconds (360, 420 and so on)
export STARTUP_TIMEOUT="type=linear:initial=300:step=60"
```
```
# Progressive timeout configuration
# Initial timeout=300 seconds, after each failed start increase by timeout * 0.5 (450, 675 and so on)
export STARTUP_TIMEOUT="type=progressive:initial=300:step=0.5"
```

Default value for timeout is `type=linear:initial=300:step=60`.

# How to contribute

If you have any features or bugfixes - make pull request providing feature/bugfix and tests that will test your
Expand Down
47 changes: 40 additions & 7 deletions bubuku/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,48 @@ def set(self, process):
self.process = process


class StartupTimeout(object):
    """
    Timeout used while waiting for kafka to start, growing after every failed attempt.

    The base class never grows (its step is 0); use :meth:`build` to get an instance
    with a linear or progressive growth policy.
    """

    def __init__(self, initial_value: float, config: str):
        """
        :param initial_value: timeout, in seconds, used for the first start attempt
        :param config: textual representation of the configuration, only used by ``__str__``
        """
        self.timeout = initial_value
        self.config = config

    def get_timeout(self) -> float:
        """Return the timeout to use for the next start attempt."""
        return self.timeout

    def on_timeout_fail(self):
        """Increase the timeout by the current step; to be called after a failed start."""
        self.timeout += self.get_step()

    def get_step(self) -> float:
        """Return the amount that the next ``on_timeout_fail`` call adds to the timeout."""
        return 0.

    def __str__(self):
        return 'timeout={}, step={}, config={}'.format(self.get_timeout(), self.get_step(), self.config)

    @staticmethod
    def build(props: dict):
        """
        Create a timeout from parsed configuration properties.

        Supported keys:
          - ``type``: ``linear`` or ``progressive`` (mandatory)
          - ``initial``: initial timeout, default 300
          - ``step``: for ``linear`` - amount added after each failure, default 60
          - ``scale`` (or ``step``): for ``progressive`` - fraction of the current timeout
            added after each failure, default 0.5

        :param props: dictionary with configuration properties
        :return: configured StartupTimeout instance
        :raises NotImplementedError: if ``type`` is neither ``linear`` nor ``progressive``
        :raises ValueError: if a numeric property can not be converted to float
        """
        type_ = props.get('type')
        result = StartupTimeout(float(props.get('initial', '300')),
                                ','.join('{}={}'.format(k, v) for k, v in props.items()))
        if type_ == 'linear':
            step = float(props.get('step', '60'))
            result.get_step = lambda: step
        elif type_ == 'progressive':
            # The factor was only read from `scale`, so a `step` given for the progressive type
            # (e.g. `type=progressive:initial=300:step=0.5`) was silently ignored.
            # `scale` still wins when both keys are present.
            scale = float(props.get('scale', props.get('step', '0.5')))
            # The step is recalculated on every call, because it depends on the current timeout.
            result.get_step = lambda: result.timeout * scale
        else:
            raise NotImplementedError('Startup timeout type {} is not valid'.format(type_))
        return result


class BrokerManager(object):
def __init__(self, process_holder: KafkaProcessHolder, kafka_dir: str, exhibitor: BukuExhibitor,
id_manager: BrokerIdGenerator, kafka_properties: KafkaProperties):
id_manager: BrokerIdGenerator, kafka_properties: KafkaProperties, timeout: StartupTimeout):
self.kafka_dir = kafka_dir
self.id_manager = id_manager
self.exhibitor = exhibitor
self.kafka_properties = kafka_properties
self.process_holder = process_holder
self.wait_timeout = 15 * 60
self.timeout = timeout

def is_running_and_registered(self):
if not self.process_holder.get():
Expand Down Expand Up @@ -101,12 +134,12 @@ def start_kafka_process(self, zookeeper_address):
_LOG.info('Staring kafka process')
self.process_holder.set(self._open_process())

# Wait for the broker id to be registered, using the current startup timeout.
_LOG.info('Waiting for kafka to start up with timeout {} seconds'.format(self.timeout.get_timeout()))
if not self.id_manager.wait_for_broker_id_presence(self.timeout.get_timeout()):
    # Start was not confirmed in time: grow the timeout so that the next attempt waits longer.
    self.timeout.on_timeout_fail()
    # The message must contain a placeholder, otherwise the new timeout value is never logged.
    _LOG.error(
        'Failed to wait for broker to start up, probably will kill, next timeout is {} seconds'.format(
            self.timeout.get_timeout()))

def _is_leadership_transferred(self, active_broker_ids=None, dead_broker_ids=None):
_LOG.info('Checking if leadership is transferred: active_broker_ids={}, dead_broker_ids={}'.format(
Expand Down
9 changes: 7 additions & 2 deletions bubuku/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
_LOG = logging.getLogger('bubuku.properties')

Config = namedtuple('Config', ('kafka_dir', 'kafka_settings_template', 'zk_stack_name',
'zk_prefix', 'features', 'health_port', 'mode'))
'zk_prefix', 'features', 'health_port', 'mode', 'timeout'))


class KafkaProperties(object):
Expand Down Expand Up @@ -49,6 +49,10 @@ def dump(self):
f.write('{}\n'.format(l))


def _parse_timeout(value: str):
return {a: b for a, b in [tuple(x.split('=', 1)) for x in value.split(':')]}


def load_config() -> Config:
zk_prefix = os.getenv('ZOOKEEPER_PREFIX', '/')

Expand All @@ -64,7 +68,8 @@ def load_config() -> Config:
zk_prefix=zk_prefix if zk_prefix.startswith('/') or not zk_prefix else '/{}'.format(zk_prefix),
features=features,
health_port=int(os.getenv('HEALTH_PORT', '8888')),
mode=str(os.getenv('BUBUKU_MODE', 'amazon')).lower()
mode=str(os.getenv('BUBUKU_MODE', 'amazon')).lower(),
timeout=_parse_timeout(os.getenv('STARTUP_TIMEOUT', 'type=linear:initial=300:step=60'))
)


Expand Down
10 changes: 6 additions & 4 deletions bubuku/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import logging

from bubuku import health
from bubuku.broker import BrokerManager, KafkaProcessHolder
from bubuku.broker import BrokerManager, KafkaProcessHolder, StartupTimeout
from bubuku.config import load_config, KafkaProperties, Config
from bubuku.controller import Controller
from bubuku.env_provider import EnvProvider
from bubuku.features.data_size_stats import GenerateDataSizeStatistics
from bubuku.features.rebalance import RebalanceOnStartCheck, RebalanceOnBrokerListChange
from bubuku.features.remote_exec import RemoteCommandExecutorCheck
Expand All @@ -16,7 +17,6 @@
from bubuku.features.terminate import register_terminate_on_interrupt
from bubuku.utils import CmdHelper
from bubuku.zookeeper import BukuExhibitor, load_exhibitor_proxy
from bubuku.env_provider import EnvProvider

_LOG = logging.getLogger('bubuku.main')

Expand Down Expand Up @@ -44,18 +44,20 @@ def apply_features(api_port, features: dict, controller: Controller, buku_proxy:
def run_daemon_loop(config: Config, process_holder: KafkaProcessHolder, cmd_helper: CmdHelper, restart_on_init: bool):
_LOG.info("Using configuration: {}".format(config))
kafka_props = KafkaProperties(config.kafka_settings_template,
'{}/config/server.properties'.format(config.kafka_dir))
'{}/config/server.properties'.format(config.kafka_dir))

env_provider = EnvProvider.create_env_provider(config)
address_provider = env_provider.get_address_provider()
startup_timeout = StartupTimeout.build(config.timeout)

_LOG.info("Loading exhibitor configuration")
with load_exhibitor_proxy(address_provider, config.zk_prefix) as zookeeper:
_LOG.info("Loading broker_id policy")
broker_id_manager = env_provider.create_broker_id_manager(zookeeper, kafka_props)

_LOG.info("Building broker manager")
broker = BrokerManager(process_holder, config.kafka_dir, zookeeper, broker_id_manager, kafka_props)
broker = BrokerManager(process_holder, config.kafka_dir, zookeeper, broker_id_manager, kafka_props,
startup_timeout)

_LOG.info("Creating controller")
controller = Controller(broker, zookeeper, env_provider)
Expand Down
9 changes: 6 additions & 3 deletions tests/test_broker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from unittest.mock import MagicMock

from bubuku.broker import BrokerManager, LeaderElectionInProgress, KafkaProcessHolder
from bubuku.broker import BrokerManager, LeaderElectionInProgress, KafkaProcessHolder, StartupTimeout
from test_config import build_test_properties

zk_fake_host = 'zk_host:8181/path'
Expand Down Expand Up @@ -30,7 +30,8 @@ def _load_states():
kafka_props = build_test_properties()
kafka_props.set_property('unclean.leader.election.enable', 'true')

manager = FakeProcessManager(KafkaProcessHolder(), 'kafka_dir', exhibitor, id_manager, kafka_props)
manager = FakeProcessManager(KafkaProcessHolder(), 'kafka_dir', exhibitor, id_manager, kafka_props,
StartupTimeout.build({'type': 'linear'}))

assert not manager.has_leadership()

Expand All @@ -48,7 +49,8 @@ def __prepare_for_start_fail(broker_ids, leader, isr):
id_manager.get_broker_id = lambda: '1'
kafka_props = build_test_properties()

broker = FakeProcessManager(KafkaProcessHolder(), 'kafka_dir', exhibitor, id_manager, kafka_props)
broker = FakeProcessManager(KafkaProcessHolder(), 'kafka_dir', exhibitor, id_manager, kafka_props,
StartupTimeout.build({'type': 'linear'}))

kafka_props.set_property('unclean.leader.election.enable', 'false')
return kafka_props, broker
Expand Down Expand Up @@ -99,6 +101,7 @@ def test_broker_start_success_unclean_2():
# suppose that broker is free to start
broker.start_kafka_process(zk_fake_host)


def test_broker_start_fail_no_zk_conn():
kafka_props, broker = __prepare_for_start_fail(['1', '2'], 3, [1, 5])
try:
Expand Down
6 changes: 5 additions & 1 deletion tests/test_config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
from tempfile import mkstemp

from bubuku.config import KafkaProperties, load_config
from bubuku.config import KafkaProperties, load_config, _parse_timeout

__PROPS = """
log.dirs=/data/kafka-logs
Expand Down Expand Up @@ -109,3 +109,7 @@ def test_zk_prefix_replacement():

os.environ['ZOOKEEPER_PREFIX'] = '/test'
assert load_config().zk_prefix == '/test'


def test_parse_timeout():
    """The default timeout definition is split into separate string properties."""
    expected = {'type': 'linear', 'initial': '300', 'step': '60'}
    assert expected == _parse_timeout('type=linear:initial=300:step=60')
40 changes: 40 additions & 0 deletions tests/test_startup_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import unittest

from bubuku.broker import StartupTimeout


class TestDataSizeStats(unittest.TestCase):
    """Checks initial values and growth of linear and progressive startup timeouts."""

    @staticmethod
    def _verify(o: StartupTimeout, value: float, step: float):
        """Assert that the timeout object reports the expected timeout and step."""
        print(o)
        assert o.get_timeout() == value
        assert o.get_step() == step

    def test_linear_defaults(self):
        """Linear timeout without explicit values starts at 300 with step 60."""
        timeout = StartupTimeout.build({'type': 'linear'})
        self._verify(timeout, 300., 60.)

    def test_linear(self):
        """Linear timeout grows by a constant step after each failure."""
        timeout = StartupTimeout.build({'type': 'linear', 'initial': '10', 'step': 2})
        self._verify(timeout, 10., 2.)

        for expected_timeout in (12., 14.):
            timeout.on_timeout_fail()
            self._verify(timeout, expected_timeout, 2.)

    def test_progressive_defaults(self):
        """Progressive timeout without explicit values starts at 300 with step 150."""
        timeout = StartupTimeout.build({'type': 'progressive'})
        self._verify(timeout, 300., 150.)

    def test_progressive(self):
        """Progressive timeout grows by a fraction of the current timeout after each failure."""
        timeout = StartupTimeout.build({'type': 'progressive', 'initial': '16', 'scale': '0.25'})
        self._verify(timeout, 16., 4.)

        for expected_timeout, expected_step in ((20., 5.), (25., 6.25)):
            timeout.on_timeout_fail()
            self._verify(timeout, expected_timeout, expected_step)

0 comments on commit e3f891c

Please sign in to comment.