diff --git a/parsons/google/google_bigquery.py b/parsons/google/google_bigquery.py
index 7308c97ea1..99e7e48497 100644
--- a/parsons/google/google_bigquery.py
+++ b/parsons/google/google_bigquery.py
@@ -324,6 +324,9 @@ def copy_from_gcs(
         quote: Optional[str] = None,
         schema: Optional[List[dict]] = None,
         job_config: Optional[LoadJobConfig] = None,
+        force_unzip_blobs: bool = False,
+        compression_type: str = "gzip",
+        new_file_extension: str = "csv",
         **load_kwargs,
     ):
         """
@@ -371,6 +374,12 @@ def copy_from_gcs(
                 on the BigQuery client. The function will create its own if not provided.
                 Note if there are any conflicts between the job_config and other parameters,
                 the job_config values are preferred.
+            force_unzip_blobs: bool
+                If True, target blobs will be unzipped before being loaded to BigQuery.
+            compression_type: str
+                Accepts `zip` or `gzip` values to differentially unzip a compressed blob in cloud storage.
+            new_file_extension: str
+                Provides a file extension if a blob is decompressed and rewritten to cloud storage.
             **load_kwargs: kwargs
                 Other arguments to pass to the underlying load_table_from_uri call on the
                 BigQuery client.
@@ -406,15 +415,32 @@ def copy_from_gcs(
 
         # load CSV from Cloud Storage into BigQuery
         table_ref = get_table_ref(self.client, table_name)
-        load_job = self.client.load_table_from_uri(
-            source_uris=gcs_blob_uri,
-            destination=table_ref,
-            job_config=job_config,
-            **load_kwargs,
-        )
-
         try:
-            load_job.result()
+            if force_unzip_blobs:
+                self.copy_large_compressed_file_from_gcs(
+                    gcs_blob_uri=gcs_blob_uri,
+                    table_name=table_name,
+                    if_exists=if_exists,
+                    max_errors=max_errors,
+                    data_type=data_type,
+                    csv_delimiter=csv_delimiter,
+                    ignoreheader=ignoreheader,
+                    nullas=nullas,
+                    allow_quoted_newlines=allow_quoted_newlines,
+                    quote=quote,
+                    schema=schema,
+                    job_config=job_config,
+                    compression_type=compression_type,
+                    new_file_extension=new_file_extension,
+                )
+            else:
+                load_job = self.client.load_table_from_uri(
+                    source_uris=gcs_blob_uri,
+                    destination=table_ref,
+                    job_config=job_config,
+                    **load_kwargs,
+                )
+                load_job.result()
         except exceptions.BadRequest as e:
             if "one of the files is larger than the maximum allowed size." in str(e):
                 logger.debug(
@@ -433,6 +459,8 @@ def copy_from_gcs(
                     quote=quote,
                     schema=schema,
                     job_config=job_config,
+                    compression_type=compression_type,
+                    new_file_extension=new_file_extension,
                 )
             elif "Schema has no field" in str(e):
                 logger.debug(f"{gcs_blob_uri.split('/')[-1]} is empty, skipping file")
@@ -465,6 +493,8 @@ def copy_large_compressed_file_from_gcs(
         quote: Optional[str] = None,
         schema: Optional[List[dict]] = None,
         job_config: Optional[LoadJobConfig] = None,
+        compression_type: str = "gzip",
+        new_file_extension: str = "csv",
         **load_kwargs,
     ):
         """
@@ -513,6 +543,10 @@ def copy_large_compressed_file_from_gcs(
                 on the BigQuery client. The function will create its own if not provided.
                 Note if there are any conflicts between the job_config and other parameters,
                 the job_config values are preferred.
+            compression_type: str
+                Accepts `zip` or `gzip` values to differentially unzip a compressed blob in cloud storage.
+            new_file_extension: str
+                Provides a file extension if a blob is decompressed and rewritten to cloud storage.
             **load_kwargs: kwargs
                 Other arguments to pass to the underlying load_table_from_uri call on the
                 BigQuery client.
@@ -557,7 +591,8 @@ def copy_large_compressed_file_from_gcs(
             uncompressed_gcs_uri = gcs.unzip_blob(
                 bucket_name=old_bucket_name,
                 blob_name=old_blob_name,
-                new_file_extension="csv",
+                new_file_extension=new_file_extension,
+                compression_type=compression_type,
             )
 
             logger.debug(
diff --git a/parsons/google/google_cloud_storage.py b/parsons/google/google_cloud_storage.py
index d5d0ad9b86..19993c5768 100644
--- a/parsons/google/google_cloud_storage.py
+++ b/parsons/google/google_cloud_storage.py
@@ -8,7 +8,7 @@
 import time
 import uuid
 import gzip
-import shutil
+import zipfile
 from typing import Optional
 
 logger = logging.getLogger(__name__)
@@ -431,6 +431,7 @@ def copy_bucket_to_gcs(
                 },
                 "gcs_data_sink": {
                     "bucket_name": gcs_sink_bucket,
+                    "path": destination_path,
                 },
             }
 
@@ -452,7 +453,6 @@ def copy_bucket_to_gcs(
 
         while polling:
             if latest_operation_name:
-
                 operation = client.get_operation({"name": latest_operation_name})
 
                 if not operation.done:
@@ -520,6 +520,7 @@ def unzip_blob(
         self,
         bucket_name: str,
         blob_name: str,
+        compression_type: str = "gzip",
         new_filename: Optional[str] = None,
         new_file_extension: Optional[str] = None,
     ) -> str:
@@ -535,6 +536,9 @@ def unzip_blob(
             blob_name: str
                 Blob name in GCS bucket
 
+            compression_type: str
+                Either `zip` or `gzip`
+
             new_filename: str
                 If provided, replaces the existing blob name when the decompressed
                 file is uploaded
@@ -547,29 +551,82 @@ def unzip_blob(
             String representation of decompressed GCS URI
         """
 
+        compression_params = {
+            "zip": {
+                "file_extension": ".zip",
+                "compression_function": self.__zip_decompress_and_write_to_gcs,
+                "read": "r",
+            },
+            "gzip": {
+                "file_extension": ".gz",
+                "compression_function": self.__gzip_decompress_and_write_to_gcs,
+            },
+        }
+
+        file_extension = compression_params[compression_type]["file_extension"]
+        compression_function = compression_params[compression_type][
+            "compression_function"
+        ]
+
         compressed_filepath = self.download_blob(
             bucket_name=bucket_name, blob_name=blob_name
         )
-        decompressed_filepath = compressed_filepath.replace(".gz", "")
+        decompressed_filepath = compressed_filepath.replace(file_extension, "")
         decompressed_blob_name = (
-            new_filename if new_filename else blob_name.replace(".gz", "")
+            new_filename if new_filename else blob_name.replace(file_extension, "")
         )
 
         if new_file_extension:
             decompressed_filepath += f".{new_file_extension}"
             decompressed_blob_name += f".{new_file_extension}"
 
         logger.debug("Decompressing file...")
+        compression_function(
+            compressed_filepath=compressed_filepath,
+            decompressed_filepath=decompressed_filepath,
+            decompressed_blob_name=decompressed_blob_name,
+            bucket_name=bucket_name,
+            new_file_extension=new_file_extension,
+        )
+
+        return self.format_uri(bucket=bucket_name, name=decompressed_blob_name)
+
+    def __gzip_decompress_and_write_to_gcs(self, **kwargs):
+        """
+        Handles `.gzip` decompression and streams blob contents
+        to a decompressed storage object
+        """
+
+        compressed_filepath = kwargs.pop("compressed_filepath")
+        decompressed_blob_name = kwargs.pop("decompressed_blob_name")
+        bucket_name = kwargs.pop("bucket_name")
+
         with gzip.open(compressed_filepath, "rb") as f_in:
-            with open(decompressed_filepath, "wb") as f_out:
-                shutil.copyfileobj(f_in, f_out)
+            logger.debug(
+                f"Uploading uncompressed file to GCS: {decompressed_blob_name}"
+            )
+            bucket = self.get_bucket(bucket_name=bucket_name)
+            blob = storage.Blob(name=decompressed_blob_name, bucket=bucket)
+            blob.upload_from_file(file_obj=f_in, rewind=True, timeout=3600)
+
+    def __zip_decompress_and_write_to_gcs(self, **kwargs):
+        """
+        Handles `.zip` decompression and streams blob contents
+        to a decompressed storage object
+        """
+
+        compressed_filepath = kwargs.pop("compressed_filepath")
+        decompressed_blob_name = kwargs.pop("decompressed_blob_name")
+        decompressed_blob_in_archive = decompressed_blob_name.split("/")[-1]
+        bucket_name = kwargs.pop("bucket_name")
+
+        # Unzip the archive
+        with zipfile.ZipFile(compressed_filepath) as path_:
+            # Open the underlying file
+            with path_.open(decompressed_blob_in_archive) as f_in:
                 logger.debug(
                     f"Uploading uncompressed file to GCS: {decompressed_blob_name}"
                 )
-                self.put_blob(
-                    bucket_name=bucket_name,
-                    blob_name=decompressed_blob_name,
-                    local_path=decompressed_filepath,
-                )
-
-        return self.format_uri(bucket=bucket_name, name=decompressed_blob_name)
+                bucket = self.get_bucket(bucket_name=bucket_name)
+                blob = storage.Blob(name=decompressed_blob_name, bucket=bucket)
+                blob.upload_from_file(file_obj=f_in, rewind=True, timeout=3600)
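Taken together, these changes let copy_from_gcs decompress a zip or gzip blob in Cloud Storage before loading it into BigQuery, and expose the same options on unzip_blob. A minimal usage sketch follows; the bucket, blob, dataset, and table names are hypothetical placeholders, and the snippet assumes default application credentials rather than anything shown in this diff.

from parsons import GoogleBigQuery, GoogleCloudStorage

# Hypothetical example names; substitute your own project resources.
bq = GoogleBigQuery()
bq.copy_from_gcs(
    gcs_blob_uri="gs://example-bucket/exports/contacts.csv.zip",
    table_name="example_dataset.contacts",
    force_unzip_blobs=True,    # decompress the blob in GCS before loading
    compression_type="zip",    # "zip" or "gzip"
    new_file_extension="csv",  # extension for the rewritten, decompressed blob
)

# The decompression helper can also be called directly on a blob.
gcs = GoogleCloudStorage()
uncompressed_uri = gcs.unzip_blob(
    bucket_name="example-bucket",
    blob_name="exports/contacts.csv.gz",
    compression_type="gzip",
    new_file_extension="csv",
)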