-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathalternative_cloud_etl.py
242 lines (194 loc) · 8.47 KB
/
alternative_cloud_etl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
import os
import logging
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models import Variable
from kaggle.api.kaggle_api_extended import KaggleApi
import boto3
import pandas as pd
import requests
import awswrangler as wr
import psycopg2
import zipfile
# Path to the CSV file that holds the AWS access key pair. Read from the
# environment at import time, so importing this module raises KeyError when
# AWS_CREDENTIALS_PATH is not set.
aws_credentials_path = os.environ["AWS_CREDENTIALS_PATH"]
# Name of the S3 bucket used by every task below, taken from the Airflow
# Variable named "s3".
aws_bucket = Variable.get("s3")
# Redshift endpoint and IAM role ARN consumed by load_data_aws. Both are empty
# placeholders here and have to be filled in before that task can connect.
_redshift_host = ''
_redshift_role_arn = ''
def extract_url_aws():
    """Download the invoice CSV from GitHub and store it in the S3 bucket.

    The file is fetched over HTTP and written unchanged to ``aws_bucket``
    under the key ``data_url_uncleaned.csv``. AWS credentials are read from
    the CSV file located at ``aws_credentials_path``.

    Raises:
        requests.HTTPError: if the download answers with a 4xx/5xx status.
        requests.Timeout: if the server does not answer within the timeout.
    """
    url = "https://raw.githubusercontent.com/Patcharanat/ecommerce-invoice/master/data/cleaned_data.csv"

    # Bound the request and reject error responses: without these checks a
    # hung connection blocks the task forever and an HTTP error page would be
    # uploaded to S3 as if it were the dataset.
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    data_url = response.text

    # retrieve credentials
    key = pd.read_csv(aws_credentials_path)
    _aws_access_key_id = key["Access key ID"][0]
    _aws_secret_access_key = key["Secret access key"][0]

    # authenticate and upload to S3
    object_name = "data_url_uncleaned.csv"
    s3 = boto3.client(
        's3',
        aws_access_key_id=_aws_access_key_id,
        aws_secret_access_key=_aws_secret_access_key,
    )
    s3.put_object(Bucket=aws_bucket, Key=object_name, Body=data_url)
    logging.info(f"File {object_name} is stored.")
def extract_database_aws():
    """Dump ``myschema.ecomm_invoice`` from Postgres to CSV and upload it to S3.

    The table is exported with ``COPY ... TO STDOUT WITH CSV HEADER`` into a
    temporary file under ``/opt/airflow/data``, uploaded to ``aws_bucket`` as
    ``data_postgres_cleaned.csv`` and the temporary file is then deleted.

    The database connection is always closed and the temporary file is always
    removed, even when the export or the upload fails.
    """
    # initiate connection
    postgres_hook = PostgresHook(
        postgres_conn_id="postgres-source",
        schema="mydatabase"
    )

    # Define the SQL query to extract data
    query = "SELECT * FROM myschema.ecomm_invoice"
    # specify filename of the intermediate dump
    csv_file = "/opt/airflow/data/unloaded_data.csv"
    # Define the COPY command with the query, CSV format, and headers
    copy_command = f"COPY ({query}) TO STDOUT WITH CSV HEADER"
    # specify desired target file name
    object_name = "data_postgres_cleaned.csv"

    try:
        conn = postgres_hook.get_conn()
        try:
            # Mode "w" creates the file when missing and truncates it otherwise.
            with conn.cursor() as cursor, open(csv_file, "w", encoding='utf-8') as f:
                cursor.copy_expert(copy_command, file=f)
        finally:
            # Release the connection even when the export raises.
            conn.close()

        # retrieve credentials
        key = pd.read_csv(aws_credentials_path)
        _aws_access_key_id = key["Access key ID"][0]
        _aws_secret_access_key = key["Secret access key"][0]

        # authenticate and upload to S3
        s3 = boto3.client(
            's3',
            aws_access_key_id=_aws_access_key_id,
            aws_secret_access_key=_aws_secret_access_key,
        )
        # The with-block closes the file; no explicit close() is required.
        with open(csv_file, "rb") as f:
            s3.put_object(Bucket=aws_bucket, Key=object_name, Body=f)
    finally:
        # Never leave a (possibly partial) dump behind on the worker.
        if os.path.exists(csv_file):
            os.remove(csv_file)

    logging.info(f"Completed extracting data from postgres database loaded to {object_name}")
def extract_api_aws():
    """Fetch the Kaggle e-commerce dataset and push its CSV to S3.

    Downloads ``carrie1/ecommerce-data`` into ``./data/``, unpacks the
    archive there and uploads the extracted ``data.csv`` to ``aws_bucket``
    under the key ``data_api_uncleaned.csv``. The downloaded zip archive is
    intentionally left on disk.
    """
    data_dir = './data/'

    # Kaggle side: authenticate, download the archive, unpack it in place.
    kaggle_client = KaggleApi()
    kaggle_client.authenticate()
    kaggle_client.dataset_download_files('carrie1/ecommerce-data', path=data_dir)

    archive_path = './data/ecommerce-data.zip'
    with zipfile.ZipFile(archive_path, "r") as archive:
        archive.extractall(data_dir)
    local_csv = os.path.join(data_dir, 'data.csv')

    # AWS side: build an S3 client from the key pair stored in the CSV file.
    credentials = pd.read_csv(aws_credentials_path)
    access_key_id = credentials["Access key ID"][0]
    secret_access_key = credentials["Secret access key"][0]
    s3_client = boto3.client(
        's3',
        aws_access_key_id=access_key_id,
        aws_secret_access_key=secret_access_key,
    )

    object_name = 'data_api_uncleaned.csv'
    s3_client.upload_file(local_csv, aws_bucket, object_name)

    # The zip is kept on purpose: it may still be needed by the task that
    # loads the same download to GCS.
    logging.info(f"Completed extracting data from API loaded to {object_name}")
