diff --git a/.vscode/extensions.json b/.vscode/extensions.json
new file mode 100644
index 0000000..c1ea73e
--- /dev/null
+++ b/.vscode/extensions.json
@@ -0,0 +1,7 @@
+{
+    "recommendations": [
+        "charliermarsh.ruff",
+        "ms-python.python",
+        "ms-python.vscode-pylance"
+    ]
+}
diff --git a/.vscode/settings.json b/.vscode/settings.json
index a6905db..d24b272 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -1,6 +1,12 @@
 {
-    "python.formatting.provider": "none",
     "[python]": {
-        "editor.defaultFormatter": "ms-python.black-formatter"
+        "editor.formatOnSave": true,
+        "editor.codeActionsOnSave": {
+            "source.organizeImports": "explicit",
+            "source.fixAll": "explicit"
+        },
+        "files.trimTrailingWhitespace": true,
+        "editor.defaultFormatter": "charliermarsh.ruff",
+        "editor.defaultFoldingRangeProvider": "charliermarsh.ruff"
     }
 }
diff --git a/ddj_cloud/handler.py b/ddj_cloud/handler.py
index e417d62..b62c80f 100644
--- a/ddj_cloud/handler.py
+++ b/ddj_cloud/handler.py
@@ -11,11 +11,11 @@
     integrations=[AwsLambdaIntegration()],
 )
 
-from ddj_cloud.utils.date_and_time import local_now
-from ddj_cloud.utils import storage
+from ddj_cloud.utils import storage  # noqa: E402
+from ddj_cloud.utils.date_and_time import local_now  # noqa: E402
 
 
-def scrape(event, context):
+def scrape(event, context):  # noqa: ARG001
     module_name = event["module_name"]
 
     with sentry_sdk.configure_scope() as scope:
@@ -41,7 +41,7 @@ def scrape(event, context):
     try:
         storage.run_cloudfront_invalidations()
     except Exception as e:
-        print(f"Cloudfront invalidation failed with:")
+        print("Cloudfront invalidation failed with:")
         print(e)
         sentry_sdk.capture_exception(e)
diff --git a/ddj_cloud/scrapers/talsperren/__init__.py b/ddj_cloud/scrapers/talsperren/__init__.py
index c552562..a7fd012 100644
--- a/ddj_cloud/scrapers/talsperren/__init__.py
+++ b/ddj_cloud/scrapers/talsperren/__init__.py
@@ -1,5 +1,5 @@
 # Load lxml because for some reason bs4 doesn't find it otherwise?
-from lxml import etree  # noqa: F401
+from lxml import etree  # noqa: F401, I001
 
 # Ensure that federation subclasses are loaded
 from . import federations  # noqa: F401
diff --git a/ddj_cloud/scrapers/talsperren/common.py b/ddj_cloud/scrapers/talsperren/common.py
index e2ea45d..cb5d004 100644
--- a/ddj_cloud/scrapers/talsperren/common.py
+++ b/ddj_cloud/scrapers/talsperren/common.py
@@ -1,11 +1,12 @@
+import datetime as dt
 from abc import abstractmethod
+from collections.abc import Callable, Generator, Iterable
 from dataclasses import dataclass
-import datetime as dt
 from io import BytesIO
-from typing import Callable, Generator, Iterable, Optional, TypeVar, Protocol, TypedDict
+from typing import Protocol, TypedDict, TypeVar
 from zoneinfo import ZoneInfo
 
-import pandas as pd
+import pandas as pd
 import sentry_sdk
 
 TZ_UTC = ZoneInfo("UTC")
@@ -46,6 +47,18 @@
 }
 
 
+GELSENWASSER_DETAILED = [
+    "Talsperre Haltern Nordbecken",
+    "Talsperre Haltern Südbecken",
+    "Talsperre Hullern",
+]
+
+
+GELSENWASSER_GESAMT = [
+    "Talsperren Haltern und Hullern",
+]
+
+
 @dataclass
 class ReservoirRecord:
     federation_name: str
@@ -71,8 +84,8 @@ def __init__(self) -> None: ...
     def get_data(
         self,
         *,
-        start: Optional[dt.datetime] = None,
-        end: Optional[dt.datetime] = None,
+        start: dt.datetime | None = None,
+        end: dt.datetime | None = None,
     ) -> Iterable[ReservoirRecord]: ...
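Note: the `Optional[X]` → `X | None` rewrites above (and the `typing` → `collections.abc` import moves repeated across this diff) are the mechanical pyupgrade-style changes enabled by the new `target-version = "py311"` in pyproject.toml. A minimal sketch of the style being migrated to, with illustrative names rather than repo code:

```python
# `X | None` (PEP 604, Python 3.10+) replaces typing.Optional[X];
# abstract container types now come from collections.abc.
from collections.abc import Callable, Iterable


def first_match(
    items: Iterable[int],
    pred: Callable[[int], bool],
) -> int | None:  # was: Optional[int]
    """Return the first item matching pred, or None if nothing matches."""
    for item in items:
        if pred(item):
            return item
    return None


print(first_match([1, 4, 9], lambda x: x % 2 == 0))  # 4
```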
@@ -81,7 +94,7 @@ def get_data(
 
 
 def apply_guarded(
-    func: Callable[[T2], Optional[T1]],
+    func: Callable[[T2], T1 | None],
     data: Iterable[T2],
 ) -> Generator[T1, None, None]:
     for item in data:
diff --git a/ddj_cloud/scrapers/talsperren/exporters/__init__.py b/ddj_cloud/scrapers/talsperren/exporters/__init__.py
index 33d0234..1973a27 100644
--- a/ddj_cloud/scrapers/talsperren/exporters/__init__.py
+++ b/ddj_cloud/scrapers/talsperren/exporters/__init__.py
@@ -1,5 +1,5 @@
-from pathlib import Path
 import importlib
+from pathlib import Path
 
 current_dir = Path(__file__).parent
diff --git a/ddj_cloud/scrapers/talsperren/exporters/current_federations.py b/ddj_cloud/scrapers/talsperren/exporters/current_federations.py
index 3135225..759be70 100644
--- a/ddj_cloud/scrapers/talsperren/exporters/current_federations.py
+++ b/ddj_cloud/scrapers/talsperren/exporters/current_federations.py
@@ -2,6 +2,7 @@
 
 from ddj_cloud.scrapers.talsperren.common import (
     FEDERATION_RENAMES_BREAKS,
+    GELSENWASSER_DETAILED,
     Exporter,
 )
@@ -14,7 +15,10 @@ class CurrentFederationsExporter(Exporter):
     def run(self, df_base: pd.DataFrame) -> pd.DataFrame:
         df_base.insert(0, "id", df_base["federation_name"] + "_" + df_base["name"])
 
-        # Gernerate map with latest data
+        # Only use "Haltern und Hullern Gesamt" for now; it should be more reliable
+        df_base = df_base[~df_base["name"].isin(GELSENWASSER_DETAILED)]
+
+        # Generate map with latest data
         df_map = df_base.copy()
         df_map.sort_values(by=["ts_measured"], inplace=True)
         df_map.drop_duplicates(subset="id", keep="last", inplace=True)
diff --git a/ddj_cloud/scrapers/talsperren/exporters/daily.py b/ddj_cloud/scrapers/talsperren/exporters/daily.py
index 749793c..777ee56 100644
--- a/ddj_cloud/scrapers/talsperren/exporters/daily.py
+++ b/ddj_cloud/scrapers/talsperren/exporters/daily.py
@@ -2,9 +2,10 @@
 from dateutil.relativedelta import relativedelta
 
 from ddj_cloud.scrapers.talsperren.common import (
-    Exporter,
-    FEDERATION_RENAMES_BREAKS,
     FEDERATION_ORDER_SIZE,
+    FEDERATION_RENAMES_BREAKS,
+    GELSENWASSER_DETAILED,
+    Exporter,
 )
 from ddj_cloud.utils.date_and_time import local_today_midnight
@@ -15,6 +16,9 @@ class DailyExporter(Exporter):
     def run(self, df_base: pd.DataFrame) -> pd.DataFrame:
         df_base.insert(0, "id", df_base["federation_name"] + "_" + df_base["name"])
 
+        # Only use "Haltern und Hullern Gesamt" for now; it should be more reliable and it has history
+        df_base = df_base[~df_base["name"].isin(GELSENWASSER_DETAILED)]
+
         # Drop all data before one month ago (plus some extra so we don't underfill any medians/means)
         df_base = df_base.loc[
             df_base["ts_measured"] > local_today_midnight() - relativedelta(months=3)
         ]
@@ -30,7 +34,7 @@ def run(self, df_base: pd.DataFrame) -> pd.DataFrame:
             ["id"],
         )
         .resample("D")
-        .aggregate(  # type: ignore
+        .aggregate(
             {
                 "content_mio_m3": "median",
                 "capacity_mio_m3": "median",
diff --git a/ddj_cloud/scrapers/talsperren/exporters/map.py b/ddj_cloud/scrapers/talsperren/exporters/map.py
index 9381110..af21b8e 100644
--- a/ddj_cloud/scrapers/talsperren/exporters/map.py
+++ b/ddj_cloud/scrapers/talsperren/exporters/map.py
@@ -5,10 +5,12 @@
 from slugify import slugify
 
 from ddj_cloud.scrapers.talsperren.common import (
-    Exporter,
     FEDERATION_RENAMES,
+    GELSENWASSER_DETAILED,
+    GELSENWASSER_GESAMT,
     RESERVOIR_RENAMES,
     RESERVOIR_RENAMES_BREAKS,
+    Exporter,
 )
 from ddj_cloud.scrapers.talsperren.federations.agger import AggerFederation
 from ddj_cloud.scrapers.talsperren.federations.eifel_rur import EifelRurFederation
@@ -37,7 +39,7 @@ def _add_daily_fill_percent_to_map(
         ["id"],
     )
     .resample("D")
-    .aggregate(  # type: ignore
+    .aggregate(
         {
             "fill_percent": "median",
         }
@@ -58,7 +60,8 @@ def _add_daily_fill_percent_to_map(
     # Add a new column to `df_map` for each of the last 7 days
     today_midnight = local_today_midnight()
     for days_offset in range(0, 8):
-        # Use Python to calculate the timestamp for correct timezone support, then convert to pandas
+        # Use Python to calculate the timestamp for correct timezone support,
+        # then convert to pandas
         ts = today_midnight - relativedelta(days=days_offset)
         ts = pd.Timestamp(ts)
         try:
@@ -89,7 +92,7 @@ def _add_weekly_fill_percent_to_map(
         ["id"],
     )
     .resample("W", closed="right", label="left")
-    .aggregate(  # type: ignore
+    .aggregate(
         {
             "fill_percent": "median",
         }
@@ -139,7 +142,7 @@ def _add_monthly_fill_percent_to_map(
         ["id"],
     )
     .resample("M", closed="right", label="left")
-    .aggregate(  # type: ignore
+    .aggregate(
         {
             "fill_percent": "median",
         }
@@ -182,9 +185,19 @@ def _add_marker_size(self, df_map: pd.DataFrame) -> pd.DataFrame:
         )
         return df_map
 
-    def run(self, df_base: pd.DataFrame, do_reservoir_rename: bool = True) -> pd.DataFrame:
+    def run(
+        self,
+        df_base: pd.DataFrame,
+        do_reservoir_rename: bool = True,
+        # Only use "Haltern und Hullern Gesamt" by default for now; it should be more reliable and it has history
+        # Overridden for filtered maps
+        ignored_reservoirs: list[str] | None = GELSENWASSER_DETAILED,
+    ) -> pd.DataFrame:
         df_base.insert(0, "id", df_base["federation_name"] + "_" + df_base["name"])
 
+        if ignored_reservoirs:
+            df_base = df_base[~df_base["name"].isin(ignored_reservoirs)]
+
         # Generate map with latest data
         df_map = df_base.copy()
         df_map.sort_values(by=["ts_measured"], inplace=True)
@@ -241,8 +254,19 @@ def _make_filtered_map_exporter(federation_names: Sequence[str]) -> MapExporter:
     class FilteredMapExporter(MapExporter):
         filename = f"filtered_map_{slugify('_'.join(federation_names))}"
 
-        def run(self, df_base: pd.DataFrame) -> pd.DataFrame:
-            df_map = super().run(df_base, do_reservoir_rename=False)
+        def run(
+            self,
+            df_base: pd.DataFrame,
+            do_reservoir_rename: bool = False,
+            # For filtered maps, ignore "Haltern und Hullern Gesamt" because we don't use the
+            # history anyway and prefer detailed data for the current fill level
+            ignored_reservoirs: list[str] | None = GELSENWASSER_GESAMT,
+        ) -> pd.DataFrame:
+            df_map = super().run(
+                df_base,
+                do_reservoir_rename=do_reservoir_rename,
+                ignored_reservoirs=ignored_reservoirs,
+            )
 
             translated_names = [
                 FEDERATION_RENAMES.get(fed_name, fed_name) for fed_name in federation_names
diff --git a/ddj_cloud/scrapers/talsperren/exporters/weekly.py b/ddj_cloud/scrapers/talsperren/exporters/weekly.py
index 1f5611a..7620a2a 100644
--- a/ddj_cloud/scrapers/talsperren/exporters/weekly.py
+++ b/ddj_cloud/scrapers/talsperren/exporters/weekly.py
@@ -2,9 +2,10 @@
 from dateutil.relativedelta import relativedelta
 
 from ddj_cloud.scrapers.talsperren.common import (
-    Exporter,
-    FEDERATION_RENAMES_BREAKS,
     FEDERATION_ORDER_SIZE,
+    FEDERATION_RENAMES_BREAKS,
+    GELSENWASSER_DETAILED,
+    Exporter,
 )
 from ddj_cloud.utils.date_and_time import local_today_midnight
@@ -15,7 +16,11 @@ class WeeklyExporter(Exporter):
     def run(self, df_base: pd.DataFrame) -> pd.DataFrame:
         df_base.insert(0, "id", df_base["federation_name"] + "_" + df_base["name"])
 
-        # Drop all data before one year ago (plus some extra so we don't underfill any medians/means)
+        # Only use "Haltern und Hullern Gesamt" for now; it should be more reliable and it has history
+        df_base = df_base[~df_base["name"].isin(GELSENWASSER_DETAILED)]
+
+        # Drop all data before one year ago,
+        # plus some extra so we don't underfill any medians/means
         df_base = df_base.loc[
             df_base["ts_measured"] > local_today_midnight() - relativedelta(years=1, weeks=3)
         ]
@@ -30,7 +35,7 @@ def run(self, df_base: pd.DataFrame) -> pd.DataFrame:
             ["id"],
         )
         .resample("W")
-        .aggregate(  # type: ignore
+        .aggregate(
             {
                 "content_mio_m3": "median",
                 "capacity_mio_m3": "median",
diff --git a/ddj_cloud/scrapers/talsperren/federations/__init__.py b/ddj_cloud/scrapers/talsperren/federations/__init__.py
index 33d0234..1973a27 100644
--- a/ddj_cloud/scrapers/talsperren/federations/__init__.py
+++ b/ddj_cloud/scrapers/talsperren/federations/__init__.py
@@ -1,5 +1,5 @@
-from pathlib import Path
 import importlib
+from pathlib import Path
 
 current_dir = Path(__file__).parent
diff --git a/ddj_cloud/scrapers/talsperren/federations/agger.py b/ddj_cloud/scrapers/talsperren/federations/agger.py
index 5a305e0..35da77c 100644
--- a/ddj_cloud/scrapers/talsperren/federations/agger.py
+++ b/ddj_cloud/scrapers/talsperren/federations/agger.py
@@ -1,9 +1,15 @@
 import datetime as dt
-from typing import Iterable
+from collections.abc import Iterable
 
 import requests
 
-from ..common import ReservoirRecord, Federation, ReservoirMeta, TZ_UTC, apply_guarded
+from ddj_cloud.scrapers.talsperren.common import (
+    TZ_UTC,
+    Federation,
+    ReservoirMeta,
+    ReservoirRecord,
+    apply_guarded,
+)
 
 
 class AggerReservoirMeta(ReservoirMeta):
@@ -13,7 +19,7 @@ class AggerReservoirMeta(ReservoirMeta):
 class AggerFederation(Federation):
     name = "Aggerverband"
 
-    reservoirs: dict[str, AggerReservoirMeta] = {
+    reservoirs: dict[str, AggerReservoirMeta] = {  # type: ignore
         "Aggertalsperre": {
             "url": "https://gis.aggerverband.de/public/pegel/aggertalsperre_cm.json",
             "capacity_mio_m3": 17.06,
@@ -58,6 +64,9 @@ def _get_reservoir_records(self, name: str) -> list[ReservoirRecord]:
             if row[value_idx] is not None and row[value_idx] >= 0
         ]
 
-    def get_data(self, **kwargs) -> Iterable[ReservoirRecord]:
+    def get_data(
+        self,
+        **kwargs,  # noqa: ARG002
+    ) -> Iterable[ReservoirRecord]:
         for records in apply_guarded(self._get_reservoir_records, self.reservoirs.keys()):
             yield from records
diff --git a/ddj_cloud/scrapers/talsperren/federations/eifel_rur.py b/ddj_cloud/scrapers/talsperren/federations/eifel_rur.py
index 02b5e0c..374ffbb 100644
--- a/ddj_cloud/scrapers/talsperren/federations/eifel_rur.py
+++ b/ddj_cloud/scrapers/talsperren/federations/eifel_rur.py
@@ -1,9 +1,16 @@
 import datetime as dt
-from typing import Iterable, Literal, NotRequired, Optional
+from collections.abc import Iterable
+from typing import Literal, NotRequired
 
 import requests
 
-from ..common import ReservoirMeta, ReservoirRecord, Federation, TZ_BERLIN, apply_guarded
+from ddj_cloud.scrapers.talsperren.common import (
+    TZ_BERLIN,
+    Federation,
+    ReservoirMeta,
+    ReservoirRecord,
+    apply_guarded,
+)
 
 
 class EifelRurReservoirMeta(ReservoirMeta):
@@ -14,7 +21,7 @@ class EifelRurReservoirMeta(ReservoirMeta):
 class EifelRurFederation(Federation):
     name = "Wasserverband Eifel-Rur"
 
-    reservoirs: dict[str, EifelRurReservoirMeta] = {
+    reservoirs: dict[str, EifelRurReservoirMeta] = {  # type: ignore
         "Oleftalsperre": {
             "id": 6,
             "capacity_mio_m3": 19.30,
@@ -107,9 +114,7 @@ def _get_reservoir_records(self, name: str) -> list[ReservoirRecord]:
 
     def get_data(
         self,
-        *,
-        start: Optional[dt.datetime] = None,
-        end: Optional[dt.datetime] = None,
+        **kwargs,  # noqa: ARG002
     ) -> Iterable[ReservoirRecord]:
         for records in apply_guarded(
             lambda name: self._get_reservoir_records(name),
diff --git a/ddj_cloud/scrapers/talsperren/federations/gelsenwasser.py b/ddj_cloud/scrapers/talsperren/federations/gelsenwasser.py
index 4cf5fd7..b3a22b4 100644
--- a/ddj_cloud/scrapers/talsperren/federations/gelsenwasser.py
+++ b/ddj_cloud/scrapers/talsperren/federations/gelsenwasser.py
@@ -1,46 +1,74 @@
-from typing import Generator, Iterable
 import re
+from collections.abc import Generator, Iterable
+from functools import lru_cache
 
 import bs4
 import dateparser
 import requests
 import sentry_sdk
 
-from ..common import ReservoirMeta, ReservoirRecord, Federation, TZ_BERLIN, apply_guarded
+from ddj_cloud.scrapers.talsperren.common import (
+    TZ_BERLIN,
+    Federation,
+    ReservoirMeta,
+    ReservoirRecord,
+    apply_guarded,
+)
 
 
 class GelsenwasserReservoirMeta(ReservoirMeta):
     url: str
 
 
+@lru_cache
+def _get_html(url: str) -> str:
+    return requests.get(url).text
+
+
 class GelsenwasserFederation(Federation):
     name = "Gelsenwasser"
 
-    reservoirs: dict[str, GelsenwasserReservoirMeta] = {
+    reservoirs: dict[str, GelsenwasserReservoirMeta] = {  # type: ignore
         "Talsperren Haltern und Hullern": {
             "url": "https://www.gelsenwasser.de/themen/unsere-talsperren",
             "capacity_mio_m3": 31.50,
-            # Hullern
-            "lat": 51.7463,
-            "lon": 7.2631,
+            # Mittig
+            "lat": 51.7426053,
+            "lon": 7.2485421,
+        },
+        "Talsperre Haltern Nordbecken": {
+            "url": "https://www.gelsenwasser.de/themen/unsere-talsperren",
+            "capacity_mio_m3": 17.00,
+            "lat": 51.7491653,
+            "lon": 7.2207341,
+        },
+        "Talsperre Haltern Südbecken": {
+            "url": "https://www.gelsenwasser.de/themen/unsere-talsperren",
+            "capacity_mio_m3": 3.50,
+            "lat": 51.7378838,
+            "lon": 7.210882,
+        },
+        "Talsperre Hullern": {
+            "url": "https://www.gelsenwasser.de/themen/unsere-talsperren",
+            "capacity_mio_m3": 11.00,
+            "lat": 51.7457183,
+            "lon": 7.2882871,
         },
     }
 
-    def _get_html(self, url: str) -> str:
-        return requests.get(url).text
-
     def _get_reservoir_records(
         self,
         name: str,
     ) -> Generator[ReservoirRecord, None, None]:
         url = self.reservoirs[name]["url"]
-        html = self._get_html(url)
+        html = _get_html(url)
 
         soup = bs4.BeautifulSoup(html, "lxml")
 
         body_main: bs4.Tag = soup.find("main")  # type: ignore
         assert body_main, "No body main found"
         body_text = body_main.text.strip()
+        body_text = body_text.replace("­", "")  # Remove soft hyphens
 
         # Find timestamp from heading
         # Example heading: Aktueller Füllstand unserer Talsperren - Stand 30. Oktober 2023
@@ -55,22 +83,35 @@ def _get_reservoir_records(
         # This one is under construction, so try to get the current capacity from the text
         # Example (in heading): Spei­cher­volumen von 31,5 Mio. Ku­bik­metern
-        # May contain ­
+        # May contain ­ (already stripped)
         capacity = self.reservoirs[name]["capacity_mio_m3"]
-        capacity_match = re.search(r"Speichervolumen von ([\d,]+) Mio. Kubikmetern", body_text)
-        if capacity_match:
-            capacity = float(capacity_match.group(1).replace(",", "."))
-        else:
-            sentry_sdk.capture_message("Could not find capacity in text")
+
+        if name == "Talsperren Haltern und Hullern":
+            capacity_match = re.search(r"Speichervolumen von ([\d,]+) Mio. Kubikmetern", body_text)
+            if capacity_match:
+                capacity = float(capacity_match.group(1).replace(",", "."))
+            else:
+                sentry_sdk.capture_message("Could not find capacity in text")
 
         # Find content
-        # Example: Gesamt über alle Becken: 28,2 Mio. Kubikmeter / ca. 90 %
-        content_match = re.search(
-            r"Gesamt über alle Becken: ([\d,]+) Mio. Kubikmeter",
-            body_text,
-        )
-        assert content_match, "Could not find content"
-        content_mio_m3 = float(content_match.group(1).replace(",", "."))
+        if name == "Talsperren Haltern und Hullern":
+            # Example: Gesamt über alle Becken: 28,2 Mio. Kubikmeter / ca. 90 %
+            content_match = re.search(
+                r"Gesamt über alle Becken: ([\d,]+) Mio. Kubikmeter",
+                body_text,
+            )
+            assert content_match, "Could not find content"
+            content_mio_m3 = float(content_match.group(1).replace(",", "."))
+        else:
+            # Example: Tal­sperre Haltern Nord­becken: 39,18 Meter ü. NHN / 94 %
+            # May contain ­ (already stripped)
+            content_match = re.search(
+                name + r": ([\d,]+) Meter ü. NHN / ([\d,]+) %",
+                body_text,
+            )
+            assert content_match, "Could not find content"
+            percent_filled = float(content_match.group(2).replace(",", "."))
+            content_mio_m3 = percent_filled / 100 * capacity
 
         yield ReservoirRecord(
             federation_name=self.name,
@@ -80,6 +121,9 @@ def _get_reservoir_records(
             content_mio_m3=content_mio_m3,
         )
 
-    def get_data(self, **kwargs) -> Iterable[ReservoirRecord]:
+    def get_data(
+        self,
+        **kwargs,  # noqa: ARG002
+    ) -> Iterable[ReservoirRecord]:
         for records in apply_guarded(self._get_reservoir_records, self.reservoirs.keys()):
             yield from records
diff --git a/ddj_cloud/scrapers/talsperren/federations/ruhr.py b/ddj_cloud/scrapers/talsperren/federations/ruhr.py
index a9b72f7..ad270b6 100644
--- a/ddj_cloud/scrapers/talsperren/federations/ruhr.py
+++ b/ddj_cloud/scrapers/talsperren/federations/ruhr.py
@@ -1,10 +1,16 @@
-from typing import Iterable
-import bs4
 import datetime as dt
-import requests
 import re
+from collections.abc import Iterable
+
+import bs4
+import requests
 
-from ..common import ReservoirRecord, Federation, TZ_BERLIN, apply_guarded
+from ddj_cloud.scrapers.talsperren.common import (
+    TZ_BERLIN,
+    Federation,
+    ReservoirRecord,
+    apply_guarded,
+)
 
 
 class RuhrFederation(Federation):
@@ -84,13 +90,16 @@ def _parse_coord_div(self, div: bs4.Tag) -> ReservoirRecord:
             content_mio_m3=content_mio_m3,
         )
 
-    def get_data(self, **kwargs) -> Iterable[ReservoirRecord]:
+    def get_data(
+        self,
+        **kwargs,  # noqa: ARG002
+    ) -> Iterable[ReservoirRecord]:
         html = self._get_html()
         soup = bs4.BeautifulSoup(html, "lxml")
 
         coords_div: bs4.Tag = soup.find("div", {"id": "dam-coordinates"})  # type: ignore
         assert coords_div, "div with id `dam-coordinates` not found"
 
-        coord_divs: bs4.ResultSet[bs4.Tag] = coords_div.find_all("div", recursive=False)  # type: ignore
+        coord_divs: bs4.ResultSet[bs4.Tag] = coords_div.find_all("div", recursive=False)
 
         return apply_guarded(self._parse_coord_div, coord_divs)
diff --git a/ddj_cloud/scrapers/talsperren/federations/wahnbach.py b/ddj_cloud/scrapers/talsperren/federations/wahnbach.py
index 2dcc566..5856dd1 100644
--- a/ddj_cloud/scrapers/talsperren/federations/wahnbach.py
+++ b/ddj_cloud/scrapers/talsperren/federations/wahnbach.py
@@ -1,11 +1,17 @@
-from typing import Generator, Iterable
 import re
+from collections.abc import Generator, Iterable
 
 import bs4
 import dateparser
 import requests
 
-from ..common import ReservoirMeta, ReservoirRecord, Federation, TZ_BERLIN, apply_guarded
+from ddj_cloud.scrapers.talsperren.common import (
+    TZ_BERLIN,
+    Federation,
+    ReservoirMeta,
+    ReservoirRecord,
+    apply_guarded,
+)
 
 
 class WahnbachReservoirMeta(ReservoirMeta):
@@ -15,7 +21,7 @@ class WahnbachReservoirMeta(ReservoirMeta):
 class WahnbachReservoirFederation(Federation):
     name = "Wahnbachtalsperrenverband"
 
-    reservoirs: dict[str, WahnbachReservoirMeta] = {
+    reservoirs: dict[str, WahnbachReservoirMeta] = {  # type: ignore
         "Wahnbachtalsperre": {
             "url": "https://www.wahnbach.de/die-wahnbachtalsperre/zahlen-und-fakten/pegelstand-stausee.html",
             "capacity_mio_m3": 40.92,
@@ -39,7 +45,7 @@ def _get_reservoir_records(
         assert body_div, "No body div found"
 
         # Find headings and parse them and the following nodes
-        headings: bs4.ResultSet[bs4.Tag] = body_div.find_all("h5")  # type: ignore
+        headings: bs4.ResultSet[bs4.Tag] = body_div.find_all("h5")
         assert len(headings) > 1, "No headings found"
 
         for heading in headings:
@@ -84,7 +90,10 @@ def _get_reservoir_records(
             content_mio_m3=content_mio_m3,
         )
 
-    def get_data(self, **kwargs) -> Iterable[ReservoirRecord]:
+    def get_data(
+        self,
+        **kwargs,  # noqa: ARG002
+    ) -> Iterable[ReservoirRecord]:
         for records in apply_guarded(self._get_reservoir_records, self.reservoirs.keys()):
             yield from records
 
@@ -94,15 +103,19 @@ def get_data(self, **kwargs) -> Iterable[ReservoirRecord]:

Aktuelle Daten vom 22. Januar 2024:
-

Staupegel: 122,88 mNN
Stauinhalt: 38,516 Mio. m3
Füllungsgrad: 94,16 %

+

Staupegel: 122,88 mNN
Stauinhalt: 38,516 Mio. m3
+ Füllungsgrad: 94,16 %


Daten vom 15. Januar 2024:
-

Staupegel: 123,12 mNN
Stauinhalt: 38,890 Mio. m3
Füllungsgrad: 95,29 %

+

Staupegel: 123,12 mNN
Stauinhalt: 38,890 Mio. m3
+ Füllungsgrad: 95,29 %


Daten vom 08. Januar 2024:
-

Staupegel: 123,21 mNN
Stauinhalt: 39,155 Mio. m3
Füllungsgrad: 95,72 %

+

Staupegel: 123,21 mNN
Stauinhalt: 39,155 Mio. m3
+ Füllungsgrad: 95,72 %

 

Bisherige Tages-Spitzenabgaben:

(Eine normale Abgabemenge liegt bei zirka 130.000 m³/d.)

-

193 400 m³ am 03. August 1990
189.450 m³ am 07. August 2020
189.062 m³ am 06. August 2020

+

193 400 m³ am 03. August 1990
189.450 m³ am 07. August 2020
+ 189.062 m³ am 06. August 2020

 

@@ -127,7 +140,8 @@ def get_data(self, **kwargs) -> Iterable[ReservoirRecord]:
 
Bisherige Tages-Spitzenabgaben:

(Eine normale Abgabemenge liegt bei zirka 130.000 m³/d.)

-

193 400 m³ am 03. August 1990
189.450 m³ am 07. August 2020
189.062 m³ am 06. August 2020

+

193 400 m³ am 03. August 1990
189.450 m³ am 07. August 2020
+ 189.062 m³ am 06. August 2020

 

""" diff --git a/ddj_cloud/scrapers/talsperren/federations/wupper.py b/ddj_cloud/scrapers/talsperren/federations/wupper.py index ba5feb2..1309e6d 100644 --- a/ddj_cloud/scrapers/talsperren/federations/wupper.py +++ b/ddj_cloud/scrapers/talsperren/federations/wupper.py @@ -1,9 +1,15 @@ import datetime as dt -from typing import Iterable +from collections.abc import Iterable import requests -from ..common import TZ_UTC, ReservoirMeta, ReservoirRecord, Federation, apply_guarded +from ddj_cloud.scrapers.talsperren.common import ( + TZ_UTC, + Federation, + ReservoirMeta, + ReservoirRecord, + apply_guarded, +) class WupperReservoirMeta(ReservoirMeta): @@ -13,7 +19,7 @@ class WupperReservoirMeta(ReservoirMeta): class WupperFederation(Federation): name = "Wupperverband" - reservoirs: dict[str, WupperReservoirMeta] = { + reservoirs: dict[str, WupperReservoirMeta] = { # type: ignore "Bevertalsperre": { "ajax_id": "SBE$-T", "capacity_mio_m3": 23.76, @@ -143,6 +149,9 @@ def _get_reservoir_records(self, name: str) -> list[ReservoirRecord]: if row[value_idx] is not None ] - def get_data(self, **kwargs) -> Iterable[ReservoirRecord]: + def get_data( + self, + **kwargs, # noqa: ARG002 + ) -> Iterable[ReservoirRecord]: for records in apply_guarded(self._get_reservoir_records, self.reservoirs.keys()): yield from records diff --git a/ddj_cloud/scrapers/talsperren/locator_maps.py b/ddj_cloud/scrapers/talsperren/locator_maps.py index a70ea8c..cfa8fcf 100644 --- a/ddj_cloud/scrapers/talsperren/locator_maps.py +++ b/ddj_cloud/scrapers/talsperren/locator_maps.py @@ -3,14 +3,14 @@ import re from typing import Literal -from datawrapper import Datawrapper import pandas as pd import sentry_sdk +from datawrapper import Datawrapper from ddj_cloud.scrapers.talsperren.common import FEDERATION_RENAMES, RESERVOIR_RENAMES +from ddj_cloud.utils.date_and_time import BERLIN from ddj_cloud.utils.formatting import format_datetime, format_number - REVERSE_RESERVOIR_RENAMES = {v: k for k, v in RESERVOIR_RENAMES.items()} DATAWRAPPER_TOKEN = os.environ.get("TALSPERREN_DATAWRAPPER_TOKEN") @@ -51,18 +51,17 @@ def _get_color(fill_percent: float, color_map: dict) -> str: def _make_tooltip(current: dict, variant: Literal["desktop", "mobile"]) -> str: - bar_text = format_number(current["fill_percent"], places=1) + " %" bar_color = _get_color(current["fill_percent"], color_map_fill) bar_text_color = _get_color(current["fill_percent"], color_map_text) - bar_text_margin = "28px" if current["fill_percent"] < 25 else "3px" + bar_text_margin = "28px" if current["fill_percent"] < 25 else "3px" # noqa: PLR2004 name = RESERVOIR_RENAMES.get(current["name"], current["name"]) fill_percent = max(min(current["fill_percent"], 100), 0) content_mio_m3 = format_number(current["content_mio_m3"], places=2) capacity_mio_m3 = format_number(current["capacity_mio_m3"], places=2) federation_name = FEDERATION_RENAMES.get(current["federation_name"], current["federation_name"]) - ts_measured = format_datetime(current["ts_measured"]) + ts_measured = format_datetime(current["ts_measured"].astimezone(BERLIN)) width = "156px" # if variant == "desktop" else "100px" # Doesn't seem to shrink anyways font_size_header = "13px" if variant == "desktop" else "10px" @@ -93,11 +92,7 @@ def _make_tooltip(current: dict, variant: Literal["desktop", "mobile"]) -> str: -""".replace( - " ", "" - ).replace( - "\n", "" - ) +""".replace(" ", "").replace("\n", "") return tooltip_html @@ -125,6 +120,7 @@ def run(df_base: pd.DataFrame) -> None: ("cZfsi", "BL07F"), ("WSgd6", "aHate"), ("QZfQN", 
"kcuUG"), + ("ZyHl6", "nDeDt"), ] for chart_id_base, chart_id_live in charts: print() @@ -138,7 +134,7 @@ def run(df_base: pd.DataFrame) -> None: sentry_sdk.capture_exception(e) -def _process_chart(current: dict, dw: Datawrapper, chart_id_base: str, chart_id_live: str) -> None: +def _process_chart(current: dict, dw: Datawrapper, chart_id_base: str, chart_id_live: str) -> None: # noqa: PLR0915 chart_base = dw.get_chart(chart_id_base) chart_live = dw.get_chart(chart_id_live) @@ -185,7 +181,7 @@ def _process_chart(current: dict, dw: Datawrapper, chart_id_base: str, chart_id_ print("Updating tooltip markers") for marker in chart_data_base["markers"]: - if not marker["type"] == "point": + if marker["type"] != "point": continue match = re.match(r'Tooltipmarker: "(.*?)".*', marker["title"]) @@ -204,7 +200,9 @@ def _process_chart(current: dict, dw: Datawrapper, chart_id_base: str, chart_id_ variant = ( "desktop" if marker["visibility"]["desktop"] - else "mobile" if marker["visibility"]["mobile"] else None + else "mobile" + if marker["visibility"]["mobile"] + else None ) if variant is None: diff --git a/ddj_cloud/scrapers/talsperren/locator_maps_create_tooltip_markers.py b/ddj_cloud/scrapers/talsperren/locator_maps_create_tooltip_markers.py index 5b9aafb..1da7528 100644 --- a/ddj_cloud/scrapers/talsperren/locator_maps_create_tooltip_markers.py +++ b/ddj_cloud/scrapers/talsperren/locator_maps_create_tooltip_markers.py @@ -1,9 +1,9 @@ import os from uuid import uuid4 as uuid -from datawrapper import Datawrapper import pandas as pd import sentry_sdk +from datawrapper import Datawrapper from .locator_maps import RENAMES @@ -11,7 +11,6 @@ def _make_marker(current: dict) -> dict: - return { "title": f'Tooltipmarker: "{current["name"]}"', "id": str(uuid()), diff --git a/ddj_cloud/scrapers/talsperren/talsperren.py b/ddj_cloud/scrapers/talsperren/talsperren.py index c9807e5..2f57ed9 100644 --- a/ddj_cloud/scrapers/talsperren/talsperren.py +++ b/ddj_cloud/scrapers/talsperren/talsperren.py @@ -1,19 +1,20 @@ -from traceback import print_exc +import datetime as dt from os import getenv +from traceback import print_exc import pandas as pd -import datetime as dt import sentry_sdk from ddj_cloud.scrapers.talsperren.exporters.map import filtered_map_exporters from ddj_cloud.utils.storage import ( DownloadFailedException, + download_file, upload_dataframe, upload_file, - download_file, ) -from .common import Federation, Exporter, ReservoirMeta, to_parquet_bio + from . 
import locator_maps +from .common import Exporter, Federation, ReservoirMeta, to_parquet_bio IGNORE_LIST = [ "Rurtalsperre Gesamt", @@ -43,7 +44,7 @@ def _get_base_dataset(): # Instantiate all federation classes federation_classes = Federation.__subclasses__() - federations = [cls() for cls in federation_classes] # type: ignore + federations = [cls() for cls in federation_classes] # Get data from all federations data = [] @@ -62,13 +63,10 @@ def _get_base_dataset(): df_new["ts_measured"] = pd.to_datetime(df_new["ts_measured"], utc=True) # Add timestamp - df_new["ts_scraped"] = dt.datetime.now(dt.timezone.utc) + df_new["ts_scraped"] = dt.datetime.now(dt.UTC) # Merge with existing data - if not is_first_run: - df = pd.concat([df_db, df_new]) - else: - df = df_new + df = pd.concat([df_db, df_new]) if not is_first_run else df_new # Deduplicate, but keep new data if there are duplicates df = df.drop_duplicates( @@ -106,7 +104,7 @@ def _get_base_dataset(): continue df[column] = df.apply( - lambda row: metas[(row["federation_name"], row["name"])][column], + lambda row: metas[(row["federation_name"], row["name"])][column], # noqa: B023 axis=1, ) diff --git a/ddj_cloud/utils/bigquery.py b/ddj_cloud/utils/bigquery.py index 41db3ce..5e3c79a 100644 --- a/ddj_cloud/utils/bigquery.py +++ b/ddj_cloud/utils/bigquery.py @@ -1,6 +1,7 @@ """Utility functions .""" + import re -from typing import Callable, Generator +from collections.abc import Callable, Generator import pandas as pd from google.cloud import bigquery @@ -57,7 +58,7 @@ def iter_results( client: bigquery.Client, query: str, job_config: QueryJobConfig, - df_cleaner: Callable[[pd.DataFrame], pd.DataFrame] = None, + df_cleaner: Callable[[pd.DataFrame], pd.DataFrame] | None = None, ) -> Generator[pd.Series, None, None]: """ Page through the results of a query and yield each row as a pandas Series @@ -75,6 +76,7 @@ def iter_results( query_job.result() # Get reference to destination table + assert query_job.destination is not None destination = client.get_table(query_job.destination) rows = client.list_rows(destination, page_size=10000) @@ -89,8 +91,9 @@ def iter_results( _pandas_helpers.verify_pandas_imports = imports_verifier_orig for df in dfs: + df_cleaned = df if df_cleaner is not None: - df = df_cleaner(df) + df_cleaned = df_cleaner(df) - for index, row in df.iterrows(): + for _, row in df_cleaned.iterrows(): yield row diff --git a/ddj_cloud/utils/datawrapper_patched.py b/ddj_cloud/utils/datawrapper_patched.py index 0af291b..4e494f4 100644 --- a/ddj_cloud/utils/datawrapper_patched.py +++ b/ddj_cloud/utils/datawrapper_patched.py @@ -1,7 +1,7 @@ import json -from datawrapper import Datawrapper as DatawrapperOriginal import requests as r +from datawrapper import Datawrapper as DatawrapperOriginal class Datawrapper(DatawrapperOriginal): diff --git a/ddj_cloud/utils/date_and_time.py b/ddj_cloud/utils/date_and_time.py index 64fd04d..4fca767 100644 --- a/ddj_cloud/utils/date_and_time.py +++ b/ddj_cloud/utils/date_and_time.py @@ -1,7 +1,6 @@ """Provides helper functions for dates.""" import datetime as dt -from typing import List, Optional from zoneinfo import ZoneInfo BERLIN = ZoneInfo("Europe/Berlin") @@ -44,7 +43,7 @@ def local_yesterday() -> dt.date: return local_today() - dt.timedelta(days=1) -def date_range(start: dt.date, end: dt.date) -> List[dt.date]: +def date_range(start: dt.date, end: dt.date) -> list[dt.date]: """Generate a list of dates within a range. Start and end are both inclusive. 
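Note on the `# noqa: B023` added in `talsperren.py` above: Ruff's bugbear rule B023 flags lambdas that capture a loop variable (here `column`), because Python closures bind late. It is safe to suppress in this case since `df.apply` consumes the lambda immediately within the same iteration, but the general pitfall is worth seeing. A self-contained sketch, not repo code:

```python
# Late binding: every closure sees the final value of the loop variable.
fns = [lambda: i for i in range(3)]
print([f() for f in fns])  # [2, 2, 2]

# The usual fix is to bind the current value via a default argument.
fns = [lambda i=i: i for i in range(3)]
print([f() for f in fns])  # [0, 1, 2]
```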
@@ -61,12 +60,12 @@ def date_range(start: dt.date, end: dt.date) -> List[dt.date]:
 
 
 def date_param(
-    date: Optional[dt.date],
+    date: dt.date | None,
     *,
-    default: Optional[dt.date] = None,
-    earliest: Optional[dt.date] = None,
-    latest: Optional[dt.date] = None,
-) -> Optional[dt.date]:
+    default: dt.date | None = None,
+    earliest: dt.date | None = None,
+    latest: dt.date | None = None,
+) -> dt.date | None:
     """For when you have an optional date parameter in your function but you want to limit
     the range of dates allowed. Also allows you to set a default.
 
@@ -91,7 +90,7 @@ def date_param(
     return date
 
 
-def to_timedelta(seconds: Optional[int]) -> Optional[dt.timedelta]:
+def to_timedelta(seconds: int | None) -> dt.timedelta | None:
     """Generate a timedelta from an int containing a number of seconds.
 
     Args:
@@ -107,7 +106,7 @@ def to_timedelta(seconds: Optional[int]) -> Optional[dt.timedelta]:
         return None
 
 
-def iso_as_local(date_time: Optional[str], tz: ZoneInfo = BERLIN) -> Optional[dt.datetime]:
+def iso_as_local(date_time: str | None, tz: ZoneInfo = BERLIN) -> dt.datetime | None:
     """Add timezone info to timezone naive isoformat date/time string.
 
     Args:
diff --git a/ddj_cloud/utils/storage.py b/ddj_cloud/utils/storage.py
index 315f323..9520962 100644
--- a/ddj_cloud/utils/storage.py
+++ b/ddj_cloud/utils/storage.py
@@ -1,14 +1,14 @@
 import os
-from os.path import commonprefix as common_prefix
+from collections.abc import Callable
 from io import BytesIO
+from os.path import commonprefix as common_prefix
 from pathlib import Path
-from typing import Any, Callable, Optional, Union
+from typing import Any
 from uuid import uuid4
 
 import pandas as pd
-
-from boto3 import client
 import sentry_sdk
+from boto3 import client
 
 from ddj_cloud.utils.date_and_time import local_today
 
@@ -47,7 +47,7 @@ def describe_events(*, clear: bool = True) -> list[str]:
         list[str]: List of events
     """
 
    def _describe(fs_event):  # noqa: PLR0911
        if fs_event["type"] == "download":
            if fs_event["success"]:
                return f'Downloaded file "{fs_event["filename"]}" from storage'
@@ -84,7 +84,7 @@ def simple_compare(old: Any, new: Any) -> bool:
 
 
 def make_df_compare_fn(
-    *, ignore_columns: Union[str, list[str], None] = None
+    *, ignore_columns: str | list[str] | None = None
 ) -> Callable[[bytes, bytes], bool]:
     """Create a function that can be used as the ``compare_fn`` argument to ``upload_dataframe``
     to compare two pandas DataFrames by their contents while ignoring specified columns.
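For context on `make_df_compare_fn` (its body is outside this diff): the returned callable plugs into `upload_dataframe`'s `compare_fn` parameter to decide whether an upload represents a real change. A hedged sketch of the shape such a comparator could take, assuming parquet payloads as `to_parquet_bio` suggests elsewhere in the repo; all names here are illustrative, not the actual implementation:

```python
from collections.abc import Callable
from io import BytesIO

import pandas as pd


def make_compare_ignoring(
    ignore_columns: str | list[str] | None = None,
) -> Callable[[bytes, bytes], bool]:
    # Normalize to a list so a single column name also works
    if isinstance(ignore_columns, str):
        ignore_columns = [ignore_columns]
    drop = ignore_columns or []

    def compare(old: bytes, new: bytes) -> bool:
        # Assumption: payloads are parquet-serialized DataFrames
        df_old = pd.read_parquet(BytesIO(old)).drop(columns=drop, errors="ignore")
        df_new = pd.read_parquet(BytesIO(new)).drop(columns=drop, errors="ignore")
        return df_old.equals(df_new)

    return compare
```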
@@ -122,11 +122,13 @@ def _download_file(filename: str) -> BytesIO:
             with open(LOCAL_STORAGE_ROOT / filename, "rb") as fp:
                 bio = BytesIO(fp.read())
         else:
+            assert s3 is not None
             bio = BytesIO()
             s3.download_fileobj(BUCKET_NAME, filename, bio)
-    except:
-        raise DownloadFailedException(f"Failed to download file {filename}")
+    except Exception as err:
+        msg = f"Failed to download file {filename}"
+        raise DownloadFailedException(msg) from err
 
     bio.seek(0)
     return bio
@@ -158,8 +160,8 @@ def __upload_file(
     content: bytes,
     filename: str,
     *,
-    acl: Optional[str] = None,
-    content_type: Optional[str] = None,
+    acl: str | None = None,
+    content_type: str | None = None,
 ):
     bio = BytesIO(content)
 
@@ -183,6 +185,7 @@ def __upload_file(
     if content_type is not None:
         extra_args["ContentType"] = content_type
 
+    assert s3 is not None
     s3.upload_fileobj(
         bio,
         BUCKET_NAME,
@@ -195,8 +198,8 @@ def _upload_file(
     content: bytes,
     filename: str,
     *,
-    acl: Optional[str] = None,
-    content_type: Optional[str] = None,
+    acl: str | None = None,
+    content_type: str | None = None,
     archive: bool = True,
 ) -> list[str]:
     """Internal file upload function that performs optional archiving and storage event tracking"""
@@ -223,14 +226,14 @@ def _upload_file(
     return filenames
 
 
-def upload_file(
+def upload_file(  # noqa: PLR0913
     content: bytes,
     filename: str,
     *,
-    content_type: Optional[str] = None,
-    change_notification: Optional[str] = None,
+    content_type: str | None = None,
+    change_notification: str | None = None,
     compare_fn: Callable[[bytes, bytes], bool] = simple_compare,
-    acl: Optional[str] = "public-read",
+    acl: str | None = "public-read",
     create_cloudfront_invalidation: bool = False,
 ):
     """Upload a file to storage.
@@ -291,13 +294,13 @@ def upload_file(
         sentry_sdk.capture_message(change_notification)
 
 
-def upload_dataframe(
+def upload_dataframe(  # noqa: PLR0913
     df: pd.DataFrame,
     filename: str,
     *,
-    change_notification: Optional[str] = None,
+    change_notification: str | None = None,
     compare_fn: Callable[[bytes, bytes], bool] = simple_compare,
-    acl: Optional[str] = "public-read",
+    acl: str | None = "public-read",
     create_cloudfront_invalidation: bool = False,
     datawrapper_datetimes: bool = False,
 ):
@@ -352,7 +355,7 @@ def _queue_cloudfront_invalidation(filename: str) -> Any:
     CLOUDFRONT_INVALIDATIONS_TO_CREATE.append(filename)
 
 
-def run_cloudfront_invalidations(*, caller_reference: Optional[str] = None):
+def run_cloudfront_invalidations(*, caller_reference: str | None = None):
     """Run CloudFront invalidations"""
 
     caller_reference = caller_reference or str(uuid4())
@@ -363,7 +366,8 @@ def run_cloudfront_invalidations(*, caller_reference: Optional[str] = None):
     invalidation_path = common_prefix(CLOUDFRONT_INVALIDATIONS_TO_CREATE) + "*"
 
     if "/" not in invalidation_path:
-        raise Exception("CloudFront invalidation path is too broad:", invalidation_path)
+        msg = f"CloudFront invalidation path is too broad: {invalidation_path}"
+        raise Exception(msg)
 
     if not USE_LOCAL_STORAGE and cloudfront:
         STORAGE_EVENTS.append({"type": "invalidation", "path": invalidation_path})
diff --git a/pyproject.toml b/pyproject.toml
index 28c2070..86b24a9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,3 +1,43 @@
-[tool.black]
+
+[tool.ruff]
+target-version = "py311"
+src = ["ddj_cloud", "scraper_template"]
+indent-width = 4
 line-length = 100
-target-version = ['py39']
+exclude = []
+
+[tool.ruff.lint]
+extend-select = [
+    "ARG",   # unused-arguments
+    "B",     # bugbear
+    "E",     # pycodestyle
+    "EM",    # errmsg
+    "F",     # flake8
+    "I",     # import order
+    "PYI",   # pyi
+ "SIM", # simplify + # "T20", # print + "TCH", # type-checking + "TID", # tidy-imports + "UP", # pyupgrade + "FURB", # refurb + "W", # pycodestyle + # "D", # pydocstyle + "PL", # Pylint +] +ignore = [ + "E501", # line too long +] + + +[tool.pyright] +pythonVersion = "3.11" +include = ["ddj_cloud", "scraper_template"] +exclude = [] +typeCheckingMode = "standard" +reportUnnecessaryTypeIgnoreComment = "warning" +reportUnusedImport = "none" +# reportUnknownMemberType = "warning" +# reportUnknownVariableType = "warning" +# reportUnknownArgumentType = "warning" +# reportUnknownFunctionType = "warning"