Skip to content

Commit

Permalink
Apply black to src folder
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Mar 1, 2024
1 parent 9aedf93 commit f01577e
Show file tree
Hide file tree
Showing 22 changed files with 402 additions and 500 deletions.
8 changes: 4 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
exec(fp.read())

tests_require = [
'pytest>=6.2.0',
'requests-mock>=1.9.0',
"pytest>=6.2.0",
"requests-mock>=1.9.0",
]

setup(
Expand Down Expand Up @@ -52,6 +52,6 @@
"Programming Language :: Python :: 3",
"License :: OSI Approved :: Apache Software License",
"Development Status :: 3 - Alpha",
"Operating System :: OS Independent"
]
"Operating System :: OS Independent",
],
)
5 changes: 1 addition & 4 deletions src/openeo_aggregator/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ def create_app(auto_logging_setup: bool = True, flask_error_handling: bool = Tru

log_version_info(logger=_log)


backends = MultiBackendConnection.from_config()

_log.info("Creating AggregatorBackendImplementation")
Expand All @@ -59,9 +58,7 @@ def create_app(auto_logging_setup: bool = True, flask_error_handling: bool = Tru

@app.route("/_info", methods=["GET"])
def agg_backends():
info = {
"backends": [{"id": con.id, "root_url": con.root_url} for con in backends]
}
info = {"backends": [{"id": con.id, "root_url": con.root_url} for con in backends]}
return flask.jsonify(info)

_log.info(f"Built {app=!r}")
Expand Down
275 changes: 126 additions & 149 deletions src/openeo_aggregator/backend.py

Large diffs are not rendered by default.

44 changes: 21 additions & 23 deletions src/openeo_aggregator/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ def get_or_call(self, key, callback, ttl=None, log_on_miss=False):
else:
if log_on_miss:
with TimingLogger(
title=f"Cache miss {self.name!r} key {key!r}, calling {callback.__qualname__!r}",
logger=_log.debug
title=f"Cache miss {self.name!r} key {key!r}, calling {callback.__qualname__!r}", logger=_log.debug
):
res = callback()
else:
Expand Down Expand Up @@ -117,6 +116,7 @@ class Memoizer(metaclass=abc.ABCMeta):
Concrete classes should just implement `get_or_call` and `invalidate`.
"""

log_on_miss = True

def __init__(self, namespace: str = DEFAULT_NAMESPACE):
Expand Down Expand Up @@ -226,11 +226,8 @@ def _default(self, o: Any) -> dict:
"""Implementation of `default` parameter of `json.dump` and related"""
if o.__class__ in self._custom_types:
# TODO: also add signing with a secret?
return {"_jsonserde": {
"type": self._type_id(o.__class__),
"data": o.__jsonserde_prepare__()
}}
raise TypeError(f'Object of type {o.__class__.__name__} is not JSON serializable')
return {"_jsonserde": {"type": self._type_id(o.__class__), "data": o.__jsonserde_prepare__()}}
raise TypeError(f"Object of type {o.__class__.__name__} is not JSON serializable")

def _object_hook(self, d: dict) -> Any:
"""Implementation of `object_hook` parameter of `json.load` and related"""
Expand All @@ -243,7 +240,7 @@ def serialize(self, data: dict) -> bytes:
data = json.dumps(
obj=data,
indent=None,
separators=(',', ':'),
separators=(",", ":"),
default=self._default if self._custom_types else None,
).encode("utf8")
if len(data) > self._gzip_threshold:
Expand All @@ -252,14 +249,11 @@ def serialize(self, data: dict) -> bytes:
return data

def deserialize(self, data: bytes) -> dict:
if data[:1] == b'\x78':
if data[:1] == b"\x78":
# First byte of zlib data is practically almost always x78
_log.debug(f"JsonSerDe.deserialize: detected zlib compressed data")
data = zlib.decompress(data)
return json.loads(
s=data.decode("utf8"),
object_hook=self._object_hook if self._decode_map else None
)
return json.loads(s=data.decode("utf8"), object_hook=self._object_hook if self._decode_map else None)


# Global JSON SerDe instance
Expand Down Expand Up @@ -358,18 +352,19 @@ class ZkMemoizer(Memoizer):
count = zk_cache.get_or_call("count", callback=calculate_count)
"""

DEFAULT_TTL = 5 * 60
DEFAULT_ZK_TIMEOUT = 5

_serde = json_serde

def __init__(
self,
client: KazooClient,
path_prefix: str,
namespace: str = DEFAULT_NAMESPACE,
default_ttl: Optional[float] = None,
zk_timeout: Optional[float] = None,
self,
client: KazooClient,
path_prefix: str,
namespace: str = DEFAULT_NAMESPACE,
default_ttl: Optional[float] = None,
zk_timeout: Optional[float] = None,
):
super().__init__(namespace=namespace)
self._client = client
Expand Down Expand Up @@ -523,10 +518,13 @@ def get_memoizer(memoizer_type: str, memoizer_conf: dict) -> Memoizer:
zk_timeout=memoizer_conf.get("zk_timeout"),
)
elif memoizer_type == "chained":
return ChainedMemoizer([
get_memoizer(memoizer_type=part["type"], memoizer_conf=part["config"])
for part in memoizer_conf["parts"]
], namespace=namespace)
return ChainedMemoizer(
[
get_memoizer(memoizer_type=part["type"], memoizer_conf=part["config"])
for part in memoizer_conf["parts"]
],
namespace=namespace,
)
else:
raise ValueError(memoizer_type)

Expand Down
2 changes: 0 additions & 2 deletions src/openeo_aggregator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ class AggregatorConfig(dict):
config_source = dict_item()




@attrs.frozen(kw_only=True)
class AggregatorBackendConfig(OpenEoBackendConfig):

Expand Down
23 changes: 7 additions & 16 deletions src/openeo_aggregator/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

BackendId = str


class LockedAuthException(InternalException):
def __init__(self):
super().__init__(message="Setting auth while locked.")
Expand Down Expand Up @@ -211,11 +212,11 @@ class MultiBackendConnection:
"""
Collection of multiple connections to different backends
"""

# TODO: API version management: just do single/fixed-version federation, or also handle version discovery?
# TODO: keep track of (recent) backend failures, e.g. to automatically blacklist a backend
# TODO: synchronized backend connection caching/flushing across gunicorn workers, for better consistency?


_TIMEOUT = 5

def __init__(
Expand Down Expand Up @@ -277,8 +278,7 @@ def get_connections(self) -> List[BackendConnection]:
for con in self._connections_cache.connections:
con.invalidate()
self._connections_cache = _ConnectionsCache(
expiry=now + self._connections_cache_ttl,
connections=list(self._get_connections(skip_failures=True))
expiry=now + self._connections_cache_ttl, connections=list(self._get_connections(skip_failures=True))
)
new_bids = [c.id for c in self._connections_cache.connections]
_log.debug(
Expand Down Expand Up @@ -328,9 +328,7 @@ def _get_api_versions(self) -> List[str]:

def get_api_versions(self) -> Set[ComparableVersion]:
"""Get set of API versions reported by backends"""
versions = self._memoizer.get_or_call(
key="api_versions", callback=self._get_api_versions
)
versions = self._memoizer.get_or_call(key="api_versions", callback=self._get_api_versions)
versions = set(ComparableVersion(v) for v in versions)
return versions

Expand All @@ -344,9 +342,7 @@ def api_version_maximum(self) -> ComparableVersion:
"""Get the highest API version of all back-ends"""
return max(self.get_api_versions())

def map(
self, callback: Callable[[BackendConnection], Any]
) -> Iterator[Tuple[str, Any]]:
def map(self, callback: Callable[[BackendConnection], Any]) -> Iterator[Tuple[str, Any]]:
"""
Query each backend connection with given callable and return results as iterator
Expand Down Expand Up @@ -438,10 +434,8 @@ def do_request(
return ParallelResponse(successes=successes, failures=failures)



def streaming_flask_response(
backend_response: requests.Response,
chunk_size: int = STREAM_CHUNK_SIZE_DEFAULT
backend_response: requests.Response, chunk_size: int = STREAM_CHUNK_SIZE_DEFAULT
) -> flask.Response:
"""
Convert a `requests.Response` coming from a backend
Expand All @@ -450,10 +444,7 @@ def streaming_flask_response(
:param backend_response: `requests.Response` object (possibly created with "stream" option enabled)
:param chunk_size: chunk size to use for streaming
"""
headers = [
(k, v) for (k, v) in backend_response.headers.items()
if k.lower() in ["content-type"]
]
headers = [(k, v) for (k, v) in backend_response.headers.items() if k.lower() in ["content-type"]]
return flask.Response(
# Streaming response through `iter_content` generator (https://flask.palletsprojects.com/en/2.0.x/patterns/streaming/)
response=backend_response.iter_content(chunk_size=chunk_size),
Expand Down
17 changes: 4 additions & 13 deletions src/openeo_aggregator/egi.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@
\#(?P<authority>[a-z0-9._-]+)
$
""",
flags=re.VERBOSE | re.IGNORECASE
flags=re.VERBOSE | re.IGNORECASE,
)

Entitlement = namedtuple(
"Entitlement", ["namespace", "vo", "group", "role", "authority"]
)
Entitlement = namedtuple("Entitlement", ["namespace", "vo", "group", "role", "authority"])


@functools.lru_cache(maxsize=100)
Expand Down Expand Up @@ -56,8 +54,7 @@ class UserRole:
def __init__(self, title: str):
self._title = title
self._id = "".join(
w.title() if w.islower() else w
for w in self._title.replace("-", " ").replace("_", " ").split()
w.title() if w.islower() else w for w in self._title.replace("-", " ").replace("_", " ").split()
)
self._normalized = self.normalize_role(self._title)

Expand All @@ -81,19 +78,13 @@ def entitlement_match(self, entitlement: str):
)




class OpeneoPlatformUserRoles:
def __init__(self, roles: List[UserRole]):
self.roles = roles

def extract_roles(self, entitlements: List[str]) -> List[UserRole]:
"""Extract user roles based on list of eduperson_entitlement values"""
return [
role
for role in self.roles
if any(role.entitlement_match(e) for e in entitlements)
]
return [role for role in self.roles if any(role.entitlement_match(e) for e in entitlements)]


# Standardized roles in openEO Platform EGI Virtual Organisation
Expand Down
4 changes: 2 additions & 2 deletions src/openeo_aggregator/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@

class BackendLookupFailureException(OpenEOApiException):
status_code = 400
code = 'BackendLookupFailure'
message = 'Failed to determine back-end to use.'
code = "BackendLookupFailure"
message = "Failed to determine back-end to use."
_description = None
Loading

0 comments on commit f01577e

Please sign in to comment.