Skip to content
This repository has been archived by the owner on Oct 7, 2024. It is now read-only.

Make kafka startup timeout configurable #58

Merged
merged 3 commits into from
Sep 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Bubuku can be configured using environment properties:
- `BUKU_FEATURES` - List of optional bubuku features, see [features](#features) section
- `HEALTH_PORT` - Port for health checks
- `FREE_SPACE_DIFF_THRESHOLD_MB` - Threshold for starting `balance_data_size` feature, if it's enabled
- `STARTUP_TIMEOUT_TYPE`, `STARTUP_TIMEOUT_INITIAL`, `STARTUP_TIMEOUT_STEP` - The way bubuku manages [time to start for kafka](#startup_timeout).

# Features #

Expand Down Expand Up @@ -89,6 +90,36 @@ Pluggable features are defined in configuration and are disabled by default. Lis
- `balance_data_size` - Swap partitions one by one by one if imbalance in size on brokers is bigger than
`FREE_SPACE_DIFF_THRESHOLD_MB` megabytes.

## <a name="startup_timeout"></a> Timeouts for startup
Each time when bubuku tries to start kafka, it uses special startup timeout. This means, that if kafka broker id
is not found within this timeout in zookeeper node `/broker/ids/{id}`, kafka process will be forcibly killed, timeout
for start updated, and startup will be retried.

There are two ways to increase timeout - linear and progressive. Linear adds the same amount of time after each
failed start. Progressive adds time, that is relative to current timeout. Configuration for that is provided by
`STARTUP_TIMEOUT_TYPE`, `STARTUP_TIMEOUT_INITIAL`, `STARTUP_TIMEOUT_STEP` parameters.
```
# Linear timeout configuration
# initial timeout=300 seconds, after each failed start increase by 60 seconds (360, 420 and so on)
export STARTUP_TIMEOUT_TYPE="linear"
export STARTUP_TIMEOUT_INITIAL="300"
export STARTUP_TIMEOUT_STEP="60"
```
```
# Progressive timeout configuration
# Initial timeout=300 seconds, after each failed start increase by timeout * 0.5 (450, 675 and so on)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it progressive enough? Maybe it needs to be more progressive (e.g. multiplied by 2 each time so that we have 300, 600, 1200 ...)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we had fu** up on live each broker was starting about 15 mins. may be it is good timeout to start from?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@v-stepanov It's just defaults, anyone can set it's own parameters

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And btw, defaults are actually linear

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah ok, my bad. So in case of progressive the step is a multiplier.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@v-stepanov Finally I understood what is the problem. There is a bug in documentation - not step, but scale...

Copy link
Contributor Author

@antban antban Sep 8, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 0934fda

export STARTUP_TIMEOUT_TYPE="progressive"
export STARTUP_TIMEOUT_INITIAL="300"
export STARTUP_TIMEOUT_STEP="0.5"
```

Default values for timeout are
```
export STARTUP_TIMEOUT_TYPE="linear"
export STARTUP_TIMEOUT_INITIAL="300"
export STARTUP_TIMEOUT_STEP="60"
```

# How to contribute

If you have any features or bugfixes - make pull request providing feature/bugfix and tests that will test your
Expand Down
47 changes: 40 additions & 7 deletions bubuku/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,48 @@ def set(self, process):
self.process = process


class StartupTimeout(object):
def __init__(self, initial_value: float, config: str):
self.timeout = initial_value
self.config = config

def get_timeout(self) -> float:
return self.timeout

def on_timeout_fail(self):
self.timeout += self.get_step()

def get_step(self) -> float:
return 0.

def __str__(self):
return 'timeout={}, step={}, config={}'.format(self.get_timeout(), self.get_step(), self.config)

@staticmethod
def build(props: dict):
type_ = props.get('type', 'linear')
result = StartupTimeout(float(props.get('initial', '300')),
','.join('{}={}'.format(k, v) for k, v in props.items()))
if type_ == 'linear':
step = float(props.get('step', '60'))
result.get_step = lambda: step
elif type_ == 'progressive':
scale = float(props.get('step', '0.5'))
result.get_step = lambda: result.timeout * scale
else:
raise NotImplementedError('Startup timeout type {} is not valid'.format(type_))
return result


class BrokerManager(object):
def __init__(self, process_holder: KafkaProcessHolder, kafka_dir: str, exhibitor: BukuExhibitor,
id_manager: BrokerIdGenerator, kafka_properties: KafkaProperties):
id_manager: BrokerIdGenerator, kafka_properties: KafkaProperties, timeout: StartupTimeout):
self.kafka_dir = kafka_dir
self.id_manager = id_manager
self.exhibitor = exhibitor
self.kafka_properties = kafka_properties
self.process_holder = process_holder
self.wait_timeout = 15 * 60
self.timeout = timeout

def is_running_and_registered(self):
if not self.process_holder.get():
Expand Down Expand Up @@ -101,12 +134,12 @@ def start_kafka_process(self, zookeeper_address):
_LOG.info('Staring kafka process')
self.process_holder.set(self._open_process())

_LOG.info('Waiting for kafka to start up with timeout {} seconds'.format(self.wait_timeout))
if not self.id_manager.wait_for_broker_id_presence(self.wait_timeout):
self.wait_timeout += 60
_LOG.info('Waiting for kafka to start up with timeout {} seconds'.format(self.timeout.get_timeout()))
if not self.id_manager.wait_for_broker_id_presence(self.timeout.get_timeout()):
self.timeout.on_timeout_fail()
_LOG.error(
'Failed to wait for broker to start up, probably will kill, increasing timeout to {} seconds'.format(
self.wait_timeout))
'Failed to wait for broker to start up, probably will kill, next timeout is'.format(
self.timeout.get_timeout()))

