From 9bc6ece73cb27108710d5e341c5e103df1bb22bf Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Tue, 30 Apr 2024 10:48:54 -0500 Subject: [PATCH 01/14] Fix upstream deps, refactor to be backfill compatible --- airflow_variables_prod.json | 8 ++++++-- dags/queries/create_table.sql | 1 + dags/queries/update_table.sql | 2 +- dags/sandbox_create_dag.py | 5 ++--- dags/sandbox_update_dag.py | 18 +++++++++++++----- 5 files changed, 23 insertions(+), 11 deletions(-) diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json index 4f2e6b3d..f8339f6d 100644 --- a/airflow_variables_prod.json +++ b/airflow_variables_prod.json @@ -60,7 +60,7 @@ "dbt_image_name": "stellar/stellar-dbt:6ee7342", "dbt_job_execution_timeout_seconds": 1800, "dbt_job_retries": 1, - "dbt_mart_dataset": "sdf_marts", + "dbt_mart_dataset": "crypto_stellar_dbt", "dbt_maximum_bytes_billed": 100000000000000, "dbt_project": "hubble-261722", "dbt_target": "prod", @@ -246,9 +246,13 @@ "dbt_tables": { "signers_current": "account_signers_current", "accounts_current": "accounts_current", + "config_settings_current": "config_settings_current", + "contract_code_current": "contract_code_current", + "contract_data_current": "contract_data_current", "liquidity_pools_current": "liquidity_pools_current", "offers_current": "offers_current", - "trustlines_current": "trust_lines_current" + "trustlines_current": "trust_lines_current", + "ttl": "ttl" }, "use_testnet": "False", "sandbox_dataset": "crypto_stellar_internal_sandbox", diff --git a/dags/queries/create_table.sql b/dags/queries/create_table.sql index aeb28291..126941ef 100644 --- a/dags/queries/create_table.sql +++ b/dags/queries/create_table.sql @@ -4,4 +4,5 @@ options (partition_expiration_days = 180) as ( select * from `{project_id}.{dataset_id}.{table_id}` where batch_run_date >= date_sub(current_date(), interval 6 month) + and batch_run_date < date_sub(current_date()) ) diff --git a/dags/queries/update_table.sql b/dags/queries/update_table.sql index bfc3d297..65438ce5 100644 --- a/dags/queries/update_table.sql +++ b/dags/queries/update_table.sql @@ -1,4 +1,4 @@ insert into {project_id}.{target_dataset}.{table_id} select * from {project_id}.{dataset_id}.{table_id} -where date_trunc(batch_run_date, day) = date_trunc(current_date() - interval 1 day, day) +where date_trunc(batch_run_date, day) = date_trunc({batch_run_date}) diff --git a/dags/sandbox_create_dag.py b/dags/sandbox_create_dag.py index 7d5bdd5f..6fd19b1a 100644 --- a/dags/sandbox_create_dag.py +++ b/dags/sandbox_create_dag.py @@ -23,7 +23,6 @@ with DAG( "sandbox_create_dag", default_args=get_default_dag_args(), - start_date=datetime(2023, 1, 1), description="This DAG creates a sandbox", schedule_interval="@once", params={"alias": "sandbox_dataset"}, @@ -32,8 +31,8 @@ }, catchup=False, ) as dag: - PROJECT = Variable.get("bq_project") - DATASET = Variable.get("bq_dataset") + PROJECT = Variable.get("public_project") + DATASET = Variable.get("public_dataset") SANDBOX_DATASET = Variable.get("sandbox_dataset") DBT_DATASET = Variable.get("dbt_mart_dataset") TABLES_ID = Variable.get("table_ids", deserialize_json=True) diff --git a/dags/sandbox_update_dag.py b/dags/sandbox_update_dag.py index fb4b7b69..02ffca56 100644 --- a/dags/sandbox_update_dag.py +++ b/dags/sandbox_update_dag.py @@ -12,6 +12,7 @@ file_to_string, get_query_filepath, ) +from stellar_etl_airflow import macros from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps from stellar_etl_airflow.default import ( alert_after_max_retries, @@ -24,22 +25,28 @@ with DAG( "sandbox_update_dag", default_args=get_default_dag_args(), - start_date=datetime(2023, 1, 1), + start_date=datetime(2024, 4, 30), description="This DAG updates a sandbox", schedule_interval="0 1 * * *", params={"alias": "sandbox_dataset"}, user_defined_filters={"fromjson": lambda s: loads(s)}, - catchup=False, + catchup=True, ) as dag: TABLES_ID = Variable.get("table_ids", deserialize_json=True) PROJECT = Variable.get("bq_project") BQ_DATASET = Variable.get("bq_dataset") SANDBOX_DATASET = Variable.get("sandbox_dataset") + batch_run_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" + start_tables_task = EmptyOperator(task_id="start_tables_task") - wait_on_dag = build_cross_deps( - dag, "wait_on_base_tables", "history_archive_with_captive_core_combined_export" + wait_on_history_dag = build_cross_deps( + dag, "wait_on_base_tables", "history_table_export" + ) + + wait_on_state_dag = build_cross_deps( + dag, "wait_on_base_tables", "state_table_export" ) for table_id in TABLES_ID: @@ -50,6 +57,7 @@ "dataset_id": BQ_DATASET, "table_id": TABLES_ID[table_id], "target_dataset": SANDBOX_DATASET, + "batch_run_date": batch_run_date } query = query.format(**sql_params) tables_update_task = BigQueryInsertJobOperator( @@ -63,4 +71,4 @@ on_failure_callback=alert_after_max_retries, ) - start_tables_task >> wait_on_dag >> tables_update_task + start_tables_task >> [wait_on_history_dag, wait_on_state_dag] >> tables_update_task From 50d100f542a9a53880adf460d8722342207d7caf Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Tue, 30 Apr 2024 10:52:05 -0500 Subject: [PATCH 02/14] Update dev variables --- airflow_variables_dev.json | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 79b99c6e..0ef71034 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -55,7 +55,7 @@ "dbt_image_name": "stellar/stellar-dbt:6ee7342", "dbt_job_execution_timeout_seconds": 300, "dbt_job_retries": 1, - "dbt_mart_dataset": "test_sdf_marts", + "dbt_mart_dataset": "test_crypto_stellar_dbt", "dbt_maximum_bytes_billed": 250000000000, "dbt_project": "test-hubble-319619", "dbt_target": "test", @@ -251,9 +251,13 @@ "dbt_tables": { "signers_current": "account_signers_current", "accounts_current": "accounts_current", + "config_settings_current": "config_settings_current", + "contract_code_current": "contract_code_current", + "contract_data_current": "contract_data_current", "liquidity_pools_current": "liquidity_pools_current", "offers_current": "offers_current", - "trustlines_current": "trust_lines_current" + "trustlines_current": "trust_lines_current", + "ttl": "ttl" }, "use_testnet": "True", "sandbox_dataset": "crypto_stellar_internal_sandbox", From 4d651958a9c351e4cba268f1e0fc5acedf21a41d Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Wed, 1 May 2024 10:44:38 -0500 Subject: [PATCH 03/14] Address linting errors --- dags/queries/create_table.sql | 5 +++-- dags/sandbox_create_dag.py | 1 - dags/sandbox_update_dag.py | 10 +++++++--- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/dags/queries/create_table.sql b/dags/queries/create_table.sql index 126941ef..4bbf84cb 100644 --- a/dags/queries/create_table.sql +++ b/dags/queries/create_table.sql @@ -3,6 +3,7 @@ partition by date_trunc(batch_run_date, day) options (partition_expiration_days = 180) as ( select * from `{project_id}.{dataset_id}.{table_id}` - where batch_run_date >= date_sub(current_date(), interval 6 month) - and batch_run_date < date_sub(current_date()) + where + batch_run_date >= date_sub(current_date(), interval 6 month) + and batch_run_date < date_sub(current_date()) ) diff --git a/dags/sandbox_create_dag.py b/dags/sandbox_create_dag.py index 6fd19b1a..0f52aab3 100644 --- a/dags/sandbox_create_dag.py +++ b/dags/sandbox_create_dag.py @@ -1,7 +1,6 @@ """ This DAG creates the sandbox dataset with transactions tables, state tables with history and views. """ -from datetime import datetime from json import loads from airflow import DAG diff --git a/dags/sandbox_update_dag.py b/dags/sandbox_update_dag.py index 02ffca56..d099453b 100644 --- a/dags/sandbox_update_dag.py +++ b/dags/sandbox_update_dag.py @@ -8,11 +8,11 @@ from airflow.models.variable import Variable from airflow.operators.empty import EmptyOperator from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator +from stellar_etl_airflow import macros from stellar_etl_airflow.build_bq_insert_job_task import ( file_to_string, get_query_filepath, ) -from stellar_etl_airflow import macros from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps from stellar_etl_airflow.default import ( alert_after_max_retries, @@ -57,7 +57,7 @@ "dataset_id": BQ_DATASET, "table_id": TABLES_ID[table_id], "target_dataset": SANDBOX_DATASET, - "batch_run_date": batch_run_date + "batch_run_date": batch_run_date, } query = query.format(**sql_params) tables_update_task = BigQueryInsertJobOperator( @@ -71,4 +71,8 @@ on_failure_callback=alert_after_max_retries, ) - start_tables_task >> [wait_on_history_dag, wait_on_state_dag] >> tables_update_task + ( + start_tables_task + >> [wait_on_history_dag, wait_on_state_dag] + >> tables_update_task + ) From 6ff30b66d66c2316a56e3bb5b151b3199f993018 Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Wed, 1 May 2024 10:51:00 -0500 Subject: [PATCH 04/14] Update SQL logic to truncate date --- dags/queries/create_table.sql | 4 ++-- dags/queries/update_table.sql | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dags/queries/create_table.sql b/dags/queries/create_table.sql index 4bbf84cb..ce09fba4 100644 --- a/dags/queries/create_table.sql +++ b/dags/queries/create_table.sql @@ -4,6 +4,6 @@ options (partition_expiration_days = 180) as ( select * from `{project_id}.{dataset_id}.{table_id}` where - batch_run_date >= date_sub(current_date(), interval 6 month) - and batch_run_date < date_sub(current_date()) + batch_run_date >= date_trunc(date_sub(current_date(), interval 6 month), day) + and batch_run_date < date_trunc((current_date(), day) ) diff --git a/dags/queries/update_table.sql b/dags/queries/update_table.sql index 65438ce5..5706674c 100644 --- a/dags/queries/update_table.sql +++ b/dags/queries/update_table.sql @@ -1,4 +1,4 @@ insert into {project_id}.{target_dataset}.{table_id} select * from {project_id}.{dataset_id}.{table_id} -where date_trunc(batch_run_date, day) = date_trunc({batch_run_date}) +where date_trunc(batch_run_date, day) = date_trunc('{batch_run_date}', day) From de2537b75a87b0185043728e2148f3efd1b28a6f Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Wed, 1 May 2024 10:56:31 -0500 Subject: [PATCH 05/14] fix sql --- dags/queries/create_table.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/queries/create_table.sql b/dags/queries/create_table.sql index ce09fba4..d76e33d5 100644 --- a/dags/queries/create_table.sql +++ b/dags/queries/create_table.sql @@ -5,5 +5,5 @@ options (partition_expiration_days = 180) as ( from `{project_id}.{dataset_id}.{table_id}` where batch_run_date >= date_trunc(date_sub(current_date(), interval 6 month), day) - and batch_run_date < date_trunc((current_date(), day) + and batch_run_date < date_trunc((current_date(), day)) ) From da8a3e8e9f905f900ae18915ec0c7a071fd859e4 Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Wed, 1 May 2024 11:59:33 -0500 Subject: [PATCH 06/14] update task id to be unique --- dags/sandbox_update_dag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/sandbox_update_dag.py b/dags/sandbox_update_dag.py index 949365d2..196dd41d 100644 --- a/dags/sandbox_update_dag.py +++ b/dags/sandbox_update_dag.py @@ -46,7 +46,7 @@ ) wait_on_state_dag = build_cross_deps( - dag, "wait_on_base_tables", "state_table_export" + dag, "wait_on_state_tables", "state_table_export" ) for table_id in TABLES_ID: From 3509e366234610af17b55d794fd30567498086f0 Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Wed, 1 May 2024 12:09:23 -0500 Subject: [PATCH 07/14] Rename ttl dbt parm --- airflow_variables_dev.json | 2 +- airflow_variables_prod.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 7ae6c62a..1a66dac4 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -257,7 +257,7 @@ "liquidity_pools_current": "liquidity_pools_current", "offers_current": "offers_current", "trustlines_current": "trust_lines_current", - "ttl": "ttl" + "ttl": "ttl_current" }, "use_testnet": "True", "sandbox_dataset": "crypto_stellar_internal_sandbox", diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json index 12e6f47b..8ba964b9 100644 --- a/airflow_variables_prod.json +++ b/airflow_variables_prod.json @@ -252,7 +252,7 @@ "liquidity_pools_current": "liquidity_pools_current", "offers_current": "offers_current", "trustlines_current": "trust_lines_current", - "ttl": "ttl" + "ttl": "ttl_current" }, "use_testnet": "False", "sandbox_dataset": "crypto_stellar_internal_sandbox", From 26e6c4180746eb277c5cc80f32b21c1a722896b8 Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Wed, 1 May 2024 12:16:29 -0500 Subject: [PATCH 08/14] update last ttl key --- airflow_variables_dev.json | 2 +- airflow_variables_prod.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 1a66dac4..cf995dcb 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -257,7 +257,7 @@ "liquidity_pools_current": "liquidity_pools_current", "offers_current": "offers_current", "trustlines_current": "trust_lines_current", - "ttl": "ttl_current" + "ttl_current": "ttl_current" }, "use_testnet": "True", "sandbox_dataset": "crypto_stellar_internal_sandbox", diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json index 8ba964b9..c7cc362f 100644 --- a/airflow_variables_prod.json +++ b/airflow_variables_prod.json @@ -252,7 +252,7 @@ "liquidity_pools_current": "liquidity_pools_current", "offers_current": "offers_current", "trustlines_current": "trust_lines_current", - "ttl": "ttl_current" + "ttl_current": "ttl_current" }, "use_testnet": "False", "sandbox_dataset": "crypto_stellar_internal_sandbox", From 2b8866f086a8fc034cf1ab898e15c320b28fbae9 Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Wed, 1 May 2024 16:46:09 -0500 Subject: [PATCH 09/14] Remove sql breaking parens --- dags/queries/create_table.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/queries/create_table.sql b/dags/queries/create_table.sql index d76e33d5..3f5a8569 100644 --- a/dags/queries/create_table.sql +++ b/dags/queries/create_table.sql @@ -5,5 +5,5 @@ options (partition_expiration_days = 180) as ( from `{project_id}.{dataset_id}.{table_id}` where batch_run_date >= date_trunc(date_sub(current_date(), interval 6 month), day) - and batch_run_date < date_trunc((current_date(), day)) + and batch_run_date < date_trunc(current_date(), day) ) From 4f79bd28a6a9581ebc80079357732613a0d5311d Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Wed, 1 May 2024 16:54:50 -0500 Subject: [PATCH 10/14] remove diagnostic events, not a table --- airflow_variables_dev.json | 3 +-- airflow_variables_prod.json | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index cf995dcb..f138e4e8 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -235,8 +235,7 @@ "contract_data": "contract_data", "contract_code": "contract_code", "config_settings": "config_settings", - "ttl": "ttl", - "diagnostic_events": "diagnostic_events" + "ttl": "ttl" }, "task_timeout": { "build_batch_stats": 180, diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json index c7cc362f..ce758ef0 100644 --- a/airflow_variables_prod.json +++ b/airflow_variables_prod.json @@ -230,8 +230,7 @@ "contract_data": "contract_data", "contract_code": "contract_code", "config_settings": "config_settings", - "ttl": "ttl", - "diagnostic_events": "diagnostic_events" + "ttl": "ttl" }, "task_timeout": { "build_batch_stats": 180, From 22601ee126ff8baff0ee68b1b1d6c2c801b3bd2f Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Wed, 1 May 2024 17:02:48 -0500 Subject: [PATCH 11/14] fix batch stats diag events --- dags/history_tables_dag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/history_tables_dag.py b/dags/history_tables_dag.py index 28e32ebd..97cf00e6 100644 --- a/dags/history_tables_dag.py +++ b/dags/history_tables_dag.py @@ -68,7 +68,7 @@ write_trade_stats = build_batch_stats(dag, table_names["trades"]) write_effects_stats = build_batch_stats(dag, table_names["effects"]) write_tx_stats = build_batch_stats(dag, table_names["transactions"]) -write_diagnostic_events_stats = build_batch_stats(dag, table_names["diagnostic_events"]) +write_diagnostic_events_stats = build_batch_stats(dag, "diagnostic_events") write_ledger_stats = build_batch_stats(dag, table_names["ledgers"]) write_asset_stats = build_batch_stats(dag, table_names["assets"]) From 2c14cefe5ca09b27e8e372a16c4ac8903cf210a2 Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Wed, 1 May 2024 17:16:22 -0500 Subject: [PATCH 12/14] Update user macro for batch run date --- dags/sandbox_update_dag.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dags/sandbox_update_dag.py b/dags/sandbox_update_dag.py index 196dd41d..896d87a4 100644 --- a/dags/sandbox_update_dag.py +++ b/dags/sandbox_update_dag.py @@ -37,6 +37,13 @@ BQ_DATASET = Variable.get("bq_dataset") SANDBOX_DATASET = Variable.get("sandbox_dataset") + user_defined_macros = ( + { + "subtract_data_interval": macros.subtract_data_interval, + "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, + }, + ) + batch_run_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" start_tables_task = EmptyOperator(task_id="start_tables_task") From 302ca32e80a9f8adbdc638a9deafe52b9c3ff02d Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Wed, 1 May 2024 17:24:06 -0500 Subject: [PATCH 13/14] move macros into dag definition --- dags/sandbox_update_dag.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/dags/sandbox_update_dag.py b/dags/sandbox_update_dag.py index 896d87a4..48945f3b 100644 --- a/dags/sandbox_update_dag.py +++ b/dags/sandbox_update_dag.py @@ -30,20 +30,18 @@ schedule_interval="0 1 * * *", params={"alias": "sandbox_dataset"}, user_defined_filters={"fromjson": lambda s: loads(s)}, + render_template_as_native_obj=True, catchup=True, + user_defined_macros={ + "subtract_data_interval": macros.subtract_data_interval, + "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, + }, ) as dag: TABLES_ID = Variable.get("table_ids", deserialize_json=True) PROJECT = Variable.get("bq_project") BQ_DATASET = Variable.get("bq_dataset") SANDBOX_DATASET = Variable.get("sandbox_dataset") - user_defined_macros = ( - { - "subtract_data_interval": macros.subtract_data_interval, - "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, - }, - ) - batch_run_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" start_tables_task = EmptyOperator(task_id="start_tables_task") From eb052563afada5fac1334f48ccc49f1889288a88 Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Wed, 1 May 2024 20:28:01 -0500 Subject: [PATCH 14/14] cast string date as datetime --- dags/queries/update_table.sql | 2 +- dags/sandbox_update_dag.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dags/queries/update_table.sql b/dags/queries/update_table.sql index 5706674c..9b1d6337 100644 --- a/dags/queries/update_table.sql +++ b/dags/queries/update_table.sql @@ -1,4 +1,4 @@ insert into {project_id}.{target_dataset}.{table_id} select * from {project_id}.{dataset_id}.{table_id} -where date_trunc(batch_run_date, day) = date_trunc('{batch_run_date}', day) +where date_trunc(batch_run_date, day) = date_trunc(cast('{batch_run_date}' as datetime), day) diff --git a/dags/sandbox_update_dag.py b/dags/sandbox_update_dag.py index 48945f3b..1b5b8807 100644 --- a/dags/sandbox_update_dag.py +++ b/dags/sandbox_update_dag.py @@ -38,8 +38,8 @@ }, ) as dag: TABLES_ID = Variable.get("table_ids", deserialize_json=True) - PROJECT = Variable.get("bq_project") - BQ_DATASET = Variable.get("bq_dataset") + PROJECT = Variable.get("public_project") + BQ_DATASET = Variable.get("public_dataset") SANDBOX_DATASET = Variable.get("sandbox_dataset") batch_run_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}"