diff --git a/README.md b/README.md index 2b5ca7f..3fe12d8 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,7 @@ Bubuku can be configured using environment properties: - `BUKU_FEATURES` - List of optional bubuku features, see [features](#features) section - `HEALTH_PORT` - Port for health checks - `FREE_SPACE_DIFF_THRESHOLD_MB` - Threshold for starting `balance_data_size` feature, if it's enabled + - `STARTUP_TIMEOUT_TYPE`, `STARTUP_TIMEOUT_INITIAL`, `STARTUP_TIMEOUT_STEP` - The way bubuku manages [time to start for kafka](#startup_timeout). # Features # @@ -89,6 +90,36 @@ Pluggable features are defined in configuration and are disabled by default. Lis - `balance_data_size` - Swap partitions one by one by one if imbalance in size on brokers is bigger than `FREE_SPACE_DIFF_THRESHOLD_MB` megabytes. +## Timeouts for startup + Each time when bubuku tries to start kafka, it uses special startup timeout. This means, that if kafka broker id + is not found within this timeout in zookeeper node `/broker/ids/{id}`, kafka process will be forcibly killed, timeout + for start updated, and startup will be retried. + + There are two ways to increase timeout - linear and progressive. Linear adds the same amount of time after each + failed start. Progressive adds time, that is relative to current timeout. Configuration for that is provided by + `STARTUP_TIMEOUT_TYPE`, `STARTUP_TIMEOUT_INITIAL`, `STARTUP_TIMEOUT_STEP` parameters. + ``` + # Linear timeout configuration + # initial timeout=300 seconds, after each failed start increase by 60 seconds (360, 420 and so on) + export STARTUP_TIMEOUT_TYPE="linear" + export STARTUP_TIMEOUT_INITIAL="300" + export STARTUP_TIMEOUT_STEP="60" + ``` + ``` + # Progressive timeout configuration + # Initial timeout=300 seconds, after each failed start increase by timeout * 0.5 (450, 675 and so on) + export STARTUP_TIMEOUT_TYPE="progressive" + export STARTUP_TIMEOUT_INITIAL="300" + export STARTUP_TIMEOUT_STEP="0.5" + ``` + + Default values for timeout are + ``` + export STARTUP_TIMEOUT_TYPE="linear" + export STARTUP_TIMEOUT_INITIAL="300" + export STARTUP_TIMEOUT_STEP="60" + ``` + # How to contribute If you have any features or bugfixes - make pull request providing feature/bugfix and tests that will test your diff --git a/bubuku/broker.py b/bubuku/broker.py index c693f0e..9e36d98 100644 --- a/bubuku/broker.py +++ b/bubuku/broker.py @@ -24,15 +24,48 @@ def set(self, process): self.process = process +class StartupTimeout(object): + def __init__(self, initial_value: float, config: str): + self.timeout = initial_value + self.config = config + + def get_timeout(self) -> float: + return self.timeout + + def on_timeout_fail(self): + self.timeout += self.get_step() + + def get_step(self) -> float: + return 0. + + def __str__(self): + return 'timeout={}, step={}, config={}'.format(self.get_timeout(), self.get_step(), self.config) + + @staticmethod + def build(props: dict): + type_ = props.get('type', 'linear') + result = StartupTimeout(float(props.get('initial', '300')), + ','.join('{}={}'.format(k, v) for k, v in props.items())) + if type_ == 'linear': + step = float(props.get('step', '60')) + result.get_step = lambda: step + elif type_ == 'progressive': + scale = float(props.get('step', '0.5')) + result.get_step = lambda: result.timeout * scale + else: + raise NotImplementedError('Startup timeout type {} is not valid'.format(type_)) + return result + + class BrokerManager(object): def __init__(self, process_holder: KafkaProcessHolder, kafka_dir: str, exhibitor: BukuExhibitor, - id_manager: BrokerIdGenerator, kafka_properties: KafkaProperties): + id_manager: BrokerIdGenerator, kafka_properties: KafkaProperties, timeout: StartupTimeout): self.kafka_dir = kafka_dir self.id_manager = id_manager self.exhibitor = exhibitor self.kafka_properties = kafka_properties self.process_holder = process_holder - self.wait_timeout = 15 * 60 + self.timeout = timeout def is_running_and_registered(self): if not self.process_holder.get(): @@ -101,12 +134,12 @@ def start_kafka_process(self, zookeeper_address): _LOG.info('Staring kafka process') self.process_holder.set(self._open_process()) - _LOG.info('Waiting for kafka to start up with timeout {} seconds'.format(self.wait_timeout)) - if not self.id_manager.wait_for_broker_id_presence(self.wait_timeout): - self.wait_timeout += 60 + _LOG.info('Waiting for kafka to start up with timeout {} seconds'.format(self.timeout.get_timeout())) + if not self.id_manager.wait_for_broker_id_presence(self.timeout.get_timeout()): + self.timeout.on_timeout_fail() _LOG.error( - 'Failed to wait for broker to start up, probably will kill, increasing timeout to {} seconds'.format( - self.wait_timeout)) + 'Failed to wait for broker to start up, probably will kill, next timeout is'.format( + self.timeout.get_timeout())) def _is_leadership_transferred(self, active_broker_ids=None, dead_broker_ids=None): _LOG.info('Checking if leadership is transferred: active_broker_ids={}, dead_broker_ids={}'.format( diff --git a/bubuku/config.py b/bubuku/config.py index c992e1d..92f8961 100644 --- a/bubuku/config.py +++ b/bubuku/config.py @@ -5,7 +5,7 @@ _LOG = logging.getLogger('bubuku.properties') Config = namedtuple('Config', ('kafka_dir', 'kafka_settings_template', 'zk_stack_name', - 'zk_prefix', 'features', 'health_port', 'mode')) + 'zk_prefix', 'features', 'health_port', 'mode', 'timeout')) class KafkaProperties(object): @@ -49,6 +49,12 @@ def dump(self): f.write('{}\n'.format(l)) +def _load_timeout_dict(load_func): + startup_timeout_pairs = [(name, load_func('STARTUP_TIMEOUT_{}'.format(name.upper()))) for name in + ['type', 'initial', 'step']] + return {name: value for name, value in startup_timeout_pairs if value} + + def load_config() -> Config: zk_prefix = os.getenv('ZOOKEEPER_PREFIX', '/') @@ -56,7 +62,6 @@ def load_config() -> Config: features = {key: {} for key in features_str.split(',')} if features_str else {} if "balance_data_size" in features: features["balance_data_size"]["diff_threshold_mb"] = int(os.getenv('FREE_SPACE_DIFF_THRESHOLD_MB', '50000')) - return Config( kafka_dir=os.getenv('KAFKA_DIR'), kafka_settings_template=os.getenv('KAFKA_SETTINGS'), @@ -64,7 +69,8 @@ def load_config() -> Config: zk_prefix=zk_prefix if zk_prefix.startswith('/') or not zk_prefix else '/{}'.format(zk_prefix), features=features, health_port=int(os.getenv('HEALTH_PORT', '8888')), - mode=str(os.getenv('BUBUKU_MODE', 'amazon')).lower() + mode=str(os.getenv('BUBUKU_MODE', 'amazon')).lower(), + timeout=_load_timeout_dict(os.getenv) ) diff --git a/bubuku/daemon.py b/bubuku/daemon.py index e93334f..0a86696 100644 --- a/bubuku/daemon.py +++ b/bubuku/daemon.py @@ -4,9 +4,10 @@ import logging from bubuku import health -from bubuku.broker import BrokerManager, KafkaProcessHolder +from bubuku.broker import BrokerManager, KafkaProcessHolder, StartupTimeout from bubuku.config import load_config, KafkaProperties, Config from bubuku.controller import Controller +from bubuku.env_provider import EnvProvider from bubuku.features.data_size_stats import GenerateDataSizeStatistics from bubuku.features.rebalance import RebalanceOnStartCheck, RebalanceOnBrokerListChange from bubuku.features.remote_exec import RemoteCommandExecutorCheck @@ -16,7 +17,6 @@ from bubuku.features.terminate import register_terminate_on_interrupt from bubuku.utils import CmdHelper from bubuku.zookeeper import BukuExhibitor, load_exhibitor_proxy -from bubuku.env_provider import EnvProvider _LOG = logging.getLogger('bubuku.main') @@ -44,10 +44,11 @@ def apply_features(api_port, features: dict, controller: Controller, buku_proxy: def run_daemon_loop(config: Config, process_holder: KafkaProcessHolder, cmd_helper: CmdHelper, restart_on_init: bool): _LOG.info("Using configuration: {}".format(config)) kafka_props = KafkaProperties(config.kafka_settings_template, - '{}/config/server.properties'.format(config.kafka_dir)) + '{}/config/server.properties'.format(config.kafka_dir)) env_provider = EnvProvider.create_env_provider(config) address_provider = env_provider.get_address_provider() + startup_timeout = StartupTimeout.build(config.timeout) _LOG.info("Loading exhibitor configuration") with load_exhibitor_proxy(address_provider, config.zk_prefix) as zookeeper: @@ -55,7 +56,8 @@ def run_daemon_loop(config: Config, process_holder: KafkaProcessHolder, cmd_help broker_id_manager = env_provider.create_broker_id_manager(zookeeper, kafka_props) _LOG.info("Building broker manager") - broker = BrokerManager(process_holder, config.kafka_dir, zookeeper, broker_id_manager, kafka_props) + broker = BrokerManager(process_holder, config.kafka_dir, zookeeper, broker_id_manager, kafka_props, + startup_timeout) _LOG.info("Creating controller") controller = Controller(broker, zookeeper, env_provider) diff --git a/tests/test_broker.py b/tests/test_broker.py index 40601d0..b21ecc5 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -1,6 +1,6 @@ from unittest.mock import MagicMock -from bubuku.broker import BrokerManager, LeaderElectionInProgress, KafkaProcessHolder +from bubuku.broker import BrokerManager, LeaderElectionInProgress, KafkaProcessHolder, StartupTimeout from test_config import build_test_properties zk_fake_host = 'zk_host:8181/path' @@ -30,7 +30,8 @@ def _load_states(): kafka_props = build_test_properties() kafka_props.set_property('unclean.leader.election.enable', 'true') - manager = FakeProcessManager(KafkaProcessHolder(), 'kafka_dir', exhibitor, id_manager, kafka_props) + manager = FakeProcessManager(KafkaProcessHolder(), 'kafka_dir', exhibitor, id_manager, kafka_props, + StartupTimeout.build({'type': 'linear'})) assert not manager.has_leadership() @@ -48,7 +49,8 @@ def __prepare_for_start_fail(broker_ids, leader, isr): id_manager.get_broker_id = lambda: '1' kafka_props = build_test_properties() - broker = FakeProcessManager(KafkaProcessHolder(), 'kafka_dir', exhibitor, id_manager, kafka_props) + broker = FakeProcessManager(KafkaProcessHolder(), 'kafka_dir', exhibitor, id_manager, kafka_props, + StartupTimeout.build({'type': 'linear'})) kafka_props.set_property('unclean.leader.election.enable', 'false') return kafka_props, broker @@ -99,6 +101,7 @@ def test_broker_start_success_unclean_2(): # suppose that broker is free to start broker.start_kafka_process(zk_fake_host) + def test_broker_start_fail_no_zk_conn(): kafka_props, broker = __prepare_for_start_fail(['1', '2'], 3, [1, 5]) try: diff --git a/tests/test_config.py b/tests/test_config.py index f61cb46..2669f90 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,7 +1,7 @@ import os from tempfile import mkstemp -from bubuku.config import KafkaProperties, load_config +from bubuku.config import KafkaProperties, load_config, _load_timeout_dict __PROPS = """ log.dirs=/data/kafka-logs @@ -109,3 +109,12 @@ def test_zk_prefix_replacement(): os.environ['ZOOKEEPER_PREFIX'] = '/test' assert load_config().zk_prefix == '/test' + + +def test_parse_timeout(): + assert {'type': 'linear', 'initial': '300', 'step': '60'} == _load_timeout_dict( + {'STARTUP_TIMEOUT_TYPE': 'linear', 'STARTUP_TIMEOUT_INITIAL': '300', 'STARTUP_TIMEOUT_STEP': '60'}.get) + assert {'type': 'linear', 'step': '60'} == _load_timeout_dict( + {'STARTUP_TIMEOUT_TYPE': 'linear', 'STARTUP_TIMEOUT_STEP': '60'}.get) + assert {'initial': '300', 'step': '60'} == _load_timeout_dict( + {'STARTUP_TIMEOUT_INITIAL': '300', 'STARTUP_TIMEOUT_STEP': '60'}.get) diff --git a/tests/test_startup_timeout.py b/tests/test_startup_timeout.py new file mode 100644 index 0000000..93d2a40 --- /dev/null +++ b/tests/test_startup_timeout.py @@ -0,0 +1,40 @@ +import unittest + +from bubuku.broker import StartupTimeout + + +class TestDataSizeStats(unittest.TestCase): + def test_linear_defaults(self): + o = StartupTimeout.build({'type': 'linear'}) + TestDataSizeStats._verify(o, 300., 60.) + + @staticmethod + def _verify(o: StartupTimeout, value: float, step: float): + print(o) + assert o.get_timeout() == value + assert o.get_step() == step + + def test_linear(self): + o = StartupTimeout.build({'type': 'linear', 'initial': '10', 'step': 2}) + TestDataSizeStats._verify(o, 10., 2.) + + o.on_timeout_fail() + TestDataSizeStats._verify(o, 12., 2.) + + o.on_timeout_fail() + TestDataSizeStats._verify(o, 14., 2.) + + def test_progressive_defaults(self): + o = StartupTimeout.build({'type': 'progressive'}) + TestDataSizeStats._verify(o, 300., 150.) + + def test_progressive(self): + o = StartupTimeout.build({'type': 'progressive', 'initial': '16', 'step': '0.25'}) + + TestDataSizeStats._verify(o, 16., 4.) + + o.on_timeout_fail() + TestDataSizeStats._verify(o, 20., 5.) + + o.on_timeout_fail() + TestDataSizeStats._verify(o, 25., 6.25)