# -*- coding: utf-8 -*-
"""
Python wrapper for version 1.4 of the Adobe Analytics API.
The library is mostly built on classes.
@author: julienpiccini
"""
import json as _json  # for reading the statement
import requests as _requests  # for the calls to the API
import pandas as _pd
import time as _time
import os as _os
from pathlib import Path as _Path

_c_path = _Path.cwd()  # get the current folder
_new_path = _c_path.joinpath('aanalytics')  # path of the data folder
_new_path.mkdir(exist_ok=True)  # create the folder that will store the data
"""
Configuration
You need to place your API key below for the Oauth 2.0 autentification
"""
"""Configuration for the token & different admin request"""
_apliId=''
_secretApli=''
_reportsuite=''
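# Example configuration (a minimal sketch; the values below are hypothetical
# placeholders, to be replaced with the client id / secret of your Adobe
# legacy OAuth application and with your own report suite id):
# _apliId = 'my_app:my_company'
# _secretApli = 'my_api_secret'
# _reportsuite = 'my_report_suite_id'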
"""
basic filename function
"""
def __newfilename():
fmonth = _time.strftime("%m")
fday = _time.strftime("%d")
fhour = _time.strftime("%H")
fminute = _time.strftime("%M")
filename='report_'+fmonth+'_'+fday+'_'+fhour+'_'+fminute
return filename
"""
Configure
"""
"""
Statement class that enable to manipulate the statement easily with pre-made method.
"""
class Statement:
    """
    Class to generate and / or manipulate a statement for the Adobe Analytics API.
    The class is initialized from a single argument, which can take several possible values:
    - a dictionary that contains the statement
    - a file name as a string, such as "filename.txt", that contains your statement as JSON
    - "new", which will create an empty statement
    """
__empty_statement = {"reportDescription":{
"reportSuiteID":"",
"dateFrom":"2018-01-01",
"dateTo":"2018-01-31",
"metrics":[{"id":"visits"}],
"sortBy":"visits",
"elements":[],
"segments":[]}
}
    def _initiate_elements(self, statement):  # initiate the different attributes
        self.statement = statement  # keep the dict
        self.start_dates = self.statement['reportDescription']['dateFrom']
        self.end_dates = self.statement['reportDescription']['dateTo']
        self.metrics = [x['id'] for x in statement['reportDescription']['metrics']]
        self.segments = [x['id'] for x in statement['reportDescription']['segments']]
        self.dimensions = statement['reportDescription']['elements']
        # check whether Data Warehouse is the requested source
        if statement['reportDescription'].get('source') == 'warehouse':
            self.export = "csv"
        else:
            self.export = "json"
        # check whether a date granularity is set
        self.date_granularity = statement['reportDescription'].get('dateGranularity')
        return self.statement
    def __init__(self, *elements):
        if len(elements) == 0:
            print('an argument is required')
        for element in elements:
            if isinstance(element, dict):
                self._initiate_elements(element)  # initiate the different attributes
            elif isinstance(element, str) and '.txt' in element:
                with open(element, 'r') as f:
                    read_statement = f.read()
                try:
                    stat_json = _json.loads(read_statement)
                    self._initiate_elements(stat_json)  # initiate the different attributes
                except ValueError:
                    print('Error reading your statement.\nPlease verify.')
            elif element == 'new':
                self._initiate_elements(self.__empty_statement)
    def __str__(self):
        return 'An Adobe Analytics statement object, with helper methods to manipulate the underlying statement'
    def __repr__(self):
        return str(self.statement)
    def add_metrics(self, *metrics):
        """ Add metrics to your statement; this method can take several elements. """
        for metric in metrics:
            self.statement['reportDescription']['metrics'].append({'id': metric})
        self.metrics = [x['id'] for x in self.statement['reportDescription']['metrics']]  # update the attribute
    def remove_metrics(self, *metrics):
        """ Remove metrics from your statement; this method can take several elements. """
        for metric in metrics:
            self.statement['reportDescription']['metrics'].remove({'id': metric})
        self.metrics = [x['id'] for x in self.statement['reportDescription']['metrics']]  # update the attribute
    def add_segments(self, *segments):
        """ Add segments to your statement; this method can take several elements. """
        for segment in segments:
            self.statement['reportDescription']['segments'].append({'id': segment})
        self.segments = [x['id'] for x in self.statement['reportDescription']['segments']]  # update the attribute
    def remove_segments(self, *segments):
        """ Remove segments from your statement; this method can take several elements. """
        for segment in segments:
            self.statement['reportDescription']['segments'].remove({'id': segment})
        self.segments = [x['id'] for x in self.statement['reportDescription']['segments']]  # update the attribute
    def add_dimensions(self, *dimensions):
        """ Add dimensions to your statement; this method can take several elements.
        Each dimension is the full element dictionary, such as {'id': 'page'}. """
        for dim in dimensions:
            self.dimensions.append(dim)
    def remove_dimensions(self, *dimensions):
        """ Remove dimensions from your statement. """
        for dim in dimensions:
            self.dimensions.remove(dim)
    def add_granularity(self, granularity):
        """ Add a date granularity to your statement (month, day, etc.). """
        self.statement['reportDescription']['dateGranularity'] = granularity
    def remove_granularity(self):
        """ Remove the date granularity from your statement. """
        self.statement['reportDescription'].pop('dateGranularity', None)
    def add_top(self, dim=None, top=10):
        """ Change the number of elements to retrieve for a specific dimension.
        By default, your statement returns the top 10 for normal reporting,
        and everything for Data Warehouse.
        """
        for dimension in self.statement['reportDescription']['elements']:
            if dim == dimension['id']:
                dimension['top'] = top
        self.dimensions = self.statement['reportDescription']['elements']  # update the attribute
    def change_dates(self, start=None, end=None):
        """ Change the start date and / or end date of your statement, depending on the arguments you pass. """
        if start is not None:
            self.statement['reportDescription']['dateFrom'] = start
            self.start_dates = self.statement['reportDescription']['dateFrom']
        if end is not None:
            self.statement['reportDescription']['dateTo'] = end
            self.end_dates = self.statement['reportDescription']['dateTo']
    def add_source(self):
        """ Add Data Warehouse as the source. Be careful, as calculated metrics,
        some segments and some events are not supported by Data Warehouse requests.
        """
        self.statement['reportDescription']['source'] = 'warehouse'
        self._initiate_elements(self.statement)
    def remove_source(self):
        """ Remove Data Warehouse as the source. """
        self.statement['reportDescription'].pop('source', None)
        self._initiate_elements(self.statement)
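# Example usage of the Statement class (a minimal sketch; the metric,
# segment and dimension ids below are hypothetical placeholders for the ids
# available in your own report suite):
# stmt = Statement('new')
# stmt.statement['reportDescription']['reportSuiteID'] = _reportsuite
# stmt.change_dates(start='2018-02-01', end='2018-02-28')
# stmt.add_metrics('visits', 'pageviews')
# stmt.add_segments('my_segment_id')
# stmt.add_dimensions({'id': 'page'})
# stmt.add_top(dim='page', top=50)
# stmt.statement  # the underlying dictionary, ready to be sent with getReport()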
def _getToken(_apliId, _secretApli):
    """
    Internal function to retrieve the token for the API (OAuth usage).
    """
    urladobetoken = 'https://api.omniture.com/token'
    params = {'grant_type': 'client_credentials', 'client_id': _apliId, 'client_secret': _secretApli}
    query = _requests.post(urladobetoken, data=params)
    data = query.json()
    token = str(data['access_token'])
    return token
"""
General API variables to be set
"""
_dict_api = {
'endpoint':'https://api.omniture.com/admin/1.4/rest/', #1.4 endpoint
'reportSuite' : {
'geteVars':'ReportSuite.GetEvars',
'getprops':'ReportSuite.GetProps',
'getevents':'ReportSuite.GetEvents'},
'calcmetrics':{
'getcalcmetrics':'CalculatedMetrics.Get'
},
'segments':{
'getsegments':'Segments.Get'
},
'Report':{
'create':'Report.Queue',
'check':'Report.GetQueue',
'retrieve':'Report.Get'
}
}
# Function to retrieve admin elements (evars, props, events, calculated metrics, segments)
def getElements(*elements, export=True, return_data=True):
    """
    This function can take multiple predefined arguments, as well as some optional key value pairs.
    The possible arguments are:
    - evars : list of eVars on your report suite
    - props : list of props on your report suite
    - events : list of events on your report suite
    - calcmetrics : list of calculated metrics on your report suite
    - segments : list of segments on your report suite
    The optional key value pairs are:
    - export : determine if a csv file will be created with the data (default : True)
    - return_data : determine if a dictionary of dataframes will be returned (default : True)
    """
    token = _getToken(_apliId, _secretApli)  # retrieve the token
r_statement = {"rsid_list":[_reportsuite]}##statement for reportSuite method
a_statement = {"accessLevel":"all"}
df_all = {}
for element in elements:
if element == 'evars':
reqReport = _requests.post(url=_dict_api['endpoint'], params={"method": _dict_api['reportSuite']['geteVars'], "access_token": token},json=r_statement)
responseReport = reqReport.json()[0]
df_fullevars = _pd.DataFrame(responseReport['evars'])
order_cols = ['id','name','type','enabled','allocation_type','expiration_type','merchandising_syntax','expiration_custom_days','description']
df_order_evars = df_fullevars[order_cols]
if export:
df_order_evars.to_csv(_new_path.as_posix()+'/evars.csv',index=False)
df_all['evars'] = df_order_evars
elif element == 'props':
reqReport = _requests.post(url=_dict_api['endpoint'], params={"method": _dict_api['reportSuite']['getprops'], "access_token": token},json=r_statement)
responseReport = reqReport.json()[0]
df_fullprops = _pd.DataFrame(responseReport['props'])
order_cols = ['id','name','enabled','participation_enabled','pathing_enabled','list_enabled','list_delimiter','case_insensitive','case_insensitive_date_enabled','description']
df_order_props = df_fullprops[order_cols]
if export:
df_order_props.to_csv(_new_path.as_posix()+'/props.csv',index=False)
df_all['props'] = df_order_props
elif element == 'events':
reqReport = _requests.post(url=_dict_api['endpoint'], params={"method": _dict_api['reportSuite']['getevents'], "access_token": token},json=r_statement)
responseReport = reqReport.json()[0]
df_fullevents = _pd.DataFrame(responseReport['events'])
order_cols = ['id','name','type','participation','serialization','polarity','description','default_metric','visibility']
df_order_events = df_fullevents[order_cols]
if export:
df_order_events.to_csv(_new_path.as_posix()+'/events.csv',index=False)
df_all['events']= df_order_events
elif element == 'calcmetrics':
reqReport = _requests.post(url=_dict_api['endpoint'], params={"method": _dict_api['calcmetrics']['getcalcmetrics'], "access_token": token},json=a_statement)
responseReport = reqReport.json()
df_calcMetrics = _pd.DataFrame(responseReport)
if export:
df_calcMetrics.to_csv(_new_path.as_posix()+'/calcmetrics.csv',index=False)
df_all['calcmetrics'] = df_calcMetrics
        elif element == 'segments':
            reqReport = _requests.post(url=_dict_api['endpoint'], params={"method": _dict_api['segments']['getsegments'], "access_token": token}, json=a_statement)
            responseReport = reqReport.json()
            df_segments = _pd.DataFrame(responseReport)
            if export:
                df_segments.to_csv(_new_path.as_posix()+'/segments.csv', index=False)
            df_all['segments'] = df_segments
    if export:  # if the option has been selected
        print('Please find the data in this folder : '+_new_path.as_posix())
    if return_data:  # if the option has been selected
        return df_all
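# Example usage (a minimal sketch; it assumes _apliId, _secretApli and
# _reportsuite have been filled in above):
# data = getElements('evars', 'props', 'segments', export=False)
# data['evars']  # dataframe listing the eVars of the report suite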
# Function to check that the statement is correct.
# Works with a file and with a dict.
def _checkStatement(statement):
    if isinstance(statement, str) and '.txt' in statement:  # if the statement is a txt file
        try:
            with open(statement, 'r') as f:
                read_statement = f.read()
            stat_json = _json.loads(read_statement)
        except Exception:
            raise Exception('error with your statement. Please verify your file.')
    elif isinstance(statement, dict):
        stat_json = statement
    else:
        raise Exception('error with your statement. Please verify.')
    if 'source' in stat_json['reportDescription']:
        if stat_json['reportDescription']['source'] == 'warehouse':
            export = 'csv'
        else:
            export = 'json'
        del stat_json['reportDescription']['source']
    else:
        export = 'json'
    return stat_json, export
def _save_reportID(_report_id, _export):
    """ Save the report id for further usage. """
    data = {'report_id': [_report_id], 'request_type': [_export]}
    save = _pd.DataFrame(data)
    save_file = _new_path.as_posix() + '/save_report_id.txt'
    if 'save_report_id.txt' in _os.listdir(_new_path.as_posix()):
        try:
            df = _pd.read_csv(save_file)
            df = _pd.concat([df, save])
            df.to_csv(save_file, index=False)
        except Exception:
            save.to_csv(save_file, index=False)
    else:
        save.to_csv(save_file, index=False)
# Call to create the report and return the reportID
def _CreateReport(statement, _token, _export):
    """ Send the request to Adobe and return the reportID. """
    reqReport = _requests.post(url=_dict_api['endpoint'], params={"method": _dict_api['Report']['create'], "access_token": _token}, json=statement)
    j_response = reqReport.json()
    report_id = j_response['reportID']
    return report_id
def _getQueue(_token, _reportID):
    """ Check the queue and return True when the report ID is no longer in it. """
    reqQueue = _requests.post(url=_dict_api['endpoint'], params={"method": _dict_api['Report']['check'], "access_token": _token})
    responseQueue = reqQueue.json()
    if len(responseQueue) == 0:  # there is no queue
        return True
    # there is a queue : the report is ready only if its ID is not in it
    for report in responseQueue:
        if report['reportID'] == int(_reportID):
            return False
    return True
def _reportGet(_token, _reportid, _export):
    """ API request to retrieve the data. """
    reqGet = _requests.post(url=_dict_api['endpoint'], params={"method": _dict_api['Report']['retrieve'], "access_token": _token}, json={"reportID": int(_reportid), "format": _export})
    return reqGet  # return the raw response : Data Warehouse answers with CSV, not JSON
## Wait for the Data Warehouse report
def _waitingDataWarehouse():
    _time.sleep(10 * 60)  # wait 10 minutes
## Check the status for the report ID
def _returnStatusDW(_reportID, _apliID, _secretApli):
    """ Check the Data Warehouse report status.
    Returns True as first value when the data could be retrieved,
    plus one additional value : the response of the requested report
    (or an error indicator when the report is not ready).
    """
    _waitingDataWarehouse()  # waiting 10 minutes
    _token = _getToken(_apliID, _secretApli)  # creating a new token
    try:
        response = _reportGet(_token, _reportID, 'csv')
        raw_data = response.json()
        if 'error' in raw_data:
            return False, raw_data['error']  # e.g. 'report_not_ready'
        return True, response
    except ValueError:  # the response is the CSV data itself, not JSON
        return True, response
    except Exception:
        return False, []
def _data_preparation(_raw_data, _export):
    """ Data preparation for the further steps.
    This function returns the dataframe directly for Data Warehouse,
    and the json report for a normal report. """
    if _export == 'csv':
        data_warehouse = _raw_data.content.decode("utf-8").replace('"', '').splitlines()
        dw_splitComma = [x.split(',') for x in data_warehouse]
        df = _pd.DataFrame(dw_splitComma)
        df.columns = [df.iat[0, x].replace('\ufeff', '') for x in range(len(df.columns))]  # assign column names & strip the BOM
        df.drop(0, inplace=True)  # drop the header row
        df.dropna(inplace=True)
        return df
    else:  # if the report is a normal report
        dict_overview = _raw_data['report']
        return dict_overview
def _csdmpr(j_report):
    """ Retrieve the Columns, Segments, Dimensions, Metrics, Period and Report type from the data. """
    dict_overview = j_report
    period = dict_overview['period']
    segments = []
    if 'segments' in dict_overview:
        segments = [x['name'] for x in dict_overview['segments']]
    metrics = [x['name'] for x in dict_overview['metrics']]
    dim = [x['name'] for x in dict_overview['elements']]
    # check what kind of report it is
    report_type = dict_overview['type']
    if report_type == 'ranked' or report_type == 'overtime':
        columns = dim + metrics
    elif report_type == 'trended':
        columns = ["date"] + dim + metrics
    return columns, segments, dim, metrics, period, report_type
def _data_retrieve(data, row=None, rows=None, start=False):
    """ Recursively flatten the 'data' part of a JSON report (takes dict_overview['data'] as input). """
    if row is None:
        row = []
    if rows is None or start:  # make sure the rows are clean at the beginning
        rows = []
    if isinstance(data, list):
        for item in data:
            _data_retrieve(item, row=row, rows=rows)
    elif isinstance(data, dict):
        if 'breakdownTotal' in data:  # not the last element : recurse into the breakdown
            _data_retrieve(data['breakdown'], row=row + [data['name']], rows=rows)
        elif 'counts' in data:  # leaf element : build the full row
            rows.append(row + [data['name']] + data['counts'])
    return _pd.DataFrame(rows)
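# Illustration of the flattening above on a hypothetical one-level breakdown:
# _data_retrieve([{'name': 'Homepage', 'breakdownTotal': ['100'],
#                  'breakdown': [{'name': 'France', 'counts': ['60']},
#                                {'name': 'Spain', 'counts': ['40']}]}],
#                start=True)
# returns a dataframe with the rows ['Homepage', 'France', '60'] and
# ['Homepage', 'Spain', '40'].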
def getReport(statement, export=True, return_data=True, recursive=False, verbose=False, safe_mode=False):
    """
    This function takes 1 required argument and 5 optional ones.
    It returns a file and / or a dataframe with the requested data.
    Arguments :
    statement : REQUIRED : statement describing the data requested from Adobe
    export : OPTIONAL : boolean to determine if a csv file is going to be created (default : True)
    return_data : OPTIONAL : boolean to determine if a dataframe is returned (default : True)
    recursive : OPTIONAL : for non-DW reports, automatically retrieve 50K rows for the first element until there are no more rows to fetch (default : False)
    verbose : OPTIONAL : print comments if you want to follow the status of the request (default : False)
    safe_mode : OPTIONAL : save the created report ID into a file (default : False)
    """
    _statement, _export = _checkStatement(statement)
    if verbose:
        print('your statement : \n' + str(_statement))
    _token = _getToken(_apliId, _secretApli)  # retrieve the token
    if verbose:
        print('Token is retrieved')
    if _export == 'csv':  # if a Data Warehouse request
        _reportId = _CreateReport(_statement, _token, _export)
        if safe_mode:
            _save_reportID(_reportId, _export)
        if verbose:
            print('report ID retrieved : ' + str(_reportId))
        status_DW, _raw_data = _returnStatusDW(_reportId, _apliId, _secretApli)
        while not status_DW:
            if verbose:
                print('Adobe processing...')
            status_DW, _raw_data = _returnStatusDW(_reportId, _apliId, _secretApli)
df = _data_preparation(_raw_data,_export)
_filename = __newfilename()
if export:
df.to_csv(_new_path.as_posix()+'/'+_filename+'.csv',index=False)
if verbose:
print('File has been created in this folder: '+_new_path.as_posix())
if return_data:
return df
    if _export == 'json':  # if a normal request
        if not recursive:  # normal request with a limited number of rows
            _reportId = _CreateReport(_statement, _token, _export)
            if safe_mode:
                _save_reportID(_reportId, _export)
            if verbose:
                print('report ID retrieved : ' + str(_reportId))
            status = _getQueue(_token, _reportId)
            if not status and verbose:
                print('data is being processed by Adobe')
            while not status:
                _time.sleep(60)
                status = _getQueue(_token, _reportId)
_raw_data = _reportGet(_token,_reportId,_export).json()
if verbose:
print('Data retrieved')
j_report = _data_preparation(_raw_data,_export)
j_data = j_report['data']
columns, segments, dimensions, metrics, period, report_type = _csdmpr(j_report)
            df = _data_retrieve(j_data, start=True)  # start=True cleans the rows of any previous call
            df.columns = columns  # rename the columns
_filename = __newfilename()
if export:
df.to_csv(_new_path.as_posix()+'/'+_filename+'.csv',index=False)
if verbose:
print('File has been created in this folder: '+_new_path.as_posix())
if return_data:
return df
        else:  # normal request with an unlimited number of rows
            rows = 50000
            start_with = 1
            df_all = _pd.DataFrame()  # to regroup all the dataframes
            _statement['reportDescription']['elements'][0]['top'] = rows
            while rows == 50000:
                if verbose:
                    iteration = round(start_with / 50000) + 1
                    print(str(iteration) + ' iteration')
                    print('start with : ' + str(start_with))
                _statement['reportDescription']['elements'][0]['startingWith'] = start_with
                _reportId = _CreateReport(_statement, _token, _export)
                status = _getQueue(_token, _reportId)
                while not status:
                    if verbose:
                        print('data is being processed by Adobe')
                    _time.sleep(60)
                    status = _getQueue(_token, _reportId)
                _raw_data = _reportGet(_token, _reportId, _export).json()
                if verbose:
                    print('iteration of data retrieved')
                j_report = _data_preparation(_raw_data, _export)
                j_data = j_report['data']
                columns, segments, dimensions, metrics, period, report_type = _csdmpr(j_report)
                element2check = dimensions[0]
                if verbose:
                    print('element to check for unlimited data : ' + str(element2check))
                df = _data_retrieve(j_data, start=True)
                df.columns = columns  # rename the columns
                rows = len(df[element2check].unique())
                df_all = _pd.concat([df_all, df])
                if verbose:
                    print('number of rows : ' + str(rows))
                start_with += 50000
            _filename = __newfilename()
            del _statement['reportDescription']['elements'][0]['startingWith']  # remove the element created during the loop
            del _statement['reportDescription']['elements'][0]['top']  # remove the element created during the loop
if export:
df_all.to_csv(_new_path.as_posix()+'/'+_filename+'.csv',index=False)
if verbose:
print('File has been created in this folder: '+_new_path.as_posix())
if return_data:
return df_all
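# Example usage (a minimal sketch; 'mystatement.txt' is a hypothetical file
# containing a valid reportDescription, and a dict such as stmt.statement
# from a Statement object works as well):
# df = getReport('mystatement.txt', verbose=True)
# df_full = getReport(stmt.statement, recursive=True, export=False)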
def retrieveReport(report_id, request_type='json', export=True, return_data=True, verbose=False):
    """
    This function takes 1 required argument and 4 optional ones.
    It returns a file and / or a dataframe with the requested data. It cannot do recursive requests.
    report_id : REQUIRED : the report id that you want to retrieve data from
    request_type : OPTIONAL : the type of request you made to Adobe ('json' or 'csv'). It has been written in your save_report_id.txt file.
    export : OPTIONAL : boolean to determine if a csv file has to be created (default : True)
    return_data : OPTIONAL : boolean to determine if a dataframe is returned (default : True)
    verbose : OPTIONAL : print comments if you want to follow the status of the request (default : False)
    """
    report_id = int(report_id)
    _token = _getToken(_apliId, _secretApli)  # retrieve the token
    if verbose:
        print('Token is retrieved')
if request_type=='json':
_raw_data = _reportGet(_token,report_id,request_type).json()
if verbose:
print('Data retrieved')
j_report = _data_preparation(_raw_data,request_type)
j_data = j_report['data']
columns, segments, dimensions, metrics, period, report_type = _csdmpr(j_report)
        df = _data_retrieve(j_data, start=True)  # start=True cleans the rows of any previous call
        df.columns = columns  # rename the columns
_filename = __newfilename()
if export:
df.to_csv(_new_path.as_posix()+'/'+_filename+'.csv',index=False)
if verbose:
print('File has been created in this folder: '+_new_path.as_posix())
if return_data:
return df
elif request_type=='csv':
_raw_data = _reportGet(_token,report_id,request_type)
df = _data_preparation(_raw_data,request_type)
_filename = __newfilename()
if export:
df.to_csv(_new_path.as_posix()+'/'+_filename+'.csv',index=False)
if verbose:
print('File has been created in this folder: '+_new_path.as_posix())
if return_data:
return df
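# Example usage (a minimal sketch; 123456789 is a hypothetical report id,
# as saved in save_report_id.txt by getReport(safe_mode=True)):
# df = retrieveReport(123456789, request_type='json', verbose=True)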