From dac64cb0ec5c7ac7d5e0bd591543bf1876c0c70b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ant=C3=B4nio=20Moreira?= Date: Fri, 13 Aug 2021 18:15:12 -0300 Subject: [PATCH] Fix: Not using bq_ops_dataset for uploaded records table creation Change-Id: I1970f7bb902691c71f012ff629e8ba84e75cfd30 --- megalista_dataflow/main.py | 18 +++++++++++------ .../sources/batches_from_executions.py | 20 ++++++++++++++++--- megalista_dataflow/third_party/steps.py | 6 +++++- 3 files changed, 34 insertions(+), 10 deletions(-) diff --git a/megalista_dataflow/main.py b/megalista_dataflow/main.py index 8139dcf6..bc5175f3 100644 --- a/megalista_dataflow/main.py +++ b/megalista_dataflow/main.py @@ -197,8 +197,10 @@ def expand(self, executions): executions | "Load Data - GA measurement protocol" >> BatchesFromExecutions( - DestinationType.GA_MEASUREMENT_PROTOCOL, 20, transactional=True - ) + DestinationType.GA_MEASUREMENT_PROTOCOL, + 20, + True, + self.params.dataflow_options.bq_ops_dataset) | "Upload - GA measurement protocol" >> beam.ParDo(GoogleAnalyticsMeasurementProtocolUploaderDoFn()) | "Persist results - GA measurement protocol" @@ -214,8 +216,10 @@ def expand(self, executions): executions | "Load Data - GA 4 measurement protocol" >> BatchesFromExecutions( - DestinationType.GA_4_MEASUREMENT_PROTOCOL, 20, transactional=True - ) + DestinationType.GA_4_MEASUREMENT_PROTOCOL, + 20, + True, + self.params.dataflow_options.bq_ops_dataset) | "Upload - GA 4 measurement protocol" >> beam.ParDo(GoogleAnalytics4MeasurementProtocolUploaderDoFn()) | "Persist results - GA 4 measurement protocol" @@ -231,8 +235,10 @@ def expand(self, executions): executions | "Load Data - CM conversion" >> BatchesFromExecutions( - DestinationType.CM_OFFLINE_CONVERSION, 1000, transactional=True - ) + DestinationType.CM_OFFLINE_CONVERSION, + 1000, + True, + self.params.dataflow_options.bq_ops_dataset) | "Upload - CM conversion" >> beam.ParDo( CampaignManagerConversionUploaderDoFn(self.params._oauth_credentials) diff --git a/megalista_dataflow/sources/batches_from_executions.py b/megalista_dataflow/sources/batches_from_executions.py index 7be75aae..70e95289 100644 --- a/megalista_dataflow/sources/batches_from_executions.py +++ b/megalista_dataflow/sources/batches_from_executions.py @@ -16,6 +16,8 @@ import apache_beam as beam import logging + +from apache_beam.options.value_provider import ValueProvider from google.cloud import bigquery from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest @@ -25,6 +27,7 @@ _LOGGER_NAME = 'megalista.BatchesFromExecutions' + def _convert_row_to_dict(row): dict = {} for key, value in row.items(): @@ -49,10 +52,16 @@ def process(self, execution: Execution) -> Iterable[ReadFromBigQueryRequest]: yield {'execution': execution, 'row': _convert_row_to_dict(row)} class _ExecutionIntoBigQueryRequestTransactional(beam.DoFn): + + def __init__(self, bq_ops_dataset): + self._bq_ops_dataset = bq_ops_dataset + def process(self, execution: Execution) -> Iterable[ReadFromBigQueryRequest]: table_name = execution.source.source_metadata[0] + \ '.' + execution.source.source_metadata[1] - uploaded_table_name = f"{table_name}_uploaded" + uploaded_table_name = self._bq_ops_dataset.get() + \ + '.' + execution.source.source_metadata[1] + \ + "_uploaded" client = bigquery.Client() query = f"CREATE TABLE IF NOT EXISTS {uploaded_table_name} ( \ @@ -97,16 +106,21 @@ def __init__( self, destination_type: DestinationType, batch_size: int = 5000, - transactional: bool = False + transactional: bool = False, + bq_ops_dataset: ValueProvider = None ): super().__init__() + if transactional and not bq_ops_dataset: + raise Exception('Missing bq_ops_dataset for this uploader') + self._destination_type = destination_type self._batch_size = batch_size self._transactional = transactional + self._bq_ops_dataset = bq_ops_dataset def _get_bq_request_class(self): if self._transactional: - return self._ExecutionIntoBigQueryRequestTransactional() + return self._ExecutionIntoBigQueryRequestTransactional(self._bq_ops_dataset) return self._ExecutionIntoBigQueryRequest() def expand(self, executions): diff --git a/megalista_dataflow/third_party/steps.py b/megalista_dataflow/third_party/steps.py index ae8324de..348b4073 100644 --- a/megalista_dataflow/third_party/steps.py +++ b/megalista_dataflow/third_party/steps.py @@ -17,7 +17,11 @@ def expand(self, executions): return ( executions | 'Load Data - AppsFlyer S2S events' >> - BatchesFromExecutions(DestinationType.APPSFLYER_S2S_EVENTS, 1000, transactional=True) + BatchesFromExecutions( + DestinationType.APPSFLYER_S2S_EVENTS, + 1000, + True, + self.params.dataflow_options.bq_ops_dataset) | 'Upload - AppsFlyer S2S events' >> beam.ParDo(AppsFlyerS2SUploaderDoFn(self.params.dataflow_options.appsflyer_dev_key)) | 'Persist results - AppsFlyer S2S events' >> beam.ParDo(TransactionalEventsResultsWriter(self.params.dataflow_options.bq_ops_dataset))