Skip to content

Commit

Permalink
Issue #112 Add AggregatorBackendConfig.memoizer
Browse files Browse the repository at this point in the history
and deprecate `AggregatorConfig.memoizer`
  • Loading branch information
soxofaan committed Mar 1, 2024
1 parent c82a212 commit 1feb97d
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 116 deletions.
5 changes: 3 additions & 2 deletions src/openeo_aggregator/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,8 @@ def get_memoizer(memoizer_type: str, memoizer_conf: dict) -> Memoizer:
else:
raise ValueError(memoizer_type)

memoizer_config = get_backend_config().memoizer or config.memoizer
return get_memoizer(
memoizer_type=config.memoizer.get("type", "null"),
memoizer_conf=config.memoizer.get("config", {}),
memoizer_type=memoizer_config.get("type", "null"),
memoizer_conf=memoizer_config.get("config", {}),
)
12 changes: 9 additions & 3 deletions src/openeo_aggregator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,16 @@ class AggregatorConfig(dict):
config_source = dict_item()

# Dictionary mapping backend id to backend url
aggregator_backends = dict_item() # TODO #112 deprecated
# TODO #112 deprecated, instead use AggregatorBackendConfig.aggregator_backends
aggregator_backends = dict_item()

partitioned_job_tracking = dict_item(default=None) # TODO #112 deprecated
# TODO #112 deprecated, instead use AggregatorBackendConfig.partitioned_job_tracking
partitioned_job_tracking = dict_item(default=None)

# TODO #112 Deprecated, use AggregatorBackendConfig.zookeeper_prefix instead
zookeeper_prefix = dict_item(default="/openeo-aggregator/")

# See `memoizer_from_config` for details.
# TODO #112 Deprecated, use AggregatorBackendConfig.memoizer instead
memoizer = dict_item(default={"type": "dict"})

# Just a config field for test purposes (while we're stripping down this config class)
Expand Down Expand Up @@ -147,6 +149,10 @@ class AggregatorBackendConfig(OpenEoBackendConfig):
# To be replaced eventually with "/openeo-aggregator/"
zookeeper_prefix: str = ""

# See `memoizer_from_config` for details.
# TODO #112: empty default is to allow migration. To be replaced with `attrs.Factory(lambda: {"type": "dict"})`
memoizer: Dict = attrs.Factory(dict)

zk_memoizer_tracking: bool = smart_bool(os.environ.get("OPENEO_AGGREGATOR_ZK_MEMOIZER_TRACKING"))


Expand Down
4 changes: 4 additions & 0 deletions tests/backend_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,8 @@
},
connections_cache_ttl=1.0,
zookeeper_prefix="/o-a/",
memoizer={
"type": "dict",
"config": {"default_ttl": 66},
},
)
23 changes: 12 additions & 11 deletions tests/background/test_prime_caches.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@
}


@pytest.fixture
def config(backend1, backend2, backend1_id, backend2_id, zk_client) -> AggregatorConfig:
conf = AggregatorConfig()
conf.memoizer = {
"type": "zookeeper",
"config": {
"zk_hosts": "zk.test:2181",
"default_ttl": 24 * 60 * 60,
},
}
return conf
@pytest.fixture(autouse=True)
def _use_zookeeper_memoizer():
with config_overrides(
memoizer={
"type": "zookeeper",
"config": {
"zk_hosts": "zk.test:2181",
"default_ttl": 24 * 60 * 60,
},
}
):
yield


@pytest.fixture(autouse=True)
Expand Down
16 changes: 1 addition & 15 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,29 +68,15 @@ def zk_client() -> DummyKazooClient:
return DummyKazooClient()


DEFAULT_MEMOIZER_CONFIG = {
"type": "dict",
"config": {"default_ttl": 66},
}


@pytest.fixture
def memoizer_config() -> dict:
"""
Fixture for global memoizer config, to allow overriding/parameterizing it for certain tests.
Also see https://docs.pytest.org/en/7.1.x/how-to/fixtures.html#override-a-fixture-with-direct-test-parametrization
"""
return DEFAULT_MEMOIZER_CONFIG


@pytest.fixture
def base_config(zk_client, memoizer_config) -> AggregatorConfig:
def base_config(zk_client) -> AggregatorConfig:
"""Base config for tests (without any configured backends)."""
conf = AggregatorConfig()
conf.config_source = "test fixture base_config"
# conf.flask_error_handling = False # Temporary disable flask error handlers to simplify debugging (better stack traces).

