-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhook.py
131 lines (116 loc) · 5.63 KB
/
hook.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from database_handler import *
from logging_handler import log_error_msg
from lookups import Errors, HookSteps, DataWareHouseSchema, SQLCommandsPath, ETL_Checkpoint
from misc_handler import get_sql_files_list
import os
from data_extraction_handler import readData
from transformation_handler import clean_all_data
import datetime
def execute_sql_folder_hook(db_session, target_schema = DataWareHouseSchema.SCHEMA_NAME, sql_commands_path = SQLCommandsPath.SQL_FOLDER):
    """Execute every SQL file in the commands folder whose name contains '_hook'.

    Each matching file is read, every occurrence of the literal text
    'target_schema' is replaced with ``target_schema.value``, and the
    resulting statement is passed to ``execute_query``.

    Args:
        db_session: Session object forwarded to ``execute_query``.
        target_schema: Enum member whose ``.value`` is substituted into the SQL.
        sql_commands_path: Enum member; it is passed as-is to
            ``get_sql_files_list`` and its ``.value`` is used as the folder
            path when opening files.

    Returns:
        The list returned by ``get_sql_files_list``, or ``None`` when listing
        the folder itself failed.

    Errors are logged through ``log_error_msg`` and are not re-raised; the
    first failing file stops the processing of the remaining files.
    """
    sql_files = None
    try:
        sql_files = get_sql_files_list(sql_commands_path)
        for file in sql_files:
            if '_hook' not in file:
                continue
            with open(os.path.join(sql_commands_path.value, file), 'r') as f:
                sql_query = f.read()
            # The SQL files use the literal token 'target_schema' as a placeholder.
            sql_query = sql_query.replace('target_schema', target_schema.value)
            return_val = execute_query(db_session= db_session, query= sql_query)
            if not return_val == Errors.NO_ERROR:
                raise Exception(f"{HookSteps.EXECUTE_SQL_QUERIES.value} = Error on SQL FILE = " + str(file))
    except Exception as e:
        log_error_msg(Errors.HOOK_SQL_ERROR.value, str(e))
    # Returned after the handler rather than from a `finally` clause: a return
    # inside `finally` would also discard KeyboardInterrupt/SystemExit.
    return sql_files
def create_etl_checkpoint(target_schema, db_session):
    """Create the ETL checkpoint table if it does not already exist.

    The table name and its single TIMESTAMP column name come from
    ``ETL_Checkpoint.TABLE`` and ``ETL_Checkpoint.COLUMN``.

    Args:
        target_schema: Enum member whose ``.value`` is the schema name.
        db_session: Session object forwarded to ``execute_query``.

    Returns:
        The CREATE TABLE statement that was built, or ``None`` if building it
        failed.

    Errors (including a non-``NO_ERROR`` status from ``execute_query``) are
    logged through ``log_error_msg`` and are not re-raised.
    """
    query = None
    try:
        query = f"""CREATE TABLE IF NOT EXISTS {target_schema.value}.{ETL_Checkpoint.TABLE.value}
        (
            {ETL_Checkpoint.COLUMN.value} TIMESTAMP
        );
        """
        execute_return = execute_query(db_session=db_session, query=query)
        # Sibling steps treat any status other than NO_ERROR as a failure;
        # do the same here so a failed CREATE TABLE is not silently ignored.
        if execute_return != Errors.NO_ERROR:
            raise Exception(f"{HookSteps.CREATE_ETL_CHECKPOINT.value}: error executing create table statement.")
    except Exception as e:
        log_error_msg(HookSteps.CREATE_ETL_CHECKPOINT.value,str(e))
    # Returned after the handler rather than from a `finally` clause: a return
    # inside `finally` would also discard KeyboardInterrupt/SystemExit.
    return query
