From 9af596ae91f7bd86a2a7e1512e0c65a141085eb2 Mon Sep 17 00:00:00 2001
From: Julien Maupetit
Date: Mon, 23 Sep 2024 21:33:48 +0200
Subject: [PATCH] Add new workflow

---
 env.d/prefect                               |   1 +
 src/prefect/Pipfile                         |   2 +
 src/prefect/indicators/infrastructure/i1.py | 199 ++++++++++++++++++--
 3 files changed, 192 insertions(+), 10 deletions(-)

diff --git a/env.d/prefect b/env.d/prefect
index a478e53d..376afa43 100644
--- a/env.d/prefect
+++ b/env.d/prefect
@@ -5,3 +5,4 @@ PREFECT_API_DATABASE_MIGRATE_ON_START=False
 PREFECT_API_URL=http://prefect:4200/api
 PREFECT_SERVER_API_HOST=0.0.0.0
 PREFECT_SERVER_ALLOW_EPHEMERAL_MODE=False
+QUALICHARGE_DATABASE_URL=postgresql+psycopg://qualicharge:pass@postgresql:5432/qualicharge-api
diff --git a/src/prefect/Pipfile b/src/prefect/Pipfile
index e876c320..468a2f87 100644
--- a/src/prefect/Pipfile
+++ b/src/prefect/Pipfile
@@ -8,7 +8,9 @@ prefect = "==3.0.2"
 geoalchemy2 = {extras = ["shapely"], version = "==0.15.2"}
 geopandas = "==1.0.1"
 pandas = "==2.2.2"
+psycopg = {extras = ["pool", "binary"], version = "==3.2.2"}
 pyarrow = "==17.0.0"
+tabulate = "==0.9.0"
 
 [dev-packages]
 black = "==24.8.0"
