Skip to content

Commit

Permalink
next iter
Browse files Browse the repository at this point in the history
  • Loading branch information
laysabit committed Apr 19, 2024
1 parent ff6dbec commit 85178ea
Showing 1 changed file with 5 additions and 9 deletions.
14 changes: 5 additions & 9 deletions dags/stellar_etl_airflow/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,16 @@ def get_from_combinedExport(**context):
for key, val in zip(successful_transforms, successful_values):
successful_transforms[key] += val

print(f"Total successful transforms for yesterday: {successful_transforms}")

query_job = do_query("operations", yesterday)
query_job2 = do_query("trades", yesterday)
query_job3 = do_query("effects", yesterday)
query_job4 = do_query("transactions", yesterday)

BQ_results = {
"operations": int(row[0] for row in query_job.result()),
"trades": int(row[0] for row in query_job2.result()),
"effects": int(row[0] for row in query_job3.result()),
"transactions": int(row[0] for row in query_job4.result()),
"operations": next(iter(query_job.result()))[0],
"trades": next(iter(query_job2.result()))[0],
"effects": next(iter(query_job3.result()))[0],
"transactions": next(iter(query_job4.result()))[0],
}

context["ti"].xcom_push(key="from BQ", value=BQ_results)
Expand Down Expand Up @@ -170,11 +168,9 @@ def get_from_without_captiveCore(**context):
.all()
)

# Get the DAG
dag_bag = DagBag()
dag = dag_bag.get_dag("history_archive_without_captive_core")

# Get the task
task = dag.get_task("export_ledgers_task")

total_successful_transforms = 0
Expand All @@ -199,7 +195,7 @@ def get_from_without_captiveCore(**context):
context["ti"].xcom_push(key="from GCS", value=total_successful_transforms)

# Compare successful_transforms and bq_rows
if total_successful_transforms != rows[0]["count_public"]:
if total_successful_transforms != rows[0]:
print(
"bq_rows are {0} and successful_transforms are {1}".format(
rows[0]["count_public"], total_successful_transforms
Expand Down

0 comments on commit 85178ea

Please sign in to comment.