def insert_or_update_etl_checkpoint(db_session,
                                    etl_time_exists = False,
                                    target_schema = DataWareHouseSchema.SCHEMA_NAME,
                                    table_name = ETL_Checkpoint.TABLE,
                                    column_name = ETL_Checkpoint.COLUMN):
    """Write the ETL checkpoint row.

    When ``etl_time_exists`` is true, the checkpoint column is updated to the
    current local time (``datetime.datetime.now()``). Otherwise a row holding
    ``ETL_Checkpoint.ETL_DEFAULT_DATE`` is inserted.

    Args:
        db_session: Session object forwarded to ``execute_query``.
        etl_time_exists: Whether a checkpoint row is already present.
        target_schema: Enum member whose ``.value`` is the schema name.
        table_name: Enum member whose ``.value`` is the checkpoint table name.
        column_name: Enum member whose ``.value`` is the checkpoint column name.

    Returns:
        None. Errors (including a non-``NO_ERROR`` status from
        ``execute_query``) are logged through ``log_error_msg`` and are not
        re-raised.
    """
    try:
        if etl_time_exists:
            checkpoint_query = f"""
            UPDATE {target_schema.value}.{table_name.value}
            SET {column_name.value} = '{datetime.datetime.now()}'
            """
        else:
            # NOTE(review): the first run stores the default date rather than
            # the current time - confirm this is intended.
            checkpoint_query = f"""
            INSERT INTO {target_schema.value}.{table_name.value}
            VALUES('{ETL_Checkpoint.ETL_DEFAULT_DATE.value}')
            """
        execute_return = execute_query(db_session=db_session, query=checkpoint_query)
        # Sibling steps treat any status other than NO_ERROR as a failure;
        # do the same here so a failed checkpoint write is logged.
        if execute_return != Errors.NO_ERROR:
            raise Exception(f"{HookSteps.INSERT_UPDATE_ETL_CHECKPOINT.value}: error executing checkpoint query.")
    except Exception as e:
        log_error_msg(HookSteps.INSERT_UPDATE_ETL_CHECKPOINT.value,str(e))
def return_etl_last_updated_date(db_session,
                                 target_schema = DataWareHouseSchema.SCHEMA_NAME,
                                 etl_date = ETL_Checkpoint.ETL_DEFAULT_DATE,
                                 table_name = ETL_Checkpoint.TABLE,
                                 column_name = ETL_Checkpoint.COLUMN):
    """Read the most recent ETL checkpoint timestamp.

    Selects the newest value of the checkpoint column. If the table holds no
    rows, ``etl_date.value`` converted with ``pd.to_datetime`` is used instead.

    Args:
        db_session: Session object forwarded to ``read_data_as_dataframe``.
        target_schema: Enum member whose ``.value`` is the schema name.
        etl_date: Enum member whose ``.value`` is the fallback date.
        table_name: Enum member whose ``.value`` is the checkpoint table name.
        column_name: Enum member whose ``.value`` is the checkpoint column name.

    Returns:
        A ``(return_date, etl_time_exists)`` tuple. ``etl_time_exists`` is
        ``True`` only when a row was found. If reading fails, the error is
        logged through ``log_error_msg`` and ``(None, False)`` is returned.
    """
    etl_time_exists = False
    return_date = None
    try:
        query = f"SELECT {column_name.value} FROM {target_schema.value}.{table_name.value} ORDER BY {column_name.value} DESC LIMIT 1"
        etl_df = read_data_as_dataframe(file_type = InputTypes.SQL, file_path = query, db_session= db_session)
        if len(etl_df) == 0:
            return_date = pd.to_datetime(etl_date.value)
        else:
            return_date = etl_df[column_name.value].iloc[0]
            etl_time_exists = True
    except Exception as e:
        log_error_msg(HookSteps.RETURN_LAST_ETL_RUN.value, str(e))
    # Returned after the handler rather than from a `finally` clause: a return
    # inside `finally` would also discard KeyboardInterrupt/SystemExit.
    return return_date, etl_time_exists
