Skip to content

Commit

Permalink
Merge branch 'issue112-config-migration'
Browse files: browse the repository at this point in the history
  • Loading branch information
soxofaan committed Mar 1, 2024
2 parents 3a1381e + c1e4549 commit 422eee8
Show file tree
Hide file tree
Showing 20 changed files with 383 additions and 703 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

The format is roughly based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [0.27.0]

- Remove deprecated `AggregatorConfig` fields: `aggregator_backends`, `partitioned_job_tracking`,
`zookeeper_prefix`, `memoizer` ([#112](https://github.com/Open-EO/openeo-aggregator/issues/112))
- Remove `AggregatorConfig` usage (both in `src` and `tests`) ([#112](https://github.com/Open-EO/openeo-aggregator/issues/112))

## [0.26.0]

- Remove now unused `conf/backend_config.py` ([#112](https://github.com/Open-EO/openeo-aggregator/issues/112), [#117](https://github.com/Open-EO/openeo-aggregator/issues/117))
Expand Down
2 changes: 1 addition & 1 deletion src/openeo_aggregator/about.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import sys
from typing import Optional

__version__ = "0.26.0a1"
__version__ = "0.27.0a1"


def log_version_info(logger: Optional[logging.Logger] = None):
Expand Down
12 changes: 4 additions & 8 deletions src/openeo_aggregator/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@
import logging
import os
from pathlib import Path
from typing import Any, List, Optional, Union
from typing import List, Optional, Union

import flask
import openeo_driver.views
from openeo_driver.config.load import ConfigGetter
from openeo_driver.util.logging import (
LOG_HANDLER_STDERR_JSON,
LOGGING_CONTEXT_FLASK,
Expand All @@ -22,12 +21,11 @@
AggregatorBackendImplementation,
MultiBackendConnection,
)
from openeo_aggregator.config import AggregatorConfig, get_config, get_config_dir

_log = logging.getLogger(__name__)


def create_app(config: Any = None, auto_logging_setup: bool = True, flask_error_handling: bool = True) -> flask.Flask:
def create_app(auto_logging_setup: bool = True, flask_error_handling: bool = True) -> flask.Flask:
"""
Flask application factory function.
"""
Expand All @@ -39,13 +37,11 @@ def create_app(config: Any = None, auto_logging_setup: bool = True, flask_error_

log_version_info(logger=_log)

config: AggregatorConfig = get_config(config)
_log.info(f"Using config: {config.config_source=!r}")

backends = MultiBackendConnection.from_config(config)
backends = MultiBackendConnection.from_config()

_log.info("Creating AggregatorBackendImplementation")
backend_implementation = AggregatorBackendImplementation(backends=backends, config=config)
backend_implementation = AggregatorBackendImplementation(backends=backends)

_log.info(f"Building Flask app with {backend_implementation=!r}")
app = openeo_driver.views.build_app(
Expand Down
40 changes: 15 additions & 25 deletions src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
CONNECTION_TIMEOUT_JOB_LOGS,
CONNECTION_TIMEOUT_JOB_START,
CONNECTION_TIMEOUT_RESULT,
AggregatorConfig,
get_backend_config,
)
from openeo_aggregator.connection import (
Expand All @@ -94,19 +93,14 @@
normalize_collection_metadata,
single_backend_collection_post_processing,
)
from openeo_aggregator.metadata.reporter import LoggerReporter
from openeo_aggregator.partitionedjobs import PartitionedJob
from openeo_aggregator.partitionedjobs.crossbackend import (
CrossBackendSplitter,
SubGraphId,
)
from openeo_aggregator.partitionedjobs.crossbackend import CrossBackendSplitter
from openeo_aggregator.partitionedjobs.splitting import FlimsySplitter, TileGridSplitter
from openeo_aggregator.partitionedjobs.tracking import (
PartitionedJobConnection,
PartitionedJobTracker,
)
from openeo_aggregator.utils import (
Clock,
FlatPG,
PGWithMetadata,
dict_merge,
Expand Down Expand Up @@ -147,9 +141,9 @@ def __jsonserde_load__(cls, data: dict):

class AggregatorCollectionCatalog(AbstractCollectionCatalog):

def __init__(self, backends: MultiBackendConnection, config: AggregatorConfig):
def __init__(self, backends: MultiBackendConnection):
self.backends = backends
self._memoizer = memoizer_from_config(config=config, namespace="CollectionCatalog")
self._memoizer = memoizer_from_config(namespace="CollectionCatalog")
self.backends.on_connections_change.add(self._memoizer.invalidate)

def get_all_metadata(self) -> List[dict]:
Expand Down Expand Up @@ -349,11 +343,10 @@ def __init__(
self,
backends: MultiBackendConnection,
catalog: AggregatorCollectionCatalog,
config: AggregatorConfig,
):
self.backends = backends
# TODO Cache per backend results instead of output?
self._memoizer = memoizer_from_config(config=config, namespace="Processing")
self._memoizer = memoizer_from_config(namespace="Processing")
self.backends.on_connections_change.add(self._memoizer.invalidate)
self._catalog = catalog

Expand Down Expand Up @@ -984,12 +977,11 @@ def __init__(
self,
backends: MultiBackendConnection,
processing: AggregatorProcessing,
config: AggregatorConfig
):
super(AggregatorSecondaryServices, self).__init__()

self._backends = backends
self._memoizer = memoizer_from_config(config=config, namespace="SecondaryServices")
self._memoizer = memoizer_from_config(namespace="SecondaryServices")
self._backends.on_connections_change.add(self._memoizer.invalidate)

self._processing = processing
Expand Down Expand Up @@ -1287,16 +1279,13 @@ class AggregatorBackendImplementation(OpenEoBackendImplementation):
# Simplify mocking time for unit tests.
_clock = time.time # TODO: centralized helper for this test pattern

def __init__(self, backends: MultiBackendConnection, config: AggregatorConfig):
def __init__(self, backends: MultiBackendConnection):
self._backends = backends
catalog = AggregatorCollectionCatalog(backends=backends, config=config)
processing = AggregatorProcessing(
backends=backends, catalog=catalog,
config=config,
)
catalog = AggregatorCollectionCatalog(backends=backends)
processing = AggregatorProcessing(backends=backends, catalog=catalog)

if get_backend_config().partitioned_job_tracking or config.partitioned_job_tracking:
partitioned_job_tracker = PartitionedJobTracker.from_config(config=config, backends=self._backends)
if get_backend_config().partitioned_job_tracking:
partitioned_job_tracker = PartitionedJobTracker.from_config(backends=self._backends)
else:
partitioned_job_tracker = None

Expand All @@ -1307,7 +1296,7 @@ def __init__(self, backends: MultiBackendConnection, config: AggregatorConfig):
partitioned_job_tracker=partitioned_job_tracker,
)

secondary_services = AggregatorSecondaryServices(backends=backends, processing=processing, config=config)
secondary_services = AggregatorSecondaryServices(backends=backends, processing=processing)
user_defined_processes = AggregatorUserDefinedProcesses(backends=backends)

super().__init__(
Expand All @@ -1317,11 +1306,10 @@ def __init__(self, backends: MultiBackendConnection, config: AggregatorConfig):
batch_jobs=batch_jobs,
user_defined_processes=user_defined_processes,
)
# TODO #112 once `AggregatorConfig.configured_oidc_providers` is eliminated, this `_configured_oidc_providers` is not necessary anymore.
self._configured_oidc_providers: List[OidcProvider] = get_backend_config().oidc_providers
self._auth_entitlement_check: Union[bool, dict] = get_backend_config().auth_entitlement_check

self._memoizer: Memoizer = memoizer_from_config(config=config, namespace="general")
self._memoizer: Memoizer = memoizer_from_config(namespace="general")
self._backends.on_connections_change.add(self._memoizer.invalidate)

# Shorter HTTP cache TTL to adapt quicker to changed back-end configurations
Expand All @@ -1330,7 +1318,9 @@ def __init__(self, backends: MultiBackendConnection, config: AggregatorConfig):
)

def oidc_providers(self) -> List[OidcProvider]:
# TODO #112 once `AggregatorConfig.configured_oidc_providers` is eliminated, this method override is not necessary anymore
# Technically, this implementation is redundant given the parent implementation
# But keeping it allows for some additional tests
# (until https://github.com/Open-EO/openeo-python-driver/issues/265 is resolved)
return self._configured_oidc_providers

def file_formats(self) -> dict:
Expand Down
23 changes: 4 additions & 19 deletions src/openeo_aggregator/background/prime_caches.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
import contextlib
import functools
import logging
from pathlib import Path
from typing import List, Optional, Union
from typing import List, Optional

from openeo.util import TimingLogger
from openeo_driver.util.logging import (
Expand All @@ -21,12 +20,7 @@
from openeo_aggregator.about import log_version_info
from openeo_aggregator.app import get_aggregator_logging_config
from openeo_aggregator.backend import AggregatorBackendImplementation
from openeo_aggregator.config import (
OPENEO_AGGREGATOR_CONFIG,
AggregatorConfig,
get_backend_config,
get_config,
)
from openeo_aggregator.config import get_backend_config
from openeo_aggregator.connection import MultiBackendConnection

_log = logging.getLogger(__name__)
Expand All @@ -39,11 +33,6 @@
def main(args: Optional[List[str]] = None):
"""CLI entrypoint"""
cli = argparse.ArgumentParser()
cli.add_argument(
"--config",
default=None,
help=f"Optional: aggregator config to load (instead of env var {OPENEO_AGGREGATOR_CONFIG} based resolution).",
)
cli.add_argument(
"--require-zookeeper-writes",
action="store_true",
Expand Down Expand Up @@ -81,24 +70,20 @@ def main(args: Optional[List[str]] = None):
)
setup_logging(config=logging_config)
prime_caches(
config=arguments.config,
require_zookeeper_writes=arguments.require_zookeeper_writes,
fail_mode=arguments.fail_mode,
)


def prime_caches(
config: Union[str, Path, AggregatorConfig, None] = None,
require_zookeeper_writes: bool = False,
fail_mode: str = FAIL_MODE_FAILFAST,
):
log_version_info(logger=_log)
with TimingLogger(title=f"Prime caches", logger=_log):
config: AggregatorConfig = get_config(config)
_log.info(f"Using config: {config.get('config_source')=}")

backends = MultiBackendConnection.from_config(config)
backend_implementation = AggregatorBackendImplementation(backends=backends, config=config)
backends = MultiBackendConnection.from_config()
backend_implementation = AggregatorBackendImplementation(backends=backends)

if fail_mode == FAIL_MODE_FAILFAST:
# Do not intercept any exceptions.
Expand Down
19 changes: 9 additions & 10 deletions src/openeo_aggregator/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from kazoo.client import KazooClient
from openeo.util import TimingLogger

from openeo_aggregator.config import AggregatorConfig, get_backend_config
from openeo_aggregator.config import get_backend_config
from openeo_aggregator.utils import AttrStatsProxy, Clock, strip_join

DEFAULT_NAMESPACE = "_default"
Expand Down Expand Up @@ -492,12 +492,12 @@ def _zk_connect_or_not(self) -> Union[KazooClient, None]:

zk_memoizer_stats = {}

def memoizer_from_config(
config: AggregatorConfig,
namespace: str,
) -> Memoizer:

def memoizer_from_config(namespace: str) -> Memoizer:
"""Factory to create `ZkMemoizer` instance from config values."""

backend_config = get_backend_config()

def get_memoizer(memoizer_type: str, memoizer_conf: dict) -> Memoizer:
if memoizer_type == "null":
return NullMemoizer(namespace=namespace)
Expand All @@ -507,14 +507,14 @@ def get_memoizer(memoizer_type: str, memoizer_conf: dict) -> Memoizer:
return JsonDictMemoizer(namespace=namespace, default_ttl=memoizer_conf.get("default_ttl"))
elif memoizer_type == "zookeeper":
kazoo_client = KazooClient(hosts=memoizer_conf.get("zk_hosts", "localhost:2181"))
if get_backend_config().zk_memoizer_tracking:
if backend_config.zk_memoizer_tracking:
kazoo_client = AttrStatsProxy(
target=kazoo_client,
to_track=["start", "stop", "create", "get", "set"],
# TODO: better solution than using a module level global here?
stats=zk_memoizer_stats,
)
zookeeper_prefix = get_backend_config().zookeeper_prefix or config.zookeeper_prefix
zookeeper_prefix = backend_config.zookeeper_prefix
return ZkMemoizer(
client=kazoo_client,
path_prefix=f"{zookeeper_prefix}/cache/{namespace}",
Expand All @@ -530,8 +530,7 @@ def get_memoizer(memoizer_type: str, memoizer_conf: dict) -> Memoizer:
else:
raise ValueError(memoizer_type)

memoizer_config = get_backend_config().memoizer or config.memoizer
return get_memoizer(
memoizer_type=memoizer_config.get("type", "null"),
memoizer_conf=memoizer_config.get("config", {}),
memoizer_type=backend_config.memoizer.get("type", "null"),
memoizer_conf=backend_config.memoizer.get("config", {}),
)
Loading

0 comments on commit 422eee8

Please sign in to comment.