Testnet reset dag (#132)
* created the dag to reset the datasets

* inserted dates in variables

* all proposed changes have been corrected
cayod authored Apr 14, 2023
1 parent f85fb7f commit 2b002e9
Showing 4 changed files with 343 additions and 0 deletions.
7 changes: 7 additions & 0 deletions airflow_variables_dev.json
@@ -39,6 +39,13 @@
"api_key_path": "/home/airflow/gcs/data/apiKey.json",
"bq_dataset": "test_gcp_airflow_internal",
"bq_project": "test-hubble-319619",
"date_for_resets": {
"date": [
"2023-03-15",
"2023-06-14",
"2023-09-13",
"2023-12-13"]
},
"cluster_fields": {
"accounts": ["account_id", "last_modified_ledger"],
"account_signers": ["account_id", "signer", "last_modified_ledger"],
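These dates are read back inside the DAG through Airflow's Variable API; a minimal sketch of the lookup (assuming the variables file has been imported into the Airflow metastore):

from airflow.models import Variable

# Deserialize the JSON variable added above and pull out the list of reset dates.
reset_dates = Variable.get("date_for_resets", deserialize_json=True)["date"]
print(reset_dates)  # ['2023-03-15', '2023-06-14', '2023-09-13', '2023-12-13']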
271 changes: 271 additions & 0 deletions dags/dataset_reset_dag.py
@@ -0,0 +1,271 @@
"""
When the Test net server is reset, the dataset reset DAG deletes all the datasets in the test Hubble.
"""


import datetime
import json

from airflow import DAG
from airflow.models import Variable

from stellar_etl_airflow.build_check_execution_date_task import build_check_execution_date, path_to_execute
from stellar_etl_airflow.build_delete_data_for_reset_task import build_delete_data_task
from stellar_etl_airflow.default import get_default_dag_args, init_sentry

init_sentry()

dag = DAG(
    "testnet_data_reset",
    default_args=get_default_dag_args(),
    description="This DAG runs after the Testnet data reset that occurs periodically.",
    start_date=datetime.datetime(2023, 1, 1, 0, 0),
    schedule_interval="10 9 * * *",
    params={
        "alias": "testnet-reset",
    },
    user_defined_filters={"fromjson": lambda s: json.loads(s)},
)

# Both the internal and public datasets live in the same test project.
internal_project = "test-hubble-319619"
internal_dataset = Variable.get("bq_dataset")
public_project = "test-hubble-319619"
public_dataset = Variable.get("public_dataset")
table_names = Variable.get("table_ids", deserialize_json=True)

check_run_date = build_check_execution_date(
    dag,
    "check_run_date",
    "start_reset",
    "end_of_execution",
)

end_of_execution = path_to_execute(dag, "end_of_execution")

start_reset = path_to_execute(dag, "start_reset")


# Truncate every Testnet table in the internal dataset.
delete_internal_accounts = build_delete_data_task(dag, internal_project, internal_dataset, table_names["accounts"], "internal")
delete_internal_assets = build_delete_data_task(dag, internal_project, internal_dataset, table_names["assets"], "internal")
delete_internal_claimable_balances = build_delete_data_task(dag, internal_project, internal_dataset, table_names["claimable_balances"], "internal")
delete_internal_effects = build_delete_data_task(dag, internal_project, internal_dataset, table_names["effects"], "internal")
delete_internal_ledgers = build_delete_data_task(dag, internal_project, internal_dataset, table_names["ledgers"], "internal")
delete_internal_liquidity_pools = build_delete_data_task(dag, internal_project, internal_dataset, table_names["liquidity_pools"], "internal")
delete_internal_offers = build_delete_data_task(dag, internal_project, internal_dataset, table_names["offers"], "internal")
delete_internal_operations = build_delete_data_task(dag, internal_project, internal_dataset, table_names["operations"], "internal")
delete_internal_signers = build_delete_data_task(dag, internal_project, internal_dataset, table_names["signers"], "internal")
delete_internal_trades = build_delete_data_task(dag, internal_project, internal_dataset, table_names["trades"], "internal")
delete_internal_transactions = build_delete_data_task(dag, internal_project, internal_dataset, table_names["transactions"], "internal")
delete_internal_trustlines = build_delete_data_task(dag, internal_project, internal_dataset, table_names["trustlines"], "internal")
delete_internal_enriched = build_delete_data_task(dag, internal_project, internal_dataset, "enriched_history_operations", "internal")
delete_internal_enriched_meaningful = build_delete_data_task(dag, internal_project, internal_dataset, "enriched_meaningful_history_operations", "internal")

# Truncate every Testnet table in the public dataset.
delete_public_accounts = build_delete_data_task(dag, public_project, public_dataset, table_names["accounts"], "public")
delete_public_assets = build_delete_data_task(dag, public_project, public_dataset, table_names["assets"], "public")
delete_public_claimable_balances = build_delete_data_task(dag, public_project, public_dataset, table_names["claimable_balances"], "public")
delete_public_effects = build_delete_data_task(dag, public_project, public_dataset, table_names["effects"], "public")
delete_public_ledgers = build_delete_data_task(dag, public_project, public_dataset, table_names["ledgers"], "public")
delete_public_liquidity_pools = build_delete_data_task(dag, public_project, public_dataset, table_names["liquidity_pools"], "public")
delete_public_offers = build_delete_data_task(dag, public_project, public_dataset, table_names["offers"], "public")
delete_public_operations = build_delete_data_task(dag, public_project, public_dataset, table_names["operations"], "public")
delete_public_signers = build_delete_data_task(dag, public_project, public_dataset, table_names["signers"], "public")
delete_public_trades = build_delete_data_task(dag, public_project, public_dataset, table_names["trades"], "public")
delete_public_transactions = build_delete_data_task(dag, public_project, public_dataset, table_names["transactions"], "public")
delete_public_trustlines = build_delete_data_task(dag, public_project, public_dataset, table_names["trustlines"], "public")
delete_public_enriched = build_delete_data_task(dag, public_project, public_dataset, "enriched_history_operations", "public")


check_run_date >> [start_reset, end_of_execution]

start_reset >> [
    delete_internal_accounts,
    delete_internal_assets,
    delete_internal_claimable_balances,
    delete_internal_effects,
    delete_internal_ledgers,
    delete_internal_liquidity_pools,
    delete_internal_offers,
    delete_internal_operations,
    delete_internal_signers,
    delete_internal_trades,
    delete_internal_transactions,
    delete_internal_trustlines,
    delete_internal_enriched,
    delete_internal_enriched_meaningful,
    delete_public_accounts,
    delete_public_assets,
    delete_public_claimable_balances,
    delete_public_effects,
    delete_public_ledgers,
    delete_public_liquidity_pools,
    delete_public_offers,
    delete_public_operations,
    delete_public_signers,
    delete_public_trades,
    delete_public_transactions,
    delete_public_trustlines,
    delete_public_enriched,
]
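For reference, a minimal smoke check of the wiring above (a hypothetical snippet, assuming this module is importable from the dags/ folder and the Airflow variables are present in the metastore):

from dataset_reset_dag import dag

# 1 branch task + 2 path markers + 27 delete tasks.
assert len(dag.tasks) == 30
# The branch fans out to exactly the two paths declared above.
assert dag.get_task("check_run_date").downstream_task_ids == {"start_reset", "end_of_execution"}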
36 changes: 36 additions & 0 deletions dags/stellar_etl_airflow/build_check_execution_date_task.py
@@ -0,0 +1,36 @@
from datetime import datetime

from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator


def check_execution_date(**kwargs):
    """Branch to the reset path only when today is one of the scheduled Testnet reset dates."""
    date_for_resets = Variable.get("date_for_resets", deserialize_json=True)["date"]
    today = datetime.today().date()
    for date in date_for_resets:
        if today == datetime.strptime(date, "%Y-%m-%d").date():
            return kwargs.get("start_task")
    return kwargs.get("stop_task")



def build_check_execution_date(dag, task_name, start_task, stop_task):
    return BranchPythonOperator(
        task_id=task_name,
        python_callable=check_execution_date,
        op_kwargs={"start_task": start_task, "stop_task": stop_task},
        provide_context=True,
        dag=dag,
    )


def path_to_execute(dag, task_name):
    return DummyOperator(
        task_id=task_name,
        dag=dag,
    )
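A quick way to exercise the branching logic outside of a live Airflow run (a hypothetical sketch, with Variable.get mocked to return the dates from airflow_variables_dev.json):

from unittest import mock

import stellar_etl_airflow.build_check_execution_date_task as mod

dates = {"date": ["2023-03-15", "2023-06-14", "2023-09-13", "2023-12-13"]}
with mock.patch.object(mod.Variable, "get", return_value=dates):
    # Returns "start_reset" when today matches a reset date, "end_of_execution" otherwise.
    print(mod.check_execution_date(start_task="start_reset", stop_task="end_of_execution"))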
29 changes: 29 additions & 0 deletions dags/stellar_etl_airflow/build_delete_data_for_reset_task.py
@@ -0,0 +1,29 @@
from datetime import timedelta

from airflow.models import Variable
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from stellar_etl_airflow.default import alert_after_max_retries


def build_delete_data_task(dag, project, dataset, table, type_of_dataset):
    # TRUNCATE removes every row but leaves the (empty) table and its schema in place.
    DELETE_ROWS_QUERY = f"TRUNCATE TABLE {project}.{dataset}.{table}"

    return BigQueryInsertJobOperator(
        dag=dag,
        project_id=project,
        task_id=f"delete_{table}_{type_of_dataset}",
        execution_timeout=timedelta(
            seconds=Variable.get("task_timeout", deserialize_json=True)[
                build_delete_data_task.__name__
            ]
        ),
        on_failure_callback=alert_after_max_retries,
        configuration={
            "query": {
                "query": DELETE_ROWS_QUERY,
                "useLegacySql": False,
            }
        },
    )
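For illustration, the query string the operator submits for the internal accounts table, using the project and dataset values from airflow_variables_dev.json (a sketch, not part of the commit):

project, dataset, table = "test-hubble-319619", "test_gcp_airflow_internal", "accounts"
print(f"TRUNCATE TABLE {project}.{dataset}.{table}")
# TRUNCATE TABLE test-hubble-319619.test_gcp_airflow_internal.accounts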
