🔀 Merge pull request #11 from wdr-data/main
Prod merge: Gelsenwasser detailed reservoirs, Fix timezone in locator map tooltips, Ruff, PyRight
jh0ker authored Oct 8, 2024
2 parents 72c8608 + 142a53f commit c14b3fd
Showing 25 changed files with 342 additions and 148 deletions.
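
The thread running through the exporter diffs below is a single pattern: rows for the individual Gelsenwasser reservoirs are filtered out of the base DataFrame before any aggregation, leaving only the combined "Talsperren Haltern und Hullern" series. A minimal standalone sketch of that filter with toy data (the column and list names are taken from the hunks below):

import pandas as pd

GELSENWASSER_DETAILED = [
    "Talsperre Haltern Nordbecken",
    "Talsperre Haltern Südbecken",
    "Talsperre Hullern",
]

df_base = pd.DataFrame(
    {
        "name": ["Talsperre Hullern", "Talsperren Haltern und Hullern"],
        "fill_percent": [71.0, 68.5],
    }
)

# Keep only rows whose reservoir name is not in the detailed list
df_base = df_base[~df_base["name"].isin(GELSENWASSER_DETAILED)]
print(df_base)  # only the combined "Gesamt" row survives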
7 changes: 7 additions & 0 deletions .vscode/extensions.json
@@ -0,0 +1,7 @@
+{
+  "recommendations": [
+    "charliermarsh.ruff",
+    "ms-python.python",
+    "ms-python.vscode-pylance"
+  ]
+}
10 changes: 8 additions & 2 deletions .vscode/settings.json
@@ -1,6 +1,12 @@
 {
-  "python.formatting.provider": "none",
   "[python]": {
-    "editor.defaultFormatter": "ms-python.black-formatter"
+    "editor.formatOnSave": true,
+    "editor.codeActionsOnSave": {
+      "source.organizeImports": "explicit",
+      "source.fixAll": "explicit"
+    },
+    "files.trimTrailingWhitespace": true,
+    "editor.defaultFormatter": "charliermarsh.ruff",
+    "editor.defaultFoldingRangeProvider": "charliermarsh.ruff"
   }
 }
8 changes: 4 additions & 4 deletions ddj_cloud/handler.py
@@ -11,11 +11,11 @@
     integrations=[AwsLambdaIntegration()],
 )
 
-from ddj_cloud.utils.date_and_time import local_now
-from ddj_cloud.utils import storage
+from ddj_cloud.utils import storage  # noqa: E402
+from ddj_cloud.utils.date_and_time import local_now  # noqa: E402
 
 
-def scrape(event, context):
+def scrape(event, context):  # noqa: ARG001
     module_name = event["module_name"]
 
     with sentry_sdk.configure_scope() as scope:
@@ -41,7 +41,7 @@ def scrape(event, context):
     try:
         storage.run_cloudfront_invalidations()
     except Exception as e:
-        print(f"Cloudfront invalidation failed with:")
+        print("Cloudfront invalidation failed with:")
         print(e)
         sentry_sdk.capture_exception(e)
 
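For context on the E402 suppressions: the handler initializes Sentry before importing its own modules, so that import-time errors are captured as well. A sketch of that ordering (the init call is abridged to the parts visible in the hunk):

import sentry_sdk
from sentry_sdk.integrations.aws_lambda import AwsLambdaIntegration

# Set up error reporting first, so failures raised while importing the
# application modules below are still sent to Sentry.
sentry_sdk.init(
    integrations=[AwsLambdaIntegration()],
)

# Deliberately below the init call; Ruff's E402 ("module level import
# not at top of file") is silenced line by line.
from ddj_cloud.utils import storage  # noqa: E402
from ddj_cloud.utils.date_and_time import local_now  # noqa: E402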
2 changes: 1 addition & 1 deletion ddj_cloud/scrapers/talsperren/__init__.py
@@ -1,5 +1,5 @@
 # Load lxml because for some reason bs4 doesn't find it otherwise?
-from lxml import etree  # noqa: F401
+from lxml import etree  # noqa: F401, I001
 
 # Ensure that federation subclasses are loaded
 from . import federations  # noqa: F401
25 changes: 19 additions & 6 deletions ddj_cloud/scrapers/talsperren/common.py
@@ -1,11 +1,12 @@
+import datetime as dt
 from abc import abstractmethod
+from collections.abc import Callable, Generator, Iterable
 from dataclasses import dataclass
-import datetime as dt
 from io import BytesIO
-from typing import Callable, Generator, Iterable, Optional, TypeVar, Protocol, TypedDict
+from typing import Protocol, TypedDict, TypeVar
 from zoneinfo import ZoneInfo
-import pandas as pd
 
+import pandas as pd
 import sentry_sdk
 
 TZ_UTC = ZoneInfo("UTC")
@@ -46,6 +47,18 @@
 }
 
 
+GELSENWASSER_DETAILED = [
+    "Talsperre Haltern Nordbecken",
+    "Talsperre Haltern Südbecken",
+    "Talsperre Hullern",
+]
+
+
+GELSENWASSER_GESAMT = [
+    "Talsperren Haltern und Hullern",
+]
+
+
 @dataclass
 class ReservoirRecord:
     federation_name: str
@@ -71,8 +84,8 @@ def __init__(self) -> None: ...
     def get_data(
         self,
         *,
-        start: Optional[dt.datetime] = None,
-        end: Optional[dt.datetime] = None,
+        start: dt.datetime | None = None,
+        end: dt.datetime | None = None,
     ) -> Iterable[ReservoirRecord]: ...


@@ -81,7 +94,7 @@ def get_data(
 
 
 def apply_guarded(
-    func: Callable[[T2], Optional[T1]],
+    func: Callable[[T2], T1 | None],
     data: Iterable[T2],
 ) -> Generator[T1, None, None]:
     for item in data:
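The hunks show only apply_guarded's new signature and its first line. Given the sentry_sdk import above, a plausible reading is a per-item guard that reports exceptions and skips None results; the body below is an assumption for illustration, not the repository's code:

from collections.abc import Callable, Generator, Iterable
from typing import TypeVar

import sentry_sdk

T1 = TypeVar("T1")
T2 = TypeVar("T2")


def apply_guarded(
    func: Callable[[T2], T1 | None],
    data: Iterable[T2],
) -> Generator[T1, None, None]:
    for item in data:
        # Assumed body: one failing item should not abort the whole scrape
        try:
            result = func(item)
        except Exception as e:
            sentry_sdk.capture_exception(e)
            continue
        if result is not None:
            yield result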
2 changes: 1 addition & 1 deletion ddj_cloud/scrapers/talsperren/exporters/__init__.py
@@ -1,5 +1,5 @@
-from pathlib import Path
 import importlib
+from pathlib import Path
 
 current_dir = Path(__file__).parent
 
(File header not captured in this view — the following hunks modify CurrentFederationsExporter.)
@@ -2,6 +2,7 @@
 
 from ddj_cloud.scrapers.talsperren.common import (
     FEDERATION_RENAMES_BREAKS,
+    GELSENWASSER_DETAILED,
     Exporter,
 )
 
@@ -14,7 +15,10 @@ class CurrentFederationsExporter(Exporter):
     def run(self, df_base: pd.DataFrame) -> pd.DataFrame:
         df_base.insert(0, "id", df_base["federation_name"] + "_" + df_base["name"])
 
-        # Gernerate map with latest data
+        # Only use "Haltern und Hullern Gesamt" for now, it should be more reliable
+        df_base = df_base[~df_base["name"].isin(GELSENWASSER_DETAILED)]
+
+        # Generate map with latest data
         df_map = df_base.copy()
         df_map.sort_values(by=["ts_measured"], inplace=True)
         df_map.drop_duplicates(subset="id", keep="last", inplace=True)
10 changes: 7 additions & 3 deletions ddj_cloud/scrapers/talsperren/exporters/daily.py
@@ -2,9 +2,10 @@
 from dateutil.relativedelta import relativedelta
 
 from ddj_cloud.scrapers.talsperren.common import (
-    Exporter,
-    FEDERATION_RENAMES_BREAKS,
     FEDERATION_ORDER_SIZE,
+    FEDERATION_RENAMES_BREAKS,
+    GELSENWASSER_DETAILED,
+    Exporter,
 )
 from ddj_cloud.utils.date_and_time import local_today_midnight
 
@@ -15,6 +16,9 @@ class DailyExporter(Exporter):
     def run(self, df_base: pd.DataFrame) -> pd.DataFrame:
         df_base.insert(0, "id", df_base["federation_name"] + "_" + df_base["name"])
 
+        # Only use "Haltern und Hullern Gesamt" for now, it should be more reliable and it has history
+        df_base = df_base[~df_base["name"].isin(GELSENWASSER_DETAILED)]
+
         # Drop all data before one month ago (plus some extra so we don't underfill any medians/means)
         df_base = df_base.loc[
             df_base["ts_measured"] > local_today_midnight() - relativedelta(months=3)
@@ -30,7 +34,7 @@ def run(self, df_base: pd.DataFrame) -> pd.DataFrame:
                 ["id"],
             )
             .resample("D")
-            .aggregate(  # type: ignore
+            .aggregate(
                 {
                     "content_mio_m3": "median",
                     "capacity_mio_m3": "median",
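The dropped `# type: ignore` comments on `.aggregate(` are the PyRight half of this merge; the call now type-checks as written. For reference, a self-contained sketch of the daily-median resampling the exporter performs (toy data; the real code groups on the derived `id` column):

import pandas as pd

df = pd.DataFrame(
    {
        "id": ["Ruhrverband_Biggetalsperre"] * 4,
        "ts_measured": pd.to_datetime(
            ["2024-10-01 06:00", "2024-10-01 18:00",
             "2024-10-02 06:00", "2024-10-02 18:00"],
            utc=True,
        ),
        "content_mio_m3": [100.0, 102.0, 101.0, 99.0],
    }
)

daily = (
    df.set_index("ts_measured")
    .groupby(["id"])
    .resample("D")
    .aggregate({"content_mio_m3": "median"})
)
print(daily)  # one median per reservoir id and calendar day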
40 changes: 32 additions & 8 deletions ddj_cloud/scrapers/talsperren/exporters/map.py
@@ -5,10 +5,12 @@
 from slugify import slugify
 
 from ddj_cloud.scrapers.talsperren.common import (
-    Exporter,
     FEDERATION_RENAMES,
+    GELSENWASSER_DETAILED,
+    GELSENWASSER_GESAMT,
     RESERVOIR_RENAMES,
     RESERVOIR_RENAMES_BREAKS,
+    Exporter,
 )
 from ddj_cloud.scrapers.talsperren.federations.agger import AggerFederation
 from ddj_cloud.scrapers.talsperren.federations.eifel_rur import EifelRurFederation
@@ -37,7 +39,7 @@ def _add_daily_fill_percent_to_map(
                 ["id"],
             )
             .resample("D")
-            .aggregate(  # type: ignore
+            .aggregate(
                 {
                     "fill_percent": "median",
                 }
@@ -58,7 +60,8 @@ def _add_daily_fill_percent_to_map(
         # Add a new column to `df_map` for each of the last 7 days
         today_midnight = local_today_midnight()
         for days_offset in range(0, 8):
-            # Use Python to calculate the timestamp for correct timezone support, then convert to pandas
+            # Use Python to calculate the timestamp for correct timezone support,
+            # then convert to pandas
             ts = today_midnight - relativedelta(days=days_offset)
             ts = pd.Timestamp(ts)
             try:
@@ -89,7 +92,7 @@ def _add_weekly_fill_percent_to_map(
                 ["id"],
             )
             .resample("W", closed="right", label="left")
-            .aggregate(  # type: ignore
+            .aggregate(
                 {
                     "fill_percent": "median",
                 }
@@ -139,7 +142,7 @@ def _add_monthly_fill_percent_to_map(
                 ["id"],
             )
             .resample("M", closed="right", label="left")
-            .aggregate(  # type: ignore
+            .aggregate(
                 {
                     "fill_percent": "median",
                 }
@@ -182,9 +185,19 @@ def _add_marker_size(self, df_map: pd.DataFrame) -> pd.DataFrame:
         )
         return df_map
 
-    def run(self, df_base: pd.DataFrame, do_reservoir_rename: bool = True) -> pd.DataFrame:
+    def run(
+        self,
+        df_base: pd.DataFrame,
+        do_reservoir_rename: bool = True,
+        # Only use "Haltern und Hullern Gesamt" by default for now, it should be more reliable and it has history
+        # Overridden for filtered maps
+        ignored_reservoirs: list[str] | None = GELSENWASSER_DETAILED,
+    ) -> pd.DataFrame:
         df_base.insert(0, "id", df_base["federation_name"] + "_" + df_base["name"])
 
+        if ignored_reservoirs:
+            df_base = df_base[~df_base["name"].isin(ignored_reservoirs)]
+
         # Gernerate map with latest data
         df_map = df_base.copy()
         df_map.sort_values(by=["ts_measured"], inplace=True)
@@ -241,8 +254,19 @@ def _make_filtered_map_exporter(federation_names: Sequence[str]) -> MapExporter:
     class FilteredMapExporter(MapExporter):
         filename = f"filtered_map_{slugify('_'.join(federation_names))}"
 
-        def run(self, df_base: pd.DataFrame) -> pd.DataFrame:
-            df_map = super().run(df_base, do_reservoir_rename=False)
+        def run(
+            self,
+            df_base: pd.DataFrame,
+            do_reservoir_rename: bool = False,
+            # For filtered maps, ignore "Haltern und Hullern Gesamt" because we don't use the
+            # history anyways and prefer detailed data for current fill level
+            ignored_reservoirs: list[str] | None = GELSENWASSER_GESAMT,
+        ) -> pd.DataFrame:
+            df_map = super().run(
+                df_base,
+                do_reservoir_rename=do_reservoir_rename,
+                ignored_reservoirs=ignored_reservoirs,
+            )
 
             translated_names = [
                 FEDERATION_RENAMES.get(fed_name, fed_name) for fed_name in federation_names
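The reworded timezone comment is the heart of the tooltip fix named in the commit title: day offsets are computed on a timezone-aware Python datetime first, and only then converted to a pandas Timestamp, so local midnight stays at midnight across DST changes. A sketch, assuming local_today_midnight returns a Europe/Berlin-aware datetime (consistent with the TZ constants in common.py; the helper body here is an assumption):

import datetime as dt
from zoneinfo import ZoneInfo

import pandas as pd
from dateutil.relativedelta import relativedelta

TZ_BERLIN = ZoneInfo("Europe/Berlin")


def local_today_midnight() -> dt.datetime:
    # Assumed helper: today at 00:00 local (Berlin) time
    now = dt.datetime.now(TZ_BERLIN)
    return now.replace(hour=0, minute=0, second=0, microsecond=0)


today_midnight = local_today_midnight()
for days_offset in range(0, 8):
    # relativedelta works on the wall clock, so each step lands on local
    # midnight even when a DST transition falls inside the window
    ts = pd.Timestamp(today_midnight - relativedelta(days=days_offset))
    print(ts)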
13 changes: 9 additions & 4 deletions ddj_cloud/scrapers/talsperren/exporters/weekly.py
@@ -2,9 +2,10 @@
 from dateutil.relativedelta import relativedelta
 
 from ddj_cloud.scrapers.talsperren.common import (
-    Exporter,
-    FEDERATION_RENAMES_BREAKS,
     FEDERATION_ORDER_SIZE,
+    FEDERATION_RENAMES_BREAKS,
+    GELSENWASSER_DETAILED,
+    Exporter,
 )
 from ddj_cloud.utils.date_and_time import local_today_midnight
 
@@ -15,7 +16,11 @@ class WeeklyExporter(Exporter):
     def run(self, df_base: pd.DataFrame) -> pd.DataFrame:
         df_base.insert(0, "id", df_base["federation_name"] + "_" + df_base["name"])
 
-        # Drop all data before one year ago (plus some extra so we don't underfill any medians/means)
+        # Only use "Haltern und Hullern Gesamt" for now, it should be more reliable and it has history
+        df_base = df_base[~df_base["name"].isin(GELSENWASSER_DETAILED)]
+
+        # Drop all data before one year ago,
+        # plus some extra so we don't underfill any medians/means
         df_base = df_base.loc[
             df_base["ts_measured"] > local_today_midnight() - relativedelta(years=1, weeks=3)
         ]
@@ -30,7 +35,7 @@ def run(self, df_base: pd.DataFrame) -> pd.DataFrame:
                 ["id"],
             )
             .resample("W")
-            .aggregate(  # type: ignore
+            .aggregate(
                 {
                     "content_mio_m3": "median",
                     "capacity_mio_m3": "median",
2 changes: 1 addition & 1 deletion ddj_cloud/scrapers/talsperren/federations/__init__.py
@@ -1,5 +1,5 @@
-from pathlib import Path
 import importlib
+from pathlib import Path
 
 current_dir = Path(__file__).parent
 
17 changes: 13 additions & 4 deletions ddj_cloud/scrapers/talsperren/federations/agger.py
@@ -1,9 +1,15 @@
 import datetime as dt
-from typing import Iterable
+from collections.abc import Iterable
 
 import requests
 
-from ..common import ReservoirRecord, Federation, ReservoirMeta, TZ_UTC, apply_guarded
+from ddj_cloud.scrapers.talsperren.common import (
+    TZ_UTC,
+    Federation,
+    ReservoirMeta,
+    ReservoirRecord,
+    apply_guarded,
+)
 
 
 class AggerReservoirMeta(ReservoirMeta):
@@ -13,7 +19,7 @@ class AggerReservoirMeta(ReservoirMeta):
 class AggerFederation(Federation):
     name = "Aggerverband"
 
-    reservoirs: dict[str, AggerReservoirMeta] = {
+    reservoirs: dict[str, AggerReservoirMeta] = {  # type: ignore
         "Aggertalsperre": {
             "url": "https://gis.aggerverband.de/public/pegel/aggertalsperre_cm.json",
             "capacity_mio_m3": 17.06,
@@ -58,6 +64,9 @@ def _get_reservoir_records(self, name: str) -> list[ReservoirRecord]:
             if row[value_idx] is not None and row[value_idx] >= 0
         ]
 
-    def get_data(self, **kwargs) -> Iterable[ReservoirRecord]:
+    def get_data(
+        self,
+        **kwargs,  # noqa: ARG002
+    ) -> Iterable[ReservoirRecord]:
         for records in apply_guarded(self._get_reservoir_records, self.reservoirs.keys()):
             yield from records
17 changes: 11 additions & 6 deletions ddj_cloud/scrapers/talsperren/federations/eifel_rur.py
@@ -1,9 +1,16 @@
 import datetime as dt
-from typing import Iterable, Literal, NotRequired, Optional
+from collections.abc import Iterable
+from typing import Literal, NotRequired
 
 import requests
 
-from ..common import ReservoirMeta, ReservoirRecord, Federation, TZ_BERLIN, apply_guarded
+from ddj_cloud.scrapers.talsperren.common import (
+    TZ_BERLIN,
+    Federation,
+    ReservoirMeta,
+    ReservoirRecord,
+    apply_guarded,
+)
 
 
 class EifelRurReservoirMeta(ReservoirMeta):
@@ -14,7 +21,7 @@ class EifelRurReservoirMeta(ReservoirMeta):
 class EifelRurFederation(Federation):
     name = "Wasserverband Eifel-Rur"
 
-    reservoirs: dict[str, EifelRurReservoirMeta] = {
+    reservoirs: dict[str, EifelRurReservoirMeta] = {  # type: ignore
         "Oleftalsperre": {
             "id": 6,
             "capacity_mio_m3": 19.30,
@@ -107,9 +114,7 @@ def _get_reservoir_records(self, name: str) -> list[ReservoirRecord]:
 
     def get_data(
         self,
-        *,
-        start: Optional[dt.datetime] = None,
-        end: Optional[dt.datetime] = None,
+        **kwargs,  # noqa: ARG002
     ) -> Iterable[ReservoirRecord]:
         for records in apply_guarded(
             lambda name: self._get_reservoir_records(name),
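Both federations replace explicit start/end parameters with **kwargs plus noqa: ARG002, silencing Ruff's unused-argument rule while staying call-compatible with Federation.get_data. A compact sketch of why that satisfies the protocol (toy types; the real protocol yields ReservoirRecord):

import datetime as dt
from collections.abc import Iterable
from typing import Protocol


class Federation(Protocol):
    def get_data(
        self,
        *,
        start: dt.datetime | None = None,
        end: dt.datetime | None = None,
    ) -> Iterable[int]: ...


class ToyFederation:
    def get_data(self, **kwargs) -> Iterable[int]:  # noqa: ARG002
        # start/end arrive via **kwargs but this scraper ignores them
        yield from (1, 2, 3)


fed: Federation = ToyFederation()  # structural match: **kwargs absorbs start/end
print(list(fed.get_data(start=None)))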
(Diffs for the remaining changed files were not loaded in this view.)
