From 56ff00de9ca203b596fa7477103a97288850711a Mon Sep 17 00:00:00 2001 From: Magnus Ulimoen Date: Fri, 15 Nov 2024 17:04:15 +0100 Subject: [PATCH] Use automatic filter handling --- .../eeareader/EEATimeseriesReader.py | 106 +++++------------- 1 file changed, 31 insertions(+), 75 deletions(-) diff --git a/src/pyaro_readers/eeareader/EEATimeseriesReader.py b/src/pyaro_readers/eeareader/EEATimeseriesReader.py index abeaa82..1b73b6c 100644 --- a/src/pyaro_readers/eeareader/EEATimeseriesReader.py +++ b/src/pyaro_readers/eeareader/EEATimeseriesReader.py @@ -1,7 +1,7 @@ import logging from datetime import datetime, timedelta from pathlib import Path -from typing import Any, Literal +from typing import Literal from collections.abc import Iterable import importlib.resources import dataclasses @@ -9,10 +9,13 @@ from tqdm import tqdm import numpy as np import polars +from pyaro.timeseries.AutoFilterReaderEngine import ( + AutoFilterReader, + AutoFilterEngine, +) from pyaro.timeseries import ( - Data, Reader, - Engine, + Data, Station, ) import pyaro.timeseries @@ -33,7 +36,9 @@ def __init__(self, data, variable: str) -> None: @property def units(self) -> str: units = self._data["Unit"].unique() - if len(units) != 1: + if len(units) == 0: + return EEAReaderException("No units present in this dataset") + elif len(units) != 1: raise EEAReaderException("Multiple different units present in this dataset") return units[0] @@ -179,28 +184,18 @@ class _DataFrame: postfilter_time: pyaro.timeseries.Filter.TimeBoundsFilter | None -class EEATimeseriesReader(Reader): - # TODO: support more filters - supported_filters: list[str] = [ - # "variables", - "time_bounds", - # time_resolution, - "stations", - "countries", - # flags, - # altitude, - ] - +class EEATimeseriesReader(AutoFilterReader): def __init__( self, filename_or_obj_or_url, - filters=None, + filters=[], enable_progressbar: bool = False, dataset: Literal["historical", "verified", "unverified"] = "unverified", station_area: str | list[str] = "all", station_type: str | list[str] = "all", metadata_file: str | None = None, ): + self._set_filters(filters) data_directory = Path(filename_or_obj_or_url) if metadata_file is None: metadata_file = data_directory.joinpath("metadata.csv") @@ -227,24 +222,13 @@ def __init__( self._metadata_pollutant ), "Pollutants are not unique" - self._filters = [] - if isinstance(filters, dict): - filters = pyaro.timeseries.FilterCollection(filters) - if filters is not None: - for filter in filters: - if filter.name() in self.supported_filters: - self._filters.append(filter) - else: - raise NotImplementedError( - f"This reader does not support filter {filter.name()}" - ) self._data_directory = data_directory self._progressbar_enabled = enable_progressbar if isinstance(station_area, str): self._station_area = [station_area] else: - self._station_area = station_are + self._station_area = station_area self._station_type = station_type if isinstance(station_type, str): @@ -258,16 +242,9 @@ def metadata(self) -> dict[str, str]: metadata["download_url"] = "https://eeadmz1-downloads-webapp.azurewebsites.net/" return metadata - def data(self, varname: str) -> Data: + def _unfiltered_data(self, varname: str) -> Data: dataframe = self._read(varname) - if dataframe.postfilter_time is None: - return EEAData(dataframe.frame, varname) - else: - stations = self.stations() - data = dataframe.frame - return dataframe.postfilter_time.filter_data( - EEAData(data, varname), stations, [varname] - ) + return EEAData(dataframe.frame, varname) def _read( self, @@ -285,7 +262,7 @@ def _read( raise EEAReaderException(f"No variable ID found for {variable}") variable_id = pollutant_candidates["Id"][0] - filters = _transform_filters(self._filters, variable_id) + filters = _transform_filters(self._get_filters(), variable_id) historical_path = self._data_directory.joinpath("historical") verified_path = self._data_directory.joinpath("verified") unverified_path = self._data_directory.joinpath("unverified") @@ -367,11 +344,11 @@ def _read( extra_filters = [] if self._station_area != ["all"]: extra_filters.append( - polars.col("Air Quality Station Area").is_in(station_area) + polars.col("Air Quality Station Area").is_in(self._station_area) ) if self._station_type != ["all"]: extra_filters.append( - polars.col("Air Quality Station Type").is_in(station_type) + polars.col("Air Quality Station Type").is_in(self._station_type) ) # OBS: Times are given in this timezone for non-daily observations @@ -402,7 +379,7 @@ def _read( return _DataFrame(frame=joined, postfilter_time=filters.time) - def variables(self) -> list[str]: + def _unfiltered_variables(self) -> list[str]: # Todo: Filtering might affect available variables pollutants = self._metadata["Air Pollutant"].unique() pollutants_metadata = self._metadata_pollutant["Notation"].unique() @@ -410,7 +387,7 @@ def variables(self) -> list[str]: common = set(pollutants).intersection(pollutants_metadata) return list(sorted(common)) - def stations(self) -> dict[str, Station]: + def _unfiltered_stations(self) -> dict[str, Station]: stations = self._metadata.with_columns( ( ( @@ -460,17 +437,9 @@ def close(self) -> None: pass -class EEATimeseriesEngine(Engine): - args: list[str] = [ - "filename_or_obj_or_url", - "enable_progressbar", - "dataset", - "station_area", - "station_type", - "metadata_file", - ] - supported_filters: list[str] = EEATimeseriesReader.supported_filters - description: str = """EEA reader for parquet files +class EEATimeseriesEngine(AutoFilterEngine): + def description(self) -> str: + return """EEA reader for parquet files Read and filter hourly data from EEA stations using the unverified dataset. @@ -495,28 +464,15 @@ class EEATimeseriesEngine(Engine): airbase unverified --path datadir/unverified/ -p SO2 -p PM10 -p O3 -p NO2 -p CO -p NO -p PM2.5 -F hourly --metadata --overwrite """ - url: str = "https://github.com/metno/pyaro-readers" - def open( - self, - filename_or_obj_or_url, - enable_progressbar: bool = False, - dataset: Literal["historical", "verified", "unverified"] = "unverified", - station_area: str | list[str] = "all", - station_type: str | list[str] = "all", - metadata_file: str | None = None, - *, - filters=None, - ): - return EEATimeseriesReader( - filename_or_obj_or_url, - enable_progressbar=enable_progressbar, - dataset=dataset, - filters=filters, - station_area=station_area, - station_type=station_type, - metadata_file=metadata_file, - ) + def url(self) -> str: + return "https://github.com/metno/pyaro-readers" + + def reader_class(self) -> AutoFilterReader: + return EEATimeseriesReader + + def open(self, filename, *args, **kwargs) -> Reader: + return self.reader_class()(filename, *args, **kwargs) # ISO 3166-1 alpha-2 for countries in EEA