Skip to content

Commit

Permalink
Update seasonal measurements when new ones are available externally (#264)
Browse files Browse the repository at this point in the history

* Fixed incorrect generation of seasonal measurement id key, which was preventing new measurements from being found

* Fixed incorrect default value for a coverage smoothing parameter

It is now possible to call the respective path operation without supplying the value, as originally intended
  • Loading branch information
ricardogsilva authored Oct 7, 2024
1 parent 58ca886 commit 0033026
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 281 deletions.
270 changes: 0 additions & 270 deletions arpav_ppcv/observations_harvester/operations.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,19 @@
import datetime as dt
import logging
import uuid
from collections.abc import (
Generator,
Sequence,
)
from typing import (
Callable,
Optional,
)

import geojson_pydantic
import httpx
import pyproj
import shapely
import shapely.ops
import sqlmodel

from .. import (
database,
)
from ..schemas.base import Season
from ..schemas import observations

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -139,266 +132,3 @@ def harvest_stations(
):
stations.add(parse_station(raw_station, coord_converter))
return stations


def harvest_monthly_measurements(
    client: httpx.Client,
    db_session: sqlmodel.Session,
    station_id: Optional[uuid.UUID] = None,
    variable_id: Optional[uuid.UUID] = None,
) -> list[observations.MonthlyMeasurementCreate]:
    """Collect monthly measurements available remotely but missing locally.

    For every (station, variable, month) combination, the ids of the
    measurements already stored in the database are compared with those
    built from the remote ``clima_indicatori`` endpoint response
    (``tabella="M"``). Nothing is written to the database here.

    Args:
        client: HTTP client used to query the remote API.
        db_session: Database session used to look up stations, variables
            and already stored measurements.
        station_id: When given, only this station is processed; otherwise
            every station returned by the database is.
        variable_id: When given, only this variable is processed;
            otherwise every variable returned by the database is.

    Returns:
        Creation payloads for the measurements whose id is not present
        among the stored ones.

    Raises:
        httpx.HTTPStatusError: If the remote API replies with an error
            status (raised by ``response.raise_for_status()``).
    """
    existing_stations = _get_stations(db_session, station_id)
    existing_variables = _get_variables(db_session, variable_id)
    monthly_measurements_create = []
    for station_idx, station in enumerate(existing_stations):
        logger.info(
            f"Processing station {station.code!r} ({station_idx+1}/"
            f"{len(existing_stations)})..."
        )
        for var_idx, variable in enumerate(existing_variables):
            logger.info(
                f"\tProcessing variable {variable.name!r} ({var_idx+1}/"
                f"{len(existing_variables)})..."
            )
            for month in range(1, 13):
                logger.info(f"\t\tProcessing month {month!r} ({month}/12)...")
                # Only membership is ever tested, so a set of ids suffices.
                existing_ids = {
                    build_monthly_measurement_id(db_measurement)
                    for db_measurement in database.collect_all_monthly_measurements(
                        db_session,
                        station_id_filter=station.id,
                        variable_id_filter=variable.id,
                        month_filter=month,
                    )
                }
                response = client.get(
                    "https://api.arpa.veneto.it/REST/v1/clima_indicatori",
                    params={
                        "statcd": station.code,
                        "indicatore": variable.name,
                        "tabella": "M",
                        "periodo": month,
                    },
                )
                response.raise_for_status()
                for raw_measurement in response.json().get("data", []):
                    monthly_measurement_create = observations.MonthlyMeasurementCreate(
                        station_id=station.id,
                        variable_id=variable.id,
                        value=raw_measurement["valore"],
                        # Coerce the year explicitly, like the seasonal and
                        # yearly harvesters do: dt.date() rejects a year
                        # that arrives as a string.
                        date=dt.date(int(raw_measurement["anno"]), month, 1),
                    )
                    measurement_id = build_monthly_measurement_id(
                        monthly_measurement_create
                    )
                    if measurement_id not in existing_ids:
                        monthly_measurements_create.append(monthly_measurement_create)
    return monthly_measurements_create


def refresh_monthly_measurements(
    client: httpx.Client,
    db_session: sqlmodel.Session,
    station_id: Optional[uuid.UUID] = None,
    variable_id: Optional[uuid.UUID] = None,
) -> list[observations.MonthlyMeasurement]:
    """Harvest the missing monthly measurements and store them.

    Args:
        client: HTTP client handed over to the harvesting step.
        db_session: Database session used for harvesting and for creation.
        station_id: Optional station to restrict the refresh to.
        variable_id: Optional variable to restrict the refresh to.

    Returns:
        Whatever ``database.create_many_monthly_measurements`` returns for
        the harvested creation payloads.
    """
    to_create = harvest_monthly_measurements(
        client,
        db_session,
        station_id=station_id,
        variable_id=variable_id,
    )
    logger.info(f"About to create {len(to_create)} monthly measurements...")
    return database.create_many_monthly_measurements(db_session, to_create)


def harvest_seasonal_measurements(
    client: httpx.Client,
    db_session: sqlmodel.Session,
    station_id: Optional[uuid.UUID] = None,
    variable_id: Optional[uuid.UUID] = None,
) -> list[observations.SeasonalMeasurementCreate]:
    """Collect seasonal measurements available remotely but missing locally.

    Walks every (station, variable, season) combination, queries the remote
    ``clima_indicatori`` endpoint with ``tabella="S"`` and keeps the
    measurements whose id is not among the ids of the stored ones. Nothing
    is written to the database here.

    Args:
        client: HTTP client used to query the remote API.
        db_session: Database session used to look up stations, variables
            and already stored measurements.
        station_id: When given, only this station is processed.
        variable_id: When given, only this variable is processed.

    Returns:
        Creation payloads for the measurements not yet stored.
    """
    existing_stations = _get_stations(db_session, station_id)
    existing_variables = _get_variables(db_session, variable_id)
    measurements_create = []
    for station_idx, station in enumerate(existing_stations):
        logger.info(
            f"Processing station {station.code!r} ({station_idx+1}/"
            f"{len(existing_stations)})..."
        )
        for var_idx, variable in enumerate(existing_variables):
            logger.info(
                f"\tProcessing variable {variable.name!r} ({var_idx+1}/"
                f"{len(existing_variables)})..."
            )
            for current_season in Season:
                logger.info(f"\t\tProcessing season {current_season!r}...")
                # Ids of what is already stored for this combination.
                known_ids = {
                    build_seasonal_measurement_id(stored)
                    for stored in database.collect_all_seasonal_measurements(
                        db_session,
                        station_id_filter=station.id,
                        variable_id_filter=variable.id,
                        season_filter=current_season,
                    )
                }

                # The remote API is queried with a number per season.
                period_by_season = {
                    Season.WINTER: 1,
                    Season.SPRING: 2,
                    Season.SUMMER: 3,
                    Season.AUTUMN: 4,
                }
                query = {
                    "statcd": station.code,
                    "indicatore": variable.name,
                    "tabella": "S",
                    "periodo": period_by_season[current_season],
                }
                response = client.get(
                    "https://api.arpa.veneto.it/REST/v1/clima_indicatori",
                    params=query,
                )
                response.raise_for_status()
                payload = response.json()
                for raw_measurement in payload.get("data", []):
                    candidate = observations.SeasonalMeasurementCreate(
                        station_id=station.id,
                        variable_id=variable.id,
                        value=raw_measurement["valore"],
                        year=int(raw_measurement["anno"]),
                        season=current_season,
                    )
                    if build_seasonal_measurement_id(candidate) in known_ids:
                        continue
                    measurements_create.append(candidate)
    return measurements_create


def refresh_seasonal_measurements(
    client: httpx.Client,
    db_session: sqlmodel.Session,
    station_id: Optional[uuid.UUID] = None,
    variable_id: Optional[uuid.UUID] = None,
) -> list[observations.SeasonalMeasurement]:
    """Harvest the missing seasonal measurements and store them.

    Args:
        client: HTTP client handed over to the harvesting step.
        db_session: Database session used for harvesting and for creation.
        station_id: Optional station to restrict the refresh to.
        variable_id: Optional variable to restrict the refresh to.

    Returns:
        Whatever ``database.create_many_seasonal_measurements`` returns for
        the harvested creation payloads.
    """
    to_create = harvest_seasonal_measurements(
        client,
        db_session,
        station_id=station_id,
        variable_id=variable_id,
    )
    logger.info(f"About to create {len(to_create)} seasonal measurements...")
    return database.create_many_seasonal_measurements(db_session, to_create)


def build_monthly_measurement_id(
    measurement: observations.MonthlyMeasurement
    | observations.MonthlyMeasurementCreate,
) -> str:
    """Build the key used to match a monthly measurement against others.

    The key joins the station id, the variable id and the measurement date
    formatted as ``YYYYMM``, separated by dashes.
    """
    id_parts = (
        str(measurement.station_id),
        str(measurement.variable_id),
        measurement.date.strftime('%Y%m'),
    )
    return "-".join(id_parts)


def build_seasonal_measurement_id(
    measurement: observations.SeasonalMeasurement
    | observations.SeasonalMeasurementCreate,
) -> str:
    """Build the key used to match a seasonal measurement against others.

    The key joins the station id, the variable id, the year and the season
    value, separated by dashes.

    The year must be part of the key: without it every measurement of the
    same station, variable and season maps to the same id, so a measurement
    for a new year looks like one that is already stored and is never
    picked up.
    """
    return (
        f"{measurement.station_id}-{measurement.variable_id}-"
        f"{measurement.year}-{measurement.season.value}"
    )


def build_yearly_measurement_id(
    measurement: observations.YearlyMeasurement | observations.YearlyMeasurementCreate,
) -> str:
    """Build the key used to match a yearly measurement against others.

    The key joins the station id, the variable id and the year, separated
    by dashes.
    """
    id_parts = (
        str(measurement.station_id),
        str(measurement.variable_id),
        str(measurement.year),
    )
    return "-".join(id_parts)


def _get_stations(
    db_session: sqlmodel.Session, station_id: Optional[uuid.UUID]
) -> list[observations.Station]:
    """Return the stations to process.

    With a ``station_id`` the result is a one-element list holding what
    ``database.get_station`` returns for it; without one, it is whatever
    ``database.collect_all_stations`` returns.
    """
    if station_id is None:
        return database.collect_all_stations(db_session)
    return [database.get_station(db_session, station_id)]


def _get_variables(
    db_session: sqlmodel.Session, variable_id: Optional[uuid.UUID]
) -> list[observations.Variable]:
    """Return the variables to process.

    With a ``variable_id`` the result is a one-element list holding what
    ``database.get_variable`` returns for it; without one, it is whatever
    ``database.collect_all_variables`` returns.
    """
    if variable_id is None:
        return database.collect_all_variables(db_session)
    return [database.get_variable(db_session, variable_id)]


def harvest_yearly_measurements(
    client: httpx.Client,
    db_session: sqlmodel.Session,
    station_id: Optional[uuid.UUID] = None,
    variable_id: Optional[uuid.UUID] = None,
) -> list[observations.YearlyMeasurementCreate]:
    """Collect yearly measurements available remotely but missing locally.

    Walks every (station, variable) combination, queries the remote
    ``clima_indicatori`` endpoint with ``tabella="A"`` and keeps the
    measurements whose id is not among the ids of the stored ones. Nothing
    is written to the database here.

    Args:
        client: HTTP client used to query the remote API.
        db_session: Database session used to look up stations, variables
            and already stored measurements.
        station_id: When given, only this station is processed.
        variable_id: When given, only this variable is processed.

    Returns:
        Creation payloads for the measurements not yet stored.
    """
    existing_stations = _get_stations(db_session, station_id)
    existing_variables = _get_variables(db_session, variable_id)
    yearly_measurements_create = []
    for station_idx, station in enumerate(existing_stations):
        logger.info(
            f"Processing station {station.code!r} ({station_idx+1}/"
            f"{len(existing_stations)})..."
        )
        for var_idx, variable in enumerate(existing_variables):
            logger.info(
                f"\tProcessing variable {variable.name!r} ({var_idx+1}/"
                f"{len(existing_variables)})..."
            )
            # Ids of what is already stored for this combination.
            known_ids = {
                build_yearly_measurement_id(stored)
                for stored in database.collect_all_yearly_measurements(
                    db_session,
                    station_id_filter=station.id,
                    variable_id_filter=variable.id,
                )
            }
            query = {
                "statcd": station.code,
                "indicatore": variable.name,
                "tabella": "A",
                "periodo": "0",
            }
            response = client.get(
                "https://api.arpa.veneto.it/REST/v1/clima_indicatori",
                params=query,
            )
            response.raise_for_status()
            payload = response.json()
            for raw_measurement in payload.get("data", []):
                candidate = observations.YearlyMeasurementCreate(
                    station_id=station.id,
                    variable_id=variable.id,
                    value=raw_measurement["valore"],
                    year=int(raw_measurement["anno"]),
                )
                if build_yearly_measurement_id(candidate) in known_ids:
                    continue
                yearly_measurements_create.append(candidate)
    return yearly_measurements_create


def refresh_yearly_measurements(
    client: httpx.Client,
    db_session: sqlmodel.Session,
    station_id: Optional[uuid.UUID] = None,
    variable_id: Optional[uuid.UUID] = None,
) -> list[observations.YearlyMeasurement]:
    """Harvest the missing yearly measurements and store them.

    Args:
        client: HTTP client handed over to the harvesting step.
        db_session: Database session used for harvesting and for creation.
        station_id: Optional station to restrict the refresh to.
        variable_id: Optional variable to restrict the refresh to.

    Returns:
        Whatever ``database.create_many_yearly_measurements`` returns for
        the harvested creation payloads.
    """
    to_create = harvest_yearly_measurements(
        client,
        db_session,
        station_id=station_id,
        variable_id=variable_id,
    )
    logger.info(f"About to create {len(to_create)} yearly measurements...")
    return database.create_many_yearly_measurements(db_session, to_create)
53 changes: 45 additions & 8 deletions arpav_ppcv/prefect/flows/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def harvest_monthly_measurements(
)
existing = {}
for db_measurement in existing_measurements:
measurement_id = operations.build_monthly_measurement_id(db_measurement)
measurement_id = build_monthly_measurement_id(db_measurement)
existing[measurement_id] = db_measurement
response = client.get(
"https://api.arpa.veneto.it/REST/v1/clima_indicatori",
Expand All @@ -195,7 +195,7 @@ def harvest_monthly_measurements(
value=raw_measurement["valore"],
date=dt.date(raw_measurement["anno"], month, 1),
)
measurement_id = operations.build_monthly_measurement_id(measurement_create)
measurement_id = build_monthly_measurement_id(measurement_create)
if measurement_id not in existing:
to_create.append(measurement_create)
return to_create
Expand Down Expand Up @@ -272,7 +272,7 @@ def harvest_seasonal_measurements(
)
existing = {}
for db_measurement in existing_measurements:
measurement_id = operations.build_seasonal_measurement_id(db_measurement)
measurement_id = build_seasonal_measurement_id(db_measurement)
existing[measurement_id] = db_measurement

season_query_param = {
Expand Down Expand Up @@ -300,7 +300,7 @@ def harvest_seasonal_measurements(
year=int(raw_measurement["anno"]),
season=season,
)
measurement_id = operations.build_seasonal_measurement_id(measurement_create)
measurement_id = build_seasonal_measurement_id(measurement_create)
if measurement_id not in existing:
to_create.append(measurement_create)
return to_create
Expand Down Expand Up @@ -372,7 +372,7 @@ def harvest_yearly_measurements(
)
existing = {}
for db_measurement in existing_measurements:
measurement_id = operations.build_yearly_measurement_id(db_measurement)
measurement_id = build_yearly_measurement_id(db_measurement)
existing[measurement_id] = db_measurement
response = client.get(
"https://api.arpa.veneto.it/REST/v1/clima_indicatori",
Expand All @@ -391,9 +391,7 @@ def harvest_yearly_measurements(
value=raw_measurement["valore"],
year=int(raw_measurement["anno"]),
)
measurement_id = operations.build_yearly_measurement_id(
yearly_measurement_create
)
measurement_id = build_yearly_measurement_id(yearly_measurement_create)
if measurement_id not in existing:
to_create.append(yearly_measurement_create)
return to_create
Expand Down Expand Up @@ -509,3 +507,42 @@ def _build_created_measurements_table(
}
)
return table_contents


def build_monthly_measurement_id(
    measurement: observations.MonthlyMeasurement
    | observations.MonthlyMeasurementCreate,
) -> str:
    """Build the key used to match a monthly measurement against others.

    The key is ``<station id>-<variable id>-<YYYYMM>``, where the last part
    comes from the measurement date.
    """
    return (
        f"{str(measurement.station_id)}-{str(measurement.variable_id)}-"
        f"{measurement.date.strftime('%Y%m')}"
    )


def build_seasonal_measurement_id(
    measurement: observations.SeasonalMeasurement
    | observations.SeasonalMeasurementCreate,
) -> str:
    """Build the key used to match a seasonal measurement against others.

    The key is ``<station id>-<variable id>-<year>-<season value>``.
    """
    id_parts = [
        str(measurement.station_id),
        str(measurement.variable_id),
        str(measurement.year),
    ]
    # The season value is joined as-is, without a str() conversion.
    id_parts.append(measurement.season.value)
    return "-".join(id_parts)


def build_yearly_measurement_id(
    measurement: observations.YearlyMeasurement | observations.YearlyMeasurementCreate,
) -> str:
    """Build the key used to match a yearly measurement against others.

    The key is ``<station id>-<variable id>-<year>``.
    """
    station_part = str(measurement.station_id)
    variable_part = str(measurement.variable_id)
    year_part = str(measurement.year)
    return f"{station_part}-{variable_part}-{year_part}"
Loading

0 comments on commit 0033026

Please sign in to comment.