From 003302642e46de16ffb8fbf82734cd989d05cef6 Mon Sep 17 00:00:00 2001 From: Ricardo Garcia Silva Date: Mon, 7 Oct 2024 19:00:50 +0100 Subject: [PATCH] Update seasonal measurements when new ones are available externally (#264) * Fixed incorrect generation of seasonal measurement id key, which was preventing new measurements from being found * Fixed incorrect default value for a coverage smoothing parameter It is now possible to call the respective path operation without supplying the value, as originally intended --- .../observations_harvester/operations.py | 270 ------------------ arpav_ppcv/prefect/flows/observations.py | 53 +++- arpav_ppcv/webapp/api_v2/routers/coverages.py | 6 +- tests/test_prefect_flows_observations.py | 81 ++++++ 4 files changed, 129 insertions(+), 281 deletions(-) create mode 100644 tests/test_prefect_flows_observations.py diff --git a/arpav_ppcv/observations_harvester/operations.py b/arpav_ppcv/observations_harvester/operations.py index 51b79367..2aceaed3 100644 --- a/arpav_ppcv/observations_harvester/operations.py +++ b/arpav_ppcv/observations_harvester/operations.py @@ -1,13 +1,11 @@ import datetime as dt import logging -import uuid from collections.abc import ( Generator, Sequence, ) from typing import ( Callable, - Optional, ) import geojson_pydantic @@ -15,12 +13,7 @@ import pyproj import shapely import shapely.ops -import sqlmodel -from .. import ( - database, -) -from ..schemas.base import Season from ..schemas import observations logger = logging.getLogger(__name__) @@ -139,266 +132,3 @@ def harvest_stations( ): stations.add(parse_station(raw_station, coord_converter)) return stations - - -def harvest_monthly_measurements( - client: httpx.Client, - db_session: sqlmodel.Session, - station_id: Optional[uuid.UUID] = None, - variable_id: Optional[uuid.UUID] = None, -) -> list[observations.MonthlyMeasurementCreate]: - existing_stations = _get_stations(db_session, station_id) - existing_variables = _get_variables(db_session, variable_id) - monthly_measurements_create = [] - for station_idx, station in enumerate(existing_stations): - logger.info( - f"Processing station {station.code!r} ({station_idx+1}/" - f"{len(existing_stations)})..." - ) - for var_idx, variable in enumerate(existing_variables): - logger.info( - f"\tProcessing variable {variable.name!r} ({var_idx+1}/" - f"{len(existing_variables)})..." - ) - for month in range(1, 13): - logger.info(f"\t\tProcessing month {month!r} ({month}/12)...") - existing_measurements = database.collect_all_monthly_measurements( - db_session, - station_id_filter=station.id, - variable_id_filter=variable.id, - month_filter=month, - ) - existing = {} - for db_measurement in existing_measurements: - measurement_id = build_monthly_measurement_id(db_measurement) - existing[measurement_id] = db_measurement - response = client.get( - "https://api.arpa.veneto.it/REST/v1/clima_indicatori", - params={ - "statcd": station.code, - "indicatore": variable.name, - "tabella": "M", - "periodo": month, - }, - ) - response.raise_for_status() - for raw_measurement in response.json().get("data", []): - monthly_measurement_create = observations.MonthlyMeasurementCreate( - station_id=station.id, - variable_id=variable.id, - value=raw_measurement["valore"], - date=dt.date(raw_measurement["anno"], month, 1), - ) - measurement_id = build_monthly_measurement_id( - monthly_measurement_create - ) - if measurement_id not in existing: - monthly_measurements_create.append(monthly_measurement_create) - return monthly_measurements_create - - -def refresh_monthly_measurements( - client: httpx.Client, - db_session: sqlmodel.Session, - station_id: Optional[uuid.UUID] = None, - variable_id: Optional[uuid.UUID] = None, -) -> list[observations.MonthlyMeasurement]: - to_create = harvest_monthly_measurements( - client, db_session, station_id=station_id, variable_id=variable_id - ) - logger.info(f"About to create {len(to_create)} monthly measurements...") - created_monthly_measurements = database.create_many_monthly_measurements( - db_session, to_create - ) - return created_monthly_measurements - - -def harvest_seasonal_measurements( - client: httpx.Client, - db_session: sqlmodel.Session, - station_id: Optional[uuid.UUID] = None, - variable_id: Optional[uuid.UUID] = None, -) -> list[observations.SeasonalMeasurementCreate]: - existing_stations = _get_stations(db_session, station_id) - existing_variables = _get_variables(db_session, variable_id) - measurements_create = [] - for station_idx, station in enumerate(existing_stations): - logger.info( - f"Processing station {station.code!r} ({station_idx+1}/" - f"{len(existing_stations)})..." - ) - for var_idx, variable in enumerate(existing_variables): - logger.info( - f"\tProcessing variable {variable.name!r} ({var_idx+1}/" - f"{len(existing_variables)})..." - ) - for current_season in Season: - logger.info(f"\t\tProcessing season {current_season!r}...") - existing_measurements = database.collect_all_seasonal_measurements( - db_session, - station_id_filter=station.id, - variable_id_filter=variable.id, - season_filter=current_season, - ) - existing = {} - for db_measurement in existing_measurements: - measurement_id = build_seasonal_measurement_id(db_measurement) - existing[measurement_id] = db_measurement - - season_query_param = { - Season.WINTER: 1, - Season.SPRING: 2, - Season.SUMMER: 3, - Season.AUTUMN: 4, - }[current_season] - response = client.get( - "https://api.arpa.veneto.it/REST/v1/clima_indicatori", - params={ - "statcd": station.code, - "indicatore": variable.name, - "tabella": "S", - "periodo": season_query_param, - }, - ) - response.raise_for_status() - for raw_measurement in response.json().get("data", []): - measurement_create = observations.SeasonalMeasurementCreate( - station_id=station.id, - variable_id=variable.id, - value=raw_measurement["valore"], - year=int(raw_measurement["anno"]), - season=current_season, - ) - measurement_id = build_seasonal_measurement_id(measurement_create) - if measurement_id not in existing: - measurements_create.append(measurement_create) - return measurements_create - - -def refresh_seasonal_measurements( - client: httpx.Client, - db_session: sqlmodel.Session, - station_id: Optional[uuid.UUID] = None, - variable_id: Optional[uuid.UUID] = None, -) -> list[observations.SeasonalMeasurement]: - to_create = harvest_seasonal_measurements( - client, db_session, station_id=station_id, variable_id=variable_id - ) - logger.info(f"About to create {len(to_create)} seasonal measurements...") - created_measurements = database.create_many_seasonal_measurements( - db_session, to_create - ) - return created_measurements - - -def build_monthly_measurement_id( - measurement: observations.MonthlyMeasurement - | observations.MonthlyMeasurementCreate, -) -> str: - return ( - f"{measurement.station_id}-{measurement.variable_id}-" - f"{measurement.date.strftime('%Y%m')}" - ) - - -def build_seasonal_measurement_id( - measurement: observations.SeasonalMeasurement - | observations.SeasonalMeasurementCreate, -) -> str: - return ( - f"{measurement.station_id}-{measurement.variable_id}-" - f"{measurement.season.value}" - ) - - -def build_yearly_measurement_id( - measurement: observations.YearlyMeasurement | observations.YearlyMeasurementCreate, -) -> str: - return f"{measurement.station_id}-{measurement.variable_id}-{measurement.year}" - - -def _get_stations( - db_session: sqlmodel.Session, station_id: Optional[uuid.UUID] -) -> list[observations.Station]: - if station_id is not None: - result = [database.get_station(db_session, station_id)] - else: - result = database.collect_all_stations(db_session) - return result - - -def _get_variables( - db_session: sqlmodel.Session, variable_id: Optional[uuid.UUID] -) -> list[observations.Variable]: - if variable_id is not None: - result = [database.get_variable(db_session, variable_id)] - else: - result = database.collect_all_variables(db_session) - return result - - -def harvest_yearly_measurements( - client: httpx.Client, - db_session: sqlmodel.Session, - station_id: Optional[uuid.UUID] = None, - variable_id: Optional[uuid.UUID] = None, -) -> list[observations.YearlyMeasurementCreate]: - existing_stations = _get_stations(db_session, station_id) - existing_variables = _get_variables(db_session, variable_id) - yearly_measurements_create = [] - for station_idx, station in enumerate(existing_stations): - logger.info( - f"Processing station {station.code!r} ({station_idx+1}/" - f"{len(existing_stations)})..." - ) - for var_idx, variable in enumerate(existing_variables): - logger.info( - f"\tProcessing variable {variable.name!r} ({var_idx+1}/" - f"{len(existing_variables)})..." - ) - existing_measurements = database.collect_all_yearly_measurements( - db_session, - station_id_filter=station.id, - variable_id_filter=variable.id, - ) - existing = {} - for db_measurement in existing_measurements: - measurement_id = build_yearly_measurement_id(db_measurement) - existing[measurement_id] = db_measurement - response = client.get( - "https://api.arpa.veneto.it/REST/v1/clima_indicatori", - params={ - "statcd": station.code, - "indicatore": variable.name, - "tabella": "A", - "periodo": "0", - }, - ) - response.raise_for_status() - for raw_measurement in response.json().get("data", []): - yearly_measurement_create = observations.YearlyMeasurementCreate( - station_id=station.id, - variable_id=variable.id, - value=raw_measurement["valore"], - year=int(raw_measurement["anno"]), - ) - measurement_id = build_yearly_measurement_id(yearly_measurement_create) - if measurement_id not in existing: - yearly_measurements_create.append(yearly_measurement_create) - return yearly_measurements_create - - -def refresh_yearly_measurements( - client: httpx.Client, - db_session: sqlmodel.Session, - station_id: Optional[uuid.UUID] = None, - variable_id: Optional[uuid.UUID] = None, -) -> list[observations.YearlyMeasurement]: - to_create = harvest_yearly_measurements( - client, db_session, station_id=station_id, variable_id=variable_id - ) - logger.info(f"About to create {len(to_create)} yearly measurements...") - created_measurements = database.create_many_yearly_measurements( - db_session, to_create - ) - return created_measurements diff --git a/arpav_ppcv/prefect/flows/observations.py b/arpav_ppcv/prefect/flows/observations.py index f0f65c6a..6a7ca500 100644 --- a/arpav_ppcv/prefect/flows/observations.py +++ b/arpav_ppcv/prefect/flows/observations.py @@ -175,7 +175,7 @@ def harvest_monthly_measurements( ) existing = {} for db_measurement in existing_measurements: - measurement_id = operations.build_monthly_measurement_id(db_measurement) + measurement_id = build_monthly_measurement_id(db_measurement) existing[measurement_id] = db_measurement response = client.get( "https://api.arpa.veneto.it/REST/v1/clima_indicatori", @@ -195,7 +195,7 @@ def harvest_monthly_measurements( value=raw_measurement["valore"], date=dt.date(raw_measurement["anno"], month, 1), ) - measurement_id = operations.build_monthly_measurement_id(measurement_create) + measurement_id = build_monthly_measurement_id(measurement_create) if measurement_id not in existing: to_create.append(measurement_create) return to_create @@ -272,7 +272,7 @@ def harvest_seasonal_measurements( ) existing = {} for db_measurement in existing_measurements: - measurement_id = operations.build_seasonal_measurement_id(db_measurement) + measurement_id = build_seasonal_measurement_id(db_measurement) existing[measurement_id] = db_measurement season_query_param = { @@ -300,7 +300,7 @@ def harvest_seasonal_measurements( year=int(raw_measurement["anno"]), season=season, ) - measurement_id = operations.build_seasonal_measurement_id(measurement_create) + measurement_id = build_seasonal_measurement_id(measurement_create) if measurement_id not in existing: to_create.append(measurement_create) return to_create @@ -372,7 +372,7 @@ def harvest_yearly_measurements( ) existing = {} for db_measurement in existing_measurements: - measurement_id = operations.build_yearly_measurement_id(db_measurement) + measurement_id = build_yearly_measurement_id(db_measurement) existing[measurement_id] = db_measurement response = client.get( "https://api.arpa.veneto.it/REST/v1/clima_indicatori", @@ -391,9 +391,7 @@ def harvest_yearly_measurements( value=raw_measurement["valore"], year=int(raw_measurement["anno"]), ) - measurement_id = operations.build_yearly_measurement_id( - yearly_measurement_create - ) + measurement_id = build_yearly_measurement_id(yearly_measurement_create) if measurement_id not in existing: to_create.append(yearly_measurement_create) return to_create @@ -509,3 +507,42 @@ def _build_created_measurements_table( } ) return table_contents + + +def build_monthly_measurement_id( + measurement: observations.MonthlyMeasurement + | observations.MonthlyMeasurementCreate, +) -> str: + return "-".join( + ( + str(measurement.station_id), + str(measurement.variable_id), + measurement.date.strftime("%Y%m"), + ) + ) + + +def build_seasonal_measurement_id( + measurement: observations.SeasonalMeasurement + | observations.SeasonalMeasurementCreate, +) -> str: + return "-".join( + ( + str(measurement.station_id), + str(measurement.variable_id), + str(measurement.year), + measurement.season.value, + ) + ) + + +def build_yearly_measurement_id( + measurement: observations.YearlyMeasurement | observations.YearlyMeasurementCreate, +) -> str: + return "-".join( + ( + str(measurement.station_id), + str(measurement.variable_id), + str(measurement.year), + ) + ) diff --git a/arpav_ppcv/webapp/api_v2/routers/coverages.py b/arpav_ppcv/webapp/api_v2/routers/coverages.py index 3538d939..a20b61c8 100644 --- a/arpav_ppcv/webapp/api_v2/routers/coverages.py +++ b/arpav_ppcv/webapp/api_v2/routers/coverages.py @@ -655,9 +655,9 @@ def get_time_series( ) ), ] = False, - coverage_data_smoothing: Annotated[list[CoverageDataSmoothingStrategy], Query()] = [ # noqa - ObservationDataSmoothingStrategy.NO_SMOOTHING - ], + coverage_data_smoothing: Annotated[list[CoverageDataSmoothingStrategy], Query()] = [ + CoverageDataSmoothingStrategy.NO_SMOOTHING + ], # noqa observation_data_smoothing: Annotated[ list[ObservationDataSmoothingStrategy], Query() ] = [ObservationDataSmoothingStrategy.NO_SMOOTHING], # noqa diff --git a/tests/test_prefect_flows_observations.py b/tests/test_prefect_flows_observations.py new file mode 100644 index 00000000..02d587f1 --- /dev/null +++ b/tests/test_prefect_flows_observations.py @@ -0,0 +1,81 @@ +import datetime as dt +import uuid + +import pytest + +from arpav_ppcv.prefect.flows import observations +from arpav_ppcv.schemas import observations as observation_schemas +from arpav_ppcv.schemas.base import Season + + +@pytest.mark.parametrize( + "station_id, variable_id, value, date, expected", + [ + pytest.param( + uuid.UUID("65af54f0-1df2-423b-994f-03fa1195dd7b"), + uuid.UUID("2a20f72f-2e0f-4a0c-9cd4-d3215aa9b8fc"), + 10.23, + dt.date(2020, 1, 1), + "65af54f0-1df2-423b-994f-03fa1195dd7b-2a20f72f-2e0f-4a0c-9cd4-d3215aa9b8fc-202001", + ), + ], +) +def test_build_monthly_measurement_id(station_id, variable_id, value, date, expected): + result = observations.build_monthly_measurement_id( + observation_schemas.MonthlyMeasurementCreate( + station_id=station_id, variable_id=variable_id, value=value, date=date + ) + ) + assert result == expected + + +@pytest.mark.parametrize( + "station_id, variable_id, value, year, season, expected", + [ + pytest.param( + uuid.UUID("65af54f0-1df2-423b-994f-03fa1195dd7b"), + uuid.UUID("2a20f72f-2e0f-4a0c-9cd4-d3215aa9b8fc"), + 10.23, + 2020, + Season.SUMMER, + "65af54f0-1df2-423b-994f-03fa1195dd7b-2a20f72f-2e0f-4a0c-9cd4-d3215aa9b8fc-2020-SUMMER", + ), + ], +) +def test_build_seasonal_measurement_id( + station_id, variable_id, value, year, season, expected +): + result = observations.build_seasonal_measurement_id( + observation_schemas.SeasonalMeasurementCreate( + station_id=station_id, + variable_id=variable_id, + value=value, + year=year, + season=season, + ) + ) + assert result == expected + + +@pytest.mark.parametrize( + "station_id, variable_id, value, year, expected", + [ + pytest.param( + uuid.UUID("65af54f0-1df2-423b-994f-03fa1195dd7b"), + uuid.UUID("2a20f72f-2e0f-4a0c-9cd4-d3215aa9b8fc"), + 10.23, + 2020, + "65af54f0-1df2-423b-994f-03fa1195dd7b-2a20f72f-2e0f-4a0c-9cd4-d3215aa9b8fc-2020", + ), + ], +) +def test_build_yearly_measurement_id(station_id, variable_id, value, year, expected): + result = observations.build_yearly_measurement_id( + observation_schemas.YearlyMeasurementCreate( + station_id=station_id, + variable_id=variable_id, + value=value, + year=year, + ) + ) + assert result == expected