Skip to content

Commit

Permalink
Issue #112 Add AggregatorBackendConfig.partitioned_job_tracking
Browse files Browse the repository at this point in the history
and deprecate `AggregatorConfig.partitioned_job_tracking`
  • Loading branch information
soxofaan committed Mar 1, 2024
1 parent eb52345 commit c82a212
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 24 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.

The format is roughly based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [0.24.0]

- Add `AggregatorBackendConfig.partitioned_job_tracking` and deprecate `AggregatorConfig.partitioned_job_tracking` ([#112](https://github.com/Open-EO/openeo-aggregator/issues/112))

## [0.23.0]

- Add `AggregatorBackendConfig.aggregator_backends` and deprecate `AggregatorConfig.aggregator_backends` ([#112](https://github.com/Open-EO/openeo-aggregator/issues/112))
Expand Down
6 changes: 3 additions & 3 deletions conf/aggregator.dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@

aggregator_config = AggregatorConfig(
config_source=__file__,
partitioned_job_tracking={
"zk_hosts": ZK_HOSTS,
},
zookeeper_prefix="/openeo/aggregator-dev/",
memoizer={
# See `memoizer_from_config` for more details
Expand Down Expand Up @@ -82,4 +79,7 @@
# Sentinel Hub OpenEO by Sinergise
"sentinelhub": "https://openeo.sentinel-hub.com/production/",
},
partitioned_job_tracking={
"zk_hosts": ZK_HOSTS,
},
)
6 changes: 3 additions & 3 deletions conf/aggregator.prod.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@

aggregator_config = AggregatorConfig(
config_source=__file__,
partitioned_job_tracking={
"zk_hosts": ZK_HOSTS,
},
zookeeper_prefix="/openeo/aggregator/",
memoizer={
# See `memoizer_from_config` for more details
Expand Down Expand Up @@ -75,4 +72,7 @@
# Sentinel Hub OpenEO by Sinergise
"sentinelhub": "https://openeo.sentinel-hub.com/production/",
},
partitioned_job_tracking={
"zk_hosts": ZK_HOSTS,
},
)
2 changes: 1 addition & 1 deletion src/openeo_aggregator/about.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import sys
from typing import Optional

__version__ = "0.23.0a1"
__version__ = "0.24.0a1"


def log_version_info(logger: Optional[logging.Logger] = None):
Expand Down
2 changes: 1 addition & 1 deletion src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1295,7 +1295,7 @@ def __init__(self, backends: MultiBackendConnection, config: AggregatorConfig):
config=config,
)

if config.partitioned_job_tracking:
if get_backend_config().partitioned_job_tracking or config.partitioned_job_tracking:
partitioned_job_tracker = PartitionedJobTracker.from_config(config=config, backends=self._backends)
else:
partitioned_job_tracker = None
Expand Down
6 changes: 5 additions & 1 deletion src/openeo_aggregator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ class AggregatorConfig(dict):
# Dictionary mapping backend id to backend url
aggregator_backends = dict_item() # TODO #112 deprecated

partitioned_job_tracking = dict_item(default=None)
partitioned_job_tracking = dict_item(default=None) # TODO #112 deprecated

# TODO #112 Deprecated, use AggregatorBackendConfig.zookeeper_prefix instead
zookeeper_prefix = dict_item(default="/openeo-aggregator/")

Expand Down Expand Up @@ -129,6 +130,9 @@ class AggregatorBackendConfig(OpenEoBackendConfig):
# TODO #112 temporary default to allow migration, but make this field mandatory (and require non-empty)
aggregator_backends: Dict[str, str] = attrs.Factory(dict)

# See `ZooKeeperPartitionedJobDB.from_config` for supported fields.
partitioned_job_tracking: Optional[dict] = None

streaming_chunk_size: int = STREAM_CHUNK_SIZE_DEFAULT

auth_entitlement_check: Union[bool, dict] = False
Expand Down
11 changes: 6 additions & 5 deletions src/openeo_aggregator/partitionedjobs/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,17 @@ def __init__(self, client: KazooClient, prefix: str = None):
@classmethod
def from_config(cls, config: AggregatorConfig) -> "ZooKeeperPartitionedJobDB":
# Get ZooKeeper client
if config.partitioned_job_tracking.get("zk_client"):
zk_client = config.partitioned_job_tracking["zk_client"]
elif config.partitioned_job_tracking.get("zk_hosts"):
zk_client = KazooClient(config.partitioned_job_tracking.get("zk_hosts"))
pjt_config = get_backend_config().partitioned_job_tracking or config.partitioned_job_tracking
if pjt_config.get("zk_client"):
zk_client = pjt_config["zk_client"]
elif pjt_config.get("zk_hosts"):
zk_client = KazooClient(pjt_config.get("zk_hosts"))
else:
raise ConfigException("Failed to construct zk_client")
# Determine ZooKeeper prefix
base_prefix = get_backend_config().zookeeper_prefix or config.zookeeper_prefix
assert len(base_prefix.replace("/", "")) >= 3
partitioned_jobs_prefix = config.partitioned_job_tracking.get("zookeeper_prefix", cls.NAMESPACE)
partitioned_jobs_prefix = pjt_config.get("zookeeper_prefix", cls.NAMESPACE)
prefix = strip_join("/", base_prefix, partitioned_jobs_prefix)
return cls(client=zk_client, prefix=prefix)

Expand Down
5 changes: 0 additions & 5 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,6 @@ def base_config(zk_client, memoizer_config) -> AggregatorConfig:

conf.memoizer = memoizer_config

conf.partitioned_job_tracking = {
"zk_client": zk_client,
}


return conf


Expand Down
23 changes: 22 additions & 1 deletion tests/partitionedjobs/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@
AggregatorBatchJobs,
)
from openeo_aggregator.partitionedjobs.zookeeper import ZooKeeperPartitionedJobDB
from openeo_aggregator.testing import approx_str_contains, approx_str_prefix, clock_mock
from openeo_aggregator.testing import (
approx_str_contains,
approx_str_prefix,
clock_mock,
config_overrides,
)
from openeo_aggregator.utils import BoundingBox

from .conftest import (
Expand Down Expand Up @@ -63,6 +68,11 @@ def dummy2(backend2, requests_mock) -> DummyBackend:
class TestFlimsyBatchJobSplitting:
now = _Now("2022-01-19T12:34:56Z")

@pytest.fixture(autouse=True)
def _partitioned_job_tracking(self, zk_client):
with config_overrides(partitioned_job_tracking={"zk_client": zk_client}):
yield

@now.mock
def test_create_job_basic(self, api100, zk_db, dummy1):
api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
Expand Down Expand Up @@ -454,6 +464,12 @@ class TestTileGridBatchJobSplitting:
}
}


@pytest.fixture(autouse=True)
def _partitioned_job_tracking(self, zk_client):
with config_overrides(partitioned_job_tracking={"zk_client": zk_client}):
yield

@now.mock
def test_create_job_basic(self, flask_app, api100, zk_db, dummy1):
api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
Expand Down Expand Up @@ -626,6 +642,11 @@ def test_job_results_basic(self, flask_app, api100, dummy1):
class TestCrossBackendSplitting:
now = _Now("2022-01-19T12:34:56Z")

@pytest.fixture(autouse=True)
def _partitioned_job_tracking(self, zk_client):
with config_overrides(partitioned_job_tracking={"zk_client": zk_client}):
yield

@now.mock
def test_create_job_simple(self, flask_app, api100, zk_db, dummy1):
"""Handling of single "load_collection" process graph"""
Expand Down
9 changes: 5 additions & 4 deletions tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from openeo_aggregator.app import create_app
from openeo_aggregator.config import AggregatorConfig
from openeo_aggregator.testing import config_overrides

from .conftest import get_api100, get_flask_app

Expand All @@ -17,7 +18,7 @@ def test_create_app(config: AggregatorConfig):
({"zk_client": "dummy"}, True),
])
def test_create_app_no_partitioned_job_tracking(config: AggregatorConfig, partitioned_job_tracking, expected):
config.partitioned_job_tracking = partitioned_job_tracking
api100 = get_api100(get_flask_app(config))
res = api100.get("/").assert_status_code(200).json
assert res["_partitioned_job_tracking"] is expected
with config_overrides(partitioned_job_tracking=partitioned_job_tracking):
api100 = get_api100(get_flask_app(config))
res = api100.get("/").assert_status_code(200).json
assert res["_partitioned_job_tracking"] is expected

0 comments on commit c82a212

Please sign in to comment.