diff --git a/airflow_variables.json b/airflow_variables.json index d2e9a992..fbca13cd 100644 --- a/airflow_variables.json +++ b/airflow_variables.json @@ -116,7 +116,7 @@ "partnership_assets__asset_activity_fact": false, "trade_agg": false }, - "dbt_image_name": "stellar/stellar-dbt:0643adc", + "dbt_image_name": "stellar/stellar-dbt:011d897", "dbt_job_execution_timeout_seconds": 6000, "dbt_job_retries": 1, "dbt_keyfile_profile": "", @@ -130,7 +130,7 @@ "dbt_token_uri": "https://oauth2.googleapis.com/token", "gcs_exported_data_bucket_name": "us-central1-hubble-2-d948d67b-bucket", "gcs_exported_object_prefix": "dag-exported", - "image_name": "stellar/stellar-etl:6211230", + "image_name": "stellar/stellar-etl:e9c803c", "image_output_path": "/etl/exported_data/", "image_pull_policy": "IfNotPresent", "kube_config_location": "", diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 8da2f0ad..fbdc4f87 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -94,7 +94,7 @@ "partnership_assets__account_holders_activity_fact": true, "partnership_assets__asset_activity_fact": true }, - "dbt_image_name": "stellar/stellar-dbt:0643adc", + "dbt_image_name": "stellar/stellar-dbt:011d897", "dbt_job_execution_timeout_seconds": 300, "dbt_job_retries": 1, "dbt_keyfile_profile": "", diff --git a/dags/asset_pricing_pipeline_dag.py b/dags/asset_pricing_pipeline_dag.py index b154a6fe..14120229 100644 --- a/dags/asset_pricing_pipeline_dag.py +++ b/dags/asset_pricing_pipeline_dag.py @@ -1,4 +1,5 @@ import datetime +import json from airflow import DAG from airflow.models.variable import Variable diff --git a/dags/marts_tables_dag.py b/dags/marts_tables_dag.py index 98626d38..5c447d50 100644 --- a/dags/marts_tables_dag.py +++ b/dags/marts_tables_dag.py @@ -10,16 +10,16 @@ dag = DAG( "marts_tables", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 4, 4, 0, 0), + start_date=datetime.datetime(2015, 9, 30), description="This DAG runs dbt to create the tables for the models in marts/ but not any marts subdirectories.", schedule_interval="0 11 * * *", # Daily 11 AM UTC params={}, - catchup=False, + catchup=True, + max_active_runs=1, ) # tasks for staging tables for marts stg_history_transactions = build_dbt_task(dag, "stg_history_transactions") -stg_history_ledgers = build_dbt_task(dag, "stg_history_ledgers") stg_history_assets = build_dbt_task(dag, "stg_history_assets") stg_history_trades = build_dbt_task(dag, "stg_history_trades") @@ -31,11 +31,12 @@ # tasks for intermediate asset stats tables int_meaningful_asset_prices = build_dbt_task(dag, "int_meaningful_asset_prices") +int_asset_stats_agg = build_dbt_task(dag, "int_asset_stats_agg") stg_excluded_accounts = build_dbt_task(dag, "stg_excluded_accounts") stg_xlm_to_usd = build_dbt_task(dag, "stg_xlm_to_usd") # tasks for marts tables -agg_network_stats = build_dbt_task(dag, "agg_network_stats") +network_stats_agg = build_dbt_task(dag, "network_stats_agg") asset_stats_agg = build_dbt_task(dag, "asset_stats_agg") fee_stats_agg = build_dbt_task(dag, "fee_stats_agg") history_assets = build_dbt_task(dag, "history_assets") @@ -44,14 +45,14 @@ # DAG task graph # graph for marts tables -agg_network_stats +network_stats_agg liquidity_providers -int_meaningful_asset_prices >> asset_stats_agg -stg_excluded_accounts >> asset_stats_agg -stg_xlm_to_usd >> asset_stats_agg +int_meaningful_asset_prices >> int_asset_stats_agg +stg_excluded_accounts >> int_asset_stats_agg +stg_xlm_to_usd >> int_asset_stats_agg +int_asset_stats_agg >> asset_stats_agg stg_history_transactions >> fee_stats_agg -stg_history_ledgers >> fee_stats_agg stg_history_assets >> history_assets