Skip to content

Commit

Permalink
next iter
Browse files Browse the repository at this point in the history
  • Loading branch information
laysabit committed Apr 19, 2024
1 parent 85178ea commit cbf7e11
Showing 1 changed file with 8 additions and 10 deletions.
18 changes: 8 additions & 10 deletions dags/stellar_etl_airflow/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@ def treating_errors(successful_transforms, BQ_results):
raise ValueError("Mismatch between operations in GCS and BQ operations")
elif successful_transforms["trades"] != BQ_results["trades"]:
print(
"bq_operations are {0} and successful_transforms operations are {1}".format(
"bq trades are {0} and successful_transforms trades are {1}".format(
BQ_results["trades"], successful_transforms["trades"]
)
)
raise ValueError("Mismatch between trades in GCS and BQ trades")
elif successful_transforms["effects"] != BQ_results["effects"]:
print(
"bq_operations are {0} and successful_transforms operations are {1}".format(
"bq effects are {0} and successful_transforms effects are {1}".format(
BQ_results["effects"], successful_transforms["effects"]
)
)
raise ValueError("Mismatch between effects in GCS and BQ effects")
elif successful_transforms["transactions"] != BQ_results["transactions"]:
print(
"bq_operations are {0} and successful_transforms operations are {1}".format(
"bq transactions are {0} and successful_transforms transactions are {1}".format(
BQ_results["transactions"], successful_transforms["transactions"]
)
)
Expand Down Expand Up @@ -184,21 +184,19 @@ def get_from_without_captiveCore(**context):
successful_transforms_ledgers = xcom_ledgers["successful_transforms"]
total_successful_transforms += successful_transforms_ledgers

# # Query number of rows in BigQuery table
# Query number of rows in BigQuery table
query_job = do_query("ledgers", yesterday)

results = query_job.result()

rows = [dict(row) for row in results]
bq_result = int(next(iter(query_job.result()))[0])

context["ti"].xcom_push(key="from BQ", value=rows[0])
context["ti"].xcom_push(key="from BQ", value=bq_result)
context["ti"].xcom_push(key="from GCS", value=total_successful_transforms)

# Compare successful_transforms and bq_rows
if total_successful_transforms != rows[0]:
if total_successful_transforms != next(iter(query_job.result()))[0]:
print(
"bq_rows are {0} and successful_transforms are {1}".format(
rows[0]["count_public"], total_successful_transforms
bq_result, total_successful_transforms
)
)
raise ValueError(
Expand Down

0 comments on commit cbf7e11

Please sign in to comment.