def insert_data_into_stg_tables(db_session, etl_date, target_schema = DataWareHouseSchema.SCHEMA_NAME):
    """Extract, clean and load source data into the 'stg_' tables.

    Data is read with ``readData`` starting from ``etl_date``, passed through
    ``clean_all_data``, and every non-empty result is inserted into the table
    named 'stg_' + its key in the cleaned dictionary.

    Args:
        db_session: Session object forwarded to ``execute_query``.
        etl_date: Object with ``strftime``; formatted as
            '%Y-%m-%dT%H:%M:%S.%f' before being handed to ``readData``.
        target_schema: Enum member whose ``.value`` is the schema name.

    Returns:
        None. Any error is logged through ``log_error_msg`` and not re-raised;
        the first failing insert stops the remaining inserts.
    """
    try:
        extraction_start = etl_date.strftime("%Y-%m-%dT%H:%M:%S.%f")
        raw_frames = readData(etl_date = extraction_start, limit=10000000)
        cleaned_frames = clean_all_data(raw_frames)
        for frame_name, frame in cleaned_frames.items():
            # Nothing to insert for an empty result set.
            if len(frame) == 0:
                continue
            statement = insert_into_sql_statement_from_df(frame, target_schema.value, 'stg_'+frame_name)
            status = execute_query(db_session=db_session, query= statement)
            if status == Errors.NO_ERROR:
                continue
            raise Exception(f"{HookSteps.INSERT_INTO_STG_TABLE.value}: error executing insert_stmt.")
    except Exception as e:
        log_error_msg(HookSteps.INSERT_INTO_STG_TABLE.value, str(e))
def execute_hook(logger):
    """Run the full hook pipeline, logging each step.

    Steps, in order: open a database connection, create the ETL checkpoint
    table, read the last checkpoint date, load the staging tables, execute the
    '_hook' SQL files, write the checkpoint, and close the connection.

    Args:
        logger: Logger-like object providing ``info`` and ``error``.

    Returns:
        None. An exception raised by a step is logged with the step number and
        not re-raised. The connection is closed whether or not a step raised.
    """
    step = None
    # Becomes True once create_connection() has returned, so the cleanup below
    # only tries to close a connection that was actually requested.
    connection_created = False
    db_session = None
    logger.info("Executing hook:")
    try:
        step = 1
        logger.info("Step 1: Creating a database connection")
        db_session = create_connection()
        connection_created = True
        step = 2
        logger.info("Step 2: Creating ETL checkpoint")
        create_etl_checkpoint(DataWareHouseSchema.SCHEMA_NAME, db_session)
        step = 3
        logger.info("Step 3: Retrieving ETL last updated date")
        etl_date, etl_time_exists = return_etl_last_updated_date(db_session)
        step = 4
        logger.info("Step 4: Inserting data into staging tables")
        insert_data_into_stg_tables(db_session=db_session, target_schema=DataWareHouseSchema.SCHEMA_NAME, etl_date=etl_date)
        step = 5
        logger.info("Step 5: Executing SQL folder hook")
        execute_sql_folder_hook(db_session)
        step = 6
        # NOTE(review): steps 4 and 5 log their own errors without raising, so
        # the checkpoint is written even when they failed - confirm intended.
        logger.info("Step 6: Inserting or updating ETL checkpoint")
        insert_or_update_etl_checkpoint(db_session, etl_time_exists=etl_time_exists)
    except Exception as e:
        error_prefix = f'{Errors.HOOK_SQL_ERROR.value} on step {step}'
        logger.error(f'{error_prefix} - {str(e)}')
    finally:
        # Close in `finally` so a failure in an earlier step does not leak the
        # database connection.
        if connection_created:
            step = 7
            logger.info("Step 7: Closing the database connection\n")
            try:
                close_connection(db_session)
            except Exception as e:
                error_prefix = f'{Errors.HOOK_SQL_ERROR.value} on step {step}'
                logger.error(f'{error_prefix} - {str(e)}')