From 1d277eb2ef0a129e81d435d5c760b44c12348788 Mon Sep 17 00:00:00 2001 From: antban Date: Thu, 8 Sep 2016 15:06:19 +0200 Subject: [PATCH] #59 Handle network problems, allow to correctly reinitialise after it --- bubuku/env_provider.py | 64 +++++++++++++++++++++-------------- bubuku/zookeeper/__init__.py | 11 ++++-- bubuku/zookeeper/exhibitor.py | 36 +++----------------- tests/test_exhibitor.py | 56 ++++++++++++++++++------------ tests/test_zookeeper.py | 21 ++++++++++++ 5 files changed, 108 insertions(+), 80 deletions(-) diff --git a/bubuku/env_provider.py b/bubuku/env_provider.py index 83786af..3f7065c 100644 --- a/bubuku/env_provider.py +++ b/bubuku/env_provider.py @@ -1,14 +1,15 @@ import json import logging +import uuid +from functools import partial +import boto3 import requests -from bubuku.id_generator import BrokerIDByIp, BrokerIdAutoAssign -from bubuku.zookeeper import BukuExhibitor -from bubuku.zookeeper.exhibitor import AWSExhibitorAddressProvider -from bubuku.zookeeper.exhibitor import LocalAddressProvider from bubuku.config import Config, KafkaProperties -import uuid +from bubuku.id_generator import BrokerIDByIp, BrokerIdAutoAssign +from bubuku.zookeeper import BukuExhibitor, AddressListProvider +from bubuku.zookeeper.exhibitor import ExhibitorAddressProvider _LOG = logging.getLogger('bubuku.amazon') @@ -35,37 +36,51 @@ def create_env_provider(config: Config): class AmazonEnvProvider(EnvProvider): def __init__(self, config: Config): - self.document = None self.aws_addr = '169.254.169.254' self.config = config + self.ip_address = None def _get_document(self) -> dict: - if not self.document: - try: - self.document = requests.get( - 'http://{}/latest/dynamic/instance-identity/document'.format(self.aws_addr), - timeout=5).json() - _LOG.info("Amazon specific information loaded from AWS: {}".format( - json.dumps(self.document, indent=2))) - except Exception as ex: - _LOG.warn('Failed to download AWS document', exc_info=ex) - return self.document - - def get_aws_region(self) -> str: - doc = self._get_document() - return doc['region'] if doc else None + document = requests.get('http://{}/latest/dynamic/instance-identity/document'.format(self.aws_addr), + timeout=5).json() + _LOG.info("Amazon specific information loaded from AWS: {}".format(json.dumps(document, indent=2))) + return document def get_id(self) -> str: - doc = self._get_document() - return doc['privateIp'] if doc else '127.0.0.1' + if not self.ip_address: + self.ip_address = self._get_document()['privateIp'] + return self.ip_address + + def _load_instance_ips(self, lb_name: str): + region = self._get_document()['region'] + + private_ips = [] + + elb = boto3.client('elb', region_name=region) + ec2 = boto3.client('ec2', region_name=region) + + response = elb.describe_instance_health(LoadBalancerName=lb_name) + + for instance in response['InstanceStates']: + if instance['State'] == 'InService': + private_ips.append(ec2.describe_instances( + InstanceIds=[instance['InstanceId']])['Reservations'][0]['Instances'][0]['PrivateIpAddress']) + + _LOG.info("Ip addresses for {} are: {}".format(lb_name, private_ips)) + return private_ips def get_address_provider(self): - return AWSExhibitorAddressProvider(self.config.zk_stack_name, self.get_aws_region()) + return ExhibitorAddressProvider(partial(self._load_instance_ips, self.config.zk_stack_name)) def create_broker_id_manager(self, zk: BukuExhibitor, kafka_props: KafkaProperties): return BrokerIDByIp(zk, self.get_id(), kafka_props) +class _LocalAddressProvider(AddressListProvider): + def get_latest_address(self) -> (list, int): + return ('zookeeper',), 2181 + + class LocalEnvProvider(EnvProvider): unique_id = str(uuid.uuid4()) @@ -73,8 +88,7 @@ def get_id(self) -> str: return self.unique_id def get_address_provider(self): - return LocalAddressProvider() + return _LocalAddressProvider() def create_broker_id_manager(self, zk: BukuExhibitor, kafka_props: KafkaProperties): return BrokerIdAutoAssign(zk, kafka_props) - diff --git a/bubuku/zookeeper/__init__.py b/bubuku/zookeeper/__init__.py index ed987ad..59e9afd 100644 --- a/bubuku/zookeeper/__init__.py +++ b/bubuku/zookeeper/__init__.py @@ -49,10 +49,10 @@ def touch(self): value = None if self.force: while value is None: - value = self.load_func() + value = self._load_value_safe() self.force = False else: - value = self.load_func() + value = self._load_value_safe() if value is not None: if value != self.value: self.value = value @@ -62,6 +62,13 @@ def touch(self): self.update_func(self.value) self.next_apply = None + def _load_value_safe(self): + try: + return self.load_func() + except Exception as e: + _LOG.error('Failed to load value to update', exc_info=e) + return None + class AddressListProvider(object): def get_latest_address(self) -> (list, int): diff --git a/bubuku/zookeeper/exhibitor.py b/bubuku/zookeeper/exhibitor.py index 3fd0190..f24bc19 100644 --- a/bubuku/zookeeper/exhibitor.py +++ b/bubuku/zookeeper/exhibitor.py @@ -3,23 +3,21 @@ import requests from requests import RequestException -import boto3 from bubuku.zookeeper import AddressListProvider _LOG = logging.getLogger('bubuku.zookeeper.exhibitor') -class AWSExhibitorAddressProvider(AddressListProvider): - def __init__(self, zk_stack_name: str, region: str): - self.zk_stack_name = zk_stack_name - self.region = region +class ExhibitorAddressProvider(AddressListProvider): + def __init__(self, initial_list_provider): + self.initial_list_provider = initial_list_provider self.exhibitors = [] def get_latest_address(self) -> (list, int): json_ = self._query_exhibitors(self.exhibitors) if not json_: - self.exhibitors = self.get_addresses_by_lb_name() + self.exhibitors = self.initial_list_provider() json_ = self._query_exhibitors(self.exhibitors) if isinstance(json_, dict) and 'servers' in json_ and 'port' in json_: self.exhibitors = json_['servers'] @@ -38,29 +36,3 @@ def _query_exhibitors(self, exhibitors): except RequestException as e: _LOG.warn('Failed to query zookeeper list information from {}'.format(url), exc_info=e) return None - - def get_addresses_by_lb_name(self) -> list: - lb_name = self.zk_stack_name - - private_ips = [] - - if self.region is not None: - elb = boto3.client('elb', region_name=self.region) - ec2 = boto3.client('ec2', region_name=self.region) - - response = elb.describe_instance_health(LoadBalancerName=lb_name) - - for instance in response['InstanceStates']: - if instance['State'] == 'InService': - private_ips.append(ec2.describe_instances( - InstanceIds=[instance['InstanceId']])['Reservations'][0]['Instances'][0]['PrivateIpAddress']) - - else: - private_ips = [lb_name] - _LOG.info("Ip addresses for {} are: {}".format(lb_name, private_ips)) - return private_ips - - -class LocalAddressProvider(AddressListProvider): - def get_latest_address(self) -> (list, int): - return ('zookeeper',), 2181 diff --git a/tests/test_exhibitor.py b/tests/test_exhibitor.py index 90e256b..efd2c32 100644 --- a/tests/test_exhibitor.py +++ b/tests/test_exhibitor.py @@ -1,34 +1,48 @@ +import unittest from unittest.mock import MagicMock -from bubuku.zookeeper.exhibitor import AWSExhibitorAddressProvider +from bubuku.zookeeper.exhibitor import ExhibitorAddressProvider -def test_get_latest_address(): - AWSExhibitorAddressProvider.get_addresses_by_lb_name = MagicMock(return_value=['aws-lb-1', 'aws-lb-2']) - AWSExhibitorAddressProvider._query_exhibitors = MagicMock(return_value={'servers':['aws-lb-1-new'], 'port': 99}) - address_provider = AWSExhibitorAddressProvider('zk-stack-name', 'eu-west') - actual_result = address_provider.get_latest_address() +class ExhibitorAddressProviderTest(unittest.TestCase): + def test_get_latest_address(self): + address_provider = ExhibitorAddressProvider(lambda: ['aws-lb-1', 'aws-lb-2']) + address_provider._query_exhibitors = lambda _: {'servers': ['aws-lb-1-new'], 'port': 99} - assert actual_result == (['aws-lb-1-new'], 99) + actual_result = address_provider.get_latest_address() + assert actual_result == (['aws-lb-1-new'], 99) -def test_get_latest_address_no_exhibitors(): - AWSExhibitorAddressProvider.get_addresses_by_lb_name = MagicMock(return_value=['aws-lb-1', 'aws-lb-2']) - AWSExhibitorAddressProvider._query_exhibitors = MagicMock(return_value=None) + def test_get_latest_address_no_exhibitors(self): + address_provider = ExhibitorAddressProvider(lambda: ['aws-lb-1', 'aws-lb-2']) + address_provider._query_exhibitors = lambda _: None - address_provider = AWSExhibitorAddressProvider('zk-stack-name', 'eu-west') - actual_result = address_provider.get_latest_address() + actual_result = address_provider.get_latest_address() + assert actual_result is None - assert actual_result is None + def test_get_latest_address_2(self): + address_provider = ExhibitorAddressProvider(lambda: ['aws-lb-1', 'aws-lb-2']) + address_provider._query_exhibitors = MagicMock() + address_provider._query_exhibitors.side_effect = [None, {'servers': ['aws-lb-1-new'], 'port': 99}] + actual_result = address_provider.get_latest_address() -def test_get_latest_address_2(): - AWSExhibitorAddressProvider.get_addresses_by_lb_name = MagicMock(return_value=['aws-lb-1', 'aws-lb-2']) - AWSExhibitorAddressProvider._query_exhibitors = MagicMock() - AWSExhibitorAddressProvider._query_exhibitors.side_effect = [None, {'servers':['aws-lb-1-new'], 'port': 99}] + assert address_provider._query_exhibitors.call_count == 2 + assert actual_result == (['aws-lb-1-new'], 99) - address_provider = AWSExhibitorAddressProvider('zk-stack-name', 'eu-west') - actual_result = address_provider.get_latest_address() + def test_addresses_are_sorted(self): + address_provider = ExhibitorAddressProvider(lambda: ['aws-lb-1', 'aws-lb-2']) + address_provider._query_exhibitors = lambda _: {'servers': ['1', '2', '3'], 'port': '1234'} + tmp_result = address_provider.get_latest_address() + + # Check that two calls in sequence will return the same value + assert tmp_result == address_provider.get_latest_address() + + # Check sort 1 + address_provider._query_exhibitors = lambda _: {'servers': ['2', '1', '3'], 'port': '1234'} + assert tmp_result == address_provider.get_latest_address() + + # Check sort again (just to be sure) + address_provider._query_exhibitors = lambda _: {'servers': ['3', '2', '1'], 'port': '1234'} + assert tmp_result == address_provider.get_latest_address() - assert AWSExhibitorAddressProvider._query_exhibitors.call_count == 2 - assert actual_result == (['aws-lb-1-new'], 99) diff --git a/tests/test_zookeeper.py b/tests/test_zookeeper.py index ef915c2..d8ba312 100644 --- a/tests/test_zookeeper.py +++ b/tests/test_zookeeper.py @@ -214,6 +214,27 @@ def _update(value_): cache.touch() assert result[0] == (['test'], 1) + def test_exception_eating(self): + result = [10, None] + + def _update(value_): + result[1] = value_ + + def _load(): + if result[0] > 0: + result[0] -= 1 + raise Exception() + return ['test'], 1 + + cache = SlowlyUpdatedCache(_load, _update, 0, 0) + cache.force = False # Small hack to avoid initial refresh cycle + for i in range(0, 10): + cache.touch() + assert result[1] is None + assert result[0] == 9 - i + cache.touch() + assert result[1] == (['test'], 1) + def test_initial_update_slow(self): result = [None] call_count = [0]