Skip to content

Commit

Permalink
Add new workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
jmaupetit committed Sep 23, 2024
1 parent fb0daab commit 9af596a
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 10 deletions.
1 change: 1 addition & 0 deletions env.d/prefect
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ PREFECT_API_DATABASE_MIGRATE_ON_START=False
PREFECT_API_URL=http://prefect:4200/api
PREFECT_SERVER_API_HOST=0.0.0.0
PREFECT_SERVER_ALLOW_EPHEMERAL_MODE=False
QUALICHARGE_DATABASE_URL=postgresql+psycopg://qualicharge:pass@postgresql:5432/qualicharge-api
2 changes: 2 additions & 0 deletions src/prefect/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ prefect = "==3.0.2"
geoalchemy2 = {extras = ["shapely"], version = "==0.15.2"}
geopandas = "==1.0.1"
pandas = "==2.2.2"
psycopg = {extras = ["pool", "binary"], version = "==3.2.2"}
pyarrow = "==17.0.0"
tabulate = "==0.9.0"

[dev-packages]
black = "==24.8.0"
Expand Down
178 changes: 168 additions & 10 deletions src/prefect/indicators/infrastructure/i1.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,21 @@
I1: the number of publicly open points of charge.
"""

import os
from datetime import datetime
from enum import IntEnum
from typing import Optional
from uuid import UUID

import pandas as pd
from prefect import flow, task
from prefect.artifacts import create_markdown_artifact
from prefect.futures import wait
from prefect.runtime import flow_run, task_run
from prefect.task_runners import ThreadPoolTaskRunner
from pydantic import BaseModel
from sqlalchemy import create_engine
from sqlalchemy.engine import Connection, Engine


class Level(IntEnum):
Expand All @@ -20,19 +31,166 @@ class Level(IntEnum):
METROPOLE = 5


class IndicatorPeriod(IntEnum):
"""Time-based indicator periods."""

HOUR = 0
DAY = 1
WEEK = 2
MONTH = 3
QUARTER = 4
YEAR = 5


class Indicator(BaseModel):
"""Indicator result."""

code: str
level: Level
target: Optional[str] = None
period: IndicatorPeriod
category: Optional[str] = None
value: float
extras: Optional[dict] = None
timestamp: datetime


@task
def fetch_points_of_charge() -> pd.DataFrame:
"""Fetch points of charge given input level."""
return pd.read_sql_query("""
def get_database_engine() -> Engine:
"""Get QualiCharge API database engine."""
return create_engine(os.environ.get("QUALICHARGE_DATABASE_URL"))


@task
def get_targets_for_level(connection: Connection, level: Level) -> pd.DataFrame:
"""Get registered targets for level from QualiCharge database."""
match level:
case Level.CITY:
table = "city"
case Level.EPCI:
table = "epci"
case Level.DEPARTMENT:
table = "department"
case Level.REGION:
table = "region"
case _:
raise NotImplementedError("Unsupported level %d", level)
return pd.read_sql_table(table, con=connection)


def get_points_of_charge_for_target_task_name():
"""Get task name."""
flow_name = flow_run.flow_name
task_name = task_run.task_name
parameters = task_run.parameters
level = parameters["level"]
index = parameters["index"]
return f"{flow_name}-{task_name}-for-level-{level}-index-{index}"


@task(
name="get-points-of-charge",
task_run_name=get_points_of_charge_for_target_task_name,
)
def get_points_of_charge_for_target(
connection: Connection, level: Level, index: UUID
) -> pd.DataFrame:
"""Fetch points of charge given input level and target index."""
query = """
SELECT
PointDeCharge.id_pdc_itinerance AS id_pdc_itinerance,
City.code AS city,
Department.code as department,
Region.code AS region
PointDeCharge.id_pdc_itinerance AS id_pdc_itinerance
FROM PointDeCharge
INNER JOIN Station ON PointDeCharge.station_id = Station.id
INNER JOIN Localisation ON Station.localisation_id = Localisation.id
INNER JOIN City ON Localisation.code_insee_commune = City.code
INNER JOIN Department ON City.department_id = Department.id
INNER JOIN Region ON Department.region_id = Region.id
""")
"""

match level:
case Level.CITY:
query += f"""
WHERE City.id = '{index}'
"""
case Level.EPCI:
query += f"""
INNER JOIN EPCI ON City.epci_id = EPCI.id
WHERE EPCI.id = '{index}'
"""
case Level.DEPARTMENT:
query += f"""
INNER JOIN Department ON City.department_id = Department.id
WHERE Department.id = '{index}'
"""
case Level.REGION:
query += f"""
INNER JOIN Department ON City.department_id = Department.id
INNER JOIN Region ON Department.region_id = Region.id
WHERE Region.id = '{index}'
"""
case _:
raise NotImplementedError("Unsupported level %d", level)
return pd.read_sql_query(query, con=connection)


@task
def count_points_of_charge(df: pd.DataFrame):
"""Count the number of points of charge for a given dataframe."""
return len(df)


def i1_for_level_flow_run_name():
"""Get flow run name."""
flow_name = flow_run.flow_name
parameters = flow_run.parameters
level = parameters["level"]
at = parameters["at"]
return f"{flow_name}-for-level-{level}-at-{at}"


@flow(
task_runner=ThreadPoolTaskRunner(max_workers=10),
flow_run_name=i1_for_level_flow_run_name,
)
def i1_for_level(level: Level, at: datetime):
"""Calculate the indicator."""
engine = get_database_engine()
with engine.connect() as connection:
targets = get_targets_for_level(connection, level)
futures = []
indicators = []
for _, row in targets.iterrows():
future = get_points_of_charge_for_target.submit(
connection, level, UUID(row["id"])
)
indicators.append(
Indicator(
code="i1",
level=level,
target=row["code"],
period=IndicatorPeriod.DAY,
value=0.0,
timestamp=at,
)
)
futures.append(future)
wait(futures)

# Store result
for future, indicator in zip(futures, indicators, strict=True):
indicator.value = len(future.result())

# Build dataframe from results
i1 = pd.DataFrame.from_records(
[indicator.model_dump() for indicator in indicators]
)
create_markdown_artifact(
key=f"i1-{level}-{at.date().isoformat()}",
markdown=i1.to_markdown(),
description="i1 report",
)


if __name__ == "__main__":
now = pd.Timestamp.now()
i1_for_level(Level.REGION, at=now)
i1_for_level(Level.DEPARTMENT, at=now)
i1_for_level(Level.EPCI, at=now)

0 comments on commit 9af596a

Please sign in to comment.