Merge pull request #98 from ricardogsilva/96-Implement-API-endpoint-for-generating-time-series-for-historical-data

Generate time series for historical data
francbartoli authored May 28, 2024
2 parents e4b7e62 + e278f5a commit 0bedcbd
Showing 11 changed files with 1,111 additions and 72 deletions.
97 changes: 97 additions & 0 deletions arpav_ppcv/operations.py
@@ -6,6 +6,7 @@

import httpx
import pandas as pd
import pymannkendall as mk
import pyproj
import shapely
import shapely.io
@@ -28,6 +29,102 @@
logger = logging.getLogger(__name__)


def get_observation_time_series(
    session: sqlmodel.Session,
    variable: observations.Variable,
    station: observations.Station,
    month: int,
    temporal_range: str,
    smoothing_strategies: list[base.ObservationDataSmoothingStrategy] = [ # noqa
        base.ObservationDataSmoothingStrategy.NO_SMOOTHING
    ],
    include_decade_data: bool = False,
    mann_kendall_parameters: base.MannKendallParameters | None = None,
) -> tuple[
    pd.DataFrame,
    Optional[pd.DataFrame],
    Optional[pd.DataFrame],
    Optional[dict[str, str]],
]:
    start, end = _parse_temporal_range(temporal_range)
    raw_measurements = database.collect_all_monthly_measurements(
        session=session,
        station_id_filter=station.id,
        variable_id_filter=variable.id,
        month_filter=month,
    )
    df = pd.DataFrame(m.model_dump() for m in raw_measurements)
    base_name = variable.name
    df = df[["value", "date"]].rename(columns={"value": base_name})
    df["time"] = pd.to_datetime(df["date"], utc=True)
    df = df[["time", base_name]]
    df.set_index("time", inplace=True)
    if start is not None:
        df = df[start:]
    if end is not None:
        df = df[:end]
    unsmoothed_col_name = "__".join(
        (base_name, base.ObservationDataSmoothingStrategy.NO_SMOOTHING.value)
    )
    df[unsmoothed_col_name] = df[base_name]
    info = {}

    if include_decade_data:
        decade_df = df.groupby((df.index.year // 10) * 10).mean()
        decade_df = decade_df.drop(columns=[base_name])
        decade_df["time"] = pd.to_datetime(decade_df.index.astype(str), utc=True)
        decade_df.set_index("time", inplace=True)
        decade_df = decade_df.rename(
            columns={unsmoothed_col_name: f"{base_name}__DECADE_MEAN"}
        )
    else:
        decade_df = None

    if mann_kendall_parameters is not None:
        mk_col = f"{base_name}__MANN_KENDALL"
        mk_start = str(mann_kendall_parameters.start_year or df.index[0].year)
        mk_end = str(mann_kendall_parameters.end_year or df.index[-1].year)
        mk_df = df[mk_start:mk_end].copy()
        mk_result = mk.original_test(mk_df[base_name])
        mk_df[mk_col] = (
            mk_result.slope * (mk_df.index.year - mk_df.index.year.min())
            + mk_result.intercept
        )
        mk_df = mk_df.drop(columns=[base_name, unsmoothed_col_name])
        info.update(
            {
                "mann_kendall": {
                    "trend": mk_result.trend,
                    "h": mk_result.h,
                    "p": mk_result.p,
                    "z": mk_result.z,
                    "tau": mk_result.Tau,
                    "s": mk_result.s,
                    "var_s": mk_result.var_s,
                    "slope": mk_result.slope,
                    "intercept": mk_result.intercept,
                }
            }
        )
    else:
        mk_df = None

    for smoothing_strategy in smoothing_strategies:
        if (
            smoothing_strategy
            == base.ObservationDataSmoothingStrategy.MOVING_AVERAGE_5_YEARS
        ):
            col_name = "__".join((base_name, smoothing_strategy.value))
            df[col_name] = df[base_name].rolling(window=5, center=True).mean()

    df = df.drop(
        columns=[
            base_name,
        ]
    )
    return df, decade_df, mk_df, info if len(info) > 0 else None
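
For reference, pymannkendall's original_test() returns a named tuple whose fields map one-to-one onto the "mann_kendall" info dict built above. A minimal sketch on made-up yearly data (the values, years, and variable name below are illustrative only, not taken from this PR):

import pandas as pd
import pymannkendall as mk

# Ten synthetic yearly values with a mild upward drift.
idx = pd.to_datetime([f"{year}-12-01" for year in range(2000, 2010)], utc=True)
series = pd.Series(
    [10.1, 10.4, 10.2, 10.8, 10.9, 11.0, 11.4, 11.2, 11.7, 11.9], index=idx
)

result = mk.original_test(series)
print(result.trend, result.p)  # e.g. "increasing" with a small p-value

# Rebuild the fitted line the same way the function does: the reported
# slope is per time step (one year here), anchored at the first year
# of the requested window.
trend_line = result.slope * (idx.year - idx.year.min()) + result.intercept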


def get_coverage_time_series(
    settings: ArpavPpcvSettings,
    session: sqlmodel.Session,
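The decade aggregation and the MOVING_AVERAGE_5_YEARS smoothing above lean on two small pandas idioms. A self-contained sketch, again on synthetic data with an illustrative column name:

import pandas as pd

# Twenty synthetic yearly values, indexed by UTC timestamps like the
# df built in get_observation_time_series().
years = pd.date_range("1981-01-01", periods=20, freq="YS", tz="UTC")
df = pd.DataFrame({"tdd__NO_SMOOTHING": [float(v) for v in range(20)]}, index=years)

# Decade means: integer-dividing the year by 10 buckets 1981-1989 under
# 1980, 1990-1999 under 1990, and so on; each bucket is then averaged.
decade_df = df.groupby((df.index.year // 10) * 10).mean()

# Centred 5-year moving average: each point becomes the mean of itself
# and the two years on either side, leaving NaN at both edges.
df["tdd__MOVING_AVERAGE_5_YEARS"] = (
    df["tdd__NO_SMOOTHING"].rolling(window=5, center=True).mean()
)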
7 changes: 7 additions & 0 deletions arpav_ppcv/schemas/base.py
@@ -1,8 +1,15 @@
import dataclasses
import enum
import pydantic
import sqlmodel


@dataclasses.dataclass
class MannKendallParameters:
    start_year: int | None = None
    end_year: int | None = None


class Season(enum.Enum):
    WINTER = "WINTER"
    SPRING = "SPRING"
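The new dataclass is deliberately minimal: both bounds are optional, and get_observation_time_series() substitutes the first/last year of the series for whichever bound is None. For example:

from arpav_ppcv.schemas.base import MannKendallParameters

# Pin only the lower bound; end_year stays None, so the caller falls
# back to df.index[-1].year as the upper end of the test window.
params = MannKendallParameters(start_year=1991)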
23 changes: 12 additions & 11 deletions arpav_ppcv/webapp/api_v2/routers/coverages.py
@@ -38,6 +38,11 @@
from ....schemas.coverages import CoverageInternal
from ... import dependencies
from ..schemas import coverages as coverage_schemas
from ..schemas.base import (
    TimeSeries,
    TimeSeriesItem,
    TimeSeriesList,
)


logger = logging.getLogger(__name__)
@@ -246,9 +251,7 @@ async def wms_endpoint(
        raise HTTPException(status_code=400, detail="Invalid coverage_identifier")


-@router.get(
-    "/time-series/{coverage_identifier}", response_model=coverage_schemas.TimeSeriesList
-)
+@router.get("/time-series/{coverage_identifier}", response_model=TimeSeriesList)
def get_time_series(
    db_session: Annotated[Session, Depends(dependencies.get_db_session)],
    settings: Annotated[ArpavPpcvSettings, Depends(dependencies.get_settings)],
@@ -266,9 +269,9 @@ def get_time_series(
            )
        ),
    ] = False,
-    coverage_data_smoothing: Annotated[list[CoverageDataSmoothingStrategy], Query()] = [
+    coverage_data_smoothing: Annotated[list[CoverageDataSmoothingStrategy], Query()] = [ # noqa
        ObservationDataSmoothingStrategy.NO_SMOOTHING
-    ], # noqa
+    ],
    observation_data_smoothing: Annotated[
        list[ObservationDataSmoothingStrategy], Query()
    ] = [ObservationDataSmoothingStrategy.NO_SMOOTHING], # noqa
@@ -392,7 +395,7 @@ def get_time_series(
                },
            )
            series.extend(station_series)
-            return coverage_schemas.TimeSeriesList(series=series)
+            return TimeSeriesList(series=series)
        else:
            raise HTTPException(status_code=400, detail="Invalid coverage_identifier")
    else:
@@ -406,7 +409,7 @@ def _serialize_dataframe(
        ObservationDataSmoothingStrategy | CoverageDataSmoothingStrategy
    ],
    extra_info: Optional[dict[str, str]] = None,
-) -> list[coverage_schemas.TimeSeries]:
+) -> list[TimeSeries]:
    series = []
    for series_name, series_measurements in data_.to_dict().items():
        name_prefix, smoothing_strategy = series_name.rpartition("__")[::2]
@@ -419,11 +422,9 @@
        else:
            measurements = []
            for timestamp, value in series_measurements.items():
-                measurements.append(
-                    coverage_schemas.TimeSeriesItem(value=value, datetime=timestamp)
-                )
+                measurements.append(TimeSeriesItem(value=value, datetime=timestamp))
        series.append(
-            coverage_schemas.TimeSeries(
+            TimeSeries(
                name=series_name,
                values=measurements,
                info={
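A note on the name handling kept by this refactor: _serialize_dataframe() recovers the variable name and smoothing strategy from the "name__STRATEGY" column convention used in operations.py. str.rpartition("__") splits on the last "__" and returns a (head, separator, tail) 3-tuple, and the [::2] slice keeps just the head and tail:

# Illustrative column name; any "prefix__STRATEGY" string works the same way.
series_name = "tdd__MOVING_AVERAGE_5_YEARS"
name_prefix, smoothing_strategy = series_name.rpartition("__")[::2]
assert (name_prefix, smoothing_strategy) == ("tdd", "MOVING_AVERAGE_5_YEARS")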