def clean_aws(bucket_name, object_name, destination_file):
    """Clean a raw invoice CSV from S3 and stage the result as parquet.

    Args:
        bucket_name: S3 bucket that holds the raw CSV and receives the output.
        object_name: Key of the raw CSV inside ``bucket_name``.
        destination_file: Local path the parquet file is written to; the same
            string is appended to ``staging_area/`` to form the upload key.
    """
    # Load the access key pair and open an authenticated session.
    credentials = pd.read_csv(aws_credentials_path)
    access_key_id = credentials["Access key ID"][0]
    secret_access_key = credentials["Secret access key"][0]
    aws_session = boto3.Session(
        aws_access_key_id=access_key_id,
        aws_secret_access_key=secret_access_key,
    )

    source_uri = f"s3://{bucket_name}/{object_name}"
    frame = wr.s3.read_csv(path=source_uri, boto3_session=aws_session, encoding='cp1252')

    # Fill gaps, parse timestamps and derive the amount spent per row.
    frame['Description'] = frame['Description'].fillna('No Description')
    frame['CustomerID'] = frame['CustomerID'].fillna(0)
    frame['InvoiceDate'] = pd.to_datetime(frame['InvoiceDate'])
    frame['total_spend'] = frame['Quantity'] * frame['UnitPrice']

    # Normalise column types.
    for text_column in ('InvoiceNo', 'StockCode', 'Description', 'Country'):
        frame[text_column] = frame[text_column].astype(str)
    frame['CustomerID'] = frame['CustomerID'].astype(float).astype(int)
    frame['total_spend'] = frame['total_spend'].astype(float)

    # Give every stock code a single description: the one seen most often.
    frame['StockCode'] = frame['StockCode'].str.upper()

    def _most_common(values):
        return values.value_counts().idxmax()

    canonical_descriptions = (
        frame.groupby('StockCode')['Description'].agg(_most_common).reset_index()
    )
    original_columns = frame.columns
    frame = (
        frame.drop(columns=['Description'])
        .merge(canonical_descriptions, on='StockCode', how='left')
        .reindex(columns=original_columns)  # restore the original column order
    )

    # Write locally, then push the file to the S3 staging area.
    frame.to_parquet(destination_file, index=False)
    s3_client = boto3.client(
        's3',
        aws_access_key_id=access_key_id,
        aws_secret_access_key=secret_access_key,
    )
    s3_client.upload_file(destination_file, bucket_name, f"staging_area/{destination_file}")
def load_data_aws():
    """Create the Redshift target table if needed and COPY the staged parquet into it.

    Connects to the cluster at ``_redshift_host``, creates
    ``ecomm_invoice_transaction`` when it does not exist yet, then loads
    ``staging_area/ecomm_invoice_transaction.parquet`` from ``aws_bucket``
    using the IAM role ``_redshift_role_arn``. The cursor and the connection
    are released even when one of the statements fails.
    """
    object_key = "staging_area/ecomm_invoice_transaction.parquet"

    # Configure your Redshift connection details
    # NOTE(review): credentials are hard-coded in source; they should come
    # from a secret store or an Airflow connection instead.
    redshift_host = _redshift_host  # needs to change every time a new cluster is created
    redshift_dbname = 'mydb'
    redshift_user = 'admin'
    redshift_password = 'Admin123'
    redshift_port = '5439'

    # Text columns use VARCHAR: Redshift has no STRING column type, so the
    # previous definition made CREATE TABLE fail.
    # NOTE(review): if the staged parquet stores Quantity/CustomerID as int64,
    # COPY into INTEGER columns may be rejected - confirm and widen to BIGINT
    # if so.
    create_table_sql = """
    CREATE TABLE IF NOT EXISTS ecomm_invoice_transaction (
        InvoiceNo VARCHAR(256),
        StockCode VARCHAR(256),
        Description VARCHAR(256),
        Quantity INTEGER,
        InvoiceDate TIMESTAMP,
        UnitPrice FLOAT,
        CustomerID INTEGER,
        Country VARCHAR(256),
        total_spend FLOAT
    );
    """

    # Load the staged parquet file from S3 into the Redshift table
    copy_command = f"""
    COPY ecomm_invoice_transaction FROM 's3://{aws_bucket}/{object_key}'
    IAM_ROLE '{_redshift_role_arn}'
    FORMAT AS PARQUET;
    """

    # Establish a connection to Redshift
    conn = psycopg2.connect(
        host=redshift_host,
        dbname=redshift_dbname,
        user=redshift_user,
        password=redshift_password,
        port=redshift_port,
    )
    try:
        with conn.cursor() as cur:
            # Create the target table in Redshift (if it doesn't exist)
            cur.execute(create_table_sql)
            conn.commit()

            cur.execute(copy_command)
            conn.commit()
    finally:
        # Close the connection even when a statement raised.
        conn.close()
# Running this module as a script is a deliberate no-op: it only defines the
# task functions and does not invoke any of them.
if __name__ == "__main__":
    pass