def _is_leadership_transferred(self, active_broker_ids=None, dead_broker_ids=None):
_LOG.info('Checking if leadership is transferred: active_broker_ids={}, dead_broker_ids={}'.format(
Expand Down
12 changes: 9 additions & 3 deletions bubuku/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
_LOG = logging.getLogger('bubuku.properties')

Config = namedtuple('Config', ('kafka_dir', 'kafka_settings_template', 'zk_stack_name',
'zk_prefix', 'features', 'health_port', 'mode'))
'zk_prefix', 'features', 'health_port', 'mode', 'timeout'))


class KafkaProperties(object):
Expand Down Expand Up @@ -49,22 +49,28 @@ def dump(self):
f.write('{}\n'.format(l))


def _load_timeout_dict(load_func):
startup_timeout_pairs = [(name, load_func('STARTUP_TIMEOUT_{}'.format(name.upper()))) for name in
['type', 'initial', 'step']]
return {name: value for name, value in startup_timeout_pairs if value}


def load_config() -> Config:
zk_prefix = os.getenv('ZOOKEEPER_PREFIX', '/')

features_str = os.getenv('BUKU_FEATURES', '').lower()
features = {key: {} for key in features_str.split(',')} if features_str else {}
if "balance_data_size" in features:
features["balance_data_size"]["diff_threshold_mb"] = int(os.getenv('FREE_SPACE_DIFF_THRESHOLD_MB', '50000'))

return Config(
kafka_dir=os.getenv('KAFKA_DIR'),
kafka_settings_template=os.getenv('KAFKA_SETTINGS'),
zk_stack_name=os.getenv('ZOOKEEPER_STACK_NAME'),
zk_prefix=zk_prefix if zk_prefix.startswith('/') or not zk_prefix else '/{}'.format(zk_prefix),
features=features,
health_port=int(os.getenv('HEALTH_PORT', '8888')),
mode=str(os.getenv('BUBUKU_MODE', 'amazon')).lower()
mode=str(os.getenv('BUBUKU_MODE', 'amazon')).lower(),
timeout=_load_timeout_dict(os.getenv)
)


Expand Down
10 changes: 6 additions & 4 deletions bubuku/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import logging

from bubuku import health
from bubuku.broker import BrokerManager, KafkaProcessHolder
from bubuku.broker import BrokerManager, KafkaProcessHolder, StartupTimeout
from bubuku.config import load_config, KafkaProperties, Config
from bubuku.controller import Controller
from bubuku.env_provider import EnvProvider
from bubuku.features.data_size_stats import GenerateDataSizeStatistics
from bubuku.features.rebalance import RebalanceOnStartCheck, RebalanceOnBrokerListChange
from bubuku.features.remote_exec import RemoteCommandExecutorCheck
Expand All @@ -16,7 +17,6 @@
from bubuku.features.terminate import register_terminate_on_interrupt
from bubuku.utils import CmdHelper
from bubuku.zookeeper import BukuExhibitor, load_exhibitor_proxy
from bubuku.env_provider import EnvProvider

_LOG = logging.getLogger('bubuku.main')

Expand Down Expand Up @@ -44,18 +44,20 @@ def apply_features(api_port, features: dict, controller: Controller, buku_proxy:
def run_daemon_loop(config: Config, process_holder: KafkaProcessHolder, cmd_helper: CmdHelper, restart_on_init: bool):
_LOG.info("Using configuration: {}".format(config))
kafka_props = KafkaProperties(config.kafka_settings_template,
'{}/config/server.properties'.format(config.kafka_dir))
'{}/config/server.properties'.format(config.kafka_dir))

env_provider = EnvProvider.create_env_provider(config)
address_provider = env_provider.get_address_provider()
startup_timeout = StartupTimeout.build(config.timeout)

_LOG.info("Loading exhibitor configuration")
with load_exhibitor_proxy(address_provider, config.zk_prefix) as zookeeper:
_LOG.info("Loading broker_id policy")
broker_id_manager = env_provider.create_broker_id_manager(zookeeper, kafka_props)

