added engine for pd_to_gstorage
pualien committed Sep 4, 2024
1 parent 44b77da commit 9b43a58
Showing 2 changed files with 21 additions and 11 deletions.
30 changes: 20 additions & 10 deletions gcloud_connectors/gstorage.py
@@ -36,20 +36,28 @@ def __init__(self, confs_path=None, auth_type='service_account', json_keyfile_di
         self.logger = logger if logger is not None else EmptyLogger()

     @retry((socket.timeout, requests.exceptions.ConnectionError, urllib3.exceptions.ProtocolError), tries=3, delay=2)
-    def pd_to_gstorage(self, df, bucket_name, file_name_path, tempfile_mode=True, partition_cols=None, **kwargs):
+    def pd_to_gstorage(self, df, bucket_name, file_name_path, tempfile_mode=True, partition_cols=None,
+                       engine='pyarrow', **kwargs):
         """
-        :param df: pandas DataFrame to be saved on GCS
-        :param bucket_name: GCS bucket name
-        :param file_name_path: path to save file on bucket
-        :param tempfile_mode: if using a tempfile before pushing to GCS
-        :param partition_cols: columns for partitioning in order of partitions
-        :return: True or error whether file is correctly saved or not
+        Saves a pandas DataFrame to Google Cloud Storage (GCS).
+
+        :param df: the pandas DataFrame to be saved
+        :param bucket_name: the name of the GCS bucket where the file will be saved
+        :param file_name_path: the path to save the file on the bucket
+        :param tempfile_mode: if True, saves the DataFrame to a temporary file before uploading it to GCS.
+            Defaults to True.
+        :param partition_cols: a list of column names to use for partitioning the DataFrame.
+            If None, no partitioning is performed.
+        :param engine: the engine to use for saving the DataFrame in parquet format.
+            Defaults to 'pyarrow'.
+        :param kwargs: additional keyword arguments passed through to the `to_parquet` method
+        :return: True if the file is saved successfully; raises an error if the operation fails
         """
         if partition_cols is None:
             if tempfile_mode:
                 bucket = self.service.get_bucket(bucket_name)
                 with tempfile.NamedTemporaryFile('w') as temp:
-                    df.to_parquet(temp.name + '.parquet', index=False, **kwargs)
+                    df.to_parquet(temp.name + '.parquet', index=False, engine=engine, **kwargs)
                     bucket.blob(file_name_path).upload_from_filename(temp.name + '.parquet',
                                                                      content_type='application/octet-stream')
                     temp.flush()
@@ -60,13 +68,15 @@ def pd_to_gstorage(df, bucket_name, file_name_path, tempfile_mode=True, pa
                 # google compute metadata service, anonymous
                 df.to_parquet(
                     'gcs://{bucket_name}/{file_name_path}'.format(bucket_name=bucket_name, file_name_path=file_name_path),
-                    index=False, **kwargs)
+                    index=False, engine=engine, **kwargs)
                 return True
         else:
             # only works for the following order: gcloud CLI default, gcsfs cached token,
             # google compute metadata service, anonymous
             df.to_parquet(
                 'gcs://{bucket_name}/{file_name_path}'.format(bucket_name=bucket_name, file_name_path=file_name_path),
-                index=False, partition_cols=partition_cols, **kwargs)
+                index=False, partition_cols=partition_cols, engine=engine, **kwargs)
             return True

     def recursive_delete(self, bucket_name, directory_path_to_delete):
         """
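For reference, a minimal usage sketch of the updated method. The GStorageConnector class name and the exact constructor call are assumptions for illustration; the diff above only shows methods from gcloud_connectors/gstorage.py, not the enclosing class definition:

import pandas as pd

# Hypothetical import path and class name; adapt to the actual API.
from gcloud_connectors.gstorage import GStorageConnector

connector = GStorageConnector(confs_path='service_account.json')

df = pd.DataFrame({'country': ['IT', 'DE'], 'clicks': [10, 20]})

# Default engine ('pyarrow'), uploaded via a temporary file
connector.pd_to_gstorage(df, bucket_name='my-bucket', file_name_path='exports/clicks.parquet')

# Partitioned write directly to gcs:// (relies on gcsfs credential resolution), explicit engine
connector.pd_to_gstorage(df, bucket_name='my-bucket', file_name_path='exports/clicks',
                         partition_cols=['country'], engine='pyarrow')

Since engine is forwarded to pandas.DataFrame.to_parquet, any engine pandas supports ('pyarrow' or 'fastparquet') can be passed, provided the corresponding library is installed.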
2 changes: 1 addition & 1 deletion setup.py
@@ -50,7 +50,7 @@ def get_requirements(*requirements_file):


 setup(name='gcloud-connectors',
-      version='0.2.0',
+      version='0.2.1',
       url='https://github.com/pualien/py-gcloud-connector',
       # download_url='https://github.com/pualien/py-gcloud-connectors/archive/0.1.23.tar.gz',
       license='MIT',
