Skip to content

Commit

Permalink
update observations data periodically (#196)
Browse files Browse the repository at this point in the history
* Adding tests for coverage time series endpoint

* Adding tests for coverage time series endpoint

* fix failing ncss query

* Added missing geometry to sample station in tests

* Compiling translations before running tests in CI

* Refactor station harvesting code

* Adding prefect

* Tweaking prefect setup so that it works in the stack

* Added prefect flows for ingesting observations data

* Removed prefect profiles configuration

This is no longer needed thanks to prefect 3rc17

* Improved prefect flows with artifacts

* Set default value for nearby stations to 200m

fixes #161

* Added HTTP basic auth to protect access to prefect

* Removed unused code

* Add tests

* Change name of env variable that points to usersfile

* Added info to README.md
  • Loading branch information
ricardogsilva authored Aug 21, 2024
1 parent a2bce54 commit 3be86be
Show file tree
Hide file tree
Showing 18 changed files with 2,255 additions and 647 deletions.
16 changes: 2 additions & 14 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -120,22 +120,10 @@ celerybeat-schedule
## Plugin-specific files:

# IntelliJ
/out/
.idea/
# mpeltonen/sbt-idea plugin
.idea_modules/


# Django
backend/storage
backend/static
backend/.env
#backend/djcore

/docker/basemap/data/
/docker/basemap/gebco/
/proxy/node_modules/

/Arpav-PPCV

.venv

docker/traefik/basicauth-users.txt
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ project's container registry. This is governed by a two-stage workflow, orchestr
The strategy described above employs an installation of the [webhook](https://github.com/adnanh/webhook) server,
together with some custom deployment scripts.

Relevant places to look for configuration in the staging environment, in addition to the `${HOME}` directory:

- `/opt/traefik`
- `/etc/system/system/docker.service.d`
- `/etc/system/system/traefik.service`
- `/etc/system/system/webhook.service`


##### Production environment

Expand Down
22 changes: 21 additions & 1 deletion arpav_ppcv/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,25 @@ class ContactSettings(pydantic.BaseModel):
email: str = "[email protected]"


class PrefectSettings(pydantic.BaseModel):
num_flow_retries: int = 5
flow_retry_delay_seconds: int = 5
num_task_retries: int = 5
task_retry_delay_seconds: int = 5
observation_stations_refresher_flow_cron_schedule: str = (
"0 1 * * 1" # run once every week, at 01:00 on monday
)
observation_monthly_measurements_refresher_flow_cron_schedule: str = (
"0 2 * * 1" # run once every week, at 02:00 on monday
)
observation_seasonal_measurements_refresher_flow_cron_schedule: str = (
"0 3 * * 1" # run once every week, at 03:00 on monday
)
observation_yearly_measurements_refresher_flow_cron_schedule: str = (
"0 4 * * 1" # run once every week, at 04:00 on monday
)


class ThreddsServerSettings(pydantic.BaseModel):
base_url: str = "http://localhost:8080/thredds"
wms_service_url_fragment: str = "wms"
Expand Down Expand Up @@ -74,8 +93,9 @@ class ArpavPpcvSettings(BaseSettings): # noqa
templates_dir: Optional[Path] = Path(__file__).parent / "webapp/templates"
static_dir: Optional[Path] = Path(__file__).parent / "webapp/static"
thredds_server: ThreddsServerSettings = ThreddsServerSettings()
prefect: PrefectSettings = PrefectSettings()
martin_tile_server_base_url: str = "http://localhost:3000"
nearest_station_radius_meters: int = 10_000
nearest_station_radius_meters: int = 200
v2_api_mount_prefix: str = "/api/v2"
log_config_file: Path | None = None
session_secret_key: str = "changeme"
Expand Down
2 changes: 2 additions & 0 deletions arpav_ppcv/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from .cliapp.app import app as cli_app
from .bootstrapper.cliapp import app as bootstrapper_app
from .observations_harvester.cliapp import app as observations_harvester_app
from .prefect.cliapp import app as prefect_app
from .thredds import crawler

app = typer.Typer()
Expand All @@ -43,6 +44,7 @@
app.add_typer(observations_harvester_app, name="observations-harvester")
app.add_typer(bootstrapper_app, name="bootstrap")
app.add_typer(translations_app, name="translations")
app.add_typer(prefect_app, name="prefect")


@app.callback()
Expand Down
159 changes: 54 additions & 105 deletions arpav_ppcv/observations_harvester/cliapp.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,63 @@
import httpx
import sqlmodel
import typer
from rich import print
from typing import (
Annotated,
Literal,
)
from typing import Annotated

from .. import database
from . import operations
from ..prefect.flows import observations as observations_flows

app = typer.Typer()


@app.command()
def refresh_stations(ctx: typer.Context) -> None:
client = httpx.Client()
with sqlmodel.Session(ctx.obj["engine"]) as session:
created = operations.refresh_stations(client, session)
print(f"Created {len(created)} stations:")
print("\n".join(s.code for s in created))
def refresh_stations(
variable: Annotated[
str,
typer.Option(
help=(
"Name of the variable to process. If not provided, all "
"variables are processed."
)
),
] = None,
refresh_monthly: Annotated[
bool,
typer.Option(
help=(
"Refresh stations that have monthly measurements for "
"the input month."
)
),
] = True,
refresh_seasonal: Annotated[
bool,
typer.Option(
help=(
"Refresh stations that have seasonal measurements for "
"the input season."
)
),
] = True,
refresh_yearly: Annotated[
bool, typer.Option(help=("Refresh stations that have yearly measurements"))
] = True,
) -> None:
observations_flows.refresh_stations(
variable_name=variable,
refresh_stations_with_monthly_data=refresh_monthly,
refresh_stations_with_seasonal_data=refresh_seasonal,
refresh_stations_with_yearly_data=refresh_yearly,
)


@app.command()
def refresh_monthly_measurements(
ctx: typer.Context,
station: Annotated[
list[str],
str,
typer.Option(
default_factory=list,
help=(
"Code of the station to process. If not provided, all "
"stations are processed."
),
),
],
] = None,
variable: Annotated[
str,
typer.Option(
Expand All @@ -45,29 +68,16 @@ def refresh_monthly_measurements(
),
] = None,
) -> None:
client = httpx.Client()
with sqlmodel.Session(ctx.obj["engine"]) as session:
for station_code in station:
print(f"Processing station: {station_code!r}...")
created = _refresh_measurements(
session, client, variable, station_code, "monthly"
)
print(f"Created {len(created)} monthly measurements:")
print(
"\n".join(
f"{m.station.code}-{m.variable.name}-{m.date.strftime('%Y-%m-%d')}"
for m in created
)
)
observations_flows.refresh_monthly_measurements(
station_code=station, variable_name=variable
)


@app.command()
def refresh_seasonal_measurements(
ctx: typer.Context,
station: Annotated[
list[str],
str,
typer.Option(
default_factory=list,
help=(
"Code of the station to process. If not provided, all "
"stations are processed."
Expand All @@ -84,35 +94,23 @@ def refresh_seasonal_measurements(
),
] = None,
) -> None:
client = httpx.Client()
with sqlmodel.Session(ctx.obj["engine"]) as session:
if len(station) > 0:
for station_code in station:
print(f"Processing station {station_code!r}...")
created = _refresh_measurements(
session, client, variable, station_code, "seasonal"
)
else:
created = _refresh_measurements(session, client, variable, None, "seasonal")
print(f"Created {len(created)} seasonal measurements:")
print(
"\n".join(f"{m.station.code}-{m.variable.name}-{m.year}" for m in created)
)
observations_flows.refresh_seasonal_measurements(
station_code=station,
variable_name=variable,
)


@app.command()
def refresh_yearly_measurements(
ctx: typer.Context,
station: Annotated[
list[str],
str,
typer.Option(
default_factory=list,
help=(
"Code of the station to process. If not provided, all "
"stations are processed."
),
),
],
] = None,
variable: Annotated[
str,
typer.Option(
Expand All @@ -123,55 +121,6 @@ def refresh_yearly_measurements(
),
] = None,
) -> None:
client = httpx.Client()
with sqlmodel.Session(ctx.obj["engine"]) as session:
if len(station) > 0:
for station_code in station:
print(f"Processing station {station_code!r}...")
created = _refresh_measurements(
session, client, variable, station_code, "yearly"
)
else:
created = _refresh_measurements(session, client, variable, None, "yearly")
print(f"Created {len(created)} yearly measurements:")
print(
"\n".join(f"{m.station.code}-{m.variable.name}-{m.year}" for m in created)
)


def _refresh_measurements(
db_session: sqlmodel.Session,
client: httpx.Client,
variable_name: str | None,
station_code: str | None,
measurement_type: Literal["monthly", "seasonal", "yearly"],
) -> list:
if station_code is not None:
db_station = database.get_station_by_code(db_session, station_code)
if db_station is not None:
station_id = db_station.id
else:
raise SystemExit("Invalid station code")
else:
station_id = None
if variable_name is not None:
db_variable = database.get_variable_by_name(db_session, variable_name)
if db_variable is not None:
variable_id = db_variable.id
else:
raise SystemExit("Invalid variable name")
else:
variable_id = None

handler = {
"monthly": operations.refresh_monthly_measurements,
"seasonal": operations.refresh_seasonal_measurements,
"yearly": operations.refresh_yearly_measurements,
}[measurement_type]

return handler(
client,
db_session,
station_id=station_id,
variable_id=variable_id,
observations_flows.refresh_yearly_measurements(
station_code=station, variable_name=variable
)
Loading

0 comments on commit 3be86be

Please sign in to comment.