Skip to content
This repository has been archived by the owner on Oct 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #60 from zalando-incubator/I59
Browse files Browse the repository at this point in the history
Handle network errors in proper way, Closes #59
  • Loading branch information
antban authored Sep 12, 2016
2 parents dbf0297 + 1d277eb commit c9e5cf8
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 83 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ RUN pip3 install --no-cache-dir -r requirements.txt
EXPOSE 9092 8080 8778

RUN python3 setup.py develop
ENTRYPOINT ["/bin/bash", "-c", "exec bubuku"]
ENTRYPOINT ["/bin/bash", "-c", "exec bubuku-daemon"]
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Start supervisor:
```
KAFKA_DIR=/opt/kafka KAFKA_SETTINGS=/opt/kafka/config/server.properties ZOOKEEPER_STACK_NAME=my_zookeeper \
ZOOKEEPER_PREFIX=/test BROKER_ID_POLICY=ip BUKU_FEATURES=restart_on_exhibitor,graceful_terminate \
HEALTH_PORT=8888 bubuku
HEALTH_PORT=8888 bubuku-daemon
```
Run commands on cluster:
```
Expand Down
64 changes: 39 additions & 25 deletions bubuku/env_provider.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import json
import logging
import uuid
from functools import partial

import boto3
import requests

from bubuku.id_generator import BrokerIDByIp, BrokerIdAutoAssign
from bubuku.zookeeper import BukuExhibitor
from bubuku.zookeeper.exhibitor import AWSExhibitorAddressProvider
from bubuku.zookeeper.exhibitor import LocalAddressProvider
from bubuku.config import Config, KafkaProperties
import uuid
from bubuku.id_generator import BrokerIDByIp, BrokerIdAutoAssign
from bubuku.zookeeper import BukuExhibitor, AddressListProvider
from bubuku.zookeeper.exhibitor import ExhibitorAddressProvider

_LOG = logging.getLogger('bubuku.amazon')

Expand All @@ -35,46 +36,59 @@ def create_env_provider(config: Config):

class AmazonEnvProvider(EnvProvider):
def __init__(self, config: Config):
self.document = None
self.aws_addr = '169.254.169.254'
self.config = config
self.ip_address = None

def _get_document(self) -> dict:
if not self.document:
try:
self.document = requests.get(
'http://{}/latest/dynamic/instance-identity/document'.format(self.aws_addr),
timeout=5).json()
_LOG.info("Amazon specific information loaded from AWS: {}".format(
json.dumps(self.document, indent=2)))
except Exception as ex:
_LOG.warn('Failed to download AWS document', exc_info=ex)
return self.document

def get_aws_region(self) -> str:
doc = self._get_document()
return doc['region'] if doc else None
document = requests.get('http://{}/latest/dynamic/instance-identity/document'.format(self.aws_addr),
timeout=5).json()
_LOG.info("Amazon specific information loaded from AWS: {}".format(json.dumps(document, indent=2)))
return document

def get_id(self) -> str:
doc = self._get_document()
return doc['privateIp'] if doc else '127.0.0.1'
if not self.ip_address:
self.ip_address = self._get_document()['privateIp']
return self.ip_address

def _load_instance_ips(self, lb_name: str):
region = self._get_document()['region']

private_ips = []

elb = boto3.client('elb', region_name=region)
ec2 = boto3.client('ec2', region_name=region)

response = elb.describe_instance_health(LoadBalancerName=lb_name)

for instance in response['InstanceStates']:
if instance['State'] == 'InService':
private_ips.append(ec2.describe_instances(
InstanceIds=[instance['InstanceId']])['Reservations'][0]['Instances'][0]['PrivateIpAddress'])

_LOG.info("Ip addresses for {} are: {}".format(lb_name, private_ips))
return private_ips

def get_address_provider(self):
return AWSExhibitorAddressProvider(self.config.zk_stack_name, self.get_aws_region())
return ExhibitorAddressProvider(partial(self._load_instance_ips, self.config.zk_stack_name))

def create_broker_id_manager(self, zk: BukuExhibitor, kafka_props: KafkaProperties):
return BrokerIDByIp(zk, self.get_id(), kafka_props)


class _LocalAddressProvider(AddressListProvider):
def get_latest_address(self) -> (list, int):
return ('zookeeper',), 2181


class LocalEnvProvider(EnvProvider):
unique_id = str(uuid.uuid4())

def get_id(self) -> str:
return self.unique_id

def get_address_provider(self):
return LocalAddressProvider()
return _LocalAddressProvider()

def create_broker_id_manager(self, zk: BukuExhibitor, kafka_props: KafkaProperties):
return BrokerIdAutoAssign(zk, kafka_props)

11 changes: 9 additions & 2 deletions bubuku/zookeeper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ def touch(self):
value = None
if self.force:
while value is None:
value = self.load_func()
value = self._load_value_safe()
self.force = False
else:
value = self.load_func()
value = self._load_value_safe()
if value is not None:
if value != self.value:
self.value = value
Expand All @@ -62,6 +62,13 @@ def touch(self):
self.update_func(self.value)
self.next_apply = None

def _load_value_safe(self):
try:
return self.load_func()
except Exception as e:
_LOG.error('Failed to load value to update', exc_info=e)
return None


class AddressListProvider(object):
def get_latest_address(self) -> (list, int):
Expand Down
36 changes: 4 additions & 32 deletions bubuku/zookeeper/exhibitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,21 @@

import requests
from requests import RequestException
import boto3

from bubuku.zookeeper import AddressListProvider

_LOG = logging.getLogger('bubuku.zookeeper.exhibitor')


class AWSExhibitorAddressProvider(AddressListProvider):
def __init__(self, zk_stack_name: str, region: str):
self.zk_stack_name = zk_stack_name
self.region = region
class ExhibitorAddressProvider(AddressListProvider):
def __init__(self, initial_list_provider):
self.initial_list_provider = initial_list_provider
self.exhibitors = []

def get_latest_address(self) -> (list, int):
json_ = self._query_exhibitors(self.exhibitors)
if not json_:
self.exhibitors = self.get_addresses_by_lb_name()
self.exhibitors = self.initial_list_provider()
json_ = self._query_exhibitors(self.exhibitors)
if isinstance(json_, dict) and 'servers' in json_ and 'port' in json_:
self.exhibitors = json_['servers']
Expand All @@ -38,29 +36,3 @@ def _query_exhibitors(self, exhibitors):
except RequestException as e:
_LOG.warn('Failed to query zookeeper list information from {}'.format(url), exc_info=e)
return None

def get_addresses_by_lb_name(self) -> list:
lb_name = self.zk_stack_name

private_ips = []

if self.region is not None:
elb = boto3.client('elb', region_name=self.region)
ec2 = boto3.client('ec2', region_name=self.region)

response = elb.describe_instance_health(LoadBalancerName=lb_name)

for instance in response['InstanceStates']:
if instance['State'] == 'InService':
private_ips.append(ec2.describe_instances(
InstanceIds=[instance['InstanceId']])['Reservations'][0]['Instances'][0]['PrivateIpAddress'])

else:
private_ips = [lb_name]
_LOG.info("Ip addresses for {} are: {}".format(lb_name, private_ips))
return private_ips


class LocalAddressProvider(AddressListProvider):
def get_latest_address(self) -> (list, int):
return ('zookeeper',), 2181
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def read_version(package):
]

CONSOLE_SCRIPTS = [
'bubuku = bubuku.daemon:main',
'bubuku-daemon = bubuku.daemon:main',
'bubuku-cli = bubuku.cli:cli'
]

Expand Down
56 changes: 35 additions & 21 deletions tests/test_exhibitor.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,48 @@
import unittest
from unittest.mock import MagicMock
from bubuku.zookeeper.exhibitor import AWSExhibitorAddressProvider

from bubuku.zookeeper.exhibitor import ExhibitorAddressProvider

def test_get_latest_address():
AWSExhibitorAddressProvider.get_addresses_by_lb_name = MagicMock(return_value=['aws-lb-1', 'aws-lb-2'])
AWSExhibitorAddressProvider._query_exhibitors = MagicMock(return_value={'servers':['aws-lb-1-new'], 'port': 99})

address_provider = AWSExhibitorAddressProvider('zk-stack-name', 'eu-west')
actual_result = address_provider.get_latest_address()
class ExhibitorAddressProviderTest(unittest.TestCase):
def test_get_latest_address(self):
address_provider = ExhibitorAddressProvider(lambda: ['aws-lb-1', 'aws-lb-2'])
address_provider._query_exhibitors = lambda _: {'servers': ['aws-lb-1-new'], 'port': 99}

assert actual_result == (['aws-lb-1-new'], 99)
actual_result = address_provider.get_latest_address()

assert actual_result == (['aws-lb-1-new'], 99)

def test_get_latest_address_no_exhibitors():
AWSExhibitorAddressProvider.get_addresses_by_lb_name = MagicMock(return_value=['aws-lb-1', 'aws-lb-2'])
AWSExhibitorAddressProvider._query_exhibitors = MagicMock(return_value=None)
def test_get_latest_address_no_exhibitors(self):
address_provider = ExhibitorAddressProvider(lambda: ['aws-lb-1', 'aws-lb-2'])
address_provider._query_exhibitors = lambda _: None

address_provider = AWSExhibitorAddressProvider('zk-stack-name', 'eu-west')
actual_result = address_provider.get_latest_address()
actual_result = address_provider.get_latest_address()
assert actual_result is None

assert actual_result is None
def test_get_latest_address_2(self):
address_provider = ExhibitorAddressProvider(lambda: ['aws-lb-1', 'aws-lb-2'])
address_provider._query_exhibitors = MagicMock()
address_provider._query_exhibitors.side_effect = [None, {'servers': ['aws-lb-1-new'], 'port': 99}]

actual_result = address_provider.get_latest_address()

def test_get_latest_address_2():
AWSExhibitorAddressProvider.get_addresses_by_lb_name = MagicMock(return_value=['aws-lb-1', 'aws-lb-2'])
AWSExhibitorAddressProvider._query_exhibitors = MagicMock()
AWSExhibitorAddressProvider._query_exhibitors.side_effect = [None, {'servers':['aws-lb-1-new'], 'port': 99}]
assert address_provider._query_exhibitors.call_count == 2
assert actual_result == (['aws-lb-1-new'], 99)

address_provider = AWSExhibitorAddressProvider('zk-stack-name', 'eu-west')
actual_result = address_provider.get_latest_address()
def test_addresses_are_sorted(self):
address_provider = ExhibitorAddressProvider(lambda: ['aws-lb-1', 'aws-lb-2'])
address_provider._query_exhibitors = lambda _: {'servers': ['1', '2', '3'], 'port': '1234'}
tmp_result = address_provider.get_latest_address()

# Check that two calls in sequence will return the same value
assert tmp_result == address_provider.get_latest_address()

# Check sort 1
address_provider._query_exhibitors = lambda _: {'servers': ['2', '1', '3'], 'port': '1234'}
assert tmp_result == address_provider.get_latest_address()

# Check sort again (just to be sure)
address_provider._query_exhibitors = lambda _: {'servers': ['3', '2', '1'], 'port': '1234'}
assert tmp_result == address_provider.get_latest_address()

assert AWSExhibitorAddressProvider._query_exhibitors.call_count == 2
assert actual_result == (['aws-lb-1-new'], 99)
21 changes: 21 additions & 0 deletions tests/test_zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,27 @@ def _update(value_):
cache.touch()
assert result[0] == (['test'], 1)

def test_exception_eating(self):
result = [10, None]

def _update(value_):
result[1] = value_

def _load():
if result[0] > 0:
result[0] -= 1
raise Exception()
return ['test'], 1

cache = SlowlyUpdatedCache(_load, _update, 0, 0)
cache.force = False # Small hack to avoid initial refresh cycle
for i in range(0, 10):
cache.touch()
assert result[1] is None
assert result[0] == 9 - i
cache.touch()
assert result[1] == (['test'], 1)

def test_initial_update_slow(self):
result = [None]
call_count = [0]
Expand Down

0 comments on commit c9e5cf8

Please sign in to comment.