Skip to content

Commit

Permalink
do_query and treating_errors functions
Browse files Browse the repository at this point in the history
  • Loading branch information
laysabit committed Apr 19, 2024
1 parent e36e330 commit 546b66c
Showing 1 changed file with 64 additions and 81 deletions.
145 changes: 64 additions & 81 deletions dags/stellar_etl_airflow/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,51 @@
from google.oauth2 import service_account


def treating_errors(successful_transforms, BQ_results):
if successful_transforms["operations"] != BQ_results["operations"]:
print(
"bq_operations are {0} and successful_transforms operations are {1}".format(
BQ_results["operations"], successful_transforms["operations"]
)
)
raise ValueError("Mismatch between operations in GCS and BQ operations")
elif successful_transforms["trades"] != BQ_results["trades"]:
print(
"bq_operations are {0} and successful_transforms operations are {1}".format(
BQ_results["trades"], successful_transforms["trades"]
)
)
raise ValueError("Mismatch between trades in GCS and BQ trades")
elif successful_transforms["effects"] != BQ_results["effects"]:
print(
"bq_operations are {0} and successful_transforms operations are {1}".format(
BQ_results["effects"], successful_transforms["effects"]
)
)
raise ValueError("Mismatch between effects in GCS and BQ effects")
elif successful_transforms["transactions"] != BQ_results["transactions"]:
print(
"bq_operations are {0} and successful_transforms operations are {1}".format(
BQ_results["transactions"], successful_transforms["transactions"]
)
)
raise ValueError("Mismatch between transactions in GCS and BQ transactions")


def do_query(opType, date):
key_path = Variable.get("api_key_path")
credentials = service_account.Credentials.from_service_account_file(key_path)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

query_job = client.query(
f"""SELECT
(SELECT COUNT(*) FROM crypto-stellar.crypto_stellar.history_{opType}
WHERE DATE(batch_run_date)='{date.strftime("%Y-%m-%d")}')
"""
)
return query_job


def get_from_combinedExport(**context):
successful_transforms = {
"operations": 0,
Expand Down Expand Up @@ -83,37 +128,10 @@ def get_from_combinedExport(**context):

print(f"Total successful transforms for yesterday: {successful_transforms}")

key_path = Variable.get("api_key_path")
credentials = service_account.Credentials.from_service_account_file(key_path)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

query_job = client.query(
f"""SELECT
(SELECT COUNT(*) FROM crypto-stellar.crypto_stellar.history_operations
WHERE DATE(batch_run_date)='{yesterday.strftime("%Y-%m-%d")}')
"""
)

query_job2 = client.query(
f"""SELECT
(SELECT COUNT(*) FROM crypto-stellar.crypto_stellar.history_trades
WHERE DATE(batch_run_date)='{yesterday.strftime("%Y-%m-%d")}')
"""
)

query_job3 = client.query(
f"""SELECT
(SELECT COUNT(*) FROM crypto-stellar.crypto_stellar.history_effects
WHERE DATE(batch_run_date)='{yesterday.strftime("%Y-%m-%d")}')
"""
)

query_job4 = client.query(
f"""SELECT
(SELECT COUNT(*) FROM crypto-stellar.crypto_stellar.history_transactions
WHERE DATE(batch_run_date)='{yesterday.strftime("%Y-%m-%d")}')
"""
)
query_job = do_query("operations", yesterday)
query_job2 = do_query("trades", yesterday)
query_job3 = do_query("effects", yesterday)
query_job4 = do_query("transactions", yesterday)

BQ_results = {
"operations": int(row[0] for row in query_job.result()),
Expand All @@ -125,34 +143,7 @@ def get_from_combinedExport(**context):
context["ti"].xcom_push(key="from BQ", value=BQ_results)
context["ti"].xcom_push(key="from GCS", value=successful_transforms)

if successful_transforms["operations"] != BQ_results["operations"]:
print(
"bq_operations are {0} and successful_transforms operations are {1}".format(
BQ_results["operations"], successful_transforms["operations"]
)
)
raise ValueError("Mismatch between operations in GCS and BQ operations")
elif successful_transforms["trades"] != BQ_results["trades"]:
print(
"bq_operations are {0} and successful_transforms operations are {1}".format(
BQ_results["trades"], successful_transforms["trades"]
)
)
raise ValueError("Mismatch between trades in GCS and BQ trades")
elif successful_transforms["effects"] != BQ_results["effects"]:
print(
"bq_operations are {0} and successful_transforms operations are {1}".format(
BQ_results["effects"], successful_transforms["effects"]
)
)
raise ValueError("Mismatch between effects in GCS and BQ effects")
elif successful_transforms["transactions"] != BQ_results["transactions"]:
print(
"bq_operations are {0} and successful_transforms operations are {1}".format(
BQ_results["transactions"], successful_transforms["transactions"]
)
)
raise ValueError("Mismatch between transactions in GCS and BQ transactions")
treating_errors(successful_transforms, BQ_results)


def get_from_without_captiveCore(**context):
Expand Down Expand Up @@ -196,22 +187,14 @@ def get_from_without_captiveCore(**context):
successful_transforms_ledgers = xcom_ledgers["successful_transforms"]
total_successful_transforms += successful_transforms_ledgers

key_path = Variable.get("api_key_path")
credentials = service_account.Credentials.from_service_account_file(key_path)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

# # Query number of rows in BigQuery table
query_job = client.query(
f"""SELECT
(SELECT COUNT(*) FROM crypto-stellar.crypto_stellar.history_ledgers
WHERE DATE(batch_run_date)='{yesterday.strftime("%Y-%m-%d")}') AS count_public
"""
)
query_job = do_query("ledgers", yesterday)

results = query_job.result()
# Convert the results to a list of rows

rows = [dict(row) for row in results]

context["ti"].xcom_push(key="from BQ", value=rows[0]["count_public"])
context["ti"].xcom_push(key="from BQ", value=rows[0])
context["ti"].xcom_push(key="from GCS", value=total_successful_transforms)

# Compare successful_transforms and bq_rows
Expand All @@ -236,16 +219,16 @@ def get_from_without_captiveCore(**context):
},
)

# compare_task = PythonOperator(
# task_id="get_from_without_captiveCore",
# python_callable=get_from_without_captiveCore,
# provide_context=True,
# dag=dag,
# )

compare2_task = PythonOperator(
task_id="get_from_combinedExport",
python_callable=get_from_combinedExport,
compare_task = PythonOperator(
task_id="get_from_without_captiveCore",
python_callable=get_from_without_captiveCore,
provide_context=True,
dag=dag,
)

# compare2_task = PythonOperator(
# task_id="get_from_combinedExport",
# python_callable=get_from_combinedExport,
# provide_context=True,
# dag=dag,
# )

0 comments on commit 546b66c

Please sign in to comment.