conf.memoizer = memoizer_config

return conf

Expand Down
95 changes: 67 additions & 28 deletions tests/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@
_InternalCollectionMetadata,
)
from openeo_aggregator.caching import DictMemoizer
from openeo_aggregator.config import get_backend_config
from openeo_aggregator.testing import clock_mock, config_overrides

from .conftest import DEFAULT_MEMOIZER_CONFIG

# TODO: "backend.py" should not really be authentication-aware, can we eliminate these constants
# and move the tested functionality to test_views.py?
# Also see https://github.com/Open-EO/openeo-aggregator/pull/79#discussion_r1022018851
Expand Down Expand Up @@ -72,21 +71,26 @@ def test_file_formats_simple(self, multi_backend_connection, config, backend1, b
file_formats = implementation.file_formats()
assert file_formats == just_geotiff

@pytest.mark.parametrize("memoizer_config", [
DEFAULT_MEMOIZER_CONFIG,
{"type": "jsondict", "config": {"default_ttl": 66}} # Test caching with JSON serialization too
])
@pytest.mark.parametrize(
["overrides", "expected_cache_types"],
[
({"memoizer": {"type": "dict"}}, {dict}),
({"memoizer": {"type": "jsondict"}}, {bytes}),
],
)
def test_file_formats_caching(
self,
multi_backend_connection, config, backend1, backend2, requests_mock, memoizer_config,
self, multi_backend_connection, config, backend1, backend2, requests_mock, overrides, expected_cache_types
):
just_geotiff = {
"input": {"GTiff": {"gis_data_types": ["raster"], "parameters": {}, "title": "GeoTiff"}},
"output": {"GTiff": {"gis_data_types": ["raster"], "parameters": {}, "title": "GeoTiff"}}
}
mock1 = requests_mock.get(backend1 + "/file_formats", json=just_geotiff)
mock2 = requests_mock.get(backend2 + "/file_formats", json=just_geotiff)
implementation = AggregatorBackendImplementation(backends=multi_backend_connection, config=config)

with config_overrides(**overrides):
implementation = AggregatorBackendImplementation(backends=multi_backend_connection, config=config)

file_formats = implementation.file_formats()
assert file_formats == just_geotiff
assert mock1.call_count == 1
Expand All @@ -102,8 +106,7 @@ def test_file_formats_caching(
assert isinstance(implementation._memoizer, DictMemoizer)
cache_dump = implementation._memoizer.dump(values_only=True)
assert len(cache_dump) == 1
expected_type = {"dict": dict, "jsondict": bytes}[memoizer_config["type"]]
assert all(isinstance(x, expected_type) for x in cache_dump)
assert set(type(v) for v in cache_dump) == expected_cache_types

def test_file_formats_merging(self, multi_backend_connection, config, backend1, backend2, requests_mock):
requests_mock.get(backend1 + "/file_formats", json={
Expand Down Expand Up @@ -1798,14 +1801,22 @@ def test_generate_backend_constraint_callables(self):
assert differs_from_b2("b2") is False
assert differs_from_b2("b3") is True

@pytest.mark.parametrize("memoizer_config", [
DEFAULT_MEMOIZER_CONFIG,
{"type": "jsondict", "config": {"default_ttl": 66}} # Test caching with JSON serialization too
])
def test_get_all_metadata_caching(self, catalog, backend1, backend2, requests_mock, memoizer_config):
@pytest.mark.parametrize(
["overrides", "expected_cache_types"],
[
({"memoizer": {"type": "dict"}}, {tuple}),
({"memoizer": {"type": "jsondict", "config": {"default_ttl": 66}}}, {bytes}),
],
)
def test_get_all_metadata_caching(
self, multi_backend_connection, config, backend1, backend2, requests_mock, overrides, expected_cache_types
):
b1am = requests_mock.get(backend1 + "/collections", json={"collections": [{"id": "S2"}]})
b2am = requests_mock.get(backend2 + "/collections", json={"collections": [{"id": "S2"}]})

with config_overrides(**overrides):
catalog = AggregatorCollectionCatalog(backends=multi_backend_connection, config=config)

metadata = catalog.get_all_metadata()
assert metadata == [DictSubSet({"id": "S2"})]
assert (b1am.call_count, b2am.call_count) == (1, 1)
Expand All @@ -1820,16 +1831,29 @@ def test_get_all_metadata_caching(self, catalog, backend1, backend2, requests_mo
assert metadata == [DictSubSet({"id": "S2"})]
assert (b1am.call_count, b2am.call_count) == (2, 2)

@pytest.mark.parametrize("memoizer_config", [
DEFAULT_MEMOIZER_CONFIG,
{"type": "jsondict", "config": {"default_ttl": 66}} # Test caching with JSON serialization too
])
def test_get_collection_metadata_caching(self, catalog, backend1, backend2, requests_mock, memoizer_config):
assert isinstance(catalog._memoizer, DictMemoizer)
cache_dump = catalog._memoizer.dump(values_only=True)
assert set(type(v) for v in cache_dump) == expected_cache_types

@pytest.mark.parametrize(
["overrides", "expected_cache_types"],
[
({"memoizer": {"type": "dict"}}, {tuple, dict}),
({"memoizer": {"type": "jsondict"}}, {bytes}),
],
)
def test_get_collection_metadata_caching(
self, multi_backend_connection, config, backend1, backend2, requests_mock, overrides, expected_cache_types
):
requests_mock.get(backend1 + "/collections", json={"collections": [{"id": "S2"}]})
b1s2 = requests_mock.get(backend1 + "/collections/S2", json={"id": "S2", "title": "b1's S2"})
requests_mock.get(backend2 + "/collections", json={"collections": [{"id": "S2"}]})
b2s2 = requests_mock.get(backend2 + "/collections/S2", json={"id": "S2", "title": "b2's S2"})

with config_overrides(**overrides):
catalog = AggregatorCollectionCatalog(backends=multi_backend_connection, config=config)


metadata = catalog.get_collection_metadata("S2")
assert metadata == DictSubSet({'id': 'S2', 'title': "b1's S2"})
assert (b1s2.call_count, b2s2.call_count) == (1, 1)
Expand All @@ -1844,6 +1868,10 @@ def test_get_collection_metadata_caching(self, catalog, backend1, backend2, requ
assert metadata == DictSubSet({'id': 'S2', 'title': "b1's S2"})
assert (b1s2.call_count, b2s2.call_count) == (2, 2)

assert isinstance(catalog._memoizer, DictMemoizer)
cache_dump = catalog._memoizer.dump(values_only=True)
assert set(type(v) for v in cache_dump) == expected_cache_types


class TestJobIdMapping:

Expand Down Expand Up @@ -1978,21 +2006,27 @@ def test_get_process_registry(
},
]

@pytest.mark.parametrize("memoizer_config", [
DEFAULT_MEMOIZER_CONFIG,
{"type": "jsondict", "config": {"default_ttl": 66}} # Test caching with JSON serialization too
])
@pytest.mark.parametrize(
["overrides", "expected_cache_types"],
[
({"memoizer": {"type": "dict"}}, {dict}),
({"memoizer": {"type": "jsondict"}}, {bytes}),
],
)
def test_get_process_registry_caching(
self, catalog, multi_backend_connection, config, backend1, backend2, requests_mock,
memoizer_config
self, multi_backend_connection, config, backend1, backend2, requests_mock, overrides, expected_cache_types
):
b1p = requests_mock.get(backend1 + "/processes", json={"processes": [
{"id": "add", "parameters": [{"name": "x"}, {"name": "y"}]},
]})
b2p = requests_mock.get(backend2 + "/processes", json={"processes": [
{"id": "multiply", "parameters": [{"name": "x"}, {"name": "y"}]},
]})
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)

with config_overrides(**overrides):
catalog = AggregatorCollectionCatalog(backends=multi_backend_connection, config=config)
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)

assert (b1p.call_count, b2p.call_count) == (0, 0)

_ = processing.get_process_registry(api_version="1.0.0")
Expand All @@ -2006,6 +2040,11 @@ def test_get_process_registry_caching(
_ = processing.get_process_registry(api_version="1.0.0")
assert (b1p.call_count, b2p.call_count) == (2, 2)

assert isinstance(processing._memoizer, DictMemoizer)
cache_dump = processing._memoizer.dump(values_only=True)
assert set(type(v) for v in cache_dump) == expected_cache_types


def test_get_process_registry_parameter_differences(
self, catalog, multi_backend_connection, config, backend1, backend2,
requests_mock,
Expand Down
73 changes: 36 additions & 37 deletions tests/test_caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,15 +642,16 @@ def test_corrupted_cache(self, zk_client, caplog):

@clock_mock(0)
def test_from_config(self, config, zk_client):
config.memoizer = {
"type": "zookeeper",
"config": {
"zk_hosts": "zk1.test:2181,zk2.test:2181",
"default_ttl": 123,
"zk_timeout": 7.25,
with config_overrides(
memoizer={
"type": "zookeeper",
"config": {
"zk_hosts": "zk1.test:2181,zk2.test:2181",
"default_ttl": 123,
"zk_timeout": 7.25,
},
}
}
with mock.patch.object(openeo_aggregator.caching, "KazooClient", return_value=zk_client) as KazooClient:
), mock.patch.object(openeo_aggregator.caching, "KazooClient", return_value=zk_client) as KazooClient:
zk_cache = memoizer_from_config(config, namespace="tezt")

KazooClient.assert_called_with(hosts="zk1.test:2181,zk2.test:2181")
Expand Down Expand Up @@ -697,49 +698,47 @@ def fun2_cached():

class TestMemoizerFromConfig:

def test_null_memoizer(self):
config = AggregatorConfig()
config.memoizer = {"type": "null"}
memoizer = memoizer_from_config(config, namespace="test")
def test_null_memoizer(self, config):
with config_overrides(memoizer={"type": "null"}):
memoizer = memoizer_from_config(config, namespace="test")
assert isinstance(memoizer, NullMemoizer)

def test_dict_memoizer(self):
config = AggregatorConfig()
config.memoizer = {"type": "dict", "config": {"default_ttl": 99}}
memoizer = memoizer_from_config(config, namespace="test")
def test_dict_memoizer(self, config):
with config_overrides(memoizer={"type": "dict", "config": {"default_ttl": 99}}):
memoizer = memoizer_from_config(config, namespace="test")
assert isinstance(memoizer, DictMemoizer)
assert memoizer._default_ttl == 99

def test_jsondict_memoizer(self):
config = AggregatorConfig()
config.memoizer = {"type": "jsondict", "config": {"default_ttl": 99}}
memoizer = memoizer_from_config(config, namespace="test")
def test_jsondict_memoizer(self, config):
with config_overrides(memoizer={"type": "jsondict", "config": {"default_ttl": 99}}):
memoizer = memoizer_from_config(config, namespace="test")
assert isinstance(memoizer, JsonDictMemoizer)
assert memoizer._default_ttl == 99

def test_zookeeper_memoizer(self):
config = AggregatorConfig()
config.memoizer = {
"type": "zookeeper",
"config": {"zk_hosts": "zk.test:2181", "default_ttl": 99, "zk_timeout": 88}
}
with config_overrides(zookeeper_prefix="/oea/test"):
def test_zookeeper_memoizer(self, config):
with config_overrides(
memoizer={"type": "zookeeper", "config": {"zk_hosts": "zk.test:2181", "default_ttl": 99, "zk_timeout": 88}},
zookeeper_prefix="/oea/test",
):
memoizer = memoizer_from_config(config, namespace="test-ns")
assert isinstance(memoizer, ZkMemoizer)
assert memoizer._default_ttl == 99
assert memoizer._prefix == "/oea/test/cache/test-ns"
assert memoizer._zk_timeout == 88

def test_chained_memoizer(self):
config = AggregatorConfig()
config.memoizer = {
"type": "chained",
"config": {"parts": [
{"type": "jsondict", "config": {"default_ttl": 99}},
{"type": "dict", "config": {"default_ttl": 333}},
]}
}
memoizer = memoizer_from_config(config, namespace="test-ns")
def test_chained_memoizer(self, config):
with config_overrides(
memoizer={
"type": "chained",
"config": {
"parts": [
{"type": "jsondict", "config": {"default_ttl": 99}},
{"type": "dict", "config": {"default_ttl": 333}},
]
},
}
):
memoizer = memoizer_from_config(config, namespace="test-ns")
assert isinstance(memoizer, ChainedMemoizer)
assert len(memoizer._memoizers) == 2
assert isinstance(memoizer._memoizers[0], JsonDictMemoizer)
Expand Down
Loading

0 comments on commit 1feb97d

Please sign in to comment.