diff --git a/src/prefect/indicators/infrastructure/i1.py b/src/prefect/indicators/infrastructure/i1.py
index a16e4466..3b223dc1 100644
--- a/src/prefect/indicators/infrastructure/i1.py
+++ b/src/prefect/indicators/infrastructure/i1.py
@@ -3,10 +3,21 @@
 I1: the number of publicly open points of charge.
 """
 
+import os
+from datetime import datetime
 from enum import IntEnum
+from typing import Optional
+from uuid import UUID
 
 import pandas as pd
 from prefect import flow, task
+from prefect.artifacts import create_markdown_artifact
+from prefect.futures import wait
+from prefect.runtime import flow_run, task_run
+from prefect.task_runners import ThreadPoolTaskRunner
+from pydantic import BaseModel
+from sqlalchemy import create_engine, text
+from sqlalchemy.engine import Connection, Engine
 
 
 class Level(IntEnum):
@@ -20,19 +31,187 @@ class Level(IntEnum):
     METROPOLE = 5
 
 
+class IndicatorPeriod(IntEnum):
+    """Time-based indicator periods."""
+
+    HOUR = 0
+    DAY = 1
+    WEEK = 2
+    MONTH = 3
+    QUARTER = 4
+    YEAR = 5
+
+
+class Indicator(BaseModel):
+    """Indicator result."""
+
+    code: str
+    level: Level
+    target: Optional[str] = None
+    period: IndicatorPeriod
+    category: Optional[str] = None
+    value: float
+    extras: Optional[dict] = None
+    timestamp: datetime
+
+
 @task
-def fetch_points_of_charge() -> pd.DataFrame:
-    """Fetch points of charge given input level."""
-    return pd.read_sql_query("""
+def get_database_engine() -> Engine:
+    """Get QualiCharge API database engine.
+
+    Raises:
+        KeyError: if QUALICHARGE_DATABASE_URL is not set in the environment.
+    """
+    return create_engine(os.environ["QUALICHARGE_DATABASE_URL"])
+
+
+@task
+def get_targets_for_level(connection: Connection, level: Level) -> pd.DataFrame:
+    """Get registered targets for level from QualiCharge database.
+
+    Raises:
+        NotImplementedError: if no target table is mapped to the level.
+    """
+    match level:
+        case Level.CITY:
+            table = "city"
+        case Level.EPCI:
+            table = "epci"
+        case Level.DEPARTMENT:
+            table = "department"
+        case Level.REGION:
+            table = "region"
+        case _:
+            raise NotImplementedError(f"Unsupported level {level!r}")
+    return pd.read_sql_table(table, con=connection)
+
+
+def get_points_of_charge_for_target_task_name():
+    """Get task name."""
+    flow_name = flow_run.flow_name
+    task_name = task_run.task_name
+    parameters = task_run.parameters
+    level = parameters["level"]
+    index = parameters["index"]
+    return f"{flow_name}-{task_name}-for-level-{level}-index-{index}"
+
+
+@task(
+    name="get-points-of-charge",
+    task_run_name=get_points_of_charge_for_target_task_name,
+)
+def get_points_of_charge_for_target(
+    connection: Connection, level: Level, index: UUID
+) -> pd.DataFrame:
+    """Fetch points of charge given input level and target index.
+
+    The target index is sent as a bound parameter, not interpolated in the SQL.
+
+    Raises:
+        NotImplementedError: if the level has no matching SQL filter.
+    """
+    query = """
         SELECT
-            PointDeCharge.id_pdc_itinerance AS id_pdc_itinerance,
-            City.code AS city,
-            Department.code as department,
-            Region.code AS region
+            PointDeCharge.id_pdc_itinerance AS id_pdc_itinerance
         FROM PointDeCharge
             INNER JOIN Station ON PointDeCharge.station_id = Station.id
             INNER JOIN Localisation ON Station.localisation_id = Localisation.id
             INNER JOIN City ON Localisation.code_insee_commune = City.code
-            INNER JOIN Department ON City.department_id = Department.id
-            INNER JOIN Region ON Department.region_id = Region.id
-    """)
+    """
+
+    match level:
+        case Level.CITY:
+            query += """
+                WHERE City.id = :index
+                """
+        case Level.EPCI:
+            query += """
+                INNER JOIN EPCI ON City.epci_id = EPCI.id
+                WHERE EPCI.id = :index
+                """
+        case Level.DEPARTMENT:
+            query += """
+                INNER JOIN Department ON City.department_id = Department.id
+                WHERE Department.id = :index
+                """
+        case Level.REGION:
+            query += """
+                INNER JOIN Department ON City.department_id = Department.id
+                INNER JOIN Region ON Department.region_id = Region.id
+                WHERE Region.id = :index
+                """
+        case _:
+            raise NotImplementedError(f"Unsupported level {level!r}")
+    return pd.read_sql_query(text(query), con=connection, params={"index": index})
+
+
+@task
+def count_points_of_charge(df: pd.DataFrame):
+    """Count the number of points of charge for a given dataframe."""
+    return len(df)
+
+
+def i1_for_level_flow_run_name():
+    """Get flow run name."""
+    flow_name = flow_run.flow_name
+    parameters = flow_run.parameters
+    level = parameters["level"]
+    at = parameters["at"]
+    return f"{flow_name}-for-level-{level}-at-{at}"
+
+
+@flow(
+    task_runner=ThreadPoolTaskRunner(max_workers=10),
+    flow_run_name=i1_for_level_flow_run_name,
+)
+def i1_for_level(level: Level, at: datetime):
+    """Calculate the I1 indicator for every target of a level.
+
+    One task is submitted per target; the per-target counts are published as
+    a markdown artifact.
+    """
+    engine = get_database_engine()
+    with engine.connect() as connection:
+        targets = get_targets_for_level(connection, level)
+        futures = []
+        indicators = []
+        # NOTE(review): this single connection is shared by every submitted
+        # task; SQLAlchemy documents Connection as not thread-safe - confirm.
+        for _, row in targets.iterrows():
+            # str() accepts ids returned either as text or as UUID objects
+            future = get_points_of_charge_for_target.submit(
+                connection, level, UUID(str(row["id"]))
+            )
+            indicators.append(
+                Indicator(
+                    code="i1",
+                    level=level,
+                    target=row["code"],
+                    period=IndicatorPeriod.DAY,
+                    value=0.0,
+                    timestamp=at,
+                )
+            )
+            futures.append(future)
+        wait(futures)
+
+    # Store result
+    for future, indicator in zip(futures, indicators, strict=True):
+        indicator.value = len(future.result())
+
+    # Build dataframe from results
+    i1 = pd.DataFrame.from_records(
+        [indicator.model_dump() for indicator in indicators]
+    )
+    create_markdown_artifact(
+        key=f"i1-{level}-{at.date().isoformat()}",
+        markdown=i1.to_markdown(),
+        description="i1 report",
+    )
+
+
+if __name__ == "__main__":
+    now = pd.Timestamp.now()
+    i1_for_level(Level.REGION, at=now)
+    i1_for_level(Level.DEPARTMENT, at=now)
+    i1_for_level(Level.EPCI, at=now)