From 9db4e32a9ea30cb084e5b909bb52e6521113b5ec Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Mon, 30 Oct 2023 23:59:38 -0400 Subject: [PATCH 1/2] Update mart dag timing --- dags/marts_tables_dag.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dags/marts_tables_dag.py b/dags/marts_tables_dag.py index 5c447d50..61929d86 100644 --- a/dags/marts_tables_dag.py +++ b/dags/marts_tables_dag.py @@ -12,12 +12,16 @@ default_args=get_default_dag_args(), start_date=datetime.datetime(2015, 9, 30), description="This DAG runs dbt to create the tables for the models in marts/ but not any marts subdirectories.", - schedule_interval="0 11 * * *", # Daily 11 AM UTC + schedule_interval="0 17 * * *", # Daily 11 AM UTC params={}, catchup=True, max_active_runs=1, ) +wait_on_partnership_assets_dag = build_cross_deps( + dag, "wait_on_partnership_assets_pipeline", "partnership_assets_dag", time_delta=10 +) + # tasks for staging tables for marts stg_history_transactions = build_dbt_task(dag, "stg_history_transactions") stg_history_assets = build_dbt_task(dag, "stg_history_assets") @@ -48,7 +52,7 @@ network_stats_agg liquidity_providers -int_meaningful_asset_prices >> int_asset_stats_agg +wait_on_partnership_assets_dag >> int_meaningful_asset_prices >> int_asset_stats_agg stg_excluded_accounts >> int_asset_stats_agg stg_xlm_to_usd >> int_asset_stats_agg int_asset_stats_agg >> asset_stats_agg From eea5378c045b22609ffd4aaaf9974ea22687e341 Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Tue, 31 Oct 2023 09:17:43 -0400 Subject: [PATCH 2/2] Add missing import --- dags/marts_tables_dag.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dags/marts_tables_dag.py b/dags/marts_tables_dag.py index 61929d86..40c33936 100644 --- a/dags/marts_tables_dag.py +++ b/dags/marts_tables_dag.py @@ -2,6 +2,7 @@ from airflow import DAG from airflow.models.variable import Variable +from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps from stellar_etl_airflow.build_dbt_task import build_dbt_task from stellar_etl_airflow.default import get_default_dag_args, init_sentry