diff --git a/bubuku/zookeeper/__init__.py b/bubuku/zookeeper/__init__.py index 34c7ff5..4d85454 100644 --- a/bubuku/zookeeper/__init__.py +++ b/bubuku/zookeeper/__init__.py @@ -38,9 +38,13 @@ def __init__(self, load_func, update_func, refresh_timeout, delay): self.next_apply = None self.force = True + def __str__(self): + return 'SlowCache(refresh={}, delay={}, last_check={}, next_apply={})'.format( + self.refresh_timeout, self.delay, self.last_check, self.next_apply) + def touch(self): now = time.time() - if self.last_check is None or (now - self.last_check) > self.delay: + if self.last_check is None or (now - self.last_check) > self.refresh_timeout: value = None if self.force: while value is None: @@ -48,12 +52,13 @@ def touch(self): self.force = False else: value = self.load_func() - if value is not None: + if value is not None and value != self.value: self.value = value self.next_apply = (now + self.delay) if self.last_check is not None else now - self.last_check = self.last_check + self.last_check = now if self.next_apply is not None and self.next_apply - now <= 0: self.update_func(self.value) + self.next_apply = None class AddressListProvider(object): @@ -75,8 +80,8 @@ def __init__(self, address_provider: AddressListProvider, prefix: str): self.hosts_cache = SlowlyUpdatedCache( self.address_provider.get_latest_address, self._update_hosts, - 30, - 3 * 60) + 30, # Refresh every 30 seconds + 3 * 60) # Update only after 180 seconds of stability def _update_hosts(self, value): hosts, port = value @@ -279,6 +284,6 @@ def unregister_change(self, name): self.exhibitor.delete('/bubuku/changes/{}'.format(name), recursive=True) -def load_exhibitor_proxy(address_provider: AddressListProvider, prefix:str) -> BukuExhibitor: +def load_exhibitor_proxy(address_provider: AddressListProvider, prefix: str) -> BukuExhibitor: proxy = _ZookeeperProxy(address_provider, prefix) return BukuExhibitor(proxy) diff --git a/bubuku/zookeeper/exhibior.py b/bubuku/zookeeper/exhibior.py index 7813f41..25fe489 100644 --- a/bubuku/zookeeper/exhibior.py +++ b/bubuku/zookeeper/exhibior.py @@ -14,7 +14,6 @@ class AWSExhibitorAddressProvider(AddressListProvider): def __init__(self, amazon: Amazon, zk_stack_name: str): self.master_exhibitors = amazon.get_addresses_by_lb_name(zk_stack_name) self.exhibitors = list(self.master_exhibitors) - self.port = 2181 def get_latest_address(self) -> (list, int): json_ = self._query_exhibitors(self.exhibitors) diff --git a/tests/test_exhibitor.py b/tests/test_zookeeper.py similarity index 69% rename from tests/test_exhibitor.py rename to tests/test_zookeeper.py index 806124a..e9a3f2d 100644 --- a/tests/test_exhibitor.py +++ b/tests/test_zookeeper.py @@ -1,10 +1,13 @@ import json +import math import re +import time +import unittest from unittest.mock import MagicMock from kazoo.exceptions import NoNodeError, NodeExistsError -from bubuku.zookeeper import BukuExhibitor +from bubuku.zookeeper import BukuExhibitor, SlowlyUpdatedCache def test_get_broker_ids(): @@ -197,3 +200,91 @@ def _create(path, value=None, **kwargs): assert buku.reallocate_partition('t01', 0, [1, 2, 3]) # Node exists assert not buku.reallocate_partition('t01', 0, [1, 2, 3]) + + +class SlowlyUpdatedCacheTest(unittest.TestCase): + def test_initial_update_fast(self): + result = [None] + + def _update(value_): + result[0] = value_ + + cache = SlowlyUpdatedCache(lambda: (['test'], 1), _update, 0, 0) + + cache.touch() + assert result[0] == (['test'], 1) + + def test_initial_update_slow(self): + result = [None] + call_count = [0] + + def _load(): + call_count[0] += 1 + if call_count[0] == 100: + return ['test'], 1 + return None + + def _update(value_): + result[0] = value_ + + cache = SlowlyUpdatedCache(_load, _update, 0, 0) + + cache.touch() + assert call_count[0] == 100 + assert result[0] == (['test'], 1) + + def test_delays_illegal(self): + result = [None] + load_calls = [] + update_calls = [] + + def _load(): + load_calls.append(time.time()) + return ['test'], 0 if len(load_calls) > 1 else 1 + + def _update(value_): + update_calls.append(time.time()) + result[0] = value_ + + # refresh every 1 second, delay 0.5 second + cache = SlowlyUpdatedCache(_load, _update, 0.5, 0.25) + + while len(update_calls) != 2: + time.sleep(0.1) + cache.touch() + print(cache) + + assert math.fabs(update_calls[0] - load_calls[0]) <= 0.15 # 0.1 + 0.1/2 + # Verify that load calls were made one by another + assert math.fabs(load_calls[1] - load_calls[0] - .5) <= 0.15 + # Verity that update call was made in correct interval + + assert load_calls[1] + 0.25 <= update_calls[1] <= load_calls[1] + 0.25 + 0.15 + + def test_delays_legal(self): + result = [None] + main_call = [] + load_calls = [] + update_calls = [] + + def _load(): + load_calls.append(time.time()) + if len(load_calls) == 5: + main_call.append(time.time()) + return ['test'], 0 if len(load_calls) >= 5 else len(load_calls) + + def _update(value_): + update_calls.append(time.time()) + result[0] = value_ + + # refresh every 1 second, delay 5 second - in case where situation is constantly changing - wait for + # last stable update + cache = SlowlyUpdatedCache(_load, _update, 0.5, 3) + + while len(update_calls) != 2: + time.sleep(0.1) + cache.touch() + print(cache) + + assert len(main_call) == 1 + assert main_call[0] + 3 - .15 < update_calls[1] < main_call[0] + 3 + .15