added README.md - summer cleaning
matteosdocsity committed Jul 3, 2020
1 parent cea0e16 commit 3cc3984
Showing 10 changed files with 242 additions and 78 deletions.
24 changes: 24 additions & 0 deletions README.md
@@ -0,0 +1,24 @@
# PY GCLOUD CONNECTORS
Python utilities to simplify connecting to Google APIs (a short usage sketch follows the lists below)

## Google Wrappers
- `BigQueryConnector` to read and cast pandas DataFrame from BigQuery

- `GAnalyticsConnector` to return unsampled data as pandas DataFrames from Google Analytics

- `GDriveConnector` to download, upload, search and rename files from Google Drive

- `GSCConnector` to get data from Google Search Console

- `GSheetsConnector` to read and upload pandas DataFrame from / to Google Spreadsheet

- `GStorageConnector` to write pandas DataFrame in parquet format to Google Cloud Storage


### Bonus

- `ForeignExchangeRatesConverter` to get currency conversion rates

- `LTVCalculator` to compute Customer Lifetime Value

- `pd_utils` to derive month and quarter columns from a date column in a pandas DataFrame
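
A minimal usage sketch (the project id and service-account keyfile path below are placeholders, not files shipped with this repository):

```python
# Hypothetical quick start: run a query and get a pandas DataFrame back.
from gcloud_connectors.bigquery import BigQueryConnector

bq = BigQueryConnector(project_id='my-project', confs_path='service_account.json')
df = bq.pd_execute('SELECT 1 AS one')
print(df.head())
```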
70 changes: 65 additions & 5 deletions gcloud_connectors/bigquery.py
@@ -26,6 +26,68 @@ def __init__(self, project_id, confs_path=None, auth_type='service_account', jso

self.service = bigquery.Client(project=self.project_id, credentials=self.creds)

@staticmethod
def pd_cast_dtypes(df, table_dtypes):
"""
:param df: pandas DataFrame coming from BigQuery sql
:param table_dtypes: dict of {column: pandas dtype} to apply
:return: pandas DataFrame cast to the given dtypes
"""
table_dtypes = {col: d_type for col, d_type in table_dtypes.items() if col in df.columns}
try:
df = df.astype(table_dtypes)
except Exception as e:
print(e)
for col, d_type in table_dtypes.items():
if col in df.columns:
try:
if d_type == 'integer':
d_type = int
if d_type == 'float':
d_type = float
if d_type == 'string' or d_type == 'category' or d_type == str:
d_type = str
if d_type in (str, float, int):
if d_type in (float, int):
pass
# df[col] = pd.to_numeric(df[col], errors='coerce')
df[col] = df[col].astype(d_type)
if d_type in ('boolean', 'bool'):
try:
df[col] = df[col].astype(int).fillna(False)
except Exception as e:
pass
df[col] = df[col].replace({1: True, 0: False})
except Exception as e:
print('No casting for {} into {} from {}'.format(col, d_type, df[col].dtype))
print(e)
return df

def pd_get_dtypes(self, dataset, table):
"""
:param dataset: BigQuery dataset name
:param table: BigQuery table name
:return: dict of {col1: dtype1, col2: dtype2, col3: dtype3, ...}
"""
table_ref = '{}.{}'.format(dataset, table)
table = self.service.get_table(table_ref)
schema_dtypes = {}
for schema in table.schema:
if schema.field_type in ['STRING']:
type = str
elif schema.field_type in ['DATE', 'TIMESTAMP']:
type = 'datetime64[ns]'
elif schema.field_type in ['FLOAT']:
type = schema.field_type.lower()
elif schema.field_type in ['INTEGER']:
type = 'int'
elif schema.field_type in ['BOOLEAN']:
type = 'bool'
else:
raise AttributeError('Unknown type {} for {}'.format(schema.field_type, schema.name))
schema_dtypes[schema.name] = type
return schema_dtypes

def get_pandas_dtypes(self, dataset, table):
table_ref = '{}.{}'.format(dataset, table)
table = self.service.get_table(table_ref)
@@ -48,14 +110,12 @@ def get_pandas_dtypes(self, dataset, table):

def pd_execute(self, query, progress_bar_type=None, bqstorage_enabled=False):
"""
:param query: sql query
:param progress_bar_type: progress bar type, if any
:param bqstorage_enabled: whether or not to use the BigQuery Storage API to download results more quickly
:return: pandas DataFrame from BigQuery sql execution
"""

# Use a BigQuery Storage API client to download results more quickly.
if bqstorage_enabled is True:
bqstorage_client = BigQueryStorageClient(
credentials=self.creds
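A sketch of how the methods above might be combined (project id, dataset, table and keyfile path are placeholders; `bqstorage_enabled=True` assumes the BigQuery Storage client library is installed):

```python
# Hypothetical end-to-end read: fetch rows, then cast the columns to the
# dtypes declared in the BigQuery table schema.
from gcloud_connectors.bigquery import BigQueryConnector

bq = BigQueryConnector(project_id='my-project', confs_path='service_account.json')
dtypes = bq.pd_get_dtypes(dataset='analytics', table='sessions')
df = bq.pd_execute('SELECT * FROM `my-project.analytics.sessions`',
                   bqstorage_enabled=True)
df = BigQueryConnector.pd_cast_dtypes(df, dtypes)  # pd_cast_dtypes is a staticmethod
```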
5 changes: 5 additions & 0 deletions gcloud_connectors/currency_converter.py
@@ -3,6 +3,11 @@

class ForeignExchangeRatesConverter:
def get_exchange_by_date(self, date, base_currency='EUR'):
"""
:param date: date for currency conversion rates, in %Y-%m-%d format
:param base_currency: base currency as a 3-letter ISO code; conversion rates are returned relative to it
:return: json response of currency conversion
"""
url = "https://api.exchangeratesapi.io/{date}?base={base_currency}" \
.format(date=date, base_currency=base_currency)
response = requests.request("GET", url)
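A small sketch of the converter (the date is a placeholder, and the response shape assumes the exchangeratesapi.io json payload described in the docstring):

```python
# Hypothetical lookup of EUR-based rates for one day.
from gcloud_connectors.currency_converter import ForeignExchangeRatesConverter

converter = ForeignExchangeRatesConverter()
rates = converter.get_exchange_by_date('2020-07-01', base_currency='EUR')
usd_rate = rates['rates']['USD']  # assumes the usual {"rates": {...}} json shape
```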
57 changes: 31 additions & 26 deletions gcloud_connectors/ganalytics.py
@@ -44,16 +44,27 @@ def __init__(self, confs_path=None, auth_type='service_accounts', json_keyfile_d
self.management_service = None

def get_segments_by_id(self, segment_id):
self.management_service = build('analytics', 'v3', http=self.http,
# discoveryServiceUrl=('https://analyticsreporting.googleapis.com/$discovery/rest')
)
self.management_service = build('analytics', 'v3', http=self.http)
segments = self.management_service.management().segments().list().execute().get('items', [])
for segment in reversed(segments):
pass

@retry(googleapiclient.errors.HttpError, tries=3, delay=2)
def pd_get_report(self, view_id, start_date, end_date, metrics, dimensions, filters=None, page_size=100000,
page_token=None, comes_from_sampling=False, segments=None):
"""
:param view_id: Google Analytics view id
:param start_date: start date for the report, in %Y-%m-%d format
:param end_date: end date for the report, in %Y-%m-%d format
:param metrics: list of ga: metrics
:param dimensions: list of ga: dimensions
:param filters: filters in string type format. Ex. ga:deviceCategory==tablet
:param page_size: max size of results per page
:param page_token: token that identifies the page of results (depending on page_size)
:param comes_from_sampling: flag set when the call originates from the unsampling logic; useful for debugging when GAnalyticsConnector is initialized with an active logger
:param segments: list of Google Analytics segments in format ['gaid::...', ...]
:return: pandas DataFrame of unsampled results from Google Analytics
"""
self.logger.info('start view {view_id} from sampled data {comes_from_sampling} '
'from {start_date} to {end_date} '
'with metrics ({metrics}), dimensions ({dimensions}), filters ({filters}), page_token {page_token}'
@@ -82,25 +93,14 @@ def pd_get_report(self, view_id, start_date, end_date, metrics, dimensions, filt
segments[0], dict) else segments
body['reportRequests'][0]["dimensions"].append({"name": "ga:segment"})

# try:
# response = self.service.reports().batchGet(body=body).execute()
# except googleapiclient.errors.HttpError as e:
# json_error = json.loads(e.content)
# if json_error.get('error',{}).get('code') != 403:
# raise GAError(e)
# elif json_error.get('error',{}).get('message', '').startswith('User does not have sufficient permissions'):
# raise GAPermissionError(e)
# else:
# raise e

response = self.service.reports().batchGet(body=body).execute()

df = self.get_df_from_response(response, dimensions, metrics)
if response['reports'][0]['data'].get('samplesReadCounts') is not None:
self.logger.info('unsampling for {} {}'.format(start_date, end_date))
# difference between end_date and start_date in days, then split the first call in two:
# start_date up to the midpoint of the range
# midpoint plus one day up to end_date
start_date_dt = datetime.strptime(start_date, '%Y-%m-%d')
end_date_dt = datetime.strptime(end_date, '%Y-%m-%d')
middle_dt = start_date_dt + (end_date_dt - start_date_dt) / 2
@@ -134,13 +134,12 @@ def pd_get_report(self, view_id, start_date, end_date, metrics, dimensions, filt
def pd_get_raw_report(self, report_request, dimensions, metrics, page_size=10000, page_token=None,
comes_from_sampling=False):
"""
Useful if combined with the Reporting API v4 request export from the Da Vinci Tools Chrome Extension https://chrome.google.com/webstore/detail/da-vinci-tools/pekljbkpgnpphbkgjbfgiiclemodfpen
:param report_request: accepts only one report request at a time to unsample correctly
:param dimensions: list of Google Analytics dimensions
:param metrics: list of Google Analytics metrics
:param page_size: max size of results per page
:param page_token: token that identifies the page of results (depending on page_size)
:param comes_from_sampling: flag set when the call originates from the unsampling logic; useful for debugging when GAnalyticsConnector is initialized with an active logger
:return: pandas DataFrame of unsampled results from Google Analytics
"""
start_date = report_request['reportRequests'][0]['dateRanges'][0]['startDate']
@@ -154,9 +153,9 @@ def pd_get_raw_report(self, report_request, dimensions, metrics, page_size=10000

if response['reports'][0]['data'].get('samplesReadCounts') is not None:
self.logger.info('unsampling for {} {}'.format(start_date, end_date))
# difference between end_date and start_date in days, then split the first call in two:
# start_date up to the midpoint of the range
# midpoint plus one day up to end_date
start_date_dt = datetime.strptime(start_date, '%Y-%m-%d')
end_date_dt = datetime.strptime(end_date, '%Y-%m-%d')
middle_dt = start_date_dt + (end_date_dt - start_date_dt) / 2
@@ -190,6 +189,12 @@ def pd_get_raw_report(self, report_request, dimensions, metrics, page_size=10000

@staticmethod
def get_df_from_response(response, dimensions, metrics):
"""
:param response: raw response from Google Analytics API
:param dimensions: list of Google Analytics dimensions
:param metrics: list of Google Analytics metrics
:return: pandas DataFrame of results from response
"""
data_dic = {f"{i}": [] for i in dimensions + metrics}
for report in response.get('reports', []):
rows = report.get('data', {}).get('rows', [])
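A sketch of a typical `pd_get_report` call (view id, dates and keyfile path are placeholders; metrics, dimensions and the filter follow the parameter formats documented above):

```python
# Hypothetical sessions-by-date report; unsampling is handled internally
# by pd_get_report when the API response reports sampling.
from gcloud_connectors.ganalytics import GAnalyticsConnector

ga = GAnalyticsConnector(confs_path='service_account.json')
df = ga.pd_get_report(view_id='123456789',
                      start_date='2020-06-01', end_date='2020-06-30',
                      metrics=['ga:sessions', 'ga:users'],
                      dimensions=['ga:date', 'ga:deviceCategory'],
                      filters='ga:deviceCategory==mobile')
```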
71 changes: 57 additions & 14 deletions gcloud_connectors/gdrive.py
@@ -10,12 +10,10 @@
from googleapiclient.http import MediaIoBaseDownload, MediaFileUpload
from oauth2client.service_account import ServiceAccountCredentials



SCOPES = ['https://www.googleapis.com/auth/drive', 'https://www.googleapis.com/auth/drive.file']


class GDriveConnector():
class GDriveConnector:
def __init__(self, confs_path=None, auth_type='service_accounts', json_keyfile_dict=None, logger=None):
self.confs_path = confs_path
self.auth_type = auth_type
@@ -44,7 +42,8 @@ def __init__(self, confs_path=None, auth_type='service_accounts', json_keyfile_d
self.creds = pickle.load(token)

obj = lambda: None
lmao = {"auth_host_name": 'localhost', 'noauth_local_webserver': 'store_true', 'auth_host_port': [8081, 8090],
lmao = {"auth_host_name": 'localhost', 'noauth_local_webserver': 'store_true',
'auth_host_port': [8081, 8090],
'logging_level': 'ERROR'}
for k, v in lmao.items():
setattr(obj, k, v)
@@ -58,11 +57,19 @@ def __init__(self, confs_path=None, auth_type='service_accounts', json_keyfile_d
self.creds = tools.run_flow(flow, store, obj)
self.service = build('drive', 'v3', credentials=self.creds)

def download_file(self, file_id, path, num_retries=50):
def download_file(self, file_id, path, supports_team_drives=None, supports_all_drives=None, num_retries=50):
"""
:param file_id: Drive file id
:param path: local path where the downloaded file is saved
:param supports_team_drives: whether or not to support Team Drives
:param supports_all_drives: whether or not to support all drives (shared drives)
:param num_retries: number of retries if the download raises errors
:return: True once the download has completed
"""

request = self.service.files().get_media(fileId=file_id)
request = self.service.files().get_media(fileId=file_id, supportsTeamDrives=supports_team_drives,
supportsAllDrives=supports_all_drives)

# replace the filename and extension in the first field below
fh = io.FileIO(path, mode='w')
downloader = MediaIoBaseDownload(fh, request)
done = False
@@ -71,22 +78,58 @@ def __init__(self, confs_path=None, auth_type='service_accounts', json_keyfile_d
self.logger.info("Download %d%%." % int(status.progress() * 100))
return done

def upload_file(self, file_path, parent_id='', mime_type='application/pdf', description=None):
def upload_file(self, file_path, parent_id='', mime_type='application/pdf', description=None,
supports_team_drives=None):
"""
:param file_path: path for file to be uploaded
:param parent_id: parent id for Drive folder.
If not specified as part of a create request, the file will be placed directly in the user's My Drive folder
:param mime_type: file mime type. Ex. application/pdf
:param description: optional file description
:param supports_team_drives: whether or not to support Team Drives
:return: Drive file id
"""
file_name = file_path.rsplit('/')[-1] if '/' in file_path else file_path.rsplit('\\')[-1]
file_metadata = {'name': file_name, 'parents': [parent_id], 'mimeType': mime_type, 'title': file_name}
# file_metadata = {'name': file_name, 'mimeType': mime_type, 'title': file_name}

if description is not None:
file_metadata['description'] = description
media_body = MediaFileUpload(file_path, mimetype=mime_type, resumable=True)
# file = self.service.files().create(body=file_metadata,
# media_body=media_body,
# fields='id').execute()
file = self.service.files().create(
body=file_metadata,
media_body=media_body, fields='id').execute()
media_body=media_body, fields='id', supportsTeamDrives=supports_team_drives).execute()

self.logger.info('File ID: {}'.format(file.get('id')))
return file.get('id')

def rename_file(self, file_id, file_metadata, supports_team_drives=None):
"""
:param file_id: Drive file id to rename
:param file_metadata: the request body as specified here
https://developers.google.com/resources/api-libraries/documentation/drive/v3/python/latest/drive_v3.files.html#update
:param supports_team_drives: whether or not to support Team Drives
:return: Drive file id
"""

file = self.service.files().update(fileId=file_id,
body=file_metadata, fields='id',
supportsTeamDrives=supports_team_drives).execute()

self.logger.info('File ID: {}'.format(file.get('id')))
return file.get('id')

def query_files(self, query="name='test.pdf' and '1Nx2yo7qWFILjDQl6aMpIyY_ej8zuQVO3' in parents",
fields='files(id, name)',
supports_team_drives=None, include_team_drive_items=None):
"""
:param query: Drive query, with the same syntax used when searching Drive in the browser
:param fields: Drive fields to be returned.
Ex. files(id, name) to get back Drive file id and file name from search
:param supports_team_drives: whether or not to support Team Drives
:param include_team_drive_items: whether or not to include Team Drive items in the results
:return: query results as a json response containing a list of files, e.g. {"files": [{file1}, {file2}, ...]}
"""
results = self.service.files().list(q=query,
spaces='drive', supportsTeamDrives=supports_team_drives,
includeTeamDriveItems=include_team_drive_items,
fields=fields).execute()
return results
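
A sketch combining `upload_file` and `query_files` (the folder id, file path and keyfile path are placeholders):

```python
# Hypothetical upload of a PDF into a Drive folder, followed by a lookup by name.
from gcloud_connectors.gdrive import GDriveConnector

drive = GDriveConnector(confs_path='service_account.json')
file_id = drive.upload_file('report.pdf', parent_id='FOLDER_ID',
                            mime_type='application/pdf',
                            supports_team_drives=True)
results = drive.query_files(query="name='report.pdf' and 'FOLDER_ID' in parents",
                            fields='files(id, name)',
                            supports_team_drives=True,
                            include_team_drive_items=True)
```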
25 changes: 17 additions & 8 deletions gcloud_connectors/gsc.py
@@ -8,7 +8,7 @@
SCOPES = ['https://www.googleapis.com/auth/webmasters.readonly']


class GSCConnector():
class GSCConnector:
def __init__(self, confs_path=None, auth_type='service_accounts', json_keyfile_dict=None, logger=None):
self.confs_path = confs_path
self.json_keyfile_dict = json_keyfile_dict
@@ -26,13 +26,22 @@ def __init__(self, confs_path=None, auth_type='service_accounts', json_keyfile_d

@retry(exceptions=Exception, tries=5)
def execute_request(self, property_uri, request):
"""Executes a searchAnalytics.query request.
Args:
service: The webmasters service to use when executing the query.
property_uri: The site or app URI to request data for.
request: The request to be executed.
Returns:
An array of response rows.
"""
:param property_uri: Site or app URI to request data for.
:param request: GSC api request.
Ex.
'startDate': '2019-01-01',
'endDate': '2020-02-05',
'dimensions': ['date'],
'dimensionFilterGroups': [{
'filters': [{
'dimension': 'page',
'expression': '/us/',
'operator': 'contains'
}]
}],
'rowLimit': 25000
:return: GSC response
"""

return self.service.searchanalytics().query(
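A sketch of an `execute_request` call (the property URL and date range are placeholders; the row fields assume the standard Search Console searchanalytics.query response shape):

```python
# Hypothetical clicks-by-date query against a verified Search Console property.
from gcloud_connectors.gsc import GSCConnector

gsc = GSCConnector(confs_path='service_account.json')
request = {
    'startDate': '2020-06-01',
    'endDate': '2020-06-30',
    'dimensions': ['date'],
    'rowLimit': 25000,
}
response = gsc.execute_request('https://www.example.com/', request)
for row in response.get('rows', []):
    print(row['keys'][0], row['clicks'])
```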
