diff --git a/dags/stellar_etl_airflow/test_sources.py b/dags/stellar_etl_airflow/test_sources.py index 317a75df..9ecae9da 100644 --- a/dags/stellar_etl_airflow/test_sources.py +++ b/dags/stellar_etl_airflow/test_sources.py @@ -126,18 +126,16 @@ def get_from_combinedExport(**context): for key, val in zip(successful_transforms, successful_values): successful_transforms[key] += val - print(f"Total successful transforms for yesterday: {successful_transforms}") - query_job = do_query("operations", yesterday) query_job2 = do_query("trades", yesterday) query_job3 = do_query("effects", yesterday) query_job4 = do_query("transactions", yesterday) BQ_results = { - "operations": int(row[0] for row in query_job.result()), - "trades": int(row[0] for row in query_job2.result()), - "effects": int(row[0] for row in query_job3.result()), - "transactions": int(row[0] for row in query_job4.result()), + "operations": next(iter(query_job.result()))[0], + "trades": next(iter(query_job2.result()))[0], + "effects": next(iter(query_job3.result()))[0], + "transactions": next(iter(query_job4.result()))[0], } context["ti"].xcom_push(key="from BQ", value=BQ_results) @@ -170,11 +168,9 @@ def get_from_without_captiveCore(**context): .all() ) - # Get the DAG dag_bag = DagBag() dag = dag_bag.get_dag("history_archive_without_captive_core") - # Get the task task = dag.get_task("export_ledgers_task") total_successful_transforms = 0 @@ -199,7 +195,7 @@ def get_from_without_captiveCore(**context): context["ti"].xcom_push(key="from GCS", value=total_successful_transforms) # Compare successful_transforms and bq_rows - if total_successful_transforms != rows[0]["count_public"]: + if total_successful_transforms != rows[0]: print( "bq_rows are {0} and successful_transforms are {1}".format( rows[0]["count_public"], total_successful_transforms