-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfetch-etl-server.py
123 lines (102 loc) · 4.48 KB
/
fetch-etl-server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import boto3
import hashlib
import json
import psycopg2
import time
import logging
from datetime import datetime
# SQS client for a local endpoint on port 4566 — presumably LocalStack (its
# default port; the literal "test" credentials match its conventions) — TODO confirm.
sqs_client = boto3.client("sqs", endpoint_url="http://localhost:4566", region_name= 'us-east-1', aws_access_key_id='test', aws_secret_access_key='test')
# URL of the login-events queue on the same local endpoint.
queue_url = "http://localhost:4566/000000000000/login-queue"
def retrieve_sqs_messages():
    """Poll the SQS login queue once and return the received messages.

    Returns the list under the response's "Messages" key, or an empty list
    when the queue had nothing to deliver (SQS omits the key in that case).
    """
    response = sqs_client.receive_message(QueueUrl=queue_url)
    return response.get("Messages", [])
# Returns a boolean value based on whether the message is valid or not
def validate_message(data):
    """Validate a decoded SQS message body.

    Every key in ``required_keys`` must be present, and all of them except
    those in ``null_keys`` must also be non-null. The first violation is
    logged and the message is rejected.

    :param data: dict parsed from the SQS message body
    :return: True if the message is valid, False otherwise
    """
    required_keys = ["user_id", "app_version", "device_type", "ip", "locale", "device_id"] #keys that are required in message
    null_keys = ["locale"] #keys that can be null
    for key in required_keys:
        if key not in data:
            logging.info("Invalid message: %s \nMandatory field: %s does not exist", data, key)
            return False
        # Bug fix: use `is None` (identity) rather than `== None`, which can be
        # hijacked by a custom __eq__ and is non-idiomatic per PEP 8.
        if key not in null_keys and data[key] is None:
            logging.info("Invalid message: %s \nField: %s is null", data, key)
            return False
    return True
# Mask the PII data (device_id and ip)
def mask_pii_data(data):
    """Replace the PII fields ("device_id" and "ip") with SHA-256 hex digests.

    Mutates ``data`` in place and returns it. Hashing is deterministic, so
    the same device/IP always maps to the same masked value, keeping
    duplicates detectable downstream.
    """
    for field in ("device_id", "ip"):
        data[field] = hashlib.sha256(data[field].encode()).hexdigest()
    return data
# Convert appversion to int for storing in the table: 5.3.0 becomes 50300 (every version component ranges from 0-99)
def get_int_appversion(appversion):
    """Encode a dotted version string as a single int, e.g. "5.3.0" -> 50300.

    Each dot-separated component is assumed to fit in 0-99; component i
    (counting from the right) contributes value * 100**i. Implemented as a
    Horner-style left-to-right accumulation.
    """
    total = 0
    for component in appversion.split('.'):
        total = total * 100 + int(component, 10)
    return total
# Write the data to the PostgreSQL database
def write_to_postgres(data, receipt_handle):
    """Insert one masked login record into the user_logins table.

    On a successful commit the originating SQS message is deleted; on any
    failure the error is logged and the message is left on the queue so it
    can be retried on a later poll.

    :param data: validated, PII-masked message dict
    :param receipt_handle: SQS receipt handle used to delete the message
    """
    insert_statement = "INSERT INTO user_logins (user_id, device_type, masked_ip, masked_device_id, locale, app_version, create_date) VALUES (%s, %s, %s, %s, %s, %s, %s)"
    values = (data["user_id"], data["device_type"], data["ip"], data["device_id"], data["locale"], get_int_appversion(data["app_version"]), datetime.now().date())
    # Bug fix: pre-bind conn/cursor. Previously, if psycopg2.connect() raised,
    # the finally block referenced unbound names and raised NameError, masking
    # the real connection error.
    conn = None
    cursor = None
    try:
        conn = psycopg2.connect(
            host="localhost",
            database="postgres",
            user="postgres",
            password="postgres"
        )
        cursor = conn.cursor()
        logging.info("Inserting data into user_logins for user_id: %s", data["user_id"])
        cursor.execute(insert_statement, values)
        conn.commit()
        # Only delete from SQS after the row is durably committed.
        delete_from_sqs(receipt_handle)
    except (Exception, psycopg2.Error) as error:
        logging.error("Error occurred while inserting to user_logins table: %s", error)
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
# Delete message from SQS
def delete_from_sqs(receipt_handle):
    """Remove a processed (or rejected) message from the login queue."""
    sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
def run_etl_process():
    """Main ETL loop: poll SQS, validate, mask PII, persist to Postgres.

    Runs forever. When a poll returns no messages the loop sleeps briefly
    before polling again; invalid or unparseable messages are logged and
    removed from the queue so they are not re-read indefinitely.
    """
    sleep_time = 0
    while True:
        # Retrieve messages from the AWS SQS queue
        messages = retrieve_sqs_messages()
        # Back off only when the queue was empty; drain it fast otherwise.
        sleep_time = 5 if not messages else 0
        # Loop over the messages and perform the ETL process
        for message in messages:
            receipt_handle = message['ReceiptHandle']
            # Bug fix: a malformed body previously raised JSONDecodeError and
            # killed the loop, leaving the poison message on the queue forever.
            try:
                data = json.loads(message['Body'])
            except json.JSONDecodeError as error:
                logging.error("Discarding unparseable message body: %s", error)
                delete_from_sqs(receipt_handle)
                continue
            if validate_message(data):
                # Mask the PII data before anything is logged or stored.
                masked_data = mask_pii_data(data)
                logging.info("Received message from SQS: %s", masked_data)
                # Writes the row and deletes the message from SQS on success.
                # (Stray debug print of the raw, unmasked message removed — it
                # leaked PII to stdout and bypassed the configured log file.)
                write_to_postgres(masked_data, receipt_handle)
            else:
                delete_from_sqs(receipt_handle)  # Delete the invalid message from SQS
        # Wait for a short period of time before checking for new messages
        if sleep_time > 0:
            logging.info("Waiting for new message..")
            time.sleep(sleep_time)
if __name__ == "__main__":
#initialize logging
logging.basicConfig(level=logging.INFO, filename="fetch-etl-server.log", filemode="a+",
format="%(asctime)-15s %(levelname)-8s %(message)s")
run_etl_process()