_LOG.info("Building broker manager")
broker = BrokerManager(process_holder, config.kafka_dir, zookeeper, broker_id_manager, kafka_props)
broker = BrokerManager(process_holder, config.kafka_dir, zookeeper, broker_id_manager, kafka_props,
startup_timeout)

_LOG.info("Creating controller")
controller = Controller(broker, zookeeper, env_provider)
Expand Down
9 changes: 6 additions & 3 deletions tests/test_broker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from unittest.mock import MagicMock

from bubuku.broker import BrokerManager, LeaderElectionInProgress, KafkaProcessHolder
from bubuku.broker import BrokerManager, LeaderElectionInProgress, KafkaProcessHolder, StartupTimeout
from test_config import build_test_properties

zk_fake_host = 'zk_host:8181/path'
Expand Down Expand Up @@ -30,7 +30,8 @@ def _load_states():
kafka_props = build_test_properties()
kafka_props.set_property('unclean.leader.election.enable', 'true')

manager = FakeProcessManager(KafkaProcessHolder(), 'kafka_dir', exhibitor, id_manager, kafka_props)
manager = FakeProcessManager(KafkaProcessHolder(), 'kafka_dir', exhibitor, id_manager, kafka_props,
StartupTimeout.build({'type': 'linear'}))

assert not manager.has_leadership()

Expand All @@ -48,7 +49,8 @@ def __prepare_for_start_fail(broker_ids, leader, isr):
id_manager.get_broker_id = lambda: '1'
kafka_props = build_test_properties()

broker = FakeProcessManager(KafkaProcessHolder(), 'kafka_dir', exhibitor, id_manager, kafka_props)
broker = FakeProcessManager(KafkaProcessHolder(), 'kafka_dir', exhibitor, id_manager, kafka_props,
StartupTimeout.build({'type': 'linear'}))

kafka_props.set_property('unclean.leader.election.enable', 'false')
return kafka_props, broker
Expand Down Expand Up @@ -99,6 +101,7 @@ def test_broker_start_success_unclean_2():
# suppose that broker is free to start
broker.start_kafka_process(zk_fake_host)


def test_broker_start_fail_no_zk_conn():
kafka_props, broker = __prepare_for_start_fail(['1', '2'], 3, [1, 5])
try:
Expand Down
11 changes: 10 additions & 1 deletion tests/test_config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
from tempfile import mkstemp

from bubuku.config import KafkaProperties, load_config
from bubuku.config import KafkaProperties, load_config, _load_timeout_dict

__PROPS = """
log.dirs=/data/kafka-logs
Expand Down Expand Up @@ -109,3 +109,12 @@ def test_zk_prefix_replacement():

os.environ['ZOOKEEPER_PREFIX'] = '/test'
assert load_config().zk_prefix == '/test'


def test_parse_timeout():
assert {'type': 'linear', 'initial': '300', 'step': '60'} == _load_timeout_dict(
{'STARTUP_TIMEOUT_TYPE': 'linear', 'STARTUP_TIMEOUT_INITIAL': '300', 'STARTUP_TIMEOUT_STEP': '60'}.get)
assert {'type': 'linear', 'step': '60'} == _load_timeout_dict(
{'STARTUP_TIMEOUT_TYPE': 'linear', 'STARTUP_TIMEOUT_STEP': '60'}.get)
assert {'initial': '300', 'step': '60'} == _load_timeout_dict(
{'STARTUP_TIMEOUT_INITIAL': '300', 'STARTUP_TIMEOUT_STEP': '60'}.get)
40 changes: 40 additions & 0 deletions tests/test_startup_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import unittest

from bubuku.broker import StartupTimeout


class TestDataSizeStats(unittest.TestCase):
def test_linear_defaults(self):
o = StartupTimeout.build({'type': 'linear'})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it an egg?

TestDataSizeStats._verify(o, 300., 60.)

@staticmethod
def _verify(o: StartupTimeout, value: float, step: float):
print(o)
assert o.get_timeout() == value
assert o.get_step() == step

def test_linear(self):
o = StartupTimeout.build({'type': 'linear', 'initial': '10', 'step': 2})
TestDataSizeStats._verify(o, 10., 2.)

o.on_timeout_fail()
TestDataSizeStats._verify(o, 12., 2.)

o.on_timeout_fail()
TestDataSizeStats._verify(o, 14., 2.)

def test_progressive_defaults(self):
o = StartupTimeout.build({'type': 'progressive'})
TestDataSizeStats._verify(o, 300., 150.)

def test_progressive(self):
o = StartupTimeout.build({'type': 'progressive', 'initial': '16', 'step': '0.25'})

TestDataSizeStats._verify(o, 16., 4.)

o.on_timeout_fail()
TestDataSizeStats._verify(o, 20., 5.)

o.on_timeout_fail()
TestDataSizeStats._verify(o, 25., 6.25)