Skip to content

Commit

Permalink
新增: 加入臺北市服務業汰換節能設備補助相關資訊 v1
Browse files Browse the repository at this point in the history
  • Loading branch information
perry committed Dec 6, 2024
1 parent 32d0d57 commit f2805b8
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 0 deletions.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from airflow import DAG
from operators.common_pipeline import CommonDag
import pandas as pd
from sqlalchemy import create_engine
from utils.extract_stage import get_data_taipei_api
from utils.load_stage import (
save_geodataframe_to_postgresql,
update_lasttime_in_data_to_dataset_info,
)
from datetime import datetime
from utils.transform_address import (
clean_data,
get_addr_xy_parallel,
main_process,
save_data,
)
from utils.transform_geometry import add_point_wkbgeometry_column_to_df
from utils.transform_time import convert_str_to_time_format


dag_id = "env_srv_energy_subsidy"


def _transfer(**kwargs):


##############
## get config
##############
ready_data_db_uri = kwargs.get("ready_data_db_uri")
data_path = kwargs.get("data_path")
dag_infos = kwargs.get("dag_infos")
dag_id = dag_infos.get("dag_id")
load_behavior = dag_infos.get("load_behavior")
default_table = dag_infos.get("ready_data_default_table")
history_table = dag_infos.get("ready_data_history_table")
RID = "5f239e5c-9388-4dc2-b6a6-f65491edd78a"
FROM_CRS = 4326
GEOMETRY_TYPE = "Point"

##############
## Extract
##############
raw_list = get_data_taipei_api(RID)
raw_data = pd.DataFrame(raw_list)
# raw_data["data_time"] = raw_data["_importdate"].iloc[0]["date"]

raw_data["etl_dtm"] = datetime.now()
print(raw_data)

##############
## Transform
##############
data = raw_data.copy()
# rename
data = data.rename(
columns={
"年度": "data_year",
"核撥件數": "num_of_approval",
"累積核撥件數": "acc_num_of_approval",
"補助金額": "subsidy_amt",
"累計補助金額": "acc_subsidy_amt",
"節電量": "enegry_saving_amt",
"累積節電量": "acc_enegry_saving_amt",
}
)

# # standardize time
# data["etl_dtm"] = convert_str_to_time_format(data["etl_dtm"])
# # geocoding
# addr = data["address"]
# addr_cleaned = clean_data(addr)
# standard_addr_list = main_process(addr_cleaned)
# result, output = save_data(addr, addr_cleaned, standard_addr_list)
# data["address"] = output
# unique_addr = pd.Series(output.unique())
# x, y = get_addr_xy_parallel(unique_addr)
# temp = pd.DataFrame({"lng": x, "lat": y, "address": unique_addr})
# data = pd.merge(data, temp, on="address", how="left")
# # add town
# town_pattern = "(中正|大同|中山|松山|大安|萬華|信義|士林|北投|內湖|南港|文山)區"
# data["town"] = data["address"].str.extract(town_pattern, expand=False) + "區"
# data.loc[data["town"] == "區", "town"] = ""
# # define columns
# data["lng"] = pd.to_numeric(data["lng"], errors="coerce")
# data["lat"] = pd.to_numeric(data["lat"], errors="coerce")
# # geometry
# gdata = add_point_wkbgeometry_column_to_df(
# data, data["lng"], data["lat"], from_crs=FROM_CRS
# )


# select columns
ready_data = gdata[
[
"data_time",
"town",
"address",
"area",
"person_capacity",
"is_accessible",
"lng",
"lat",
"wkb_geometry",
]
]

# Load
engine = create_engine(ready_data_db_uri)
save_dataframe_to_postgresql(
engine,
gdata=ready_data,
load_behavior=load_behavior,
default_table=default_table,
history_table=history_table,
# geometry_type=GEOMETRY_TYPE,
)
lasttime_in_data = data["data_time"].max()
update_lasttime_in_data_to_dataset_info(engine, dag_id, lasttime_in_data)


dag = CommonDag(proj_folder="proj_city_dashboard", dag_folder= dag_id)
dag.create_dag(etl_func=_transfer)
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{
"dag_infos": {
"dag_id": "env_srv_energy_subsidy",
"start_date": "2024-05-30",
"schedule_interval": "0 3 24 6,12 *",
"catchup": false,
"tags": ["env_srv_energy_subsidy", "產業局", "臺北市服務業汰換節能設備補助相關資訊"],
"description": "Taipei city's air raid shelter list.",
"default_args": {
"owner": "airflow",
"email": ["DEFAULT_EMAIL_LIST"],
"email_on_retry": false,
"email_on_failure": true,
"retries": 1,
"retry_delay" : 60
},
"ready_data_db": "postgres_default",
"ready_data_default_table": "env_srv_energy_subsidy",
"ready_data_history_table": "env_srv_energy_subsidy_history",
"raw_data_db": "postgres_default",
"raw_data_table": "",
"load_behavior": "current+history"
},
"data_infos":{
"name_cn": "臺北市服務業汰換節能設備補助相關資訊",
"airflow_update_freq": "03:00 24th Jun and Dec every year",
"source": "https://data.taipei/dataset/detail?id=01370301-b843-4b60-ae8c-6a8789880bfe",
"source_type": "data.taipei api",
"source_dept": "產業局",
"gis_format": "Point",
"output_coordinate": "EPSG:4326",
"is_geometry": 0,
"dataset_description": "臺北市服務業汰換節能設備補助相關資訊",
"etl_description": "standardize time, geocoding, standardize geometry",
"sensitivity": "public"
}
}

0 comments on commit f2805b8

Please sign in to comment.