From c610a5660d609f9cbaa286d55a1ae698d11c1095 Mon Sep 17 00:00:00 2001 From: Laysa de Sousa Bitencourt Date: Wed, 17 Apr 2024 15:48:16 -0300 Subject: [PATCH] queries --- dags/stellar_etl_airflow/test_sources.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/dags/stellar_etl_airflow/test_sources.py b/dags/stellar_etl_airflow/test_sources.py index 865ca88f..9f151c4c 100644 --- a/dags/stellar_etl_airflow/test_sources.py +++ b/dags/stellar_etl_airflow/test_sources.py @@ -49,21 +49,25 @@ def compare_transforms_and_bq_rows(): print(f"Total successful transforms for yesterday: {total_successful_transforms}") - # key_path = Variable.get("api_key_path") - # credentials = service_account.Credentials.from_service_account_file(key_path) - # client = bigquery.Client(credentials=credentials, project=credentials.project_id) + key_path = Variable.get("api_key_path") + credentials = service_account.Credentials.from_service_account_file(key_path) + client = bigquery.Client(credentials=credentials, project=credentials.project_id) # # Query number of rows in BigQuery table - # query_job = client.query(f"SELECT - # (SELECT COUNT(*) FROM crypto-stellar.crypto_stellar.history_ledgers - # WHERE batch_run_date BETWEEN '2020-12-04' AND '2020-12-05') AS count_public, - # " - # ) - # results = query_job.result() - # bq_rows = [row for row in results][0][0] + query_job = client.query(f"SELECT + (SELECT COUNT(*) FROM crypto-stellar.crypto_stellar.history_ledgers + WHERE DATE(batch_run_date)='2020-04-16') AS count_public, + (SELECT COUNT(*) FROM hubble-261722.crypto_stellar_internal_2.account_signers + WHERE DATE(batch_run_date)='2020-04-16') AS count_internal; + " + ) + results = query_job.result() + bq_rows = [row for row in results][0][0] + print(f"bq_rows are: {bq_rows}") # # Compare successful_transforms and bq_rows # if successful_transforms_ledgers != bq_rows: + # print("bq_rows are {0} and successful_transforms are {1}".format(bq_rows, successful_transforms_ledgers) ) # raise ValueError('Mismatch between successful_transforms in ledgers and bq_rows')