-
Notifications
You must be signed in to change notification settings - Fork 0
/
aws_lambda_ga_event_db_insert.py
150 lines (121 loc) · 4.69 KB
/
aws_lambda_ga_event_db_insert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""
Use the Google Analytics Reporting API V4 to retrieve event click data and write it to a PostgreSQL database.
"""
import datetime
import psycopg2
import argparse
import logging
import credentials
from apiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import httplib2
def _initialize_analyticsreporting(key_file_location, scopes):
    """Build an authorized Analytics Reporting API V4 service object.

    Args:
        key_file_location: Path to the service-account JSON key file.
        scopes: List of OAuth scopes to request for the service account.

    Returns:
        The 'analyticsreporting' v4 service object created by ``build``.
    """
    print("Intializing analytics")
    # Deliberately not named ``credentials`` so it cannot be confused with
    # the project-level ``credentials`` config module.
    service_account_creds = ServiceAccountCredentials.from_json_keyfile_name(
        key_file_location, scopes)
    # Quiet the discovery-cache logger. The name was previously misspelled
    # ('googleapicliet'), which configured an unrelated logger instead.
    logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.ERROR)
    # Build the service object.
    analytics = build('analyticsreporting', 'v4', credentials=service_account_creds)
    print("Analytics initialized")
    return analytics


def _get_analytics(analytics, view_id):
    """Query ga:totalEvents from 10 days ago through yesterday.

    Results are broken down by date, page title, medium, source and page
    path. They are restricted to page paths containing 'test/test' whose
    event action is exactly 'eventAction'.

    Args:
        analytics: Service object from ``_initialize_analyticsreporting``.
        view_id: ID of the Analytics view to query.

    Returns:
        The raw ``reports().batchGet(...).execute()`` response dict.
    """
    print("Getting analytics data")
    report_request = {
        'viewId': view_id,
        'dateRanges': [{
            'startDate': '10daysAgo',
            'endDate': 'yesterday',
        }],
        'metrics': [{'expression': 'ga:totalEvents'}],
        # Order matters: _row_to_records reads the dimensions by position.
        'dimensions': [
            {'name': 'ga:date'},
            {'name': 'ga:pageTitle'},
            {'name': 'ga:medium'},
            {'name': 'ga:source'},
            {'name': 'ga:pagePath'},
        ],
        'dimensionFilterClauses': [{
            'operator': 'AND',
            'filters': [
                {
                    'dimensionName': 'ga:pagePath',
                    'operator': 'PARTIAL',
                    'expressions': ['test/test'],
                },
                {
                    'dimensionName': 'ga:eventAction',
                    'operator': 'EXACT',
                    'expressions': ['eventAction'],
                },
            ],
        }],
    }
    # NOTE(review): only the first page of results is requested. The API
    # pages large results (see ReportRequest.pageSize / Report.nextPageToken);
    # confirm the 10-day window stays within one page, or add pagination.
    return analytics.reports().batchGet(
        body={'reportRequests': [report_request]}
    ).execute()


def _row_to_records(row):
    """Convert one report row into parameter tuples for the INSERT.

    The row's dimensions must be in the order requested by ``_get_analytics``:
    date (YYYYMMDD), page title, medium, source, page path. One tuple is
    produced per entry in the row's 'metrics' list, using the first value of
    that entry. Raises IndexError if an entry has no values.

    Returns:
        A list of (date, event, medium, source, page, click) tuples.
    """
    dims = row.get('dimensions', [])
    event_date = datetime.datetime.strptime(dims[0], '%Y%m%d')
    # Keep only the part of the page title before the first ' - '.
    event_name = dims[1].split(' - ')[0]
    medium, source, page = dims[2], dims[3], dims[4]
    if len(page) > 100:
        # Drop any '#fragment' from long paths.
        # NOTE(review): presumably to fit the 'page' column width; confirm
        # against the table schema.
        page = page.split('#')[0]
    else:
        page = page.replace('/test/test/', '')
    return [
        (event_date, event_name, medium, source, page, metric.get('values', [])[0])
        for metric in row.get('metrics', [])
    ]


def _write_to_db(cursor, response):
    """Replace recent rows of public.event_reg_clicks with the report data.

    If the response contains at least one report, rows whose ``date`` is
    later than 11 days ago are deleted, and then every report row is inserted.
    Nothing is committed here; the caller owns the transaction.

    Args:
        cursor: Open psycopg2 cursor.
        response: Response dict returned by ``_get_analytics``.
    """
    print("Writing to db")
    reports = response.get('reports', [])
    if not reports:
        return
    # Clear the recent window before re-inserting the freshly fetched
    # 10daysAgo..yesterday data.
    cursor.execute("""DELETE from public.event_reg_clicks
        WHERE date > current_date - interval '11 day'""")
    for report in reports:
        for row in report.get('data', {}).get('rows', []):
            for record in _row_to_records(row):
                cursor.execute("""INSERT INTO public.event_reg_clicks (date, event, medium, source, page, click)
        VALUES(%s, %s, %s, %s, %s, %s)""", record)


def lambda_handler(event, context):
    """AWS Lambda entry point: sync recent GA event clicks into Postgres.

    Reads Analytics and database settings from the project ``credentials``
    module. It then queries the Analytics Reporting API V4, replaces the
    recent rows of public.event_reg_clicks with the result, and commits.

    Args:
        event: Lambda event payload (unused).
        context: Lambda context object (unused).
    """
    # Analytics credential config
    scopes = ['https://www.googleapis.com/auth/analytics.readonly']
    key_file_location = 'key_file_location.json'
    view_id = credentials.analytics['view_id']
    print('Loaded credentials')
    # Postgres connection config. The values are passed to psycopg2 as
    # keyword arguments so that values containing spaces or quotes (such as
    # passwords) are quoted correctly. An f-string DSN does not quote them.
    db_params = {
        'host': credentials.db['host'],
        'port': credentials.db['port'],
        'dbname': credentials.db['db_name'],
        'user': credentials.db['user_name'],
        'password': credentials.db['password'],
    }
    print('Loaded db credentials')
    conn = psycopg2.connect(**db_params)
    print('connected to postgres')
    try:
        with conn.cursor() as cursor:
            print('opened a cursor')
            analytics = _initialize_analyticsreporting(key_file_location, scopes)
            response = _get_analytics(analytics, view_id)
            _write_to_db(cursor, response)
        conn.commit()  # commit to db
    finally:
        # Close deterministically, even on error, instead of relying on
        # garbage collection. If commit() was not reached, psycopg2 discards
        # the pending changes when the connection is closed.
        conn.close()