From cbf7e1176a4d896afbeb09c45cfa7d0bbb8c8d2a Mon Sep 17 00:00:00 2001 From: Laysa de Sousa Bitencourt Date: Fri, 19 Apr 2024 18:29:01 -0300 Subject: [PATCH] next iter --- dags/stellar_etl_airflow/test_sources.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/dags/stellar_etl_airflow/test_sources.py b/dags/stellar_etl_airflow/test_sources.py index 9ecae9da..ccfba3ed 100644 --- a/dags/stellar_etl_airflow/test_sources.py +++ b/dags/stellar_etl_airflow/test_sources.py @@ -21,21 +21,21 @@ def treating_errors(successful_transforms, BQ_results): raise ValueError("Mismatch between operations in GCS and BQ operations") elif successful_transforms["trades"] != BQ_results["trades"]: print( - "bq_operations are {0} and successful_transforms operations are {1}".format( + "bq trades are {0} and successful_transforms trades are {1}".format( BQ_results["trades"], successful_transforms["trades"] ) ) raise ValueError("Mismatch between trades in GCS and BQ trades") elif successful_transforms["effects"] != BQ_results["effects"]: print( - "bq_operations are {0} and successful_transforms operations are {1}".format( + "bq effects are {0} and successful_transforms effects are {1}".format( BQ_results["effects"], successful_transforms["effects"] ) ) raise ValueError("Mismatch between effects in GCS and BQ effects") elif successful_transforms["transactions"] != BQ_results["transactions"]: print( - "bq_operations are {0} and successful_transforms operations are {1}".format( + "bq transactions are {0} and successful_transforms transactions are {1}".format( BQ_results["transactions"], successful_transforms["transactions"] ) ) @@ -184,21 +184,19 @@ def get_from_without_captiveCore(**context): successful_transforms_ledgers = xcom_ledgers["successful_transforms"] total_successful_transforms += successful_transforms_ledgers - # # Query number of rows in BigQuery table + # Query number of rows in BigQuery table query_job = do_query("ledgers", yesterday) - results = query_job.result() - - rows = [dict(row) for row in results] + bq_result = int(next(iter(query_job.result()))[0]) - context["ti"].xcom_push(key="from BQ", value=rows[0]) + context["ti"].xcom_push(key="from BQ", value=bq_result) context["ti"].xcom_push(key="from GCS", value=total_successful_transforms) # Compare successful_transforms and bq_rows - if total_successful_transforms != rows[0]: + if total_successful_transforms != next(iter(query_job.result()))[0]: print( "bq_rows are {0} and successful_transforms are {1}".format( - rows[0]["count_public"], total_successful_transforms + bq_result, total_successful_transforms ) ) raise ValueError(