Skip to content

Commit

Permalink
Initial implementation of time series for observations
Browse files Browse the repository at this point in the history
  • Loading branch information
ricardogsilva committed May 28, 2024
1 parent 69057e7 commit e278f5a
Show file tree
Hide file tree
Showing 6 changed files with 859 additions and 218 deletions.
96 changes: 65 additions & 31 deletions arpav_ppcv/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from typing import Optional

import httpx
import numpy as np
import pandas as pd
import pymannkendall as mk
import pyproj
Expand All @@ -31,23 +30,30 @@


def get_observation_time_series(
session: sqlmodel.Session,
variable: observations.Variable,
station: observations.Station,
month: int,
temporal_range: str,
smoothing_strategy: base.ObservationDataSmoothingStrategy = base.ObservationDataSmoothingStrategy.NO_SMOOTHING,
include_decade_data: bool = False,
mann_kendall_parameters: base.MannKendallParameters | None = None
) -> pd.DataFrame:
session: sqlmodel.Session,
variable: observations.Variable,
station: observations.Station,
month: int,
temporal_range: str,
smoothing_strategies: list[base.ObservationDataSmoothingStrategy] = [ # noqa
base.ObservationDataSmoothingStrategy.NO_SMOOTHING
],
include_decade_data: bool = False,
mann_kendall_parameters: base.MannKendallParameters | None = None,
) -> tuple[
pd.DataFrame,
Optional[pd.DataFrame],
Optional[pd.DataFrame],
Optional[dict[str, str]],
]:
start, end = _parse_temporal_range(temporal_range)
raw_measurements = database.collect_all_monthly_measurements(
session=session,
station_id_filter=station.id,
variable_id_filter=variable.id,
month_filter=month,
)
df = pd.DataFrame([m.model_dump() for m in raw_measurements])
df = pd.DataFrame(m.model_dump() for m in raw_measurements)
base_name = variable.name
df = df[["value", "date"]].rename(columns={"value": base_name})
df["time"] = pd.to_datetime(df["date"], utc=True)
Expand All @@ -57,38 +63,66 @@ def get_observation_time_series(
df = df[start:]
if end is not None:
df = df[:end]
col_name = "__".join((base_name, smoothing_strategy.value))
if smoothing_strategy == base.ObservationDataSmoothingStrategy.NO_SMOOTHING:
df[col_name] = df[base_name]
elif smoothing_strategy == base.ObservationDataSmoothingStrategy.MOVING_AVERAGE_5_YEARS:
df[col_name] = df[base_name].rolling(window=5, center=True).mean()
unsmoothed_col_name = "__".join(
(base_name, base.ObservationDataSmoothingStrategy.NO_SMOOTHING.value)
)
df[unsmoothed_col_name] = df[base_name]
info = {}

if include_decade_data:
decade_df = df.groupby((df.index.year // 10) * 10).mean()
df["decade"] = (df.index.year // 10) * 10
df = df.join(decade_df, on="decade", rsuffix="**DECADE_MEAN")
df = df.drop(
columns=[
"decade",
f"{base_name}__{smoothing_strategy.value}**DECADE_MEAN",
]
decade_df = decade_df.drop(columns=[base_name])
decade_df["time"] = pd.to_datetime(decade_df.index.astype(str), utc=True)
decade_df.set_index("time", inplace=True)
decade_df = decade_df.rename(
columns={unsmoothed_col_name: f"{base_name}__DECADE_MEAN"}
)
else:
decade_df = None

if mann_kendall_parameters is not None:
mk_col = f"{base_name}**MANN_KENDALL"
mk_col = f"{base_name}__MANN_KENDALL"
mk_start = str(mann_kendall_parameters.start_year or df.index[0].year)
mk_end = str(mann_kendall_parameters.end_year or df.index[-1].year)
df_trend = df[mk_start:mk_end]
mk_result = mk.original_test(df_trend[base_name])
df[mk_col] = np.nan
df.loc[mk_start:mk_end, mk_col] = (
mk_result.slope * (df_trend.index.year - df_trend.index.year.min())
+ mk_result.intercept
mk_df = df[mk_start:mk_end].copy()
mk_result = mk.original_test(mk_df[base_name])
mk_df[mk_col] = (
mk_result.slope * (mk_df.index.year - mk_df.index.year.min())
+ mk_result.intercept
)
mk_df = mk_df.drop(columns=[base_name, unsmoothed_col_name])
info.update(
{
"mann_kendall": {
"trend": mk_result.trend,
"h": mk_result.h,
"p": mk_result.p,
"z": mk_result.z,
"tau": mk_result.Tau,
"s": mk_result.s,
"var_s": mk_result.var_s,
"slope": mk_result.slope,
"intercept": mk_result.intercept,
}
}
)
else:
mk_df = None

for smoothing_strategy in smoothing_strategies:
if (
smoothing_strategy
== base.ObservationDataSmoothingStrategy.MOVING_AVERAGE_5_YEARS
):
col_name = "__".join((base_name, smoothing_strategy.value))
df[col_name] = df[base_name].rolling(window=5, center=True).mean()

df = df.drop(
columns=[
base_name,
]
)
return df
return df, decade_df, mk_df, info if len(info) > 0 else None


def get_coverage_time_series(
Expand Down
12 changes: 4 additions & 8 deletions arpav_ppcv/webapp/api_v2/routers/coverages.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,7 @@ async def wms_endpoint(
raise HTTPException(status_code=400, detail="Invalid coverage_identifier")


@router.get(
"/time-series/{coverage_identifier}", response_model=TimeSeriesList
)
@router.get("/time-series/{coverage_identifier}", response_model=TimeSeriesList)
def get_time_series(
db_session: Annotated[Session, Depends(dependencies.get_db_session)],
settings: Annotated[ArpavPpcvSettings, Depends(dependencies.get_settings)],
Expand All @@ -271,9 +269,9 @@ def get_time_series(
)
),
] = False,
coverage_data_smoothing: Annotated[list[CoverageDataSmoothingStrategy], Query()] = [
coverage_data_smoothing: Annotated[list[CoverageDataSmoothingStrategy], Query()] = [ # noqa
ObservationDataSmoothingStrategy.NO_SMOOTHING
], # noqa
],
observation_data_smoothing: Annotated[
list[ObservationDataSmoothingStrategy], Query()
] = [ObservationDataSmoothingStrategy.NO_SMOOTHING], # noqa
Expand Down Expand Up @@ -424,9 +422,7 @@ def _serialize_dataframe(
else:
measurements = []
for timestamp, value in series_measurements.items():
measurements.append(
TimeSeriesItem(value=value, datetime=timestamp)
)
measurements.append(TimeSeriesItem(value=value, datetime=timestamp))
series.append(
TimeSeries(
name=series_name,
Expand Down
99 changes: 60 additions & 39 deletions arpav_ppcv/webapp/api_v2/routers/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
)

import fastapi
import numpy as np
import pandas as pd
import pydantic
from arpav_ppcv.schemas.base import ObservationDataSmoothingStrategy
from fastapi import (
APIRouter,
Depends,
Expand Down Expand Up @@ -70,9 +71,7 @@ def list_stations(
"""List known stations."""
filter_kwargs = {}
if variable_name is not None:
if (
db_var := db.get_variable_by_name(db_session, variable_name)
) is not None:
if (db_var := db.get_variable_by_name(db_session, variable_name)) is not None:
filter_kwargs.update(
{
"variable_id_filter": db_var.id,
Expand Down Expand Up @@ -292,9 +291,7 @@ def get_seasonal_measurement(
db_session: Annotated[Session, Depends(dependencies.get_db_session)],
seasonal_measurement_id: pydantic.UUID4,
):
db_measurement = db.get_seasonal_measurement(
db_session, seasonal_measurement_id
)
db_measurement = db.get_seasonal_measurement(db_session, seasonal_measurement_id)
return observations.SeasonalMeasurementReadListItem.from_db_instance(
db_measurement, request
)
Expand Down Expand Up @@ -362,60 +359,84 @@ def get_yearly_measurement(


@router.get(
    "/time-series/{station_code}/{variable_name}/{month}", response_model=TimeSeriesList
)
def get_time_series(
    db_session: Annotated[Session, Depends(dependencies.get_db_session)],
    station_code: str,
    month: Annotated[int, Path(ge=1, le=12)],
    variable_name: str,
    datetime: Optional[str] = "../..",
    smoothing: Annotated[list[base.ObservationDataSmoothingStrategy], Query()] = [  # noqa
        base.ObservationDataSmoothingStrategy.NO_SMOOTHING
    ],
    include_decade_data: bool = False,
    include_mann_kendall_trend: bool = False,
    mann_kendall_start_year: Optional[int] = None,
    mann_kendall_end_year: Optional[int] = None,
):
    """Return observation time series for a station/variable/month combination.

    Looks up the station by code and the variable by name (404-style 400s when
    either is unknown), delegates to ``operations.get_observation_time_series``
    and serializes the resulting dataframes into a ``TimeSeriesList``.

    - ``datetime`` is an ISO-interval-style range string (``"../.."`` = open).
    - ``smoothing`` selects which smoothing strategies to compute; the raw
      (NO_SMOOTHING) series is excluded from the response unless explicitly
      requested.
    - ``include_decade_data`` / ``include_mann_kendall_trend`` add the decade
      means and the Mann-Kendall trend line (with its statistics in ``info``).
    """
    if (db_station := db.get_station_by_code(db_session, station_code)) is not None:
        if (
            db_variable := db.get_variable_by_name(db_session, variable_name)
        ) is not None:
            if include_mann_kendall_trend:
                mann_kendall = base.MannKendallParameters(
                    start_year=mann_kendall_start_year,
                    end_year=mann_kendall_end_year,
                )
            else:
                mann_kendall = None
            obs_df, decade_df, mk_df, info = operations.get_observation_time_series(
                db_session,
                variable=db_variable,
                station=db_station,
                month=month,
                temporal_range=datetime,
                smoothing_strategies=smoothing,
                include_decade_data=include_decade_data,
                mann_kendall_parameters=mann_kendall,
            )

            series = []
            if include_decade_data and decade_df is not None:
                series.extend(_serialize_dataframe(decade_df))

            if include_mann_kendall_trend and mk_df is not None:
                # attach the Mann-Kendall statistics to the trend series
                series.extend(
                    _serialize_dataframe(mk_df, info=(info or {}).get("mann_kendall"))
                )

            # drop the unsmoothed series unless the caller asked for it
            exclude_pattern = (
                ObservationDataSmoothingStrategy.NO_SMOOTHING.value
                if ObservationDataSmoothingStrategy.NO_SMOOTHING not in smoothing
                else None
            )
            series.extend(
                _serialize_dataframe(
                    obs_df,
                    exclude_series_name_pattern=exclude_pattern,
                )
            )
            return TimeSeriesList(series=series)
        else:
            raise HTTPException(status_code=400, detail="Invalid variable identifier")
    else:
        raise HTTPException(status_code=400, detail="Invalid station identifier")


def _serialize_dataframe(
    df: pd.DataFrame,
    exclude_series_name_pattern: str | None = None,
    info: dict[str, str] | None = None,
) -> list[TimeSeries]:
    """Turn each column of ``df`` into a ``TimeSeries``.

    NaN measurements are dropped.  When ``exclude_series_name_pattern`` is
    given, any column whose name contains that substring is skipped entirely.
    The optional ``info`` mapping is attached verbatim to every produced
    series.
    """
    serialized: list[TimeSeries] = []
    for column_name, column in df.items():
        should_skip = (
            exclude_series_name_pattern is not None
            and exclude_series_name_pattern in column_name
        )
        if should_skip:
            continue
        items = [
            TimeSeriesItem(value=measurement, datetime=timestamp)
            for timestamp, measurement in column.items()
            if not math.isnan(measurement)
        ]
        serialized.append(TimeSeries(name=column_name, values=items, info=info))
    return serialized
2 changes: 1 addition & 1 deletion arpav_ppcv/webapp/api_v2/schemas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class TimeSeriesItem(pydantic.BaseModel):
class TimeSeries(pydantic.BaseModel):
    """A named series of timestamped values plus optional metadata."""

    name: str
    values: list[TimeSeriesItem]
    # free-form metadata (e.g. Mann-Kendall statistics); scalar JSON types only
    info: typing.Optional[dict[str, str | int | float | bool]] = None


class TimeSeriesList(pydantic.BaseModel):
Expand Down
Loading

0 comments on commit e278f5a

Please sign in to comment.