Skip to content
This repository has been archived by the owner on Oct 7, 2024. It is now read-only.

Commit

Permalink
Make tests for SlowCache
Browse files Browse the repository at this point in the history
  • Loading branch information
antban committed Aug 8, 2016
1 parent c7be581 commit f79529d
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 8 deletions.
17 changes: 11 additions & 6 deletions bubuku/zookeeper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,27 @@ def __init__(self, load_func, update_func, refresh_timeout, delay):
self.next_apply = None
self.force = True

def __str__(self):
return 'SlowCache(refresh={}, delay={}, last_check={}, next_apply={})'.format(
self.refresh_timeout, self.delay, self.last_check, self.next_apply)

def touch(self):
now = time.time()
if self.last_check is None or (now - self.last_check) > self.delay:
if self.last_check is None or (now - self.last_check) > self.refresh_timeout:
value = None
if self.force:
while value is None:
value = self.load_func()
self.force = False
else:
value = self.load_func()
if value is not None:
if value is not None and value != self.value:
self.value = value
self.next_apply = (now + self.delay) if self.last_check is not None else now
self.last_check = self.last_check
self.last_check = now
if self.next_apply is not None and self.next_apply - now <= 0:
self.update_func(self.value)
self.next_apply = None


class AddressListProvider(object):
Expand All @@ -75,8 +80,8 @@ def __init__(self, address_provider: AddressListProvider, prefix: str):
self.hosts_cache = SlowlyUpdatedCache(
self.address_provider.get_latest_address,
self._update_hosts,
30,
3 * 60)
30, # Refresh every 30 seconds
3 * 60) # Update only after 180 seconds of stability

def _update_hosts(self, value):
hosts, port = value
Expand Down Expand Up @@ -279,6 +284,6 @@ def unregister_change(self, name):
self.exhibitor.delete('/bubuku/changes/{}'.format(name), recursive=True)


def load_exhibitor_proxy(address_provider: AddressListProvider, prefix:str) -> BukuExhibitor:
def load_exhibitor_proxy(address_provider: AddressListProvider, prefix: str) -> BukuExhibitor:
proxy = _ZookeeperProxy(address_provider, prefix)
return BukuExhibitor(proxy)
1 change: 0 additions & 1 deletion bubuku/zookeeper/exhibior.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ class AWSExhibitorAddressProvider(AddressListProvider):
def __init__(self, amazon: Amazon, zk_stack_name: str):
self.master_exhibitors = amazon.get_addresses_by_lb_name(zk_stack_name)
self.exhibitors = list(self.master_exhibitors)
self.port = 2181

def get_latest_address(self) -> (list, int):
json_ = self._query_exhibitors(self.exhibitors)
Expand Down
93 changes: 92 additions & 1 deletion tests/test_exhibitor.py → tests/test_zookeeper.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import json
import math
import re
import time
import unittest
from unittest.mock import MagicMock

from kazoo.exceptions import NoNodeError, NodeExistsError

from bubuku.zookeeper import BukuExhibitor
from bubuku.zookeeper import BukuExhibitor, SlowlyUpdatedCache


def test_get_broker_ids():
Expand Down Expand Up @@ -197,3 +200,91 @@ def _create(path, value=None, **kwargs):
assert buku.reallocate_partition('t01', 0, [1, 2, 3])
# Node exists
assert not buku.reallocate_partition('t01', 0, [1, 2, 3])


class SlowlyUpdatedCacheTest(unittest.TestCase):
def test_initial_update_fast(self):
result = [None]

def _update(value_):
result[0] = value_

cache = SlowlyUpdatedCache(lambda: (['test'], 1), _update, 0, 0)

cache.touch()
assert result[0] == (['test'], 1)

def test_initial_update_slow(self):
result = [None]
call_count = [0]

def _load():
call_count[0] += 1
if call_count[0] == 100:
return ['test'], 1
return None

def _update(value_):
result[0] = value_

cache = SlowlyUpdatedCache(_load, _update, 0, 0)

cache.touch()
assert call_count[0] == 100
assert result[0] == (['test'], 1)

def test_delays_illegal(self):
result = [None]
load_calls = []
update_calls = []

def _load():
load_calls.append(time.time())
return ['test'], 0 if len(load_calls) > 1 else 1

def _update(value_):
update_calls.append(time.time())
result[0] = value_

# refresh every 1 second, delay 0.5 second
cache = SlowlyUpdatedCache(_load, _update, 0.5, 0.25)

while len(update_calls) != 2:
time.sleep(0.1)
cache.touch()
print(cache)

assert math.fabs(update_calls[0] - load_calls[0]) <= 0.15 # 0.1 + 0.1/2
# Verify that load calls were made one by another
assert math.fabs(load_calls[1] - load_calls[0] - .5) <= 0.15
# Verity that update call was made in correct interval

assert load_calls[1] + 0.25 <= update_calls[1] <= load_calls[1] + 0.25 + 0.15

def test_delays_legal(self):
result = [None]
main_call = []
load_calls = []
update_calls = []

def _load():
load_calls.append(time.time())
if len(load_calls) == 5:
main_call.append(time.time())
return ['test'], 0 if len(load_calls) >= 5 else len(load_calls)

def _update(value_):
update_calls.append(time.time())
result[0] = value_

# refresh every 1 second, delay 5 second - in case where situation is constantly changing - wait for
# last stable update
cache = SlowlyUpdatedCache(_load, _update, 0.5, 3)

while len(update_calls) != 2:
time.sleep(0.1)
cache.touch()
print(cache)

assert len(main_call) == 1
assert main_call[0] + 3 - .15 < update_calls[1] < main_call[0] + 3 + .15

0 comments on commit f79529d

Please sign in to comment.