
Commit

Merge pull request #83 from ricardogsilva/80-Add-uncertainty-data-to-coverage-time-series-endpoint

Add uncertainty data to coverage time series endpoint
francbartoli authored May 20, 2024
2 parents 2f0e644 + 0f627f7 commit 863c80a
Showing 13 changed files with 813 additions and 314 deletions.
10 changes: 10 additions & 0 deletions arpav_ppcv/exceptions.py
@@ -0,0 +1,10 @@
class ArpavError(Exception):
...


class InvalidCoverageIdentifierException(ArpavError):
...


class CoverageDataRetrievalError(ArpavError):
...
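Since both concrete classes derive from ArpavError, callers can catch the base class to handle either failure mode. A minimal usage sketch (the handler below is illustrative and not part of this commit):

from arpav_ppcv import exceptions


def fetch_or_report(fetch):
    # fetch is any callable that may raise the new exception types
    try:
        return fetch()
    except exceptions.InvalidCoverageIdentifierException:
        # a bad identifier is a caller error
        raise ValueError("unknown coverage identifier")
    except exceptions.CoverageDataRetrievalError:
        # a retrieval failure is an upstream/operational error
        raise RuntimeError("could not retrieve coverage data")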
37 changes: 37 additions & 0 deletions (new Alembic migration, revision e8bc68ec327b)
@@ -0,0 +1,37 @@
"""add-uncertainty-cov-conf-relationships
Revision ID: e8bc68ec327b
Revises: 9f2dedc5396e
Create Date: 2024-05-17 15:47:58.878242
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
import sqlmodel


# revision identifiers, used by Alembic.
revision: str = 'e8bc68ec327b'
down_revision: Union[str, None] = '9f2dedc5396e'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('coverageconfiguration', sa.Column('uncertainty_lower_bounds_coverage_configuration_id', sqlmodel.sql.sqltypes.GUID(), nullable=True))
op.add_column('coverageconfiguration', sa.Column('uncertainty_upper_bounds_coverage_configuration_id', sqlmodel.sql.sqltypes.GUID(), nullable=True))
op.create_foreign_key(None, 'coverageconfiguration', 'coverageconfiguration', ['uncertainty_lower_bounds_coverage_configuration_id'], ['id'])
op.create_foreign_key(None, 'coverageconfiguration', 'coverageconfiguration', ['uncertainty_upper_bounds_coverage_configuration_id'], ['id'])
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(None, 'coverageconfiguration', type_='foreignkey')
op.drop_constraint(None, 'coverageconfiguration', type_='foreignkey')
op.drop_column('coverageconfiguration', 'uncertainty_upper_bounds_coverage_configuration_id')
op.drop_column('coverageconfiguration', 'uncertainty_lower_bounds_coverage_configuration_id')
# ### end Alembic commands ###
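A caveat on the auto-generated code above: the foreign keys are created with name None, which leaves naming to the backend, and op.drop_constraint(None, ...) typically fails at downgrade time because Alembic needs an explicit constraint name. A sketch of the same migration with explicit names (the fk_* identifiers are illustrative, not from this commit):

def upgrade() -> None:
    op.add_column('coverageconfiguration', sa.Column(
        'uncertainty_lower_bounds_coverage_configuration_id',
        sqlmodel.sql.sqltypes.GUID(), nullable=True))
    op.add_column('coverageconfiguration', sa.Column(
        'uncertainty_upper_bounds_coverage_configuration_id',
        sqlmodel.sql.sqltypes.GUID(), nullable=True))
    # naming the constraints makes them addressable in downgrade()
    op.create_foreign_key(
        'fk_covconf_uncertainty_lower', 'coverageconfiguration',
        'coverageconfiguration',
        ['uncertainty_lower_bounds_coverage_configuration_id'], ['id'])
    op.create_foreign_key(
        'fk_covconf_uncertainty_upper', 'coverageconfiguration',
        'coverageconfiguration',
        ['uncertainty_upper_bounds_coverage_configuration_id'], ['id'])


def downgrade() -> None:
    op.drop_constraint('fk_covconf_uncertainty_lower',
                       'coverageconfiguration', type_='foreignkey')
    op.drop_constraint('fk_covconf_uncertainty_upper',
                       'coverageconfiguration', type_='foreignkey')
    op.drop_column('coverageconfiguration',
                   'uncertainty_upper_bounds_coverage_configuration_id')
    op.drop_column('coverageconfiguration',
                   'uncertainty_lower_bounds_coverage_configuration_id')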
187 changes: 133 additions & 54 deletions arpav_ppcv/operations.py
@@ -32,75 +32,85 @@ def get_coverage_time_series(
settings: ArpavPpcvSettings,
session: sqlmodel.Session,
http_client: httpx.Client,
coverage_configuration: coverages.CoverageConfiguration,
coverage_identifier: str,
coverage: coverages.CoverageInternal,
point_geom: shapely.Point,
temporal_range: str,
coverage_data_smoothing: list[base.CoverageDataSmoothingStrategy],
observation_data_smoothing: list[base.ObservationDataSmoothingStrategy],
coverage_smoothing_strategies: list[base.CoverageDataSmoothingStrategy],
observation_smoothing_strategies: list[base.ObservationDataSmoothingStrategy],
include_coverage_data: bool = True,
include_observation_data: bool = False,
include_coverage_uncertainty: bool = False,
include_coverage_related_data: bool = False,
) -> dict[str, pd.DataFrame]:
"""Retrieve time series for a coverage."""
start, end = _parse_temporal_range(temporal_range)
coverage_data_ncss_url = "/".join((
settings.thredds_server.base_url,
settings.thredds_server.netcdf_subset_service_url_fragment,
coverage_configuration.get_thredds_url_fragment(coverage_identifier)
coverage.configuration.get_thredds_url_fragment(coverage.identifier)
))
raw_coverage_data = ncss.query_dataset(
http_client,
thredds_ncss_url=coverage_data_ncss_url,
variable_name=coverage_configuration.netcdf_main_dataset_name,
variable_name=coverage.configuration.netcdf_main_dataset_name,
longitude=point_geom.x,
latitude=point_geom.y,
time_start=start,
time_end=end,
)
measurements = {}
if raw_coverage_data is not None:
if include_coverage_data:
coverage_data = _process_coverage_data(
raw_coverage_data,
coverage_configuration,
coverage_identifier,
coverage_data_smoothing,
start,
end
)
measurements[coverage_identifier] = coverage_data
if include_observation_data:
if include_coverage_data:
coverage_data = _process_coverage_data(
raw_coverage_data,
coverage.configuration.netcdf_main_dataset_name,
coverage_smoothing_strategies,
start,
end,
base_column_name=coverage.identifier
)
measurements[coverage.identifier] = coverage_data
if include_coverage_uncertainty:
has_uncertainty_cov_confs = any((
coverage.configuration.uncertainty_lower_bounds_coverage_configuration,
coverage.configuration.uncertainty_upper_bounds_coverage_configuration,
))
if has_uncertainty_cov_confs:
uncertainty_data = _get_coverage_uncertainty_time_series(
settings, http_client, coverage,
point_geom, start, end, coverage_smoothing_strategies,
)
measurements.update(**uncertainty_data)
if include_coverage_related_data:
# TODO: how to map to related data?
...
if include_observation_data:
if coverage.configuration.related_observation_variable is not None:
station_data = _get_station_data(
session,
settings,
point_geom,
coverage_configuration,
coverage_identifier,
coverage.configuration,
coverage.identifier,
)
if station_data is not None:
raw_station_data, station = station_data
data_ = _process_seasonal_station_data(
coverage_configuration.related_observation_variable,
raw_station_data,
data_smoothing=observation_data_smoothing,
time_start=start,
time_end=end
raw_station_data, observation_smoothing_strategies, start, end,
base_name=coverage.configuration.related_observation_variable.name
)
station_data_series_key = "_".join((
"station",
str(station.id),
coverage_configuration.related_observation_variable.name,
coverage.configuration.related_observation_variable.name,
))
measurements[station_data_series_key] = data_
if include_coverage_uncertainty:
# TODO: how to map to uncertainty related data?
...
if include_coverage_related_data:
# TODO: how to map to related data?
...
else:
raise RuntimeError("Could not retrieve coverage data")
else:
logger.info("No station data found, skipping...")
else:
logger.info(
"Cannot include observation data - no observation variable is related "
"to this coverage configuration"
)
return measurements


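For reference, a sketch of how the reworked entry point is called after this change; the point coordinates and the temporal-range string are illustrative, and the range format is an assumption based on _parse_temporal_range rather than something this diff spells out:

series = get_coverage_time_series(
    settings=settings,
    session=session,
    http_client=http_client,
    coverage=coverage,  # a coverages.CoverageInternal bundling identifier and configuration
    point_geom=shapely.Point(11.876, 45.406),
    temporal_range="1976-01-01T00:00:00/2100-12-31T00:00:00",  # assumed format
    coverage_smoothing_strategies=[
        base.CoverageDataSmoothingStrategy.MOVING_AVERAGE_11_YEARS],
    observation_smoothing_strategies=[
        base.ObservationDataSmoothingStrategy.NO_SMOOTHING],
    include_coverage_data=True,
    include_observation_data=True,
    include_coverage_uncertainty=True,
)
# keys: the coverage identifier, a station_<id>_<variable> entry when observation
# data is found and, with uncertainty bounds configured, the **UNCERTAINTY**_*_BOUND entries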
@@ -173,17 +183,17 @@ def _get_station_data(


def _process_seasonal_station_data(
variable: observations.Variable,
raw_data: list[observations.SeasonalMeasurement],
data_smoothing: list[base.ObservationDataSmoothingStrategy],
smoothing_strategies: list[base.ObservationDataSmoothingStrategy],
time_start: Optional[dt.datetime],
time_end: Optional[dt.datetime],
base_name: str,
) -> pd.DataFrame:
df = pd.DataFrame(
[i.model_dump() for i in raw_data]
)
df = df[["value", "season", "year"]]
df = df.rename(columns={"value": variable.name})
df = df.rename(columns={"value": base_name})

df["season_month"] = df["season"].astype("string").replace({
"Season.WINTER": "01",
@@ -195,35 +205,35 @@
df["year"].astype("string") + "-" + df["season_month"] + "-01",
utc=True
)
df = df[[variable.name, "time"]]
df = df[[base_name, "time"]]
df.set_index("time", inplace=True)
if time_start is not None:
df = df[time_start:]
if time_end is not None:
df = df[:time_end]
for strategy in data_smoothing:
column_name = "__".join((variable.name, strategy.value))
for strategy in smoothing_strategies:
column_name = "__".join((base_name, strategy.value))
if strategy == base.ObservationDataSmoothingStrategy.NO_SMOOTHING:
df[column_name] = df[variable.name]
df[column_name] = df[base_name]
elif strategy == base.ObservationDataSmoothingStrategy.MOVING_AVERAGE_5_YEARS:
df[column_name] = df[variable.name].rolling(window=5, center=True).mean()
df = df.drop(columns=[variable.name])
df[column_name] = df[base_name].rolling(window=5, center=True).mean()
df = df.drop(columns=[base_name])
df = df.dropna()
return df


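To make the reshaping concrete, a small worked example of what _process_seasonal_station_data does to a single winter measurement (the values and the "tdd" base name are invented; the season-to-month mapping is taken from the code above):

import pandas as pd

df = pd.DataFrame({"value": [11.2], "season": ["Season.WINTER"], "year": [1998]})
df = df.rename(columns={"value": "tdd"})  # "tdd" stands in for base_name
df["season_month"] = df["season"].astype("string").replace({"Season.WINTER": "01"})
df["time"] = pd.to_datetime(
    df["year"].astype("string") + "-" + df["season_month"] + "-01", utc=True)
# -> one row timestamped 1998-01-01 00:00:00+00:00; smoothing columns are then
#    named "<base_name>__<strategy>", e.g. "tdd__MOVING_AVERAGE_5_YEARS"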
def _process_coverage_data(
raw_data: str,
coverage_configuration: coverages.CoverageConfiguration,
coverage_identifier: str,
data_smoothing: list[base.CoverageDataSmoothingStrategy],
netcdf_main_dataset_name: str,
smoothing_strategies: list[base.CoverageDataSmoothingStrategy],
time_start: Optional[dt.datetime],
time_end: Optional[dt.datetime],
base_column_name: str
) -> pd.DataFrame:
df = pd.read_csv(io.StringIO(raw_data), parse_dates=["time"])

# get the name of the column that holds the main variable
variable_name = coverage_configuration.netcdf_main_dataset_name
variable_name = netcdf_main_dataset_name
try:
col_name = [c for c in df.columns if c.startswith(f"{variable_name}[")][0]
except IndexError:
@@ -234,32 +244,101 @@
else:
# keep only time and main variable - we don't care about other stuff
df = df[["time", col_name]]
df = df.rename(columns={col_name: coverage_identifier})
df = df.rename(columns={col_name: base_column_name})

# - filter out values outside the temporal range
df.set_index("time", inplace=True)
if time_start is not None:
df = df[time_start:]
if time_end is not None:
df = df[:time_end]
for strategy in data_smoothing:
column_name = "__".join((coverage_identifier, strategy.value))
for strategy in smoothing_strategies:
column_name = "__".join((base_column_name, strategy.value))
if strategy == base.CoverageDataSmoothingStrategy.NO_SMOOTHING:
df[column_name] = df[coverage_identifier]
df[column_name] = df[base_column_name]
elif strategy == base.CoverageDataSmoothingStrategy.MOVING_AVERAGE_11_YEARS:
df[column_name] = df[coverage_identifier].rolling(
df[column_name] = df[base_column_name].rolling(
center=True, window=11).mean()
elif strategy == base.CoverageDataSmoothingStrategy.LOESS_SMOOTHING:
_, loess_smoothed, _ = loess_1d(
df.index.astype("int64"),
df[coverage_identifier],
df[base_column_name],
)
df[column_name] = loess_smoothed
df = df.drop(columns=[coverage_identifier])
df = df.drop(columns=[base_column_name])
df = df.dropna()
return df


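A note on the LOESS branch above: loess_1d needs a numeric abscissa, which is why the DatetimeIndex is cast to int64 (nanoseconds since the epoch) before fitting. A standalone sketch, assuming the loess package that provides loess_1d:

import numpy as np
import pandas as pd
from loess.loess_1d import loess_1d

idx = pd.date_range("1980-01-01", periods=40, freq="YS", tz="UTC")
y = np.linspace(10.0, 12.0, 40) + np.random.default_rng(0).normal(0.0, 0.3, 40)
x = idx.astype("int64").to_numpy().astype(float)  # timestamps as nanoseconds
xout, smoothed, weights = loess_1d(x, y)  # returns abscissa, fit and weights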
def _get_individual_uncertainty_time_series(
settings: ArpavPpcvSettings,
http_client: httpx.Client,
used_values: list[coverages.ConfigurationParameterValue],
uncert_coverage_configuration: coverages.CoverageConfiguration,
point_geom: shapely.Point,
time_start: Optional[dt.datetime],
time_end: Optional[dt.datetime],
smoothing_strategies: list[base.CoverageDataSmoothingStrategy],
base_column_name: str
) -> pd.DataFrame:
cov_identifier = (
uncert_coverage_configuration.build_coverage_identifier(used_values)
)
ncss_url = "/".join((
settings.thredds_server.base_url,
settings.thredds_server.netcdf_subset_service_url_fragment,
uncert_coverage_configuration.get_thredds_url_fragment(cov_identifier)
))
raw_coverage_data = ncss.query_dataset(
http_client,
thredds_ncss_url=ncss_url,
variable_name=uncert_coverage_configuration.netcdf_main_dataset_name,
longitude=point_geom.x,
latitude=point_geom.y,
time_start=time_start,
time_end=time_end,
)
return _process_coverage_data(
raw_coverage_data,
uncert_coverage_configuration.netcdf_main_dataset_name,
smoothing_strategies,
time_start,
time_end,
base_column_name=base_column_name
)


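For context, the request assembled here is a standard THREDDS NetCDF Subset Service point query; its rough shape is shown below, as assumed from the NCSS conventions and from the CSV parsing in _process_coverage_data, since this diff does not spell the URL out:

# {thredds_base_url}/{netcdf_subset_service_url_fragment}/{dataset_path}
#     ?var={netcdf_main_dataset_name}
#     &longitude={point_geom.x}&latitude={point_geom.y}
#     &time_start={time_start}&time_end={time_end}
#     &accept=csv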
def _get_coverage_uncertainty_time_series(
settings: ArpavPpcvSettings,
http_client: httpx.Client,
coverage: coverages.CoverageInternal,
point_geom: shapely.Point,
time_start: Optional[dt.datetime],
time_end: Optional[dt.datetime],
smoothing_strategies: list[base.CoverageDataSmoothingStrategy],
) -> dict[str, pd.DataFrame]:
used_possible_values = coverage.configuration.retrieve_used_values(
coverage.identifier)
result = {}
used_values = [
pv.configuration_parameter_value for pv in used_possible_values]
if lower_conf := coverage.configuration.uncertainty_lower_bounds_coverage_configuration:
lower_df = _get_individual_uncertainty_time_series(
settings, http_client, used_values, lower_conf,
point_geom, time_start, time_end, smoothing_strategies,
base_column_name="__".join((coverage.identifier, "UNCERTAINTY_LOWER_BOUND"))
)
result[f"{base.UNCERTAINTY_TIME_SERIES_PATTERN}_LOWER_BOUND"] = lower_df
if upper_conf := coverage.configuration.uncertainty_upper_bounds_coverage_configuration:
upper_df = _get_individual_uncertainty_time_series(
settings, http_client, used_values, upper_conf,
point_geom, time_start, time_end, smoothing_strategies,
base_column_name="__".join((coverage.identifier, "UNCERTAINTY_UPPER_BOUND"))
)
result[f"{base.UNCERTAINTY_TIME_SERIES_PATTERN}_UPPER_BOUND"] = upper_df
return result


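Because both uncertainty entries share the UNCERTAINTY_TIME_SERIES_PATTERN prefix (added to schemas/base.py below), a consumer can separate them from the main series without knowing the coverage identifier. A sketch (the helper is illustrative):

def split_series(
        measurements: dict[str, pd.DataFrame],
) -> tuple[dict[str, pd.DataFrame], dict[str, pd.DataFrame]]:
    # partition the result of get_coverage_time_series into main and
    # uncertainty series, keyed off the shared pattern prefix
    uncertainty = {
        key: df for key, df in measurements.items()
        if key.startswith(base.UNCERTAINTY_TIME_SERIES_PATTERN)
    }
    main = {key: df for key, df in measurements.items() if key not in uncertainty}
    return main, uncertainty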
def _parse_temporal_range(
raw_temporal_range: str) -> tuple[dt.datetime | None, dt.datetime | None]:
"""Parse a temporal range string, converting time to UTC.
3 changes: 3 additions & 0 deletions arpav_ppcv/schemas/base.py
@@ -15,6 +15,9 @@ class ObservationDataSmoothingStrategy(enum.Enum):
MOVING_AVERAGE_5_YEARS = "MOVING_AVERAGE_5_YEARS"


UNCERTAINTY_TIME_SERIES_PATTERN = "**UNCERTAINTY**"


class ObservationAggregationType(enum.Enum):
MONTHLY = "MONTHLY"
SEASONAL = "SEASONAL"
(diffs for the remaining 9 changed files not shown)
