Skip to content
This repository has been archived by the owner on Jan 2, 2025. It is now read-only.

Commit

Permalink
Fix: Not using bq_ops_dataset for uploaded records table creation
Browse files Browse the repository at this point in the history
Change-Id: I1970f7bb902691c71f012ff629e8ba84e75cfd30
  • Loading branch information
Antônio Moreira committed Aug 13, 2021
1 parent 7415da6 commit dac64cb
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 10 deletions.
18 changes: 12 additions & 6 deletions megalista_dataflow/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,10 @@ def expand(self, executions):
executions
| "Load Data - GA measurement protocol"
>> BatchesFromExecutions(
DestinationType.GA_MEASUREMENT_PROTOCOL, 20, transactional=True
)
DestinationType.GA_MEASUREMENT_PROTOCOL,
20,
True,
self.params.dataflow_options.bq_ops_dataset)
| "Upload - GA measurement protocol"
>> beam.ParDo(GoogleAnalyticsMeasurementProtocolUploaderDoFn())
| "Persist results - GA measurement protocol"
Expand All @@ -214,8 +216,10 @@ def expand(self, executions):
executions
| "Load Data - GA 4 measurement protocol"
>> BatchesFromExecutions(
DestinationType.GA_4_MEASUREMENT_PROTOCOL, 20, transactional=True
)
DestinationType.GA_4_MEASUREMENT_PROTOCOL,
20,
True,
self.params.dataflow_options.bq_ops_dataset)
| "Upload - GA 4 measurement protocol"
>> beam.ParDo(GoogleAnalytics4MeasurementProtocolUploaderDoFn())
| "Persist results - GA 4 measurement protocol"
Expand All @@ -231,8 +235,10 @@ def expand(self, executions):
executions
| "Load Data - CM conversion"
>> BatchesFromExecutions(
DestinationType.CM_OFFLINE_CONVERSION, 1000, transactional=True
)
DestinationType.CM_OFFLINE_CONVERSION,
1000,
True,
self.params.dataflow_options.bq_ops_dataset)
| "Upload - CM conversion"
>> beam.ParDo(
CampaignManagerConversionUploaderDoFn(self.params._oauth_credentials)
Expand Down
20 changes: 17 additions & 3 deletions megalista_dataflow/sources/batches_from_executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import apache_beam as beam
import logging

from apache_beam.options.value_provider import ValueProvider
from google.cloud import bigquery
from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest

Expand All @@ -25,6 +27,7 @@

_LOGGER_NAME = 'megalista.BatchesFromExecutions'


def _convert_row_to_dict(row):
dict = {}
for key, value in row.items():
Expand All @@ -49,10 +52,16 @@ def process(self, execution: Execution) -> Iterable[ReadFromBigQueryRequest]:
yield {'execution': execution, 'row': _convert_row_to_dict(row)}

class _ExecutionIntoBigQueryRequestTransactional(beam.DoFn):

def __init__(self, bq_ops_dataset):
self._bq_ops_dataset = bq_ops_dataset

def process(self, execution: Execution) -> Iterable[ReadFromBigQueryRequest]:
table_name = execution.source.source_metadata[0] + \
'.' + execution.source.source_metadata[1]
uploaded_table_name = f"{table_name}_uploaded"
uploaded_table_name = self._bq_ops_dataset.get() + \
'.' + execution.source.source_metadata[1] + \
"_uploaded"
client = bigquery.Client()

query = f"CREATE TABLE IF NOT EXISTS {uploaded_table_name} ( \
Expand Down Expand Up @@ -97,16 +106,21 @@ def __init__(
self,
destination_type: DestinationType,
batch_size: int = 5000,
transactional: bool = False
transactional: bool = False,
bq_ops_dataset: ValueProvider = None
):
super().__init__()
if transactional and not bq_ops_dataset:
raise Exception('Missing bq_ops_dataset for this uploader')

self._destination_type = destination_type
self._batch_size = batch_size
self._transactional = transactional
self._bq_ops_dataset = bq_ops_dataset

def _get_bq_request_class(self):
if self._transactional:
return self._ExecutionIntoBigQueryRequestTransactional()
return self._ExecutionIntoBigQueryRequestTransactional(self._bq_ops_dataset)
return self._ExecutionIntoBigQueryRequest()

def expand(self, executions):
Expand Down
6 changes: 5 additions & 1 deletion megalista_dataflow/third_party/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ def expand(self, executions):
return (
executions
| 'Load Data - AppsFlyer S2S events' >>
BatchesFromExecutions(DestinationType.APPSFLYER_S2S_EVENTS, 1000, transactional=True)
BatchesFromExecutions(
DestinationType.APPSFLYER_S2S_EVENTS,
1000,
True,
self.params.dataflow_options.bq_ops_dataset)
| 'Upload - AppsFlyer S2S events' >>
beam.ParDo(AppsFlyerS2SUploaderDoFn(self.params.dataflow_options.appsflyer_dev_key))
| 'Persist results - AppsFlyer S2S events' >> beam.ParDo(TransactionalEventsResultsWriter(self.params.dataflow_options.bq_ops_dataset))
Expand Down

0 comments on commit dac64cb

Please sign in to comment.