From f14ef222c012739dc3802c1b042698b8ff077445 Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Wed, 2 Oct 2024 14:49:32 -0500 Subject: [PATCH] use subcommand fix --- airflow_variables_dev.json | 2 +- dags/dbt_source_data_freshness_test_dag.py | 4 ++-- dags/stellar_etl_airflow/build_dbt_task.py | 11 +++++++++-- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 4776f5e1..9770af95 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -331,7 +331,6 @@ "ttl": "ttl" }, "task_sla": { - "all_sources": 720, "asset_stats": 720, "build_batch_stats": 840, "build_bq_insert_job": 1080, @@ -348,6 +347,7 @@ "enriched_history_operations": 780, "enriched_history_operations_with_exclude": 780, "fee_stats": 840, + "freshness": 720, "history_assets": 720, "liquidity_pool_trade_volume": 1140, "liquidity_pools_value": 840, diff --git a/dags/dbt_source_data_freshness_test_dag.py b/dags/dbt_source_data_freshness_test_dag.py index 337951dd..ac7b4ca2 100644 --- a/dags/dbt_source_data_freshness_test_dag.py +++ b/dags/dbt_source_data_freshness_test_dag.py @@ -31,8 +31,8 @@ # DBT tests to run source_freshness_tests = dbt_task( dag, - model_name="all_sources", - command_type="source freshness", + command_type="source", + sub_command="freshness", flag=None, tag=None, resource_cfg="dbt", diff --git a/dags/stellar_etl_airflow/build_dbt_task.py b/dags/stellar_etl_airflow/build_dbt_task.py index 3b120a6b..0e7055fc 100644 --- a/dags/stellar_etl_airflow/build_dbt_task.py +++ b/dags/stellar_etl_airflow/build_dbt_task.py @@ -61,6 +61,7 @@ def create_dbt_profile(project="prod"): def dbt_task( dag, model_name=None, + sub_command=None, tag=None, flag="select", operator="", @@ -87,7 +88,12 @@ def dbt_task( dbt_image = "{{ var.value.dbt_image_name }}" - args = [command_type, f"--{flag}"] if flag else [command_type] + args = [command_type] + if sub_command: + args.append(sub_command) + + if flag: + args.append(f"--{flag}") models = [] if tag: @@ -96,6 +102,8 @@ def dbt_task( if model_name: task_name = model_name models.append(f"{operator}{model_name}") + if sub_command: + task_name = sub_command if len(models) > 1: task_name = "multiple_models" args.append(",".join(models)) @@ -117,7 +125,6 @@ def dbt_task( logging.info(f"sh commands to run in pod: {args}") - command_type = command_type.replace(" ", "_") return KubernetesPodOperator( task_id=f"dbt_{command_type}_{task_name}", name=f"dbt_{command_type}_{task_name}",