Allow setting gcs_tmp_bucket on GoogleBigQuery class #1147

Merged (1 commit, Oct 11, 2024)
parsons/google/google_bigquery.py (26 changes: 21 additions & 5 deletions)
@@ -143,6 +143,10 @@ class GoogleBigQuery(DatabaseConnector):
             A dictionary containing any requested client options. Defaults to the required
             scopes for making API calls against External tables stored in Google Drive.
             Can be set to None if these permissions are not desired
+        tmp_gcs_bucket: str
+            Name of the GCS bucket that will be used for storing data during bulk
+            transfers. Required if you intend to perform bulk data transfers (e.g. the
+            copy_from_gcs method) and the env variable ``GCS_TEMP_BUCKET`` is not populated.
     """

@@ -157,6 +161,7 @@ def __init__(
                 "https://www.googleapis.com/auth/cloud-platform",
             ]
         },
+        tmp_gcs_bucket: Optional[str] = None,
     ):
         self.app_creds = app_creds

@@ -172,6 +177,7 @@ def __init__(
         self.project = project
         self.location = location
         self.client_options = client_options
+        self.tmp_gcs_bucket = tmp_gcs_bucket

         # We will not create the client until we need to use it, since creating the client
         # without valid GOOGLE_APPLICATION_CREDENTIALS raises an exception.
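
For context, a minimal sketch of how the new constructor argument would be used (the bucket name here is hypothetical):

    from parsons.google.google_bigquery import GoogleBigQuery

    # Hypothetical bucket; any GCS bucket the credentials can write to.
    bq = GoogleBigQuery(tmp_gcs_bucket="example-temp-bucket")

    # Bulk-load methods such as copy() and copy_s3() can now omit the
    # tmp_gcs_bucket argument and fall back to this instance attribute.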
@@ -684,7 +690,8 @@ def copy_s3(
                 The GoogleCloudStorage Connector to use for loading data into Google Cloud Storage.
             tmp_gcs_bucket: str
                 The name of the Google Cloud Storage bucket to use to stage the data to load
-                into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified.
+                into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified and
+                tmp_gcs_bucket is not set on the class instance.
             template_table: str
                 Table name to be used as the load schema. Load operation will use the same
                 columns and data types as the template table.
@@ -700,8 +707,12 @@ def copy_s3(
         """

         # copy from S3 to GCS
-        tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
-        gcs_client = gcs_client or GoogleCloudStorage(app_creds=self.app_creds)
+        tmp_gcs_bucket = (
+            tmp_gcs_bucket
+            or self.tmp_gcs_bucket
+            or check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
+        )
+        gcs_client = gcs_client or GoogleCloudStorage()
         temp_blob_uri = gcs_client.copy_s3_to_gcs(
             aws_source_bucket=bucket,
             aws_access_key_id=aws_access_key_id,
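
The `or` chain resolves the staging bucket with a fixed precedence: an explicit argument wins, then the instance attribute, then the environment. A standalone sketch of the same resolution order, assuming the env lookup behaves like parsons' check_env.check (the helper name is hypothetical):

    import os
    from typing import Optional

    def resolve_tmp_gcs_bucket(
        arg: Optional[str] = None, instance_attr: Optional[str] = None
    ) -> str:
        # Precedence: call-site argument > instance attribute > environment.
        bucket = arg or instance_attr or os.environ.get("GCS_TEMP_BUCKET")
        if not bucket:
            # check_env.check raises here rather than returning None.
            raise KeyError("No tmp_gcs_bucket passed and GCS_TEMP_BUCKET is unset.")
        return bucket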
@@ -767,7 +778,8 @@ def copy(
                 the job fails.
             tmp_gcs_bucket: str
                 The name of the Google Cloud Storage bucket to use to stage the data to load
-                into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified.
+                into BigQuery. Required if `GCS_TEMP_BUCKET` is not specified and
+                tmp_gcs_bucket is not set on the class instance.
             gcs_client: object
                 The GoogleCloudStorage Connector to use for loading data into Google Cloud Storage.
             job_config: object
@@ -783,7 +795,11 @@ def copy(
                 client.
         """
         data_type = "csv"
-        tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
+        tmp_gcs_bucket = (
+            tmp_gcs_bucket
+            or self.tmp_gcs_bucket
+            or check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
+        )
         if not tmp_gcs_bucket:
             raise ValueError(
                 "Must set GCS_TEMP_BUCKET environment variable or pass in tmp_gcs_bucket parameter"