diff --git a/README.md b/README.md index e69de29..a35e26f 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,24 @@ +# PY GCLOUD CONNECTORS +Python utilities to simplify connection with Google APIs + +## Google Wrappers +- `BigQueryConnector` to read and cast pandas DataFrame from BigQuery + +- `GAnalyticsConnector` to unsample data and return pandas DataFrame from Google Analytics + +- `GDriveConnector` to download, upload, search and rename files from Google Drive + +- `GSCConnector` to get data from Google Search Console + +- `GSheetsConnector` to read and upload pandas DataFrame from / to Google Spreadsheet + +- `GStorageConnector` to write pandas DataFrame in parquet format to Google Cloud Storage + + +### Bonus + +- `ForeignExchangeRatesConverter` to get currency conversion rates + +- `LTVCalculator` to compute Customer Lifetime Value + +- `pd_utils` to derive month, quarter column from date in pandas DataFrame \ No newline at end of file diff --git a/gcloud_connectors/bigquery.py b/gcloud_connectors/bigquery.py index 609873d..ec03fe4 100644 --- a/gcloud_connectors/bigquery.py +++ b/gcloud_connectors/bigquery.py @@ -26,6 +26,68 @@ def __init__(self, project_id, confs_path=None, auth_type='service_account', jso self.service = bigquery.Client(project=self.project_id, credentials=self.creds) + @staticmethod + def pd_cast_dtypes(df, table_dtypes): + """ + :param df: pandas DataFrame coming from BigQuery sql + :param table_dtypes: pandas dtypes + :return: pandas DataFrame casted + """ + table_dtypes = {col: d_type for col, d_type in table_dtypes.items() if col in df.columns} + try: + df = df.astype(table_dtypes) + except Exception as e: + print(e) + for col, d_type in table_dtypes.items(): + if col in df.columns: + try: + if d_type == 'integer': + d_type = int + if d_type == 'float': + d_type = float + if d_type == 'string' or d_type == 'category' or d_type == str: + d_type = str + if d_type in (str, float, int): + if d_type in (float, int): + pass + # df[col] = pd.to_numeric(df[col], errors='coerce') + df[col] = df[col].astype(d_type) + if d_type in ('boolean', 'bool'): + try: + df[col] = df[col].astype(int).fillna(False) + except Exception as e: + pass + df[col] = df[col].replace({1: True, 0: False}) + except Exception as e: + print('No casting for {} into {} from {}'.format(col, d_type, df[col].dtype)) + print(e) + return df + + def pd_get_dtypes(self, dataset, table): + """ + :param dataset: BigQuery dataset name + :param table: BigQuery table name + :return: dict of {col1: dtype1, col2: dtype1, col3: dtype3....} + """ + table_ref = '{}.{}'.format(dataset, table) + table = self.service.get_table(table_ref) + schema_dtypes = {} + for schema in table.schema: + if schema.field_type in ['STRING']: + type = str + elif schema.field_type in ['DATE', 'TIMESTAMP']: + type = 'datetime64[ns]' + elif schema.field_type in ['FLOAT']: + type = schema.field_type.lower() + elif schema.field_type in ['INTEGER']: + type = 'int' + elif schema.field_type in ['BOOLEAN']: + type = 'bool' + else: + raise AttributeError('Unknown type {} for {}'.format(schema.field_type, schema.name)) + schema_dtypes[schema.name] = type + return schema_dtypes + def get_pandas_dtypes(self, dataset, table): table_ref = '{}.{}'.format(dataset, table) table = self.service.get_table(table_ref) @@ -48,14 +110,12 @@ def get_pandas_dtypes(self, dataset, table): def pd_execute(self, query, progress_bar_type=None, bqstorage_enabled=False): """ - - :param bqstorage_enabled: - :param query: + :param query: sql query :param progress_bar_type: - :return: + :param bqstorage_enabled: whether to user or not bqstorage to download results more quickly + :return: pandas DataFrame from BigQuery sql execution """ - # Use a BigQuery Storage API client to download results more quickly. if bqstorage_enabled is True: bqstorage_client = BigQueryStorageClient( credentials=self.creds diff --git a/gcloud_connectors/currency_converter.py b/gcloud_connectors/currency_converter.py index 99df6a9..4f858f8 100644 --- a/gcloud_connectors/currency_converter.py +++ b/gcloud_connectors/currency_converter.py @@ -3,6 +3,11 @@ class ForeignExchangeRatesConverter: def get_exchange_by_date(self, date, base_currency='EUR'): + """ + :param date: date for currency conversion rates in %Y-%m-%d + :param base_currency: currency in iso format 3 to discover conversions + :return: json response of currency conversion + """ url = "https://api.exchangeratesapi.io/{date}?base={base_currency}" \ .format(date=date, base_currency=base_currency) response = requests.request("GET", url) diff --git a/gcloud_connectors/ganalytics.py b/gcloud_connectors/ganalytics.py index c501530..c7b013e 100644 --- a/gcloud_connectors/ganalytics.py +++ b/gcloud_connectors/ganalytics.py @@ -44,9 +44,7 @@ def __init__(self, confs_path=None, auth_type='service_accounts', json_keyfile_d self.management_service = None def get_segments_by_id(self, segment_id): - self.management_service = build('analytics', 'v3', http=self.http, - # discoveryServiceUrl=('https://analyticsreporting.googleapis.com/$discovery/rest') - ) + self.management_service = build('analytics', 'v3', http=self.http) segments = self.management_service.management().segments().list().execute().get('items', []) for segment in reversed(segments): pass @@ -54,6 +52,19 @@ def get_segments_by_id(self, segment_id): @retry(googleapiclient.errors.HttpError, tries=3, delay=2) def pd_get_report(self, view_id, start_date, end_date, metrics, dimensions, filters=None, page_size=100000, page_token=None, comes_from_sampling=False, segments=None): + """ + :param view_id: Google Analytics view id + :param start_date: start_date to get data + :param end_date: end_data to get data + :param metrics: list of ga: metrics + :param dimensions: list of ga: dimensions + :param filters: filters in string type format. Ex. ga:deviceCategory==tablet + :param page_size: max size of results per page + :param page_token: token that identifies the page of results (depending on page_size) + :param comes_from_sampling: to better debug unsampling capabilities, only with active logger initiated with GAnalyticsConnector + :param segments: list of Google Analytics segments in format ['gaid::...', ...] + :return: pandas DataFrame unsampled results from Google Analytics + """ self.logger.info('start view {view_id} from sampled data {comes_from_sampling} ' 'from {start_date} to {end_date} ' 'with metrics ({metrics}), dimensions ({dimensions}), filters ({filters}), page_token {page_token}' @@ -82,25 +93,14 @@ def pd_get_report(self, view_id, start_date, end_date, metrics, dimensions, filt segments[0], dict) else segments body['reportRequests'][0]["dimensions"].append({"name": "ga:segment"}) - # try: - # response = self.service.reports().batchGet(body=body).execute() - # except googleapiclient.errors.HttpError as e: - # json_error = json.loads(e.content) - # if json_error.get('error',{}).get('code') != 403: - # raise GAError(e) - # elif json_error.get('error',{}).get('message', '').startswith('User does not have sufficient permissions'): - # raise GAPermissionError(e) - # else: - # raise e - response = self.service.reports().batchGet(body=body).execute() df = self.get_df_from_response(response, dimensions, metrics) if response['reports'][0]['data'].get('samplesReadCounts') is not None: self.logger.info('unsampling for {} {}'.format(start_date, end_date)) - # differenza di start_date meno end_date in giorni poi dividere la prima chiamata in due: - # start_date fino alla metà della differenza in giorni - # metà della differenza in giorni + 1 fino a end_date + # difference of start_date minus end_date in days, then split api of api calls in two parts: + # start_date until half difference in days + # half difference in days plus one day until end_date start_date_dt = datetime.strptime(start_date, '%Y-%m-%d') end_date_dt = datetime.strptime(end_date, '%Y-%m-%d') middle_dt = start_date_dt + (end_date_dt - start_date_dt) / 2 @@ -134,13 +134,12 @@ def pd_get_report(self, view_id, start_date, end_date, metrics, dimensions, filt def pd_get_raw_report(self, report_request, dimensions, metrics, page_size=10000, page_token=None, comes_from_sampling=False): """ - + Useful if combined with Export Reporting API v4 taken from Da Vinci Tools Chrome Extension https://chrome.google.com/webstore/detail/da-vinci-tools/pekljbkpgnpphbkgjbfgiiclemodfpen :param report_request: accepts only one report request at time to unsample correctly - :param dimensions: - :param metrics: - :param page_size: - :param page_token: - :param comes_from_sampling: + :param page_size: max size of results per page + :param page_token: token that identifies the page of results (depending on page_size) + :param comes_from_sampling: to better debug unsampling capabilities, only with active logger initiated with GAnalyticsConnector + :return: pandas DataFrame unsampled results from Google Analytics :return: """ start_date = report_request['reportRequests'][0]['dateRanges'][0]['startDate'] @@ -154,9 +153,9 @@ def pd_get_raw_report(self, report_request, dimensions, metrics, page_size=10000 if response['reports'][0]['data'].get('samplesReadCounts') is not None: self.logger.info('unsampling for {} {}'.format(start_date, end_date)) - # differenza di start_date meno end_date in giorni poi dividere la prima chiamata in due: - # start_date fino alla metà della differenza in giorni - # metà della differenza in giorni + 1 fino a end_date + # difference of start_date minus end_date in days, then split api of api calls in two parts: + # start_date until half difference in days + # half difference in days plus one day until end_date start_date_dt = datetime.strptime(start_date, '%Y-%m-%d') end_date_dt = datetime.strptime(end_date, '%Y-%m-%d') middle_dt = start_date_dt + (end_date_dt - start_date_dt) / 2 @@ -190,6 +189,12 @@ def pd_get_raw_report(self, report_request, dimensions, metrics, page_size=10000 @staticmethod def get_df_from_response(response, dimensions, metrics): + """ + :param response: raw response from Google Analytics API + :param dimensions: list of Google Analytics dimensions + :param metrics: list of Google Analytics metrics + :return: pandas DataFrame of results from response + """ data_dic = {f"{i}": [] for i in dimensions + metrics} for report in response.get('reports', []): rows = report.get('data', {}).get('rows', []) diff --git a/gcloud_connectors/gdrive.py b/gcloud_connectors/gdrive.py index 4f47e3e..ac58caf 100644 --- a/gcloud_connectors/gdrive.py +++ b/gcloud_connectors/gdrive.py @@ -10,12 +10,10 @@ from googleapiclient.http import MediaIoBaseDownload, MediaFileUpload from oauth2client.service_account import ServiceAccountCredentials - - SCOPES = ['https://www.googleapis.com/auth/drive', 'https://www.googleapis.com/auth/drive.file'] -class GDriveConnector(): +class GDriveConnector: def __init__(self, confs_path=None, auth_type='service_accounts', json_keyfile_dict=None, logger=None): self.confs_path = confs_path self.auth_type = auth_type @@ -44,7 +42,8 @@ def __init__(self, confs_path=None, auth_type='service_accounts', json_keyfile_d self.creds = pickle.load(token) obj = lambda: None - lmao = {"auth_host_name": 'localhost', 'noauth_local_webserver': 'store_true', 'auth_host_port': [8081, 8090], + lmao = {"auth_host_name": 'localhost', 'noauth_local_webserver': 'store_true', + 'auth_host_port': [8081, 8090], 'logging_level': 'ERROR'} for k, v in lmao.items(): setattr(obj, k, v) @@ -58,11 +57,19 @@ def __init__(self, confs_path=None, auth_type='service_accounts', json_keyfile_d self.creds = tools.run_flow(flow, store, obj) self.service = build('drive', 'v3', credentials=self.creds) - def download_file(self, file_id, path, num_retries=50): + def download_file(self, file_id, path, supports_team_drives=None, supports_all_drives=None, num_retries=50): + """ + :param file_id: Drive file id + :param path: path to save files + :param supports_team_drives: whether to support or not team drives + :param supports_all_drives: whether to support or all drives + :param num_retries: number of retries if download leads to errors + :return: Drive file id + """ - request = self.service.files().get_media(fileId=file_id) + request = self.service.files().get_media(fileId=file_id, supportsTeamDrives=supports_team_drives, + supportsAllDrives=supports_all_drives) - # replace the filename and extension in the first field below fh = io.FileIO(path, mode='w') downloader = MediaIoBaseDownload(fh, request) done = False @@ -71,22 +78,58 @@ def download_file(self, file_id, path, num_retries=50): self.logger.info("Download %d%%." % int(status.progress() * 100)) return done - def upload_file(self, file_path, parent_id='', mime_type='application/pdf', description=None): + def upload_file(self, file_path, parent_id='', mime_type='application/pdf', description=None, + supports_team_drives=None): + """ + :param file_path: path for file to be uploaded + :param parent_id: parent id for Drive folder. + If not specified as part of a create request, the file will be placed directly in the user's My Drive folder + :param mime_type: file mime type. Ex. application/pdf + :param description: optional file description + :param supports_team_drives: whether to support or not team drives + :return: Drive file id + """ file_name = file_path.rsplit('/')[-1] if len(file_path.rsplit('/')) else file_path.rsplit('\\')[-1] file_metadata = {'name': file_name, 'parents': [parent_id], 'mimeType': mime_type, 'title': file_name} - # file_metadata = {'name': file_name, 'mimeType': mime_type, 'title': file_name} - if description is not None: file_metadata['description'] = description media_body = MediaFileUpload(file_path, mimetype=mime_type, resumable=True) - # file = self.service.files().create(body=file_metadata, - # media_body=media_body, - # fields='id').execute() file = self.service.files().create( body=file_metadata, - media_body=media_body, fields='id').execute() + media_body=media_body, fields='id', supportsTeamDrives=supports_team_drives).execute() self.logger.info('File ID: {}'.format(file.get('id'))) return file.get('id') + def rename_file(self, file_id, file_metadata, supports_team_drives=None): + """ + :param file_id: Drive file id to rename + :param file_metadata: the request body as specified here + https://developers.google.com/resources/api-libraries/documentation/drive/v3/python/latest/drive_v3.files.html#update + :param supports_team_drives: whether to support or not team drives + :return: Drive file id + """ + + file = self.service.files().update(fileId=file_id, + body=file_metadata, fields='id', + supportsTeamDrives=supports_team_drives).execute() + + self.logger.info('File ID: {}'.format(file.get('id'))) + return file.get('id') + def query_files(self, query="name='test.pdf' and '1Nx2yo7qWFILjDQl6aMpIyY_ej8zuQVO3' in parents", + fields='files(id, name)', + supports_team_drives=None, include_team_drive_items=None): + """ + :param query: Drive query, the same when using Drive from browser + :param fields: Drive fields to be returned. + Ex. files(id, name) to get back Drive file id and file name from search + :param supports_team_drives: whether to support or not team drives + :param include_team_drive_items: whether to support or not return of team drives items + :return: query results as a json response of list of files (starting with key "files": [{file1}, {file2}...] + """ + results = self.service.files().list(q=query, + spaces='drive', supportsTeamDrives=supports_team_drives, + includeTeamDriveItems=include_team_drive_items, + fields=fields).execute() + return results diff --git a/gcloud_connectors/gsc.py b/gcloud_connectors/gsc.py index 0adfbbe..cad0ff1 100644 --- a/gcloud_connectors/gsc.py +++ b/gcloud_connectors/gsc.py @@ -8,7 +8,7 @@ SCOPES = ['https://www.googleapis.com/auth/webmasters.readonly'] -class GSCConnector(): +class GSCConnector: def __init__(self, confs_path=None, auth_type='service_accounts', json_keyfile_dict=None, logger=None): self.confs_path = confs_path self.json_keyfile_dict = json_keyfile_dict @@ -26,13 +26,22 @@ def __init__(self, confs_path=None, auth_type='service_accounts', json_keyfile_d @retry(exceptions=Exception, tries=5) def execute_request(self, property_uri, request): - """Executes a searchAnalytics.query request. - Args: - service: The webmasters service to use when executing the query. - property_uri: The site or app URI to request data for. - request: The request to be executed. - Returns: - An array of response rows. + """ + :param property_uri: Site or app URI to request data for. + :param request: GSC api request. + Ex. + 'startDate': '2019-01-01', + 'endDate': '2020-02-05', + 'dimensions': ['date'], + 'dimensionFilterGroups': [{ + 'filters': [{ + 'dimension': 'page', + 'expression': '/us/', + 'operator': 'contains' + }] + }], + 'rowLimit': 25000 + :return: GSC response """ return self.service.searchanalytics().query( diff --git a/gcloud_connectors/gsheets.py b/gcloud_connectors/gsheets.py index 86c4f30..60df991 100644 --- a/gcloud_connectors/gsheets.py +++ b/gcloud_connectors/gsheets.py @@ -34,6 +34,15 @@ def get_sheet_id_by_name(self, spreadsheet, worksheet_name): def delete_cells(self, spreadsheet_key, worksheet_name, start_index=1, end_index=30, dimension='COLUMNS', additional_base_sheet=False): + """ + :param spreadsheet_key: id for Spreadsheet taken from URL + :param worksheet_name: name as visibile in worksheet + :param start_index: + :param end_index: + :param dimension: + :param additional_base_sheet: whethere to clean also base sheet + :return: + """ spreadsheet = self.gspread.open_by_key(spreadsheet_key) worksheets_to_delete = [worksheet_name, 'Foglio1'] if additional_base_sheet is True else [worksheet_name] for wk_name in worksheets_to_delete: @@ -63,9 +72,12 @@ def delete_cells(self, spreadsheet_key, worksheet_name, start_index=1, end_index @retry((requests.exceptions.ReadTimeout, gspread.exceptions.APIError), tries=3, delay=2) def pd_to_gsheet(self, df, spreadsheet_key, worksheet_name, value_input_option='USER_ENTERED', clean=True, use_df2gsprad=True): """ - :param df: - :param spreadsheet_key: - :param worksheet_name: + :param df: pandas DataFrame + :param spreadsheet_key: id for Spreadsheet taken from URL + :param worksheet_name: name as visibile in worksheet + :param value_input_option: 'USER_ENTERED' if scope is to maintain column types from pandas DataFrame + :param clean: whether to clean or not + :param use_df2gsprad: :return: """ if use_df2gsprad: @@ -83,9 +95,9 @@ def pd_to_gsheet(self, df, spreadsheet_key, worksheet_name, value_input_option=' def pd_read_gsheet(self, spreadsheet_key, worksheet_name): """ - :param spreadsheet_key: - :param worksheet_name: - :return: + :param spreadsheet_key: id for Spreadsheet taken from URL + :param worksheet_name: name as visibile in worksheet + :return: pandas DataFrame from worksheet """ return g2d.download(gfile=spreadsheet_key, wks_name=worksheet_name, col_names=True, credentials=self.creds) diff --git a/gcloud_connectors/gstorage.py b/gcloud_connectors/gstorage.py index b4edeb0..16745ef 100644 --- a/gcloud_connectors/gstorage.py +++ b/gcloud_connectors/gstorage.py @@ -28,15 +28,12 @@ def __init__(self, confs_path=None, auth_type='service_account', json_keyfile_di self.service = storage.Client.from_service_account_json(self.confs_path) self.logger = logger if logger is not None else EmptyLogger() - - - def pd_to_gstorage(self, df, bucket_name='docsity-da-test-gsc-store-bucket', file_name_path='da_gsc_macro/lang=it/country=Italy/y=2019/data.parquet'): + def pd_to_gstorage(self, df, bucket_name, file_name_path): """ - - :param df: - :param bucket_name: - :param file_name_path: - :return: + :param df: pandas DataFrame to be saved on GCS + :param bucket_name: GCS bucket name + :param file_name_path: path to save file on bucket + :return: True, False whether file is correctly saved or not """ bucket = self.service.get_bucket(bucket_name) with tempfile.NamedTemporaryFile('w') as temp: @@ -47,10 +44,18 @@ def pd_to_gstorage(self, df, bucket_name='docsity-da-test-gsc-store-bucket', fil return True return False - def recursive_delete(self,bucket_name, directory_path_to_delete): + def recursive_delete(self, bucket_name, directory_path_to_delete): + """ + :param bucket_name: GCS bucket name + :param directory_path_to_delete: path to start recursive deletion + :return: list of deleted files from GSC + """ bucket = self.service.get_bucket(bucket_name) blobs = bucket.list_blobs(prefix=directory_path_to_delete) + deleted_files = [] for blob in blobs: blob.delete() self.logger.info('deleted {}'.format(blob.name)) + deleted_files.append(blob.name) + return deleted_files diff --git a/gcloud_connectors/ltv_calculator.py b/gcloud_connectors/ltv_calculator.py index 6311b68..8a0e1c0 100644 --- a/gcloud_connectors/ltv_calculator.py +++ b/gcloud_connectors/ltv_calculator.py @@ -3,7 +3,8 @@ class LTVCalculator: - def __init__(self, base_df, date_order_column, date_register_column, cumsum_base_column, ltv_index_columns, ltv_type='Q'): + def __init__(self, base_df, date_order_column, date_register_column, cumsum_base_column, ltv_index_columns, + ltv_type='Q'): self.base_df = base_df.copy(deep=True) self.date_order_column = date_order_column self.date_register_column = date_register_column @@ -24,23 +25,23 @@ def __init__(self, base_df, date_order_column, date_register_column, cumsum_base def get_x_index(self, row): return "{}{}".format(self.ltv_type, - str(self.list_date_order.index(row[self.date_order_column]) - self.list_date_order.index( - row[self.date_register_column]) + 1).zfill( - 2)) + str(self.list_date_order.index(row[self.date_order_column]) - self.list_date_order.index( + row[self.date_register_column]) + 1).zfill( + 2)) def execute(self): - self.base_df[self.index_column] = self.base_df.apply(lambda row: self.get_x_index(row), axis=1) self.base_df = self.base_df.sort_values(by=[self.index_column, self.date_register_column]) self.base_df['cumsum'] = self.base_df.groupby(self.ltv_index_columns)[self.cumsum_base_column].transform( pd.Series.cumsum).round(2) - self.ltv_df = self.base_df.pivot_table(index=self.index_column, columns=self.ltv_index_columns, values=self.cumsum_base_column, - aggfunc=np.sum).cumsum().round(2) + self.ltv_df = self.base_df.pivot_table(index=self.index_column, columns=self.ltv_index_columns, + values=self.cumsum_base_column, + aggfunc=np.sum).cumsum().round(2) return self.base_df def pd_to_csv(self, filename, decimal=','): - if self.ltv_df is None: + if self.base_df is None: raise AttributeError('LTV DataFrame not defined, please launch execute before') - self.ltv_df.fillna('').to_csv(filename, index=False, decimal=decimal) \ No newline at end of file + self.base_df.fillna('').to_csv(filename, index=False, decimal=decimal) diff --git a/setup.py b/setup.py index 8403bda..af226c9 100644 --- a/setup.py +++ b/setup.py @@ -49,7 +49,7 @@ def get_requirements(*requirements_file): return dependencies setup(name='gcloud-connectors', - version='0.1.22', + version='0.1.23', url='https://github.com/pualien/py-gcloud-connector', license='MIT', author='Matteo Senardi',